Source code for drivers.BAM.BAM

"""Driver for the Met One BAM (Beta Attenuation Monitor) particulate matter sensor.

Communicates over RS-232 serial (UART) using a wake-then-query protocol that
mirrors the original Node-RED integration flow.  The BAM reports PM
concentration, raw PM counts, temperature, humidity, pressure, and two
auxiliary temperatures via a CSV response.

Typical usage::

    bam = BAM()
    if bam.initialize(port="/dev/ttyUSB0", baud=115200):
        data = bam.getPM(send_command=True)
"""

import time

import serial

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level logging handles are created for this module's name.
#
# basic_logger: whatever ``OizomLogger.get()`` returns (presumably a plain
#   logger object -- confirm in utils.oizom_logger).  It is not referenced in
#   the visible part of this module.
# context_logger: the ``OizomLogger`` wrapper itself.  The driver calls its
#   ``*_with_context`` methods with the tag "BAM" as the first argument.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class BAM:
    """Driver for Met One BAM (Beta Attenuation Monitor) sensors.

    Protocol (matching Node-RED flow):
        1. Send wake command (\\r\\n\\r\\n\\r\\n)
        2. Wait 10 seconds for BAM to respond
        3. Send data request ("4\\r\\n")
        4. Read 8 lines of response
        5. Line[4] = device ID, Line[7] = CSV data

    CSV fields (index):
        [0]?, [1]=PM conc, [2]=PM raw, [3-7]=?, [8]=temp,
        [9]=humidity, [10]=pressure, [11]=t1, [12]=t2

    Failed queries (port not open, timeout, short or malformed reply) are
    logged and leave the previously cached readings untouched; they never
    raise to the caller.
    """

    WAKE_COMMAND = "\r\n\r\n\r\n"
    DATA_REQUEST = "4\r\n"
    WAKE_DELAY = 10  # seconds, matches Node-RED delay node
    EXPECTED_LINES = 8

    RESPONSE_TIMEOUT = 10  # seconds to wait for the first byte of the reply
    POLL_INTERVAL = 0.5  # seconds between checks for incoming bytes
    ID_LINE_INDEX = 4  # response line carrying the device ID
    CSV_LINE_INDEX = 7  # response line carrying the CSV record

    # Maps each key of ``self.data`` to its column in the CSV record.
    CSV_FIELD_INDEX: dict[str, int] = {
        "pm": 1,
        "pm_raw": 2,
        "temp": 8,
        "humidity": 9,
        "pressure": 10,
        "t1": 11,
        "t2": 12,
    }

    def __init__(self) -> None:
        """Initialize BAM driver with default data fields and no serial connection."""
        self.DEBUG = False
        self._serial: serial.Serial | None = None
        self.bam_id: str = ""
        self.data: dict[str, float] = {
            "pm": 0.0,
            "pm_raw": 0.0,
            "temp": 0.0,
            "humidity": 0.0,
            "pressure": 0.0,
            "t1": 0.0,
            "t2": 0.0,
        }

    def initialize(self, port: str = "/dev/ttyUSB0", baud: int = 115200) -> bool:
        """Open the serial connection to the BAM sensor.

        Any port opened by an earlier call is closed first, so calling this
        method again does not leave the old handle open.

        Args:
            port: Serial device path (e.g. ``/dev/ttyUSB0``).
            baud: Baud rate for the RS-232 link.

        Returns:
            True if the serial port was opened successfully, False otherwise.
        """
        self.close()
        try:
            self._serial = serial.Serial(
                port=port,
                baudrate=baud,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=10,
                write_timeout=5,
            )
        except Exception as e:
            context_logger.error_with_context("BAM", f"Failed to initialize: {e}")
            return False
        context_logger.info_with_context("BAM", f"Initialized on {port} @ {baud}")
        return True

    def close(self) -> None:
        """Close the serial port, if any, and forget it.

        Safe to call repeatedly or before ``initialize``.  After closing,
        ``initialize`` must be called again before new data can be queried.
        """
        port, self._serial = self._serial, None
        if port is None:
            return
        try:
            port.close()
        except Exception as e:
            context_logger.error_with_context("BAM", f"Failed to close port: {e}")

    def getPM(self, send_command: bool = False) -> dict[str, float]:
        """Return the latest PM and environmental readings.

        Args:
            send_command: When True, query the BAM for fresh data before
                returning.  When False, return the last cached readings.

        Returns:
            Dictionary with keys ``pm``, ``pm_raw``, ``temp``, ``humidity``,
            ``pressure``, ``t1``, and ``t2``.  This is the driver's own cache
            (not a copy); if the query fails it still holds the previous
            values.
        """
        if send_command:
            self._query_bam()
        return self.data

    def _query_bam(self) -> None:
        """Run one wake/request/read cycle and update ``bam_id`` and ``data``.

        Every failure is logged and swallowed so that a flaky link never
        crashes the caller; the cached readings are only replaced after a
        complete response has been received.
        """
        if self._serial is None:
            context_logger.error_with_context("BAM", "Serial port not initialized")
            return

        try:
            if not self._serial.is_open:
                self._serial.open()

            # Flush input buffer
            self._serial.reset_input_buffer()

            # Step 1: Send wake command
            self._send(self.WAKE_COMMAND)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", "Sent wake command")

            # Step 2: Wait for BAM to wake up
            time.sleep(self.WAKE_DELAY)

            # Drain any wake response
            while self._serial.in_waiting > 0:
                self._serial.readline()
                time.sleep(0.01)

            # Step 3: Send data request
            self._send(self.DATA_REQUEST)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", "Sent data request")

            # Wait for response to arrive
            if not self._wait_for_data(self.RESPONSE_TIMEOUT):
                context_logger.error_with_context("BAM", "Timeout waiting for data")
                return

            # Step 4: Read response lines
            lines = self._read_response_lines()
            if len(lines) < self.EXPECTED_LINES:
                context_logger.error_with_context(
                    "BAM",
                    f"Incomplete response: got {len(lines)} lines, expected {self.EXPECTED_LINES}",
                )
                return

            # Step 5: Parse
            self.bam_id = lines[self.ID_LINE_INDEX]
            self._parse_csv(lines[self.CSV_LINE_INDEX])

        except Exception as e:
            context_logger.error_with_context("BAM", f"Query failed: {e}")

    def _send(self, command: str) -> None:
        """Write *command* to the port one ASCII character per write call.

        NOTE(review): the per-character writes are kept from the original
        implementation, presumably to pace the instrument's UART -- confirm
        against hardware before batching them into a single write.
        """
        for ch in command:
            self._serial.write(ch.encode("ascii"))

    def _wait_for_data(self, timeout_s: float) -> bool:
        """Poll until the port reports pending bytes or *timeout_s* elapses.

        Uses the monotonic clock so that wall-clock adjustments cannot
        shorten or stretch the wait.

        Returns:
            True once at least one byte is waiting, False on timeout.
        """
        deadline = time.monotonic() + timeout_s
        while self._serial.in_waiting <= 0:
            if time.monotonic() > deadline:
                return False
            time.sleep(self.POLL_INTERVAL)
        return True

    def _read_response_lines(self) -> list[str]:
        """Read up to ``EXPECTED_LINES`` lines of the reply.

        An empty raw read means the port's read timeout expired with nothing
        received, so reading stops there and the short list lets the caller
        detect an incomplete response.  Blank lines sent by the device arrive
        as a bare line terminator (non-empty raw bytes) and are kept as ``""``
        so that the line indices stay aligned.

        Returns:
            The stripped, ASCII-decoded lines that were actually received.
        """
        lines: list[str] = []
        for index in range(self.EXPECTED_LINES):
            raw = self._serial.readline()
            if not raw:
                break
            line = raw.decode("ascii", errors="ignore").strip()
            lines.append(line)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", f"RX line[{index}]: {line}")
            # Brief pause between reads, kept from the original flow.
            time.sleep(0.01)
        return lines

    def _parse_csv(self, csv_line: str) -> None:
        """Extract the readings from one CSV record into ``self.data``.

        The record is rejected (and the cache left as is) when it has fewer
        columns than the highest index in ``CSV_FIELD_INDEX`` requires.
        Individual values that are not valid numbers become ``0.0``.
        """
        try:
            fields = csv_line.split(",")
            required = max(self.CSV_FIELD_INDEX.values()) + 1
            if len(fields) < required:
                context_logger.error_with_context(
                    "BAM",
                    f"CSV has {len(fields)} fields, expected at least {required}: {csv_line}",
                )
                return

            # Build all values first so the cache is updated in one step.
            parsed = {
                key: self._safe_float(fields[column])
                for key, column in self.CSV_FIELD_INDEX.items()
            }
            self.data.update(parsed)

            context_logger.info_with_context(
                "BAM",
                f"ID: {self.bam_id}, PM: {self.data['pm']}, Raw: {self.data['pm_raw']}",
            )
        except Exception as e:
            context_logger.error_with_context("BAM", f"CSV parse failed: {e}")

    def _safe_float(self, value: str) -> float:
        """Convert *value* to float, returning ``0.0`` when it is not numeric.

        Surrounding whitespace is ignored.  Objects without ``strip`` (such as
        ``None``) also yield ``0.0``.
        """
        try:
            return float(value.strip())
        except (ValueError, AttributeError):
            return 0.0
if __name__ == "__main__":
    # Manual smoke test: poll a BAM on the default port every five minutes
    # with debug logging switched on.  If the port cannot be opened the
    # loop is never entered and the script simply ends.
    sensor = BAM()
    sensor.DEBUG = True
    port_opened = sensor.initialize(port="/dev/ttyUSB0", baud=115200)
    while port_opened:
        readings = sensor.getPM(send_command=True)
        context_logger.debug_with_context("BAM", f"BAM ID: {sensor.bam_id}")
        context_logger.debug_with_context("BAM", f"Data: {readings}")
        time.sleep(300)