"""E-paper and 7-segment display wrapper for showing sensor readings.
Drives TM1637 (4-digit 7-segment) or TM1638 (8-digit with LEDs) displays
to show real-time sensor values with unit indicator LEDs, supporting
multi-parameter cycling.
"""
import json
import os
import time
from queue import Queue
from drivers.MCP230XX import MCP230XX
from drivers.TM1637.TM1637 import TM1637Decimal
from drivers.TM1638.TM1638 import TM1638
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class OzDisplay:
"""Wrapper for TM1637/TM1638 7-segment displays with unit indicator LEDs.
Shows sensor readings on a 7-segment display, cycling through multiple
parameters with configurable decimal precision and unit LED indicators.
Attributes:
display: TM1637Decimal or TM1638 driver instance, or None before setup.
payloaddata: Latest sensor payload received from the queue.
CLK: GPIO clock pin for the display.
DIO: GPIO data pin for the display.
STB: GPIO strobe pin for TM1638 displays.
MCP: MCP230XX GPIO expander for unit LEDs (TM1637 mode), or None.
sTime: Display refresh timeout in seconds.
displayParam: Currently displayed parameter send-code.
displayUnit: Currently displayed unit string.
displayCount: Index into the multi-parameter cycle list.
isRealtime: Whether to process only realtime data payloads.
parameters: Display parameter configuration list.
multiParam: List of send-codes to cycle through on the display.
"""
display = None # Display Driver
# Store screen parameters
# Payload
payloaddata = None
CLK = 21
DIO = 20
STB = 3
MCP = None
sTime = 10
displayParam = None
displayUnit = None
displayCount = -1
isRealtime = 0
[docs]
def __init__(self) -> None:
"""Initialize OzDisplay with default pin assignments and unit mappings."""
super().__init__()
self.parameters: list = []
self.multiParam: list = []
self.unit_pin: list = [12, 13, 14, 15]
self.units: list = ["ppb", "ppm", "µg/m³", "mg/m³"]
self.noise_units: list = ["leq", "lmax", "lmin"]
self.pm_units: list = ["p3", "p1", "p2", "p4"]
self.c_units: list = ["ppm", "mg/m³", "%"]
self.thp_units = ["temp", "hum", "pr"]
[docs]
def updateLed(self) -> None:
"""Update the unit indicator LED based on the currently displayed parameter."""
pin = 3
try:
if self.displayParam in self.pm_units:
pin = self.pm_units.index(self.displayParam)
elif self.displayParam in self.noise_units:
pin = self.noise_units.index(self.displayParam)
elif self.displayParam in self.thp_units:
pin = self.thp_units.index(self.displayParam)
elif self.displayUnit in self.units:
pin = self.units.index(self.displayUnit)
if self.displayParam == "g1":
pin = self.c_units.index(self.displayUnit)
except Exception as e:
context_logger.error_with_context("Display", f"UpdateLed:{e}")
if self.partNo == 1:
for gpio in self.unit_pin:
self.MCP.digitalWrite(gpio, 0)
time.sleep(0.1)
self.MCP.digitalWrite(self.unit_pin[pin], 1)
else:
self.display.clear_leds()
time.sleep(0.2)
self.display.update_led(pin)
[docs]
def setup(self, displayConfig: dict) -> None:
"""Configure the display driver, unit LEDs, and parameter cycling.
Args:
displayConfig: Configuration dict with parameters, refresh time,
part number, and realtime flag.
"""
self.partNo = 1
try:
if "parameters" in displayConfig:
self.parameters = displayConfig["parameters"]
if "sTime" in displayConfig:
self.sTime = displayConfig["sTime"]
if "rdata" in displayConfig:
self.isRealtime = displayConfig["rdata"]
if "pn" in displayConfig:
self.partNo = displayConfig["pn"]
if self.partNo == 1:
self.display = TM1637Decimal(clk=self.CLK, dio=self.DIO)
self.MCP = MCP230XX(devicenumber=os.getenv("MCP_ID", 6))
else:
self.display = TM1638(stb=self.STB, clk=self.CLK, dio=self.DIO, brightness=7)
# self.display.write([0,0,0,0,0,0])
time.sleep(1)
self.display.show("------")
try:
if self.partNo == 1:
for pin in self.unit_pin:
self.MCP.pinMode(pin, "output")
self.MCP.digitalWrite(pin, 0)
else:
self.display.clear_leds()
except Exception as e:
context_logger.error_with_context("Display", f"Setup:{e}")
for param in self.parameters:
pin = 3
try:
context_logger.info_with_context("Display", f"Setup: {param['sc']}")
if isinstance(param["sc"], list):
self.multiParam = param["sc"]
self.displayParam = self.multiParam[0]
self.displayUnit = param["un"]
else:
self.displayParam = param["sc"]
self.displayUnit = param["un"]
self.updateLed()
except Exception as e:
context_logger.error_with_context("Display", f"Setup: {e}")
except Exception as e:
context_logger.error_with_context("Display", f"Setup: {e}")
[docs]
def loop(self, displayqueue: Queue) -> None:
"""Continuously read payloads and update the display with sensor values.
Cycles through configured parameters and formats values with the
specified decimal precision.
Args:
displayqueue: Queue supplying sensor data payloads.
"""
while 1:
try:
data = {}
if self.sTime == 10:
data = displayqueue.get()
else:
data = displayqueue.get(timeout=self.sTime)
if self.isRealtime:
if data.get("d") and data["d"].get("rdata"):
context_logger.info_with_context("Display", f"Sending realtime data through display {data}")
self.payloaddata = data["d"]
else:
continue
else:
if data.get("d") and not data["d"].get("rdata"):
context_logger.info_with_context("Display", f"Sending realtime data through display {data}")
self.payloaddata = data["d"]
else:
continue
except Exception as e:
context_logger.error_with_context("Display", f"loop : {e}")
finally:
time.sleep(1)
if len(self.multiParam) > 0:
self.displayCount = self.displayCount + 1
if self.displayCount >= len(self.multiParam):
self.displayCount = 0
self.displayParam = self.multiParam[self.displayCount]
self.updateLed()
if self.payloaddata is not None:
for param in self.parameters:
if self.displayParam is not None and self.displayParam in self.payloaddata:
value = self.payloaddata[self.displayParam] * (1.0 * param["fc"])
if value < 0:
value = 0
message = "------"
if param["dc"] == 0:
message = f"{int(value)}"
elif param["dc"] == 1:
message = f"{value:.1f}"
elif param["dc"] == 2:
message = f"{value:.2f}"
elif param["dc"] == 3:
message = f"{value:.3f}"
else:
message = f"{int(value)}"
mLenght = len(message.replace(".", ""))
if mLenght > 6:
message = "------"
if mLenght < 6:
message = " " * (6 - mLenght) + message
context_logger.info_with_context("Display", f"Sending display message {message}")
# self.display.write([0,0,0,0,0,0])
# time.sleep(0.5)
self.display.show(message)
[docs]
def run(self, displayConfig: dict, displayqueue: Queue) -> None:
"""Entry point: set up the display and start the update loop if enabled.
Args:
displayConfig: Configuration dict with 'en' enable flag.
displayqueue: Queue supplying sensor data payloads.
"""
if "en" in displayConfig:
if displayConfig["en"] == 1:
self.setup(displayConfig)
self.loop(displayqueue)
if __name__ == "__main__":
from queue import Queue
file_string = "display.config.json"
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, file_string)
with open(file_name) as configFile:
sensor = configFile.read()
displayConfig = json.loads(sensor)["display"]
display_queue = Queue(1)
display = OzDisplay()
display.setup(displayConfig)
display.loop(display_queue)