"""Driver for the Oizom custom noise sensor (SAMD-based).
Communicates via UART with a SAMD microcontroller that measures LAeq and LZeq
noise levels. Responses are 8-byte binary frames.
Hardware:
Interface: UART (serial)
Port: /dev/ttyACM0 (typical)
Baud rate: 115200
Protocol: ASCII command -> 8-byte binary response frame
Typical usage::
import serial
from drivers.Noise.Noise import Noise
ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
noise = Noise()
noise.initialize(ser, {"en": 1})
reading = noise.get_noise()
Note:
Requires ``pyserial`` for serial communication.
"""
import math
import time
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class Noise:
"""Driver for the SAMD-based noise level sensor.
Sends ``NOISE?\\r\\n`` commands over UART and parses 8-byte binary response
frames containing LAeq (A-weighted) and LZeq (Z-weighted) noise levels.
Attributes:
FRAME_LEN: Expected response frame length in bytes.
TIMEOUT: Maximum wait time for a complete frame in milliseconds.
ser: Open ``serial.Serial`` instance for UART communication.
LAeq: Accumulated A-weighted Leq readings for energy averaging.
LZeq: Accumulated Z-weighted Leq readings for energy averaging.
debug: Whether verbose debug logging is enabled.
part_number: Hardware part number identifier.
"""
[docs]
def __init__(self) -> None:
"""Initialize default attributes for the Noise sensor."""
self.FRAME_LEN = 8
self.TIMEOUT = 1600
self.ser = None
self.configuration: dict = {}
self.LAeq: list[float] = []
self.LZeq: list[float] = []
self.debug = False
self.part_number = 41
self.EOL = b"\r\n"
[docs]
def initialize(self, serial_port, configuration: dict) -> bool:
"""Configure the noise sensor with a serial port and device configuration.
Args:
serial_port: An open ``serial.Serial`` instance.
configuration: Device configuration dict. Must contain ``"en"`` key
to enable the sensor. Optional keys: ``"debug"``, ``"pn"``.
Returns:
True if the sensor is enabled and ready, False otherwise.
"""
try:
self.ser = serial_port
self.configuration = configuration
self.debug = configuration.get("debug", False)
self.part_number = configuration.get("pn", 41)
if configuration.get("en", 0):
return True # TODO: 📋 Write perfect init code
return False
except Exception:
context_logger.error_with_context("Noise", "Initialization failed")
return False
[docs]
def send_command(self, command: bytes = b"NOISE?\r\n", delay: float = 0.5) -> bytes | None:
"""Send a command over UART and read the 8-byte frame response.
Args:
command: Raw bytes to transmit. Defaults to ``b"NOISE?\\r\\n"``.
delay: Seconds to wait after sending before reading. Allows the
SAMD microcontroller time to process and respond.
Returns:
The 8-byte response frame, or None on timeout / error.
"""
if self.debug:
context_logger.debug_with_context("Noise", f"Sending command: {command}")
try:
if not self.ser.is_open:
self.ser.open()
# Clear any pending data
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# Send command
self.ser.write(command)
self.ser.flush()
# Give SAMD time to process and respond
time.sleep(delay)
# Read 8-byte frame response
if self.ser.in_waiting > 0:
buf = bytearray()
start = time.time()
while len(buf) < self.FRAME_LEN:
chunk = self.ser.read(self.FRAME_LEN - len(buf))
if chunk:
buf.extend(chunk)
if self.debug:
context_logger.debug_with_context(
"Noise", f"Read {len(chunk)} bytes: {chunk.hex()}"
)
if time.time() - start > self.TIMEOUT:
if self.debug:
context_logger.debug_with_context(
"Noise",
f"Timeout! Only got {len(buf)} bytes: {buf.hex() if buf else 'empty'}",
)
return None
if self.debug:
context_logger.debug_with_context(
"Noise", f"Complete frame ({len(buf)} bytes): {buf.hex()}"
)
context_logger.debug_with_context(
"Noise", f"Frame breakdown: {[hex(b) for b in buf]}"
)
return bytes(buf)
if self.debug:
context_logger.debug_with_context("Noise", "[Noise] No data in buffer")
return None
except Exception as e:
context_logger.error_with_context("Noise", f"send_command error: {e}")
return None
finally:
# Ensure port is closed even on error
try:
if self.ser and self.ser.is_open:
self.ser.close()
except Exception as close_error:
context_logger.error_with_context(
"Noise", f"[Noise] Error closing serial port: {close_error}"
)
[docs]
def parse_noise_response(self, frame: bytes) -> dict | None:
"""Parse an 8-byte binary frame to extract LAeq and LZeq values.
Frame format: ``[0x01, 0x03, 0x00, LAeq_H, LAeq_L, LZeq_H, LZeq_L, 0x0A]``
Args:
frame: The 8-byte response frame from the sensor.
Returns:
A dict with ``"LAeq"`` and ``"LZeq"`` float values in dB,
or None if the frame is invalid.
"""
laeq = None
lzeq = None
if not frame:
return None
if len(frame) != self.FRAME_LEN:
if self.debug:
context_logger.debug_with_context(
"Noise",
f"Invalid frame length: {len(frame)} (expected {self.FRAME_LEN})",
)
return None
if self.debug:
context_logger.debug_with_context(
"Noise",
f"Checking frame header: [0]={hex(frame[0])}, [1]={hex(frame[1])}, [7]={hex(frame[7])}",
)
# Validate frame format: 0x01 0x03 ... 0x0A
if frame[0] != 0x01 or frame[1] != 0x03 or frame[7] != 0x0A:
if self.debug:
context_logger.debug_with_context("Noise", "Invalid frame format!")
return None
# Extract LAeq and LZeq from bytes 3-6
laeq = ((frame[3] << 8) | frame[4]) / 10.0
lzeq = ((frame[5] << 8) | frame[6]) / 10.0
if self.debug:
context_logger.debug_with_context(
"Noise", f"Parsed -> LAeq: {laeq} dB, LZeq: {lzeq} dB"
)
return {"LAeq": laeq, "LZeq": lzeq}
[docs]
def get_noise(self) -> dict | None:
"""Query the sensor for a noise reading and accumulate results.
Sends the default ``NOISE?`` command, parses the response, and appends
valid LAeq/LZeq values to the internal accumulation lists.
Returns:
A dict with ``"LAeq"`` and ``"LZeq"`` float values, or None on failure.
"""
try:
if not self.ser:
context_logger.debug_with_context(
"Noise", "Serial port not initialized"
)
return None
frame = self.send_command()
if frame is None:
return None
result = self.parse_noise_response(frame)
if result is not None:
laeq = result.get("LAeq")
lzeq = result.get("LZeq")
if laeq is not None:
self.LAeq.append(laeq)
if lzeq is not None:
self.LZeq.append(lzeq)
return result
except Exception as e:
context_logger.error_with_context("Noise", f"get_noise: {e}")
return None
[docs]
def energy_average(self) -> float:
"""Compute the energy-averaged (logarithmic) mean of accumulated LAeq values.
Uses the formula ``10 * log10(mean(10^(dB/10)))`` which is the acoustically
correct way to average decibel values. Falls back to arithmetic mean on
numerical errors.
Returns:
The energy-averaged LAeq value in dB, rounded to 2 decimal places.
"""
energy_sum = 0.0
energy_avg = 0.0
result = 0.0
try:
values = self.LAeq
# Acoustic energy averaging: 10*log10(mean(10^(dB/10)))
max_val = max(values) # For numerical stability
for val in values:
energy_sum += 10 ** ((val - max_val) / 10.0)
energy_avg = energy_sum / len(values)
if energy_avg <= 0:
return 0.0
result = max_val + 10.0 * math.log10(energy_avg)
return round(result, 2)
except (OverflowError, ValueError, ZeroDivisionError):
# Fallback to arithmetic mean
value = sum(values) / len(values)
return round(value, 2)
[docs]
def getSensorReading(self) -> dict:
"""Get the latest noise reading mapped to configured parameter codes.
Returns:
A dict mapping parameter short-codes to their current values.
Empty dict on failure.
"""
result = {}
try:
readings = self.get_noise()
if readings is None:
return result
laeq_value = readings.get("LAeq")
lzeq_value = readings.get("LZeq")
for parameter in self.configuration.get("parameters", []):
pm = parameter.get("pm", 0)
if pm == 1 and laeq_value is not None:
result[parameter["sc"]] = laeq_value
if pm == 2 and len(self.LAeq) > 0:
result[parameter["sc"]] = max(self.LAeq)
if pm == 3 and len(self.LAeq) > 0:
result[parameter["sc"]] = min(self.LAeq)
return result
except Exception as e:
context_logger.error_with_context("Noise", f"getSensorReading: {e}")
return result
[docs]
def putSensorValue(self, result: dict | None = None) -> dict:
"""Aggregate accumulated noise data and reset internal buffers.
Computes energy-averaged LAeq, max LAeq, and min LAeq from the
readings collected since the last call, then clears the buffers.
Args:
result: Optional dict to populate with aggregated values.
Creates a new dict if None.
Returns:
The result dict populated with aggregated noise parameters.
"""
if self.debug:
context_logger.debug_with_context("Noise", "Aggregating Noise data...")
if result is None:
result = {}
for parameter in self.configuration.get("parameters", []):
pm = parameter.get("pm", 0)
if len(self.LAeq) > 0:
if pm == 1:
self.old_laeq = self.energy_average()
result[parameter["sc"]] = self.old_laeq
if pm == 2:
self.old_max_laeq = max(self.LAeq)
result[parameter["sc"]] = self.old_max_laeq
if pm == 3:
self.old_min_laeq = min(self.LAeq)
result[parameter["sc"]] = self.old_min_laeq
else:
if pm == 1:
result[parameter["sc"]] = self.old_laeq
if pm == 2:
result[parameter["sc"]] = self.old_max_laeq
if pm == 3:
result[parameter["sc"]] = self.old_min_laeq
# Clear lists for the next aggregation period
self.LAeq.clear()
self.LZeq.clear()
return result
if __name__ == "__main__":
import serial
context_logger.debug_with_context(
"Noise", "[Noise Test] Starting noise sensor test on /dev/ttyACM0"
)
try:
# Open serial port
ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
context_logger.debug_with_context(
"Noise", f"[Noise Test] Serial port opened: {ser.name}"
)
# Initialize noise sensor
noise = Noise()
config = {"en": 1}
if noise.initialize(ser, config):
context_logger.debug_with_context(
"Noise", "[Noise Test] ✅ Noise sensor initialized"
)
else:
context_logger.error_with_context(
"Noise", "[Noise Test] ❌ Failed to initialize"
)
exit(1)
# Performance tracking
response_times = []
byte_times = []
# Test loop
context_logger.debug_with_context(
"Noise", "[Noise Test] Starting continuous readings (Ctrl+C to stop)..."
)
iteration = 0
while True:
iteration += 1
print(f"\n{'=' * 60}")
print(f"Iteration #{iteration}")
print(f"{'=' * 60}")
# Clear buffers
ser.reset_input_buffer()
ser.reset_output_buffer()
# Measure command send time
cmd_start = time.time()
ser.write(b"NOISE?\r\n")
ser.flush()
cmd_end = time.time()
cmd_time = (cmd_end - cmd_start) * 1000
# Measure time to first byte
first_byte_start = time.time()
first_byte = ser.read(1)
first_byte_time = (time.time() - first_byte_start) * 1000
if not first_byte:
print("⚠️ No response received!")
time.sleep(2)
continue
# Read remaining bytes and measure per-byte timing
frame = bytearray(first_byte)
byte_timings = [first_byte_time]
for i in range(7):
byte_start = time.time()
byte = ser.read(1)
byte_time = (time.time() - byte_start) * 1000
if byte:
frame.extend(byte)
byte_timings.append(byte_time)
total_response_time = (time.time() - cmd_start) * 1000
# Performance metrics
print("\n[Performance Metrics]")
print(f" Command send time: {cmd_time:.2f} ms")
print(f" Time to first byte: {first_byte_time:.1f} ms")
print(f" Total response time: {total_response_time:.1f} ms")
print(f" Bytes received: {len(frame)}")
if len(byte_timings) == 8:
print("\n[Per-Byte Timing (ms)]")
for idx, bt in enumerate(byte_timings):
print(f" Byte {idx}: {bt:.2f} ms")
print(f" Sum of byte times: {sum(byte_timings):.1f} ms")
print(f" Avg time per byte: {sum(byte_timings) / 8:.2f} ms")
print(
f" Min/Max byte time: {min(byte_timings):.2f} / {max(byte_timings):.2f} ms"
)
# Track statistics
response_times.append(total_response_time)
byte_times.extend(byte_timings)
# Data analysis
print("\n[Data Analysis]")
print(f" Raw hex: {frame.hex()}")
print(f" Raw bytes: {[hex(b) for b in frame]}")
if len(frame) == 8:
result = noise.parse_noise_response(bytes(frame))
if result:
print(
f"\n✅ LAeq: {result['LAeq']:.1f} dB | LZeq: {result['LZeq']:.1f} dB"
)
else:
print("\n❌ Invalid frame format")
else:
print("\n⚠️ Incomplete frame")
# Running statistics
if len(response_times) > 1:
print(f"\n[Running Statistics - {len(response_times)} samples]")
print(
f" Avg response time: {sum(response_times) / len(response_times):.1f} ms"
)
print(f" Min response time: {min(response_times):.1f} ms")
print(f" Max response time: {max(response_times):.1f} ms")
print(f" Suggested timeout: {max(response_times) * 1.5:.0f} ms")
time.sleep(2)
except serial.SerialException as e:
print(f"\n[Noise Test] ❌ Serial error: {e}")
except KeyboardInterrupt:
print(f"\n\n{'=' * 60}")
print("[Noise Test] Stopped by user")
if response_times:
print(f"\n[Final Statistics - {len(response_times)} samples]")
print(
f" Average response time: {sum(response_times) / len(response_times):.1f} ms"
)
print(f" Min response time: {min(response_times):.1f} ms")
print(f" Max response time: {max(response_times):.1f} ms")
print(f" Recommended timeout: {max(response_times) * 1.5:.0f} ms")
except Exception as e:
print(f"\n[Noise Test] ❌ Error: {e}")
import traceback
traceback.print_exc()
finally:
if "ser" in locals() and ser.is_open:
ser.close()
print("\n[Noise Test] Serial port closed")