"""Visible light sensor wrapper.
Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read ambient visible-light intensity measurements through the GenericSensor
contract.
"""
import json
import time
from typing import ClassVar
from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# basic_logger holds whatever OizomLogger.get() returns (presumably a plain
# logger object -- confirm in utils.oizom_logger). It is not referenced in the
# portion of this module visible here.
basic_logger = OizomLogger(__name__).get()
# context_logger is the OizomLogger wrapper itself; the code below calls its
# *_with_context methods, passing "Visible" as the context tag.
context_logger = OizomLogger(__name__)
class OzVisible(GenericSensor):
    """Wrapper for visible-light sensors read through a SAMD co-processor.

    Commands are handed to the OzLan serial driver as dicts and the replies
    are parsed as JSON.

    Attributes:
        configuration: Sensor config entries stored by ``initialize``.
        v_visible: One entry per ``initializeSensor`` call; each entry is a
            list holding the initial reading taken for that sensor (empty
            when the sensor was skipped or failed).
        visible_port: OzLan port opened by ``initializeVisible``; ``None``
            until then.
    """

    # Class-level defaults are kept so attribute access on the class itself
    # keeps working; instances rebind them (see __init__ / initialize).
    configuration: ClassVar[dict] = {}
    v_visible: ClassVar[list] = []
    visible_port: ClassVar[OzLan] = None

    # Part number handled by this wrapper.
    _VISIBLE_PART_NUMBER = 311
    # Serial settings used when the sensor config carries no "gpio" override.
    _DEFAULT_PORT = "/dev/ttyACM0"
    _DEFAULT_BAUD = 115200
    # Pause between opening the port and taking the first reading.
    _SETTLE_SECONDS = 3

    def __init__(self) -> None:
        """Initialise the OzVisible wrapper with default state."""
        super().__init__()
        # Give every instance its own list: appending through ``self`` to the
        # class-level list would share (and accumulate) readings across
        # all instances.
        self.v_visible = []

    def initialize(self, config: dict, init_value: dict) -> bool:
        """Initialise every sensor entry that carries this wrapper's part key.

        Each matching entry gets an ``"init"`` key holding the status returned
        by ``initializeSensor``.

        Args:
            config: Iterable of sensor configuration dicts.
            init_value: Mutable dict; its ``"visible"`` key is set to the list
                of per-sensor init statuses.

        Returns:
            Always True.
        """
        self.configuration = config
        context_logger.debug_with_context("Visible", "Initializing OzVisible sensors")
        statuses = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                statuses.append(sensor["init"])
        init_value.update({"visible": statuses})
        return True  # send confirmation to main file

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single visible-light sensor.

        Only part number 311 with ``"en"`` equal to
        ``self.baseConfig["oz_enable"]`` is handled; anything else is skipped.

        Args:
            sensor: Single sensor configuration dict.

        Returns:
            -1 when the sensor was skipped, 0 when initialisation raised,
            otherwise the liveness value returned by ``initializeVisible``.
        """
        status = -1
        readings = []
        part_no = sensor.get(self.partNumber)
        # .get() keeps a config entry without "en" from raising KeyError.
        if (
            part_no == self._VISIBLE_PART_NUMBER
            and sensor.get("en") == self.baseConfig["oz_enable"]
        ):
            # TODO: Add gpio configuration
            try:
                first_reading = [0]
                alive = self.initializeVisible(sensor)
                time.sleep(self._SETTLE_SECONDS)
                if alive:
                    first_reading = self.getVisibleSensorReading(part_no, True)
                readings.append(first_reading)
                status = alive
            except Exception as e:
                context_logger.error_with_context("Visible", f"initializeSensor: {e}")
                status = 0
        self.v_visible.append(readings)
        return status

    def initializeVisible(self, sensor: dict) -> int:
        """Open the serial port and ask the SAMD whether the sensor is alive.

        The port and baud rate come from ``sensor["gpio"]`` when present and
        otherwise fall back to the class defaults.

        Args:
            sensor: Sensor config dict, optionally with a ``"gpio"`` dict
                holding ``"port"`` and/or ``"baud"``.

        Returns:
            The ``"live"`` value from the SAMD reply, or 0 when no usable
            reply was received or an error occurred.
        """
        gpio = sensor.get("gpio")
        if not isinstance(gpio, dict):
            gpio = {}
        port = gpio.get("port", self._DEFAULT_PORT)
        baud = gpio.get("baud", self._DEFAULT_BAUD)

        live = 0
        try:
            # NOTE(review): a port opened by an earlier call is replaced here
            # without being closed -- confirm whether OzLan needs an explicit
            # close when several sensors are configured.
            self.visible_port = OzLan(port, baud)
            self.visible_port.send_command({"pid": "config", "pay": {"visible": 1}})
            # data=2 is the "alive or not" check according to the original
            # inline note on this command.
            reply = self._query_samd({"pid": "visible", "data": 2})
            if reply is not None:
                live = reply["live"]
            context_logger.debug_with_context("Visible", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Visible", f"initializeVisible: {e}")
        return live

    def _query_samd(self, command: dict) -> "dict | None":
        """Send one command to the SAMD and return the parsed JSON reply.

        Errors are deliberately not caught here so that each caller can log
        them under its own name.

        Args:
            command: Command dict passed to ``OzLan.send_command``.

        Returns:
            The decoded JSON reply, or None when the stripped payload is three
            bytes or shorter.
        """
        self.visible_port.send_command(command, True)
        raw = self.visible_port.read_response()
        # Drop the first byte and the last three bytes of the response --
        # presumably frame delimiters/terminator; confirm against OzLan.
        payload = raw[1:-3]
        if len(payload) <= 3:
            return None
        return json.loads(payload.decode("utf-8"))

    def getSensorReading(self) -> None:
        """Trigger a reading for every sensor whose ``"init"`` status is 1.

        ``getVisibleSensorReading`` is called with its default ``flag=False``,
        so no serial traffic is generated here.
        """
        for sensor in self.configuration:
            # Entries skipped by initialize() have no "init" key at all.
            if sensor.get("init") == 1:
                self.getVisibleSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read the current values and store them in the output dict.

        For each initialised sensor the reading is matched positionally with
        the sensor's ``"parameters"`` list and stored under each parameter's
        ``"sc"`` key.

        Args:
            value: Mutable output dict to populate with readings.

        Returns:
            The same dict, updated.
        """
        for sensor in self.configuration:
            if sensor.get("init") != 1:
                continue
            reading = self.getVisibleSensorReading(sensor[self.partNumber], True)
            # getVisible falls back to the scalar 0 on errors; treat a scalar
            # as a one-element sequence instead of indexing into an int.
            readings = reading if isinstance(reading, (list, tuple)) else [reading]
            for index, parameter in enumerate(sensor.get("parameters", [])):
                if index >= len(readings):
                    context_logger.error_with_context(
                        "Visible",
                        f"putSensorValue: no reading for parameter index {index}",
                    )
                    break
                value[parameter["sc"]] = readings[index]
        return value

    def getVisibleSensorReading(self, partNo: int, flag: bool = False) -> int:
        """Read from the SAMD only when ``flag`` is set.

        Args:
            partNo: Part number identifying the sensor model.
            flag: When True the value is read via ``getVisible``; otherwise
                nothing is read.

        Returns:
            The result of ``getVisible`` when ``flag`` is True, 0 otherwise.
        """
        if flag:
            return self.getVisible(partNo)
        return 0

    def getVisible(self, partNo: int) -> int:
        """Request the visible-light value from the SAMD co-processor.

        Args:
            partNo: Part number; only 311 is handled.

        Returns:
            The ``"vs"`` value from the SAMD reply, or 0 for another part
            number, an unusable reply, or any error (which is logged).
        """
        if partNo != self._VISIBLE_PART_NUMBER:
            return 0
        vs = 0
        try:
            reply = self._query_samd({"pid": "visible", "data": 1})
            if reply is not None:
                vs = reply["vs"]
            context_logger.debug_with_context("Visible", f"getVisible: {vs}")
        except Exception as e:
            context_logger.error_with_context("Visible", f"getVisible: {e}")
        return vs
if __name__ == "__main__":
    # Manual smoke test: load visible.config.json from the directory of this
    # file and run sensor initialisation against it.
    import os

    config_path = os.path.join(os.path.dirname(__file__), "visible.config.json")
    # Read the JSON config as UTF-8 explicitly instead of relying on the
    # locale's default encoding.
    with open(config_path, encoding="utf-8") as config_file:
        config = json.load(config_file)
    context_logger.info_with_context("Visible", f"Config loaded: {config}")
    visible = OzVisible()
    visible.initialize(config["visible"], {})