#!/usr/bin/env python
"""Temperature, humidity, pressure, and dew-point sensor wrapper.
Abstracts over multiple hardware drivers -- BME280, SHT31, SHT41, DS18B20,
ES12248, MPL1152, LPS25HB, and ATH20 -- providing a unified interface for
ambient temperature, relative humidity, barometric pressure, and dew-point
readings through the GenericSensor contract.
"""
import json
import time
from math import log
from typing import ClassVar, Literal
from drivers.ATH20 import ATH20
from drivers.BME280 import BME280
from drivers.DS18B20 import DS18B20
from drivers.ES12248.ES12248 import ES12248
from drivers.gpio import gpio
from drivers.LPS25HB import LPS25HB
from drivers.MPL1152.MPL1152 import MPL1152
from drivers.SHT31 import SHT31
from drivers.SHT41 import SHT41
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzTemp(GenericSensor):
"""Wrapper for temperature, humidity, pressure, and dew-point sensors.
Manages initialisation, periodic reading, and value aggregation for every
temperature-family sensor supported by the device. The specific hardware
driver instantiated depends on the part number received in the Gateway
configuration.
Attributes:
configuration: List of sensor config dicts from the Gateway.
v_temphum: Nested list holding accumulated readings per sensor/parameter.
bme280: BME280 driver instance for ambient temperature/humidity/pressure.
boxbme280: BME280 driver instance for box-internal readings.
sht: SHT31 driver instance.
sht_address: I2C address of the active SHT31 sensor.
bme_address: Index into ``bme_add`` for the active box BME280 address.
bme_add: Candidate I2C addresses for the box BME280.
bme_sensor: Whether the primary BME280 address is still valid.
mpl1152: MPL1152 barometric-pressure driver instance.
"""
configuration: ClassVar[list[dict]] = []
v_temphum: ClassVar[list] = []
bme280 = None
boxbme280 = None
sht = None
sht_address = 0
bme_address = 0
bme_add: ClassVar[list] = [0x76, 0x77]
bme_sensor = True
mpl1152 = None
[docs]
def __init__(self) -> None:
"""Initialise the OzTemp wrapper with default state."""
super().__init__()
[docs]
def initialize(self, config: dict, init_value: dict) -> bool:
"""Initialise all temperature sensors listed in the Gateway config.
Args:
config: List of sensor configuration dicts from the Gateway.
init_value: Mutable dict updated with per-sensor init status.
Returns:
True if at least one sensor initialised successfully.
"""
self.configuration = config
context_logger.info_with_context("Temp", "Initializing Temperature sensors")
_success = False
for sensor in self.configuration:
try:
if "pn" in sensor:
sensor["init"] = self.initializeSensor(sensor)
_success = True
except Exception as e:
sensor["init"] = 0
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
self.putInitvalues(init_value)
return _success
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialise a single temperature-family sensor by part number.
Creates the appropriate hardware driver, takes an initial reading for
each configured parameter, and stores the bootstrap values in
``v_temphum``.
Args:
sensor: Single sensor configuration dict containing part number,
enable flag, GPIO settings, and parameter list.
Returns:
1 if the sensor was initialised successfully, 0 otherwise.
"""
success = False
value = []
gpio.select_I2C(sensor[self.partNumber], sensor["pos"] if "pos" in sensor else -1)
time.sleep(0.5)
if sensor[self.partNumber] == 22 and sensor["en"] == self.baseConfig["oz_enable"]:
self.es12248_sensor = ES12248() # This temperature is also used as Tg in WBGT
self.es12248_sensor.initialize()
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = 0
else:
success = 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = 0
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
# """ CONTEXT: 🌐 As this sensor is used as Tg in WBGT, humidity is just added
# because it is offering new parameter and it is not used in any calculation. """
elif param[self.parameter] == 2:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if sensor[self.partNumber] == 21 and sensor["en"] == self.baseConfig["oz_enable"]:
self.tg_sensor = DS18B20()
self.tg_sensor.initialize()
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = 0
else:
success = 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = 0
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if sensor[self.partNumber] == 23 and sensor["en"] == self.baseConfig["oz_enable"]:
port = 0
address = 0x77
if "gpio" in sensor:
port = sensor["gpio"]["port"]
address = int(sensor["gpio"]["address"], 16)
context_logger.info_with_context("Temp", f"Port : {port} addr: {address}")
self.bme280 = BME280(i2c_port=0, i2c_addr=address)
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = False
else:
success = True
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
elif param[self.parameter] == 2 or param[self.parameter] == 3:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if sensor[self.partNumber] == 24 and sensor["en"] == self.baseConfig["oz_enable"]:
port = 0
address = 0x76
if "gpio" in sensor:
port = sensor["gpio"]["port"]
address = int(sensor["gpio"]["address"], 16)
context_logger.info_with_context("Temp", f"Port : {port} addr: {address}")
self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address])
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = False
else:
success = True
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
self.boxbme280 = None
self.bme_address = 1
self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address])
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
elif param[self.parameter] == 2 or param[self.parameter] == 3:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = False
else:
success = True
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if self.bme_address == 0:
self.bme_sensor = False
if sensor[self.partNumber] == 25 and sensor["en"] == self.baseConfig["oz_enable"]:
port = 0
self.sht_address = 0x44
if "gpio" in sensor:
port = int(sensor["gpio"]["port"])
self.sht_address = int(sensor["gpio"]["address"], 16)
context_logger.info_with_context(
"Temp",
f"Port : {port} addr: {self.sht_address}",
)
self.sht = SHT31(i2c_port=0, address=self.sht_address)
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1 or param[self.parameter] == 2 or param[self.parameter] == 4:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = False
else:
success = True
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if sensor[self.partNumber] == 26 and sensor["en"] == self.baseConfig["oz_enable"]:
parameters = sensor["parameters"]
port = 0
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = int(sensor["gpio"]["port"])
self.mpl1152 = MPL1152()
for param in parameters:
if param[self.parameter] == 3:
val = 0.0
count = 3
while count > 0:
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
if val > 0:
count = 0
oldVal = val
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
success = True
else:
count = count - 1
time.sleep(2)
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
count = count - 1
success = False
if sensor[self.partNumber] == 27 and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
parameters = sensor["parameters"]
port = 0
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = int(sensor["gpio"]["port"])
if "debug" in sensor["gpio"]:
debug = True if sensor["gpio"]["debug"] == 1 else False
try:
self.sht41 = SHT41()
# self.sht41.debug = debug
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
count = 3
while count > 0:
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
count = count - 1
success = False
if not success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
if param[self.parameter] == 2:
val = 0.0
count = 3
while count > 0:
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
count = count - 1
success = False
time.sleep(2)
if not success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
# NOTE: 📝 Previously this sensor assigned as part no. 28
# if (
# sensor[self.partNumber] == 28
# and sensor["en"] == self.baseConfig["oz_enable"]
# ):
# port = 0
# debug = False
# if "gpio" in sensor:
# if "port" in sensor["gpio"]:
# port = int(sensor["gpio"]["port"])
# if "debug" in sensor["gpio"]:
# debug = True if sensor["gpio"]["debug"] == 1 else False
# self.max31875 = MAX31875(port=port)
# self.max31875.DEBUG = debug
# parameters = sensor["parameters"]
# for param in parameters:
# val = 0.0
# oldVal = 0.0
# count = 0
# try:
# val = self.getTempHum(
# sensor[self.partNumber], param[self.parameter]
# )
# oldVal = val
# if val == 0:
# success = False
# else:
# success = True
# except Exception as e:
# context_logger.error_with_context(
# "Temp",
# f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
# )
# success = False
# _val = {"value": [val], "oldvalue": oldVal, "count": 1}
# value.append(_val)
if sensor[self.partNumber] == 28 and sensor["en"] == self.baseConfig["oz_enable"]:
port = 0
address = 0x5D
if "gpio" in sensor:
port = sensor["gpio"]["port"]
address = int(sensor["gpio"]["address"], 16)
context_logger.info_with_context("Temp", f"Port : {port} addr: {address}")
self.lps25hb = LPS25HB(i2c_port=0, i2c_addr=address)
parameters = sensor["parameters"]
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
if val == 0:
success = False
else:
success = True
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
elif param[self.parameter] == 3:
val = 0.0
oldVal = 0.0
count = 0
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
oldVal = val
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
if sensor[self.partNumber] == 29 and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
parameters = sensor["parameters"]
port = 0
debug = 0
if "gpio" in sensor:
if "port" in sensor["gpio"]:
port = int(sensor["gpio"]["port"])
if "debug" in sensor["gpio"]:
debug = True if sensor["gpio"]["debug"] == 1 else False
try:
self.ath20 = ATH20()
self.ath20.debug = debug
for param in parameters:
if param[self.parameter] == 1:
val = 0.0
count = 3
while count > 0:
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
count = count - 1
success = False
if not success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
if param[self.parameter] == 2:
val = 0.0
count = 3
while count > 0:
try:
val = self.getTempHum(sensor[self.partNumber], param[self.parameter])
if val >= 0:
count = 0
oldVal = val
_val = {
"value": [val],
"oldvalue": oldVal,
"count": 1,
}
value.append(_val)
success = True
break
count = count - 1
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
count = count - 1
success = False
time.sleep(2)
if not success:
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to initialize sensor {sensor.get('pn', 'unknown')}: {e}",
)
success = False
_val = {"value": [0], "oldvalue": 0, "count": 1}
value.append(_val)
self.v_temphum.append(value)
return int(success)
[docs]
def getSensorReading(self, calibrationKeys: list[str] | None = None) -> dict:
"""Read current values from all initialised temperature sensors.
Applies sensitivity and correction offsets, accumulates values for
later averaging, and returns real-time data keyed by short-code.
Args:
calibrationKeys: Optional list of short-code keys to restrict
which parameters are read. If empty or None, all parameters
are read.
Returns:
Dict mapping parameter short-codes to their latest readings.
"""
data = {}
if calibrationKeys is None:
calibrationKeys = []
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
for Y, parameters in enumerate(sensor["parameters"]):
try:
# Check if this parameter index exists in v_temphum
if len(self.v_temphum[X]) <= Y:
context_logger.warning_with_context(
"Temp",
f"Parameter index [{X}][{Y}] not initialized for {parameters.get('sc', 'unknown')}",
)
continue
if len(calibrationKeys) == 0 or parameters["sc"] in calibrationKeys:
gpio.select_I2C(
sensor[self.partNumber],
sensor["pos"] if "pos" in sensor else -1,
)
time.sleep(0.5)
value = self.getTempHum(sensor[self.partNumber], parameters[self.parameter])
if value is not None:
value = round(
((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
2,
)
self.v_temphum[X][Y]["value"].append(value)
self.v_temphum[X][Y]["count"] += 1
data[parameters["sc"]] = value
else:
context_logger.warning_with_context(
"Temp",
f"getSensorReading: {sensor[self.partNumber]} {parameters[self.parameter]} has None value",
)
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to get sensor reading at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return data
[docs]
def putSensorValue(self, value: dict, calibrationKeys: list[str] | None = None) -> dict:
"""Flush accumulated readings into the output dict and reset counters.
Optionally triggers the SHT31 heater for humidity-sensor conditioning.
Args:
value: Mutable output dict to populate with aggregated readings.
calibrationKeys: Optional list of short-code keys to restrict
which parameters are flushed.
Returns:
The updated output dict with lists of accumulated values per
short-code.
"""
if calibrationKeys is None:
calibrationKeys = []
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
if sensor["pn"] == 25:
gpio.select_I2C(
sensor[self.partNumber],
sensor["pos"] if "pos" in sensor else -1,
)
self.sht.start_heater()
for Y, parameters in enumerate(sensor["parameters"]):
try:
if len(calibrationKeys) == 0 or parameters["sc"] in calibrationKeys:
value[parameters["sc"]] = self.v_temphum[X][Y]["value"]
self.v_temphum[X][Y]["value"] = []
self.v_temphum[X][Y]["count"] = 0
except Exception as e:
context_logger.error_with_context(
"Temp",
f"Failed to put sensor value at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return value
[docs]
def putInitvalues(self, value: json) -> json:
"""Record each sensor's init status into the shared init-value dict.
Args:
value: Mutable dict updated in-place with a ``"temp"`` key
containing a list of per-sensor init statuses.
Returns:
The updated init-value dict.
"""
_sensor_init = []
for sensor in self.configuration:
_sensor_init.append(sensor["init"])
value.update({"temp": _sensor_init})
return value
[docs]
def getTempHum(self, partNo: int, pm: int) -> float:
"""Dispatch a reading request to the correct measurement sub-method.
Args:
partNo: Hardware part number identifying the sensor model.
pm: Parameter identifier (1=temp, 2=humidity, 3=pressure,
4=dew point).
Returns:
The measured value rounded to two decimal places.
"""
value = 0.0
if pm == 1:
value = self.getTemp(partNo)
elif pm == 2:
value = self.getHum(partNo)
elif pm == 3:
value = self.getPressure(partNo)
elif pm == 4:
value = self.getDewPoint(partNo)
# if partNo == 28:
# value = self.getMAXTemp(pm)
return round(value, 2)
[docs]
def getMAXTemp(self, pm: int) -> float:
"""Read temperature from the MAX31875 sensor (currently unused).
Args:
pm: Parameter identifier used to derive the I2C channel.
Returns:
Temperature in degrees Celsius, rounded to two decimal places.
"""
try:
ch = pm - 7
value = self.max31875.get_temperature(channel=ch)
context_logger.info_with_context("Temp", f"MAX31875's temp at Channel {ch} : {value}")
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get MAX31875's temp at Channel {ch}: {e}")
value = 0.0
return round(value, 2)
[docs]
def getTemp(self, partNo: int) -> float | Literal[0]:
"""Read temperature from the sensor identified by part number.
Args:
partNo: Hardware part number selecting the driver to query.
Returns:
Temperature in degrees Celsius, or 0 on failure.
"""
if partNo == 21:
value = self.tg_sensor.get_temperature()
value = round(value, 2)
context_logger.info_with_context("Temp", f"DS18B20's temp at Part {partNo} : {value}")
return value
if partNo == 22:
value = self.es12248_sensor.get_temperature()
value = round(value, 2)
context_logger.info_with_context("Temp", f"ES12248's temp at Part {partNo} : {value}")
return value
if partNo == 23:
# value = 23.01
value = self.bme280.get_temperature()
value = round(value, 2)
context_logger.info_with_context("Temp", f"BME's temp at Part {partNo} : {value}")
return value
if partNo == 24:
value = 0.0
try:
value = self.boxbme280.get_temperature()
value = round(value, 2)
context_logger.info_with_context("Temp", f"BOX-BME's temp at Part {partNo} : {value}")
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get BOX-BME's temp at Part {partNo}: {e}")
if self.bme_sensor:
self.boxbme280 = None
if self.bme_address == 0:
self.bme_address = 1
else:
self.bme_address = 0
self.boxbme280 = BME280(i2c_port=0, i2c_addr=self.bme_add[self.bme_address])
return value
if partNo == 25:
try:
value = self.sht.get_temperature()
context_logger.info_with_context("Temp", f"SHT's temp at Part {partNo} : {value}")
return value
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get SHT's temp at Part {partNo}: {e}")
return 0.00
if partNo == 27:
try:
temp = self.sht41.getTemp()["temp"]
context_logger.info_with_context("Temp", f"SHT41's temp at Part {partNo} : {temp}")
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get SHT41's temp at Part {partNo}: {e}")
temp = 0.00
return round(temp, 2)
if partNo == 28:
try:
value = self.lps25hb.get_temperature()
value = round(value, 2)
context_logger.info_with_context("Temp", f"LPS25HB's temp at Part {partNo} : {value}")
return value
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get LPS25HB's temp at Part {partNo}: {e}")
return 0.00
if partNo == 29:
try:
temp = self.ath20.getTemp()["temp"]
context_logger.info_with_context("Temp", f"ATH20's temp at Part {partNo} : {temp}")
except Exception as e:
context_logger.error_with_context("Temp", f"Failed to get ATH20's temp at Part {partNo}: {e}")
temp = 0.00
return round(temp, 2)
return 0.0
[docs]
def getHum(self, partNo: int) -> float | Literal[0]:
"""Read relative humidity from the sensor identified by part number.
Args:
partNo: Hardware part number selecting the driver to query.
Returns:
Relative humidity in percent, or 0 on failure.
"""
if partNo == 22:
# value = 55.01
value = self.es12248_sensor.get_humidity()
value = round(value, 2)
context_logger.info_with_context("Hum", f"ES12248's humidity at Part {partNo} : {value}")
return value
if partNo == 23:
# value = 55.01
value = self.bme280.get_humidity()
value = round(value, 2)
context_logger.info_with_context("Hum", f"BME's humidity at Part {partNo} : {value}")
return value
if partNo == 24:
try:
# value = 55.01
value = self.boxbme280.get_humidity()
value = round(value, 2)
context_logger.info_with_context("Hum", f"BOX-BME's humidity at Part {partNo} : {value}")
return value
except Exception as e:
context_logger.error_with_context("Hum", f"Failed to get BOX-BME's humidity at Part {partNo}: {e}")
return 0
if partNo == 25:
try:
humidity = self.sht.get_humidity()
context_logger.info_with_context("Hum", f"SHT's humidity at Part {partNo} : {humidity}")
return humidity
except Exception as e:
context_logger.error_with_context("Hum", f"Failed to get SHT's humidity at Part {partNo}: {e}")
return 0
if partNo == 27:
try:
humidity = self.sht41.getHumidity()["hum"]
context_logger.info_with_context("Hum", f"SHT41's humidity at Part {partNo} : {humidity}")
except Exception as e:
context_logger.error_with_context("Hum", f"Failed to get SHT41's humidity at Part {partNo}: {e}")
humidity = 0.00
return round(humidity, 2)
if partNo == 29:
try:
humidity = self.ath20.getHum()["hum"]
context_logger.info_with_context("Hum", f"ATH20's humidity at Part {partNo} : {humidity}")
except Exception as e:
context_logger.error_with_context("Hum", f"Failed to get ATH20's humidity at Part {partNo}: {e}")
humidity = 0.00
return round(humidity, 2)
return 0.0
[docs]
def getPressure(self, partNo: int) -> float | Literal[0]:
"""Read barometric pressure from the sensor identified by part number.
Args:
partNo: Hardware part number selecting the driver to query.
Returns:
Pressure in hPa, or 0 on failure.
"""
if partNo == 23:
# value = 1000.1
value = self.bme280.get_pressure()
value = round(value, 2)
context_logger.info_with_context("Pressure", f"BME's pressure at Part {partNo} : {value}")
return value
if partNo == 24:
try:
# value = 1000.1
value = self.boxbme280.get_pressure()
value = round(value, 2)
context_logger.info_with_context("Pressure", f"BOX-BME's pressure at Part {partNo} : {value}")
return value
except Exception as e:
context_logger.error_with_context(
"Pressure",
f"Failed to get BOX-BME's pressure at Part {partNo}: {e}",
)
return 0
if partNo == 26:
# value = 1000.1
value = self.mpl1152.getPressure()
value = round(value, 2)
context_logger.info_with_context("Pressure", f"MPL1152's pressure at Part {partNo} : {value}")
return value
if partNo == 28:
try:
# value = 1000.1
value = self.lps25hb.get_pressure()
value = round(value, 2)
context_logger.info_with_context("Pressure", f"LPS25HB's pressure at Part {partNo} : {value}")
return value
except Exception as e:
context_logger.error_with_context(
"Pressure",
f"Failed to get LPS25HB's pressure at Part {partNo}: {e}",
)
return 0
return 0
# no pressure in sht
[docs]
def getDewPoint(self, partNo: int) -> float | Literal[0]:
"""Calculate dew point using the Magnus-Tetens formula.
Requires temperature and humidity from an SHT31 sensor (part 25).
Args:
partNo: Hardware part number (only 25 is supported).
Returns:
Dew point in degrees Celsius, or 0 on failure.
"""
if partNo == 25:
try:
cTemp = self.sht.get_temperature()
humidity = self.sht.get_humidity()
# According to Magnus-Tetens formula
emperical_a = 17.27
emperical_b = 237.7
intermidiate_variable = log(humidity / 100) + ((emperical_a * cTemp) / (emperical_b + cTemp))
dewPoint = (emperical_b * intermidiate_variable) / (emperical_a - intermidiate_variable)
context_logger.info_with_context("DewPoint", f"SHT's dew point at Part {partNo} : {dewPoint}")
return dewPoint
except Exception as e:
context_logger.error_with_context("DewPoint", f"Failed to get SHT's dew point at Part {partNo}: {e}")
return 0
# Example code
# python3 -m OzWrapper.OzTemp.OzTemp
if __name__ == "__main__":
import os
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, "temp.config.json")
with open(file_name) as tempConfigFile:
temp = tempConfigFile.read()
tempConfig = json.loads(temp)
context_logger.info_with_context("TempConfig", f"Loaded temp config: {tempConfig}")
temp = OzTemp()
temp.initialize(tempConfig["temp"], init_value={})
for _ in range(0, 4):
temp.getSensorReading()
data = {}
context_logger.info_with_context("SensorData", f"Put sensor value result: {temp.putSensorValue(data)}")