Pipeline API Reference
Complete reference for the Pipeline class, WindowConfig, RuleConfig, PipelineResult, and all public methods with parameters, return types, exceptions, and usage examples.
Published Jun 2, 2026
Overview
The Pipeline class is the central abstraction of the Pyvorin Edge SDK. It defines a directed dataflow from sensors through windows, rules, and policies to events and reports. A pipeline is declarative: you add sources, sensors, windows, rules, and policies, then call run() to execute it over a batch of SensorReading objects.
Dataclasses
WindowConfig
@dataclass
class WindowConfig:
    duration_seconds: float
    step_seconds: Optional[float] = None
    sensor_name: Optional[str] = None
    window_type: str = "rolling"
    aggregation: Optional[str] = None
| Field | Type | Default | Description |
|---|---|---|---|
| duration_seconds | float | required | Length of the window in seconds. |
| step_seconds | Optional[float] | None | Step size for sliding windows. If None, defaults to duration_seconds (tumbling behavior). |
| sensor_name | Optional[str] | None | If set, the window only accepts readings from this sensor. If None, accepts all sensors. |
| window_type | str | "rolling" | Either "rolling" or "tumbling". |
| aggregation | Optional[str] | None | Aggregation function name (e.g., "mean", "sum"). Reserved for future use. |
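The step_seconds default can be illustrated with a small stand-alone sketch. WindowConfigSketch, effective_step, and window_starts are hypothetical names introduced here for illustration; they are not part of the SDK, and the real window classes may compute boundaries differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class WindowConfigSketch:
    """Stand-in mirroring two WindowConfig fields; not the SDK class."""
    duration_seconds: float
    step_seconds: Optional[float] = None


def effective_step(cfg: WindowConfigSketch) -> float:
    # Fall back to the window length when no step is set, so consecutive
    # windows do not overlap (tumbling behavior).
    return cfg.duration_seconds if cfg.step_seconds is None else cfg.step_seconds


def window_starts(cfg: WindowConfigSketch, horizon_seconds: float) -> List[float]:
    # Start time of every window that begins before the horizon.
    step = effective_step(cfg)
    starts: List[float] = []
    t = 0.0
    while t < horizon_seconds:
        starts.append(t)
        t += step
    return starts


tumbling = window_starts(WindowConfigSketch(duration_seconds=60.0), 180.0)
sliding = window_starts(WindowConfigSketch(duration_seconds=60.0, step_seconds=30.0), 180.0)
print(tumbling)
print(sliding)
```

With a 30-second step and a 60-second duration, each window overlaps half of the previous one; with no step, windows sit back to back.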
RuleConfig
@dataclass
class RuleConfig:
    name: str
    condition: Optional[Callable[[Any], bool]] = None
    action: Optional[Callable[[Any], None]] = None
    condition_expr: Optional[str] = None
    severity: str = "info"
    cooldown_seconds: float = 0.0
| Field | Type | Default | Description |
|---|---|---|---|
| name | str | required | Unique rule identifier. |
| condition | Optional[Callable] | None | Python callable that receives the reading context and returns bool. |
| action | Optional[Callable] | None | Side-effect callable invoked when the rule fires. |
| condition_expr | Optional[str] | None | String expression evaluated in a restricted builtins environment. Mutually exclusive with condition. |
| severity | str | "info" | One of "info", "warning", "critical". Raises ValueError on invalid values. |
| cooldown_seconds | float | 0.0 | Minimum seconds between successive firings of this rule. |
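The validation rules in the table can be sketched with a stand-alone dataclass. RuleConfigSketch and try_build are hypothetical names for illustration. The ValueError on an invalid severity follows the table; rejecting a rule that sets both condition and condition_expr at construction time is an assumption, since the reference only states that the two are mutually exclusive.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

VALID_SEVERITIES = ("info", "warning", "critical")


@dataclass
class RuleConfigSketch:
    """Illustrative stand-in for RuleConfig; not the SDK implementation."""
    name: str
    condition: Optional[Callable[[Any], bool]] = None
    condition_expr: Optional[str] = None
    severity: str = "info"

    def __post_init__(self) -> None:
        # Documented behavior: invalid severities raise ValueError.
        if self.severity not in VALID_SEVERITIES:
            raise ValueError(
                f"severity must be one of {VALID_SEVERITIES}, got {self.severity!r}"
            )
        # Assumption: supplying both condition forms is rejected eagerly.
        if self.condition is not None and self.condition_expr is not None:
            raise ValueError("condition and condition_expr are mutually exclusive")


def try_build(**kwargs: Any) -> str:
    # Demo helper: report whether construction succeeded.
    try:
        RuleConfigSketch(**kwargs)
        return "ok"
    except ValueError as exc:
        return f"rejected: {exc}"


print(try_build(name="too_hot", condition_expr="ctx.value > 26.0", severity="warning"))
print(try_build(name="bad_level", severity="fatal"))
print(try_build(name="both", condition=lambda ctx: True, condition_expr="True"))
```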
PipelineResult
@dataclass
class PipelineResult:
    events: List[Event] = field(default_factory=list)
    evaluations: int = 0
    latency_ms: float = 0.0
    readings_processed: int = 0
| Field | Type | Description |
|---|---|---|
| events | List[Event] | Events emitted during this run. |
| evaluations | int | Total rule evaluations performed. |
| latency_ms | float | Average milliseconds per reading. |
| readings_processed | int | Number of readings iterated over. |
Pipeline Constructor
class Pipeline:
    def __init__(self, name: str) -> None:
        ...
Creates a new pipeline. The name is used in manifests, reports, and logging.
Builder Methods
All builder methods return self, enabling fluent chaining.
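The return-self pattern behind fluent chaining can be shown with a toy builder. ChainableSketch is a hypothetical class that takes plain strings; the SDK's builder methods take Sensor and RuleConfig objects instead.

```python
from typing import List


class ChainableSketch:
    """Toy builder showing the return-self pattern; not the SDK Pipeline."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.steps: List[str] = []

    def add_sensor(self, sensor_name: str) -> "ChainableSketch":
        self.steps.append(f"sensor:{sensor_name}")
        return self  # returning self is what makes chaining possible

    def add_rule(self, rule_name: str) -> "ChainableSketch":
        self.steps.append(f"rule:{rule_name}")
        return self


# Each call returns the same object, so calls can be strung together.
builder = (
    ChainableSketch("demo")
    .add_sensor("room_1")
    .add_rule("too_hot")
    .add_rule("too_cold")
)
print(builder.steps)
```

Chained and unchained calls are equivalent; chaining only saves repeating the variable name.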
add_source(source)
def add_source(self, source: Dict[str, Any]) -> Pipeline
Adds a data source configuration dict. Used primarily for manifest generation.
add_sensor(sensor)
def add_sensor(self, sensor: Sensor) -> Pipeline
Registers a Sensor and adds it to the internal SensorRegistry. Raises ValueError if a sensor with the same name is already registered.
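A minimal sketch of the duplicate-name check, assuming the registry is keyed by sensor name. RegistrySketch is a hypothetical stand-in; the internal layout of SensorRegistry is not described in this reference.

```python
from typing import Dict, List


class RegistrySketch:
    """Name-keyed registry that rejects duplicates; not the SDK SensorRegistry."""

    def __init__(self) -> None:
        self._sensors: Dict[str, str] = {}

    def register(self, name: str, unit: str) -> None:
        # Mirror the documented behavior: a repeated name raises ValueError.
        if name in self._sensors:
            raise ValueError(f"sensor {name!r} is already registered")
        self._sensors[name] = unit

    def names(self) -> List[str]:
        return sorted(self._sensors)


registry = RegistrySketch()
registry.register("room_1", "C")
try:
    registry.register("room_1", "C")
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True
print(duplicate_rejected, registry.names())
```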
add_window(window)
def add_window(self, window: WindowConfig) -> Pipeline
Adds a window configuration and instantiates the corresponding runtime window object (RollingWindow or TumblingWindow). The window object is appended to self._window_objects.
add_rule(rule)
def add_rule(self, rule: RuleConfig) -> Pipeline
Adds a rule configuration. Rules are evaluated in insertion order during run().
add_policy(policy)
def add_policy(self, policy: Union[CloudPolicy, PrivacyPolicy]) -> Pipeline
Adds a cloud or privacy policy. Policies are validated for conflicts during export_policy().
Execution Methods
run(readings)
def run(
    self, readings: Optional[List[SensorReading]] = None
) -> Union[Dict[str, Any], PipelineResult]
| Parameter | Type | Default | Description |
|---|---|---|---|
| readings | Optional[List[SensorReading]] | None | If provided, evaluate rules against each reading and return PipelineResult. If None, return a static metadata dict for backwards compatibility. |
When readings is provided, the pipeline:
- Iterates over each reading.
- Updates all matching windows with window.add_reading(reading).
- Evaluates each rule via _evaluate_rule(), respecting cooldowns.
- Measures per-iteration latency with time.perf_counter().
Returns a PipelineResult with average latency, event list, evaluation count, and processed count.
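The loop described above can be approximated as follows. This is a simplified sketch, not the SDK's run(): ReadingSketch, ResultSketch, and run_sketch are hypothetical names, rules are plain (name, predicate) pairs, events are recorded as rule names, and windows and cooldowns are left out.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class ReadingSketch:
    sensor_name: str
    timestamp: float
    value: float


@dataclass
class ResultSketch:
    events: List[str] = field(default_factory=list)
    evaluations: int = 0
    latency_ms: float = 0.0
    readings_processed: int = 0


Rule = Tuple[str, Callable[[ReadingSketch], bool]]


def run_sketch(readings: List[ReadingSketch], rules: List[Rule]) -> ResultSketch:
    result = ResultSketch()
    total_ms = 0.0
    for reading in readings:
        start = time.perf_counter()
        for rule_name, predicate in rules:  # insertion order
            result.evaluations += 1
            if predicate(reading):
                result.events.append(rule_name)
        # Accumulate per-iteration latency in milliseconds.
        total_ms += (time.perf_counter() - start) * 1000.0
        result.readings_processed += 1
    if result.readings_processed:
        result.latency_ms = total_ms / result.readings_processed  # average
    return result


readings = [ReadingSketch("room_1", float(i), 22.0 + i) for i in range(10)]
result = run_sketch(readings, [("too_hot", lambda r: r.value > 26.0)])
print(len(result.events), result.evaluations, result.readings_processed)
```

Averaging over readings_processed, rather than reporting the total, keeps latency_ms comparable across batches of different sizes.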
run_simulation(duration_seconds, interval_seconds, seed)
def run_simulation(
    self,
    duration_seconds: float = 3600.0,
    interval_seconds: float = 60.0,
    seed: Optional[int] = None,
) -> Dict[str, Any]
Generates synthetic readings for all registered sensors, evaluates rules and windows, and returns a full report. Useful for testing pipelines without physical hardware.
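The role of seed can be illustrated with a seeded generator from the standard library. synthetic_readings is a hypothetical function: the value distribution, and the assumption that one reading is produced every interval_seconds, are illustrative choices and not the SDK's documented behavior.

```python
import random
from typing import List, Optional, Tuple


def synthetic_readings(
    duration_seconds: float,
    interval_seconds: float,
    seed: Optional[int] = None,
) -> List[Tuple[float, float]]:
    # A private Random instance keeps the sequence reproducible for a given
    # seed without touching the global random state.
    rng = random.Random(seed)
    count = int(duration_seconds // interval_seconds)
    # (timestamp, value) pairs; values drawn around 3.0 with spread 0.5.
    return [(i * interval_seconds, rng.gauss(3.0, 0.5)) for i in range(count)]


first = synthetic_readings(3600.0, 10.0, seed=42)
second = synthetic_readings(3600.0, 10.0, seed=42)
print(len(first), first == second)
```

A fixed seed makes a simulated run repeatable, which helps when comparing rule changes against the same synthetic data.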
Export Methods
export_manifest(version)
def export_manifest(self, version: str = "1.0.0") -> Dict[str, Any]
Generates a deployment manifest describing sensors, windows, rules, and policies. Used by the edge runtime to reconstruct the pipeline at startup.
export_policy()
def export_policy(self) -> Dict[str, Any]
Exports all policies and runs cross-validation between CloudPolicy and PrivacyPolicy instances. Returns a dict with cloud_policies, privacy_policies, and validation_errors.
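One way to consume the exported dict is to gate deployment on validation_errors. The sketch below works on hand-written placeholder dicts with the three documented keys; the assumption that validation_errors is a list of messages, and the helper names policy_errors and ready_to_deploy, are illustrative only.

```python
from typing import Any, Dict, List


def policy_errors(exported: Dict[str, Any]) -> List[str]:
    # Assumption: validation_errors is a list of messages; a missing key
    # is treated as "no errors".
    return list(exported.get("validation_errors", []))


def ready_to_deploy(exported: Dict[str, Any]) -> bool:
    return not policy_errors(exported)


# Placeholder dicts shaped like the documented return value.
clean = {"cloud_policies": [], "privacy_policies": [], "validation_errors": []}
conflicting = {
    "cloud_policies": [],
    "privacy_policies": [],
    "validation_errors": ["placeholder conflict message"],
}
print(ready_to_deploy(clean), ready_to_deploy(conflicting))
```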
to_dict()
def to_dict(self) -> Dict[str, Any]
Returns a JSON-serializable dict of the entire pipeline configuration.
Internal Evaluation
_evaluate_rule(rule, context, timestamp)
def _evaluate_rule(
    self, rule: RuleConfig, context: Any, timestamp: float
) -> Optional[Event]
Evaluates a single rule against a reading context. Checks cooldown, then evaluates rule.condition or rule.condition_expr. If the rule fires, creates an Event, stores it, executes rule.action if present, and returns the event. Otherwise returns None.
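The cooldown check and the restricted evaluation of condition_expr can be sketched together. This is an illustration under assumptions, not the SDK's code: evaluate_expr and fires are hypothetical helpers, the expression is evaluated with Python's eval() and an empty __builtins__ mapping, and the last firing time is kept in a plain dict. An empty __builtins__ hides names such as open and __import__ from the expression, but it is not a security sandbox.

```python
from types import SimpleNamespace
from typing import Any, Dict


def evaluate_expr(expr: str, ctx: Any) -> bool:
    # Only the name "ctx" is visible to the expression; builtins are hidden.
    return bool(eval(expr, {"__builtins__": {}}, {"ctx": ctx}))


def fires(
    name: str,
    expr: str,
    cooldown_seconds: float,
    ctx: Any,
    timestamp: float,
    last_fired: Dict[str, float],
) -> bool:
    # 1. Cooldown: skip if this rule fired too recently.
    last = last_fired.get(name)
    if last is not None and timestamp - last < cooldown_seconds:
        return False
    # 2. Condition: evaluate the expression against the reading context.
    if not evaluate_expr(expr, ctx):
        return False
    # 3. Record the firing time so later calls respect the cooldown.
    last_fired[name] = timestamp
    return True


state: Dict[str, float] = {}
hot = SimpleNamespace(value=45.0)
outcomes = [
    fires("spike", "ctx.value > 40.0", 60.0, hot, t, state)
    for t in (0.0, 30.0, 60.0)
]
print(outcomes)
```

The second call is suppressed because only 30 seconds have passed since the first firing; the third call lands exactly on the cooldown boundary and is allowed in this sketch.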
Usage Examples
Example 1: Basic Temperature Monitor
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
pipeline = Pipeline(name="temperature_monitor")
pipeline.add_sensor(Sensor(name="room_1", sensor_type=SensorType.TEMPERATURE, unit="C"))
pipeline.add_rule(RuleConfig(name="too_hot", condition_expr="ctx.value > 26.0"))
readings = [SensorReading(sensor_name="room_1", timestamp=float(i), value=22.0 + i) for i in range(10)]
result = pipeline.run(readings)
print(f"Events: {len(result.events)}, Latency: {result.latency_ms:.4f} ms")
Example 2: Windowed Aggregation
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
pipeline = Pipeline(name="hvac")
pipeline.add_sensor(Sensor(name="duct", sensor_type=SensorType.TEMPERATURE, unit="C"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, sensor_name="duct", window_type="rolling"))
pipeline.add_rule(RuleConfig(name="spike", condition_expr="ctx.value > 40.0", cooldown_seconds=60.0))
readings = [SensorReading(sensor_name="duct", timestamp=float(i), value=38.0) for i in range(1000)]
result = pipeline.run(readings)
print(f"Events: {len(result.events)}, Evaluations: {result.evaluations}, Processed: {result.readings_processed}")
Example 3: Multiple Sensors and Severity Levels
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
pipeline = Pipeline(name="factory_floor")
for name in ["motor_a", "motor_b", "motor_c"]:
    pipeline.add_sensor(Sensor(name=name, sensor_type=SensorType.VIBRATION, unit="g"))
    # RuleConfig has no sensor field, so as written these expressions test
    # ctx.value for readings from every sensor, not only the one named here.
    pipeline.add_rule(RuleConfig(
        name=f"{name}_warning",
        condition_expr="ctx.value > 2.0",
        severity="warning",
        cooldown_seconds=30.0,
    ))
    pipeline.add_rule(RuleConfig(
        name=f"{name}_critical",
        condition_expr="ctx.value > 5.0",
        severity="critical",
        cooldown_seconds=10.0,
    ))
readings = [
    SensorReading(sensor_name="motor_a", timestamp=1.0, value=6.0),
    SensorReading(sensor_name="motor_b", timestamp=1.0, value=1.5),
    SensorReading(sensor_name="motor_c", timestamp=1.0, value=3.0),
]
result = pipeline.run(readings)
for ev in result.events:
    print(f"{ev.severity}: {ev.rule_name}")
Example 4: Simulation Mode
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
pipeline = Pipeline(name="sim_demo")
pipeline.add_sensor(Sensor(name="tank", sensor_type=SensorType.PRESSURE, unit="bar"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, sensor_name="tank"))
pipeline.add_rule(RuleConfig(name="overpressure", condition_expr="ctx.value > 4.5"))
report = pipeline.run_simulation(duration_seconds=3600.0, interval_seconds=10.0, seed=42)
print(f"Generated {report['readings_generated']} readings")
print(f"Events: {len(report['events'])}")
Example 5: Compiled Rule Conditions
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.compiler_bridge import CompilerBridge
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
pipeline = Pipeline(name="compiled_demo")
pipeline.add_sensor(Sensor(name="solar_panel", sensor_type=SensorType.CURRENT, unit="A"))
pipeline.add_rule(RuleConfig(name="overcurrent", condition_expr="ctx.value > 12.0"))
# Compile all rule conditions to native .so files
bridge = CompilerBridge()
compiled = bridge.compile_pipeline(pipeline, output_dir="/tmp/compiled_rules/")
print(f"Compiled modules: {list(compiled.keys())}")
# Run the interpreted pipeline (runtime can swap in compiled modules later)
readings = [SensorReading(sensor_name="solar_panel", timestamp=1.0, value=15.0)]
result = pipeline.run(readings)
print(f"Events: {len(result.events)}")