"""Network connectivity monitor for Oizom edge devices.
Provides internet reachability checking by probing configurable server
endpoints via TCP socket connections. Runs as a background thread that
continuously updates a shared :class:`Queue` with connectivity status.
Also communicates with an external Network Manager service to query
detailed interface status (Ethernet, GSM, WiFi) for LED indicator
control and diagnostic reporting.
Environment Variables:
NETWORKMANAGER_URL: URL of the Network Manager service
(default: ``http://oz_terminal/network``).
STATUS_API: Status endpoint path (default: ``/network/status``).
Typical usage::
from Network import Network
from queue import Queue
net = Network()
q = Queue(maxsize=2)
q.put(0) # placeholder
q.put(False) # initial status
net.run(q) # blocking -- run in a thread
"""
import json
import os
import socket
import time
from queue import Queue
from typing import Any
import requests
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class Network:
"""Internet connectivity monitor and Network Manager client.
Probes a list of servers via TCP sockets to determine internet
reachability, and queries the Network Manager service for detailed
interface status. Designed to run in a background thread, continuously
updating a shared Queue with the current connectivity state.
Attributes:
server: URL of the Network Manager service.
serverList: List of server dicts (``{"ip": ..., "port": ...}``)
to probe for connectivity.
timeout: TCP connection timeout in seconds.
looptime: Delay between connectivity checks in seconds.
HOSTPOT_TIMER: Duration (seconds) for hotspot LED blinking mode.
"""
# serverList (per-instance) moved to __init__ to avoid shared mutable class attribute
# default server tuple (immutable) used to seed per-instance list
_DEFAULT_SERVER = ("8.8.8.8", 53)
success_delay = 6
HOSTPOT_TIMER = -1 # Blue blinking for 2 minute
server = os.getenv(
"NETWORKMANAGER_URL", "http://oz_terminal/network"
) # # Hardware services <-> Gateway IP Bridge in-between <-> Network manager
status_api = os.getenv("STATUS_API", "/network/status")
[docs]
def __init__(
self,
userList: list[dict[str, Any]] | None = None,
timeout: int = 5,
looptime: int = 5,
) -> None:
"""Initialize the Network monitor.
Args:
userList: Optional list of server dicts to prepend to the
default probe list. Each dict must have ``"ip"`` and
optionally ``"port"`` keys.
timeout: TCP socket connection timeout in seconds.
looptime: Delay between connectivity check iterations in seconds.
"""
self.timeout = timeout
self.looptime = looptime
# initialize serverList per-instance to avoid shared mutable state across instances
default_ip, default_port = self._DEFAULT_SERVER
self.serverList = [{"ip": default_ip, "port": default_port}]
if userList is not None:
# make a shallow copy of userList to avoid accidental sharing
self.serverList = userList + self.serverList
[docs]
def add_server(self, ip: str, port: int = 80) -> None:
"""Add a server to the front of the probe list.
Args:
ip: IP address or hostname of the server.
port: TCP port to probe (default: 80).
"""
self.serverList = [{"ip": ip, "port": port}, *self.serverList]
[docs]
def is_internet(self) -> bool:
"""Check internet reachability by probing configured servers.
Iterates through :attr:`serverList` and attempts a TCP connection
to each. Returns ``True`` on the first successful connection.
Returns:
``True`` if any server in the list is reachable, ``False`` otherwise.
"""
success = False
for server in self.serverList:
_ip = ""
_port = 80 # default port 80
try:
if "port" in server:
_port = server.get("port")
_ip = server.get("ip")
socket.setdefaulttimeout(self.timeout)
soc = socket.create_connection((_ip, _port))
soc.close()
success = True
break
except (TimeoutError, OSError) as err:
# Expected network errors (timeout, unreachable, connection refused, etc.)
# Log at debug level to avoid spamming production logs but keep context for troubleshooting.
context_logger.debug_with_context("Network", f"Network Connection {_ip}:{_port} - {err}")
success = False
except Exception as err:
# Unexpected errors: log with context for visibility.
context_logger.error_with_context("Network", f"is_internet: {err}")
success = False
return success
[docs]
def run(self, network_queue: Queue) -> None:
"""Run the connectivity check loop (blocking).
Continuously checks internet reachability and updates
``network_queue.queue[1]`` with the current status. Designed
to be run in a background thread.
Args:
network_queue: Shared Queue whose second element is updated
with connectivity status (``True``/``False`` or ``2`` for
hotspot mode).
"""
start_time = time.time()
while 1:
connection = 0
current_time = time.time()
try:
internet_available = self.is_internet()
if current_time - start_time < self.HOSTPOT_TIMER:
connection = self.identifyConnection()
if connection == 2:
network_queue.queue[1] = connection
else:
network_queue.queue[1] = internet_available
if internet_available:
time.sleep(self.success_delay)
time.sleep(self.looptime)
except Exception as e:
context_logger.error_with_context("Network", f"run: {e}")
time.sleep(self.looptime)
[docs]
def putStatus(self, value: dict[str, Any]) -> None:
"""Fetch network status and merge it into the given dictionary.
Args:
value: Dictionary to update with network status fields from
the Network Manager service.
"""
try:
r = requests.get(url=(self.server + self.status_api), timeout=self.timeout)
status = json.loads(json.dumps(r.json()))
value.update(status)
except Exception as err:
context_logger.error_with_context("Network", f"putStatus: {err}")
[docs]
def identifyConnection(self) -> int:
"""Identify the current connection state for LED indicator control.
Queries the Network Manager for interface status and returns an
integer code used to drive the device's RGB LED indicator.
Returns:
``1`` if connected (Ethernet, GSM, or WiFi), ``2`` if no
connection and WiFi not associated (hotspot mode), ``0`` if
status is unavailable.
"""
nw_status = self.getStatus()
# Cyan
if nw_status is not None:
try:
# Cyan - Device is Connected
if nw_status["eth"]["internet"] or nw_status["gsm"]["internet"] or nw_status["wifi"]["internet"]:
return 1
if (not nw_status["eth"]["internet"] and not nw_status["gsm"]["internet"]) and (
not nw_status["wifi"]["wpa_state"] == "COMPLETED"
):
return 2
except Exception as e:
context_logger.error_with_context("Network", f"identifyConnection: {e}")
return 0
[docs]
def getStatus(self) -> dict[str, object] | None:
"""Fetch detailed network interface status from the Network Manager.
Returns:
Dictionary with ``"eth"``, ``"gsm"``, and ``"wifi"`` sub-dicts
containing interface-level status, or ``None`` on request failure.
"""
try:
r = requests.get(url=(self.server + self.status_api), timeout=self.timeout)
return json.loads(json.dumps(r.json()))
except Exception as err:
context_logger.error_with_context("Network", f"getStatus: {err}")
return None
if __name__ == "__main__":
nt = Network()
nt.add_server("manager.oizom.com")
while 1:
time.sleep(1)
context_logger.info_with_context("Network", f"Internet status: {nt.is_internet()}")