"""I2C and USB device scanner for identifying connected serial peripherals.
Detects ACM devices (OizomDust, Lasan), GSM modules (Quectel, Telit),
and generic USB-serial adapters (QinHeng HL-340) by matching vendor/product
IDs. USB-serial results also carry each adapter's USB topology path (when it
can be determined), which identifies the physical hub socket independently of
the ttyUSB number assigned to the port.
"""
import json
import os
import re
import subprocess
import serial.tools.list_ports
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two module-level loggers are built from the same logger name:
#   basic_logger   - whatever OizomLogger(...).get() returns (presumably the
#                    underlying logger object; it is not referenced by the
#                    code visible in this module -- TODO confirm external use).
#   context_logger - the OizomLogger wrapper itself; its *_with_context
#                    methods are what the detector code below calls.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class DeviceDetector:
"""Scans USB and serial buses to detect and classify connected hardware devices.
Maintains VID/PID lookup tables for known ACM, GSM, and USB-serial devices
and exposes methods that return structured port information suitable for
driver initialization.
"""
[docs]
def __init__(self) -> None:
"""Initialize device configuration tables for all known peripherals."""
self.acm_device_configs = {
"Lasan": {
"vid": "0x2341", # Arduino SA VID
"pid": "0x004d", # Arduino SA PID
},
"OizomDust": {
"vid": "0x04d8", # Microchip VID
"pid": "0xeb98", # Microchip PID
},
}
self.usb_device_configs = {"QinHengHL340": {"vid": "0x1a86", "pid": "0x7523"}}
self.gsm_device_configs = {
"Quectel": {
"vid": "0x2c7c", # Quectel VID
"pid": "0x0125", # Quectel PID
"interfaces": {
"00": "diag", # Diagnostic
"01": "gps", # GPS/NMEA
"02": "modem", # Main AT
"03": "ppp", # PPP/Data
},
},
"Telit": {
"vid": "0x1bc7", # Telit VID
"pid": "0x1033", # Telit PID
"interfaces": {
"00": "aux", # Auxiliary port
"01": "gps", # GPS/NMEA port
"02": "modem", # Main AT command port
},
},
}
# =========================================================================
# ACM Devices
# =========================================================================
[docs]
def scan_acm_devices(self) -> dict[str, str]:
"""
Scan for ACM devices (OizomDust, Lasan) and map them to their ports.
Returns: { "OizomDust": {"vid": ..., "pid": ..., "port": ...}, ... }
"""
result = {}
try:
acm_ports = [port for port in serial.tools.list_ports.comports() if "ACM" in port.device]
if acm_ports:
for port in acm_ports:
try:
if port.vid is None or port.pid is None:
continue
current_vid = f"0x{port.vid:04x}"
current_pid = f"0x{port.pid:04x}"
if (
current_vid == self.acm_device_configs["OizomDust"]["vid"]
and current_pid == self.acm_device_configs["OizomDust"]["pid"]
):
device_name = "OizomDust"
elif current_vid == self.acm_device_configs["Lasan"]["vid"]:
device_name = "Lasan"
else:
device_name = "Unknown"
result[device_name] = {
"vid": current_vid,
"pid": current_pid,
"port": port.device,
}
except Exception as e:
context_logger.error_with_context("LTE_Module", f"processing ACM port {port.device}: {e}")
else:
context_logger.warning_with_context("LTE_Module", "No ACM ports found!")
except Exception as e:
context_logger.error_with_context("LTE_Module", f"scan_acm_devices: {e}")
return result
# =========================================================================
# GSM Devices (Quectel / Telit)
# =========================================================================
def _get_usb_interface_number(self, device_path: str) -> str | None:
"""
Get USB interface number for a GSM device port.
Returns interface as zero-padded string e.g. "00", "01", "02"
"""
try:
# Method 1: Try udevadm
result = subprocess.run(
["udevadm", "info", "--name=" + device_path],
capture_output=True,
text=True,
timeout=2,
)
for line in result.stdout.split("\n"):
if "ID_USB_INTERFACE_NUM" in line:
return line.split("=")[-1].strip()
# Also check DEVPATH which contains interface number
if "DEVPATH=" in line:
devpath = line.split("=")[-1].strip()
# Extract interface number from path like: .../1-1.3:1.2/ttyUSB0
# The number after the colon and dot is the interface (e.g., "1.2" -> interface 2)
import re
match = re.search(r":1\.(\d+)/", devpath)
if match:
return f"{int(match.group(1)):02d}"
# Fallback: read from sysfs
device_name = os.path.basename(device_path)
sysfs_path = f"/sys/class/tty/{device_name}/device/../bInterfaceNumber"
if os.path.exists(sysfs_path):
with open(sysfs_path) as f:
interface_hex = f.read().strip()
# Convert hex to decimal and format as 2-digit string
interface_num = int(interface_hex, 16)
return f"{interface_num:02d}"
except Exception as e:
context_logger.error_with_context("LTE_Module", f"getting interface for {device_path}: {e}")
return None
[docs]
def scan_gsm_devices(self) -> dict[str, dict]:
"""
Scan for GSM devices and map ports by function (gps, modem, etc.)
Returns:
{
"Telit": {
"vid": "0x1bc7",
"pid": "0x1033",
"ports": {
"aux": "/dev/ttyUSB0",
"gps": "/dev/ttyUSB1",
"modem": "/dev/ttyUSB2",
}
}
}
"""
result = {}
try:
for port in serial.tools.list_ports.comports():
try:
if port.vid is None or port.pid is None:
continue
current_vid = f"0x{port.vid:04x}"
current_pid = f"0x{port.pid:04x}"
for device_name, config in self.gsm_device_configs.items():
if current_vid == config["vid"] and current_pid == config["pid"]:
# Initialize device entry if not exists
if device_name not in result:
result[device_name] = {
"vid": current_vid,
"pid": current_pid,
"ports": {},
}
interface_num = self._get_usb_interface_number(port.device)
if interface_num and interface_num in config["interfaces"]:
port_function = config["interfaces"][interface_num]
result[device_name]["ports"][port_function] = port.device
else:
result[device_name]["ports"].setdefault("unknown", [])
result[device_name]["ports"]["unknown"].append(port.device)
except Exception as e:
context_logger.error_with_context("LTE_Module", f"processing GSM port {port.device}: {e}")
except Exception as e:
context_logger.error_with_context("LTE_Module", f"scan_gsm_devices: {e}")
return result
[docs]
def get_gsm_gps_port(self, device_name: str = "Quectel") -> str | None:
"""
Get GPS port for specified GSM device
Args:
device_name: "Quectel" or "Telit"
Returns:
GPS port path or None
"""
devices = self.scan_gsm_devices()
return devices.get(device_name, {}).get("ports", {}).get("gps")
[docs]
def get_gsm_modem_port(self, device_name: str = "Quectel") -> str | None:
"""
Get Modem/AT port for specified GSM device
Args:
device_name: "Quectel" or "Telit"
Returns:
Modem port path or None
"""
devices = self.scan_gsm_devices()
return devices.get(device_name, {}).get("ports", {}).get("modem")
# =========================================================================
# USB Serial Devices (multi-instance, topology-based)
# =========================================================================
def _get_usb_id_path(self, device_path: str) -> str | None:
"""
Returns the stable ID_PATH suffix for a ttyUSB device by reading
udevadm output and stripping the platform prefix.
e.g. /dev/ttyUSB3 → "1-1.1.2.2:1.0"
This value is tied to the PHYSICAL socket on the hub, so it stays
the same across reboots and replug events regardless of ttyUSB number.
"""
try:
result = subprocess.run(
["udevadm", "info", "--query=property", "--name=" + device_path],
capture_output=True,
text=True,
timeout=2,
)
for line in result.stdout.split("\n"):
if "ID_PATH=" in line:
full_path = line.split("=", 1)[-1].strip()
# Strip platform prefix, e.g.:
# "platform-fe980000.usb-usb-0:1.1.2.2:1.0" → "1-1.1.2.2:1.0"
match = re.search(r"usb-0:(.+)$", full_path)
if match:
return "1-" + match.group(1) # e.g. "1-1.1.2.2:1.0"
if full_path.startswith("1-"):
return full_path
except Exception as e:
context_logger.error_with_context("DeviceDetector", f"_get_usb_id_path({device_path}): {e}")
return None
[docs]
def scan_usb_devices(self) -> dict:
"""
Scan for USB serial devices (QinHeng HL-340 etc.) and assign logical
names dynamically from ttyUSB indices when available.
Returns:
{
"QinHengHL340_1": {"vid": "0x1a86", "pid": "0x7523", "port": "/dev/ttyUSB3", "path": "1-1.1.2.2:1.0"},
"QinHengHL340_2": {...},
...
"Unmapped_1-1.1.2.9:1.0": {...}, # if tty index is unavailable
}
"""
result = {}
try:
usb_ports = [port for port in serial.tools.list_ports.comports() if "USB" in port.device]
if not usb_ports:
context_logger.info_with_context("DeviceDetector", "No ttyUSB ports found!")
return result
for port in usb_ports:
try:
if port.vid is None or port.pid is None:
continue
current_vid = f"0x{port.vid:04x}"
current_pid = f"0x{port.pid:04x}"
# Check against known USB serial configs
matched_config = None
for config_name, config in self.usb_device_configs.items():
if current_vid == config["vid"] and current_pid == config["pid"]:
matched_config = config_name
break
if matched_config is None:
continue
id_path = self._get_usb_id_path(port.device)
if id_path is None and getattr(port, "location", None):
# pyserial location often contains USB topology such as
# "1-1.1.2.2"; append interface suffix for path context.
location_path = str(port.location)
id_path = location_path if ":" in location_path else f"{location_path}:1.0"
logical_name = None
# Dynamic naming based on ttyUSB index.
if matched_config == "QinHengHL340":
tty_match = re.search(r"/dev/ttyUSB(\d+)$", port.device)
if tty_match:
logical_name = f"QinHengHL340_{int(tty_match.group(1))}"
if logical_name is None:
# Device connected to an unmapped socket — still report it
logical_name = f"Unmapped_{id_path or port.device}"
result[logical_name] = {
"vid": current_vid,
"pid": current_pid,
"port": port.device,
"path": id_path or getattr(port, "location", None),
}
except Exception as e:
context_logger.error_with_context(
"DeviceDetector",
f"Error processing USB port {port.device}: {e}",
)
except Exception as e:
context_logger.error_with_context("DeviceDetector", f"Error in scan_usb_devices: {e}")
return result
# =============================================================================
# Test / diagnostics
# =============================================================================
if __name__ == "__main__":
    # Manual diagnostic run: report everything the detector can currently see.
    detector = DeviceDetector()

    # ACM peripherals
    acm_devices = detector.scan_acm_devices()
    context_logger.debug_with_context("DeviceDetector", f"ACM ports: {json.dumps(acm_devices, indent=4)}")

    # GSM modems, with their per-function port mapping
    gsm_devices = detector.scan_gsm_devices()
    context_logger.debug_with_context("DeviceDetector", f"GSM ports: {json.dumps(gsm_devices, indent=4)}")

    # Convenience accessors, printed as a small report
    banner = "=" * 60
    print("\n" + banner)
    print("GSM Port Detection Results")
    print(banner)
    for modem_name in ("Quectel", "Telit"):
        gps = detector.get_gsm_gps_port(modem_name)
        at_port = detector.get_gsm_modem_port(modem_name)
        if not (gps or at_port):
            # Nothing detected for this modem; keep the report quiet.
            continue
        print(f"\n{modem_name}:")
        if gps:
            print(f" GPS Port : {gps}")
        if at_port:
            print(f" Modem Port: {at_port}")
    print(banner)