Source code for Manager.Manager

"""HTTP client for communication with the Oizom Gateway container.

Provides methods to fetch device configuration, post sensor data (both
periodic and realtime), send initialization payloads, update device
configuration, and retrieve PPB analytics from the Gateway REST API.

All communication uses JSON over HTTP with an access token for
authentication. The Gateway URL and API paths are configurable via
environment variables.

Environment Variables:
    GATEWAY_URL: Base URL of the Gateway (default: ``http://gateway:8080``).
    GATEWAY_CONFIG_API: Config endpoint path (default: ``/query/config``).
    GATEWAY_DATA_API: Data posting endpoint (default: ``/query/data``).
    GATEWAY_INIT_API: Init endpoint path (default: ``/config/init``).
    GATEWAY_ACCESS_TOKEN: JWT token for API authentication.

Typical usage::

    from Manager import Manager

    mgr = Manager()
    config, status = mgr.getConfig()
    mgr.sendData(data_payload, realtime_payload)
"""

import json
import os
from typing import Any

import requests

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Manager: """HTTP client for the Oizom Gateway REST API. Handles all communication between the hardware sensor layer and the Gateway container, including configuration fetching, data posting, and device initialization. All HTTP requests use a 10-second timeout and return status code 404 on any failure. Attributes: server: Base URL of the Gateway service. config_api: Path to the configuration endpoint. data_api: Path to the data posting endpoint. access_token: JWT token for API authentication. config_headers: HTTP headers sent with every request. """ # server = "http://localhost:5000" # server = "http://gateway:5000" server = os.getenv("GATEWAY_URL", "http://gateway:8080") config_api = os.getenv("GATEWAY_CONFIG_API", "/query/config") data_api = os.getenv("GATEWAY_DATA_API", "/query/data") realtime_data_api = os.getenv("GATEWAY_DATA_API", "/query/realtimeData") automation_alert_api = os.getenv("GATEWAY_DATA_API", "/alerts/automation") init_api = os.getenv("GATEWAY_INIT_API", "/config/init") ppb_analytics = os.getenv("GATEWAY_PPB_ANALYTICS", "/ppb/device/") access_token = os.getenv( "GATEWAY_ACCESS_TOKEN", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImppbUBvaXpvbS5jb20iLCJhcGlDb3VudGVyIjowLCJpYXQiOjE0ODkyMzIwODYsImV4cCI6MTUyMDc4OTAxMn0.Cc5nwYiEv_WsV1sCMH_sY7QCKZ3dlPMQJVQYugaJbrk", )
[docs] def __init__(self) -> None: """Initialize the Manager with Gateway connection parameters. Reads Gateway URL, API paths, and access token from environment variables, then constructs the HTTP headers used for all requests. """ # initialize mutable attributes per-instance to avoid shared state across instances self.config_headers = { "Content-Type": "application/json", "x-access-token": self.access_token, } print( "--------------------------HARDWARE RESTARTING--------------------------", ) print("server: %s", self.server) print("config_api: %s", self.config_api) print("data_api: %s", self.data_api) print("access_token: %s", self.access_token) print("config_headers: %s", self.config_headers)
[docs] def getConfig(self) -> tuple[dict, int]: """Fetch the device configuration from the Gateway. Sends a GET request to the Gateway's configuration endpoint and returns the parsed JSON response along with the HTTP status code. Returns: Tuple of (config_dict, status_code). Returns ({}, 404) on any request failure. """ try: r = requests.get(url=(self.server + self.config_api), timeout=10) data = r.json() context_logger.info_with_context("Manager", "Config Data: %s", data) return (data, r.status_code) except Exception as e: context_logger.error_with_context("Manager", "getConfig: %s", e) return ({}, 404)
[docs] def getPPBanalytics(self, lte: int, gte: int, deviceId: str, deviceType: str) -> tuple[Any, int]: """Fetch PPB (parts-per-billion) analytics data for a device. Retrieves raw sensor analytics from the Gateway for a specified time range, used for calibration and quality control calculations. Args: lte: Upper bound epoch timestamp (less-than-or-equal). gte: Lower bound epoch timestamp (greater-than-or-equal). deviceId: Unique identifier of the target device. deviceType: Device type string (unused in current implementation). Returns: Tuple of (analytics_data, status_code). Returns ({}, 404) on failure. """ try: payload = '{"gte":' + str(gte) + ',"lte":' + str(lte) + ',"limit":1000,"page":0,"raw":true}' querystring = {"db": "cs"} url_raw = str(self.server) + str(self.ppb_analytics) + str(deviceId) + "?db=cs" context_logger.info_with_context("Manager", "PPB Analytics URL: %s", url_raw) r_oz = requests.request( "POST", url_raw, data=payload, headers=self.config_headers, params=querystring, timeout=10, ) Sdata = json.loads(r_oz.text) return (Sdata, r_oz.status_code) except Exception as e: context_logger.error_with_context("Manager", "getPPBanalytics: %s", e) return ({}, 404)
[docs] def updateConfig(self, config: dict, deviceId: str) -> int: """Update the device configuration on the Gateway. Sends a PUT request to update the configuration for a specific device. Used for automatic temperature compensation updates and similar runtime configuration changes. Args: config: Configuration dictionary to apply. deviceId: Unique identifier of the target device. Returns: HTTP status code from the Gateway, or 404 on failure. """ try: context_logger.info_with_context("Manager", "Updating Config: %s", config) url_raw = str(self.server) + "/config/devices/" + str(deviceId) + "/user/2019" r = requests.put( url_raw, headers=self.config_headers, data=json.dumps({"config": config, "update_note": "auto temp compensation updated"}), timeout=10, ) context_logger.info_with_context("Manager", "Config Response: %s", r.json()) return r.status_code except Exception as e: context_logger.error_with_context("Manager", "updateConfig: %s", e) return 404
[docs] def sendData(self, payload: json, r_data: json) -> int: """Post aggregated sensor data to the Gateway. Sends the periodic data payload (``d``) and optional realtime data payload (``r``) to the Gateway's data endpoint. This is the primary method for publishing sensor readings to the cloud. Args: payload: Aggregated sensor data dictionary (the ``d`` payload). r_data: Realtime sensor data dictionary (the ``r`` payload), or ``None``/empty to omit. Returns: HTTP status code from the Gateway, or 404 on failure. """ try: cloud_payload = {} cloud_payload["d"] = payload if r_data: cloud_payload["r"] = r_data context_logger.info_with_context("Manager", "Sending Data: %s", cloud_payload) r = requests.post( self.server + self.data_api, headers=self.config_headers, data=json.dumps(cloud_payload), timeout=10, ) return r.status_code except Exception as e: context_logger.error_with_context("Manager", "sendData: %s", e) return 404
[docs] def sendRealtimeData(self, payload: json) -> int: """Post realtime sensor data to the Gateway's WebSocket relay. Sends the latest sensor readings to the realtime data endpoint, which forwards them to connected WebSocket clients for live dashboard display. Args: payload: Realtime sensor data dictionary. Returns: HTTP status code from the Gateway, or 404 on failure. """ try: socket_payload = {"d": payload} context_logger.info_with_context("Manager", "Sending Data to realtime socket: %s", socket_payload) r = requests.post( self.server + self.realtime_data_api, headers=self.config_headers, data=json.dumps(socket_payload), timeout=10, ) return r.status_code except Exception as e: context_logger.error_with_context("Manager", "sendRealtimeData: %s", e) return 404
[docs] def sendAutomationAlertsEmailSms(self, payload: json) -> int: """Send automation alert notifications via the Gateway. Posts an alert payload to the Gateway's automation alert endpoint, which triggers email and/or SMS notifications based on configured alert rules. Args: payload: Alert payload dictionary containing alert type, threshold values, and notification targets. Returns: HTTP status code from the Gateway, or 404 on failure. """ try: context_logger.info_with_context("Manager", "Sending Automation Alert: %s", payload) r = requests.post( self.server + self.automation_alert_api, headers=self.config_headers, data=json.dumps(payload), timeout=10, ) return r.status_code except Exception as e: context_logger.error_with_context("Manager", "sendAutomationAlertsEmailSms: %s", e) return 404
[docs] def sendInit(self, payload: json) -> int: """Send device initialization data to the Gateway. Posts hardware and firmware information to the Gateway's init endpoint during device startup. This registers the device and its capabilities with the cloud platform. Args: payload: Initialization payload containing device ID, firmware version, sensor list, and hardware capabilities. Returns: HTTP status code from the Gateway, or 404 on failure. """ try: context_logger.info_with_context("Manager", "Sending Init: %s", payload) r = requests.put( self.server + self.init_api, headers=self.config_headers, data=json.dumps(payload), timeout=10, ) context_logger.info_with_context("Manager", "Init Response: %s", r.json()) return r.status_code except Exception as e: context_logger.error_with_context("Manager", "sendInit: %s", e) return 404