"""Methane (CH4) sensor wrapper.
Abstracts over the ELT CH4 UART driver to provide a unified interface for
methane concentration readings through the GenericSensor contract.
"""
import json
import os
from drivers.Elt_ch4.Elt_ch4 import Elt_ch4
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two separate OizomLogger instances are built for this module's name.
# ``basic_logger`` stores whatever ``.get()`` returns (presumably a plain
# logger object -- TODO confirm against utils.oizom_logger).
# ``context_logger`` keeps the OizomLogger wrapper itself; its
# ``info_with_context`` / ``error_with_context`` methods are what the code in
# this module calls.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
class OzCH4(GenericSensor):
    """Wrapper for methane (CH4) sensors.

    Manages initialisation, periodic reading, and value aggregation for CH4
    sensors using the ELT CH4 UART driver.

    Attributes:
        configuration: List of sensor config dicts from the Gateway.
        v_ch4: Accumulators, index-aligned with ``configuration``.
            ``v_ch4[X][Y]`` belongs to ``configuration[X]["parameters"][Y]``
            and is either an accumulator dict with the keys ``value`` (list of
            readings), ``oldvalue`` and ``count``, or ``None`` when that
            parameter was never bootstrapped by ``initializeSensor``.
        CH4: ELT CH4 driver instance, or None if not configured.
    """

    # Part number and parameter id served by the ELT CH4 UART driver.
    _ELT_PART_NUMBER = 91
    _CH4_PARAMETER = 1
    # UART settings used when the sensor config has no "gpio" override.
    _DEFAULT_PORT = "/dev/ttyAMA1"
    _DEFAULT_BAUD = 9600

    def __init__(self) -> None:
        """Initialise the OzCH4 wrapper with default state."""
        super().__init__()
        # Mutable attributes live on the instance to avoid shared state.
        self.configuration: list = []
        self.v_ch4: list = []
        self.CH4 = None

    def initialize(self, config: list, init_value: dict) -> bool:
        """Initialise all CH4 sensors listed in the Gateway config.

        Every entry of ``config`` gets an ``"init"`` key (0 or 1) and exactly
        one matching entry in ``v_ch4``, so that later index-based lookups in
        ``getSensorReading`` / ``putSensorValue`` always hit the right sensor.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict updated with per-sensor init status.

        Returns:
            True if at least one config entry carried the part-number key and
            was processed without raising. Note that this is True even when
            that sensor's own ``"init"`` status ended up 0 (e.g. disabled).
        """
        self.configuration = config
        # Start from a clean slate: a repeated initialize() must not leave
        # accumulators from the previous run in front of the new ones.
        self.v_ch4 = []
        context_logger.info_with_context("CH4", "Initialize Ch4 sensor")
        _success = False
        for index, sensor in enumerate(self.configuration):
            # Default status, so entries that are skipped below still expose
            # the "init" key that the other methods read.
            sensor["init"] = 0
            try:
                if self.partNumber in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
                    _success = True  # send confirmation to main file
            except Exception as e:
                context_logger.error_with_context("CH4", f"Initialize : {e}")
                sensor["init"] = 0
            # Keep v_ch4[index] paired with configuration[index] even when
            # initializeSensor() was not called for this entry.
            if len(self.v_ch4) <= index:
                self.v_ch4.append([])
        self.putInitvalues(init_value)
        return _success

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single CH4 sensor by part number.

        Creates the ELT CH4 UART driver, takes an initial reading, and
        stores bootstrap values in ``v_ch4``. Exactly one entry is appended
        to ``v_ch4`` per call, including when the sensor is disabled or an
        exception escapes, so sensor indices never drift.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, GPIO/UART settings, and parameter list.

        Returns:
            1 if at least one parameter produced a bootstrap reading,
            0 otherwise.

        Raises:
            Exception: Whatever the config lookups or the driver constructor
                raise; ``initialize`` catches and logs these.
        """
        value = []
        _success = False
        try:
            if (
                sensor[self.partNumber] == self._ELT_PART_NUMBER
                and sensor["en"] == self.baseConfig["oz_enable"]
            ):
                # TODO: Add gpio configuration
                parameters = sensor["parameters"]
                gpio = sensor.get("gpio") or {}
                port = gpio.get("port", self._DEFAULT_PORT)
                baud = gpio.get("baud", self._DEFAULT_BAUD)
                self.CH4 = Elt_ch4(port, baud)
                # One slot per configured parameter keeps value[Y] aligned
                # with parameters[Y]; None marks "not bootstrapped".
                value.extend([None] * len(parameters))
                for index, param in enumerate(parameters):
                    if param[self.parameter] != self._CH4_PARAMETER:
                        continue
                    try:
                        val = self.getCH4(sensor[self.partNumber], param[self.parameter])
                    except Exception as e:
                        context_logger.error_with_context("CH4", f"initializeSensor: {e}")
                        continue
                    # NOTE(review): the bootstrap reading is stored raw, while
                    # getSensorReading() applies the se/cr correction --
                    # confirm this is intended.
                    # A None reading is not accumulated, mirroring the guard
                    # in getSensorReading().
                    readings = [] if val is None else [val]
                    value[index] = {
                        "value": readings,
                        "oldvalue": val,
                        "count": len(readings),
                    }
                    _success = True
        finally:
            self.v_ch4.append(value)
        return int(_success)

    def getCH4(self, partNo: int, pm: int) -> float:
        """Read a CH4 measurement from the correct driver.

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1=CH4 concentration).

        Returns:
            Whatever the driver's ``getSensorData()`` returns for part 91 /
            parameter 1 (callers guard against ``None``), or 0.0 for any
            other combination.

        Raises:
            AttributeError: If called for a supported combination before the
                driver has been created (``self.CH4`` is still None).
        """
        if partNo == self._ELT_PART_NUMBER and pm == self._CH4_PARAMETER:
            val = self.CH4.getSensorData()
            context_logger.info_with_context("CH4", f"CH4 is : {val}")
            return val
        return 0.0

    def getSensorReading(self) -> dict:
        """Read current values from all initialised CH4 sensors.

        Applies sensitivity (``se``) and correction (``cr``) settings,
        accumulates values for later flushing, and returns real-time data
        keyed by short-code (``sc``). Parameters without a bootstrapped
        accumulator are skipped.

        Returns:
            Dict mapping parameter short-codes to their latest readings.
        """
        data = {}
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    slot = self.v_ch4[X][Y]
                    if slot is None:
                        # Parameter is not served by this wrapper or failed
                        # its bootstrap read; there is nothing to accumulate.
                        continue
                    value = self.getCH4(sensor[self.partNumber], parameters[self.parameter])
                    if value is None:
                        context_logger.info_with_context(
                            "CH4",
                            f"getSensorReading: {sensor[self.partNumber]} - {parameters[self.parameter]} - No value",
                        )
                        continue
                    value = round(
                        ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
                        2,
                    )
                    slot["value"].append(value)
                    slot["count"] += 1
                    data[parameters["sc"]] = value
                except Exception as e:
                    # .get() keeps the handler itself from raising when the
                    # failure was a missing "sc" key.
                    context_logger.error_with_context(
                        "CH4",
                        f"getsensorReading failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                    )
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Flush accumulated CH4 readings into the output dict and reset counters.

        Args:
            value: Mutable output dict to populate with aggregated readings.

        Returns:
            The updated output dict.
        """
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    slot = self.v_ch4[X][Y]
                    if slot is None:
                        continue
                    value[parameters["sc"]] = slot["value"]
                    # Rebind rather than clear(): the list just handed to the
                    # caller must not be emptied behind its back.
                    slot["value"] = []
                    slot["count"] = 0
                except Exception as e:
                    context_logger.error_with_context(
                        "CH4",
                        f"putSensorValue failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                    )
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Record each sensor's init status into the shared init-value dict.

        Args:
            value: Mutable dict updated in-place with a ``"ch4"`` key.

        Returns:
            The updated init-value dict.
        """
        # Entries that were never processed count as not initialised (0).
        value.update({"ch4": [sensor.get("init", 0) for sensor in self.configuration]})
        return value
# Example code
# python3 -m OzWrapper.OzCH4.OzCH4
if __name__ == "__main__":
    # Manual smoke test: load the sample config stored next to this module,
    # initialise the wrapper, take a few readings and log the flushed values.
    # ``os`` and ``json`` come from the module-level imports.
    sensor_string = "ch4"
    file_string = "ch4.config.json"
    dirname = os.path.dirname(__file__)
    file_name = os.path.join(dirname, file_string)
    with open(file_name, encoding="utf-8") as configFile:
        sensorConfig = json.load(configFile)
    context_logger.info_with_context("ch4", f"Sensor Config: {sensorConfig}")
    ch4_wrapper = OzCH4()
    # initialize() takes the init-status dict as a required second argument;
    # calling it with the config alone raises TypeError.
    init_status = {}
    ch4_wrapper.initialize(sensorConfig[sensor_string], init_status)
    for _ in range(4):
        ch4_wrapper.getSensorReading()
    data = {}
    context_logger.info_with_context("ch4", f"Sensor Values: {ch4_wrapper.putSensorValue(data)}")