# Source code for drivers.BME280.BME280

"""Driver for the Bosch BME280 temperature, humidity, and pressure sensor.

Provides I2C communication with the BME280 environmental sensor via the
Adafruit CircuitPython BME280 library. The driver handles initialization on
either I2C bus 0 or 1, automatic reinitialization on I2C errors, and
MCP23017-controlled 3.3V power rail reset for hardware fault recovery.

The BME280 is a combined digital temperature, humidity, and barometric
pressure sensor with high accuracy and low power consumption.

Hardware:
    - Interface: I2C (bus 0 or 1)
    - Default address: 0x76 (alternate: 0x77)
    - Supply: 3.3V
    - Accuracy: +/-1C temp, +/-3% RH, +/-1 hPa pressure

Typical usage::

    sensor = BME280(i2c_port=0, i2c_addr=0x76)
    temp = sensor.get_temperature()
    hum = sensor.get_humidity()
    pressure = sensor.get_pressure()

Note:
    Requires ``adafruit-circuitpython-bme280``, ``board``, and ``busio``
    packages, plus I2C bus access on the Raspberry Pi.
"""

import os
import time

import board
import busio
from adafruit_bme280 import basic as adafruit_bme280

from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level handles are built from the same logger name (this module's
# ``__name__``):
#   * ``basic_logger``   - whatever ``OizomLogger.get()`` returns; presumably the
#     underlying plain logger (TODO confirm against utils.oizom_logger). It is
#     not referenced by the code visible in this module.
#   * ``context_logger`` - the ``OizomLogger`` wrapper itself; the driver calls
#     its ``*_with_context`` methods, passing "BME280" as the first argument.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class BME280:
    """I2C driver for the Bosch BME280 environmental sensor.

    Wraps the Adafruit CircuitPython BME280 library with automatic retry and
    reinitialization logic. On persistent I2C errors, resets the 3.3V power
    rail via MCP23017 GPIO expander to recover the sensor.

    Attributes:
        sensor: Active Adafruit BME280 sensor instance, or ``None`` if
            initialization failed.
        i2c_error: Set to ``True`` whenever a read fails. NOTE(review): none of
            the methods in this class reset it to ``False``; confirm whether
            callers are expected to clear it.
    """

    sensor = None

    def __init__(self, i2c_port: int = 0, i2c_addr: int = 0x76, max_retries: int = 3) -> None:
        """Initialize the BME280 sensor on the specified I2C bus.

        Stores the configuration, creates the MCP23017 power-control object and
        attempts the first sensor initialization. If that attempt fails,
        :attr:`sensor` stays ``None`` and the next read triggers a
        reinitialization attempt.

        Args:
            i2c_port: I2C bus number (0 for ``/dev/i2c-0``, 1 for ``/dev/i2c-1``).
            i2c_addr: 7-bit I2C address. Use 0x76 (SDO to GND) or 0x77 (SDO to VCC).
            max_retries: Maximum number of read attempts before returning a
                zero value.
        """
        # Identifier handed to gpio.select_I2C() when the sensor is reinitialized.
        self.part_no = 24
        self.i2c_port = i2c_port
        self.i2c_addr = i2c_addr
        self.max_retries = max_retries
        self.i2c = None
        self.i2c_error = False
        # Power control must exist FIRST: _initialize_sensor() may need it to
        # reset the 3.3V rail when the initial probe fails.
        # NOTE(review): os.getenv() yields a str when MCP_ID is set but the int
        # fallback 6 otherwise - confirm MCP230XX accepts both types.
        self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6))
        self._initialize_sensor()

    def _reset_power_rail(self, success_msg: str) -> bool:
        """Reset the 3.3V rail through the MCP23017 and wait for the sensor.

        Failures of the expander itself are logged and swallowed so that a
        broken power-control path can never escape a read or init call.

        Args:
            success_msg: Warning text logged once the reset has been issued.

        Returns:
            ``True`` if the reset call completed, ``False`` if it raised.
        """
        try:
            self.mcp230XX.power_3v3_rst()
            time.sleep(1)  # Wait for sensor to reset
            context_logger.warning_with_context("BME280", success_msg)
        except Exception as reset_error:
            context_logger.error_with_context("BME280", f"Failed to reset power rail: {reset_error}")
            return False
        return True

    def _initialize_sensor(self) -> bool:
        """Create the I2C bus and Adafruit BME280 sensor instance.

        Selects the I2C pins from :attr:`i2c_port`, creates a fresh bus object
        and instantiates the Adafruit BME280 driver. On I/O errors (or a
        ``ValueError`` from the bus/driver constructors) the 3.3V power rail is
        reset via MCP23017 before giving up.

        Returns:
            ``True`` if the sensor was successfully initialized, ``False`` on
            any error.
        """
        if self.i2c_port not in (0, 1):
            msg = "Invalid i2c_port number. Must be 0 or 1."
            context_logger.error_with_context("BME280", msg)
            return False

        try:
            # Always build a new bus object so no stale state is carried over.
            if self.i2c_port == 0:
                self.i2c = busio.I2C(board.D1, board.D0)
            else:
                self.i2c = busio.I2C(board.SCL, board.SDA)

            # Small delay to let the I2C bus stabilize after recreation.
            time.sleep(0.1)

            # Instantiate the sensor object directly (no I2C scan beforehand).
            self.sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c=self.i2c, address=self.i2c_addr)
            context_logger.info_with_context(
                "BME280",
                f"BME280 initialized on port {self.i2c_port}, addr 0x{self.i2c_addr:02X}",
            )
            return True
        except (OSError, ValueError) as e:
            # OSError also covers TimeoutError and ConnectionError (subclasses).
            # I/O errors or device not found - reset power rail.
            context_logger.warning_with_context(
                "BME280",
                f"I/O error during BME280 initialization at 0x{self.i2c_addr:02X}: {e}",
            )
            self._reset_power_rail("3.3V power rail reset during initialization")
            self.sensor = None
            return False
        except Exception as e:
            context_logger.error_with_context("BME280", f"Failed to initialize BME280 sensor. Error: {e}")
            self.sensor = None
            return False

    def reinitialize(self) -> bool:
        """Reinitialize the sensor after an I2C error.

        Drops the existing bus and sensor references, waits one second for the
        bus to recover, calls ``gpio.select_I2C`` with :attr:`part_no` and then
        runs a fresh initialization.

        Returns:
            ``True`` if reinitialization succeeded, ``False`` otherwise.
        """
        try:
            context_logger.warning_with_context("BME280", "Reinitializing BME280 sensor due to I/O errors...")

            # Nullify references to allow garbage collection.
            self.sensor = None
            self.i2c = None

            # Wait before reinitializing (helps with I2C bus recovery).
            time.sleep(1)
            gpio.select_I2C(self.part_no)
            return self._initialize_sensor()
        except Exception as e:
            context_logger.error_with_context("BME280", f"Reinitialization failed: {e}")
            return False

    def _recover_after_failure(self, attempt: int) -> bool:
        """Reinitialize the sensor after a failed read attempt.

        Reinitialization always runs, even after the final attempt, so the
        next call starts from a fresh sensor object. The extra one-second
        settle delay is only spent when another attempt will actually follow.

        Args:
            attempt: Zero-based index of the attempt that just failed.

        Returns:
            ``True`` if the retry loop may continue, ``False`` if
            reinitialization failed and retries must be aborted.
        """
        if not self.reinitialize():
            context_logger.error_with_context("BME280", "Reinitialization failed, aborting retries")
            return False
        if attempt + 1 < self.max_retries:
            time.sleep(1)  # Brief delay before retry
        return True

    def _read_with_retry(self, attribute: str, quantity: str, debug_label: str, *, reject_zero: bool) -> float:
        """Read one property of the Adafruit sensor object with retry/recovery.

        Args:
            attribute: Property name on :attr:`sensor` (e.g. ``"temperature"``).
            quantity: Human-readable quantity name used in log messages.
            debug_label: Prefix of the debug message logged for each raw value.
            reject_zero: When ``True`` a reading equal to ``0.0`` is treated as
                a failed read and retried.

        Returns:
            The value read from the sensor, or ``0.0`` if every attempt failed.
        """
        for attempt in range(self.max_retries):
            try:
                # self.sensor is None after a failed init; getattr then raises
                # AttributeError, which is handled as a recoverable failure.
                value = getattr(self.sensor, attribute)
                context_logger.debug_with_context("BME280", f"{debug_label} retrieved: {value}")
                if reject_zero and value == 0.0:
                    msg = f"Sensor returned zero {quantity} reading"
                    raise ValueError(msg)
                return value
            except (RuntimeError, ValueError, AttributeError) as e:
                self.i2c_error = True
                context_logger.warning_with_context(
                    "BME280",
                    f"Failed to retrieve {quantity} (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
                )
                if not self._recover_after_failure(attempt):
                    break
            except OSError as e:
                # Bus-level failure (includes TimeoutError / ConnectionError):
                # power-cycle the 3.3V rail before reinitializing.
                self._reset_power_rail("3.3 Reboot")
                self.i2c_error = True
                context_logger.warning_with_context(
                    "BME280",
                    f"Failed to retrieve {quantity} (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
                )
                if not self._recover_after_failure(attempt):
                    break
            except Exception as e:
                self.i2c_error = True
                context_logger.error_with_context(
                    "BME280",
                    f"Unexpected error retrieving {quantity} from BME280: {e}",
                )
                break

        return 0.0

    def get_temperature(self) -> float:
        """Read the current temperature from the BME280 sensor.

        Attempts the read up to :attr:`max_retries` times. On I2C bus errors
        the 3.3V power rail is reset and the sensor reinitialized before the
        next attempt. A reading of exactly ``0.0`` is treated as a failed read.

        Returns:
            Temperature in degrees Celsius, or ``0.0`` if all read attempts
            fail.
        """
        return self._read_with_retry("temperature", "temperature", "Temp value", reject_zero=True)

    def get_humidity(self) -> float:
        """Read the current relative humidity from the BME280 sensor.

        Attempts the read up to :attr:`max_retries` times. On I2C bus errors
        the 3.3V power rail is reset and the sensor reinitialized before the
        next attempt. A reading of exactly ``0.0`` is treated as a failed read.

        Returns:
            Relative humidity in percent (0-100), or ``0.0`` if all read
            attempts fail.
        """
        return self._read_with_retry("relative_humidity", "humidity", "Humidity Index", reject_zero=True)

    def get_pressure(self) -> float:
        """Read the current barometric pressure from the BME280 sensor.

        Attempts the read up to :attr:`max_retries` times. On I2C bus errors
        the 3.3V power rail is reset and the sensor reinitialized before the
        next attempt. A reading of exactly ``0.0`` is treated as a failed read.

        Returns:
            Barometric pressure in hPa (hectopascals), or ``0.0`` if all read
            attempts fail.
        """
        return self._read_with_retry("pressure", "pressure", "Pressure value", reject_zero=True)

    def getAltitude(self) -> float:
        """Read the calculated altitude from the BME280 sensor.

        The value comes from the Adafruit driver's ``altitude`` property.
        Attempts the read up to :attr:`max_retries` times with automatic
        reinitialization. Unlike the other getters, a ``0.0`` reading is
        accepted as valid.

        Returns:
            Altitude in meters above sea level, or ``0.0`` if all read attempts
            fail.
        """
        return self._read_with_retry("altitude", "altitude", "Altitude value", reject_zero=False)