Source code for drivers.DeviceDetector.DeviceDetector

"""I2C and USB device scanner for identifying connected serial peripherals.

Detects ACM devices (OizomDust, Lasan), GSM modules (Quectel, Telit),
and generic USB-serial adapters (QinHeng HL-340) by matching vendor/product
IDs. Provides stable port mapping based on USB topology so that device
assignments survive reboots and replug events.
"""

import json
import os
import re
import subprocess

import serial.tools.list_ports

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class DeviceDetector: """Scans USB and serial buses to detect and classify connected hardware devices. Maintains VID/PID lookup tables for known ACM, GSM, and USB-serial devices and exposes methods that return structured port information suitable for driver initialization. """
[docs] def __init__(self) -> None: """Initialize device configuration tables for all known peripherals.""" self.acm_device_configs = { "Lasan": { "vid": "0x2341", # Arduino SA VID "pid": "0x004d", # Arduino SA PID }, "OizomDust": { "vid": "0x04d8", # Microchip VID "pid": "0xeb98", # Microchip PID }, } self.usb_device_configs = {"QinHengHL340": {"vid": "0x1a86", "pid": "0x7523"}} self.gsm_device_configs = { "Quectel": { "vid": "0x2c7c", # Quectel VID "pid": "0x0125", # Quectel PID "interfaces": { "00": "diag", # Diagnostic "01": "gps", # GPS/NMEA "02": "modem", # Main AT "03": "ppp", # PPP/Data }, }, "Telit": { "vid": "0x1bc7", # Telit VID "pid": "0x1033", # Telit PID "interfaces": { "00": "aux", # Auxiliary port "01": "gps", # GPS/NMEA port "02": "modem", # Main AT command port }, }, }
# ========================================================================= # ACM Devices # =========================================================================
[docs] def scan_acm_devices(self) -> dict[str, str]: """ Scan for ACM devices (OizomDust, Lasan) and map them to their ports. Returns: { "OizomDust": {"vid": ..., "pid": ..., "port": ...}, ... } """ result = {} try: acm_ports = [port for port in serial.tools.list_ports.comports() if "ACM" in port.device] if acm_ports: for port in acm_ports: try: if port.vid is None or port.pid is None: continue current_vid = f"0x{port.vid:04x}" current_pid = f"0x{port.pid:04x}" if ( current_vid == self.acm_device_configs["OizomDust"]["vid"] and current_pid == self.acm_device_configs["OizomDust"]["pid"] ): device_name = "OizomDust" elif current_vid == self.acm_device_configs["Lasan"]["vid"]: device_name = "Lasan" else: device_name = "Unknown" result[device_name] = { "vid": current_vid, "pid": current_pid, "port": port.device, } except Exception as e: context_logger.error_with_context("LTE_Module", f"processing ACM port {port.device}: {e}") else: context_logger.warning_with_context("LTE_Module", "No ACM ports found!") except Exception as e: context_logger.error_with_context("LTE_Module", f"scan_acm_devices: {e}") return result
# ========================================================================= # GSM Devices (Quectel / Telit) # ========================================================================= def _get_usb_interface_number(self, device_path: str) -> str | None: """ Get USB interface number for a GSM device port. Returns interface as zero-padded string e.g. "00", "01", "02" """ try: # Method 1: Try udevadm result = subprocess.run( ["udevadm", "info", "--name=" + device_path], capture_output=True, text=True, timeout=2, ) for line in result.stdout.split("\n"): if "ID_USB_INTERFACE_NUM" in line: return line.split("=")[-1].strip() # Also check DEVPATH which contains interface number if "DEVPATH=" in line: devpath = line.split("=")[-1].strip() # Extract interface number from path like: .../1-1.3:1.2/ttyUSB0 # The number after the colon and dot is the interface (e.g., "1.2" -> interface 2) import re match = re.search(r":1\.(\d+)/", devpath) if match: return f"{int(match.group(1)):02d}" # Fallback: read from sysfs device_name = os.path.basename(device_path) sysfs_path = f"/sys/class/tty/{device_name}/device/../bInterfaceNumber" if os.path.exists(sysfs_path): with open(sysfs_path) as f: interface_hex = f.read().strip() # Convert hex to decimal and format as 2-digit string interface_num = int(interface_hex, 16) return f"{interface_num:02d}" except Exception as e: context_logger.error_with_context("LTE_Module", f"getting interface for {device_path}: {e}") return None
[docs] def scan_gsm_devices(self) -> dict[str, dict]: """ Scan for GSM devices and map ports by function (gps, modem, etc.) Returns: { "Telit": { "vid": "0x1bc7", "pid": "0x1033", "ports": { "aux": "/dev/ttyUSB0", "gps": "/dev/ttyUSB1", "modem": "/dev/ttyUSB2", } } } """ result = {} try: for port in serial.tools.list_ports.comports(): try: if port.vid is None or port.pid is None: continue current_vid = f"0x{port.vid:04x}" current_pid = f"0x{port.pid:04x}" for device_name, config in self.gsm_device_configs.items(): if current_vid == config["vid"] and current_pid == config["pid"]: # Initialize device entry if not exists if device_name not in result: result[device_name] = { "vid": current_vid, "pid": current_pid, "ports": {}, } interface_num = self._get_usb_interface_number(port.device) if interface_num and interface_num in config["interfaces"]: port_function = config["interfaces"][interface_num] result[device_name]["ports"][port_function] = port.device else: result[device_name]["ports"].setdefault("unknown", []) result[device_name]["ports"]["unknown"].append(port.device) except Exception as e: context_logger.error_with_context("LTE_Module", f"processing GSM port {port.device}: {e}") except Exception as e: context_logger.error_with_context("LTE_Module", f"scan_gsm_devices: {e}") return result
[docs] def get_gsm_gps_port(self, device_name: str = "Quectel") -> str | None: """ Get GPS port for specified GSM device Args: device_name: "Quectel" or "Telit" Returns: GPS port path or None """ devices = self.scan_gsm_devices() return devices.get(device_name, {}).get("ports", {}).get("gps")
[docs] def get_gsm_modem_port(self, device_name: str = "Quectel") -> str | None: """ Get Modem/AT port for specified GSM device Args: device_name: "Quectel" or "Telit" Returns: Modem port path or None """ devices = self.scan_gsm_devices() return devices.get(device_name, {}).get("ports", {}).get("modem")
# ========================================================================= # USB Serial Devices (multi-instance, topology-based) # ========================================================================= def _get_usb_id_path(self, device_path: str) -> str | None: """ Returns the stable ID_PATH suffix for a ttyUSB device by reading udevadm output and stripping the platform prefix. e.g. /dev/ttyUSB3 → "1-1.1.2.2:1.0" This value is tied to the PHYSICAL socket on the hub, so it stays the same across reboots and replug events regardless of ttyUSB number. """ try: result = subprocess.run( ["udevadm", "info", "--query=property", "--name=" + device_path], capture_output=True, text=True, timeout=2, ) for line in result.stdout.split("\n"): if "ID_PATH=" in line: full_path = line.split("=", 1)[-1].strip() # Strip platform prefix, e.g.: # "platform-fe980000.usb-usb-0:1.1.2.2:1.0" → "1-1.1.2.2:1.0" match = re.search(r"usb-0:(.+)$", full_path) if match: return "1-" + match.group(1) # e.g. "1-1.1.2.2:1.0" if full_path.startswith("1-"): return full_path except Exception as e: context_logger.error_with_context("DeviceDetector", f"_get_usb_id_path({device_path}): {e}") return None
[docs] def scan_usb_devices(self) -> dict: """ Scan for USB serial devices (QinHeng HL-340 etc.) and assign logical names dynamically from ttyUSB indices when available. Returns: { "QinHengHL340_1": {"vid": "0x1a86", "pid": "0x7523", "port": "/dev/ttyUSB3", "path": "1-1.1.2.2:1.0"}, "QinHengHL340_2": {...}, ... "Unmapped_1-1.1.2.9:1.0": {...}, # if tty index is unavailable } """ result = {} try: usb_ports = [port for port in serial.tools.list_ports.comports() if "USB" in port.device] if not usb_ports: context_logger.info_with_context("DeviceDetector", "No ttyUSB ports found!") return result for port in usb_ports: try: if port.vid is None or port.pid is None: continue current_vid = f"0x{port.vid:04x}" current_pid = f"0x{port.pid:04x}" # Check against known USB serial configs matched_config = None for config_name, config in self.usb_device_configs.items(): if current_vid == config["vid"] and current_pid == config["pid"]: matched_config = config_name break if matched_config is None: continue id_path = self._get_usb_id_path(port.device) if id_path is None and getattr(port, "location", None): # pyserial location often contains USB topology such as # "1-1.1.2.2"; append interface suffix for path context. location_path = str(port.location) id_path = location_path if ":" in location_path else f"{location_path}:1.0" logical_name = None # Dynamic naming based on ttyUSB index. if matched_config == "QinHengHL340": tty_match = re.search(r"/dev/ttyUSB(\d+)$", port.device) if tty_match: logical_name = f"QinHengHL340_{int(tty_match.group(1))}" if logical_name is None: # Device connected to an unmapped socket — still report it logical_name = f"Unmapped_{id_path or port.device}" result[logical_name] = { "vid": current_vid, "pid": current_pid, "port": port.device, "path": id_path or getattr(port, "location", None), } except Exception as e: context_logger.error_with_context( "DeviceDetector", f"Error processing USB port {port.device}: {e}", ) except Exception as e: context_logger.error_with_context("DeviceDetector", f"Error in scan_usb_devices: {e}") return result
# ============================================================================= # Test / diagnostics # ============================================================================= if __name__ == "__main__": manager = DeviceDetector() # Test ACM devices result = manager.scan_acm_devices() context_logger.debug_with_context("DeviceDetector", f"ACM ports: {json.dumps(result, indent=4)}") # Test GSM devices with port mapping gsm_result = manager.scan_gsm_devices() context_logger.debug_with_context("DeviceDetector", f"GSM ports: {json.dumps(gsm_result, indent=4)}") # Test helper methods for easy port access print("\n" + "=" * 60) print("GSM Port Detection Results") print("=" * 60) for device in ["Quectel", "Telit"]: gps_port = manager.get_gsm_gps_port(device) modem_port = manager.get_gsm_modem_port(device) if gps_port or modem_port: print(f"\n{device}:") if gps_port: print(f" GPS Port : {gps_port}") if modem_port: print(f" Modem Port: {modem_port}") print("=" * 60)