Source code for OzWrapper.OzTemp.OzTemp

#!/usr/bin/env python
"""Temperature, humidity, pressure, and dew-point sensor wrapper.

Abstracts over multiple hardware drivers -- BME280, SHT31, SHT41, DS18B20,
ES12248, MPL1152, LPS25HB, and ATH20 -- providing a unified interface for
ambient temperature, relative humidity, barometric pressure, and dew-point
readings through the GenericSensor contract.
"""

import json
import time
from math import log
from typing import ClassVar, Literal

from drivers.ATH20 import ATH20
from drivers.BME280 import BME280
from drivers.DS18B20 import DS18B20
from drivers.ES12248.ES12248 import ES12248
from drivers.gpio import gpio
from drivers.LPS25HB import LPS25HB
from drivers.MPL1152.MPL1152 import MPL1152
from drivers.SHT31 import SHT31
from drivers.SHT41 import SHT41
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Handle returned by ``OizomLogger.get()``. It is not referenced anywhere else in
# the visible part of this module.
# NOTE(review): presumably the underlying plain logger -- confirm against
# utils.oizom_logger before removing it.
basic_logger = OizomLogger(__name__).get()
# Logger used throughout this module via ``info_with_context`` /
# ``warning_with_context`` / ``error_with_context``; each call passes a short tag
# (e.g. "Temp", "Hum", "Pressure") followed by the message.
context_logger = OizomLogger(__name__)


class OzTemp(GenericSensor):
    """Unified front end for the temperature-family sensor drivers.

    Handles initialisation, periodic reading, and aggregation of values for the
    temperature, humidity, pressure, and dew-point sensors of the device. Which
    hardware driver gets instantiated is decided by the part number found in
    each sensor entry of the Gateway configuration.

    Attributes:
        configuration: List of sensor config dicts received from the Gateway.
        v_temphum: Nested list of accumulated readings, indexed first by
            sensor and then by parameter.
        bme280: BME280 driver for ambient temperature/humidity/pressure.
        boxbme280: BME280 driver for the box-internal readings.
        sht: SHT31 driver instance.
        mpl1152: MPL1152 barometric-pressure driver instance.
        sht_address: I2C address of the active SHT31 sensor.
        bme_add: Candidate I2C addresses for the box BME280.
        bme_address: Index into ``bme_add`` of the address currently in use
            for the box BME280.
        bme_sensor: While true, a failed box-BME280 temperature read switches
            to the other candidate address in ``bme_add``.
    """

    # Gateway configuration and the readings accumulated against it.
    configuration: ClassVar[list[dict]] = []
    v_temphum: ClassVar[list] = []

    # Driver handles; they stay ``None`` until the matching sensor is initialised.
    bme280 = None
    boxbme280 = None
    sht = None
    mpl1152 = None

    # I2C addressing state.
    sht_address = 0
    bme_add: ClassVar[list] = [0x76, 0x77]
    bme_address = 0
    bme_sensor = True
def __init__(self) -> None:
    """Initialise the OzTemp wrapper with default, per-instance state.

    ``configuration`` and ``v_temphum`` are declared on the class as mutable
    lists, and ``initializeSensor`` appends to ``self.v_temphum``. Without the
    rebinding below, that append would mutate the single class-level list, so
    every ``OzTemp`` instance would accumulate into the same storage.
    """
    super().__init__()
    # Give this instance its own containers instead of the shared class-level ones.
    self.configuration = []
    self.v_temphum = []
