"Structured Log Management and Shipping"
Integrate structlog with Pyvorin Edge, configure log rotation, and ship logs to Loki, Splunk, and CloudWatch with correlation IDs.
Published Jun 2, 2026
Introduction
Logs at the edge are not an afterthought—they are evidence. When a temperature excursion
triggers a critical alert, the log trail must be complete, structured, and tamper-evident.
This article shows you how to integrate structlog with the Pyvorin Edge SDK,
rotate logs safely on flash storage, inject correlation IDs across pipeline stages, and ship
logs to Loki, Splunk, and CloudWatch without blocking the ingestion loop.
structlog Integration
The Edge Agent in edge_runtime/pyv_edge_agent/main.py uses the standard-library
logging module with a custom JSON formatter. For richer structured logging, we
recommend layering structlog on top. It preserves compatibility with the standard
library while adding type-safe key-value rendering and processor pipelines.
pip install structlog
import logging
import structlog
from structlog.stdlib import LoggerFactory
from structlog.processors import TimeStamper, JSONRenderer, add_log_level
# filter_by_level defers to the stdlib logger's effective level, which is
# WARNING until logging is configured. Without this call the INFO event below
# would be dropped silently. The bare "%(message)s" format lets the JSON that
# structlog renders pass through the stdlib handler untouched.
logging.basicConfig(format="%(message)s", level=logging.INFO)

