Source code for OzWrapper.OzModbus.OzModbus

"""Modbus RTU/TCP server wrapper for exposing sensor data as holding registers.

Maps sensor telemetry keys to Modbus holding register addresses and serves
them over RTU (serial) and/or TCP transports using PyModbus, enabling
integration with SCADA and PLC systems.
"""

import threading
import time
from queue import Queue

from drivers.PyModbus import PyModbus
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzModbus: """Modbus server wrapper mapping sensor keys to holding registers. Receives sensor payloads from a queue, writes values into a Modbus register context, and serves them via RTU and/or TCP server threads. Attributes: pymodbus: PyModbus driver instance managing the server context. slaveId: Modbus slave/unit ID for this device. modbus_context: Server data context holding register values. holding_register: Function code for holding registers (3). isRealtime: Whether to process only realtime data payloads. payloaddata: Latest sensor payload received from the queue. modbus_keys: Mapping of sensor send-codes to register addresses. can_negative: List of send-codes allowed to have negative values. """ pymodbus = None # driver slaveId = 0x00 modbus_context = None holding_register = 3 isRealtime = 0 # Payload payloaddata = None
[docs] def __init__(self) -> None: """Initialize OzModbus with the default register map and negative-value whitelist.""" self.modbus_keys: dict[str, int] = { "temp": 0, # TSL2561 "hum": 2, # INA219 "p2": 4, "p1": 6, "leq": 8, "light": 10, "uv": 12, "g1": 14, "g2": 16, "g3": 18, "g5": 20, "g6": 22, "g7": 24, "g8": 26, "g9": 28, "g4": 30, "v2": 32, "v3": 34, "v4": 36, "v5": 38, "v6": 40, "pr": 42, "p3": 44, "p4": 46, "wd": 48, "ws": 50, "rain": 52, "bs": 54, "lat": 56, "lon": 58, "g21": 60, "g31": 62, "g45": 64, "g51": 66, "g61": 68, "g71": 70, "g81": 72, "v7": 74, "x1": 76, "x2": 78, "x3": 80, "x4": 82, "x5": 84, "x6": 86, "solar": 88, "dp": 90, "vb1": 92, "vb2": 94, "vb3": 96, "vb4": 98, "vb5": 100, "vb6": 102, "n11": 104, "n12": 106, "n13": 108, "n21": 110, "n22": 112, "n23": 114, "n31": 116, "n32": 118, "n33": 120, } self.can_negative: list[str] = [ "temp", "lat", "lon", "dp", "vb1", "vb2", "vb3", "vb4", "vb5", "vb6", ]
[docs] def setup(self, modbusConfig: dict, modbus_queue: Queue) -> None: """Configure and start the Modbus RTU and/or TCP server(s). Parses serial port, baud rate, parity, server mode, host/port, and slave ID from the configuration, builds the register context, and starts server threads. Args: modbusConfig: Configuration dict with gpio, modconfig, and hostconfig sections. modbus_queue: Queue supplying sensor data payloads for register updates. """ # TODO: add more modbus configs rtu_port = "/dev/ttyAMA2" rtu_baud = 19200 rtu_parity = "N" # 'N' - None, 'E' - Even, 'O' - Odd size = 200 slaveId = 0x00 fixId = 0 mode = 1 # 1 : rtu 2: tcp server_host = "0.0.0.0" # by default broadcast server_port = 502 default_value = -1 if "gpio" in modbusConfig: gpioConfig = modbusConfig["gpio"] if "port" in gpioConfig: rtu_port = gpioConfig["port"] if "baud" in gpioConfig: rtu_baud = gpioConfig["baud"] if "parity" in gpioConfig: rtu_parity = gpioConfig["parity"] if "modconfig" in modbusConfig: modconf = modbusConfig["modconfig"] if "mode" in modconf: mode = modconf["mode"] if "size" in modconf: size = modconf["size"] if "slaveid" in modconf: slaveId = modconf["slaveid"] if "default" in modconf: default_value = modconf["default"] if "fix_id" in modconf: fixId = int(modconf["fix_id"]) if "hostconfig" in modbusConfig: hostconf = modbusConfig["hostconfig"] if "host" in hostconf: server_host = hostconf["host"] if "port" in hostconf: server_port = hostconf["port"] if "rdata" in modbusConfig: self.isRealtime = modbusConfig["rdata"] self.slaveId = slaveId self.fixId = fixId self.pymodbus = PyModbus(size=size, default_value=default_value) # TODO: DANGEROUS Find another way to start loop since it is very risky !! to spawn thread inside thread threading.Thread(target=self.loop, args=(modbus_queue,), daemon=True).start() context_logger.info_with_context("MODBUS", f"SlaveId: {slaveId} size: {size}") self.modbus_context = self.pymodbus.build_context(self.slaveId, self.fixId) # Start RTU server in a separate thread if enabled if mode == 1 or mode == 3: context_logger.info_with_context("MODBUS", f"Init [RTU] {rtu_port} {rtu_baud} {rtu_parity}") rtu_thread = threading.Thread( target=self.pymodbus.run_rtu_server, args=(rtu_port, rtu_baud, rtu_parity), daemon=True, name="ModbusRTUServer", ) rtu_thread.start() context_logger.info_with_context("MODBUS", "RTU server thread started") # Start TCP server in a separate thread if enabled if mode == 2 or mode == 3: context_logger.info_with_context("MODBUS", f"Init [TCP] {server_host} {server_port}") tcp_thread = threading.Thread( target=self.pymodbus.run_tcp_server, args=(server_host, server_port), daemon=True, name="ModbusTCPServer", ) tcp_thread.start() context_logger.info_with_context("MODBUS", "TCP server thread started") # Give servers time to initialize time.sleep(2)
[docs] def loop(self, modbus_queue: Queue) -> None: """Continuously read payloads from the queue and update Modbus registers. Args: modbus_queue: Queue supplying sensor data payloads. """ context_logger.info_with_context("MODBUS", "Running..") while 1: try: data = modbus_queue.get() if self.isRealtime: if data.get("d") and data["d"].get("rdata"): self.payloaddata = data["d"] context_logger.info_with_context("MODBUS", f"Sending realtime data {data}") else: continue else: if data.get("d") and not data["d"].get("rdata"): self.payloaddata = data["d"] context_logger.info_with_context("MODBUS", f"Sending data {data}") else: continue if self.payloaddata is not None: for _d in self.payloaddata: if _d in list(self.modbus_keys.keys()): _sensor_value = self.payloaddata.get(_d) # sensor_value context_logger.info_with_context( "MODBUS", f"Key:{_d} Val:{_sensor_value} Register:{self.modbus_keys.get(_d)}", ) if _d not in self.can_negative and _sensor_value < 0: _sensor_value = 0 modbus_register = self.modbus_keys.get(_d) self.pymodbus.update_data( self.slaveId, self.holding_register, modbus_register, round(float(_sensor_value), 2), ) except Exception as e: context_logger.error_with_context("MODBUS", f"loop: {e}") finally: time.sleep(1)
[docs] def run(self, modbusConfig: dict, modbus_queue: Queue) -> None: """Entry point: set up the Modbus server and start the data loop if enabled. Args: modbusConfig: Configuration dict with 'en' enable flag. modbus_queue: Queue supplying sensor data payloads. """ if "en" in modbusConfig: if modbusConfig["en"] == 1: self.setup(modbusConfig, modbus_queue) self.loop(modbus_queue)