Source code for OzWrapper.OzVisible.OzVisible

"""Visible light sensor wrapper.

Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read ambient visible-light intensity measurements through the GenericSensor
contract.
"""

import json
import time
from typing import ClassVar

from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzVisible(GenericSensor): """Wrapper for visible-light sensors via a SAMD co-processor. Reads ambient visible-light intensity from a SAMD-connected sensor through serial JSON commands. Attributes: configuration: List of sensor config dicts from the Gateway. v_visible: Nested list holding visible-light readings. visible_port: OzLan serial port instance for SAMD communication. """ configuration: ClassVar[dict] = {} v_visible: ClassVar[list] = [] visible_port: ClassVar[OzLan] = None
[docs] def __init__(self) -> None: """Initialise the OzVisible wrapper with default state.""" super().__init__()
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialise all visible-light sensors listed in the Gateway config. Args: config: List of sensor configuration dicts from the Gateway. init_value: Mutable dict updated with per-sensor init status. Returns: True (always returns True for SAMD-based sensors). """ self.configuration = config context_logger.debug_with_context("Visible", "Initializing OzVisible sensors") _samd_sensor_init = [] for sensor in self.configuration: if self.partNumber in sensor: sensor["init"] = self.initializeSensor(sensor) _samd_sensor_init.append(sensor["init"]) init_value.update({"visible": _samd_sensor_init}) return True # send confirmation to main file
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialise a single visible-light sensor by part number. Opens the serial connection to the SAMD and takes an initial reading. Args: sensor: Single sensor configuration dict containing part number, enable flag, and GPIO/UART settings. Returns: 1 if the SAMD reports the sensor is alive, 0 or -1 otherwise. """ _success = -1 value = [] if sensor[self.partNumber] == 311 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration try: val = [0] init = self.initializeVisible(sensor) time.sleep(3) if init: val = self.getVisibleSensorReading(sensor[self.partNumber], True) value.append(val) _success = init except Exception as e: context_logger.error_with_context("Visible", f"initializeSensor: {e}") _success = 0 self.v_visible.append(value) return _success
[docs] def initializeVisible(self, sensor: dict) -> int: """Open the serial connection to the SAMD and verify sensor liveness. Args: sensor: Sensor config dict with GPIO/UART port and baud settings. Returns: 1 if the SAMD reports the visible sensor is alive, 0 otherwise. """ port = "/dev/ttyACM0" baud = 115200 if "gpio" in sensor: if "port" in sensor["gpio"]: port = sensor["gpio"]["port"] if "baud" in sensor["gpio"]: baud = sensor["gpio"]["baud"] live = 0 command = {"pid": "config", "pay": {"visible": 1}} try: self.visible_port = OzLan(port, baud) self.visible_port.send_command(command) # time.sleep(1) command = { "pid": "visible", "data": 2, # 1 start noise | 2 noise check alive or not | 3 read samd reading } self.visible_port.send_command(command, True) data = self.visible_port.read_response() data = data[1:-3] if len(data) > 3: data = data.decode("utf-8") json_value = json.loads(data) live = json_value["live"] context_logger.debug_with_context("Visible", f"activate : {live}") except Exception as e: context_logger.error_with_context("Visible", f"initializeVisible: {e}") return live
[docs] def getSensorReading(self) -> None: """Trigger a visible-light reading from all initialised sensors.""" for sensor in self.configuration: if sensor["init"] == 1: self.getVisibleSensorReading(sensor[self.partNumber])
[docs] def putSensorValue(self, value: dict) -> dict: """Read final visible-light values and populate the output dict. Args: value: Mutable output dict to populate with readings. Returns: The updated output dict. """ for sensor in self.configuration: if sensor["init"] == 1: visible_value = self.getVisibleSensorReading(sensor[self.partNumber], True) for Y, parameters in enumerate(sensor["parameters"]): value[parameters["sc"]] = visible_value[Y] return value
[docs] def getVisibleSensorReading(self, partNo: int, flag: bool = False) -> int: """Conditionally read visible-light data from the SAMD. Args: partNo: Hardware part number identifying the sensor model. flag: If True, perform the actual read; otherwise return 0. Returns: Visible-light value when flag is True, 0 otherwise. """ if flag: return self.getVisible(partNo) return 0
[docs] def getVisible(self, partNo: int) -> int: """Read visible-light intensity from the SAMD co-processor. Args: partNo: Hardware part number (311=visible-light sensor). Returns: Visible-light intensity value, or 0 on error. """ if partNo == 311: command = {"pid": "visible", "data": 1} vs = 0 try: self.visible_port.send_command(command, True) data = self.visible_port.read_response() data = data[1:-3] if len(data) > 3: data = data.decode("utf-8") data = json.loads(data) vs = data["vs"] context_logger.debug_with_context("Visible", f"getVisible: {vs}") except Exception as e: context_logger.error_with_context("Visible", f"getVisible: {e}") return vs return 0
if __name__ == "__main__": import os dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, "visible.config.json") with open(file_name) as configFile: configJson = configFile.read() config = json.loads(configJson) context_logger.info_with_context("Visible", f"Config loaded: {config}") visible = OzVisible() visible.initialize(config["visible"], {})