Source code for drivers.Rain.Rain

"""Driver for the Oizom tipping-bucket rain gauge (SAMD-based).

Communicates via UART with a SAMD microcontroller that measures cumulative
rainfall. Sends ASCII commands and receives colon-delimited text responses.

Hardware:
    Interface: UART (serial)
    Port: Configured via serial port argument
    Baud rate: Configured externally on the serial.Serial instance
    Protocol: ASCII command (``RAIN?\\r\\n``) -> text response (``RAIN:<value>``)

Typical usage::

    import serial
    from drivers.Rain.Rain import Rain

    ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
    rain = Rain()
    rain.initialize(ser, {"en": 1, "parameters": [{"sc": "rain"}]})
    value = rain.get_rain()

Note:
    Requires ``pyserial`` for serial communication.
"""

import time

import serial

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Rain: """Driver for an SAMD-based tipping-bucket rain gauge. Sends ``RAIN?`` commands over UART and parses text responses containing cumulative rainfall values. Attributes: ser: Open ``serial.Serial`` instance for UART communication. configuration: Device configuration dictionary. debug: Whether verbose debug logging is enabled. requested_parameters: List of parameter short-codes from configuration. """
[docs] def __init__(self) -> None: """Initialize default attributes for the Rain sensor.""" self.ser: serial.Serial = None self.configuration: dict = {} self.debug = True self.rain_enabled = False self.requested_parameters: list[str] = []
[docs] def initialize(self, serial_port: serial.Serial, configuration: dict) -> bool: """Configure the rain sensor with a serial port and device configuration. Args: serial_port: An open ``serial.Serial`` instance. configuration: Device configuration dict. Must contain ``"en"`` key to enable the sensor and a ``"parameters"`` list. Returns: True if the sensor is enabled and ready, False otherwise. """ try: self.ser = serial_port self.configuration = configuration context_logger.info_with_context( "Rain", f"Configuration: {self.configuration}" ) if configuration.get("en", 0): self.extract_requested_parameters(configuration) return True return False except Exception as e: context_logger.error_with_context("Rain", f"initialize: {e}") return False
[docs] def extract_requested_parameters(self, configuration: dict) -> list[str]: """Extract parameter short-codes from configuration. Args: configuration: Device configuration dict containing a ``"parameters"`` list. Returns: The list of requested parameter short-codes. """ if "parameters" in configuration: for parameter in configuration["parameters"]: param = parameter.get("sc", "rain") self.requested_parameters.append(param) return self.requested_parameters
[docs] def send_command(self, command: bytes = b"RAIN?\r\n") -> float: """Send a command over UART and parse the rainfall response. Args: command: Raw bytes to transmit. Defaults to ``b"RAIN?\\r\\n"``. Returns: Rainfall value in inches, or 0.0 on error. """ if self.debug: context_logger.debug_with_context("Rain", f"Sending: {command}") try: if not self.ser.is_open: self.ser.open() self.ser.reset_input_buffer() self.ser.reset_output_buffer() self.ser.write(command) self.ser.flush() time.sleep(0.3) response = self.ser.readline().decode().strip() if response.startswith("RAIN:"): value = float(response.split(":")[1]) context_logger.info_with_context("Rain", f"Rainfall: {value} inches") return value context_logger.warning_with_context( "Rain", f"Unexpected rain response: {response}" ) return 0.0 except Exception as e: context_logger.error_with_context("Rain", f"getSensorReading: {e}") return 0.0
[docs] def get_rain(self) -> float: """Query the sensor for the current rainfall measurement. Returns: Rainfall value in inches, or 0.0 on error. """ return self.send_command()
[docs] def getSensorReading(self) -> dict: """Get the latest rain reading (not implemented). Returns: Empty dict (placeholder for interface compliance). """ pass
[docs] def putSensorValue(self, value: dict) -> dict: """Fetch the current rain reading and add it to the result dict. Args: value: Dict to populate with the rain measurement. Returns: The updated dict with the rain parameter short-code mapped to the value. """ try: rain = self.get_rain() value.update({self.requested_parameters[0]: rain}) context_logger.info_with_context("Rain", f"Updated sensor values: {value}") return value except Exception as e: context_logger.error_with_context("Rain", f"putSensorValue: {e}") return value