# Source code for OzWrapper.OzSoil.OzSoil

"""Soil sensor wrapper.

Abstracts over multiple soil sensor drivers -- JXBS3001, SEN0600, and NIUBOL
(8-in-1 and 3-in-1) -- providing a unified interface for soil pH, moisture,
temperature, electrical conductivity, nitrogen, phosphorus, and potassium
readings through the GenericSensor contract.
"""

import json
import os
import time

from drivers.JXBS3001.JXBS3001 import JXBS3001
from drivers.NIUBOL.NIUBOL import NIUBOL
from drivers.SEN0600.SEN0600 import SEN0600
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger

# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Two handles are created from OizomLogger, both named after this module:
#   * basic_logger   -- whatever ``OizomLogger(...).get()`` returns.
#   * context_logger -- the OizomLogger wrapper itself; the code below calls
#     its ``debug_with_context`` / ``info_with_context`` /
#     ``error_with_context`` methods as ``(tag, message)``.
# NOTE(review): basic_logger is not referenced in the visible part of this
# module -- confirm it is still needed before removing it.
basic_logger = OizomLogger(__name__).get()
context_logger = OizomLogger(__name__)


class OzSoil(GenericSensor):
    """Wrapper for soil sensors (pH, moisture, temperature, EC, NPK).

    Manages initialisation, periodic reading, and value accumulation for the
    soil sensor drivers used by this module (JXBS3001, SEN0600, NIUBOL 8-in-1
    and NIUBOL 3-in-1).

    Attributes:
        jxbs: JXBS3001 driver instance (part number 212), or None.
        sen0600: SEN0600 driver instance (part number 111), or None.
        niubol_8in1: NIUBOL driver instance used for part number 112, or None.
        niubol_3in1: NIUBOL driver instance used for part number 113, or None.
        configuration: List of sensor config dicts from the Gateway.
        v_soil: Nested list of accumulated readings; ``v_soil[X][Y]`` belongs
            to parameter ``Y`` of ``configuration[X]``.
    """

    jxbs = None
    sen0600 = None
    niubol_8in1 = None
    niubol_3in1 = None

    # Read routing, keyed by part number:
    #   part number -> (driver attribute, driver read method, log context,
    #                   {pm: (key in the driver's result dict, log label,
    #                         pass True to the read call?)})
    # NOTE(review): for the NIUBOL and SEN0600 sensors the result key and the
    # log label deliberately reproduce the original mapping even where they
    # disagree (e.g. part 112, pm 1 reads "so4" but logs SO1) -- confirm the
    # mapping against the driver documentation.
    # NOTE(review): the flagged entry of each sensor calls the driver with
    # ``True``; presumably that forces a fresh bus read while the other
    # channels reuse it -- confirm in the drivers.
    _SOIL_READERS = {
        212: (
            "jxbs",
            "getSoilData",
            "OzSoil JXBS3001",
            {
                1: ("so1", "SO1", True),
                2: ("so2", "SO2", False),
                3: ("so3", "SO3", False),
                4: ("so4", "SO4", False),
                5: ("so5", "SO5", False),
                6: ("so6", "SO6", False),
            },
        ),
        111: (
            "sen0600",
            "getSoilData",
            "OzSoil SEN0600",
            {
                1: ("moisture", "SO2", True),
                2: ("temperature", "SO3", False),
            },
        ),
        112: (
            "niubol_8in1",
            "get8in1SoilData",
            "OzSoil NIUBOL",
            {
                1: ("so4", "SO1", True),
                2: ("so2", "SO2", False),
                3: ("so1", "SO3", False),
                4: ("so3", "SO4", False),
                5: ("so5", "SO5", False),
                6: ("so6", "SO6", False),
                7: ("so7", "SO7", False),
                8: ("so8", "SO8", False),
            },
        ),
        113: (
            "niubol_3in1",
            "get3in1SoilData",
            "OzSoil NIUBOL",
            {
                2: ("so2", "SO2", True),
                3: ("so1", "SO1", False),
                4: ("so3", "SO3", False),
            },
        ),
    }

    def __init__(self) -> None:
        """Initialise the OzSoil wrapper with default state."""
        super().__init__()
        self.configuration = []
        self.v_soil = []

    def initialize(self, config: dict, init_value: dict) -> bool:
        """Initialise all soil sensors listed in the Gateway config.

        Every entry of ``config`` gets an ``"init"`` key (1 on success,
        0 otherwise) and exactly one slot in ``v_soil``, so that
        ``v_soil[X]`` always lines up with ``configuration[X]``.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict updated with per-sensor init status.

        Returns:
            True if at least one entry carrying a part number was processed
            without raising, False otherwise.
            NOTE(review): this is True even when that sensor is disabled or
            its part number is unknown (``"init"`` == 0) -- confirm callers
            expect that.
        """
        self.configuration = config
        # Start from an empty store so a repeated initialize() cannot stack
        # new slots on top of stale ones.
        self.v_soil = []
        context_logger.debug_with_context("OzSoil", f"Soil Config: {self.configuration}")
        _success = False
        for index, sensor in enumerate(self.configuration):
            try:
                if self.partNumber in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
                    _success = True  # send confirmation to main file
                else:
                    # Nothing to drive without a part number; still record a
                    # status so putInitvalues() can report every entry.
                    sensor["init"] = 0
            except Exception as e:
                sensor["init"] = 0
                context_logger.error_with_context("OzSoil", f"Init: {e}")
            # initializeSensor() only appends on a normal return; pad here so
            # a failed or skipped sensor does not shift later sensors' slots.
            while len(self.v_soil) <= index:
                self.v_soil.append([])
        self.putInitvalues(init_value)
        return _success

    def _driver_spec(self, part_no):
        """Return ``(log_tag, driver_cls, attr_name, port_kwarg)`` for a part number, or None."""
        # Built at call time so the driver classes are resolved when needed.
        specs = (
            (212, "OzSoil JXBS3001", JXBS3001, "jxbs", "soil_port"),
            (111, "OzSoil SEN0600", SEN0600, "sen0600", "port"),
            (112, "OzSoil NIUBOL", NIUBOL, "niubol_8in1", "soil_port"),
            (113, "OzSoil NIUBOL", NIUBOL, "niubol_3in1", "soil_port"),
        )
        for spec in specs:
            if part_no == spec[0]:
                return spec[1:]
        return None

    def _uart_settings(self, sensor: dict):
        """Return ``(port, baud, debug)`` from the sensor's optional ``"gpio"`` section."""
        port = "/dev/ttyAMA2"
        baud = 9600
        debug = False
        if "gpio" in sensor:
            gpio = sensor["gpio"]
            if "port" in gpio:
                port = gpio["port"]
            if "baud" in gpio:
                baud = gpio["baud"]
            if "debug" in gpio:
                debug = gpio["debug"] == 1
        return port, baud, debug

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single soil sensor by part number.

        When the part number is supported and the sensor is enabled, creates
        the matching driver, takes one initial reading per configured
        parameter (a missing reading is stored as 0) and appends those
        bootstrap values to ``v_soil``. Otherwise an empty slot is appended.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, optional ``"gpio"`` settings and parameter list.

        Returns:
            1 if the sensor was initialised successfully, 0 otherwise.

        Raises:
            Exception: Whatever the driver or a malformed config raises; in
                that case nothing is appended to ``v_soil``.
        """
        value = []
        _success = 0
        part_no = sensor[self.partNumber]
        spec = self._driver_spec(part_no)
        if spec is not None and sensor["en"] == self.baseConfig["oz_enable"]:
            log_tag, driver_cls, attr_name, port_kwarg = spec
            parameters = sensor["parameters"]
            port, baud, debug = self._uart_settings(sensor)
            context_logger.debug_with_context(log_tag, f"Port: {port}, Baud: {baud}")
            driver = driver_cls()
            # Publish the driver on self before initialising it, because
            # getSoil() looks the driver up by attribute name.
            setattr(self, attr_name, driver)
            # The drivers name their port argument differently
            # (``soil_port`` vs ``port``), hence the keyword expansion.
            driver.initialize(**{port_kwarg: port, "baud": baud})
            driver.DEBUG = debug
            for param in parameters:
                val = self.getSoil(part_no, param[self.parameter])
                if val is None:
                    val = 0
                value.append({"value": [val], "oldvalue": val, "count": 1})
            _success = 1
        self.v_soil.append(value)
        return _success

    def getSoil(self, partNo: int, pm: int):
        """Read a soil measurement from the correct driver.

        Keys used in configuration files:
            so1 -- pH
            so2 -- Soil Moisture (%)
            so3 -- Soil Temperature (C)
            so4 -- Electrical Conductivity (uS/cm)
            so5 -- Nitrogen Content (mg/kg)
            so6 -- Phosphorus Content (mg/kg)
            so7 -- Potassium Content (mg/kg)

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1-8, mapping varies by sensor).

        Returns:
            The measured value, or None if the part number or parameter is
            not supported.
        """
        reader = self._SOIL_READERS.get(partNo)
        if reader is None:
            return None
        driver_attr, method_name, log_context, channels = reader
        channel = channels.get(pm)
        if channel is None:
            return None
        data_key, log_label, refresh = channel
        # Touch the driver only once the request is known to be supported.
        read = getattr(getattr(self, driver_attr), method_name)
        data = read(True) if refresh else read()
        val = data[data_key]
        context_logger.debug_with_context(log_context, f"[SOIL_{log_label}] {val}")
        return val

    def getSensorReading(self) -> dict:
        """Read current values from all initialised soil sensors.

        Each raw reading is scaled as
        ``raw * se / 100 + cr / 10`` (rounded to 2 decimals), appended to the
        matching ``v_soil`` slot, and returned keyed by the parameter's
        ``"sc"`` short-code. A failing parameter is logged and skipped; the
        remaining parameters are still read.

        Returns:
            Dict mapping parameter short-codes to their latest readings.
        """
        data = {}
        for X, sensor in enumerate(self.configuration):
            if sensor["init"] != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    raw = self.getSoil(sensor[self.partNumber], parameters[self.parameter])
                    if raw is None:
                        # Unsupported part/parameter pair: report it plainly
                        # instead of failing inside the arithmetic below.
                        context_logger.error_with_context(
                            "OzSoil",
                            f"Read: no value for part {sensor[self.partNumber]} "
                            f"parameter {parameters[self.parameter]}",
                        )
                        continue
                    value = round(
                        (raw * (parameters["se"] / 100.0)) + (parameters["cr"] / 10.0),
                        2,
                    )
                    slot = self.v_soil[X][Y]
                    slot["value"].append(value)
                    slot["count"] += 1
                    data[parameters["sc"]] = value
                except Exception as e:
                    context_logger.error_with_context("OzSoil", f"Read: {e}")
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Flush accumulated soil readings into the output dict and reset counters.

        For every initialised sensor, the list of values accumulated since
        the last flush is stored under the parameter's ``"sc"`` short-code,
        then the slot's value list and count are reset.

        Args:
            value: Mutable output dict to populate with accumulated readings.

        Returns:
            The updated output dict.
        """
        for X, sensor in enumerate(self.configuration):
            if sensor["init"] != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                slot = self.v_soil[X][Y]
                value[parameters["sc"]] = slot["value"]
                slot["value"] = []
                slot["count"] = 0
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Record each sensor's init status into the shared init-value dict.

        Args:
            value: Mutable dict updated in-place with a ``"soil"`` key holding
                the list of per-sensor ``"init"`` values, in config order.

        Returns:
            The updated init-value dict.
        """
        value.update({"soil": [sensor["init"] for sensor in self.configuration]})
        return value
# Example code # python3 -m OzWrapper.OzSoil.OzSoil if __name__ == "__main__": import os dirname = os.path.dirname(__file__) file_name = os.path.join(dirname, "soil.config.json") with open(file_name) as soilConfigFile: data = soilConfigFile.read() soilConfig = json.loads(data) context_logger.info_with_context("Soil", f"Soil Config: {soilConfig}") soil = OzSoil() soil.initialize(soilConfig["soil"], {}) for _ in range(0, 4): soil.getSensorReading() time.sleep(2) data = {} context_logger.info_with_context("Soil", "PutSensor value start")