# Source code for OzWrapper.OzOSP.OzOSP

"""Oizom Sensor Protocol (OSP) wrapper for Modbus RTU sensor integration.

Reads sensor data from external Modbus RTU slave devices over a shared serial
bus, supporting configurable function codes, register addresses, endianness,
and multi-register float parsing.
"""

import struct

from pymodbus.client.sync import ModbusSerialClient as ModbusClient

from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


# OSP = Oizom Sensor Protocol
class OzOSP(GenericSensor):
    """Modbus RTU client wrapper for reading external sensor slaves.

    Creates a shared Modbus RTU serial client and reads holding/input
    registers from multiple slave devices, applying sensitivity and
    correction factors to raw register values.

    Attributes:
        configuration: OSP sensor configuration dict from the Gateway.
        client: PyModbus serial client instance, or None before initialization.
        OSP_data: Dict of accumulated sensor readings keyed by send-code.
        debug: Whether verbose debug logging is enabled.
    """

    def __init__(self):
        """Initialize OzOSP with empty configuration and data stores."""
        self.configuration = {}
        self.client = None
        self.OSP_data = {}
        # Defined up front so getSensorData() is usable before initialize().
        self.debug = False

    def initialize(self, config: dict, init_value: dict) -> bool:
        """Initialize the Modbus RTU client and probe all configured slave sensors.

        Every sensor dict in ``config["sensors"]`` gets an ``"init"`` entry:
        the probe result for sensors carrying an ``"en"`` key, ``0`` for
        sensors without one or whose probe raised.

        Args:
            config: OSP configuration dict with gpio, sensors, and debug settings.
            init_value: Dict to populate with per-sensor initialization flags.

        Returns:
            True if at least one sensor initialized successfully. False if the
            client could not be created, connecting raised, or no sensor
            responded.
        """
        self.configuration = config
        self.debug = config.get("debug", False)

        context_logger.info_with_context("OSP", f"Initializing MODBUS sensors {self.configuration}")

        # A missing (or None) "gpio" section falls back to the defaults below
        # instead of raising AttributeError.
        gpio_config = self.configuration.get("gpio") or {}
        port = gpio_config.get("port", "/dev/ttyAMA2")
        baudrate = gpio_config.get("baudrate", 9600)
        parity = gpio_config.get("parity", "N")

        # ATTENTION: a single Modbus client/port is shared by every slave, so
        # all slaves must use the same parity, baudrate, etc. or reads fail.
        try:
            self.client = ModbusClient(
                method="rtu",
                port=port,
                baudrate=baudrate,
                parity=parity,
                stopbits=1,
                bytesize=8,
                timeout=3,
            )
        except Exception as e:
            context_logger.error_with_context("OSP", f"Modbus Client Initialization Error: {e}")
            return False

        try:
            connected = self.client.connect()
        except Exception as e:
            context_logger.error_with_context("OSP", f"Modbus Client Connection Error: {e}")
            return False

        # connect() reports failure through its return value; surface it in
        # the log but still probe the sensors, as the original flow did.
        if connected is False:
            context_logger.error_with_context(
                "OSP", f"Modbus Client Connection Error: unable to open port {port}"
            )

        _success = False
        for sensor in self.configuration.get("sensors", []):
            try:
                if "en" in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
                else:
                    # Sensors without "en" are not probed; record an explicit
                    # flag so putInitvalues() always finds one.
                    sensor["init"] = 0
            except Exception as e:
                sensor["init"] = 0
                context_logger.error_with_context("OSP", f"Init: {e}")
            # Accumulate: one working sensor is enough, regardless of order.
            _success = _success or bool(sensor["init"])

        # send confirmation to main file
        self.putInitvalues(init_value)
        return _success

    def initializeSensor(self, sensor: dict) -> bool:
        """Probe a Modbus slave sensor by reading all its configured parameters.

        Every parameter is read (no short-circuit on the first success).

        Args:
            sensor: Sensor configuration dict with slave_id and parameters.
                A missing slave_id defaults to 1, matching getSensorReading().

        Returns:
            True if at least one parameter returned a valid reading.
        """
        slave_id = sensor.get("slave_id", 1)
        readings = [
            self.getSensorData(self.client, slave_id, parameter)
            for parameter in sensor.get("parameters", [])
        ]
        return any(reading is not None for reading in readings)

    def getSensorData(self, modbus_client, slave_id, config):
        """Read a single parameter from a Modbus slave register.

        Args:
            modbus_client: PyModbus client instance.
            slave_id: Modbus slave/unit ID.
            config: Parameter configuration dict with register, fn_code
                (3 = holding registers, 4 = input registers), count, endian,
                cr (correction) and se (sensitivity).

        Returns:
            The parsed register value after applying ``value * se / 100.0``
            (when se is set) and ``value + cr / 10.0`` (when cr is set), or
            None on any error (invalid configuration, Modbus error response,
            unsupported register count, or an exception during the read).
        """
        try:
            register = config.get("register", 0)
            fn_code = int(config.get("fn_code", 3))
            count = int(config.get("count", 1))
            endian = config.get("endian", "big")
            cr = config.get("cr", None)  # cr stands for correction factor
            se = config.get("se", None)  # se stands for sensitivity

            # CONTEXT: read_*_registers(address, count, unit=slave_id)
            #   address: starting register number to read
            #   count:   how many registers to read
            #   unit:    slave ID of the Modbus device
            if fn_code == 3:
                resp = modbus_client.read_holding_registers(register, count, unit=slave_id)
            elif fn_code == 4:
                resp = modbus_client.read_input_registers(register, count, unit=slave_id)
            else:
                context_logger.error_with_context("OSP", f"Invalid fn_code {fn_code}")
                return None

            if resp.isError():
                context_logger.error_with_context("OSP", f"Modbus error (slave={slave_id}, reg={register})")
                return None

            value = self._parse_registers(resp.registers, count, endian)
            if value is None:
                # _parse_registers() only understands counts 1, 2 and 5.
                context_logger.error_with_context(
                    "OSP",
                    f"Unsupported register count {count} (slave={slave_id}, reg={register})",
                )
                return None

            if self.debug:
                context_logger.debug_with_context(
                    "OSP",
                    f"Raw value from sensor (slave={slave_id}, reg={register}): {value}",
                )

            if se is not None:
                value = value * se / 100.0

            if cr is not None:
                value = value + cr / 10.0

            return value

        except Exception as e:
            context_logger.error_with_context("OSP", f"Exception: {e}")
            return None

    def getSensorReading(self, calibrationKeys=None) -> dict:
        """Read all configured Modbus slave sensors and accumulate values.

        Resets ``OSP_data`` and then appends every successful reading to the
        list stored under the parameter's ``"sc"`` send-code.

        Args:
            calibrationKeys: Accepted for interface compatibility. This method
                does not use it: every configured parameter is read.
                Filtering by send-code happens in putSensorValue().

        Returns:
            Dict mapping send-codes to their accumulated value lists.
        """
        try:
            self.OSP_data = {}
            for sensor in self.configuration.get("sensors", []):
                slave_id = sensor.get("slave_id", 1)
                for parameter in sensor.get("parameters", []):
                    value = self.getSensorData(self.client, slave_id, parameter)
                    if value is not None:
                        self.OSP_data.setdefault(parameter["sc"], []).append(value)
        except Exception as e:
            context_logger.error_with_context("OSP", f"Error in getSensorReading {e}")

        return self.OSP_data

    def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None):
        """Transfer accumulated OSP sensor readings into the output payload.

        Args:
            value: Shared output dict to populate with OSP data.
            calibrationKeys: Optional list of send-codes to restrict output.
                None or an empty list means "output every parameter".

        Returns:
            The updated output dict with OSP sensor data added. Parameters
            with no accumulated readings are written as an empty list.
        """
        # Normalise None to "no filter" so the annotated None case works.
        keys = calibrationKeys or []
        try:
            for sensor in self.configuration.get("sensors", []):
                for parameter in sensor.get("parameters", []):
                    send_code = parameter["sc"]
                    if not keys or send_code in keys:
                        value[send_code] = self.OSP_data.get(send_code, [])
        except Exception as e:
            context_logger.error_with_context("OSP", f"Error in putSensorValue: {e}")
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Add OSP sensor initialization flags to the init payload.

        Args:
            value: Shared init payload dict.

        Returns:
            The updated init payload with an 'OSP' key listing init statuses,
            one per configured sensor. Sensors that carry no "init" entry
            are reported as 0.
        """
        _sensor_init = [
            sensor.get("init", 0) for sensor in self.configuration.get("sensors", [])
        ]
        value.update({"OSP": _sensor_init})
        return value

    def _parse_registers(self, registers, count, endian):
        """Parse raw Modbus register values into a numeric result.

        Handles single-register integers, dual-register floats, and special
        5-register visibility sensors.

        Args:
            registers: List of raw register values from the Modbus response.
            count: Number of registers read (1, 2, or 5).
            endian: Byte order; 'big' selects big-endian packing, any other
                value selects little-endian packing.

        Returns:
            Parsed numeric value, or None if count is unsupported.
            For count 1 and 5 this is ``registers[0]`` unchanged; for count 2
            it is a 32-bit float built from both registers.
        """
        endian_char = ">" if endian == "big" else "<"

        if count == 1:
            return registers[0]

        if count == 5:
            # Only the first register is used; the other four are ignored.
            if getattr(self, "debug", False):
                context_logger.debug_with_context(
                    "OSP", "NIUBOL VISIBILITY WITH RESPONSE 10 BYTE in SINGLE COUNT"
                )
            return registers[0]

        if count == 2:
            # The registers are packed in swapped order (registers[1] first).
            # With 'big', registers[1] supplies the most-significant 16 bits
            # of the float; with little-endian packing registers[0] does.
            packed = struct.pack(f"{endian_char}HH", registers[1], registers[0])
            return struct.unpack(f"{endian_char}f", packed)[0]

        return None
if __name__ == "__main__":
    # Example usage: read one two-register float parameter from slave 1 and
    # print the accumulated readings.
    config = {
        "gpio": {"port": "/dev/ttyAMA2", "baudrate": 9600, "parity": "N"},
        "sensors": [
            {
                # initialize() only probes sensors that carry an "en" key.
                "en": 1,
                "slave_id": 1,
                "parameters": [
                    {
                        "sc": "temperature",
                        "register": 0,
                        "fn_code": 3,
                        "count": 2,
                        "endian": "big",
                        "se": 100,
                    }
                ],
            }
        ],
    }

    ozosp = OzOSP()
    ozosp.initialize(config, {})
    readings = ozosp.getSensorReading()
    print(readings)