Source code for OzWrapper.OzWind.OzWind

"""Wind speed, direction, and gust sensor wrapper.

Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read wind direction, wind speed, and wind gust measurements through the
GenericSensor contract.
"""

import json
import time
from typing import ClassVar

from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level logger objects are created from the project's OizomLogger:
#   * basic_logger   - whatever ``OizomLogger(__name__).get()`` returns; it is
#                      not referenced by the code visible in this module.
#   * context_logger - the OizomLogger instance itself; its ``*_with_context``
#                      methods are what this module calls, always with the
#                      "Wind" tag as first argument.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzWind(GenericSensor):
    """Wrapper for wind sensors communicating via a SAMD co-processor.

    Reads wind direction, wind speed, and optionally wind gust from a
    SAMD-based board over serial. Includes spike filtering to reject
    implausible jumps in consecutive readings.

    Attributes:
        configuration: List of sensor config dicts from the Gateway.
        v_wind: Nested list holding wind reading arrays per sensor.
        wind_old: Previous raw wind reading used for spike filtering
            (a flat list ``[direction, speed, ...]`` or None).
        wind_port: OzLan serial port instance for SAMD communication.
        speed_threshold: Maximum acceptable speed jump between consecutive reads.
        dir_threshold: Maximum acceptable direction jump between consecutive reads.
    """

    configuration: ClassVar[dict] = {}
    v_wind: ClassVar[list] = []
    wind_old = None
    wind_port: ClassVar[OzLan] = None
    speed_threshold = 10
    dir_threshold = 10

    # Supported part numbers mapped to the all-zero reading used as a
    # fallback: 211 reports [direction, speed], 213 adds gust.
    _DEFAULT_READINGS: ClassVar[dict] = {211: (0, 0), 213: (0, 0, 0)}

    def __init__(self) -> None:
        """Initialise the OzWind wrapper with default state."""
        super().__init__()
        # Rebind the mutable state per instance. ``v_wind`` is appended to in
        # ``initializeSensor``; without this, every instance would grow the
        # single list stored on the class.
        self.v_wind = []
        self.wind_old = None

    @staticmethod
    def _direction_and_speed(reading):
        """Extract ``(direction, speed)`` as floats from a wind reading.

        Args:
            reading: Sequence whose first two items are direction and speed.

        Returns:
            A ``(direction, speed)`` tuple, or None when ``reading`` is not a
            sequence of at least two numeric values (None, empty, nested, ...).
        """
        try:
            return float(reading[0]), float(reading[1])
        except (TypeError, ValueError, IndexError, KeyError):
            return None

    def is_valid_wind(self, wind_old, wind_new):
        """Validate a new wind reading against the previous one using thresholds.

        Rejects readings where both speed and direction jump exceed their
        respective thresholds simultaneously, which indicates a sensor glitch.

        Args:
            wind_old: Previous wind reading as a list [direction, speed, ...],
                or None / an unusable value when no baseline exists yet.
            wind_new: New wind reading as a list [direction, speed, ...].

        Returns:
            True if the new reading is plausible, False if it should be
            rejected. A malformed ``wind_new`` is rejected; a missing or
            malformed ``wind_old`` means there is nothing to compare against,
            so the new reading is accepted.
        """
        new = self._direction_and_speed(wind_new)
        if new is None:
            context_logger.error_with_context("Wind", f"is_valid_wind: unusable reading {wind_new!r}")
            return False

        old = self._direction_and_speed(wind_old)
        if old is None:
            # No usable baseline: accept rather than discard a good reading.
            return True

        new_wd, new_ws = new
        old_wd, old_ws = old

        speed_diff = abs(new_ws - old_ws)

        # Circular difference: 350 -> 10 degrees is a 20 degree change, not 340.
        # The modulo keeps the result non-negative even for out-of-range input.
        dir_diff = abs(new_wd - old_wd) % 360
        dir_diff = min(dir_diff, 360 - dir_diff)

        # Only a simultaneous jump in speed AND direction counts as a glitch.
        return not (speed_diff > self.speed_threshold and dir_diff > self.dir_threshold)

    def initialize(self, config: dict, init_value: dict) -> bool:
        """Initialise all wind sensors listed in the Gateway config.

        Each entry that carries the part-number key gets an ``"init"`` field
        holding the result of ``initializeSensor``.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict; its ``"wind"`` key is set to the list of
                per-sensor init statuses.

        Returns:
            True (always returns True for SAMD-based sensors).
        """
        self.configuration = config
        context_logger.debug_with_context("Wind", "Initializing OzWind sensors")
        statuses = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                statuses.append(sensor["init"])
        init_value.update({"wind": statuses})
        return True  # send confirmation to main file

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single wind sensor by part number.

        For supported, enabled parts (211, 213) this opens the serial
        connection to the SAMD co-processor, waits three seconds, and takes an
        initial reading if the sensor reports alive. That reading seeds the
        spike-filter baseline.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, and GPIO/UART settings.

        Returns:
            The liveness value reported by the SAMD (1 when alive), 0 when
            initialisation raised, or -1 when the part is unsupported or
            disabled.
        """
        status = -1
        readings = []
        part = sensor[self.partNumber]
        default = self._DEFAULT_READINGS.get(part)

        if default is not None and sensor["en"] == self.baseConfig["oz_enable"]:
            # TODO: Add gpio configuration
            try:
                reading = list(default)
                live = self.initializeWind(sensor)
                time.sleep(3)
                if live:
                    reading = self.getWindSensorReading(part, True)
                    # Store the flat reading, not the wrapping list, so that
                    # ``is_valid_wind`` can compare against it.
                    self.wind_old = reading
                readings.append(reading)
                status = live
            except Exception as e:
                context_logger.error_with_context("Wind", f"initializeSensor {part}: {e}")
                status = 0

        self.v_wind.append(readings)
        return status

    def _query_samd(self, command: dict):
        """Send ``command`` to the SAMD and decode its JSON reply.

        Args:
            command: Command dict passed to ``OzLan.send_command``.

        Returns:
            The decoded JSON object, or None when the reply is too short to
            contain a payload.

        Raises:
            Exception: Anything raised by the serial driver or by
                decoding/parsing; callers handle and log these.
        """
        self.wind_port.send_command(command, True)
        frame = self.wind_port.read_response()
        # Drop the first byte and the last three bytes around the payload.
        # NOTE(review): presumably a start marker and a trailer/terminator -
        # confirm against the OzLan framing.
        frame = frame[1:-3]
        if len(frame) <= 3:
            return None
        return json.loads(frame.decode("utf-8"))

    def initializeWind(self, sensor: dict) -> int:
        """Open the serial connection to the SAMD and verify sensor liveness.

        Sends a configuration command followed by a liveness check. Optional
        settings are read from ``sensor["gpio"]``: ``port``, ``baud``,
        ``part``, ``debug``, ``interval``, ``speedThreshold``, ``dirThreshold``.

        Args:
            sensor: Sensor config dict with GPIO/UART port, baud, and optional thresholds.

        Returns:
            The ``"live"`` value reported by the SAMD (1 when the wind sensor
            is alive), 0 when no valid reply was received or an error occurred.
        """
        gpio = sensor.get("gpio") or {}
        port = gpio.get("port", "/dev/ttyACM0")
        baud = gpio.get("baud", 115200)
        part = int(gpio.get("part", 1))
        self.debug = gpio.get("debug") == 1
        interval = gpio.get("interval", 300)
        if "speedThreshold" in gpio:
            self.speed_threshold = gpio["speedThreshold"]
        if "dirThreshold" in gpio:
            self.dir_threshold = gpio["dirThreshold"]

        live = 0
        try:
            # NOTE(review): a port opened by an earlier call is replaced here
            # without being closed - confirm whether OzLan needs an explicit close.
            self.wind_port = OzLan(port, baud)
            self.wind_port.send_command({"pid": "config", "pay": {"wind": part, "interval": interval}})
            time.sleep(1)
            # "data": 2 is the liveness query ("check alive or not").
            reply = self._query_samd({"pid": "wind", "data": 2})
            if reply is not None:
                live = reply["live"]
                context_logger.info_with_context("Wind", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Wind", f"initializeWind: {e}")
        return live

    def getSensorReading(self) -> None:
        """Trigger a wind reading from all initialised sensors.

        This method does not return data directly; values are collected in
        ``putSensorValue``. With the default ``flag=False`` the underlying call
        performs no serial read.
        """
        for sensor in self.configuration:
            # ``.get``: entries skipped by ``initialize`` have no "init" key.
            if sensor.get("init") == 1:
                self.getWindSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final wind values and populate the output dict.

        Applies spike filtering against the previous raw reading; substitutes
        zeroes in the output if the new reading is rejected.

        Args:
            value: Mutable output dict to populate with wind readings, keyed by
                each parameter's ``"sc"`` entry.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            if sensor.get("init") != 1:
                continue
            raw = self.getWindSensorReading(sensor[self.partNumber], True)
            published = raw if self.is_valid_wind(self.wind_old, raw) else [0, 0, 0]
            for index, parameter in enumerate(sensor["parameters"]):
                value[parameter["sc"]] = published[index]
            # Keep the raw reading as the next baseline. Storing the zeroed
            # substitute would make every later reading above both thresholds
            # look like a spike, latching the output at zero.
            self.wind_old = raw
        return value

    def getWindSensorReading(self, partNo: int, flag: bool = False) -> "list | int":
        """Conditionally read wind data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return 0.

        Returns:
            Wind data list when flag is True, 0 otherwise.
        """
        return self.getWind(partNo) if flag else 0

    def getWind(self, partNo: int) -> list:
        """Read wind direction, speed, and optionally gust from the SAMD.

        Fields that could not be read keep their default of 0; errors are
        logged, never raised.

        Args:
            partNo: Hardware part number (211=dir+speed, 213=dir+speed+gust).

        Returns:
            List of wind values: [direction, speed] for 211,
            [direction, speed, gust] for 213, and [0, 0, 0] for any other
            part number.
        """
        if partNo == 211:
            wd = 0
            ws = 0
            try:
                reply = self._query_samd({"pid": "wind", "data": 1})
                if reply is not None:
                    wd = reply["wd"]
                    ws = reply["ws"]
                    context_logger.info_with_context("Wind", f"Wind direction is :{wd} and Wind Speed is : {ws}")
            except Exception as e:
                context_logger.error_with_context("Wind", f"[ERR_WIND] getWind: {e}")
            return [wd, ws]

        if partNo == 213:
            # Debug mode (set from the gpio config) requests code 3 instead of 1.
            command = {"pid": "wind", "data": 3 if self.debug else 1}
            wd = 0
            ws = 0
            wg = 0
            try:
                reply = self._query_samd(command)
                if reply is not None:
                    wd = reply["wd"]
                    ws = reply["ws"]
                    wg = round(reply["wg"], 2)
                    context_logger.info_with_context(
                        "Wind",
                        f"Wind direction is :{wd} and Wind Speed is : {ws} and Wind Gust is : {wg}",
                    )
            except Exception as e:
                context_logger.error_with_context("Wind", f"[ERR_WIND] getWind: {e}")
            return [wd, ws, wg]

        return [0, 0, 0]
if __name__ == "__main__":
    import os

    # Manual smoke test: load "wind.config.json" from the directory of this
    # module, log it, and run sensor initialisation once on its "wind" section.
    config_path = os.path.join(os.path.dirname(__file__), "wind.config.json")
    with open(config_path) as config_file:
        config = json.load(config_file)
    context_logger.info_with_context("Wind", f"Config loaded: {config}")

    wind = OzWind()
    wind.initialize(config["wind"], {})