"""Raspberry Pi reboot manager with cooldown protection.
Provides controlled device rebooting with a configurable cooldown period
(default 4 hours) to prevent reboot loops. Each reboot event is logged to
a persistent file so the cooldown survives process restarts.
The reboot is triggered via the Linux SysRq mechanism
(``/proc/sysrq-trigger``), which performs an immediate hardware reboot
without a clean shutdown. This is used as a last-resort recovery mechanism
when the device enters an unrecoverable error state.
Typical usage::
from utils.reboot_RPi import RebootManager
manager = RebootManager()
manager.reboot_raspberry_pi({
"type": "events",
"t": 1700000000000,
"msg": {"type": "watchdog", "value": "sensor_timeout"},
})
Warning:
The SysRq reboot is immediate and ungraceful. Filesystem corruption is
possible if writes are in progress. The 5-second delay before triggering
allows pending I/O to flush.
"""
import os
import time
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Result of OizomLogger.get() -- presumably the underlying logger object
# (confirm against utils.oizom_logger). It is not referenced anywhere else in
# the visible part of this module.
basic_logger = OizomLogger(__name__).get()
# OizomLogger instance used by RebootManager for its error_with_context /
# info_with_context / debug_with_context calls.
context_logger = OizomLogger(__name__)
[docs]
class RebootManager:
"""Manages Raspberry Pi reboots with a cooldown period to prevent loops.
Tracks reboot history in a persistent log file and enforces a minimum
interval between reboots. This prevents cascading reboot loops when a
persistent hardware or configuration error triggers repeated reboot
requests.
Attributes:
LOG_FILE: Path to the persistent reboot log file.
COOLDOWN_SECONDS: Minimum interval between reboots in seconds
(default: 4 hours).
"""
LOG_FILE = "reboot_log.txt"
COOLDOWN_SECONDS = 4 * 60 * 60 # 4 hours
[docs]
def read_last_reboot(self) -> int | None:
"""Read the timestamp of the most recent reboot from the log file.
Parses the last line of :attr:`LOG_FILE` looking for a ``t=<epoch_ms>``
field. Returns ``None`` if the file does not exist, is empty, or cannot
be parsed.
Returns:
Epoch timestamp in milliseconds of the last reboot, or ``None``
if no previous reboot is recorded.
"""
if not os.path.exists(self.LOG_FILE):
return None
try:
with open(self.LOG_FILE) as f:
lines = f.readlines()
if not lines:
return None
last_line = lines[-1]
# Format:
# t=1234567890 | type=... | msg_type=... | msg_value=...
for part in last_line.split("|"):
part = part.strip()
if part.startswith("t="):
return int(part.split("=")[1])
except Exception as e:
context_logger.error_with_context("RebootManager", f"Error reading last reboot time: {e}")
return None
return None
[docs]
def write_reboot_log(self, payload: dict) -> bool:
"""Append a reboot event to the persistent log file.
Writes a pipe-delimited line containing the event timestamp, type,
and message details. Uses ``fsync`` to ensure the write is durable
before the impending reboot.
Args:
payload: Reboot event dictionary with keys ``"t"`` (epoch ms),
``"type"`` (event category), and ``"msg"`` (dict with
``"type"`` and ``"value"`` sub-keys).
Returns:
``True`` after successfully writing and flushing the log entry.
"""
with open(self.LOG_FILE, "a") as f:
log_line = (
f"t={payload['t']} | "
f"type={payload['type']} | "
f"msg_type={payload['msg']['type']} | "
f"msg_value={payload['msg']['value']}\n"
)
f.write(log_line)
f.flush()
try:
os.fsync(f.fileno())
except Exception:
# fsync may fail on some filesystems (e.g., /proc), ignore but keep write
pass
time.sleep(0.5) # short pause not strictly necessary
return True
[docs]
def reboot_raspberry_pi(self, payload: dict) -> None:
"""Initiate a Raspberry Pi reboot if the cooldown period has elapsed.
Checks the reboot log to determine whether the cooldown period has
passed since the last reboot. If the cooldown is still active, the
reboot is blocked and a log message is emitted. Otherwise, the event
is logged and an immediate SysRq reboot is triggered.
Args:
payload: Reboot event dictionary with the following structure::
{
"type": "events",
"t": <epoch_ms>,
"msg": {"type": "...", "value": "..."}
}
"""
event_time_ms = payload["t"]
last_reboot_ms = self.read_last_reboot()
if last_reboot_ms is not None:
diff_seconds = (event_time_ms - last_reboot_ms) / 1000
if diff_seconds < self.COOLDOWN_SECONDS:
context_logger.info_with_context(
"RebootManager",
f"Reboot blocked: Only {diff_seconds / 3600:.2f} hours since last reboot.",
)
return
# Allowed -- Log + Reboot
self.write_reboot_log(payload)
context_logger.debug_with_context("RebootManager", "Rebooting Raspberry Pi...")
try:
time.sleep(5)
with open("/proc/sysrq-trigger", "w") as f:
f.write("b")
except Exception as e:
context_logger.error_with_context("RebootManager", f"Reboot failed: {e}")
if __name__ == "__main__":
    # Manual test entry point: request a reboot stamped with the current
    # time in epoch milliseconds. The cooldown check still applies.
    RebootManager().reboot_raspberry_pi(
        {
            "type": "events",
            "t": int(time.time() * 1000),
            "msg": {"type": "manual", "value": "test reboot"},
        }
    )