"""HMI (Human-Machine Interface) display wrapper for Nextion/UART touch screens.
Drives a Nextion-style UART HMI display to show sensor parameter names,
values, and units on multiple pages, along with status icons for WiFi, GSM,
GPS, and battery. Also queries the network manager for connectivity details.
"""
import json
import os
import time
from queue import Queue
import requests
from drivers.HMI.HMI import HMI
from Network import Network
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# HMI Architecture
# (0)
# extract data from the SOCKET Listening port
# (1)
# extract device id
# write device id to the specific page
# (2)
# extract timestamp and convert it into DD-MM-YY HH:MM format
# write date to all the page
# (3)
# extract Data_Parameters and Data_Value
# Notate every g1 g2 key to its Corresponding PARAMETER
# check STANDARD Data sending format to the HMI driver from the above in code definition
# every parameter should have DEFAULT UNITS
# Write the Data_parameters and Data_Value
# also if the number of data parameters are not enough and are less than 6 in count,
# the default parameters like Temperature, Humidity and others will shown
## SETUP RUNS: 0, 1, 2 (if not present, it shows the default values.)
## LOOP RUNS: 3
[docs]
class OzHMI:
"""HMI display wrapper for Nextion-style UART touch screens.
Receives sensor payloads from a queue, maps send-codes to human-readable
parameter labels and units, and writes formatted values to the HMI driver.
Also manages status icons (WiFi, GSM, GPS, battery) and settings pages.
Attributes:
hmi: HMI driver instance, or None before setup.
deviceId: Device identifier shown on the HMI about page.
hmi_port: UART serial port path for the HMI display.
hmi_baud: Baud rate for HMI serial communication.
timezone: Timezone string for timestamp display.
hmi_key_Param: Mapping of send-codes to display parameter labels.
hmi_key_Unit: Mapping of send-codes to display unit strings.
hmi_key_fc: Mapping of send-codes to conversion factors.
isRealtime: Whether to process only realtime data payloads.
network: Network instance for internet connectivity checks.
Sample Data frame of Oizom V6 device: {'d': {'temp': 0, 'hum': 0, 'hum_cf': 0, 'g51': 342.09, 'g55': 347.92, 'g61': 345.15, 'g65': 359.71, 'g81': 352.66, 'g85': 357.04, 'g21': 331.56, 'g25': 348.03, 's1': 55.02, 'leq': 49.45, 'lmax': 60.5, 'lmin': 44.8, 'wd': 339, 'ws': 1.841428518, 'rain': 0, 't': 1691524500}, 'deviceId': 'P1950', 'deviceType': 'POLLUDRON_PRO'}
Standard Oizom Key Units: {"g1 ": "ppm", "g2 ": "mg/m3", "g3 ": "ug/m3", "g4 ": "ug/m3", "g5 ": "ug/m3", "g6 ": "ug/m3", "g7 ": "ug/m3", "g8 ": "ug/m3", "g9 ": "%", "v1 ": "ppb", "v2 ": "ppb", "v3 ": "ppb", "v4 ": "ppm", "v5 ": "ppb", "v6 ": "ppb", "p1 ": "ug/m3", "p2 ": "ug/m3", "p3 ": "ug/m3", "p4 ": "ug/m3", "temp ": "Celsius", "hum ": "%", "t ": "Seconds", "leq ": "dB", "lmin ": "dB", "lmax ": "dB", "rain ": "mm", "light ": "Lux", "flood ": "mm", "uv ": "Index", "ws ": "m/s", "wd ": "Degree", "pr ": "HPa"}
Standard Oizom Key Parameters: {"g1" : "CO2", "g2" : "CO", "g3" : "NO2", "g4" : "NH3", "g5" : "O3", "g6" : "H2S", "g7" : "NO", "g8" : "SO2", "g9" : "O2", "v1" : "Cl2", "v2" : "TVOC", "v3" : "CH2O", "v4" : "CH4", "v5" : "CH3SH", "v6" : "C2H4", "p1" : "PM2", "p2" : "PM10", "p3" : "PM1", "p4" : "PM100", "temp" : "Temperature", "hum" : "Humidity", "t" : "TimeStamp", "leq" : "NoiseAverage", "lmin" : "MinNoise", "lmax" : "MaxNoise", "rain" : "RainFall", "light" : "VisibleLight", "flood" : "FloodLevel", "uv" : "UVIndex", "ws" : "WindSpeed", "wd" : "WindDirection", "pr" : "Pressure"}
"""
fc = 1 # conversion factor
"""
CO2 - ppm
All Gas - ppb
Dust - ug/m3
O2 - %
Temp - deg C
Hum - %
Pressure - HPa
Wind Speed - m/s
Wind Direction - deg
Rain - mm
"""
# sample SENSOR DATA format to be sent to the HMI driver
# { SensorParam:"",
# SensorValue:"",/
# SensorUnit:""/
# }/
deviceId = "OZTEST001"
hmi = None # driver
lastCal = "13/08/2023"
version = "1.0"
hmi_port = "/dev/ttyAMA2"
hmi_baud = 115200
timezone = "Asia/Kolkata"
debug = 0
heatingInterval = 2
lat = "23.456789 N" # 11 characters
lon = "73.456789 N"
once = True
nmOnce = True
url = ""
response = None
# Payload
payloaddata = None
sTime = 10
displayCount = -1
isRealtime = 0
isInternet = 0
isCharging = 0
currentTime = 0
prevTime = 0
network = None
[docs]
def __init__(self) -> None:
"""Initialize OzHMI with default settings and empty key mappings."""
super().__init__()
self.hmi_key_Param = {}
self.hmi_key_Unit = {}
self.hmi_key_fc = {}
self.hmi_keys = {}
self.hmi_params = {}
self.server = [
"http://172.19.0.1:8084/network/status",
"http://172.17.0.1:8084/network/status",
] # # Hardware services <-> Gateway IP Bridge in-between <-> Network manager
self.headers = {
"Content-Type": "application/json",
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIwODksIm9yZ0lkIjoiT1pfQVBJUyIsInVzZXJFbWFpbCI6ImFwaXNAb2l6b20uY29tIiwiaWF0IjoxNjExNTU5MDE5LCJleHAiOjE4MzIzMTEwMTksImlzcyI6ImxiVzRvOUlUVTZTcHZXekRyeWV3TWVqTjF6cXM4NkVZIn0.8W9fVYBfWMZ931WlfEoL-dFIl9zdTNbMyb3A3dzjags",
}
self.multiParam = []
[docs]
def setup(self, hmiConfig: dict) -> None:
"""Configure the HMI display, build key mappings, and initialize status icons.
Args:
hmiConfig: Configuration dict with parameters, gpio, and settings sections.
"""
if "parameters" in hmiConfig:
self.parameters = hmiConfig["parameters"]
if "rdata" in hmiConfig:
self.isRealtime = hmiConfig["rdata"]
if "gpio" in hmiConfig:
gpioConfig = hmiConfig["gpio"]
if "port" in gpioConfig:
self.hmi_port = gpioConfig["port"]
if "baud" in gpioConfig:
self.hmi_baud = gpioConfig["baud"]
if "debug" in gpioConfig:
self.debug = gpioConfig["debug"]
if "settings" in hmiConfig:
settingsConfig = hmiConfig["settings"]
if "lastCal" in settingsConfig:
self.lastCal = settingsConfig["lastCal"]
if "version" in settingsConfig:
self.version = settingsConfig["version"]
if "timezone" in settingsConfig:
self.timezone = settingsConfig["timezone"]
if "heatingTime" in settingsConfig:
self.heatingInterval = settingsConfig["heatingTime"]
if "lat" in settingsConfig:
self.lat = settingsConfig["lat"]
if "lon" in settingsConfig:
self.lon = settingsConfig["lon"]
# Key Value pair of Parameter
for param in hmiConfig["parameters"]:
self.hmi_key_Param[param["sc"]] = param["lb"]
context_logger.info_with_context("HMI", f"Key Value pair of Parameter... {self.hmi_key_Param}")
# Key Value pair of Units
for param in hmiConfig["parameters"]:
self.hmi_key_Unit[param["sc"]] = param["un"]
context_logger.info_with_context("HMI", f"Key Value pair of Units... {self.hmi_key_Unit}")
# Key Value pair of FC (conversion factor):
for param in hmiConfig["parameters"]:
self.hmi_key_fc[param["sc"]] = param["fc"]
context_logger.info_with_context("HMI", f"Key Value pair of FC... {self.hmi_key_fc}")
try:
self.hmi = HMI(self.hmi_port, self.hmi_baud)
context_logger.info_with_context("HMI", f"Initializing... {self.hmi_port}, {self.hmi_baud}")
self.hmi.SD_About[2] = self.lastCal # update last calibrated date
self.hmi.SD_About[0] = self.version # update device version - PolluSense v1
self.hmi.timeZone = self.timezone # update timezone
self.network = Network()
self.isInternet = self.network.is_internet()
context_logger.info_with_context("HMI", f"Internet Available: {self.isInternet}")
self.hmi.isInternetStatus = self.isInternet
self.hmi.heatingTime = self.heatingInterval
context_logger.info_with_context("HMI", f"Heating Time: {self.hmi.heatingTime} min/s")
self.hmi.clearAllData() # clear Prev Data
self.hmi.setup()
except Exception as e:
context_logger.error_with_context("HMI", f"setup: {e}")
[docs]
def loop(self, hmi_queue: Queue) -> None:
"""Continuously read payloads and update the HMI display with sensor data and icons.
Args:
hmi_queue: Queue supplying sensor data payloads.
"""
while 1:
try:
data = {}
context_logger.info_with_context("HMI", "running..")
if self.sTime == 10:
data = hmi_queue.get()
else:
data = hmi_queue.get(timeout=self.sTime)
if self.isRealtime:
if data.get("d") and data["d"].get("rdata"):
context_logger.info_with_context("HMI", f"Sending realtime data through HMI {data}")
self.payloaddata = data["d"]
else:
continue
else:
if data.get("d") and not data["d"].get("rdata"):
context_logger.info_with_context("HMI", f"Sending non-realtime data through HMI {data}")
self.payloaddata = data["d"]
else:
continue
self.deviceId = data["deviceId"]
context_logger.info_with_context("HMI", f"deviceId {self.deviceId}")
self.hmi.updatedeviceId(self.deviceId)
if self.once:
self.hmi.showPage(4)
self.once = False
time.sleep(2)
self.hmi.showPage(6)
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
finally:
context_logger.info_with_context("HMI", "Execute every time")
# time.sleep(1)
self.hmi.pos = 0 # sensor pos back to 0
if self.payloaddata is not None:
for _d in self.payloaddata:
if (_d in list(self.hmi_key_Param)) and (_d in list(self.hmi_key_Unit)):
try:
_sensor_value = self.payloaddata.get(_d) # sensor_value
if _sensor_value < 0:
_sensor_value = 0
sensorParameter = self.hmi_key_Param.get(_d)
sensorData = _sensor_value
sensorData = (
_sensor_value * self.hmi_key_fc.get(_d)
) # ENABLE THIS when config gets updated via the software settings && disable the Conversion Factor function
sensorUnit = self.hmi_key_Unit.get(_d)
self.hmi.updateSensorData(
sensorParameter, sensorData, sensorUnit, self.hmi.pos
) # Write all this data to HMI driver
self.hmi.pos += 1
context_logger.info_with_context(
"HMI",
f"sensorParameter:{sensorParameter}, sensorData:{sensorData}, sensorKey:{sensorUnit}, position:{self.hmi.pos}",
)
try:
if "bs" in self.payloaddata:
self.hmi.wifiStatus = 1
self.hmi.updateWifiIcon(self.hmi.enable)
self.hmi.updateBatteryData(self.payloaddata["bs"]) # update Battery Status
context_logger.info_with_context("HMI", f"Battery: {self.payloaddata['bs']}")
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
try:
if "current" in self.payloaddata: # derive charging status
if self.payloaddata["current"] > 0:
self.hmi.updateBatteryIcon(1)
else:
self.hmi.updateBatteryIcon(0)
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
try:
if "lat" in self.payloaddata: # if Location available
if self.payloaddata["lat"] > 0:
self.lat = self.payloaddata["lat"]
if "lon" in self.payloaddata:
self.lon = self.payloaddata["lon"]
data_LatLon = str(self.lat) + " , " + str(self.lon)
context_logger.info_with_context("HMI", f"Lat-Long: {data_LatLon}")
self.hmi.SD_About[1] = data_LatLon
self.hmi.updateAbout()
self.hmi.locationStatus = 1
self.hmi.updateLocationIcon(self.hmi.enable)
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
finally:
self.hmi.loop() # # Listen to HMI
if self.nmOnce: # Call Network Manager only Once.
self.nmOnce = False
for i in range(len(self.server)):
self.url = self.server[i]
context_logger.info_with_context("HMI", f"Connecting to: {self.url}")
try:
# Add a timeout to avoid indefinite blocking; handle request exceptions explicitly
self.response = requests.get(self.url, timeout=10)
context_logger.info_with_context("HMI", f"Response Status Code: {self.response.status_code}")
except requests.exceptions.RequestException as e:
context_logger.error_with_context("HMI", f"loop: {e}")
self.response = None
try:
if not self.response:
context_logger.info_with_context(
"HMI",
"No response from network manager, skipping network status update",
)
else:
status = json.loads(json.dumps(self.response.json()))
if self.response.status_code == 200:
context_logger.info_with_context(
"HMI", f"Connecting to: {self.url}"
) # prints whole of Internet response from the Network Manager
gsmStatus = status["gsm"]["internet"]
context_logger.info_with_context("HMI", f"GSM {gsmStatus}")
# check if GSM is connected ??
try:
if gsmStatus:
self.hmi.gsmStatus = 1
self.hmi.updateGSMIcon(self.hmi.enable)
# update SD settings WAN
self.hmi.SD_settingsWAN[0] = status["gsm"]["gateway"]
context_logger.info_with_context(
"HMI",
f"GSM Gateway: {self.hmi.SD_settingsWAN[0]}",
)
self.hmi.SD_settingsWAN[1] = status["gsm"]["apn"]["en"]
context_logger.info_with_context("HMI", f"GSM APN: {self.hmi.SD_settingsWAN[1]}")
self.hmi.SD_settingsWAN[2] = status["gsm"]["apn"]["apn"]
context_logger.info_with_context(
"HMI",
f"GSM APN APN: {self.hmi.SD_settingsWAN[2]}",
)
self.hmi.updateSettingsWAN()
else:
self.hmi.gsmStatus = 0
self.hmi.updateGSMIcon(self.hmi.disable)
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
context_logger.info_with_context("HMI", "Display Icons Updated")
except Exception as e:
context_logger.error_with_context("HMI", f"loop: {e}")
[docs]
def run(self, hmiConfig: dict, hmi_queue: Queue) -> None:
"""Entry point: set up the HMI display and start the update loop if enabled.
Args:
hmiConfig: Configuration dict with 'en' enable flag.
hmi_queue: Queue supplying sensor data payloads.
"""
if "en" in hmiConfig:
if hmiConfig["en"] == 1:
self.setup(hmiConfig)
self.loop(hmi_queue)
if __name__ == "__main__":
from queue import Queue
file_string = "hmi.config.json"
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, file_string)
with open(file_name) as configFile:
sensor = configFile.read()
hmiConfig = json.loads(sensor)["hmi"]
hmi_queue = Queue(1)
hmi = OzHMI()
hmi.setup(hmiConfig)
hmi.loop(hmi_queue)