Source code for utils.reboot_RPi

"""Raspberry Pi reboot manager with cooldown protection.

Provides controlled device rebooting with a configurable cooldown period
(default 4 hours) to prevent reboot loops. Each reboot event is logged to
a persistent file so the cooldown survives process restarts.

The reboot is triggered via the Linux SysRq mechanism
(``/proc/sysrq-trigger``), which performs an immediate hardware reboot
without a clean shutdown. This is used as a last-resort recovery mechanism
when the device enters an unrecoverable error state.

Typical usage::

    from utils.reboot_RPi import RebootManager

    manager = RebootManager()
    manager.reboot_raspberry_pi({
        "type": "events",
        "t": 1700000000000,
        "msg": {"type": "watchdog", "value": "sensor_timeout"},
    })

Warning:
    The SysRq reboot is immediate and ungraceful. Filesystem corruption is
    possible if writes are in progress. The 5-second delay before triggering
    allows pending I/O to flush.
"""

import os
import time

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class RebootManager: """Manages Raspberry Pi reboots with a cooldown period to prevent loops. Tracks reboot history in a persistent log file and enforces a minimum interval between reboots. This prevents cascading reboot loops when a persistent hardware or configuration error triggers repeated reboot requests. Attributes: LOG_FILE: Path to the persistent reboot log file. COOLDOWN_SECONDS: Minimum interval between reboots in seconds (default: 4 hours). """ LOG_FILE = "reboot_log.txt" COOLDOWN_SECONDS = 4 * 60 * 60 # 4 hours
[docs] def read_last_reboot(self) -> int | None: """Read the timestamp of the most recent reboot from the log file. Parses the last line of :attr:`LOG_FILE` looking for a ``t=<epoch_ms>`` field. Returns ``None`` if the file does not exist, is empty, or cannot be parsed. Returns: Epoch timestamp in milliseconds of the last reboot, or ``None`` if no previous reboot is recorded. """ if not os.path.exists(self.LOG_FILE): return None try: with open(self.LOG_FILE) as f: lines = f.readlines() if not lines: return None last_line = lines[-1] # Format: # t=1234567890 | type=... | msg_type=... | msg_value=... for part in last_line.split("|"): part = part.strip() if part.startswith("t="): return int(part.split("=")[1]) except Exception as e: context_logger.error_with_context("RebootManager", f"Error reading last reboot time: {e}") return None return None
[docs] def write_reboot_log(self, payload: dict) -> bool: """Append a reboot event to the persistent log file. Writes a pipe-delimited line containing the event timestamp, type, and message details. Uses ``fsync`` to ensure the write is durable before the impending reboot. Args: payload: Reboot event dictionary with keys ``"t"`` (epoch ms), ``"type"`` (event category), and ``"msg"`` (dict with ``"type"`` and ``"value"`` sub-keys). Returns: ``True`` after successfully writing and flushing the log entry. """ with open(self.LOG_FILE, "a") as f: log_line = ( f"t={payload['t']} | " f"type={payload['type']} | " f"msg_type={payload['msg']['type']} | " f"msg_value={payload['msg']['value']}\n" ) f.write(log_line) f.flush() try: os.fsync(f.fileno()) except Exception: # fsync may fail on some filesystems (e.g., /proc), ignore but keep write pass time.sleep(0.5) # short pause not strictly necessary return True
[docs] def reboot_raspberry_pi(self, payload: dict) -> None: """Initiate a Raspberry Pi reboot if the cooldown period has elapsed. Checks the reboot log to determine whether the cooldown period has passed since the last reboot. If the cooldown is still active, the reboot is blocked and a log message is emitted. Otherwise, the event is logged and an immediate SysRq reboot is triggered. Args: payload: Reboot event dictionary with the following structure:: { "type": "events", "t": <epoch_ms>, "msg": {"type": "...", "value": "..."} } """ event_time_ms = payload["t"] last_reboot_ms = self.read_last_reboot() if last_reboot_ms is not None: diff_seconds = (event_time_ms - last_reboot_ms) / 1000 if diff_seconds < self.COOLDOWN_SECONDS: context_logger.info_with_context( "RebootManager", f"Reboot blocked: Only {diff_seconds / 3600:.2f} hours since last reboot.", ) return # Allowed -- Log + Reboot self.write_reboot_log(payload) context_logger.debug_with_context("RebootManager", "Rebooting Raspberry Pi...") try: time.sleep(5) with open("/proc/sysrq-trigger", "w") as f: f.write("b") except Exception as e: context_logger.error_with_context("RebootManager", f"Reboot failed: {e}")
if __name__ == "__main__": import time manager = RebootManager() payload = { "type": "events", "t": int(time.time() * 1000), "msg": {"type": "manual", "value": "test reboot"}, } manager.reboot_raspberry_pi(payload)