"""Odor/gas sensor (OGS) allocation wrapper for electrochemical and reference analyzers.
Manages a multiplexed array of gas sensors (ADS122U04, Winsen, Cubic, Semeatech,
NevadaNano, HORIBA, Serinus) accessed through UART multiplexer GPIO selection,
providing unified read/calibrate/publish for NO2, O3, CO, SO2, H2S, NO, and O2.
"""
import json
import linecache
import os
import random
import sys
import time
from drivers.ADS122U04 import ADS122U04
from drivers.CubicGeneric.CubicGeneric import CubicGeneric
# import gpio
from drivers.gpio import gpio
from drivers.HORIBA.HORIBA import HORIBA
from drivers.NEVADA.NEVADA import NEVADA
from drivers.Semeatech.Semeatech import Semeatech
from drivers.SemeatechA4.SemeatechA4 import SemeatechA4
from drivers.Serinus import Serinus
from drivers.WinsenO2.WinsenO2 import WinsenO2
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Handle returned by OizomLogger.get(); not referenced in the code visible here.
basic_logger = OizomLogger(__name__).get()
# Wrapper whose *_with_context(tag, message) methods are used for all logging
# in this module.
context_logger = OizomLogger(__name__)
# TODO:
# [] GPIO SET PINS 3 pin for IC
"""
NO2: partNo: 201, sendCode: g3
O3: partNo: 203, sendCode: g5
CO: partNo: 202, sendCode: g2
SO2: partNo: 204, sendCode: g8
H2S: partNo: 205, sendCode: g6
NO: partNo: 206, sendCode: g7
"""
class OzOGSParam:
    """Per-sensor record: connection settings plus the attached driver object.

    Attributes:
        partNo: Part number identifying the sensor type (101-109).
        port: UART serial port path.
        baud: Baud rate for the serial connection.
        address: Multiplexer channel address within an IC.
        ic: Multiplexer IC index (0 or 1).
        pga: Programmable gain amplifier setting for ADS122U04.
        ogs_port: Sensor driver instance (ADS122U04, WinsenO2, etc.), or None.
    """

    # Class-level fallbacks; __init__ overrides all of them except ``pga`` and
    # ``ogs_port``, which stay at these defaults until assigned by the caller.
    partNo = 0
    port = "/dev/ttyAMA0"
    baud = 115200
    address = -1
    ic = -1
    pga = 0
    ogs_port = None  # OGS Driver Object

    def __init__(self, partNo: int, port: str, baud: int, address: int, ic: int) -> None:
        """Store the identification and connection settings of one sensor.

        Args:
            partNo: Part number for this sensor.
            port: UART serial port path.
            baud: Baud rate for serial communication.
            address: Multiplexer channel address.
            ic: Multiplexer IC index.
        """
        self.partNo, self.port, self.baud = partNo, port, baud
        self.address, self.ic = address, ic

    def setOGS(
        self,
        ogs_port: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA | Serinus,
    ) -> None:
        """Attach the driver object that talks to this sensor.

        Args:
            ogs_port: Initialized sensor driver object for this OGS channel.
        """
        self.ogs_port = ogs_port
