"""Surface temperature sensor wrapper.
Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read surface temperature measurements through the GenericSensor contract.
"""
import json
import time
from typing import ClassVar
from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two logging handles are created for this module's name:
#   * basic_logger   - whatever OizomLogger.get() returns (presumably a plain
#                      logger object; confirm against utils.oizom_logger). It
#                      is not referenced anywhere else in this file.
#   * context_logger - the OizomLogger wrapper itself; the class below calls
#                      its info_with_context / error_with_context methods with
#                      the "Surface" context tag.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# NOTE: a "[docs]" Sphinx source-link marker stood here; it is an extraction artifact, not module code.
class OzSurface(GenericSensor):
    """Wrapper for surface temperature sensors via a SAMD co-processor.

    Reads surface temperature from a SAMD-connected probe by exchanging JSON
    commands over a serial link (``OzLan``).

    ``partNumber`` and ``baseConfig`` are used but not defined in this class;
    they are presumably provided by ``GenericSensor`` (confirm against the
    base class).

    Attributes:
        configuration: Sensor config dicts from the Gateway. Every method
            iterates it as a sequence of dicts.
        v_surface: One entry per ``initializeSensor`` call, holding the
            readings taken while that sensor was initialised.
        surface_port: OzLan serial port instance for SAMD communication, or
            None until ``initializeSurface`` has opened it.
    """

    # Class-level defaults are kept so existing class-level access still
    # works. ``initialize`` rebinds ``configuration`` and ``v_surface`` on the
    # instance, so the shared mutable defaults below are never mutated.
    configuration: ClassVar[dict] = {}
    v_surface: ClassVar[list[list[int]]] = []
    surface_port: OzLan | None = None

    # The only part number this wrapper knows how to drive.
    _SURFACE_PART_NUMBER: ClassVar[int] = 411
    # Serial settings used when the sensor config carries no "gpio" override.
    _DEFAULT_PORT: ClassVar[str] = "/dev/ttyACM0"
    _DEFAULT_BAUD: ClassVar[int] = 115200
    # Pause (seconds) after configuring the SAMD and before the first reading.
    _SETTLE_SECONDS: ClassVar[float] = 3

    def __init__(self) -> None:
        """Initialise the OzSurface wrapper with default state."""
        super().__init__()

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all surface temperature sensors from the Gateway config.

        Every entry of *config* that contains the part-number key is passed to
        ``initializeSensor`` and gets its result stored under ``"init"``.
        Entries without that key are left untouched.

        Args:
            config: Sensor configuration dicts from the Gateway.
            init_value: Mutable dict; its ``"surface"`` key is set to the list
                of per-sensor init statuses.

        Returns:
            Always True.
        """
        self.configuration = config
        # Start from a fresh, per-instance list: appending to the class-level
        # default would share readings between instances and grow the list on
        # every re-initialisation.
        self.v_surface = []
        context_logger.info_with_context("Surface", f"Config: {self.configuration}")
        statuses = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                statuses.append(sensor["init"])
        init_value.update({"surface": statuses})
        return True  # send confirmation to main file

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single surface temperature sensor by part number.

        For an enabled part 411 this opens the serial connection to the SAMD
        and, if the probe reports alive, takes an initial reading. The
        readings gathered (possibly none) are appended to ``v_surface``.

        Args:
            sensor: Single sensor configuration dict containing the part
                number, the ``"en"`` enable flag and optional ``"gpio"``
                serial settings.

        Returns:
            The liveness value reported by the SAMD (1 when alive), 0 when
            initialisation raised, or -1 when the sensor is not an enabled
            part 411.
        """
        status = -1
        readings = []
        is_surface = sensor[self.partNumber] == self._SURFACE_PART_NUMBER
        # A missing "en" key is treated as "not enabled" instead of raising.
        if is_surface and sensor.get("en") == self.baseConfig["oz_enable"]:
            # TODO: Add gpio configuration
            try:
                first = [0]
                alive = self.initializeSurface(sensor)
                time.sleep(self._SETTLE_SECONDS)
                if alive:
                    first = self.getSurfaceSensorReading(sensor[self.partNumber], True)
                readings.append(first)
                status = alive
            except Exception as e:
                context_logger.error_with_context("Surface", f"initializeSensor: {e}")
                status = 0
        self.v_surface.append(readings)
        return status

    def initializeSurface(self, sensor: dict) -> int:
        """Open the serial connection to the SAMD and verify sensor liveness.

        Sends a configuration command, waits for the SAMD to settle, then
        sends a liveness query and reads ``"live"`` from the JSON reply.
        Errors are logged, not raised.

        Args:
            sensor: Sensor config dict; ``sensor["gpio"]["port"]`` and
                ``sensor["gpio"]["baud"]`` override the default serial
                settings when present.

        Returns:
            The ``"live"`` value from the SAMD reply (1 when alive), or 0 when
            no usable reply was received or an error occurred.
        """
        gpio = sensor.get("gpio") or {}  # tolerate a missing or null "gpio"
        port = gpio.get("port", self._DEFAULT_PORT)
        baud = gpio.get("baud", self._DEFAULT_BAUD)
        live = 0
        try:
            # NOTE(review): a previously opened port is replaced without being
            # closed; confirm whether OzLan needs an explicit close.
            self.surface_port = OzLan(port, baud)
            self.surface_port.send_command({"pid": "config", "pay": {"surface": 1}})
            time.sleep(self._SETTLE_SECONDS)
            # "data": 2 is the liveness query (the reply carries "live");
            # "data": 1 is the read command used by getSurface.
            reply = self._query({"pid": "surface", "data": 2})
            if reply is not None:
                live = reply["live"]
            context_logger.info_with_context("Surface", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Surface", f"initializeSurface: {e}")
        return live

    def _query(self, command: dict) -> dict | None:
        """Send *command* to the SAMD and return its decoded JSON reply.

        Args:
            command: JSON-serialisable command dict for the SAMD.

        Returns:
            The parsed reply, or None when the reply payload is too short to
            hold any data.

        Raises:
            RuntimeError: If the serial port has not been opened yet.
            Exception: Whatever the port, UTF-8 decoding or JSON parsing
                raises; callers log these.
        """
        if self.surface_port is None:
            raise RuntimeError("surface port is not open")
        self.surface_port.send_command(command, True)
        frame = self.surface_port.read_response()
        # Strip the framing: leading '~' and trailing '#', '\r', '\n'.
        payload = frame[1:-3]
        if len(payload) <= 3:
            return None
        return json.loads(payload.decode("utf-8"))

    def getSensorReading(self) -> None:
        """Run the sampling hook for every successfully initialised sensor.

        The reading helper is called without its ``flag``, so no serial
        traffic happens here; values are fetched in ``putSensorValue``.
        Sensors that never received an ``"init"`` status are skipped.
        """
        for sensor in self.configuration:
            if sensor.get("init") == 1:
                self.getSurfaceSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final surface temperature values and populate the output dict.

        Readings are paired positionally with ``sensor["parameters"]``; each
        reading is stored under that parameter's ``"sc"`` key. Parameters
        beyond the number of available readings are left unset and logged.

        Args:
            value: Mutable output dict to populate with readings.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            if sensor.get("init") != 1:
                continue
            readings = self.getSurfaceSensorReading(sensor[self.partNumber], True)
            parameters = sensor["parameters"]
            if len(parameters) > len(readings):
                # Indexing past the readings used to raise IndexError here.
                context_logger.error_with_context(
                    "Surface",
                    f"putSensorValue: {len(parameters)} parameters configured, "
                    f"{len(readings)} reading(s) available",
                )
            for parameter, reading in zip(parameters, readings):
                value[parameter["sc"]] = reading
        return value

    def getSurfaceSensorReading(self, partNo: int, flag: bool = False) -> list[int] | int:
        """Conditionally read surface temperature data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return 0.

        Returns:
            The list returned by ``getSurface`` when *flag* is True, else 0.
        """
        if flag:
            return self.getSurface(partNo)
        return 0

    def getSurface(self, partNo: int) -> list[int]:
        """Read surface temperature from the SAMD co-processor.

        Errors are logged, not raised.

        Args:
            partNo: Hardware part number (411 = surface temperature probe).

        Returns:
            Single-element list holding the ``"st"`` value from the SAMD
            reply; ``[0]`` for an unknown part number, an unusable reply or
            an error.
        """
        if partNo != self._SURFACE_PART_NUMBER:
            return [0]
        val = 0
        try:
            reply = self._query({"pid": "surface", "data": 1})
            if reply is not None:
                val = reply["st"]
            context_logger.info_with_context("Surface", f"getSurface: {val}")
        except Exception as e:
            context_logger.error_with_context("Surface", f"getSurface: {e}")
        return [val]
if __name__ == "__main__":
    # Manual smoke run: load "surface.config.json" from this file's directory
    # and initialise the sensors listed under its "surface" key.
    import os

    config_path = os.path.join(os.path.dirname(__file__), "surface.config.json")
    with open(config_path) as handle:
        loaded = json.load(handle)
    context_logger.info_with_context("Surface", f"Config loaded: {loaded}")
    OzSurface().initialize(loaded["surface"], {})