#!/usr/bin/python
"""Driver for the Cubic PM6303 particulate matter sensor (Modbus RTU).
Reads PM1, PM2.5, PM4.25, PM10, and TSP mass concentrations plus airflow
rate over a Modbus RTU serial link. Also supports configuring the pump
stop interval.
Hardware:
Interface: Modbus RTU (RS-485 / UART), 9600 baud
Default port: /dev/ttyAMA2
Supply: 5 V (with integrated pump)
Typical usage::
sensor = CubicPM6303()
sensor.initialize("/dev/ttyAMA2", baud=9600)
data = sensor.getPM(sendCommand=True)
Note:
Requires the ``pymodbus`` package (legacy sync API).
"""
import time
from typing import ClassVar, Literal
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class CubicPM6303:
"""Modbus RTU driver for the Cubic PM6303 particulate matter sensor.
Attributes:
DEBUG: When ``True``, enables verbose Modbus-level logging.
data: Most recent PM, particle-count, and flow readings.
pump_stop_interval: Pump stop interval in minutes written at init.
"""
DEBUG = False
_timeout_ = 8
TIMEOUT = _timeout_ * 1000
cubic_client = None
data: ClassVar[dict[str, float]] = {
"pm2_5": 0.0,
"pm10": 0.0,
"pm100": 0.0,
"pm1": 0.0,
"pm4_25": 0.0,
"p1": 0.0,
"p2": 0.0,
"p3": 0.0,
"p4": 0.0,
"p5": 0.0,
"p6": 0.0,
"flow": 0.0,
}
pump_stop_interval = 0
[docs]
def __init__(self) -> None:
"""Create an uninitialised CubicPM6303 instance.
Call :meth:`initialize` to open the Modbus connection.
"""
return
[docs]
def initialize(
self, cubic_port: str = "/dev/ttyAMA2", baud: int = 9600
) -> Literal[1]:
"""Open the Modbus serial connection and configure the pump.
Args:
cubic_port: Serial device path for the Modbus bus.
baud: Baud rate.
Returns:
Always ``1`` on success.
"""
self.cubic_client = ModbusClient(
method="rtu", port=cubic_port, baudrate=baud, timeout=1
)
self.set_pump_stop_interval(self.pump_stop_interval)
return 1
[docs]
def getPM(self, sendCommand: bool = False) -> dict[str, float]:
"""Read particulate-matter concentrations and flow rate.
Args:
sendCommand: If ``True``, query the sensor before returning data.
Returns:
Dictionary with keys ``pm1``, ``pm2_5``, ``pm4_25``, ``pm10``,
``pm100`` (mass concentrations) and ``flow`` (L/min).
"""
if sendCommand:
cubic_data = self.getData()
buffer = []
self.data = {
"pm2_5": 0.0,
"pm10": 0.0,
"pm100": 0.0,
"pm1": 0.0,
"pm4_25": 0.0, # PM4_25 added for new sensors
"p1": 0.0,
"p2": 0.0,
"p3": 0.0,
"p4": 0.0,
"p5": 0.0,
"p6": 0.0,
"flow": 0.0,
}
for _data in cubic_data:
buffer.append(int(_data, 16))
if len(buffer) > 23:
# '0x19', '0x0', '0x0', '0x0', '0x0', '0x0', '0x2f', '0x0', '0xc', '0x0', '0x14', '0x0', '0x22', '0x0', '0x23']
self.data["pm2_5"] = buffer[9] * 256**2 + buffer[10]
self.data["pm10"] = buffer[13] * 256**2 + buffer[14]
self.data["pm100"] = buffer[5] * 256**2 + buffer[6]
self.data["pm1"] = buffer[7] * 256**2 + buffer[8]
self.data["pm4_25"] = buffer[11] * 256**2 + buffer[12]
self.data["flow"] = buffer[23] / 100
return self.data
[docs]
def getData(self) -> list[str]:
"""Read 24 Modbus input registers and return them as hex strings.
Returns:
List of hex-string register values, or an empty list on error.
"""
recieve_bytes = []
try:
self.cubic_client.connect()
time.sleep(0.1)
read = self.cubic_client.read_input_registers(0, 24, unit=1)
for value in read.registers:
recieve_bytes.append(hex(value))
if self.DEBUG:
context_logger.debug_with_context(
"Dust", f"CUBIC_PM6303 RX:{recieve_bytes}"
)
self.cubic_client.close()
return recieve_bytes
except Exception as e:
context_logger.error_with_context("Dust", f"CUBIC_PM6303 getData:{e}")
self.cubic_client.close()
return recieve_bytes
[docs]
def set_pump_stop_interval(self, interval_minutes: int) -> bool:
"""Set the pump stop interval on the Cubic PM6303 sensor.
Args:
interval_minutes: Pump interval in minutes (0--10000).
Returns:
``True`` if the command succeeded, ``False`` otherwise.
"""
if not (0 <= interval_minutes <= 10000):
context_logger.error_with_context(
"Dust",
"CUBIC_PM6303 set_pump_interval: Interval must be between 1 and 60 minutes.",
)
return False
try:
self.cubic_client.connect()
time.sleep(0.1)
# Assuming the pump interval is set by writing to register 30
self.cubic_client.write_register(13, interval_minutes, unit=1)
resp = self.cubic_client.read_input_registers(30, 1, unit=1)
context_logger.debug_with_context(
"Dust", f"Pump Interval Set Response: {resp.registers}"
)
if self.DEBUG:
context_logger.debug_with_context(
"Dust",
f"CUBIC_PM6303 Pump interval set to {interval_minutes} minutes.",
)
self.cubic_client.close()
return True
except Exception as e:
context_logger.error_with_context(
"Dust", f"CUBIC_PM6303 set_pump_interval:{e}"
)
self.cubic_client.close()
return False
[docs]
def micros(self) -> int:
"""Return the current time in microseconds since the epoch.
Returns:
Integer microsecond timestamp.
"""
return int(time.time() * 1000000)
[docs]
def millis(self) -> int:
"""Return the current time in milliseconds since the epoch.
Returns:
Integer millisecond timestamp.
"""
return int(time.time() * 1000)
if __name__ == "__main__":
cubic = CubicPM6303()
cubic.initialize()
# cubic.DEBUG = True
while 1:
data = cubic.getPM()
context_logger.debug_with_context("Dust", f"Cubic data {data}")
time.sleep(10)