"""Battery and fuel gauge monitoring wrapper.
Reads voltage, current, and state-of-charge from the MAX17261 fuel gauge IC
over I2C, providing battery health telemetry for solar-powered deployments.
"""
import json
import os
import time
from drivers.gpio import gpio
from drivers.MAX17261 import MAX17261
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzBattery(GenericSensor):
"""Wrapper for MAX17261 fuel gauge battery monitoring.
Reads instantaneous voltage, current, and state-of-charge from the
MAX17261 IC and accumulates time-series values for averaging.
Attributes:
configuration: Sensor configuration list from the Gateway.
v_battery: Nested list of per-sensor, per-parameter accumulated values.
battery: MAX17261 driver instance, or None before initialization.
debug: Whether debug logging is enabled for this sensor.
res: Sense resistor value in ohms for current measurement.
resFlag: Whether a custom sense resistor value was provided.
"""
[docs]
def __init__(self) -> None:
"""Initialize OzBattery with default configuration and empty accumulators."""
super().__init__()
self.configuration: dict = {}
self.v_battery: list = []
self.battery = None
self.debug: bool = False
self.res: float = 0.02
self.resFlag = False
[docs]
def initialize(self, config: json, init_value: dict) -> bool:
"""Initialize the battery fuel gauge sensor from Gateway configuration.
Args:
config: List of sensor configuration dicts from the Gateway.
init_value: Dict to populate with initialization status flags.
Returns:
True if at least one battery sensor initialized successfully.
"""
self.configuration = config
context_logger.info_with_context("battery", "Initialize Battery sensor")
_success = False
for sensor in self.configuration:
try:
if self.partNumber in sensor:
sensor["init"] = self.initializeSensor(sensor)
_success = True # send confirmation to main file
except Exception as e:
context_logger.error_with_context(
"Battery",
f"Initialize : {e}",
)
sensor["init"] = 0
self.putInitvalues(init_value)
return _success
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialize a single MAX17261 fuel gauge and take initial readings.
Args:
sensor: Configuration dict with part number, capacity, and parameters.
Returns:
1 on success, -1 on failure.
"""
value = []
_success = -1
gpio.select_I2C(sensor[self.partNumber])
if sensor[self.partNumber] == self.baseConfig["pn_max"] and sensor["en"] == self.baseConfig["oz_enable"]:
# TODO: Add gpio configuration
if "cap" in sensor:
self.battery = MAX17261(batteryCapacity=sensor["cap"])
else:
self.battery = MAX17261()
if "debug" in sensor:
self.debug = True if sensor["debug"] == 1 else False
if "res" in sensor:
self.res = sensor["res"]
self.resFlag = True
time.sleep(0.1)
self.battery.setValues(self.debug, self.res, self.resFlag)
time.sleep(0.1)
parameters = sensor["parameters"]
for param in parameters:
if (
param[self.parameter] == self.baseConfig["volt"]
or param[self.parameter] == self.baseConfig["current"]
or param[self.parameter] == self.baseConfig["charge"]
):
val = self.getBattery(sensor[self.partNumber], param[self.parameter])
oldVal = val
_val = {"value": [val], "oldvalue": oldVal, "count": 1}
value.append(_val)
_success = 1
self.v_battery.append(value)
return _success
[docs]
def getBattery(self, partNo: int, pm: int) -> float | int:
"""Read a single battery metric from the MAX17261 fuel gauge.
Args:
partNo: Part number identifying the fuel gauge model.
pm: Parameter ID (1 = voltage, 2 = current, 3 = state-of-charge).
Returns:
The battery metric value, or 0.0 if the part number is unsupported.
"""
gpio.select_I2C(partNo)
if partNo == self.baseConfig["pn_max"]:
if pm == 1: # voltage : 1
volt = round(self.battery.getInstantaneousVoltage() * 4, 2)
context_logger.info_with_context(
"Battery",
f"Voltage: {volt}",
)
return volt
if pm == 2: # Current: 2
current = round(self.battery.getInstantaneousCurrent(), 2)
context_logger.info_with_context(
"Battery",
f"Current: {current}",
)
return current
if pm == 3: # Soc : 3
current = round(self.battery.getInstantaneousCurrent(), 2)
voltage = round(self.battery.getInstantaneousVoltage() * 4, 2)
if (current > -20 and current < 20) or (voltage > 14.2):
charge = 100
else:
charge = round(self.battery.getSOC(), 2)
context_logger.info_with_context(
"Battery",
f"Charge: {charge}",
)
return charge
return 0.0
[docs]
def getSensorReading(self) -> dict:
"""Read current battery values and accumulate them for averaging.
Returns:
Dict mapping send-codes to their latest calibrated values.
"""
data = {}
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
try:
for Y, parameters in enumerate(sensor["parameters"]):
value = self.getBattery(sensor[self.partNumber], parameters[self.parameter])
if value is not None:
value = round(
((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
2,
)
self.v_battery[X][Y]["value"].append(value)
self.v_battery[X][Y]["count"] += 1
data[parameters["sc"]] = value
else:
context_logger.warning_with_context(
"Battery",
f"getSensorReading: {sensor[self.partNumber]} and {[self.parameter]} has None Value",
)
except Exception as e:
context_logger.error_with_context(
"Battery",
f"getSensorReading failed at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return data
[docs]
def putSensorValue(self, value: dict) -> dict:
"""Transfer accumulated battery readings into the output payload and reset.
Args:
value: Shared output dict to populate with battery time-series data.
Returns:
The updated output dict with battery data added.
"""
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
for Y, parameters in enumerate(sensor["parameters"]):
try:
value[parameters["sc"]] = self.v_battery[X][Y]["value"]
self.v_battery[X][Y]["value"] = []
self.v_battery[X][Y]["count"] = 0
except Exception as e:
context_logger.error_with_context(
"Battery",
f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}",
)
return value
[docs]
def putInitvalues(self, value: dict) -> dict:
"""Add battery sensor initialization flags to the init payload.
Args:
value: Shared init payload dict.
Returns:
The updated init payload with a 'battery' key listing init statuses.
"""
_sensor_init = []
for sensor in self.configuration:
_sensor_init.append(sensor["init"])
value.update({"battery": _sensor_init})
return value
# Example code
# python3 -m OzWrapper.OzBattery.OzBattery
if __name__ == "__main__":
import os
# TODO: change string here when you change sensor
sensor_string = "battery"
file_string = "battery.config.json"
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, file_string)
with open(file_name) as configFile:
sensor = configFile.read()
sensorConfig = json.loads(sensor)
context_logger.info_with_context("battery", f"Sensor Config: {sensorConfig}")
# TODO: change object here when you change sensor
sensor = OzBattery()
sensor.initialize(sensorConfig[sensor_string], init_value={})
for _ in range(0, 4):
sensor.getSensorReading()
data = {}
context_logger.info_with_context("battery", f"Sensor Values: {sensor.putSensorValue(data)}")