"""GPS location wrapper for Quectel modem GNSS receivers.
Reads latitude, longitude, and satellite count from a Quectel modem's GPS
subsystem, and optionally synchronizes the device RTC with GPS time.
"""
import json
import os
from typing import ClassVar
from drivers.Quectel.Quectel import Gps as QuectelGPS
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Both handles are keyed by this module's name.
# basic_logger: the object returned by OizomLogger.get(); it is not referenced
#   in the visible part of this module (presumably a plain logger - confirm in
#   utils.oizom_logger).
# context_logger: the OizomLogger wrapper itself; the classes below log through
#   its info_with_context / warning_with_context / error_with_context methods,
#   always with the "GPS" context tag.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
class OzGPSParam:
    """Container for a single GPS module's configuration and driver reference.

    The class-level values are defaults only; ``__init__`` always overrides the
    port, baud and part-number settings on the instance.

    Attributes:
        partNo: Part number identifying the GPS module type.
        configuration_port: Serial port for modem AT commands.
        gps_port: Serial port for NMEA GPS data.
        baud: Baud rate for the GPS serial connection.
        gps_driver: QuectelGPS driver instance, or None until ``setDriver`` is
            called.
        old_data: Per-instance dict, created empty. Intended as a cache of
            previous GPS readings; this class itself never reads or writes it.
    """

    partNo: int = 0
    configuration_port: str = "/dev/ttyUSB2"
    gps_port: str = "/dev/ttyUSB1"
    baud: int = 115200
    # Stays None until setDriver() attaches a driver object.
    gps_driver: "QuectelGPS | None" = None

    def __init__(self, partNo: int, gps_port: str, configuration_port: str, baud: int) -> None:
        """Initialize GPS parameter container.

        Args:
            partNo: Part number for this GPS module.
            gps_port: Serial port path for NMEA data.
            configuration_port: Serial port path for AT commands.
            baud: Baud rate for GPS serial communication.
        """
        self.partNo = partNo
        self.gps_port = gps_port
        self.configuration_port = configuration_port
        self.baud = baud
        # Created per instance so two modules never share one cache dict.
        self.old_data: dict = {}

    def setDriver(self, gps_driver: "QuectelGPS") -> None:
        """Assign the QuectelGPS driver instance.

        Args:
            gps_driver: Driver object to attach; it is stored as-is, no
                initialization is performed here.
        """
        self.gps_driver = gps_driver
