Pyvorin Edge Plugin Architecture
Build discoverable plugins with lifecycle hooks, register custom adapters, window functions, and action handlers, and configure everything from config.toml.
Published Jun 2, 2026
Introduction
Pyvorin Edge is designed to be extended without modifying core SDK code. The plugin system uses Python entry points to discover packages that expose adapters, window functions, and action handlers. This article walks through the full lifecycle: discovery, loading, registration, and runtime configuration.
Plugin Discovery
At startup the edge agent scans the pyvorin_edge.plugins entry-point group. Any package that declares a group member is imported and its factory callable is invoked to obtain a plugin instance.
import importlib.metadata
def discover_plugins():
plugins = {}
for ep in importlib.metadata.entry_points(group="pyvorin_edge.plugins"):
try:
factory = ep.load()
plugins[ep.name] = factory()
except Exception as exc:
print(f"Failed to load plugin {ep.name}: {exc}")
return plugins
Declaring an Entry Point (pyproject.toml)
[project.entry-points."pyvorin_edge.plugins"]
weather = "my_plugins.weather:WeatherPlugin"
The Plugin Base Class
A well-behaved plugin implements lifecycle hooks so the runtime can start and stop resources cleanly. Below is the canonical base class pattern used throughout the SDK ecosystem.
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class EdgePlugin(ABC):
"""Base class for Pyvorin Edge plugins."""
name: str = "unnamed"
version: str = "0.0.0"
def on_load(self, config: Dict[str, Any]) -> None:
"""Called once when the plugin is discovered and instantiated."""
def on_unload(self) -> None:
"""Called during graceful shutdown. Release sockets, files, threads."""
def on_pipeline_start(self, pipeline_name: str) -> None:
"""Called before a pipeline begins executing."""
def on_pipeline_stop(self, pipeline_name: str) -> None:
"""Called after a pipeline finishes or is cancelled."""
@abstractmethod
def register(self, registry: "PluginRegistry") -> None:
"""Register adapters, window functions, and action handlers."""
Complete Plugin Example: WeatherAPIAdapter
The following plugin fetches live weather data from OpenWeatherMap and exposes it as a sensor source. It also registers a custom action handler that logs alerts when temperature crosses a threshold.
# my_plugins/weather.py
from __future__ import annotations
import os
import time
from typing import Any, Dict, List, Optional
import requests
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorReading, SensorType
class WeatherAPIAdapter:
"""Pull-based adapter that queries OpenWeatherMap."""
def __init__(self, api_key: str, city: str, interval_seconds: float = 300.0):
self.api_key = api_key
self.city = city
self.interval_seconds = interval_seconds
self._last_fetch = 0.0
self._cache: Optional[Dict[str, Any]] = None
def fetch(self) -> List[SensorReading]:
now = time.time()
if self._cache is not None and (now - self._last_fetch) < self.interval_seconds:
return self._cache
url = (
"https://api.openweathermap.org/data/2.5/weather"
f"?q={self.city}&appid={self.api_key}&units=metric"
)
resp = requests.get(url, timeout=30)
resp.raise_for_status()
data = resp.json()
ts = now
readings = [
SensorReading(
sensor_name="weather_temp",
timestamp=ts,
value=data["main"]["temp"],
unit="celsius",
metadata={"city": self.city, "humidity": data["main"]["humidity"]},
),
SensorReading(
sensor_name="weather_wind",
timestamp=ts,
value=data["wind"]["speed"],
unit="m/s",
metadata={"city": self.city},
),
]
self._cache = readings
self._last_fetch = now
return readings
def log_alert_action(ctx: Any) -> None:
"""Action handler registered by the plugin."""
print(f"ALERT: condition triggered on {ctx.sensor_name} = {ctx.value}")
class WeatherPlugin:
"""Pyvorin Edge plugin for weather data ingestion and alerting."""
name = "weather"
version = "1.0.0"
def __init__(self) -> None:
self.adapter: Optional[WeatherAPIAdapter] = None
def on_load(self, config: Dict[str, Any]) -> None:
api_key = config.get("api_key") or os.environ.get("OPENWEATHER_API_KEY")
if not api_key:
raise RuntimeError("OpenWeatherMap API key is required")
self.adapter = WeatherAPIAdapter(
api_key=api_key,
city=config.get("city", "London"),
interval_seconds=config.get("interval_seconds", 300.0),
)
def on_unload(self) -> None:
self.adapter = None
def on_pipeline_start(self, pipeline_name: str) -> None:
print(f"[{self.name}] Pipeline {pipeline_name} started")
def on_pipeline_stop(self, pipeline_name: str) -> None:
print(f"[{self.name}] Pipeline {pipeline_name} stopped")
def register(self, registry: Any) -> None:
registry.add_adapter("weather", self.adapter)
registry.add_action_handler("log_alert", log_alert_action)
def WeatherPlugin_factory() -> WeatherPlugin:
return WeatherPlugin()
Registering Components
The runtime provides a PluginRegistry that collects contributions from every loaded plugin. The registry is then consulted when a pipeline is built from config.toml.
class PluginRegistry:
def __init__(self):
self.adapters: Dict[str, Any] = {}
self.window_funcs: Dict[str, Any] = {}
self.action_handlers: Dict[str, Any] = {}
def add_adapter(self, name: str, adapter: Any) -> None:
self.adapters[name] = adapter
def add_window_function(self, name: str, func: Any) -> None:
self.window_funcs[name] = func
def add_action_handler(self, name: str, handler: Any) -> None:
self.action_handlers[name] = handler
Plugin Configuration in config.toml
Runtime behavior is controlled from a single TOML file. The [plugins] section maps plugin names to their configuration dictionaries.
[pipeline]
name = "weather_monitor"
[[pipeline.sources]]
name = "outdoor"
adapter = "weather"
sensors = ["weather_temp", "weather_wind"]
[[pipeline.rules]]
name = "high_temp_alert"
condition = "ctx.value > 35.0"
severity = "critical"
action = "log_alert"
cooldown_seconds = 600.0
[plugins.weather]
api_key = "${OPENWEATHER_API_KEY}"
city = "Austin"
interval_seconds = 300.0
Bootstrapping Everything
The following snippet shows how the agent ties discovery, registry, and pipeline execution together.
#!/usr/bin/env python3
from pathlib import Path
import tomllib
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
def main() -> None:
with open("config.toml", "rb") as fh:
cfg = tomllib.load(fh)
# 1. Discover plugins
plugins = discover_plugins()
registry = PluginRegistry()
# 2. Load and register
for name, plugin in plugins.items():
plugin_config = cfg.get("plugins", {}).get(name, {})
plugin.on_load(plugin_config)
plugin.register(registry)
# 3. Build pipeline from config
pipeline = Pipeline(cfg["pipeline"]["name"])
for src in cfg["pipeline"].get("sources", []):
adapter = registry.adapters[src["adapter"]]
pipeline.add_source({"name": src["name"], "adapter": adapter})
for s in src["sensors"]:
pipeline.add_sensor(
Sensor(name=s, sensor_type=SensorType.GENERIC, unit="")
)
for rule in cfg["pipeline"].get("rules", []):
action = registry.action_handlers.get(rule.get("action"))
pipeline.add_rule(
RuleConfig(
name=rule["name"],
condition_expr=rule["condition"],
severity=rule["severity"],
action=action,
cooldown_seconds=rule.get("cooldown_seconds", 0.0),
)
)
# 4. Run
for plugin in plugins.values():
plugin.on_pipeline_start(pipeline.name)
readings = []
for src in cfg["pipeline"].get("sources", []):
adapter = registry.adapters[src["adapter"]]
readings.extend(adapter.fetch())
result = pipeline.run(readings)
print(result.to_dict())
for plugin in plugins.values():
plugin.on_pipeline_stop(pipeline.name)
plugin.on_unload()
if __name__ == "__main__":
main()
Summary
Plugins are the standard way to extend Pyvorin Edge. By implementing on_load, on_unload, and register, you can inject custom adapters, window functions, and action handlers without ever touching the SDK core. Configuration lives in config.toml and secrets are injected via environment variables.