Source code for OzWrapper.OzDisplay.OzDisplay

"""E-paper and 7-segment display wrapper for showing sensor readings.

Drives TM1637 (4-digit 7-segment) or TM1638 (8-digit with LEDs) displays
to show real-time sensor values with unit indicator LEDs, supporting
multi-parameter cycling.
"""

import json
import os
import time
from queue import Queue

from drivers.MCP230XX import MCP230XX
from drivers.TM1637.TM1637 import TM1637Decimal
from drivers.TM1638.TM1638 import TM1638
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzDisplay:
    """Wrapper for TM1637/TM1638 7-segment displays with unit indicator LEDs.

    Shows sensor readings on a 7-segment display, cycling through multiple
    parameters with configurable decimal precision and unit LED indicators.

    Attributes:
        display: TM1637Decimal or TM1638 driver instance, or None before setup.
        payloaddata: Latest sensor payload (the ``"d"`` dict) accepted from the
            queue, or None until one arrives.
        CLK: GPIO clock pin for the display.
        DIO: GPIO data pin for the display.
        STB: GPIO strobe pin for TM1638 displays.
        MCP: MCP230XX GPIO expander for unit LEDs (part number 1), or None.
        sTime: Queue wait timeout in seconds. The default value 10 makes the
            loop block on the queue without any timeout.
        displayParam: Currently displayed parameter send-code.
        displayUnit: Currently displayed unit string.
        displayCount: Index into the multi-parameter cycle list.
        isRealtime: When truthy only payloads flagged ``rdata`` are shown;
            when falsy only payloads without that flag are shown.
        partNo: Display variant; 1 selects TM1637 + MCP230XX LEDs, any other
            value selects TM1638 with on-board LEDs.
        parameters: Display parameter configuration list.
        multiParam: List of send-codes to cycle through on the display.
    """

    display = None  # Display driver
    payloaddata = None  # Latest accepted payload

    CLK = 21
    DIO = 20
    STB = 3
    MCP = None
    sTime = 10
    displayParam = None
    displayUnit = None
    displayCount = -1
    isRealtime = 0
    # Default so updateLed() is usable even if setup() has not run yet.
    partNo = 1

    def __init__(self) -> None:
        """Initialize OzDisplay with default pin assignments and unit mappings."""
        super().__init__()
        self.parameters: list = []
        self.multiParam: list = []
        # Expander pins driving the four unit LEDs (used when partNo == 1).
        self.unit_pin: list = [12, 13, 14, 15]
        # Each table maps a send-code or unit string to an LED index by position.
        self.units: list = ["ppb", "ppm", "µg/m³", "mg/m³"]
        self.noise_units: list = ["leq", "lmax", "lmin"]
        self.pm_units: list = ["p3", "p1", "p2", "p4"]
        self.c_units: list = ["ppm", "mg/m³", "%"]
        self.thp_units = ["temp", "hum", "pr"]

    def _unit_led_index(self) -> int:
        """Return the LED index for the current parameter/unit (3 if unknown)."""
        index = 3
        try:
            if self.displayParam in self.pm_units:
                index = self.pm_units.index(self.displayParam)
            elif self.displayParam in self.noise_units:
                index = self.noise_units.index(self.displayParam)
            elif self.displayParam in self.thp_units:
                index = self.thp_units.index(self.displayParam)
            elif self.displayUnit in self.units:
                index = self.units.index(self.displayUnit)
            # "g1" has its own unit table; it overrides whatever was chosen above.
            if self.displayParam == "g1":
                index = self.c_units.index(self.displayUnit)
        except Exception as e:
            # Best effort: keep the index selected so far and report the problem.
            context_logger.error_with_context("Display", f"UpdateLed:{e}")
        return index

    def updateLed(self) -> None:
        """Update the unit indicator LED based on the currently displayed parameter."""
        pin = self._unit_led_index()
        if self.partNo == 1:
            # TM1637 variant: LEDs hang off the MCP230XX expander.
            for gpio in self.unit_pin:
                self.MCP.digitalWrite(gpio, 0)
            time.sleep(0.1)
            self.MCP.digitalWrite(self.unit_pin[pin], 1)
        else:
            # TM1638 variant: LEDs are part of the display module.
            self.display.clear_leds()
            time.sleep(0.2)
            self.display.update_led(pin)

    def setup(self, displayConfig: dict) -> None:
        """Configure the display driver, unit LEDs, and parameter cycling.

        All failures are logged rather than raised.

        Args:
            displayConfig: Configuration dict. Recognised keys are
                ``parameters``, ``sTime`` (refresh time), ``rdata``
                (realtime flag) and ``pn`` (part number).
        """
        self.partNo = 1
        try:
            self.parameters = displayConfig.get("parameters", self.parameters)
            self.sTime = displayConfig.get("sTime", self.sTime)
            self.isRealtime = displayConfig.get("rdata", self.isRealtime)
            self.partNo = displayConfig.get("pn", self.partNo)

            if self.partNo == 1:
                self.display = TM1637Decimal(clk=self.CLK, dio=self.DIO)
                # NOTE(review): os.getenv returns a str when MCP_ID is set but
                # the fallback here is the int 6 — confirm the driver accepts both.
                self.MCP = MCP230XX(devicenumber=os.getenv("MCP_ID", 6))
            else:
                self.display = TM1638(
                    stb=self.STB, clk=self.CLK, dio=self.DIO, brightness=7
                )
            time.sleep(1)
            self.display.show("------")

            # Start with every unit LED switched off.
            try:
                if self.partNo == 1:
                    for pin in self.unit_pin:
                        self.MCP.pinMode(pin, "output")
                        self.MCP.digitalWrite(pin, 0)
                else:
                    self.display.clear_leds()
            except Exception as e:
                context_logger.error_with_context("Display", f"Setup:{e}")

            for param in self.parameters:
                try:
                    context_logger.info_with_context("Display", f"Setup: {param['sc']}")
                    send_code = param["sc"]
                    if isinstance(send_code, list):
                        # A list of send-codes enables cycling in loop().
                        self.multiParam = send_code
                        self.displayParam = self.multiParam[0]
                    else:
                        self.displayParam = send_code
                    self.displayUnit = param["un"]
                    self.updateLed()
                except Exception as e:
                    context_logger.error_with_context("Display", f"Setup: {e}")
        except Exception as e:
            context_logger.error_with_context("Display", f"Setup: {e}")

    @staticmethod
    def _format_message(value, decimals, width: int = 6) -> str:
        """Format a reading for a display that is ``width`` digits wide.

        Negative values are clamped to 0. ``decimals`` of 1, 2 or 3 selects
        that many fractional digits; any other value truncates to an integer.
        The decimal point does not count towards the width. Text that is too
        long becomes ``"------"``; shorter text is left-padded with spaces.
        """
        if value < 0:
            value = 0
        if decimals in (1, 2, 3):
            text = f"{value:.{int(decimals)}f}"
        else:
            text = f"{int(value)}"
        digits = len(text.replace(".", ""))
        if digits > width:
            return "------"
        return " " * (width - digits) + text

    def _refresh_display(self) -> None:
        """Advance the parameter cycle and show the latest value, if any."""
        if len(self.multiParam) > 0:
            self.displayCount = self.displayCount + 1
            if self.displayCount >= len(self.multiParam):
                self.displayCount = 0
            self.displayParam = self.multiParam[self.displayCount]
            self.updateLed()

        if self.payloaddata is None:
            return
        if self.displayParam is None or self.displayParam not in self.payloaddata:
            return

        # NOTE(review): every configured parameter's factor/precision is applied
        # to the same displayParam, so with several entries the value is shown
        # once per entry. Confirm whether matching on param["sc"] is intended.
        for param in self.parameters:
            value = self.payloaddata[self.displayParam] * (1.0 * param["fc"])
            message = self._format_message(value, param["dc"])
            context_logger.info_with_context("Display", f"Sending display message {message}")
            self.display.show(message)

    def loop(self, displayqueue: Queue) -> None:
        """Continuously read payloads and update the display with sensor values.

        Cycles through configured parameters and formats values with the
        specified decimal precision. Runs forever; errors are logged so that a
        single bad payload or driver fault does not stop the loop.

        Args:
            displayqueue: Queue supplying sensor data payloads.
        """
        from queue import Empty  # only needed for the timeout case below

        while True:
            try:
                if self.sTime == 10:
                    # Default refresh time: wait for data without a timeout.
                    data = displayqueue.get()
                else:
                    data = displayqueue.get(timeout=self.sTime)

                payload = data.get("d")
                if not payload:
                    continue
                # Accept the payload only when its realtime flag matches the mode.
                if bool(payload.get("rdata")) != bool(self.isRealtime):
                    continue
                context_logger.info_with_context(
                    "Display", f"Sending realtime data through display {data}"
                )
                self.payloaddata = payload
            except Empty:
                # Timeout expired with no new data: fall through and refresh the
                # display (advancing the cycle) with the last payload.
                pass
            except Exception as e:
                context_logger.error_with_context("Display", f"loop : {e}")
            finally:
                # Runs on every path, including ``continue``, to pace the loop.
                time.sleep(1)

            try:
                self._refresh_display()
            except Exception as e:
                context_logger.error_with_context("Display", f"loop : {e}")

    def run(self, displayConfig: dict, displayqueue: Queue) -> None:
        """Entry point: set up the display and start the update loop if enabled.

        Does nothing unless ``displayConfig["en"]`` equals 1.

        Args:
            displayConfig: Configuration dict with 'en' enable flag.
            displayqueue: Queue supplying sensor data payloads.
        """
        if displayConfig.get("en") == 1:
            self.setup(displayConfig)
            self.loop(displayqueue)
if __name__ == "__main__":
    # Manual bring-up: load the "display" section of display.config.json that
    # sits next to this module, then drive the display from an empty queue.
    # Unlike OzDisplay.run(), this does not check the 'en' flag.
    config_path = os.path.join(os.path.dirname(__file__), "display.config.json")
    # Explicit encoding so the config is read the same way on every platform.
    with open(config_path, encoding="utf-8") as configFile:
        displayConfig = json.load(configFile)["display"]

    display_queue = Queue(1)
    display = OzDisplay()
    display.setup(displayConfig)
    display.loop(display_queue)