# Source code for OzWrapper.OzRGB.OzRGB

"""RGB LED indicator wrapper for NeoPixel status display.

Drives a NeoPixel LED strip to indicate device status (network connectivity,
data sending, errors) through color and blink/fade patterns, and optionally
shows AQI-based beacon colors on remaining LEDs.
"""

import time
from queue import Queue

import board
import neopixel

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two handles on the same logger name: the object returned by .get(), and the
# OizomLogger wrapper itself, whose *_with_context methods this module calls.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)

# RGB CONFIGURATION
# Board pin and pixel count handed to the NeoPixel driver. The strip object is
# created at import time and shared by everything in this module.
pixel_pin = board.D18
num_pixels = 10
pixels = neopixel.NeoPixel(pixel_pin, num_pixels)
# Beacon lookup table:
#   "sc"     - name of the parameter read from incoming beacon data
#   "range"  - ascending lower bounds of each color band
#   "colors" - hex color (no '#') for the band starting at the same index
#   "en"     - not read anywhere in this module
beacon_config = {
    "en": 1,
    "sc": "aqi",
    "range": [0, 50, 100, 200, 300, 400],
    "colors": ["50f0e6", "29cc70", "f0e641", "d08000", "ff0000", "c700d0"],
}
# Gates the beacon update inside main(); disabled by default.
beacon_flag = False

# colors, as (R, G, B) tuples with 0-255 channels
cyan = (0, 255, 255)
red = (255, 0, 0)
green = (0, 255, 0)
blue = (0, 0, 255)
magenta = (170, 10, 127)
yellow = (255, 255, 0)
white = (255, 255, 255)
purple = (68, 0, 68)
neonOrange = (255, 95, 31)
# Fallback color: shown when the status code is unknown and used as the
# initial/invalid beacon color.
unsupported_color = white

# pattern identifiers stored in the `status` table; main() maps the blink
# variants to toggle delays of 100 ms (normal), 600 ms (slow), 70 ms (fast)
# and 30 ms (ultra fast).
pattern_fade = 0
pattern_normal_blink = 1
pattern_slow_blink = 2
pattern_fast_blink = 3
pattern_ultra_fast_blink = 4

# NOTE: purple and neonOrange are not part of this list.
all_colors = [cyan, red, green, blue, magenta, yellow, white]

# status table
# { <network_status_indicator_number>: [<color>, <pattern>, <description>] }
# main() reads the color (index 0) and pattern (index 1); the description is
# a human-readable label only.
status = {
    0: [green, pattern_normal_blink, "Disconnected"],
    1: [cyan, pattern_fade, "Connected"],
    2: [blue, pattern_slow_blink, "No Simcard"],
    3: [magenta, pattern_fade, "Sending Data"],
    4: [yellow, pattern_fast_blink, "Getting Config"],
    5: [red, pattern_ultra_fast_blink, "Some Bug Found"],
    6: [purple, pattern_ultra_fast_blink, "init success"],
    7: [neonOrange, pattern_normal_blink, "OGS Allocation"],
    8: [blue, pattern_fade, "QC Mode"],
}


def hex_to_rgb(hex_code: str) -> tuple:
    """Convert a six-digit hex color code to an (R, G, B) tuple.

    Args:
        hex_code: Six hexadecimal digits (e.g., '50f0e6'). One leading '#'
            is tolerated and ignored.

    Returns:
        Tuple of (red, green, blue) integers in the range 0-255. Any input
        that is not a string of exactly six hex digits is logged as an error
        and yields (0, 0, 0); this function never raises on bad input.
    """
    # Guard against None / non-string input, which would otherwise raise
    # TypeError on len() instead of taking the logged fallback path.
    if not isinstance(hex_code, str):
        context_logger.error_with_context("RGB", "Hex code must be a string.")
        return (0, 0, 0)

    digits = hex_code.removeprefix("#")
    if len(digits) != 6:
        context_logger.error_with_context("RGB", "Hex code must be 6 characters long.")
        return (0, 0, 0)

    # Check characters explicitly: int(x, 16) also accepts signs, whitespace
    # and underscores, so it cannot be relied on for validation.
    if any(ch not in "0123456789abcdefABCDEF" for ch in digits):
        context_logger.error_with_context(
            "RGB", "Hex code must contain only hexadecimal digits."
        )
        return (0, 0, 0)

    # Each channel is one pair of hex digits: RR GG BB.
    return tuple(int(digits[start : start + 2], 16) for start in (0, 2, 4))
def get_color_from_value(value: int) -> str | None:
    """Return the beacon hex color whose band contains ``value``.

    Bands are half-open: band ``i`` covers ``range[i] <= value < range[i + 1]``.
    Anything at or above the last bound maps to the last color.

    Args:
        value: AQI (or other index) value to classify.

    Returns:
        Hex color string from the beacon config, or None when ``value`` lies
        below the first bound.
    """
    thresholds = beacon_config["range"]
    palette = beacon_config["colors"]

    # Walk consecutive (lower, upper) bound pairs, tracking the band index.
    for band, (lower, upper) in enumerate(zip(thresholds, thresholds[1:])):
        if lower <= value < upper:
            return palette[band]

    return palette[-1] if value >= thresholds[-1] else None
