# Source code for OzWrapper.OzSocket.OzSocket

"""WebSocket communication wrapper using Socket.IO.

Manages bidirectional real-time communication with the Gateway container,
receiving MQTT data and calibration keys, broadcasting sensor payloads to
downstream queues (Modbus, HMI, Display), and emitting raw device data.
"""

import json
import os
import threading
import time
from queue import Empty, Full, Queue

import socketio

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two handles onto the project logger:
#   basic_logger   - the object returned by .get() (presumably the underlying
#                    logging.Logger; confirm in utils.oizom_logger).
#   context_logger - the OizomLogger wrapper, used throughout this module for
#                    its *_with_context("Socket", ...) helpers.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzSocket:
    """Socket.IO client for real-time data exchange with the Gateway.

    Connects to the Gateway WebSocket endpoint, receives MQTT sensor data
    and calibration commands, fans out incoming payloads to local consumer
    queues, and emits raw device data back to the server.

    Attributes:
        configuration: Socket configuration dict from the Gateway.
        web_socket_url: WebSocket endpoint URL.
        ignoring_keys: Telemetry keys excluded from negative-value clamping.
        calibration_keys: Keys requiring calibration before processing.
        sending_queue: List of Queues for broadcasting data to downstream
            consumers (Modbus, HMI, Display).
        receiving_queue: List of Queues for incoming MQTT data.
        mqtt_send_queue: Queue for outgoing MQTT data to emit via socket.
        socket: Socket.IO client instance with auto-reconnection.
        need_data_publish: Threading event controlling data publish gating.
        debug_mode: Whether verbose debug logging is enabled.
    """
