"""Oizom Sensor Protocol (OSP) wrapper for Modbus RTU sensor integration.
Reads sensor data from external Modbus RTU slave devices over a shared serial
bus, supporting configurable function codes, register addresses, endianness,
and multi-register float parsing.
"""
import struct
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# OSP = Oizom Sensor Protocol
[docs]
class OzOSP(GenericSensor):
"""Modbus RTU client wrapper for reading external sensor slaves.
Creates a shared Modbus RTU serial client and reads holding/input
registers from multiple slave devices, applying sensitivity and
correction factors to raw register values.
Attributes:
configuration: OSP sensor configuration dict from the Gateway.
client: PyModbus serial client instance, or None before initialization.
OSP_data: Dict of accumulated sensor readings keyed by send-code.
debug: Whether verbose debug logging is enabled.
"""
[docs]
def __init__(self):
"""Initialize OzOSP with empty configuration and data stores."""
self.configuration = {}
self.client = None
self.OSP_data = {}
[docs]
def initialize(self, config: dict, init_value: dict) -> bool:
"""Initialize the Modbus RTU client and probe all configured slave sensors.
Args:
config: OSP configuration dict with gpio, sensors, and debug settings.
init_value: Dict to populate with per-sensor initialization flags.
Returns:
True if at least one sensor initialized successfully.
"""
self.configuration = config
self.debug = config.get("debug", False)
context_logger.info_with_context("OSP", f"Initializing MODBUS sensors {self.configuration}")
_success = False
gpio_config = self.configuration.get("gpio")
port = gpio_config.get("port", "/dev/ttyAMA2")
baudrate = gpio_config.get("baudrate", 9600)
parity = gpio_config.get("parity", "N")
""" ATTENTION: ⚠️ Here common modbus client port is created so all slaves should have same parity, baudrate etc else it will fail """
try:
self.client = ModbusClient(
method="rtu",
port=port,
baudrate=baudrate,
parity=parity,
stopbits=1,
bytesize=8,
timeout=3,
)
except Exception as e:
context_logger.error_with_context("OSP", f"Modbus Client Initialization Error: {e}")
return False
try:
self.client.connect()
except Exception as e:
context_logger.error_with_context("OSP", f"Modbus Client Connection Error: {e}")
return False
for sensor in self.configuration.get("sensors", []):
try:
if "en" in sensor:
sensor["init"] = self.initializeSensor(sensor)
_success = sensor["init"] # send confirmation to main file
except Exception as e:
sensor["init"] = 0
context_logger.error_with_context("OSP", f"Init: {e}")
self.putInitvalues(init_value)
return _success
[docs]
def initializeSensor(self, sensor: dict) -> None:
"""Probe a Modbus slave sensor by reading all its configured parameters.
Args:
sensor: Sensor configuration dict with slave_id and parameters.
Returns:
True if at least one parameter returned a valid reading.
"""
success = False
for parameter in sensor.get("parameters", []):
val = self.getSensorData(self.client, sensor["slave_id"], parameter)
if val is not None:
success = True
return success
[docs]
def getSensorData(self, modbus_client, slave_id, config):
"""Read a single parameter from a Modbus slave register.
Args:
modbus_client: PyModbus client instance.
slave_id: Modbus slave/unit ID.
config: Parameter configuration dict with register, fn_code, count,
endian, cr (correction), and se (sensitivity).
Returns:
Corrected sensor value as float, or None on error.
"""
register = config.get("register", 0)
fn_code = int(config.get("fn_code", 3))
count = int(config.get("count", 1))
endian = config.get("endian", "big")
cr = config.get("cr", None) # cr stand for correction factor
se = config.get("se", None) # se stand for sensitivity
""" CONTEXT: 🌐 modbus_client.read_holding_registers(register, 1, unit=slave_id)
address: starting register number to read,
count: how many registers to read (1 = read 1 register),
unit: slave ID of the Modbus device
"""
try:
if fn_code == 3:
resp = modbus_client.read_holding_registers(register, count, unit=slave_id)
elif fn_code == 4:
resp = modbus_client.read_input_registers(register, count, unit=slave_id)
else:
context_logger.error_with_context("OSP", f"Invalid fn_code {fn_code}")
return None
if resp.isError():
context_logger.error_with_context("OSP", f"Modbus error (slave={slave_id}, reg={register})")
return None
# value = resp.registers[0] # Extract single variable value from response list
value = self._parse_registers(resp.registers, count, endian)
if self.debug:
context_logger.debug_with_context(
"OSP",
f"Raw value from sensor (slave={slave_id}, reg={register}): {value}",
)
if se is not None:
value = value * se / 100.0
if cr is not None:
value = value + cr / 10.0
return value
except Exception as e:
context_logger.error_with_context("OSP", f"Exception: {e}")
return None
[docs]
def getSensorReading(self, calibrationKeys=[]) -> dict:
"""Read all configured Modbus slave sensors and accumulate values.
Args:
calibrationKeys: Optional list of send-codes to restrict which
sensors are read (empty = read all).
Returns:
Dict mapping send-codes to their accumulated value lists.
"""
try:
self.OSP_data = {}
for sensor in self.configuration.get("sensors", []):
slave_id = sensor.get("slave_id", 1)
for parameter in sensor.get("parameters", []):
value = self.getSensorData(self.client, slave_id, parameter)
if value is not None:
self.OSP_data.setdefault(parameter["sc"], [])
self.OSP_data[parameter["sc"]].append(value)
except Exception as e:
context_logger.error_with_context("OSP", f"Error in getSensorReading {e}")
return self.OSP_data
[docs]
def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = []):
"""Transfer accumulated OSP sensor readings into the output payload.
Args:
value: Shared output dict to populate with OSP data.
calibrationKeys: Optional list of send-codes to restrict output.
Returns:
The updated output dict with OSP sensor data added.
"""
try:
for sensor in self.configuration.get("sensors", []):
parameters = sensor.get("parameters", [])
for parameter in parameters:
if len(calibrationKeys) == 0 or parameter["sc"] in calibrationKeys:
value[parameter["sc"]] = self.OSP_data.get(parameter["sc"], [])
except Exception as e:
context_logger.error_with_context("OSP", f"Error in putSensorValue: {e}")
return value
[docs]
def putInitvalues(self, value: dict) -> dict:
"""Add OSP sensor initialization flags to the init payload.
Args:
value: Shared init payload dict.
Returns:
The updated init payload with an 'OSP' key listing init statuses.
"""
_sensor_init = []
for sensor in self.configuration.get("sensors", []):
_sensor_init.append(sensor["init"])
value.update({"OSP": _sensor_init})
return value
def _parse_registers(self, registers, count, endian):
"""Parse raw Modbus register values into a numeric result.
Handles single-register integers, dual-register floats, and
special 5-register visibility sensors.
Args:
registers: List of raw register values from the Modbus response.
count: Number of registers read (1, 2, or 5).
endian: Byte order ('big' or 'little').
Returns:
Parsed numeric value, or None if count is unsupported.
"""
endian_char = ">" if endian == "big" else "<"
if count == 1:
return registers[0]
elif count == 5:
print("NIUBOL VISIBILITY WITH RESPONSE 10 BYTE in SINGLE COUNT")
return registers[0]
elif count == 2:
packed = struct.pack(f"{endian_char}HH", registers[1], registers[0])
return struct.unpack(f"{endian_char}f", packed)[0]
return None
if __name__ == "__main__":
# Example usage
config = {
"gpio": {"port": "/dev/ttyAMA2", "baudrate": 9600, "parity": "N"},
"sensors": [
{
"slave_id": 1,
"parameters": [
{"sc": "temperature", "register": 0, "fn_code": 3, "count": 2, "endian": "big", "se": 100}
],
}
],
}
ozosp = OzOSP()
ozosp.initialize(config, {})
readings = ozosp.getSensorReading()
print(readings)