"""Driver for the Bosch BME280 temperature, humidity, and pressure sensor.
Provides I2C communication with the BME280 environmental sensor via the
Adafruit CircuitPython BME280 library. The driver handles initialization on
either I2C bus 0 or 1, automatic reinitialization on I2C errors, and
MCP23017-controlled 3.3V power rail reset for hardware fault recovery.
The BME280 is a combined digital temperature, humidity, and barometric
pressure sensor with high accuracy and low power consumption.
Hardware:
- Interface: I2C (bus 0 or 1)
- Default address: 0x76 (alternate: 0x77)
- Supply: 3.3V
- Accuracy: +/-1C temp, +/-3% RH, +/-1 hPa pressure
Typical usage::

    sensor = BME280(i2c_port=0, i2c_addr=0x76)
    temp = sensor.get_temperature()
    hum = sensor.get_humidity()
    pressure = sensor.get_pressure()
Note:
    Requires the ``adafruit-circuitpython-bme280`` package and the ``board``
    and ``busio`` modules, plus I2C bus access on the Raspberry Pi.
"""
import os
import time
import board
import busio
from adafruit_bme280 import basic as adafruit_bme280
from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two OizomLogger objects are created for this module's name:
#   * ``basic_logger``   - the result of ``OizomLogger(...).get()`` (presumably
#     the underlying plain logger; confirm against utils.oizom_logger). It is
#     not referenced in the visible part of this file.
#   * ``context_logger`` - the OizomLogger wrapper itself; the driver calls its
#     ``debug/info/warning/error_with_context(tag, message)`` methods.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
class BME280:
    """I2C driver for the Bosch BME280 environmental sensor.

    Wraps the Adafruit CircuitPython BME280 library with automatic retry
    and reinitialization logic. On I2C I/O errors the 3.3V power rail is
    reset through the MCP230XX helper before the sensor is reinitialized.

    Attributes:
        sensor: Active Adafruit BME280 sensor instance, or ``None`` if
            initialization failed.
        i2c: The ``busio.I2C`` bus object backing :attr:`sensor`, or ``None``.
        i2c_error: ``True`` after a failed read; cleared again by the next
            successful read.
        max_retries: Number of read attempts each getter makes before
            giving up and returning ``0.0``.
    """

    # Class-level default so ``self.sensor`` always resolves, even when
    # ``_initialize_sensor`` bails out before assigning it.
    sensor = None

    def __init__(self, i2c_port: int = 0, i2c_addr: int = 0x76, max_retries: int = 3) -> None:
        """Set up power control and attempt the first sensor initialization.

        The MCP230XX power-control helper is created first so that a failed
        sensor initialization can already reset the 3.3V rail. If the
        initialization fails, :attr:`sensor` stays ``None`` and later reads
        trigger reinitialization attempts.

        Args:
            i2c_port: I2C bus number; only 0 and 1 are accepted.
            i2c_addr: 7-bit I2C address. Use 0x76 (SDO to GND) or 0x77 (SDO to VCC).
            max_retries: Maximum number of read attempts before returning a zero value.
        """
        # Identifier passed to gpio.select_I2C() during reinitialization.
        # NOTE(review): the meaning of 24 is not visible here - confirm.
        self.part_no = 24
        self.i2c_port = i2c_port
        self.i2c_addr = i2c_addr
        self.max_retries = max_retries
        self.i2c = None
        self.i2c_error = False
        # Power control must exist BEFORE the first init attempt (it is used
        # by the error path of _initialize_sensor).
        # NOTE(review): os.getenv returns a str when MCP_ID is set but the
        # int 6 otherwise - confirm MCP230XX accepts both.
        self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6))
        self._initialize_sensor()

    def _reset_power_rail(self, success_message: str) -> None:
        """Reset the 3.3V rail on a best-effort basis; never raises.

        Args:
            success_message: Warning text logged once the reset call and the
                settle delay have completed.
        """
        try:
            self.mcp230XX.power_3v3_rst()
            time.sleep(1)  # Wait for sensor to reset
            context_logger.warning_with_context("BME280", success_message)
        except Exception as reset_error:
            # A failing reset must not mask the I/O error being handled.
            context_logger.error_with_context("BME280", f"Failed to reset power rail: {reset_error}")

    def _initialize_sensor(self) -> bool:
        """Create the I2C bus and Adafruit BME280 sensor instance.

        Selects the bus pins from :attr:`i2c_port`, creates the bus object
        and instantiates the Adafruit BME280 driver. On I/O-type errors a
        3.3V power rail reset is attempted before reporting failure.

        Returns:
            ``True`` if the sensor was successfully initialized, ``False``
            on any error (including an invalid :attr:`i2c_port`).
        """
        try:
            if self.i2c_port not in (0, 1):
                msg = "Invalid i2c_port number. Must be 0 or 1."
                context_logger.error_with_context("BME280", msg)
                return False

            # A fresh bus object is created on every call to clear stale state.
            if self.i2c_port == 0:
                self.i2c = busio.I2C(board.D1, board.D0)
            else:
                self.i2c = busio.I2C(board.SCL, board.SDA)

            # Small delay to let the I2C bus stabilize after recreation
            time.sleep(0.1)

            # Initialize the sensor object directly (skip I2C scan)
            self.sensor = adafruit_bme280.Adafruit_BME280_I2C(i2c=self.i2c, address=self.i2c_addr)
            context_logger.info_with_context(
                "BME280",
                f"BME280 initialized on port {self.i2c_port}, addr 0x{self.i2c_addr:02X}",
            )
            return True
        except (OSError, TimeoutError, ConnectionError, ValueError) as e:
            # I/O errors or device not found - reset power rail
            context_logger.warning_with_context(
                "BME280",
                f"I/O error during BME280 initialization at 0x{self.i2c_addr:02X}: {e}",
            )
            self._reset_power_rail("3.3V power rail reset during initialization")
            self.sensor = None
            return False
        except Exception as e:
            context_logger.error_with_context("BME280", f"Failed to initialize BME280 sensor. Error: {e}")
            self.sensor = None
            return False

    def reinitialize(self) -> bool:
        """Reinitialize the sensor after an I2C error.

        Drops the existing bus and sensor references, waits one second,
        calls ``gpio.select_I2C`` with this driver's part number (presumably
        to route the I2C bus to this sensor - confirm against drivers.gpio)
        and then runs a fresh initialization.

        Returns:
            ``True`` if reinitialization succeeded, ``False`` otherwise.
        """
        try:
            context_logger.warning_with_context("BME280", "Reinitializing BME280 sensor due to I/O errors...")
            # Nullify references to allow garbage collection
            self.sensor = None
            self.i2c = None
            # Wait before reinitializing (helps with I2C bus recovery)
            time.sleep(1)
            gpio.select_I2C(self.part_no)
            return self._initialize_sensor()
        except Exception as e:
            context_logger.error_with_context("BME280", f"Reinitialization failed: {e}")
            return False

    def _recover_from_read_error(self, label: str, attempt: int, error: Exception, power_cycle: bool) -> bool:
        """Log a failed read and reinitialize the sensor.

        Args:
            label: Quantity name used in log messages (e.g. ``"pressure"``).
            attempt: Zero-based index of the attempt that just failed.
            error: The exception raised by the read.
            power_cycle: When ``True``, reset the 3.3V rail first.

        Returns:
            ``True`` if the caller should retry, ``False`` if it should stop
            (reinitialization failed or no attempts are left).
        """
        if power_cycle:
            self._reset_power_rail("3.3 Reboot")
        self.i2c_error = True
        context_logger.warning_with_context(
            "BME280",
            f"Failed to retrieve {label} (attempt {attempt + 1}/{self.max_retries}). Error: {error}.",
        )
        # Reinitialize even after the final attempt so the next getter call
        # starts from a fresh sensor object.
        if not self.reinitialize():
            context_logger.error_with_context("BME280", "Reinitialization failed, aborting retries")
            return False
        if attempt >= self.max_retries - 1:
            # No retry follows, so the pre-retry delay would be wasted time.
            return False
        time.sleep(1)  # Brief delay before retry
        return True

    def _read_with_retry(self, attribute: str, label: str, debug_prefix: str, *, reject_zero: bool = True) -> float:
        """Read one sensor property with retry and recovery.

        Args:
            attribute: Name of the property on :attr:`sensor` to read.
            label: Quantity name used in warning/error log messages.
            debug_prefix: Leading text of the debug log line for a raw value.
            reject_zero: Treat an exact ``0.0`` reading as a failed read.

        Returns:
            The value read from the sensor, or ``0.0`` if every attempt
            failed. With ``reject_zero`` a genuine 0.0 reading is therefore
            indistinguishable from failure.
        """
        for attempt in range(self.max_retries):
            try:
                # A ``None`` sensor raises AttributeError here, which routes
                # an uninitialized driver through the recovery path below.
                value = getattr(self.sensor, attribute)
                context_logger.debug_with_context("BME280", f"{debug_prefix} retrieved: {value}")
                if reject_zero and value == 0.0:
                    msg = f"Sensor returned zero {label} reading"
                    raise ValueError(msg)
                self.i2c_error = False  # a good read ends the error state
                return value
            except (RuntimeError, ValueError, AttributeError) as e:
                if not self._recover_from_read_error(label, attempt, e, power_cycle=False):
                    break
            except (OSError, TimeoutError, ConnectionError) as e:
                # Bus-level failure: power-cycle the rail before reinitializing.
                if not self._recover_from_read_error(label, attempt, e, power_cycle=True):
                    break
            except Exception as e:
                self.i2c_error = True
                context_logger.error_with_context(
                    "BME280",
                    f"Unexpected error retrieving {label} from BME280: {e}",
                )
                break
        return 0.0

    def get_temperature(self) -> float:
        """Read the current temperature from the BME280 sensor.

        Makes up to :attr:`max_retries` attempts, reinitializing the sensor
        after each failure (with a 3.3V rail reset on I/O errors).

        Returns:
            Temperature in degrees Celsius, or ``0.0`` if all read attempts
            fail.
        """
        return self._read_with_retry("temperature", "temperature", "Temp value")

    def get_humidity(self) -> float:
        """Read the current relative humidity from the BME280 sensor.

        Makes up to :attr:`max_retries` attempts, reinitializing the sensor
        after each failure (with a 3.3V rail reset on I/O errors).

        Returns:
            Relative humidity in percent, or ``0.0`` if all read attempts
            fail.
        """
        return self._read_with_retry("relative_humidity", "humidity", "Humidity Index")

    def get_pressure(self) -> float:
        """Read the current barometric pressure from the BME280 sensor.

        Makes up to :attr:`max_retries` attempts, reinitializing the sensor
        after each failure (with a 3.3V rail reset on I/O errors).

        Returns:
            Barometric pressure in hPa (hectopascals), or ``0.0`` if all
            read attempts fail.
        """
        return self._read_with_retry("pressure", "pressure", "Pressure value")

    def getAltitude(self) -> float:
        """Read the calculated altitude from the BME280 sensor.

        Reads the ``altitude`` property of the Adafruit driver. Unlike the
        other getters, a reading of exactly ``0.0`` is accepted as valid.

        Returns:
            Altitude in meters, or ``0.0`` if all read attempts fail.
        """
        return self._read_with_retry("altitude", "altitude", "Altitude value", reject_zero=False)