Source code for drivers.CubicPM6303.CubicPM6303

#!/usr/bin/python
"""Driver for the Cubic PM6303 particulate matter sensor (Modbus RTU).

Reads PM1, PM2.5, PM4.25, PM10, and TSP mass concentrations plus airflow
rate over a Modbus RTU serial link.  Also supports configuring the pump
stop interval.

Hardware:
    Interface: Modbus RTU (RS-485 / UART), 9600 baud
    Default port: /dev/ttyAMA2
    Supply: 5 V (with integrated pump)

Typical usage::

    sensor = CubicPM6303()
    sensor.initialize("/dev/ttyAMA2", baud=9600)
    data = sensor.getPM(sendCommand=True)

Note:
    Requires the ``pymodbus`` package (legacy sync API).
"""

import time
from typing import ClassVar, Literal

from pymodbus.client.sync import ModbusSerialClient as ModbusClient

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class CubicPM6303: """Modbus RTU driver for the Cubic PM6303 particulate matter sensor. Attributes: DEBUG: When ``True``, enables verbose Modbus-level logging. data: Most recent PM, particle-count, and flow readings. pump_stop_interval: Pump stop interval in minutes written at init. """ DEBUG = False _timeout_ = 8 TIMEOUT = _timeout_ * 1000 cubic_client = None data: ClassVar[dict[str, float]] = { "pm2_5": 0.0, "pm10": 0.0, "pm100": 0.0, "pm1": 0.0, "pm4_25": 0.0, "p1": 0.0, "p2": 0.0, "p3": 0.0, "p4": 0.0, "p5": 0.0, "p6": 0.0, "flow": 0.0, } pump_stop_interval = 0
[docs] def __init__(self) -> None: """Create an uninitialised CubicPM6303 instance. Call :meth:`initialize` to open the Modbus connection. """ return
[docs] def initialize( self, cubic_port: str = "/dev/ttyAMA2", baud: int = 9600 ) -> Literal[1]: """Open the Modbus serial connection and configure the pump. Args: cubic_port: Serial device path for the Modbus bus. baud: Baud rate. Returns: Always ``1`` on success. """ self.cubic_client = ModbusClient( method="rtu", port=cubic_port, baudrate=baud, timeout=1 ) self.set_pump_stop_interval(self.pump_stop_interval) return 1
[docs] def getPM(self, sendCommand: bool = False) -> dict[str, float]: """Read particulate-matter concentrations and flow rate. Args: sendCommand: If ``True``, query the sensor before returning data. Returns: Dictionary with keys ``pm1``, ``pm2_5``, ``pm4_25``, ``pm10``, ``pm100`` (mass concentrations) and ``flow`` (L/min). """ if sendCommand: cubic_data = self.getData() buffer = [] self.data = { "pm2_5": 0.0, "pm10": 0.0, "pm100": 0.0, "pm1": 0.0, "pm4_25": 0.0, # PM4_25 added for new sensors "p1": 0.0, "p2": 0.0, "p3": 0.0, "p4": 0.0, "p5": 0.0, "p6": 0.0, "flow": 0.0, } for _data in cubic_data: buffer.append(int(_data, 16)) if len(buffer) > 23: # '0x19', '0x0', '0x0', '0x0', '0x0', '0x0', '0x2f', '0x0', '0xc', '0x0', '0x14', '0x0', '0x22', '0x0', '0x23'] self.data["pm2_5"] = buffer[9] * 256**2 + buffer[10] self.data["pm10"] = buffer[13] * 256**2 + buffer[14] self.data["pm100"] = buffer[5] * 256**2 + buffer[6] self.data["pm1"] = buffer[7] * 256**2 + buffer[8] self.data["pm4_25"] = buffer[11] * 256**2 + buffer[12] self.data["flow"] = buffer[23] / 100 return self.data
[docs] def getData(self) -> list[str]: """Read 24 Modbus input registers and return them as hex strings. Returns: List of hex-string register values, or an empty list on error. """ recieve_bytes = [] try: self.cubic_client.connect() time.sleep(0.1) read = self.cubic_client.read_input_registers(0, 24, unit=1) for value in read.registers: recieve_bytes.append(hex(value)) if self.DEBUG: context_logger.debug_with_context( "Dust", f"CUBIC_PM6303 RX:{recieve_bytes}" ) self.cubic_client.close() return recieve_bytes except Exception as e: context_logger.error_with_context("Dust", f"CUBIC_PM6303 getData:{e}") self.cubic_client.close() return recieve_bytes
[docs] def set_pump_stop_interval(self, interval_minutes: int) -> bool: """Set the pump stop interval on the Cubic PM6303 sensor. Args: interval_minutes: Pump interval in minutes (0--10000). Returns: ``True`` if the command succeeded, ``False`` otherwise. """ if not (0 <= interval_minutes <= 10000): context_logger.error_with_context( "Dust", "CUBIC_PM6303 set_pump_interval: Interval must be between 1 and 60 minutes.", ) return False try: self.cubic_client.connect() time.sleep(0.1) # Assuming the pump interval is set by writing to register 30 self.cubic_client.write_register(13, interval_minutes, unit=1) resp = self.cubic_client.read_input_registers(30, 1, unit=1) context_logger.debug_with_context( "Dust", f"Pump Interval Set Response: {resp.registers}" ) if self.DEBUG: context_logger.debug_with_context( "Dust", f"CUBIC_PM6303 Pump interval set to {interval_minutes} minutes.", ) self.cubic_client.close() return True except Exception as e: context_logger.error_with_context( "Dust", f"CUBIC_PM6303 set_pump_interval:{e}" ) self.cubic_client.close() return False
[docs] def micros(self) -> int: """Return the current time in microseconds since the epoch. Returns: Integer microsecond timestamp. """ return int(time.time() * 1000000)
[docs] def millis(self) -> int: """Return the current time in milliseconds since the epoch. Returns: Integer millisecond timestamp. """ return int(time.time() * 1000)
if __name__ == "__main__": cubic = CubicPM6303() cubic.initialize() # cubic.DEBUG = True while 1: data = cubic.getPM() context_logger.debug_with_context("Dust", f"Cubic data {data}") time.sleep(10)