def initialize(self, config: list[dict], init_value: dict) -> bool:
    """Initialise all temperature sensors listed in the Gateway config.

    Every entry of ``config`` ends up with an ``"init"`` status (1 or 0) and
    with exactly one slot in ``v_temphum`` at the same index, because
    ``getSensorReading`` and ``putSensorValue`` address ``v_temphum`` by the
    sensor's position in the configuration.

    Args:
        config: List of sensor configuration dicts from the Gateway. Each dict
            is updated in place with its ``"init"`` status.
        init_value: Mutable dict updated with per-sensor init status.

    Returns:
        True if at least one sensor initialised successfully.
    """
    self.configuration = config
    # Start from an empty, instance-owned list: a repeated call must not stack
    # new slots behind stale ones, and the class-level list must not be shared.
    self.v_temphum = []
    context_logger.info_with_context("Temp", "Initializing Temperature sensors")
    _success = False
    for index, sensor in enumerate(self.configuration):
        # Default status, so entries without a part number still carry "init".
        sensor["init"] = 0
        if "pn" in sensor:
            try:
                sensor["init"] = self.initializeSensor(sensor)
            except Exception as e:
                sensor["init"] = 0
                context_logger.error_with_context(
                    "Temp",
                    f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
                )
        # initializeSensor appends its slot only when it runs to completion;
        # pad with an empty slot otherwise so later sensors keep their index.
        while len(self.v_temphum) <= index:
            self.v_temphum.append([])
        if sensor["init"] == 1:
            _success = True
    self.putInitvalues(init_value)
    return _success
