"""Driver for the STMicroelectronics LPS25HB barometric pressure sensor.
Provides I2C communication with the LPS25HB MEMS pressure sensor via the
Adafruit CircuitPython LPS2x library. The driver handles initialization on
either I2C bus 0 or 1, automatic reinitialization on I2C errors, and
MCP23017-controlled 3.3V power rail reset for hardware fault recovery.
The LPS25HB measures absolute barometric pressure (260-1260 hPa) and
includes an embedded temperature sensor for compensation.
Hardware:
- Interface: I2C (bus 0 or 1)
- Default address: 0x5D (alternate: 0x5C)
- Supply: 3.3V
- Range: 260-1260 hPa
- Accuracy: +/-0.2 hPa (absolute), +/-2C temperature
Typical usage::
sensor = LPS25HB(i2c_port=0, i2c_addr=0x5D)
temp = sensor.get_temperature()
pressure = sensor.get_pressure()
Note:
Requires ``adafruit-circuitpython-lps2x``, ``board``, and ``busio``.
"""
import os
import time
import adafruit_lps2x
import board
import busio
from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class LPS25HB:
"""I2C driver for the STMicroelectronics LPS25HB pressure sensor.
Wraps the Adafruit LPS2x library with automatic retry and reinitialization
logic. On persistent I2C errors, resets the 3.3V power rail via MCP23017
GPIO expander to recover the sensor.
Attributes:
sensor: Active Adafruit LPS25 sensor instance, or ``None`` if
initialization failed.
i2c_error: ``True`` when the sensor is in an error state requiring
reinitialization.
"""
sensor = None
[docs]
def __init__(self, i2c_port: int = 0, i2c_addr: int = 0x5D, max_retries: int = 3) -> None:
self.part_no = 28
self.i2c_port = i2c_port
self.i2c_addr = i2c_addr
self.max_retries = max_retries
self.i2c = None
self.i2c_error = False
self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6)) # Initialize power control FIRST
self._initialize_sensor()
def _initialize_sensor(self) -> bool:
"""Internal method to initialize/reinitialize the sensor."""
try:
if self.i2c_port not in (0, 1):
msg = "Invalid i2c_port number. Must be 0 or 1."
context_logger.error_with_context("LPS25HB", msg)
return False
# Create I2C bus object to clear any stale state
if self.i2c_port == 0:
self.i2c = busio.I2C(board.D1, board.D0)
else:
self.i2c = busio.I2C(board.SCL, board.SDA)
# Small delay to let I2C bus stabilize after recreation
time.sleep(0.1)
# Initialize the sensor object directly (skip I2C scan)
self.sensor = adafruit_lps2x.LPS25(i2c_bus=self.i2c, address=self.i2c_addr)
context_logger.info_with_context(
"LPS25HB",
f"LPS25HB initialized on port {self.i2c_port}, addr 0x{self.i2c_addr:02X}",
)
return True
except (OSError, TimeoutError, ConnectionError, ValueError) as e:
# I/O errors or device not found - reset power rail
context_logger.warning_with_context(
"LPS25HB",
f"I/O error during LPS25HB initialization at 0x{self.i2c_addr:02X}: {e}",
)
try:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("LPS25HB", "3.3V power rail reset during initialization")
except Exception as reset_error:
context_logger.error_with_context("LPS25HB", f"Failed to reset power rail: {reset_error}")
self.sensor = None
return False
except Exception as e:
context_logger.error_with_context("LPS25HB", f"Failed to initialize LPS25HB sensor. Error: {e}")
self.sensor = None
return False
[docs]
def reinitialize(self) -> bool:
"""
Public method to reinitialize the sensor.
Returns True if successful, False otherwise.
"""
try:
context_logger.warning_with_context("LPS25HB", "Reinitializing LPS25HB sensor due to I/O errors...")
# Nullify references to allow garbage collection
self.sensor = None
self.i2c = None
# Wait before reinitializing (helps with I2C bus recovery)
time.sleep(1)
gpio.select_I2C(self.part_no)
return self._initialize_sensor()
except Exception as e:
context_logger.error_with_context("LPS25HB", f"Reinitialization failed: {e}")
return False
[docs]
def get_temperature(self) -> float:
"""
Get temperature with automatic reinitialization on I/O errors.
"""
for attempt in range(self.max_retries):
try:
temp = self.sensor.temperature
context_logger.debug_with_context("LPS25HB", f"Temp value retrieved: {temp}")
if temp == 0.0:
msg = "Sensor returned zero temperature reading"
raise ValueError(msg)
return temp
except (RuntimeError, ValueError, AttributeError) as e:
self.i2c_error = True
context_logger.warning_with_context(
"LPS25HB",
f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("LPS25HB", "Reinitialization failed, aborting retries")
break
except (OSError, TimeoutError, ConnectionError) as e:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("LPS25HB", "3.3 Reboot")
self.i2c_error = True
context_logger.warning_with_context(
"LPS25HB",
f"Failed to retrieve temperature (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("LPS25HB", "Reinitialization failed, aborting retries")
break
except Exception as e:
self.i2c_error = True
context_logger.error_with_context(
"LPS25HB",
f"Unexpected error retrieving temperature from LPS25HB: {e}",
)
break
return 0.0
[docs]
def get_pressure(self) -> float:
"""
Get pressure with automatic reinitialization on I/O errors.
"""
for attempt in range(self.max_retries):
try:
pressure = self.sensor.pressure
context_logger.debug_with_context("LPS25HB", f"Pressure value retrieved: {pressure}")
if pressure == 0.0:
msg = "Sensor returned zero pressure reading"
raise ValueError(msg)
return pressure
except (RuntimeError, ValueError, AttributeError) as e:
self.i2c_error = True
context_logger.warning_with_context(
"LPS25HB",
f"Failed to retrieve pressure (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("LPS25HB", "Reinitialization failed, aborting retries")
break
except (OSError, TimeoutError, ConnectionError) as e:
self.mcp230XX.power_3v3_rst()
time.sleep(1) # Wait for sensor to reset
context_logger.warning_with_context("LPS25HB", "3.3 Reboot")
self.i2c_error = True
context_logger.warning_with_context(
"LPS25HB",
f"Failed to retrieve pressure (attempt {attempt + 1}/{self.max_retries}). Error: {e}.",
)
if attempt < self.max_retries - 1:
# Reinitialize before next attempt
if self.reinitialize():
time.sleep(1) # Brief delay before retry
else:
context_logger.error_with_context("LPS25HB", "Reinitialization failed, aborting retries")
break
except Exception as e:
self.i2c_error = True
context_logger.error_with_context("LPS25HB", f"Unexpected error retrieving pressure from LPS25HB: {e}")
break
return 0.0