"""Rainfall sensor wrapper.
Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read tipping-bucket rain gauge counts and compute rainfall amounts through
the GenericSensor contract.
"""
import json
from typing import ClassVar
from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# TODO: ADD INTERRUPT AND CALLBACK FUNCTION FOR RAIN
# TODO : RAIN PIN FROM CONFIGURATION WHICH WILL COME FROM THE SERVER
# {"pid":"config","pay":{"rn":{ "en" : 61, "gp":4}}}
[docs]
class OzRain(GenericSensor):
"""Wrapper for rainfall sensors via a SAMD co-processor.
Configures the rain gauge GPIO on the SAMD, reads accumulated tip
counts, and converts them to rainfall amounts.
Attributes:
configuration: List of sensor config dicts from the Gateway.
v_rain: Nested list holding accumulated readings per sensor.
rain_port: OzLan serial port instance for SAMD communication.
"""
configuration: ClassVar[dict] = {}
v_rain: ClassVar[list[list[int]]] = []
rain_port: OzLan | None = None
[docs]
def __init__(self) -> None:
"""Initialise the OzRain wrapper with default state."""
super().__init__()
[docs]
def initialize(self, config: dict, init_value: dict) -> bool:
"""Initialise all rainfall sensors listed in the Gateway config.
Args:
config: List of sensor configuration dicts from the Gateway.
init_value: Mutable dict updated with per-sensor init status.
Returns:
True (always returns True for SAMD-based sensors).
"""
self.configuration = config
context_logger.info_with_context("Rain", f"Configuration: {self.configuration}")
_samd_sensor_init = []
for sensor in self.configuration:
if self.partNumber in sensor:
sensor["init"] = self.initializeSensor(sensor)
_samd_sensor_init.append(sensor["init"])
init_value.update({"rain": _samd_sensor_init})
return True # send confirmation tobit main file
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialise a single rainfall sensor by part number.
Opens the serial connection to the SAMD and sends the rain gauge
GPIO configuration command.
Args:
sensor: Single sensor configuration dict containing part number,
enable flag, and GPIO/UART settings.
Returns:
1 if the sensor was initialised successfully, 0 otherwise.
"""
_success = False
# if sensor[self.partNumber] == self.baseConfig['pn_ina219'] and sensor['en'] == self.baseConfig['oz_enable']:
# #TODO: Add gpio configuration
# parameters = sensor['parameters']
# value = []
# for param in parameters:
# if param[self.parameter] == self.baseConfig['pm_ina219']:
# # val = self.getRain(sensor[self.partNumber], param[self.parameter])
# # gpio.setup(40, gpio.IN)
# pin = Pin('40')
# pin.direction = 'in'
# pin.register_callback(self.__callback)
# pin.enabled = True
# oldVal = val
# _val = {
# "value": val,
# "oldvalue": oldVal,
# "count": 1
# }
# value.append(_val)
# self.v_rain.append(value)
# _success = True
if sensor[self.partNumber] == 61 and sensor["en"] == self.baseConfig["oz_enable"]:
value = []
port = "/dev/ttyUSB0"
baud = 115200
rain_pin = 10
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = sensor["gpio"]["port"]
if "baud" in sensor["gpio"]:
baud = sensor["gpio"]["baud"]
if "pin" in sensor["gpio"]:
rain_pin = sensor["gpio"]["pin"]
try:
command = {
"pid": "config",
"pay": {"rn": {"en": sensor[self.partNumber], "gp": rain_pin}},
}
self.rain_port = OzLan(port, baud)
self.rain_port.send_command(command)
_success = True
except Exception as e:
context_logger.error_with_context("Rain", f"initializeSensor: {e}")
_success = False
self.v_rain.append(value)
return int(_success)
[docs]
def getRain(self, partNo: int, pm: int) -> int:
"""Read the accumulated rain count from the SAMD co-processor.
Args:
partNo: Hardware part number identifying the sensor model.
pm: Parameter identifier (1=rain count).
Returns:
Rain count value, or 0 on error or unsupported part.
"""
if partNo == self.baseConfig["pn_ina219"]:
if pm == 1:
val = 12.25
context_logger.info_with_context("Rain", f"getRain: {val}")
# return 12.25
return 0
if partNo == 61:
val = 0
try:
command = {"pid": "rn", "data": 1}
self.rain_port.send_command(command, True)
data = self.rain_port.read_response()
data = data[1:-3]
if len(data) > 3:
data = data.decode("utf-8")
data = json.loads(data)
val = data["rain"]
context_logger.info_with_context("Rain", f"getRain: {val}")
except Exception as e:
context_logger.error_with_context("Rain", f"getRain: {e}")
return val
return 0
[docs]
def getSensorReading(self) -> None:
"""Trigger a rain reading from all initialised sensors.
This method does not return data directly; values are collected
in ``putSensorValue``.
"""
for sensor in self.configuration:
if sensor["init"] == 1:
self.getRainSensorReading(sensor[self.partNumber])
[docs]
def putSensorValue(self, value: dict) -> dict:
"""Read final rainfall values and populate the output dict.
Applies sensitivity and correction offsets to the raw rain count.
Args:
value: Mutable output dict to populate with rainfall readings.
Returns:
The updated output dict.
"""
for sensor in self.configuration:
try:
if sensor["init"] == 1:
r_value = self.getRainSensorReading(sensor[self.partNumber], True)
if "se" not in sensor["parameters"]:
sensor["parameters"]["se"] = 100
if "cr" not in sensor["parameters"]:
sensor["parameters"]["cr"] = 0
rain_value = round(
((r_value * (sensor["parameters"]["se"] / 100.0)) + (sensor["parameters"]["cr"] / 10.0)),
4,
)
if "parameters" in sensor:
if "sc" in sensor["parameters"]:
sc = sensor["parameters"]["sc"]
value[sc] = float(rain_value)
except Exception as e:
context_logger.error_with_context("Rain", f"putSensorValue: {e}")
return value
[docs]
def getRainSensorReading(self, partNo: int, flag: bool = False) -> int:
"""Conditionally read rain data from the SAMD.
Args:
partNo: Hardware part number identifying the sensor model.
flag: If True, perform the actual read; otherwise return 0.
Returns:
Rain count value when flag is True, 0 otherwise.
"""
if flag:
value = 0
try:
value = self.getRain(partNo, 1) # parameter is hardcoded just for the samd
except Exception as e:
context_logger.error_with_context("Rain", f"getRainSensorReading: {e}")
return value
return 0
def __callback(self, sender: int, value: float) -> None:
context_logger.info_with_context("Rain", f"pin={sender}, value={value}")