# Source code for OzWrapper.OzRain.OzRain

"""Rainfall sensor wrapper.

Communicates with a SAMD-based co-processor over UART (via the OzLan driver)
to read tipping-bucket rain gauge counts and compute rainfall amounts through
the GenericSensor contract.
"""

import json
from typing import ClassVar

from drivers.OzLan import OzLan
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Both loggers are keyed on this module's name.
# `basic_logger` is whatever OizomLogger.get() returns (presumably a plain
# logger object -- confirm in utils.oizom_logger).
basic_logger = OizomLogger(__name__).get()
# `context_logger` is the OizomLogger wrapper itself; the OzRain class logs
# through its info_with_context / error_with_context methods with the "Rain"
# context tag.
context_logger = OizomLogger(__name__)


# TODO: Add interrupt and callback handling for the rain gauge.
# TODO: Take the rain pin from the configuration sent by the server, e.g.
# {"pid":"config","pay":{"rn":{ "en" : 61, "gp":4}}}
class OzRain(GenericSensor):
    """Wrapper for rainfall sensors via a SAMD co-processor.

    Configures the rain gauge GPIO on the SAMD, reads accumulated tip counts,
    and converts them to rainfall amounts.

    Attributes:
        configuration: List of sensor config dicts from the Gateway.
        v_rain: Nested list holding accumulated readings per sensor.
        rain_port: OzLan serial port instance for SAMD communication.
    """

    # Part number of the rain gauge that is read through the SAMD. The same
    # number is sent as the "en" field of the SAMD config command.
    _SAMD_RAIN_PART_NUMBER: ClassVar[int] = 61

    # Serial/GPIO defaults used when the sensor config has no "gpio" overrides.
    _DEFAULT_PORT: ClassVar[str] = "/dev/ttyUSB0"
    _DEFAULT_BAUD: ClassVar[int] = 115200
    _DEFAULT_RAIN_PIN: ClassVar[int] = 10

    configuration: ClassVar[list[dict]] = []
    # NOTE(review): class-level list, so every OzRain instance appends to the
    # same object and it grows on each re-initialisation. Nothing in this
    # class reads it back -- confirm whether per-instance state is wanted.
    v_rain: ClassVar[list[list[int]]] = []
    rain_port: OzLan | None = None

    def __init__(self) -> None:
        """Initialise the OzRain wrapper with default state."""
        super().__init__()

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all rainfall sensors listed in the Gateway config.

        Every entry that carries the part-number key gets an ``"init"`` field
        (1 or 0) written back into it; entries without that key are left
        untouched.

        Args:
            config: List of sensor configuration dicts from the Gateway. The
                list is stored by reference and its entries are mutated.
            init_value: Mutable dict; its ``"rain"`` key is set to the list of
                per-sensor init statuses.

        Returns:
            Always True; per-sensor results are reported via ``init_value``.
        """
        self.configuration = config
        context_logger.info_with_context("Rain", f"Configuration: {self.configuration}")

        init_statuses = []
        for sensor in self.configuration:
            if self.partNumber in sensor:
                sensor["init"] = self.initializeSensor(sensor)
                init_statuses.append(sensor["init"])

        init_value.update({"rain": init_statuses})
        return True  # confirmation for the main file

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single rainfall sensor by part number.

        Opens the serial connection to the SAMD and sends the rain gauge GPIO
        configuration command. Only the SAMD rain part number is handled, and
        only when the sensor's ``"en"`` flag equals ``baseConfig["oz_enable"]``.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, and optional ``"gpio"`` overrides (``"port"``,
                ``"baud"``, ``"pin"``).

        Returns:
            1 if the sensor was initialised successfully, 0 otherwise.
        """
        is_samd_rain = sensor.get(self.partNumber) == self._SAMD_RAIN_PART_NUMBER
        is_enabled = "en" in sensor and sensor["en"] == self.baseConfig["oz_enable"]
        if not (is_samd_rain and is_enabled):
            return 0

        # A missing or null "gpio" section falls back to the defaults.
        gpio = sensor.get("gpio") or {}
        port = gpio.get("port", self._DEFAULT_PORT)
        baud = gpio.get("baud", self._DEFAULT_BAUD)
        rain_pin = gpio.get("pin", self._DEFAULT_RAIN_PIN)

        command = {
            "pid": "config",
            "pay": {"rn": {"en": sensor[self.partNumber], "gp": rain_pin}},
        }
        try:
            self.rain_port = OzLan(port, baud)
            self.rain_port.send_command(command)
        except Exception as e:
            # Best effort: a failed port open/send marks the sensor as not
            # initialised instead of aborting start-up.
            context_logger.error_with_context("Rain", f"initializeSensor: {e}")
            success = False
        else:
            success = True

        # One (empty) readings slot is recorded per handled sensor, whether or
        # not the SAMD accepted the configuration.
        self.v_rain.append([])
        return int(success)

    def getRain(self, partNo: int, pm: int) -> int:
        """Read the accumulated rain count from the SAMD co-processor.

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1=rain count).

        Returns:
            The ``"rain"`` field of the SAMD response, or 0 on error, on a
            too-short response, or for an unsupported part.
        """
        if partNo == self.baseConfig["pn_ina219"]:
            # Placeholder branch: logs a fixed dummy value and reports 0.
            if pm == 1:
                val = 12.25
                context_logger.info_with_context("Rain", f"getRain: {val}")
            return 0

        if partNo != self._SAMD_RAIN_PART_NUMBER:
            return 0

        if self.rain_port is None:
            context_logger.error_with_context("Rain", "getRain: rain port is not initialised")
            return 0

        val = 0
        try:
            self.rain_port.send_command({"pid": "rn", "data": 1}, True)
            raw = self.rain_port.read_response()
            # Drop the first byte and the last three bytes around the JSON
            # payload (presumably frame delimiters -- confirm against the
            # OzLan/SAMD protocol).
            payload = raw[1:-3]
            if len(payload) > 3:
                val = json.loads(payload.decode("utf-8"))["rain"]
            context_logger.info_with_context("Rain", f"getRain: {val}")
        except Exception as e:
            # Any serial, decode or JSON failure leaves the value read so far.
            context_logger.error_with_context("Rain", f"getRain: {e}")
        return val

    def getSensorReading(self) -> None:
        """Trigger a rain reading from all initialised sensors.

        This method does not return data directly; values are collected in
        ``putSensorValue``. It calls ``getRainSensorReading`` with its default
        ``flag=False``, so no SAMD read is performed here.
        """
        for sensor in self.configuration:
            # Entries skipped by ``initialize`` have no "init" key.
            if sensor.get("init") == 1:
                self.getRainSensorReading(sensor[self.partNumber])

    def putSensorValue(self, value: dict) -> dict:
        """Read final rainfall values and populate the output dict.

        For each initialised sensor the raw count is scaled by
        ``parameters["se"] / 100`` (default 100) and offset by
        ``parameters["cr"] / 10`` (default 0), rounded to 4 decimals and stored
        under the key named by ``parameters["sc"]``. Missing ``"se"``/``"cr"``
        defaults are written back into the sensor's ``parameters``.

        Args:
            value: Mutable output dict to populate with rainfall readings.

        Returns:
            The updated output dict.
        """
        for sensor in self.configuration:
            try:
                # Entries skipped by ``initialize`` have no "init" key.
                if sensor.get("init") != 1:
                    continue

                raw_count = self.getRainSensorReading(sensor[self.partNumber], True)

                params = sensor.get("parameters")
                if not isinstance(params, dict):
                    context_logger.error_with_context(
                        "Rain", "putSensorValue: sensor has no 'parameters' mapping"
                    )
                    continue

                sensitivity = params.setdefault("se", 100)
                correction = params.setdefault("cr", 0)
                rain_value = round(
                    (raw_count * (sensitivity / 100.0)) + (correction / 10.0),
                    4,
                )
                if "sc" in params:
                    value[params["sc"]] = float(rain_value)
            except Exception as e:
                # One bad sensor entry must not block the remaining ones.
                context_logger.error_with_context("Rain", f"putSensorValue: {e}")
        return value

    def getRainSensorReading(self, partNo: int, flag: bool = False) -> int:
        """Conditionally read rain data from the SAMD.

        Args:
            partNo: Hardware part number identifying the sensor model.
            flag: If True, perform the actual read; otherwise return 0.

        Returns:
            Rain count value when flag is True, 0 otherwise or on error.
        """
        if not flag:
            return 0
        try:
            return self.getRain(partNo, 1)  # parameter is hardcoded just for the samd
        except Exception as e:
            context_logger.error_with_context("Rain", f"getRainSensorReading: {e}")
            return 0

    def __callback(self, sender: int, value: float) -> None:
        """Log a pin event; not referenced by any other method of this class."""
        context_logger.info_with_context("Rain", f"pin={sender}, value={value}")