Source code for drivers.PyModbus.PyModbus

"""PyModbus Modbus TCP/RTU server driver.

Wraps the pymodbus library to expose a Modbus server (TCP or RTU) that other
devices can poll for sensor data.  The server holds a configurable number of
32-bit IEEE-754 float registers that the Oizom firmware updates each sensing
cycle.

Protocol: Modbus TCP or Modbus RTU over RS-485.

Typical usage::

    server = PyModbus(size=20)
    ctx = server.build_context(slaveId=1, fixId=True)
    server.update_data(1, 3, 0, 23.5)
    server.run_tcp_server("0.0.0.0", 5020)
"""

from pymodbus.constants import Endian
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.server.sync import StartSerialServer, StartTcpServer
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.version import version

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class PyModbus: """Modbus server wrapper for exposing sensor data over TCP or RTU. Attributes: size: Number of 32-bit float register slots to allocate. default_value: Default register fill value. context: Active ``ModbusServerContext`` after ``build_context`` is called. """ size = 0 default_value = 0 context = None
[docs] def __init__(self, size: int, default_value: int = 0) -> None: """Initialize the PyModbus server with a register block size. Args: size: Number of 32-bit float register slots. default_value: Default fill value for all registers. """ self.size = size self.default_value = default_value
[docs] def build_context(self, slaveId: int, fixId: bool) -> ModbusServerContext: """Build and return a Modbus data-store context. Args: slaveId: Modbus slave/unit identifier. fixId: If True, create a multi-slave context keyed by *slaveId*; otherwise create a single-slave context. Returns: The constructed ``ModbusServerContext``. """ store = ModbusSlaveContext( di=ModbusSequentialDataBlock(0, [0xBF80, 0x0000] * self.size), co=ModbusSequentialDataBlock(0, [0xBF80, 0x0000] * self.size), hr=ModbusSequentialDataBlock(0, [0xBF80, 0x0000] * self.size), ir=ModbusSequentialDataBlock(0, [0xBF80, 0x0000] * self.size), ) slave = {slaveId: store} # self.context = ModbusServerContext(slaves=slave, single=False) if fixId: self.context = ModbusServerContext(slaves=slave, single=False) else: self.context = ModbusServerContext(slaves=store, single=True) return self.context
[docs] def run_tcp_server(self, server_ip: str, server_port: int) -> None: """Start a blocking Modbus TCP server. Args: server_ip: IP address to bind (e.g. ``"0.0.0.0"``). server_port: TCP port number. """ # ----------------------------------------------------------------------- # # initialize the server information # ----------------------------------------------------------------------- # identity = ModbusDeviceIdentification() identity.VendorName = "Oizom" identity.ProductCode = "PP" identity.VendorUrl = "http://github.com/oizom-iots" identity.ProductName = "Oizom Server" identity.ModelName = "Polludron" identity.MajorMinorRevision = version.short() StartTcpServer( self.context, identity=identity, address=(server_ip, server_port) )
# reactor.run()
[docs] def run_rtu_server( self, server_port: str, server_baud: int, server_parity: str ) -> None: """Start a blocking Modbus RTU (serial) server. Args: server_port: Serial device path (e.g. ``"/dev/ttyAMA0"``). server_baud: Baud rate for the serial link. server_parity: Parity setting (``"N"``, ``"E"``, or ``"O"``). """ # ----------------------------------------------------------------------- # # initialize the server information # ----------------------------------------------------------------------- # identity = ModbusDeviceIdentification() identity.VendorName = "Oizom" identity.ProductCode = "PP" identity.VendorUrl = "http://github.com/oizom-iots" identity.ProductName = "Oizom Server" identity.ModelName = "Polludron" identity.MajorMinorRevision = version.short() StartSerialServer( self.context, framer=ModbusRtuFramer, identity=identity, port=server_port, timeout=0.005, baudrate=server_baud, parity=server_parity, )
[docs] def update_data( self, slaveId: int, register: str, register_address: int, register_value: float ) -> None: """Write a 32-bit float value into the Modbus register store. Args: slaveId: Target slave/unit ID in the context. register: Register type code used by ``setValues``. register_address: Starting register address. register_value: Float value to encode and store. """ builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Little) builder.add_32bit_float(float(register_value)) payload = builder.to_registers() context_logger.info_with_context( "MODBUS", f"update_data: SlaveId:{slaveId} Register:{register} Address:{register_address} Value:{register_value} Payload:{payload}", ) self.context[slaveId].setValues(register, register_address, payload)
if __name__ == "__main__": pymodbus = PyModbus()