Source code for drivers.Wind.Wind

"""Driver for HONGYUV and Calypso ultrasonic wind sensors via SDI-12.

Communicates via a serial SDI-12 bridge to read wind speed, direction, and
gust values. Supports multiple sensor variants identified by part number.

Hardware:
    Interface: UART via SDI-12 bridge
    Port: /dev/ttyACM0 (typical)
    Baud rate: 1200 (SDI-12 standard), 7E1 framing
    Protocol: SDI-12 ASCII commands (e.g., ``0R0!``, ``0R6!``)

Typical usage::

    import serial
    from drivers.Wind.Wind import Wind

    ser = serial.Serial(
        port="/dev/ttyACM0", baudrate=1200,
        bytesize=serial.SEVENBITS, parity=serial.PARITY_EVEN,
        stopbits=serial.STOPBITS_ONE, timeout=2,
    )
    wind = Wind()
    wind.initialize(ser, {"en": 1, "pn": 211, "parameters": [...]})
    reading = wind.getSensorReading()

Note:
    Requires ``pyserial`` for serial communication. Part numbers: 211 = 40 m/s
    HONGYUV, 212 = Calypso, 213 = 60 m/s HONGYUV with gust support.
"""

import re
import time

import serial

from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


[docs] class Wind: """Driver for HONGYUV / Calypso ultrasonic wind sensors over SDI-12. Sends SDI-12 commands through a serial bridge to read wind speed, direction, and (optionally) gust. Accumulates time-series data for batch export. Attributes: ser: Open ``serial.Serial`` instance for the SDI-12 bridge. configuration: Device configuration dictionary from Gateway. address: SDI-12 sensor address (single character, default ``"0"``). part_number: Hardware variant identifier (211, 212, or 213). wind_angle: Accumulated wind direction readings in degrees. wind_speed: Accumulated wind speed readings in m/s. wind_gust: Accumulated gust speed readings in m/s (part 213 only). timestamp: Accumulated epoch timestamps in milliseconds. max_wind_speed: Maximum measurable wind speed for the sensor variant. """
[docs] def __init__(self) -> None: """Initialize default attributes for the Wind sensor.""" self.ser: serial.Serial = None self.configuration: dict = {} self.address = "0" self.debug = True self.part_number = 211 # 211 for 40m/s, 213 for 60m/s self.wind_angle: list[float] = [] self.wind_speed: list[float] = [] self.wind_gust: list[float] = [] self.timestamp: list[int] = [] self.max_wind_speed = 40 # default for part=1 self.gust_avg_period = 300 self.EOL = b"\r\n"
[docs] def initialize(self, serial_port: serial.Serial, configuration: dict) -> bool: """Configure the wind sensor with a serial port and device configuration. Verifies that the sensor responds at the configured SDI-12 address and optionally sets the gust averaging period for part 213 sensors. Args: serial_port: An open ``serial.Serial`` instance connected to the SDI-12 bridge. configuration: Device configuration dict with keys like ``"pn"``, ``"debug"``, ``"sensor_address"``, ``"gust_avg_period"``, and ``"parameters"``. Returns: True if address verification succeeded, False otherwise. """ self.ser = serial_port self.configuration = configuration self.debug = configuration.get("debug", False) self.part_number = configuration.get("pn", 211) self.gust_avg_period = configuration.get("gust_avg_period", 300) self.address = str(configuration.get("sensor_address", 0)) #Taking address from config self.max_wind_speed = 40 if self.part_number == 211 else 60 self.retires = 3 try: for attempts in range(self.retires): init_response = self.verify_address() if len(init_response) > 3: context_logger.info_with_context( "Wind", f"Sensor address confirmed: {self.address}" ) break context_logger.warning_with_context( "Wind", f"Sensor address mismatch: expected {self.address}, got {init_response}. Retrying...", ) if attempts == self.retires - 1: context_logger.error_with_context( "Wind", f"Failed to confirm sensor address after {self.retires} attempts.", ) return False if self.part_number == 213: self.set_gust_avg_period(self.gust_avg_period) return True except Exception as e: context_logger.error_with_context("Wind", f"initialize: {e}") return False
[docs] def send_command(self, command: str, delay: float = 1.5) -> str: """Send an SDI-12 command via the serial bridge and read the response. Args: command: ASCII SDI-12 command string (e.g., ``"0R0!"``). delay: Seconds to wait after sending before reading. SDI-12 timing requires a minimum delay for sensor processing. Returns: The decoded ASCII response string, or empty string on error. """ # Format command with proper SDI-12 termination command = command.encode("ascii", errors="ignore") payload = command + self.EOL if self.debug: context_logger.debug_with_context("Wind", f"Sending: {payload}") try: if not self.ser.is_open: self.ser.open() # Clear any pending data self.ser.reset_input_buffer() self.ser.reset_output_buffer() # Send command with proper encoding self.ser.write(payload) self.ser.flush() # Wait for response (SDI-12 timing) # ATTENTION: ⚠️ This delay is crucial for SDI-12! if you reduce it, you may miss the response or garbage response you will get. time.sleep(delay) # Read response with timeout handling if self.ser.in_waiting > 0: response_bytes = self.ser.read(self.ser.in_waiting) response = response_bytes.decode("ascii", errors="ignore").strip() else: response = "" if self.debug: context_logger.debug_with_context("Wind", f"Raw response: {response!r}") return response except serial.SerialException as e: context_logger.error_with_context( "Wind", f"Serial communication error: {e}" ) return "" except Exception as e: context_logger.error_with_context( "Wind", f"Unexpected error in send_command: {e}" ) return "" finally: # Ensure port is closed even on error try: if self.ser and self.ser.is_open: self.ser.close() except Exception as close_error: context_logger.error_with_context( "Wind", f"Error closing serial port: {close_error}" )
[docs] def verify_address(self) -> str: """Verify the sensor responds at the configured SDI-12 address. Returns: The raw response string from the sensor. """ return self.send_command(f"{self.address}R0!", delay=1.5)
[docs] def set_gust_avg_period(self, interval: int = 300) -> None: """Set the gust averaging period on the sensor (part 213 only). Args: interval: Averaging period in seconds (default 300). """ response = self.send_command(f"0XST0,C={interval}!") if self.debug: context_logger.debug_with_context( "Wind", f"Gust avg interval set to : {response} ms" )
[docs] def parse_wind_response(self, response: str, cmd_type: str) -> dict | None: """Parse an SDI-12 response string into wind measurement values. Handles different response formats based on command type and sensor part number (211=HONGYUV, 212=Calypso, 213=HONGYUV with gust). Args: response: Raw ASCII response string from the sensor. cmd_type: The SDI-12 command type that generated the response (``"R0"`` for standard, ``"R6"`` for gust). Returns: A dict with ``"address"``, ``"speed"``, ``"angle"``, and ``"gust"`` keys, or None if the response could not be parsed. """ if not response: return None response = response.strip() if len(response) < 2: return None address = response[0] body = response[1:] numeric_matches = list(re.finditer(r"[+-]\d+(?:\.\d+)?", body)) values = [] for match in numeric_matches: try: values.append(float(match.group(0))) except ValueError: values.append(None) if cmd_type == "R0": if len(values) < 2: return None if self.part_number == 212: return { "address": address, "speed": values[0], "angle": values[1], "gust": None, } else: return { "address": address, "angle": values[0], "speed": values[1], "gust": None, } if cmd_type == "R6": if len(values) < 7: return None return { "address": address, "speed": values[0], "angle": values[3], "gust": values[6], } return None
[docs] def get_speed_and_angle(self) -> tuple[float, float] | None: """Query the sensor for wind speed and direction via the R0 command. Returns: A tuple ``(angle, speed)`` in (degrees, m/s), or None on failure. """ try: if not self.ser: context_logger.error_with_context("Wind", "Serial port not initialized") return None # ---- Standard Measurement (R0) ---- response = self.send_command(f"{self.address}R0!") vals = self.parse_wind_response(response, "R0") if vals is not None: angle_value = vals.get("angle") speed_value = vals.get("speed") if angle_value is not None: self.wind_angle.append(angle_value) if speed_value is not None: self.wind_speed.append(speed_value) return angle_value, speed_value elif self.debug: context_logger.debug_with_context( "Wind", f"R0 payload rejected: {response!r}" ) return None except Exception as e: context_logger.error_with_context("Wind", f"get_speed_and_angle: {e}") return None
[docs] def get_gust(self) -> float | None: """Query the sensor for gust speed via the R6 command (part 213 only). Returns: Gust speed in m/s, or None on failure. """ try: if not self.ser: context_logger.error_with_context("Wind", "Serial port not initialized") return None # ---- Gust Measurement (R6) only if part_number==213 ---- response = self.send_command(f"{self.address}R6!") vals6 = self.parse_wind_response(response, "R6") if vals6 is not None: gust_value = vals6.get("gust") if gust_value is not None: self.wind_gust.append(gust_value) return gust_value elif self.debug: context_logger.debug_with_context( "Wind", f"R6 payload rejected: {response!r}" ) return None except Exception as e: context_logger.error_with_context("Wind", f"get_gust: {e}") return None
[docs] def getSensorReading(self) -> dict: """Get the latest wind reading and accumulate values for batch export. Queries speed, angle, and optionally gust, then maps them to the configured parameter short-codes. Returns: A dict mapping parameter short-codes to their current values, plus a ``"t"`` timestamp. Empty dict on failure. """ result = {} try: readings = self.get_speed_and_angle() if readings is None: return result angle_value, speed_value = readings gust_value = None if self.part_number == 213: gust_value = self.get_gust() # This part returns the latest raw values, not averages. for parameter in self.configuration.get("parameters", []): pm = parameter.get("pm", 0) if pm == 1 and angle_value is not None: result[parameter["sc"]] = angle_value if pm == 2 and speed_value is not None: result[parameter["sc"]] = speed_value if pm == 3 and self.part_number == 213 and gust_value is not None: result[parameter["sc"]] = gust_value result["t"] = int(time.time() * 1000) self.timestamp.append(result["t"]) return result except Exception as e: context_logger.error_with_context("Wind", f"getSensorReading: {e}") return result
[docs] def putSensorValue(self, result: dict | None = None) -> dict: """Export accumulated wind data as time-series lists and reset buffers. Args: result: Optional dict to populate with time-series data. Creates a new dict if None. Returns: The result dict with each parameter mapped to ``[[values...], [timestamps...]]``. """ if self.debug: context_logger.debug_with_context("Wind", "Aggregating wind data...") if result is None: result = {} for parameter in self.configuration.get("parameters", []): pm = parameter.get("pm", 0) if pm == 1: result[parameter["sc"]] = [list(self.wind_angle), list(self.timestamp)] if pm == 2: result[parameter["sc"]] = [list(self.wind_speed), list(self.timestamp)] if pm == 3 and self.part_number == 213: result[parameter["sc"]] = [list(self.wind_gust), list(self.timestamp)] # Clear lists for the next aggregation period self.wind_speed.clear() self.wind_angle.clear() self.wind_gust.clear() self.timestamp.clear() return result
if __name__ == "__main__":
    # Manual smoke test: open the bridge, take five readings, then export
    # the accumulated series.
    import json

    # Note: Ensure the serial port and parameters match your device.
    # For SDI-12, baud rate is 1200, 7 data bits, even parity, 1 stop bit.
    # The pyserial default is 8N1, which might not work for all SDI-12 bridges.
    try:
        ser = serial.Serial(
            port="/dev/ttyACM0",
            baudrate=1200,
            bytesize=serial.SEVENBITS,
            parity=serial.PARITY_EVEN,
            stopbits=serial.STOPBITS_ONE,
            timeout=2,
        )
    except serial.SerialException as e:
        context_logger.error_with_context("Wind", f"Error opening serial port: {e}")
        # Exit with a non-zero status; the bare exit() helper is injected by
        # the site module and is not guaranteed to exist.
        raise SystemExit(1)

    wind = Wind()
    initialized = wind.initialize(
        ser,
        {
            "en": 1,
            "pn": 211,
            "debug": True,
            "sensor_address": 0,
            "parameters": [
                {"sc": "wd", "pm": 1},
                {"sc": "ws", "pm": 2},
                {"sc": "wg", "pm": 3},
            ],
        },
    )
    if not initialized:
        # Without a responding sensor the readings below would only time out.
        context_logger.error_with_context("Wind", "Sensor initialization failed")
        raise SystemExit(1)

    for i in range(5):
        context_logger.info_with_context("Wind", f"Reading {i + 1}/5...")
        wind.getSensorReading()
        time.sleep(2)

    context_logger.info_with_context("Wind", "\nAggregating data...")
    # The export method is putSensorValue(); putSensorReading() does not exist.
    data = wind.putSensorValue()
    context_logger.info_with_context(
        "Wind", f"Final aggregated data: {json.dumps(data, indent=2)}"
    )