Source code for OzWrapper.OzCH4.OzCH4

"""Methane (CH4) sensor wrapper.

Abstracts over the ELT CH4 UART driver to provide a unified interface for
methane concentration readings through the GenericSensor contract.
"""

import json
import os

from drivers.Elt_ch4.Elt_ch4 import Elt_ch4
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzCH4(GenericSensor): """Wrapper for methane (CH4) sensors. Manages initialisation, periodic reading, and value aggregation for CH4 sensors using the ELT CH4 UART driver. Attributes: configuration: List of sensor config dicts from the Gateway. v_ch4: Nested list holding accumulated readings per sensor/parameter. CH4: ELT CH4 driver instance, or None if not configured. """
[docs] def __init__(self) -> None: """Initialise the OzCH4 wrapper with default state.""" super().__init__() # initialize mutable attributes on the instance to avoid shared state self.configuration: dict = {} self.v_ch4: list = [] self.CH4 = None
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialise all CH4 sensors listed in the Gateway config. Args: config: List of sensor configuration dicts from the Gateway. init_value: Mutable dict updated with per-sensor init status. Returns: True if at least one sensor initialised successfully. """ self.configuration = config context_logger.info_with_context("CH4", "Initialize Ch4 sensor") _success = False for sensor in self.configuration: try: if self.partNumber in sensor: sensor["init"] = self.initializeSensor(sensor) _success = True # send confirmation to main file except Exception as e: context_logger.error_with_context("CH4", f"Initialize : {e}") sensor["init"] = 0 self.putInitvalues(init_value) return _success
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialise a single CH4 sensor by part number. Creates the ELT CH4 UART driver, takes an initial reading, and stores bootstrap values in ``v_ch4``. Args: sensor: Single sensor configuration dict containing part number, enable flag, GPIO/UART settings, and parameter list. Returns: 1 if the sensor was initialised successfully, 0 otherwise. """ value = [] _success = False if sensor[self.partNumber] == 91 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = "/dev/ttyAMA1" baud = 9600 if "gpio" in sensor: if "port" in sensor["gpio"]: port = sensor["gpio"]["port"] if "baud" in sensor["gpio"]: baud = sensor["gpio"]["baud"] self.CH4 = Elt_ch4(port, baud) for param in parameters: if param[self.parameter] == 1: val = 0.0 try: val = self.getCH4(sensor[self.partNumber], param[self.parameter]) oldVal = val _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) _success = True except Exception as e: context_logger.error_with_context("CH4", f"initializeSensor: {e}") _success = False self.v_ch4.append(value) return int(_success)
[docs] def getCH4(self, partNo: int, pm: int) -> float: """Read a CH4 measurement from the correct driver. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier (1=CH4 concentration). Returns: The measured value as a float, or 0.0 if unsupported. """ if partNo == 91: if pm == 1: val = self.CH4.getSensorData() context_logger.info_with_context("CH4", f"CH4 is : {val}") return val return 0.0
[docs] def getSensorReading(self) -> dict: """Read current values from all initialised CH4 sensors. Applies sensitivity and correction offsets, accumulates values for later averaging, and returns real-time data keyed by short-code. Returns: Dict mapping parameter short-codes to their latest readings. """ data = {} for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: value = self.getCH4(sensor[self.partNumber], parameters[self.parameter]) if value is not None: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) self.v_ch4[X][Y]["value"].append(value) self.v_ch4[X][Y]["count"] += 1 data[parameters["sc"]] = value else: context_logger.info_with_context( "CH4", f"getSensorReading: {sensor[self.partNumber]} - {parameters[self.parameter]} - No value", ) except Exception as e: # self.v_ch4[X][Y]['value'] += self.v_ch4[X][Y]['oldvalue'] # self.v_ch4[X][Y]['count'] +=1 context_logger.error_with_context( "CH4", f"getsensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return data
[docs] def putSensorValue(self, value: dict) -> dict: """Flush accumulated CH4 readings into the output dict and reset counters. Args: value: Mutable output dict to populate with aggregated readings. Returns: The updated output dict. """ for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: value[parameters["sc"]] = self.v_ch4[X][Y]["value"] self.v_ch4[X][Y]["value"] = [] self.v_ch4[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "CH4", f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return value
[docs] def putInitvalues(self, value: dict) -> dict: """Record each sensor's init status into the shared init-value dict. Args: value: Mutable dict updated in-place with a ``"ch4"`` key. Returns: The updated init-value dict. """ _sensor_init = [] for sensor in self.configuration: _sensor_init.append(sensor["init"]) value.update({"ch4": _sensor_init}) return value
# Example code # python3 -m OzWrapper.OzCH4.OzCH4 if __name__ == "__main__": import os sensor_string = "ch4" file_string = "ch4.config.json" dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, file_string) with open(file_name) as configFile: sensor = configFile.read() sensorConfig = json.loads(sensor) context_logger.info_with_context("ch4", f"Sensor Config: {sensorConfig}") sensor = OzCH4() sensor.initialize(sensorConfig[sensor_string]) for _ in range(0, 4): sensor.getSensorReading() data = {} context_logger.info_with_context("ch4", f"Sensor Values: {sensor.putSensorValue(data)}")