"""Driver for the Oizom tipping-bucket rain gauge (SAMD-based).
Communicates via UART with a SAMD microcontroller that measures cumulative
rainfall. Sends ASCII commands and receives colon-delimited text responses.
Hardware:
Interface: UART (serial)
Port: Configured via serial port argument
Baud rate: Configured externally on the serial.Serial instance
Protocol: ASCII command (``RAIN?\\r\\n``) -> text response (``RAIN:<value>``)
Typical usage::
import serial
from drivers.Rain.Rain import Rain
ser = serial.Serial(port="/dev/ttyACM0", baudrate=115200, timeout=2)
rain = Rain()
rain.initialize(ser, {"en": 1, "parameters": [{"sc": "rain"}]})
value = rain.get_rain()
Note:
Requires ``pyserial`` for serial communication.
"""
import time
import serial
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Object returned by ``OizomLogger.get()``. It is not referenced anywhere in
# the visible part of this module (presumably a plain logger -- TODO confirm
# against utils.oizom_logger).
basic_logger = OizomLogger(__name__).get()
# Wrapper whose ``*_with_context(tag, message)`` methods the Rain driver uses
# for all of its log output.
context_logger = OizomLogger(__name__)
class Rain:
    """Driver for an SAMD-based tipping-bucket rain gauge.

    Sends ``RAIN?`` commands over UART and parses text responses containing
    cumulative rainfall values.

    Attributes:
        ser: ``serial.Serial`` instance used for UART communication, or None
            until ``initialize`` has been called.
        configuration: Device configuration dictionary.
        debug: Whether verbose debug logging is enabled.
        rain_enabled: Whether the most recent ``initialize`` call enabled the
            sensor.
        requested_parameters: List of parameter short-codes from configuration.
    """

    def __init__(self) -> None:
        """Initialize default attributes for the Rain sensor."""
        self.ser: "serial.Serial | None" = None
        self.configuration: dict = {}
        self.debug = True
        self.rain_enabled = False
        self.requested_parameters: list[str] = []

    def initialize(self, serial_port: "serial.Serial", configuration: dict) -> bool:
        """Configure the rain sensor with a serial port and device configuration.

        Args:
            serial_port: An open ``serial.Serial`` instance.
            configuration: Device configuration dict. Must contain ``"en"`` key
                to enable the sensor and a ``"parameters"`` list.

        Returns:
            True if the sensor is enabled and ready, False otherwise (including
            when the configuration could not be processed).
        """
        try:
            self.ser = serial_port
            self.configuration = configuration
            context_logger.info_with_context(
                "Rain", f"Configuration: {self.configuration}"
            )
            self.rain_enabled = bool(configuration.get("en", 0))
            if self.rain_enabled:
                self.extract_requested_parameters(configuration)
            return self.rain_enabled
        except Exception as e:
            # Best-effort contract: callers get False instead of an exception.
            self.rain_enabled = False
            context_logger.error_with_context("Rain", f"initialize: {e}")
            return False

    def extract_requested_parameters(self, configuration: dict) -> None:
        """Store the parameter short-codes listed in ``configuration``.

        Reads the ``"sc"`` value of every dict in ``configuration["parameters"]``
        and stores the resulting list in ``requested_parameters``. Entries that
        are not dicts or carry no ``"sc"`` value are skipped.

        NOTE(review): the schema (``{"parameters": [{"sc": "rain"}]}``) is taken
        from the usage example in the module docstring -- confirm against the
        real device configuration.

        Args:
            configuration: Device configuration dict.
        """
        entries = configuration.get("parameters") or []
        self.requested_parameters = [
            entry["sc"]
            for entry in entries
            if isinstance(entry, dict) and entry.get("sc")
        ]

    @staticmethod
    def _parse_response(response: str) -> "float | None":
        """Extract the numeric payload from a ``RAIN:<value>`` response line.

        Args:
            response: Decoded, stripped response text.

        Returns:
            The parsed value, or None when the line lacks the ``RAIN:`` prefix.

        Raises:
            ValueError: If the text after the prefix is not a valid float.
        """
        if not response.startswith("RAIN:"):
            return None
        # Only the field right after the first colon is the value; any further
        # colon-separated fields are ignored.
        return float(response.split(":")[1])

    def send_command(self, command: bytes = b"RAIN?\r\n") -> float:
        """Send a command over UART and parse the rainfall response.

        Args:
            command: Raw bytes to transmit. Defaults to ``b"RAIN?\\r\\n"``.

        Returns:
            Rainfall value in inches, or 0.0 on error.
        """
        if self.debug:
            context_logger.debug_with_context("Rain", f"Sending: {command}")
        if self.ser is None:
            context_logger.error_with_context(
                "Rain", "send_command: serial port not initialized"
            )
            return 0.0
        try:
            if not self.ser.is_open:
                self.ser.open()
            # Drop stale bytes so the next line read belongs to this command.
            self.ser.reset_input_buffer()
            self.ser.reset_output_buffer()
            self.ser.write(command)
            self.ser.flush()
            # Pause before reading so the device has time to answer.
            time.sleep(0.3)
            response = self.ser.readline().decode().strip()
            value = self._parse_response(response)
            if value is None:
                context_logger.warning_with_context(
                    "Rain", f"Unexpected rain response: {response}"
                )
                return 0.0
            context_logger.info_with_context("Rain", f"Rainfall: {value} inches")
            return value
        except Exception as e:
            # Serial, decode and parse failures all degrade to a 0.0 reading.
            context_logger.error_with_context("Rain", f"send_command: {e}")
            return 0.0

    def get_rain(self) -> float:
        """Query the sensor for the current rainfall measurement.

        Returns:
            Rainfall value in inches, or 0.0 on error.
        """
        return self.send_command()

    def getSensorReading(self) -> dict:
        """Get the latest rain reading (not implemented).

        Returns:
            Empty dict (placeholder for interface compliance).
        """
        return {}

    def putSensorValue(self, value: dict) -> dict:
        """Fetch the current rain reading and add it to the result dict.

        Args:
            value: Dict to populate with the rain measurement.

        Returns:
            The updated dict with the rain parameter short-code mapped to the
            value, or ``value`` unchanged if no short-code is configured or an
            error occurs.
        """
        try:
            rain = self.get_rain()
            if not self.requested_parameters:
                context_logger.error_with_context(
                    "Rain", "putSensorValue: no requested parameters configured"
                )
                return value
            # The first configured short-code is the key for the reading.
            value.update({self.requested_parameters[0]: rain})
            context_logger.info_with_context("Rain", f"Updated sensor values: {value}")
            return value
        except Exception as e:
            context_logger.error_with_context("Rain", f"putSensorValue: {e}")
            return value