[docs] def __init__(self) -> None: """Initialize the socket client, configure default endpoints, and set up key filters. Attributes: socket (socketio.Client): Socket.IO client configured with reconnection and timeout options. web_socket_url (str): WebSocket endpoint URL loaded from the WS_URL environment variable or defaulting to http://redis.local:8080. ignoring_keys (list[str]): Telemetry keys that should be ignored when handling socket messages. calibration_keys (list[str]): Telemetry keys that require calibration before processing. sending_queue (list[queue.Queue]): Collection of queues used to broadcast incoming messages across consumers. """ # INITIALIZATION: 🌱 Initialize required variables self.configuration: dict = {} self.web_socket_url: str = os.getenv("WS_URL", "http://redis.local:8080") self.ignoring_keys: list[str] = ["temp", "t1", "sg", "lat", "lon"] self.calibration_keys: list[str] = ["temp", "hum", "t1", "t2"] self.sending_queue: list[Queue] = [] self.receiving_queue: list[Queue] = [] self.mqtt_send_queue: Queue | None = None self.socket: socketio.Client = socketio.Client(reconnection=True, reconnection_delay_max=1, request_timeout=1) self.need_data_publish: threading.Event = threading.Event() self.debug_mode: bool = False
[docs] def setup( self, socket_config: dict, sending_queue: list[Queue], receiving_queue: list[Queue], mqtt_send_queue: Queue, need_data_publish: threading.Event, ) -> None: """Configure queues, register Socket.IO callbacks, and connect to the server. Args: socket_config: Configuration dict with socket settings. sending_queue: List of Queues to broadcast received data to consumers. receiving_queue: List of Queues for incoming MQTT data. mqtt_send_queue: Queue for outgoing data to emit via socket. need_data_publish: Event flag controlling whether data is published. """ self.configuration = socket_config self.sending_queue = sending_queue self.receiving_queue = receiving_queue self.mqtt_send_queue = mqtt_send_queue self.need_data_publish = need_data_publish if self.debug_mode: context_logger.debug_with_context("Socket", f"Web Socket URL: {self.web_socket_url}") self.socket_callbacks() # register callbacks self.socket.connect(self.web_socket_url)
[docs] def loop(self) -> None: """Start a background thread to emit queued data and wait on the socket connection.""" import threading import time if self.debug_mode: context_logger.debug_with_context("Socket", "Socket Loop Started") # Start a background thread to handle queue emission def emit_from_queue() -> None: while True: try: # Defensive: mqtt_send_queue may be None or not yet assigned if self.mqtt_send_queue is None: if self.debug_mode: context_logger.warning_with_context("Socket", "mqtt_send_queue not assigned yet") time.sleep(0.1) continue try: data = self.mqtt_send_queue.get(timeout=0.1) except Exception: # queue.Empty or other if self.debug_mode: context_logger.debug_with_context("Socket", "No data in mqtt_send_queue, continuing") continue data.update({"deviceId": os.environ.get("DEVICEID", "OZUNKNOWN")}) # emit to remote try: self.socket.emit("raw_device_data", json.dumps(data)) except Exception as e: context_logger.warning_with_context("Socket", f"Socket emit failed: {e}") if self.debug_mode: context_logger.debug_with_context("Socket", f"Emitted to raw_device_data: {data}") data_copy = data.copy() d_data = {"d": data_copy} # If publishing is not requested (event not set), forward to local queues # SECTION_START: 👇 Realtime data send through only MODBUS if not self.need_data_publish.is_set(): for _queue in self.sending_queue: with _queue.mutex: _queue.queue.clear() _queue.put(d_data) # SECTION_END: 👆 Realtime data send through only MODBUS except Exception as e: context_logger.error_with_context("Socket", f"Error in emit_from_queue: {e}") # short sleep to avoid tight loop when idle time.sleep(0.01) emit_thread = threading.Thread(target=emit_from_queue, daemon=True) emit_thread.start() self.socket.wait()
[docs] def socket_callbacks(self) -> None: """Register all Socket.IO event handlers (connect, disconnect, data listeners).""" @self.socket.event def connect() -> None: if self.debug_mode: context_logger.debug_with_context("Socket", "Socket Connected") @self.socket.event def connect_error(conn_err: Exception) -> None: context_logger.error_with_context("Socket", f"Connection failed: {conn_err}") @self.socket.on("mqtt_data") def mqtt_listener(mqtt_data: dict) -> None: # Fan-out MQTT payloads to any downstream queues registered via # ``receiving_queue``. Each item in ``receiving_queue`` is expected # to be a ``Queue`` instance. For vibration data that arrives infrequently # (every ~10s) but is consumed frequently (every ~10ms), we should NOT # clear the queue to avoid race conditions and data loss. for target_queue in self.receiving_queue: if not hasattr(target_queue, "put"): context_logger.warning_with_context( "Socket", "Skipping mqtt listener target without queue interface", ) continue # Just put new data - don't clear existing data # This prevents race conditions when consumer runs more frequently than producer try: target_queue.put_nowait(mqtt_data) except Exception: # If queue is full, clear old data and add new data # This ensures we always have the latest data while preventing memory buildup with target_queue.mutex: target_queue.queue.clear() target_queue.put(mqtt_data) # SECTION_START: 👇 Only debugging part if self.debug_mode: context_logger.debug_with_context("Socket", f"MQTT Data received: {mqtt_data}") topic: str = mqtt_data.get("topic", "") # Only process vibration streaming data if "STREAMING" in topic and mqtt_data.get("payload"): payload: bytearray = mqtt_data.get("payload") # NEXT STEP: ➡️ Extract device ID from topic (format: DEVICEID/STREAMING) device_id = topic.split("/")[0] if "/" in topic else "" context_logger.debug_with_context( "Socket", f"[VIBRATION] Received streaming data from device {device_id}", ) # SECTION_END: 👆 Only debugging part 
@self.socket.on("raw_data") def calibrationKeys(keys: str) -> None: context_logger.info_with_context("Socket", f"Calibration Keys: {keys}") if "g5" in keys and "g3" not in keys: keys = keys + ",g3" for key in keys.split(","): self.calibration_keys.append(key) @self.socket.on("docs") @self.socket.on("data_broad") @self.socket.on("realtime_data_broad") def data_listener(data: dict) -> None: if self.debug_mode: context_logger.debug_with_context("Socket", f"Data received: {data}") if data.get("d"): for key in data["d"]: if key not in self.ignoring_keys: # Skip status and metadata fields - they're strings by design if key.endswith(("_status", "_cf")) or key in ["t", "fan"]: continue try: # Only process numeric sensor values value = float(data["d"][key]) if value < 0: data["d"][key] = 0 context_logger.debug_with_context( "Socket", f"Clamped negative value for '{key}': {value} -> 0", ) except (ValueError, TypeError) as e: context_logger.warning_with_context( "Socket", f"Non-numeric sensor value for '{key}': {data['d'][key]} - {e}", ) # Keep original value if conversion fails # clear queue and put new data for _queue in self.sending_queue: with _queue.mutex: _queue.queue.clear() _queue.put(data) @self.socket.event def disconnect() -> None: context_logger.warning_with_context("Socket", "Disconnected from server")
[docs] def run( self, socket_config: dict, sending_queue: list[Queue], receiving_queue: list[Queue], mqtt_send_queue: Queue, need_data_publish: threading.Event, ) -> None: """Entry point: set up the socket and start the event loop if enabled. Args: socket_config: Configuration dict with 'en' enable flag. sending_queue: List of Queues for downstream consumers. receiving_queue: List of Queues for incoming MQTT data. mqtt_send_queue: Queue for outgoing data emission. need_data_publish: Event flag for data publish gating. """ if socket_config.get("en", 0): self.setup( socket_config, sending_queue, receiving_queue, mqtt_send_queue, need_data_publish, ) self.loop()
if __name__ == "__main__":
    # Standalone entry point: load socket.config.json next to this file and
    # run the socket client on a daemon thread, keeping the main thread alive.
    # (Removed redundant function-local `import os` / `import threading` —
    # both are already imported at module level.)
    import time

    sending_queue: list[Queue] = []
    # BUG FIX: OzSocket.run() requires five arguments; the original Thread was
    # started with only (socket_config, sending_queue), so the worker thread
    # died immediately with a TypeError. Construct and pass all collaborators.
    receiving_queue: list[Queue] = []
    mqtt_send_queue: Queue = Queue()
    need_data_publish = threading.Event()

    oz_socket = OzSocket()

    dirname = os.path.dirname(__file__)
    file_name = os.path.join(dirname, "socket.config.json")
    with open(file_name) as configFile:
        config = json.load(configFile)
    context_logger.info_with_context("Socket", f"Config loaded: {config}")
    socket_config = config["socket"]

    socket_thread = threading.Thread(
        target=oz_socket.run,
        args=(socket_config, sending_queue, receiving_queue, mqtt_send_queue, need_data_publish),
        daemon=True,
    )
    socket_thread.start()
    context_logger.info_with_context("Socket", "Socket Thread started")

    while True:
        time.sleep(1)
        context_logger.info_with_context("Socket", "Main Thread alive")