Source code for Indices.Indices

"""Environmental index calculator for derived meteorological parameters.

Computes derived environmental indices from raw sensor data, including
Wet Bulb Globe Temperature (WBGT), air density, natural wet-bulb temperature,
dew point, vapour pressure, evapotranspiration, and cloud base height. These
indices are identified by keys ``i1`` through ``i9`` in the data payload and
are configured per-device via the Gateway.

The calculations follow standard meteorological formulas from WMO and
ASHRAE references. Each index method accepts the current data payload
and returns the computed value, or ``None`` if required input parameters
are missing.

Typical usage::

    from Indices import Indices

    idx = Indices()
    idx.updateIndices(["i1", "i3", "i5"], data_payload, realtime_payload)
"""

from math import atan, exp, log

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level logger objects are built from OizomLogger:
#   * basic_logger   - the object returned by OizomLogger(...).get()
#                      (presumably a plain logger; confirm against
#                      utils.oizom_logger). It is not referenced by the
#                      Indices code in this module.
#   * context_logger - the OizomLogger wrapper itself; the Indices class
#                      reports failures through its
#                      error_with_context(component, message) method.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Indices: """Calculator for derived environmental indices. Computes meteorological and environmental comfort indices from raw sensor readings. Each index is identified by a string key (``i1`` through ``i10``) and computed from temperature, humidity, pressure, wind speed, and solar radiation inputs. Index mapping: - ``i1``: WBGT indoor (Wet Bulb Globe Temperature) - ``i2``: WBGT outdoor - ``i3``: Air density (kg/m3) - ``i4``: Natural wet bulb temperature - ``i5``: Internal dew point - ``i6``: External dew point - ``i7``: Vapor pressure - ``i8``: Evapotranspiration (Penman-Monteith) - ``i9``: Heat index / Wind chill - ``i10``: Humidex """
[docs] def __init__(self) -> None: """Initialize the Indices calculator.""" pass
[docs] def updateIndices(self, indices_list: list, d_data_payload: dict, r_data_payload: dict) -> None: """Compute and inject all requested indices into the data payload. Iterates through the list of requested index keys, computes each one from the current sensor data, and updates the data payload dictionary in place with the results. Args: indices_list: List of index key strings (e.g., ``["i1", "i3"]``). d_data_payload: Data payload dictionary to read inputs from and write computed indices into. r_data_payload: Realtime data payload, used by some indices (e.g., evapotranspiration) that need recent history. """ for index in indices_list: try: if index == "i1": i1 = self.calculate_wbgt_indoor(d_data_payload) if i1 is not None: d_data_payload.update({"i1": i1}) elif index == "i2": i2 = self.calculate_wbgt_outdoor(d_data_payload) if i2 is not None: d_data_payload.update({"i2": i2}) elif index == "i3": i3 = self.calculate_air_density(d_data_payload) if i3 is not None: d_data_payload.update({"i3": i3}) elif index == "i4": i4 = self.calculate_tnwb(d_data_payload) if i4 is not None: d_data_payload.update({"i4": i4}) elif index == "i5": i5 = self.calculate_internal_dew_point(d_data_payload) if i5 is not None: d_data_payload.update({"i5": i5}) elif index == "i6": i6 = self.calculate_external_dew_point(d_data_payload) if i6 is not None: d_data_payload.update({"i6": i6}) elif index == "i7": i7 = self.calculate_vapour_pressure(d_data_payload) if i7 is not None: d_data_payload.update({"i7": i7}) elif index == "i8": i8 = self.calculate_evapotranspiration(d_data_payload, r_data_payload) if i8 is not None: d_data_payload.update({"i8": i8}) elif index == "i9": i9 = self.calculate_cloudbase_height(d_data_payload, r_data_payload) if i9 is not None: d_data_payload.update({"i9": i9}) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating {index}: {e}")
[docs] def calculate_wbgt_indoor(self, payload: dict) -> float | None: """Calculate indoor Wet Bulb Globe Temperature (WBGT). Uses the formula ``WBGT_indoor = 0.7 * Tnwb + 0.3 * Tg`` where *Tnwb* is the natural wet-bulb temperature and *Tg* is the globe temperature. Args: payload: Sensor data dictionary containing keys ``"t3"`` (globe temp), ``"temp"`` (dry-bulb temp), and ``"hum"`` (relative humidity). Returns: Indoor WBGT in degrees Celsius rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: tg_value = payload.get("t3") tdb_value = payload.get("temp") rh_value = payload.get("hum") if tg_value is None or tdb_value is None or rh_value is None: return None tnwb_value = self.calculate_tnwb(payload) if tnwb_value is None: return None wbgt_indoor = round(0.7 * tnwb_value + 0.3 * tg_value, 2) return wbgt_indoor except Exception as e: context_logger.error_with_context("Indices", f"Error calculating wbgt_indoor (i1): {e}") return None
[docs] def calculate_wbgt_outdoor(self, payload: dict) -> float | None: """Calculate outdoor Wet Bulb Globe Temperature (WBGT). Uses the formula ``WBGT_outdoor = 0.7 * Tnwb + 0.2 * Tg + 0.1 * Tdb`` which adds a dry-bulb component for outdoor solar exposure. Args: payload: Sensor data dictionary containing keys ``"t3"`` (globe temp), ``"temp"`` (dry-bulb temp), and ``"hum"`` (relative humidity). Returns: Outdoor WBGT in degrees Celsius rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: tg_value = payload.get("t3") tdb_value = payload.get("temp") rh_value = payload.get("hum") if tg_value is None or tdb_value is None or rh_value is None: return None tnwb_value = self.calculate_tnwb(payload) if tnwb_value is None: return None wbgt_outdoor = round(0.7 * tnwb_value + 0.2 * tg_value + 0.1 * tdb_value, 2) return wbgt_outdoor except Exception as e: context_logger.error_with_context("Indices", f"Error calculating wbgt_outdoor (i2): {e}") return None
[docs] def calculate_tnwb(self, payload: dict) -> float | None: """Calculate natural wet-bulb temperature (Tnwb) using the Stull (2011) formula. Args: payload: Sensor data dictionary containing ``"temp"`` (dry-bulb temperature in degrees C) and ``"hum"`` (relative humidity in %). Returns: Tnwb in degrees Celsius, or ``None`` if required inputs are missing. """ try: Tdb = payload.get("temp") Rh = payload.get("hum") if Tdb is None or Rh is None: return None return ( Tdb * atan(0.151977 * (Rh + 8.313659) ** 0.5) + atan(Tdb + Rh) - atan(Rh - 1.676331) + 0.00391838 * (Rh**1.5) * atan(0.023101 * Rh) - 4.686035 ) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating tnwb (i4): {e}") return None
[docs] def calculate_air_density(self, payload: dict) -> float | None: """Calculate air density using the ideal gas law. Converts temperature to Kelvin and pressure to Pascals, then applies ``rho = P / (R_specific * T)`` with ``R_specific = 287.05``. Args: payload: Sensor data dictionary containing ``"temp"`` (degrees C) and ``"pr"`` (pressure in hPa). Returns: Air density in kg/m^3 rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp = payload.get("temp") # in °C pressure = payload.get("pr") # in hPa if temp is None or pressure is None: return None temp = temp + 273.15 # Convert to Kelvin pressure = pressure * 100 # Convert to Pa # Calculate air density using the ideal gas law air_density = pressure / (287.05 * temp) return round(air_density, 2) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating air_density (i3): {e}") return None
[docs] def calculate_internal_dew_point(self, payload: dict) -> float | None: """Calculate the internal (box) dew point using the Magnus formula. Uses the enclosure temperature (``"t1"``) and enclosure humidity (``"t2"``) readings from the BME280 inside the device housing. Args: payload: Sensor data dictionary containing ``"t1"`` (internal temp in degrees C) and ``"t2"`` (internal humidity in %). Returns: Dew point in degrees Celsius rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp = payload.get("t1") # in °C hum = payload.get("t2") # in % if temp is None or hum is None: return None # Calculate dew point using the Magnus formula a = 17.27 b = 237.7 alpha = ((a * temp) / (b + temp)) + log(hum / 100.0) dew_point = (b * alpha) / (a - alpha) return round(dew_point, 2) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating internal_dew_point (i5): {e}") return None
[docs] def calculate_external_dew_point(self, payload: dict) -> float | None: """Calculate the external (ambient) dew point using the Magnus formula. Uses the external SHT31 temperature (``"temp"``) and humidity (``"hum"``) readings. Args: payload: Sensor data dictionary containing ``"temp"`` (ambient temp in degrees C) and ``"hum"`` (ambient humidity in %). Returns: Dew point in degrees Celsius rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp = payload.get("temp") # in °C hum = payload.get("hum") # in % if temp is None or hum is None: return None # Calculate dew point using the Magnus formula a = 17.27 b = 237.7 alpha = ((a * temp) / (b + temp)) + log(hum / 100.0) dew_point = (b * alpha) / (a - alpha) return round(dew_point, 2) except Exception: return None
[docs] def calculate_vapour_pressure(self, payload: dict) -> float | None: """Calculate actual vapour pressure from temperature and humidity. Computes saturation vapour pressure via the Buck equation, then scales by relative humidity. Args: payload: Sensor data dictionary containing ``"temp"`` (degrees C) and ``"hum"`` (relative humidity in %). Returns: Vapour pressure in hPa rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp = payload.get("temp") # in °C hum = payload.get("hum") # in % if temp is None or hum is None: return None saturation_vapour_pressure = 6.112 * exp(17.67 * temp / (temp + 243.5)) vapour_pressure = hum * saturation_vapour_pressure / 100 return round(vapour_pressure, 2) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating vapour_pressure (i6): {e}") return None
[docs] def calculate_evapotranspiration(self, d_data_payload: dict, r_data_payload: dict) -> float | None: """Calculate reference evapotranspiration using the FAO Penman-Monteith method. Requires realtime temperature and humidity arrays (min/max over the interval) plus averaged pressure, solar radiation, and wind speed. Args: d_data_payload: Averaged data payload containing ``"pr"`` (hPa), ``"solar"`` (W/m2), and ``"ws"`` (wind speed in m/s). r_data_payload: Realtime data payload containing ``"temp"`` and ``"hum"`` as ``[values_list, timestamps_list]`` pairs. Returns: Evapotranspiration in mm/day rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp_data = r_data_payload.get("temp") # in °C hum_data = r_data_payload.get("hum") # in % pressure = d_data_payload.get("pr") # in hPa solar = d_data_payload.get("solar") # in W/m2 wind_speed = d_data_payload.get("ws") # in m/s if temp_data is None or hum_data is None or pressure is None or solar is None or wind_speed is None: return None temp_list = temp_data[0] hum_list = hum_data[0] t_max = max(temp_list) t_min = min(temp_list) t_mean = (t_max + t_min) / 2 rh_max = max(hum_list) rh_min = min(hum_list) e0_Tmax = 0.6108 * exp((17.27 * t_max) / (t_max + 237.3)) e0_Tmin = 0.6108 * exp((17.27 * t_min) / (t_min + 237.3)) es = e0_Tmax + e0_Tmin / 2 ea = e0_Tmin * (rh_max / 100) + e0_Tmax * (rh_min / 100) / 2 vpd = es - ea delta = (4096 * (0.6108 * exp((17.27 * t_mean) / (t_mean + 237.3)))) / ((t_mean + 237.3) ** 2) gamma = 0.000665 * pressure evapotranspiration = ((0.408 * delta * solar) + (gamma * 900 / (t_mean + 273) * vpd * wind_speed)) / ( delta + gamma * (1 + 0.34 * wind_speed) ) return round(evapotranspiration, 2) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating evapotranspiration (i7): {e}") return None
[docs] def calculate_cloud_base_height(self, payload: dict, r_data: dict) -> float | None: """Estimate cloud base height from temperature and dew point spread. Uses the rule of thumb ``CBH = (T - Td) * 125`` where *T* is the ambient temperature and *Td* is the external dew point. Args: payload: Sensor data dictionary containing ``"temp"`` and ``"hum"`` (used to derive dew point). r_data: Realtime data payload (unused, kept for interface consistency). Returns: Cloud base height in metres rounded to 2 decimal places, or ``None`` if required inputs are missing. """ try: temp = payload.get("temp") external_dew_point = self.calculate_external_dew_point(payload) if temp is None or external_dew_point is None: return None # Calculate cloud base height using the formula cloud_base_height = (temp - external_dew_point) * 125 # In meter return round(cloud_base_height, 2) except Exception as e: context_logger.error_with_context("Indices", f"Error calculating cloud_base_height (i9): {e}") return None