edge 16 min read

Rules and Conditions — Logic at the Edge

Write lambda conditions, composite AND/OR rules, stateful evaluations, and priority ordering with ten complete production examples.

Published Jun 2, 2026

Rule Anatomy

A rule is defined by a RuleConfig dataclass and evaluated by the pipeline's _evaluate_rule() method. At minimum, a rule needs a name and a condition. It may also carry an action, a severity level, and a cooldown timer to prevent alert spam.

FieldTypeDescription
namestrUnique rule identifier.
conditionCallable[[Any], bool] | NonePython callable evaluated against the reading.
condition_exprstr | NoneString expression evaluated in a restricted sandbox.
actionCallable[[Any], None] | NoneOptional side-effect callable.
severitystrOne of "info", "warning", "critical".
cooldown_secondsfloatMinimum seconds between successive fires.

Lambda Conditions

The simplest conditions are Python lambdas that inspect the current SensorReading. The condition receives the reading object itself, so you have full access to sensor_name, timestamp, value, unit, and metadata.


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading

pipeline = Pipeline("cold_chain")
pipeline.add_rule(RuleConfig(
    name="temp_too_high",
    condition=lambda r: r.value > 8.0 and r.unit == "°C",
    severity="warning",
    cooldown_seconds=300.0,
))

# Evaluate manually
reading = SensorReading("vaccine_fridge", 1717152000.0, 9.5, "°C")
result = pipeline.run([reading])
print(len(result.events))  # 1
  

String Expressions

If you load rules from a configuration file, you may not want to ship raw Python callables. The SDK supports string expressions via condition_expr. These are evaluated in a restricted environment containing only safe builtins: abs, max, min, sum, len, round, float, int, bool. The reading is bound to the variable ctx.


pipeline.add_rule(RuleConfig(
    name="pressure_low",
    condition_expr="ctx.value < 1.5 and ctx.unit == 'bar'",
    severity="critical",
    cooldown_seconds=60.0,
))
  

Composite Rules — AND / OR

The SDK does not provide a dedicated composite-rule DSL. Instead, compose logic with plain Python inside a single lambda, or chain multiple independent rules. For complex scenarios, write a named function and reference it.


def composite_condition(r: SensorReading) -> bool:
    """Fire if temperature is high AND the calibration flag is missing."""
    temp_high = r.value > 35.0 and r.unit == "°C"
    uncalibrated = not r.metadata.get("calibrated", False)
    return temp_high and uncalibrated

pipeline.add_rule(RuleConfig(
    name="uncalibrated_overheat",
    condition=composite_condition,
    severity="critical",
    cooldown_seconds=600.0,
))
  

Stateful Rules

Because condition callables can close over mutable objects in your own module, you can implement arbitrary state machines. A common pattern is a counter that requires N consecutive violations before firing.


class ConsecutiveThresholdRule:
    def __init__(self, threshold: float, count: int):
        self.threshold = threshold
        self.count = count
        self._violations = 0

    def __call__(self, r: SensorReading) -> bool:
        if r.value > self.threshold:
            self._violations += 1
            if self._violations >= self.count:
                self._violations = 0
                return True
        else:
            self._violations = 0
        return False

pipeline.add_rule(RuleConfig(
    name="three_strikes_overheat",
    condition=ConsecutiveThresholdRule(threshold=40.0, count=3),
    severity="critical",
))
  

Rule Priority and Ordering

Rules are evaluated in the exact order they are added via add_rule(). There is no numeric priority field. If order matters— for example, a "critical shutdown" rule should always be evaluated before a "log info" rule—simply add them in that sequence. The PipelineResult records every event fired, so downstream consumers can filter by severity if needed.


# 1. Emergency stop — evaluated first
pipeline.add_rule(RuleConfig(
    name="emergency_stop",
    condition=lambda r: r.value > 100.0,
    severity="critical",
))

