Source code for OzWrapper.OzSystem.OzSystem

"""System monitoring wrapper for CPU temperature, disk usage, and GSM modem status.

Provides access to host-level diagnostics (CPU thermal readings, storage metrics)
and cellular modem information (signal strength, SIM details) through the
GenericSensor interface.
"""

import json
import os
import subprocess
from typing import Literal

import psutil

from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from drivers.Quectel.Quectel import Modem as QuectelModem
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzSystem(GenericSensor): """Wrapper for system-level monitoring: CPU, storage, and GSM modem. Reads CPU temperature (enabling/disabling the cooling fan as needed), disk usage statistics, and cellular modem signal quality. Attributes: gsmModem: QuectelModem instance for cellular communication, or None. MCP: MCP230XX GPIO expander used for fan control, or None. CPU_TEMP_THRESHOLD: Temperature in Celsius above which the CPU fan activates. CPU_FAN: GPIO pin number for the CPU cooling fan. mcp_flag: Whether the MCP GPIO expander is available. modem_port: Serial port path for the Quectel modem. modem_baud: Baud rate for the modem serial connection. device_space_cmd: Shell command used to query disk space. configuration: Sensor configuration list from the Gateway. v_system: Accumulated system reading values per sensor/parameter. """ gsmModem = None MCP = None CPU_TEMP_THRESHOLD = 35 CPU_FAN = 6 mcp_flag = True modem_port = "/dev/ttyUSB2" modem_baud = 115200 device_space_cmd = "df ~ -hP"
[docs] def __init__(self) -> None: """Initialize OzSystem with empty configuration and value storage.""" super().__init__() self.configuration = {} self.v_system = []
[docs] def initialize(self, config: dict, init_value: dict, mcpFlag: bool) -> bool: """Initialize system sensors (CPU stats and/or GSM modem). Args: config: List of sensor configuration dicts from the Gateway. init_value: Dict to populate with initialization results. mcpFlag: Whether the MCP GPIO expander is present on this board. Returns: True if all configured sensors initialized successfully, False otherwise. """ self.configuration = config self.mcp_flag = mcpFlag if self.mcp_flag: self.MCP = MCP230XX(devicenumber=os.getenv("MCP_ID", 6)) context_logger.debug_with_context("System", f"System config: {self.configuration}") gpio.setup(self.CPU_FAN, gpio.OUT) gpio.set(self.CPU_FAN, 1) _success = False try: for sensor in self.configuration: if "pn" in sensor: sensor["init"] = self.initializeSensor(sensor) _success = True except Exception as e: context_logger.error_with_context("System", f"System init error: {e}") _success = False return _success
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialize a single system sensor (CPU stats or GSM modem). Args: sensor: Configuration dict containing part number, enable flag, and parameters. Returns: 1 if the sensor initialized successfully, 0 otherwise. """ success = False value = [] if sensor["pn"] == 10 and sensor["en"] == 1: parameters = sensor["parameters"] for param in parameters: val = 0.0 oldvalue = 0.0 try: val = self.getSystemStat(sensor["pn"], param["pm"]) context_logger.info_with_context("System", f"System stat: {val}") oldvalue = val success = True except Exception as e: context_logger.error_with_context("System", f"System stat error: {e}") success = False _val = {"value": val, "oldvalue": oldvalue, "count": 1} value.append(_val) if sensor["pn"] == 11 and sensor["en"] == 1: if "gpio" in sensor: gpioConfig = sensor["gpio"] if "port" in gpioConfig: self.modem_port = gpioConfig["port"] if "baud" in gpioConfig: self.modem_baud = gpioConfig["baud"] parameters = sensor["parameters"] for param in parameters: val = 0.0 oldvalue = 0.0 try: self.gsmModem = QuectelModem(modem_port=self.modem_port, modem_baud=self.modem_baud) val = self.getSystemStat(sensor["pn"], param["pm"]) context_logger.info_with_context("System", f"Modem stat: {val}") oldvalue = val success = True except Exception as e: context_logger.error_with_context("System", f"Modem stat error: {e}") success = False _val = {"value": val, "oldvalue": oldvalue, "count": 1} value.append(_val) self.v_system.append(value) return int(success)
[docs] def getSystemStat(self, partNo: int, pm: int) -> float | int | None: """Dispatch a system stat request to the appropriate handler. Args: partNo: Part number identifying the subsystem (10 = CPU/disk, 11 = modem). pm: Parameter ID within the subsystem. Returns: The requested metric value, or None if the part number is unrecognized. """ if partNo == 10: return self.getSystemInfo(pm) if partNo == 11: return self.getModemInfo(pm) return None
[docs] def getSystemInfo(self, pm: int) -> float | int | None: """Read a host system metric and manage the CPU cooling fan. Args: pm: Parameter ID (1 = CPU temp, 2 = disk used %, 3 = disk used GB, 4 = disk available GB). Returns: The requested metric value, or None if the parameter is unrecognized. """ if pm == 1: _cpu_temp = round(psutil.sensors_temperatures()["cpu_thermal"][0].current, 2) if _cpu_temp > self.CPU_TEMP_THRESHOLD: if self.mcp_flag: self.MCP.digitalWrite(self.MCP.RPI_FAN, 1) gpio.set(self.CPU_FAN, 1) else: if self.mcp_flag: self.MCP.digitalWrite(self.MCP.RPI_FAN, 0) gpio.set(self.CPU_FAN, 0) return _cpu_temp if pm == 2: output = self.execute_os_command(self.device_space_cmd) device, size, used, available, percent, mountpoint = output.split("\n")[1].split() return int(percent.replace("%", "")) if pm == 3: output = self.execute_os_command(self.device_space_cmd) device, size, used, available, percent, mountpoint = output.split("\n")[1].split() return float(used.replace("G", "")) if pm == 4: output = self.execute_os_command(self.device_space_cmd) device, size, used, available, percent, mountpoint = output.split("\n")[1].split() return float(available.replace("G", "")) return None
[docs] def getModemInfo(self, pm: int) -> int | None: """Read a GSM modem metric. Args: pm: Parameter ID (1 = signal strength). Returns: The modem metric value, or None if the parameter is unrecognized. """ if pm == 1: return self.gsmModem.signalstrength() return None
[docs] def getSensorReading(self) -> dict: """Collect a single round of system sensor readings. Returns: Dict mapping send-codes to their current values. """ data = {} for sensor in self.configuration: if False: for parameters in sensor["parameters"]: try: value = self.getSystemStat(sensor[self.partNumber], parameters[self.parameter]) data[parameters["sc"]] = value # self.v_system[X][Y]['value'] += value # self.v_system[X][Y]['count'] +=1 context_logger.info_with_context("System", f"{parameters[self.parameter]} {value}") except Exception as e: context_logger.error_with_context("System", f"getSensorReading error: {e}") return data
[docs] def putSensorValue(self, value: dict) -> dict: """Aggregate accumulated readings and merge them into the output payload. Args: value: Shared output dict to populate with averaged system metrics. Returns: The updated output dict with system metrics added. """ for sensor in self.configuration: if sensor["init"] == 1: for parameters in sensor["parameters"]: try: # self.v_system[X][Y]['value'] = self.v_system[X][Y]['value'] / int(self.v_system[X][Y]['count']) # value[parameters['sc']] = round(self.v_system[X][Y]['value'] ,2) value[parameters["sc"]] = self.getSystemStat( sensor[self.partNumber], parameters[self.parameter] ) # self.v_system[X][Y]['value'] = 0 # self.v_system[X][Y]['count'] = 0 except Exception as e: context_logger.error_with_context("System", f"Put sensor value error: {e}") return value
[docs] def getSimDetails(self) -> dict | Literal[0]: """Query the GSM modem for SIM card and network details. Returns: Dict with keys 'csq', 'cgreg', 'cereg', 'iccid', 'imei', 'module' on success, or 0 on failure. """ val = {} try: if self.gsmModem is None: self.gsmModem = QuectelModem(modem_port=self.modem_port, modem_baud=self.modem_baud) val.update({"csq": self.gsmModem.signalstrength()}) val.update({"cgreg": self.gsmModem.getGPRSNetwork()}) val.update({"cereg": self.gsmModem.getLTECheck()}) val.update({"iccid": self.gsmModem.getIccid()}) val.update({"imei": self.gsmModem.getIMEI()}) val.update({"module": self.gsmModem.getModuleInfo()}) context_logger.info_with_context("System", f"SIM details: {val}") return val except Exception as e: context_logger.error_with_context("System", f"Get SIM details error: {e}") return 0
[docs] def putInitvalues(self, value: json) -> json: """Add SIM initialization details to the init payload. Args: value: Shared init payload dict. Returns: The updated init payload with a 'sim' key containing SIM details. """ _sensor_init = self.getSimDetails() # for X, sensor in enumerate(self.configuration): # _sensor_init.append(sensor['init']) value.update({"sim": _sensor_init}) return value
[docs] def execute_os_command(self, command: str) -> str: """Execute a shell command and return its stdout output. Args: command: The shell command string to execute. Returns: Decoded stdout output of the command. """ p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) p.wait() result = p.communicate() return result[0].decode()
# Example code # python3 -m OzWrapper.OzSystem.OzSystem if __name__ == "__main__": import os dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, "system.config.json") with open(file_name) as config_file: config_content = config_file.read() context_logger.info_with_context("System", f"Config content: {config_content}") system_config = json.loads(config_content) context_logger.info_with_context("System", f"System config parsed: {system_config}") system = OzSystem() system.initialize(system_config["system"]) for _ in range(0, 4): system.getSensorReading() data = {} context_logger.info_with_context("System", "PutSensor value start")