def initializeSensor(self, sensor: dict) -> int:
    """Initialise a single temperature-family sensor by part number.

    Selects the sensor's I2C route, creates the matching hardware driver, takes
    a bootstrap reading for each supported parameter, and appends the resulting
    entries to ``v_temphum`` as one list for this sensor. Parameters that a
    part does not support are skipped without an entry.

    Part numbers handled: 21 DS18B20, 22 ES12248, 23 BME280 (ambient),
    24 BME280 (box), 25 SHT31, 26 MPL1152, 27 SHT41, 28 LPS25HB, 29 ATH20.
    NOTE: part number 28 was previously assigned to the MAX31875.

    Args:
        sensor: Single sensor configuration dict containing part number,
            enable flag, optional GPIO settings, and parameter list.

    Returns:
        1 or 0, reflecting the outcome of the most recent status-bearing read
        (a later parameter overrides an earlier one); 0 when the part is
        unsupported or disabled.
    """
    success = False
    value = []
    part_no = sensor[self.partNumber]
    supported = (21, 22, 23, 24, 25, 26, 27, 28, 29)

    def entry(reading):
        """Build one bootstrap slot for ``v_temphum`` from a single reading."""
        return {"value": [reading], "oldvalue": reading, "count": 1}

    def log_failure(error):
        """Report an initialisation failure for this sensor."""
        context_logger.error_with_context(
            "Temp",
            f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {error}",
        )

    def configured_address(default):
        """Return the I2C address from ``sensor["gpio"]`` (hex string) or ``default``."""
        port = 0
        address = default
        if "gpio" in sensor:
            port = sensor["gpio"]["port"]
            address = int(sensor["gpio"]["address"], 16)
        # The port is only reported here; every driver below is opened on i2c_port=0.
        context_logger.info_with_context("Temp", f"Port : {port} addr: {address}")
        return address

    def read_once(pm, track=True, on_error=None):
        """Take one reading; a failed read is stored as 0.0.

        With ``track`` the status becomes "reading is non-zero"; without it only
        an exception changes the status. ``on_error`` runs after a failed read.
        """
        nonlocal success
        reading = 0.0
        try:
            reading = self.getTempHum(part_no, pm)
            if track:
                success = reading != 0
        except Exception as e:
            log_failure(e)
            success = False
            if on_error is not None:
                on_error()
        value.append(entry(reading))

    def read_with_retry(pm, accept, retry_delay=0):
        """Try up to three times to obtain a reading for which ``accept`` is true.

        The outcome is decided by this parameter alone: a parameter that never
        yields an accepted reading marks the status as failed and stores a zero
        placeholder, so the entries stay aligned with the parameter list.
        ``retry_delay`` seconds are slept after each failed attempt.
        """
        nonlocal success
        for _ in range(3):
            try:
                reading = self.getTempHum(part_no, pm)
                if accept(reading):
                    value.append(entry(reading))
                    success = True
                    return
            except Exception as e:
                log_failure(e)
            if retry_delay:
                time.sleep(retry_delay)
        success = False
        value.append(entry(0))

    def swap_box_bme_address():
        """Re-open the box BME280 on the second candidate address."""
        self.boxbme280 = None
        self.bme_address = 1
        self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address])

    gpio.select_I2C(part_no, sensor["pos"] if "pos" in sensor else -1)
    time.sleep(0.5)

    # The enable flag is only consulted for part numbers this wrapper knows.
    enabled = part_no in supported and sensor["en"] == self.baseConfig["oz_enable"]
    parameters = sensor["parameters"] if enabled else []

    if not enabled:
        pass  # unsupported or disabled part: an empty slot list is recorded below
    elif part_no == 22:
        # This temperature is also used as Tg in WBGT
        self.es12248_sensor = ES12248()
        self.es12248_sensor.initialize()
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1:
                read_once(pm)
            elif pm == 2:
                # CONTEXT: As this sensor is used as Tg in WBGT, humidity is just added
                # because it is offering new parameter and it is not used in any
                # calculation, so a zero humidity does not affect the status.
                read_once(pm, track=False)
    elif part_no == 21:
        self.tg_sensor = DS18B20()
        self.tg_sensor.initialize()
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1:
                read_once(pm)
    elif part_no == 23:
        self.bme280 = BME280(i2c_port=0, i2c_addr=configured_address(0x77))
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1:
                read_once(pm)
            elif pm == 2 or pm == 3:
                read_once(pm, track=False)
    elif part_no == 24:
        # NOTE(review): the configured address is only logged; the driver is opened
        # on the candidate address selected by bme_address -- confirm this is intended.
        configured_address(0x76)
        self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address])
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1:
                read_once(pm, on_error=swap_box_bme_address)
            elif pm == 2 or pm == 3:
                read_once(pm)
        if self.bme_address == 0:
            # Still on the first candidate address: disable address switching in getTemp.
            self.bme_sensor = False
    elif part_no == 25:
        self.sht_address = configured_address(0x44)
        self.sht = SHT31(i2c_port=0, address=self.sht_address)
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1 or pm == 2 or pm == 4:
                read_once(pm)
    elif part_no == 26:
        self.mpl1152 = MPL1152()
        for param in parameters:
            pm = param[self.parameter]
            if pm == 3:
                read_with_retry(pm, accept=lambda reading: reading > 0, retry_delay=2)
    elif part_no == 27:
        # TODO: Add gpio configuration
        try:
            self.sht41 = SHT41()
            for param in parameters:
                pm = param[self.parameter]
                if pm == 1:
                    read_with_retry(pm, accept=lambda reading: reading >= 0)
                elif pm == 2:
                    read_with_retry(pm, accept=lambda reading: reading >= 0, retry_delay=2)
        except Exception as e:
            log_failure(e)
            success = False
            value.append(entry(0))
    elif part_no == 28:
        self.lps25hb = LPS25HB(i2c_port=0, i2c_addr=configured_address(0x5D))
        for param in parameters:
            pm = param[self.parameter]
            if pm == 1:
                read_once(pm)
            elif pm == 3:
                read_once(pm, track=False)
    elif part_no == 29:
        # TODO: Add gpio configuration
        debug = 0
        if "gpio" in sensor and "debug" in sensor["gpio"]:
            debug = sensor["gpio"]["debug"] == 1
        try:
            self.ath20 = ATH20()
            self.ath20.debug = debug
            for param in parameters:
                pm = param[self.parameter]
                if pm == 1:
                    read_with_retry(pm, accept=lambda reading: reading >= 0)
                elif pm == 2:
                    read_with_retry(pm, accept=lambda reading: reading >= 0, retry_delay=2)
        except Exception as e:
            log_failure(e)
            success = False
            value.append(entry(0))

    self.v_temphum.append(value)
    return int(success)
