Source code for OzWrapper.OzLightning.OzLightning

"""Lightning detection sensor wrapper.

Abstracts over the AS3935 lightning detector IC (via I2C) to provide a unified
interface for lightning strike count, estimated distance, and disturber count
readings through the GenericSensor contract.
"""

import json
import os
import time

from drivers.AS3935.AS3935 import AS3935
from drivers.gpio import gpio
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzLightning(GenericSensor): """Wrapper for the AS3935 lightning detection sensor. Manages initialisation, periodic reading, and value aggregation for lightning strike count, estimated distance, and disturber detection. Attributes: uv_as3935: AS3935 I2C driver instance, or None if not configured. configuration: List of sensor config dicts from the Gateway. v_uvlight: Nested list holding accumulated readings per sensor/parameter. """ uv_as3935 = None
[docs] def __init__(self) -> None: """Initialise the OzLightning wrapper with default state.""" super().__init__() self.configuration = {} self.v_uvlight = []
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialise all lightning sensors listed in the Gateway config. Args: config: List of sensor configuration dicts from the Gateway. init_value: Mutable dict updated with per-sensor init status. Returns: True if at least one sensor initialised successfully. """ self.configuration = config context_logger.debug_with_context("Lightning", f"Sensor config: {self.configuration}") _success = False for sensor in self.configuration: try: if self.partNumber in sensor: sensor["init"] = self.initializeSensor(sensor) _success = True except Exception as e: context_logger.error_with_context( "Lightning", f"Error initializing sensor {sensor} : {e}", ) sensor["init"] = 0 self.putInitvalues(init_value) return int(_success)
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialise a single lightning sensor by part number. Creates the AS3935 I2C driver, takes initial readings with retries, and stores bootstrap values in ``v_uvlight``. Args: sensor: Single sensor configuration dict containing part number, enable flag, and parameter list. Returns: 1 if the sensor was initialised successfully, 0 otherwise. """ value = [] _success = False gpio.select_I2C(sensor[self.partNumber]) if sensor[self.partNumber] == 39 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] try: time.sleep(1) self.uv_as3935 = AS3935() time.sleep(2) for param in parameters: if param[self.parameter] == 1: val = 0.0 count = 3 while count > 0: try: val = self.getLightning(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}") count = count - 1 _success = False if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 2: val = 0.0 count = 3 while count > 0: try: val = self.getLightning(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}") count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 3: val = 0.0 count = 3 while count > 0: try: val = self.getLightning(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}") count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context("Lightning", f"Error initializing sensor {sensor}: {e}") _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) self.v_uvlight.append(value) return int(_success)
[docs] def getSensorReading(self) -> dict: """Read current lightning data from all initialised sensors. Applies sensitivity and correction offsets, accumulates values for later averaging, and returns real-time data keyed by short-code. Returns: Dict mapping parameter short-codes to their latest readings. """ data = {} for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: value = self.getLightning(sensor[self.partNumber], parameters[self.parameter]) if value is not None: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) self.v_uvlight[X][Y]["value"].append(value) self.v_uvlight[X][Y]["count"] += 1 data[parameters["sc"]] = value else: context_logger.error_with_context( "Lightning", f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: value is None", ) except Exception as e: context_logger.error_with_context( "Lightning", f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return data
[docs] def putSensorValue(self, value: dict) -> dict: """Flush the last accumulated lightning readings and clear counters. Clears the AS3935 strike/disturber counters after flushing. Args: value: Mutable output dict to populate with the latest readings. Returns: The updated output dict. """ for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: self.clearData() for Y, parameters in enumerate(sensor["parameters"]): try: value[parameters["sc"]] = self.v_uvlight[X][Y]["value"][-1] self.v_uvlight[X][Y]["value"] = [] self.v_uvlight[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "Lightning", f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return value
[docs] def putInitvalues(self, value: dict) -> dict: """Record each sensor's init status into the shared init-value dict. Args: value: Mutable dict updated in-place with a ``"lightning"`` key. Returns: The updated init-value dict. """ _sensor_init = [] for sensor in self.configuration: _sensor_init.append(sensor["init"]) value.update({"lightning": _sensor_init}) return value
[docs] def clearData(self) -> None: """Reset the AS3935 strike count, disturber count, and distance list.""" self.uv_as3935.count_lightning = 0 self.uv_as3935.count_disturber = 0 self.uv_as3935.distance = []
[docs] def getLightning(self, partNo: int, pm: int) -> int | float: """Dispatch a lightning reading to the appropriate sub-method. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier (1=strike count, 2=distance, 3=disturbers). Returns: The measured value, or 0.0 if unsupported. """ value = 0.0 gpio.select_I2C(partNo) if partNo == 39: value = self.getLightningDistance(partNo, pm) return value
[docs] def getLightningDistance(self, partNo: int, pm: int) -> int | float: """Read lightning data from the AS3935 sensor. Args: partNo: Hardware part number (39=AS3935). pm: Parameter identifier (1=strike count, 2=distance km, 3=disturber count). Returns: The measured value, or 0.0 on error. """ value = 0.0 try: if partNo == 39: if pm == 1: value = self.uv_as3935.getLightning() context_logger.info_with_context("Lightning", f"getLightningDistance: {value}") elif pm == 2: value = int(self.uv_as3935.getDistance()) context_logger.info_with_context("Lightning", f"getLightningDistance: {value}") elif pm == 3: value = int(self.uv_as3935.getDisturber()) context_logger.info_with_context("Lightning", f"getLightningDistance: {value}") except Exception as e: context_logger.error_with_context("Lightning", f"getLightningDistance: {e}") return value
# Example code # python3 -m OzWrapper.OzLightning.OzLightning if __name__ == "__main__": import os sensor_string = "lightning" file_string = "lightning.config.json" dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, file_string) with open(file_name) as configFile: sensor = configFile.read() sensorConfig = json.loads(sensor) context_logger.info_with_context("Lightning", f"sensorConfig: {sensorConfig}") sensor = OzLightning() sensor.initialize(sensorConfig[sensor_string], {}) for _ in range(0, 20): sensor.getSensorReading() time.sleep(1) data = {} context_logger.info_with_context("Lightning", f"Before putSensorValue: {data}") for _ in range(0, 20): sensor.getSensorReading() time.sleep(1) data = {} context_logger.info_with_context("Lightning", f"After putSensorValue: {sensor.putSensorValue(data)}")