Source code for OzWrapper.OzNoise.OzNoise

"""Noise level sensor wrapper.

Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read ambient noise-level measurements (average, max, min dB) through the
GenericSensor contract.
"""

import json
import time
from typing import ClassVar

from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzNoise(GenericSensor):
    """Wrapper for noise-level sensors via a SAMD co-processor.

    Reads average, maximum, and minimum noise levels (dB) from a
    SAMD-connected microphone sensor through serial JSON commands.

    Attributes:
        configuration: List of sensor config dicts from the Gateway.
        v_noise: Nested list holding noise reading arrays per sensor.
        noise_port: OzLan serial port instance for SAMD communication.
        debug: Whether debug mode is active for raw SAMD readings.
    """

    # Class-level defaults are kept so ``OzNoise.configuration`` and
    # ``OzNoise.v_noise`` still resolve; ``__init__`` and ``initialize`` rebind
    # both on the instance so no state is shared between instances.
    configuration: ClassVar[dict] = {}
    v_noise: ClassVar[list[list[int]]] = []
    noise_port: OzLan | None = None
    debug: bool = False

    # The only hardware part number this wrapper knows how to drive.
    _NOISE_PART_NO: ClassVar[int] = 41

    def __init__(self) -> None:
        """Initialise the OzNoise wrapper with default state."""
        super().__init__()
        # initializeSensor() appends to v_noise in place; without a
        # per-instance list every OzNoise object would grow the same
        # class-level list.
        self.v_noise = []

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all noise sensors listed in the Gateway config.

        Every entry that carries the ``self.partNumber`` key is initialised
        and receives an ``"init"`` key holding the result; entries without
        that key are left untouched.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict; its ``"noise"`` key is set to the list
                of per-sensor init results.

        Returns:
            Always True.
        """
        self.configuration = config
        context_logger.info_with_context("Noise", f"Config: {self.configuration}")
        statuses = []
        for sensor in self.configuration:
            if self.partNumber not in sensor:
                continue
            sensor["init"] = self.initializeSensor(sensor)
            statuses.append(sensor["init"])
            context_logger.info_with_context("Noise", f"Init: {sensor['init']}")
        init_value.update({"noise": statuses})
        return True

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single noise sensor by part number.

        When the sensor is part number 41 and enabled, opens the serial
        connection to the SAMD via ``initializeNoise`` and, if that reports a
        truthy result, takes an initial reading. The collected readings
        (possibly an empty list) are appended to ``self.v_noise``.

        Args:
            sensor: Single sensor configuration dict containing the part
                number, the ``"en"`` enable flag, and GPIO/UART settings.

        Returns:
            ``int()`` of the liveness value returned by ``initializeNoise``;
            0 when the sensor is skipped or initialisation raised.
        """
        _success = False
        readings = []
        is_noise_part = sensor[self.partNumber] == self._NOISE_PART_NO
        if is_noise_part and sensor["en"] == self.baseConfig["oz_enable"]:
            try:
                first_reading = [0, 0, 0]
                live = self.initializeNoise(sensor)
                if live:
                    first_reading = self.getNoiseSensorReading(sensor[self.partNumber], True)
                readings.append(first_reading)
                _success = live
            except Exception as e:
                context_logger.error_with_context("Noise", f"initializeSensor: {e}")
                _success = False
        self.v_noise.append(readings)
        return int(_success)

    def initializeNoise(self, sensor: dict) -> int:
        """Open the serial connection to the SAMD and verify sensor liveness.

        Sends a noise configuration command followed by a liveness check.
        Any exception raised while talking to the port is logged and results
        in the default return value.

        Args:
            sensor: Sensor config dict; an optional ``"gpio"`` dict may
                override ``"port"`` (default ``/dev/ttyACM0``), ``"baud"``
                (default 115200) and ``"debug"`` (1 enables debug mode).

        Returns:
            The ``"live"`` value reported by the SAMD, or 0 when no usable
            response was received.
        """
        gpio = sensor.get("gpio", {})
        port = gpio.get("port", "/dev/ttyACM0")
        baud = gpio.get("baud", 115200)
        if "debug" in gpio:
            self.debug = gpio["debug"] == 1

        live = 0
        try:
            self.noise_port = OzLan(port, baud)
            self.noise_port.send_command({"pid": "config", "pay": {"noise": 1}})
            # NOTE(review): presumably gives the SAMD time to apply the
            # configuration before it is queried - confirm.
            time.sleep(1)
            # "data": 1 start noise | 2 noise check alive or not | 3 read samd reading
            reply = self._query_samd({"pid": "noise", "data": 2})
            if reply is not None:
                live = reply["live"]
            context_logger.info_with_context("Noise", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Noise", f"initializeNoise: {e}")
        return live

    def _query_samd(self, command: dict) -> dict | None:
        """Send *command* to the SAMD and return its decoded JSON reply.

        Args:
            command: JSON-serialisable command dict for the co-processor.

        Returns:
            The parsed JSON payload, or None when the payload left after
            removing the frame markers is 3 bytes or shorter.

        Raises:
            Exception: Whatever the port, the UTF-8 decode or ``json.loads``
                raises; callers are expected to handle it.
        """
        self.noise_port.send_command(command, True)
        frame = self.noise_port.read_response()
        # remove #, \r\n from ending and remove ~ from beginning
        payload = frame[1:-3]
        if len(payload) <= 3:
            return None
        return json.loads(payload.decode("utf-8"))

    def getSensorReading(self) -> None:
        """Trigger a noise reading from all initialised sensors.

        This method does not return data directly; values are collected in
        ``putSensorValue``. Config entries that never received an ``"init"``
        key (they were skipped by ``initialize``) are ignored.
        """
        for sensor in self.configuration:
            # .get(): entries skipped by initialize() have no "init" key.
            if sensor.get("init") == 1:
                self.getNoiseSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final noise values and populate the output dict.

        For every initialised sensor a fresh reading is taken and each entry
        of the sensor's ``"parameters"`` list is mapped, by position, onto
        the reading; the result is stored under the parameter's ``"sc"`` key
        rounded to two decimals. Errors are logged per sensor and do not
        stop the loop.

        Args:
            value: Mutable output dict to populate with noise readings.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            try:
                if sensor.get("init") != 1:
                    continue
                noise_value = self.getNoiseSensorReading(sensor[self.partNumber], True)
                for index, parameter in enumerate(sensor["parameters"]):
                    value[parameter["sc"]] = round(float(noise_value[index]), 2)
            except Exception as e:
                context_logger.error_with_context("Noise", f"putSensorValue: {e}")
        return value

    def getNoise(self, partNo: int) -> list[int]:
        """Read noise levels (avg, max, min) from the SAMD co-processor.

        Args:
            partNo: Hardware part number (41=noise sensor).

        Returns:
            List of [average, max, min] noise levels in dB. Values that
            could not be read stay 0; any other part number yields
            [0, 0, 0].
        """
        if partNo != self._NOISE_PART_NO:
            return [0, 0, 0]

        avg = 0
        lmax = 0
        lmin = 0
        try:
            # Debug mode requests the raw SAMD reading (3) instead of the
            # regular noise command (1).
            reply = self._query_samd({"pid": "noise", "data": 3 if self.debug else 1})
            if reply is not None:
                # Assigned one by one so a partially filled reply still
                # yields the fields that were present.
                avg = reply["avg"]
                lmax = reply["max"]
                lmin = reply["min"]
            context_logger.info_with_context("Noise", f"getNoise: {avg}")
        except Exception as e:
            context_logger.error_with_context("Noise", f"getNoise: {e}")
        return [avg, lmax, lmin]

    def getNoiseSensorReading(self, partNo: int, flag: bool = False) -> list[int]:
        """Conditionally read noise data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return zeroes.

        Returns:
            List of [average, max, min] noise levels when flag is True,
            [0, 0, 0] otherwise.
        """
        if not flag:
            return [0, 0, 0]
        return self.getNoise(partNo)
if __name__ == "__main__":
    # Manual entry point: load ``noise.config.json`` from the directory of
    # this file and initialise the sensors listed under its "noise" key.
    import os

    config_path = os.path.join(os.path.dirname(__file__), "noise.config.json")
    # JSON text is UTF-8; pass the encoding explicitly instead of relying on
    # the locale default of the host.
    with open(config_path, encoding="utf-8") as config_file:
        config = json.load(config_file)
    context_logger.info_with_context("Noise", f"Config: {config}")
    noise = OzNoise()
    noise.initialize(config["noise"], {})