"""Soil sensor wrapper.
Abstracts over multiple soil sensor drivers -- JXBS3001, SEN0600, and NIUBOL
(8-in-1 and 3-in-1) -- providing a unified interface for soil pH, moisture,
temperature, electrical conductivity, nitrogen, phosphorus, and potassium
readings through the GenericSensor contract.
"""
import json
import os
import time
from drivers.JXBS3001.JXBS3001 import JXBS3001
from drivers.NIUBOL.NIUBOL import NIUBOL
from drivers.SEN0600.SEN0600 import SEN0600
from SensorBase.SensorBase import GenericSensor
from utils.oizom_logger import OizomLogger
# -----------------------------------------------------------------------------
# Configure logging
# -----------------------------------------------------------------------------
# Object returned by OizomLogger(...).get() for this module's name; it is not
# referenced anywhere in the code visible here.
basic_logger = OizomLogger(__name__).get()
# OizomLogger instance used by this module for its *_with_context calls
# (debug/info/error), each invoked with a context tag followed by the message.
context_logger = OizomLogger(__name__)
class OzSoil(GenericSensor):
    """Wrapper for soil sensors (pH, moisture, temperature, EC, NPK).

    Manages initialisation, periodic reading, and value aggregation for
    Modbus-based soil sensors connected via UART.

    Attributes:
        jxbs: JXBS3001 driver instance (part number 212), or None.
        sen0600: SEN0600 driver instance (part number 111), or None.
        niubol_8in1: NIUBOL driver used for 8-in-1 reads (part number 112),
            or None.
        niubol_3in1: NIUBOL driver used for 3-in-1 reads (part number 113),
            or None.
        configuration: List of sensor config dicts from the Gateway.
        v_soil: One list per configuration entry (same index). Each inner
            list holds one accumulator dict per configured parameter with
            the keys ``"value"``, ``"oldvalue"`` and ``"count"``; it is
            empty for entries that were not initialised.
    """

    jxbs = None
    sen0600 = None
    niubol_8in1 = None
    niubol_3in1 = None

    # Serial settings used when the sensor config carries no "gpio" override.
    _DEFAULT_PORT = "/dev/ttyAMA2"
    _DEFAULT_BAUD = 9600

    # part number -> (driver attribute on self, driver class,
    #                 keyword the driver's initialize() uses for the port,
    #                 log context tag, name of the driver's read method)
    _SOIL_DRIVERS = {
        212: ("jxbs", JXBS3001, "soil_port", "OzSoil JXBS3001", "getSoilData"),
        111: ("sen0600", SEN0600, "port", "OzSoil SEN0600", "getSoilData"),
        112: ("niubol_8in1", NIUBOL, "soil_port", "OzSoil NIUBOL", "get8in1SoilData"),
        113: ("niubol_3in1", NIUBOL, "soil_port", "OzSoil NIUBOL", "get3in1SoilData"),
    }

    # part number -> {pm: (key in the driver's result dict, log label,
    #                      whether the read method is called with True)}
    # NOTE(review): the True argument presumably forces a fresh bus read while
    # the other calls reuse the last frame -- confirm against the drivers.
    # NOTE(review): several log labels do not match the data key that is read
    # (e.g. part 112, pm 1 reads "so4" but logs SOIL_SO1). They are kept
    # exactly as they were; confirm the intended mapping with the driver docs.
    _SOIL_READINGS = {
        212: {
            1: ("so1", "SOIL_SO1", True),
            2: ("so2", "SOIL_SO2", False),
            3: ("so3", "SOIL_SO3", False),
            4: ("so4", "SOIL_SO4", False),
            5: ("so5", "SOIL_SO5", False),
            6: ("so6", "SOIL_SO6", False),
        },
        111: {
            1: ("moisture", "SOIL_SO2", True),
            2: ("temperature", "SOIL_SO3", False),
        },
        112: {
            1: ("so4", "SOIL_SO1", True),
            2: ("so2", "SOIL_SO2", False),
            3: ("so1", "SOIL_SO3", False),
            4: ("so3", "SOIL_SO4", False),
            5: ("so5", "SOIL_SO5", False),
            6: ("so6", "SOIL_SO6", False),
            7: ("so7", "SOIL_SO7", False),
            8: ("so8", "SOIL_SO8", False),
        },
        113: {
            2: ("so2", "SOIL_SO2", True),
            3: ("so1", "SOIL_SO1", False),
            4: ("so3", "SOIL_SO3", False),
        },
    }

    def __init__(self) -> None:
        """Initialise the OzSoil wrapper with default state."""
        super().__init__()
        self.configuration = []
        self.v_soil = []

    def initialize(self, config: list, init_value: dict) -> bool:
        """Initialise all soil sensors listed in the Gateway config.

        Every configuration entry ends up with an ``"init"`` key (1 or 0) and
        exactly one slot in ``v_soil`` at the same index, so the positional
        lookups in ``getSensorReading`` and ``putSensorValue`` stay aligned
        even when a sensor fails to initialise.

        Args:
            config: List of sensor configuration dicts from the Gateway.
            init_value: Mutable dict updated with per-sensor init status.

        Returns:
            True if at least one entry carrying a part number was processed
            without raising (this includes entries that are disabled or have
            an unknown part number), False otherwise.
        """
        self.configuration = config
        context_logger.debug_with_context("OzSoil", f"Soil Config: {self.configuration}")
        # Start from a clean slate so a repeated call cannot leave stale slots
        # that shift the index of every sensor.
        self.v_soil = []
        _success = False
        for sensor in self.configuration:
            slots_before = len(self.v_soil)
            try:
                if self.partNumber in sensor:
                    sensor["init"] = self.initializeSensor(sensor)
                    _success = True  # send confirmation to main file
                else:
                    # Without a part number there is nothing to initialise,
                    # but putInitvalues still needs a status for this entry.
                    sensor["init"] = 0
            except Exception as e:
                sensor["init"] = 0
                context_logger.error_with_context("OzSoil", f"Init: {e}")
            # initializeSensor appends its slot only when it returns normally;
            # add an empty placeholder for every other outcome.
            if len(self.v_soil) == slots_before:
                self.v_soil.append([])
        self.putInitvalues(init_value)
        return _success

    def _serial_settings(self, sensor: dict) -> tuple:
        """Return ``(port, baud, debug)`` for a sensor config.

        Values come from the optional ``"gpio"`` sub-dict and fall back to the
        class defaults; ``debug`` is True only when ``gpio["debug"] == 1``.
        """
        # TODO: Add gpio configuration
        gpio = sensor.get("gpio") or {}
        port = gpio.get("port", self._DEFAULT_PORT)
        baud = gpio.get("baud", self._DEFAULT_BAUD)
        debug = gpio.get("debug") == 1
        return port, baud, debug

    def initializeSensor(self, sensor: dict) -> int:
        """Initialise a single soil sensor by part number.

        For a known, enabled part number this creates the matching driver,
        stores it on the instance, takes one initial reading per configured
        parameter and uses it to seed the accumulators. In every case one
        slot (possibly empty) is appended to ``v_soil``.

        Args:
            sensor: Single sensor configuration dict containing part number,
                enable flag, optional ``"gpio"`` serial settings, and
                parameter list.

        Returns:
            1 if the sensor was initialised successfully, 0 otherwise.
        """
        value = []
        status = 0
        part_no = sensor[self.partNumber]
        driver_spec = self._SOIL_DRIVERS.get(part_no)
        if driver_spec is not None and sensor["en"] == self.baseConfig["oz_enable"]:
            attr_name, driver_cls, port_kwarg, log_tag, _reader_name = driver_spec
            parameters = sensor["parameters"]
            port, baud, debug = self._serial_settings(sensor)
            context_logger.debug_with_context(log_tag, f"Port: {port}, Baud: {baud}")
            driver = driver_cls()
            # Publish the driver before initialising it, as getSoil looks it
            # up by attribute name.
            setattr(self, attr_name, driver)
            driver.initialize(**{port_kwarg: port, "baud": baud})
            driver.DEBUG = debug
            for param in parameters:
                val = self.getSoil(part_no, param[self.parameter])
                if val is None:
                    # Unsupported parameter: seed with 0 so the slot exists.
                    val = 0
                value.append({"value": [val], "oldvalue": val, "count": 1})
            status = 1
        self.v_soil.append(value)
        return status

    def getSoil(self, partNo: int, pm: int):
        """Read a soil measurement from the correct driver.

        Keys used in configuration files:
            so1 -- pH,
            so2 -- Soil Moisture (%),
            so3 -- Soil Temperature (°C),
            so4 -- Electrical Conductivity (µS/cm),
            so5 -- Nitrogen Content (mg/kg),
            so6 -- Phosphorus Content (mg/kg),
            so7 -- Potassium Content (mg/kg).

        Which driver field a given ``pm`` selects differs per sensor model;
        see ``_SOIL_READINGS``. Every supported (part, parameter) pair
        returns its value, including parameters 6 and 7 of part 112, whose
        branches previously had no return statement of their own.

        Args:
            partNo: Hardware part number identifying the sensor model.
            pm: Parameter identifier (1-8, mapping varies by sensor).

        Returns:
            The measured value, or None if the part number or parameter is
            unsupported.
        """
        driver_spec = self._SOIL_DRIVERS.get(partNo)
        reading = self._SOIL_READINGS.get(partNo, {}).get(pm)
        if driver_spec is None or reading is None:
            return None
        attr_name, _driver_cls, _port_kwarg, log_tag, reader_name = driver_spec
        data_key, label, with_true = reading
        reader = getattr(getattr(self, attr_name), reader_name)
        data = reader(True) if with_true else reader()
        val = data[data_key]
        context_logger.debug_with_context(log_tag, f"[{label}] {val}")
        return val

    def getSensorReading(self) -> dict:
        """Read current values from all initialised soil sensors.

        Each raw reading is scaled as ``raw * se / 100 + cr / 10`` (``se`` and
        ``cr`` come from the parameter config), rounded to two decimals,
        appended to the matching accumulator and returned keyed by the
        parameter's ``"sc"`` short-code. A failing parameter is logged and
        skipped so the remaining ones are still read.

        Returns:
            Dict mapping parameter short-codes to their latest readings.
        """
        data = {}
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                try:
                    raw = self.getSoil(sensor[self.partNumber], parameters[self.parameter])
                    if raw is None:
                        # Unsupported parameter: nothing to scale or store.
                        context_logger.error_with_context(
                            "OzSoil",
                            f"Read: no value for part {sensor[self.partNumber]} "
                            f"parameter {parameters[self.parameter]}",
                        )
                        continue
                    value = round(
                        raw * (parameters["se"] / 100.0) + parameters["cr"] / 10.0,
                        2,
                    )
                    slot = self.v_soil[X][Y]
                    slot["value"].append(value)
                    slot["count"] += 1
                    data[parameters["sc"]] = value
                except Exception as e:
                    context_logger.error_with_context("OzSoil", f"Read: {e}")
        return data

    def putSensorValue(self, value: dict) -> dict:
        """Flush accumulated soil readings into the output dict and reset counters.

        For every parameter of every initialised sensor, the list of readings
        collected so far is stored under the parameter's ``"sc"`` short-code,
        then the accumulator's list and count are cleared.

        Args:
            value: Mutable output dict to populate with the collected readings.

        Returns:
            The updated output dict.
        """
        for X, sensor in enumerate(self.configuration):
            if sensor.get("init") != 1:
                continue
            for Y, parameters in enumerate(sensor["parameters"]):
                slot = self.v_soil[X][Y]
                value[parameters["sc"]] = slot["value"]
                slot["value"] = []
                slot["count"] = 0
        return value

    def putInitvalues(self, value: dict) -> dict:
        """Record each sensor's init status into the shared init-value dict.

        Args:
            value: Mutable dict updated in-place with a ``"soil"`` key holding
                one status per configuration entry (0 when an entry has no
                recorded status).

        Returns:
            The updated init-value dict.
        """
        value.update({"soil": [sensor.get("init", 0) for sensor in self.configuration]})
        return value
# Example code
# python3 -m OzWrapper.OzSoil.OzSoil
if __name__ == "__main__":
import os
dirname = os.path.dirname(__file__)
file_name = os.path.join(dirname, "soil.config.json")
with open(file_name) as soilConfigFile:
data = soilConfigFile.read()
soilConfig = json.loads(data)
context_logger.info_with_context("Soil", f"Soil Config: {soilConfig}")
soil = OzSoil()
soil.initialize(soilConfig["soil"], {})
for _ in range(0, 4):
soil.getSensorReading()
time.sleep(2)
data = {}
context_logger.info_with_context("Soil", "PutSensor value start")