The Pipeline Class — Ingest, Window, Evaluate, Act
Complete guide to the Pipeline class, its execution loop, latency tracking, and the PipelineResult API.
Published Jun 2, 2026
Pipeline Architecture Overview
The Pipeline class is the central orchestrator of the Pyvorin Edge SDK. Every data journey follows the same five stages:
- Ingest: SensorReading objects arrive from adapters (simulator, MQTT, HTTP, file replay, or custom).
- Window: Readings are appended to rolling or tumbling windows for temporal aggregation.
- Rule Evaluate: Each configured rule inspects the reading (or window state) and decides whether to fire.
- Action: If a rule fires and has an attached action, the action callable is executed.
- Output: Events, health reports, and manifests are emitted for local storage or cloud sync.
This design keeps the hot path simple: a single Python loop over readings with O(1) window updates and O(R) rule evaluations per reading, where R is the number of rules. There are no hidden threads inside Pipeline.run(), so per-reading latency is predictable and easy to profile.
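To make the O(1) window-update claim concrete, here is a minimal sketch of a time-based rolling mean built on a deque and a running sum. The class name RollingMeanSketch, its methods, and the eviction boundary (readings exactly duration_seconds old are dropped) are assumptions for illustration and are not the SDK's RollingWindow. The point is only that each reading is appended once and evicted at most once, so the cost per reading is amortized constant.

```python
from collections import deque


class RollingMeanSketch:
    """Hypothetical time-based rolling mean; not the SDK's RollingWindow."""

    def __init__(self, duration_seconds: float) -> None:
        self.duration_seconds = duration_seconds
        self._items = deque()  # (timestamp, value) pairs, oldest first
        self._total = 0.0

    def add(self, timestamp: float, value: float) -> None:
        self._items.append((timestamp, value))
        self._total += value
        # Evict readings that fell out of the window. Each reading is
        # appended once and popped at most once: amortized O(1).
        cutoff = timestamp - self.duration_seconds
        while self._items and self._items[0][0] <= cutoff:
            _, old_value = self._items.popleft()
            self._total -= old_value

    def mean(self) -> float:
        return self._total / len(self._items) if self._items else float("nan")


window = RollingMeanSketch(duration_seconds=120.0)
for i, value in enumerate([20.0, 22.0, 24.0, 26.0]):
    window.add(timestamp=i * 60.0, value=value)

# Only the readings at t=120 and t=180 remain in the 120-second window.
print(window.mean())
```

Keeping a running sum avoids re-scanning the window on every reading, which is what would turn the update into O(window size).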
Constructing a Pipeline
A pipeline is instantiated with a human-readable name and then configured via fluent add_* methods that return self. This allows fully declarative setup in a single chained block.
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType

pipeline = Pipeline(name="hvac_monitoring")

# Register sensors
pipeline.add_sensor(Sensor(
    name="zone_a_temp",
    sensor_type=SensorType.TEMPERATURE,
    unit="°C",
    sampling_interval_seconds=60.0,
    normal_range=(18.0, 24.0),
    alert_threshold=28.0,
    location="Zone-A",
))

# Add a 5-minute rolling window
pipeline.add_window(WindowConfig(
    duration_seconds=300.0,
    window_type="rolling",
    sensor_name="zone_a_temp",
    aggregation="mean",
))

# Add a threshold rule
pipeline.add_rule(RuleConfig(
    name="overheating",
    condition=lambda r: r.value > 30.0,
    severity="critical",
    cooldown_seconds=300.0,
))
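Because each add_* method returns self, the same kind of setup can also be written as one chained expression. The sketch below illustrates the pattern with a small stand-in class (FluentSketch, hypothetical, with simplified argument types) rather than the real Pipeline, so that it runs without the SDK installed.

```python
class FluentSketch:
    """Hypothetical stand-in that mimics the fluent add_* pattern."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sensors = []
        self.windows = []
        self.rules = []

    def add_sensor(self, sensor: str) -> "FluentSketch":
        self.sensors.append(sensor)
        return self  # returning self is what enables chaining

    def add_window(self, window: dict) -> "FluentSketch":
        self.windows.append(window)
        return self

    def add_rule(self, rule: str) -> "FluentSketch":
        self.rules.append(rule)
        return self


# One declarative block instead of three separate statements.
sketch = (
    FluentSketch(name="hvac_monitoring")
    .add_sensor("zone_a_temp")
    .add_window({"duration_seconds": 300.0, "window_type": "rolling"})
    .add_rule("overheating")
)
print(sketch.sensors, sketch.rules)
```

Wrapping the chain in parentheses lets each call sit on its own line without backslashes.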
The Execution Loop
When you call pipeline.run(readings), the engine performs the following work for every reading:
# Pseudocode equivalent of Pipeline.run()
import time

events = []
total_latency_ms = 0.0
for reading in readings:
    start = time.perf_counter()
    # 1. Window update
    for w in pipeline._window_objects:
        if w.sensor_name is None or w.sensor_name == reading.sensor_name:
            w.add_reading(reading)
    # 2. Rule evaluation
    for rule in pipeline.rules:
        event = pipeline._evaluate_rule(rule, reading, reading.timestamp)
        if event is not None:
            events.append(event)
    total_latency_ms += (time.perf_counter() - start) * 1000.0
# The reported latency_ms is the mean over the batch (see Latency Tracking)
latency_ms = total_latency_ms / len(readings) if readings else 0.0
Windows are updated before rules so that rule conditions can optionally inspect window state via closure or shared state. Rule evaluation is serial by design—there is no implicit parallelisation that could reorder events.
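As an illustration of a condition that inspects window state via closure, the sketch below builds a rule predicate that fires on the window mean rather than on the individual reading. ReadingSketch, WindowSketch, and mean_above are hypothetical stand-ins, not SDK classes; the count-based window is a simplification chosen to keep the example short.

```python
from dataclasses import dataclass
from statistics import fmean


@dataclass
class ReadingSketch:
    """Hypothetical stand-in for SensorReading."""
    sensor_name: str
    timestamp: float
    value: float


class WindowSketch:
    """Hypothetical stand-in that only keeps the last `size` values."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.values = []

    def add_reading(self, reading: ReadingSketch) -> None:
        self.values = (self.values + [reading.value])[-self.size:]

    def mean(self) -> float:
        return fmean(self.values)


def mean_above(window: WindowSketch, limit: float):
    """Build a condition that closes over `window`."""
    def condition(reading: ReadingSketch) -> bool:
        return window.mean() > limit
    return condition


window = WindowSketch(size=3)
condition = mean_above(window, limit=30.0)

fired = []
for i, value in enumerate([22.0, 23.0, 36.0, 34.0, 35.0]):
    reading = ReadingSketch("zone_a_temp", i * 60.0, value)
    window.add_reading(reading)       # 1. window update first
    fired.append(condition(reading))  # 2. then rule evaluation
print(fired)
```

Because the window is updated first, the condition always sees a mean that already includes the current reading. Here the single 36.0 reading does not fire on its own; the rule fires only once the three-value mean exceeds the limit.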
PipelineResult
The return value of run(readings) is a PipelineResult dataclass containing everything you need to decide what to do next: log, forward to cloud, or trigger a local alert.
| Field | Type | Description |
|---|---|---|
| events | List[Event] | Every event fired during the run. |
| evaluations | int | Total number of rule evaluations executed. |
| latency_ms | float | Average per-reading latency in milliseconds. |
| readings_processed | int | Count of readings consumed. |
from pyvorin_edge.pipeline import Pipeline, PipelineResult
from pyvorin_edge.sensors import SensorReading

# Assuming pipeline is already configured
readings = [
    SensorReading("zone_a_temp", 1717152000.0, 22.0, "°C"),
    SensorReading("zone_a_temp", 1717152060.0, 31.5, "°C"),
    SensorReading("zone_a_temp", 1717152120.0, 32.0, "°C"),
]

result = pipeline.run(readings)
assert isinstance(result, PipelineResult)
print(f"Processed {result.readings_processed} readings in {result.latency_ms:.3f} ms avg")
print(f"Events: {len(result.events)}")
print(result.to_dict())
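One way to act on a result is a small routing function that maps it to a single follow-up step. The sketch below uses a stand-in dataclass with the four fields listed in the table above; EventSketch, its rule_name and severity attributes, the next_step policy, and the 5 ms budget are all assumptions for illustration, and the real Event class may expose different fields.

```python
from dataclasses import asdict, dataclass, field
from typing import List


@dataclass
class EventSketch:
    """Hypothetical event shape; the real Event class may differ."""
    rule_name: str
    severity: str


@dataclass
class ResultSketch:
    """Stand-in with the four fields listed in the table above."""
    events: List[EventSketch] = field(default_factory=list)
    evaluations: int = 0
    latency_ms: float = 0.0
    readings_processed: int = 0


def next_step(result: ResultSketch, latency_budget_ms: float = 5.0) -> str:
    """Pick one follow-up action from a result (illustrative policy only)."""
    if any(e.severity == "critical" for e in result.events):
        return "local_alert"
    if result.events:
        return "forward_to_cloud"
    if result.latency_ms > latency_budget_ms:
        return "log_slow_batch"
    return "log"


result = ResultSketch(
    events=[EventSketch("overheating", "critical")],
    evaluations=3,
    latency_ms=0.042,
    readings_processed=3,
)
print(next_step(result))
print(asdict(result))
```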
Full Class API
Below is the complete public API surface of Pipeline. Methods whose signature ends in → Pipeline return self, which enables method chaining.
- __init__(name: str): Create a new pipeline. The name is used in manifests, reports, and logging.
- add_source(source: dict) → Pipeline: Attach a data-source configuration dict. This is primarily for documentation and manifest export; the runtime does not automatically start adapters.
- add_sensor(sensor: Sensor) → Pipeline: Register a sensor. Adds it to the internal SensorRegistry for validation and simulation.
- add_window(window: WindowConfig) → Pipeline: Append a window configuration. Instantiates either a RollingWindow or TumblingWindow internally.
- add_rule(rule: RuleConfig) → Pipeline: Append a rule. Rules are evaluated in insertion order on every reading.
- add_policy(policy: CloudPolicy | PrivacyPolicy) → Pipeline: Attach a policy. Policies are exported in manifests and validated for conflicts.
- run(readings: list[SensorReading] | None = None) → dict | PipelineResult: If readings is supplied, evaluate the pipeline and return a PipelineResult. If None, return a static metadata dict for backwards compatibility.
- export_manifest(version: str = "1.0.0") → dict: Generate a deployment manifest dict containing all sensors, windows, rules, and policies. Useful for CI/CD promotion and edge fleet management.
- export_policy() → dict: Export cloud and privacy policies, running cross-validation to detect conflicts (e.g., a privacy rule redacting a field that the cloud policy requires).
- run_simulation(duration_seconds: float = 3600.0, interval_seconds: float = 60.0, seed: int | None = None) → dict: Generate synthetic readings based on registered sensor normal ranges, evaluate windows and rules, and return a full simulation report. Zero hardware required.
- to_dict() → dict: Serialize the entire pipeline configuration to a dictionary.
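The kind of conflict that export_policy() is described as detecting can be pictured as a set intersection: any field that a privacy policy redacts while a cloud policy requires it. The function and field names below are hypothetical and say nothing about how the SDK represents policies internally.

```python
def find_policy_conflicts(redacted_fields, required_fields):
    """Fields that one policy removes while the other depends on them."""
    return sorted(set(redacted_fields) & set(required_fields))


conflicts = find_policy_conflicts(
    redacted_fields=["location", "operator_id"],
    required_fields=["sensor_name", "value", "location"],
)
print(conflicts)  # ['location']
```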
Latency Tracking
Per-reading latency is measured with time.perf_counter() inside the hot loop. The reported latency_ms in PipelineResult is the arithmetic mean across all readings in the batch. This metric is intentionally conservative—it includes window updates, rule evaluations, and action execution, but excludes adapter I/O.
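import statistics

# batch_of_50_readings: a list of 50 SensorReading objects prepared elsewhere.
# Run the pipeline many times and collect the per-batch mean latency.
latencies = []
for _ in range(100):
    result = pipeline.run(batch_of_50_readings)
    latencies.append(result.latency_ms)

# These are percentiles of the batch means, not of individual readings.
cuts = statistics.quantiles(latencies, n=100)
print(f"p50: {statistics.median(latencies):.3f} ms")
print(f"p99: {cuts[98]:.3f} ms")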
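Since latency_ms is a batch mean, percentiles computed over it describe batches rather than individual readings, and a mean can hide a slow tail. If per-reading tail latency matters, one option is to time each unit of work separately. The sketch below does this for an arbitrary callable; time_each and summarize are hypothetical helpers, and the lambda is a stand-in workload rather than a pipeline call.

```python
import statistics
import time


def time_each(process, items):
    """Return one latency sample in milliseconds per item."""
    samples = []
    for item in items:
        start = time.perf_counter()
        process(item)
        samples.append((time.perf_counter() - start) * 1000.0)
    return samples


def summarize(samples):
    """Mean plus p50/p99 cut points over the individual samples."""
    cuts = statistics.quantiles(samples, n=100)  # 99 cut points
    return {
        "mean_ms": statistics.fmean(samples),
        "p50_ms": cuts[49],
        "p99_ms": cuts[98],
    }


# Stand-in workload: any per-reading callable could be timed this way.
samples = time_each(lambda x: sum(range(x)), [1000] * 200)
summary = summarize(samples)
print(summary)
```

With the real SDK, a comparable measurement could be approximated by calling pipeline.run() with one reading at a time, at the cost of some per-call overhead.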
Event Loop Integration
Pipeline.run() is a synchronous, blocking call. In an asyncio-based edge runtime, wrap it with loop.run_in_executor or run it inside a dedicated thread so that it does not stall the event loop. The pipeline itself holds no locks during rule evaluation, so it is safe to call from a single producer thread; concurrent calls on the same instance would need external synchronisation.
import asyncio

async def run_pipeline_async(pipeline, readings):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, pipeline.run, readings)

# Inside an async adapter
result = await run_pipeline_async(pipeline, incoming_batch)
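The effect of offloading can be checked without the SDK. In the sketch below, blocking_run is a hypothetical stand-in for a synchronous pipeline.run() that sleeps to simulate work, and a heartbeat coroutine records ticks to show that the event loop stays responsive while the blocking call runs in a worker thread.

```python
import asyncio
import time


def blocking_run(readings):
    """Stand-in for a synchronous pipeline.run(); sleeps to simulate work."""
    time.sleep(0.05)
    return {"readings_processed": len(readings)}


async def heartbeat(ticks):
    """Keeps ticking while the blocking call runs in a worker thread."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_run, [1, 2, 3])
    beat.cancel()
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If blocking_run were called directly inside main() instead, the heartbeat would record no ticks until the call returned.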
End-to-End Example
Below is a complete, copy-pasteable script that builds a pipeline, simulates readings, runs the engine, and prints the result.
import time
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="server_room")

pipeline.add_sensor(Sensor(
    name="rack_inlet_temp",
    sensor_type=SensorType.TEMPERATURE,
    unit="°C",
    normal_range=(20.0, 26.0),
    alert_threshold=28.0,
    location="Rack-42",
))

pipeline.add_window(WindowConfig(
    duration_seconds=120.0,
    window_type="rolling",
    sensor_name="rack_inlet_temp",
))

pipeline.add_rule(RuleConfig(
    name="thermal_runaway",
    condition=lambda r: r.value > 30.0,
    severity="critical",
    cooldown_seconds=60.0,
))

# Twelve readings, 10 seconds apart, rising from 22.0 to 33.0 °C
readings = [
    SensorReading("rack_inlet_temp", time.time() + i * 10, 22.0 + i, "°C")
    for i in range(12)
]

result = pipeline.run(readings)
print(result.to_dict())