Custom Adapters — Extending the Ingest Layer

Implement the adapter plugin interface, build a custom Modbus adapter, register it with the runtime, and write unit tests.

Published Jun 2, 2026

The Adapter Contract

The Pyvorin Edge SDK does not enforce adapters through an abstract base class. Instead, it follows a duck-typing convention: any object that produces SensorReading dicts (or SensorReading dataclass instances) can feed a pipeline. Despite this flexibility, a de-facto interface has emerged across the built-in adapters. Following this interface makes your custom adapter predictable, testable, and composable.

The Informal Base Interface

Every production-grade adapter should implement at least the following surface:

Method              Signature                   Purpose
read()              () → Iterator[dict]         Blocking or non-blocking generator that yields reading dicts.
generate_reading()  (...) → dict                Produce a single reading on demand.
generate_batch()    (...) → list[list[dict]]    Produce a time-series batch for simulation or backfill.

In addition, most adapters expose lifecycle methods such as connect() / disconnect() or start_server() / stop_server(). Choose the names that match your transport semantics.
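
The informal surface above can also be written down as a typing.Protocol, so that a type checker (or a runtime isinstance check) flags an adapter that is missing a method. This is only a sketch and not part of the Pyvorin Edge SDK: the names AdapterProtocol and DummyAdapter are hypothetical, and the argument lists of generate_reading() and generate_batch() are left open because they differ from adapter to adapter.

```python
from typing import Any, Dict, Iterator, List, Protocol, runtime_checkable


@runtime_checkable
class AdapterProtocol(Protocol):
    """Hypothetical protocol mirroring the informal adapter surface."""

    def read(self) -> Iterator[Dict[str, Any]]: ...
    def generate_reading(self, *args: Any, **kwargs: Any) -> Dict[str, Any]: ...
    def generate_batch(self, *args: Any, **kwargs: Any) -> List[List[Dict[str, Any]]]: ...


class DummyAdapter:
    """Illustrative adapter that always returns the same reading."""

    def generate_reading(self) -> Dict[str, Any]:
        return {"sensor_name": "dummy", "timestamp": 0.0, "value": 1.0, "unit": "V"}

    def read(self) -> Iterator[Dict[str, Any]]:
        yield self.generate_reading()

    def generate_batch(self, frames: int = 2) -> List[List[Dict[str, Any]]]:
        return [list(self.read()) for _ in range(frames)]


# runtime_checkable only verifies that the methods exist, not their signatures.
assert isinstance(DummyAdapter(), AdapterProtocol)
assert not isinstance(object(), AdapterProtocol)
```

Because the check is structural, DummyAdapter never has to inherit from AdapterProtocol, which keeps the duck-typing convention intact.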

Project Structure for a Custom Adapter


my_edge_project/
├── adapters/
│   ├── __init__.py
│   └── modbus_adapter.py
├── pipeline_config.py
└── tests/
    └── test_modbus_adapter.py
  

Implementing a Modbus Adapter

Below is a drop-in ModbusTCPAdapter that polls holding registers from a Modbus TCP server (such as a PLC or an industrial gateway). It uses the pymodbus library, is written against the pymodbus 3.x client import path (pymodbus.client), and follows the adapter contract.


pip install pymodbus
  

# adapters/modbus_adapter.py
from __future__ import annotations

import logging
import time
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional

logger = logging.getLogger(__name__)

try:
    from pymodbus.client import ModbusTcpClient
    _MODBUS_AVAILABLE = True
except ImportError:
    _MODBUS_AVAILABLE = False
    ModbusTcpClient = None  # type: ignore[assignment,misc]


@dataclass
class ModbusRegisterMap:
    """Maps a Modbus register address to a sensor name, scale, and unit."""
    address: int
    sensor_name: str
    unit: str
    scale: float = 1.0
    offset: float = 0.0
    register_count: int = 1


class ModbusTCPAdapter:
    """Poll Modbus TCP holding registers and emit SensorReading dicts."""

    def __init__(
        self,
        host: str,
        port: int = 502,
        slave_id: int = 1,
        registers: Optional[List[ModbusRegisterMap]] = None,
    ) -> None:
        if not _MODBUS_AVAILABLE:
            raise ImportError("pymodbus is required. Install it with: pip install pymodbus")
        self.host = host
        self.port = port
        self.slave_id = slave_id
        self.registers = registers or []
        self._client: Any = None

    def connect(self) -> None:
        self._client = ModbusTcpClient(host=self.host, port=self.port)
        if not self._client.connect():
            raise ConnectionError(f"Failed to connect to Modbus server at {self.host}:{self.port}")
        logger.info("Modbus connected to %s:%s", self.host, self.port)

    def disconnect(self) -> None:
        if self._client is not None:
            self._client.close()
            self._client = None
            logger.info("Modbus disconnected")

    def generate_reading(self, register_map: ModbusRegisterMap) -> Dict[str, Any]:
        """Read a single register map and return a SensorReading dict."""
        if self._client is None:
            raise RuntimeError("Adapter is not connected. Call connect() first.")

        response = self._client.read_holding_registers(
            address=register_map.address,
            count=register_map.register_count,
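            # The keyword for the unit ID has changed between pymodbus releases;
            # match it to the version you have installed.
            slave=self.slave_id,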
        )
        if response.isError():
            raise RuntimeError(f"Modbus error reading address {register_map.address}: {response}")

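        # Only the first register is decoded. Values that span several registers
        # (register_count > 1) need device-specific word-order handling.
        raw = response.registers[0] if response.registers else 0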
        value = raw * register_map.scale + register_map.offset

        return {
            "sensor_name": register_map.sensor_name,
            "timestamp": time.time(),
            "value": float(value),
            "unit": register_map.unit,
            "metadata": {
                "modbus_address": register_map.address,
                "raw_value": raw,
                "slave_id": self.slave_id,
            },
        }

    def read(self) -> Iterator[Dict[str, Any]]:
        """Yield one reading per configured register map."""
        for reg in self.registers:
            yield self.generate_reading(reg)

    def generate_batch(
        self,
        duration_seconds: float,
        interval_seconds: float,
    ) -> List[List[Dict[str, Any]]]:
        """Poll repeatedly for a fixed duration and return frames."""
        if interval_seconds <= 0:
            raise ValueError("interval_seconds must be positive")
        num_frames = int(duration_seconds / interval_seconds)
        result: List[List[Dict[str, Any]]] = []
        for _ in range(num_frames):
            frame = list(self.read())
            result.append(frame)
            time.sleep(interval_seconds)
        return result
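
ModbusRegisterMap carries a register_count field, and many devices spread a 32-bit value across two consecutive 16-bit registers. The sketch below shows one way such a value might be reassembled. It assumes big-endian word order (the first register holds the most significant word), which varies by device, so check the device manual. The helper name combine_registers is hypothetical and is not part of pymodbus or the SDK.

```python
from typing import Sequence


def combine_registers(registers: Sequence[int], signed: bool = False) -> int:
    """Join 16-bit registers into one integer (first register = most significant word)."""
    if not registers:
        raise ValueError("no registers to combine")
    value = 0
    for word in registers:
        if not 0 <= word <= 0xFFFF:
            raise ValueError(f"register value out of 16-bit range: {word}")
        value = (value << 16) | word
    if signed:
        # Interpret the combined bits as a two's-complement number.
        bits = 16 * len(registers)
        if value >= 1 << (bits - 1):
            value -= 1 << bits
    return value


print(combine_registers([215]))                   # 215
print(combine_registers([0x0001, 0x0000]))        # 65536
print(combine_registers([0xFFFF], signed=True))   # -1
```

Inside generate_reading(), the result of such a helper could take the place of the single-register raw value before scale and offset are applied.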
  

Registration with the Runtime

The runtime does not discover custom adapters automatically. Instantiate them in your main entry point and wire their output into the pipeline yourself. The pattern below composes a Modbus adapter with a pipeline inside a long-running polling loop.


# main.py
import time
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
from adapters.modbus_adapter import ModbusTCPAdapter, ModbusRegisterMap

def main():
    adapter = ModbusTCPAdapter(
        host="192.168.1.50",
        port=502,
        slave_id=1,
        registers=[
            ModbusRegisterMap(address=0,   sensor_name="tank_level",  unit="m",   scale=0.01),
            ModbusRegisterMap(address=1,   sensor_name="pump_current", unit="A",   scale=0.001),
            ModbusRegisterMap(address=2,   sensor_name="inlet_temp",   unit="°C", scale=0.1, offset=-50.0),
        ],
    )
    adapter.connect()

    pipeline = Pipeline("plc_monitor")
    pipeline.add_sensor(Sensor("tank_level", SensorType.PRESSURE, "m"))
    pipeline.add_sensor(Sensor("pump_current", SensorType.CURRENT, "A"))
    pipeline.add_sensor(Sensor("inlet_temp", SensorType.TEMPERATURE, "°C"))
    pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling"))
    pipeline.add_rule(RuleConfig(
        name="low_level",
        condition=lambda r: r["sensor_name"] == "tank_level" and r["value"] < 1.5,
        severity="critical",
        cooldown_seconds=300.0,
    ))

    try:
        while True:
            for reading in adapter.read():
                result = pipeline.run([reading])
                for event in result.events:
                    print(f"EVENT: {event.to_dict()}")
            time.sleep(5.0)
    finally:
        adapter.disconnect()

if __name__ == "__main__":
    main()
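
In the loop above, a single Modbus error propagates out of adapter.read() and ends the process. One possible way to make a polling cycle more tolerant is to retry it with exponential backoff and a reconnect between attempts. The helper below is a sketch under that assumption: the name poll_once_with_retry and its parameters are hypothetical, and the caller supplies the read and reconnect callables.

```python
import time
from typing import Any, Callable, Dict, Iterable, List


def poll_once_with_retry(
    read: Callable[[], Iterable[Dict[str, Any]]],
    reconnect: Callable[[], None],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Dict[str, Any]]:
    """Run one polling cycle, reconnecting with exponential backoff on failure."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            if attempt > 0:
                reconnect()  # fresh connection before every retry
            return list(read())
        except (RuntimeError, OSError):  # ConnectionError is a subclass of OSError
            if attempt == max_attempts - 1:
                raise  # give up after the final attempt
            sleep(base_delay * 2 ** attempt)  # 1 s, 2 s, 4 s, ...
    raise AssertionError("unreachable")


# Demo with a stand-in reader that fails once and then succeeds.
calls = {"n": 0}

def flaky_read():
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("Modbus error")
    return [{"sensor_name": "tank_level", "value": 1.2}]

print(poll_once_with_retry(flaky_read, reconnect=lambda: None, sleep=lambda s: None))
```

With the adapter from this article, read would be adapter.read and reconnect a small function that calls adapter.disconnect() followed by adapter.connect().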
  

Configuration-Driven Instantiation

For fleet deployments, avoid hard-coding IP addresses. Load adapter parameters from the same TOML/JSON config that drives the rest of the edge runtime.


# edge_config.toml
[modbus]
host = "192.168.1.50"
port = 502
slave_id = 1

[[modbus.registers]]
address = 0
sensor_name = "tank_level"
unit = "m"
scale = 0.01

[[modbus.registers]]
address = 1
sensor_name = "pump_current"
unit = "A"
scale = 0.001
  

from pyv_edge_agent.config import Config
from adapters.modbus_adapter import ModbusTCPAdapter, ModbusRegisterMap

cfg = Config.from_file("edge_config.toml")
modbus_cfg = cfg.get("modbus")
adapter = ModbusTCPAdapter(
    host=modbus_cfg["host"],
    port=modbus_cfg.get("port", 502),
    slave_id=modbus_cfg.get("slave_id", 1),
    registers=[ModbusRegisterMap(**r) for r in modbus_cfg.get("registers", [])],
)
  

Testing Your Adapter

Never test against real hardware in CI. Use a mock Modbus server, or monkey-patch the client. The example below uses pytest and unittest.mock to verify adapter behaviour without network access.


# tests/test_modbus_adapter.py
import pytest
from unittest.mock import MagicMock, patch
from adapters.modbus_adapter import ModbusTCPAdapter, ModbusRegisterMap


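@pytest.fixture
def adapter():
    # Bypass the import guard so the tests also run where pymodbus is not installed.
    with patch("adapters.modbus_adapter._MODBUS_AVAILABLE", True):
        return ModbusTCPAdapter(
            host="127.0.0.1",
            registers=[
                ModbusRegisterMap(address=0, sensor_name="temp", unit="°C", scale=0.1),
            ],
        )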


def test_generate_reading(adapter):
    mock_response = MagicMock()
    mock_response.isError.return_value = False
    mock_response.registers = [215]

    adapter._client = MagicMock()
    adapter._client.read_holding_registers.return_value = mock_response

    reading = adapter.generate_reading(adapter.registers[0])
    assert reading["sensor_name"] == "temp"
    assert reading["value"] == pytest.approx(21.5)
    assert reading["unit"] == "°C"
    assert reading["metadata"]["raw_value"] == 215


def test_read_yields_all_registers(adapter):
    adapter._client = MagicMock()
    adapter._client.read_holding_registers.return_value = MagicMock(
        isError=lambda: False,
        registers=[100],
    )

    readings = list(adapter.read())
    assert len(readings) == 1
    assert readings[0]["sensor_name"] == "temp"


def test_modbus_error_raises(adapter):
    mock_response = MagicMock()
    mock_response.isError.return_value = True
    adapter._client = MagicMock()
    adapter._client.read_holding_registers.return_value = mock_response

    with pytest.raises(RuntimeError, match="Modbus error"):
        adapter.generate_reading(adapter.registers[0])
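
As an alternative to MagicMock, a small hand-written fake client can make test intent easier to read, because the register contents live in a plain dict. The classes below are hypothetical stand-ins, not part of pymodbus. They mimic only what the adapter in this article calls: read_holding_registers(), close(), and a response with registers and isError().

```python
from typing import Dict, List


class FakeResponse:
    """Stand-in for a pymodbus response: exposes registers and isError()."""

    def __init__(self, registers: List[int], error: bool = False) -> None:
        self.registers = registers
        self._error = error

    def isError(self) -> bool:
        return self._error


class FakeModbusClient:
    """Stand-in client backed by an in-memory register table."""

    def __init__(self, table: Dict[int, int]) -> None:
        self.table = table
        self.closed = False

    def read_holding_registers(self, address: int, count: int = 1, **kwargs) -> FakeResponse:
        try:
            return FakeResponse([self.table[address + i] for i in range(count)])
        except KeyError:
            # Unmapped addresses behave like a Modbus exception response.
            return FakeResponse([], error=True)

    def close(self) -> None:
        self.closed = True


client = FakeModbusClient({0: 215, 1: 100})
assert client.read_holding_registers(address=0, count=2, slave=1).registers == [215, 100]
assert client.read_holding_registers(address=9).isError()
```

In a test, assigning adapter._client = FakeModbusClient({0: 215}) would take the place of the MagicMock setup.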
  

Async Adapters

If your transport library is asyncio-native (e.g., aiomodbus, asyncio-mqtt), implement async def read() and run it inside an asyncio event loop. Feed readings into the synchronous pipeline via loop.run_in_executor or a thread-safe queue.


import asyncio
from queue import Queue

_reading_queue: Queue = Queue()

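async def async_modbus_poller(adapter):
    # Assumes adapter.read() is an async generator (async def read() with yield).
    # Calling a blocking, synchronous read() here would stall the event loop.
    while True:
        async for reading in adapter.read():
            _reading_queue.put(reading)
        await asyncio.sleep(1.0)

def pipeline_consumer(pipeline):
    # Run this in a separate thread; queue.Queue is thread-safe.
    while True:
        reading = _reading_queue.get()
        pipeline.run([reading])
        _reading_queue.task_done()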
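
The queue hand-off described above can be exercised end to end without any hardware. The sketch below is an assumption-laden demo, not SDK code: FakeAsyncAdapter is a hypothetical asyncio-native adapter, the consumer appends to a list where a real deployment would call pipeline.run([reading]), and a sentinel object stops the consumer thread so that the demo terminates.

```python
import asyncio
import threading
from queue import Queue
from typing import Any, AsyncIterator, Dict, List

_STOP = object()  # sentinel that tells the consumer thread to exit


class FakeAsyncAdapter:
    """Hypothetical asyncio-native adapter; read() is an async generator."""

    async def read(self) -> AsyncIterator[Dict[str, Any]]:
        await asyncio.sleep(0)  # stands in for awaiting network I/O
        yield {"sensor_name": "temp", "timestamp": 0.0, "value": 21.5, "unit": "°C"}


async def poll(adapter: FakeAsyncAdapter, queue: Queue, cycles: int) -> None:
    for _ in range(cycles):
        async for reading in adapter.read():
            queue.put(reading)  # put() on an unbounded Queue does not block
        await asyncio.sleep(0.01)
    queue.put(_STOP)


def consume(queue: Queue, sink: List[Dict[str, Any]]) -> None:
    # In a real deployment this loop would call pipeline.run([reading]).
    while True:
        item = queue.get()
        if item is _STOP:
            break
        sink.append(item)


def run_demo(cycles: int = 3) -> List[Dict[str, Any]]:
    queue: Queue = Queue()
    received: List[Dict[str, Any]] = []
    worker = threading.Thread(target=consume, args=(queue, received))
    worker.start()
    asyncio.run(poll(FakeAsyncAdapter(), queue, cycles))
    worker.join()
    return received


print(len(run_demo()))  # 3
```

Keeping the pipeline in its own thread means a slow rule evaluation delays only the consumer, not the event loop that services the transport.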
  

Adapter Checklist

  • Does the adapter yield normalised SensorReading dicts with at least sensor_name, timestamp, value, and unit?
  • Are connection errors raised early (in connect()) rather than silently swallowed?
  • Is there a matching disconnect() or cleanup method?
  • Does generate_batch() respect duration_seconds and interval_seconds contracts?
  • Are all network calls covered by unit tests using mocks?
  • Is the adapter configurable from a TOML/JSON file without code changes?
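
The first checklist item can be turned into an automated check. The sketch below is one possible approach; the helper name reading_problems is hypothetical, and the expected types (strings for sensor_name and unit, real numbers for timestamp and value) are assumptions based on the readings produced in this article rather than a schema published by the SDK.

```python
from numbers import Real
from typing import Any, Dict, List

# Assumed field types, inferred from the example readings in this article.
REQUIRED_FIELDS = {"sensor_name": str, "timestamp": Real, "value": Real, "unit": str}


def reading_problems(reading: Dict[str, Any]) -> List[str]:
    """Return human-readable problems; an empty list means the reading looks valid."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in reading:
            problems.append(f"missing field: {field}")
            continue
        actual = reading[field]
        # bool is a subclass of int, so reject it explicitly for numeric fields.
        if isinstance(actual, bool) or not isinstance(actual, expected):
            problems.append(
                f"{field} should be {expected.__name__}, got {type(actual).__name__}"
            )
    return problems


good = {"sensor_name": "temp", "timestamp": 0.0, "value": 21.5, "unit": "°C"}
print(reading_problems(good))  # []
```

Calling a check like this on every reading in an adapter's unit tests catches a missing or mistyped field before it reaches the pipeline.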

Summary

Custom adapters are the gateway between the Pyvorin pipeline engine and the messy reality of industrial protocols, proprietary APIs, and legacy SCADA systems. By following the informal adapter contract—connect(), read(), generate_reading(), generate_batch(), disconnect()—you create reusable, testable components that slot cleanly into any edge deployment.