class OzOGS(GenericSensor):
    """Multiplexed gas sensor wrapper supporting multiple electrochemical and reference analyzers.

    Manages a UART multiplexer (2 ICs x 4 channels) to address up to 8
    gas sensors, handling initialization, GPIO-based channel selection,
    reading, calibration-aware scaling, and payload assembly.

    ``configuration``, ``p_ogs`` and ``v_ogs`` are parallel lists: index ``X``
    in each refers to the same configured sensor.

    Attributes:
        AVERAGE_COUNT: Number of readings to average per cycle.
        GPIO_A: GPIO pin A for multiplexer address selection.
        GPIO_B: GPIO pin B for multiplexer address selection.
        GPIO_ENABLE_1: GPIO enable pin for multiplexer IC 0.
        GPIO_ENABLE_2: GPIO enable pin for multiplexer IC 1.
        SENSOR_PER_IC: Number of sensor channels per multiplexer IC.
        configuration: Gas sensor configuration list from the Gateway.
        v_ogs: Nested list of per-sensor, per-parameter accumulated values.
        p_ogs: List of OzOGSParam instances (one per configured sensor, or
            None for sensors that were skipped).
    """

    AVERAGE_COUNT = 1
    # os.getenv returns a *string* whenever the variable is set, so coerce to
    # int: pin numbers must not change type depending on the environment and
    # SENSOR_PER_IC is used as a divmod() operand.
    GPIO_A = int(os.getenv("GPIO_A", 24))
    GPIO_B = int(os.getenv("GPIO_B", 25))
    GPIO_ENABLE_1 = int(os.getenv("GPIO_ENABLE_1", 27))
    GPIO_ENABLE_2 = int(os.getenv("GPIO_ENABLE_2", 17))
    SENSOR_PER_IC = int(os.getenv("SENSOR_PER_IC", 4))

    # partNo -> (default baud, honours gpio["pga"], applies gpio["debug"] to the driver)
    #   101 ADS122U04, 102 Winsen, 103 Cubic generic, 104 Semeatech,
    #   105 SemeatechA4, 106 NevadaNano, 107 HORIBA, 109 Serinus
    _PART_SPECS = {
        101: (115200, True, True),
        102: (115200, False, False),
        103: (115200, False, False),
        104: (115200, False, True),
        105: (9600, False, True),
        106: (38400, True, True),
        107: (19200, True, True),
        109: (38400, True, True),
    }
    # Drivers whose getSensorData() takes the parameter channel as an argument.
    _CHANNEL_ADDRESSED_PARTS = frozenset({104, 107, 109})
    # Drivers whose getSensorData() takes no argument.
    _SINGLE_VALUE_PARTS = frozenset({102, 103, 105, 106})

    def __init__(self) -> None:
        """Initialize OzOGS with empty configuration and sensor lists."""
        super().__init__()
        self.configuration = []
        self.v_ogs = []
        self.p_ogs = []

    def updateTempForADS(self, sensor: dict) -> None:
        """Append an internal temperature parameter to ADS122U04 sensor config if missing.

        Only acts when exactly two parameters are configured. The new
        send-code is the first parameter's code with its last character
        replaced by ``"t"``.

        Args:
            sensor: Sensor configuration dict whose parameters list may be extended.
        """
        parameters = sensor["parameters"]
        if len(parameters) == 2:
            sc = parameters[0]["sc"][:-1] + "t"
            parameters.append({"ch": 2, "cr": 0, "pm": 3, "sc": sc, "se": 100})

    def updateTempHumForSemeatech(self, sensor: dict) -> None:
        """Append temperature and humidity parameters to Semeatech sensor config if missing.

        Only acts when exactly two parameters are configured. The new
        send-codes are the first parameter's code with its last character
        replaced by ``"t"`` and ``"h"`` respectively.

        Args:
            sensor: Sensor configuration dict whose parameters list may be extended.
        """
        parameters = sensor["parameters"]
        if len(parameters) == 2:
            stem = parameters[0]["sc"][:-1]
            parameters.append({"ch": 2, "cr": 0, "pm": 3, "sc": stem + "t", "se": 100})
            parameters.append({"ch": 3, "cr": 0, "pm": 4, "sc": stem + "h", "se": 100})

    def initialize(self, config: list, init_value: dict) -> bool:
        """Initialize all configured gas sensors and set up GPIO multiplexer pins.

        Every entry of ``config`` ends up with an ``"init"`` flag (1 or 0) and
        with exactly one slot in ``p_ogs``/``v_ogs``, so the three lists stay
        index-aligned even when a sensor is skipped or fails.

        Args:
            config: List of OGS sensor configuration dicts from the Gateway.
            init_value: Dict to populate with per-sensor initialization flags.

        Returns:
            True if all sensors were iterated without fatal error.
        """
        self.configuration = config
        # Start clean so a repeated initialize() cannot leave stale slots that
        # would shift every index used by getSensorReading/putSensorValue.
        self.p_ogs = []
        self.v_ogs = []
        gpio.setup(self.GPIO_A, gpio.OUT)
        gpio.setup(self.GPIO_B, gpio.OUT)
        gpio.setup(self.GPIO_ENABLE_1, gpio.OUT)
        gpio.setup(self.GPIO_ENABLE_2, gpio.OUT)
        context_logger.info_with_context("OGS", "Initialize OGS Sensor")
        _success = False
        sensor = None  # bound up-front so the fatal-error handler can inspect it
        try:
            for index, sensor in enumerate(self.configuration):
                try:
                    if "pn" in sensor:
                        sensor["init"] = self.initializeSensor(sensor)
                        context_logger.info_with_context("OGS", f"Init Done {sensor['pn']} {sensor['init']}")
                    else:
                        # No part number: nothing to drive, but the entry still
                        # needs an init flag for putInitvalues/getSensorReading.
                        sensor["init"] = 0
                except Exception as e:
                    sensor["init"] = 0
                    context_logger.error_with_context(
                        "OGS",
                        f"InitializeSensor Failed {sensor['pn']} {sensor['init']} {e}",
                    )
                # Pad with placeholders if this entry did not claim its slot.
                while len(self.p_ogs) <= index:
                    self.p_ogs.append(None)
                while len(self.v_ogs) <= index:
                    self.v_ogs.append([])
            _success = True
        except Exception as e:
            part = sensor.get("pn") if isinstance(sensor, dict) else None
            context_logger.error_with_context("OGS", f"Init Failed {part} {e}")
            if isinstance(sensor, dict):
                sensor["init"] = 0
            _success = False
        finally:
            self.putInitvalues(init_value)
        return _success

    def initializeSensor(self, sensor: dict) -> int:
        """Initialize a single gas sensor based on its part number.

        Creates the appropriate driver, selects the multiplexer channel,
        and takes initial readings for each configured parameter. Exactly one
        entry is appended to ``p_ogs`` and ``v_ogs`` per call, including when
        the sensor is disabled, unknown, or setup raises.

        Part numbers:
            101 = ADS122U04, 102 = Winsen, 103 = Cubic, 104 = Semeatech,
            105 = SemeatechA4, 106 = NevadaNano, 107 = HORIBA, 109 = Serinus.

        Args:
            sensor: Configuration dict with part number, gpio, and parameters.

        Returns:
            1 on success (at least one parameter could be read), 0 on failure.
        """
        value = []
        _success = False
        _p_ogs = None
        try:
            spec = self._PART_SPECS.get(sensor["pn"])
            # Unknown part numbers and disabled sensors are skipped silently.
            if spec is not None and sensor["en"] == self.baseConfig["oz_enable"]:
                _p_ogs, value, _success = self._setupSensor(sensor, spec)
        finally:
            # Claim the slot even if configuration parsing raised, otherwise
            # later sensors would be read through the wrong driver.
            self.p_ogs.append(_p_ogs)
            self.v_ogs.append(value)
        return int(_success)

    def _setupSensor(self, sensor: dict, spec: tuple) -> tuple:
        """Create and probe the driver for one enabled sensor of a known part.

        Args:
            sensor: Sensor configuration dict.
            spec: ``_PART_SPECS`` entry for ``sensor["pn"]``.

        Returns:
            ``(ogs_param, values, success)`` where ``values`` has one
            ``{"value": [...], "count": n}`` accumulator per parameter.
        """
        default_baud, reads_pga, applies_debug = spec
        part_no = sensor["pn"]

        # Some parts expose extra on-board channels that the Gateway config
        # may omit; add them before parameters are iterated.
        if part_no == 101:
            self.updateTempForADS(sensor)
        elif part_no == 104:
            self.updateTempHumForSemeatech(sensor)

        port, baud, pos, ic, address, pga, debug = self._readGpioConfig(sensor, default_baud)

        value = []
        _success = False
        _p_ogs = None
        try:
            _p_ogs = OzOGSParam(part_no, port, baud, address, ic)
            if reads_pga:
                _p_ogs.pga = pga
            _p_ogs.setOGS(self.__setDriver(_p_ogs.partNo, _p_ogs.port, _p_ogs.baud))
            if applies_debug:
                _p_ogs.ogs_port.DEBUG = debug
            context_logger.info_with_context(
                "OGS",
                f"PORT:{_p_ogs.port} Baud:{_p_ogs.baud} POS:{pos} IC:{_p_ogs.ic} ADDRESS:{_p_ogs.address}",
            )
            self.__selectSensor(_p_ogs.ic, _p_ogs.address)
            # __initialize only consumes ``sensor`` for part 109 (Serinus).
            self.__initialize(_p_ogs, sensor)
            for param in sensor["parameters"]:
                try:
                    val = self._getOGS_param_data(_p_ogs, param["ch"])
                    context_logger.info_with_context(
                        "OGS",
                        f"Init -> PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} VAL:{val}",
                    )
                    value.append({"value": [val], "count": 1})
                    _success = True
                except Exception as e:
                    # Keep one accumulator per parameter so indices still line up.
                    value.append({"value": [], "count": 0})
                    context_logger.error_with_context(
                        "OGS",
                        f"Init Failed PN:{sensor[self.partNumber]} POS:{pos} CH:{param[self.parameter]} - {e}",
                    )
        except Exception as e:
            _success = False
            value.append({"value": [], "count": 0})
            context_logger.error_with_context("OGS", f"Init Failed {sensor['pn']} {e}")
            # Adds file/line context for the failure.
            self.PrintException()
        return _p_ogs, value, _success

    def _readGpioConfig(self, sensor: dict, default_baud: int) -> tuple:
        """Extract connection settings from ``sensor["gpio"]`` with defaults.

        A ``"pos"`` entry takes precedence and is decoded into
        ``(ic, address)`` via ``divmod(pos, SENSOR_PER_IC)``; otherwise the
        explicit ``"ic"``/``"address"`` entries are used.

        Returns:
            ``(port, baud, pos, ic, address, pga, debug)``.
        """
        gpioConfig = sensor.get("gpio", {})
        port = gpioConfig.get("port", "/dev/ttyAMA0")
        baud = gpioConfig.get("baud", default_baud)
        pos = -1
        if "pos" in gpioConfig:
            pos = gpioConfig["pos"]
            ic, address = divmod(int(pos), self.SENSOR_PER_IC)
        else:
            address = gpioConfig.get("address", -1)
            ic = gpioConfig.get("ic", -1)
        pga = gpioConfig.get("pga", 1)
        debug = gpioConfig.get("debug") == 1
        return port, baud, pos, ic, address, pga, debug

    @staticmethod
    def _sendCodes(sc) -> list:
        """Return the send-code(s) of a parameter as a list (``sc`` may be a str or a list)."""
        return [sc] if isinstance(sc, str) else list(sc)

    def findKeyinList(self, keys: list, sensor_list: dict) -> bool:
        """Check if any sensor parameter send-code matches the provided calibration keys.

        A code matches when its first two characters are in ``keys``, or,
        for codes containing ``"_"``, when characters 1-2 occur inside any
        key. List-valued ``sc`` entries are checked code by code.

        Args:
            keys: List of calibration key prefixes to match.
            sensor_list: Sensor configuration dict with a 'parameters' list.

        Returns:
            True if at least one parameter matches a calibration key.
        """
        for parameter in sensor_list["parameters"]:
            for code in self._sendCodes(parameter["sc"]):
                if "_" in code and any(code[1:3] in key for key in keys):
                    return True
                if code[:2] in keys:
                    return True
        return False

    def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict:
        """Read all initialized gas sensors and accumulate values.

        Without calibration keys every reading is scaled as
        ``value * se / 100 + cr / 10`` (rounded to 2 decimals); with
        calibration keys the unscaled driver value is kept and only matching
        sensors are read.

        Args:
            calibrationKeys: Optional list of send-code prefixes to restrict
                which sensors are read (empty = read all).

        Returns:
            Dict mapping send-codes to their latest raw or calibrated values.
        """
        if calibrationKeys is None:
            calibrationKeys = []
        calibrating = len(calibrationKeys) != 0
        data = {}
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            context_logger.info_with_context(
                "OGS",
                f"PART: {sensor['pn']} POS: {sensor.get('gpio', {}).get('pos', -1)}",
            )
            if calibrating and not self.findKeyinList(calibrationKeys, sensor):
                continue
            ogs_param = self.p_ogs[X]
            if ogs_param is None:
                continue
            self.__selectSensor(ogs_param.ic, ogs_param.address)
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    value = self._getOGS_param_data(ogs_param, parameters["ch"])
                    if not calibrating:
                        value = round(
                            ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
                            2,
                        )
                        # NOTE(review): the optional "rd" dither is assumed to apply
                        # only outside calibration mode; the original nesting was lost
                        # in the flattened source - confirm against the repository.
                        if "rd" in parameters:
                            addn = random.uniform(-1 * parameters["rd"], parameters["rd"])
                            context_logger.debug_with_context("OGS", f"rd: {addn}")
                            value = value + addn
                    self.v_ogs[X][Y]["value"].append(value)
                    self.v_ogs[X][Y]["count"] += 1
                    for scVal in self._sendCodes(parameters["sc"]):
                        data[scVal] = value
                    context_logger.info_with_context(
                        "OGS",
                        f"PN:{sensor[self.partNumber]} SC:{parameters['sc']} PM:{parameters[self.parameter]} VAL:{value} CNT:{self.v_ogs[X][Y]['count']}",
                    )
                except Exception as e:
                    context_logger.error_with_context("OGS", f"getSensorReading: {e}")
        return data

    def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict:
        """Transfer accumulated gas sensor readings into the output payload and reset.

        Args:
            value: Shared output dict to populate with OGS time-series data.
            calibrationKeys: Optional list of send-code prefixes to restrict output.

        Returns:
            The updated output dict with gas sensor data added.
        """
        if calibrationKeys is None:
            calibrationKeys = []
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            if len(calibrationKeys) != 0 and not self.findKeyinList(calibrationKeys, sensor):
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    readings = self.v_ogs[X][Y]["value"]
                    for scVal in self._sendCodes(parameters["sc"]):
                        value[scVal] = readings
                    # Hand the list over to the payload and start a fresh accumulator.
                    self.v_ogs[X][Y]["value"] = []
                    self.v_ogs[X][Y]["count"] = 0
                except Exception as e:
                    context_logger.error_with_context("OGS", f"putSensorValue: {e}")
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Add OGS sensor initialization flags to the init payload.

        Sensors that never received an ``"init"`` flag are reported as 0.

        Args:
            value: Shared init payload dict.

        Returns:
            The updated init payload with an 'ogs' key listing init statuses.
        """
        try:
            value.update({"ogs": [sensor.get("init", 0) for sensor in self.configuration]})
        except Exception as e:
            context_logger.error_with_context("OGS", f"putInitvalues: {e}")
        return value

    # =============== OzWrapper Functions for different sensors =========================#
    def __initialize(self, ogs_param: OzOGSParam, sensor: dict | None = None) -> None:
        """Run the driver-specific start-up step for one sensor.

        Parts 101/102/106/107 call ``initialize()``; part 109 calls
        ``initialize(sensor)`` when a config is given; all other parts just
        wait 50 ms.
        """
        if ogs_param.partNo in (101, 102, 106, 107):
            ogs_param.ogs_port.initialize()
        elif ogs_param.partNo == 109:
            if sensor is not None:
                ogs_param.ogs_port.initialize(sensor)
        else:
            time.sleep(0.05)

    def __setDriver(
        self, partNo: int, port: str, baud: int
    ) -> ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA | Serinus | None:
        """Instantiate the driver class for ``partNo``; None for unknown parts."""
        if partNo == 109:
            # Serinus is constructed without port/baud arguments.
            return Serinus()
        serial_drivers = {
            101: ADS122U04,
            102: WinsenO2,
            103: CubicGeneric,
            104: Semeatech,
            105: SemeatechA4,
            106: NEVADA,
            107: HORIBA,
        }
        driver_cls = serial_drivers.get(partNo)
        return None if driver_cls is None else driver_cls(port, baud)

    def _getOGS_param_data(self, ogs_param: OzOGSParam | None, sensor_param: int | None) -> float:
        """Read one parameter from a sensor, rounded to two decimals.

        Returns 0.0 when there is no sensor, the part number is unknown, or
        the driver reports None.
        """
        if ogs_param is None:
            return 0.0
        part_no = ogs_param.partNo
        if part_no == 101:  # normal ogs: pick the ADC channel first
            self.__selectChannel(ogs_param.ogs_port, sensor_param, ogs_param.pga)
            val = self.__get_ADS_Data(ogs_param.ogs_port, sensor_param)
        elif part_no in self._CHANNEL_ADDRESSED_PARTS:
            val = ogs_param.ogs_port.getSensorData(sensor_param)
        elif part_no in self._SINGLE_VALUE_PARTS:
            val = ogs_param.ogs_port.getSensorData()
        else:
            return 0.0
        return 0.0 if val is None else round(val, 2)

    # This Function selects sensor and only accessible for uart multiplexer
    def __selectSensor(self, ic: int, address: int) -> None:
        """Drive the enable and A/B select lines for ``(ic, address)``.

        The selected IC's enable pin is driven to 0 and the other to 1.
        ``address`` 0-3 is written as two bits (A = bit 0, B = bit 1). Values
        outside those ranges leave the corresponding pins untouched. Always
        waits one second afterwards.
        """
        if ic == 0:
            gpio.set(self.GPIO_ENABLE_1, 0)
            gpio.set(self.GPIO_ENABLE_2, 1)
        elif ic == 1:
            gpio.set(self.GPIO_ENABLE_1, 1)
            gpio.set(self.GPIO_ENABLE_2, 0)
        if address in (0, 1, 2, 3):
            gpio.set(self.GPIO_A, int(address) & 1)
            gpio.set(self.GPIO_B, (int(address) >> 1) & 1)
        time.sleep(1)  # necessary timeout after selecting channel

    # ====================================================================================#
    # =================== ADS FUNCTIONS ============================#
    def __selectChannel(
        self,
        ads: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA,
        channel: int,
        pga: int,
    ) -> None:
        """Forward channel and gain selection to the ADS driver."""
        ads.setChannel(channel, pga)

    def __get_ADS_Data(
        self,
        ads: ADS122U04 | WinsenO2 | CubicGeneric | Semeatech | SemeatechA4 | NEVADA | HORIBA,
        channel: int,
    ) -> float | None:
        """Read the ADS driver; channel 2 reads its internal temperature.

        Returns None (after logging) if the driver raises.
        """
        try:
            if channel == 2:
                return ads.get_internal_temperature()
            return ads.getData()
        except Exception as e:
            context_logger.error_with_context("OGSxADS", f"getData failed: {e}")
            return None  # or float('nan')

    # ==============================================================#
    def PrintException(self) -> None:
        """Log the current exception with file name and line number context.

        Does nothing when no exception is being handled.
        """
        exc_type, exc_obj, tb = sys.exc_info()
        if tb is None:
            return
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        context_logger.error_with_context("EXCEPTION", f'IN ({filename}, LINE {lineno} "{line.strip()}"): {exc_obj}')
# Manual smoke test; run as a module from the project root:
#   python3 -m OzWrapper.OzOGS.OzOGS
if __name__ == "__main__":
    # The demo configuration lives next to this file.
    config_path = os.path.join(os.path.dirname(__file__), "ogs.config.json")
    with open(config_path) as ogsConfigFile:
        ogsConfig = json.load(ogsConfigFile)
    context_logger.info_with_context("OGS", f"Config: {ogsConfig}")

    ogs = OzOGS()
    ogs.initialize(ogsConfig["ogs"], {})

    # Take four readings so the accumulators hold several samples.
    context_logger.info_with_context("OGS", "Reading start")
    for _ in range(4):
        ogs.getSensorReading()

    # Flush the accumulated samples into an empty payload and log it.
    payload = {}
    context_logger.info_with_context("OGS", "PutSensor value start")
    context_logger.info_with_context("OGS", ogs.putSensorValue(payload))
# calibList = []
# calibList.append('g2')
# calibList.append('g3')
# print("getSensorReading value start \n\n")
# for X, sensor in enumerate(calibList):
# print(f"Calib sensor: {calibList[X]}")
# print(f"OGS GetSensor: {ogs.getCalSensorReading(calibList[X])}")
# data = {}
# print("PutSensor value start \n\n")
# print(ogs.putSensorValue(data))