"""Flood level sensor wrapper.
Abstracts over flood-level hardware drivers -- Flood (I2C ultrasonic) and
FloodUART (UART ultrasonic) -- providing a unified interface for water-level
distance measurements through the GenericSensor contract.
"""
import json
import os
from drivers.Flood.Flood import Flood
from drivers.FloodUART.FloodUART import FloodUART
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzFlood(GenericSensor):
"""Wrapper for flood-level distance sensors.
Manages initialisation, periodic reading, and value aggregation for
ultrasonic flood-level sensors (I2C and UART variants).
Attributes:
flood_driver: Flood I2C driver instance, or None.
flood_UARTport: FloodUART driver instance, or None.
configuration: List of sensor config dicts from the Gateway.
v_flood: Nested list holding accumulated readings per sensor/parameter.
s_flood: Supplementary flood state list.
"""
flood_driver = None
flood_UARTport = None
[docs]
def __init__(self) -> None:
"""Initialise the OzFlood wrapper with default state."""
super().__init__()
self.configuration: dict = {}
self.v_flood: list = []
self.s_flood: list = []
[docs]
def initialize(self, config: dict, init_value: dict) -> bool:
"""Initialise all flood sensors listed in the Gateway config.
Args:
config: List of sensor configuration dicts from the Gateway.
init_value: Mutable dict updated with per-sensor init status.
Returns:
True if at least one sensor initialised successfully.
"""
self.configuration = config
context_logger.debug_with_context("Flood", f"Initializing Flood sensor: {self.configuration}")
_success = False
for sensor in self.configuration:
try:
if self.partNumber in sensor:
sensor["init"] = self.initializeSensor(sensor)
_success = True
except Exception as e:
context_logger.error_with_context("Flood", f"Initialize : {e}")
sensor["init"] = 0
self.putInitvalues(init_value)
return _success
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialise a single flood sensor by part number.
Creates the appropriate driver (I2C or UART), takes an initial
reading, and stores bootstrap values in ``v_flood``.
Args:
sensor: Single sensor configuration dict containing part number,
enable flag, GPIO/I2C settings, and parameter list.
Returns:
1 if the sensor was initialised successfully, 0 otherwise.
"""
value = []
_success = False
self.flood_driver = None
self.flood_UARTport = None
if sensor[self.partNumber] == 71 and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
parameters = sensor["parameters"]
port = 0
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = int(sensor["gpio"]["port"])
self.flood_driver = Flood(i2c_bus=port)
for param in parameters:
if param[self.parameter] == 1:
val = self.getFlood(sensor[self.partNumber], param[self.parameter])
oldVal = val
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
_success = True
if sensor[self.partNumber] == 72 and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
parameters = sensor["parameters"]
port = 0
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = sensor["gpio"]["port"]
self.flood_UARTport = FloodUART()
for param in parameters:
if param[self.parameter] == 1:
val = self.getFlood(sensor[self.partNumber], param[self.parameter])
oldVal = val
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
_success = True
self.v_flood.append(value)
return int(_success)
[docs]
def getSensorReading(self) -> dict:
"""Read current flood levels from all initialised sensors.
Applies sensitivity and correction offsets, accumulates values for
later averaging, and returns real-time data keyed by short-code.
Returns:
Dict mapping parameter short-codes to their latest readings.
"""
data = {}
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
for Y, parameters in enumerate(sensor["parameters"]):
try:
value = self.getFlood(sensor[self.partNumber], parameters[self.parameter])
value = round(
((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
2,
)
self.v_flood[X][Y]["value"].append(value)
self.v_flood[X][Y]["count"] += 1
data[parameters["sc"]] = value
except Exception as e:
context_logger.error_with_context("Flood", f"getSensorReading: {e}")
return data
[docs]
def putSensorValue(self, value: dict) -> dict:
"""Flush accumulated flood readings into the output dict and reset counters.
Args:
value: Mutable output dict to populate with aggregated readings.
Returns:
The updated output dict.
"""
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
for Y, parameters in enumerate(sensor["parameters"]):
try:
value[parameters["sc"]] = self.v_flood[X][Y]["value"]
self.v_flood[X][Y]["value"] = []
self.v_flood[X][Y]["count"] = 0
except Exception as e:
context_logger.error_with_context("Flood", f"putSensorValue: {e}")
return value
[docs]
def putInitvalues(self, value: dict) -> dict:
"""Record each sensor's init status into the shared init-value dict.
Args:
value: Mutable dict updated in-place with a ``"flood"`` key.
Returns:
The updated init-value dict.
"""
_sensor_init = []
for sensor in self.configuration:
_sensor_init.append(sensor["init"])
value.update({"flood": _sensor_init})
return value
[docs]
def getFlood(self, partNo: int, pm: int) -> int | None:
"""Read a flood-level measurement from the correct driver.
Args:
partNo: Hardware part number (71=I2C, 72=UART).
pm: Parameter identifier (currently unused; single measurement).
Returns:
Distance measurement in mm, or None if unsupported.
"""
# TODO: pm is not used since it is not necessary
# look at temp or uv light library for flood function
if partNo == 71:
return self.flood_driver.getFloodData()
if partNo == 72:
return self.flood_UARTport.measure() # returns the data in "mm"
return None
# Example code
# python3 -m OzWrapper.OzFlood.OzFlood
if __name__ == "__main__":
import os
sensor_string = "flood"
file_string = "flood.config.json"
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, file_string)
with open(file_name) as configFile:
sensor = configFile.read()
sensorConfig = json.loads(sensor)
context_logger.info_with_context("flood", f"Config: {sensorConfig}")
sensor = OzFlood()
sensor.initialize(sensorConfig[sensor_string], init_value={})
for _ in range(0, 4):
sensor.getSensorReading()
data = {}
context_logger.info_with_context("flood", f"{sensor.putSensorValue(data)}")