# 2. General warning — evaluated second
pipeline.add_rule(RuleConfig(
    name="high_value",
    condition=lambda r: r.value > 80.0,
    severity="warning",
))
  

Cooldown Behaviour

The pipeline maintains a dictionary _last_rule_fires keyed by rule name. If a rule fires, its timestamp is recorded. Subsequent evaluations within cooldown_seconds are silently ignored. This prevents MQTT broker flooding or SMS gateway rate-limit breaches.


pipeline.add_rule(RuleConfig(
    name="noisy_sensor",
    condition=lambda r: r.value > 50.0,
    severity="warning",
    cooldown_seconds=3600.0,  # Once per hour max
))
  

Ten Complete Rule Examples

1. Temperature Threshold


pipeline.add_rule(RuleConfig(
    name="temp_above_30",
    condition=lambda r: r.sensor_name.startswith("temp_") and r.value > 30.0,
    severity="warning",
    cooldown_seconds=300.0,
))
  

2. Vibration Anomaly


pipeline.add_rule(RuleConfig(
    name="vibration_anomaly",
    condition=lambda r: r.sensor_name == "motor_vibration" and r.value > 10.0,
    severity="critical",
    cooldown_seconds=60.0,
))
  

3. Leak Detection (Boolean)


pipeline.add_rule(RuleConfig(
    name="water_leak",
    condition=lambda r: r.sensor_name == "basement_leak" and r.value >= 1.0,
    severity="critical",
    cooldown_seconds=0.0,
))
  

4. CO2 Ventilation Alert


pipeline.add_rule(RuleConfig(
    name="co2_high",
    condition=lambda r: r.unit == "ppm" and r.value > 1200,
    severity="warning",
    cooldown_seconds=600.0,
))
  

5. Voltage Sag


pipeline.add_rule(RuleConfig(
    name="voltage_sag",
    condition=lambda r: r.unit == "V" and r.value < 210.0,
    severity="critical",
    cooldown_seconds=30.0,
))
  

6. Occupancy After Hours


import time

pipeline.add_rule(RuleConfig(
    name="after_hours_occupancy",
    condition=lambda r: (
        r.sensor_name == "lobby_pir"
        and r.value > 0
        and not (8 <= time.localtime(r.timestamp).tm_hour < 18)
    ),
    severity="info",
    cooldown_seconds=900.0,
))
  

7. Humidity Mould Risk


pipeline.add_rule(RuleConfig(
    name="mould_risk",
    condition=lambda r: r.unit == "%RH" and r.value > 75.0,
    severity="warning",
    cooldown_seconds=1800.0,
))
  

8. Current Overload


pipeline.add_rule(RuleConfig(
    name="current_overload",
    condition=lambda r: r.unit == "A" and r.value > 32.0,
    severity="critical",
    cooldown_seconds=10.0,
))
  

9. Stale Sensor Check


import time

_last_seen: dict[str, float] = {}

def stale_sensor_rule(r):
    _last_seen[r.sensor_name] = r.timestamp
    # Only fire if we have a baseline and the gap is huge
    return False  # Logic handled by a scheduled health check instead

pipeline.add_rule(RuleConfig(
    name="stale_sensor_placeholder",
    condition=stale_sensor_rule,
    severity="info",
))
  

10. Multi-Field Metadata Check


pipeline.add_rule(RuleConfig(
    name="uncertified_probe_overheat",
    condition=lambda r: (
        r.value > 35.0
        and r.unit == "°C"
        and r.metadata.get("certification") != "ISO17025"
    ),
    severity="warning",
    cooldown_seconds=600.0,
))
  

Testing Rules in Isolation

You do not need a full pipeline to unit-test a rule. Because conditions are plain callables, you can exercise them with standard pytest fixtures.


from pyvorin_edge.sensors import SensorReading

condition = lambda r: r.value > 30.0
assert condition(SensorReading("t", 0.0, 31.0, "°C")) is True
assert condition(SensorReading("t", 0.0, 29.0, "°C")) is False