Rules and Conditions — Logic at the Edge
Write lambda conditions, composite AND/OR rules, stateful evaluations, and priority ordering with ten complete production examples.
Published Jun 2, 2026
Rule Anatomy
A rule is defined by a RuleConfig dataclass and evaluated by the pipeline's _evaluate_rule() method. At minimum, a rule needs a name and a condition. It may also carry an action, a severity level, and a cooldown timer to prevent alert spam.
| Field | Type | Description |
|---|---|---|
name | str | Unique rule identifier. |
condition | Callable[[Any], bool] | None | Python callable evaluated against the reading. |
condition_expr | str | None | String expression evaluated in a restricted sandbox. |
action | Callable[[Any], None] | None | Optional side-effect callable. |
severity | str | One of "info", "warning", "critical". |
cooldown_seconds | float | Minimum seconds between successive fires. |
Lambda Conditions
The simplest conditions are Python lambdas that inspect the current SensorReading. The condition receives the reading object itself, so you have full access to sensor_name, timestamp, value, unit, and metadata.
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading
pipeline = Pipeline("cold_chain")
pipeline.add_rule(RuleConfig(
name="temp_too_high",
condition=lambda r: r.value > 8.0 and r.unit == "°C",
severity="warning",
cooldown_seconds=300.0,
))
# Evaluate manually
reading = SensorReading("vaccine_fridge", 1717152000.0, 9.5, "°C")
result = pipeline.run([reading])
print(len(result.events)) # 1
String Expressions
If you load rules from a configuration file, you may not want to ship raw Python callables. The SDK supports string expressions via condition_expr. These are evaluated in a restricted environment containing only safe builtins: abs, max, min, sum, len, round, float, int, bool. The reading is bound to the variable ctx.
pipeline.add_rule(RuleConfig(
name="pressure_low",
condition_expr="ctx.value < 1.5 and ctx.unit == 'bar'",
severity="critical",
cooldown_seconds=60.0,
))
Composite Rules — AND / OR
The SDK does not provide a dedicated composite-rule DSL. Instead, compose logic with plain Python inside a single lambda, or chain multiple independent rules. For complex scenarios, write a named function and reference it.
def composite_condition(r: SensorReading) -> bool:
"""Fire if temperature is high AND the calibration flag is missing."""
temp_high = r.value > 35.0 and r.unit == "°C"
uncalibrated = not r.metadata.get("calibrated", False)
return temp_high and uncalibrated
pipeline.add_rule(RuleConfig(
name="uncalibrated_overheat",
condition=composite_condition,
severity="critical",
cooldown_seconds=600.0,
))
Stateful Rules
Because condition callables can close over mutable objects in your own module, you can implement arbitrary state machines. A common pattern is a counter that requires N consecutive violations before firing.
class ConsecutiveThresholdRule:
def __init__(self, threshold: float, count: int):
self.threshold = threshold
self.count = count
self._violations = 0
def __call__(self, r: SensorReading) -> bool:
if r.value > self.threshold:
self._violations += 1
if self._violations >= self.count:
self._violations = 0
return True
else:
self._violations = 0
return False
pipeline.add_rule(RuleConfig(
name="three_strikes_overheat",
condition=ConsecutiveThresholdRule(threshold=40.0, count=3),
severity="critical",
))
Rule Priority and Ordering
Rules are evaluated in the exact order they are added via add_rule(). There is no numeric priority field. If order matters— for example, a "critical shutdown" rule should always be evaluated before a "log info" rule—simply add them in that sequence. The PipelineResult records every event fired, so downstream consumers can filter by severity if needed.
# 1. Emergency stop — evaluated first
pipeline.add_rule(RuleConfig(
name="emergency_stop",
condition=lambda r: r.value > 100.0,
severity="critical",
))
# 2. General warning — evaluated second
pipeline.add_rule(RuleConfig(
name="high_value",
condition=lambda r: r.value > 80.0,
severity="warning",
))
Cooldown Behaviour
The pipeline maintains a dictionary _last_rule_fires keyed by rule name. If a rule fires, its timestamp is recorded. Subsequent evaluations within cooldown_seconds are silently ignored. This prevents MQTT broker flooding or SMS gateway rate-limit breaches.
pipeline.add_rule(RuleConfig(
name="noisy_sensor",
condition=lambda r: r.value > 50.0,
severity="warning",
cooldown_seconds=3600.0, # Once per hour max
))
Ten Complete Rule Examples
1. Temperature Threshold
pipeline.add_rule(RuleConfig(
name="temp_above_30",
condition=lambda r: r.sensor_name.startswith("temp_") and r.value > 30.0,
severity="warning",
cooldown_seconds=300.0,
))
2. Vibration Anomaly
pipeline.add_rule(RuleConfig(
name="vibration_anomaly",
condition=lambda r: r.sensor_name == "motor_vibration" and r.value > 10.0,
severity="critical",
cooldown_seconds=60.0,
))
3. Leak Detection (Boolean)
pipeline.add_rule(RuleConfig(
name="water_leak",
condition=lambda r: r.sensor_name == "basement_leak" and r.value >= 1.0,
severity="critical",
cooldown_seconds=0.0,
))
4. CO2 Ventilation Alert
pipeline.add_rule(RuleConfig(
name="co2_high",
condition=lambda r: r.unit == "ppm" and r.value > 1200,
severity="warning",
cooldown_seconds=600.0,
))
5. Voltage Sag
pipeline.add_rule(RuleConfig(
name="voltage_sag",
condition=lambda r: r.unit == "V" and r.value < 210.0,
severity="critical",
cooldown_seconds=30.0,
))
6. Occupancy After Hours
import time
pipeline.add_rule(RuleConfig(
name="after_hours_occupancy",
condition=lambda r: (
r.sensor_name == "lobby_pir"
and r.value > 0
and not (8 <= time.localtime(r.timestamp).tm_hour < 18)
),
severity="info",
cooldown_seconds=900.0,
))
7. Humidity Mould Risk
pipeline.add_rule(RuleConfig(
name="mould_risk",
condition=lambda r: r.unit == "%RH" and r.value > 75.0,
severity="warning",
cooldown_seconds=1800.0,
))
8. Current Overload
pipeline.add_rule(RuleConfig(
name="current_overload",
condition=lambda r: r.unit == "A" and r.value > 32.0,
severity="critical",
cooldown_seconds=10.0,
))
9. Stale Sensor Check
import time
_last_seen: dict[str, float] = {}
def stale_sensor_rule(r):
_last_seen[r.sensor_name] = r.timestamp
# Only fire if we have a baseline and the gap is huge
return False # Logic handled by a scheduled health check instead
pipeline.add_rule(RuleConfig(
name="stale_sensor_placeholder",
condition=stale_sensor_rule,
severity="info",
))
10. Multi-Field Metadata Check
pipeline.add_rule(RuleConfig(
name="uncertified_probe_overheat",
condition=lambda r: (
r.value > 35.0
and r.unit == "°C"
and r.metadata.get("certification") != "ISO17025"
),
severity="warning",
cooldown_seconds=600.0,
))
Testing Rules in Isolation
You do not need a full pipeline to unit-test a rule. Because conditions are plain callables, you can exercise them with standard pytest fixtures.
from pyvorin_edge.sensors import SensorReading
condition = lambda r: r.value > 30.0
assert condition(SensorReading("t", 0.0, 31.0, "°C")) is True
assert condition(SensorReading("t", 0.0, 29.0, "°C")) is False