"""PyModbus Modbus TCP/RTU server driver.

Wraps the pymodbus library to expose a Modbus server (TCP or RTU) that other
devices can poll for sensor data. The server holds a configurable number of
32-bit IEEE-754 float slots (two 16-bit registers each) that the Oizom
firmware updates each sensing cycle.

Protocol: Modbus TCP or Modbus RTU over RS-485.

Typical usage::

    server = PyModbus(size=20)
    ctx = server.build_context(slaveId=1, fixId=True)
    server.update_data(1, 3, 0, 23.5)
    server.run_tcp_server("0.0.0.0", 5020)
"""
from pymodbus.constants import Endian
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.server.sync import StartSerialServer, StartTcpServer
from pymodbus.transaction import ModbusRtuFramer
from pymodbus.version import version
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level logging handles are created from the project's OizomLogger,
# both named after this module:
#   * basic_logger   - whatever ``OizomLogger(...).get()`` returns (presumably
#                      the underlying logger object; confirm in
#                      utils.oizom_logger).
#   * context_logger - the OizomLogger wrapper itself, whose
#                      ``info_with_context`` method is used by
#                      ``PyModbus.update_data``.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
class PyModbus:
    """Modbus server wrapper for exposing sensor data over TCP or RTU.

    Typical lifecycle: construct, call ``build_context`` once, push values
    with ``update_data``, and start one of the ``run_*_server`` methods.

    Attributes:
        size: Number of 32-bit float register slots to allocate. Each slot
            occupies two 16-bit register words in every data block.
        default_value: Default register fill value supplied by the caller.
            It is stored on the instance but no method of this class reads
            it; blocks are pre-filled with ``_INITIAL_FILL`` instead.
        context: Active ``ModbusServerContext`` after ``build_context`` is
            called; ``None`` until then.
    """

    size = 0
    default_value = 0
    context = None

    # Word pair written into every slot of every data block at build time.
    # 0xBF800000 is -1.0 as a big-endian IEEE-754 single.
    # NOTE(review): update_data encodes with little word order, so confirm
    # how clients are expected to decode slots that were never updated.
    _INITIAL_FILL = (0xBF80, 0x0000)

    def __init__(self, size: int, default_value: int = 0) -> None:
        """Initialize the PyModbus server with a register block size.

        Args:
            size: Number of 32-bit float register slots.
            default_value: Default fill value for all registers (stored
                only; see the class docstring).
        """
        self.size = size
        self.default_value = default_value

    def _new_block(self) -> ModbusSequentialDataBlock:
        """Return a fresh data block at address 0 holding ``size`` fill pairs."""
        # A new list is built on every call so the four blocks never share
        # the same underlying list object.
        return ModbusSequentialDataBlock(0, list(self._INITIAL_FILL) * self.size)

    @staticmethod
    def _build_identity() -> ModbusDeviceIdentification:
        """Return the device identification shared by the TCP and RTU servers."""
        identity = ModbusDeviceIdentification()
        identity.VendorName = "Oizom"
        identity.ProductCode = "PP"
        identity.VendorUrl = "http://github.com/oizom-iots"
        identity.ProductName = "Oizom Server"
        identity.ModelName = "Polludron"
        identity.MajorMinorRevision = version.short()
        return identity

    def build_context(self, slaveId: int, fixId: bool) -> ModbusServerContext:
        """Build, store and return a Modbus data-store context.

        One slave store is created with four identically sized blocks
        (discrete inputs, coils, holding registers, input registers). Any
        previously built context on this instance is replaced.

        Args:
            slaveId: Modbus slave/unit identifier. Only used when *fixId*
                is True.
            fixId: If True, create a multi-slave context keyed by *slaveId*;
                otherwise create a single-slave context.

        Returns:
            The constructed ``ModbusServerContext`` (also kept in
            ``self.context``).
        """
        store = ModbusSlaveContext(
            di=self._new_block(),
            co=self._new_block(),
            hr=self._new_block(),
            ir=self._new_block(),
        )
        if fixId:
            self.context = ModbusServerContext(slaves={slaveId: store}, single=False)
        else:
            self.context = ModbusServerContext(slaves=store, single=True)
        return self.context

    def run_tcp_server(self, server_ip: str, server_port: int) -> None:
        """Start a blocking Modbus TCP server.

        ``self.context`` is handed to the server as-is, so ``build_context``
        should be called first.

        Args:
            server_ip: IP address to bind (e.g. ``"0.0.0.0"``).
            server_port: TCP port number.
        """
        StartTcpServer(
            self.context,
            identity=self._build_identity(),
            address=(server_ip, server_port),
        )

    def run_rtu_server(
        self, server_port: str, server_baud: int, server_parity: str
    ) -> None:
        """Start a blocking Modbus RTU (serial) server.

        ``self.context`` is handed to the server as-is, so ``build_context``
        should be called first. The serial timeout is fixed at 0.005.

        Args:
            server_port: Serial device path (e.g. ``"/dev/ttyAMA0"``).
            server_baud: Baud rate for the serial link.
            server_parity: Parity setting (``"N"``, ``"E"``, or ``"O"``).
        """
        StartSerialServer(
            self.context,
            framer=ModbusRtuFramer,
            identity=self._build_identity(),
            port=server_port,
            timeout=0.005,
            baudrate=server_baud,
            parity=server_parity,
        )

    def update_data(
        self, slaveId: int, register: int, register_address: int, register_value: float
    ) -> None:
        """Write a 32-bit float value into the Modbus register store.

        Args:
            slaveId: Target slave/unit ID in the context.
            register: Register type code handed to the slave context's
                ``setValues`` (the module usage example passes ``3``).
            register_address: Starting register address.
            register_value: Value to encode; it is converted with
                ``float()`` before encoding.

        Raises:
            RuntimeError: If ``build_context`` has not been called yet.
        """
        # Fail early with a clear message instead of the opaque
        # "'NoneType' object is not subscriptable" raised by the lookup below.
        if self.context is None:
            raise RuntimeError(
                "build_context() must be called before update_data()"
            )

        # Encode as a float with big-endian bytes and little-endian word order.
        builder = BinaryPayloadBuilder(byteorder=Endian.Big, wordorder=Endian.Little)
        builder.add_32bit_float(float(register_value))
        payload = builder.to_registers()
        context_logger.info_with_context(
            "MODBUS",
            f"update_data: SlaveId:{slaveId} Register:{register} Address:{register_address} Value:{register_value} Payload:{payload}",
        )
        self.context[slaveId].setValues(register, register_address, payload)
if __name__ == "__main__":
    # Script entry point: only constructs a server object; nothing is started.
    # ``size`` is a required constructor argument, so it must be supplied
    # here; 20 slots mirrors the usage example in the module docstring.
    pymodbus = PyModbus(size=20)