"""Driver for the Sensirion SHT41 temperature and humidity sensor.
Provides low-level I2C communication with the SHT4x series digital humidity
and temperature sensors using raw ``smbus2`` commands. The driver handles
CRC-8 verification of measurement data, multiple precision/heater modes,
and provides both a low-level driver (:class:`SHT41_driver`) and a
high-level wrapper (:class:`SHT41`) with a simplified API.
The SHT4x series supports multiple measurement modes combining different
precision levels with optional on-chip heater activation for condensation
removal.
Hardware:
- Interface: I2C
- Default address: 0x44
- Supply: 3.3V
- Accuracy: +/-0.2C temperature, +/-1.8% RH humidity (high precision)
- Features: Configurable heater (high/med/low, 100ms/1s pulses)
Typical usage::
sensor = SHT41(port=0)
data = sensor.getTemp()
print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")
Note:
Requires ``smbus2`` for raw I2C access. CRC-8 verification uses
polynomial 0x31 (CRC-8/NRSC) as specified in the Sensirion datasheet.
"""
import struct
import time
from typing import ClassVar
import smbus2
from smbus2 import i2c_msg
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
DEBUG = False
[docs]
class SHT41_driver:
"""Low-level I2C driver for the Sensirion SHT4x sensor.
Handles raw I2C byte-level communication, CRC-8 verification, and
sensor mode configuration. Supports all SHT4x measurement modes
including no-heat, high-heat, medium-heat, and low-heat variants
at different pulse durations.
Attributes:
SHT41_ADDR: I2C slave address (0x44).
current_mode: Currently active measurement mode command byte.
"""
SHT41_ADDR = 0x44 # SHT4X I2C Address
SHT41_READSERIAL = 0x89 # Read Out of Serial Register
SHT41_SOFTRESET = 0x94 # Soft Reset
# Measurement modes
NOHEAT_HIGHPRECISION = 0xFD
NOHEAT_MEDPRECISION = 0xF6
NOHEAT_LOWPRECISION = 0xE0
HIGHHEAT_1S = 0x39
HIGHHEAT_100MS = 0x32
MEDHEAT_1S = 0x2F
MEDHEAT_100MS = 0x24
LOWHEAT_1S = 0x1E
LOWHEAT_100MS = 0x15
current_mode = NOHEAT_HIGHPRECISION
[docs]
def __init__(self, port: int = 0) -> None:
# gpio.select_I2C(27)
self.port = port
self.bus = smbus2.SMBus(self.port)
time.sleep(1)
self.buffer = bytearray(6)
context_logger.debug_with_context("SHT41", f"Initialized on port {self.port}")
[docs]
def begin(self) -> bool:
if not self.reset():
context_logger.error_with_context("SHT41", "Begin failed during reset")
return False
self.set_mode(self.NOHEAT_HIGHPRECISION)
context_logger.debug_with_context("SHT41", "Set mode to NOHEAT_HIGHPRECISION")
self.current_mode = self.NOHEAT_HIGHPRECISION
context_logger.debug_with_context("SHT41", "Begin complete!")
return True
[docs]
def reset(self) -> bool:
try:
self.write_byte_data(self.SHT41_SOFTRESET)
time.sleep(0.01)
return True
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in reset: {e}")
return False
[docs]
def get_serial_number(self) -> int:
"""The unique 32-bit serial number"""
try:
self.write_byte_data(self.SHT41_READSERIAL)
time.sleep(0.1)
self.read_block_data(self.buffer)
ser1 = self.buffer[0:2]
ser1_crc = self.buffer[2]
ser2 = self.buffer[3:5]
ser2_crc = self.buffer[5]
# Check CRC of bytes
if ser1_crc != self.crc8(ser1) or ser2_crc != self.crc8(ser2):
context_logger.error_with_context("SHT41", "CRC check failed")
serial = (ser1[0] << 24) + (ser1[1] << 16) + (ser2[0] << 8) + ser2[1]
context_logger.debug_with_context("SHT41", f"Serial number: {serial}")
return serial
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in get_serial_number: {e}")
return None
[docs]
def get_mode(self) -> int:
return self.current_mode
[docs]
def set_mode(self, mode: int) -> None:
context_logger.debug_with_context("SHT41", f"Setting mode to {hex(mode)}")
self.current_mode = mode
[docs]
def get_measurements(self) -> tuple[float, float]:
"""Both `temperature` and `relative_humidity`, read simultaneously"""
try:
command = self.current_mode
self.write_byte_data(command)
time.sleep(1.5)
self.read_block_data(self.buffer)
# Separate the read data
temp_data = self.buffer[0:2]
temp_crc = self.buffer[2]
humidity_data = self.buffer[3:5]
humidity_crc = self.buffer[5]
# Check CRC of bytes
if temp_crc != self.crc8(temp_data) or humidity_crc != self.crc8(humidity_data):
context_logger.error_with_context("SHT41", "CRC check failed")
# Decode data into human values:
temperature = struct.unpack_from(">H", temp_data)[0] # >=
temperature = -45.0 + 175.0 * temperature / 65535.0
humidity = struct.unpack_from(">H", humidity_data)[0]
humidity = -6.0 + 125.0 * humidity / 65535.0
humidity = max(min(humidity, 100), 0)
context_logger.debug_with_context("SHT41", f"Temperature: {temperature} °C, Humidity: {humidity}%")
return temperature, humidity
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in get_measurements: {e}")
return 0, 0
[docs]
def crc8(self, buffer: bytearray) -> int:
"""Verify the crc8 checksum"""
crc = 0xFF
for byte in buffer:
crc ^= byte
for _ in range(8):
if crc & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc <<= 1
crc &= 0xFF # Return the bottom 8 bits
context_logger.debug_with_context("SHT41", f"CRC8 calculation result: {crc}")
return crc
[docs]
def write_byte_data(self, value: int) -> None:
try:
context_logger.debug_with_context("SHT41", f"Writing byte: {hex(value)}")
self.bus.write_byte(self.SHT41_ADDR, value)
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in write_byte_data: {e}")
[docs]
def read_byte(self) -> int | None:
try:
byte = self.bus.read_byte(self.SHT41_ADDR)
context_logger.debug_with_context("SHT41", f"Read byte: {byte}")
return byte
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in read_byte: {e}")
return None
[docs]
def read_block_data(self, buffer: bytearray) -> bytearray | None:
"""Read data byte-by-byte into the provided buffer using read_byte."""
try:
# Prepare a message to read 6 bytes of data
msg = i2c_msg.read(self.SHT41_ADDR, 6)
# Execute the read operation
self.bus.i2c_rdwr(msg)
# Extract the read data from the msg object and populate the passed buffer
read_data = list(msg)
del msg
if len(read_data) != 6:
context_logger.error_with_context("SHT41", f"Expected 6 bytes, but got {len(read_data)} bytes")
return None
buffer[:] = read_data
return buffer
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in read_block_data: {e}")
return None
[docs]
class SHT41:
"""High-level wrapper for the SHT41 temperature and humidity sensor.
Provides a simplified API compatible with the OzTemp wrapper pattern.
Wraps :class:`SHT41_driver` and returns measurement data as dictionaries
with ``"temp"`` and ``"hum"`` keys.
Attributes:
sht41: Underlying :class:`SHT41_driver` instance.
data: Last measurement result dictionary.
"""
sht41: SHT41_driver | None = None
debug = False
data: ClassVar[dict[str, float]] = {"temp": 0.0, "hum": 0.0}
[docs]
def __init__(self, port: int = 0) -> None:
self.port = port
context_logger.info_with_context("SHT41", "Initializing SHT41")
self.sht41 = SHT41_driver(self.port)
if not self.sht41.begin():
context_logger.error_with_context("SHT41", "Begin Failed")
time.sleep(1)
[docs]
def getTemp(self) -> dict[str, float]:
self.data = {"temp": 0.0, "hum": 0.0}
try:
self.data["temp"], self.data["hum"] = self.sht41.get_measurements()
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in getTemp: {e}")
return self.data
[docs]
def getHumidity(self, new_measurement: bool = False) -> dict[str, float]:
if new_measurement:
self.data = {"temp": 0.0, "hum": 0.0}
try:
self.data["temp"], self.data["hum"] = self.sht41.get_measurements()
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in getHumidity: {e}")
return self.data
if __name__ == "__main__":
sht41 = SHT41()
while True:
try:
temp = sht41.getTemp()["temp"]
context_logger.info_with_context("SHT41", f"Temperature: {temp}")
hum = sht41.getHumidity()["hum"]
context_logger.info_with_context("SHT41", f"Humidity: {hum}")
except Exception as e:
context_logger.error_with_context("SHT41", f"Exception in main loop: {e}")
print("\n\n")
time.sleep(3)