"""Wind speed, direction, and gust sensor wrapper.
Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read wind direction, wind speed, and wind gust measurements through the
GenericSensor contract.
"""
import json
import time
from typing import ClassVar
from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Object returned by OizomLogger(...).get() -- presumably the underlying plain
# logger (TODO confirm); it is not referenced by the code visible in this module.
basic_logger = OizomLogger(__name__).get()
# Logger wrapper whose *_with_context(tag, message) methods emit every "Wind"
# log line in this module.
context_logger = OizomLogger(__name__)
class OzWind(GenericSensor):
    """Wrapper for wind sensors communicating via a SAMD co-processor.

    Reads wind direction, wind speed, and optionally wind gust from a
    SAMD-based board over serial. Includes spike filtering to reject
    implausible jumps in consecutive readings.

    Attributes:
        configuration: Sensor config dicts from the Gateway (set by
            ``initialize``; iterated as a sequence of dicts).
        v_wind: Nested list holding the initial reading(s) per sensor.
        wind_old: Previous flat wind reading used for spike filtering, or
            None when no reading has been taken yet.
        wind_port: OzLan serial port instance for SAMD communication.
        speed_threshold: Maximum acceptable speed jump between consecutive reads.
        dir_threshold: Maximum acceptable direction jump between consecutive reads.
        debug: When True, part 213 is read with the alternate ``data: 3`` command.
    """

    # Supported part numbers mapped to the zero-filled reading used when no
    # data is available: 211 -> [direction, speed], 213 -> [direction, speed, gust].
    _DEFAULT_READINGS: ClassVar[dict] = {211: [0, 0], 213: [0, 0, 0]}

    # Replaced per instance by ``initialize`` with the Gateway sensor list.
    configuration: ClassVar[dict] = {}
    # NOTE(review): class-level list, so appended readings are shared by every
    # OzWind instance in the process.
    v_wind: ClassVar[list] = []
    wind_old = None
    wind_port: ClassVar[OzLan] = None
    speed_threshold = 10
    dir_threshold = 10
    # Class-level default so getWind() is safe even before initializeWind() ran.
    debug = False

    def __init__(self) -> None:
        """Initialise the OzWind wrapper with default state."""
        super().__init__()

    def is_valid_wind(self, wind_old, wind_new):
        """Validate a new wind reading against the previous one using thresholds.

        Rejects readings where both speed and direction jump exceed their
        respective thresholds simultaneously, which indicates a sensor glitch.

        Args:
            wind_old: Previous wind reading as a list [direction, speed, ...],
                or None / empty when there is no previous reading.
            wind_new: New wind reading as a list [direction, speed, ...].

        Returns:
            True if the new reading is plausible (or there is no baseline to
            compare against), False if it should be rejected or cannot be
            compared.
        """
        # Without a previous reading there is nothing to measure a jump against,
        # so the first reading is accepted as-is.
        if not wind_old:
            return True
        try:
            # Only direction and speed take part in the check; any further
            # elements (e.g. gust) are ignored.
            old_wd, old_ws = wind_old[0], wind_old[1]
            new_wd, new_ws = wind_new[0], wind_new[1]
            speed_diff = abs(new_ws - old_ws)
            # Circular difference, since direction wraps around at 360.
            dir_diff = abs(new_wd - old_wd)
            dir_diff = min(dir_diff, 360 - dir_diff)
            is_spike = speed_diff > self.speed_threshold and dir_diff > self.dir_threshold
        except (TypeError, ValueError, IndexError, KeyError) as e:
            context_logger.error_with_context("Wind", f"is_valid_wind: {e}")
            return False  # malformed reading cannot be verified, so reject it
        return not is_spike

    def initialize(self, config: list, init_value: dict) -> bool:
        """Initialise all wind sensors listed in the Gateway config.

        Args:
            config: Sensor configuration dicts from the Gateway. Each entry
                that contains the part-number key gets an ``"init"`` status
                written back into it.
            init_value: Mutable dict updated with per-sensor init status
                under the ``"wind"`` key.

        Returns:
            True (always returns True for SAMD-based sensors).
        """
        self.configuration = config
        context_logger.debug_with_context("Wind", "Initializing OzWind sensors")
        _samd_sensor_init = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                _samd_sensor_init.append(sensor["init"])
        init_value.update({"wind": _samd_sensor_init})
        return True  # send confirmation to main file

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single wind sensor by part number.

        For supported, enabled parts (211 and 213) this opens the serial
        connection to the SAMD co-processor and, if the sensor reports alive,
        takes an initial reading that also seeds the spike-filter baseline.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, and GPIO/UART settings.

        Returns:
            The liveness value reported by the SAMD (1 when alive), 0 if
            initialisation raised, or -1 if the part is unsupported or disabled.
        """
        _success = -1
        value = []
        part_no = sensor[self.partNumber]
        if part_no in self._DEFAULT_READINGS and sensor["en"] == self.baseConfig["oz_enable"]:
            # TODO: Add gpio configuration
            try:
                val = list(self._DEFAULT_READINGS[part_no])
                init = self.initializeWind(sensor)
                time.sleep(3)
                if init:
                    val = self.getWindSensorReading(part_no, True)
                    # Seed the baseline with the flat reading. Storing the
                    # wrapping list instead made the first comparison fail and
                    # zeroed the first real measurement.
                    self.wind_old = val
                value.append(val)
                _success = init
            except Exception as e:
                label = "initializeSensor" if part_no == 211 else f"initializeSensor {part_no}"
                context_logger.error_with_context("Wind", f"{label}: {e}")
                _success = 0
        self.v_wind.append(value)
        return _success

    def initializeWind(self, sensor: dict) -> int:
        """Open the serial connection to the SAMD and verify sensor liveness.

        Sends a configuration command followed by a liveness check. Optional
        overrides are read from ``sensor["gpio"]``: ``port``, ``baud``,
        ``part``, ``debug``, ``interval``, ``speedThreshold`` and
        ``dirThreshold``.

        Args:
            sensor: Sensor config dict with GPIO/UART port, baud, and
                optional thresholds.

        Returns:
            The ``live`` value from the SAMD response (1 if the wind sensor
            is alive), 0 if no usable response was received.
        """
        gpio = sensor.get("gpio") or {}
        port = gpio.get("port", "/dev/ttyACM0")
        baud = gpio.get("baud", 115200)
        part = int(gpio.get("part", 1))
        interval = gpio.get("interval", 300)
        self.debug = gpio.get("debug") == 1
        # Thresholds keep their current value unless the config overrides them.
        self.speed_threshold = gpio.get("speedThreshold", self.speed_threshold)
        self.dir_threshold = gpio.get("dirThreshold", self.dir_threshold)

        live = 0
        try:
            self.wind_port = OzLan(port, baud)
            self.wind_port.send_command({"pid": "config", "pay": {"wind": part, "interval": interval}})
            time.sleep(1)
            # data=2 is the liveness query; the response carries a "live" field.
            reply = self._query({"pid": "wind", "data": 2})
            if reply is not None:
                live = reply["live"]
            context_logger.info_with_context("Wind", f"activate : {live}")
        except Exception as e:
            context_logger.error_with_context("Wind", f"initializeWind: {e}")
        return live

    def _query(self, command: dict):
        """Send ``command`` to the SAMD and return the decoded JSON reply, or None if the payload is too short."""
        self.wind_port.send_command(command, True)
        raw = self.wind_port.read_response()
        # Drop the first byte and the last three bytes of the response --
        # presumably frame delimiters / line terminator (TODO confirm against OzLan).
        payload = raw[1:-3]
        if len(payload) > 3:
            return json.loads(payload.decode("utf-8"))
        return None

    def getSensorReading(self) -> None:
        """Trigger a wind reading from all initialised sensors.

        This method does not return data directly; values are collected
        in ``putSensorValue``. Because ``getWindSensorReading`` is called
        without its flag, no serial traffic is generated here.
        """
        for sensor in self.configuration:
            # .get(): entries skipped by initialize() never received an "init" key.
            if sensor.get("init") == 1:
                self.getWindSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final wind values and populate the output dict.

        Applies spike filtering against the previous reading; substitutes
        zeroes if the new reading is rejected.

        Args:
            value: Mutable output dict to populate with wind readings, keyed
                by each parameter's ``"sc"`` entry.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            if sensor.get("init") != 1:
                continue
            wind_value = self.getWindSensorReading(sensor[self.partNumber], True)
            if not self.is_valid_wind(self.wind_old, wind_value):
                wind_value = [0, 0, 0]
            # zip() pairs parameters with readings positionally and stops at the
            # shorter one, so a config listing more parameters than the sensor
            # delivers no longer raises IndexError.
            for parameters, reading in zip(sensor["parameters"], wind_value):
                value[parameters["sc"]] = reading
            # NOTE(review): after a rejection the baseline becomes zeros, and the
            # baseline is shared by all sensors in the config -- confirm both
            # are intended.
            self.wind_old = wind_value
        return value

    def getWindSensorReading(self, partNo: int, flag: bool = False) -> "list | int":
        """Conditionally read wind data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return 0.

        Returns:
            Wind data list when flag is True, 0 otherwise.
        """
        if flag:
            return self.getWind(partNo)
        return 0

    def getWind(self, partNo: int) -> list:
        """Read wind direction, speed, and optionally gust from the SAMD.

        Any communication or parsing error is logged and the affected values
        stay at 0; this method does not raise for such errors.

        Args:
            partNo: Hardware part number (211=dir+speed, 213=dir+speed+gust).

        Returns:
            List of wind values: [direction, speed] for part 211,
            [direction, speed, gust] for part 213, and [0, 0, 0] for any
            other part number.
        """
        if partNo not in self._DEFAULT_READINGS:
            return [0, 0, 0]

        with_gust = partNo == 213
        # Only part 213 honours the debug flag, switching to the data=3 command.
        command = {"pid": "wind", "data": 3 if (with_gust and self.debug) else 1}
        wd = 0
        ws = 0
        wg = 0
        try:
            data = self._query(command)
            if data is not None:
                wd = data["wd"]
                ws = data["ws"]
                if with_gust:
                    wg = round(data["wg"], 2)
            if with_gust:
                context_logger.info_with_context(
                    "Wind",
                    f"Wind direction is :{wd} and Wind Speed is : {ws} and Wind Gust is : {wg}",
                )
            else:
                context_logger.info_with_context("Wind", f"Wind direction is :{wd} and Wind Speed is : {ws}")
        except Exception as e:
            context_logger.error_with_context("Wind", f"[ERR_WIND] getWind: {e}")
        return [wd, ws, wg] if with_gust else [wd, ws]
if __name__ == "__main__":
    # Manual smoke test: load wind.config.json from this file's directory and
    # run sensor initialisation against the hardware.
    import os

    config_path = os.path.join(os.path.dirname(__file__), "wind.config.json")
    # JSON text is UTF-8 by specification; do not depend on the locale default.
    with open(config_path, encoding="utf-8") as config_file:
        config = json.load(config_file)
    context_logger.info_with_context("Wind", f"Config loaded: {config}")
    wind = OzWind()
    wind.initialize(config["wind"], {})