class Magic:
    """LED animation engine providing fade and blink patterns for NeoPixel strips.

    Both animations are non-blocking: each call to ``fade()`` or ``blinking()``
    checks how much time has elapsed and performs at most one step. Elapsed
    time is measured with a monotonic clock so that wall-clock adjustments
    cannot stall or rush an animation.

    Attributes:
        brightness: Starting brightness level (0.0 to 1.0).
        min_bright: Minimum brightness counter value for fade animations.
        max_bright: Maximum brightness counter value for fade animations.
        bright_divider: Divisor to convert counter to NeoPixel brightness (0.0-1.0).
        fadInDelay: Milliseconds between brightness increments during fade-in.
        fadeOutWait: Milliseconds between brightness decrements during fade-out.
        pixels: NeoPixel strip instance being controlled.
        counter_brightness: Current fade position between min_bright and max_bright.
        RISING: Direction constant for increasing brightness.
        FALLING: Direction constant for decreasing brightness.
        blink_delay: Milliseconds between blink toggles.
        blink_direction: Next blink phase (RISING turns the strip on).
        prev_blinking: Monotonic timestamp (ms) of the last blink toggle, or
            None on an initialized instance that has not toggled yet.
    """

    brightness = 0
    min_bright = 1
    max_bright = 100
    bright_divider = 100
    fadInDelay = 20
    fadeOutWait = fadInDelay
    pixels = None
    _fade_current_time = None
    _fade_prev_time = None
    _fade_direction = 1
    counter_brightness = 0
    RISING = 1
    FALLING = -1
    blink_delay = 60
    blink_direction = RISING
    prev_blinking = 0

    def __init__(self, pixels: "neopixel.NeoPixel") -> None:
        """Initialize the animation engine with a NeoPixel strip.

        Args:
            pixels: NeoPixel strip instance to control.
        """
        self.pixels = pixels
        self.setFadeProperty()
        self.blink_delay = 60
        # None means "never toggled": the first blinking() call acts at once.
        self.prev_blinking = None

    def setFadeProperty(
        self,
        brightness: int = 0,
        min_bright: int = 1,
        max_bright: int = 100,
        bright_divider: int = 100,
        fadInDelay: int = 20,
        fadeOutWait: int = 10,
    ) -> None:
        """Configure fade animation properties and restart the fade.

        Args:
            brightness: Accepted for backward compatibility only; the starting
                brightness is always ``min_bright / bright_divider``.
            min_bright: Minimum brightness counter for fade range.
            max_bright: Maximum brightness counter for fade range.
            bright_divider: Divisor to convert counter to NeoPixel brightness.
            fadInDelay: Milliseconds between fade-in steps.
            fadeOutWait: Milliseconds between fade-out steps.
        """
        self.min_bright = min_bright
        self.max_bright = max_bright
        self.bright_divider = bright_divider
        self.fadInDelay = fadInDelay
        self.fadeOutWait = fadeOutWait

        now = self._now_ms()
        self._fade_current_time = now
        self._fade_prev_time = now
        self._fade_direction = self.RISING

        # The fade always starts from the dimmest level, rising.
        self.brightness = self.min_bright / self.bright_divider
        self.pixels.brightness = self.brightness
        self.counter_brightness = self.min_bright + 1

    def fade(self) -> None:
        """Advance the fade animation by one step (non-blocking)."""
        now = self._now_ms()
        self._fade_current_time = now

        # Reverse direction at either end of the brightness range.
        if self._fade_direction == self.RISING and self.counter_brightness >= self.max_bright:
            self._fade_direction = self.FALLING
        elif self._fade_direction == self.FALLING and self.counter_brightness <= self.min_bright:
            self._fade_direction = self.RISING

        if self._fade_direction == self.RISING:
            step_delay = self.fadInDelay
        else:
            step_delay = self.fadeOutWait

        if now - self._fade_prev_time >= step_delay:
            self.counter_brightness += self._fade_direction
            self.pixels.brightness = self.counter_brightness / self.bright_divider
            self.pixels.show()
            self._fade_prev_time = now

    def blinking(self, blink_delay: int = 60) -> None:
        """Advance the blink animation by one step (non-blocking).

        Args:
            blink_delay: Milliseconds between on/off toggles.
        """
        self.blink_delay = blink_delay
        now = self._now_ms()

        # Not yet time for the next toggle.
        if self.prev_blinking is not None and now - self.prev_blinking <= self.blink_delay:
            return

        if self.blink_direction == self.RISING:
            self.pixels.brightness = self.max_bright / self.bright_divider
            self.blink_direction = self.FALLING
        else:
            self.pixels.brightness = 0
            self.blink_direction = self.RISING
        self.prev_blinking = now
        self.pixels.show()

    @staticmethod
    def _now_ms() -> int:
        """Return a monotonic timestamp in milliseconds for interval timing."""
        return int(time.monotonic() * 1000)

    def micros(self) -> int:
        """Return the current wall-clock time in microseconds since the epoch."""
        return int(time.time() * 1000000)

    def millis(self) -> int:
        """Return the current wall-clock time in milliseconds since the epoch."""
        return int(time.time() * 1000)
