"""CO2 sensor wrapper.
Abstracts over CO2 hardware drivers -- ELTCO2 and SCD40 -- providing a
unified interface for carbon dioxide concentration, temperature, and humidity
readings through the GenericSensor contract.
"""
import json
import os
import time
from drivers.ELTCO2.ELTCO2 import ELTCO2
from drivers.gpio import gpio
from drivers.SCD40.SCD40 import SCD40
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Handle returned by OizomLogger(...).get(); not referenced in the visible part
# of this module but kept because other code may import it.
basic_logger = OizomLogger(__name__).get()
# Logger wrapper exposing the ``*_with_context`` helpers used throughout this
# module (debug/info/error, each taking a context tag and a message).
context_logger = OizomLogger(__name__)


class OzCO2(GenericSensor):
    """Wrapper for CO2 sensors.

    Manages initialisation, periodic reading, and value aggregation for CO2
    sensors. Supports ELT CO2 (part number 53) and SCD40 (part number 54)
    drivers, selected by the part number in the Gateway configuration.

    The attributes ``partNumber``, ``parameter`` and ``baseConfig`` are read
    from ``self`` but not defined here; presumably they are provided by
    ``GenericSensor`` -- confirm against the base class.

    Attributes:
        configuration: List of sensor config dicts from the Gateway.
        v_co2: Nested list of accumulator slots. ``v_co2[X][Y]`` belongs to
            ``configuration[X]["parameters"][Y]`` and is a dict with the keys
            ``"value"`` (list of readings), ``"oldvalue"`` and ``"count"``.
        ELT_CO2: ELTCO2 driver instance, or None if not configured.
        SCD_CO2: SCD40 driver instance, or None if not configured.
    """

    # Part numbers handled by this wrapper.
    _PART_ELT = 53
    _PART_SCD40 = 54
    # Parameter ids that can be read for each supported part number.
    _SUPPORTED_PARAMS = {53: (1,), 54: (1, 2, 3)}
    # SCD40 parameter id -> (index into the driver's result, log label, unit).
    _SCD40_FIELDS = {
        1: (0, "CO2", "ppm"),
        2: (1, "CO2 temperature", "°C"),
        3: (2, "CO2 humidity", "%"),
    }
    # Bootstrap read policy used while initialising a sensor.
    _INIT_RETRIES = 3
    _INIT_RETRY_DELAY = 2  # seconds between failed bootstrap attempts

    def __init__(self) -> None:
        """Initialise the OzCO2 wrapper with default state."""
        super().__init__()
        self.configuration: list[dict] = []
        self.v_co2: list[list[dict]] = []
        self.ELT_CO2: ELTCO2 | None = None
        self.SCD_CO2: SCD40 | None = None

    @staticmethod
    def _emptySlot() -> dict:
        """Return a fresh accumulator slot holding no readings."""
        return {"value": [], "oldvalue": 0.0, "count": 0}

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all CO2 sensors listed in the Gateway config.

        Every sensor dict gets an ``"init"`` key (1 on success, 0 otherwise)
        and exactly one entry in ``v_co2``, so that ``v_co2[X]`` always
        corresponds to ``configuration[X]`` even when a sensor is skipped or
        its initialisation raises.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict updated with per-sensor init status.

        Returns:
            True if at least one sensor initialised successfully.
        """
        self.configuration = config
        # Start from a clean accumulator so a repeated initialize() call does
        # not leave stale entries that would shift the X index.
        self.v_co2 = []
        context_logger.debug_with_context("CO2", f"CO2 Config: {self.configuration}")
        any_ok = False
        for index, sensor in enumerate(self.configuration):
            # Default to "not initialised" so later lookups never hit KeyError.
            sensor["init"] = 0
            try:
                if self.partNumber in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
            except Exception as e:
                # Driver construction / I2C access can fail in driver-specific
                # ways; one broken sensor must not abort the remaining ones.
                context_logger.error_with_context("CO2", f"Initialize : {e}")
                sensor["init"] = 0
            if len(self.v_co2) <= index:
                # initializeSensor was skipped or raised before storing its
                # slots: add placeholders to keep v_co2 aligned.
                self.v_co2.append([self._emptySlot() for _ in sensor.get("parameters", [])])
            if sensor["init"] == 1:
                any_ok = True  # confirmation reported back to the caller
        self.putInitvalues(init_value)
        return any_ok

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single CO2 sensor by part number.

        Creates the appropriate hardware driver, takes initial readings with
        retries, and appends one list of accumulator slots to ``v_co2``. The
        list has one slot per entry in ``sensor["parameters"]``; parameters
        whose bootstrap read failed (or that the part does not support) keep
        an empty slot so that positions stay aligned with the configuration.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, GPIO settings, and parameter list.

        Returns:
            1 if at least one parameter produced a valid bootstrap reading,
            0 otherwise (including disabled or unsupported sensors).
        """
        part_no = sensor[self.partNumber]
        gpio.select_I2C(part_no)
        slots = [self._emptySlot() for _ in sensor.get("parameters", [])]
        success = False
        if part_no in self._SUPPORTED_PARAMS and sensor["en"] == self.baseConfig["oz_enable"]:
            # TODO: Add gpio configuration
            parameters = sensor["parameters"]
            if part_no == self._PART_ELT:
                self._setupELT(sensor)
            else:
                self._setupSCD40(sensor)
            supported = self._SUPPORTED_PARAMS[part_no]
            for index, param in enumerate(parameters):
                pm = param[self.parameter]
                if pm not in supported:
                    continue
                val = self._bootstrapReading(part_no, pm)
                if val is not None:
                    slots[index] = {"value": [val], "oldvalue": val, "count": 1}
                    success = True
        self.v_co2.append(slots)
        return int(success)

    def _setupELT(self, sensor: dict) -> None:
        """Create the ELTCO2 driver using the optional ``gpio.port`` setting."""
        gpio_cfg = sensor.get("gpio") or {}
        port = int(gpio_cfg.get("port", 0))
        self.ELT_CO2 = ELTCO2(device_number=port)

    def _setupSCD40(self, sensor: dict) -> None:
        """Create and configure the SCD40 driver from the optional ``gpio`` settings."""
        gpio_cfg = sensor.get("gpio") or {}
        alt = gpio_cfg.get("alt", 0)
        # The flags are enabled only by the exact value 1 in the configuration.
        asc = gpio_cfg.get("asc") == 1
        freset = gpio_cfg.get("freset") == 1
        debug = gpio_cfg.get("debug") == 1
        # NOTE(review): a "port" entry may be present in the config, but SCD40()
        # is constructed without arguments, so it is not consumed here.
        self.SCD_CO2 = SCD40()
        self.SCD_CO2.DEBUG = debug
        if freset:
            self.SCD_CO2.factoryReset()  # Factory Reset
        self.SCD_CO2.initializeSensor(asc, alt)

    def _bootstrapReading(self, partNo: int, pm: int) -> float | int | None:
        """Try to obtain a first positive reading, retrying on failure.

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier to read.

        Returns:
            The first reading greater than zero, or None if every attempt
            raised or returned a non-positive value.
        """
        # Keep the historical log tags ("53", "54_1", "54_2", "54_3").
        tag = str(partNo) if partNo == self._PART_ELT else f"{partNo}_{pm}"
        for attempt in range(self._INIT_RETRIES):
            try:
                val = self.getCO2(partNo, pm)
                # NOTE(review): "> 0" also rejects sub-zero temperatures for
                # the SCD40 temperature parameter -- confirm this is intended.
                if val > 0:
                    return val
            except Exception as e:
                context_logger.error_with_context("CO2", f"initializeSensor {tag}: {e}")
            if attempt < self._INIT_RETRIES - 1:
                # Back off before the next attempt; no point after the last one.
                time.sleep(self._INIT_RETRY_DELAY)
        return None

    def getCO2(self, partNo: int, pm: int) -> float | int:
        """Read a CO2-related measurement from the correct driver.

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1=CO2 ppm, 2=temperature, 3=humidity).

        Returns:
            The measured value, or 0.0 if the part/parameter combination is
            unsupported.
        """
        gpio.select_I2C(partNo)
        if partNo == self._PART_ELT and pm == 1:
            val = self.ELT_CO2.getCO2()
            context_logger.info_with_context("CO2", f"CO2 is : {val} ppm")
            return val
        if partNo == self._PART_SCD40 and pm in self._SCD40_FIELDS:
            index, label, unit = self._SCD40_FIELDS[pm]
            # The driver is called with True only for the CO2 parameter and
            # False for temperature/humidity; presumably the flag requests a
            # fresh measurement -- confirm against the SCD40 driver.
            readings = self.SCD_CO2.getCO2(pm == 1)
            context_logger.info_with_context("CO2", f"{label} is : {readings[index]} {unit}")
            return readings[index]
        return 0.0

    def findKeyinList(self, keys: list[str], sensor: dict) -> bool:
        """Check whether any parameter short-code in the sensor matches the given keys.

        A parameter matches when its short-code contains ``"_"`` and characters
        1-2 of the short-code occur as a substring of any key, or when the
        first two characters of the short-code are themselves one of the keys.

        Args:
            keys: List of short-code prefixes to match against.
            sensor: Sensor config dict containing a ``"parameters"`` list.

        Returns:
            True if at least one parameter short-code matches a key.
        """
        # NOTE(review): the nesting of the two checks was reconstructed from a
        # source listing without indentation; both are applied per parameter.
        for parameter in sensor["parameters"]:
            short_code = parameter["sc"]
            if "_" in short_code and any(short_code[1:3] in s for s in keys):
                return True
            if short_code[:2] in keys:
                return True
        return False

    def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict:
        """Read current values from all initialised CO2 sensors.

        When no calibration keys are given, each reading is scaled by the
        parameter's ``"se"`` (percent) and offset by ``"cr"`` (tenths) and
        rounded to two decimals; with calibration keys the raw value is kept.
        Accepted readings are accumulated in ``v_co2`` for a later
        ``putSensorValue`` call. Failed reads are logged and skipped.

        Args:
            calibrationKeys: Optional list of short-code keys to restrict
                which sensors are read.

        Returns:
            Dict mapping parameter short-codes to their latest readings.
        """
        data = {}
        keys = calibrationKeys or []
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            if keys and not self.findKeyinList(keys, sensor):
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    value = self.getCO2(sensor[self.partNumber], parameters[self.parameter])
                    # NOTE(review): negative values are treated as invalid,
                    # which also drops sub-zero temperatures -- confirm.
                    if (value is not None) and value >= 0:
                        if not keys:
                            value = round(
                                ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
                                2,
                            )
                        # NOTE(review): accumulation is applied in calibration
                        # mode too; the nesting here was reconstructed from a
                        # source listing without indentation.
                        slot = self.v_co2[X][Y]
                        slot["value"].append(value)
                        slot["count"] += 1
                        data[parameters["sc"]] = value
                except Exception as e:
                    context_logger.error_with_context(
                        "CO2",
                        f"getSensorReading failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                    )
        return data

    def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict:
        """Flush accumulated CO2 readings into the output dict and reset counters.

        Args:
            value: Mutable output dict to populate with the accumulated
                reading lists, keyed by parameter short-code.
            calibrationKeys: Optional list of short-code keys to restrict
                which sensors are flushed.

        Returns:
            The updated output dict.
        """
        keys = calibrationKeys or []
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            if keys and not self.findKeyinList(keys, sensor):
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    slot = self.v_co2[X][Y]
                    value[parameters["sc"]] = slot["value"]
                    # Hand the list over to the caller and start a new one.
                    slot["value"] = []
                    slot["count"] = 0
                except Exception as e:
                    context_logger.error_with_context(
                        "CO2",
                        f"putSensorValue failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                    )
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Record each sensor's init status into the shared init-value dict.

        Sensors that have not been through ``initialize`` yet are reported
        as 0.

        Args:
            value: Mutable dict updated in-place with a ``"co2"`` key.

        Returns:
            The updated init-value dict.
        """
        value.update({"co2": [sensor.get("init", 0) for sensor in self.configuration]})
        return value


# Example code
# python3 -m OzWrapper.OzCO2.OzCO2
if __name__ == "__main__":
    # Manual smoke test: load the CO2 section of the JSON config that sits next
    # to this file, initialise the wrapper, take a few readings and log the
    # accumulated values.
    sensor_string = "co2"
    file_string = "co2.config.json"
    file_name = os.path.join(os.path.dirname(__file__), file_string)
    with open(file_name, encoding="utf-8") as configFile:
        sensorConfig = json.load(configFile)
    context_logger.info_with_context("co2", f"Sensor Config: {sensorConfig}")
    wrapper = OzCO2()
    wrapper.initialize(sensorConfig[sensor_string], {})
    for _ in range(4):
        wrapper.getSensorReading()
    data = {}
    context_logger.info_with_context("co2", f"Sensor Values: {wrapper.putSensorValue(data)}")