Source code for drivers.MCP230XX.MCP230XX

"""Driver for MCP23008 and MCP23017 I2C GPIO expander ICs.

Provides digital I/O, interrupt management, fan control, power rail resets,
UART mux selection, and SAMD co-processor reset via the MCP230XX registers.
Supports both 8-bit (MCP23008) and 16-bit (MCP23017) register addressing
schemes.

Typical usage::

    mcp = MCP230XX(chip="MCP23017", i2cAddress=0x21, devicenumber=1)
    mcp.resetDefault()
    mcp.FAN_HIGH()
"""

import time
from collections.abc import Callable

import smbus2

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)

# import subprocess


[docs] class MCP230XX: """I2C GPIO expander driver for MCP23008/MCP23017. Attributes: i2cAddress: 7-bit I2C address of the expander. bus: ``smbus2.SMBus`` instance for I2C communication. chip: Chip variant string (``"MCP23008"`` or ``"MCP23017"``). bank: Register addressing scheme (``"8bit"`` or ``"16bit"``). callBackFuncts: Per-pin interrupt callback function list. """ FAN = 2 SAMD_RST = 0 B4SW = 3 PIN_A = 6 # Read FAN status PIN_B = 5 # Read FAN status FAN_STATUS = 4 RPI_FAN = 1 POWER_3V3 = 9 POWER_5V = 8 A_UART = 3 B_UART = 7 bus = None sensor_bus = None
[docs] def __init__( self, chip: str = "MCP23017", i2cAddress: int = 0x21, devicenumber: int = 1, regScheme: str = "16bit", ) -> None: """Initialize the MCP230XX expander over I2C. Args: chip: IC variant -- ``"MCP23008"`` or ``"MCP23017"``. i2cAddress: 7-bit I2C address (default ``0x21``). devicenumber: Linux I2C bus number. regScheme: Register addressing (``"8bit"`` or ``"16bit"``). """ try: self.i2cAddress = i2cAddress self.bus = smbus2.SMBus(devicenumber) self.chip = chip if self.chip == "MCP23008": self.bank = "8bit" else: self.bank = regScheme self.callBackFuncts = [] for _ in range(0, 17): self.callBackFuncts.append(["empty", "empty"]) except Exception as e: context_logger.error_with_context("MCP230XX", f"Failed to initialize MCP230XX: {e}")
[docs] def resetDefault(self) -> None: """Configure all application-specific pins to their default directions and levels.""" try: self.pinMode(self.FAN, "output") self.pinMode(self.SAMD_RST, "output") self.pinMode(self.PIN_A, "input", "enable") self.pinMode(self.PIN_B, "input", "enable") self.pinMode(self.FAN_STATUS, "output") self.pinMode(self.RPI_FAN, "output") self.pinMode(self.POWER_3V3, "output") self.pinMode(self.POWER_5V, "output") self.pinMode(self.A_UART, "output") self.pinMode(self.B_UART, "output") self.digitalWrite(self.SAMD_RST, 0) self.digitalWrite(self.POWER_5V, 1) self.digitalWrite(self.POWER_3V3, 1) self.digitalWrite(self.A_UART, 0) self.digitalWrite(self.B_UART, 0) self.digitalWrite(self.FAN_STATUS, 1) except Exception as e: context_logger.error_with_context("MCP230XX", f"Failed to reset MCP230XX to default: {e}")
[docs] def FAN_HIGH(self) -> None: """Drive the external fan power pin HIGH (on).""" self.digitalWrite(self.FAN, 1)
[docs] def FAN_LOW(self) -> None: """Drive the external fan power pin LOW (off).""" self.digitalWrite(self.FAN, 0)
[docs] def samd_rst(self) -> None: """Pulse the SAMD co-processor reset line (5-second hold).""" self.digitalWrite(self.SAMD_RST, 1) time.sleep(5) self.digitalWrite(self.SAMD_RST, 0)
[docs] def samd_boot_mode(self) -> None: """Double-tap the SAMD reset line to enter bootloader mode.""" self.digitalWrite(self.SAMD_RST, 1) time.sleep(0.1) self.digitalWrite(self.SAMD_RST, 0) time.sleep(0.1) self.digitalWrite(self.SAMD_RST, 1) time.sleep(0.1) self.digitalWrite(self.SAMD_RST, 0)
[docs] def power_3v3_rst(self) -> None: """Cycle the 3.3 V power rail off for 5 seconds, then back on.""" self.digitalWrite(self.POWER_3V3, 0) time.sleep(5) self.digitalWrite(self.POWER_3V3, 1)
[docs] def power_5v_rst(self) -> None: """Cycle the 5 V power rail off for 5 seconds, then back on.""" self.digitalWrite(self.POWER_5V, 0) time.sleep(5) self.digitalWrite(self.POWER_5V, 1)
[docs] def select_lora(self) -> None: """Set the UART mux to route to the LoRa module.""" self.digitalWrite(self.A_UART, 1) self.digitalWrite(self.B_UART, 0)
[docs] def select_PM100(self) -> None: """Set the UART mux to route to the PM100 dust sensor.""" self.digitalWrite(self.A_UART, 0) self.digitalWrite(self.B_UART, 0) time.sleep(0.1)
[docs] def select_UART_1(self) -> None: """Set the UART mux to route to auxiliary UART channel 1.""" self.digitalWrite(self.A_UART, 0) self.digitalWrite(self.B_UART, 1)
[docs] def select_UART_UV(self) -> None: """Set the UART mux to route to the UV sensor UART channel.""" self.digitalWrite(self.A_UART, 1) self.digitalWrite(self.B_UART, 1)
# def select_I2C(self, pn = 24): # print(f"[MCP] Selecting I2C {pn}") # if pn in [0, 23, 24, 53, 54, 1000]: # self.digitalWrite(self.A_UART, 0) # self.digitalWrite(self.B_UART, 0) # elif pn in [1, 32, 33, 34, 35, 36]: # self.digitalWrite(self.A_UART, 1) # self.digitalWrite(self.B_UART, 0) # elif pn in [2, 25, 26]: # self.digitalWrite(self.A_UART, 0) # self.digitalWrite(self.B_UART, 1) # elif pn in [3]: # self.digitalWrite(self.A_UART, 1) # self.digitalWrite(self.B_UART, 1) # time.sleep(0.1) # def detect_i2c_addresses(self, bus_number=0): # try: # output = subprocess.check_output(['i2cdetect', '-y', str(bus_number)], universal_newlines=True) # addresses = [] # lines = output.split('\n')[1:] # Skip the header row # for line in lines: # parts = line.split() # if parts: # for part in parts[1:]: # Skip the first column which is the row header # if part != '--': # addresses.append(int(part, 16)) # return addresses # except subprocess.CalledProcessError as e: # print(f"An error occurred while detecting I2C addresses: {e}") # return [] # def scan_i2c_mux(self): # all_addresses = {} # for channel in range(4): # self.select_I2C(channel) # addresses = self.detect_i2c_addresses() # all_addresses[channel] = addresses # for channel, addresses in all_addresses.items(): # print("------------") # print(f"Channel {channel}:") # sensors = [self.sensorList[str(hex(num)) + '_' + str(channel)] for num in addresses] # for sensor in sensors: # print(sensor) # print("------------")
[docs] def readFAN(self, fan_delay: int) -> bool: """Sample the fan tachometer pins for *fan_delay* seconds and evaluate health. Args: fan_delay: Duration in seconds to sample the two fan status pins. Returns: True if the fan is spinning (signal is toggling), False if stuck. """ status1, status2 = [], [] time_prev = int(time.time()) while fan_delay > (int(time.time()) - time_prev): status1.append(self.input(self.PIN_A)) status2.append(self.input(self.PIN_B)) err1 = sum(status1) / len(status1) err2 = sum(status2) / len(status2) context_logger.info_with_context("FAN", f"Errors {err1}, {err2}") if err1 == 0 or err1 == 1 or err2 == 0 or err2 == 1: self.digitalWrite(self.FAN_STATUS, 0) return False self.digitalWrite(self.FAN_STATUS, 1) return True
# def UART3_3(self): # self.digitalWrite(self.A_UART, 1) # self.digitalWrite(self.B_UART, 1)
[docs] def single_access_read(self, reg: int = 0x00) -> int: """single_access_read, function to read a single data register of the MCP230xx gpio expander""" dataTransfer = 0 try: dataTransfer = self.bus.read_byte_data(self.i2cAddress, reg) except Exception as e: context_logger.error_with_context("MCP230XX", f"single_access_read: {e}") return dataTransfer
[docs] def single_access_write(self, reg: int = 0x00, regValue: int = 0x0) -> None: """single_access_write, function to write to a single data register of the MCP230xx gpio expander""" try: self.bus.write_byte_data(self.i2cAddress, reg, regValue) except Exception as e: context_logger.error_with_context("MCP230XX", f"single_access_write: {e}") return
[docs] def register_bit_select(self, pin: int, reg8A: int, reg16A: int, reg8B: int, reg16B: int) -> tuple[int, int]: """register_bit_select, function to return the proper register and bit position to use for a particular GPIO and GPIO function""" # need to add way of throwing an error if pin is outside # of 0-15 range if pin >= 0 and pin < 8: bit = pin if self.bank == "16bit": reg = reg16A else: # self.bank = '8bit' reg = reg8A elif pin >= 8 and pin < 16: bit = pin - 8 if self.bank == "16bit": reg = reg16B else: # self.bank = '8bit' reg = reg8B return reg, bit
[docs] def interrupt_options(self, outputType: str = "activehigh", bankControl: str = "separate") -> None: """interrupt_options, function to set the options for the 2 interrupt pins""" if self.bank == "16bit": reg = 0x0A else: # self.bank = '8bit' reg = 0x05 if outputType == "activelow": odrBit = 0 intpolBit = 0 elif outputType == "opendrain": odrBit = 1 intpolBit = 0 else: # outputType = 'activehigh' odrBit = 0 intpolBit = 1 if bankControl == "both": mirrorBit = 1 else: # bankControl = 'separate' mirrorBit = 0 regValue = self.single_access_read(reg) regValue = regValue & 0b10111001 regValue = regValue | (mirrorBit << 6) + (odrBit << 2) + (intpolBit << 1) self.single_access_write(reg, regValue) return
[docs] def set_register_addressing(self, regScheme: str = "8bit") -> None: """set_register_addressing, function to change how the registers are mapped. For an MCP23008, bank should always equal '8bit'. This sets bit 7 of the IOCON register""" if self.bank == "16bit": reg = 0x0A else: # self.bank = '8bit' reg = 0x05 if regScheme == "16bit": bankBit = 0 self.bank = "16bit" else: # regScheme = '8bit' bankBit = 1 self.bank = "8bit" regValue = self.single_access_read(reg) regValue = regValue & 0b01111111 regValue = regValue | (bankBit << 7) self.single_access_write(reg, regValue) return
[docs] def pinMode(self, pin: int, mode: str, pullUp: str = "disable") -> None: """pinMode, function to set up a GPIO pin to either an input or output. The input pullup resistor can also be enabled. This sets the appropriate bits in the IODIRA/B and GPPUA/B registers""" # GPIO direction set up section reg, bit = self.register_bit_select(pin, reg8A=0x00, reg16A=0x00, reg8B=0x10, reg16B=0x01) regValue = self.single_access_read(reg) if mode == "output": mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) else: # mode = input mask = 0x00 | (1 << bit) regValue = regValue | mask self.single_access_write(reg, regValue) # pullUp enable/disable section if mode == "input": reg, bit = self.register_bit_select(pin, reg8A=0x06, reg16A=0x0C, reg8B=0x16, reg16B=0x0D) regValue = self.single_access_read(reg) if pullUp == "enable": mask = 0x00 | (1 << bit) regValue = regValue | mask self.single_access_write(reg, regValue) else: # pullUp = disable mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) return
[docs] def invert_input(self, pin: int, invert: bool = False) -> None: """invert_input, function to invert the output of the pins corresponding GPIO register bit. Sets bit in IPOLA/B""" # input invert on/off section reg, bit = self.register_bit_select(pin, reg8A=0x01, reg16A=0x02, reg8B=0x11, reg16B=0x03) regValue = self.single_access_read(reg) if invert: mask = 0x00 | (1 << bit) regValue = regValue | mask self.single_access_write(reg, regValue) else: # invert = False mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) return
[docs] def digitalWrite(self, pin: int, value: int) -> None: """digitalWrite, function to set the state of a GPIO output pin via the appropriate bit in the OLATA/B registers""" try: reg, bit = self.register_bit_select(pin, reg8A=0x0A, reg16A=0x14, reg8B=0x1A, reg16B=0x15) regValue = self.single_access_read(reg) if value == 1: # set output high mask = 0x00 | (1 << bit) regValue = regValue | mask else: # set output low mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) except Exception as e: context_logger.error_with_context("MCP230XX", f"digitalWrite: {e}") return
[docs] def input(self, pin: int) -> int: """input, function to get the current level of a GPIO input pin by reading the appropriate bit in the GPIOA/B registers""" reg, bit = self.register_bit_select(pin, reg8A=0x09, reg16A=0x12, reg8B=0x19, reg16B=0x13) regValue = self.single_access_read(reg) mask = 0x00 | (1 << bit) return (regValue & mask) >> bit
[docs] def input_at_interrupt(self, pin: int) -> int: """input_at_interrupt, function to get the current level of a GPIO input pin when an interrupt has occurred by reading the appropriate bit in the INTCAPA/B registers""" reg, bit = self.register_bit_select(pin, reg8A=0x08, reg16A=0x10, reg8B=0x18, reg16B=0x11) regValue = self.single_access_read(reg) mask = 0x00 | (1 << bit) return (regValue & mask) >> bit
[docs] def add_interrupt( self, pin: int, callbackFunctLow: str | Callable[[int], None] = "empty", callbackFunctHigh: str | Callable[[int], None] = "empty", ) -> None: """add_interrupt, function to set up the interrupt options for a specific GPIO including callback functions to be executed when an interrupt occurs""" # set bit in GPINTENA/B registers # set bit in GPINTENA/B registers reg, bit = self.register_bit_select(pin, reg8A=0x02, reg16A=0x04, reg8B=0x12, reg16B=0x05) regValue = self.single_access_read(reg) mask = 0x00 | (1 << bit) regValue = regValue | mask self.single_access_write(reg, regValue) # set bit in INTCONA/B registers reg, bit = self.register_bit_select(pin, reg8A=0x04, reg16A=0x08, reg8B=0x14, reg16B=0x09) regValue = self.single_access_read(reg) mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) # set bit in DEFVALA/B registers - not required # set call back functions in function list self.callBackFuncts[pin][0] = callbackFunctLow self.callBackFuncts[pin][1] = callbackFunctHigh return
[docs] def remove_interrupt(self, pin: int) -> None: """remove_interrupt, function to remove the interrupt settings from an MCP230xx pin""" # set bit in GPINTENA/B registers reg, bit = self.register_bit_select(pin, reg8A=0x02, reg16A=0x04, reg8B=0x12, reg16B=0x05) regValue = self.single_access_read(reg) mask = 0b11111111 & ~(1 << bit) regValue = regValue & mask self.single_access_write(reg, regValue) # reset call back functions in function list to 'empty' self.callBackFuncts[pin][0] = "empty" self.callBackFuncts[pin][1] = "empty" return
[docs] def callbackA(self, gpio: int) -> None: """function called by RPI.GPIO on an bank A interrupt condition. This function will figure out which MCP230xx pin caused the interrupt and initiate the appropriate callback function""" # read INTF register if self.bank == "16bit": reg = 0x0E else: # self.bank = '8bit' reg = 0x07 regValue = self.single_access_read(reg) pin = -1 for i in range(0, 8): if regValue == (1 << i): pin = i break value = self.input_at_interrupt(pin) if self.callBackFuncts[pin][value] != "empty": self.callBackFuncts[pin][value](pin) return
[docs] def callbackB(self, gpio: int) -> None: """function called by RPI.GPIO on an bank B interrupt condition. This function will figure out which MCP230xx pin caused the interrupt and initiate the appropriate callback function""" # read INTF register if self.bank == "16bit": reg = 0x0F else: # self.bank = '8bit' reg = 0x17 regValue = self.single_access_read(reg) pin = -1 for i in range(0, 8): if regValue == (1 << i): pin = i + 8 break value = self.input_at_interrupt(pin) if self.callBackFuncts[pin][value] != "empty": self.callBackFuncts[pin][value](pin) return
[docs] def callbackBoth(self, gpio: int) -> None: """function called by RPI.GPIO on either a bank A or bank B interrupt condition. This function will figure out which MCP230xx pin caused the interrupt and initiate the appropriate callback function""" # read INTF register if self.bank == "16bit": regA = 0x0E regB = 0x0F else: # self.bank = '8bit' regA = 0x07 regB = 0x17 regValue = self.single_access_read(regA) pin = -1 # check GPIOA bank for interrupt for i in range(0, 8): if regValue == (1 << i): pin = i break # check GPIOB bank for interrupt if none found in GPIOA bank if pin == -1: regValue = self.single_access_read(regB) for i in range(0, 8): if regValue == (1 << i): pin = i + 8 break value = self.input_at_interrupt(pin) if self.callBackFuncts[pin][value] != "empty": self.callBackFuncts[pin][value](pin) return
[docs] def register_reset(self) -> None: """register_reset, function to put chip back to default settings""" if self.chip == "MCP23008": self.single_access_write(0x00, 0xFF) for i in range(1, 12): self.single_access_write(i, 0x00) else: self.set_register_addressing("16bit") self.single_access_write(0x00, 0xFF) self.single_access_write(0x01, 0xFF) for i in range(2, 22): self.single_access_write(i, 0x00) return
if __name__ == "__main__": import argparse import time def parse_command_line_args(dummy: bool = False) -> "argparse.Namespace": """Parse command line arguments.""" parser = argparse.ArgumentParser(description=("Docker direct command to Reboot 3v3 & 5v")) parser.add_argument("--reboot3v3", default=0, help="Reboot 3v3") parser.add_argument("--reboot5v", default=0, help="Reboot 5v") parser.add_argument("--testBuzzLED", default=0, help="Test Buzzer & LED") parser.add_argument("--rebootLASAN", default=0, help="Reboot LASAN") parser.add_argument("--bootmodeLASAN", default=0, help="LASAN Boot Mode") parser.add_argument("--fan", default=0, help="Test FAN") parser.add_argument("--scani2c", default=0, help="Scan I2C Mux") return parser.parse_args() MCP = MCP230XX(devicenumber=6) args = parse_command_line_args(True) context_logger.info_with_context("MCP230XX", f"Arguments: {args}") if args.reboot3v3 == "1": context_logger.info_with_context("MCP230XX", "Rebooting 3.3V") MCP.power_3v3_rst() context_logger.info_with_context("MCP230XX", "Reboot completed") if args.reboot5v == "1": context_logger.info_with_context("MCP230XX", "Rebooting 5V") MCP.power_5v_rst() context_logger.info_with_context("MCP230XX", "Reboot completed") if args.testBuzzLED == "1": context_logger.info_with_context("MCP230XX", "Testing Buzzer & LED") MCP.pinMode(10, "output") MCP.pinMode(11, "output") MCP.digitalWrite(10, 1) MCP.digitalWrite(11, 1) time.sleep(5) MCP.digitalWrite(10, 0) MCP.digitalWrite(11, 0) context_logger.info_with_context("MCP230XX", "Buzzer & LED Test completed") if args.rebootLASAN == "1": context_logger.info_with_context("MCP230XX", "Rebooting LASAN") MCP.samd_rst() context_logger.info_with_context("MCP230XX", "Reboot completed") if args.bootmodeLASAN == "1": context_logger.info_with_context("MCP230XX", "LASAN Boot Mode") MCP.samd_boot_mode() context_logger.info_with_context("MCP230XX", "Boot Mode completed") if args.fan == "1": MCP.FAN_HIGH() context_logger.info_with_context("MCP230XX", "Fan powered ON") fanStatus = MCP.readFAN(3) context_logger.info_with_context("MCP230XX", f"Status: {fanStatus}") time.sleep(1) MCP.FAN_LOW() if args.scani2c == "1": context_logger.info_with_context("MCP230XX", "Scanning I2C Mux") MCP.scan_i2c_mux()