Custom Adapters — Extending the Ingest Layer
Implement the adapter plugin interface, build a custom Modbus adapter, register it with the runtime, and write unit tests.
Published Jun 2, 2026
The Adapter Contract
The Pyvorin Edge SDK does not enforce adapters through an abstract base class. Instead, it follows a duck-typing convention: any object that produces SensorReading dicts (or SensorReading dataclass instances) can feed a pipeline. Despite this flexibility, a de-facto interface has emerged across the built-in adapters. Following this interface makes your custom adapter predictable, testable, and composable.
The Informal Base Interface
Every production-grade adapter should implement at least the following surface:
| Method | Signature | Purpose |
|---|---|---|
read() | () → Iterator[dict] | Blocking or non-blocking generator that yields reading dicts. |
generate_reading() | (...) → dict | Produce a single reading on demand. |
generate_batch() | (...) → list[list[dict]] | Produce a time-series batch for simulation or backfill. |
In addition, most adapters expose lifecycle methods such as connect() / disconnect() or start_server() / stop_server(). Choose the names that match your transport semantics.
Project Structure for a Custom Adapter
my_edge_project/
├── adapters/
│ ├── __init__.py
│ └── modbus_adapter.py
├── pipeline_config.py
└── tests/
└── test_modbus_adapter.py
Implementing a Modbus Adapter
Below is a complete, drop-in ModbusTCPAdapter that polls holding registers from a Modbus TCP server (such as a PLC or an industrial gateway). It uses the popular pymodbus library and follows the adapter contract.
pip install pymodbus
# adapters/modbus_adapter.py
from __future__ import annotations
import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional
logger = logging.getLogger(__name__)
try:
from pymodbus.client import ModbusTcpClient
_MODBUS_AVAILABLE = True
except ImportError:
_MODBUS_AVAILABLE = False
ModbusTcpClient = None # type: ignore[assignment,misc]
@dataclass
class ModbusRegisterMap:
"""Maps a Modbus register address to a sensor name, scale, and unit."""
address: int
sensor_name: str
unit: str
scale: float = 1.0
offset: float = 0.0
register_count: int = 1
class ModbusTCPAdapter:
"""Poll Modbus TCP holding registers and emit SensorReading dicts."""
def __init__(
self,
host: str,
port: int = 502,
slave_id: int = 1,
registers: Optional[List[ModbusRegisterMap]] = None,
) -> None:
if not _MODBUS_AVAILABLE:
raise ImportError("pymodbus is required. Install it with: pip install pymodbus")
self.host = host
self.port = port
self.slave_id = slave_id
self.registers = registers or []
self._client: Any = None
def connect(self) -> None:
self._client = ModbusTcpClient(host=self.host, port=self.port)
if not self._client.connect():
raise ConnectionError(f"Failed to connect to Modbus server at {self.host}:{self.port}")
logger.info("Modbus connected to %s:%s", self.host, self.port)
def disconnect(self) -> None:
if self._client is not None:
self._client.close()
self._client = None
logger.info("Modbus disconnected")
def generate_reading(self, register_map: ModbusRegisterMap) -> Dict[str, Any]:
"""Read a single register map and return a SensorReading dict."""
if self._client is None:
raise RuntimeError("Adapter is not connected. Call connect() first.")
response = self._client.read_holding_registers(
address=register_map.address,
count=register_map.register_count,
slave=self.slave_id,
)
if response.isError():
raise RuntimeError(f"Modbus error reading address {register_map.address}: {response}")
raw = response.registers[0] if response.registers else 0
value = raw * register_map.scale + register_map.offset
return {
"sensor_name": register_map.sensor_name,
"timestamp": time.time(),
"value": float(value),
"unit": register_map.unit,
"metadata": {
"modbus_address": register_map.address,
"raw_value": raw,
"slave_id": self.slave_id,
},
}
def read(self) -> Iterator[Dict[str, Any]]:
"""Yield one reading per configured register map."""
for reg in self.registers:
yield self.generate_reading(reg)
def generate_batch(
self,
duration_seconds: float,
interval_seconds: float,
) -> List[List[Dict[str, Any]]]:
"""Poll repeatedly for a fixed duration and return frames."""
if interval_seconds <= 0:
raise ValueError("interval_seconds must be positive")
num_frames = int(duration_seconds / interval_seconds)
result: List[List[Dict[str, Any]]] = []
for _ in range(num_frames):
frame = list(self.read())
result.append(frame)
time.sleep(interval_seconds)
return result
Registration with the Runtime
Custom adapters are not magically discovered. Instantiate them in your main entry point and wire their output into the pipeline. The pattern below shows how to compose a Modbus adapter with a pipeline inside a long-running polling loop.
# main.py
import time
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
from adapters.modbus_adapter import ModbusTCPAdapter, ModbusRegisterMap
def main():
adapter = ModbusTCPAdapter(
host="192.168.1.50",
port=502,
slave_id=1,
registers=[
ModbusRegisterMap(address=0, sensor_name="tank_level", unit="m", scale=0.01),
ModbusRegisterMap(address=1, sensor_name="pump_current", unit="A", scale=0.001),
ModbusRegisterMap(address=2, sensor_name="inlet_temp", unit="°C", scale=0.1, offset=-50.0),
],
)
adapter.connect()
pipeline = Pipeline("plc_monitor")
pipeline.add_sensor(Sensor("tank_level", SensorType.PRESSURE, "m"))
pipeline.add_sensor(Sensor("pump_current", SensorType.CURRENT, "A"))
pipeline.add_sensor(Sensor("inlet_temp", SensorType.TEMPERATURE, "°C"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling"))
pipeline.add_rule(RuleConfig(
name="low_level",
condition=lambda r: r["sensor_name"] == "tank_level" and r["value"] < 1.5,
severity="critical",
cooldown_seconds=300.0,
))
try:
while True:
for reading in adapter.read():
result = pipeline.run([reading])
for event in result.events:
print(f"EVENT: {event.to_dict()}")
time.sleep(5.0)
finally:
adapter.disconnect()
if __name__ == "__main__":
main()
Configuration-Driven Instantiation
For fleet deployments, avoid hard-coding IP addresses. Load adapter parameters from the same TOML/JSON config that drives the rest of the edge runtime.
# edge_config.toml
[modbus]
host = "192.168.1.50"
port = 502
slave_id = 1
[[modbus.registers]]
address = 0
sensor_name = "tank_level"
unit = "m"
scale = 0.01
[[modbus.registers]]
address = 1
sensor_name = "pump_current"
unit = "A"
scale = 0.001
from pyv_edge_agent.config import Config
cfg = Config.from_file("edge_config.toml")
modbus_cfg = cfg.get("modbus")
adapter = ModbusTCPAdapter(
host=modbus_cfg["host"],
port=modbus_cfg.get("port", 502),
slave_id=modbus_cfg.get("slave_id", 1),
registers=[ModbusRegisterMap(**r) for r in modbus_cfg.get("registers", [])],
)
Testing Your Adapter
Never test against real hardware in CI. Use a mock Modbus server, or monkey-patch the client. The example below uses pytest and unittest.mock to verify adapter behaviour without network access.
# tests/test_modbus_adapter.py
import pytest
from unittest.mock import MagicMock, patch
from adapters.modbus_adapter import ModbusTCPAdapter, ModbusRegisterMap
@pytest.fixture
def adapter():
return ModbusTCPAdapter(
host="127.0.0.1",
registers=[
ModbusRegisterMap(address=0, sensor_name="temp", unit="°C", scale=0.1),
],
)
def test_generate_reading(adapter):
mock_response = MagicMock()
mock_response.isError.return_value = False
mock_response.registers = [215]
adapter._client = MagicMock()
adapter._client.read_holding_registers.return_value = mock_response
reading = adapter.generate_reading(adapter.registers[0])
assert reading["sensor_name"] == "temp"
assert reading["value"] == pytest.approx(21.5)
assert reading["unit"] == "°C"
assert reading["metadata"]["raw_value"] == 215
def test_read_yields_all_registers(adapter):
adapter._client = MagicMock()
adapter._client.read_holding_registers.return_value = MagicMock(
isError=lambda: False,
registers=[100],
)
readings = list(adapter.read())
assert len(readings) == 1
assert readings[0]["sensor_name"] == "temp"
def test_modbus_error_raises(adapter):
mock_response = MagicMock()
mock_response.isError.return_value = True
adapter._client = MagicMock()
adapter._client.read_holding_registers.return_value = mock_response
with pytest.raises(RuntimeError, match="Modbus error"):
adapter.generate_reading(adapter.registers[0])
Async Adapters
If your transport library is asyncio-native (e.g., aiomodbus, asyncio-mqtt), implement async def read() and run it inside an asyncio event loop. Feed readings into the synchronous pipeline via loop.run_in_executor or a thread-safe queue.
import asyncio
from queue import Queue
_reading_queue: Queue = Queue()
async def async_modbus_poller(adapter):
while True:
for reading in adapter.read():
_reading_queue.put(reading)
await asyncio.sleep(1.0)
def pipeline_consumer(pipeline):
while True:
reading = _reading_queue.get()
pipeline.run([reading])
_reading_queue.task_done()
Adapter Checklist
- Does the adapter yield normalised
SensorReadingdicts with at leastsensor_name,timestamp,value, andunit? - Are connection errors raised early (in
connect()) rather than silently swallowed? - Is there a matching
disconnect()or cleanup method? - Does
generate_batch()respectduration_secondsandinterval_secondscontracts? - Are all network calls covered by unit tests using mocks?
- Is the adapter configurable from a TOML/JSON file without code changes?
Summary
Custom adapters are the gateway between the Pyvorin pipeline engine and the messy reality of industrial protocols, proprietary APIs, and legacy SCADA systems. By following the informal adapter contract—connect(), read(), generate_reading(), generate_batch(), disconnect()—you create reusable, testable components that slot cleanly into any edge deployment.