"""Driver for the Met One BAM (Beta Attenuation Monitor) particulate matter sensor.

Communicates over RS-232 serial (UART) using a wake-then-query protocol that
mirrors the original Node-RED integration flow. The BAM reports PM
concentration, raw PM counts, temperature, humidity, pressure, and two
auxiliary temperatures via a CSV response.

Typical usage::

    bam = BAM()
    if bam.initialize(port="/dev/ttyUSB0", baud=115200):
        data = bam.getPM(send_command=True)
"""
import time
import serial
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Object returned by OizomLogger.get(); nothing in the code visible in this
# module references it (presumably a plain logger handle -- confirm in
# utils.oizom_logger before removing).
basic_logger = OizomLogger(__name__).get()
# Wrapper whose *_with_context(tag, message) methods carry every log call in
# this module; the tag passed is always "BAM".
context_logger = OizomLogger(__name__)
class BAM:
    """Driver for Met One BAM (Beta Attenuation Monitor) sensors.

    Protocol (matching Node-RED flow):
        1. Send wake command (\\r\\n\\r\\n\\r\\n)
        2. Wait 10 seconds for BAM to respond
        3. Send data request ("4\\r\\n")
        4. Read 8 lines of response
        5. Line[4] = device ID, Line[7] = CSV data

    CSV fields (index): [0]?, [1]=PM conc, [2]=PM raw, [3-7]=?,
                        [8]=temp, [9]=humidity, [10]=pressure,
                        [11]=t1, [12]=t2

    The instance owns the serial port opened by :meth:`initialize`. Call
    :meth:`close` (or use the instance in a ``with`` statement) to release it.
    """

    WAKE_COMMAND = "\r\n\r\n\r\n"
    DATA_REQUEST = "4\r\n"
    WAKE_DELAY = 10  # seconds, matches Node-RED delay node
    EXPECTED_LINES = 8
    RESPONSE_TIMEOUT = 10  # seconds to wait for the first byte of the reply
    ID_LINE_INDEX = 4  # response line holding the device ID
    CSV_LINE_INDEX = 7  # response line holding the CSV record
    MIN_CSV_FIELDS = 13  # highest CSV index used (12) + 1
    # Reading name -> index of the CSV field it is taken from.
    CSV_FIELD_INDEX = {
        "pm": 1,
        "pm_raw": 2,
        "temp": 8,
        "humidity": 9,
        "pressure": 10,
        "t1": 11,
        "t2": 12,
    }

    def __init__(self) -> None:
        """Initialize BAM driver with default data fields and no serial connection."""
        self.DEBUG = False
        self._serial: serial.Serial | None = None
        self.bam_id: str = ""
        # Cache of the last successfully parsed readings; all zero until then.
        self.data: dict[str, float] = dict.fromkeys(self.CSV_FIELD_INDEX, 0.0)

    def __enter__(self) -> "BAM":
        """Return the driver itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the serial port on leaving the ``with`` block."""
        self.close()

    def initialize(self, port: str = "/dev/ttyUSB0", baud: int = 115200) -> bool:
        """Open the serial connection to the BAM sensor.

        Any port opened by an earlier call is closed first, so calling this
        method again does not leak the previous handle.

        Args:
            port: Serial device path (e.g. ``/dev/ttyUSB0``).
            baud: Baud rate for the RS-232 link.

        Returns:
            True if the serial port was opened successfully, False otherwise.
        """
        self.close()
        try:
            self._serial = serial.Serial(
                port=port,
                baudrate=baud,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=10,
                write_timeout=5,
            )
        except Exception as e:
            context_logger.error_with_context("BAM", f"Failed to initialize: {e}")
            return False
        else:
            context_logger.info_with_context("BAM", f"Initialized on {port} @ {baud}")
            return True

    def close(self) -> None:
        """Close the serial port if one is open; safe to call repeatedly."""
        port, self._serial = self._serial, None
        if port is None:
            return
        try:
            port.close()
        except Exception as e:
            # Best effort: a failing close must not mask the caller's own error.
            context_logger.error_with_context("BAM", f"Failed to close port: {e}")

    def getPM(self, send_command: bool = False) -> dict[str, float]:
        """Return the latest PM and environmental readings.

        Args:
            send_command: When True, query the BAM for fresh data before
                returning. When False, return the last cached readings.

        Returns:
            Dictionary with keys ``pm``, ``pm_raw``, ``temp``, ``humidity``,
            ``pressure``, ``t1``, and ``t2``. This is the driver's own cache
            object (``self.data``), not a copy; if a query fails, the values
            from the previous successful query are returned unchanged.
        """
        if send_command:
            self._query_bam()
        return self.data

    def _query_bam(self) -> None:
        """Run one wake/request/read cycle and update ``bam_id`` and ``data``.

        Every failure (port not initialized, timeout, short reply, serial
        error) is logged and leaves the cached readings untouched; nothing is
        raised to the caller.
        """
        if self._serial is None:
            context_logger.error_with_context("BAM", "Serial port not initialized")
            return
        try:
            if not self._serial.is_open:
                self._serial.open()
            # Discard stale bytes so they are not mistaken for this reply.
            self._serial.reset_input_buffer()

            # Step 1: Send wake command
            self._send(self.WAKE_COMMAND)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", "Sent wake command")

            # Step 2: Wait for BAM to wake up
            time.sleep(self.WAKE_DELAY)
            # Drain any wake response
            while self._serial.in_waiting > 0:
                self._serial.readline()
                time.sleep(0.01)

            # Step 3: Send data request
            self._send(self.DATA_REQUEST)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", "Sent data request")

            if not self._wait_for_data(self.RESPONSE_TIMEOUT):
                context_logger.error_with_context("BAM", "Timeout waiting for data")
                return

            # Step 4: Read response lines
            lines = self._read_response()
            if len(lines) < self.EXPECTED_LINES:
                context_logger.error_with_context(
                    "BAM",
                    f"Incomplete response: got {len(lines)} lines, expected {self.EXPECTED_LINES}",
                )
                return

            # Step 5: Parse
            self.bam_id = lines[self.ID_LINE_INDEX]
            self._parse_csv(lines[self.CSV_LINE_INDEX])
        except Exception as e:
            context_logger.error_with_context("BAM", f"Query failed: {e}")

    def _send(self, command: str) -> None:
        """Write ``command`` to the port one ASCII character per write call."""
        # Per-character writes are kept from the original implementation;
        # presumably the instrument needs paced input -- confirm before batching.
        for ch in command:
            self._serial.write(ch.encode("ascii"))

    def _wait_for_data(self, timeout_s: float) -> bool:
        """Poll until the port has bytes waiting; False if ``timeout_s`` elapses."""
        # Monotonic clock: a wall-clock step (NTP, manual set) must neither
        # cut the wait short nor stretch it.
        deadline = time.monotonic() + timeout_s
        while self._serial.in_waiting <= 0:
            if time.monotonic() > deadline:
                return False
            time.sleep(0.5)
        return True

    def _read_response(self) -> list[str]:
        """Read up to ``EXPECTED_LINES`` stripped lines from the port.

        Stops at the first empty read, so a truncated reply yields fewer lines
        than expected instead of being padded with empty strings.
        """
        lines: list[str] = []
        for index in range(self.EXPECTED_LINES):
            raw = self._serial.readline()
            if not raw:
                # An empty read means the read timed out with nothing received;
                # a genuinely blank line still carries its line terminator.
                break
            line = raw.decode("ascii", errors="ignore").strip()
            lines.append(line)
            if self.DEBUG:
                context_logger.debug_with_context("BAM", f"RX line[{index}]: {line}")
            time.sleep(0.01)
        return lines

    def _parse_csv(self, csv_line: str) -> None:
        """Copy the mapped CSV fields of ``csv_line`` into ``self.data``.

        A record with fewer than ``MIN_CSV_FIELDS`` fields is logged and
        ignored. Individual fields that are not numeric are stored as 0.0.
        """
        try:
            fields = csv_line.split(",")
            if len(fields) < self.MIN_CSV_FIELDS:
                context_logger.error_with_context(
                    "BAM",
                    f"CSV has {len(fields)} fields, expected at least 13: {csv_line}",
                )
                return
            for key, index in self.CSV_FIELD_INDEX.items():
                self.data[key] = self._safe_float(fields[index])
            context_logger.info_with_context(
                "BAM",
                f"ID: {self.bam_id}, PM: {self.data['pm']}, Raw: {self.data['pm_raw']}",
            )
        except Exception as e:
            context_logger.error_with_context("BAM", f"CSV parse failed: {e}")

    def _safe_float(self, value: str) -> float:
        """Convert ``value`` to float, returning 0.0 if it is not numeric text."""
        try:
            return float(value.strip())
        except (ValueError, AttributeError):
            return 0.0
if __name__ == "__main__":
    # Manual polling loop: open the default port with debug logging enabled,
    # then query the sensor every five minutes for as long as the process runs.
    sensor = BAM()
    sensor.DEBUG = True
    port_opened = sensor.initialize(port="/dev/ttyUSB0", baud=115200)
    # If the port could not be opened the loop body never runs and the script
    # simply ends; otherwise it polls until the process is interrupted.
    while port_opened:
        readings = sensor.getPM(send_command=True)
        context_logger.debug_with_context("BAM", f"BAM ID: {sensor.bam_id}")
        context_logger.debug_with_context("BAM", f"Data: {readings}")
        time.sleep(300)