Source code for OzWrapper.OzCO2.OzCO2

"""CO2 sensor wrapper.

Abstracts over CO2 hardware drivers -- ELTCO2 and SCD40 -- providing a
unified interface for carbon dioxide concentration, temperature, and humidity
readings through the GenericSensor contract.
"""

import json
import os
import time

from drivers.ELTCO2.ELTCO2 import ELTCO2
from drivers.gpio import gpio
from drivers.SCD40.SCD40 import SCD40
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzCO2(GenericSensor): """Wrapper for CO2 sensors. Manages initialisation, periodic reading, and value aggregation for CO2 sensors. Supports ELT CO2 (I2C) and SCD40 (I2C) drivers, selected by the part number in the Gateway configuration. Attributes: configuration: List of sensor config dicts from the Gateway. v_co2: Nested list holding accumulated readings per sensor/parameter. ELT_CO2: ELTCO2 driver instance, or None if not configured. SCD_CO2: SCD40 driver instance, or None if not configured. """
[docs] def __init__(self) -> None: """Initialise the OzCO2 wrapper with default state.""" super().__init__() self.configuration: dict = {} self.v_co2: list = [] self.ELT_CO2: ELTCO2 | None = None self.SCD_CO2: SCD40 | None = None
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialise all CO2 sensors listed in the Gateway config. Args: config: List of sensor configuration dicts from the Gateway. init_value: Mutable dict updated with per-sensor init status. Returns: True if at least one sensor initialised successfully. """ self.configuration = config context_logger.debug_with_context("CO2", f"CO2 Config: {self.configuration}") _success = False for sensor in self.configuration: try: if self.partNumber in sensor: sensor["init"] = self.initializeSensor(sensor) _success = True # send confirmation to main file except Exception as e: context_logger.error_with_context("CO2", f"Initialize : {e}") sensor["init"] = 0 self.putInitvalues(init_value) return _success
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialise a single CO2 sensor by part number. Creates the appropriate hardware driver, takes initial readings with retries, and stores bootstrap values in ``v_co2``. Args: sensor: Single sensor configuration dict containing part number, enable flag, GPIO settings, and parameter list. Returns: 1 if the sensor was initialised successfully, 0 otherwise. """ value = [] _success = False gpio.select_I2C(sensor[self.partNumber]) if sensor[self.partNumber] == 53 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) self.ELT_CO2 = ELTCO2(device_number=port) for param in parameters: if param[self.parameter] == 1: val = 0.0 count = 3 while count > 0: try: val = self.getCO2(sensor[self.partNumber], param[self.parameter]) if val > 0: count = 0 oldVal = val _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) _success = True else: count = count - 1 time.sleep(2) except Exception as e: context_logger.error_with_context("CO2", f"initializeSensor 53: {e}") count = count - 1 _success = False time.sleep(2) if sensor[self.partNumber] == 54 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 asc = False freset = False debug = False alt = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) if "alt" in sensor["gpio"]: alt = sensor["gpio"]["alt"] if "asc" in sensor["gpio"]: asc = True if sensor["gpio"]["asc"] == 1 else False if "freset" in sensor["gpio"]: freset = True if sensor["gpio"]["freset"] == 1 else False if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False self.SCD_CO2 = SCD40() self.SCD_CO2.DEBUG = debug if freset: self.SCD_CO2.factoryReset() # Factory Reset self.SCD_CO2.initializeSensor(asc, alt) for param in parameters: if param[self.parameter] == 1: val = 0.0 count = 3 while count > 0: try: val = self.getCO2(sensor[self.partNumber], param[self.parameter]) if val > 0: count = 0 oldVal = val _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) _success = True else: count = count - 1 time.sleep(2) except Exception as e: context_logger.error_with_context("CO2", f"initializeSensor 54_1: {e}") count = count - 1 _success = False elif param[self.parameter] == 2: val = 0.0 count = 3 while count > 0: try: val = self.getCO2(sensor[self.partNumber], param[self.parameter]) if val > 0: count = 0 oldVal = val _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) _success = True else: count = count - 1 time.sleep(2) except Exception as e: context_logger.error_with_context("CO2", f"initializeSensor 54_2: {e}") count = count - 1 _success = False if param[self.parameter] == 3: val = 0.0 count = 3 while count > 0: try: val = self.getCO2(sensor[self.partNumber], param[self.parameter]) if val > 0: count = 0 oldVal = val _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) _success = True else: count = count - 1 time.sleep(2) except Exception as e: context_logger.error_with_context("CO2", f"initializeSensor 54_3: {e}") count = count - 1 _success = False self.v_co2.append(value) return int(_success)
[docs] def getCO2(self, partNo: int, pm: int) -> float | int: """Read a CO2-related measurement from the correct driver. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier (1=CO2 ppm, 2=temperature, 3=humidity). Returns: The measured value, or 0.0 if unsupported. """ gpio.select_I2C(partNo) if partNo == 53: if pm == 1: val = self.ELT_CO2.getCO2() context_logger.info_with_context("CO2", f"CO2 is : {val} ppm") return val if partNo == 54: if pm == 1: val = self.SCD_CO2.getCO2(True) context_logger.info_with_context("CO2", f"CO2 is : {val[0]} ppm") return val[0] if pm == 2: val = self.SCD_CO2.getCO2(False) context_logger.info_with_context("CO2", f"CO2 temperature is : {val[1]} °C") return val[1] if pm == 3: val = self.SCD_CO2.getCO2(False) context_logger.info_with_context("CO2", f"CO2 humidity is : {val[2]} %") return val[2] return 0.0
[docs] def findKeyinList(self, keys: list[str], sensor: dict) -> bool: """Check whether any parameter short-code in the sensor matches the given keys. Args: keys: List of short-code prefixes to match against. sensor: Sensor config dict containing a ``"parameters"`` list. Returns: True if at least one parameter short-code matches a key. """ for parameter in sensor["parameters"]: if "_" in parameter["sc"]: if any(parameter["sc"][1:3] in s for s in keys): return True if parameter["sc"][:2] in keys: return True return False
[docs] def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict: """Read current values from all initialised CO2 sensors. Applies sensitivity and correction offsets, accumulates values for later averaging, and returns real-time data keyed by short-code. Args: calibrationKeys: Optional list of short-code keys to restrict which parameters are read. Returns: Dict mapping parameter short-codes to their latest readings. """ data = {} if calibrationKeys is None: calibrationKeys = [] for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: if len(calibrationKeys) == 0 or self.findKeyinList(calibrationKeys, sensor): for Y, parameters in enumerate(sensor["parameters"]): try: value = self.getCO2(sensor[self.partNumber], parameters[self.parameter]) if (value is not None) and value >= 0: if len(calibrationKeys) == 0: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) self.v_co2[X][Y]["value"].append(value) self.v_co2[X][Y]["count"] += 1 data[parameters["sc"]] = value except Exception as e: # self.v_co2[X][Y]['value'].append(self.v_co2[X][Y]['oldvalue']) # self.v_co2[X][Y]['count'] +=1 context_logger.error_with_context( "CO2", f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return data
[docs] def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict: """Flush accumulated CO2 readings into the output dict and reset counters. Args: value: Mutable output dict to populate with aggregated readings. calibrationKeys: Optional list of short-code keys to restrict which parameters are flushed. Returns: The updated output dict. """ if calibrationKeys is None: calibrationKeys = [] for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: if len(calibrationKeys) == 0 or self.findKeyinList(calibrationKeys, sensor): for Y, parameters in enumerate(sensor["parameters"]): try: value[parameters["sc"]] = self.v_co2[X][Y]["value"] self.v_co2[X][Y]["value"] = [] self.v_co2[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "CO2", f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return value
[docs] def putInitvalues(self, value: dict) -> dict: """Record each sensor's init status into the shared init-value dict. Args: value: Mutable dict updated in-place with a ``"co2"`` key. Returns: The updated init-value dict. """ _sensor_init = [] for sensor in self.configuration: _sensor_init.append(sensor["init"]) value.update({"co2": _sensor_init}) return value
# Example code # python3 -m OzWrapper.OzCO2.OzCO2 if __name__ == "__main__": import os sensor_string = "co2" file_string = "co2.config.json" dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, file_string) with open(file_name) as configFile: sensor = configFile.read() sensorConfig = json.loads(sensor) context_logger.info_with_context("co2", f"Sensor Config: {sensorConfig}") sensor = OzCO2() sensor.initialize(sensorConfig[sensor_string], {}) for _ in range(0, 4): sensor.getSensorReading() data = {} context_logger.info_with_context("co2", f"Sensor Values: {sensor.putSensorValue(data)}")