"""LoRa communication wrapper for transmitting sensor data over LoRaWAN.
Encodes sensor telemetry into binary LoRa payloads and transmits them via
Melange, OTTA, or LORA-E5 modules, supporting both ABP and OTAA activation
modes with configurable regional settings.
"""
import struct
import time
from pathlib import Path
from queue import Queue
import serial
from drivers.LORAE5.LORAE5 import LORAE5
from drivers.Melange import Melange
from drivers.OTTA import OTTA
from drivers.OzLan import OzLan
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two separate OizomLogger instances are created for this module name:
#   * basic_logger   - whatever OizomLogger.get() returns (presumably a plain
#                      logger object - TODO confirm); it is not referenced in
#                      the visible part of this module.
#   * context_logger - the OizomLogger wrapper itself; OzLora uses its
#                      info_with_context / error_with_context methods for every
#                      message, always with the "LoRa" context tag.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzLora:
    """LoRaWAN communication wrapper supporting multiple LoRa module types.

    Manages LoRaWAN key provisioning, binary payload encoding via struct,
    dual-buffer transmission, and downlink command processing (including
    remote reboot).

    Attributes:
        module: LoRa driver instance (Melange, OTTA, or LORAE5), or None.
        appkey: OTAA application key.
        network_key: ABP network session key.
        app_session_key: ABP application session key.
        dev_addr: ABP device address.
        app_eui: Join/App EUI.
        dev_eui: Device EUI.
        lora_class: LoRaWAN device class (A, B, or C).
        lora_mode: Activation mode (ABP or OTAA).
        lora_region: Regional frequency plan identifier.
        isRealtime: Whether to transmit only realtime data payloads.
        modulePart: Module type selector (0 = Melange, 1 = OTTA, 2 = LORA-E5).
        ignoreKeys: Payload keys that are allowed to stay negative.
        lora_keys: Primary payload register map (key -> [offset, length code]).
        lora_keys_2: Secondary payload register map.
        lora_buffer: Primary transmit buffer (list of single-byte ``bytes``).
        lora_buffer_2: Secondary transmit buffer.
        lora_status: Status frame sent once at the end of ``setup``.
    """

    module = None  # driver

    # Integer encodings keyed by register "length code":
    # code -> (struct format, scale factor, minimum, maximum).
    # Code 3 is NOT a 3-byte field: it selects a signed 16-bit value (2 bytes).
    _INT_FORMATS = {
        1: ("!B", 1, 0, 0xFF),
        2: ("!H", 10, 0, 0xFFFF),
        3: ("!h", 10, -0x8000, 0x7FFF),
    }
    _DOWNLINK_COMMAND_INDEX = 4  # position of the command element in a downlink
    _DOWNLINK_REBOOT = b"\x01"  # downlink command value that requests a reboot
    _GSM_REBOOT_TIMEOUT = 15  # seconds allowed for each modem reboot stage
    _POLL_INTERVAL = 0.1  # seconds between device-node existence checks

    def __init__(self) -> None:
        """Initialize OzLora with default keys, buffers, and register maps."""
        super().__init__()
        self.appkey = []
        self.network_key = []
        self.app_session_key = []
        self.dev_addr = []
        self.lora_class = []
        self.lora_mode = []
        self.lora_region = ""
        self.app_eui = []
        self.dev_eui = []
        self.payloaddata = None
        self.isRealtime = 0
        self.modulePart = 0
        self.ignoreKeys = ["temp", "t1"]
        # key -> [byte offset in the buffer, length code understood by convert()]
        self.lora_keys = {
            "g1": [0, 2],
            "g3": [2, 4],
            "g5": [6, 4],
            "g7": [10, 4],
            "g2": [14, 4],
            "g8": [18, 4],
            "p1": [22, 2],
            "p2": [24, 2],
            "temp": [26, 3],
            "hum": [28, 2],
            "leq": [30, 2],
            "lmax": [32, 2],
            "lmin": [34, 2],
            "light": [36, 4],
            "uv": [40, 2],
            "rain": [42, 1],
            "mode": [43, 1],
            "t": [44, 4],
        }
        self.lora_keys_2 = {
            "p3": [0, 2],
            "p4": [2, 2],
            "t1": [4, 3],
            "t2": [6, 2],
            "bs": [8, 2],
            "g4": [10, 4],
            "g6": [14, 4],
            "v1": [18, 4],
            "v2": [22, 4],
            "v3": [26, 4],
            "v4": [30, 4],
            "v5": [34, 4],
            "pr": [38, 4],
            "flood": [42, 1],
            "mode": [43, 1],
            "t": [44, 4],
        }
        self.lora_status = [b"\x2b", b"\x01"]
        # Buffers are lists of single-byte ``bytes`` objects, sized from the
        # register maps so that adding a register cannot silently overflow.
        self.lora_buffer = [b"\x00"] * self._buffer_size(self.lora_keys)
        self.lora_buffer_2 = [b"\x00"] * self._buffer_size(self.lora_keys_2)
        # The secondary buffer starts with its "mode" byte set to 1.
        # NOTE(review): a payload that carries a "mode" key overwrites this
        # byte in both buffers - confirm that is intended.
        self.lora_buffer_2[self.lora_keys_2["mode"][0]] = b"\x01"

    @staticmethod
    def _buffer_size(register_map: dict) -> int:
        """Return the number of bytes needed to hold every register in the map."""
        return max(
            offset + (2 if code == 3 else code)
            for offset, code in register_map.values()
        )

    def setup(self, loraConfig: dict) -> None:
        """Configure the LoRa module with keys, port settings, and regional parameters.

        Creates the driver selected by ``pn``, initializes it, waits five
        seconds and sends the status frame.

        Args:
            loraConfig: Configuration dict with gpio, keys, class, mode, and region.

        Raises:
            ValueError: If the module selector (``pn``) is not 0, 1 or 2.
        """
        gpio_config = loraConfig.get("gpio") or {}
        port = gpio_config.get("port", "/dev/ttyAMA2")
        baud = gpio_config.get("baud", 9600)

        # Config key -> attribute; keys that are absent keep their current value.
        config_attributes = (
            ("appkey", "appkey"),  # OTAA AppKey
            ("appeui", "app_eui"),  # Join EUI
            ("deveui", "dev_eui"),  # DevEUI
            ("nwkey", "network_key"),  # ABP Network Session Key
            ("appskey", "app_session_key"),  # ABP App Session Key
            ("dev", "dev_addr"),  # ABP Device Address
            ("loraclass", "lora_class"),
            ("loramode", "lora_mode"),
            ("loraregion", "lora_region"),
            ("rdata", "isRealtime"),
            ("pn", "modulePart"),
        )
        for key, attribute in config_attributes:
            if key in loraConfig:
                setattr(self, attribute, loraConfig[key])

        # Looked up at call time: 0 = Melange, 1 = OTTA, 2 = LORA-E5.
        drivers = {0: Melange, 1: OTTA, 2: LORAE5}
        driver_cls = drivers.get(self.modulePart)
        if driver_cls is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise ValueError(f"Unsupported LoRa module selector pn={self.modulePart!r}")
        self.module = driver_cls(port, baud)
        context_logger.info_with_context("LoRa", f"Initializing... {port} {baud}")

        self._initialize_module()
        time.sleep(5)
        self.module.sendLora(self.lora_status)

    def _initialize_module(self) -> None:
        """Provision the current driver with the stored keys and LoRaWAN settings."""
        if self.modulePart == 2:
            # LORA-E5 takes the keys first, then class/mode/region separately.
            self.module.initialize(
                appkey=self.appkey,
                appeui=self.app_eui,
                deveui=self.dev_eui,
                network_session_key=self.network_key,
                app_session_key=self.app_session_key,
                devaddr=self.dev_addr,
            )
            self.module.setup_lorawan(
                loraClass=self.lora_class,
                loraMode=self.lora_mode,
                loraRegion=self.lora_region,
            )
        else:
            self.module.initialize(
                appkey=self.appkey,
                network_session_key=self.network_key,
                devaddr=self.dev_addr,
                loraclass=self.lora_class,
                loramode=self.lora_mode,
                appeui=self.app_eui,
            )

    @staticmethod
    def _pack_value(val: float, points: int) -> tuple[list[int], bool]:
        """Encode ``val`` in network byte order without logging.

        Args:
            val: Value to encode.
            points: Length code (1, 2, 3 = signed 2-byte, or 4).

        Returns:
            ``(byte_values, saturated)`` where ``saturated`` tells whether an
            integer had to be clamped into the range of its field.

        Raises:
            ValueError: If ``points`` is not a supported length code.
        """
        if points == 4:
            if isinstance(val, int):
                clamped = min(max(val, 0), 0xFFFFFFFF)
                return list(struct.pack("!I", clamped)), clamped != val
            return list(struct.pack("!f", round(float(val), 2))), False

        spec = OzLora._INT_FORMATS.get(points)
        if spec is None:
            raise ValueError(f"Unsupported LoRa field length code: {points!r}")
        fmt, scale, low, high = spec
        raw = int(val * scale)
        clamped = min(max(raw, low), high)
        return list(struct.pack(fmt, clamped)), clamped != raw

    def convert(self, val: float, points: int = 2) -> list[int]:
        """Pack a sensor value into a binary byte list for LoRa transmission.

        Uses struct packing with network byte order. Supports 1-byte unsigned,
        2-byte unsigned/signed (x10 scaling), and 4-byte float/unsigned int.
        Integers that do not fit their field are saturated to the nearest
        representable value (and logged) instead of aborting the payload.

        Args:
            val: Sensor value to encode.
            points: Byte length and format selector (1, 2, 3=signed-2, 4).

        Returns:
            List of integer byte values ready for the LoRa buffer.

        Raises:
            ValueError: If ``points`` is not a supported selector.
        """
        if points == 4:
            kind = "Converting int" if isinstance(val, int) else "Converting float"
            context_logger.info_with_context("LoRa", kind)
        packed, saturated = self._pack_value(val, points)
        if saturated:
            context_logger.error_with_context(
                "LoRa", f"convert: value {val} out of range for length code {points}, saturated"
            )
        context_logger.info_with_context("LoRa", [f"0x{b:02x}" for b in packed])
        return packed

    @staticmethod
    def _wait_for_path(path: Path, should_exist: bool, timeout: float) -> bool:
        """Poll until ``path.exists()`` equals ``should_exist`` or ``timeout`` elapses.

        Returns:
            True if the desired state was observed, False on timeout.
        """
        # Monotonic clock: a wall-clock adjustment must not stretch or cut the wait.
        deadline = time.monotonic() + timeout
        while True:
            if path.exists() == should_exist:
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(OzLora._POLL_INTERVAL)  # avoid spinning a CPU core

    def rebootGSM(self, gsm_port: str = "/dev/ttyUSB3") -> None:
        """Reboot the GSM modem via AT command and wait for it to come back online.

        Sends ``AT+CFUN=1,1``, then waits for the serial device node to
        disappear (stage 1) and to reappear (stage 2). Errors after the port
        has been opened are logged, not raised; a failure to open the port
        propagates to the caller.

        Args:
            gsm_port: Serial port path of the GSM modem.
        """
        context_logger.info_with_context("LoRa", "rebootGSM")
        ser = serial.Serial(port=gsm_port, baudrate=115200, timeout=1)
        try:
            ser.write("AT+CFUN=1,1\r".encode())
            # First line is the command echo, second the modem's response.
            for _ in range(2):
                msg = ser.readline()
                # errors="replace": line noise must not abort the reboot wait.
                context_logger.info_with_context("LoRa", msg.decode("utf-8", errors="replace"))
            # Release the port before waiting for its device node to vanish.
            ser.close()

            path = Path(gsm_port)
            context_logger.info_with_context("LoRa", "Rebooting...gsm")
            went_offline = self._wait_for_path(path, False, self._GSM_REBOOT_TIMEOUT)
            if went_offline:
                context_logger.info_with_context("LoRa", "Reboot stage-1 SUCCESS")

            context_logger.info_with_context("LoRa", "Waiting for online")
            came_online = self._wait_for_path(path, True, self._GSM_REBOOT_TIMEOUT)
            if came_online:
                context_logger.info_with_context("LoRa", "Reboot stage-2 SUCCESS")

            # Report success only when the node both disappeared and returned.
            if went_offline and came_online:
                context_logger.info_with_context("LoRa", "GSM Reboot Done")
            else:
                context_logger.error_with_context(
                    "LoRa", "rebootGSM: modem did not complete the reboot cycle in time"
                )
            time.sleep(2)
        except Exception as e:
            context_logger.error_with_context("LoRa", f"rebootGSM: {e}")
        finally:
            # Safe if already closed above; guarantees release on error paths.
            ser.close()

    def rebootSystem(self) -> None:
        """Reboot the entire device by resetting the GSM modem and triggering SAMD watchdog.

        Any exception is logged and swallowed.
        """
        data = {
            "pid": "wd",
            "rb": 1,  # 1 : Create Timer [NOT IMPLEMENTED] | 2: Reset watchdog | 3 : Stop Timer [future plan]
        }
        try:
            # NOTE(review): if the GSM port cannot be opened, rebootGSM raises
            # and the watchdog command below is skipped - confirm this is wanted.
            self.rebootGSM()
            samdPort = OzLan("/dev/ttyACM0", 115200)
            if samdPort is not None:
                context_logger.info_with_context("LoRa", "[WATCHDG] rebooting system")
                samdPort.send_command(data)
        except Exception as e:
            context_logger.error_with_context("LoRa", f"[ERR_OzLORA] rebootSystem: {e}")

    def _encode_field(self, key: str, value, register_map: dict, buffer: list) -> None:
        """Write one payload field into ``buffer`` if ``register_map`` knows ``key``."""
        register = register_map.get(key)
        if register is None:
            return
        context_logger.info_with_context("LoRa", f"Key:{key} Register:{register}")
        if value is None:
            return
        # Only the keys listed in ignoreKeys may carry negative values.
        if key not in self.ignoreKeys and value < 0:
            value = 0
        offset, length_code = register
        for index, byte_value in enumerate(self.convert(value, length_code)):
            chunk = bytes([byte_value])
            context_logger.info_with_context("LoRa", f"val :{index} {chunk}")
            buffer[offset + index] = chunk

    def _handle_downlink(self, downlink) -> None:
        """Log the downlink command and reboot the system when it requests one."""
        # Shorter downlinks carry no command element; indexing would raise.
        if downlink is None or len(downlink) <= self._DOWNLINK_COMMAND_INDEX:
            return
        command = downlink[self._DOWNLINK_COMMAND_INDEX]
        context_logger.info_with_context("LoRa", f"downlink command: {command}")
        if command == self._DOWNLINK_REBOOT:
            self.rebootSystem()

    def loop(self, loraqueue: Queue) -> None:
        """Continuously read payloads, encode into LoRa buffers, and transmit.

        Processes downlink commands (reboot) and re-initializes the module on
        transmission failure. Never returns; every exception inside one
        iteration is logged and the loop continues after a one-second pause.

        Args:
            loraqueue: Queue supplying sensor data payloads.
        """
        context_logger.info_with_context("LoRa", "running..")
        while True:
            try:
                data = loraqueue.get()
                payload = data.get("d")
                # Realtime mode forwards only "rdata" payloads; normal mode
                # forwards only the others.
                if not payload or bool(payload.get("rdata")) != bool(self.isRealtime):
                    continue
                self.payloaddata = payload
                if self.isRealtime:
                    context_logger.info_with_context("LoRa", f"Sending realtime data through Lora {data}")
                else:
                    context_logger.info_with_context("LoRa", f"Sending data through Lora {data}")

                targets = (
                    (self.lora_keys, self.lora_buffer),
                    (self.lora_keys_2, self.lora_buffer_2),
                )
                # A key present in both maps (e.g. "mode", "t") lands in both buffers.
                for key, value in payload.items():
                    for register_map, buffer in targets:
                        self._encode_field(key, value, register_map, buffer)

                # send data to lora
                _, downlink_1 = self.module.sendLora(self.lora_buffer)
                time.sleep(45)
                success, downlink_2 = self.module.sendLora(self.lora_buffer_2)
                time.sleep(5)

                self._handle_downlink(downlink_1)
                self._handle_downlink(downlink_2)

                # NOTE(review): only the second transmission's status triggers
                # a re-initialization, as before - confirm that is intended.
                if success == 0:
                    time.sleep(2)
                    # Same provisioning as setup(), including the configured
                    # region (no hard-coded frequency plan).
                    self._initialize_module()
                    time.sleep(2)
            except Exception as e:
                context_logger.error_with_context("LoRa", f"loop: {e}")
            finally:
                time.sleep(1)

    def run(self, loraConfig: dict, loraqueue: Queue) -> None:
        """Entry point: set up the LoRa module and start the transmit loop if enabled.

        Does nothing unless ``loraConfig["en"]`` equals 1.

        Args:
            loraConfig: Configuration dict with 'en' enable flag.
            loraqueue: Queue supplying sensor data payloads.
        """
        if loraConfig.get("en") == 1:
            self.setup(loraConfig)
            self.loop(loraqueue)