Source code for OzWrapper.OzOGS.OzOGS

"""Odor/gas sensor (OGS) allocation wrapper for electrochemical and reference analyzers.

Manages a multiplexed array of gas sensors (ADS122U04, Winsen, Cubic, Semeatech,
NevadaNano, HORIBA, Serinus) accessed through UART multiplexer GPIO selection,
providing unified read/calibrate/publish for NO2, O3, CO, SO2, H2S, NO, and O2.
"""

import json
import linecache
import os
import random
import sys
import time

from drivers.ADS122U04 import ADS122U04
from drivers.CubicGeneric.CubicGeneric import CubicGeneric

# import gpio
from drivers.gpio import gpio
from drivers.HORIBA.HORIBA import HORIBA
from drivers.NEVADA.NEVADA import NEVADA
from drivers.Semeatech.Semeatech import Semeatech
from drivers.SemeatechA4.SemeatechA4 import SemeatechA4
from drivers.Serinus import Serinus
from drivers.WinsenO2.WinsenO2 import WinsenO2
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)

# TODO:
# [] GPIO SET PINS 3 pin for IC
"""
    NO2: partNo: 201, sendCode: g3
    O3: partNo: 203, sendCode: g5
    CO: partNo: 202, sendCode: g2
    SO2: partNo: 204, sendCode: g8
    H2S: partNo: 205, sendCode: g6
    NO: partNo: 206, sendCode: g7
"""


