# Source code for OzWrapper.OzLora.OzLora

"""LoRa communication wrapper for transmitting sensor data over LoRaWAN.

Encodes sensor telemetry into binary LoRa payloads and transmits them via
Melange, OTTA, or LORA-E5 modules, supporting both ABP and OTAA activation
modes with configurable regional settings.
"""

import struct
import time
from pathlib import Path
from queue import Queue

import serial

from drivers.LORAE5.LORAE5 import LORAE5
from drivers.Melange import Melange
from drivers.OTTA import OTTA
from drivers.OzLan import OzLan
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two logger handles are created for this module's name:
#   * basic_logger   - whatever OizomLogger.get() returns; it is not referenced
#                      in the portion of this module visible here.
#   * context_logger - the OizomLogger wrapper itself, used throughout the
#                      class below via info_with_context()/error_with_context()
#                      with the "LoRa" context tag.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzLora:
    """LoRaWAN communication wrapper supporting multiple LoRa module types.

    Manages LoRaWAN key provisioning, binary payload encoding via struct,
    dual-buffer transmission, and downlink command processing (including
    remote reboot).

    Attributes:
        module: LoRa driver instance (Melange, OTTA, or LORAE5), or None.
        appkey: OTAA application key.
        network_key: ABP network session key.
        app_session_key: ABP application session key.
        dev_addr: ABP device address.
        app_eui: Join/application EUI.
        dev_eui: Device EUI.
        lora_class: LoRaWAN device class (A, B, or C).
        lora_mode: Activation mode (ABP or OTAA).
        lora_region: Regional frequency plan identifier.
        isRealtime: Whether to transmit only realtime data payloads.
        modulePart: Module type selector (0 = Melange, 1 = OTTA, 2 = LORA-E5).
        ignoreKeys: Payload keys that are NOT clamped to zero when negative.
        payloaddata: Last payload dict accepted by ``loop``.
        lora_keys: Primary payload register map (send-code to
            [offset, length-code]); the length-code is passed to ``convert``.
        lora_keys_2: Secondary payload register map.
        lora_buffer: Primary binary transmit buffer (list of 1-byte ``bytes``).
        lora_buffer_2: Secondary binary transmit buffer.
        lora_status: Status frame sent once at the end of ``setup``.
    """

    module = None  # driver

    # Maps configuration keys accepted by ``setup`` to the instance attribute
    # they overwrite. Keys absent from the config keep their current value.
    _CONFIG_ATTRIBUTES = (
        ("appkey", "appkey"),  # OTAA application key
        ("appeui", "app_eui"),  # Join EUI
        ("deveui", "dev_eui"),  # Device EUI
        ("nwkey", "network_key"),  # ABP network session key
        ("appskey", "app_session_key"),  # ABP application session key
        ("dev", "dev_addr"),  # ABP device address
        ("loraclass", "lora_class"),
        ("loramode", "lora_mode"),
        ("loraregion", "lora_region"),
        ("rdata", "isRealtime"),
        ("pn", "modulePart"),
    )

    # Position of the command byte inside a downlink and the value that
    # requests a reboot.
    # NOTE(review): assumes the driver returns downlinks as a sequence of
    # single-byte ``bytes`` objects - confirm against the driver.
    _DOWNLINK_COMMAND_INDEX = 4
    _REBOOT_COMMAND = b"\x01"

    # Delay between filesystem checks while waiting for the GSM port.
    _PORT_POLL_INTERVAL = 0.1

    def __init__(self) -> None:
        """Initialize OzLora with default keys, buffers, and register maps."""
        super().__init__()
        self.appkey = []
        self.network_key = []
        self.app_session_key = []
        self.dev_addr = []
        self.lora_class = []
        self.lora_mode = []
        self.lora_region = ""
        self.app_eui = []
        self.dev_eui = []
        self.payloaddata = None
        self.isRealtime = 0
        self.modulePart = 0
        self.ignoreKeys = ["temp", "t1"]
        self.lora_keys = {
            "g1": [0, 2],
            "g3": [2, 4],
            "g5": [6, 4],
            "g7": [10, 4],
            "g2": [14, 4],
            "g8": [18, 4],
            "p1": [22, 2],
            "p2": [24, 2],
            "temp": [26, 3],
            "hum": [28, 2],
            "leq": [30, 2],
            "lmax": [32, 2],
            "lmin": [34, 2],
            "light": [36, 4],
            "uv": [40, 2],
            "rain": [42, 1],
            "mode": [43, 1],
            "t": [44, 4],
        }
        self.lora_keys_2 = {
            "p3": [0, 2],
            "p4": [2, 2],
            "t1": [4, 3],
            "t2": [6, 2],
            "bs": [8, 2],
            "g4": [10, 4],
            "g6": [14, 4],
            "v1": [18, 4],
            "v2": [22, 4],
            "v3": [26, 4],
            "v4": [30, 4],
            "v5": [34, 4],
            "pr": [38, 4],
            "flood": [42, 1],
            "mode": [43, 1],
            "t": [44, 4],
        }
        self.lora_status = [b"\x2b", b"\x01"]
        # Each buffer is sized from the offset of the last register in its map
        # plus that register's 4 bytes.
        self.lora_buffer = [b"\x00"] * (list(self.lora_keys.values())[-1][0] + 4)
        self.lora_buffer_2 = [b"\x00"] * (list(self.lora_keys_2.values())[-1][0] + 4)
        # The "mode" byte of the secondary buffer starts as 0x01 (the primary
        # buffer keeps 0x00 there).
        self.lora_buffer_2[43] = b"\x01"

    def setup(self, loraConfig: dict) -> None:
        """Configure the LoRa module with keys, port settings, and regional parameters.

        Reads the serial port/baud from ``loraConfig["gpio"]`` (defaults:
        ``/dev/ttyAMA2`` at 9600), copies any provided keys and LoRaWAN
        settings onto the instance, instantiates the driver selected by
        ``pn``, initializes it, waits 5 seconds and sends the status frame.

        Args:
            loraConfig: Configuration dict with gpio, keys, class, mode, and region.

        Raises:
            ValueError: If ``pn`` does not select a supported module
                (0, 1 or 2).
        """
        gpio_config = loraConfig.get("gpio", {})
        port = gpio_config.get("port", "/dev/ttyAMA2")
        baud = gpio_config.get("baud", 9600)

        for config_key, attribute in self._CONFIG_ATTRIBUTES:
            if config_key in loraConfig:
                setattr(self, attribute, loraConfig[config_key])

        drivers = {
            0: Melange,  # Melange LoRa module
            1: OTTA,  # OTTA LoRa module
            2: LORAE5,  # LORA-E5 module
        }
        driver_cls = drivers.get(self.modulePart)
        if driver_cls is None:
            # Previously this fell through with module = None and crashed with
            # an opaque AttributeError on the first driver call.
            raise ValueError(f"Unsupported LoRa module part number: {self.modulePart!r}")

        self.module = driver_cls(port, baud)
        context_logger.info_with_context("LoRa", f"Initializing... {port} {baud}")

        self._initialize_module()
        time.sleep(5)
        self.module.sendLora(self.lora_status)

    def _initialize_module(self) -> None:
        """Push the stored keys and LoRaWAN settings to the current driver.

        Shared by ``setup`` and the failure-recovery path in ``loop`` so that
        both use the same configured region, class and mode.
        """
        if self.modulePart == 2:
            # LORA-E5 takes the keys first and the LoRaWAN settings separately.
            self.module.initialize(
                appkey=self.appkey,
                appeui=self.app_eui,
                deveui=self.dev_eui,
                network_session_key=self.network_key,
                app_session_key=self.app_session_key,
                devaddr=self.dev_addr,
            )
            self.module.setup_lorawan(
                loraClass=self.lora_class,
                loraMode=self.lora_mode,
                loraRegion=self.lora_region,
            )
        else:
            self.module.initialize(
                appkey=self.appkey,
                network_session_key=self.network_key,
                devaddr=self.dev_addr,
                loraclass=self.lora_class,
                loramode=self.lora_mode,
                appeui=self.app_eui,
            )

    @staticmethod
    def _pack_value(val, points: int) -> list[int]:
        """Pack ``val`` into network-byte-order bytes according to ``points``.

        Pure helper for ``convert`` (no logging).

        Raises:
            ValueError: If ``points`` is not 1, 2, 3 or 4.
            struct.error: If the value does not fit the selected format.
        """
        if points == 1:  # 1 byte unsigned
            return list(struct.pack("!B", val))
        if points == 2:  # 2 bytes unsigned, value scaled by 10
            return list(struct.pack("!H", int(val * 10)))
        if points == 3:  # 2 bytes signed, value scaled by 10
            return list(struct.pack("!h", int(val * 10)))
        if points == 4:
            if isinstance(val, int):  # 4 bytes unsigned integer
                return list(struct.pack("!I", val))
            # Anything else is sent as a 4-byte float rounded to 2 decimals.
            return list(struct.pack("!f", round(float(val), 2)))
        raise ValueError(f"Unsupported LoRa field length selector: {points!r}")

    def convert(self, val: float, points: int = 2) -> list[int]:
        """Pack a sensor value into a binary byte list for LoRa transmission.

        Uses struct packing with network byte order. Supports 1-byte unsigned,
        2-byte unsigned/signed (x10 scaling), and 4-byte float/unsigned int.

        Args:
            val: Sensor value to encode.
            points: Byte length and format selector (1, 2, 3=signed-2, 4).

        Returns:
            List of integer byte values ready for the LoRa buffer. Note that
            selector 3 yields 2 bytes.

        Raises:
            ValueError: If ``points`` is not a supported selector.
            struct.error: If the (scaled) value is out of range for the format.
        """
        if points == 4:
            kind = "int" if isinstance(val, int) else "float"
            context_logger.info_with_context("LoRa", f"Converting {kind}")
        encoded = self._pack_value(val, points)
        context_logger.info_with_context("LoRa", [f"0x{b:02x}" for b in encoded])
        return encoded

    @staticmethod
    def _wait_for_path(path: Path, present: bool, timeout: float) -> bool:
        """Poll until ``path.exists()`` equals ``present`` or ``timeout`` elapses.

        Returns:
            True if the desired state was observed in time, False otherwise.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if path.exists() == present:
                return True
            # Sleep briefly instead of spinning at full CPU while waiting.
            time.sleep(OzLora._PORT_POLL_INTERVAL)
        return False

    def rebootGSM(self, gsm_port: str = "/dev/ttyUSB3") -> None:
        """Reboot the GSM modem via AT command and wait for it to come back online.

        Sends ``AT+CFUN=1,1``, logs the two reply lines, then waits up to 15
        seconds for the port's device node to disappear and another 15 seconds
        for it to reappear. Errors after the port has been opened are logged,
        not raised; a failure to open the port propagates to the caller.

        Args:
            gsm_port: Serial port path of the GSM modem.
        """
        context_logger.info_with_context("LoRa", "rebootGSM")
        ser = serial.Serial(port=gsm_port, baudrate=115200, timeout=1)
        try:
            ser.write("AT+CFUN=1,1\r".encode())
            # First line read is the command echo, second is the modem reply.
            for _ in range(2):
                msg = ser.readline()
                context_logger.info_with_context("LoRa", msg.decode("utf-8", errors="replace"))
            # Close the port before watching for the device node to go away.
            ser.close()

            path = Path(gsm_port)
            timeout = 15  # seconds
            context_logger.info_with_context("LoRa", "Rebooting...gsm")
            if self._wait_for_path(path, present=False, timeout=timeout):
                context_logger.info_with_context("LoRa", "Reboot stage-1 SUCCESS")

            context_logger.info_with_context("LoRa", "Waiting for online")
            if self._wait_for_path(path, present=True, timeout=timeout):
                context_logger.info_with_context("LoRa", "Reboot stage-2 SUCCESS")
                context_logger.info_with_context("LoRa", "GSM Reboot Done")
                time.sleep(2)
            else:
                # Previously success was reported unconditionally.
                context_logger.error_with_context(
                    "LoRa", f"rebootGSM: {gsm_port} did not come back within {timeout}s"
                )
        except Exception as e:
            context_logger.error_with_context("LoRa", f"rebootGSM: {e}")
        finally:
            ser.close()

    def rebootSystem(self) -> None:
        """Reboot the entire device by resetting the GSM modem and triggering SAMD watchdog.

        Any exception (including a failure to open the GSM port) is logged
        and suppressed; in that case the watchdog command is not sent.
        """
        data = {
            "pid": "wd",
            "rb": 1,  # 1 : Create Timer [NOT IMPLEMENTED] | 2: Reset watchdog | 3 : Stop Timer [future plan]
        }
        try:
            self.rebootGSM()
            samd_port = OzLan("/dev/ttyACM0", 115200)
            context_logger.info_with_context("LoRa", "[WATCHDG] rebooting system")
            samd_port.send_command(data)
        except Exception as e:
            context_logger.error_with_context("LoRa", f"[ERR_OzLORA] rebootSystem: {e}")

    def _write_register(self, key: str, value, registers: dict, buffer: list) -> None:
        """Encode one payload value into ``buffer`` if ``key`` is in ``registers``."""
        register = registers.get(key)
        if register is None:
            return
        context_logger.info_with_context("LoRa", f"Key:{key} Register:{register}")
        if value is None:
            return
        # Negative readings are clamped to zero, except for the keys listed in
        # ignoreKeys (their registers use the signed format).
        if key not in self.ignoreKeys and value < 0:
            value = 0
        offset, length = register
        for index, byte in enumerate(self.convert(value, length)):
            chunk = bytes([byte])
            context_logger.info_with_context("LoRa", f"val :{index} {chunk}")
            buffer[offset + index] = chunk

    def _handle_downlink(self, downlink) -> None:
        """Log the downlink command byte and reboot the system if requested."""
        # Short or empty downlinks carry no command byte; ignore them instead
        # of failing with IndexError.
        if not downlink or len(downlink) <= self._DOWNLINK_COMMAND_INDEX:
            return
        command = downlink[self._DOWNLINK_COMMAND_INDEX]
        context_logger.info_with_context("LoRa", f"downlink command: {command}")
        if command == self._REBOOT_COMMAND:
            self.rebootSystem()

    def loop(self, loraqueue: Queue) -> None:
        """Continuously read payloads, encode into LoRa buffers, and transmit.

        For each queue item whose ``"d"`` payload matches the configured mode
        (realtime vs. regular), the values are written into both transmit
        buffers, the primary buffer is sent, then after 45 seconds the
        secondary one. Downlink commands (reboot) are processed and the module
        is re-initialized if either send reports failure (``success == 0``).
        Exceptions are logged and the loop continues; it never returns.

        Args:
            loraqueue: Queue supplying sensor data payloads.
        """
        context_logger.info_with_context("LoRa", "running..")
        while True:
            try:
                data = loraqueue.get()
                payload = data.get("d")
                if not payload:
                    continue
                # Realtime mode only forwards payloads flagged "rdata";
                # regular mode only forwards payloads without that flag.
                if bool(payload.get("rdata")) != bool(self.isRealtime):
                    continue

                self.payloaddata = payload
                if self.isRealtime:
                    context_logger.info_with_context("LoRa", f"Sending realtime data through Lora {data}")
                else:
                    context_logger.info_with_context("LoRa", f"Sending data through Lora {data}")

                targets = (
                    (self.lora_keys, self.lora_buffer),
                    (self.lora_keys_2, self.lora_buffer_2),
                )
                for key, value in payload.items():
                    for registers, buffer in targets:
                        self._write_register(key, value, registers, buffer)

                # send data to lora
                success_1, downlink_1 = self.module.sendLora(self.lora_buffer)
                time.sleep(45)
                success_2, downlink_2 = self.module.sendLora(self.lora_buffer_2)
                time.sleep(5)

                for downlink in (downlink_1, downlink_2):
                    self._handle_downlink(downlink)

                # A failure of either frame triggers re-initialization (the
                # first result used to be discarded).
                if success_1 == 0 or success_2 == 0:
                    time.sleep(2)
                    self._initialize_module()
                    time.sleep(2)
            except Exception as e:
                context_logger.error_with_context("LoRa", f"loop: {e}")
            finally:
                # Runs after every iteration, including skipped payloads.
                time.sleep(1)

    def run(self, loraConfig: dict, loraqueue: Queue) -> None:
        """Entry point: set up the LoRa module and start the transmit loop if enabled.

        Does nothing unless ``loraConfig["en"] == 1``.

        Args:
            loraConfig: Configuration dict with 'en' enable flag.
            loraqueue: Queue supplying sensor data payloads.
        """
        if loraConfig.get("en") == 1:
            self.setup(loraConfig)
            self.loop(loraqueue)