[docs]
class OzGPS:
"""GPS location wrapper managing Quectel GNSS modules.
Reads latitude, longitude, and satellite count from GPS NMEA data,
accumulates time-series values, and optionally syncs the hardware RTC.
Attributes:
gps_poll: Whether to actively poll GPS on each reading cycle.
lat: Last known latitude in decimal degrees.
lon: Last known longitude in decimal degrees.
put_sensor: Whether GPS data should be included in published payloads.
xone: xOne assisted-GPS enable flag.
xpath: URL path for xTRA ephemeris data download.
xname: Filename of the xTRA ephemeris binary.
v_gps: Accumulated GPS reading values per sensor/parameter.
configuration: GPS sensor configuration list from the Gateway.
s_gps: List of OzGPSParam instances (one per configured GPS module).
"""
gps_poll = True
lat = 0.0
lon = 0.0
put_sensor = 0
xone = 0
xpath = "http://xtrapath6.izatcloud.net/"
xname = "xtra3grc.bin"
v_gps: ClassVar[list] = []
[docs]
def __init__(self) -> None:
"""Initialize OzGPS with empty configuration and sensor lists."""
super().__init__()
self.configuration = []
self.s_gps = []
[docs]
def initialize(self, config: dict) -> bool:
"""Initialize all configured GPS modules.
Args:
config: List of GPS sensor configuration dicts from the Gateway.
Returns:
True if initialization succeeded, False on error.
"""
self.configuration = config
context_logger.info_with_context("GPS", f"Configuration: {self.configuration}")
try:
for sensor in self.configuration:
if "pn" in sensor:
sensor["init"] = self.initializeSensor(sensor)
return True
# Catch Sensor error from Exception
except Exception as e:
context_logger.error_with_context("GPS", f"Failed to initialize GPS: {e}")
return False
[docs]
def initializeSensor(self, sensor: dict) -> int:
"""Initialize a single GPS module and take an initial reading.
Args:
sensor: Configuration dict with part number, port settings, and parameters.
Returns:
1 on success, 0 on failure or if the sensor is disabled.
"""
_success = False
_s_gps = None
value = []
if sensor["pn"] == 30 and sensor["en"] == 1:
gps_port = "/dev/ttyUSB1"
configuration_port = "/dev/ttyUSB2"
baud = 115200
if "poll" in sensor:
self.put_sensor = sensor["poll"]
if "gpio" in sensor:
gpioConfig = sensor["gpio"]
if "gpsport" in gpioConfig:
gps_port = gpioConfig["gpsport"]
if "configport" in gpioConfig:
configuration_port = gpioConfig["configport"]
if "baud" in gpioConfig:
baud = gpioConfig["baud"]
if "xone" in sensor:
self.xone = sensor["xone"]
if "xpath" in sensor:
self.xpath = sensor["xpath"]
if "xname" in sensor:
self.xname = sensor["xname"]
try:
_s_gps = OzGPSParam(sensor["pn"], gps_port, configuration_port, baud)
_s_gps.setDriver(QuectelGPS(gps_port, configuration_port, baud))
_s_gps.gps_driver.initialize(self.xone, self.xpath, self.xname)
parameters = sensor["parameters"]
for param in parameters:
_val = {"value": [], "oldvalue": 0, "count": 0}
value.append(_val)
_success = True
except Exception as e:
_success = False
context_logger.error_with_context("GPS", f"Failed to initialize sensor: {e}")
else:
context_logger.warning_with_context("GPS", f"Disabled: {sensor['pn']}")
_success = False
self.s_gps.append(_s_gps)
self.v_gps.append(value)
return int(_success)
[docs]
def getSensorReading(self) -> dict:
"""Poll all GPS modules and accumulate lat/lon/satellite readings.
Returns:
Dict mapping send-codes to their latest GPS values.
"""
data = {}
if self.gps_poll or self.put_sensor == 1:
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
context_logger.info_with_context("GPS", f"PART: {sensor['pn']}")
try:
response = self.s_gps[X].gps_driver.getSensorData()
context_logger.info_with_context("GPS", f"{response}")
for Y, parameters in enumerate(sensor["parameters"]):
try:
value = self._putvalue(parameters["pm"], response)
if value != 0:
self.v_gps[X][Y]["oldvalue"] = value
self.v_gps[X][Y]["value"].append(self.v_gps[X][Y]["oldvalue"])
self.v_gps[X][Y]["count"] += 1
data[parameters["sc"]] = self.v_gps[X][Y]["oldvalue"]
except Exception as e:
context_logger.error_with_context("GPS", f"Failed to get sensor reading: {e}")
except Exception as e:
context_logger.error_with_context("GPS", f"Failed to getSensorReading: {e}")
context_logger.info_with_context("GPS", f"Sending realtime data: {data}")
return data
[docs]
def putSensorValue(self, value: dict) -> dict:
"""Transfer accumulated GPS readings into the output payload and reset.
Args:
value: Shared output dict to populate with GPS time-series data.
Returns:
The updated output dict with GPS data added.
"""
if self.put_sensor == 1:
for X, sensor in enumerate(self.configuration):
if sensor["init"] == 1:
try:
for Y, parameters in enumerate(sensor["parameters"]):
try:
value[parameters["sc"]] = self.v_gps[X][Y]["value"]
self.v_gps[X][Y]["value"] = []
self.v_gps[X][Y]["count"] = 0
except Exception as e:
context_logger.error_with_context(
"GPS",
f"putSensorValue failed at [{X}][{Y}] - {parameters['sc']}: {e}",
)
except Exception as e:
context_logger.error_with_context("GPS", f"Failed to putSensorValue: {e}")
return value
def _putvalue(self, pm: int, response: dict) -> float | int | None:
"""Extract a specific GPS parameter from the NMEA response.
Args:
pm: Parameter ID (1 = latitude, 2 = longitude, 3 = satellite count).
response: Parsed NMEA response dict from the GPS driver.
Returns:
The extracted value, or 0.0/0 if not available.
"""
gps_data = response["data"]
if pm == 1:
if "GPRMC" in gps_data:
lat = gps_data["GPRMC"]["lat"]
if lat != 0:
self.lat = lat
return self.lat
return 0.0
if pm == 2:
if "GPRMC" in gps_data:
lon = gps_data["GPRMC"]["lon"]
if lon != 0:
self.lon = lon
return self.lon
return 0.0
if pm == 3:
if "GPGGA" in gps_data:
return gps_data["GPGGA"]["num_sats"]
return 0
return 0.0
[docs]
def putInitvalues(self, value: dict) -> None:
"""Add GPS location to the init payload and sync the RTC from GPS time.
Args:
value: Shared init payload dict to update with location coordinates.
"""
self.monitor_gps_datetime()
value.update({"loc": [self.lat, self.lon]})
[docs]
def poll_loc(self) -> bool:
"""Check whether a valid GPS fix has been obtained.
Returns:
True if a non-zero latitude is available, False otherwise.
"""
return False if self.lat == 0.0 else True
[docs]
def monitor_gps_datetime(self) -> None:
"""Compare GPS time with the hardware RTC and update RTC if drift exceeds 10 seconds."""
import subprocess
from datetime import datetime
context_logger.info_with_context("GPS", "Checking GPS time monitor…")
# ── single-pass, no infinite loop ──
for gps_obj in self.s_gps:
if gps_obj and gps_obj.gps_driver:
try:
response = gps_obj.gps_driver.getSensorData()
gps_data = response.get("data", {})
gps_time_str = gps_data.get("GPRMC", {}).get("time", "")
gps_date_str = gps_data.get("GPRMC", {}).get("date", "")
# Removed: num_sats = gps_data.get("GPGGA", {}).get("num_sats", 0)
if gps_time_str and gps_date_str:
# Removed: elapsed = time.time() - start_time
context_logger.info_with_context("GPS", f"Date: {gps_date_str}, Time: {gps_time_str}")
# ── RTC sync check ──
try:
dt_obj = datetime.strptime(f"{gps_date_str} {gps_time_str}", "%d/%m/%y %H:%M:%S")
gps_epoch = int(dt_obj.timestamp())
rtc_out = subprocess.check_output(["hwclock", "--show"], text=True).strip()
rtc_dt = datetime.strptime(rtc_out.split(".")[0], "%Y-%m-%d %H:%M:%S")
rtc_epoch = int(rtc_dt.timestamp())
diff = abs(gps_epoch - rtc_epoch)
context_logger.info_with_context(
"GPS",
f"GPS epoch: {gps_epoch}, RTC epoch: {rtc_epoch}, Diff: {diff}s",
)
if diff > 10:
formatted_time = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
if subprocess.call(["hwclock", "--set", f"--date={formatted_time}"]) == 0:
context_logger.info_with_context("GPS", f"RTC updated to: {formatted_time}")
else:
context_logger.error_with_context("GPS", "Failed to update RTC")
else:
context_logger.info_with_context("GPS", "Time difference < 10s. No update needed.")
except Exception as e:
context_logger.error_with_context("GPS", f"RTC update failed: {e}")
else:
context_logger.info_with_context("GPS", "GPS fix not ready yet. May be NO Satellite")
# Removed: print(f"Satellites: {num_sats}")
except Exception as e:
context_logger.error_with_context("GPS", f"monitor_gps_datetime: {e}")
# Example usage (expects gps.config.json next to this file):
#   python3 -m OzWrapper.OzGPS.OzGPS
if __name__ == "__main__":
    config_path = os.path.join(os.path.dirname(__file__), "gps.config.json")
    with open(config_path, encoding="utf-8") as config_file:
        sensorConfig = json.load(config_file)
    context_logger.info_with_context("GPS", f"Config: {sensorConfig}")

    gps = OzGPS()
    gps.initialize(sensorConfig["gps"])
    # OzGPS has no getXOne() method; the flag is the plain `xone` attribute.
    context_logger.info_with_context("GPS", f"xOne: {gps.xone}")

    # Take a few readings so putSensorValue has a short series to report.
    for _ in range(4):
        gps.getSensorReading()
    data = {}
    context_logger.info_with_context("GPS", f"GPS {gps.putSensorValue(data)}")