"""Lightning detection sensor wrapper.
Abstracts over the AS3935 lightning detector IC (via I2C) to provide a unified
interface for lightning strike count, estimated distance, and disturber count
readings through the GenericSensor contract.
"""
import json
import os
import time
from drivers.AS3935.AS3935 import AS3935
from drivers.gpio import gpio
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzLightning(GenericSensor):
"""Wrapper for the AS3935 lightning detection sensor.
Manages initialisation, periodic reading, and value aggregation for
lightning strike count, estimated distance, and disturber detection.
Attributes:
uv_as3935: AS3935 I2C driver instance, or None if not configured.
configuration: List of sensor config dicts from the Gateway.
v_uvlight: Nested list holding accumulated readings per sensor/parameter.
"""
uv_as3935 = None
[docs]
def __init__(self) -> None:
"""Initialise the OzLightning wrapper with default state."""
super().__init__()
self.configuration = {}
self.v_uvlight = []
[docs]
def initialize(self, config: dict, init_value: dict) -> bool:
"""Initialise all lightning sensors listed in the Gateway config.
Args:
config: List of sensor configuration dicts from the Gateway.
init_value: Mutable dict updated with per-sensor init status.
Returns:
True if at least one sensor initialised successfully.
"""
self.configuration = config
context_logger.debug_with_context("Lightning", f"Sensor config: {self.configuration}")
_success = False
for sensor in self.configuration:
try:
if self.partNumber in sensor:
sensor["init"] = self.initializeSensor(sensor)
_success = True
except Exception as e:
context_logger.error_with_context(
"Lightning",
f"Error initializing sensor {sensor} : {e}",
)
sensor["init"] = 0
self.putInitvalues(init_value)
return int(_success)
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialise a single lightning sensor by part number.
Creates the AS3935 I2C driver, takes initial readings with retries,
and stores bootstrap values in ``v_uvlight``.
Args:
sensor: Single sensor configuration dict containing part number,
enable flag, and parameter list.
Returns:
1 if the sensor was initialised successfully, 0 otherwise.
"""
value = []
_success = False
gpio.select_I2C(sensor[self.partNumber])
if sensor[self.partNumber] == 39 and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
parameters = sensor["parameters"]
try:
time.sleep(1)
self.uv_as3935 = AS3935()
time.sleep(2)
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
count = 3
while count > 0:
try:
val = self.getLightning(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
_success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}")
count = count - 1
_success = False
if not _success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
if param[self.parameter] == 2:
val = 0.0
count = 3
while count > 0:
try:
val = self.getLightning(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
_success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}")
count = count - 1
_success = False
time.sleep(2)
if not _success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
if param[self.parameter] == 3:
val = 0.0
count = 3
while count > 0:
try:
val = self.getLightning(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
_success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context("Lightning", f"Error getting lightning data: {e}")
count = count - 1
_success = False
time.sleep(2)
if not _success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
except Exception as e:
context_logger.error_with_context("Lightning", f"Error initializing sensor {sensor}: {e}")
_success = False
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
self.v_uvlight.append(value)
return int(_success)
[docs]
def getSensorReading(self) -> dict:
"""Read current lightning data from all initialised sensors.
Applies sensitivity and correction offsets, accumulates values for
later averaging, and returns real-time data keyed by short-code.
Returns:
Dict mapping parameter short-codes to their latest readings.
"""
data = {}
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
for Y, parameters in enumerate(sensor["parameters"]):
try:
value = self.getLightning(sensor[self.partNumber], parameters[self.parameter])
if value is not None:
value = round(
((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
2,
)
self.v_uvlight[X][Y]["value"].append(value)
self.v_uvlight[X][Y]["count"] += 1
data[parameters["sc"]] = value
else:
context_logger.error_with_context(
"Lightning",
f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: value is None",
)
except Exception as e:
context_logger.error_with_context(
"Lightning",
f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return data
[docs]
def putSensorValue(self, value: dict) -> dict:
"""Flush the last accumulated lightning readings and clear counters.
Clears the AS3935 strike/disturber counters after flushing.
Args:
value: Mutable output dict to populate with the latest readings.
Returns:
The updated output dict.
"""
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
self.clearData()
for Y, parameters in enumerate(sensor["parameters"]):
try:
value[parameters["sc"]] = self.v_uvlight[X][Y]["value"][-1]
self.v_uvlight[X][Y]["value"] = []
self.v_uvlight[X][Y]["count"] = 0
except Exception as e:
context_logger.error_with_context(
"Lightning",
f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return value
[docs]
def putInitvalues(self, value: dict) -> dict:
"""Record each sensor's init status into the shared init-value dict.
Args:
value: Mutable dict updated in-place with a ``"lightning"`` key.
Returns:
The updated init-value dict.
"""
_sensor_init = []
for sensor in self.configuration:
_sensor_init.append(sensor["init"])
value.update({"lightning": _sensor_init})
return value
[docs]
def clearData(self) -> None:
"""Reset the AS3935 strike count, disturber count, and distance list."""
self.uv_as3935.count_lightning = 0
self.uv_as3935.count_disturber = 0
self.uv_as3935.distance = []
[docs]
def getLightning(self, partNo: int, pm: int) -> int | float:
"""Dispatch a lightning reading to the appropriate sub-method.
Args:
partNo: Hardware part number identifying the sensor model.
pm: Parameter identifier (1=strike count, 2=distance, 3=disturbers).
Returns:
The measured value, or 0.0 if unsupported.
"""
value = 0.0
gpio.select_I2C(partNo)
if partNo == 39:
value = self.getLightningDistance(partNo, pm)
return value
[docs]
def getLightningDistance(self, partNo: int, pm: int) -> int | float:
"""Read lightning data from the AS3935 sensor.
Args:
partNo: Hardware part number (39=AS3935).
pm: Parameter identifier (1=strike count, 2=distance km,
3=disturber count).
Returns:
The measured value, or 0.0 on error.
"""
value = 0.0
try:
if partNo == 39:
if pm == 1:
value = self.uv_as3935.getLightning()
context_logger.info_with_context("Lightning", f"getLightningDistance: {value}")
elif pm == 2:
value = int(self.uv_as3935.getDistance())
context_logger.info_with_context("Lightning", f"getLightningDistance: {value}")
elif pm == 3:
value = int(self.uv_as3935.getDisturber())
context_logger.info_with_context("Lightning", f"getLightningDistance: {value}")
except Exception as e:
context_logger.error_with_context("Lightning", f"getLightningDistance: {e}")
return value
# Example code
# python3 -m OzWrapper.OzLightning.OzLightning
if __name__ == "__main__":
import os
sensor_string = "lightning"
file_string = "lightning.config.json"
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, file_string)
with open(file_name) as configFile:
sensor = configFile.read()
sensorConfig = json.loads(sensor)
context_logger.info_with_context("Lightning", f"sensorConfig: {sensorConfig}")
sensor = OzLightning()
sensor.initialize(sensorConfig[sensor_string], {})
for _ in range(0, 20):
sensor.getSensorReading()
time.sleep(1)
data = {}
context_logger.info_with_context("Lightning", f"Before putSensorValue: {data}")
for _ in range(0, 20):
sensor.getSensorReading()
time.sleep(1)
data = {}
context_logger.info_with_context("Lightning", f"After putSensorValue: {sensor.putSensorValue(data)}")