"""Noise level sensor wrapper.
Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read ambient noise-level measurements (average, max, min dB) through the
GenericSensor contract.
"""
import json
import time
from typing import ClassVar
from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
#
# Both loggers are keyed on this module's name.
#   basic_logger   - the object returned by OizomLogger(...).get().
#   context_logger - the OizomLogger wrapper itself; the code in this module
#                    calls its info_with_context / error_with_context methods
#                    with the "Noise" context tag.
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
class OzNoise(GenericSensor):
    """Wrapper for noise-level sensors via a SAMD co-processor.

    Reads average, maximum, and minimum noise levels (dB) from a
    SAMD-connected microphone sensor through serial JSON commands.

    ``self.partNumber`` and ``self.baseConfig`` are read but never assigned
    in this class; they are presumably provided by ``GenericSensor``
    (NOTE(review): confirm against the base class).

    Attributes:
        configuration: List of sensor config dicts from the Gateway, as
            passed to ``initialize``.
        v_noise: One entry per call to ``initializeSensor``. Each entry is a
            list holding that sensor's initial ``[avg, max, min]`` reading,
            or an empty list when the sensor was not an enabled noise sensor.
        noise_port: OzLan serial port instance for SAMD communication, or
            None until ``initializeNoise`` has opened it.
        debug: Whether debug mode is active for raw SAMD readings.
    """

    # Part number of the only sensor model this wrapper knows how to drive.
    NOISE_PART_NUMBER: ClassVar[int] = 41
    # Serial settings used when the sensor config carries no "gpio" override.
    DEFAULT_PORT: ClassVar[str] = "/dev/ttyACM0"
    DEFAULT_BAUD: ClassVar[int] = 115200

    configuration: list[dict]
    v_noise: list[list[list[int]]]
    noise_port: OzLan | None = None
    debug: bool = False

    def __init__(self) -> None:
        """Initialise the OzNoise wrapper with default state."""
        # Mutable state is created per instance so that two wrappers never
        # share (and append to) the same list. It is assigned before
        # super().__init__() so the attributes already exist while the base
        # initialiser runs, and so anything the base sets is not overwritten.
        self.configuration = []
        self.v_noise = []
        super().__init__()

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all noise sensors listed in the Gateway config.

        Every entry of ``config`` that contains the ``self.partNumber`` key
        is initialised and gets its result stored under ``sensor["init"]``.
        Entries without that key are left untouched.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict; its ``"noise"`` key is set to the list
                of per-sensor init results.

        Returns:
            True (always returns True for SAMD-based sensors).
        """
        self.configuration = config
        context_logger.info_with_context("Noise", f"Config: {self.configuration}")
        init_flags = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                init_flags.append(sensor["init"])
                context_logger.info_with_context("Noise", f"Init: {sensor['init']}")
        init_value.update({"noise": init_flags})
        return True

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single noise sensor by part number.

        For an enabled sensor with part number 41 this opens the serial
        connection to the SAMD (via ``initializeNoise``) and, if the sensor
        reports alive, takes an initial reading. The reading list for this
        sensor is appended to ``self.v_noise`` in every case.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag (``"en"``), and optional GPIO/UART settings.

        Returns:
            The liveness value reported by the SAMD converted with ``int``
            (1 when alive), or 0 when the sensor is not handled, not
            enabled, or initialisation failed.
        """
        alive = 0
        readings: list[list[int]] = []
        is_noise_sensor = sensor[self.partNumber] == self.NOISE_PART_NUMBER
        if is_noise_sensor and sensor["en"] == self.baseConfig["oz_enable"]:
            try:
                reading = [0, 0, 0]
                alive = self.initializeNoise(sensor)
                if alive:
                    reading = self.getNoiseSensorReading(sensor[self.partNumber], True)
                readings.append(reading)
            except Exception as e:
                context_logger.error_with_context("Noise", f"initializeSensor: {e}")
                alive = 0
        self.v_noise.append(readings)
        return int(alive)

    def initializeNoise(self, sensor: dict) -> int:
        """Open the serial connection to the SAMD and verify sensor liveness.

        Sends a noise configuration command followed by a liveness check.
        Any failure is logged and reported as "not alive" rather than raised.

        Args:
            sensor: Sensor config dict. An optional ``"gpio"`` dict may
                override ``"port"`` and ``"baud"`` and may set ``"debug"``
                (1 enables debug mode).

        Returns:
            The ``"live"`` value from the SAMD reply (1 when alive), or 0 if
            no usable reply was received.
        """
        gpio = sensor.get("gpio") or {}
        port = gpio.get("port", self.DEFAULT_PORT)
        baud = gpio.get("baud", self.DEFAULT_BAUD)
        if "debug" in gpio:
            self.debug = gpio["debug"] == 1

        live = 0
        try:
            self.noise_port = OzLan(port, baud)
            self.noise_port.send_command({"pid": "config", "pay": {"noise": 1}})
            # Pause between the config command and the liveness query
            # (presumably to let the SAMD apply the config - TODO confirm).
            time.sleep(1)
            # "data" codes: 1 start noise | 2 noise check alive or not |
            # 3 read samd reading
            reply = self._query_samd({"pid": "noise", "data": 2})
            if reply is not None:
                live = reply["live"]
            context_logger.info_with_context("Noise", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Noise", f"initializeNoise: {e}")
        return live

    def getSensorReading(self) -> None:
        """Trigger a noise reading from all initialised sensors.

        This method does not return data directly; values are collected
        in ``putSensorValue``. Sensors that were never initialised (and so
        have no ``"init"`` key) are skipped.
        """
        for sensor in self.configuration:
            # .get(): initialize() only sets "init" on entries that carry the
            # part-number key, so the key may legitimately be absent.
            if sensor.get("init") == 1:
                self.getNoiseSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final noise values and populate the output dict.

        For every initialised sensor, one reading is taken and its i-th
        level is stored, rounded to 2 decimals, under the ``"sc"`` key of
        the i-th entry of ``sensor["parameters"]``. Errors for one sensor
        are logged and do not stop the remaining sensors.

        Args:
            value: Mutable output dict to populate with noise readings.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            if sensor.get("init") != 1:
                continue
            try:
                levels = self.getNoiseSensorReading(sensor[self.partNumber], True)
                for index, parameter in enumerate(sensor["parameters"]):
                    value[parameter["sc"]] = round(float(levels[index]), 2)
            except Exception as e:
                context_logger.error_with_context("Noise", f"putSensorValue: {e}")
        return value

    def getNoise(self, partNo: int) -> list[int]:
        """Read noise levels (avg, max, min) from the SAMD co-processor.

        Failures are logged; levels that could not be read stay at 0.

        Args:
            partNo: Hardware part number (41=noise sensor).

        Returns:
            List of [average, max, min] noise levels in dB, or [0, 0, 0]
            for any other part number.
        """
        if partNo != self.NOISE_PART_NUMBER:
            return [0, 0, 0]

        average = 0
        highest = 0
        lowest = 0
        try:
            # Debug mode requests data code 3 instead of the normal code 1.
            command = {"pid": "noise", "data": 3 if self.debug else 1}
            reply = self._query_samd(command)
            if reply is not None:
                average = reply["avg"]
                highest = reply["max"]
                lowest = reply["min"]
            context_logger.info_with_context("Noise", f"getNoise: {average}")
        except Exception as e:
            context_logger.error_with_context("Noise", f"getNoise: {e}")
        return [average, highest, lowest]

    def getNoiseSensorReading(self, partNo: int, flag: bool = False) -> list[int]:
        """Conditionally read noise data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return zeroes.

        Returns:
            List of [average, max, min] noise levels when flag is True,
            [0, 0, 0] otherwise.
        """
        if not flag:
            return [0, 0, 0]
        return self.getNoise(partNo)

    def _query_samd(self, command: dict) -> dict | None:
        """Send ``command`` to the SAMD and return its decoded JSON reply.

        Args:
            command: JSON-serialisable command dict for the SAMD.

        Returns:
            The parsed reply, or None when the reply is too short to carry
            a payload.

        Raises:
            RuntimeError: If the serial port has not been opened yet.
            Exception: Anything raised by the port, the UTF-8 decode, or the
                JSON parser; callers log these.
        """
        if self.noise_port is None:
            raise RuntimeError("SAMD serial port is not open")
        self.noise_port.send_command(command, True)
        frame = self.noise_port.read_response()
        # Drop the leading "~" and the trailing "#\r\n" framing bytes.
        payload = frame[1:-3]
        if len(payload) <= 3:
            return None
        return json.loads(payload.decode("utf-8"))
if __name__ == "__main__":
    # Manual smoke test: load the "noise" section of the noise.config.json
    # file that sits next to this module and run sensor initialisation.
    import os

    config_path = os.path.join(os.path.dirname(__file__), "noise.config.json")
    with open(config_path) as config_file:
        config = json.load(config_file)
    context_logger.info_with_context("Noise", f"Config: {config}")
    noise = OzNoise()
    noise.initialize(config["noise"], {})