Source code for OzWrapper.OzAlert.OzAlert

"""Alert and automation wrapper for threshold-based sensor alerts.

Monitors sensor readings against configurable upper/lower thresholds
and triggers GPIO outputs (buzzer, light, relay), email/SMS notifications,
or MQTT messages when thresholds are breached.
"""

import json
import os
import time
from queue import Empty, Queue
from typing import Any

from drivers.gpio import gpio
from drivers.MCP230XX import MCP230XX
from Manager import Manager
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two handles are created for this module's name:
#   * ``basic_logger``   -- whatever ``OizomLogger(...).get()`` returns.
#   * ``context_logger`` -- the OizomLogger wrapper itself; its
#     ``*_with_context(tag, message)`` methods are the ones the alert code in
#     this module calls, always with the tag "Alert".
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzAlert: """Threshold-based alert handler for sensor automation. Evaluates incoming sensor data against configured thresholds and activates the appropriate output channel (GPIO relay, buzzer/light, email, SMS, or MQTT). Attributes: output1_pin: GPIO pin number for relay output 1. output2_pin: GPIO pin number for relay output 2. buzzer_pin: GPIO pin number for the audible buzzer. light_pin: GPIO pin number for the alert indicator light. minInterval: Minimum polling interval (seconds) across all alerts. alerts: List of alert configuration dicts from the Gateway. manager: Manager instance for sending email/SMS via the Gateway. MCP: MCP230XX I/O expander instance (used when pin >= 100). mqtt_send_queue: Queue for publishing MQTT alert payloads. """ output1_pin = 19 output2_pin = 20 buzzer_pin = 16 light_pin = 26 minInterval = 100000
[docs] def __init__(self, manager: Manager) -> None: """Initialise the OzAlert handler. Args: manager: Manager instance used to dispatch email/SMS alerts through the Gateway API. """ super().__init__() # Initialize mutable/instance-specific attributes here to avoid shared state self.alerts: list[dict[str, Any]] = [] self.manager = manager self.MCP = None self.mqtt_send_queue: Queue | None = None
[docs] def setup(self, alertConfig: dict) -> None: """Configure GPIO pins and load alert rules from the Gateway config. Sets up output pins (direct GPIO or MCP230XX expander) for buzzer, light, and relay channels, then loads the list of alert rules. Args: alertConfig: Alert configuration dict containing ``gpio`` pin overrides and an ``alert`` list of threshold rules. """ if "gpio" in alertConfig: gpioConfig = alertConfig["gpio"] if "output1" in gpioConfig: self.output1_pin = gpioConfig["output1"] if "output2" in gpioConfig: self.output2_pin = gpioConfig["output2"] if "buzzer" in gpioConfig: self.buzzer_pin = gpioConfig["buzzer"] if "light" in gpioConfig: self.light_pin = gpioConfig["light"] if self.buzzer_pin < 100: gpio.setup(self.buzzer_pin, gpio.OUT) else: self.MCP = MCP230XX(devicenumber=os.getenv("MCP_ID", 6)) self.MCP.pinMode(self.buzzer_pin - 100, "output") if self.light_pin < 100: gpio.setup(self.light_pin, gpio.OUT) else: self.MCP.pinMode(self.light_pin - 100, "output") if self.output1_pin < 100: gpio.setup(self.output1_pin, gpio.OUT) else: self.MCP.pinMode(self.output1_pin - 100, "output") if self.output2_pin < 100: gpio.setup(self.output2_pin, gpio.OUT) else: self.MCP.pinMode(self.output2_pin - 100, "output") if "alert" in alertConfig: self.alerts = alertConfig["alert"] for alert in self.alerts: if "en" in alert and alert["en"] == 1: self.setAlert(alert["channel"], alert["default"]) if alert.get("interval"): if int(str(alert["interval"])) > 0 and int(str(alert["interval"])) < self.minInterval: self.minInterval = int(str(alert["interval"]))
[docs] def loop(self, alertqueue: Queue) -> None: """Run the alert evaluation loop, blocking on the queue for sensor data. Continuously reads sensor payloads from the queue, compares each value against its upper/lower thresholds, and triggers the corresponding alert channel. Also handles timed alert resets. Args: alertqueue: Queue supplying sensor data dicts for evaluation. """ while 1: try: data = {} context_logger.info_with_context("Alert", "Alert loop running...") if self.minInterval == 100000: data = alertqueue.get() else: data = alertqueue.get(timeout=self.minInterval) if data.get("d"): context_logger.debug_with_context("Alert", f"Sending data through Alert {data}") for index, alert in enumerate(self.alerts): if "en" in alert and alert["en"] == 0: continue if alert["key"] in data["d"]: value = data["d"][alert["key"]] if value > alert["upperThreshold"]: context_logger.debug_with_context("Alert", f"Upper Threshold hit for {alert['key']}") self.setAlert( alert["channel"], alert["upperStatus"], alert, data["d"], alert["upperThreshold"], ">", ) self.alerts[index]["time"] = int(time.time()) if value < alert["lowerThreshold"]: context_logger.debug_with_context("Alert", f"Lower Threshold hit for {alert['key']}") self.setAlert( alert["channel"], alert["lowerStatus"], alert, data["d"], alert["lowerThreshold"], "<", ) self.alerts[index]["time"] = int(time.time()) else: if alert["channel"] == 3: context_logger.info_with_context("Alert", "Key Missing from payload") self.setAlert( alert["channel"], alert["upperStatus"], alert, data["d"], alert["upperThreshold"], ">", ) self.alerts[index]["time"] = int(time.time()) except Exception as e: context_logger.error_with_context("Alert", f"Error occurred: {e}") finally: context_logger.info_with_context("Alert", "Alert loop finished") time.sleep(1) for alert in self.alerts: if "en" in alert and alert["en"] == 0: continue if alert.get("interval"): if alert.get("time"): currentTime = int(time.time()) if int(str(alert["interval"])) > 0: 
if (currentTime - alert["time"]) > int(str(alert["interval"])): self.setAlert(alert["channel"], alert["default"]) del alert["time"]
[docs] def setAlert( self, output: int, status: int, config: dict[str, Any] | None = None, data: dict[str, Any] | None = None, threshold: int | float | None = None, operator: str | None = None, ) -> None: """Activate or deactivate an alert on the specified output channel. Channel mapping: 0 -- relay output 1 (GPIO/MCP) 1 -- relay output 2 (GPIO/MCP) 2 -- buzzer + light (GPIO/MCP) 3 -- email notification via Gateway 4 -- SMS notification via Gateway 5 -- MQTT message via send queue Args: output: Output channel number (0-5). status: Desired state (1 = active, 0 = inactive). config: Alert rule dict (required for email/SMS/MQTT channels). data: Current sensor data payload. threshold: Threshold value that was breached. operator: Comparison operator string (``">"`` or ``"<"``). """ if output == 0: if self.output1_pin < 100: gpio.set(self.output1_pin, status) else: self.MCP.digitalWrite(self.output1_pin - 100, status) if output == 1: if self.output2_pin < 100: gpio.set(self.output2_pin, status) else: self.MCP.digitalWrite(self.output2_pin - 100, status) if output == 2: if self.buzzer_pin < 100: gpio.set(self.buzzer_pin, status) else: self.MCP.digitalWrite(self.buzzer_pin - 100, status) if self.light_pin < 100: gpio.set(self.light_pin, status) else: self.MCP.digitalWrite(self.light_pin - 100, status) if output == 3 and status: if config is not None and threshold is not None: emailString = ",".join(config["email"]) payload = { "config": config, "to": emailString, "value": threshold, "userId": 2019, "deviceId": os.environ["DEVICEID"], "templateId": 1001, "data": data, "operator": operator, } try: s_success = self.manager.sendAutomationAlertsEmailSms(payload) context_logger.info_with_context("Alert", "AUTOMATION_EMAIL_EXECUTION: success") except Exception as e: context_logger.error_with_context("Alert", f"Error occurred: {e}") if output == 4 and status: if config is not None and threshold is not None: smsString = "<".join(config["sms"]) payload = { "config": config, "value": 
threshold, "to": smsString, "userId": 2019, "deviceId": os.environ["DEVICEID"], "templateId": 1002, "data": data, "operator": operator, } try: s_success = self.manager.sendAutomationAlertsEmailSms(payload) context_logger.info_with_context("Alert", f"AUTOMATION_EMAIL_EXECUTION: {s_success}") except Exception as e: context_logger.error_with_context("Alert", f"Error occurred: {e}") if output == 5 and status: if config is not None and threshold is not None: payload = { "metadata": config["metadata"], "key": config["key"], "threshold": threshold, "value": data[config["key"]], "operator": operator, "type": "alert", } try: self.mqtt_send_queue.put(payload) context_logger.info_with_context("Alert", "AUTOMATION_MQTT_EXECUTION: Success") except Exception as e: context_logger.error_with_context("Alert", f"Error occurred: {e}")
[docs] def run(self, alertConfig: dict, alertqueue: Queue, mqttqueue: Queue) -> None: """Entry point: set up alerts and start the evaluation loop. Checks if alerts are enabled in the config, stores the MQTT queue, then delegates to ``setup()`` and ``loop()``. Args: alertConfig: Full alert/automation configuration from the Gateway. alertqueue: Queue supplying sensor data payloads. mqttqueue: Queue for outbound MQTT alert messages. """ if "en" in alertConfig: if alertConfig["en"] == 1: self.mqtt_send_queue = mqttqueue self.setup(alertConfig) self.loop(alertqueue)
if __name__ == "__main__":
    # Standalone harness: load the "automation" section of the alert.json that
    # sits next to this file and run the alert loop against an empty queue.
    config_path = os.path.join(os.path.dirname(__file__), "alert.json")
    with open(config_path, encoding="utf-8") as config_file:
        alertConfig = json.load(config_file)["automation"]

    alert_queue = Queue(1)
    # OzAlert requires a manager argument; calling it with none raised
    # TypeError. No Manager is constructed in this harness, so the email/SMS
    # channels will log an error instead of sending.
    automation = OzAlert(manager=None)
    automation.setup(alertConfig)
    automation.loop(alert_queue)