Source code for drivers.SHT41.SHT41

"""Driver for the Sensirion SHT41 temperature and humidity sensor.

Provides low-level I2C communication with the SHT4x series digital humidity
and temperature sensors using raw ``smbus2`` commands. The driver handles
CRC-8 verification of measurement data, multiple precision/heater modes,
and provides both a low-level driver (:class:`SHT41_driver`) and a
high-level wrapper (:class:`SHT41`) with a simplified API.

The SHT4x series supports multiple measurement modes combining different
precision levels with optional on-chip heater activation for condensation
removal.

Hardware:
    - Interface: I2C
    - Default address: 0x44
    - Supply: 3.3V
    - Accuracy: +/-0.2C temperature, +/-1.8% RH humidity (high precision)
    - Features: Configurable heater (high/med/low, 100ms/1s pulses)

Typical usage::

    sensor = SHT41(port=0)
    data = sensor.getTemp()
    print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")

Note:
    Requires ``smbus2`` for raw I2C access. CRC-8 verification uses
    polynomial 0x31 (CRC-8/NRSC) as specified in the Sensirion datasheet.
"""

import struct
import time
from typing import ClassVar

import smbus2
from smbus2 import i2c_msg

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)

DEBUG = False


[docs] class SHT41_driver: """Low-level I2C driver for the Sensirion SHT4x sensor. Handles raw I2C byte-level communication, CRC-8 verification, and sensor mode configuration. Supports all SHT4x measurement modes including no-heat, high-heat, medium-heat, and low-heat variants at different pulse durations. Attributes: SHT41_ADDR: I2C slave address (0x44). current_mode: Currently active measurement mode command byte. """ SHT41_ADDR = 0x44 # SHT4X I2C Address SHT41_READSERIAL = 0x89 # Read Out of Serial Register SHT41_SOFTRESET = 0x94 # Soft Reset # Measurement modes NOHEAT_HIGHPRECISION = 0xFD NOHEAT_MEDPRECISION = 0xF6 NOHEAT_LOWPRECISION = 0xE0 HIGHHEAT_1S = 0x39 HIGHHEAT_100MS = 0x32 MEDHEAT_1S = 0x2F MEDHEAT_100MS = 0x24 LOWHEAT_1S = 0x1E LOWHEAT_100MS = 0x15 current_mode = NOHEAT_HIGHPRECISION
[docs] def __init__(self, port: int = 0) -> None: # gpio.select_I2C(27) self.port = port self.bus = smbus2.SMBus(self.port) time.sleep(1) self.buffer = bytearray(6) context_logger.debug_with_context("SHT41", f"Initialized on port {self.port}")
[docs] def begin(self) -> bool: if not self.reset(): context_logger.error_with_context("SHT41", "Begin failed during reset") return False self.set_mode(self.NOHEAT_HIGHPRECISION) context_logger.debug_with_context("SHT41", "Set mode to NOHEAT_HIGHPRECISION") self.current_mode = self.NOHEAT_HIGHPRECISION context_logger.debug_with_context("SHT41", "Begin complete!") return True
[docs] def reset(self) -> bool: try: self.write_byte_data(self.SHT41_SOFTRESET) time.sleep(0.01) return True except Exception as e: context_logger.error_with_context("SHT41", f"Exception in reset: {e}") return False
[docs] def get_serial_number(self) -> int: """The unique 32-bit serial number""" try: self.write_byte_data(self.SHT41_READSERIAL) time.sleep(0.1) self.read_block_data(self.buffer) ser1 = self.buffer[0:2] ser1_crc = self.buffer[2] ser2 = self.buffer[3:5] ser2_crc = self.buffer[5] # Check CRC of bytes if ser1_crc != self.crc8(ser1) or ser2_crc != self.crc8(ser2): context_logger.error_with_context("SHT41", "CRC check failed") serial = (ser1[0] << 24) + (ser1[1] << 16) + (ser2[0] << 8) + ser2[1] context_logger.debug_with_context("SHT41", f"Serial number: {serial}") return serial except Exception as e: context_logger.error_with_context("SHT41", f"Exception in get_serial_number: {e}") return None
[docs] def get_mode(self) -> int: return self.current_mode
[docs] def set_mode(self, mode: int) -> None: context_logger.debug_with_context("SHT41", f"Setting mode to {hex(mode)}") self.current_mode = mode
[docs] def get_measurements(self) -> tuple[float, float]: """Both `temperature` and `relative_humidity`, read simultaneously""" try: command = self.current_mode self.write_byte_data(command) time.sleep(1.5) self.read_block_data(self.buffer) # Separate the read data temp_data = self.buffer[0:2] temp_crc = self.buffer[2] humidity_data = self.buffer[3:5] humidity_crc = self.buffer[5] # Check CRC of bytes if temp_crc != self.crc8(temp_data) or humidity_crc != self.crc8(humidity_data): context_logger.error_with_context("SHT41", "CRC check failed") # Decode data into human values: temperature = struct.unpack_from(">H", temp_data)[0] # >= temperature = -45.0 + 175.0 * temperature / 65535.0 humidity = struct.unpack_from(">H", humidity_data)[0] humidity = -6.0 + 125.0 * humidity / 65535.0 humidity = max(min(humidity, 100), 0) context_logger.debug_with_context("SHT41", f"Temperature: {temperature} °C, Humidity: {humidity}%") return temperature, humidity except Exception as e: context_logger.error_with_context("SHT41", f"Exception in get_measurements: {e}") return 0, 0
[docs] def crc8(self, buffer: bytearray) -> int: """Verify the crc8 checksum""" crc = 0xFF for byte in buffer: crc ^= byte for _ in range(8): if crc & 0x80: crc = (crc << 1) ^ 0x31 else: crc <<= 1 crc &= 0xFF # Return the bottom 8 bits context_logger.debug_with_context("SHT41", f"CRC8 calculation result: {crc}") return crc
[docs] def write_byte_data(self, value: int) -> None: try: context_logger.debug_with_context("SHT41", f"Writing byte: {hex(value)}") self.bus.write_byte(self.SHT41_ADDR, value) except Exception as e: context_logger.error_with_context("SHT41", f"Exception in write_byte_data: {e}")
[docs] def read_byte(self) -> int | None: try: byte = self.bus.read_byte(self.SHT41_ADDR) context_logger.debug_with_context("SHT41", f"Read byte: {byte}") return byte except Exception as e: context_logger.error_with_context("SHT41", f"Exception in read_byte: {e}") return None
[docs] def read_block_data(self, buffer: bytearray) -> bytearray | None: """Read data byte-by-byte into the provided buffer using read_byte.""" try: # Prepare a message to read 6 bytes of data msg = i2c_msg.read(self.SHT41_ADDR, 6) # Execute the read operation self.bus.i2c_rdwr(msg) # Extract the read data from the msg object and populate the passed buffer read_data = list(msg) del msg if len(read_data) != 6: context_logger.error_with_context("SHT41", f"Expected 6 bytes, but got {len(read_data)} bytes") return None buffer[:] = read_data return buffer except Exception as e: context_logger.error_with_context("SHT41", f"Exception in read_block_data: {e}") return None
[docs] class SHT41: """High-level wrapper for the SHT41 temperature and humidity sensor. Provides a simplified API compatible with the OzTemp wrapper pattern. Wraps :class:`SHT41_driver` and returns measurement data as dictionaries with ``"temp"`` and ``"hum"`` keys. Attributes: sht41: Underlying :class:`SHT41_driver` instance. data: Last measurement result dictionary. """ sht41: SHT41_driver | None = None debug = False data: ClassVar[dict[str, float]] = {"temp": 0.0, "hum": 0.0}
[docs] def __init__(self, port: int = 0) -> None: self.port = port context_logger.info_with_context("SHT41", "Initializing SHT41") self.sht41 = SHT41_driver(self.port) if not self.sht41.begin(): context_logger.error_with_context("SHT41", "Begin Failed") time.sleep(1)
[docs] def getTemp(self) -> dict[str, float]: self.data = {"temp": 0.0, "hum": 0.0} try: self.data["temp"], self.data["hum"] = self.sht41.get_measurements() except Exception as e: context_logger.error_with_context("SHT41", f"Exception in getTemp: {e}") return self.data
[docs] def getHumidity(self, new_measurement: bool = False) -> dict[str, float]: if new_measurement: self.data = {"temp": 0.0, "hum": 0.0} try: self.data["temp"], self.data["hum"] = self.sht41.get_measurements() except Exception as e: context_logger.error_with_context("SHT41", f"Exception in getHumidity: {e}") return self.data
if __name__ == "__main__": sht41 = SHT41() while True: try: temp = sht41.getTemp()["temp"] context_logger.info_with_context("SHT41", f"Temperature: {temp}") hum = sht41.getHumidity()["hum"] context_logger.info_with_context("SHT41", f"Humidity: {hum}") except Exception as e: context_logger.error_with_context("SHT41", f"Exception in main loop: {e}") print("\n\n") time.sleep(3)