# Source code for OzWrapper.OzCO.OzCO

"""Carbon monoxide (CO) sensor wrapper.

Provides a stub implementation for CO sensor readings through the
GenericSensor contract.  Currently uses hardcoded test values as the
hardware driver has not yet been integrated.
"""

import json

from SensorBase.SensorBase import GenericSensor


class OzCO(GenericSensor):
    """Wrapper for carbon monoxide (CO) sensors.

    Currently a stub implementation with hardcoded values.  Designed to be
    extended with a real CO sensor driver once hardware is available.

    The methods rely on ``self.partNumber``, ``self.parameter`` and
    ``self.baseConfig`` which are not defined in this class (presumably
    provided by ``GenericSensor`` -- confirm against the base class).

    Attributes:
        sensor_config: Sensor configuration passed to ``initialize`` (iterated
            as a sequence of sensor config dicts).
        v_co: Nested list of accumulators, kept index-aligned with
            ``sensor_config`` and each sensor's ``"parameters"`` list.
            ``v_co[X]`` is ``[]`` for a sensor that was not initialised, and
            ``v_co[X][Y]`` is ``None`` for a parameter this wrapper does not
            support; otherwise it is a dict with the keys ``"value"``,
            ``"oldvalue"`` and ``"count"``.
    """

    # Class-level defaults are kept for backward compatibility.  ``v_co`` is
    # rebound per instance in ``__init__``/``initialize`` so that instances never
    # append to this shared list.
    sensor_config = {}
    v_co = []

    # Identifiers understood by the stub driver.
    _CO_PART_NUMBER = 86
    _CO_PARAMETER = 1  # 1 = CO concentration
    _STUB_CO_VALUE = 86.23

    def __init__(self):
        """Initialise the OzCO wrapper with default state."""
        super().__init__()
        # Give every instance its own accumulator list instead of mutating the
        # class attribute shared by all instances.
        self.v_co = []

    def initialize(self, config: list):
        """Initialise all CO sensors listed in the Gateway config.

        Every sensor dict that contains the part-number key gets an ``"init"``
        entry holding the result of ``initializeSensor``.  Accumulators from a
        previous call are discarded, so the method can be called repeatedly.

        Args:
            config: List of sensor configuration dicts from the Gateway.

        Returns:
            True (always, as this is a stub implementation).
        """
        # TODO: add nA_per_ppm and offset
        self.sensor_config = config
        self.v_co = []
        print("[CO] Initialize CO sensor")
        for sensor in self.sensor_config:
            rows_before = len(self.v_co)
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
            if len(self.v_co) == rows_before:
                # Nothing was registered for this sensor: add an empty row so
                # v_co[X] keeps lining up with sensor_config[X].
                self.v_co.append([])
        return True  # send confirmation to main file

    def initializeSensor(self, sensor: dict):
        """Initialise a single CO sensor by part number.

        On success one row is appended to ``v_co`` with exactly one entry per
        configured parameter: an accumulator dict seeded with a first reading
        for supported parameters, ``None`` for unsupported ones.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag (``"en"``) and parameter list (``"parameters"``).

        Returns:
            1 if the sensor was initialised, 0 if its part number is not
            supported or it is not enabled.
        """
        is_co_sensor = sensor[self.partNumber] == self._CO_PART_NUMBER
        if not (is_co_sensor and sensor["en"] == self.baseConfig["oz_enable"]):
            return 0

        # TODO: Add gpio configuration
        accumulators = []
        for param in sensor["parameters"]:
            if param[self.parameter] != self._CO_PARAMETER:
                # Placeholder keeps v_co[X][Y] aligned with parameters[Y].
                accumulators.append(None)
                continue
            val = self.getCO(sensor[self.partNumber], param[self.parameter])
            accumulators.append({"value": val, "oldvalue": val, "count": 1})
        self.v_co.append(accumulators)
        return 1

    def getCO(self, partNo: int, pm: int):
        """Read a CO measurement (stub returning a hardcoded value).

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1=CO concentration).

        Returns:
            Hardcoded CO value for testing, or None if the part number /
            parameter combination is unsupported.
        """
        if partNo == self._CO_PART_NUMBER and pm == self._CO_PARAMETER:
            val = self._STUB_CO_VALUE
            print(f"CO => {val}")
            return val
        return None

    def _accumulator(self, sensor_idx: int, param_idx: int):
        """Return the accumulator for a sensor/parameter slot, or None if absent."""
        try:
            return self.v_co[sensor_idx][param_idx]
        except IndexError:
            return None

    def getSensorReading(self):
        """Read current values from all initialised CO sensors.

        Adds each reading to its accumulator for later averaging and returns
        the real-time data.  Sensors without ``"init" == 1`` and parameters
        without an accumulator are skipped.

        Returns:
            Dict mapping parameter short-codes (``"sc"``) to their latest
            readings.
        """
        data = {}
        for X, sensor in enumerate(self.sensor_config):
            # .get(): sensors that initialize() never tagged have no "init" key.
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                accumulator = self._accumulator(X, Y)
                if accumulator is None:
                    continue
                # Best-effort per parameter: one failing read must not abort
                # the remaining ones, so the error is reported and skipped.
                try:
                    value = self.getCO(sensor[self.partNumber], parameters[self.parameter])
                    if value is None:
                        print("[ERR_CO] getSensorReading: no reading available")
                        continue
                    accumulator["value"] += value
                    accumulator["count"] += 1
                    data[parameters["sc"]] = value
                except Exception as e:
                    print(f"[ERR_CO] getSensorReading: {e}")
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Compute averages and flush CO readings into the output dict.

        The averaged value is scaled by ``parameters["se"] / 100`` and offset
        by ``parameters["cr"] / 10``, rounded to two decimals, then the
        accumulator is reset.  Slots with no samples since the last flush are
        left out of the output instead of dividing by zero.

        Args:
            value: Mutable output dict to populate with averaged readings.

        Returns:
            The updated output dict.
        """
        for X, sensor in enumerate(self.sensor_config):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                accumulator = self._accumulator(X, Y)
                if accumulator is None or not accumulator["count"]:
                    continue
                average = accumulator["value"] / accumulator["count"]
                value[parameters["sc"]] = round(
                    (average * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0),
                    2,
                )
                accumulator["value"] = 0
                accumulator["count"] = 0
        return value