
Pipeline API Reference

Complete reference for the Pipeline class, WindowConfig, RuleConfig, PipelineResult, and all public methods with parameters, return types, exceptions, and usage examples.

Published Jun 2, 2026

Overview

The Pipeline class is the central abstraction of the Pyvorin Edge SDK. It defines a directed dataflow from sensors through windows, rules, and policies to events and reports. A pipeline is declarative: you add sources, sensors, windows, rules, and policies, then call run() to execute it over a batch of SensorReading objects.

Dataclasses

WindowConfig


@dataclass
class WindowConfig:
    duration_seconds: float
    step_seconds: Optional[float] = None
    sensor_name: Optional[str] = None
    window_type: str = "rolling"
    aggregation: Optional[str] = None
  
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| duration_seconds | float | required | Length of the window in seconds. |
| step_seconds | Optional[float] | None | Step size for sliding windows. If None, defaults to duration_seconds (tumbling behavior). |
| sensor_name | Optional[str] | None | If set, the window only accepts readings from this sensor. If None, accepts all sensors. |
| window_type | str | "rolling" | Either "rolling" or "tumbling". |
| aggregation | Optional[str] | None | Aggregation function name (e.g., "mean", "sum"). Reserved for future use. |
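
To make the step_seconds default concrete, the sketch below computes window start times for a given duration and step. The helper window_starts is hypothetical and not part of the SDK; it only illustrates that a step equal to the duration gives non-overlapping (tumbling) windows, while a smaller step gives overlapping ones. How the SDK aligns window boundaries internally is not documented here, so the zero-based alignment is an assumption.

```python
from typing import List, Optional


def window_starts(
    duration_seconds: float,
    step_seconds: Optional[float],
    total_seconds: float,
) -> List[float]:
    """Start times of all full windows that fit in [0, total_seconds].

    Hypothetical helper for illustration only; not part of the SDK.
    """
    # Mirror the documented default: a missing step means step == duration.
    step = duration_seconds if step_seconds is None else step_seconds
    if duration_seconds <= 0 or step <= 0:
        raise ValueError("duration and step must be positive")
    starts: List[float] = []
    index = 0
    # Multiply instead of accumulating to avoid floating-point drift.
    while index * step + duration_seconds <= total_seconds:
        starts.append(index * step)
        index += 1
    return starts


tumbling = window_starts(60.0, None, 180.0)  # step defaults to 60.0
sliding = window_starts(60.0, 30.0, 180.0)   # consecutive windows overlap by 30 s
print(tumbling)
print(sliding)
```

With a 60-second duration over 180 seconds, the default step produces three back-to-back windows, while a 30-second step produces five overlapping ones.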

RuleConfig


@dataclass
class RuleConfig:
    name: str
    condition: Optional[Callable[[Any], bool]] = None
    action: Optional[Callable[[Any], None]] = None
    condition_expr: Optional[str] = None
    severity: str = "info"
    cooldown_seconds: float = 0.0
  
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Unique rule identifier. |
| condition | Optional[Callable] | None | Python callable that receives the reading context and returns bool. |
| action | Optional[Callable] | None | Side-effect callable invoked when the rule fires. |
| condition_expr | Optional[str] | None | String expression evaluated in a restricted builtins environment. Mutually exclusive with condition. |
| severity | str | "info" | One of "info", "warning", "critical". Raises ValueError on invalid values. |
| cooldown_seconds | float | 0.0 | Minimum seconds between successive firings of this rule. |
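
The constraints in the table can be pictured as dataclass validation. The following is a minimal sketch, not the SDK's implementation: RuleConfigSketch is a stand-in class, and rejecting a rule that sets both condition and condition_expr with a ValueError is an assumption (the reference only states that the two fields are mutually exclusive). The severity check follows the documented behavior.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

VALID_SEVERITIES = ("info", "warning", "critical")


@dataclass
class RuleConfigSketch:
    """Stand-in for RuleConfig; the validation details are assumed."""

    name: str
    condition: Optional[Callable[[Any], bool]] = None
    condition_expr: Optional[str] = None
    severity: str = "info"
    cooldown_seconds: float = 0.0

    def __post_init__(self) -> None:
        # Documented behavior: invalid severities raise ValueError.
        if self.severity not in VALID_SEVERITIES:
            raise ValueError(f"invalid severity: {self.severity!r}")
        # Assumed behavior: supplying both condition forms is rejected.
        if self.condition is not None and self.condition_expr is not None:
            raise ValueError("condition and condition_expr are mutually exclusive")


ok = RuleConfigSketch(
    name="too_hot", condition_expr="ctx.value > 26.0", severity="warning"
)

try:
    RuleConfigSketch(name="bad", severity="fatal")
    severity_error = None
except ValueError as exc:
    severity_error = str(exc)

print(ok.severity, severity_error)
```

Validating in __post_init__ means a misconfigured rule fails at construction time rather than partway through a run.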

PipelineResult


@dataclass
class PipelineResult:
    events: List[Event] = field(default_factory=list)
    evaluations: int = 0
    latency_ms: float = 0.0
    readings_processed: int = 0
  
| Field | Type | Description |
| --- | --- | --- |
| events | List[Event] | Events emitted during this run. |
| evaluations | int | Total rule evaluations performed. |
| latency_ms | float | Average milliseconds per reading. |
| readings_processed | int | Number of readings iterated over. |

Pipeline Constructor


class Pipeline:
    def __init__(self, name: str) -> None:
        ...
  

Creates a new pipeline. The name is used in manifests, reports, and logging.

Builder Methods

All builder methods return self, enabling fluent chaining.
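
The return-self pattern can be sketched with a small stand-in class. PipelineSketch and the keys of the source dict below are hypothetical; only the chaining behavior mirrors what this reference describes.

```python
from typing import Any, Dict, List


class PipelineSketch:
    """Minimal stand-in showing the return-self builder pattern."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sources: List[Dict[str, Any]] = []
        self.rules: List[str] = []

    def add_source(self, source: Dict[str, Any]) -> "PipelineSketch":
        self.sources.append(source)
        return self  # returning self is what makes chaining possible

    def add_rule(self, rule_name: str) -> "PipelineSketch":
        self.rules.append(rule_name)
        return self


# Each call returns the same object, so calls can be chained in one expression.
pipeline = (
    PipelineSketch(name="demo")
    .add_source({"type": "mqtt", "topic": "sensors/#"})  # hypothetical keys
    .add_rule("too_hot")
    .add_rule("too_cold")
)
print(pipeline.rules)
```

Because every builder call returns the same instance, chained and unchained styles are equivalent; the usage examples later on this page use the unchained form.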

add_source(source)


def add_source(self, source: Dict[str, Any]) -> Pipeline
  

Adds a data source configuration dict. Used primarily for manifest generation.

add_sensor(sensor)


def add_sensor(self, sensor: Sensor) -> Pipeline
  

Registers a Sensor and adds it to the internal SensorRegistry. Raises ValueError if a sensor with the same name is already registered.
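
The duplicate-name check can be illustrated with a tiny registry. SensorRegistrySketch is a hypothetical stand-in; the real SensorRegistry may store richer objects and use a different error message.

```python
from typing import Dict, List


class SensorRegistrySketch:
    """Stand-in registry keyed by sensor name."""

    def __init__(self) -> None:
        self._units: Dict[str, str] = {}

    def register(self, name: str, unit: str) -> None:
        # Names must be unique, matching the documented ValueError.
        if name in self._units:
            raise ValueError(f"sensor {name!r} is already registered")
        self._units[name] = unit

    def names(self) -> List[str]:
        return sorted(self._units)


registry = SensorRegistrySketch()
registry.register("room_1", "C")

try:
    registry.register("room_1", "C")  # same name a second time
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True

print(registry.names(), duplicate_rejected)
```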

add_window(window)


def add_window(self, window: WindowConfig) -> Pipeline
  

Adds a window configuration and instantiates the corresponding runtime window object (RollingWindow or TumblingWindow). The window object is appended to self._window_objects.

add_rule(rule)


def add_rule(self, rule: RuleConfig) -> Pipeline
  

Adds a rule configuration. Rules are evaluated in insertion order during run().

add_policy(policy)


def add_policy(self, policy: Union[CloudPolicy, PrivacyPolicy]) -> Pipeline
  

Adds a cloud or privacy policy. Policies are validated for conflicts during export_policy().

Execution Methods

run(readings)


def run(
    self, readings: Optional[List[SensorReading]] = None
) -> Union[Dict[str, Any], PipelineResult]
  
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| readings | Optional[List[SensorReading]] | None | If provided, evaluate rules against each reading and return PipelineResult. If None, return a static metadata dict for backwards compatibility. |

When readings is provided, the pipeline:

  1. Iterates over each reading.
  2. Updates all matching windows with window.add_reading(reading).
  3. Evaluates each rule via _evaluate_rule(), respecting cooldowns.
  4. Measures per-iteration latency with time.perf_counter().

Returns a PipelineResult with average latency, event list, evaluation count, and processed count.
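
The steps above can be sketched as a plain loop. This is an illustration under stated assumptions, not the SDK's code: Reading, WindowSketch, ResultSketch, and run_sketch are stand-ins, rules are reduced to (name, predicate) pairs, cooldowns are left out, and the window keeps every reading instead of evicting by duration. Whether the SDK counts cooldown-skipped rules in evaluations is not stated in this reference.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Tuple


@dataclass
class Reading:
    sensor_name: str
    timestamp: float
    value: float


@dataclass
class WindowSketch:
    sensor_name: Optional[str] = None  # None accepts every sensor
    readings: List[Reading] = field(default_factory=list)

    def add_reading(self, reading: Reading) -> None:
        self.readings.append(reading)


@dataclass
class ResultSketch:
    events: List[Tuple[str, float]] = field(default_factory=list)
    evaluations: int = 0
    latency_ms: float = 0.0
    readings_processed: int = 0


Rule = Tuple[str, Callable[[Reading], bool]]


def run_sketch(
    readings: List[Reading], windows: List[WindowSketch], rules: List[Rule]
) -> ResultSketch:
    result = ResultSketch()
    total_ms = 0.0
    for reading in readings:  # step 1
        started = time.perf_counter()
        # Step 2: feed the reading to every window that matches its sensor.
        for window in windows:
            if window.sensor_name in (None, reading.sensor_name):
                window.add_reading(reading)
        # Step 3: evaluate rules in insertion order (cooldowns omitted here).
        for rule_name, predicate in rules:
            result.evaluations += 1
            if predicate(reading):
                result.events.append((rule_name, reading.timestamp))
        # Step 4: accumulate per-iteration latency in milliseconds.
        total_ms += (time.perf_counter() - started) * 1000.0
        result.readings_processed += 1
    if result.readings_processed:
        result.latency_ms = total_ms / result.readings_processed
    return result


readings = [Reading("room_1", float(i), 22.0 + i) for i in range(10)]
window = WindowSketch(sensor_name="room_1")
result = run_sketch(readings, [window], [("too_hot", lambda r: r.value > 26.0)])
print(len(result.events), result.evaluations, result.readings_processed)
```

Dividing the accumulated time by the number of readings gives the per-reading average that latency_ms reports.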

run_simulation(duration_seconds, interval_seconds, seed)


def run_simulation(
    self,
    duration_seconds: float = 3600.0,
    interval_seconds: float = 60.0,
    seed: Optional[int] = None,
) -> Dict[str, Any]
  

Generates synthetic readings for all registered sensors, evaluates rules and windows, and returns a full report. Useful for testing pipelines without physical hardware.
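
A seeded generator shows why the seed parameter makes simulations reproducible. The sketch below is an assumption-laden illustration: generate_readings is a hypothetical helper, the Gaussian value distribution is invented for the example, and the SDK may count or space its synthetic readings differently (for instance, by including the end of the interval).

```python
import random
from typing import List, Optional, Tuple

SyntheticReading = Tuple[str, float, float]  # (sensor_name, timestamp, value)


def generate_readings(
    sensor_names: List[str],
    duration_seconds: float = 3600.0,
    interval_seconds: float = 60.0,
    seed: Optional[int] = None,
) -> List[SyntheticReading]:
    """Hypothetical generator: one reading per sensor per interval."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    # A private Random instance keeps the global RNG state untouched;
    # the same seed always produces the same sequence.
    rng = random.Random(seed)
    steps = int(duration_seconds // interval_seconds)
    readings: List[SyntheticReading] = []
    for step in range(steps):
        timestamp = step * interval_seconds
        for name in sensor_names:
            # Assumed distribution: mean 20.0, standard deviation 2.0.
            readings.append((name, timestamp, rng.gauss(20.0, 2.0)))
    return readings


first = generate_readings(["tank"], 3600.0, 10.0, seed=42)
second = generate_readings(["tank"], 3600.0, 10.0, seed=42)
print(len(first), first == second)
```

Passing the same seed twice yields identical readings, which is what makes a simulated run usable as a regression test.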

Export Methods

export_manifest(version)


def export_manifest(self, version: str = "1.0.0") -> Dict[str, Any]
  

Generates a deployment manifest describing sensors, windows, rules, and policies. Used by the edge runtime to reconstruct the pipeline at startup.

export_policy()


def export_policy(self) -> Dict[str, Any]
  

Exports all policies and runs cross-validation between CloudPolicy and PrivacyPolicy instances. Returns a dict with cloud_policies, privacy_policies, and validation_errors.

to_dict()


def to_dict(self) -> Dict[str, Any]
  

Returns a JSON-serializable dict of the entire pipeline configuration.

Internal Evaluation

_evaluate_rule(rule, context, timestamp)


def _evaluate_rule(
    self, rule: RuleConfig, context: Any, timestamp: float
) -> Optional[Event]
  

Evaluates a single rule against a reading context. Checks cooldown, then evaluates rule.condition or rule.condition_expr. If the rule fires, creates an Event, stores it, executes rule.action if present, and returns the event. Otherwise returns None.
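
The cooldown check and restricted expression evaluation can be sketched together. Everything named here (evaluate_expr, CooldownEvaluator, the event dict) is a stand-in, and two details are assumptions: that a rule may fire again once exactly cooldown_seconds have elapsed, and that the restricted environment is built by passing an empty __builtins__ mapping to eval(). Emptying __builtins__ hides functions such as open() from the expression, but it is not a security sandbox.

```python
from types import SimpleNamespace
from typing import Any, Dict, Optional


def evaluate_expr(expr: str, ctx: Any) -> bool:
    # Empty __builtins__ removes open(), __import__(), and friends from the
    # expression's namespace; only the name "ctx" is available.
    return bool(eval(expr, {"__builtins__": {}}, {"ctx": ctx}))


class CooldownEvaluator:
    """Tracks the last firing time of each rule by name."""

    def __init__(self) -> None:
        self._last_fired: Dict[str, float] = {}

    def evaluate(
        self,
        name: str,
        expr: str,
        cooldown_seconds: float,
        ctx: Any,
        timestamp: float,
    ) -> Optional[Dict[str, Any]]:
        last = self._last_fired.get(name)
        # Cooldown is checked first, so a cooling rule skips its condition.
        if last is not None and timestamp - last < cooldown_seconds:
            return None
        if not evaluate_expr(expr, ctx):
            return None
        self._last_fired[name] = timestamp
        return {"rule_name": name, "timestamp": timestamp}


evaluator = CooldownEvaluator()
fired = []
for ts in (0.0, 30.0, 60.0, 90.0):
    ctx = SimpleNamespace(value=45.0)  # condition is true at every timestamp
    event = evaluator.evaluate("spike", "ctx.value > 40.0", 60.0, ctx, ts)
    if event is not None:
        fired.append(event["timestamp"])
print(fired)
```

Although the condition holds at all four timestamps, the 60-second cooldown suppresses the firings at 30.0 and 90.0.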

Usage Examples

Example 1: Basic Temperature Monitor


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="temperature_monitor")
pipeline.add_sensor(Sensor(name="room_1", sensor_type=SensorType.TEMPERATURE, unit="C"))
pipeline.add_rule(RuleConfig(name="too_hot", condition_expr="ctx.value > 26.0"))

# Values run from 22.0 to 31.0, so the last five readings exceed the 26.0 threshold.
readings = [SensorReading(sensor_name="room_1", timestamp=float(i), value=22.0 + i) for i in range(10)]
result = pipeline.run(readings)
print(f"Events: {len(result.events)}, Latency: {result.latency_ms:.4f} ms")
  

Example 2: Windowed Aggregation


from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="hvac")
pipeline.add_sensor(Sensor(name="duct", sensor_type=SensorType.TEMPERATURE, unit="C"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, sensor_name="duct", window_type="rolling"))
pipeline.add_rule(RuleConfig(name="spike", condition_expr="ctx.value > 40.0", cooldown_seconds=60.0))

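# Every value (38.0) stays below the 40.0 threshold, so no "spike" events are expected.
readings = [SensorReading(sensor_name="duct", timestamp=float(i), value=38.0) for i in range(1000)]
result = pipeline.run(readings)
# PipelineResult is documented above as a plain dataclass, so read its fields directly.
print(f"Events: {len(result.events)}, Evaluations: {result.evaluations}, Readings: {result.readings_processed}")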
  

Example 3: Multiple Sensors and Severity Levels


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="factory_floor")
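# Note: these condition_expr strings do not filter by sensor, so every rule is
# evaluated against every reading regardless of which motor produced it.
for name in ["motor_a", "motor_b", "motor_c"]: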
    pipeline.add_sensor(Sensor(name=name, sensor_type=SensorType.VIBRATION, unit="g"))
    pipeline.add_rule(RuleConfig(
        name=f"{name}_warning",
        condition_expr="ctx.value > 2.0",
        severity="warning",
        cooldown_seconds=30.0,
    ))
    pipeline.add_rule(RuleConfig(
        name=f"{name}_critical",
        condition_expr="ctx.value > 5.0",
        severity="critical",
        cooldown_seconds=10.0,
    ))

readings = [
    SensorReading(sensor_name="motor_a", timestamp=1.0, value=6.0),
    SensorReading(sensor_name="motor_b", timestamp=1.0, value=1.5),
    SensorReading(sensor_name="motor_c", timestamp=1.0, value=3.0),
]
result = pipeline.run(readings)
for ev in result.events:
    print(f"{ev.severity}: {ev.rule_name}")
  

Example 4: Simulation Mode


from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType

pipeline = Pipeline(name="sim_demo")
pipeline.add_sensor(Sensor(name="tank", sensor_type=SensorType.PRESSURE, unit="bar"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, sensor_name="tank"))
pipeline.add_rule(RuleConfig(name="overpressure", condition_expr="ctx.value > 4.5"))

report = pipeline.run_simulation(duration_seconds=3600.0, interval_seconds=10.0, seed=42)
print(f"Generated {report['readings_generated']} readings")
print(f"Events: {len(report['events'])}")
  

Example 5: Compiled Rule Conditions


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.compiler_bridge import CompilerBridge
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="compiled_demo")
pipeline.add_sensor(Sensor(name="solar_panel", sensor_type=SensorType.CURRENT, unit="A"))
pipeline.add_rule(RuleConfig(name="overcurrent", condition_expr="ctx.value > 12.0"))

# Compile all rule conditions to native .so files
bridge = CompilerBridge()
compiled = bridge.compile_pipeline(pipeline, output_dir="/tmp/compiled_rules/")
print(f"Compiled modules: {list(compiled.keys())}")

# Run the interpreted pipeline (runtime can swap in compiled modules later)
readings = [SensorReading(sensor_name="solar_panel", timestamp=1.0, value=15.0)]
result = pipeline.run(readings)
print(f"Events: {len(result.events)}")