Source code for OzWrapper.OzHMI.OzHMI

"""HMI (Human-Machine Interface) display wrapper for Nextion/UART touch screens.

Drives a Nextion-style UART HMI display to show sensor parameter names,
values, and units on multiple pages, along with status icons for WiFi, GSM,
GPS, and battery. Also queries the network manager for connectivity details.
"""

import json
import os
import time
from queue import Queue

import requests

from drivers.HMI.HMI import HMI
from Network import Network
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# HMI Architecture

# (0)
# extract data from the SOCKET Listening port

# (1)
# extract device id
# write device id to the specific page

# (2)
# extract timestamp and convert it into DD-MM-YY HH:MM format
# write date to all the page

# (3)
# extract Data_Parameters and Data_Value
# Notate every g1 g2 key to its Corresponding PARAMETER
# check STANDARD Data sending format to the HMI driver from the above in code definition
# every parameter should have DEFAULT UNITS
# Write the Data_parameters and Data_Value
# also if the number of data parameters are not enough and are less than 6 in count,
# the default parameters like Temperature, Humidity and others will shown

## SETUP RUNS: 0, 1, 2 (if not present, it shows the default values.)
## LOOP RUNS: 3


[docs] class OzHMI: """HMI display wrapper for Nextion-style UART touch screens. Receives sensor payloads from a queue, maps send-codes to human-readable parameter labels and units, and writes formatted values to the HMI driver. Also manages status icons (WiFi, GSM, GPS, battery) and settings pages. Attributes: hmi: HMI driver instance, or None before setup. deviceId: Device identifier shown on the HMI about page. hmi_port: UART serial port path for the HMI display. hmi_baud: Baud rate for HMI serial communication. timezone: Timezone string for timestamp display. hmi_key_Param: Mapping of send-codes to display parameter labels. hmi_key_Unit: Mapping of send-codes to display unit strings. hmi_key_fc: Mapping of send-codes to conversion factors. isRealtime: Whether to process only realtime data payloads. network: Network instance for internet connectivity checks. Sample Data frame of Oizom V6 device: {'d': {'temp': 0, 'hum': 0, 'hum_cf': 0, 'g51': 342.09, 'g55': 347.92, 'g61': 345.15, 'g65': 359.71, 'g81': 352.66, 'g85': 357.04, 'g21': 331.56, 'g25': 348.03, 's1': 55.02, 'leq': 49.45, 'lmax': 60.5, 'lmin': 44.8, 'wd': 339, 'ws': 1.841428518, 'rain': 0, 't': 1691524500}, 'deviceId': 'P1950', 'deviceType': 'POLLUDRON_PRO'} Standard Oizom Key Units: {"g1 ": "ppm", "g2 ": "mg/m3", "g3 ": "ug/m3", "g4 ": "ug/m3", "g5 ": "ug/m3", "g6 ": "ug/m3", "g7 ": "ug/m3", "g8 ": "ug/m3", "g9 ": "%", "v1 ": "ppb", "v2 ": "ppb", "v3 ": "ppb", "v4 ": "ppm", "v5 ": "ppb", "v6 ": "ppb", "p1 ": "ug/m3", "p2 ": "ug/m3", "p3 ": "ug/m3", "p4 ": "ug/m3", "temp ": "Celsius", "hum ": "%", "t ": "Seconds", "leq ": "dB", "lmin ": "dB", "lmax ": "dB", "rain ": "mm", "light ": "Lux", "flood ": "mm", "uv ": "Index", "ws ": "m/s", "wd ": "Degree", "pr ": "HPa"} Standard Oizom Key Parameters: {"g1" : "CO2", "g2" : "CO", "g3" : "NO2", "g4" : "NH3", "g5" : "O3", "g6" : "H2S", "g7" : "NO", "g8" : "SO2", "g9" : "O2", "v1" : "Cl2", "v2" : "TVOC", "v3" : "CH2O", "v4" : "CH4", "v5" : "CH3SH", "v6" : "C2H4", "p1" : "PM2", "p2" : "PM10", "p3" : "PM1", "p4" : "PM100", "temp" : "Temperature", "hum" : "Humidity", "t" : "TimeStamp", "leq" : "NoiseAverage", "lmin" : "MinNoise", "lmax" : "MaxNoise", "rain" : "RainFall", "light" : "VisibleLight", "flood" : "FloodLevel", "uv" : "UVIndex", "ws" : "WindSpeed", "wd" : "WindDirection", "pr" : "Pressure"} """ fc = 1 # conversion factor """ CO2 - ppm All Gas - ppb Dust - ug/m3 O2 - % Temp - deg C Hum - % Pressure - HPa Wind Speed - m/s Wind Direction - deg Rain - mm """ # sample SENSOR DATA format to be sent to the HMI driver # { SensorParam:"", # SensorValue:"",/ # SensorUnit:""/ # }/ deviceId = "OZTEST001" hmi = None # driver lastCal = "13/08/2023" version = "1.0" hmi_port = "/dev/ttyAMA2" hmi_baud = 115200 timezone = "Asia/Kolkata" debug = 0 heatingInterval = 2 lat = "23.456789 N" # 11 characters lon = "73.456789 N" once = True nmOnce = True url = "" response = None # Payload payloaddata = None sTime = 10 displayCount = -1 isRealtime = 0 isInternet = 0 isCharging = 0 currentTime = 0 prevTime = 0 network = None
[docs] def __init__(self) -> None: """Initialize OzHMI with default settings and empty key mappings.""" super().__init__() self.hmi_key_Param = {} self.hmi_key_Unit = {} self.hmi_key_fc = {} self.hmi_keys = {} self.hmi_params = {} self.server = [ "http://172.19.0.1:8084/network/status", "http://172.17.0.1:8084/network/status", ] # # Hardware services <-> Gateway IP Bridge in-between <-> Network manager self.headers = { "Content-Type": "application/json", "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOjIwODksIm9yZ0lkIjoiT1pfQVBJUyIsInVzZXJFbWFpbCI6ImFwaXNAb2l6b20uY29tIiwiaWF0IjoxNjExNTU5MDE5LCJleHAiOjE4MzIzMTEwMTksImlzcyI6ImxiVzRvOUlUVTZTcHZXekRyeWV3TWVqTjF6cXM4NkVZIn0.8W9fVYBfWMZ931WlfEoL-dFIl9zdTNbMyb3A3dzjags", } self.multiParam = []
[docs] def setup(self, hmiConfig: dict) -> None: """Configure the HMI display, build key mappings, and initialize status icons. Args: hmiConfig: Configuration dict with parameters, gpio, and settings sections. """ if "parameters" in hmiConfig: self.parameters = hmiConfig["parameters"] if "rdata" in hmiConfig: self.isRealtime = hmiConfig["rdata"] if "gpio" in hmiConfig: gpioConfig = hmiConfig["gpio"] if "port" in gpioConfig: self.hmi_port = gpioConfig["port"] if "baud" in gpioConfig: self.hmi_baud = gpioConfig["baud"] if "debug" in gpioConfig: self.debug = gpioConfig["debug"] if "settings" in hmiConfig: settingsConfig = hmiConfig["settings"] if "lastCal" in settingsConfig: self.lastCal = settingsConfig["lastCal"] if "version" in settingsConfig: self.version = settingsConfig["version"] if "timezone" in settingsConfig: self.timezone = settingsConfig["timezone"] if "heatingTime" in settingsConfig: self.heatingInterval = settingsConfig["heatingTime"] if "lat" in settingsConfig: self.lat = settingsConfig["lat"] if "lon" in settingsConfig: self.lon = settingsConfig["lon"] # Key Value pair of Parameter for param in hmiConfig["parameters"]: self.hmi_key_Param[param["sc"]] = param["lb"] context_logger.info_with_context("HMI", f"Key Value pair of Parameter... {self.hmi_key_Param}") # Key Value pair of Units for param in hmiConfig["parameters"]: self.hmi_key_Unit[param["sc"]] = param["un"] context_logger.info_with_context("HMI", f"Key Value pair of Units... {self.hmi_key_Unit}") # Key Value pair of FC (conversion factor): for param in hmiConfig["parameters"]: self.hmi_key_fc[param["sc"]] = param["fc"] context_logger.info_with_context("HMI", f"Key Value pair of FC... {self.hmi_key_fc}") try: self.hmi = HMI(self.hmi_port, self.hmi_baud) context_logger.info_with_context("HMI", f"Initializing... {self.hmi_port}, {self.hmi_baud}") self.hmi.SD_About[2] = self.lastCal # update last calibrated date self.hmi.SD_About[0] = self.version # update device version - PolluSense v1 self.hmi.timeZone = self.timezone # update timezone self.network = Network() self.isInternet = self.network.is_internet() context_logger.info_with_context("HMI", f"Internet Available: {self.isInternet}") self.hmi.isInternetStatus = self.isInternet self.hmi.heatingTime = self.heatingInterval context_logger.info_with_context("HMI", f"Heating Time: {self.hmi.heatingTime} min/s") self.hmi.clearAllData() # clear Prev Data self.hmi.setup() except Exception as e: context_logger.error_with_context("HMI", f"setup: {e}")
[docs] def loop(self, hmi_queue: Queue) -> None: """Continuously read payloads and update the HMI display with sensor data and icons. Args: hmi_queue: Queue supplying sensor data payloads. """ while 1: try: data = {} context_logger.info_with_context("HMI", "running..") if self.sTime == 10: data = hmi_queue.get() else: data = hmi_queue.get(timeout=self.sTime) if self.isRealtime: if data.get("d") and data["d"].get("rdata"): context_logger.info_with_context("HMI", f"Sending realtime data through HMI {data}") self.payloaddata = data["d"] else: continue else: if data.get("d") and not data["d"].get("rdata"): context_logger.info_with_context("HMI", f"Sending non-realtime data through HMI {data}") self.payloaddata = data["d"] else: continue self.deviceId = data["deviceId"] context_logger.info_with_context("HMI", f"deviceId {self.deviceId}") self.hmi.updatedeviceId(self.deviceId) if self.once: self.hmi.showPage(4) self.once = False time.sleep(2) self.hmi.showPage(6) except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") finally: context_logger.info_with_context("HMI", "Execute every time") # time.sleep(1) self.hmi.pos = 0 # sensor pos back to 0 if self.payloaddata is not None: for _d in self.payloaddata: if (_d in list(self.hmi_key_Param)) and (_d in list(self.hmi_key_Unit)): try: _sensor_value = self.payloaddata.get(_d) # sensor_value if _sensor_value < 0: _sensor_value = 0 sensorParameter = self.hmi_key_Param.get(_d) sensorData = _sensor_value sensorData = ( _sensor_value * self.hmi_key_fc.get(_d) ) # ENABLE THIS when config gets updated via the software settings && disable the Conversion Factor function sensorUnit = self.hmi_key_Unit.get(_d) self.hmi.updateSensorData( sensorParameter, sensorData, sensorUnit, self.hmi.pos ) # Write all this data to HMI driver self.hmi.pos += 1 context_logger.info_with_context( "HMI", f"sensorParameter:{sensorParameter}, sensorData:{sensorData}, sensorKey:{sensorUnit}, position:{self.hmi.pos}", ) try: if "bs" in self.payloaddata: self.hmi.wifiStatus = 1 self.hmi.updateWifiIcon(self.hmi.enable) self.hmi.updateBatteryData(self.payloaddata["bs"]) # update Battery Status context_logger.info_with_context("HMI", f"Battery: {self.payloaddata['bs']}") except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") try: if "current" in self.payloaddata: # derive charging status if self.payloaddata["current"] > 0: self.hmi.updateBatteryIcon(1) else: self.hmi.updateBatteryIcon(0) except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") try: if "lat" in self.payloaddata: # if Location available if self.payloaddata["lat"] > 0: self.lat = self.payloaddata["lat"] if "lon" in self.payloaddata: self.lon = self.payloaddata["lon"] data_LatLon = str(self.lat) + " , " + str(self.lon) context_logger.info_with_context("HMI", f"Lat-Long: {data_LatLon}") self.hmi.SD_About[1] = data_LatLon self.hmi.updateAbout() self.hmi.locationStatus = 1 self.hmi.updateLocationIcon(self.hmi.enable) except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") finally: self.hmi.loop() # # Listen to HMI if self.nmOnce: # Call Network Manager only Once. self.nmOnce = False for i in range(len(self.server)): self.url = self.server[i] context_logger.info_with_context("HMI", f"Connecting to: {self.url}") try: # Add a timeout to avoid indefinite blocking; handle request exceptions explicitly self.response = requests.get(self.url, timeout=10) context_logger.info_with_context("HMI", f"Response Status Code: {self.response.status_code}") except requests.exceptions.RequestException as e: context_logger.error_with_context("HMI", f"loop: {e}") self.response = None try: if not self.response: context_logger.info_with_context( "HMI", "No response from network manager, skipping network status update", ) else: status = json.loads(json.dumps(self.response.json())) if self.response.status_code == 200: context_logger.info_with_context( "HMI", f"Connecting to: {self.url}" ) # prints whole of Internet response from the Network Manager gsmStatus = status["gsm"]["internet"] context_logger.info_with_context("HMI", f"GSM {gsmStatus}") # check if GSM is connected ?? try: if gsmStatus: self.hmi.gsmStatus = 1 self.hmi.updateGSMIcon(self.hmi.enable) # update SD settings WAN self.hmi.SD_settingsWAN[0] = status["gsm"]["gateway"] context_logger.info_with_context( "HMI", f"GSM Gateway: {self.hmi.SD_settingsWAN[0]}", ) self.hmi.SD_settingsWAN[1] = status["gsm"]["apn"]["en"] context_logger.info_with_context("HMI", f"GSM APN: {self.hmi.SD_settingsWAN[1]}") self.hmi.SD_settingsWAN[2] = status["gsm"]["apn"]["apn"] context_logger.info_with_context( "HMI", f"GSM APN APN: {self.hmi.SD_settingsWAN[2]}", ) self.hmi.updateSettingsWAN() else: self.hmi.gsmStatus = 0 self.hmi.updateGSMIcon(self.hmi.disable) except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}") context_logger.info_with_context("HMI", "Display Icons Updated") except Exception as e: context_logger.error_with_context("HMI", f"loop: {e}")
[docs] def run(self, hmiConfig: dict, hmi_queue: Queue) -> None: """Entry point: set up the HMI display and start the update loop if enabled. Args: hmiConfig: Configuration dict with 'en' enable flag. hmi_queue: Queue supplying sensor data payloads. """ if "en" in hmiConfig: if hmiConfig["en"] == 1: self.setup(hmiConfig) self.loop(hmi_queue)
if __name__ == "__main__": from queue import Queue file_string = "hmi.config.json" dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, file_string) with open(file_name) as configFile: sensor = configFile.read() hmiConfig = json.loads(sensor)["hmi"] hmi_queue = Queue(1) hmi = OzHMI() hmi.setup(hmiConfig) hmi.loop(hmi_queue)