# Source code for OzWrapper.OzBattery.OzBattery

"""Battery and fuel gauge monitoring wrapper.

Reads voltage, current, and state-of-charge from the MAX17261 fuel gauge IC
over I2C, providing battery health telemetry for solar-powered deployments.
"""

import json
import os
import time

from drivers.gpio import gpio
from drivers.MAX17261 import MAX17261
from SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level loggers are built from this module's name:
#   basic_logger   - the object returned by OizomLogger(__name__).get(); it is
#                    not referenced anywhere in the visible part of this module.
#   context_logger - the OizomLogger instance itself; this module uses its
#                    info/warning/error ``*_with_context`` methods.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzBattery(GenericSensor):
    """Wrapper for MAX17261 fuel gauge battery monitoring.

    Reads instantaneous voltage, current, and state-of-charge from the
    MAX17261 IC and accumulates time-series values for averaging.

    Attributes:
        configuration: Sensor configuration list from the Gateway.
        v_battery: Nested list of accumulators, indexed
            ``[sensor index][parameter index]`` in the same order as
            ``configuration`` and each sensor's ``"parameters"`` list.
        battery: MAX17261 driver instance, or None before initialization.
        debug: Whether debug logging is enabled for this sensor.
        res: Sense resistor value passed to the driver for current measurement.
        resFlag: Whether a custom sense resistor value was provided.
    """

    def __init__(self) -> None:
        """Initialize OzBattery with default configuration and empty accumulators."""
        super().__init__()
        # The configuration is iterated as a list of sensor dicts everywhere.
        self.configuration: list = []
        self.v_battery: list = []
        self.battery = None
        self.debug: bool = False
        self.res: float = 0.02
        self.resFlag = False

    def initialize(self, config: list, init_value: dict) -> bool:
        """Initialize the battery fuel gauge sensor from Gateway configuration.

        Every entry of ``config`` gets exactly one row in ``v_battery`` so that
        the positional lookups done by ``getSensorReading`` and
        ``putSensorValue`` stay aligned even when an entry is skipped or fails.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Dict to populate with initialization status flags.

        Returns:
            True if at least one entry carrying the part-number key was
            processed without raising.
            NOTE(review): this is also True when ``initializeSensor`` returned
            -1 (part number mismatch or sensor disabled); kept as-is because
            callers may rely on it.
        """
        self.configuration = config
        context_logger.info_with_context("battery", "Initialize Battery sensor")
        # Start clean so a repeated initialize() cannot leave stale rows behind.
        self.v_battery = []
        _success = False
        for index, sensor in enumerate(self.configuration):
            try:
                if self.partNumber in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
                    _success = True  # send confirmation to main file
            except Exception as e:
                context_logger.error_with_context(
                    "Battery",
                    f"Initialize : {e}",
                )
                sensor["init"] = 0
            # initializeSensor appends a row only when it runs to completion;
            # pad skipped/failed entries so row `index` always belongs to
            # configuration[index].
            if len(self.v_battery) <= index:
                self.v_battery.append([])
        self.putInitvalues(init_value)
        return _success

    def initializeSensor(self, sensor: dict) -> int:
        """Initialize a single MAX17261 fuel gauge and take initial readings.

        One accumulator dict is created per entry of ``sensor["parameters"]``,
        in order. Parameters other than volt/current/charge get an empty
        accumulator so later positional indexing stays aligned.

        Args:
            sensor: Configuration dict with part number, capacity, and parameters.

        Returns:
            1 on success, -1 when the part number does not match or the sensor
            is not enabled.
        """
        value = []
        _success = -1
        gpio.select_I2C(sensor[self.partNumber])
        if (
            sensor[self.partNumber] == self.baseConfig["pn_max"]
            and sensor["en"] == self.baseConfig["oz_enable"]
        ):
            # TODO: Add gpio configuration
            if "cap" in sensor:
                self.battery = MAX17261(batteryCapacity=sensor["cap"])
            else:
                self.battery = MAX17261()
            if "debug" in sensor:
                self.debug = sensor["debug"] == 1
            if "res" in sensor:
                self.res = sensor["res"]
                self.resFlag = True
            time.sleep(0.1)
            self.battery.setValues(self.debug, self.res, self.resFlag)
            time.sleep(0.1)
            supported = (
                self.baseConfig["volt"],
                self.baseConfig["current"],
                self.baseConfig["charge"],
            )
            for param in sensor["parameters"]:
                if param[self.parameter] in supported:
                    val = self.getBattery(sensor[self.partNumber], param[self.parameter])
                    value.append({"value": [val], "oldvalue": val, "count": 1})
                else:
                    # Placeholder keeps accumulator index == parameter index.
                    value.append({"value": [], "oldvalue": None, "count": 0})
            _success = 1
        self.v_battery.append(value)
        return _success

    def getBattery(self, partNo: int, pm: int) -> float | int:
        """Read a single battery metric from the MAX17261 fuel gauge.

        Args:
            partNo: Part number identifying the fuel gauge model.
            pm: Parameter ID (1 = voltage, 2 = current, 3 = state-of-charge).
                NOTE(review): these are compared against literals here while
                ``initializeSensor`` uses ``baseConfig`` entries - confirm the
                two agree.

        Returns:
            The battery metric value, or 0.0 if the part number or parameter
            ID is unsupported.
        """
        gpio.select_I2C(partNo)
        if partNo == self.baseConfig["pn_max"]:
            if pm == 1:  # voltage : 1
                # The x4 factor presumably scales the reading to the full
                # pack voltage - TODO confirm.
                volt = round(self.battery.getInstantaneousVoltage() * 4, 2)
                context_logger.info_with_context(
                    "Battery",
                    f"Voltage: {volt}",
                )
                return volt
            if pm == 2:  # Current: 2
                current = round(self.battery.getInstantaneousCurrent(), 2)
                context_logger.info_with_context(
                    "Battery",
                    f"Current: {current}",
                )
                return current
            if pm == 3:  # Soc : 3
                current = round(self.battery.getInstantaneousCurrent(), 2)
                voltage = round(self.battery.getInstantaneousVoltage() * 4, 2)
                # Report 100 when the current magnitude is below 20 or the
                # scaled voltage exceeds 14.2 (units as returned by the
                # driver); otherwise use the gauge's own SOC value.
                if (-20 < current < 20) or (voltage > 14.2):
                    charge = 100
                else:
                    charge = round(self.battery.getSOC(), 2)
                context_logger.info_with_context(
                    "Battery",
                    f"Charge: {charge}",
                )
                return charge
        return 0.0

    def getSensorReading(self) -> dict:
        """Read current battery values and accumulate them for averaging.

        Each raw reading is scaled by ``se / 100`` and offset by ``cr / 10``
        before being stored. A failure on one sensor is logged and the
        remaining parameters of that sensor are skipped.

        Returns:
            Dict mapping send-codes to their latest calibrated values.
        """
        data = {}
        for X, sensor in enumerate(self.configuration):
            # Entries skipped during initialize() have no "init" key at all.
            if sensor.get("init") != 1:
                continue
            # Pre-bind the names the error handler formats, so a failure
            # before the first loop iteration cannot raise from the handler.
            Y = None
            parameters = {}
            try:
                for Y, parameters in enumerate(sensor["parameters"]):
                    value = self.getBattery(sensor[self.partNumber], parameters[self.parameter])
                    if value is not None:
                        value = round(
                            ((value * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0)),
                            2,
                        )
                        self.v_battery[X][Y]["value"].append(value)
                        self.v_battery[X][Y]["count"] += 1
                        data[parameters["sc"]] = value
                    else:
                        context_logger.warning_with_context(
                            "Battery",
                            f"getSensorReading: {sensor[self.partNumber]} and "
                            f"{parameters[self.parameter]} has None Value",
                        )
            except Exception as e:
                context_logger.error_with_context(
                    "Battery",
                    f"getSensorReading failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                )
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Transfer accumulated battery readings into the output payload and reset.

        Args:
            value: Shared output dict to populate with battery time-series data.

        Returns:
            The updated output dict with battery data added.
        """
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    value[parameters["sc"]] = self.v_battery[X][Y]["value"]
                    self.v_battery[X][Y]["value"] = []
                    self.v_battery[X][Y]["count"] = 0
                except Exception as e:
                    # .get(): a missing "sc" may be the very error being reported.
                    context_logger.error_with_context(
                        "Battery",
                        f"putSensorValue failed at [{X}][{Y}] - {parameters.get('sc')}: {e}",
                    )
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Add battery sensor initialization flags to the init payload.

        Entries that never received an ``"init"`` flag (no part-number key)
        are reported as 0.

        Args:
            value: Shared init payload dict.

        Returns:
            The updated init payload with a 'battery' key listing init statuses.
        """
        value.update({"battery": [sensor.get("init", 0) for sensor in self.configuration]})
        return value
# Example code
# Run with: python3 -m OzWrapper.OzBattery.OzBattery
if __name__ == "__main__":
    import os

    # TODO: change string here when you change sensor
    config_key = "battery"
    config_basename = "battery.config.json"

    # The demo configuration is expected to sit next to this module.
    config_path = os.path.join(os.path.dirname(__file__), config_basename)
    with open(config_path) as handle:
        raw_config = handle.read()
    sensorConfig = json.loads(raw_config)
    context_logger.info_with_context("battery", f"Sensor Config: {sensorConfig}")

    # TODO: change object here when you change sensor
    battery_sensor = OzBattery()
    battery_sensor.initialize(sensorConfig[config_key], init_value={})

    # Take a handful of readings so the accumulators hold more than one sample.
    for _ in range(4):
        battery_sensor.getSensorReading()

    payload = {}
    context_logger.info_with_context("battery", f"Sensor Values: {battery_sensor.putSensorValue(payload)}")