"""WebSocket communication wrapper using Socket.IO.
Manages bidirectional real-time communication with the Gateway container,
receiving MQTT data and calibration keys, broadcasting sensor payloads to
downstream queues (Modbus, HMI, Display), and emitting raw device data.
"""
import json
import os
import threading
from queue import Queue
import socketio
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Logger object returned by OizomLogger.get(); it is not referenced anywhere else
# in the visible part of this module.
basic_logger = OizomLogger(__name__).get()
# OizomLogger wrapper whose *_with_context methods carry every log call made below;
# the first argument of each call is the "Socket" context tag.
context_logger = OizomLogger(__name__)
class OzSocket:
    """Socket.IO client for real-time data exchange with the Gateway.

    Connects to the Gateway WebSocket endpoint, receives MQTT payloads and
    calibration keys, fans payloads out to local consumer queues, and emits
    raw device data back to the server.

    Attributes:
        configuration: Socket configuration dict handed to ``setup``.
        web_socket_url: Endpoint URL, read from the ``WS_URL`` environment
            variable (default ``http://redis.local:8080``).
        ignoring_keys: Payload keys excluded from negative-value clamping.
        calibration_keys: Calibration key names; keys received on the
            ``raw_data`` event are merged into this list.
        sending_queue: Queues that receive broadcast payloads.
        receiving_queue: Queues that receive incoming ``mqtt_data`` payloads.
        mqtt_send_queue: Queue of outgoing dicts emitted as ``raw_device_data``.
        socket: Socket.IO client instance created with reconnection enabled.
        need_data_publish: Event gating the local forwarding done by ``loop``.
        debug_mode: Enables the verbose debug log statements.
    """

    def __init__(self) -> None:
        """Create the Socket.IO client and set default endpoints, key filters and queues."""
        # INITIALIZATION: 🌱 Initialize required variables
        self.configuration: dict = {}
        self.web_socket_url: str = os.getenv("WS_URL", "http://redis.local:8080")
        self.ignoring_keys: list[str] = ["temp", "t1", "sg", "lat", "lon"]
        self.calibration_keys: list[str] = ["temp", "hum", "t1", "t2"]
        self.sending_queue: list[Queue] = []
        self.receiving_queue: list[Queue] = []
        # Stays None until setup() assigns the real queue; loop() tolerates that.
        self.mqtt_send_queue: Queue | None = None
        self.socket: socketio.Client = socketio.Client(reconnection=True, reconnection_delay_max=1, request_timeout=1)
        self.need_data_publish: threading.Event = threading.Event()
        self.debug_mode: bool = False

    def setup(
        self,
        socket_config: dict,
        sending_queue: list[Queue],
        receiving_queue: list[Queue],
        mqtt_send_queue: Queue,
        need_data_publish: threading.Event,
    ) -> None:
        """Store the queues, register the Socket.IO callbacks, and connect to the server.

        Any exception raised by ``socket.connect`` propagates to the caller.

        Args:
            socket_config: Configuration dict with socket settings.
            sending_queue: Queues that receive broadcast payloads.
            receiving_queue: Queues that receive incoming ``mqtt_data`` payloads.
            mqtt_send_queue: Queue of outgoing dicts to emit via the socket.
            need_data_publish: Event flag gating local forwarding in ``loop``.
        """
        self.configuration = socket_config
        self.sending_queue = sending_queue
        self.receiving_queue = receiving_queue
        self.mqtt_send_queue = mqtt_send_queue
        self.need_data_publish = need_data_publish
        if self.debug_mode:
            context_logger.debug_with_context("Socket", f"Web Socket URL: {self.web_socket_url}")
        # Handlers must be registered before connecting so no early event is missed.
        self.socket_callbacks()
        self.socket.connect(self.web_socket_url)

    def loop(self) -> None:
        """Start the daemon emitter thread, then block on the socket connection.

        The emitter thread drains ``mqtt_send_queue``, tags each dict with the
        ``DEVICEID`` environment variable, emits it as ``raw_device_data`` and,
        while ``need_data_publish`` is not set, forwards ``{"d": <copy>}`` to
        every queue in ``sending_queue``.
        """
        # Local imports: neither name is imported at module level in this file.
        import time
        from queue import Empty

        if self.debug_mode:
            context_logger.debug_with_context("Socket", "Socket Loop Started")

        def emit_from_queue() -> None:
            while True:
                try:
                    # Defensive: mqtt_send_queue may be None or not yet assigned
                    if self.mqtt_send_queue is None:
                        if self.debug_mode:
                            context_logger.warning_with_context("Socket", "mqtt_send_queue not assigned yet")
                        time.sleep(0.1)
                        continue
                    try:
                        data = self.mqtt_send_queue.get(timeout=0.1)
                    except Empty:
                        # Only the idle case is expected here; any other failure falls
                        # through to the outer handler so it gets logged.
                        if self.debug_mode:
                            context_logger.debug_with_context("Socket", "No data in mqtt_send_queue, continuing")
                        continue
                    data.update({"deviceId": os.environ.get("DEVICEID", "OZUNKNOWN")})
                    # emit to remote; a failed emit must not stop local forwarding
                    try:
                        self.socket.emit("raw_device_data", json.dumps(data))
                    except Exception as e:
                        context_logger.warning_with_context("Socket", f"Socket emit failed: {e}")
                    else:
                        # Report success only when emit() actually returned.
                        if self.debug_mode:
                            context_logger.debug_with_context("Socket", f"Emitted to raw_device_data: {data}")
                    d_data = {"d": data.copy()}
                    # If publishing is not requested (event not set), forward to local queues
                    # SECTION_START: 👇 Realtime data send through only MODBUS
                    if not self.need_data_publish.is_set():
                        for _queue in self.sending_queue:
                            self._replace_queue_contents(_queue, d_data)
                    # SECTION_END: 👆 Realtime data send through only MODBUS
                except Exception as e:
                    context_logger.error_with_context("Socket", f"Error in emit_from_queue: {e}")
                # short sleep to avoid a tight loop after an item or an error
                time.sleep(0.01)

        emit_thread = threading.Thread(target=emit_from_queue, daemon=True)
        emit_thread.start()
        self.socket.wait()

    def socket_callbacks(self) -> None:
        """Register all Socket.IO event handlers (connect, disconnect, data listeners)."""
        # Local import: only Queue is imported from the queue module at file level.
        from queue import Full

        @self.socket.event
        def connect() -> None:
            if self.debug_mode:
                context_logger.debug_with_context("Socket", "Socket Connected")

        @self.socket.event
        def connect_error(conn_err: Exception) -> None:
            context_logger.error_with_context("Socket", f"Connection failed: {conn_err}")

        @self.socket.on("mqtt_data")
        def mqtt_listener(mqtt_data: dict) -> None:
            # Fan-out MQTT payloads to every queue in ``receiving_queue``.
            # Vibration data arrives infrequently (every ~10s) but is consumed
            # frequently (every ~10ms), so existing items are NOT cleared up front;
            # that would race with the consumer and lose data.
            for target_queue in self.receiving_queue:
                if not hasattr(target_queue, "put"):
                    context_logger.warning_with_context(
                        "Socket",
                        "Skipping mqtt listener target without queue interface",
                    )
                    continue
                try:
                    target_queue.put_nowait(mqtt_data)
                except Full:
                    # Bounded queue is full: drop the stale backlog and keep the newest
                    # payload, so memory stays bounded and consumers see fresh data.
                    self._replace_queue_contents(target_queue, mqtt_data)
            # SECTION_START: 👇 Only debugging part
            if self.debug_mode:
                context_logger.debug_with_context("Socket", f"MQTT Data received: {mqtt_data}")
                topic: str = mqtt_data.get("topic", "")
                # Only vibration streaming data gets the extra log line
                if "STREAMING" in topic and mqtt_data.get("payload"):
                    # Device ID is the first topic segment (format: DEVICEID/STREAMING)
                    device_id = topic.split("/")[0] if "/" in topic else ""
                    context_logger.debug_with_context(
                        "Socket",
                        f"[VIBRATION] Received streaming data from device {device_id}",
                    )
            # SECTION_END: 👆 Only debugging part

        @self.socket.on("raw_data")
        def calibration_keys_listener(keys: str) -> None:
            context_logger.info_with_context("Socket", f"Calibration Keys: {keys}")
            self._merge_calibration_keys(keys)

        @self.socket.on("docs")
        @self.socket.on("data_broad")
        @self.socket.on("realtime_data_broad")
        def data_listener(data: dict) -> None:
            if self.debug_mode:
                context_logger.debug_with_context("Socket", f"Data received: {data}")
            if data.get("d"):
                self._clamp_negative_values(data["d"])
            # NOTE(review): the original nesting of this fan-out relative to the
            # ``if data.get("d")`` guard could not be recovered; payloads are
            # forwarded unconditionally here - confirm against the real source.
            # clear queue and put new data
            for _queue in self.sending_queue:
                self._replace_queue_contents(_queue, data)

        @self.socket.event
        def disconnect() -> None:
            context_logger.warning_with_context("Socket", "Disconnected from server")

    def run(
        self,
        socket_config: dict,
        sending_queue: list[Queue],
        receiving_queue: list[Queue],
        mqtt_send_queue: Queue,
        need_data_publish: threading.Event,
    ) -> None:
        """Entry point: set up the socket and start the event loop if enabled.

        Does nothing when ``socket_config`` has no truthy ``'en'`` value.

        Args:
            socket_config: Configuration dict with 'en' enable flag.
            sending_queue: Queues for downstream consumers.
            receiving_queue: Queues for incoming MQTT data.
            mqtt_send_queue: Queue for outgoing data emission.
            need_data_publish: Event flag for data publish gating.
        """
        if not socket_config.get("en", 0):
            return
        self.setup(
            socket_config,
            sending_queue,
            receiving_queue,
            mqtt_send_queue,
            need_data_publish,
        )
        self.loop()

    def _merge_calibration_keys(self, keys: str) -> None:
        """Add the comma-separated *keys* to ``calibration_keys`` without duplicates.

        The server may resend the same keys, so each one is appended only once.
        Surrounding whitespace is stripped and empty tokens are ignored.
        """
        tokens = [token.strip() for token in keys.split(",")]
        # "g5" always brings "g3" along with it; the reason is not visible in this file.
        if "g5" in tokens and "g3" not in tokens:
            tokens.append("g3")
        for token in tokens:
            if token and token not in self.calibration_keys:
                self.calibration_keys.append(token)

    def _clamp_negative_values(self, readings: dict) -> None:
        """Replace negative numeric values in *readings* with 0, in place.

        Keys listed in ``ignoring_keys`` and the status/metadata keys are left
        untouched; values that cannot be converted to float are logged and kept.
        """
        for key in readings:
            if key in self.ignoring_keys:
                continue
            # Skip status and metadata fields - they're strings by design
            if key.endswith(("_status", "_cf")) or key in ["t", "fan"]:
                continue
            try:
                value = float(readings[key])
            except (ValueError, TypeError) as e:
                # Keep original value if conversion fails
                context_logger.warning_with_context(
                    "Socket",
                    f"Non-numeric sensor value for '{key}': {readings[key]} - {e}",
                )
                continue
            if value < 0:
                readings[key] = 0
                context_logger.debug_with_context(
                    "Socket",
                    f"Clamped negative value for '{key}': {value} -> 0",
                )

    @staticmethod
    def _replace_queue_contents(target_queue: Queue, item: object) -> None:
        """Discard everything queued in *target_queue* and enqueue *item*."""
        with target_queue.mutex:
            target_queue.queue.clear()
        # put() takes the same non-reentrant mutex, so it has to run after the
        # lock above has been released.
        target_queue.put(item)
if __name__ == "__main__":
    # Standalone run: load socket.config.json from this file's directory and
    # drive OzSocket.run in a daemon thread while the main thread logs a heartbeat.
    import time

    # OzSocket.run takes five required arguments; every one of them is supplied
    # here, otherwise the worker thread dies immediately with a TypeError.
    sending_queue: list[Queue] = []
    receiving_queue: list[Queue] = []
    mqtt_send_queue: Queue = Queue()
    need_data_publish = threading.Event()

    oz_socket = OzSocket()
    dirname = os.path.dirname(__file__)
    file_name = os.path.join(dirname, "socket.config.json")
    with open(file_name) as configFile:
        config = json.load(configFile)
    context_logger.info_with_context("Socket", f"Config loaded: {config}")
    socket_config = config["socket"]
    socket_thread = threading.Thread(
        target=oz_socket.run,
        args=(socket_config, sending_queue, receiving_queue, mqtt_send_queue, need_data_publish),
        daemon=True,
    )
    socket_thread.start()
    context_logger.info_with_context("Socket", "Socket Thread started")
    # The worker is a daemon thread, so the main thread must stay alive.
    while True:
        time.sleep(1)
        context_logger.info_with_context("Socket", "Main Thread alive")