"""Driver for HONGYUV and Calypso ultrasonic wind sensors via SDI-12.
Communicates via a serial SDI-12 bridge to read wind speed, direction, and
gust values. Supports multiple sensor variants identified by part number.
Hardware:
Interface: UART via SDI-12 bridge
Port: /dev/ttyACM0 (typical)
Baud rate: 1200 (SDI-12 standard), 7E1 framing
Protocol: SDI-12 ASCII commands (e.g., ``0R0!``, ``0R6!``)
Typical usage::
import serial
from drivers.Wind.Wind import Wind
ser = serial.Serial(
port="/dev/ttyACM0", baudrate=1200,
bytesize=serial.SEVENBITS, parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE, timeout=2,
)
wind = Wind()
wind.initialize(ser, {"en": 1, "pn": 211, "parameters": [...]})
reading = wind.getSensorReading()
Note:
Requires ``pyserial`` for serial communication. Part numbers: 211 = 40 m/s
HONGYUV, 212 = Calypso, 213 = 60 m/s HONGYUV with gust support.
"""
import re
import time
import serial
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)
[docs]
class Wind:
"""Driver for HONGYUV / Calypso ultrasonic wind sensors over SDI-12.
Sends SDI-12 commands through a serial bridge to read wind speed, direction,
and (optionally) gust. Accumulates time-series data for batch export.
Attributes:
ser: Open ``serial.Serial`` instance for the SDI-12 bridge.
configuration: Device configuration dictionary from Gateway.
address: SDI-12 sensor address (single character, default ``"0"``).
part_number: Hardware variant identifier (211, 212, or 213).
wind_angle: Accumulated wind direction readings in degrees.
wind_speed: Accumulated wind speed readings in m/s.
wind_gust: Accumulated gust speed readings in m/s (part 213 only).
timestamp: Accumulated epoch timestamps in milliseconds.
max_wind_speed: Maximum measurable wind speed for the sensor variant.
"""
[docs]
def __init__(self) -> None:
"""Initialize default attributes for the Wind sensor."""
self.ser: serial.Serial = None
self.configuration: dict = {}
self.address = "0"
self.debug = True
self.part_number = 211 # 211 for 40m/s, 213 for 60m/s
self.wind_angle: list[float] = []
self.wind_speed: list[float] = []
self.wind_gust: list[float] = []
self.timestamp: list[int] = []
self.max_wind_speed = 40 # default for part=1
self.gust_avg_period = 300
self.EOL = b"\r\n"
[docs]
def initialize(self, serial_port: serial.Serial, configuration: dict) -> bool:
"""Configure the wind sensor with a serial port and device configuration.
Verifies that the sensor responds at the configured SDI-12 address and
optionally sets the gust averaging period for part 213 sensors.
Args:
serial_port: An open ``serial.Serial`` instance connected to the
SDI-12 bridge.
configuration: Device configuration dict with keys like ``"pn"``,
``"debug"``, ``"sensor_address"``, ``"gust_avg_period"``, and
``"parameters"``.
Returns:
True if address verification succeeded, False otherwise.
"""
self.ser = serial_port
self.configuration = configuration
self.debug = configuration.get("debug", False)
self.part_number = configuration.get("pn", 211)
self.gust_avg_period = configuration.get("gust_avg_period", 300)
self.address = str(configuration.get("sensor_address", 0)) #Taking address from config
self.max_wind_speed = 40 if self.part_number == 211 else 60
self.retires = 3
try:
for attempts in range(self.retires):
init_response = self.verify_address()
if len(init_response) > 3:
context_logger.info_with_context(
"Wind", f"Sensor address confirmed: {self.address}"
)
break
context_logger.warning_with_context(
"Wind",
f"Sensor address mismatch: expected {self.address}, got {init_response}. Retrying...",
)
if attempts == self.retires - 1:
context_logger.error_with_context(
"Wind",
f"Failed to confirm sensor address after {self.retires} attempts.",
)
return False
if self.part_number == 213:
self.set_gust_avg_period(self.gust_avg_period)
return True
except Exception as e:
context_logger.error_with_context("Wind", f"initialize: {e}")
return False
[docs]
def send_command(self, command: str, delay: float = 1.5) -> str:
"""Send an SDI-12 command via the serial bridge and read the response.
Args:
command: ASCII SDI-12 command string (e.g., ``"0R0!"``).
delay: Seconds to wait after sending before reading. SDI-12 timing
requires a minimum delay for sensor processing.
Returns:
The decoded ASCII response string, or empty string on error.
"""
# Format command with proper SDI-12 termination
command = command.encode("ascii", errors="ignore")
payload = command + self.EOL
if self.debug:
context_logger.debug_with_context("Wind", f"Sending: {payload}")
try:
if not self.ser.is_open:
self.ser.open()
# Clear any pending data
self.ser.reset_input_buffer()
self.ser.reset_output_buffer()
# Send command with proper encoding
self.ser.write(payload)
self.ser.flush()
# Wait for response (SDI-12 timing)
# ATTENTION: ⚠️ This delay is crucial for SDI-12! if you reduce it, you may miss the response or garbage response you will get.
time.sleep(delay)
# Read response with timeout handling
if self.ser.in_waiting > 0:
response_bytes = self.ser.read(self.ser.in_waiting)
response = response_bytes.decode("ascii", errors="ignore").strip()
else:
response = ""
if self.debug:
context_logger.debug_with_context("Wind", f"Raw response: {response!r}")
return response
except serial.SerialException as e:
context_logger.error_with_context(
"Wind", f"Serial communication error: {e}"
)
return ""
except Exception as e:
context_logger.error_with_context(
"Wind", f"Unexpected error in send_command: {e}"
)
return ""
finally:
# Ensure port is closed even on error
try:
if self.ser and self.ser.is_open:
self.ser.close()
except Exception as close_error:
context_logger.error_with_context(
"Wind", f"Error closing serial port: {close_error}"
)
[docs]
def verify_address(self) -> str:
"""Verify the sensor responds at the configured SDI-12 address.
Returns:
The raw response string from the sensor.
"""
return self.send_command(f"{self.address}R0!", delay=1.5)
[docs]
def set_gust_avg_period(self, interval: int = 300) -> None:
"""Set the gust averaging period on the sensor (part 213 only).
Args:
interval: Averaging period in seconds (default 300).
"""
response = self.send_command(f"0XST0,C={interval}!")
if self.debug:
context_logger.debug_with_context(
"Wind", f"Gust avg interval set to : {response} ms"
)
[docs]
def parse_wind_response(self, response: str, cmd_type: str) -> dict | None:
"""Parse an SDI-12 response string into wind measurement values.
Handles different response formats based on command type and sensor
part number (211=HONGYUV, 212=Calypso, 213=HONGYUV with gust).
Args:
response: Raw ASCII response string from the sensor.
cmd_type: The SDI-12 command type that generated the response
(``"R0"`` for standard, ``"R6"`` for gust).
Returns:
A dict with ``"address"``, ``"speed"``, ``"angle"``, and ``"gust"``
keys, or None if the response could not be parsed.
"""
if not response:
return None
response = response.strip()
if len(response) < 2:
return None
address = response[0]
body = response[1:]
numeric_matches = list(re.finditer(r"[+-]\d+(?:\.\d+)?", body))
values = []
for match in numeric_matches:
try:
values.append(float(match.group(0)))
except ValueError:
values.append(None)
if cmd_type == "R0":
if len(values) < 2:
return None
if self.part_number == 212:
return {
"address": address,
"speed": values[0],
"angle": values[1],
"gust": None,
}
else:
return {
"address": address,
"angle": values[0],
"speed": values[1],
"gust": None,
}
if cmd_type == "R6":
if len(values) < 7:
return None
return {
"address": address,
"speed": values[0],
"angle": values[3],
"gust": values[6],
}
return None
[docs]
def get_speed_and_angle(self) -> tuple[float, float] | None:
"""Query the sensor for wind speed and direction via the R0 command.
Returns:
A tuple ``(angle, speed)`` in (degrees, m/s), or None on failure.
"""
try:
if not self.ser:
context_logger.error_with_context("Wind", "Serial port not initialized")
return None
# ---- Standard Measurement (R0) ----
response = self.send_command(f"{self.address}R0!")
vals = self.parse_wind_response(response, "R0")
if vals is not None:
angle_value = vals.get("angle")
speed_value = vals.get("speed")
if angle_value is not None:
self.wind_angle.append(angle_value)
if speed_value is not None:
self.wind_speed.append(speed_value)
return angle_value, speed_value
elif self.debug:
context_logger.debug_with_context(
"Wind", f"R0 payload rejected: {response!r}"
)
return None
except Exception as e:
context_logger.error_with_context("Wind", f"get_speed_and_angle: {e}")
return None
[docs]
def get_gust(self) -> float | None:
"""Query the sensor for gust speed via the R6 command (part 213 only).
Returns:
Gust speed in m/s, or None on failure.
"""
try:
if not self.ser:
context_logger.error_with_context("Wind", "Serial port not initialized")
return None
# ---- Gust Measurement (R6) only if part_number==213 ----
response = self.send_command(f"{self.address}R6!")
vals6 = self.parse_wind_response(response, "R6")
if vals6 is not None:
gust_value = vals6.get("gust")
if gust_value is not None:
self.wind_gust.append(gust_value)
return gust_value
elif self.debug:
context_logger.debug_with_context(
"Wind", f"R6 payload rejected: {response!r}"
)
return None
except Exception as e:
context_logger.error_with_context("Wind", f"get_gust: {e}")
return None
[docs]
def getSensorReading(self) -> dict:
"""Get the latest wind reading and accumulate values for batch export.
Queries speed, angle, and optionally gust, then maps them to the
configured parameter short-codes.
Returns:
A dict mapping parameter short-codes to their current values,
plus a ``"t"`` timestamp. Empty dict on failure.
"""
result = {}
try:
readings = self.get_speed_and_angle()
if readings is None:
return result
angle_value, speed_value = readings
gust_value = None
if self.part_number == 213:
gust_value = self.get_gust()
# This part returns the latest raw values, not averages.
for parameter in self.configuration.get("parameters", []):
pm = parameter.get("pm", 0)
if pm == 1 and angle_value is not None:
result[parameter["sc"]] = angle_value
if pm == 2 and speed_value is not None:
result[parameter["sc"]] = speed_value
if pm == 3 and self.part_number == 213 and gust_value is not None:
result[parameter["sc"]] = gust_value
result["t"] = int(time.time() * 1000)
self.timestamp.append(result["t"])
return result
except Exception as e:
context_logger.error_with_context("Wind", f"getSensorReading: {e}")
return result
[docs]
def putSensorValue(self, result: dict | None = None) -> dict:
"""Export accumulated wind data as time-series lists and reset buffers.
Args:
result: Optional dict to populate with time-series data.
Creates a new dict if None.
Returns:
The result dict with each parameter mapped to
``[[values...], [timestamps...]]``.
"""
if self.debug:
context_logger.debug_with_context("Wind", "Aggregating wind data...")
if result is None:
result = {}
for parameter in self.configuration.get("parameters", []):
pm = parameter.get("pm", 0)
if pm == 1:
result[parameter["sc"]] = [list(self.wind_angle), list(self.timestamp)]
if pm == 2:
result[parameter["sc"]] = [list(self.wind_speed), list(self.timestamp)]
if pm == 3 and self.part_number == 213:
result[parameter["sc"]] = [list(self.wind_gust), list(self.timestamp)]
# Clear lists for the next aggregation period
self.wind_speed.clear()
self.wind_angle.clear()
self.wind_gust.clear()
self.timestamp.clear()
return result
if __name__ == "__main__":
import json
wind = Wind()
# Note: Ensure the serial port and parameters match your device.
# For SDI-12, baud rate is 1200, 7 data bits, even parity, 1 stop bit.
# The pyserial default is 8N1, which might not work for all SDI-12 bridges.
try:
ser = serial.Serial(
port="/dev/ttyACM0",
baudrate=1200,
bytesize=serial.SEVENBITS,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE,
timeout=2,
)
except serial.SerialException as e:
context_logger.error_with_context("Wind", f"Error opening serial port: {e}")
exit()
wind.initialize(
ser,
{
"en": 1,
"pn": 211,
"debug": True,
"sensor_address": 0,
"parameters": [
{"sc": "wd", "pm": 1},
{"sc": "ws", "pm": 2},
{"sc": "wg", "pm": 3},
],
},
)
for i in range(5):
context_logger.info_with_context("Wind", f"Reading {i + 1}/5...")
wind.getSensorReading()
time.sleep(2)
context_logger.info_with_context("Wind", "\nAggregating data...")
data = wind.putSensorReading()
context_logger.info_with_context(
"Wind", f"Final aggregated data: {json.dumps(data, indent=2)}"
)