#!/usr/bin/python
"""Driver for the ELT CO2 I2C carbon dioxide sensor.

Provides I2C communication with the ELT (Environmental Sensor Technology)
CO2 sensor module. The driver reads CO2 concentration from the sensor's
internal registers using a command-response protocol over I2C.

The sensor supports sleep/wake control, manual and automatic calibration
(MCDL/ACDL), and returns CO2 concentration in ppm.

Hardware:
    - Interface: I2C
    - Default address: 0x31
    - Supply: 3.3V / 5V
    - CO2 range: 0-5000 ppm (typical)
    - Response time: <60 seconds (T90)

Typical usage::

    sensor = ELTCO2(address=0x31, device_number=0)
    co2_ppm = sensor.getCO2()

Note:
    Requires ``smbus2`` for I2C access. The sensor register protocol uses
    a status byte (data[0] == 0x08) to indicate valid measurement data.
"""
import time
from smbus2 import SMBus
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level handles built from the project's OizomLogger wrapper:
# - basic_logger: whatever OizomLogger.get() returns; it is not referenced
#   anywhere else in the visible part of this module.
# - context_logger: the wrapper object itself, used below for the
#   debug_with_context / error_with_context calls tagged "ELT_CO2".
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class ELTCO2:
"""I2C driver for the ELT CO2 sensor module.
Reads CO2 concentration from the sensor via I2C register reads.
Supports sleep/wake power management and manual/automatic calibration
modes. Implements a 3-retry mechanism for CO2 readings.
Attributes:
I2C_ADDRESS: I2C slave address of the sensor.
"""
I2C_ADDRESS = 0x31
_SENSOR_REG = 0x52
_SLEEP = 0x53
_WAKEUP = 0x57
_CLEAR_CALIB = 0x43
_START_MCDL = 0x4D
_END_MCDL = 0x45
_START_ACDL = 0x41
_END_ACDL = 0x45
[docs]
def __init__(self, address: int = 0x31, device_number: int = 0) -> None:
time.sleep(0.5)
self.bus = SMBus(device_number)
self.I2C_ADDRESS = address
time.sleep(1)
[docs]
def read_byte_data(self, reg: int) -> int:
return self.bus.read_byte_data(self.I2C_ADDRESS, reg)
[docs]
def write_byte_data(self, reg: int, value: int) -> None:
return self.bus.write_byte_data(self.I2C_ADDRESS, reg, value)
[docs]
def writeReg16Bit(self, reg: int, value: int) -> None:
val = [(value & 0xFF), ((value >> 8) & 0xFF)] # 0-> LSB | 1-> MSB
self.bus.write_i2c_block_data(self.I2C_ADDRESS, reg, val)
time.sleep(0.1)
[docs]
def readReg16Bit(self, reg: int) -> int:
# Returned value is a list of 16 bytes
block = self.bus.read_i2c_block_data(self.I2C_ADDRESS, reg, 2) # 0 -> LSB | 1 -> MSB
value = block[0]
value |= block[1] << 8
return value
def _twos_comp(self, val: int, bits: int) -> int:
if (val & (1 << (bits - 1))) != 0: # if sign bit is set e.g., 8bit: 128-255
val = val - (1 << bits) # compute negative value
return val
[docs]
def getCO2(self) -> float | int:
value = 0.0
counter = 3
while counter > 0:
context_logger.debug_with_context("ELT_CO2", f"CO2counter :{counter}")
try:
data = self.bus.read_i2c_block_data(self.I2C_ADDRESS, self._SENSOR_REG, 7)
if data[0] == 0x08:
value = data[1] * 255 + data[2]
counter = 0
else:
counter += -1
time.sleep(0.1)
except Exception as e:
counter += -1
context_logger.error_with_context("ELT_CO2", f"getCO2: {e}")
return value
[docs]
def sleep(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._SLEEP)
[docs]
def wakeup(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._WAKEUP)
[docs]
def clear_calib(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._CLEAR_CALIB)
[docs]
def start_mcdl(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._START_MCDL)
[docs]
def end_mcdl(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._END_MCDL)
[docs]
def start_acdl(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._START_ACDL)
[docs]
def end_acdl(self) -> None:
self.write_byte_data(self.I2C_ADDRESS, 0x00, self._END_ACDL)
if __name__ == "__main__":
    # Manual smoke test: poll the sensor every 2 seconds until interrupted.
    elt = None
    try:
        elt = ELTCO2()
        while True:
            reading = elt.getCO2()
            # Report the value; previously it was read and thrown away.
            context_logger.debug_with_context("ELT_CO2", f"CO2: {reading} ppm")
            time.sleep(2)
    except Exception as e:
        context_logger.error_with_context("ELT_CO2", f"main: {e}")
    finally:
        # Release the I2C bus handle on any exit path, including Ctrl-C.
        if elt is not None:
            elt.bus.close()