[docs] class OzOGSParam: """Container for a single OGS sensor's configuration and driver reference. Attributes: partNo: Part number identifying the sensor type (101-109). port: UART serial port path. baud: Baud rate for the serial connection. address: Multiplexer channel address within an IC. ic: Multiplexer IC index (0 or 1). pga: Programmable gain amplifier setting for ADS122U04. ogs_port: Sensor driver instance (ADS122U04, WinsenO2, etc.), or None. """ partNo = 0 port = "/dev/ttyAMA0" baud = 115200 address = -1 ic = -1 pga = 0 ogs_port = None # OGS Driver Object
[docs] def __init__(self, partNo: int, port: str, baud: int, address: int, ic: int) -> None: """Initialize OGS parameter container. Args: partNo: Part number for this sensor. port: UART serial port path. baud: Baud rate for serial communication. address: Multiplexer channel address. ic: Multiplexer IC index. """ self.partNo = partNo self.port = port self.baud = baud self.address = address self.ic = ic
[docs] def setOGS( self, ogs_port: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA | Serinus, ) -> None: """Assign the sensor driver instance. Args: ogs_port: Initialized sensor driver object for this OGS channel. """ self.ogs_port = ogs_port
[docs] class OzOGS(GenericSensor): """Multiplexed gas sensor wrapper supporting multiple electrochemical and reference analyzers. Manages a UART multiplexer (2 ICs x 4 channels) to address up to 8 gas sensors, handling initialization, GPIO-based channel selection, reading, calibration-aware averaging, and payload assembly. Attributes: AVERAGE_COUNT: Number of readings to average per cycle. GPIO_A: GPIO pin A for multiplexer address selection. GPIO_B: GPIO pin B for multiplexer address selection. GPIO_ENABLE_1: GPIO enable pin for multiplexer IC 0. GPIO_ENABLE_2: GPIO enable pin for multiplexer IC 1. SENSOR_PER_IC: Number of sensor channels per multiplexer IC. configuration: Gas sensor configuration list from the Gateway. v_ogs: Nested list of per-sensor, per-parameter accumulated values. p_ogs: List of OzOGSParam instances (one per configured sensor). """ AVERAGE_COUNT = 1 GPIO_A = os.getenv("GPIO_A", 24) GPIO_B = os.getenv("GPIO_B", 25) GPIO_ENABLE_1 = os.getenv("GPIO_ENABLE_1", 27) GPIO_ENABLE_2 = os.getenv("GPIO_ENABLE_2", 17) SENSOR_PER_IC = os.getenv("SENSOR_PER_IC", 4)
[docs] def __init__(self) -> None: """Initialize OzOGS with empty configuration and sensor lists.""" super().__init__() self.configuration = {} self.v_ogs = [] self.p_ogs = []
[docs] def updateTempForADS(self, sensor: dict) -> None: """Append an internal temperature parameter to ADS122U04 sensor config if missing. Args: sensor: Sensor configuration dict whose parameters list may be extended. """ if len(sensor["parameters"]) == 2: sc = sensor["parameters"][0]["sc"][:-1] + "t" sensor["parameters"].append({"ch": 2, "cr": 0, "pm": 3, "sc": sc, "se": 100})
[docs] def updateTempHumForSemeatech(self, sensor: dict) -> None: """Append temperature and humidity parameters to Semeatech sensor config if missing. Args: sensor: Sensor configuration dict whose parameters list may be extended. """ if len(sensor["parameters"]) == 2: sc = sensor["parameters"][0]["sc"][:-1] + "t" sensor["parameters"].append({"ch": 2, "cr": 0, "pm": 3, "sc": sc, "se": 100}) sc = sensor["parameters"][0]["sc"][:-1] + "h" sensor["parameters"].append({"ch": 3, "cr": 0, "pm": 4, "sc": sc, "se": 100})
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialize all configured gas sensors and set up GPIO multiplexer pins. Args: config: List of OGS sensor configuration dicts from the Gateway. init_value: Dict to populate with per-sensor initialization flags. Returns: True if all sensors were iterated without fatal error. """ self.configuration = config gpio.setup(self.GPIO_A, gpio.OUT) gpio.setup(self.GPIO_B, gpio.OUT) gpio.setup(self.GPIO_ENABLE_1, gpio.OUT) gpio.setup(self.GPIO_ENABLE_2, gpio.OUT) context_logger.info_with_context("OGS", "Initialize OGS Sensor") _success = False try: for sensor in self.configuration: try: if "pn" in sensor: sensor["init"] = self.initializeSensor(sensor) context_logger.info_with_context("OGS", f"Init Done {sensor['pn']} {sensor['init']}") except Exception as e: sensor["init"] = 0 context_logger.error_with_context( "OGS", f"InitializeSensor Failed {sensor['pn']} {sensor['init']} {e}", ) _success = True except Exception as e: context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") sensor["init"] = 0 _success = False finally: self.putInitvalues(init_value) return _success
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialize a single gas sensor based on its part number. Creates the appropriate driver, selects the multiplexer channel, and takes initial readings for each configured parameter. Part numbers: 101 = ADS122U04, 102 = Winsen, 103 = Cubic, 104 = Semeatech, 105 = SemeatechA4, 106 = NevadaNano, 107 = HORIBA, 109 = Serinus. Args: sensor: Configuration dict with part number, gpio, and parameters. Returns: 1 on success, 0 on failure. """ value = [] _success = False _p_ogs = None pos = -1 """ Partno: 101 -> Normal OGS with ADS122U04 102 -> Winsen 103 -> CUBIC_GENERIC 104 -> Semeatech 105 -> SemeatechA4 106 -> NEVADANANO 107 -> Reference """ if sensor["pn"] == 101 and sensor["en"] == self.baseConfig["oz_enable"]: self.updateTempForADS(sensor) port = "/dev/ttyAMA0" baud = 115200 ic = -1 address = -1 pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "pga" in gpioConfig: pga = gpioConfig["pga"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.pga = pga _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") elif sensor["pn"] == 102 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" baud = 115200 ic = -1 address = -1 pga = 1 if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) context_logger.info_with_context( "OGS", f"PORT: {_p_ogs.port} Baud:{_p_ogs.baud} POS: {pos} IC: {_p_ogs.ic} ADDRESS {_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") elif sensor["pn"] == 103 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" baud = 115200 ic = -1 address = -1 pga = 1 if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN {sensor[self.partNumber]} POS: {pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") elif sensor["pn"] == 104 and sensor["en"] == self.baseConfig["oz_enable"]: self.updateTempHumForSemeatech(sensor) port = "/dev/ttyAMA0" baud = 115200 ic = -1 address = -1 pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS::{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") elif sensor["pn"] == 105 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" baud = 9600 ic = -1 address = -1 pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") if sensor["pn"] == 106 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" baud = 38400 ic = -1 address = -1 pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "pga" in gpioConfig: pga = gpioConfig["pga"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.pga = pga _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") if sensor["pn"] == 107 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" baud = 19200 ic = -1 address = -1 pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "pga" in gpioConfig: pga = gpioConfig["pga"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.pga = pga _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}") if sensor["pn"] == 109 and sensor["en"] == self.baseConfig["oz_enable"]: port = "/dev/ttyAMA0" # Not Required for Serinus baud = 38400 ic = -1 # Not Required for Serinus address = -1 # Not Required for Serinus pga = 1 debug = False if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: port = gpioConfig["port"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "pos" in gpioConfig: pos = gpioConfig["pos"] ic, address = divmod(int(gpioConfig["pos"]), self.SENSOR_PER_IC) else: if "address" in gpioConfig: address = gpioConfig["address"] if "ic" in gpioConfig: ic = gpioConfig["ic"] if "pga" in gpioConfig: pga = gpioConfig["pga"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False try: # Create Object for all ogs _p_ogs = OzOGSParam(sensor["pn"], port, baud, address, ic) _p_ogs.pga = pga _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud)) _p_ogs.ogs_port.DEBUG = debug context_logger.info_with_context( "OGS", f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}", ) self.__selectSensor(_p_ogs.ic, _p_ogs.address) self.__initialize(_p_ogs, sensor) for param in sensor["parameters"]: try: val = self._getOGS_param_data(_p_ogs, param["ch"]) context_logger.info_with_context( "OGS", f"Init -> PN {sensor[self.partNumber]} POS: {pos} CH:{param[self.parameter]} VAL:{val}", ) _val = {"value": [val], "count": 1} value.append(_val) _success = True except Exception as e: _val = {"value": [], "count": 0} value.append(_val) context_logger.error_with_context( "OGS", f"Init Failed PN:{sensor[self.partNumber]} POS: {pos} CH:{param[self.parameter]} - {e}", ) except Exception as e: _success = False _val = {"value": [], "count": 0} value.append(_val) exc_type, exc_obj, exc_tb = sys.exc_info() fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] context_logger.error_with_context( "OGS", f"initializeSensor 107: {exc_type}, {fname}, {exc_tb.tb_lineno} {e}", ) self.p_ogs.append(_p_ogs) self.v_ogs.append(value) return int(_success)
[docs] def findKeyinList(self, keys: list, sensor_list: dict) -> bool: """Check if any sensor parameter send-code matches the provided calibration keys. Args: keys: List of calibration key prefixes to match. sensor_list: Sensor configuration dict with a 'parameters' list. Returns: True if at least one parameter matches a calibration key. """ for parameter in sensor_list["parameters"]: if "_" in parameter["sc"]: if any(parameter["sc"][1:3] in s for s in keys): return True if parameter["sc"][:2] in keys: return True return False
[docs] def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict: """Read all initialized gas sensors and accumulate values. Args: calibrationKeys: Optional list of send-code prefixes to restrict which sensors are read (empty = read all). Returns: Dict mapping send-codes to their latest raw or calibrated values. """ if calibrationKeys is None: calibrationKeys = [] data = {} for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: context_logger.info_with_context( "OGS", f"PART: {sensor['pn']} POS: {sensor.get('gpio', {}).get('pos', -1)}", ) if len(calibrationKeys) == 0 or self.findKeyinList(calibrationKeys, sensor): if self.p_ogs[X] is None: continue self.__selectSensor(self.p_ogs[X].ic, self.p_ogs[X].address) for Y, parameters in enumerate(sensor["parameters"]): try: value = self._getOGS_param_data(self.p_ogs[X], parameters["ch"]) if len(calibrationKeys) == 0: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) if "rd" in parameters: addn = random.uniform(-1 * parameters["rd"], parameters["rd"]) context_logger.debug_with_context("OGS", f"rd: {addn}") value = value + addn self.v_ogs[X][Y]["value"].append(value) self.v_ogs[X][Y]["count"] += 1 if isinstance(parameters["sc"], str): data[parameters["sc"]] = value else: for scVal in parameters["sc"]: data[scVal] = value context_logger.info_with_context( "OGS", f"PN:{sensor[self.partNumber]} SC:{parameters['sc']} PM:{parameters[self.parameter]} VAL:{value} CNT:{self.v_ogs[X][Y]['count']}", ) except Exception as e: context_logger.error_with_context("OGS", f"getSensorReading: {e}") return data
[docs] def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict: """Transfer accumulated gas sensor readings into the output payload and reset. Args: value: Shared output dict to populate with OGS time-series data. calibrationKeys: Optional list of send-code prefixes to restrict output. Returns: The updated output dict with gas sensor data added. """ if calibrationKeys is None: calibrationKeys = [] for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: if len(calibrationKeys) == 0 or self.findKeyinList(calibrationKeys, sensor): for Y, parameters in enumerate(sensor["parameters"]): try: if isinstance(parameters["sc"], str): value[parameters["sc"]] = self.v_ogs[X][Y]["value"] else: for scVal in parameters["sc"]: value[scVal] = self.v_ogs[X][Y]["value"] self.v_ogs[X][Y]["value"] = [] self.v_ogs[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context("OGS", f"putSensorValue: {e}") return value
[docs] def putInitvalues(self, value: json) -> json: """Add OGS sensor initialization flags to the init payload. Args: value: Shared init payload dict. Returns: The updated init payload with an 'ogs' key listing init statuses. """ try: _sensor_init = [] for sensor in self.configuration: _sensor_init.append(sensor["init"]) value.update({"ogs": _sensor_init}) return value except Exception as e: context_logger.error_with_context("OGS", f"putInitvalues: {e}") return value
# =============== OzWrapper Functions for different sensors =========================# def __initialize(self, ogs_param: OzOGSParam, sensor: dict | None = None) -> None: if ogs_param.partNo == 101 or ogs_param.partNo == 106 or ogs_param.partNo == 102 or ogs_param.partNo == 107: ogs_param.ogs_port.initialize() elif ogs_param.partNo == 109: if sensor is not None: ogs_param.ogs_port.initialize(sensor) else: time.sleep(0.05) def __setDriver( self, partNo: int, port: int, baud: int ) -> ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA | Serinus | None: if partNo == 101: return ADS122U04(port, baud) if partNo == 102: return WinsenO2(port, baud) if partNo == 103: return CubicGeneric(port, baud) if partNo == 104: return Semeatech(port, baud) if partNo == 105: return SemeatechA4(port, baud) if partNo == 106: return NEVADA(port, baud) if partNo == 107: return HORIBA(port, baud) if partNo == 109: return Serinus() return None def _getOGS_param_data(self, ogs_param: OzOGSParam | None, sensor_param: int | None) -> float | None: if ogs_param is not None: if ogs_param.partNo == 101: # normal ogs self.__selectChannel(ogs_param.ogs_port, sensor_param, ogs_param.pga) val = self.__get_ADS_Data(ogs_param.ogs_port, sensor_param) if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 102: # winsen val = ogs_param.ogs_port.getSensorData() if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 103: # cubic generic val = ogs_param.ogs_port.getSensorData() if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 104: # Semeatech val = ogs_param.ogs_port.getSensorData(sensor_param) if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 105: # SemeatechA4 val = ogs_param.ogs_port.getSensorData() if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 106: # Nevada val = ogs_param.ogs_port.getSensorData() if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 107: # HORIBA val = ogs_param.ogs_port.getSensorData(sensor_param) if val is None: return 0.0 return round(val, 2) if ogs_param.partNo == 109: # Serinus val = ogs_param.ogs_port.getSensorData(sensor_param) if val is None: return 0.0 return round(val, 2) return 0.0 # This Function selects sensor and only accessible for uart multiplexer def __selectSensor(self, ic: int, address: int) -> None: if ic == 0: gpio.set(self.GPIO_ENABLE_1, 0) gpio.set(self.GPIO_ENABLE_2, 1) elif ic == 1: gpio.set(self.GPIO_ENABLE_1, 1) gpio.set(self.GPIO_ENABLE_2, 0) if address == 0: gpio.set(self.GPIO_A, 0) gpio.set(self.GPIO_B, 0) elif address == 1: gpio.set(self.GPIO_A, 1) gpio.set(self.GPIO_B, 0) elif address == 2: gpio.set(self.GPIO_A, 0) gpio.set(self.GPIO_B, 1) elif address == 3: gpio.set(self.GPIO_A, 1) gpio.set(self.GPIO_B, 1) time.sleep(1) # necessary timeout after selecting channel # ====================================================================================# # =================== ADS FUNCTIONS ============================# def __selectChannel( self, ads: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA, channel: int, pga: int, ) -> None: ads.setChannel(channel, pga) def __get_ADS_Data( self, ads: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA, channel: int, ) -> float | None: try: if channel == 2: val = ads.get_internal_temperature() else: val = ads.getData() return val except Exception as e: context_logger.error_with_context("OGSxADS", f"getData failed: {e}") return None # or float('nan') # ==============================================================#
[docs] def PrintException(self) -> None: """Log the current exception with file name and line number context.""" exc_type, exc_obj, tb = sys.exc_info() f = tb.tb_frame lineno = tb.tb_lineno filename = f.f_code.co_filename linecache.checkcache(filename) line = linecache.getline(filename, lineno, f.f_globals) context_logger.error_with_context("EXCEPTION", f'IN ({filename}, LINE {lineno} "{line.strip()}"): {exc_obj}')
# Example code # python3 -m OzWrapper.OzOGS.OzOGS if __name__ == "__main__": import os dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, "ogs.config.json") with open(file_name) as ogsConfigFile: data = ogsConfigFile.read() ogsConfig = json.loads(data) context_logger.info_with_context("OGS", f"Config: {ogsConfig}") ogs = OzOGS() ogs.initialize(ogsConfig["ogs"], {}) context_logger.info_with_context("OGS", "Reading start") for _ in range(0, 4): ogs.getSensorReading() data = {} context_logger.info_with_context("OGS", "PutSensor value start") context_logger.info_with_context("OGS", ogs.putSensorValue(data)) # calibList = [] # calibList.append('g2') # calibList.append('g3') # print("getSensorReading value start \n\n") # for X, sensor in enumerate(calibList): # print(f"Calib sensor: {calibList[X]}") # print(f"OGS GetSensor: {ogs.getCalSensorReading(calibList[X])}") # data = {} # print("PutSensor value start \n\n") # print(ogs.putSensorValue(data))