Hardware Drivers

Low-level hardware drivers for individual sensor ICs and communication modules. Each driver handles protocol-specific communication (I2C, SPI, UART, Modbus).

I2C Environmental Sensors

BME280 – Temperature, Humidity, Pressure

Driver for the Bosch BME280 temperature, humidity, and pressure sensor.

Provides I2C communication with the BME280 environmental sensor via the Adafruit CircuitPython BME280 library. The driver handles initialization on either I2C bus 0 or 1, automatic reinitialization on I2C errors, and MCP23017-controlled 3.3V power rail reset for hardware fault recovery.

The BME280 is a combined digital temperature, humidity, and barometric pressure sensor with high accuracy and low power consumption.

Hardware:
  • Interface: I2C (bus 0 or 1)

  • Default address: 0x76 (alternate: 0x77)

  • Supply: 3.3V

  • Accuracy: +/-1 °C temperature, +/-3% RH, +/-1 hPa pressure

Typical usage:

from drivers.BME280.BME280 import BME280

sensor = BME280(i2c_port=0, i2c_addr=0x76)
temp = sensor.get_temperature()
hum = sensor.get_humidity()
pressure = sensor.get_pressure()

Note

Requires adafruit-circuitpython-bme280, board, and busio packages, plus I2C bus access on the Raspberry Pi.

class drivers.BME280.BME280.BME280[source]

Bases: object

I2C driver for the Bosch BME280 environmental sensor.

Wraps the Adafruit CircuitPython BME280 library with automatic retry and reinitialization logic. On persistent I2C errors, resets the 3.3V power rail via MCP23017 GPIO expander to recover the sensor.

Variables:
  • sensor – Active Adafruit BME280 sensor instance, or None if initialization failed.

  • i2c_error – True when the sensor is in an error state requiring reinitialization.

Parameters:
  • i2c_port (int)

  • i2c_addr (int)

  • max_retries (int)

sensor = None
__init__(i2c_port=0, i2c_addr=118, max_retries=3)[source]

Initialize the BME280 sensor on the specified I2C bus.

Creates the I2C bus connection, initializes the MCP23017 power control, and attempts the first sensor initialization. If initialization fails, the sensor attribute remains None and subsequent reads will trigger reinitialization attempts.

Parameters:
  • i2c_port (int) – I2C bus number (0 for /dev/i2c-0, 1 for /dev/i2c-1).

  • i2c_addr (int) – 7-bit I2C address. Use 0x76 (SDO to GND) or 0x77 (SDO to VCC).

  • max_retries (int) – Maximum number of read attempts before returning a zero value.

Return type:

None

reinitialize()[source]

Reinitialize the sensor after an I2C error.

Releases the existing I2C bus and sensor references, waits for bus recovery, selects the correct I2C multiplexer channel, and attempts a fresh initialization.

Return type:

bool

Returns:

True if reinitialization succeeded, False otherwise.

get_temperature()[source]

Read the current temperature from the BME280 sensor.

Attempts to read the temperature up to max_retries times. On I2C bus errors, resets the 3.3V power rail and reinitializes the sensor before retrying.

Return type:

float

Returns:

Temperature in degrees Celsius, or 0.0 if all read attempts fail.
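
The retry behaviour described for the read methods can be sketched roughly as follows. This is an illustration only: read_with_retries, read_once, and recover are hypothetical names, and catching OSError is an assumption about how I2C failures surface, not a description of the driver's actual code.

```python
from typing import Callable


def read_with_retries(
    read_once: Callable[[], float],
    recover: Callable[[], bool],
    max_retries: int = 3,
) -> float:
    """Try read_once up to max_retries times; return 0.0 if every attempt fails."""
    for _ in range(max_retries):
        try:
            return read_once()
        except OSError:
            # Assumed recovery step: power-cycle the 3.3V rail and reinitialize.
            recover()
    return 0.0
```

Note that 0.0 is also a plausible temperature, so a zero return value is ambiguous on its own.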

get_humidity()[source]

Read the current relative humidity from the BME280 sensor.

Attempts to read the humidity up to max_retries times. On I2C bus errors, resets the 3.3V power rail and reinitializes the sensor before retrying.

Return type:

float

Returns:

Relative humidity in percent (0-100), or 0.0 if all read attempts fail.

get_pressure()[source]

Read the current barometric pressure from the BME280 sensor.

Attempts to read the pressure up to max_retries times. On I2C bus errors, resets the 3.3V power rail and reinitializes the sensor before retrying.

Return type:

float

Returns:

Barometric pressure in hPa (hectopascals), or 0.0 if all read attempts fail.

getAltitude()[source]

Read the calculated altitude from the BME280 sensor.

The altitude is derived from the pressure reading using the international barometric formula. Attempts to read up to max_retries times with automatic reinitialization.

Return type:

float

Returns:

Altitude in meters above sea level, or 0.0 if all read attempts fail.
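
For reference, the international barometric formula mentioned above can be sketched as below. The function name pressure_to_altitude and the default sea-level reference of 1013.25 hPa are assumptions for illustration; the reference pressure the driver actually uses is not stated here.

```python
def pressure_to_altitude(pressure_hpa: float, sea_level_hpa: float = 1013.25) -> float:
    """Approximate altitude in metres from barometric pressure in hPa."""
    if pressure_hpa <= 0 or sea_level_hpa <= 0:
        raise ValueError("pressures must be positive")
    # International barometric formula: h = 44330 * (1 - (p / p0) ** (1 / 5.255))
    return 44330.0 * (1.0 - (pressure_hpa / sea_level_hpa) ** (1.0 / 5.255))
```

Because the result depends on the assumed sea-level pressure, the computed altitude drifts with the weather unless the reference is updated.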

ATH20 – Temperature & Humidity

Driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor.

Provides I2C communication with the ATH20 capacitive humidity and temperature sensor. The driver handles initialization, soft reset, CRC-verified measurement readout, and automatic reinitialization with MCP23017-controlled power cycling on persistent I2C bus errors.

The ATH20 returns 20-bit raw values for both temperature and humidity, which are converted to physical units using the formulas from the datasheet.
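
As a rough illustration of that conversion, the sketch below unpacks the two 20-bit values from a 6-byte frame (status byte followed by five data bytes) and applies the AHT20 datasheet formulas. The helper name convert_aht20_frame is hypothetical, and the assumption that the driver works on exactly this frame layout is not confirmed by this page.

```python
def convert_aht20_frame(frame: bytes) -> dict[str, float]:
    """Convert a raw AHT20 measurement frame to temperature (C) and humidity (%)."""
    if len(frame) < 6:
        raise ValueError("expected at least 6 bytes (status + 5 data bytes)")
    # Humidity: bytes 1-2 plus the upper nibble of byte 3 (20 bits).
    raw_hum = (frame[1] << 12) | (frame[2] << 4) | (frame[3] >> 4)
    # Temperature: lower nibble of byte 3 plus bytes 4-5 (20 bits).
    raw_temp = ((frame[3] & 0x0F) << 16) | (frame[4] << 8) | frame[5]
    hum = raw_hum / 2**20 * 100.0
    temp = raw_temp / 2**20 * 200.0 - 50.0
    return {"temp": temp, "hum": hum}
```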

Hardware:
  • Interface: I2C

  • Default address: 0x38

  • Supply: 3.3V

  • Measurement time: ~80 ms per cycle

  • Accuracy: +/-0.3 °C temperature, +/-2% RH

Typical usage:

from drivers.ATH20.ATH20 import ATH20

sensor = ATH20(port=0)
data = sensor.getTemp()
print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")

Note

Requires smbus2 and I2C bus access. Uses MCP23017 GPIO expander for 3.3V power rail reset on unrecoverable I2C errors.

class drivers.ATH20.ATH20.ATH20[source]

Bases: object

I2C driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor.

Manages I2C communication, sensor initialization, calibration verification, and automatic recovery from bus errors via MCP23017-controlled power cycling. Implements a retry mechanism with configurable maximum attempts.

Variables:
  • ATH20_ADDR – I2C slave address of the sensor (0x38).

  • debug – Enable verbose debug logging of I2C transactions.

  • data – Last measurement result with "temp" and "hum" keys.

  • i2c_error – True when the sensor has experienced an I2C error requiring reinitialization.

Parameters:
  • port (int)

  • max_retries (int)

ATH20_ADDR = 56
ATH20_SOFTRESET = 186
ATH20_INITIALIZE: ClassVar[list[int]] = [190, 8, 0]
AHT20_MEASURE: ClassVar[list[int]] = [172, 51, 0]
ATH20_STATUS: ClassVar[int] = 113
ATH20_BUSY_BIT: ClassVar[int] = 128
ATH20_CALIBRATION_BIT: ClassVar[int] = 8
debug: ClassVar[bool] = False
data: ClassVar[dict[str, float]] = {'hum': 0.0, 'temp': 0.0}
__init__(port=0, max_retries=3)[source]
Parameters:
  • port (int)

  • max_retries (int)

Return type:

None

reinitialize()[source]

Public method to reinitialize the sensor. Returns True if successful, False otherwise.

Return type:

bool

begin()[source]

Initialize the sensor after reset.

Return type:

bool

get_busy_status()[source]

Check if sensor is busy.

Return type:

bool

get_calibration_status()[source]

Check if sensor is calibrated.

Return type:

bool
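
The busy and calibration checks presumably test individual bits of the status byte. A minimal sketch using the ATH20_BUSY_BIT and ATH20_CALIBRATION_BIT values listed above; decode_status is a hypothetical helper, not part of the driver.

```python
ATH20_BUSY_BIT = 0x80         # 128, as listed in the class constants
ATH20_CALIBRATION_BIT = 0x08  # 8, as listed in the class constants


def decode_status(status: int) -> dict[str, bool]:
    """Split a status byte into the busy and calibrated flags."""
    return {
        "busy": bool(status & ATH20_BUSY_BIT),
        "calibrated": bool(status & ATH20_CALIBRATION_BIT),
    }
```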

reset()[source]

Soft reset the sensor.

Return type:

bool

get_temperature()[source]

Get temperature with automatic reinitialization on I/O errors.

Return type:

float

get_humidity()[source]

Get humidity with automatic reinitialization on I/O errors.

Return type:

float

read_data()[source]

Read a single byte from the sensor.

Return type:

int

write_data(value)[source]

Write a single byte to the sensor.

Parameters:

value (int)

Return type:

bool

write_block_data(value)[source]

Write a block of data to the sensor.

Parameters:

value (list[int])

Return type:

bool

read_block_data()[source]

Read a block of data from the sensor.

Return type:

list[int]

getTemp()[source]

Backward-compatible method that returns both temperature and humidity. Uses the new retry logic.

Return type:

dict[str, float]

getHum(new_measurement=False)[source]

Backward-compatible method to get humidity. If new_measurement is True, triggers a new measurement first.

Parameters:

new_measurement (bool)

Return type:

dict[str, float]

SHT31 – Temperature & Humidity

Driver for the Sensirion SHT31 temperature and humidity sensor.

Provides I2C communication with the SHT31-D digital humidity and temperature sensor via the Adafruit CircuitPython SHT31D library. Supports reading temperature and humidity values, and activating the on-chip heater for condensation removal.

Hardware:
  • Interface: I2C (bus 0 or 1)

  • Default address: 0x44 (alternate: 0x45)

  • Supply: 3.3V

  • Accuracy: +/-0.3 °C temperature, +/-2% RH

  • Features: Built-in heater for de-condensation

Typical usage:

from drivers.SHT31.SHT31 import SHT31

sensor = SHT31(i2c_port=0, address=0x44)
temp = sensor.get_temperature()
hum = sensor.get_humidity()

Note

Requires adafruit-circuitpython-sht31d, board, and busio.

class drivers.SHT31.SHT31.SHT31[source]

Bases: object

I2C driver for the Sensirion SHT31-D temperature and humidity sensor.

Wraps the Adafruit SHT31D library for temperature and humidity readout. Includes a heater activation method for clearing condensation from the sensor element in high-humidity environments.

Variables:

sensor – Active Adafruit SHT31D sensor instance, or None if initialization failed.

Parameters:
  • i2c_port (int)

  • address (int)

__init__(i2c_port=0, address=68)[source]
Parameters:
  • i2c_port (int)

  • address (int)

Return type:

None

sensor = None
get_temperature()[source]
Return type:

float

get_humidity()[source]
Return type:

float

start_heater()[source]
Return type:

None

SHT41 – Temperature & Humidity

Driver for the Sensirion SHT41 temperature and humidity sensor.

Provides low-level I2C communication with the SHT4x series digital humidity and temperature sensors using raw smbus2 commands. The driver handles CRC-8 verification of measurement data, multiple precision/heater modes, and provides both a low-level driver (SHT41_driver) and a high-level wrapper (SHT41) with a simplified API.

The SHT4x series supports multiple measurement modes combining different precision levels with optional on-chip heater activation for condensation removal.

Hardware:
  • Interface: I2C

  • Default address: 0x44

  • Supply: 3.3V

  • Accuracy: +/-0.2 °C temperature, +/-1.8% RH (high precision)

  • Features: Configurable heater (high/med/low, 100ms/1s pulses)

Typical usage:

from drivers.SHT41.SHT41 import SHT41

sensor = SHT41(port=0)
data = sensor.getTemp()
print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")

Note

Requires smbus2 for raw I2C access. CRC-8 verification uses polynomial 0x31 with initial value 0xFF (CRC-8/NRSC-5), as specified in the Sensirion datasheet.
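
The checksum can be sketched as a plain bitwise CRC-8 with polynomial 0x31, initial value 0xFF, and no final XOR. The function name crc8_sensirion is hypothetical; the driver's own crc8 method may be structured differently.

```python
def crc8_sensirion(data: bytes) -> int:
    """Compute the Sensirion CRC-8 (poly 0x31, init 0xFF) over data."""
    crc = 0xFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x80:
                crc = ((crc << 1) ^ 0x31) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
    return crc
```

Each 16-bit word read from the sensor is followed by one CRC byte, so a receiver would compare crc8_sensirion(word) against that byte and discard the word on mismatch.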

class drivers.SHT41.SHT41.SHT41_driver[source]

Bases: object

Low-level I2C driver for the Sensirion SHT4x sensor.

Handles raw I2C byte-level communication, CRC-8 verification, and sensor mode configuration. Supports all SHT4x measurement modes including no-heat, high-heat, medium-heat, and low-heat variants at different pulse durations.

Variables:
  • SHT41_ADDR – I2C slave address (0x44).

  • current_mode – Currently active measurement mode command byte.

Parameters:

port (int)

SHT41_ADDR = 68
SHT41_READSERIAL = 137
SHT41_SOFTRESET = 148
NOHEAT_HIGHPRECISION = 253
NOHEAT_MEDPRECISION = 246
NOHEAT_LOWPRECISION = 224
HIGHHEAT_1S = 57
HIGHHEAT_100MS = 50
MEDHEAT_1S = 47
MEDHEAT_100MS = 36
LOWHEAT_1S = 30
LOWHEAT_100MS = 21
current_mode = 253
__init__(port=0)[source]
Parameters:

port (int)

Return type:

None

begin()[source]
Return type:

bool

reset()[source]
Return type:

bool

get_serial_number()[source]

Read the sensor's unique 32-bit serial number.

Return type:

int

get_mode()[source]
Return type:

int

set_mode(mode)[source]
Parameters:

mode (int)

Return type:

None

get_measurements()[source]

Read temperature and relative humidity together in a single measurement.

Return type:

tuple[float, float]
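
For orientation, the SHT4x datasheet conversion from 16-bit raw ticks to physical units can be sketched as below. convert_sht4x_ticks is a hypothetical helper, and clamping humidity to 0-100 % is an assumption about how out-of-range values are handled.

```python
def convert_sht4x_ticks(temp_ticks: int, hum_ticks: int) -> tuple[float, float]:
    """Convert raw SHT4x ticks to (temperature in C, relative humidity in %)."""
    temperature = -45.0 + 175.0 * temp_ticks / 65535.0
    humidity = -6.0 + 125.0 * hum_ticks / 65535.0
    # The raw formula can yield values slightly outside the physical range.
    humidity = min(max(humidity, 0.0), 100.0)
    return temperature, humidity
```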

crc8(buffer)[source]

Compute the CRC-8 checksum of the buffer, used to verify received data.

Parameters:

buffer (bytearray)

Return type:

int

write_byte_data(value)[source]
Parameters:

value (int)

Return type:

None

read_byte()[source]
Return type:

int | None

read_block_data(buffer)[source]

Read data byte-by-byte into the provided buffer using read_byte.

Parameters:

buffer (bytearray)

Return type:

bytearray | None

class drivers.SHT41.SHT41.SHT41[source]

Bases: object

High-level wrapper for the SHT41 temperature and humidity sensor.

Provides a simplified API compatible with the OzTemp wrapper pattern. Wraps SHT41_driver and returns measurement data as dictionaries with "temp" and "hum" keys.

Variables:
  • sht41 – Underlying SHT41_driver instance.

  • data – Last measurement result dictionary.

Parameters:

port (int)

debug = False
data: ClassVar[dict[str, float]] = {'hum': 0.0, 'temp': 0.0}
__init__(port=0)[source]
Parameters:

port (int)

Return type:

None

sht41: SHT41_driver | None = None
getTemp()[source]
Return type:

dict[str, float]

getHumidity(new_measurement=False)[source]
Parameters:

new_measurement (bool)

Return type:

dict[str, float]

LPS25HB – Barometric Pressure

Driver for the STMicroelectronics LPS25HB barometric pressure sensor.

Provides I2C communication with the LPS25HB MEMS pressure sensor via the Adafruit CircuitPython LPS2x library. The driver handles initialization on either I2C bus 0 or 1, automatic reinitialization on I2C errors, and MCP23017-controlled 3.3V power rail reset for hardware fault recovery.

The LPS25HB measures absolute barometric pressure (260-1260 hPa) and includes an embedded temperature sensor for compensation.

Hardware:
  • Interface: I2C (bus 0 or 1)

  • Default address: 0x5D (alternate: 0x5C)

  • Supply: 3.3V

  • Range: 260-1260 hPa

  • Accuracy: +/-0.2 hPa (absolute), +/-2 °C temperature

Typical usage:

from drivers.LPS25HB.LPS25HB import LPS25HB

sensor = LPS25HB(i2c_port=0, i2c_addr=0x5D)
temp = sensor.get_temperature()
pressure = sensor.get_pressure()

Note

Requires adafruit-circuitpython-lps2x, board, and busio.

class drivers.LPS25HB.LPS25HB.LPS25HB[source]

Bases: object

I2C driver for the STMicroelectronics LPS25HB pressure sensor.

Wraps the Adafruit LPS2x library with automatic retry and reinitialization logic. On persistent I2C errors, resets the 3.3V power rail via MCP23017 GPIO expander to recover the sensor.

Variables:
  • sensor – Active Adafruit LPS25 sensor instance, or None if initialization failed.

  • i2c_error – True when the sensor is in an error state requiring reinitialization.

Parameters:
  • i2c_port (int)

  • i2c_addr (int)

  • max_retries (int)

sensor = None
__init__(i2c_port=0, i2c_addr=93, max_retries=3)[source]
Parameters:
  • i2c_port (int)

  • i2c_addr (int)

  • max_retries (int)

Return type:

None

reinitialize()[source]

Public method to reinitialize the sensor. Returns True if successful, False otherwise.

Return type:

bool

get_temperature()[source]

Get temperature with automatic reinitialization on I/O errors.

Return type:

float

get_pressure()[source]

Get pressure with automatic reinitialization on I/O errors.

Return type:

float

SCD40 – CO2 Sensor

Driver for the Sensirion SCD40 CO2, temperature, and humidity sensor.

Provides I2C communication with the SCD40 photoacoustic NDIR CO2 sensor via the Adafruit CircuitPython SCD4x library. The driver supports periodic measurement mode, automatic and manual calibration, altitude compensation, and factory reset.

The SCD40 uses a photoacoustic sensing principle for CO2 measurement, combined with a Sensirion humidity and temperature sensor on the same chip.

Hardware:
  • Interface: I2C (bus 0)

  • Address: 0x62

  • Supply: 3.3V

  • CO2 range: 400-2000 ppm (extended to 5000 ppm)

  • CO2 accuracy: +/-(50 ppm + 5% of reading)

  • Measurement interval: 5 seconds (periodic mode)

Typical usage:

from drivers.SCD40.SCD40 import SCD40

sensor = SCD40()
sensor.initializeSensor(auto=True, alt=0)
co2, temp, hum = sensor.getCO2(data=True)

Note

Requires adafruit-circuitpython-scd4x, board, and busio. The sensor requires a 5-second warm-up period after starting periodic measurements before the first valid reading is available.

class drivers.SCD40.SCD40.SCD40[source]

Bases: object

I2C driver for the Sensirion SCD40 CO2 sensor.

Wraps the Adafruit SCD4x library providing CO2, temperature, and humidity readout with configurable auto-calibration and altitude compensation. Settings are persisted to the sensor’s EEPROM.

Variables:
  • co2 – Last CO2 reading in ppm.

  • temp – Last temperature reading in degrees Celsius.

  • hum – Last relative humidity reading in percent.

  • DEBUG – Enable verbose sensor data logging.

co2 = 0
temp = 0
hum = 0
DEBUG = False
__init__()[source]
Return type:

None

initializeSensor(auto=False, alt=0)[source]
Parameters:
  • auto

  • alt

Return type:

None

factoryReset()[source]
Return type:

None

getCO2(data)[source]
Parameters:

data (bool)

Return type:

list[float]

ELTCO2 – CO2 Sensor

Driver for the ELT CO2 I2C carbon dioxide sensor.

Provides I2C communication with the ELT (Environmental Sensor Technology) CO2 sensor module. The driver reads CO2 concentration from the sensor’s internal registers using a command-response protocol over I2C.

The sensor supports sleep/wake control, manual and automatic calibration (MCDL/ACDL), and returns CO2 concentration in ppm.

Hardware:
  • Interface: I2C

  • Default address: 0x31

  • Supply: 3.3V / 5V

  • CO2 range: 0-5000 ppm (typical)

  • Response time: <60 seconds (T90)

Typical usage:

from drivers.ELTCO2.ELTCO2 import ELTCO2

sensor = ELTCO2(address=0x31, device_number=0)
co2_ppm = sensor.getCO2()

Note

Requires smbus2 for I2C access. The sensor register protocol uses a status byte (data[0] == 0x08) to indicate valid measurement data.

class drivers.ELTCO2.ELTCO2.ELTCO2[source]

Bases: object

I2C driver for the ELT CO2 sensor module.

Reads CO2 concentration from the sensor via I2C register reads. Supports sleep/wake power management and manual/automatic calibration modes. Implements a 3-retry mechanism for CO2 readings.

Variables:

I2C_ADDRESS – I2C slave address of the sensor.

Parameters:
  • address (int)

  • device_number (int)

__init__(address=49, device_number=0)[source]
Parameters:
  • address (int)

  • device_number (int)

Return type:

None

I2C_ADDRESS = 49
read_byte_data(reg)[source]
Parameters:

reg (int)

Return type:

int

write_byte_data(reg, value)[source]
Parameters:
  • reg

  • value

Return type:

None

writeReg16Bit(reg, value)[source]
Parameters:
  • reg

  • value

Return type:

None

readReg16Bit(reg)[source]
Parameters:

reg (int)

Return type:

int

getCO2()[source]
Return type:

float | int

sleep()[source]
Return type:

None

wakeup()[source]
Return type:

None

clear_calib()[source]
Return type:

None

start_mcdl()[source]
Return type:

None

end_mcdl()[source]
Return type:

None

start_acdl()[source]
Return type:

None

end_acdl()[source]
Return type:

None

I2C Light & UV Sensors

LTR390 – UV & Ambient Light

Driver for the Lite-On LTR-390UV-01 UV and ambient light sensor.

Hardware:

  • Interface: I2C (via Adafruit bus device abstraction)

  • Address: 0x53 (default, managed by adafruit_ltr390 library)

  • Supply: 1.7V - 3.6V

  • Measures: UV index, UVS raw counts, ambient light (lux), raw light counts

Typical usage:

from drivers.LTR390.LTR390 import LTR390

sensor = LTR390(port=1)
lux = sensor.getLux()
uv_index = sensor.getUV()

Note

Requires adafruit-circuitpython-ltr390 library. The Adafruit library handles register-level communication; this wrapper provides simplified read methods with error handling and logging.

class drivers.LTR390.LTR390.LTR390[source]

Bases: object

Driver wrapper for the LTR-390UV-01 UV and ambient light sensor.

Wraps the Adafruit LTR390 CircuitPython library to provide simplified read methods with error handling and logging.

Variables:
  • sensor – Adafruit LTR390 sensor instance, or None if initialization failed.

  • i2c – busio.I2C instance for the selected port.

Parameters:

port (int)

__init__(port=0)[source]

Initialize the LTR390 sensor on the specified I2C port.

Parameters:

port (int) – I2C port number (0 for D1/D0, 1 for SCL/SDA).

Return type:

None

sensor = None
getLux()[source]

Read the ambient light level in lux.

Return type:

float

Returns:

Lux value as a float, or 0.0 if the sensor is uninitialized or read fails.

getUV()[source]

Read the UV index value.

Return type:

float

Returns:

UV index as a float, or 0.0 if the sensor is uninitialized or read fails.

getLight()[source]

Read the raw ambient light sensor count.

Return type:

float

Returns:

Raw light count as a float, or 0.0 if the sensor is uninitialized or read fails.

getUVS()[source]

Read the raw UVS (UV sensor) count.

Return type:

float

Returns:

Raw UVS count as a float, or 0.0 if the sensor is uninitialized or read fails.

Si1133 – UV Index Sensor

Driver for the Silicon Labs Si1133 UV Index and ambient light sensor.

Hardware:

  • Interface: I2C

  • Address: 0x55 (default)

  • Supply: 1.62V - 3.6V

  • Measures: UV index, ambient light (lux) via 4 configurable ADC channels

Typical usage:

from drivers.Si1133.Si1133 import Si1133

sensor = Si1133(i2c_bus=1)
sensor.begin()
uv = sensor.readUV()

Note

Requires smbus2 for I2C communication. Uses polynomial evaluation with factory-calibrated coefficients for UV index and lux calculations.

class drivers.Si1133.Si1133.SI1133_Coeff_TypeDef[source]

Bases: object

Single polynomial coefficient used in lux/UV index calculation.

Variables:
  • info – Encodes x-order, y-order, and sign for polynomial evaluation.

  • mag – Magnitude divisor for the polynomial term.

  • shift – Bit-shift value applied to the polynomial term result.

Parameters:
  • info (int)

  • mag (int)

  • shift (int)

info: int = 0
mag: int = 0
shift: int = 0
__init__(info=0, mag=0, shift=0)
Parameters:
  • info (int)

  • mag (int)

  • shift (int)

Return type:

None

class drivers.Si1133.Si1133.SI1133_LuxCoeff_TypeDef[source]

Bases: object

Collection of polynomial coefficients for lux calculation.

Variables:
  • coeff_high – Coefficients used when ADC values exceed the high threshold.

  • coeff_low – Coefficients used when ADC values are below the high threshold.

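Parameters:
  • coeff_high (list[SI1133_Coeff_TypeDef])

  • coeff_low (list[SI1133_Coeff_TypeDef])

coeff_high: list[SI1133_Coeff_TypeDef]
coeff_low: list[SI1133_Coeff_TypeDef]
__init__(coeff_high, coeff_low)
Parameters:
  • coeff_high (list[SI1133_Coeff_TypeDef])

  • coeff_low (list[SI1133_Coeff_TypeDef])
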
Return type:

None

class drivers.Si1133.Si1133.SI1133_Samples_TypeDef[source]

Bases: object

Raw ADC sample data read from the Si1133 sensor channels.

Variables:
  • irq_status – Interrupt status byte indicating which channels completed.

  • ch0 – Raw 24-bit ADC value from channel 0 (UV photodiode).

  • ch1 – Raw 24-bit ADC value from channel 1 (visible high).

  • ch2 – Raw 24-bit ADC value from channel 2 (IR).

  • ch3 – Raw 24-bit ADC value from channel 3 (visible low).

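Parameters:
  • irq_status (int)

  • ch0 (int)

  • ch1 (int)

  • ch2 (int)

  • ch3 (int)

irq_status: int = 0
ch0: int = 0
ch1: int = 0
ch2: int = 0
ch3: int = 0
__init__(irq_status=0, ch0=0, ch1=0, ch2=0, ch3=0)
Parameters:
  • irq_status (int)

  • ch0 (int)

  • ch1 (int)

  • ch2 (int)

  • ch3 (int)
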
Return type:

None

class drivers.Si1133.Si1133.Si1133[source]

Bases: object

Driver for the Si1133 UV index and ambient light sensor over I2C.

Communicates with the Si1133 via SMBus to configure ADC channels, trigger forced measurements, and compute calibrated UV index and lux values using factory polynomial coefficients.

Variables:
  • i2c_bus – I2C bus number (e.g. 0 or 1).

  • bus – SMBus instance for I2C communication.

  • addr – 7-bit I2C address of the sensor.

Parameters:
  • i2c_bus (int)

  • sensor_address (int)

RATE_SHORT = 96
RATE_NORMAL = 0
RATE_LONG = 32
RATE_VLONG = 64
BITS_24 = 0
BITS_16 = 64
COUNT0 = 64
COUNT1 = 128
F_SMALL_IR = 0
F_MEDIUM_IR = 1
F_LARGE_IR = 2
F_WHITE = 11
F_LARGE_WHITE = 12
F_UV = 24
F_UV_DEEP = 25
SI1133_RESET_CMD_CTR = 0
SI1133_RESET_SW = 1
SI1133_FORCE = 17
SI1133_PAUSE = 18
SI1133_START = 19
SI1133_PARAM_QUERY = 64
SI1133_PARAM_SET = 128
SI1133_REG_PARTID = 0
SI1133_REG_REVID = 1
SI1133_REG_MFRID = 2
SI1133_REG_INFO0 = 3
SI1133_REG_INFO1 = 4
SI1133_REG_HOSTIN3 = 7
SI1133_REG_HOSTIN2 = 8
SI1133_REG_HOSTIN1 = 9
SI1133_REG_HOSTIN0 = 10
SI1133_REG_COMMAND = 11
SI1133_REG_IRQ_ENABLE = 15
SI1133_REG_RESPONSE1 = 16
SI1133_REG_RESPONSE0 = 17
SI1133_REG_IRQ_STATUS = 18
SI1133_REG_HOSTOUT0 = 19
SI1133_REG_HOSTOUT1 = 20
SI1133_REG_HOSTOUT2 = 21
SI1133_REG_HOSTOUT3 = 22
SI1133_REG_HOSTOUT4 = 23
SI1133_REG_HOSTOUT5 = 24
SI1133_REG_HOSTOUT6 = 25
SI1133_REG_HOSTOUT7 = 26
SI1133_REG_HOSTOUT8 = 27
SI1133_REG_HOSTOUT9 = 28
SI1133_REG_HOSTOUT10 = 29
SI1133_REG_HOSTOUT11 = 30
SI1133_REG_HOSTOUT12 = 31
SI1133_REG_HOSTOUT13 = 32
SI1133_REG_HOSTOUT14 = 33
SI1133_REG_HOSTOUT15 = 34
SI1133_REG_HOSTOUT16 = 35
SI1133_REG_HOSTOUT17 = 36
SI1133_REG_HOSTOUT18 = 37
SI1133_REG_HOSTOUT19 = 38
SI1133_REG_HOSTOUT20 = 39
SI1133_REG_HOSTOUT21 = 40
SI1133_REG_HOSTOUT22 = 41
SI1133_REG_HOSTOUT23 = 42
SI1133_REG_HOSTOUT24 = 43
SI1133_REG_HOSTOUT25 = 44
SI1133_PARAM_I2CADDR = 0
SI1133_PARAM_CH_LIST = 1
SI1133_PARAM_ADCCONFIG0 = 2
SI1133_PARAM_ADCSENS0 = 3
SI1133_PARAM_ADCPOST0 = 4
SI1133_PARAM_MEASCONFIG0 = 5
SI1133_PARAM_ADCCONFIG1 = 6
SI1133_PARAM_ADCSENS1 = 7
SI1133_PARAM_ADCPOST1 = 8
SI1133_PARAM_MEASCONFIG1 = 9
SI1133_PARAM_ADCCONFIG2 = 10
SI1133_PARAM_ADCSENS2 = 11
SI1133_PARAM_ADCPOST2 = 12
SI1133_PARAM_MEASCONFIG2 = 13
SI1133_PARAM_ADCCONFIG3 = 14
SI1133_PARAM_ADCSENS3 = 15
SI1133_PARAM_ADCPOST3 = 16
SI1133_PARAM_MEASCONFIG3 = 17
SI1133_PARAM_ADCCONFIG4 = 18
SI1133_PARAM_ADCSENS4 = 19
SI1133_PARAM_ADCPSOT4 = 20
SI1133_PARAM_MEASCONFIG4 = 21
SI1133_PARAM_ADCCONFIG5 = 22
SI1133_PARAM_ADCSENS5 = 23
SI1133_PARAM_ADCPSOT5 = 24
SI1133_PARAM_MEASCONFIG5 = 25
SI1133_PARAM_MEASRATEH = 26
SI1133_PARAM_MEASRATEL = 27
SI1133_PARAM_MEASCOUNT0 = 28
SI1133_PARAM_MEASCOUNT1 = 29
SI1133_PARAM_MEASCOUNT2 = 30
SI1133_PARAM_THRESHOLD0_H = 37
SI1133_PARAM_THRESHOLD0_L = 38
SI1133_PARAM_THRESHOLD1_H = 39
SI1133_PARAM_THRESHOLD1_L = 40
SI1133_PARAM_THRESHOLD2_H = 41
SI1133_PARAM_THRESHOLD2_L = 42
SI1133_PARAM_BURST = 43
SI1133_CMD_RESET_CMD_CTR = 0
SI1133_CMD_RESET = 1
SI1133_CMD_NEW_ADDR = 2
SI1133_CMD_FORCE_CH = 17
SI1133_CMD_PAUSE_CH = 18
SI1133_CMD_START = 19
SI1133_CMD_PARAM_SET = 128
SI1133_CMD_PARAM_QUERY = 64
SI1133_RSP0_CHIPSTAT_MASK = 224
SI1133_RSP0_COUNTER_MASK = 31
SI1133_RSP0_SLEEP = 32
X_ORDER_MASK = 112
Y_ORDER_MASK = 7
SIGN_MASK = 128
UV_INPUT_FRACTION = 15
UV_OUTPUT_FRACTION = 12
UV_NUMCOEFF = 2
ADC_THRESHOLD = 16000
INPUT_FRACTION_HIGH = 7
INPUT_FRACTION_LOW = 15
LUX_OUTPUT_FRACTION = 12
NUMCOEFF_LOW = 9
NUMCOEFF_HIGH = 4
SI1133_OK = 0
SI1133_ERROR_I2C_TRANSACTION_FAILED = 1
SI1133_ERROR_SLEEP_FAILED = 2
uk: ClassVar[list] = [SI1133_Coeff_TypeDef(info=1, mag=30902, shift=5), SI1133_Coeff_TypeDef(info=130, mag=46301, shift=-3)]
lk = SI1133_LuxCoeff_TypeDef(coeff_high=[SI1133_Coeff_TypeDef(info=0, mag=209, shift=0), SI1133_Coeff_TypeDef(info=1665, mag=93, shift=0), SI1133_Coeff_TypeDef(info=2064, mag=65, shift=0), SI1133_Coeff_TypeDef(info=-2671, mag=234, shift=0)], coeff_low=[SI1133_Coeff_TypeDef(info=0, mag=0, shift=0), SI1133_Coeff_TypeDef(info=1921, mag=29053, shift=0), SI1133_Coeff_TypeDef(info=-1022, mag=36363, shift=0), SI1133_Coeff_TypeDef(info=2320, mag=20789, shift=0), SI1133_Coeff_TypeDef(info=-367, mag=57909, shift=0), SI1133_Coeff_TypeDef(info=-1774, mag=38240, shift=0), SI1133_Coeff_TypeDef(info=-608, mag=46775, shift=0), SI1133_Coeff_TypeDef(info=-1503, mag=51831, shift=0), SI1133_Coeff_TypeDef(info=-1886, mag=58928, shift=0)])
__init__(i2c_bus=0, sensor_address=85)[source]

Initialize the Si1133 driver.

Parameters:
  • i2c_bus (int) – I2C bus number to use (0 or 1).

  • sensor_address (int) – 7-bit I2C address of the Si1133 sensor.

Return type:

None

current_milli_time()[source]

Return the current time in milliseconds since epoch.

Return type:

int

Returns:

Current time in milliseconds.

delay_ms(delaytime)[source]

Sleep for the specified number of milliseconds.

Parameters:

delaytime (int) – Delay duration in milliseconds.

Return type:

None

SI1133_registerRead(reg)[source]

Read a single byte from the specified register.

Parameters:

reg (int) – Register address to read from.

Return type:

int

Returns:

The byte value read from the register.

SI1133_registerWrite(reg, data)[source]

Write a single byte to the specified register.

Parameters:
  • reg (int) – Register address to write to.

  • data (int) – Byte value to write.

Return type:

None

SI1133_registerBlockWrite(reg, data)[source]

Write a block of bytes starting at the specified register.

Parameters:
  • reg (int) – Starting register address.

  • data (list[int]) – List of byte values to write.

Return type:

None

SI1133_registerBlockRead(reg, length)[source]

Read a block of bytes starting at the specified register.

Parameters:
  • reg (int) – Starting register address.

  • length (int) – Number of bytes to read.

Return type:

list[int]

Returns:

List of byte values read from the registers.

SI1133_waitUntilSleep()[source]

Poll the response register until the sensor enters sleep state.

Return type:

int

Returns:

Status code (SI1133_OK on success).

SI1133_reset()[source]

Perform a software reset of the Si1133 sensor.

Waits 30 ms before issuing the reset command, then waits 10 ms for the internal reset sequence to complete.

Return type:

None

SI1133_sendCmd(command)[source]

Send a command to the Si1133 and wait for acknowledgment.

Verifies the response counter increments to confirm command execution.

Parameters:

command (int) – Command byte to write to the command register.

Return type:

int

Returns:

Status code (SI1133_OK on success).
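
The acknowledgment check can be pictured with the RSP0 masks listed in the class constants. The sketch below is an assumption modelled on the Silicon Labs reference driver, which treats a command as executed once the counter bits differ from their value before the write; is_sleeping and counter_changed are hypothetical helper names.

```python
SI1133_RSP0_CHIPSTAT_MASK = 0xE0  # 224
SI1133_RSP0_COUNTER_MASK = 0x1F   # 31
SI1133_RSP0_SLEEP = 0x20          # 32


def is_sleeping(response0: int) -> bool:
    """True if the chip-state bits of RESPONSE0 report the sleep state."""
    return (response0 & SI1133_RSP0_CHIPSTAT_MASK) == SI1133_RSP0_SLEEP


def counter_changed(before: int, after: int) -> bool:
    """True if the command counter bits differ between two RESPONSE0 reads."""
    return (before & SI1133_RSP0_COUNTER_MASK) != (after & SI1133_RSP0_COUNTER_MASK)
```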

SI1133_resetCmdCtr()[source]

Reset the command counter register.

Return type:

int

Returns:

Status code (SI1133_OK on success).

SI1133_measurementForce()[source]

Force a single measurement cycle on all configured channels.

Return type:

int

Returns:

Status code (SI1133_OK on success).

SI1133_measurementStart()[source]

Start autonomous (continuous) measurement mode.

Return type:

int

Returns:

Status code (SI1133_OK on success).

SI1133_paramRead(address)[source]

Read a parameter table value from the sensor.

Parameters:

address (int) – Parameter table address (0x00 - 0x3F).

Return type:

int

Returns:

The parameter value, or an error code if the command failed.

SI1133_paramSet(address, value)[source]

Write a value to the sensor parameter table.

Parameters:
  • address (int) – Parameter table address (0x00 - 0x3F).

  • value (int) – Byte value to set.

Return type:

int

Returns:

Status code (SI1133_OK on success).

SI1133_measurementPause()[source]

Pause autonomous measurement mode.

Return type:

int

Returns:

Status code (SI1133_OK on success).

begin()[source]

Initialize the sensor with default 4-channel configuration.

Resets the sensor and configures channels 0-3 for UV, visible (high/low), and IR measurements with appropriate ADC settings and gain.

Return type:

int

Returns:

Cumulative status code (0 indicates all operations succeeded).

SI1133_deInit()[source]

De-initialize the sensor by pausing measurements and entering sleep.

Return type:

int

Returns:

Cumulative status code (0 indicates all operations succeeded).

SI1133_measurementGet()[source]

Read raw ADC measurement results from all 4 channels.

Reads 13 bytes from the host output registers and assembles them into signed 24-bit values for each channel.

Return type:

SI1133_Samples_TypeDef

Returns:

Sample data containing IRQ status and raw ADC values for channels 0-3.
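
A minimal sketch of how 13 raw bytes could be assembled into an IRQ status and four signed 24-bit channel values. The most-significant-byte-first ordering follows the Silicon Labs reference driver and is an assumption here; to_signed_24 and parse_samples are hypothetical names.

```python
from typing import Sequence


def to_signed_24(msb: int, mid: int, lsb: int) -> int:
    """Combine three bytes (MSB first) into a signed 24-bit integer."""
    value = (msb << 16) | (mid << 8) | lsb
    if value & 0x800000:
        # Sign bit set: map into the negative two's-complement range.
        value -= 1 << 24
    return value


def parse_samples(buf: Sequence[int]) -> tuple[int, list[int]]:
    """Split a 13-byte read into (irq_status, [ch0, ch1, ch2, ch3])."""
    if len(buf) != 13:
        raise ValueError("expected 13 bytes: IRQ status + 4 x 3 data bytes")
    irq_status = buf[0]
    channels = [
        to_signed_24(buf[1 + 3 * i], buf[2 + 3 * i], buf[3 + 3 * i])
        for i in range(4)
    ]
    return irq_status, channels
```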

SI1133_calcPolyInner(_input, fraction, mag, shift)[source]

Compute a single inner term of the polynomial evaluation.

Parameters:
  • _input (int) – Input ADC value.

  • fraction (int) – Fractional bit count for fixed-point scaling.

  • mag (int) – Magnitude divisor.

  • shift (int) – Bit-shift direction and amount (negative = right shift).

Return type:

int

Returns:

Scaled polynomial term value.
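
Based on the parameter descriptions, the inner term might look like the sketch below, which mirrors the Silicon Labs reference implementation: scale the input by the fixed-point fraction, divide by mag, then shift left or right. This is an assumption about the driver's logic; calc_poly_inner is a hypothetical name, and Python's floor division differs from C's truncation for negative inputs.

```python
def calc_poly_inner(value: int, fraction: int, mag: int, shift: int) -> int:
    """Fixed-point scale, divide by mag, then apply a signed bit shift."""
    scaled = (value << fraction) // mag
    if shift < 0:
        # Negative shift means shift right by the absolute amount.
        return scaled >> -shift
    return scaled << shift
```

With the first UV coefficient listed above (mag=30902, shift=5) and an input fraction of 15, an input of 100 evaluates to 3392.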

SI1133_calcEvalPoly(x, y, input_fraction, output_fraction, numm_coeff, kp)[source]

Evaluate a two-variable polynomial using factory calibration coefficients.

Used internally to convert raw ADC readings to physical units (lux or UV index).

Parameters:
  • x (int) – First input variable (e.g. visible high ADC value).

  • y (int) – Second input variable (e.g. IR ADC value).

  • input_fraction (int) – Fixed-point fractional bits for input scaling.

  • output_fraction (int) – Fixed-point fractional bits for output scaling.

  • numm_coeff (int) – Number of coefficients to evaluate.

  • kp (list[SI1133_Coeff_TypeDef]) – List of polynomial coefficients.

Return type:

int

Returns:

Absolute value of the evaluated polynomial result.

SI1133_getUv(uv)[source]

Convert a raw UV ADC value to a scaled UV index (fixed-point).

Parameters:

uv (int) – Raw ADC value from the UV channel.

Return type:

int

Returns:

Scaled UV index value in fixed-point representation.

SI1133_getLux(vis_high, vis_low, ir)[source]

Convert raw visible and IR ADC values to a scaled lux value (fixed-point).

Selects high-range or low-range coefficients based on whether the ADC values exceed the threshold.

Parameters:
  • vis_high (int) – Raw ADC value from the visible high-sensitivity channel.

  • vis_low (int) – Raw ADC value from the visible low-sensitivity channel.

  • ir (int) – Raw ADC value from the IR channel.

Return type:

int

Returns:

Scaled lux value in fixed-point representation.

SI1133_measureLuxUvi()[source]

Force a measurement and return calibrated lux and UV index values.

Triggers a forced measurement, waits for completion, then computes lux and UV index from the raw ADC samples.

Return type:

tuple[float, float]

Returns:

Tuple of (lux, uv_index) as floating-point values.

readUV()[source]

Read the UV index from the sensor with timeout protection.

Forces a measurement, waits for completion (up to TIMEOUT ms), and returns the calibrated UV index. Values above 15 are treated as invalid and return 0.

Return type:

Union[float, Literal[0]]

Returns:

UV index as a float, or 0 if the reading is invalid or timed out.
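The timeout and range check described above could look roughly like this. Both helper names are hypothetical, and the polling interval is an arbitrary choice for the sketch; only the 15 upper bound and the 0 fallback come from the description.

```python
import time


def wait_for(ready, timeout_ms: int, poll_s: float = 0.005) -> bool:
    """Poll ready() until it returns True or timeout_ms elapses."""
    deadline = time.monotonic() + timeout_ms / 1000.0
    while time.monotonic() < deadline:
        if ready():
            return True
        time.sleep(poll_s)
    return False


def sanitize_uv(uv_index: float, max_valid: float = 15.0) -> float:
    """Return 0 for readings above the valid range, else the reading."""
    return 0 if uv_index > max_valid else uv_index
```

Using a monotonic clock for the deadline keeps the timeout correct even if the system time is adjusted while waiting.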

SI1133_getHardwareID()[source]

Read the hardware part ID register.

Return type:

int

Returns:

Part ID byte (expected 0x33 for Si1133).

SI1133_getMeasurement()[source]

Read the latest measurement results and compute lux and UV index.

Unlike SI1133_measureLuxUvi, this does not force a new measurement; it reads whatever data is currently in the output registers.

Return type:

tuple[float, float]

Returns:

Tuple of (lux, uv_index) as floating-point values.

SI1133_getIrqStatus()[source]

Read the interrupt status register.

Return type:

int

Returns:

IRQ status byte indicating which channels have completed measurement.

Particulate Matter Sensors

CubicPM3006

CubicPM6303

Driver for the Cubic PM6303 particulate matter sensor (Modbus RTU).

Reads PM1, PM2.5, PM4.25, PM10, and TSP mass concentrations plus airflow rate over a Modbus RTU serial link. Also supports configuring the pump stop interval.

Hardware:

  • Interface: Modbus RTU (RS-485 / UART), 9600 baud

  • Default port: /dev/ttyAMA2

  • Supply: 5 V (with integrated pump)

Typical usage:

sensor = CubicPM6303()
sensor.initialize("/dev/ttyAMA2", baud=9600)
data = sensor.getPM(sendCommand=True)

Note

Requires the pymodbus package (legacy sync API).

class drivers.CubicPM6303.CubicPM6303.CubicPM6303[source]

Bases: object

Modbus RTU driver for the Cubic PM6303 particulate matter sensor.

Variables:
  • DEBUG – When True, enables verbose Modbus-level logging.

  • data – Most recent PM, particle-count, and flow readings.

  • pump_stop_interval – Pump stop interval in minutes written at init.

DEBUG = False
TIMEOUT = 8000
cubic_client = None
data: ClassVar[dict[str, float]] = {'flow': 0.0, 'p1': 0.0, 'p2': 0.0, 'p3': 0.0, 'p4': 0.0, 'p5': 0.0, 'p6': 0.0, 'pm1': 0.0, 'pm10': 0.0, 'pm100': 0.0, 'pm2_5': 0.0, 'pm4_25': 0.0}
pump_stop_interval = 0
__init__()[source]

Create an uninitialised CubicPM6303 instance.

Call initialize() to open the Modbus connection.

Return type:

None

initialize(cubic_port='/dev/ttyAMA2', baud=9600)[source]

Open the Modbus serial connection and configure the pump.

Parameters:
  • cubic_port (str) – Serial device path for the Modbus bus.

  • baud (int) – Baud rate.

Return type:

Literal[1]

Returns:

Always 1 on success.

getPM(sendCommand=False)[source]

Read particulate-matter concentrations and flow rate.

Parameters:

sendCommand (bool) – If True, query the sensor before returning data.

Return type:

dict[str, float]

Returns:

Dictionary with keys pm1, pm2_5, pm4_25, pm10, pm100 (mass concentrations) and flow (L/min).

getData()[source]

Read 24 Modbus input registers and return them as hex strings.

Return type:

list[str]

Returns:

List of hex-string register values, or an empty list on error.

set_pump_stop_interval(interval_minutes)[source]

Set the pump stop interval on the Cubic PM6303 sensor.

Parameters:

interval_minutes (int) – Pump interval in minutes (0–10000).

Return type:

bool

Returns:

True if the command succeeded, False otherwise.

micros()[source]

Return the current time in microseconds since the epoch.

Return type:

int

Returns:

Integer microsecond timestamp.

millis()[source]

Return the current time in milliseconds since the epoch.

Return type:

int

Returns:

Integer millisecond timestamp.
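Epoch timestamps like the two above can be derived from the standard library as sketched here. This is a minimal illustration using time.time_ns(); the driver may compute them differently.

```python
import time


def micros() -> int:
    """Current time in whole microseconds since the Unix epoch."""
    return time.time_ns() // 1_000


def millis() -> int:
    """Current time in whole milliseconds since the Unix epoch."""
    return time.time_ns() // 1_000_000
```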

UART / Serial Sensors

Noise – Noise Level Sensor

Driver for the Oizom custom noise sensor (SAMD-based).

Communicates via UART with a SAMD microcontroller that measures LAeq and LZeq noise levels. Responses are 8-byte binary frames.

Hardware:

  • Interface: UART (serial)

  • Port: /dev/ttyACM0 (typical)

  • Baud rate: 115200

  • Protocol: ASCII command -> 8-byte binary response frame

Typical usage:

import serial
from drivers.Noise.Noise import Noise

ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
noise = Noise()
noise.initialize(ser, {"en": 1})
reading = noise.get_noise()

Note

Requires pyserial for serial communication.

class drivers.Noise.Noise.Noise[source]

Bases: object

Driver for the SAMD-based noise level sensor.

Sends NOISE?\r\n commands over UART and parses 8-byte binary response frames containing LAeq (A-weighted) and LZeq (Z-weighted) noise levels.

Variables:
  • FRAME_LEN – Expected response frame length in bytes.

  • TIMEOUT – Maximum wait time for a complete frame in milliseconds.

  • ser – Open serial.Serial instance for UART communication.

  • LAeq (list[float]) – Accumulated A-weighted Leq readings for energy averaging.

  • LZeq (list[float]) – Accumulated Z-weighted Leq readings for energy averaging.

  • debug – Whether verbose debug logging is enabled.

  • part_number – Hardware part number identifier.

__init__()[source]

Initialize default attributes for the Noise sensor.

Return type:

None

initialize(serial_port, configuration)[source]

Configure the noise sensor with a serial port and device configuration.

Parameters:
  • serial_port – An open serial.Serial instance.

  • configuration (dict) – Device configuration dict. Must contain the "en" key to enable the sensor. Optional keys: "debug", "pn".

Return type:

bool

Returns:

True if the sensor is enabled and ready, False otherwise.

send_command(command=b'NOISE?\r\n', delay=0.5)[source]

Send a command over UART and read the 8-byte frame response.

Parameters:
  • command (bytes) – Raw bytes to transmit. Defaults to b"NOISE?\r\n".

  • delay (float) – Seconds to wait after sending before reading. Allows the SAMD microcontroller time to process and respond.

Return type:

bytes | None

Returns:

The 8-byte response frame, or None on timeout / error.

parse_noise_response(frame)[source]

Parse an 8-byte binary frame to extract LAeq and LZeq values.

Frame format: [0x01, 0x03, 0x00, LAeq_H, LAeq_L, LZeq_H, LZeq_L, 0x0A]

Parameters:

frame (bytes) – The 8-byte response frame from the sensor.

Return type:

dict | None

Returns:

A dict with "LAeq" and "LZeq" float values in dB, or None if the frame is invalid.
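The frame format above can be validated and split as in the following sketch. The function name is hypothetical, and treating the _H/_L pairs as unsigned big-endian 16-bit values is an assumption based on the field names. The scaling from these raw values to dB is not specified in this section, so the sketch returns raw integers.

```python
from typing import Optional

FRAME_LEN = 8
HEADER = bytes([0x01, 0x03, 0x00])
TRAILER = 0x0A


def split_noise_frame(frame: bytes) -> Optional[tuple[int, int]]:
    """Return (laeq_raw, lzeq_raw) or None if the frame is malformed."""
    if len(frame) != FRAME_LEN:
        return None
    if frame[:3] != HEADER or frame[7] != TRAILER:
        return None
    # Assumption: high byte first, unsigned 16-bit values.
    laeq_raw = (frame[3] << 8) | frame[4]
    lzeq_raw = (frame[5] << 8) | frame[6]
    return laeq_raw, lzeq_raw
```

Checking the length, header, and trailer before touching the payload means a truncated or misaligned read is rejected instead of producing a bogus level.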

get_noise()[source]

Query the sensor for a noise reading and accumulate results.

Sends the default NOISE? command, parses the response, and appends valid LAeq/LZeq values to the internal accumulation lists.

Return type:

dict | None

Returns:

A dict with "LAeq" and "LZeq" float values, or None on failure.

energy_average()[source]

Compute the energy-averaged (logarithmic) mean of accumulated LAeq values.

Uses the formula 10 * log10(mean(10^(dB/10))), which is the acoustically correct way to average decibel values. Falls back to the arithmetic mean on numerical errors.

Return type:

float

Returns:

The energy-averaged LAeq value in dB, rounded to 2 decimal places.
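The energy-averaging formula can be written out as below. This is a standalone sketch rather than the driver's method: it takes the readings as an argument and raises on an empty list instead of applying the arithmetic-mean fallback.

```python
import math


def energy_average(levels_db: list[float]) -> float:
    """Energy-average decibel values: 10 * log10(mean(10^(dB/10)))."""
    if not levels_db:
        raise ValueError("no readings to average")
    # Convert each level to linear energy, average, convert back to dB.
    mean_energy = sum(10 ** (db / 10) for db in levels_db) / len(levels_db)
    return round(10 * math.log10(mean_energy), 2)
```

For readings of 50 dB and 60 dB this yields about 57.4 dB rather than the arithmetic mean of 55 dB, because the louder reading dominates the energy sum.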

getSensorReading()[source]

Get the latest noise reading mapped to configured parameter codes.

Return type:

dict

Returns:

A dict mapping parameter short-codes to their current values. Empty dict on failure.

putSensorValue(result=None)[source]

Aggregate accumulated noise data and reset internal buffers.

Computes energy-averaged LAeq, max LAeq, and min LAeq from the readings collected since the last call, then clears the buffers.

Parameters:

result (dict | None) – Optional dict to populate with aggregated values. Creates a new dict if None.

Return type:

dict

Returns:

The result dict populated with aggregated noise parameters.

Wind – Wind Speed & Direction

Driver for HONGYUV and Calypso ultrasonic wind sensors via SDI-12.

Communicates via a serial SDI-12 bridge to read wind speed, direction, and gust values. Supports multiple sensor variants identified by part number.

Hardware:

  • Interface: UART via SDI-12 bridge

  • Port: /dev/ttyACM0 (typical)

  • Baud rate: 1200 (SDI-12 standard), 7E1 framing

  • Protocol: SDI-12 ASCII commands (e.g., 0R0!, 0R6!)

Typical usage:

import serial
from drivers.Wind.Wind import Wind

ser = serial.Serial(
    port="/dev/ttyACM0", baudrate=1200,
    bytesize=serial.SEVENBITS, parity=serial.PARITY_EVEN,
    stopbits=serial.STOPBITS_ONE, timeout=2,
)
wind = Wind()
wind.initialize(ser, {"en": 1, "pn": 211, "parameters": [...]})
reading = wind.getSensorReading()

Note

Requires pyserial for serial communication. Part numbers: 211 = 40 m/s HONGYUV, 212 = Calypso, 213 = 60 m/s HONGYUV with gust support.

class drivers.Wind.Wind.Wind[source]

Bases: object

Driver for HONGYUV / Calypso ultrasonic wind sensors over SDI-12.

Sends SDI-12 commands through a serial bridge to read wind speed, direction, and (optionally) gust. Accumulates time-series data for batch export.

Variables:
  • ser (Serial) – Open serial.Serial instance for the SDI-12 bridge.

  • configuration (dict) – Device configuration dictionary from Gateway.

  • address – SDI-12 sensor address (single character, default "0").

  • part_number – Hardware variant identifier (211, 212, or 213).

  • wind_angle (list[float]) – Accumulated wind direction readings in degrees.

  • wind_speed (list[float]) – Accumulated wind speed readings in m/s.

  • wind_gust (list[float]) – Accumulated gust speed readings in m/s (part 213 only).

  • timestamp (list[int]) – Accumulated epoch timestamps in milliseconds.

  • max_wind_speed – Maximum measurable wind speed for the sensor variant.

__init__()[source]

Initialize default attributes for the Wind sensor.

Return type:

None

initialize(serial_port, configuration)[source]

Configure the wind sensor with a serial port and device configuration.

Verifies that the sensor responds at the configured SDI-12 address and optionally sets the gust averaging period for part 213 sensors.

Parameters:
  • serial_port (Serial) – An open serial.Serial instance connected to the SDI-12 bridge.

  • configuration (dict) – Device configuration dict with keys like "pn", "debug", "sensor_address", "gust_avg_period", and "parameters".

Return type:

bool

Returns:

True if address verification succeeded, False otherwise.

send_command(command, delay=1.5)[source]

Send an SDI-12 command via the serial bridge and read the response.

Parameters:
  • command (str) – ASCII SDI-12 command string (e.g., "0R0!").

  • delay (float) – Seconds to wait after sending before reading. SDI-12 timing requires a minimum delay for sensor processing.

Return type:

str

Returns:

The decoded ASCII response string, or empty string on error.

verify_address()[source]

Verify the sensor responds at the configured SDI-12 address.

Return type:

str

Returns:

The raw response string from the sensor.

set_gust_avg_period(interval=300)[source]

Set the gust averaging period on the sensor (part 213 only).

Parameters:

interval (int) – Averaging period in seconds (default 300).

Return type:

None

parse_wind_response(response, cmd_type)[source]

Parse an SDI-12 response string into wind measurement values.

Handles different response formats based on command type and sensor part number (211=HONGYUV, 212=Calypso, 213=HONGYUV with gust).

Parameters:
  • response (str) – Raw ASCII response string from the sensor.

  • cmd_type (str) – The SDI-12 command type that generated the response ("R0" for standard, "R6" for gust).

Return type:

dict | None

Returns:

A dict with "address", "speed", "angle", and "gust" keys, or None if the response could not be parsed.
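SDI-12 data responses begin with the one-character sensor address, followed by values that each carry an explicit + or - sign. A minimal tokenizer for that layout is sketched below. The function name is hypothetical, and the sketch deliberately does not decide which value is speed, angle, or gust, since that ordering depends on the sensor variant.

```python
import re
from typing import Optional

# One signed decimal value, e.g. +3.25 or -12
_VALUE = re.compile(r"[+-]\d+(?:\.\d+)?")


def split_sdi12_response(response: str) -> Optional[tuple[str, list[float]]]:
    """Return (address, [values...]) or None if nothing could be parsed."""
    text = response.strip()
    if not text:
        return None
    address, payload = text[0], text[1:]
    values = [float(v) for v in _VALUE.findall(payload)]
    if not values:
        return None
    return address, values
```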

get_speed_and_angle()[source]

Query the sensor for wind speed and direction via the R0 command.

Return type:

tuple[float, float] | None

Returns:

A tuple (angle, speed) in (degrees, m/s), or None on failure.

get_gust()[source]

Query the sensor for gust speed via the R6 command (part 213 only).

Return type:

float | None

Returns:

Gust speed in m/s, or None on failure.

getSensorReading()[source]

Get the latest wind reading and accumulate values for batch export.

Queries speed, angle, and optionally gust, then maps them to the configured parameter short-codes.

Return type:

dict

Returns:

A dict mapping parameter short-codes to their current values, plus a "t" timestamp. Empty dict on failure.

putSensorValue(result=None)[source]

Export accumulated wind data as time-series lists and reset buffers.

Parameters:

result (dict | None) – Optional dict to populate with time-series data. Creates a new dict if None.

Return type:

dict

Returns:

The result dict with each parameter mapped to [[values...], [timestamps...]].

Rain – Rainfall Sensor

Driver for the Oizom tipping-bucket rain gauge (SAMD-based).

Communicates via UART with a SAMD microcontroller that measures cumulative rainfall. Sends ASCII commands and receives colon-delimited text responses.

Hardware:

  • Interface: UART (serial)

  • Port: Configured via serial port argument

  • Baud rate: Configured externally on the serial.Serial instance

  • Protocol: ASCII command (RAIN?\r\n) -> text response (RAIN:<value>)

Typical usage:

import serial
from drivers.Rain.Rain import Rain

ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
rain = Rain()
rain.initialize(ser, {"en": 1, "parameters": [{"sc": "rain"}]})
value = rain.get_rain()

Note

Requires pyserial for serial communication.

class drivers.Rain.Rain.Rain[source]

Bases: object

Driver for a SAMD-based tipping-bucket rain gauge.

Sends RAIN? commands over UART and parses text responses containing cumulative rainfall values.

Variables:
  • ser (Serial) – Open serial.Serial instance for UART communication.

  • configuration (dict) – Device configuration dictionary.

  • debug – Whether verbose debug logging is enabled.

  • requested_parameters (list[str]) – List of parameter short-codes from configuration.

__init__()[source]

Initialize default attributes for the Rain sensor.

Return type:

None

initialize(serial_port, configuration)[source]

Configure the rain sensor with a serial port and device configuration.

Parameters:
  • serial_port (Serial) – An open serial.Serial instance.

  • configuration (dict) – Device configuration dict. Must contain the "en" key to enable the sensor and a "parameters" list.

Return type:

bool

Returns:

True if the sensor is enabled and ready, False otherwise.

extract_requested_parameters(configuration)[source]

Extract parameter short-codes from configuration.

Parameters:

configuration (dict) – Device configuration dict containing a "parameters" list.

Return type:

list[str]

Returns:

The list of requested parameter short-codes.

send_command(command=b'RAIN?\r\n')[source]

Send a command over UART and parse the rainfall response.

Parameters:

command (bytes) – Raw bytes to transmit. Defaults to b"RAIN?\r\n".

Return type:

float

Returns:

Rainfall value in inches, or 0.0 on error.
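Parsing the colon-delimited RAIN:<value> response with the 0.0 fallback could look like this. The function name is hypothetical and the sketch covers only the parsing step, not the serial I/O.

```python
def parse_rain_response(raw: bytes) -> float:
    """Extract the numeric value from b"RAIN:<value>", or 0.0 on error."""
    try:
        text = raw.decode("ascii").strip()
        label, _, value = text.partition(":")
        if label != "RAIN":
            return 0.0
        return float(value)
    except (UnicodeDecodeError, ValueError):
        # Non-ASCII bytes or a non-numeric value both fall back to 0.0.
        return 0.0
```

Because 0.0 is also a valid rainfall reading, a caller that needs to distinguish "no rain" from "read failed" would have to track errors separately.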

get_rain()[source]

Query the sensor for the current rainfall measurement.

Return type:

float

Returns:

Rainfall value in inches, or 0.0 on error.

getSensorReading()[source]

Get the latest rain reading (not implemented).

Return type:

dict

Returns:

Empty dict (placeholder for interface compliance).

putSensorValue(value)[source]

Fetch the current rain reading and add it to the result dict.

Parameters:

value (dict) – Dict to populate with the rain measurement.

Return type:

dict

Returns:

The updated dict with the rain parameter short-code mapped to the value.

Modbus / Industrial Sensors

BAM – Beta Attenuation Monitor

Driver for the Met One BAM (Beta Attenuation Monitor) particulate matter sensor.

Communicates over RS-232 serial (UART) using a wake-then-query protocol that mirrors the original Node-RED integration flow. The BAM reports PM concentration, raw PM counts, temperature, humidity, pressure, and two auxiliary temperatures via a CSV response.

Typical usage:

bam = BAM()
if bam.initialize(port="/dev/ttyUSB0", baud=115200):
    data = bam.getPM(send_command=True)

class drivers.BAM.BAM.BAM[source]

Bases: object

Driver for Met One BAM (Beta Attenuation Monitor) sensors.

Protocol (matching the Node-RED flow):

1. Send the wake command (\r\n\r\n\r\n).
2. Wait 10 seconds for the BAM to respond.
3. Send the data request ("4\r\n").
4. Read 8 lines of response.
5. Line[4] = device ID, Line[7] = CSV data.

CSV fields (index): [0]?, [1]=PM conc, [2]=PM raw, [3-7]=?, [8]=temp, [9]=humidity, [10]=pressure, [11]=t1, [12]=t2

WAKE_COMMAND = '\r\n\r\n\r\n'
DATA_REQUEST = '4\r\n'
WAKE_DELAY = 10
EXPECTED_LINES = 8
__init__()[source]

Initialize BAM driver with default data fields and no serial connection.

Return type:

None

initialize(port='/dev/ttyUSB0', baud=115200)[source]

Open the serial connection to the BAM sensor.

Parameters:
  • port (str) – Serial device path (e.g. /dev/ttyUSB0).

  • baud (int) – Baud rate for the RS-232 link.

Return type:

bool

Returns:

True if the serial port was opened successfully, False otherwise.

getPM(send_command=False)[source]

Return the latest PM and environmental readings.

Parameters:

send_command (bool) – When True, query the BAM for fresh data before returning. When False, return the last cached readings.

Return type:

dict[str, float]

Returns:

Dictionary with keys pm, pm_raw, temp, humidity, pressure, t1, and t2.
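The CSV field indices listed in the class description can be mapped onto these keys roughly as follows. The function name and the sample line in the usage note are hypothetical; the fields whose meaning is undocumented are simply ignored.

```python
# Index of each documented CSV field within the data line (Line[7]).
FIELD_INDEX = {
    "pm": 1,
    "pm_raw": 2,
    "temp": 8,
    "humidity": 9,
    "pressure": 10,
    "t1": 11,
    "t2": 12,
}


def parse_bam_csv(line: str) -> dict[str, float]:
    """Convert one BAM CSV data line into a dict of float readings."""
    fields = [f.strip() for f in line.split(",")]
    if len(fields) <= max(FIELD_INDEX.values()):
        raise ValueError("CSV line has too few fields")
    return {key: float(fields[idx]) for key, idx in FIELD_INDEX.items()}
```

For example, parse_bam_csv("0,0.012,345,a,b,c,d,e,23.5,40.1,1008.2,24.0,25.0") picks out only the seven documented fields and leaves the unknown ones untouched.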

PyModbus – Generic Modbus Driver

PyModbus Modbus TCP/RTU server driver.

Wraps the pymodbus library to expose a Modbus server (TCP or RTU) that other devices can poll for sensor data. The server holds a configurable number of 32-bit IEEE-754 float registers that the Oizom firmware updates each sensing cycle.

Protocol: Modbus TCP or Modbus RTU over RS-485.

Typical usage:

server = PyModbus(size=20)
ctx = server.build_context(slaveId=1, fixId=True)
server.update_data(1, 3, 0, 23.5)
server.run_tcp_server("0.0.0.0", 5020)

class drivers.PyModbus.PyModbus.PyModbus[source]

Bases: object

Modbus server wrapper for exposing sensor data over TCP or RTU.

Variables:
  • size – Number of 32-bit float register slots to allocate.

  • default_value – Default register fill value.

  • context – Active ModbusServerContext after build_context is called.

Parameters:
  • size (int)

  • default_value (int)

context = None
__init__(size, default_value=0)[source]

Initialize the PyModbus server with a register block size.

Parameters:
  • size (int) – Number of 32-bit float register slots.

  • default_value (int) – Default fill value for all registers.

Return type:

None

size = 0
default_value = 0
build_context(slaveId, fixId)[source]

Build and return a Modbus data-store context.

Parameters:
  • slaveId (int) – Modbus slave/unit identifier.

  • fixId (bool) – If True, create a multi-slave context keyed by slaveId; otherwise create a single-slave context.

Return type:

ModbusServerContext

Returns:

The constructed ModbusServerContext.

run_tcp_server(server_ip, server_port)[source]

Start a blocking Modbus TCP server.

Parameters:
  • server_ip (str) – IP address to bind (e.g. "0.0.0.0").

  • server_port (int) – TCP port number.

Return type:

None

run_rtu_server(server_port, server_baud, server_parity)[source]

Start a blocking Modbus RTU (serial) server.

Parameters:
  • server_port (str) – Serial device path (e.g. "/dev/ttyAMA0").

  • server_baud (int) – Baud rate for the serial link.

  • server_parity (str) – Parity setting ("N", "E", or "O").

Return type:

None

update_data(slaveId, register, register_address, register_value)[source]

Write a 32-bit float value into the Modbus register store.

Parameters:
  • slaveId (int) – Target slave/unit ID in the context.

  • register (str) – Register type code used by setValues.

  • register_address (int) – Starting register address.

  • register_value (float) – Float value to encode and store.

Return type:

None
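Storing a 32-bit IEEE-754 float in Modbus means splitting it across two 16-bit registers. The sketch below shows one way to do that with the standard struct module. The helper names are hypothetical, and the high-word-first order is an assumption; the driver and the polling device must agree on whichever order is actually used.

```python
import struct


def float_to_registers(value: float) -> list[int]:
    """Encode a float as two 16-bit register values (high word first)."""
    high, low = struct.unpack(">HH", struct.pack(">f", value))
    return [high, low]


def registers_to_float(registers: list[int]) -> float:
    """Decode two 16-bit register values (high word first) into a float."""
    return struct.unpack(">f", struct.pack(">HH", *registers))[0]
```

With this layout each float slot occupies two consecutive register addresses, so a block of size float slots needs twice as many 16-bit registers.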

Infrastructure Drivers

MAX17261 – Battery Fuel Gauge

Driver for the Maxim MAX17261 fuel gauge IC.

Provides I2C communication with the MAX17261 ModelGauge m5 EZ fuel gauge for monitoring lithium-ion battery state of charge (SOC), voltage, current, and remaining capacity. The driver handles power-on-reset (POR) detection, automatic initialization with battery parameters, and register-level access.

The MAX17261 uses Maxim’s ModelGauge m5 EZ algorithm which requires minimal configuration – only battery capacity, empty voltage, and charge termination current. The IC learns the battery characteristics over time.

Hardware:
  • Interface: I2C

  • Default address: 0x36

  • Supply: 1.7V to 5.5V

  • Current sense: External shunt resistor (default 0.02 ohm)

  • Supported chemistries: Li-ion, LiPo, LiFePO4

Typical usage:

gauge = MAX17261(address=0x36, device_number=0, batteryCapacity=6000)
gauge.setValues(debug=True)
soc = gauge.getSOC()
voltage = gauge.getInstantaneousVoltage()
current = gauge.getInstantaneousCurrent()

Note

Requires smbus2. Register format follows Maxim AN6358 “Standard Register Formats”. The driver auto-detects POR events and re-initializes the IC accordingly.

class drivers.MAX17261.MAX17261.MAX17261[source]

Bases: object

I2C driver for the Maxim MAX17261 battery fuel gauge.

Monitors battery state including voltage, current, state of charge, remaining capacity, and time-to-empty/full. Handles POR detection and automatic re-initialization with configured battery parameters.

The register multipliers follow Maxim AN6358 “Standard Register Formats” and are adjusted based on the sense resistor value.
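As an illustration of how the sense resistor enters these multipliers, the sketch below converts raw 16-bit register values using the standard ModelGauge m5 resolutions (capacity 5.0 uVh / Rsense, current 1.5625 uV / Rsense, voltage 78.125 uV, percentage 1/256 %, time 5.625 s). The function names are hypothetical and the driver's own scaling code may be organized differently.

```python
def to_signed16(raw: int) -> int:
    """Interpret a 16-bit register value as two's-complement signed."""
    return raw - 0x10000 if raw & 0x8000 else raw


def capacity_mah(raw: int, rsense_ohm: float = 0.02) -> float:
    """Capacity registers: 5.0 uVh / Rsense per LSB, returned in mAh."""
    return raw * 0.005 / rsense_ohm


def current_ma(raw: int, rsense_ohm: float = 0.02) -> float:
    """Current registers: signed, 1.5625 uV / Rsense per LSB, in mA."""
    return to_signed16(raw) * 0.0015625 / rsense_ohm


def voltage_v(raw: int) -> float:
    """Voltage registers: 78.125 uV per LSB, in volts."""
    return raw * 78.125e-6


def percent(raw: int) -> float:
    """Percentage registers: 1/256 % per LSB."""
    return raw / 256.0


def time_seconds(raw: int) -> float:
    """Time registers: 5.625 s per LSB."""
    return raw * 5.625
```

With the default 0.02 ohm shunt, one capacity LSB is 0.25 mAh, so a raw DesignCap value of 24000 corresponds to the 6000 mAh used in the usage example.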

Variables:
  • I2C_ADDRESS – I2C slave address (default 0x36).

  • debug – Enable verbose fuel gauge logging.

Parameters:
  • address (int)

  • device_number (int)

  • batteryCapacity (int)

Status = 0
VCell = 9
AvgVCell = 25
Current = 10
AvgCurrent = 11
RepSOC = 6
RepCap = 5
TimeToEmpty = 17
TimeToFull = 32
DesignCap = 24
VEmpty = 58
Modelcfg = 219
debug = False
__init__(address=54, device_number=0, batteryCapacity=6000)[source]
Parameters:
  • address (int)

  • device_number (int)

  • batteryCapacity (int)

Return type:

None

I2C_ADDRESS = 54
read_byte_data(reg)[source]
Parameters:

reg (int)

Return type:

int

write_byte_data(reg, value)[source]
Parameters:
  • reg

  • value

Return type:

None

writeReg16Bit(reg, value)[source]
Parameters:
  • reg

  • value

Return type:

None

readReg16Bit(reg)[source]
Parameters:

reg (int)

Return type:

int

set_target(target)[source]
Parameters:

target (int)

Return type:

None

getInstantaneousCurrent()[source]
Return type:

float

getInstantaneousVoltage()[source]
Return type:

float

setCapacity(batteryCapacity)[source]
Parameters:

batteryCapacity (int)

Return type:

None

setEmptyCell(EmptyCell)[source]
Parameters:

EmptyCell (int)

Return type:

None

setModelcfg(CHEM_ID)[source]
Parameters:

CHEM_ID (int)

Return type:

None

getCapacity()[source]
Return type:

float

getCell()[source]
Return type:

float

getModel()[source]
Return type:

int

getValues()[source]
Return type:

None

setValues(debug=True, res=0.02, resFlag=False)[source]
Parameters:
  • debug

  • res

  • resFlag

Return type:

None

getRemainingCapacity()[source]
Return type:

float

check_por()[source]
Return type:

bool

wait_dnr()[source]
Return type:

None

initialize()[source]
Return type:

None

write_and_verify_register(reg, value)[source]
Parameters:
  • reg

  • value

Return type:

None

clear_por()[source]
Return type:

None

setResistSensor(resistorValue)[source]
Parameters:

resistorValue (float)

Return type:

None

getResistSensor()[source]
Return type:

float

getSOC()[source]
Return type:

float

getTimeToEmpty()[source]
Return type:

float

getTimeToFull()[source]
Return type:

float

get_variables(offset, length)[source]
Parameters:
  • offset

  • length

Return type:

list[int]

get_target()[source]
Return type:

int

get_feedback()[source]
Return type:

int

MCP230XX – GPIO Expander

Driver for MCP23008 and MCP23017 I2C GPIO expander ICs.

Provides digital I/O, interrupt management, fan control, power rail resets, UART mux selection, and SAMD co-processor reset via the MCP230XX registers. Supports both 8-bit (MCP23008) and 16-bit (MCP23017) register addressing schemes.

Typical usage:

mcp = MCP230XX(chip="MCP23017", i2cAddress=0x21, devicenumber=1)
mcp.resetDefault()
mcp.FAN_HIGH()

class drivers.MCP230XX.MCP230XX.MCP230XX[source]

Bases: object

I2C GPIO expander driver for MCP23008/MCP23017.

Variables:
  • i2cAddress – 7-bit I2C address of the expander.

  • bus – smbus2.SMBus instance for I2C communication.

  • chip – Chip variant string ("MCP23008" or "MCP23017").

  • bank – Register addressing scheme ("8bit" or "16bit").

  • callBackFuncts – Per-pin interrupt callback function list.

Parameters:
  • chip (str)

  • i2cAddress (int)

  • devicenumber (int)

  • regScheme (str)

FAN = 2
SAMD_RST = 0
B4SW = 3
PIN_A = 6
PIN_B = 5
FAN_STATUS = 4
RPI_FAN = 1
POWER_3V3 = 9
POWER_5V = 8
A_UART = 3
B_UART = 7
sensor_bus = None
__init__(chip='MCP23017', i2cAddress=33, devicenumber=1, regScheme='16bit')[source]

Initialize the MCP230XX expander over I2C.

Parameters:
  • chip (str) – IC variant – "MCP23008" or "MCP23017".

  • i2cAddress (int) – 7-bit I2C address (default 0x21).

  • devicenumber (int) – Linux I2C bus number.

  • regScheme (str) – Register addressing ("8bit" or "16bit").

Return type:

None

bus = None
resetDefault()[source]

Configure all application-specific pins to their default directions and levels.

Return type:

None

FAN_HIGH()[source]

Drive the external fan power pin HIGH (on).

Return type:

None

FAN_LOW()[source]

Drive the external fan power pin LOW (off).

Return type:

None

samd_rst()[source]

Pulse the SAMD co-processor reset line (5-second hold).

Return type:

None

samd_boot_mode()[source]

Double-tap the SAMD reset line to enter bootloader mode.

Return type:

None

power_3v3_rst()[source]

Cycle the 3.3 V power rail off for 5 seconds, then back on.

Return type:

None

power_5v_rst()[source]

Cycle the 5 V power rail off for 5 seconds, then back on.

Return type:

None

select_lora()[source]

Set the UART mux to route to the LoRa module.

Return type:

None

select_PM100()[source]

Set the UART mux to route to the PM100 dust sensor.

Return type:

None

select_UART_1()[source]

Set the UART mux to route to auxiliary UART channel 1.

Return type:

None

select_UART_UV()[source]

Set the UART mux to route to the UV sensor UART channel.

Return type:

None

readFAN(fan_delay)[source]

Sample the fan tachometer pins for fan_delay seconds and evaluate health.

Parameters:

fan_delay (int) – Duration in seconds to sample the two fan status pins.

Return type:

bool

Returns:

True if the fan is spinning (signal is toggling), False if stuck.

single_access_read(reg=0)[source]

Read a single data register of the MCP230xx GPIO expander.

Parameters:

reg (int)

Return type:

int

single_access_write(reg=0, regValue=0)[source]

Write a value to a single data register of the MCP230xx GPIO expander.

Parameters:
  • reg

  • regValue

Return type:

None

register_bit_select(pin, reg8A, reg16A, reg8B, reg16B)[source]

Return the register and bit position to use for a particular GPIO pin and GPIO function.

Parameters:
  • pin

  • reg8A

  • reg16A

  • reg8B

  • reg16B

Return type:

tuple[int, int]

interrupt_options(outputType='activehigh', bankControl='separate')[source]

Set the options for the two interrupt pins.

Parameters:
  • outputType (str)

  • bankControl (str)

Return type:

None

set_register_addressing(regScheme='8bit')[source]

Change how the registers are mapped. For an MCP23008, bank should always equal "8bit". This sets bit 7 of the IOCON register.

Parameters:

regScheme (str)

Return type:

None

pinMode(pin, mode, pullUp='disable')[source]

Configure a GPIO pin as either an input or an output. The input pull-up resistor can also be enabled. Sets the appropriate bits in the IODIRA/B and GPPUA/B registers.

Parameters:
  • pin

  • mode

  • pullUp

Return type:

None
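The register and bit handling behind pin configuration can be illustrated with plain bit arithmetic. In the MCP230xx IODIR registers a 1 bit selects input and a 0 bit selects output. The helper names, the "input"/"output" mode strings, and the mapping of pins 0-7 to port A and 8-15 to port B are assumptions made for this sketch, not the driver's API.

```python
def port_and_bit(pin: int) -> tuple[str, int]:
    """Map a pin number 0-15 to (port, bit); 0-7 is port A, 8-15 is port B."""
    if not 0 <= pin <= 15:
        raise ValueError("pin must be in the range 0-15")
    return ("A", pin) if pin < 8 else ("B", pin - 8)


def with_bit(reg_value: int, bit: int, on: bool) -> int:
    """Return an 8-bit register value with one bit set or cleared."""
    mask = 1 << bit
    if on:
        return (reg_value | mask) & 0xFF
    return reg_value & ~mask & 0xFF


def iodir_for_mode(iodir: int, bit: int, mode: str) -> int:
    """Compute the new IODIR value: 1 = input, 0 = output."""
    return with_bit(iodir, bit, on=(mode == "input"))
```

The usual pattern is read-modify-write: read the current register value, compute the new value with a helper like iodir_for_mode, and write it back so the other seven pins keep their configuration.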

invert_input(pin, invert=False)[source]

Invert the polarity of the GPIO register bit corresponding to the pin. Sets the pin's bit in the IPOLA/B registers.

Parameters:
  • pin

  • invert

Return type:

None

digitalWrite(pin, value)[source]

Set the state of a GPIO output pin via the appropriate bit in the OLATA/B registers.

Parameters:
  • pin

  • value

Return type:

None

input(pin)[source]

Get the current level of a GPIO input pin by reading the appropriate bit in the GPIOA/B registers.

Parameters:

pin (int)

Return type:

int

input_at_interrupt(pin)[source]

Get the level of a GPIO input pin at the time an interrupt occurred by reading the appropriate bit in the INTCAPA/B registers.

Parameters:

pin (int)

Return type:

int

add_interrupt(pin, callbackFunctLow='empty', callbackFunctHigh='empty')[source]

Set up the interrupt options for a specific GPIO pin, including the callback functions to execute when an interrupt occurs.

Parameters:
  • pin

  • callbackFunctLow

  • callbackFunctHigh

Return type:

None

remove_interrupt(pin)[source]

Remove the interrupt settings from an MCP230xx pin.

Parameters:

pin (int)

Return type:

None

callbackA(gpio)[source]

Called by RPi.GPIO on a bank A interrupt condition. Determines which MCP230xx pin caused the interrupt and invokes the appropriate callback function.

Parameters:

gpio (int)

Return type:

None

callbackB(gpio)[source]

Called by RPi.GPIO on a bank B interrupt condition. Determines which MCP230xx pin caused the interrupt and invokes the appropriate callback function.

Parameters:

gpio (int)

Return type:

None

callbackBoth(gpio)[source]

Called by RPi.GPIO on either a bank A or a bank B interrupt condition. Determines which MCP230xx pin caused the interrupt and invokes the appropriate callback function.

Parameters:

gpio (int)

Return type:

None

register_reset()[source]

Put the chip back to its default settings.

Return type:

None

GPIO Utilities

GPIO utility functions for Raspberry Pi BCM pin control.

Wraps RPi.GPIO to provide a simplified interface for pin setup, digital read/write, cleanup, and I2C multiplexer channel selection. Tracks pin modes internally so that pins are auto-configured on first use.

drivers.gpio.gpio.setup(pin, mode, pullup=None, initial=False)[source]

Configure a GPIO pin as input or output.

Parameters:
  • pin (int) – BCM GPIO pin number.

  • mode (str) – "in" for input or "out" for output.

  • pullup (bool | None) – If not None, enable pull-up (True) or pull-down (False) for inputs.

  • initial (bool) – Initial output level when mode is "out" (True = HIGH).

Return type:

None

drivers.gpio.gpio.set(pin, value)[source]

Set a GPIO pin HIGH or LOW, auto-configuring as output if needed.

Parameters:
  • pin (int) – BCM GPIO pin number.

  • value (bool) – True for HIGH, False for LOW.

Return type:

None

drivers.gpio.gpio.read(pin)[source]

Read the logic level of a GPIO pin, auto-configuring as input if needed.

Parameters:

pin (int) – BCM GPIO pin number.

Returns:

True if HIGH, False if LOW.

Return type:

bool

drivers.gpio.gpio.input(pin)[source]

Alias for read() – read the logic level of a GPIO pin.

Parameters:

pin (int) – BCM GPIO pin number.

Returns:

True if HIGH, False if LOW.

Return type:

bool

drivers.gpio.gpio.output(pin, value)[source]

Alias for set() – set a GPIO pin HIGH or LOW.

Parameters:
  • pin (int) – BCM GPIO pin number.

  • value (bool) – True for HIGH, False for LOW.

Return type:

None

drivers.gpio.gpio.mode(pin)[source]

Return the current configured mode of a pin.

Parameters:

pin (int) – BCM GPIO pin number.

Returns:

"in", "out", or None if the pin has not been set up.

Return type:

str | None

drivers.gpio.gpio.cleanup(pin=None, assert_exists=False)[source]

Release GPIO resources for a specific pin or all pins.

Parameters:
  • pin (int | None) – BCM GPIO pin number, or None to clean up all pins.

  • assert_exists (bool) – Unused; kept for API compatibility.

Return type:

None

drivers.gpio.gpio.setwarnings(value)[source]

Enable or disable RPi.GPIO runtime warnings.

Parameters:

value (bool) – True to enable warnings, False to suppress.

Return type:

None

drivers.gpio.gpio.setmode(value)[source]

Set the GPIO pin numbering scheme (BCM or BOARD).

Parameters:

value (int) – GPIO.BCM or GPIO.BOARD.

Return type:

None

drivers.gpio.gpio.select_I2C(pn=24, ch=-1)[source]

Select an I2C multiplexer channel via GPIO pins 24 and 25.

Maps a part-number or channel identifier to the two-bit address lines that control the Oizom I2C mux. A 100 ms settling delay is applied after switching.

Parameters:
  • pn (int) – Part number or logical channel identifier.

  • ch (int) – Explicit channel override; if not -1, overrides pn.

Return type:

None
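As a rough illustration of the two-bit addressing, the sketch below drives GPIO 24 and 25 from a channel number and then waits 100 ms. The bit order (pin 24 as the low bit), the 0-3 channel range, and the names channel_to_levels and select_channel are assumptions; the mapping from part numbers to channels is not reproduced here.

```python
import time

MUX_PINS = (24, 25)  # (low bit, high bit); the bit order is an assumption


def channel_to_levels(ch):
    """Translate a channel number 0-3 into (low_bit, high_bit) logic levels."""
    if not 0 <= ch <= 3:
        raise ValueError("channel must be in the range 0-3")
    return bool(ch & 1), bool(ch & 2)


def select_channel(ch, write_pin, settle_s=0.1, sleep=time.sleep):
    """Drive both address lines, then wait for the mux to settle.

    write_pin is any callable(pin, value), for example a GPIO set function.
    """
    low, high = channel_to_levels(ch)
    write_pin(MUX_PINS[0], low)
    write_pin(MUX_PINS[1], high)
    sleep(settle_s)


# Example with a recording stub instead of real GPIO access.
writes = []
select_channel(2, lambda pin, value: writes.append((pin, value)), sleep=lambda s: None)
```

Passing the write function and the sleep function as parameters keeps the sketch testable off-target.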

DeviceDetector – I2C Device Scanner

I2C and USB device scanner for identifying connected serial peripherals.

Detects ACM devices (OizomDust, Lasan), GSM modules (Quectel, Telit), and generic USB-serial adapters (QinHeng HL-340) by matching vendor/product IDs. Provides stable port mapping based on USB topology so that device assignments survive reboots and replug events.

class drivers.DeviceDetector.DeviceDetector.DeviceDetector[source]

Bases: object

Scans USB and serial buses to detect and classify connected hardware devices.

Maintains VID/PID lookup tables for known ACM, GSM, and USB-serial devices and exposes methods that return structured port information suitable for driver initialization.
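A VID/PID lookup of this kind might look like the following sketch. The table contents are limited to the IDs that appear in this reference (Telit and QinHeng HL-340); the KNOWN_DEVICES name, the classify helper, and the tuple input format are hypothetical. On a real system the tuples could be built from pyserial's serial.tools.list_ports.comports(), whose entries expose vid, pid, and device attributes.

```python
KNOWN_DEVICES = {
    (0x1BC7, 0x1033): "Telit",
    (0x1A86, 0x7523): "QinHengHL340",
}


def classify(ports):
    """Group recognised ports by device name.

    ports is an iterable of (vid, pid, device_path) tuples.
    Unknown VID/PID pairs are skipped.
    """
    found = {}
    for vid, pid, device in ports:
        name = KNOWN_DEVICES.get((vid, pid))
        if name is None:
            continue
        found.setdefault(name, []).append(
            {"vid": f"0x{vid:04x}", "pid": f"0x{pid:04x}", "port": device}
        )
    return found


result = classify([
    (0x1A86, 0x7523, "/dev/ttyUSB3"),
    (0x1234, 0x5678, "/dev/ttyUSB9"),  # not in the table, ignored
])
```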

__init__()[source]

Initialize device configuration tables for all known peripherals.

Return type:

None

scan_acm_devices()[source]

Scan for ACM devices (OizomDust, Lasan) and map them to their ports.

Returns:

{"OizomDust": {"vid": …, "pid": …, "port": …}, …}

Return type:

dict[str, str]

scan_gsm_devices()[source]

Scan for GSM devices and map ports by function (gps, modem, etc.)

Return type:

dict[str, dict]

Returns:

{
    "Telit": {
        "vid": "0x1bc7",
        "pid": "0x1033",
        "ports": {
            "aux": "/dev/ttyUSB0",
            "gps": "/dev/ttyUSB1",
            "modem": "/dev/ttyUSB2",
        }
    }
}

get_gsm_gps_port(device_name='Quectel')[source]

Get the GPS port for the specified GSM device.

Parameters:

device_name (str) – "Quectel" or "Telit".

Returns:

GPS port path, or None.

Return type:

str | None

get_gsm_modem_port(device_name='Quectel')[source]

Get the modem/AT port for the specified GSM device.

Parameters:

device_name (str) – "Quectel" or "Telit".

Returns:

Modem port path, or None.

Return type:

str | None
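The port getters can be thought of as lookups into the structure returned by scan_gsm_devices(). The sketch below shows that idea on a literal dictionary shaped like the documented example; port_for is a hypothetical helper, not part of the driver, and the real methods may work differently.

```python
def port_for(scan_result, device_name, function):
    """Return the port path for a device function, or None if it is absent."""
    entry = scan_result.get(device_name)
    if entry is None:
        return None
    return entry.get("ports", {}).get(function)


# Shaped like the documented scan_gsm_devices() return value.
scan_result = {
    "Telit": {
        "vid": "0x1bc7",
        "pid": "0x1033",
        "ports": {
            "aux": "/dev/ttyUSB0",
            "gps": "/dev/ttyUSB1",
            "modem": "/dev/ttyUSB2",
        },
    }
}

gps_port = port_for(scan_result, "Telit", "gps")
missing = port_for(scan_result, "Quectel", "gps")  # device not present
```

Returning None rather than raising lets callers fall back to another module or retry a scan later.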

scan_usb_devices()[source]

Scan for USB serial devices (QinHeng HL-340 etc.) and assign logical names dynamically from ttyUSB indices when available.

Return type:

dict

Returns:

{
    "QinHengHL340_1": {"vid": "0x1a86", "pid": "0x7523", "port": "/dev/ttyUSB3", "path": "1-1.1.2.2:1.0"},
    "QinHengHL340_2": {…},
    …
    "Unmapped_1-1.1.2.9:1.0": {…},  # if tty index is unavailable
}
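One way the logical names could be derived is sketched below. The numbering rule is an assumption made for illustration: entries with a ttyUSB index are sorted by that index and numbered from 1, and entries without one are keyed by their USB path with an "Unmapped_" prefix. The helper name name_usb_ports and the input format are hypothetical.

```python
import re


def name_usb_ports(entries, prefix="QinHengHL340"):
    """Assign logical names to USB-serial entries.

    entries is a list of dicts with a "port" key (path or None) and a "path" key.
    """
    def tty_index(entry):
        match = re.fullmatch(r"/dev/ttyUSB(\d+)", entry.get("port") or "")
        return int(match.group(1)) if match else None

    # Entries with a usable tty index are numbered in ascending index order.
    indexed = sorted(
        (e for e in entries if tty_index(e) is not None), key=tty_index
    )
    named = {f"{prefix}_{n}": e for n, e in enumerate(indexed, start=1)}

    # Entries without a tty index fall back to their USB topology path.
    for entry in entries:
        if tty_index(entry) is None:
            named[f"Unmapped_{entry['path']}"] = entry
    return named


entries = [
    {"port": "/dev/ttyUSB4", "path": "1-1.1.2.3:1.0"},
    {"port": None, "path": "1-1.1.2.9:1.0"},
    {"port": "/dev/ttyUSB3", "path": "1-1.1.2.2:1.0"},
]
named = name_usb_ports(entries)
```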