"""RGB LED indicator wrapper for NeoPixel status display.
Drives a NeoPixel LED strip to indicate device status (network connectivity,
data sending, errors) through color and blink/fade patterns, and optionally
shows AQI-based beacon colors on remaining LEDs.
"""
import time
from queue import Queue
import board
import neopixel
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
# RGB CONFIGURATION
pixel_pin = board.D18
num_pixels = 10
pixels = neopixel.NeoPixel(pixel_pin, num_pixels)
beacon_config = {
"en": 1,
"sc": "aqi",
"range": [0, 50, 100, 200, 300, 400],
"colors": ["50f0e6", "29cc70", "f0e641", "d08000", "ff0000", "c700d0"],
}
beacon_flag = False
# colors
cyan = (0, 255, 255)
red = (255, 0, 0)
green = (0, 255, 0)
blue = (0, 0, 255)
magenta = (170, 10, 127)
yellow = (255, 255, 0)
white = (255, 255, 255)
purple = (68, 0, 68)
neonOrange = (255, 95, 31)
unsupported_color = white
# pattern
pattern_fade = 0
pattern_normal_blink = 1
pattern_slow_blink = 2
pattern_fast_blink = 3
pattern_ultra_fast_blink = 4
all_colors = [cyan, red, green, blue, magenta, yellow, white]
# pattern { <network_status_indicator_number>: [<color>, <pattern>] }
status = {
0: [green, pattern_normal_blink, "Disconnected"],
1: [cyan, pattern_fade, "Connected"],
2: [blue, pattern_slow_blink, "No Simcard"],
3: [magenta, pattern_fade, "Sending Data"],
4: [yellow, pattern_fast_blink, "Getting Config"],
5: [red, pattern_ultra_fast_blink, "Some Bug Found"],
6: [purple, pattern_ultra_fast_blink, "init success"],
7: [neonOrange, pattern_normal_blink, "OGS Allocation"],
8: [blue, pattern_fade, "QC Mode"],
}
[docs]
def hex_to_rgb(hex_code: str) -> tuple:
"""Convert a 6-character hex color code to an (R, G, B) tuple.
Args:
hex_code: Six-character hex string (e.g., '50f0e6'), without '#' prefix.
Returns:
Tuple of (red, green, blue) integers in the range 0-255.
"""
# Validate input
if len(hex_code) != 6:
context_logger.error_with_context("RGB", "Hex code must be 6 characters long.")
return (0, 0, 0)
# Convert each pair of hex digits to an integer
r = int(hex_code[0:2], 16)
g = int(hex_code[2:4], 16)
b = int(hex_code[4:6], 16)
return (r, g, b)
[docs]
def get_color_from_value(value: int) -> str | None:
"""Look up the beacon hex color for a given AQI value.
Args:
value: AQI (or other index) value to classify.
Returns:
Hex color string from the beacon config, or None if below all ranges.
"""
ranges = beacon_config["range"]
colors = beacon_config["colors"]
for i in range(len(ranges) - 1):
if ranges[i] <= value < ranges[i + 1]:
return colors[i]
if value >= ranges[-1]:
return colors[-1]
return None
[docs]
class Magic:
"""LED animation engine providing fade and blink patterns for NeoPixel strips.
Attributes:
brightness: Current brightness level (0.0 to 1.0).
min_bright: Minimum brightness counter value for fade animations.
max_bright: Maximum brightness counter value for fade animations.
bright_divider: Divisor to convert counter to NeoPixel brightness (0.0-1.0).
fadInDelay: Milliseconds between brightness increments during fade-in.
fadeOutWait: Milliseconds between brightness decrements during fade-out.
pixels: NeoPixel strip instance being controlled.
RISING: Direction constant for increasing brightness.
FALLING: Direction constant for decreasing brightness.
blink_delay: Milliseconds between blink toggles.
blink_direction: Current blink phase (RISING or FALLING).
"""
brightness = 0
min_bright = 1
max_bright = 100
bright_divider = 100
fadInDelay = 20
fadeOutWait = 10
fadeOutWait = fadInDelay
pixels = None
_fade_current_time = None
_fade_prev_time = None
_fade_direction = 1
counter_brightness = 0
RISING = 1
FALLING = -1
blink_delay = 60
blink_direction = RISING
prev_blinking = 0
[docs]
def __init__(self, pixels: neopixel.NeoPixel) -> None:
"""Initialize the animation engine with a NeoPixel strip.
Args:
pixels: NeoPixel strip instance to control.
"""
self.pixels = pixels
self.setFadeProperty()
self.blink_delay = 60
[docs]
def setFadeProperty(
self,
brightness: int = 0,
min_bright: int = 1,
max_bright: int = 100,
bright_divider: int = 100,
fadInDelay: int = 20,
fadeOutWait: int = 10,
) -> None:
"""Configure fade animation properties.
Args:
brightness: Initial brightness counter value.
min_bright: Minimum brightness counter for fade range.
max_bright: Maximum brightness counter for fade range.
bright_divider: Divisor to convert counter to NeoPixel brightness.
fadInDelay: Milliseconds between fade-in steps.
fadeOutWait: Milliseconds between fade-out steps.
"""
self.brightness = brightness
self.min_bright = min_bright
self.max_bright = max_bright
self.bright_divider = bright_divider
self.fadInDelay = fadInDelay
self.fadeOutWait = fadeOutWait
self._fade_current_time = self.millis()
self._fade_prev_time = self.millis()
self._fade_direction = 1
self.brightness = self.min_bright / self.bright_divider # OVERRIDE
self.pixels.brightness = self.brightness
self.counter_brightness = self.min_bright + 1
[docs]
def fade(self) -> None:
"""Advance the fade animation by one step (non-blocking)."""
self._fade_current_time = self.millis()
if self._fade_direction == self.RISING and self.counter_brightness >= self.max_bright:
self._fade_direction = self.FALLING
elif self._fade_direction == self.FALLING and self.counter_brightness <= self.min_bright:
self._fade_direction = self.RISING
__dif_time = self._fade_current_time - self._fade_prev_time
if (self._fade_direction == self.RISING and (__dif_time) >= self.fadInDelay) or (
self._fade_direction == self.FALLING and (__dif_time) >= self.fadeOutWait
):
_b_print = self.counter_brightness / self.bright_divider
self.counter_brightness += self._fade_direction
self.pixels.brightness = self.counter_brightness / self.bright_divider
self.pixels.show()
self._fade_prev_time = self._fade_current_time
[docs]
def blinking(self, blink_delay: int = 60) -> None:
"""Advance the blink animation by one step (non-blocking).
Args:
blink_delay: Milliseconds between on/off toggles.
"""
self.blink_delay = blink_delay
__current_blink = self.millis()
__dif_time = __current_blink - self.prev_blinking
if (__dif_time) > self.blink_delay:
if self.blink_direction == self.RISING:
self.pixels.brightness = self.max_bright / self.bright_divider
self.blink_direction = self.FALLING
self.prev_blinking = __current_blink
elif self.blink_direction == self.FALLING:
self.pixels.brightness = 0
self.blink_direction = self.RISING
self.prev_blinking = __current_blink
self.pixels.show()
[docs]
def micros(self) -> int:
"""Return the current time in microseconds."""
return int(time.time() * 1000000)
[docs]
def millis(self) -> int:
"""Return the current time in milliseconds."""
return int(time.time() * 1000)
[docs]
def main(
os_network_alert_file: None,
network_status: Queue | None = None,
beacon_queue: Queue | None = None,
) -> None:
"""Run the RGB LED main loop, updating status and beacon colors continuously.
Args:
os_network_alert_file: Unused (legacy network alert file path).
network_status: Queue providing integer network status codes for LED color.
beacon_queue: Queue providing AQI data for beacon LED coloring.
"""
current_time = int(time.time())
prev_time = int(time.time())
print_interval = 1
f = Magic(pixels=pixels)
beacon_color = unsupported_color
value = -1
while 1:
output = -1
current_time = int(time.time())
try:
output = network_status.queue[0]
# output = network_status[0]
status.get(output)[0]
except Exception:
try:
output = network_status.queue[1]
except Exception as e:
context_logger.error_with_context("RGB", f"main: {e}")
try:
# Get all the configuration
_color = status.get(output)[0] # color
_pattern = status.get(output)[1]
# print network status output
if (current_time - prev_time) > print_interval:
# print("Network : {0}".format(_output_string))
prev_time = int(time.time())
pixels[0] = _color
if beacon_flag:
try:
if not beacon_queue.empty():
data = beacon_queue.queue[0]
parameter = beacon_config["sc"]
if parameter == "aqi":
if data and parameter in data:
value = data[parameter]
else:
if data and parameter in data["d"]:
value = data["d"][parameter]
if value >= 0:
beacon_color = hex_to_rgb(get_color_from_value(value))
context_logger.info_with_context(
"BEACON",
f"value: {value} color: {get_color_from_value(value)}",
)
else:
beacon_color = unsupported_color
beacon_queue.queue.clear()
# Update Beacon color
pixels[1:] = [beacon_color] * 9
except Exception as e:
context_logger.error_with_context("BEACON", f"update: {e}")
# run patterns
if _pattern == pattern_normal_blink:
f.blinking(100)
elif _pattern == pattern_slow_blink:
f.blinking(600)
elif _pattern == pattern_fast_blink:
f.blinking(70)
elif _pattern == pattern_ultra_fast_blink:
f.blinking(30)
elif _pattern == pattern_fade:
f.fade()
except Exception as e:
pixels.fill(unsupported_color)
f.fade()
if (current_time - prev_time) > print_interval:
context_logger.error_with_context("RGB", f"main: {e}")
prev_time = int(time.time())
if __name__ == "__main__":
NETWORK_FILE = "/etc/ozone_network.txt"
main(NETWORK_FILE)