Source code for OzWrapper.OzUVLight.OzUVLight

"""UV light, visible light, IR, and solar radiation sensor wrapper.

Abstracts over multiple light sensor drivers -- TSL2591, VEML6070, Si1133,
Si1147, LTR390, and RikaPyro -- providing a unified interface for lux,
UV index, visible light, infrared, and solar irradiance readings through
the GenericSensor contract.
"""

import json
import os
import time
from typing import ClassVar

from drivers import veml6070
from drivers.gpio import gpio
from drivers.LTR390 import LTR390
from drivers.RikaPyro import RikaPyro
from drivers.Si1133.Si1133 import Si1133
from drivers.Si1147 import Si1147
from drivers.TSL2591 import TSL2591
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzUVLight(GenericSensor): """Wrapper for UV, visible light, IR, and solar radiation sensors. Manages initialisation, periodic reading, and value aggregation for multiple light sensor ICs. The specific driver instantiated depends on the part number in the Gateway configuration. Attributes: configuration: List of sensor config dicts from the Gateway. v_uvlight: Nested list holding accumulated readings per sensor/parameter. light: TSL2591 visible-light driver instance. uv: VEML6070 UVA driver instance. uv_si1133: Si1133 UV driver instance. uv_si1147: Si1147 multi-spectrum driver instance. uv_ltr390: LTR390 UV/ambient-light driver instance. rika_pyro: RikaPyro solar pyranometer driver instance. light_full: Cached full-spectrum reading from TSL2591. """ configuration: ClassVar[dict] = {} v_uvlight: ClassVar[list] = [] light = None uv = None uv_si1133 = None uv_si1147 = None uv_ltr390 = None rika_pyro = None light_full = None
[docs] def __init__(self) -> None: """Initialise the OzUVLight wrapper with default state.""" super().__init__()
[docs] def initialize(self, config: dict, init_value: dict) -> bool: """Initialise all UV/light sensors listed in the Gateway config. Args: config: List of sensor configuration dicts from the Gateway. init_value: Mutable dict updated with per-sensor init status. Returns: True if at least one sensor initialised successfully. """ self.configuration = config context_logger.info_with_context("UV/LIGHT", "Initializing UV/Light sensors") _success = False for sensor in self.configuration: try: if self.partNumber in sensor: sensor["init"] = self.initializeSensor(sensor) _success = True except Exception as e: context_logger.error_with_context("UV/LIGHT", f"Failed to initialize UV/Light sensor: {e}") sensor["init"] = 0 self.putInitvalues(init_value) return _success
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialise a single UV/light sensor by part number. Creates the appropriate hardware driver, configures gain/timing, takes initial readings with retries, and stores bootstrap values in ``v_uvlight``. Args: sensor: Single sensor configuration dict containing part number, enable flag, GPIO/I2C settings, and parameter list. Returns: 1 if the sensor was initialised successfully, 0 otherwise. """ value = [] _success = False gpio.select_I2C(sensor[self.partNumber]) if sensor[self.partNumber] == 32 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) try: self.light = TSL2591(port) # gain set low for bright light since device is for outdoor for indoor it has to be high self.light.set_gain(self.light.GAIN_LOW) # same for integration time it has to be low for outdoor and for indoor it has to be high self.light.set_timing(self.light.INTEGRATIONTIME_100MS) for param in parameters: if param[self.parameter] == 1 or param[self.parameter] == 2 or param[self.parameter] == 3: val = 0.0 oldVal = 0.0 count = 0 try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) oldVal = val _success = True except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (TSL2591) sensor: {e}", ) _success = False _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (TSL2591) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if sensor[self.partNumber] == 33 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] try: self.uv = veml6070.Veml6070() self.uv.set_integration_time(veml6070.INTEGRATIONTIME_2T) for param in parameters: if param[self.parameter] == 2: val = 0.0 oldVal = 0.0 count = 0 try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) oldVal = val _success = True except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (VEML6070) sensor: {e}", ) _success = False _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (VEML6070) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if sensor[self.partNumber] == 34 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) try: self.uv_si1133 = Si1133(port) self.uv_si1133.begin() for param in parameters: if param[self.parameter] == 2: val = 0.0 oldVal = 0.0 count = 0 try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) oldVal = val _success = True except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1133) sensor: {e}", ) _success = False _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1133) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if sensor[self.partNumber] == 35 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) try: time.sleep(1) self.uv_si1147 = Si1147(port) context_logger.debug_with_context("UV/LIGHT", "SI1147 sensor initialized successfully") time.sleep(2) for param in parameters: if param[self.parameter] == 1: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 self.uv_si1147.begin() time.sleep(2) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 2: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 self.uv_si1147.begin() time.sleep(2) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 3: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 self.uv_si1147.begin() time.sleep(2) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 4: val = 0.0 count = 3 while count >= 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 self.uv_si1147.begin() time.sleep(2) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if sensor[self.partNumber] == 36 and sensor["en"] == self.baseConfig["oz_enable"]: # TODO: Add gpio configuration parameters = sensor["parameters"] port = 0 if "gpio" in sensor: if "port" in sensor["gpio"]: port = int(sensor["gpio"]["port"]) try: time.sleep(1) self.uv_ltr390 = LTR390(port) time.sleep(2) for param in parameters: if param[self.parameter] == 1: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 2: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 3: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if param[self.parameter] == 4: val = 0.0 count = 3 while count > 0: try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) if val >= 0: count = 0 oldVal = val _val = { "value": [val], "oldvalue": oldVal, "count": 1, } value.append(_val) _success = True break count = count - 1 except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) count = count - 1 _success = False time.sleep(2) if not _success: _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) if sensor[self.partNumber] == 37 and sensor["en"] == self.baseConfig["oz_enable"]: parameters = sensor["parameters"] port = "/dev/ttyAMA2" baud = 9600 debug = True if "gpio" in sensor: if "port" in sensor["gpio"]: port = sensor["gpio"]["port"] if "baud" in sensor["gpio"]: baud = sensor["gpio"]["baud"] if "debug" in sensor["gpio"]: debug = True if sensor["gpio"]["debug"] == 1 else False context_logger.info_with_context( "UV/LIGHT", f"Initializing UV/Light (SI1147) sensor on port {port} with baud {baud}", ) try: self.rika_pyro = RikaPyro() self.rika_pyro.initialize(rika_pyro_port=port, baud=baud) self.rika_pyro.DEBUG = debug for param in parameters: if param[self.parameter] == 1: val = 0.0 oldVal = 0.0 count = 0 try: val = self.getUVLight(sensor[self.partNumber], param[self.parameter]) oldVal = val _success = True except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to get UV/Light (SI1147) sensor reading: {e}", ) _success = False _val = {"value": [val], "oldvalue": oldVal, "count": 1} value.append(_val) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"Failed to initialize UV/Light (SI1147) sensor: {e}", ) _success = False _val = {"value": [0], "oldvalue": 0, "count": 1} value.append(_val) self.v_uvlight.append(value) # oldvalues = [obj['oldvalue'] for obj in value if 'oldvalue' in obj] # _success = 1 if sum(oldvalues) > 0 else 0 return int(_success)
[docs] def getSensorReading(self) -> dict: """Read current values from all initialised UV/light sensors. Applies sensitivity and correction offsets, accumulates values for later averaging, and returns real-time data keyed by short-code. Returns: Dict mapping parameter short-codes to their latest readings. """ data = {} for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: value = self.getUVLight(sensor[self.partNumber], parameters[self.parameter]) if value is not None: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) self.v_uvlight[X][Y]["value"].append(value) self.v_uvlight[X][Y]["count"] += 1 data[parameters["sc"]] = value else: context_logger.warning_with_context( "UV/LIGHT", f"getSensorReading: {sensor[self.partNumber]} {parameters[self.parameter]} value is None", ) except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return data
[docs] def putSensorValue(self, value: dict) -> dict: """Flush accumulated UV/light readings into the output dict and reset. Args: value: Mutable output dict to populate with aggregated readings. Returns: The updated output dict. """ for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: value[parameters["sc"]] = self.v_uvlight[X][Y]["value"] self.v_uvlight[X][Y]["value"] = [] self.v_uvlight[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "UV/LIGHT", f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) return value
[docs] def putInitvalues(self, value: dict) -> dict: """Record each sensor's init status into the shared init-value dict. Args: value: Mutable dict updated in-place with a ``"uvlight"`` key. Returns: The updated init-value dict. """ _sensor_init = [] for sensor in self.configuration: _sensor_init.append(sensor["init"]) value.update({"uvlight": _sensor_init}) return value
[docs] def getUVLight(self, partNo: int, pm: int) -> float: """Dispatch a UV/light reading to the appropriate sub-method. Routes to ``getLight`` for TSL2591 (part 32), ``rika_pyro`` for pyranometer (part 37), or ``getUV`` for all other UV/light sensors. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier selecting the measurement type. Returns: The measured value as a float. """ value = 0.0 gpio.select_I2C(partNo) if partNo == 32: value = self.getLight(partNo, pm) elif partNo == 37: value = self.rika_pyro.getSolar() context_logger.info_with_context("LIGHT", f"getUVLight: {partNo} {pm} : {value}") else: value = self.getUV(partNo, pm) return value
[docs] def getLight(self, partNo: int, pm: int) -> float: """Read visible-light data from the TSL2591 sensor. Args: partNo: Hardware part number (32=TSL2591). pm: Parameter identifier (1=lux, 2=UV-equivalent, 3=IR). Returns: The measured value as a float, or 0.0 on error. """ value = 0.0 try: if partNo == 32: if pm == 1: self.light_full = self.light.get_current() value = round(self.light_full["lux"], 4) context_logger.info_with_context("LIGHT", f"getLight: {partNo} {pm} : {value}") elif pm == 2: full = self.light_full["full"] value = round((full / ((100.0 * 1.0) / 408.0)) * 0.0079, 4) context_logger.info_with_context("LIGHT", f"getLight: {partNo} {pm} : {value}") elif pm == 3: value = round(self.light_full["ir"], 4) context_logger.info_with_context("LIGHT", f"getLight: {partNo} {pm} : {value}") except Exception as e: context_logger.error_with_context("LIGHT", f"getLight: {e}") return value
[docs] def getUV(self, partNo: int, pm: int) -> float: """Read UV or multi-spectrum data from UV sensor ICs. Supports VEML6070, Si1133, Si1147, and LTR390. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier (varies by sensor: UV index, visible, IR, lux, UVS, etc.). Returns: The measured value as a float, or 0.0 on error. """ value = 0.0 try: if partNo == 33: self.uv.get_uva_light_intensity_raw() value = round(self.uv.get_uva_light_intensity(), 4) context_logger.info_with_context("UV", f"getUV: {partNo} {pm} : {value}") return value if partNo == 34: value = self.uv_si1133.readUV() context_logger.info_with_context("UV", f"getUV: {partNo} {pm} : {value}") return value if partNo == 35: if pm == 1: uv = self.uv_si1147.readUV() / 100.0 vis = self.uv_si1147.readVisible() ir = self.uv_si1147.readIR() value = (-224.739 * uv) + (1.410168 * vis) + (0.036012 * ir) - 397.597 context_logger.info_with_context("Spectrum", f"getUV: {partNo} {pm} : {value}") return value if pm == 2: vis_value = self.uv_si1147.readVisible() uv_value = self.uv_si1147.readUV() value = uv_value / 100.0 if value > 15: value = -1 if vis_value == 0 and value == 0: value = -1 context_logger.info_with_context("UV", f"getUV: {partNo} {pm} : {value}") return value if pm == 3: value = self.uv_si1147.readVisible() context_logger.info_with_context("Visible", f"getUV: {partNo} {pm} : {value}") return value if pm == 4: value = self.uv_si1147.readIR() context_logger.info_with_context("IR", f"getUV: {partNo} {pm} : {value}") return value if partNo == 36: if pm == 1: value = self.uv_ltr390.getLux() context_logger.info_with_context("Lux", f"getUV: {partNo} {pm} : {value}") return value if pm == 2: value = self.uv_ltr390.getUV() context_logger.info_with_context("UV", f"getUV: {partNo} {pm} : {value}") return value if pm == 3: value = self.uv_ltr390.getLight() context_logger.info_with_context("Light", f"getUV: {partNo} {pm} : {value}") return value if pm == 4: value = self.uv_ltr390.getUVS() context_logger.info_with_context("UVS", f"getUV: {partNo} {pm} : {value}") return value except Exception as e: context_logger.error_with_context("UV", f"getUV: {e}") return value
# Example code # python3 -m OzWrapper.OzUVLight.OzUVLight if __name__ == "__main__": import os sensor_string = "uvlight" file_string = "uvlight.config.json" dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, file_string) with open(file_name) as configFile: sensor = configFile.read() sensorConfig = json.loads(sensor) context_logger.info_with_context("UVLight", f"Config loaded: {sensorConfig}") sensor = OzUVLight() sensor.initialize(sensorConfig[sensor_string], {}) for _ in range(0, 4): sensor.getSensorReading() data = {} context_logger.info_with_context("UVLight", f"Put sensor value result: {sensor.putSensorValue(data)}")