Source code for drivers.Noise.Noise

"""Driver for the Oizom custom noise sensor (SAMD-based).

Communicates via UART with a SAMD microcontroller that measures LAeq and LZeq
noise levels. Responses are 8-byte binary frames.

Hardware:
    Interface: UART (serial)
    Port: /dev/ttyACM0 (typical)
    Baud rate: 115200
    Protocol: ASCII command -> 8-byte binary response frame

Typical usage::

    import serial
    from drivers.Noise.Noise import Noise

    ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
    noise = Noise()
    noise.initialize(ser, {"en": 1})
    reading = noise.get_noise()

Note:
    Requires ``pyserial`` for serial communication.
"""

import math
import time

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-scoped handles are built from the project's OizomLogger, both
# named after this module:
#   * basic_logger   - whatever OizomLogger.get() returns (presumably a plain
#                      logger object; it is not referenced by the code below).
#   * context_logger - the OizomLogger wrapper itself; the driver logs through
#                      its debug_with_context / error_with_context methods,
#                      passing "Noise" as the context tag.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Noise: """Driver for the SAMD-based noise level sensor. Sends ``NOISE?\\r\\n`` commands over UART and parses 8-byte binary response frames containing LAeq (A-weighted) and LZeq (Z-weighted) noise levels. Attributes: FRAME_LEN: Expected response frame length in bytes. TIMEOUT: Maximum wait time for a complete frame in milliseconds. ser: Open ``serial.Serial`` instance for UART communication. LAeq: Accumulated A-weighted Leq readings for energy averaging. LZeq: Accumulated Z-weighted Leq readings for energy averaging. debug: Whether verbose debug logging is enabled. part_number: Hardware part number identifier. """
[docs] def __init__(self) -> None: """Initialize default attributes for the Noise sensor.""" self.FRAME_LEN = 8 self.TIMEOUT = 1600 self.ser = None self.configuration: dict = {} self.LAeq: list[float] = [] self.LZeq: list[float] = [] self.debug = False self.part_number = 41 self.EOL = b"\r\n"
[docs] def initialize(self, serial_port, configuration: dict) -> bool: """Configure the noise sensor with a serial port and device configuration. Args: serial_port: An open ``serial.Serial`` instance. configuration: Device configuration dict. Must contain ``"en"`` key to enable the sensor. Optional keys: ``"debug"``, ``"pn"``. Returns: True if the sensor is enabled and ready, False otherwise. """ try: self.ser = serial_port self.configuration = configuration self.debug = configuration.get("debug", False) self.part_number = configuration.get("pn", 41) if configuration.get("en", 0): return True # TODO: 📋 Write perfect init code return False except Exception: context_logger.error_with_context("Noise", "Initialization failed") return False
[docs] def send_command(self, command: bytes = b"NOISE?\r\n", delay: float = 0.5) -> bytes | None: """Send a command over UART and read the 8-byte frame response. Args: command: Raw bytes to transmit. Defaults to ``b"NOISE?\\r\\n"``. delay: Seconds to wait after sending before reading. Allows the SAMD microcontroller time to process and respond. Returns: The 8-byte response frame, or None on timeout / error. """ if self.debug: context_logger.debug_with_context("Noise", f"Sending command: {command}") try: if not self.ser.is_open: self.ser.open() # Clear any pending data self.ser.reset_input_buffer() self.ser.reset_output_buffer() # Send command self.ser.write(command) self.ser.flush() # Give SAMD time to process and respond time.sleep(delay) # Read 8-byte frame response if self.ser.in_waiting > 0: buf = bytearray() start = time.time() while len(buf) < self.FRAME_LEN: chunk = self.ser.read(self.FRAME_LEN - len(buf)) if chunk: buf.extend(chunk) if self.debug: context_logger.debug_with_context( "Noise", f"Read {len(chunk)} bytes: {chunk.hex()}" ) if time.time() - start > self.TIMEOUT: if self.debug: context_logger.debug_with_context( "Noise", f"Timeout! Only got {len(buf)} bytes: {buf.hex() if buf else 'empty'}", ) return None if self.debug: context_logger.debug_with_context( "Noise", f"Complete frame ({len(buf)} bytes): {buf.hex()}" ) context_logger.debug_with_context( "Noise", f"Frame breakdown: {[hex(b) for b in buf]}" ) return bytes(buf) if self.debug: context_logger.debug_with_context("Noise", "[Noise] No data in buffer") return None except Exception as e: context_logger.error_with_context("Noise", f"send_command error: {e}") return None finally: # Ensure port is closed even on error try: if self.ser and self.ser.is_open: self.ser.close() except Exception as close_error: context_logger.error_with_context( "Noise", f"[Noise] Error closing serial port: {close_error}" )
[docs] def parse_noise_response(self, frame: bytes) -> dict | None: """Parse an 8-byte binary frame to extract LAeq and LZeq values. Frame format: ``[0x01, 0x03, 0x00, LAeq_H, LAeq_L, LZeq_H, LZeq_L, 0x0A]`` Args: frame: The 8-byte response frame from the sensor. Returns: A dict with ``"LAeq"`` and ``"LZeq"`` float values in dB, or None if the frame is invalid. """ laeq = None lzeq = None if not frame: return None if len(frame) != self.FRAME_LEN: if self.debug: context_logger.debug_with_context( "Noise", f"Invalid frame length: {len(frame)} (expected {self.FRAME_LEN})", ) return None if self.debug: context_logger.debug_with_context( "Noise", f"Checking frame header: [0]={hex(frame[0])}, [1]={hex(frame[1])}, [7]={hex(frame[7])}", ) # Validate frame format: 0x01 0x03 ... 0x0A if frame[0] != 0x01 or frame[1] != 0x03 or frame[7] != 0x0A: if self.debug: context_logger.debug_with_context("Noise", "Invalid frame format!") return None # Extract LAeq and LZeq from bytes 3-6 laeq = ((frame[3] << 8) | frame[4]) / 10.0 lzeq = ((frame[5] << 8) | frame[6]) / 10.0 if self.debug: context_logger.debug_with_context( "Noise", f"Parsed -> LAeq: {laeq} dB, LZeq: {lzeq} dB" ) return {"LAeq": laeq, "LZeq": lzeq}
[docs] def get_noise(self) -> dict | None: """Query the sensor for a noise reading and accumulate results. Sends the default ``NOISE?`` command, parses the response, and appends valid LAeq/LZeq values to the internal accumulation lists. Returns: A dict with ``"LAeq"`` and ``"LZeq"`` float values, or None on failure. """ try: if not self.ser: context_logger.debug_with_context( "Noise", "Serial port not initialized" ) return None frame = self.send_command() if frame is None: return None result = self.parse_noise_response(frame) if result is not None: laeq = result.get("LAeq") lzeq = result.get("LZeq") if laeq is not None: self.LAeq.append(laeq) if lzeq is not None: self.LZeq.append(lzeq) return result except Exception as e: context_logger.error_with_context("Noise", f"get_noise: {e}") return None
[docs] def energy_average(self) -> float: """Compute the energy-averaged (logarithmic) mean of accumulated LAeq values. Uses the formula ``10 * log10(mean(10^(dB/10)))`` which is the acoustically correct way to average decibel values. Falls back to arithmetic mean on numerical errors. Returns: The energy-averaged LAeq value in dB, rounded to 2 decimal places. """ energy_sum = 0.0 energy_avg = 0.0 result = 0.0 try: values = self.LAeq # Acoustic energy averaging: 10*log10(mean(10^(dB/10))) max_val = max(values) # For numerical stability for val in values: energy_sum += 10 ** ((val - max_val) / 10.0) energy_avg = energy_sum / len(values) if energy_avg <= 0: return 0.0 result = max_val + 10.0 * math.log10(energy_avg) return round(result, 2) except (OverflowError, ValueError, ZeroDivisionError): # Fallback to arithmetic mean value = sum(values) / len(values) return round(value, 2)
[docs] def getSensorReading(self) -> dict: """Get the latest noise reading mapped to configured parameter codes. Returns: A dict mapping parameter short-codes to their current values. Empty dict on failure. """ result = {} try: readings = self.get_noise() if readings is None: return result laeq_value = readings.get("LAeq") lzeq_value = readings.get("LZeq") for parameter in self.configuration.get("parameters", []): pm = parameter.get("pm", 0) if pm == 1 and laeq_value is not None: result[parameter["sc"]] = laeq_value if pm == 2 and len(self.LAeq) > 0: result[parameter["sc"]] = max(self.LAeq) if pm == 3 and len(self.LAeq) > 0: result[parameter["sc"]] = min(self.LAeq) return result except Exception as e: context_logger.error_with_context("Noise", f"getSensorReading: {e}") return result
[docs] def putSensorValue(self, result: dict | None = None) -> dict: """Aggregate accumulated noise data and reset internal buffers. Computes energy-averaged LAeq, max LAeq, and min LAeq from the readings collected since the last call, then clears the buffers. Args: result: Optional dict to populate with aggregated values. Creates a new dict if None. Returns: The result dict populated with aggregated noise parameters. """ if self.debug: context_logger.debug_with_context("Noise", "Aggregating Noise data...") if result is None: result = {} for parameter in self.configuration.get("parameters", []): pm = parameter.get("pm", 0) if len(self.LAeq) > 0: if pm == 1: self.old_laeq = self.energy_average() result[parameter["sc"]] = self.old_laeq if pm == 2: self.old_max_laeq = max(self.LAeq) result[parameter["sc"]] = self.old_max_laeq if pm == 3: self.old_min_laeq = min(self.LAeq) result[parameter["sc"]] = self.old_min_laeq else: if pm == 1: result[parameter["sc"]] = self.old_laeq if pm == 2: result[parameter["sc"]] = self.old_max_laeq if pm == 3: result[parameter["sc"]] = self.old_min_laeq # Clear lists for the next aggregation period self.LAeq.clear() self.LZeq.clear() return result
if __name__ == "__main__":
    # Manual bench test: polls the sensor on /dev/ttyACM0 in an endless loop,
    # printing per-byte latency and running response-time statistics until
    # interrupted with Ctrl+C.
    import serial

    context_logger.debug_with_context(
        "Noise", "[Noise Test] Starting noise sensor test on /dev/ttyACM0"
    )

    # Bound before the try block so the Ctrl+C and cleanup handlers can always
    # reference them, even if the interrupt arrives while opening the port.
    ser = None
    response_times = []

    try:
        # Open serial port
        ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
        context_logger.debug_with_context(
            "Noise", f"[Noise Test] Serial port opened: {ser.name}"
        )

        # Initialize noise sensor
        noise = Noise()
        config = {"en": 1}
        if noise.initialize(ser, config):
            context_logger.debug_with_context(
                "Noise", "[Noise Test] ✅ Noise sensor initialized"
            )
        else:
            context_logger.error_with_context(
                "Noise", "[Noise Test] ❌ Failed to initialize"
            )
            # SystemExit bypasses `except Exception` but still runs `finally`.
            raise SystemExit(1)

        # Test loop
        context_logger.debug_with_context(
            "Noise", "[Noise Test] Starting continuous readings (Ctrl+C to stop)..."
        )

        iteration = 0
        while True:
            iteration += 1
            print(f"\n{'=' * 60}")
            print(f"Iteration #{iteration}")
            print(f"{'=' * 60}")

            # Clear buffers
            ser.reset_input_buffer()
            ser.reset_output_buffer()

            # Measure command send time
            cmd_start = time.time()
            ser.write(b"NOISE?\r\n")
            ser.flush()
            cmd_end = time.time()
            cmd_time = (cmd_end - cmd_start) * 1000

            # Measure time to first byte
            first_byte_start = time.time()
            first_byte = ser.read(1)
            first_byte_time = (time.time() - first_byte_start) * 1000

            if not first_byte:
                print("⚠️ No response received!")
                time.sleep(2)
                continue

            # Read remaining bytes one at a time on purpose: the point of this
            # script is to measure per-byte latency.
            frame = bytearray(first_byte)
            byte_timings = [first_byte_time]
            for _ in range(7):
                byte_start = time.time()
                byte = ser.read(1)
                byte_time = (time.time() - byte_start) * 1000
                if byte:
                    frame.extend(byte)
                    byte_timings.append(byte_time)

            total_response_time = (time.time() - cmd_start) * 1000

            # Performance metrics
            print("\n[Performance Metrics]")
            print(f" Command send time: {cmd_time:.2f} ms")
            print(f" Time to first byte: {first_byte_time:.1f} ms")
            print(f" Total response time: {total_response_time:.1f} ms")
            print(f" Bytes received: {len(frame)}")

            if len(byte_timings) == 8:
                print("\n[Per-Byte Timing (ms)]")
                for idx, bt in enumerate(byte_timings):
                    print(f" Byte {idx}: {bt:.2f} ms")
                print(f" Sum of byte times: {sum(byte_timings):.1f} ms")
                print(f" Avg time per byte: {sum(byte_timings) / 8:.2f} ms")
                print(
                    f" Min/Max byte time: {min(byte_timings):.2f} / {max(byte_timings):.2f} ms"
                )

            # Track statistics
            response_times.append(total_response_time)

            # Data analysis
            print("\n[Data Analysis]")
            print(f" Raw hex: {frame.hex()}")
            print(f" Raw bytes: {[hex(b) for b in frame]}")

            if len(frame) == 8:
                result = noise.parse_noise_response(bytes(frame))
                if result:
                    print(
                        f"\n✅ LAeq: {result['LAeq']:.1f} dB | LZeq: {result['LZeq']:.1f} dB"
                    )
                else:
                    print("\n❌ Invalid frame format")
            else:
                print("\n⚠️ Incomplete frame")

            # Running statistics
            if len(response_times) > 1:
                print(f"\n[Running Statistics - {len(response_times)} samples]")
                print(
                    f" Avg response time: {sum(response_times) / len(response_times):.1f} ms"
                )
                print(f" Min response time: {min(response_times):.1f} ms")
                print(f" Max response time: {max(response_times):.1f} ms")
                print(f" Suggested timeout: {max(response_times) * 1.5:.0f} ms")

            time.sleep(2)

    except serial.SerialException as e:
        print(f"\n[Noise Test] ❌ Serial error: {e}")
    except KeyboardInterrupt:
        print(f"\n\n{'=' * 60}")
        print("[Noise Test] Stopped by user")
        if response_times:
            print(f"\n[Final Statistics - {len(response_times)} samples]")
            print(
                f" Average response time: {sum(response_times) / len(response_times):.1f} ms"
            )
            print(f" Min response time: {min(response_times):.1f} ms")
            print(f" Max response time: {max(response_times):.1f} ms")
            print(f" Recommended timeout: {max(response_times) * 1.5:.0f} ms")
    except Exception as e:
        print(f"\n[Noise Test] ❌ Error: {e}")
        import traceback

        traceback.print_exc()
    finally:
        if ser is not None and ser.is_open:
            ser.close()
            print("\n[Noise Test] Serial port closed")