"""Driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor.
Provides I2C communication with the ATH20 capacitive humidity and temperature
sensor. The driver handles initialization, soft reset, CRC-verified
measurement readout, and automatic reinitialization with MCP23017-controlled
power cycling on persistent I2C bus errors.
The ATH20 returns 20-bit raw values for both temperature and humidity,
which are converted to physical units using the formulas from the datasheet.
Hardware:
- Interface: I2C
- Default address: 0x38
- Supply: 3.3V
- Measurement time: ~80 ms per cycle
- Accuracy: +/-0.3C temperature, +/-2% RH humidity
Typical usage::
sensor = ATH20(port=0)
data = sensor.getTemp()
print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")
Note:
Requires ``smbus2`` and I2C bus access. Uses MCP23017 GPIO expander
for 3.3V power rail reset on unrecoverable I2C errors.
"""
import os
import time
from typing import ClassVar
from smbus2 import SMBus
from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class ATH20:
"""I2C driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor.
Manages I2C communication, sensor initialization, calibration verification,
and automatic recovery from bus errors via MCP23017-controlled power cycling.
Implements a retry mechanism with configurable maximum attempts.
Attributes:
ATH20_ADDR: I2C slave address of the sensor (0x38).
debug: Enable verbose debug logging of I2C transactions.
data: Last measurement result with ``"temp"`` and ``"hum"`` keys.
i2c_error: ``True`` when the sensor has experienced an I2C error
requiring reinitialization.
"""
ATH20_ADDR = 0x38
ATH20_SOFTRESET = 0xBA
ATH20_INITIALIZE: ClassVar[list[int]] = [0xBE, 0x08, 0x00]
AHT20_MEASURE: ClassVar[list[int]] = [0xAC, 0x33, 0x00]
ATH20_STATUS: ClassVar[int] = 0x71
ATH20_BUSY_BIT: ClassVar[int] = 0x80
ATH20_CALIBRATION_BIT: ClassVar[int] = 0x08
debug: ClassVar[bool] = False
data: ClassVar[dict[str, float]] = {"temp": 0.0, "hum": 0.0}
[docs]
def __init__(self, port: int = 0, max_retries: int = 3) -> None:
self.port = port
self.max_retries = max_retries
self.bus = None
self.i2c_error = False
self.buffer = bytearray(6)
self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6)) # Initialize power control
context_logger.debug_with_context("ATH20", f"Buffer initialized: {self.buffer}")
self._initialize_sensor()
def _initialize_sensor(self) -> bool:
"""Internal method to initialize/reinitialize the sensor."""
try:
# Create SMBus object (clear any stale state)
self.bus = SMBus(self.port)
time.sleep(0.5)
# Perform sensor reset and initialization
if not self.reset():
context_logger.warning_with_context("ATH20", "Soft reset failed")
return False
# Try to initialize up to 3 times
count = 0
while count < 3:
if self.begin():
context_logger.info_with_context("ATH20", f"ATH20 initialized on port {self.port}")
return True
count += 1
time.sleep(1)
context_logger.error_with_context("ATH20", "Initialization Failed!")
return False
except (OSError, TimeoutError, ConnectionError, ValueError) as e:
# I/O errors or device not found - reset power rail
context_logger.warning_with_context(
"ATH20",
f"I/O error during ATH20 initialization: {e}",
)
try:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("ATH20", "3.3V power rail reset during initialization")
except Exception as reset_error:
context_logger.error_with_context("ATH20", f"Failed to reset power rail: {reset_error}")
self.bus = None
return False
except Exception as e:
context_logger.error_with_context("ATH20", f"Failed to initialize ATH20 sensor. Error: {e}")
self.bus = None
return False
[docs]
def reinitialize(self) -> bool:
"""
Public method to reinitialize the sensor.
Returns True if successful, False otherwise.
"""
try:
context_logger.warning_with_context("ATH20", "Reinitializing ATH20 sensor due to I/O errors...")
# Nullify references to allow garbage collection
self.bus = None
time.sleep(1)
return self._initialize_sensor()
except Exception as e:
context_logger.error_with_context("ATH20", f"Reinitialization failed: {e}")
return False
[docs]
def begin(self) -> bool:
"""Initialize the sensor after reset."""
try:
context_logger.debug_with_context("ATH20", "Initializing ATH20...")
self.write_block_data(self.ATH20_INITIALIZE)
time.sleep(1)
while self.get_busy_status():
time.sleep(0.5)
if not self.get_calibration_status():
return False
return True
except Exception as e:
context_logger.error_with_context("ATH20", f"Begin failed: {e}")
return False
[docs]
def get_busy_status(self) -> bool:
"""Check if sensor is busy."""
try:
response = 0
response = self.read_data()
if self.debug:
context_logger.debug_with_context("ATH20", f"Busy Status: {hex(response)}")
if not response & self.ATH20_BUSY_BIT:
time.sleep(0.040)
return False
return True
except Exception as e:
context_logger.error_with_context("ATH20", f"Get busy status failed: {e}")
return False
[docs]
def get_calibration_status(self) -> bool:
"""Check if sensor is calibrated."""
try:
response = 0
response = self.read_data()
if self.debug:
context_logger.debug_with_context("ATH20", f"Calibration Status: {hex(response)}")
if not response & self.ATH20_CALIBRATION_BIT:
time.sleep(0.040)
return False
return True
except Exception as e:
context_logger.error_with_context("ATH20", f"Get calibration status failed: {e}")
return False
[docs]
def reset(self) -> bool:
"""Soft reset the sensor."""
try:
context_logger.debug_with_context("ATH20", "Resetting ATH20...")
if self.write_data(self.ATH20_SOFTRESET):
time.sleep(0.04)
return True
return False
except Exception as e:
context_logger.error_with_context("ATH20", f"Reset failed: {e}")
return False
def _get_measurement(self) -> tuple[float, float]:
"""Internal method to get raw measurement data."""
temp_data = 0.0
hum_data = 0.0
self.write_block_data(self.AHT20_MEASURE)
time.sleep(0.80)
data = self.read_block_data()
if len(data) > 5:
temp_data = ((data[3] & 0xF) << 16) | (data[4] << 8) | data[5]
temp_data = temp_data / (pow(2, 20)) * 200 - 50
hum_data = (data[1] << 12) | (data[2] << 4) | (data[3] >> 4)
hum_data = hum_data * 100 / pow(2, 20)
if self.debug:
context_logger.debug_with_context("ATH20", f"Temperature: {temp_data} and Humidity: {hum_data}")
return round(temp_data, 2), round(hum_data, 2)
[docs]
def get_temperature(self) -> float:
"""
Get temperature with automatic reinitialization on I/O errors.
"""
for attempt in range(self.max_retries):
try:
temp, _ = self._get_measurement()
context_logger.debug_with_context("ATH20", f"Temp value retrieved: {temp}")
if temp == 0.0:
msg = "Sensor returned zero temperature reading"
raise ValueError(msg)
return temp
except (RuntimeError, ValueError, AttributeError) as e:
self.i2c_error = True
context_logger.warning_with_context(
"ATH20",
f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries")
break
except (OSError, TimeoutError, ConnectionError) as e:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("ATH20", "3.3V power reset")
self.i2c_error = True
context_logger.warning_with_context(
"ATH20",
f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries")
break
except Exception as e:
self.i2c_error = True
context_logger.error_with_context(
"ATH20",
f"Unexpected error retrieving temperature from ATH20: {e}",
)
break
return 0.0
[docs]
def get_humidity(self) -> float:
"""
Get humidity with automatic reinitialization on I/O errors.
"""
for attempt in range(self.max_retries):
try:
_, hum = self._get_measurement()
context_logger.debug_with_context("ATH20", f"Humidity value retrieved: {hum}")
if hum == 0.0:
msg = "Sensor returned zero humidity reading"
raise ValueError(msg)
return hum
except (RuntimeError, ValueError, AttributeError) as e:
self.i2c_error = True
context_logger.warning_with_context(
"ATH20",
f"Failed to retrieve humidity (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries")
break
except (OSError, TimeoutError, ConnectionError) as e:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("ATH20", "3.3V power reset")
self.i2c_error = True
context_logger.warning_with_context(
"ATH20",
f"Failed to retrieve humidity (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries")
break
except Exception as e:
self.i2c_error = True
context_logger.error_with_context("ATH20", f"Unexpected error retrieving humidity from ATH20: {e}")
break
return 0.0
[docs]
def read_data(self) -> int:
"""Read a single byte from the sensor."""
if self.bus is None:
return 0
response = 0
try:
response = self.bus.read_byte(self.ATH20_ADDR)
if self.debug:
context_logger.debug_with_context("ATH20", f"Reading Byte data: {hex(response)}")
return response
except Exception as e:
context_logger.error_with_context("ATH20", f"Exception in read_data: {e}")
raise
[docs]
def write_data(self, value: int) -> bool:
"""Write a single byte to the sensor."""
if self.bus is None:
return False
try:
if self.debug:
context_logger.debug_with_context("ATH20", f"Writing Byte data: {value}")
self.bus.write_byte(self.ATH20_ADDR, value)
return True
except Exception as e:
context_logger.error_with_context("ATH20", f"Exception in write_data: {e}")
raise
[docs]
def write_block_data(self, value: list[int]) -> bool:
"""Write a block of data to the sensor."""
if self.bus is None:
return False
try:
if self.debug:
context_logger.debug_with_context("ATH20", f"Writing Block data: {[hex(v) for v in value]}")
# For AHT20, the first byte is the command, followed by parameters
if len(value) > 1:
self.bus.write_i2c_block_data(self.ATH20_ADDR, value[0], value[1:])
else:
self.bus.write_byte(self.ATH20_ADDR, value[0])
return True
except Exception as e:
context_logger.error_with_context("ATH20", f"Exception in write_block_data: {e}")
raise
[docs]
def read_block_data(self) -> list[int]:
"""Read a block of data from the sensor."""
if self.bus is None:
return []
response = []
try:
response = self.bus.read_i2c_block_data(self.ATH20_ADDR, 0x00, 6)
if self.debug:
context_logger.debug_with_context("ATH20", f"Reading Block data: {[hex(r) for r in response]}")
return response
except Exception as e:
context_logger.error_with_context("ATH20", f"Exception in read_block_data: {e}")
raise
[docs]
def getTemp(self) -> dict[str, float]:
"""
Backward compatible method to get temperature and humidity.
Uses the new retry logic.
"""
self.data = {
"temp": self.get_temperature(),
"hum": self.get_humidity(),
}
return self.data
[docs]
def getHum(self, new_measurement: bool = False) -> dict[str, float]:
"""
Backward compatible method to get humidity.
If new_measurement is True, triggers a new measurement.
"""
if new_measurement:
self.data = {
"temp": self.get_temperature(),
"hum": self.get_humidity(),
}
return self.data
if __name__ == "__main__":
sensor = ATH20(port=0)
print(sensor.getTemp())
print(sensor.getHum())