Actions and Events — Responding to the Edge
Writing action functions, the Event dataclass, logging, webhooks, local file output, and robust action error handling.
Published Jun 2, 2026
From Condition to Consequence
A rule without an action is a silent observer. While silent observation is useful for metrics and dashboards, most production pipelines need to do something when a threshold is breached. The Pyvorin Edge SDK lets you attach an arbitrary callable—an action—to any RuleConfig. When the rule fires, the action is invoked with the same reading object that triggered it.
The Event Dataclass
Every time a rule fires, the pipeline constructs an Event object. Events are collected in the PipelineResult.events list and can also be forwarded to local storage or the cloud.
| Field | Type | Description |
|---|---|---|
rule_name | str | Name of the rule that fired. |
timestamp | float | Epoch time of the firing. |
severity | str | info, warning, or critical. |
message | str | Human-readable description. |
sensor_readings | List[Any] | The reading(s) that caused the fire. |
metadata | dict | Optional enrichment data. |
from pyvorin_edge.policies import Event
event = Event(
rule_name="temp_spike",
timestamp=1717152000.0,
severity="critical",
message="Rule 'temp_spike' fired",
sensor_readings=[],
metadata={"notified": False},
)
print(event.to_dict())
Writing Action Functions
An action is any callable accepting a single positional argument—the reading. Actions run synchronously inside the pipeline hot loop, so keep them fast. For slow I/O (HTTP POSTs, database writes), dispatch to a background thread or queue.
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading
import logging
logger = logging.getLogger("edge.actions")
def log_to_stderr(reading: SensorReading) -> None:
logger.warning(
"ALERT: %s = %.2f %s at ts=%s",
reading.sensor_name,
reading.value,
reading.unit,
reading.timestamp,
)
pipeline = Pipeline("demo")
pipeline.add_rule(RuleConfig(
name="high_temp",
condition=lambda r: r.value > 50.0,
action=log_to_stderr,
severity="warning",
))
Logging Events
The simplest action is structured logging. Use Python's standard logging module with a JSON formatter so that edge logs can be scraped by Fluent Bit or Promtail.
import json
import logging
class JsonFormatter(logging.Formatter):
def format(self, record):
return json.dumps({
"ts": record.created,
"level": record.levelname,
"msg": record.getMessage(),
"rule": getattr(record, "rule_name", None),
})
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logging.getLogger("edge.actions").addHandler(handler)
def structured_log_action(reading: SensorReading) -> None:
logging.getLogger("edge.actions").warning(
"Rule fired",
extra={"rule_name": "high_temp", "value": reading.value},
)
Triggering Webhooks
For integrations with PagerDuty, Slack, or your own alert manager, post a JSON payload from inside the action. Use a thread-safe queue if the pipeline runs in a tight loop.
import json
import urllib.request
from queue import Queue
from threading import Thread
_webhook_queue: Queue = Queue()
def _webhook_worker():
while True:
url, payload = _webhook_queue.get()
try:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=5) as resp:
pass
except Exception as exc:
logging.error("Webhook failed: %s", exc)
finally:
_webhook_queue.task_done()
Thread(target=_webhook_worker, daemon=True).start()
def webhook_action_factory(url: str):
def action(reading: SensorReading) -> None:
_webhook_queue.put((url, {
"sensor": reading.sensor_name,
"value": reading.value,
"unit": reading.unit,
"ts": reading.timestamp,
}))
return action
pipeline.add_rule(RuleConfig(
name="critical_temp",
condition=lambda r: r.value > 80.0,
action=webhook_action_factory("https://alerts.example.com/webhook"),
severity="critical",
cooldown_seconds=60.0,
))
Writing to Local File
When cloud connectivity is intermittent, persist events to a local append-only file. Use the logging module with a RotatingFileHandler for automatic rollover, or write line-delimited JSON yourself.
import json
from pathlib import Path
_EVENT_LOG = Path("/var/lib/pyvorin/events.jsonl")
def append_event_action(reading: SensorReading) -> None:
record = {
"sensor_name": reading.sensor_name,
"timestamp": reading.timestamp,
"value": reading.value,
"unit": reading.unit,
}
with _EVENT_LOG.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record) + "\n")
pipeline.add_rule(RuleConfig(
name="log_everything",
condition=lambda r: True, # Fire on every reading
action=append_event_action,
severity="info",
cooldown_seconds=0.0,
))
Action Error Handling
The pipeline wraps every action invocation in a try/except block. If an action raises an exception, the error is logged with logger.error(..., exc_info=True) and the pipeline continues processing the next reading. The event is still recorded in PipelineResult.events even if the action fails. This fail-open design ensures that a broken webhook URL does not stall your entire edge runtime.
def flaky_action(reading: SensorReading) -> None:
# Simulate an external service that occasionally times out
if reading.value > 90.0:
raise TimeoutError("Downstream API unreachable")
print(f"OK: {reading.sensor_name}")
pipeline = Pipeline("resilience_demo")
pipeline.add_rule(RuleConfig(
name="maybe_breaks",
condition=lambda r: r.value > 50.0,
action=flaky_action,
severity="warning",
))
# This batch contains one reading that will trigger the exception
readings = [
SensorReading("t", 1717152000.0, 55.0, "°C"), # action succeeds
SensorReading("t", 1717152060.0, 95.0, "°C"), # action raises
SensorReading("t", 1717152120.0, 60.0, "°C"), # action succeeds
]
result = pipeline.run(readings)
assert result.readings_processed == 3
# Events are still emitted for all three readings
Combining Multiple Actions
A rule accepts only one action callable, but you can multiplex inside that callable. The pattern below dispatches to a logger, a webhook, and a local file simultaneously.
def multi_action(reading: SensorReading) -> None:
log_to_stderr(reading)
append_event_action(reading)
_webhook_queue.put(("https://alerts.example.com/webhook", reading.to_dict()))
pipeline.add_rule(RuleConfig(
name="all_channels",
condition=lambda r: r.value > 70.0,
action=multi_action,
severity="critical",
cooldown_seconds=120.0,
))
Action Best Practices
- Keep actions under 10 ms — anything longer belongs in a background thread or queue.
- Make actions idempotent — the same reading may be reprocessed after a crash. Use reading timestamps as deduplication keys.
- Do not mutate the reading — actions receive the original object. Mutating it can confuse downstream rules.
- Use cooldowns for noisy channels — a rule posting to Slack every second will get your integration rate-limited.