[docs] def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict: """Read current values from all initialised temperature sensors. Applies sensitivity and correction offsets, accumulates values for later averaging, and returns real-time data keyed by short-code. Args: calibrationKeys: Optional list of short-code keys to restrict which parameters are read. If empty or None, all parameters are read. Returns: Dict mapping parameter short-codes to their latest readings. """ data = {} if calibrationKeys is None: calibrationKeys = [] for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: for Y, parameters in enumerate(sensor["parameters"]): try: # Check if this parameter index exists in v_temphum if len(self.v_temphum[X]) <= Y: context_logger.warning_with_context( "Temp", f"Parameter index [{X}][{Y}] not initialized for {parameters.get('sc', 'unknown')}", ) continue if len(calibrationKeys) == 0 or parameters["sc"] in calibrationKeys: gpio.select_I2C( sensor[self.partNumber], sensor["pos"] if "pos" in sensor else -1, ) time.sleep(0.5) value = self.getTempHum(sensor[self.partNumber], parameters[self.parameter]) if value is not None: value = round( ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)), 2, ) self.v_temphum[X][Y]["value"].append(value) self.v_temphum[X][Y]["count"] += 1 data[parameters["sc"]] = value else: context_logger.warning_with_context( "Temp", f"getSensorReading: {sensor[self.partNumber]} {parameters[self.parameter]} has None value", ) except Exception as e: context_logger.error_with_context( "Temp", f"Failed to get sensor reading at [{X}][{Y}] - {parameters['sc']}: {e}", ) return data
[docs] def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict: """Flush accumulated readings into the output dict and reset counters. Optionally triggers the SHT31 heater for humidity-sensor conditioning. Args: value: Mutable output dict to populate with aggregated readings. calibrationKeys: Optional list of short-code keys to restrict which parameters are flushed. Returns: The updated output dict with lists of accumulated values per short-code. """ if calibrationKeys is None: calibrationKeys = [] for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: if sensor["pn"] == 25: gpio.select_I2C( sensor[self.partNumber], sensor["pos"] if "pos" in sensor else -1, ) self.sht.start_heater() for Y, parameters in enumerate(sensor["parameters"]): try: if len(calibrationKeys) == 0 or parameters["sc"] in calibrationKeys: value[parameters["sc"]] = self.v_temphum[X][Y]["value"] self.v_temphum[X][Y]["value"] = [] self.v_temphum[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "Temp", f"Failed to put sensor value at [{X}][{Y}] - {parameters['sc']}: {e}", ) return value
def putInitvalues(self, value: dict) -> dict:
    """Record each sensor's init status into the shared init-value dict.

    Args:
        value: Mutable dict updated in place with a ``"temp"`` key holding the
            list of per-sensor init statuses, in configuration order.

    Returns:
        The updated init-value dict (the same object as ``value``).
    """
    # A sensor entry that never received a status (e.g. it was never processed
    # by initializeSensor) is reported as 0 rather than raising KeyError.
    value.update({"temp": [sensor.get("init", 0) for sensor in self.configuration]})
    return value
[docs] def getTempHum(self, partNo: int, pm: int) -> float: """Dispatch a reading request to the correct measurement sub-method. Args: partNo: Hardware part number identifying the sensor model. pm: Parameter identifier (1=temp, 2=humidity, 3=pressure, 4=dew point). Returns: The measured value rounded to two decimal places. """ value = 0.0 if pm == 1: value = self.getTemp(partNo) elif pm == 2: value = self.getHum(partNo) elif pm == 3: value = self.getPressure(partNo) elif pm == 4: value = self.getDewPoint(partNo) # if partNo == 28: # value = self.getMAXTemp(pm) return round(value, 2)
def getMAXTemp(self, pm: int) -> float:
    """Read temperature from the MAX31875 sensor (currently unused).

    The only reference to this method in the visible code is a commented-out
    call in ``getTempHum``.

    Args:
        pm: Parameter identifier; the channel passed to the driver is ``pm - 7``.

    Returns:
        The driver's reading rounded to two decimal places, or 0.0 when the
        read raises.
    """
    try:
        ch = pm - 7
        # NOTE(review): no visible code assigns ``self.max31875``, so this
        # attribute access presumably raises and the except path is always
        # taken -- confirm before reviving this method.
        value = self.max31875.get_temperature(channel=ch)
        context_logger.info_with_context("Temp", f"MAX31875's temp at Channel {ch} : {value}")
    except Exception as e:
        # ``ch`` is bound here unless ``pm - 7`` itself raised.
        context_logger.error_with_context("Temp", f"Failed to get MAX31875's temp at Channel {ch}: {e}")
        value = 0.0
    return round(value, 2)
