"""Driver for MCP23008 and MCP23017 I2C GPIO expander ICs.

Provides digital I/O, interrupt management, fan control, power rail resets,
UART mux selection, and SAMD co-processor reset via the MCP230XX registers.
Supports both 8-bit (MCP23008) and 16-bit (MCP23017) register addressing
schemes.

Typical usage::

    mcp = MCP230XX(chip="MCP23017", i2cAddress=0x21, devicenumber=1)
    mcp.resetDefault()
    mcp.FAN_HIGH()
"""
import time
from collections.abc import Callable
import smbus2
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# import subprocess
class MCP230XX:
"""I2C GPIO expander driver for MCP23008/MCP23017.
Attributes:
i2cAddress: 7-bit I2C address of the expander.
bus: ``smbus2.SMBus`` instance for I2C communication.
chip: Chip variant string (``"MCP23008"`` or ``"MCP23017"``).
bank: Register addressing scheme (``"8bit"`` or ``"16bit"``).
callBackFuncts: Per-pin interrupt callback function list.
"""
FAN = 2
SAMD_RST = 0
B4SW = 3
PIN_A = 6 # Read FAN status
PIN_B = 5 # Read FAN status
FAN_STATUS = 4
RPI_FAN = 1
POWER_3V3 = 9
POWER_5V = 8
A_UART = 3
B_UART = 7
bus = None
sensor_bus = None
[docs]
def __init__(
self,
chip: str = "MCP23017",
i2cAddress: int = 0x21,
devicenumber: int = 1,
regScheme: str = "16bit",
) -> None:
"""Initialize the MCP230XX expander over I2C.
Args:
chip: IC variant -- ``"MCP23008"`` or ``"MCP23017"``.
i2cAddress: 7-bit I2C address (default ``0x21``).
devicenumber: Linux I2C bus number.
regScheme: Register addressing (``"8bit"`` or ``"16bit"``).
"""
try:
self.i2cAddress = i2cAddress
self.bus = smbus2.SMBus(devicenumber)
self.chip = chip
if self.chip == "MCP23008":
self.bank = "8bit"
else:
self.bank = regScheme
self.callBackFuncts = []
for _ in range(0, 17):
self.callBackFuncts.append(["empty", "empty"])
except Exception as e:
context_logger.error_with_context("MCP230XX", f"Failed to initialize MCP230XX: {e}")
[docs]
def resetDefault(self) -> None:
"""Configure all application-specific pins to their default directions and levels."""
try:
self.pinMode(self.FAN, "output")
self.pinMode(self.SAMD_RST, "output")
self.pinMode(self.PIN_A, "input", "enable")
self.pinMode(self.PIN_B, "input", "enable")
self.pinMode(self.FAN_STATUS, "output")
self.pinMode(self.RPI_FAN, "output")
self.pinMode(self.POWER_3V3, "output")
self.pinMode(self.POWER_5V, "output")
self.pinMode(self.A_UART, "output")
self.pinMode(self.B_UART, "output")
self.digitalWrite(self.SAMD_RST, 0)
self.digitalWrite(self.POWER_5V, 1)
self.digitalWrite(self.POWER_3V3, 1)
self.digitalWrite(self.A_UART, 0)
self.digitalWrite(self.B_UART, 0)
self.digitalWrite(self.FAN_STATUS, 1)
except Exception as e:
context_logger.error_with_context("MCP230XX", f"Failed to reset MCP230XX to default: {e}")
[docs]
def FAN_HIGH(self) -> None:
"""Drive the external fan power pin HIGH (on)."""
self.digitalWrite(self.FAN, 1)
[docs]
def FAN_LOW(self) -> None:
"""Drive the external fan power pin LOW (off)."""
self.digitalWrite(self.FAN, 0)
[docs]
def samd_rst(self) -> None:
"""Pulse the SAMD co-processor reset line (5-second hold)."""
self.digitalWrite(self.SAMD_RST, 1)
time.sleep(5)
self.digitalWrite(self.SAMD_RST, 0)
[docs]
def samd_boot_mode(self) -> None:
"""Double-tap the SAMD reset line to enter bootloader mode."""
self.digitalWrite(self.SAMD_RST, 1)
time.sleep(0.1)
self.digitalWrite(self.SAMD_RST, 0)
time.sleep(0.1)
self.digitalWrite(self.SAMD_RST, 1)
time.sleep(0.1)
self.digitalWrite(self.SAMD_RST, 0)
[docs]
def power_3v3_rst(self) -> None:
"""Cycle the 3.3 V power rail off for 5 seconds, then back on."""
self.digitalWrite(self.POWER_3V3, 0)
time.sleep(5)
self.digitalWrite(self.POWER_3V3, 1)
[docs]
def power_5v_rst(self) -> None:
"""Cycle the 5 V power rail off for 5 seconds, then back on."""
self.digitalWrite(self.POWER_5V, 0)
time.sleep(5)
self.digitalWrite(self.POWER_5V, 1)
[docs]
def select_lora(self) -> None:
"""Set the UART mux to route to the LoRa module."""
self.digitalWrite(self.A_UART, 1)
self.digitalWrite(self.B_UART, 0)
[docs]
def select_PM100(self) -> None:
"""Set the UART mux to route to the PM100 dust sensor."""
self.digitalWrite(self.A_UART, 0)
self.digitalWrite(self.B_UART, 0)
time.sleep(0.1)
[docs]
def select_UART_1(self) -> None:
"""Set the UART mux to route to auxiliary UART channel 1."""
self.digitalWrite(self.A_UART, 0)
self.digitalWrite(self.B_UART, 1)
[docs]
def select_UART_UV(self) -> None:
"""Set the UART mux to route to the UV sensor UART channel."""
self.digitalWrite(self.A_UART, 1)
self.digitalWrite(self.B_UART, 1)
# def select_I2C(self, pn = 24):
# print(f"[MCP] Selecting I2C {pn}")
# if pn in [0, 23, 24, 53, 54, 1000]:
# self.digitalWrite(self.A_UART, 0)
# self.digitalWrite(self.B_UART, 0)
# elif pn in [1, 32, 33, 34, 35, 36]:
# self.digitalWrite(self.A_UART, 1)
# self.digitalWrite(self.B_UART, 0)
# elif pn in [2, 25, 26]:
# self.digitalWrite(self.A_UART, 0)
# self.digitalWrite(self.B_UART, 1)
# elif pn in [3]:
# self.digitalWrite(self.A_UART, 1)
# self.digitalWrite(self.B_UART, 1)
# time.sleep(0.1)
# def detect_i2c_addresses(self, bus_number=0):
# try:
# output = subprocess.check_output(['i2cdetect', '-y', str(bus_number)], universal_newlines=True)
# addresses = []
# lines = output.split('\n')[1:] # Skip the header row
# for line in lines:
# parts = line.split()
# if parts:
# for part in parts[1:]: # Skip the first column which is the row header
# if part != '--':
# addresses.append(int(part, 16))
# return addresses
# except subprocess.CalledProcessError as e:
# print(f"An error occurred while detecting I2C addresses: {e}")
# return []
# def scan_i2c_mux(self):
# all_addresses = {}
# for channel in range(4):
# self.select_I2C(channel)
# addresses = self.detect_i2c_addresses()
# all_addresses[channel] = addresses
# for channel, addresses in all_addresses.items():
# print("------------")
# print(f"Channel {channel}:")
# sensors = [self.sensorList[str(hex(num)) + '_' + str(channel)] for num in addresses]
# for sensor in sensors:
# print(sensor)
# print("------------")
[docs]
def readFAN(self, fan_delay: int) -> bool:
"""Sample the fan tachometer pins for *fan_delay* seconds and evaluate health.
Args:
fan_delay: Duration in seconds to sample the two fan status pins.
Returns:
True if the fan is spinning (signal is toggling), False if stuck.
"""
status1, status2 = [], []
time_prev = int(time.time())
while fan_delay > (int(time.time()) - time_prev):
status1.append(self.input(self.PIN_A))
status2.append(self.input(self.PIN_B))
err1 = sum(status1) / len(status1)
err2 = sum(status2) / len(status2)
context_logger.info_with_context("FAN", f"Errors {err1}, {err2}")
if err1 == 0 or err1 == 1 or err2 == 0 or err2 == 1:
self.digitalWrite(self.FAN_STATUS, 0)
return False
self.digitalWrite(self.FAN_STATUS, 1)
return True
# def UART3_3(self):
# self.digitalWrite(self.A_UART, 1)
# self.digitalWrite(self.B_UART, 1)
[docs]
def single_access_read(self, reg: int = 0x00) -> int:
"""single_access_read, function to read a single data register
of the MCP230xx gpio expander"""
dataTransfer = 0
try:
dataTransfer = self.bus.read_byte_data(self.i2cAddress, reg)
except Exception as e:
context_logger.error_with_context("MCP230XX", f"single_access_read: {e}")
return dataTransfer
[docs]
def single_access_write(self, reg: int = 0x00, regValue: int = 0x0) -> None:
"""single_access_write, function to write to a single data register
of the MCP230xx gpio expander"""
try:
self.bus.write_byte_data(self.i2cAddress, reg, regValue)
except Exception as e:
context_logger.error_with_context("MCP230XX", f"single_access_write: {e}")
return
[docs]
def register_bit_select(self, pin: int, reg8A: int, reg16A: int, reg8B: int, reg16B: int) -> tuple[int, int]:
"""register_bit_select, function to return the proper
register and bit position to use for a particular GPIO
and GPIO function"""
# need to add way of throwing an error if pin is outside
# of 0-15 range
if pin >= 0 and pin < 8:
bit = pin
if self.bank == "16bit":
reg = reg16A
else: # self.bank = '8bit'
reg = reg8A
elif pin >= 8 and pin < 16:
bit = pin - 8
if self.bank == "16bit":
reg = reg16B
else: # self.bank = '8bit'
reg = reg8B
return reg, bit
[docs]
def interrupt_options(self, outputType: str = "activehigh", bankControl: str = "separate") -> None:
"""interrupt_options, function to set the options for the 2 interrupt
pins"""
if self.bank == "16bit":
reg = 0x0A
else: # self.bank = '8bit'
reg = 0x05
if outputType == "activelow":
odrBit = 0
intpolBit = 0
elif outputType == "opendrain":
odrBit = 1
intpolBit = 0
else: # outputType = 'activehigh'
odrBit = 0
intpolBit = 1
if bankControl == "both":
mirrorBit = 1
else: # bankControl = 'separate'
mirrorBit = 0
regValue = self.single_access_read(reg)
regValue = regValue & 0b10111001
regValue = regValue | (mirrorBit << 6) + (odrBit << 2) + (intpolBit << 1)
self.single_access_write(reg, regValue)
return
[docs]
def set_register_addressing(self, regScheme: str = "8bit") -> None:
"""set_register_addressing, function to change how the registers
are mapped. For an MCP23008, bank should always equal '8bit'. This
sets bit 7 of the IOCON register"""
if self.bank == "16bit":
reg = 0x0A
else: # self.bank = '8bit'
reg = 0x05
if regScheme == "16bit":
bankBit = 0
self.bank = "16bit"
else: # regScheme = '8bit'
bankBit = 1
self.bank = "8bit"
regValue = self.single_access_read(reg)
regValue = regValue & 0b01111111
regValue = regValue | (bankBit << 7)
self.single_access_write(reg, regValue)
return
[docs]
def pinMode(self, pin: int, mode: str, pullUp: str = "disable") -> None:
"""pinMode, function to set up a GPIO pin to either an input
or output. The input pullup resistor can also be enabled.
This sets the appropriate bits in the IODIRA/B and GPPUA/B
registers"""
# GPIO direction set up section
reg, bit = self.register_bit_select(pin, reg8A=0x00, reg16A=0x00, reg8B=0x10, reg16B=0x01)
regValue = self.single_access_read(reg)
if mode == "output":
mask = 0b11111111 & ~(1 << bit)
regValue = regValue & mask
self.single_access_write(reg, regValue)
else: # mode = input
mask = 0x00 | (1 << bit)
regValue = regValue | mask
self.single_access_write(reg, regValue)
# pullUp enable/disable section
if mode == "input":
reg, bit = self.register_bit_select(pin, reg8A=0x06, reg16A=0x0C, reg8B=0x16, reg16B=0x0D)
regValue = self.single_access_read(reg)
if pullUp == "enable":
mask = 0x00 | (1 << bit)
regValue = regValue | mask
self.single_access_write(reg, regValue)
else: # pullUp = disable
mask = 0b11111111 & ~(1 << bit)
regValue = regValue & mask
self.single_access_write(reg, regValue)
return
[docs]
def digitalWrite(self, pin: int, value: int) -> None:
"""digitalWrite, function to set the state of a GPIO output
pin via the appropriate bit in the OLATA/B registers"""
try:
reg, bit = self.register_bit_select(pin, reg8A=0x0A, reg16A=0x14, reg8B=0x1A, reg16B=0x15)
regValue = self.single_access_read(reg)
if value == 1:
# set output high
mask = 0x00 | (1 << bit)
regValue = regValue | mask
else:
# set output low
mask = 0b11111111 & ~(1 << bit)
regValue = regValue & mask
self.single_access_write(reg, regValue)
except Exception as e:
context_logger.error_with_context("MCP230XX", f"digitalWrite: {e}")
return
[docs]
def add_interrupt(
self,
pin: int,
callbackFunctLow: str | Callable[[int], None] = "empty",
callbackFunctHigh: str | Callable[[int], None] = "empty",
) -> None:
"""add_interrupt, function to set up the interrupt options
for a specific GPIO including callback functions to be executed
when an interrupt occurs"""
# set bit in GPINTENA/B registers
# set bit in GPINTENA/B registers
reg, bit = self.register_bit_select(pin, reg8A=0x02, reg16A=0x04, reg8B=0x12, reg16B=0x05)
regValue = self.single_access_read(reg)
mask = 0x00 | (1 << bit)
regValue = regValue | mask
self.single_access_write(reg, regValue)
# set bit in INTCONA/B registers
reg, bit = self.register_bit_select(pin, reg8A=0x04, reg16A=0x08, reg8B=0x14, reg16B=0x09)
regValue = self.single_access_read(reg)
mask = 0b11111111 & ~(1 << bit)
regValue = regValue & mask
self.single_access_write(reg, regValue)
# set bit in DEFVALA/B registers - not required
# set call back functions in function list
self.callBackFuncts[pin][0] = callbackFunctLow
self.callBackFuncts[pin][1] = callbackFunctHigh
return
[docs]
def remove_interrupt(self, pin: int) -> None:
"""remove_interrupt, function to remove the interrupt settings
from an MCP230xx pin"""
# set bit in GPINTENA/B registers
reg, bit = self.register_bit_select(pin, reg8A=0x02, reg16A=0x04, reg8B=0x12, reg16B=0x05)
regValue = self.single_access_read(reg)
mask = 0b11111111 & ~(1 << bit)
regValue = regValue & mask
self.single_access_write(reg, regValue)
# reset call back functions in function list to 'empty'
self.callBackFuncts[pin][0] = "empty"
self.callBackFuncts[pin][1] = "empty"
return
[docs]
def callbackA(self, gpio: int) -> None:
"""function called by RPI.GPIO on an bank A interrupt condition.
This function will figure out which MCP230xx pin caused the
interrupt and initiate the appropriate callback function"""
# read INTF register
if self.bank == "16bit":
reg = 0x0E
else: # self.bank = '8bit'
reg = 0x07
regValue = self.single_access_read(reg)
pin = -1
for i in range(0, 8):
if regValue == (1 << i):
pin = i
break
value = self.input_at_interrupt(pin)
if self.callBackFuncts[pin][value] != "empty":
self.callBackFuncts[pin][value](pin)
return
[docs]
def callbackB(self, gpio: int) -> None:
"""function called by RPI.GPIO on an bank B interrupt condition.
This function will figure out which MCP230xx pin caused the
interrupt and initiate the appropriate callback function"""
# read INTF register
if self.bank == "16bit":
reg = 0x0F
else: # self.bank = '8bit'
reg = 0x17
regValue = self.single_access_read(reg)
pin = -1
for i in range(0, 8):
if regValue == (1 << i):
pin = i + 8
break
value = self.input_at_interrupt(pin)
if self.callBackFuncts[pin][value] != "empty":
self.callBackFuncts[pin][value](pin)
return
[docs]
def callbackBoth(self, gpio: int) -> None:
"""function called by RPI.GPIO on either a bank A or bank B
interrupt condition. This function will figure out which MCP230xx
pin caused the interrupt and initiate the appropriate callback function"""
# read INTF register
if self.bank == "16bit":
regA = 0x0E
regB = 0x0F
else: # self.bank = '8bit'
regA = 0x07
regB = 0x17
regValue = self.single_access_read(regA)
pin = -1
# check GPIOA bank for interrupt
for i in range(0, 8):
if regValue == (1 << i):
pin = i
break
# check GPIOB bank for interrupt if none found in GPIOA bank
if pin == -1:
regValue = self.single_access_read(regB)
for i in range(0, 8):
if regValue == (1 << i):
pin = i + 8
break
value = self.input_at_interrupt(pin)
if self.callBackFuncts[pin][value] != "empty":
self.callBackFuncts[pin][value](pin)
return
[docs]
def register_reset(self) -> None:
"""register_reset, function to put chip back to default
settings"""
if self.chip == "MCP23008":
self.single_access_write(0x00, 0xFF)
for i in range(1, 12):
self.single_access_write(i, 0x00)
else:
self.set_register_addressing("16bit")
self.single_access_write(0x00, 0xFF)
self.single_access_write(0x01, 0xFF)
for i in range(2, 22):
self.single_access_write(i, 0x00)
return
if __name__ == "__main__":
    # Command-line maintenance tool: each option runs its action only when
    # passed the literal value "1" (e.g. ``--reboot3v3 1``).
    import argparse
    import time

    def parse_command_line_args(dummy: bool = False) -> "argparse.Namespace":
        """Parse command line arguments.

        Args:
            dummy: Unused; kept so existing calls keep working.

        Returns:
            The parsed namespace; every option defaults to the integer 0.
        """
        parser = argparse.ArgumentParser(description=("Docker direct command to Reboot 3v3 & 5v"))
        parser.add_argument("--reboot3v3", default=0, help="Reboot 3v3")
        parser.add_argument("--reboot5v", default=0, help="Reboot 5v")
        parser.add_argument("--testBuzzLED", default=0, help="Test Buzzer & LED")
        parser.add_argument("--rebootLASAN", default=0, help="Reboot LASAN")
        parser.add_argument("--bootmodeLASAN", default=0, help="LASAN Boot Mode")
        parser.add_argument("--fan", default=0, help="Test FAN")
        parser.add_argument("--scani2c", default=0, help="Scan I2C Mux")
        return parser.parse_args()

    MCP = MCP230XX(devicenumber=6)
    args = parse_command_line_args(True)
    context_logger.info_with_context("MCP230XX", f"Arguments: {args}")

    if args.reboot3v3 == "1":
        context_logger.info_with_context("MCP230XX", "Rebooting 3.3V")
        MCP.power_3v3_rst()
        context_logger.info_with_context("MCP230XX", "Reboot completed")

    if args.reboot5v == "1":
        context_logger.info_with_context("MCP230XX", "Rebooting 5V")
        MCP.power_5v_rst()
        context_logger.info_with_context("MCP230XX", "Reboot completed")

    if args.testBuzzLED == "1":
        context_logger.info_with_context("MCP230XX", "Testing Buzzer & LED")
        # Pins 10 and 11 are driven high for 5 seconds, then released.
        MCP.pinMode(10, "output")
        MCP.pinMode(11, "output")
        MCP.digitalWrite(10, 1)
        MCP.digitalWrite(11, 1)
        time.sleep(5)
        MCP.digitalWrite(10, 0)
        MCP.digitalWrite(11, 0)
        context_logger.info_with_context("MCP230XX", "Buzzer & LED Test completed")

    if args.rebootLASAN == "1":
        context_logger.info_with_context("MCP230XX", "Rebooting LASAN")
        MCP.samd_rst()
        context_logger.info_with_context("MCP230XX", "Reboot completed")

    if args.bootmodeLASAN == "1":
        context_logger.info_with_context("MCP230XX", "LASAN Boot Mode")
        MCP.samd_boot_mode()
        context_logger.info_with_context("MCP230XX", "Boot Mode completed")

    if args.fan == "1":
        MCP.FAN_HIGH()
        context_logger.info_with_context("MCP230XX", "Fan powered ON")
        fanStatus = MCP.readFAN(3)
        context_logger.info_with_context("MCP230XX", f"Status: {fanStatus}")
        time.sleep(1)
        MCP.FAN_LOW()

    if args.scani2c == "1":
        context_logger.info_with_context("MCP230XX", "Scanning I2C Mux")
        # The driver may not provide scan_i2c_mux; report that instead of
        # failing with AttributeError.
        scan_i2c_mux = getattr(MCP, "scan_i2c_mux", None)
        if callable(scan_i2c_mux):
            scan_i2c_mux()
        else:
            context_logger.error_with_context("MCP230XX", "scan_i2c_mux is not available in this driver")