
Advanced Pipeline Patterns in Pyvorin Edge

Design multi-stage pipelines, implement fan-out/fan-in, build custom window functions like EWMA and percentile, and tune backpressure for production edge workloads.

Published Jun 2, 2026

Introduction

Real-world edge deployments rarely map to a single linear pipeline. You need multi-stage flows, fan-out to independent rules, custom aggregations beyond mean and max, and backpressure policies that prevent memory exhaustion when sensors burst.

This article demonstrates advanced patterns using the Pyvorin Edge SDK. Every example is complete and copy-paste ready.

Multi-Stage Pipelines

A production pipeline typically decomposes into discrete stages: ingest → filter → aggregate → evaluate → act → output. In Pyvorin Edge you model this by chaining Pipeline instances or by building a single pipeline with tightly scoped rules.

Chained Pipeline Instances

import time

from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorReading, SensorType

# Stage 1: Ingest + Filter
stage_ingest = Pipeline("ingest_filter")
stage_ingest.add_sensor(Sensor("temp", SensorType.TEMPERATURE, "celsius"))
stage_ingest.add_rule(
    RuleConfig(
        name="drop_negative",
        condition_expr="ctx.value >= 0.0",
        severity="info",
    )
)

# Stage 2: Aggregate
stage_aggregate = Pipeline("aggregate")
stage_aggregate.add_window(
    WindowConfig(
        duration_seconds=60.0,
        window_type="tumbling",
        sensor_name="temp",
        aggregation="mean",
    )
)

# Stage 3: Evaluate + Act
stage_evaluate = Pipeline("evaluate")
stage_evaluate.add_rule(
    RuleConfig(
        name="overheating",
        condition_expr="ctx.value > 80.0",
        severity="critical",
    )
)


def run_multi_stage(readings: list[SensorReading]):
    # Filter
    filtered = []
    for r in readings:
        res = stage_ingest.run([r])
        if res.events:  # rule fired = value passed filter
            filtered.append(r)

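    # Aggregate: feed the surviving readings into the 60-second window
    stage_aggregate.run(filtered)
    # NOTE: _window_objects is a private attribute and may change between SDK releases
    window_results = [w for w in stage_aggregate._window_objects if len(w) > 0]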

    # Evaluate on latest window
    if window_results:
        latest = window_results[-1].compute()
        # synthesize a reading from the aggregate
        eval_reading = SensorReading(
            sensor_name="temp",
            timestamp=time.time(),
            value=latest.mean or 0.0,
            unit="celsius",
        )
        return stage_evaluate.run([eval_reading])
    return None
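
To make the aggregate stage concrete, the following is a library-independent sketch of what a 60-second tumbling mean computes. It assumes windows are aligned to multiples of the duration; the SDK's TumblingWindow may align or emit windows differently, and tumbling_means is a hypothetical helper, not part of pyvorin_edge.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_means(
    samples: Iterable[Tuple[float, float]], duration_seconds: float = 60.0
) -> Dict[float, float]:
    """Group (timestamp, value) pairs into fixed, non-overlapping windows.

    Returns a mapping of window start time -> mean of the values in that window.
    Assumption: windows start at multiples of duration_seconds.
    """
    buckets: Dict[float, List[float]] = defaultdict(list)
    for timestamp, value in samples:
        # Each sample belongs to exactly one window: [start, start + duration).
        start = (timestamp // duration_seconds) * duration_seconds
        buckets[start].append(value)
    return {start: sum(vals) / len(vals) for start, vals in sorted(buckets.items())}


samples = [(0.0, 70.0), (30.0, 74.0), (61.0, 82.0), (119.0, 86.0), (120.0, 90.0)]
means = tumbling_means(samples)
print(means)  # {0.0: 72.0, 60.0: 84.0, 120.0: 90.0}
```

With the overheating threshold of 80.0 used in stage 3, only the second and third windows here would raise an event.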

Fan-Out / Fan-In Patterns

One sensor reading often triggers multiple independent rules. Pipeline.run() already evaluates every rule against each reading, but you can also split work across specialized sub-pipelines for isolation and independent tuning.

import time
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading


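def fan_out(reading: SensorReading):
    """Send one reading to three independent evaluation pipelines.

    The pipelines are rebuilt on every call to keep the example short;
    in production, build them once and reuse them.
    """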
    safety = Pipeline("safety")
    safety.add_rule(
        RuleConfig(
            name="fire_risk",
            condition_expr="ctx.value > 100.0",
            severity="critical",
        )
    )

    efficiency = Pipeline("efficiency")
    efficiency.add_rule(
        RuleConfig(
            name="idle_zone",
            condition_expr="ctx.value < 18.0",
            severity="info",
        )
    )

    maintenance = Pipeline("maintenance")
    maintenance.add_rule(
        RuleConfig(
            name="thermal_drift",
            condition_expr="abs(ctx.value - 22.0) > 5.0",
            severity="warning",
        )
    )

    return {
        "safety": safety.run([reading]),
        "efficiency": efficiency.run([reading]),
        "maintenance": maintenance.run([reading]),
    }


# Usage
reading = SensorReading(
    sensor_name="temp",
    timestamp=time.time(),
    value=105.0,
    unit="celsius",
)
results = fan_out(reading)
for domain, res in results.items():
    print(domain, [e.rule_name for e in res.events])
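
The fan-in half of the pattern is merging the per-domain results back into one stream. The sketch below is independent of the SDK: Event is a hypothetical stand-in for the SDK's event type (the article only shows rule_name; the severity attribute is an assumption), and fan_in is an illustrative helper that orders merged events from most to least severe.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


# Hypothetical stand-in for the SDK's event type; the severity attribute
# is an assumption made for this sketch.
@dataclass(frozen=True)
class Event:
    rule_name: str
    severity: str


SEVERITY_RANK = {"critical": 0, "warning": 1, "info": 2}


def fan_in(results: Dict[str, List[Event]]) -> List[Tuple[str, Event]]:
    """Merge per-domain events into one list, most severe first."""
    merged = [(domain, event) for domain, events in results.items() for event in events]
    # sorted() is stable, so events of equal severity keep their domain order.
    # Unknown severities sort after all known ones.
    return sorted(
        merged,
        key=lambda pair: SEVERITY_RANK.get(pair[1].severity, len(SEVERITY_RANK)),
    )


per_domain = {
    "efficiency": [],
    "maintenance": [Event("thermal_drift", "warning")],
    "safety": [Event("fire_risk", "critical")],
}
merged = fan_in(per_domain)
for domain, event in merged:
    print(domain, event.rule_name, event.severity)
```

Sorting at the fan-in point lets a downstream actuator handle critical events first without knowing which sub-pipeline produced them.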

Custom Window Functions

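The built-in aggregates in RollingWindow and TumblingWindow cover mean, max, min, std, and sum. For EWMA or percentiles you pass custom_aggregation, a dictionary that maps a result name to a callable. Each callable receives the window's values as a list of floats and returns a single float.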

Exponentially Weighted Moving Average (EWMA)

from typing import List
from pyvorin_edge.windows import RollingWindow


def ewma(values: List[float], alpha: float = 0.3) -> float:
    if not values:
        return 0.0
    result = values[0]
    for v in values[1:]:
        result = alpha * v + (1 - alpha) * result
    return result


window = RollingWindow(
    duration_seconds=300.0,
    sensor_name="cpu_load",
    custom_aggregation={"ewma": ewma},
)
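
The ewma() callable above recomputes the average over the whole window on every call. Where only the latest value matters, the same recurrence can be kept as running state in constant memory. This is a minimal sketch independent of the SDK; StreamingEwma is a hypothetical helper name.

```python
from typing import Optional


class StreamingEwma:
    """Keeps only the running EWMA, not the window contents."""

    def __init__(self, alpha: float = 0.3) -> None:
        if not 0.0 < alpha <= 1.0:
            raise ValueError("alpha must be in (0, 1]")
        self.alpha = alpha
        self.value: Optional[float] = None

    def update(self, sample: float) -> float:
        if self.value is None:
            # Seed with the first sample, matching the batch ewma() function.
            self.value = sample
        else:
            self.value = self.alpha * sample + (1 - self.alpha) * self.value
        return self.value


tracker = StreamingEwma(alpha=0.5)
for sample in [10.0, 20.0, 40.0]:
    current = tracker.update(sample)
print(current)  # 27.5  (10 -> 15 -> 27.5)
```

A larger alpha weights recent samples more heavily and reacts faster to bursts; a smaller alpha smooths more but lags behind real changes.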

Percentile Aggregation

from typing import List


def p95(values: List[float]) -> float:
    if not values:
        return 0.0
    sorted_vals = sorted(values)
    idx = int(len(sorted_vals) * 0.95)
    return sorted_vals[min(idx, len(sorted_vals) - 1)]


def p99(values: List[float]) -> float:
    if not values:
        return 0.0
    sorted_vals = sorted(values)
    idx = int(len(sorted_vals) * 0.99)
    return sorted_vals[min(idx, len(sorted_vals) - 1)]


window = RollingWindow(
    duration_seconds=60.0,
    sensor_name="latency_ms",
    custom_aggregation={"p95": p95, "p99": p99},
)

Access custom results through the custom field of WindowResult:

result = window.compute()
print(result.custom.get("p95"))   # 42.1
print(result.custom.get("p99"))   # 78.5
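
Since p95 and p99 differ only in the quantile, a small factory can generate them. The sketch below uses the nearest-rank definition computed in integer arithmetic, which can select a slightly different element than the int(len * q) index used above. make_percentile is a hypothetical helper; passing its output as custom_aggregation assumes the dictionary-of-callables interface described in this section.

```python
from typing import Callable, List


def make_percentile(pct: int) -> Callable[[List[float]], float]:
    """Return an aggregation callable for the given whole-number percentile."""
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")

    def percentile(values: List[float]) -> float:
        if not values:
            return 0.0  # same empty-window convention as p95/p99
        ordered = sorted(values)
        # Nearest rank: smallest 1-based rank r with r >= pct% of n,
        # computed with integers to avoid float rounding surprises.
        rank = (pct * len(ordered) + 99) // 100
        return ordered[rank - 1]

    percentile.__name__ = f"p{pct}"
    return percentile


latencies = [float(ms) for ms in range(1, 101)]  # 1.0 .. 100.0
custom_aggregation = {f"p{pct}": make_percentile(pct) for pct in (50, 95, 99)}
summary = {name: fn(latencies) for name, fn in custom_aggregation.items()}
print(summary)  # {'p50': 50.0, 'p95': 95.0, 'p99': 99.0}
```

Note that each callable sorts the window independently. For large windows with several percentiles, sorting once and indexing multiple times would be cheaper.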

Backpressure Tuning

Edge devices have finite RAM. If a burst of sensor readings arrives faster than the pipeline can process them, you need a backpressure policy that bounds the queue and decides which readings to discard when it is full.

Queue Depth Limits and Drop Policies

from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List

from pyvorin_edge.sensors import SensorReading


@dataclass
class BackpressureConfig:
    max_depth: int = 1000
    drop_policy: str = "tail"   # "head": evict oldest | "tail" / "reject": discard incoming


class BackpressureQueue:
    """Bounded queue with configurable overflow behavior."""

    def __init__(self, config: BackpressureConfig, processor: Callable[[List[SensorReading]], None]):
        self._q: Deque[SensorReading] = deque()
        self._cfg = config
        self._processor = processor

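    def enqueue(self, reading: SensorReading) -> bool:
        """Return True if the reading was queued, False if it was discarded."""
        if len(self._q) >= self._cfg.max_depth:
            if self._cfg.drop_policy == "head":
                self._q.popleft()  # evict the oldest reading to make room
            elif self._cfg.drop_policy in ("tail", "reject"):
                # Both discard the incoming reading; the caller sees False.
                return False
            else:
                # Without this branch an unknown policy would let the queue grow unbounded.
                raise ValueError(f"unknown drop_policy: {self._cfg.drop_policy!r}")
        self._q.append(reading)
        return True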

    def drain(self, batch_size: int = 100) -> None:
        batch: List[SensorReading] = []
        while self._q and len(batch) < batch_size:
            batch.append(self._q.popleft())
        if batch:
            self._processor(batch)


# Example processor
def process_batch(batch: List[SensorReading]) -> None:
    print(f"Processing batch of {len(batch)} readings")


queue = BackpressureQueue(
    BackpressureConfig(max_depth=500, drop_policy="head"),
    process_batch,
)
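
For the "head" policy specifically, the standard library's collections.deque(maxlen=...) provides drop-oldest behavior natively. The sketch below wraps it with a drop counter so overflow is observable in metrics. It is independent of the SDK: DropOldestQueue is a hypothetical name, and plain integers stand in for SensorReading objects.

```python
from collections import deque
from typing import Deque, List


class DropOldestQueue:
    """Bounded FIFO that evicts the oldest item on overflow and counts drops."""

    def __init__(self, max_depth: int) -> None:
        self._q: Deque[int] = deque(maxlen=max_depth)
        self.dropped = 0

    def enqueue(self, item: int) -> None:
        if len(self._q) == self._q.maxlen:
            # A full deque with maxlen discards its leftmost item on append.
            self.dropped += 1
        self._q.append(item)

    def drain(self, batch_size: int) -> List[int]:
        batch: List[int] = []
        while self._q and len(batch) < batch_size:
            batch.append(self._q.popleft())
        return batch


bounded = DropOldestQueue(max_depth=3)
for reading_id in range(5):
    bounded.enqueue(reading_id)
batch = bounded.drain(batch_size=10)
print(batch, bounded.dropped)  # [2, 3, 4] 2
```

A steadily rising drop counter is a signal to raise max_depth, drain more often, or reduce the sensor sampling rate.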

Pipeline Composition

You can treat a Pipeline as a reusable building block and compose higher-order flows from factory functions. Because the add_* methods return self, you can also chain them declaratively.

from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType


def build_temperature_pipeline(name: str, alert_threshold: float) -> Pipeline:
    return (
        Pipeline(name)
        .add_sensor(Sensor("temp", SensorType.TEMPERATURE, "celsius"))
        .add_window(
            WindowConfig(
                duration_seconds=60.0,
                window_type="rolling",
                sensor_name="temp",
                aggregation="mean",
            )
        )
        .add_rule(
            RuleConfig(
                name="temp_alert",
                condition_expr=f"ctx.value > {alert_threshold}",
                severity="critical",
            )
        )
    )


# Build one pipeline per zone from the same factory
zone_a = build_temperature_pipeline("zone_a", alert_threshold=75.0)
zone_b = build_temperature_pipeline("zone_b", alert_threshold=65.0)

# Run both against the same readings and merge events
readings = [...]  # SensorReading list
events_a = zone_a.run(readings).events
events_b = zone_b.run(readings).events
all_events = events_a + events_b
print(f"Total events: {len(all_events)}")

Summary

Advanced pipelines in Pyvorin Edge are built from simple primitives: Pipeline, RuleConfig, WindowConfig, and the _BaseWindow custom-aggregation hooks. By chaining stages, fanning out to isolated rule sets, injecting custom statistics, and bounding queues, you can handle production edge workloads without over-engineering.