Source code for OzWrapper.OzError.OzError

"""Sensor data filtering and aggregation wrapper.

Provides a configurable pipeline of range, delta, statistical, and low-pass
filters followed by aggregation functions (average, energy-average, vector
wind) to clean raw sensor time-series before publication.
"""

import json
import math
import os
import statistics

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


# TODO: add old value record
class OzError:
    """Sensor data quality filter and aggregation engine.

    Applies a configurable chain of filters (range, delta, statistical,
    low-pass) to raw sensor readings, then aggregates the cleaned values
    using parameter-specific strategies (arithmetic mean, energy dB average,
    vector wind, latest GPS, min/max).

    Attributes:
        DEBUG: Class-level debug flag.
        counter: Warm-up countdown; filters are bypassed until this reaches 0.
        publish_interval: Seconds between publish cycles (used for warm-up display).
        debug_mode: Instance-level verbose logging toggle.
        low_pass_cache_values: Per-key cache for the exponential moving average filter.
        cache_filtered_values: Per-key cache of the most recent filtered series.
        old_values: Per-key cache of the last aggregated value (fallback on empty series).
        filter_length: Number of samples required before low-pass filtering activates.
    """

    DEBUG = False
    def __init__(self) -> None:
        """Initialize OzError with default filter config for all known parameters."""
        super().__init__()
        # Initialize counter as instance attribute for sensor warm-up period
        self.counter = 15
        self.publish_interval = 300  # in seconds
        self.debug_mode = True
        self.low_pass_cache_values: dict = {}
        self.cache_filtered_values: dict = {}
        self.old_values: dict = {}
        self.filter_length: int = 20
        self.__error_config: dict[str, dict] = {
            "temp": {"min": -60, "max": 85, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "hum": {"min": 1, "max": 100, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "t1": {"min": -40, "max": 85, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "t2": {"min": 0, "max": 100, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "pr": {"min": 0, "max": 20000, "diff": 10, "filter": [0, 2], "aggregate": [0]},
            "bs": {"min": 0, "max": 100, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "flood": {"min": 0, "max": 5000, "diff": 20, "filter": [0, 2], "aggregate": [0]},
            "p1": {"min": 0, "max": 10000, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "p2": {"min": 0, "max": 10000, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "p3": {"min": 0, "max": 1000, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "p4": {"min": 0, "max": 30000, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "g1": {"min": 0, "max": 5000, "diff": 150, "filter": [0, 2], "aggregate": [0]},
            "uv": {"min": 0, "max": 15, "diff": 5, "filter": [0, 2], "aggregate": [0]},
            "g21": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g25": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g31": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g35": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g51": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g55": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g61": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g65": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g71": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g75": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g81": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "g85": {"min": 0, "max": 5000, "diff": 500, "filter": [3], "aggregate": [0]},
            "n11": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [3]},
            "n21": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [3]},
            "n31": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [3]},
            "n12": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [1]},
            "n22": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [1]},
            "n32": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [1]},
            "n13": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n23": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n33": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n14": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n24": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n34": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [2]},
            "n15": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n25": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n35": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n16": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n26": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n36": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n17": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n27": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "n37": {"min": 20, "max": 130, "diff": 20, "filter": [0], "aggregate": [0]},
            "vb1": {"min": -8, "max": 8, "diff": 1, "filter": [0], "aggregate": [0]},
            "vb2": {"min": -8, "max": 8, "diff": 1, "filter": [0], "aggregate": [0]},
            "vb3": {"min": -8, "max": 8, "diff": 1, "filter": [0], "aggregate": [0]},
            "vb4": {"min": -2500, "max": 2500, "diff": 100, "filter": [0], "aggregate": [0]},
            "vb5": {"min": -2500, "max": 2500, "diff": 100, "filter": [0], "aggregate": [0]},
            "vb6": {"min": -2500, "max": 2500, "diff": 100, "filter": [0], "aggregate": [0]},
            "wd": {
                "min": 0,
                "max": 360,
                "diff": 30,
                "filter": [0],
                "aggregate": [4],
                "key_relation": ["ws"],
            },
            "ws": {
                "min": 0,
                "max": 60,
                "diff": 10,
                "filter": [0],
                "aggregate": [5],
                "key_relation": ["wd"],
            },
            "wg": {"min": 0, "max": 60, "diff": 10, "filter": [0], "aggregate": [0]},
            "lat": {"min": -90, "max": 90, "diff": 10, "filter": [0], "aggregate": [6]},
            "lon": {"min": -180, "max": 180, "diff": 10, "filter": [0], "aggregate": [6]},
        }
        self.__filter_functions = {
            0: self.range_filter,
            1: self.delta_filter,
            2: self.statistical_filter,
            3: self.low_pass_filter,
        }
        self.__aggregate_functions = {
            0: self.average,
            1: self.min_value,
            2: self.max_value,
            3: self.energy_average,
            4: self.vector_direction,
            5: self.vector_magnitude,
            6: self.latest_value,
        }
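    # A hedged reading of the config above: for "temp", "filter": [0, 2] runs
    # range_filter then statistical_filter, and "aggregate": [0] publishes the
    # arithmetic mean; gas channels like "g21" use only the low-pass filter (3),
    # while the noise channels ("n11"..."n37") select energy_average (3),
    # min_value (1), max_value (2), or the plain average (0) per sub-metric.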
    def putSensorValue(
        self,
        values: dict[str, list] | dict[str, list[list]],
        calibration: bool = False,
    ) -> dict:
        """Filter and aggregate raw sensor value lists into scalar outputs.

        Args:
            values: Dict mapping send-codes to raw value lists (or nested
                [values, timestamps] pairs).
            calibration: If True, skip filtering and use simple averaging.

        Returns:
            The updated dict with each key replaced by its aggregated scalar
            value, plus optional confidence keys (``<key>_cf``) when outliers
            were removed.
        """
        if self.debug_mode:
            context_logger.debug_with_context("OzError", f"Unfiltered: {values}")
        total_seconds = self.publish_interval * self.counter
        minutes = int(total_seconds // 60)
        seconds = int(total_seconds % 60)
        context_logger.debug_with_context(
            "OzError",
            f"Remaining Warm Up time: {minutes} minutes {seconds} seconds "
            f"({self.counter} cycles)",
        )
        # Track which sensors received new input data THIS cycle (before any
        # modifications); currently unused, reserved for future use
        sensors_with_new_data = set(values.keys())
        try:
            # Iterate over a snapshot so `values` can be modified safely in the loop
            for key in list(values):
                try:
                    val = values[key]
                    # Extract parameter data from the new/legacy format
                    series = self._extract_series(val)
                    if len(series) == 0:
                        self.cache_filtered_values[key] = []
                        values.update({key: 0.0})
                        continue
                    self.cache_filtered_values[key] = series
                    if calibration:
                        continue
                    # Apply the configured filter pipeline to the extracted data
                    remove_count, filtered_values = self.sensor_filter(key, series)
                    self.cache_filtered_values[key] = filtered_values
                    if remove_count > 0 and self.counter == 0:
                        confidence_on_value = (len(filtered_values) / len(series)) * 100
                        values.update({key + "_cf": confidence_on_value})
                except Exception as e:
                    context_logger.error_with_context(
                        "OzError", f"Error in filter loop for {key}: {e}"
                    )
            for key in list(self.cache_filtered_values):
                try:
                    filtered_values = self.cache_filtered_values.get(key, [])
                    if len(filtered_values) > 0:
                        if not calibration and key in self.__error_config:
                            val = self.aggregate_value(key)
                        else:
                            val = self.average(key)
                        values.update({key: val})
                        self.old_values.update({key: val})
                    else:
                        # Fall back to the last known value when the series is empty
                        if key in self.old_values:
                            values.update({key: self.old_values[key]})
                        else:
                            values.update({key: 0.0})
                except Exception as e:
                    context_logger.error_with_context(
                        "OzError", f"Error in aggregation loop for {key}: {e}"
                    )
                    values.update({key: 0.0})
        except Exception as e:
            context_logger.error_with_context("OzError", f"putSensorValue error: {e}")
        context_logger.debug_with_context("OzError", f"Filtered: {values}")
        return values
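    # Shape example (hedged): once warm-up has elapsed, an input like
    #   {"temp": [24.5, 24.6, 100.0]}
    # comes back as roughly {"temp": 24.55, "temp_cf": 66.67}: the range filter
    # drops the 100.0 spike, the mean of the survivors is published, and the
    # _cf key reports the percentage of samples that survived filtering.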
    def setConfig(self, config: dict) -> None:
        """Merge external filter configuration into the error config.

        Args:
            config: Dict with parameter filter overrides and optional
                'length' and 'debug' keys.
        """
        self.__error_config.update(config)
        if "length" in config:
            self.filter_length = int(config["length"])
        if "debug" in config:
            self.DEBUG = bool(config["debug"])
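    # Example (hedged): setConfig({"hum": {"min": 5, "max": 95, "diff": 5,
    # "filter": [0], "aggregate": [0]}, "length": 30, "debug": True}) replaces
    # the entire "hum" entry (dict.update is a shallow merge, not a deep one),
    # sets filter_length to 30, and enables the DEBUG flag.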
    def _extract_series(self, data: list | list[list] | float) -> list:
        """Return a cleaned numeric series extracted from raw sensor data.

        Accepts:
            - legacy: [v1, v2, ...]
            - new: [[v1, v2, ...], [ts1, ts2, ...]]
            - scalar: a single number or numeric string

        Non-numeric and NaN entries are dropped; timestamps in the new
        format are discarded.
        """
        try:
            # New format: [values, timestamps]. Require the first element to be
            # a sequence so a legacy flat list of exactly two samples is not
            # mistaken for it and silently dropped.
            if (
                isinstance(data, (list, tuple))
                and len(data) == 2
                and isinstance(data[0], (list, tuple))
            ):
                raw_values = data[0]
                cleaned_values = []
                for v in raw_values:
                    try:
                        num = float(v)
                        if not math.isnan(num):
                            cleaned_values.append(num)
                    except (ValueError, TypeError):
                        continue
                return cleaned_values
            # Flat sequence (legacy): clean and return
            if isinstance(data, (list, tuple)):
                cleaned_values = []
                for v in data:
                    try:
                        num = float(v)
                        if not math.isnan(num):
                            cleaned_values.append(num)
                    except (ValueError, TypeError):
                        continue
                return cleaned_values
            # Scalar value (number or numeric string)
            try:
                fv = float(data)
                return [] if math.isnan(fv) else [fv]
            except Exception:
                return []
        except Exception as e:
            context_logger.error_with_context("OzError", f"_extract_series error: {e}")
            return []
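    # Worked example: both of these calls yield the series [24.5, 24.6]:
    #   _extract_series([24.5, "24.6", None])                      # legacy flat list
    #   _extract_series([[24.5, 24.6], [1700000000, 1700000300]])  # [values, timestamps]
    # None fails float() and is dropped; the timestamp row is discarded.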
    def sensor_filter(self, key: str, values: list) -> tuple[int, list]:
        """Run the configured filter pipeline for a given sensor key.

        Args:
            key: Sensor send-code identifying the parameter.
            values: Raw numeric values to filter.

        Returns:
            Tuple of (total_removed_count, filtered_values).
        """
        # Work on a copy so the caller's list is never mutated
        filtered_values: list = list(values)
        total_remove_count = 0
        if key in self.__error_config:
            filter_pipeline = self.__error_config[key].get("filter", [0])
            for filter_id in filter_pipeline:
                try:
                    filter_fn = self.__filter_functions.get(filter_id)
                    if filter_fn is None:
                        context_logger.warning_with_context(
                            "OzError",
                            f"Unknown filter id {filter_id} for key {key}",
                        )
                        continue
                    remove_count, filtered_values = filter_fn(key, filtered_values)
                    total_remove_count += remove_count
                except Exception as e:
                    context_logger.error_with_context("OzError", f"Sensor filter error: {e}")
        return total_remove_count, filtered_values
    def aggregate_value(self, key: str) -> float:
        """Run the configured aggregation pipeline for a given sensor key.

        Args:
            key: Sensor send-code identifying the parameter.

        Returns:
            The final aggregated scalar value.
        """
        aggregated_value = self.average(key)
        if key in self.__error_config:
            aggregate_pipeline = self.__error_config[key].get("aggregate", [0])
            for aggregate_id in aggregate_pipeline:
                try:
                    aggregate_fn = self.__aggregate_functions.get(aggregate_id)
                    if aggregate_fn is None:
                        context_logger.warning_with_context(
                            "OzError",
                            f"Unknown aggregate id {aggregate_id} for key {key}",
                        )
                        continue
                    aggregated_value = aggregate_fn(key)
                except Exception as e:
                    context_logger.error_with_context("OzError", f"Sensor aggregate error: {e}")
        return aggregated_value
    def range_filter(self, key: str, values: list) -> tuple[int, list]:
        """Remove values outside the configured min/max range for a parameter.

        Args:
            key: Sensor send-code for config lookup.
            values: Numeric values to filter.

        Returns:
            Tuple of (removed_count, values_within_range).
        """
        filtered_values = []
        remove_count = 0
        try:
            min_threshold = self.__error_config[key]["min"]
            max_threshold = self.__error_config[key]["max"]
            for value in values:
                if min_threshold <= value <= max_threshold:
                    filtered_values.append(value)
            remove_count = len(values) - len(filtered_values)
            return remove_count, filtered_values
        except Exception as e:
            context_logger.error_with_context("OzError", f"Range filter error for key {key}: {e}")
            return remove_count, values
    def delta_filter(self, key: str, values: list) -> tuple[int, list]:
        """Remove spike values that exceed the configured delta from the previous reading.

        Args:
            key: Sensor send-code for config lookup.
            values: Numeric values to filter.

        Returns:
            Tuple of (removed_count, spike-free values).
        """
        filtered_values = []
        remove_count = 0
        # Need at least 2 values to compare; skip during warm-up
        if len(values) < 2 or self.counter > 0:
            return remove_count, values
        try:
            prev_value = self.old_values.get(key, values[0])
            for value in values:
                if abs(value - prev_value) <= self.__error_config[key]["diff"]:
                    filtered_values.append(value)
                    prev_value = value
            remove_count = len(values) - len(filtered_values)
            return remove_count, filtered_values
        except Exception as e:
            context_logger.error_with_context("OzError", f"Delta filter error for key {key}: {e}")
            return remove_count, values
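    # Worked example: with diff = 5 and a previous value of 24.5, the series
    # [24.6, 100.0, 24.7] keeps 24.6, drops the 100.0 spike, and keeps 24.7.
    # prev_value only advances on accepted readings, so a single spike cannot
    # drag the comparison baseline with it.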
    def statistical_filter(self, key: str, values: list) -> tuple[int, list]:
        """Remove outliers beyond a configurable number of standard deviations.

        Args:
            key: Sensor send-code for config lookup.
            values: Numeric values to filter.

        Returns:
            Tuple of (removed_count, values_within_statistical_bounds).
        """
        filtered_values = []
        remove_count = 0
        # Need at least 3 values for a meaningful standard deviation; skip during warm-up
        if len(values) < 3 or self.counter > 0:
            return remove_count, values
        try:
            mean_value = round(statistics.mean(values), 2)
            stdev_value = round(statistics.stdev(values), 2)
            threshold_multiplier = self.__error_config[key]["diff"]
            if self.debug_mode:
                context_logger.debug_with_context("OzError", {"mean": mean_value, "key": key})
                context_logger.debug_with_context("OzError", {"sd": stdev_value, "key": key})
            # Calculate acceptance bounds
            lower_bound = mean_value - (threshold_multiplier * stdev_value)
            upper_bound = mean_value + (threshold_multiplier * stdev_value)
            for value in values:
                if lower_bound <= value <= upper_bound:
                    filtered_values.append(value)
            remove_count = len(values) - len(filtered_values)
            return remove_count, filtered_values
        except statistics.StatisticsError as e:
            context_logger.error_with_context("OzError", f"Spike Detect v2 statistics error: {e}")
            return remove_count, values
        except Exception as e:
            context_logger.error_with_context("OzError", f"Spike Detect v2 error: {e}")
            return remove_count, values
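    # Illustration (hedged, with a hypothetical multiplier of 1): for values
    # [10, 10, 10, 50], mean = 20 and stdev = 20, so the bounds are [0, 40]
    # and 50 is removed. Note that the config's "diff" field doubles as the
    # sigma multiplier here, not as an absolute delta.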
    def low_pass_filter(self, key: str, values: list, alpha: float = 0.15) -> tuple[int, list]:
        """Apply an exponential moving average (low-pass) filter to smooth sensor readings.

        Reduces high-frequency noise while preserving signal trends.
        Formula: filtered[n] = α × input[n] + (1 - α) × filtered[n-1]

        Args:
            key: Sensor key for cache lookup.
            values: List of sensor values to filter.
            alpha: Smoothing factor (0 < α ≤ 1); lower means more smoothing.
                The default of 0.15 provides good noise reduction.

        Returns:
            tuple: (remove_count, filtered_values)
                - remove_count: Always 0 (low-pass doesn't remove values).
                - filtered_values: Smoothed values.

        Note:
            Requires accumulating filter_length values before filtering begins.
            During accumulation, returns the original values unchanged.
        """
        filtered_values = []
        remove_count = 0
        # Validate alpha parameter
        if not (0 < alpha <= 1):
            context_logger.warning_with_context(
                "OzError",
                f"{key}: Invalid alpha={alpha}. Must be in range (0, 1]. Using default 0.15",
            )
            alpha = 0.15
        # Initialize cache for this sensor if it does not exist
        if key not in self.low_pass_cache_values:
            self.low_pass_cache_values[key] = []
        # Append new values to cache
        self.low_pass_cache_values[key].extend(values)
        # Keep only the latest filter_length values in the cache
        if len(self.low_pass_cache_values[key]) > self.filter_length:
            self.low_pass_cache_values[key] = self.low_pass_cache_values[key][-self.filter_length:]
        # If not enough values have accumulated yet, return the original values
        if len(self.low_pass_cache_values[key]) < self.filter_length:
            return remove_count, values
        # Apply the low-pass filter
        try:
            prev_output = self.low_pass_cache_values[key][0]  # Seed with the first cached value
            for value in self.low_pass_cache_values[key]:
                filtered_value = alpha * value + (1 - alpha) * prev_output
                filtered_values.append(round(filtered_value, 2))
                prev_output = filtered_value
            # Clear cache after processing to avoid reprocessing the same data
            self.low_pass_cache_values[key] = []
            return remove_count, filtered_values
        except Exception as e:
            context_logger.error_with_context("OzError", f"Low-pass filter error for key {key}: {e}")
            # Clear problematic cache to avoid repeated errors
            self.low_pass_cache_values[key] = []
            return remove_count, values
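    # Worked example (hedged, using alpha = 0.5 and a 3-sample cache for
    # brevity; the real gate is filter_length samples): seeded with the first
    # cached value, inputs [10, 20, 20] smooth to
    #   y0 = 0.5*10 + 0.5*10 = 10.0
    #   y1 = 0.5*20 + 0.5*10 = 15.0
    #   y2 = 0.5*20 + 0.5*15 = 17.5
    # so the step toward 20 is approached gradually instead of instantly.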
    def average(self, key: str) -> float:
        """Compute the arithmetic mean of the cached filtered values for a key.

        Args:
            key: Sensor send-code to look up in the filtered cache.

        Returns:
            Rounded average, or 0.0 if the series is empty.
        """
        try:
            values = self.cache_filtered_values.get(key, [])
            value = sum(values) / len(values)
            if key == "rain":
                return value  # Return the raw value for rain without rounding
            return round(value, 2)
        except ZeroDivisionError:
            return 0.0
    def min_value(self, key: str) -> float:
        """Return the minimum of the cached filtered values for a key.

        Args:
            key: Sensor send-code to look up in the filtered cache.

        Returns:
            Rounded minimum, or 0.0 if the series is empty.
        """
        try:
            values = self.cache_filtered_values.get(key, [])
            return round(min(values), 2)
        except ValueError:
            return 0.0
    def max_value(self, key: str) -> float:
        """Return the maximum of the cached filtered values for a key.

        Args:
            key: Sensor send-code to look up in the filtered cache.

        Returns:
            Rounded maximum, or 0.0 if the series is empty.
        """
        try:
            values = self.cache_filtered_values.get(key, [])
            return round(max(values), 2)
        except ValueError:
            return 0.0
    def energy_average(self, key: str) -> float:
        """Compute an acoustically correct energy average for decibel values.

        Uses the formula 10 * log10(mean(10^(dB/10))), with numerical
        stability via max-value subtraction.

        Args:
            key: Sensor send-code to look up in the filtered cache.

        Returns:
            Energy-averaged dB value, or the arithmetic mean as a fallback.
        """
        energy_sum = 0.0
        values = self.cache_filtered_values.get(key, [])
        if not values:
            # Guard the empty series up front: max() would raise ValueError and
            # the arithmetic-mean fallback would then divide by zero
            return 0.0
        try:
            # Acoustic energy averaging: 10*log10(mean(10^(dB/10)))
            max_val = max(values)  # For numerical stability
            for val in values:
                energy_sum += 10 ** ((val - max_val) / 10.0)
            energy_avg = energy_sum / len(values)
            if energy_avg <= 0:
                return 0.0
            result = max_val + 10.0 * math.log10(energy_avg)
            return round(result, 2)
        except (OverflowError, ValueError, ZeroDivisionError):
            # Fall back to the arithmetic mean
            return round(sum(values) / len(values), 2)
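    # Worked example: for cached values [60.0, 70.0] (dB), the energy average is
    #   10 * log10((10**6 + 10**7) / 2) ≈ 67.4 dB,
    # noticeably higher than the arithmetic mean of 65 dB, because louder
    # events dominate the acoustic energy.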
    def vector_direction(self, key: str) -> float:
        """Compute the vector-averaged direction from directional data and related magnitudes.

        Uses U/V component decomposition for proper circular averaging of wind
        direction paired with wind speed magnitudes.

        Args:
            key: Sensor send-code for the directional parameter (e.g., 'wd').

        Returns:
            Vector-averaged direction in degrees [0, 360), or 0.0 if empty.
        """
        u = self.cache_filtered_values.get(key, [])
        for related_key in self.__error_config[key].get("key_relation", []):
            v = self.cache_filtered_values.get(related_key, [])
            sum_u = 0.0
            sum_v = 0.0
            count = 0
            for i in range(len(u)):
                # Safety check: stop if the magnitude series is shorter
                if i >= len(v):
                    break
                rad = math.radians(u[i])
                sum_u += math.sin(rad) * v[i]
                sum_v += math.cos(rad) * v[i]
                count += 1
            if count == 0:
                return 0.0
            avg_u = sum_u / count
            avg_v = sum_v / count
            avg_angle = (math.degrees(math.atan2(avg_u, avg_v)) + 360.0) % 360.0
            if avg_angle == 0:
                # Degenerate resultant (e.g., all magnitudes zero): fall back to
                # the arithmetic mean of the directions
                avg_angle = sum(u) / len(u)
            if key == "wd":
                return int(round(avg_angle, 0))
            return round(avg_angle, 2)
        # No related magnitude series configured
        return 0.0
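    # Worked example: directions [350°, 10°] with equal speeds decompose into
    # U/V components whose sines cancel and cosines add, so the resultant
    # points north (0°/360°); a naive arithmetic mean would report 180°,
    # the exact opposite direction.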
    def latest_value(self, key: str) -> float:
        """Return the most recent value from the filtered cache (used for GPS coordinates).

        Args:
            key: Sensor send-code to look up in the filtered cache.

        Returns:
            The last value in the series, or 0.0 if empty.
        """
        values = self.cache_filtered_values.get(key, [])
        if len(values) == 0:
            return 0.0
        return values[-1]
    def vector_magnitude(self, key: str) -> float:
        """Compute the vector-averaged magnitude from speed and related direction data.

        Uses U/V component decomposition to compute the resultant wind speed
        magnitude from paired speed and direction series.

        Args:
            key: Sensor send-code for the magnitude parameter (e.g., 'ws').

        Returns:
            Vector-averaged magnitude, or 0.0 if empty.
        """
        v = self.cache_filtered_values.get(key, [])
        for related_key in self.__error_config[key].get("key_relation", []):
            u = self.cache_filtered_values.get(related_key, [])
            sum_u = 0.0
            sum_v = 0.0
            count = 0
            for i in range(len(u)):
                # Safety check: stop if the speed series is shorter
                if i >= len(v):
                    break
                rad = math.radians(u[i])
                sum_u += math.sin(rad) * v[i]
                sum_v += math.cos(rad) * v[i]
                count += 1
            if count == 0:
                return 0.0
            avg_u = sum_u / count
            avg_v = sum_v / count
            magnitude = math.sqrt(avg_u**2 + avg_v**2)
            return round(magnitude, 2)
        # No related direction series configured
        return 0.0
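    # Worked example: speeds [5, 5] with directions [350°, 10°] give a
    # resultant magnitude of about 4.92, slightly below the scalar mean of 5,
    # because the direction swing partially cancels the crosswind components.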
if __name__ == "__main__":
    context_logger.debug_with_context("OzError", "--- OzError Standalone Test ---")
    dirname = os.path.dirname(__file__)
    file_name = os.path.join(dirname, "error.config.json")

    # 1. Load config or use default
    values = {}
    if os.path.exists(file_name):
        try:
            with open(file_name) as configFile:
                config = json.loads(configFile.read())
            values = config.get("dummy", {})
            context_logger.debug_with_context("OzError", f"Loaded test data from {file_name}")
        except Exception as e:
            context_logger.debug_with_context("OzError", f"Failed to load config: {e}")
    if not values:
        context_logger.debug_with_context(
            "OzError", "Using internal test data (temp sensor with spike)"
        )
        values = {
            "temp": [24.5, 24.6, 24.5, 100.0, 24.7, 24.6],  # 100.0 is a spike
            "hum": [50, 51, 50, 50, 51, 50],
        }

    # 2. Initialize
    error = OzError()
    error.debug_mode = True

    # 3. Simulate warm-up bypass for testing: counter defaults to 15,
    #    and the filters won't run unless it is cleared
    context_logger.debug_with_context("OzError", "Skipping warm-up for test...")
    error.counter = 0
    context_logger.debug_with_context("OzError", f"\nInput Data: {values}")

    # 4. Run processing
    output = error.putSensorValue(values)

    # 5. Show results
    context_logger.debug_with_context("OzError", "\n--- Output Data ---")
    context_logger.debug_with_context("OzError", json.dumps(output))