Source code for drivers.ELTCO2.ELTCO2

#!/usr/bin/python
"""Driver for the ELT CO2 I2C carbon dioxide sensor.

Provides I2C communication with the ELT (Environmental Sensor Technology)
CO2 sensor module. The driver reads CO2 concentration from the sensor's
internal registers using a command-response protocol over I2C.

The sensor supports sleep/wake control, manual and automatic calibration
(MCDL/ACDL), and returns CO2 concentration in ppm.

Hardware:
    - Interface: I2C
    - Default address: 0x31
    - Supply: 3.3V / 5V
    - CO2 range: 0-5000 ppm (typical)
    - Response time: <60 seconds (T90)

Typical usage::

    sensor = ELTCO2(address=0x31, device_number=0)
    co2_ppm = sensor.getCO2()

Note:
    Requires ``smbus2`` for I2C access. The sensor register protocol uses
    a status byte (data[0] == 0x08) to indicate valid measurement data.
"""

import time

from smbus2 import SMBus

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class ELTCO2: """I2C driver for the ELT CO2 sensor module. Reads CO2 concentration from the sensor via I2C register reads. Supports sleep/wake power management and manual/automatic calibration modes. Implements a 3-retry mechanism for CO2 readings. Attributes: I2C_ADDRESS: I2C slave address of the sensor. """ I2C_ADDRESS = 0x31 _SENSOR_REG = 0x52 _SLEEP = 0x53 _WAKEUP = 0x57 _CLEAR_CALIB = 0x43 _START_MCDL = 0x4D _END_MCDL = 0x45 _START_ACDL = 0x41 _END_ACDL = 0x45
[docs] def __init__(self, address: int = 0x31, device_number: int = 0) -> None: time.sleep(0.5) self.bus = SMBus(device_number) self.I2C_ADDRESS = address time.sleep(1)
[docs] def read_byte_data(self, reg: int) -> int: return self.bus.read_byte_data(self.I2C_ADDRESS, reg)
[docs] def write_byte_data(self, reg: int, value: int) -> None: return self.bus.write_byte_data(self.I2C_ADDRESS, reg, value)
[docs] def writeReg16Bit(self, reg: int, value: int) -> None: val = [(value & 0xFF), ((value >> 8) & 0xFF)] # 0-> LSB | 1-> MSB self.bus.write_i2c_block_data(self.I2C_ADDRESS, reg, val) time.sleep(0.1)
[docs] def readReg16Bit(self, reg: int) -> int: # Returned value is a list of 16 bytes block = self.bus.read_i2c_block_data(self.I2C_ADDRESS, reg, 2) # 0 -> LSB | 1 -> MSB value = block[0] value |= block[1] << 8 return value
def _twos_comp(self, val: int, bits: int) -> int: if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255 val = val - (1 << bits) # compute negative value return val
[docs] def getCO2(self) -> float | int: value = 0.0 counter = 3 while counter > 0: context_logger.debug_with_context("ELT_CO2", f"CO2counter :{counter}") try: data = self.bus.read_i2c_block_data(self.I2C_ADDRESS, self._SENSOR_REG, 7) if data[0] == 0x08: value = data[1] * 255 + data[2] counter = 0 else: counter += -1 time.sleep(0.1) except Exception as e: counter += -1 context_logger.error_with_context("ELT_CO2", f"getCO2: {e}") return value
[docs] def sleep(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._SLEEP)
[docs] def wakeup(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._WAKEUP)
[docs] def clear_calib(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._CLEAR_CALIB)
[docs] def start_mcdl(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._START_MCDL)
[docs] def end_mcdl(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._END_MCDL)
[docs] def start_acdl(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._START_ACDL)
[docs] def end_acdl(self) -> None: self.write_byte_data(self.I2C_ADDRESS, 0x00, self._END_ACDL)
if __name__ == "__main__": try: elt = ELTCO2() while 1: elt.getCO2() time.sleep(2) except Exception as e: context_logger.error_with_context("ELT_CO2", f"main: {e}")