
Actions and Events — Responding to the Edge

Writing action functions, the Event dataclass, logging, webhooks, local file output, and robust action error handling.

Published Jun 2, 2026

From Condition to Consequence

A rule without an action is a silent observer. Silent observation is useful for metrics and dashboards, but most production pipelines need to do something when a threshold is breached. The Pyvorin Edge SDK lets you attach an arbitrary callable, called an action, to any RuleConfig. When the rule fires, the action is invoked with the same reading object that triggered it.

The Event Dataclass

Every time a rule fires, the pipeline constructs an Event object. Events are collected in the PipelineResult.events list and can also be forwarded to local storage or the cloud.

| Field | Type | Description |
| --- | --- | --- |
| rule_name | str | Name of the rule that fired. |
| timestamp | float | Epoch time of the firing. |
| severity | str | info, warning, or critical. |
| message | str | Human-readable description. |
| sensor_readings | List[Any] | The reading(s) that caused the fire. |
| metadata | dict | Optional enrichment data. |

from pyvorin_edge.policies import Event

event = Event(
    rule_name="temp_spike",
    timestamp=1717152000.0,
    severity="critical",
    message="Rule 'temp_spike' fired",
    sensor_readings=[],
    metadata={"notified": False},
)
print(event.to_dict())
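
Once events accumulate in PipelineResult.events, a common next step is to summarize them. The sketch below does not import the SDK: EventSketch is a hypothetical stand-in that mirrors the fields in the table above, and count_by_severity is an illustrative helper, not part of pyvorin_edge.

```python
from collections import Counter
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Iterable, List


@dataclass
class EventSketch:
    """Hypothetical stand-in mirroring the documented Event fields."""
    rule_name: str
    timestamp: float
    severity: str
    message: str
    sensor_readings: List[Any] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        # asdict() converts the dataclass into plain dicts and lists.
        return asdict(self)


def count_by_severity(events: Iterable[EventSketch]) -> Counter:
    """Tally events per severity level."""
    return Counter(event.severity for event in events)


events = [
    EventSketch("temp_spike", 1717152000.0, "critical", "Rule 'temp_spike' fired"),
    EventSketch("high_temp", 1717152060.0, "warning", "Rule 'high_temp' fired"),
    EventSketch("temp_spike", 1717152120.0, "critical", "Rule 'temp_spike' fired"),
]
severity_counts = count_by_severity(events)
```

Because the helper only reads the severity attribute, the same function should work on real Event objects, assuming they expose the field as documented.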
  

Writing Action Functions

An action is any callable that accepts a single positional argument: the reading. Actions run synchronously inside the pipeline hot loop, so every millisecond an action spends delays the next reading. Keep them fast, and for slow I/O (HTTP POSTs, database writes) hand the work to a background thread or queue.


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading
import logging

logger = logging.getLogger("edge.actions")

def log_to_stderr(reading: SensorReading) -> None:
    logger.warning(
        "ALERT: %s = %.2f %s at ts=%s",
        reading.sensor_name,
        reading.value,
        reading.unit,
        reading.timestamp,
    )

pipeline = Pipeline("demo")
pipeline.add_rule(RuleConfig(
    name="high_temp",
    condition=lambda r: r.value > 50.0,
    action=log_to_stderr,
    severity="warning",
))
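
Since actions share the hot loop, it can help to measure them during development. The wrapper below is a minimal sketch, not an SDK feature: warn_if_slow and its budget_ms parameter are hypothetical names, and the 10 ms default simply follows the guideline in the best-practices list at the end of this article.

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("edge.actions")


def warn_if_slow(action, budget_ms=10.0):
    """Wrap an action and log a warning whenever it exceeds budget_ms."""
    @wraps(action)
    def wrapper(reading):
        start = time.perf_counter()
        try:
            return action(reading)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000.0
            wrapper.last_elapsed_ms = elapsed_ms  # exposed for inspection
            if elapsed_ms > budget_ms:
                name = getattr(action, "__name__", repr(action))
                logger.warning(
                    "Action %s took %.1f ms (budget %.1f ms)",
                    name, elapsed_ms, budget_ms,
                )
    return wrapper


def slow_action(reading):
    time.sleep(0.02)  # simulate blocking I/O


timed = warn_if_slow(slow_action)
timed(None)  # logs a warning because the call takes about 20 ms
```

The wrapped callable still takes a single reading, so it can be passed as the action of a RuleConfig in place of the original function.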
  

Logging Events

The simplest action is structured logging. Use Python's standard logging module with a JSON formatter so that edge logs can be scraped by Fluent Bit or Promtail.


import json
import logging

from pyvorin_edge.sensors import SensorReading

class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record):
        return json.dumps({
            "ts": record.created,
            "level": record.levelname,
            "msg": record.getMessage(),
            # Keys passed via extra= become attributes on the record.
            "rule": getattr(record, "rule_name", None),
            "value": getattr(record, "value", None),
        })

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("edge.actions").addHandler(handler)

def structured_log_action(reading: SensorReading) -> None:
    logging.getLogger("edge.actions").warning(
        "Rule fired",
        extra={"rule_name": "high_temp", "value": reading.value},
    )
  

Triggering Webhooks

For integrations with PagerDuty, Slack, or your own alert manager, post a JSON payload from inside the action. Use a thread-safe queue if the pipeline runs in a tight loop.


import json
import logging
import urllib.request
from queue import Queue
from threading import Thread

_webhook_queue: Queue = Queue()

def _webhook_worker():
    while True:
        url, payload = _webhook_queue.get()
        try:
            data = json.dumps(payload).encode("utf-8")
            req = urllib.request.Request(
                url,
                data=data,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=5):
                pass  # Body is ignored; urlopen raises HTTPError on 4xx/5xx
        except Exception as exc:
            logging.error("Webhook failed: %s", exc)
        finally:
            _webhook_queue.task_done()

Thread(target=_webhook_worker, daemon=True).start()

def webhook_action_factory(url: str):
    def action(reading: SensorReading) -> None:
        _webhook_queue.put((url, {
            "sensor": reading.sensor_name,
            "value": reading.value,
            "unit": reading.unit,
            "ts": reading.timestamp,
        }))
    return action

pipeline.add_rule(RuleConfig(
    name="critical_temp",
    condition=lambda r: r.value > 80.0,
    action=webhook_action_factory("https://alerts.example.com/webhook"),
    severity="critical",
    cooldown_seconds=60.0,
))
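
The queue above is unbounded, so a long outage of the webhook endpoint lets pending alerts pile up in memory. One option is to cap the queue and drop alerts when it is full. The following is a sketch under assumptions: enqueue_or_drop is a hypothetical helper, and the maxsize of 100 is an arbitrary illustration, not an SDK default.

```python
import logging
from queue import Full, Queue
from typing import Any, Dict

logger = logging.getLogger("edge.actions")

# Illustrative cap; size it to your memory budget and alert volume.
bounded_queue: Queue = Queue(maxsize=100)


def enqueue_or_drop(queue: Queue, url: str, payload: Dict[str, Any]) -> bool:
    """Enqueue without blocking; return False if the queue is full."""
    try:
        queue.put_nowait((url, payload))
        return True
    except Full:
        logger.warning("Webhook queue full, dropping alert for %s", url)
        return False


# Demo with a one-slot queue and no worker draining it.
tiny_queue: Queue = Queue(maxsize=1)
first_accepted = enqueue_or_drop(tiny_queue, "https://alerts.example.com/webhook", {"value": 81.0})
second_accepted = enqueue_or_drop(tiny_queue, "https://alerts.example.com/webhook", {"value": 82.0})
```

Dropping keeps the action non-blocking at the cost of losing alerts under sustained backpressure. Whether that trade is acceptable depends on how critical each individual notification is.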
  

Writing to Local File

When cloud connectivity is intermittent, persist events to a local append-only file. Use the logging module with a RotatingFileHandler for automatic rollover, or write line-delimited JSON yourself.


import json
from pathlib import Path

_EVENT_LOG = Path("/var/lib/pyvorin/events.jsonl")

def append_event_action(reading: SensorReading) -> None:
    record = {
        "sensor_name": reading.sensor_name,
        "timestamp": reading.timestamp,
        "value": reading.value,
        "unit": reading.unit,
    }
    with _EVENT_LOG.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")

pipeline.add_rule(RuleConfig(
    name="log_everything",
    condition=lambda r: True,  # Fire on every reading
    action=append_event_action,
    severity="info",
    cooldown_seconds=0.0,
))
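
The RotatingFileHandler route mentioned above can be sketched as follows. The helper name make_rotating_action, the logger name, and the size limits are assumptions chosen for illustration. The demo writes to a temporary directory and uses a SimpleNamespace as a stand-in for SensorReading.

```python
import json
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler
from types import SimpleNamespace


def make_rotating_action(path, max_bytes=1_000_000, backup_count=3,
                         logger_name="edge.events.file"):
    """Build an action that appends one JSON line per reading, rotating by size."""
    file_logger = logging.getLogger(logger_name)
    file_logger.setLevel(logging.INFO)
    file_logger.propagate = False  # keep event lines out of other handlers
    if not file_logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = RotatingFileHandler(
            path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
        )
        handler.setFormatter(logging.Formatter("%(message)s"))
        file_logger.addHandler(handler)

    def action(reading) -> None:
        file_logger.info(json.dumps({
            "sensor_name": reading.sensor_name,
            "timestamp": reading.timestamp,
            "value": reading.value,
            "unit": reading.unit,
        }))

    return action


# Demo: write one stand-in reading, then read the file back.
demo_path = os.path.join(tempfile.mkdtemp(), "events.jsonl")
rotating_action = make_rotating_action(demo_path)
rotating_action(SimpleNamespace(sensor_name="t", timestamp=1717152000.0, value=21.5, unit="°C"))

with open(demo_path, encoding="utf-8") as fh:
    demo_lines = fh.read().splitlines()
```

With these settings the handler keeps the active file plus up to three rotated backups, which bounds disk usage on devices with small storage.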
  

Action Error Handling

The pipeline wraps every action invocation in a try/except block. If an action raises an exception, the error is logged with logger.error(..., exc_info=True) and the pipeline continues processing the next reading. The event is still recorded in PipelineResult.events even if the action fails. This fail-open design ensures that a broken webhook URL does not stall your entire edge runtime.


def flaky_action(reading: SensorReading) -> None:
    # Simulate an external service that occasionally times out
    if reading.value > 90.0:
        raise TimeoutError("Downstream API unreachable")
    print(f"OK: {reading.sensor_name}")

pipeline = Pipeline("resilience_demo")
pipeline.add_rule(RuleConfig(
    name="maybe_breaks",
    condition=lambda r: r.value > 50.0,
    action=flaky_action,
    severity="warning",
))

# This batch contains one reading that will trigger the exception
readings = [
    SensorReading("t", 1717152000.0, 55.0, "°C"),   # action succeeds
    SensorReading("t", 1717152060.0, 95.0, "°C"),   # action raises
    SensorReading("t", 1717152120.0, 60.0, "°C"),   # action succeeds
]
result = pipeline.run(readings)
assert result.readings_processed == 3
# Events are still emitted for all three readings
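
To make the fail-open behavior concrete, here is a stand-in loop that behaves the way the paragraph above describes. It is not the SDK's actual implementation: run_fail_open is a hypothetical function, and plain floats stand in for readings.

```python
import logging
from typing import Any, Callable, Iterable, List

logger = logging.getLogger("edge.pipeline.sketch")


def run_fail_open(
    readings: Iterable[Any],
    condition: Callable[[Any], bool],
    action: Callable[[Any], None],
) -> List[Any]:
    """Return every reading that fired, even when its action raised."""
    fired = []
    for reading in readings:
        if not condition(reading):
            continue
        fired.append(reading)  # recorded first, so a failing action cannot lose it
        try:
            action(reading)
        except Exception:
            # Log with traceback and move on to the next reading.
            logger.error("Action failed for %r", reading, exc_info=True)
    return fired


succeeded = []


def flaky(value: float) -> None:
    if value > 90.0:
        raise TimeoutError("Downstream API unreachable")
    succeeded.append(value)


fired = run_fail_open([55.0, 95.0, 60.0], lambda v: v > 50.0, flaky)
```

All three values end up in fired, while only the two that did not raise reach succeeded. This mirrors the claim that events are recorded regardless of action failures.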
  

Combining Multiple Actions

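A rule accepts only one action callable, but you can multiplex inside that callable. The pattern below dispatches to a logger, a local file, and a webhook in sequence. Only the webhook call is asynchronous, because it merely enqueues the payload for the background worker.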


def multi_action(reading: SensorReading) -> None:
    log_to_stderr(reading)
    append_event_action(reading)
    _webhook_queue.put(("https://alerts.example.com/webhook", reading.to_dict()))

pipeline.add_rule(RuleConfig(
    name="all_channels",
    condition=lambda r: r.value > 70.0,
    action=multi_action,
    severity="critical",
    cooldown_seconds=120.0,
))
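
One caveat with multi_action as written: if an early sub-action raises, the later ones never run for that reading. A small combinator can isolate each channel. This is a sketch only; combine_actions is a hypothetical helper, not an SDK function.

```python
import logging

logger = logging.getLogger("edge.actions")


def combine_actions(*actions):
    """Return one action that runs every sub-action, isolating failures."""
    def combined(reading) -> None:
        for act in actions:
            try:
                act(reading)
            except Exception:
                name = getattr(act, "__name__", repr(act))
                logger.error("Sub-action %s failed", name, exc_info=True)
    return combined


calls = []


def broken(reading) -> None:
    raise RuntimeError("channel down")


fan_out = combine_actions(
    lambda r: calls.append(("log", r)),
    broken,
    lambda r: calls.append(("file", r)),
)
fan_out(72.5)  # the failing channel is logged; the other two still run
```

Note that this combinator swallows exceptions itself, so the pipeline's own error handling never sees them. Keep the logging in place so failures stay visible.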
  

Action Best Practices

  • Keep actions under 10 ms: anything longer belongs in a background thread or queue.
  • Make actions idempotent: the same reading may be reprocessed after a crash. Use the sensor name together with the reading timestamp as a deduplication key, since timestamps alone can collide across sensors.
  • Do not mutate the reading: actions receive the original object, and mutating it can confuse downstream rules.
  • Use cooldowns for noisy channels: a rule posting to Slack every second will get your integration rate-limited.
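
The idempotency advice above can be sketched as a wrapper that skips readings it has already handled. The name deduplicated and the max_keys limit are assumptions for illustration. The seen-set lives in memory only, so surviving an actual crash would additionally require persisting the keys, for example next to the local event log.

```python
from collections import OrderedDict
from types import SimpleNamespace


def deduplicated(action, max_keys=10_000):
    """Wrap an action so each (sensor_name, timestamp) pair runs at most once."""
    seen = OrderedDict()

    def wrapper(reading) -> None:
        key = (reading.sensor_name, reading.timestamp)
        if key in seen:
            return  # already handled
        action(reading)
        seen[key] = None  # mark only after success so a failed action can be retried
        if len(seen) > max_keys:
            seen.popitem(last=False)  # evict the oldest key to bound memory

    return wrapper


# Demo with stand-in readings.
delivered = []
send_once = deduplicated(lambda r: delivered.append(r.value))

first = SimpleNamespace(sensor_name="t", timestamp=1717152000.0, value=55.0)
send_once(first)
send_once(first)  # replayed reading is skipped
send_once(SimpleNamespace(sensor_name="t", timestamp=1717152060.0, value=60.0))
```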