# Source code for drivers.LPS25HB.LPS25HB

"""Driver for the STMicroelectronics LPS25HB barometric pressure sensor.

Provides I2C communication with the LPS25HB MEMS pressure sensor via the
Adafruit CircuitPython LPS2x library. The driver handles initialization on
either I2C bus 0 or 1, automatic reinitialization on I2C errors, and
MCP23017-controlled 3.3V power rail reset for hardware fault recovery.

The LPS25HB measures absolute barometric pressure (260-1260 hPa) and
includes an embedded temperature sensor for compensation.

Hardware:
    - Interface: I2C (bus 0 or 1)
    - Default address: 0x5D (alternate: 0x5C)
    - Supply: 3.3V
    - Range: 260-1260 hPa
    - Accuracy: +/-0.2 hPa (absolute), +/-2C temperature

Typical usage::

    sensor = LPS25HB(i2c_port=0, i2c_addr=0x5D)
    temp = sensor.get_temperature()
    pressure = sensor.get_pressure()

Note:
    Requires ``adafruit-circuitpython-lps2x``, ``board``, and ``busio``.
"""

import os
import time

import adafruit_lps2x
import board
import busio

from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
#
# Two module-level loggers are built from the project's OizomLogger:
#   basic_logger   - whatever ``OizomLogger(__name__).get()`` returns
#                    (presumably the underlying standard logger - TODO confirm).
#   context_logger - the OizomLogger wrapper itself; the driver in this module
#                    logs through its ``*_with_context`` methods.
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class LPS25HB:
    """I2C driver for the STMicroelectronics LPS25HB pressure sensor.

    Wraps the Adafruit LPS2x library with automatic retry and
    reinitialization logic. On I2C I/O errors, resets the 3.3V power rail via
    the MCP23017 GPIO expander to recover the sensor.

    Attributes:
        sensor: Active Adafruit LPS25 sensor instance, or ``None`` if
            initialization failed.
        i2c_error: ``False`` after construction; set to ``True`` whenever a
            read attempt fails. This class never clears it again, so callers
            presumably reset it themselves (TODO confirm).
    """

    sensor = None

    def __init__(self, i2c_port: int = 0, i2c_addr: int = 0x5D, max_retries: int = 3) -> None:
        """Store the bus configuration and attempt a first initialization.

        Args:
            i2c_port: I2C bus to use; only 0 and 1 are accepted.
            i2c_addr: I2C address of the sensor.
            max_retries: Maximum number of read attempts per
                ``get_temperature`` / ``get_pressure`` call.

        A failed initialization does not raise here; ``sensor`` simply stays
        ``None`` and the first read triggers a reinitialization.
        """
        # Identifier handed to ``gpio.select_I2C`` when reinitializing.
        self.part_no = 28
        self.i2c_port = i2c_port
        self.i2c_addr = i2c_addr
        self.max_retries = max_retries
        self.i2c = None
        self.i2c_error = False
        # Initialize power control FIRST: _initialize_sensor() may need to
        # reset the 3.3V rail.
        # NOTE(review): os.getenv returns a str when MCP_ID is set but the int
        # 6 otherwise - confirm MCP230XX accepts both.
        self.mcp230XX = MCP230XX(devicenumber=os.getenv("MCP_ID", 6))
        self._initialize_sensor()

    def _reset_power_rail(self, success_msg: str) -> bool:
        """Reset the 3.3V rail without ever raising.

        Args:
            success_msg: Warning message logged once the reset has completed.

        Returns:
            ``True`` if the reset completed, ``False`` if it raised (the
            failure is logged).
        """
        try:
            self.mcp230XX.power_3v3_rst()
            time.sleep(1)  # Wait for sensor to reset
            context_logger.warning_with_context("LPS25HB", success_msg)
        except Exception as reset_error:
            context_logger.error_with_context("LPS25HB", f"Failed to reset power rail: {reset_error}")
            return False
        return True

    def _release_bus(self) -> None:
        """Drop the sensor and release the current I2C bus object, if any."""
        # Drop the sensor first so nothing keeps using the bus being released.
        self.sensor = None
        stale_bus = self.i2c
        self.i2c = None
        if stale_bus is None:
            return
        try:
            # Explicitly release the bus instead of relying on garbage
            # collection alone.
            stale_bus.deinit()
        except Exception as deinit_error:
            # Best effort: a failed release must not block reinitialization.
            context_logger.warning_with_context("LPS25HB", f"Failed to release I2C bus: {deinit_error}")

    def _initialize_sensor(self) -> bool:
        """Create the I2C bus and the sensor object.

        Returns:
            ``True`` on success. ``False`` if ``i2c_port`` is not 0 or 1, or if
            creating the bus/sensor raised; in the latter case ``sensor`` is
            set to ``None``, and for I/O-type errors the 3.3V rail is reset.
        """
        try:
            if self.i2c_port not in (0, 1):
                msg = "Invalid i2c_port number. Must be 0 or 1."
                context_logger.error_with_context("LPS25HB", msg)
                return False

            # Create a fresh I2C bus object to clear any stale state.
            if self.i2c_port == 0:
                self.i2c = busio.I2C(board.D1, board.D0)
            else:
                self.i2c = busio.I2C(board.SCL, board.SDA)

            # Small delay to let the I2C bus stabilize after recreation.
            time.sleep(0.1)

            # Initialize the sensor object directly (skip I2C scan).
            self.sensor = adafruit_lps2x.LPS25(i2c_bus=self.i2c, address=self.i2c_addr)
            context_logger.info_with_context(
                "LPS25HB",
                f"LPS25HB initialized on port {self.i2c_port}, addr 0x{self.i2c_addr:02X}",
            )
            return True
        except (OSError, TimeoutError, ConnectionError, ValueError) as e:
            # I/O errors or device not found - reset power rail.
            context_logger.warning_with_context(
                "LPS25HB",
                f"I/O error during LPS25HB initialization at 0x{self.i2c_addr:02X}: {e}",
            )
            self._reset_power_rail("3.3V power rail reset during initialization")
            self.sensor = None
            return False
        except Exception as e:
            context_logger.error_with_context("LPS25HB", f"Failed to initialize LPS25HB sensor. Error: {e}")
            self.sensor = None
            return False

    def reinitialize(self) -> bool:
        """Tear down and rebuild the I2C bus and sensor objects.

        Returns:
            ``True`` if the sensor was initialized again, ``False`` otherwise.
            Never raises; failures are logged.
        """
        try:
            context_logger.warning_with_context("LPS25HB", "Reinitializing LPS25HB sensor due to I/O errors...")
            self._release_bus()
            # Wait before reinitializing (helps with I2C bus recovery).
            time.sleep(1)
            gpio.select_I2C(self.part_no)
            return self._initialize_sensor()
        except Exception as e:
            context_logger.error_with_context("LPS25HB", f"Reinitialization failed: {e}")
            return False

    def _handle_read_failure(self, label: str, attempt: int, error: Exception, power_cycle: bool) -> bool:
        """Record a failed read and prepare the next attempt.

        Args:
            label: Quantity name used in log messages.
            attempt: Zero-based index of the attempt that failed.
            error: Exception raised by the read.
            power_cycle: Reset the 3.3V rail first (used for I/O errors).

        Returns:
            ``False`` if retrying is pointless because reinitialization
            failed, ``True`` otherwise.
        """
        if power_cycle:
            # Guarded: a failing rail reset must not escape the read call.
            self._reset_power_rail("3.3 Reboot")
        self.i2c_error = True
        context_logger.warning_with_context(
            "LPS25HB",
            f"Failed to retrieve {label} (attempt {attempt + 1}/{self.max_retries}). Error: {error}.",
        )
        if attempt >= self.max_retries - 1:
            # Last attempt: nothing left to prepare for.
            return True
        # Reinitialize before the next attempt.
        if self.reinitialize():
            time.sleep(1)  # Brief delay before retry
            return True
        context_logger.error_with_context("LPS25HB", "Reinitialization failed, aborting retries")
        return False

    def _read_with_retry(self, attr: str, label: str, debug_label: str) -> float:
        """Read ``self.sensor.<attr>`` with up to ``max_retries`` attempts.

        Args:
            attr: Name of the sensor property to read.
            label: Quantity name used in warning/error messages.
            debug_label: Prefix of the debug message logged for each value.

        Returns:
            The value read, or ``0.0`` if no attempt succeeded. A reading of
            exactly ``0.0`` is itself treated as a failed attempt.
        """
        for attempt in range(self.max_retries):
            try:
                # With sensor None this raises AttributeError, which routes
                # into the reinitialization branch below.
                value = getattr(self.sensor, attr)
                context_logger.debug_with_context("LPS25HB", f"{debug_label} value retrieved: {value}")
                if value == 0.0:
                    msg = f"Sensor returned zero {label} reading"
                    raise ValueError(msg)
                return value
            except (RuntimeError, ValueError, AttributeError) as e:
                if not self._handle_read_failure(label, attempt, e, power_cycle=False):
                    break
            except (OSError, TimeoutError, ConnectionError) as e:
                if not self._handle_read_failure(label, attempt, e, power_cycle=True):
                    break
            except Exception as e:
                self.i2c_error = True
                context_logger.error_with_context(
                    "LPS25HB",
                    f"Unexpected error retrieving {label} from LPS25HB: {e}",
                )
                break
        return 0.0

    def get_temperature(self) -> float:
        """Get temperature with automatic reinitialization on I/O errors.

        Returns:
            The sensor's ``temperature`` value, or ``0.0`` if every attempt
            failed.
        """
        return self._read_with_retry("temperature", "temperature", "Temp")

    def get_pressure(self) -> float:
        """Get pressure with automatic reinitialization on I/O errors.

        Returns:
            The sensor's ``pressure`` value, or ``0.0`` if every attempt
            failed.
        """
        return self._read_with_retry("pressure", "pressure", "Pressure")