Source code for OzWrapper.OzGPS.OzGPS

"""GPS location wrapper for Quectel modem GNSS receivers.

Reads latitude, longitude, and satellite count from a Quectel modem's GPS
subsystem, and optionally synchronizes the device RTC with GPS time.
"""

import json
import os
from typing import ClassVar

from drivers.Quectel.Quectel import Gps as QuectelGPS
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class OzGPSParam: """Container for a single GPS module's configuration and driver reference. Attributes: partNo: Part number identifying the GPS module type. configuration_port: Serial port for modem AT commands. gps_port: Serial port for NMEA GPS data. baud: Baud rate for the GPS serial connection. gps_driver: QuectelGPS driver instance, or None before initialization. old_data: Cache of previous GPS readings for fallback. """ partNo = 0 configuration_port = "/dev/ttyUSB2" gps_port = "/dev/ttyUSB1" baud = 115200 gps_driver = None # OGS Driver Object
[docs] def __init__(self, partNo: int, gps_port: str, configuration_port: str, baud: int) -> None: """Initialize GPS parameter container. Args: partNo: Part number for this GPS module. gps_port: Serial port path for NMEA data. configuration_port: Serial port path for AT commands. baud: Baud rate for GPS serial communication. """ self.partNo = partNo self.gps_port = gps_port self.configuration_port = configuration_port self.baud = baud self.old_data = {}
[docs] def setDriver(self, gps_driver: QuectelGPS) -> None: """Assign the QuectelGPS driver instance. Args: gps_driver: Initialized QuectelGPS driver object. """ self.gps_driver = gps_driver
[docs] class OzGPS: """GPS location wrapper managing Quectel GNSS modules. Reads latitude, longitude, and satellite count from GPS NMEA data, accumulates time-series values, and optionally syncs the hardware RTC. Attributes: gps_poll: Whether to actively poll GPS on each reading cycle. lat: Last known latitude in decimal degrees. lon: Last known longitude in decimal degrees. put_sensor: Whether GPS data should be included in published payloads. xone: xOne assisted-GPS enable flag. xpath: URL path for xTRA ephemeris data download. xname: Filename of the xTRA ephemeris binary. v_gps: Accumulated GPS reading values per sensor/parameter. configuration: GPS sensor configuration list from the Gateway. s_gps: List of OzGPSParam instances (one per configured GPS module). """ gps_poll = True lat = 0.0 lon = 0.0 put_sensor = 0 xone = 0 xpath = "http://xtrapath6.izatcloud.net/" xname = "xtra3grc.bin" v_gps: ClassVar[list] = []
[docs] def __init__(self) -> None: """Initialize OzGPS with empty configuration and sensor lists.""" super().__init__() self.configuration = [] self.s_gps = []
[docs] def initialize(self, config: dict) -> bool: """Initialize all configured GPS modules. Args: config: List of GPS sensor configuration dicts from the Gateway. Returns: True if initialization succeeded, False on error. """ self.configuration = config context_logger.info_with_context("GPS", f"Configuration: {self.configuration}") try: for sensor in self.configuration: if "pn" in sensor: sensor["init"] = self.initializeSensor(sensor) return True # Catch Sensor error from Exception except Exception as e: context_logger.error_with_context("GPS", f"Failed to initialize GPS: {e}") return False
[docs] def initializeSensor(self, sensor: dict) -> int: """Initialize a single GPS module and take an initial reading. Args: sensor: Configuration dict with part number, port settings, and parameters. Returns: 1 on success, 0 on failure or if the sensor is disabled. """ _success = False _s_gps = None value = [] if sensor["pn"] == 30 and sensor["en"] == 1: gps_port = "/dev/ttyUSB1" configuration_port = "/dev/ttyUSB2" baud = 115200 if "poll" in sensor: self.put_sensor = sensor["poll"] if "gpio" in sensor: gpioConfig = sensor["gpio"] if "gpsport" in gpioConfig: gps_port = gpioConfig["gpsport"] if "configport" in gpioConfig: configuration_port = gpioConfig["configport"] if "baud" in gpioConfig: baud = gpioConfig["baud"] if "xone" in sensor: self.xone = sensor["xone"] if "xpath" in sensor: self.xpath = sensor["xpath"] if "xname" in sensor: self.xname = sensor["xname"] try: _s_gps = OzGPSParam(sensor["pn"], gps_port, configuration_port, baud) _s_gps.setDriver(QuectelGPS(gps_port, configuration_port, baud)) _s_gps.gps_driver.initialize(self.xone, self.xpath, self.xname) parameters = sensor["parameters"] for param in parameters: _val = {"value": [], "oldvalue": 0, "count": 0} value.append(_val) _success = True except Exception as e: _success = False context_logger.error_with_context("GPS", f"Failed to initialize sensor: {e}") else: context_logger.warning_with_context("GPS", f"Disabled: {sensor['pn']}") _success = False self.s_gps.append(_s_gps) self.v_gps.append(value) return int(_success)
[docs] def getSensorReading(self) -> dict: """Poll all GPS modules and accumulate lat/lon/satellite readings. Returns: Dict mapping send-codes to their latest GPS values. """ data = {} if self.gps_poll or self.put_sensor == 1: for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: context_logger.info_with_context("GPS", f"PART: {sensor['pn']}") try: response = self.s_gps[X].gps_driver.getSensorData() context_logger.info_with_context("GPS", f"{response}") for Y, parameters in enumerate(sensor["parameters"]): try: value = self._putvalue(parameters["pm"], response) if value != 0: self.v_gps[X][Y]["oldvalue"] = value self.v_gps[X][Y]["value"].append(self.v_gps[X][Y]["oldvalue"]) self.v_gps[X][Y]["count"] += 1 data[parameters["sc"]] = self.v_gps[X][Y]["oldvalue"] except Exception as e: context_logger.error_with_context("GPS", f"Failed to get sensor reading: {e}") except Exception as e: context_logger.error_with_context("GPS", f"Failed to getSensorReading: {e}") context_logger.info_with_context("GPS", f"Sending realtime data: {data}") return data
[docs] def putSensorValue(self, value: dict) -> dict: """Transfer accumulated GPS readings into the output payload and reset. Args: value: Shared output dict to populate with GPS time-series data. Returns: The updated output dict with GPS data added. """ if self.put_sensor == 1: for X, sensor in enumerate(self.configuration): if sensor["init"] == 1: try: for Y, parameters in enumerate(sensor["parameters"]): try: value[parameters["sc"]] = self.v_gps[X][Y]["value"] self.v_gps[X][Y]["value"] = [] self.v_gps[X][Y]["count"] = 0 except Exception as e: context_logger.error_with_context( "GPS", f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}", ) except Exception as e: context_logger.error_with_context("GPS", f"Failed to putSensorValue: {e}") return value
def _putvalue(self, pm: int, response: dict) -> float | int | None: """Extract a specific GPS parameter from the NMEA response. Args: pm: Parameter ID (1 = latitude, 2 = longitude, 3 = satellite count). response: Parsed NMEA response dict from the GPS driver. Returns: The extracted value, or 0.0/0 if not available. """ gps_data = response["data"] if pm == 1: if "GPRMC" in gps_data: lat = gps_data["GPRMC"]["lat"] if lat != 0: self.lat = lat return self.lat return 0.0 if pm == 2: if "GPRMC" in gps_data: lon = gps_data["GPRMC"]["lon"] if lon != 0: self.lon = lon return self.lon return 0.0 if pm == 3: if "GPGGA" in gps_data: return gps_data["GPGGA"]["num_sats"] return 0 return 0.0
[docs] def putInitvalues(self, value: dict) -> None: """Add GPS location to the init payload and sync the RTC from GPS time. Args: value: Shared init payload dict to update with location coordinates. """ self.monitor_gps_datetime() value.update({"loc": [self.lat, self.lon]})
[docs] def poll_loc(self) -> bool: """Check whether a valid GPS fix has been obtained. Returns: True if a non-zero latitude is available, False otherwise. """ return False if self.lat == 0.0 else True
[docs] def monitor_gps_datetime(self) -> None: """Compare GPS time with the hardware RTC and update RTC if drift exceeds 10 seconds.""" import subprocess from datetime import datetime context_logger.info_with_context("GPS", "Checking GPS time monitor…") # ── single-pass, no infinite loop ── for gps_obj in self.s_gps: if gps_obj and gps_obj.gps_driver: try: response = gps_obj.gps_driver.getSensorData() gps_data = response.get("data", {}) gps_time_str = gps_data.get("GPRMC", {}).get("time", "") gps_date_str = gps_data.get("GPRMC", {}).get("date", "") # Removed: num_sats = gps_data.get("GPGGA", {}).get("num_sats", 0) if gps_time_str and gps_date_str: # Removed: elapsed = time.time() - start_time context_logger.info_with_context("GPS", f"Date: {gps_date_str}, Time: {gps_time_str}") # ── RTC sync check ── try: dt_obj = datetime.strptime(f"{gps_date_str} {gps_time_str}", "%d/%m/%y %H:%M:%S") gps_epoch = int(dt_obj.timestamp()) rtc_out = subprocess.check_output(["hwclock", "--show"], text=True).strip() rtc_dt = datetime.strptime(rtc_out.split(".")[0], "%Y-%m-%d %H:%M:%S") rtc_epoch = int(rtc_dt.timestamp()) diff = abs(gps_epoch - rtc_epoch) context_logger.info_with_context( "GPS", f"GPS epoch: {gps_epoch}, RTC epoch: {rtc_epoch}, Diff: {diff}s", ) if diff > 10: formatted_time = dt_obj.strftime("%Y-%m-%d %H:%M:%S") if subprocess.call(["hwclock", "--set", f"--date={formatted_time}"]) == 0: context_logger.info_with_context("GPS", f"RTC updated to: {formatted_time}") else: context_logger.error_with_context("GPS", "Failed to update RTC") else: context_logger.info_with_context("GPS", "Time difference < 10s. No update needed.") except Exception as e: context_logger.error_with_context("GPS", f"RTC update failed: {e}") else: context_logger.info_with_context("GPS", "GPS fix not ready yet. May be NO Satellite") # Removed: print(f"Satellites: {num_sats}") except Exception as e: context_logger.error_with_context("GPS", f"monitor_gps_datetime: {e}")
# Example code # python3 -m OzWrapper.OzGPS.OzGPS if __name__ == "__main__": import os dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, "gps.config.json") with open(file_name) as config_file: json_content = config_file.read() sensorConfig = json.loads(json_content) context_logger.info_with_context("GPS", f"Config: {sensorConfig}") gps = OzGPS() gps.initialize(sensorConfig["gps"]) xOne = True if xOne: xOne = gps.getXOne() for _ in range(0, 4): gps.getSensorReading() data = {} context_logger.info_with_context("GPS", f"GPS {gps.putSensorValue(data)}")