def _status_entry(code):
    """Return the ``status`` table entry for ``code``, or None if there is none."""
    try:
        return status.get(code)
    except TypeError:
        # Unhashable payloads cannot be status codes.
        return None


def _peek_status_code(network_status: Queue | None):
    """Return the status code to display without consuming the queue.

    The first queued item wins if it is a known status; otherwise the second
    queued item (when present) is used. Returns -1 when nothing is available.
    """
    code = -1
    for position in (0, 1):
        try:
            code = network_status.queue[position]
        except (AttributeError, IndexError):
            # No queue supplied, or nothing (more) queued: keep what we have.
            break
        if _status_entry(code) is not None:
            break
    return code


def _refresh_beacon(beacon_queue: Queue | None, beacon_color: tuple, value):
    """Fold the oldest queued beacon reading into ``(beacon_color, value)``.

    Returns the inputs unchanged when there is no queue or nothing is queued.
    """
    if beacon_queue is None or beacon_queue.empty():
        return beacon_color, value

    data = beacon_queue.queue[0]
    # Drop the batch before parsing so a malformed reading is reported once
    # instead of being re-read (and re-logged) on every loop iteration.
    beacon_queue.queue.clear()

    parameter = beacon_config["sc"]
    if data:
        # "aqi" is read from the top level; other parameters from data["d"].
        readings = data if parameter == "aqi" else data["d"]
        if parameter in readings:
            value = readings[parameter]

    color_hex = get_color_from_value(value) if value >= 0 else None
    if color_hex is None:
        return unsupported_color, value

    context_logger.info_with_context("BEACON", f"value: {value} color: {color_hex}")
    return hex_to_rgb(color_hex), value


def main(
    os_network_alert_file: str | None,
    network_status: Queue | None = None,
    beacon_queue: Queue | None = None,
) -> None:
    """Run the RGB LED main loop, updating status and beacon colors continuously.

    The first pixel shows the network status color, animated with the pattern
    from the ``status`` table. When ``beacon_flag`` is set, the remaining
    pixels show the beacon color. An unknown or missing status code fills the
    strip with ``unsupported_color`` and fades it; such faults are logged at
    most about once per second.

    The loop never returns and does not sleep. ``network_status`` is only
    peeked, never consumed; ``beacon_queue`` is cleared after each read.

    Args:
        os_network_alert_file: Unused (legacy network alert file path).
        network_status: Queue providing integer network status codes for LED color.
        beacon_queue: Queue providing AQI data for beacon LED coloring.
    """
    log_interval = 1  # seconds between repeated fault logs
    last_log = time.monotonic()

    animator = Magic(pixels=pixels)
    blink_delays = {
        pattern_normal_blink: 100,
        pattern_slow_blink: 600,
        pattern_fast_blink: 70,
        pattern_ultra_fast_blink: 30,
    }
    beacon_pixels = num_pixels - 1  # every pixel after the status pixel
    beacon_color = unsupported_color
    value = -1

    def show_fault(reason) -> None:
        """Show the fallback color and log ``reason``, rate-limited."""
        nonlocal last_log
        pixels.fill(unsupported_color)
        animator.fade()
        now = time.monotonic()
        if now - last_log > log_interval:
            context_logger.error_with_context("RGB", f"main: {reason}")
            last_log = now

    while True:
        output = _peek_status_code(network_status)
        entry = _status_entry(output)
        if entry is None:
            show_fault(f"no LED mapping for network status {output!r}")
            continue

        try:
            _color, _pattern = entry[0], entry[1]
            pixels[0] = _color

            if beacon_flag:
                try:
                    beacon_color, value = _refresh_beacon(beacon_queue, beacon_color, value)
                    # Update Beacon color
                    pixels[1:] = [beacon_color] * beacon_pixels
                except Exception as e:
                    context_logger.error_with_context("BEACON", f"update: {e}")

            # run patterns
            if _pattern == pattern_fade:
                animator.fade()
            elif _pattern in blink_delays:
                animator.blinking(blink_delays[_pattern])
        except Exception as e:
            show_fault(e)
if __name__ == "__main__":
    # Script entry point. Only the legacy path argument is supplied; it is
    # unused by main(), and both queue arguments keep their None defaults.
    NETWORK_FILE = "/etc/ozone_network.txt"
    main(NETWORK_FILE)