Source code for drivers.ATH20.ATH20

"""Driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor.

Provides I2C communication with the ATH20 capacitive humidity and temperature
sensor. The driver handles initialization, soft reset, CRC-verified
measurement readout, and automatic reinitialization with MCP23017-controlled
power cycling on persistent I2C bus errors.

The ATH20 returns 20-bit raw values for both temperature and humidity,
which are converted to physical units using the formulas from the datasheet.

Hardware:
    - Interface: I2C
    - Default address: 0x38
    - Supply: 3.3V
    - Measurement time: ~80 ms per cycle
    - Accuracy: +/-0.3C temperature, +/-2% RH humidity

Typical usage::

    sensor = ATH20(port=0)
    data = sensor.getTemp()
    print(f"Temperature: {data['temp']}C, Humidity: {data['hum']}%")

Note:
    Requires ``smbus2`` and I2C bus access. Uses MCP23017 GPIO expander
    for 3.3V power rail reset on unrecoverable I2C errors.
"""

import os
import time
from typing import ClassVar

from smbus2 import SMBus

from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class ATH20: """I2C driver for the ASAIR ATH20 (AHT20) temperature and humidity sensor. Manages I2C communication, sensor initialization, calibration verification, and automatic recovery from bus errors via MCP23017-controlled power cycling. Implements a retry mechanism with configurable maximum attempts. Attributes: ATH20_ADDR: I2C slave address of the sensor (0x38). debug: Enable verbose debug logging of I2C transactions. data: Last measurement result with ``"temp"`` and ``"hum"`` keys. i2c_error: ``True`` when the sensor has experienced an I2C error requiring reinitialization. """ ATH20_ADDR = 0x38 ATH20_SOFTRESET = 0xBA ATH20_INITIALIZE: ClassVar[list[int]] = [0xBE, 0x08, 0x00] AHT20_MEASURE: ClassVar[list[int]] = [0xAC, 0x33, 0x00] ATH20_STATUS: ClassVar[int] = 0x71 ATH20_BUSY_BIT: ClassVar[int] = 0x80 ATH20_CALIBRATION_BIT: ClassVar[int] = 0x08 debug: ClassVar[bool] = False data: ClassVar[dict[str, float]] = {"temp": 0.0, "hum": 0.0}
[docs] def __init__(self, port: int = 0, max_retries: int = 3) -> None: self.port = port self.max_retries = max_retries self.bus = None self.i2c_error = False self.buffer = bytearray(6) self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6)) # Initialize power control context_logger.debug_with_context("ATH20", f"Buffer initialized: {self.buffer}") self._initialize_sensor()
def _initialize_sensor(self) -> bool: """Internal method to initialize/reinitialize the sensor.""" try: # Create SMBus object (clear any stale state) self.bus = SMBus(self.port) time.sleep(0.5) # Perform sensor reset and initialization if not self.reset(): context_logger.warning_with_context("ATH20", "Soft reset failed") return False # Try to initialize up to 3 times count = 0 while count < 3: if self.begin(): context_logger.info_with_context("ATH20", f"ATH20 initialized on port {self.port}") return True count += 1 time.sleep(1) context_logger.error_with_context("ATH20", "Initialization Failed!") return False except (OSError, TimeoutError, ConnectionError, ValueError) as e: # I/O errors or device not found - reset power rail context_logger.warning_with_context( "ATH20", f"I/O error during ATH20 initialization: {e}", ) try: self.mcp230XX.power_3v3_rst() time.sleep(1) # Wait for sensor to reset context_logger.warning_with_context("ATH20", "3.3V power rail reset during initialization") except Exception as reset_error: context_logger.error_with_context("ATH20", f"Failed to reset power rail: {reset_error}") self.bus = None return False except Exception as e: context_logger.error_with_context("ATH20", f"Failed to initialize ATH20 sensor. Error: {e}") self.bus = None return False
[docs] def reinitialize(self) -> bool: """ Public method to reinitialize the sensor. Returns True if successful, False otherwise. """ try: context_logger.warning_with_context("ATH20", "Reinitializing ATH20 sensor due to I/O errors...") # Nullify references to allow garbage collection self.bus = None time.sleep(1) return self._initialize_sensor() except Exception as e: context_logger.error_with_context("ATH20", f"Reinitialization failed: {e}") return False
[docs] def begin(self) -> bool: """Initialize the sensor after reset.""" try: context_logger.debug_with_context("ATH20", "Initializing ATH20...") self.write_block_data(self.ATH20_INITIALIZE) time.sleep(1) while self.get_busy_status(): time.sleep(0.5) if not self.get_calibration_status(): return False return True except Exception as e: context_logger.error_with_context("ATH20", f"Begin failed: {e}") return False
[docs] def get_busy_status(self) -> bool: """Check if sensor is busy.""" try: response = 0 response = self.read_data() if self.debug: context_logger.debug_with_context("ATH20", f"Busy Status: {hex(response)}") if not response & self.ATH20_BUSY_BIT: time.sleep(0.040) return False return True except Exception as e: context_logger.error_with_context("ATH20", f"Get busy status failed: {e}") return False
[docs] def get_calibration_status(self) -> bool: """Check if sensor is calibrated.""" try: response = 0 response = self.read_data() if self.debug: context_logger.debug_with_context("ATH20", f"Calibration Status: {hex(response)}") if not response & self.ATH20_CALIBRATION_BIT: time.sleep(0.040) return False return True except Exception as e: context_logger.error_with_context("ATH20", f"Get calibration status failed: {e}") return False
[docs] def reset(self) -> bool: """Soft reset the sensor.""" try: context_logger.debug_with_context("ATH20", "Resetting ATH20...") if self.write_data(self.ATH20_SOFTRESET): time.sleep(0.04) return True return False except Exception as e: context_logger.error_with_context("ATH20", f"Reset failed: {e}") return False
def _get_measurement(self) -> tuple[float, float]: """Internal method to get raw measurement data.""" temp_data = 0.0 hum_data = 0.0 self.write_block_data(self.AHT20_MEASURE) time.sleep(0.80) data = self.read_block_data() if len(data) > 5: temp_data = ((data[3] & 0xF) << 16) | (data[4] << 8) | data[5] temp_data = temp_data / (pow(2, 20)) * 200 - 50 hum_data = (data[1] << 12) | (data[2] << 4) | (data[3] >> 4) hum_data = hum_data * 100 / pow(2, 20) if self.debug: context_logger.debug_with_context("ATH20", f"Temperature: {temp_data} and Humidity: {hum_data}") return round(temp_data, 2), round(hum_data, 2)
[docs] def get_temperature(self) -> float: """ Get temperature with automatic reinitialization on I/O errors. """ for attempt in range(self.max_retries): try: temp, _ = self._get_measurement() context_logger.debug_with_context("ATH20", f"Temp value retrieved: {temp}") if temp == 0.0: msg = "Sensor returned zero temperature reading" raise ValueError(msg) return temp except (RuntimeError, ValueError, AttributeError) as e: self.i2c_error = True context_logger.warning_with_context( "ATH20", f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.", ) if attempt < self.max_retries - 1: # Reinitialize before next attempt if self.reinitialize(): time.sleep(1) # Brief delay before retry else: context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries") break except (OSError, TimeoutError, ConnectionError) as e: self.mcp230XX.power_3v3_rst() time.sleep(1) # Wait for sensor to reset context_logger.warning_with_context("ATH20", "3.3V power reset") self.i2c_error = True context_logger.warning_with_context( "ATH20", f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.", ) if attempt < self.max_retries - 1: # Reinitialize before next attempt if self.reinitialize(): time.sleep(1) # Brief delay before retry else: context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries") break except Exception as e: self.i2c_error = True context_logger.error_with_context( "ATH20", f"Unexpected error retrieving temperature from ATH20: {e}", ) break return 0.0
[docs] def get_humidity(self) -> float: """ Get humidity with automatic reinitialization on I/O errors. """ for attempt in range(self.max_retries): try: _, hum = self._get_measurement() context_logger.debug_with_context("ATH20", f"Humidity value retrieved: {hum}") if hum == 0.0: msg = "Sensor returned zero humidity reading" raise ValueError(msg) return hum except (RuntimeError, ValueError, AttributeError) as e: self.i2c_error = True context_logger.warning_with_context( "ATH20", f"Failed to retrieve humidity (attempt {attempt + 1}/{self.max_retries}). Error: {e}.", ) if attempt < self.max_retries - 1: # Reinitialize before next attempt if self.reinitialize(): time.sleep(1) # Brief delay before retry else: context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries") break except (OSError, TimeoutError, ConnectionError) as e: self.mcp230XX.power_3v3_rst() time.sleep(1) # Wait for sensor to reset context_logger.warning_with_context("ATH20", "3.3V power reset") self.i2c_error = True context_logger.warning_with_context( "ATH20", f"Failed to retrieve humidity (attempt {attempt + 1}/{self.max_retries}). Error: {e}.", ) if attempt < self.max_retries - 1: # Reinitialize before next attempt if self.reinitialize(): time.sleep(1) # Brief delay before retry else: context_logger.error_with_context("ATH20", "Reinitialization failed, aborting retries") break except Exception as e: self.i2c_error = True context_logger.error_with_context("ATH20", f"Unexpected error retrieving humidity from ATH20: {e}") break return 0.0
[docs] def read_data(self) -> int: """Read a single byte from the sensor.""" if self.bus is None: return 0 response = 0 try: response = self.bus.read_byte(self.ATH20_ADDR) if self.debug: context_logger.debug_with_context("ATH20", f"Reading Byte data: {hex(response)}") return response except Exception as e: context_logger.error_with_context("ATH20", f"Exception in read_data: {e}") raise
[docs] def write_data(self, value: int) -> bool: """Write a single byte to the sensor.""" if self.bus is None: return False try: if self.debug: context_logger.debug_with_context("ATH20", f"Writing Byte data: {value}") self.bus.write_byte(self.ATH20_ADDR, value) return True except Exception as e: context_logger.error_with_context("ATH20", f"Exception in write_data: {e}") raise
[docs] def write_block_data(self, value: list[int]) -> bool: """Write a block of data to the sensor.""" if self.bus is None: return False try: if self.debug: context_logger.debug_with_context("ATH20", f"Writing Block data: {[hex(v) for v in value]}") # For AHT20, the first byte is the command, followed by parameters if len(value) > 1: self.bus.write_i2c_block_data(self.ATH20_ADDR, value[0], value[1:]) else: self.bus.write_byte(self.ATH20_ADDR, value[0]) return True except Exception as e: context_logger.error_with_context("ATH20", f"Exception in write_block_data: {e}") raise
[docs] def read_block_data(self) -> list[int]: """Read a block of data from the sensor.""" if self.bus is None: return [] response = [] try: response = self.bus.read_i2c_block_data(self.ATH20_ADDR, 0x00, 6) if self.debug: context_logger.debug_with_context("ATH20", f"Reading Block data: {[hex(r) for r in response]}") return response except Exception as e: context_logger.error_with_context("ATH20", f"Exception in read_block_data: {e}") raise
[docs] def getTemp(self) -> dict[str, float]: """ Backward compatible method to get temperature and humidity. Uses the new retry logic. """ self.data = { "temp": self.get_temperature(), "hum": self.get_humidity(), } return self.data
[docs] def getHum(self, new_measurement: bool = False) -> dict[str, float]: """ Backward compatible method to get humidity. If new_measurement is True, triggers a new measurement. """ if new_measurement: self.data = { "temp": self.get_temperature(), "hum": self.get_humidity(), } return self.data
if __name__ == "__main__": sensor = ATH20(port=0) print(sensor.getTemp()) print(sensor.getHum())