"""HTTP client for communication with the Oizom Gateway container.
Provides methods to fetch device configuration, post sensor data (both
periodic and realtime), send initialization payloads, update device
configuration, and retrieve PPB analytics from the Gateway REST API.
All communication uses JSON over HTTP with an access token for
authentication. The Gateway URL and API paths are configurable via
environment variables.
Environment Variables:
GATEWAY_URL: Base URL of the Gateway (default: ``http://gateway:8080``).
GATEWAY_CONFIG_API: Config endpoint path (default: ``/query/config``).
GATEWAY_DATA_API: Data posting endpoint (default: ``/query/data``).
GATEWAY_INIT_API: Init endpoint path (default: ``/config/init``).
GATEWAY_ACCESS_TOKEN: JWT token for API authentication.
Typical usage::
from Manager import Manager
mgr = Manager()
config, status = mgr.getConfig()
mgr.sendData(data_payload, realtime_payload)
"""
import json
import os
from typing import Any
import requests
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class Manager:
"""HTTP client for the Oizom Gateway REST API.
Handles all communication between the hardware sensor layer and the
Gateway container, including configuration fetching, data posting,
and device initialization. All HTTP requests use a 10-second timeout
and return status code 404 on any failure.
Attributes:
server: Base URL of the Gateway service.
config_api: Path to the configuration endpoint.
data_api: Path to the data posting endpoint.
access_token: JWT token for API authentication.
config_headers: HTTP headers sent with every request.
"""
# server = "http://localhost:5000"
# server = "http://gateway:5000"
server = os.getenv("GATEWAY_URL", "http://gateway:8080")
config_api = os.getenv("GATEWAY_CONFIG_API", "/query/config")
data_api = os.getenv("GATEWAY_DATA_API", "/query/data")
realtime_data_api = os.getenv("GATEWAY_DATA_API", "/query/realtimeData")
automation_alert_api = os.getenv("GATEWAY_DATA_API", "/alerts/automation")
init_api = os.getenv("GATEWAY_INIT_API", "/config/init")
ppb_analytics = os.getenv("GATEWAY_PPB_ANALYTICS", "/ppb/device/")
access_token = os.getenv(
"GATEWAY_ACCESS_TOKEN",
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJlbWFpbCI6ImppbUBvaXpvbS5jb20iLCJhcGlDb3VudGVyIjowLCJpYXQiOjE0ODkyMzIwODYsImV4cCI6MTUyMDc4OTAxMn0.Cc5nwYiEv_WsV1sCMH_sY7QCKZ3dlPMQJVQYugaJbrk",
)
[docs]
def __init__(self) -> None:
"""Initialize the Manager with Gateway connection parameters.
Reads Gateway URL, API paths, and access token from environment
variables, then constructs the HTTP headers used for all requests.
"""
# initialize mutable attributes per-instance to avoid shared state across instances
self.config_headers = {
"Content-Type": "application/json",
"x-access-token": self.access_token,
}
print(
"--------------------------HARDWARE RESTARTING--------------------------",
)
print("server: %s", self.server)
print("config_api: %s", self.config_api)
print("data_api: %s", self.data_api)
print("access_token: %s", self.access_token)
print("config_headers: %s", self.config_headers)
[docs]
def getConfig(self) -> tuple[dict, int]:
"""Fetch the device configuration from the Gateway.
Sends a GET request to the Gateway's configuration endpoint and
returns the parsed JSON response along with the HTTP status code.
Returns:
Tuple of (config_dict, status_code). Returns ({}, 404) on
any request failure.
"""
try:
r = requests.get(url=(self.server + self.config_api), timeout=10)
data = r.json()
context_logger.info_with_context("Manager", "Config Data: %s", data)
return (data, r.status_code)
except Exception as e:
context_logger.error_with_context("Manager", "getConfig: %s", e)
return ({}, 404)
[docs]
def getPPBanalytics(self, lte: int, gte: int, deviceId: str, deviceType: str) -> tuple[Any, int]:
"""Fetch PPB (parts-per-billion) analytics data for a device.
Retrieves raw sensor analytics from the Gateway for a specified
time range, used for calibration and quality control calculations.
Args:
lte: Upper bound epoch timestamp (less-than-or-equal).
gte: Lower bound epoch timestamp (greater-than-or-equal).
deviceId: Unique identifier of the target device.
deviceType: Device type string (unused in current implementation).
Returns:
Tuple of (analytics_data, status_code). Returns ({}, 404) on failure.
"""
try:
payload = '{"gte":' + str(gte) + ',"lte":' + str(lte) + ',"limit":1000,"page":0,"raw":true}'
querystring = {"db": "cs"}
url_raw = str(self.server) + str(self.ppb_analytics) + str(deviceId) + "?db=cs"
context_logger.info_with_context("Manager", "PPB Analytics URL: %s", url_raw)
r_oz = requests.request(
"POST",
url_raw,
data=payload,
headers=self.config_headers,
params=querystring,
timeout=10,
)
Sdata = json.loads(r_oz.text)
return (Sdata, r_oz.status_code)
except Exception as e:
context_logger.error_with_context("Manager", "getPPBanalytics: %s", e)
return ({}, 404)
[docs]
def updateConfig(self, config: dict, deviceId: str) -> int:
"""Update the device configuration on the Gateway.
Sends a PUT request to update the configuration for a specific device.
Used for automatic temperature compensation updates and similar
runtime configuration changes.
Args:
config: Configuration dictionary to apply.
deviceId: Unique identifier of the target device.
Returns:
HTTP status code from the Gateway, or 404 on failure.
"""
try:
context_logger.info_with_context("Manager", "Updating Config: %s", config)
url_raw = str(self.server) + "/config/devices/" + str(deviceId) + "/user/2019"
r = requests.put(
url_raw,
headers=self.config_headers,
data=json.dumps({"config": config, "update_note": "auto temp compensation updated"}),
timeout=10,
)
context_logger.info_with_context("Manager", "Config Response: %s", r.json())
return r.status_code
except Exception as e:
context_logger.error_with_context("Manager", "updateConfig: %s", e)
return 404
[docs]
def sendData(self, payload: json, r_data: json) -> int:
"""Post aggregated sensor data to the Gateway.
Sends the periodic data payload (``d``) and optional realtime data
payload (``r``) to the Gateway's data endpoint. This is the primary
method for publishing sensor readings to the cloud.
Args:
payload: Aggregated sensor data dictionary (the ``d`` payload).
r_data: Realtime sensor data dictionary (the ``r`` payload),
or ``None``/empty to omit.
Returns:
HTTP status code from the Gateway, or 404 on failure.
"""
try:
cloud_payload = {}
cloud_payload["d"] = payload
if r_data:
cloud_payload["r"] = r_data
context_logger.info_with_context("Manager", "Sending Data: %s", cloud_payload)
r = requests.post(
self.server + self.data_api,
headers=self.config_headers,
data=json.dumps(cloud_payload),
timeout=10,
)
return r.status_code
except Exception as e:
context_logger.error_with_context("Manager", "sendData: %s", e)
return 404
[docs]
def sendRealtimeData(self, payload: json) -> int:
"""Post realtime sensor data to the Gateway's WebSocket relay.
Sends the latest sensor readings to the realtime data endpoint,
which forwards them to connected WebSocket clients for live
dashboard display.
Args:
payload: Realtime sensor data dictionary.
Returns:
HTTP status code from the Gateway, or 404 on failure.
"""
try:
socket_payload = {"d": payload}
context_logger.info_with_context("Manager", "Sending Data to realtime socket: %s", socket_payload)
r = requests.post(
self.server + self.realtime_data_api,
headers=self.config_headers,
data=json.dumps(socket_payload),
timeout=10,
)
return r.status_code
except Exception as e:
context_logger.error_with_context("Manager", "sendRealtimeData: %s", e)
return 404
[docs]
def sendAutomationAlertsEmailSms(self, payload: json) -> int:
"""Send automation alert notifications via the Gateway.
Posts an alert payload to the Gateway's automation alert endpoint,
which triggers email and/or SMS notifications based on configured
alert rules.
Args:
payload: Alert payload dictionary containing alert type,
threshold values, and notification targets.
Returns:
HTTP status code from the Gateway, or 404 on failure.
"""
try:
context_logger.info_with_context("Manager", "Sending Automation Alert: %s", payload)
r = requests.post(
self.server + self.automation_alert_api,
headers=self.config_headers,
data=json.dumps(payload),
timeout=10,
)
return r.status_code
except Exception as e:
context_logger.error_with_context("Manager", "sendAutomationAlertsEmailSms: %s", e)
return 404
[docs]
def sendInit(self, payload: json) -> int:
"""Send device initialization data to the Gateway.
Posts hardware and firmware information to the Gateway's init
endpoint during device startup. This registers the device and
its capabilities with the cloud platform.
Args:
payload: Initialization payload containing device ID, firmware
version, sensor list, and hardware capabilities.
Returns:
HTTP status code from the Gateway, or 404 on failure.
"""
try:
context_logger.info_with_context("Manager", "Sending Init: %s", payload)
r = requests.put(
self.server + self.init_api,
headers=self.config_headers,
data=json.dumps(payload),
timeout=10,
)
context_logger.info_with_context("Manager", "Init Response: %s", r.json())
return r.status_code
except Exception as e:
context_logger.error_with_context("Manager", "sendInit: %s", e)
return 404