structlog.configure(
    processors=[
        # Must run first: merges values bound with
        # structlog.contextvars.bind_contextvars (e.g. correlation IDs)
        # into every event dict.
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        TimeStamper(fmt="iso"),
        # Must run last: turns the event dict into the final JSON string.
        JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger("pyvorin.edge")
logger.info("pipeline_started", pipeline="precision_agriculture", version="2.1.0")
Output:
{"event": "pipeline_started", "pipeline": "precision_agriculture", "version": "2.1.0", "logger": "pyvorin.edge", "level": "info", "timestamp": "2024-05-30T08:37:37.951919Z"}
Log Levels
Pyvorin Edge uses the standard five log levels. In production, set the level to
INFO; during commissioning, use DEBUG.
| Level | When to Use |
|---|---|
| DEBUG | Sensor raw values, adapter state transitions, window buffer dumps. |
| INFO | Pipeline start/stop, rule firings, cloud flush completions. |
| WARNING | Adapter read failures, retry exhaustion, thermal throttling. |
| ERROR | Rule action exceptions, SQLite write errors, compiler bridge failures. |
| CRITICAL | Unrecoverable crashes, privacy chain tampering, disk full. |
Log Rotation with RotatingFileHandler
Flash storage wears out with excessive writes. Never log continuously to the same file on an
SD card. Use RotatingFileHandler with a modest maximum size and a small backup
count. This caps total log disk usage and distributes writes across inodes.
import logging
import logging.handlers
from pathlib import Path
# Directory that holds the rotating log files; created on first use.
LOG_DIR = Path("/var/log/pyvorin")
LOG_DIR.mkdir(parents=True, exist_ok=True)

handler = logging.handlers.RotatingFileHandler(
    LOG_DIR / "edge.log",
    maxBytes=1_048_576,  # 1 MB per file
    backupCount=3,  # Keep edge.log, edge.log.1, edge.log.2, edge.log.3
    # Explicit encoding: without it the file is written in the locale's
    # encoding, while the log shippers read it back as UTF-8.
    encoding="utf-8",
)
formatter = logging.Formatter(
    "%(asctime)s %(levelname)s %(name)s: %(message)s"
)
handler.setFormatter(formatter)

root = logging.getLogger()
# Detach AND close previously installed handlers. Rebinding root.handlers to
# an empty list would leave their files open.
for stale_handler in list(root.handlers):
    root.removeHandler(stale_handler)
    stale_handler.close()
root.addHandler(handler)
root.setLevel(logging.INFO)
Shipping to Loki
Grafana Loki accepts logs via a JSON POST to /loki/api/v1/push. The snippet below
reads the current log file and ships its last 100 lines in a single push. To stream new
lines continuously, tail the file and run the shipper as a systemd service or a sidecar
container.
import json
import urllib.request
from pathlib import Path
# Loki push endpoint and the log file the shipper reads from.
LOKI_URL = "http://loki.local:3100/loki/api/v1/push"
LOG_FILE = Path("/var/log/pyvorin/edge.log")


def ship_to_loki(lines: list[str]):
    """Push a batch of log lines to Loki as a single stream.

    Trailing line terminators are stripped and blank lines are skipped, so
    the output of ``readlines()`` can be passed in directly. Every entry in
    the batch is stamped with the time of the call, in nanoseconds since the
    epoch. Nothing is sent when the batch contains no non-blank lines.

    Args:
        lines: Raw log lines, with or without trailing newlines.

    Raises:
        RuntimeError: If Loki answers with a status other than 204.
        urllib.error.URLError: If the request cannot be completed.
    """
    # Local import: the snippet's import block does not bring `time` into scope.
    import time

    entries = [text for text in (raw.rstrip("\r\n") for raw in lines) if text]
    if not entries:
        return

    # One timestamp for the whole batch; time_ns() avoids the float rounding
    # of int(time.time() * 1e9).
    stamp = str(time.time_ns())
    streams = [
        {
            "stream": {"job": "pyvorin-edge", "host": "rpi4-field-01"},
            "values": [[stamp, entry] for entry in entries],
        }
    ]
    payload = json.dumps({"streams": streams}).encode("utf-8")
    req = urllib.request.Request(
        LOKI_URL,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        if resp.status != 204:
            raise RuntimeError(f"Loki returned {resp.status}")
# One-shot shipping of the newest entries. In production, tail the file with
# a library like `pytail` or `watchdog`
if LOG_FILE.exists():
    with LOG_FILE.open("r", encoding="utf-8") as log_fh:
        recent_lines = log_fh.readlines()[-100:]  # last 100 lines
        ship_to_loki(recent_lines)
Shipping to Splunk
Splunk's HTTP Event Collector (HEC) expects a simple JSON envelope with an
event field and an optional sourcetype.
import json
import urllib.request
# HEC endpoint and the complete Authorization header value ("Splunk <token>").
SPLUNK_HEC_URL = "https://splunk.local:8088/services/collector/event"
SPLUNK_TOKEN = "Splunk hec-token-here"


def ship_to_splunk(records: list[dict]):
    """Send a batch of structured records to Splunk's HTTP Event Collector.

    Each record is wrapped in an HEC envelope with sourcetype
    ``pyvorin:edge``; the envelopes are newline-joined into one request body.
    Nothing is sent when ``records`` is empty.

    Args:
        records: JSON-serialisable event dicts.

    Raises:
        RuntimeError: If Splunk answers with a status other than 200.
        urllib.error.URLError: If the request cannot be completed.
    """
    if not records:
        # An empty body carries no events; skip the round trip entirely.
        return

    data = "\n".join(
        json.dumps({"event": rec, "sourcetype": "pyvorin:edge"})
        for rec in records
    ).encode("utf-8")
    req = urllib.request.Request(
        SPLUNK_HEC_URL,
        data=data,
        headers={
            "Authorization": SPLUNK_TOKEN,
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        if resp.status != 200:
            raise RuntimeError(f"Splunk returned {resp.status}")
Shipping to CloudWatch
AWS CloudWatch Logs requires the boto3 SDK. Because it is a heavy dependency,
only install it if you are already running in an AWS environment.
import boto3
from botocore.exceptions import ClientError
# Destination log group and log stream passed to put_log_events.
# NOTE(review): presumably both must already exist in the target account --
# confirm against the CloudWatch Logs documentation before deploying.
LOG_GROUP = "/pyvorin/edge"
LOG_STREAM = "rpi4-field-01"
# Module-level CloudWatch Logs client, pinned to the eu-west-2 region.
client = boto3.client("logs", region_name="eu-west-2")
def _to_epoch_ms(value) -> int:
    """Convert an epoch-seconds number or an ISO-8601 string to epoch milliseconds."""
    from datetime import datetime, timezone

    if isinstance(value, str):
        # structlog's TimeStamper(fmt="iso") emits e.g. "2024-05-30T08:37:37.951919Z";
        # fromisoformat() only accepts the "Z" suffix from Python 3.11 on.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # Treat naive timestamps as UTC rather than local time.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return int(parsed.timestamp() * 1000)
    return int(value * 1000)


def ship_to_cloudwatch(events: list[dict]):
    """Send a batch of structured records to CloudWatch Logs.

    Each record must carry a ``"timestamp"`` key holding either epoch seconds
    (int/float) or an ISO-8601 string. Records are serialised to JSON and
    submitted in chronological order. Nothing is sent when ``events`` is empty.

    API errors (``ClientError``) are reported on stdout and not re-raised, so
    a CloudWatch outage does not take the caller down.

    Args:
        events: JSON-serialisable event dicts with a ``"timestamp"`` key.
    """
    # Local import: this snippet's import block does not bring `json` into scope.
    import json

    if not events:
        return

    # put_log_events expects the batch ordered by timestamp; sorted() is
    # stable, so records with equal timestamps keep their original order.
    log_events = sorted(
        (
            {
                "timestamp": _to_epoch_ms(rec["timestamp"]),
                "message": json.dumps(rec),
            }
            for rec in events
        ),
        key=lambda entry: entry["timestamp"],
    )
    try:
        client.put_log_events(
            logGroupName=LOG_GROUP,
            logStreamName=LOG_STREAM,
            logEvents=log_events,
        )
    except ClientError as exc:
        print(f"CloudWatch error: {exc}")
Correlation IDs Across Pipeline Stages
When a reading flows from ingestion through windows, rules, and cloud sync, it is processed by multiple threads and potentially multiple processes. A correlation ID ties the entire journey together for debugging.
import uuid
import structlog
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import SensorReading
logger = structlog.get_logger("pyvorin.edge")


def run_pipeline_with_cid(pipeline: Pipeline, readings: list[SensorReading]):
    """Run ``pipeline`` over ``readings`` under a fresh correlation ID.

    A new UUID4 is bound to structlog's context-local storage for the
    duration of the run and is also passed explicitly to every event logged
    here, so the ID is present even when the ``merge_contextvars`` processor
    is not configured.

    Args:
        pipeline: The pipeline to execute.
        readings: The batch of sensor readings to feed into it.

    Returns:
        Whatever ``pipeline.run(readings)`` returns.
    """
    cid = str(uuid.uuid4())
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    try:
        logger.info(
            "pipeline_run_start",
            readings_count=len(readings),
            correlation_id=cid,
        )
        result = pipeline.run(readings)
        for ev in result.events:
            logger.warning(
                "rule_fired",
                rule=ev.rule_name,
                severity=ev.severity,
                correlation_id=cid,
            )
        logger.info("pipeline_run_end", events=len(result.events), correlation_id=cid)
        return result
    finally:
        # Always reset the context, even when the pipeline raises; otherwise
        # this run's correlation ID would leak into unrelated later events.
        structlog.contextvars.clear_contextvars()
Complete Logging Configuration
Save the following as logging_config.py and import it at the top of your agent
entrypoint. It sets up JSON rotation, structlog, and correlation-ID support in one place.
#!/usr/bin/env python3
"""Complete logging configuration for Pyvorin Edge."""
import logging
import logging.handlers
import structlog
from pathlib import Path
# Directory that holds the rotating log files; created at import time.
LOG_DIR = Path("/var/log/pyvorin")
LOG_DIR.mkdir(parents=True, exist_ok=True)


def configure_logging(level: str = "INFO") -> None:
    """Install the rotating file handler and configure structlog.

    Replaces every handler on the root logger with one RotatingFileHandler
    writing to ``LOG_DIR / "edge.log"`` (1 MB per file, 3 backups), then
    configures structlog to render events as JSON through the stdlib logger.

    NOTE(review): the stdlib formatter prefixes each rendered JSON event with
    asctime/level/name, so the lines in the file are not pure JSON. Switch to
    "%(message)s" or structlog.stdlib.ProcessorFormatter if a shipper needs
    to parse the lines.

    Args:
        level: Name of a stdlib log level (case-insensitive). Unknown names
            fall back to INFO.
    """
    # Standard-library handler
    handler = logging.handlers.RotatingFileHandler(
        LOG_DIR / "edge.log",
        maxBytes=1_048_576,
        backupCount=3,
        # Explicit encoding: without it the file is written in the locale's
        # encoding, while the log shippers read it back as UTF-8.
        encoding="utf-8",
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    root = logging.getLogger()
    # Detach AND close previously installed handlers so repeated calls do not
    # leave their files open.
    for stale_handler in list(root.handlers):
        root.removeHandler(stale_handler)
        stale_handler.close()
    root.addHandler(handler)
    root.setLevel(getattr(logging, level.upper(), logging.INFO))

    # structlog processors
    structlog.configure(
        processors=[
            # Must run first: merges values bound with
            # structlog.contextvars.bind_contextvars (e.g. correlation IDs)
            # into every event dict.
            structlog.contextvars.merge_contextvars,
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )
if __name__ == "__main__":
    # Smoke test: configure at DEBUG verbosity and emit one event.
    configure_logging(level="DEBUG")
    logger = structlog.get_logger("pyvorin.edge")
    logger.info("logging_configured", log_dir=f"{LOG_DIR}")
Summary
You now have a production-grade logging stack: structured JSON via structlog,
flash-safe rotation via RotatingFileHandler, correlation IDs across pipeline
stages, and sidecar shippers for Loki, Splunk, and CloudWatch. Keep the log volume low at
the edge—ship summaries upstream, and keep DEBUG logs local only, retaining them for no more than 24 hours.