[docs] def getTemp(self, partNo: int) -> float | Literal[0]: """Read temperature from the sensor identified by part number. Args: partNo: Hardware part number selecting the driver to query. Returns: Temperature in degrees Celsius, or 0 on failure. """ if partNo == 21: value = self.tg_sensor.get_temperature() value = round(value, 2) context_logger.info_with_context("Temp", f"DS18B20's temp at Part {partNo} : {value}") return value if partNo == 22: value = self.es12248_sensor.get_temperature() value = round(value, 2) context_logger.info_with_context("Temp", f"ES12248's temp at Part {partNo} : {value}") return value if partNo == 23: # value = 23.01 value = self.bme280.get_temperature() value = round(value, 2) context_logger.info_with_context("Temp", f"BME's temp at Part {partNo} : {value}") return value if partNo == 24: value = 0.0 try: value = self.boxbme280.get_temperature() value = round(value, 2) context_logger.info_with_context("Temp", f"BOX-BME's temp at Part {partNo} : {value}") except Exception as e: context_logger.error_with_context("Temp", f"Failed to get BOX-BME's temp at Part {partNo}: {e}") if self.bme_sensor: self.boxbme280 = None if self.bme_address == 0: self.bme_address = 1 else: self.bme_address = 0 self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address]) return value if partNo == 25: try: value = self.sht.get_temperature() context_logger.info_with_context("Temp", f"SHT's temp at Part {partNo} : {value}") return value except Exception as e: context_logger.error_with_context("Temp", f"Failed to get SHT's temp at Part {partNo}: {e}") return 0.00 if partNo == 27: try: temp = self.sht41.getTemp()["temp"] context_logger.info_with_context("Temp", f"SHT41's temp at Part {partNo} : {temp}") except Exception as e: context_logger.error_with_context("Temp", f"Failed to get SHT41's temp at Part {partNo}: {e}") temp = 0.00 return round(temp, 2) if partNo == 28: try: value = self.lps25hb.get_temperature() value = round(value, 2) 
context_logger.info_with_context("Temp", f"LPS25HB's temp at Part {partNo} : {value}") return value except Exception as e: context_logger.error_with_context("Temp", f"Failed to get LPS25HB's temp at Part {partNo}: {e}") return 0.00 if partNo == 29: try: temp = self.ath20.getTemp()["temp"] context_logger.info_with_context("Temp", f"ATH20's temp at Part {partNo} : {temp}") except Exception as e: context_logger.error_with_context("Temp", f"Failed to get ATH20's temp at Part {partNo}: {e}") temp = 0.00 return round(temp, 2) return 0.0
[docs] def getHum(self, partNo: int) -> float | Literal[0]: """Read relative humidity from the sensor identified by part number. Args: partNo: Hardware part number selecting the driver to query. Returns: Relative humidity in percent, or 0 on failure. """ if partNo == 22: # value = 55.01 value = self.es12248_sensor.get_humidity() value = round(value, 2) context_logger.info_with_context("Hum", f"ES12248's humidity at Part {partNo} : {value}") return value if partNo == 23: # value = 55.01 value = self.bme280.get_humidity() value = round(value, 2) context_logger.info_with_context("Hum", f"BME's humidity at Part {partNo} : {value}") return value if partNo == 24: try: # value = 55.01 value = self.boxbme280.get_humidity() value = round(value, 2) context_logger.info_with_context("Hum", f"BOX-BME's humidity at Part {partNo} : {value}") return value except Exception as e: context_logger.error_with_context("Hum", f"Failed to get BOX-BME's humidity at Part {partNo}: {e}") return 0 if partNo == 25: try: humidity = self.sht.get_humidity() context_logger.info_with_context("Hum", f"SHT's humidity at Part {partNo} : {humidity}") return humidity except Exception as e: context_logger.error_with_context("Hum", f"Failed to get SHT's humidity at Part {partNo}: {e}") return 0 if partNo == 27: try: humidity = self.sht41.getHumidity()["hum"] context_logger.info_with_context("Hum", f"SHT41's humidity at Part {partNo} : {humidity}") except Exception as e: context_logger.error_with_context("Hum", f"Failed to get SHT41's humidity at Part {partNo}: {e}") humidity = 0.00 return round(humidity, 2) if partNo == 29: try: humidity = self.ath20.getHum()["hum"] context_logger.info_with_context("Hum", f"ATH20's humidity at Part {partNo} : {humidity}") except Exception as e: context_logger.error_with_context("Hum", f"Failed to get ATH20's humidity at Part {partNo}: {e}") humidity = 0.00 return round(humidity, 2) return 0.0
[docs] def getPressure(self, partNo: int) -> float | Literal[0]: """Read barometric pressure from the sensor identified by part number. Args: partNo: Hardware part number selecting the driver to query. Returns: Pressure in hPa, or 0 on failure. """ if partNo == 23: # value = 1000.1 value = self.bme280.get_pressure() value = round(value, 2) context_logger.info_with_context("Pressure", f"BME's pressure at Part {partNo} : {value}") return value if partNo == 24: try: # value = 1000.1 value = self.boxbme280.get_pressure() value = round(value, 2) context_logger.info_with_context("Pressure", f"BOX-BME's pressure at Part {partNo} : {value}") return value except Exception as e: context_logger.error_with_context( "Pressure", f"Failed to get BOX-BME's pressure at Part {partNo}: {e}", ) return 0 if partNo == 26: # value = 1000.1 value = self.mpl1152.getPressure() value = round(value, 2) context_logger.info_with_context("Pressure", f"MPL1152's pressure at Part {partNo} : {value}") return value if partNo == 28: try: # value = 1000.1 value = self.lps25hb.get_pressure() value = round(value, 2) context_logger.info_with_context("Pressure", f"LPS25HB's pressure at Part {partNo} : {value}") return value except Exception as e: context_logger.error_with_context( "Pressure", f"Failed to get LPS25HB's pressure at Part {partNo}: {e}", ) return 0 return 0
# Note: the SHT sensors (parts 25 and 27) provide no pressure reading, so getPressure has no branch for them.
[docs] def getDewPoint(self, partNo: int) -> float | Literal[0]: """Calculate dew point using the Magnus-Tetens formula. Requires temperature and humidity from an SHT31 sensor (part 25). Args: partNo: Hardware part number (only 25 is supported). Returns: Dew point in degrees Celsius, or 0 on failure. """ if partNo == 25: try: cTemp = self.sht.get_temperature() humidity = self.sht.get_humidity() # According to Magnus-Tetens formula emperical_a = 17.27 emperical_b = 237.7 intermidiate_variable = log(humidity / 100) + ((emperical_a * cTemp) / (emperical_b + cTemp)) dewPoint = (emperical_b * intermidiate_variable) / (emperical_a - intermidiate_variable) context_logger.info_with_context("DewPoint", f"SHT's dew point at Part {partNo} : {dewPoint}") return dewPoint except Exception as e: context_logger.error_with_context("DewPoint", f"Failed to get SHT's dew point at Part {partNo}: {e}") return 0
# Example code
# python3 -m OzWrapper.OzTemp.OzTemp
if __name__ == "__main__":
    import os

    # The sample configuration is read from the directory containing this module.
    config_path = os.path.join(os.path.dirname(__file__), "temp.config.json")
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(config_path, encoding="utf-8") as tempConfigFile:
        tempConfig = json.load(tempConfigFile)
    context_logger.info_with_context("TempConfig", f"Loaded temp config: {tempConfig}")

    temp = OzTemp()
    temp.initialize(tempConfig["temp"], init_value={})
    # Accumulate a few readings, then flush them in one go.
    for _ in range(0, 4):
        temp.getSensorReading()
    data = {}
    context_logger.info_with_context("SensorData", f"Put sensor value result: {temp.putSensorValue(data)}")