Source code for Network.Network

"""Network connectivity monitor for Oizom edge devices.

Provides internet reachability checking by probing configurable server
endpoints via TCP socket connections. Runs as a background thread that
continuously updates a shared :class:`Queue` with connectivity status.

Also communicates with an external Network Manager service to query
detailed interface status (Ethernet, GSM, WiFi) for LED indicator
control and diagnostic reporting.

Environment Variables:
    NETWORKMANAGER_URL: URL of the Network Manager service
        (default: ``http://oz_terminal/network``).

    STATUS_API: Status endpoint path (default: ``/network/status``).

Typical usage::

    from Network import Network
    from queue import Queue

    net = Network()
    q = Queue(maxsize=2)
    q.put(0)  # placeholder
    q.put(False)  # initial status
    net.run(q)  # blocking -- run in a thread
"""

import json
import os
import socket
import time
from queue import Queue
from typing import Any

import requests

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Network: """Internet connectivity monitor and Network Manager client. Probes a list of servers via TCP sockets to determine internet reachability, and queries the Network Manager service for detailed interface status. Designed to run in a background thread, continuously updating a shared Queue with the current connectivity state. Attributes: server: URL of the Network Manager service. serverList: List of server dicts (``{"ip": ..., "port": ...}``) to probe for connectivity. timeout: TCP connection timeout in seconds. looptime: Delay between connectivity checks in seconds. HOSTPOT_TIMER: Duration (seconds) for hotspot LED blinking mode. """ # serverList (per-instance) moved to __init__ to avoid shared mutable class attribute # default server tuple (immutable) used to seed per-instance list _DEFAULT_SERVER = ("8.8.8.8", 53) success_delay = 6 HOSTPOT_TIMER = -1 # Blue blinking for 2 minute server = os.getenv( "NETWORKMANAGER_URL", "http://oz_terminal/network" ) # # Hardware services <-> Gateway IP Bridge in-between <-> Network manager status_api = os.getenv("STATUS_API", "/network/status")
[docs] def __init__( self, userList: list[dict[str, Any]] | None = None, timeout: int = 5, looptime: int = 5, ) -> None: """Initialize the Network monitor. Args: userList: Optional list of server dicts to prepend to the default probe list. Each dict must have ``"ip"`` and optionally ``"port"`` keys. timeout: TCP socket connection timeout in seconds. looptime: Delay between connectivity check iterations in seconds. """ self.timeout = timeout self.looptime = looptime # initialize serverList per-instance to avoid shared mutable state across instances default_ip, default_port = self._DEFAULT_SERVER self.serverList = [{"ip": default_ip, "port": default_port}] if userList is not None: # make a shallow copy of userList to avoid accidental sharing self.serverList = userList + self.serverList
[docs] def add_server(self, ip: str, port: int = 80) -> None: """Add a server to the front of the probe list. Args: ip: IP address or hostname of the server. port: TCP port to probe (default: 80). """ self.serverList = [{"ip": ip, "port": port}, *self.serverList]
[docs] def is_internet(self) -> bool: """Check internet reachability by probing configured servers. Iterates through :attr:`serverList` and attempts a TCP connection to each. Returns ``True`` on the first successful connection. Returns: ``True`` if any server in the list is reachable, ``False`` otherwise. """ success = False for server in self.serverList: _ip = "" _port = 80 # default port 80 try: if "port" in server: _port = server.get("port") _ip = server.get("ip") socket.setdefaulttimeout(self.timeout) soc = socket.create_connection((_ip, _port)) soc.close() success = True break except (TimeoutError, OSError) as err: # Expected network errors (timeout, unreachable, connection refused, etc.) # Log at debug level to avoid spamming production logs but keep context for troubleshooting. context_logger.debug_with_context("Network", f"Network Connection {_ip}:{_port} - {err}") success = False except Exception as err: # Unexpected errors: log with context for visibility. context_logger.error_with_context("Network", f"is_internet: {err}") success = False return success
[docs] def run(self, network_queue: Queue) -> None: """Run the connectivity check loop (blocking). Continuously checks internet reachability and updates ``network_queue.queue[1]`` with the current status. Designed to be run in a background thread. Args: network_queue: Shared Queue whose second element is updated with connectivity status (``True``/``False`` or ``2`` for hotspot mode). """ start_time = time.time() while 1: connection = 0 current_time = time.time() try: internet_available = self.is_internet() if current_time - start_time < self.HOSTPOT_TIMER: connection = self.identifyConnection() if connection == 2: network_queue.queue[1] = connection else: network_queue.queue[1] = internet_available if internet_available: time.sleep(self.success_delay) time.sleep(self.looptime) except Exception as e: context_logger.error_with_context("Network", f"run: {e}") time.sleep(self.looptime)
[docs] def putStatus(self, value: dict[str, Any]) -> None: """Fetch network status and merge it into the given dictionary. Args: value: Dictionary to update with network status fields from the Network Manager service. """ try: r = requests.get(url=(self.server + self.status_api), timeout=self.timeout) status = json.loads(json.dumps(r.json())) value.update(status) except Exception as err: context_logger.error_with_context("Network", f"putStatus: {err}")
[docs] def identifyConnection(self) -> int: """Identify the current connection state for LED indicator control. Queries the Network Manager for interface status and returns an integer code used to drive the device's RGB LED indicator. Returns: ``1`` if connected (Ethernet, GSM, or WiFi), ``2`` if no connection and WiFi not associated (hotspot mode), ``0`` if status is unavailable. """ nw_status = self.getStatus() # Cyan if nw_status is not None: try: # Cyan - Device is Connected if nw_status["eth"]["internet"] or nw_status["gsm"]["internet"] or nw_status["wifi"]["internet"]: return 1 if (not nw_status["eth"]["internet"] and not nw_status["gsm"]["internet"]) and ( not nw_status["wifi"]["wpa_state"] == "COMPLETED" ): return 2 except Exception as e: context_logger.error_with_context("Network", f"identifyConnection: {e}") return 0
[docs] def getStatus(self) -> dict[str, object] | None: """Fetch detailed network interface status from the Network Manager. Returns: Dictionary with ``"eth"``, ``"gsm"``, and ``"wifi"`` sub-dicts containing interface-level status, or ``None`` on request failure. """ try: r = requests.get(url=(self.server + self.status_api), timeout=self.timeout) return json.loads(json.dumps(r.json())) except Exception as err: context_logger.error_with_context("Network", f"getStatus: {err}") return None
if __name__ == "__main__": nt = Network() nt.add_server("manager.oizom.com") while 1: time.sleep(1) context_logger.info_with_context("Network", f"Internet status: {nt.is_internet()}")