Advanced Pipeline Patterns in Pyvorin Edge
Design multi-stage pipelines, implement fan-out/fan-in, build custom window functions like EWMA and percentile, and tune backpressure for production edge workloads.
Published Jun 2, 2026
Introduction
Real-world edge deployments rarely map to a single linear pipeline. You need multi-stage flows, fan-out to independent rules, custom aggregations beyond mean and max, and backpressure policies that prevent memory exhaustion when sensors burst.
This article demonstrates advanced patterns using the Pyvorin Edge SDK. Every example is complete and copy-paste ready.
Multi-Stage Pipelines
A production pipeline typically decomposes into discrete stages: ingest → filter → aggregate → evaluate → act → output. In Pyvorin Edge you model this by chaining Pipeline instances or by building a single pipeline with tightly scoped rules.
Chained Pipeline Instances
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorReading, SensorType
from pyvorin_edge.windows import TumblingWindow
# Stage 1 (ingest + filter): registers the "temp" sensor and a rule whose
# condition holds for non-negative values.
stage_ingest = Pipeline("ingest_filter")
stage_ingest.add_sensor(Sensor("temp", SensorType.TEMPERATURE, "celsius"))
stage_ingest.add_rule(
    RuleConfig(name="drop_negative", condition_expr="ctx.value >= 0.0", severity="info")
)

# Stage 2 (aggregate): a 60-second tumbling window taking the mean of "temp".
stage_aggregate = Pipeline("aggregate")
stage_aggregate.add_window(
    WindowConfig(
        sensor_name="temp",
        window_type="tumbling",
        duration_seconds=60.0,
        aggregation="mean",
    )
)

# Stage 3 (evaluate + act): a critical rule for values above 80.0.
stage_evaluate = Pipeline("evaluate")
stage_evaluate.add_rule(
    RuleConfig(name="overheating", condition_expr="ctx.value > 80.0", severity="critical")
)
def run_multi_stage(readings: list[SensorReading]):
    """Run readings through the filter, aggregate and evaluate stages in order.

    Args:
        readings: Raw sensor readings to process.

    Returns:
        The result of ``stage_evaluate.run`` on a reading synthesized from the
        most recent non-empty window, or ``None`` when no window holds data.
    """
    # Imported locally so this example runs on its own: the imports that
    # precede it do not bring ``time`` into scope.
    import time

    # Filter: keep a reading only if the ingest stage produced an event for it,
    # i.e. its rule condition held for that reading.
    filtered = [r for r in readings if stage_ingest.run([r]).events]

    # Aggregate: running the stage feeds its windows; the return value of
    # run() itself is not needed here.
    stage_aggregate.run(filtered)
    # NOTE(review): _window_objects is a private attribute of Pipeline —
    # confirm whether the SDK offers a public accessor for window objects.
    window_results = [w for w in stage_aggregate._window_objects if len(w) > 0]
    if not window_results:
        return None

    # Evaluate on the latest window by synthesizing a reading from its aggregate.
    latest = window_results[-1].compute()
    eval_reading = SensorReading(
        sensor_name="temp",
        timestamp=time.time(),
        value=latest.mean or 0.0,  # a missing (falsy) mean is evaluated as 0.0
        unit="celsius",
    )
    return stage_evaluate.run([eval_reading])
Fan-Out / Fan-In Patterns
One sensor reading often triggers multiple independent rules. Pipeline.run() already evaluates every rule against each reading, but you can also split work across specialized sub-pipelines for isolation and independent tuning.
import time
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading
def fan_out(reading: SensorReading):
    """Send one reading to three independent evaluation pipelines.

    Returns a dict keyed by "safety", "efficiency" and "maintenance"; each
    value is what that pipeline's ``run`` returned for the single reading.
    """
    # (pipeline name, rule name, condition, severity) for each isolated domain.
    domains = (
        ("safety", "fire_risk", "ctx.value > 100.0", "critical"),
        ("efficiency", "idle_zone", "ctx.value < 18.0", "info"),
        ("maintenance", "thermal_drift", "abs(ctx.value - 22.0) > 5.0", "warning"),
    )

    # Build every pipeline first, then run them in declaration order.
    pipelines = {}
    for domain, rule_name, condition, severity in domains:
        pipeline = Pipeline(domain)
        pipeline.add_rule(
            RuleConfig(name=rule_name, condition_expr=condition, severity=severity)
        )
        pipelines[domain] = pipeline

    return {domain: pipeline.run([reading]) for domain, pipeline in pipelines.items()}
# Usage: push a single 105.0 celsius reading through fan_out and list, per
# domain, the names of the rules that produced events.
reading = SensorReading(
    sensor_name="temp", timestamp=time.time(), value=105.0, unit="celsius"
)
results = fan_out(reading)
for domain, res in results.items():
    fired_rules = [event.rule_name for event in res.events]
    print(domain, fired_rules)
Custom Window Functions
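The built-in aggregates in RollingWindow and TumblingWindow cover mean, max, min, std, and sum. For anything else, such as EWMA or percentiles, pass custom_aggregation: a dictionary that maps a result name to a callable which takes the window's values as a list of floats and returns a single float.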
Exponentially Weighted Moving Average (EWMA)
from typing import List
from pyvorin_edge.windows import RollingWindow
def ewma(values: List[float], alpha: float = 0.3) -> float:
    """Return the exponentially weighted moving average of *values*.

    The first value seeds the average; every later value ``v`` updates it as
    ``alpha * v + (1 - alpha) * average``. An empty list yields ``0.0``.
    """
    if not values:
        return 0.0
    samples = iter(values)
    smoothed = next(samples)  # seed with the first observation
    for sample in samples:
        smoothed = alpha * sample + (1 - alpha) * smoothed
    return smoothed
# Five-minute rolling window over "cpu_load" that registers the ewma function
# under the custom-aggregation key "ewma".
window = RollingWindow(
    sensor_name="cpu_load",
    duration_seconds=300.0,
    custom_aggregation={"ewma": ewma},
)
Percentile Aggregation
import statistics
from typing import List
def p95(values: List[float]) -> float:
    """Return the 95th-percentile value of *values*, or ``0.0`` if empty.

    The result is the element at index ``int(len(values) * 0.95)`` of the
    sorted values, clamped to the last element.
    """
    if values:
        ordered = sorted(values)
        last = len(ordered) - 1
        rank = int(len(ordered) * 0.95)
        return ordered[rank] if rank < last else ordered[last]
    return 0.0
def p99(values: List[float]) -> float:
    """Return the 99th-percentile value of *values*, or ``0.0`` if empty.

    The result is the element at index ``int(len(values) * 0.99)`` of the
    sorted values, clamped to the last element.
    """
    if values:
        ordered = sorted(values)
        last = len(ordered) - 1
        rank = int(len(ordered) * 0.99)
        return ordered[rank] if rank < last else ordered[last]
    return 0.0
# One-minute rolling window over "latency_ms" that registers both percentile
# functions under the custom-aggregation keys "p95" and "p99".
window = RollingWindow(
    sensor_name="latency_ms",
    duration_seconds=60.0,
    custom_aggregation={"p95": p95, "p99": p99},
)
Access custom results through the custom field of WindowResult:
# Compute the window once, then read each custom aggregate by its key.
result = window.compute()
for key in ("p95", "p99"):  # illustrative output: 42.1, then 78.5
    print(result.custom.get(key))
Backpressure Tuning
Edge devices have finite RAM. If a burst of sensor readings arrives faster than the pipeline can process them, you need a backpressure policy.
Queue Depth Limits and Drop Policies
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List
from pyvorin_edge.sensors import SensorReading
@dataclass
class BackpressureConfig:
    """Settings for a bounded queue of sensor readings."""

    # Number of queued readings at which the overflow policy starts to apply.
    max_depth: int = 1000
    # Overflow policy name: "tail" | "head" | "reject".
    drop_policy: str = "tail"
class BackpressureQueue:
    """Bounded FIFO queue of readings with a configurable overflow policy.

    Once the queue holds ``max_depth`` readings, ``enqueue`` applies the
    configured ``drop_policy``:

    * ``"head"``   - evict the oldest queued reading and accept the new one.
    * ``"tail"``   - discard the incoming reading and return ``False``.
    * ``"reject"`` - discard the incoming reading and return ``False``.
    """

    _DROP_POLICIES = ("tail", "head", "reject")

    def __init__(self, config: BackpressureConfig, processor: Callable[[List[SensorReading]], None]):
        """Create an empty queue.

        Args:
            config: Depth limit and overflow policy.
            processor: Called by ``drain`` with each non-empty batch.

        Raises:
            ValueError: If ``config.drop_policy`` is not a known policy name.
        """
        # Fail fast: an unrecognised policy would otherwise never drop
        # anything and the queue could grow without bound.
        if config.drop_policy not in self._DROP_POLICIES:
            raise ValueError(
                f"unknown drop_policy {config.drop_policy!r}; "
                f"expected one of {self._DROP_POLICIES}"
            )
        self._q: Deque[SensorReading] = deque()
        self._cfg = config
        self._processor = processor

    def enqueue(self, reading: SensorReading) -> bool:
        """Add *reading* to the queue, applying the overflow policy when full.

        Returns:
            ``True`` if the reading was queued, ``False`` if it was dropped.
        """
        if len(self._q) >= self._cfg.max_depth:
            # Only "head" makes room for the newcomer. Every other policy
            # refuses it, so the bound still holds if the config is mutated
            # to an unknown policy after construction. An empty queue that
            # is already "full" means max_depth <= 0: nothing can be evicted.
            if self._cfg.drop_policy != "head" or not self._q:
                return False
            self._q.popleft()
        self._q.append(reading)
        return True

    def drain(self, batch_size: int = 100) -> None:
        """Pop up to *batch_size* readings, oldest first, and process them.

        The processor is not called when there is nothing to hand over.
        """
        take = min(batch_size, len(self._q))
        batch: List[SensorReading] = [self._q.popleft() for _ in range(take)]
        if batch:
            self._processor(batch)
# Example processor: reports the size of each batch it is handed.
def process_batch(batch: List[SensorReading]) -> None:
    """Print how many readings *batch* contains."""
    print(f"Processing batch of {len(batch)} readings")
# Queue limited to 500 readings using the "head" policy, drained through
# process_batch.
queue = BackpressureQueue(
    config=BackpressureConfig(max_depth=500, drop_policy="head"),
    processor=process_batch,
)
Pipeline Composition
You can treat a Pipeline as a reusable building block and compose higher-order flows from it. Because the add_* methods return self, you can also build a pipeline declaratively by chaining the calls.
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
def build_temperature_pipeline(name: str, alert_threshold: float) -> Pipeline:
    """Build a temperature pipeline with a rolling mean window and an alert rule.

    Args:
        name: Name passed to the ``Pipeline`` constructor.
        alert_threshold: Value interpolated into the "temp_alert" rule, whose
            condition is ``ctx.value > alert_threshold``.

    Returns:
        Whatever the final ``add_rule`` call in the chain returns.
    """
    pipeline = Pipeline(name)

    sensor = Sensor("temp", SensorType.TEMPERATURE, "celsius")
    pipeline = pipeline.add_sensor(sensor)

    mean_window = WindowConfig(
        duration_seconds=60.0,
        window_type="rolling",
        sensor_name="temp",
        aggregation="mean",
    )
    pipeline = pipeline.add_window(mean_window)

    alert_rule = RuleConfig(
        name="temp_alert",
        condition_expr=f"ctx.value > {alert_threshold}",
        severity="critical",
    )
    pipeline = pipeline.add_rule(alert_rule)
    return pipeline
# Compose two pipelines into a fleet monitor
# NOTE(review): `fleet` is configured here but is not referenced again in this
# snippet; only zone_a and zone_b are run below — confirm whether it is needed.
fleet = Pipeline("fleet_monitor")
fleet.add_sensor(Sensor("temp", SensorType.TEMPERATURE, "celsius"))
# Two zone pipelines from the same factory, differing in name and alert threshold.
zone_a = build_temperature_pipeline("zone_a", alert_threshold=75.0)
zone_b = build_temperature_pipeline("zone_b", alert_threshold=65.0)
# Run both against the same readings and merge events
readings = [...] # placeholder: replace with a list of SensorReading objects
events_a = zone_a.run(readings).events
events_b = zone_b.run(readings).events
# Zone A's events come first in the merged sequence, followed by zone B's.
all_events = events_a + events_b
print(f"Total events: {len(all_events)}")
Summary
Advanced pipelines in Pyvorin Edge are built from simple primitives: Pipeline, RuleConfig, WindowConfig, and the _BaseWindow custom-aggregation hooks. By chaining stages, fanning out to isolated rule sets, injecting custom statistics, and bounding queues, you can handle production edge workloads without over-engineering.