# Source code for OzWrapper.OzFlood.OzFlood

"""Flood level sensor wrapper.

Abstracts over flood-level hardware drivers -- Flood (I2C ultrasonic) and
FloodUART (UART ultrasonic) -- providing a unified interface for water-level
distance measurements through the GenericSensor contract.
"""

import json
import os

from drivers.Flood.Flood import Flood
from drivers.FloodUART.FloodUART import FloodUART
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level logging handles, both created from OizomLogger with this
# module's name:
#   basic_logger   -- whatever OizomLogger(...).get() returns (presumably a
#                     plain logger object; confirm in utils.oizom_logger).
#                     It is not referenced by the rest of the visible module.
#   context_logger -- the OizomLogger instance itself; its *_with_context
#                     methods are what the code below uses for log output.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzFlood(GenericSensor):
    """Wrapper for flood-level distance sensors.

    Manages initialisation, periodic reading, and value aggregation for
    ultrasonic flood-level sensors (I2C and UART variants).

    Only one driver instance is kept per hardware variant, so several
    configured sensors with the same part number all read through the driver
    that was created last.

    The attributes ``partNumber``, ``parameter`` and ``baseConfig`` used by the
    methods are not defined here; presumably they are provided by
    ``GenericSensor`` (confirm in SensorBase).

    Attributes:
        flood_driver: Flood I2C driver instance, or None.
        flood_UARTport: FloodUART driver instance, or None.
        configuration: List of sensor config dicts from the Gateway.
        v_flood: One entry per configured sensor, at the same index as in
            ``configuration``. Each entry is a list with one slot per
            parameter of that sensor: a dict with ``"value"`` (list of
            readings), ``"oldvalue"`` and ``"count"`` for a supported
            parameter, or None for a parameter this wrapper does not read.
        s_flood: Supplementary flood state list (not referenced by the
            methods of this class).
    """

    # Part numbers handled by this wrapper and the only parameter id it reads.
    _PART_I2C = 71
    _PART_UART = 72
    _DISTANCE_PARAM = 1

    flood_driver = None
    flood_UARTport = None

    def __init__(self) -> None:
        """Initialise the OzFlood wrapper with default state."""
        super().__init__()
        self.configuration: list[dict] = []
        self.v_flood: list = []
        self.s_flood: list = []

    def initialize(self, config: list[dict], init_value: dict) -> bool:
        """Initialise all flood sensors listed in the Gateway config.

        Every config entry receives an ``"init"`` key (1 on success, 0
        otherwise) and exactly one entry in ``v_flood``, so the two lists stay
        index-aligned even when a sensor is skipped or its driver fails.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict updated with per-sensor init status.

        Returns:
            True if at least one sensor initialised successfully.
        """
        self.configuration = config
        context_logger.debug_with_context("Flood", f"Initializing Flood sensor: {self.configuration}")

        # Start from a clean slate so a repeated call neither stacks stale
        # accumulators in v_flood nor keeps a driver from an older config.
        self.v_flood = []
        self.flood_driver = None
        self.flood_UARTport = None

        any_ready = False
        for index, sensor in enumerate(self.configuration):
            try:
                status = self.initializeSensor(sensor) if self.partNumber in sensor else 0
            except Exception as e:
                context_logger.error_with_context("Flood", f"Initialize : {e}")
                status = 0
            sensor["init"] = status

            # initializeSensor appends its slot list only when it finishes
            # without raising; pad here so v_flood[index] always exists.
            if len(self.v_flood) <= index:
                self.v_flood.append([])

            any_ready = any_ready or status == 1

        self.putInitvalues(init_value)
        return any_ready

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single flood sensor by part number.

        Creates the appropriate driver (I2C or UART), takes an initial reading
        for each supported parameter, and appends the resulting slot list to
        ``v_flood``. A driver that belongs to a different hardware variant is
        left untouched, so configuring one sensor does not disable another.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, GPIO/I2C settings, and parameter list.

        Returns:
            1 if the sensor is enabled, its driver was created and at least one
            supported parameter produced an initial reading; 0 otherwise.
        """
        slots: list = []
        part_no = sensor[self.partNumber]
        supported = part_no in (self._PART_I2C, self._PART_UART)

        if supported and sensor["en"] == self.baseConfig["oz_enable"]:
            parameters = sensor["parameters"]
            if part_no == self._PART_I2C:
                # TODO: Add gpio configuration
                # The I2C bus number comes from the optional gpio.port setting.
                port = int(sensor.get("gpio", {}).get("port", 0))
                self.flood_driver = Flood(i2c_bus=port)
            else:
                # TODO: Add gpio configuration
                # FloodUART is created without arguments; gpio.port is not used.
                self.flood_UARTport = FloodUART()
            slots = self._bootstrapReadings(part_no, parameters)

        self.v_flood.append(slots)
        return int(any(slot is not None for slot in slots))

    def _bootstrapReadings(self, part_no: int, parameters: list) -> list:
        """Build one accumulator slot per parameter, seeded with a first reading.

        Args:
            part_no: Hardware part number whose driver should be read.
            parameters: Parameter config dicts of the sensor.

        Returns:
            List with the same length and order as ``parameters``; unsupported
            parameters are represented by None so positions stay aligned.
        """
        slots = []
        for param in parameters:
            if param[self.parameter] != self._DISTANCE_PARAM:
                slots.append(None)
                continue
            first = self.getFlood(part_no, param[self.parameter])
            slots.append({"value": [first], "oldvalue": first, "count": 1})
        return slots

    def getSensorReading(self) -> dict:
        """Read current flood levels from all initialised sensors.

        Each raw reading is scaled by ``se / 100`` and offset by ``cr / 10``
        (both taken from the parameter config), rounded to two decimals,
        accumulated in ``v_flood`` and returned keyed by short-code.

        Returns:
            Dict mapping parameter short-codes to their latest readings.
        """
        data = {}
        for x, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for y, parameters in enumerate(sensor["parameters"]):
                try:
                    slot = self.v_flood[x][y]
                    if slot is None:
                        # Parameter this wrapper does not read.
                        continue
                    raw = self.getFlood(sensor[self.partNumber], parameters[self.parameter])
                    value = round(
                        ((raw * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
                        2,
                    )
                    slot["value"].append(value)
                    slot["count"] += 1
                    data[parameters["sc"]] = value
                except Exception as e:
                    # Best effort: a failing parameter must not stop the others.
                    context_logger.error_with_context("Flood", f"getSensorReading: {e}")
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Flush accumulated flood readings into the output dict and reset counters.

        Args:
            value: Mutable output dict to populate with aggregated readings.

        Returns:
            The updated output dict.
        """
        for x, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for y, parameters in enumerate(sensor["parameters"]):
                try:
                    slot = self.v_flood[x][y]
                    if slot is None:
                        continue
                    value[parameters["sc"]] = slot["value"]
                    # Hand the list over to the caller and start a fresh one.
                    slot["value"] = []
                    slot["count"] = 0
                except Exception as e:
                    context_logger.error_with_context("Flood", f"putSensorValue: {e}")
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Record each sensor's init status into the shared init-value dict.

        Sensors that were never given an ``"init"`` key are reported as 0.

        Args:
            value: Mutable dict updated in-place with a ``"flood"`` key.

        Returns:
            The updated init-value dict.
        """
        value.update({"flood": [sensor.get("init", 0) for sensor in self.configuration]})
        return value

    def getFlood(self, partNo: int, pm: int) -> int | None:
        """Read a flood-level measurement from the correct driver.

        Args:
            partNo: Hardware part number (71=I2C, 72=UART).
            pm: Parameter identifier (currently unused; single measurement).

        Returns:
            Distance measurement in mm, or None if unsupported.
        """
        # TODO: pm is not used since it is not necessary
        # look at temp or uv light library for flood function
        if partNo == self._PART_I2C:
            return self.flood_driver.getFloodData()
        if partNo == self._PART_UART:
            return self.flood_UARTport.measure()  # returns the data in "mm"
        return None
# Example usage:
#   python3 -m OzWrapper.OzFlood.OzFlood
if __name__ == "__main__":
    import os

    config_key = "flood"
    # The sample configuration is expected next to this module.
    config_path = os.path.join(os.path.dirname(__file__), "flood.config.json")
    with open(config_path) as config_file:
        sensorConfig = json.loads(config_file.read())
    context_logger.info_with_context("flood", f"Config: {sensorConfig}")

    flood_sensor = OzFlood()
    flood_sensor.initialize(sensorConfig[config_key], init_value={})

    # Take a few readings, then flush the accumulated values once.
    for _ in range(4):
        flood_sensor.getSensorReading()
    flushed = flood_sensor.putSensorValue({})
    context_logger.info_with_context("flood", f"{flushed}")