edge 12 min read

Profiling Edge Pipelines and System Metrics

How to profile edge pipelines with cProfile, interpret memory usage, identify bottlenecks, and use the SystemMetrics class and HTTP health endpoints.

Published Jun 2, 2026

Why Profile Edge Pipelines?

Edge devices operate under tight constraints: limited CPU cores, small memory footprints, and thermal throttling under sustained load. A pipeline that runs in 0.5 ms on a developer's laptop may take 5 ms on a Raspberry Pi, causing backpressure and missed readings. Profiling is not optional — it is a prerequisite for production deployment.

The Pyvorin Edge SDK provides three levels of observability: per-pipeline latency tracking in PipelineResult, CPU and memory profiling with standard Python tools, and system-level metrics via the SystemMetrics class and HTTP endpoints.

PipelineResult and Latency Tracking

Every call to Pipeline.run(readings) returns a PipelineResult dataclass containing the average latency per reading:


from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType

pipeline = Pipeline(name="thermal_monitor")
pipeline.add_sensor(Sensor(name="boiler", sensor_type=SensorType.TEMPERATURE, unit="C"))
pipeline.add_rule(RuleConfig(name="overheat", condition_expr="ctx.value > 85.0"))

# Generate 10,000 synthetic readings
readings = [
    SensorReading(sensor_name="boiler", timestamp=float(i), value=80.0 + (i % 10))
    for i in range(10_000)
]

result = pipeline.run(readings)
print(f"Processed: {result.readings_processed}")
print(f"Evaluations: {result.evaluations}")
print(f"Avg latency: {result.latency_ms:.4f} ms")
print(f"Events: {len(result.events)}")
  

The latency_ms field is the mean time to process one reading, including window updates and rule evaluations. If this value exceeds your sampling interval (e.g., 1000 ms for 1 Hz), the pipeline cannot keep up and you must optimize or downsample.

CPU Profiling with cProfile

Use Python's built-in cProfile to find hot functions in your pipeline logic:


import cProfile
import pstats
import io

profiler = cProfile.Profile()
profiler.enable()

# Run your pipeline
result = pipeline.run(readings)

profiler.disable()

# Print top 20 functions by cumulative time
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats("cumtime")
stats.print_stats(20)
print(stream.getvalue())
  

Look for these common bottlenecks:

  • eval() on condition_expr — consider compiling the expression with CompilerBridge instead.
  • RollingWindow.add_reading() or TumblingWindow.compute() — if these dominate, reduce window size or switch to tumbling windows.
  • SensorReading.to_dict() — if serialization shows up heavily, batch your dict conversions.

Memory Profiling

Memory leaks on edge devices eventually trigger the OOM killer. Use tracemalloc to track allocations:


import tracemalloc

tracemalloc.start()

# Warm up
pipeline.run(readings[:100])

# Snapshot before
before = tracemalloc.take_snapshot()

# Heavy load
for _ in range(100):
    pipeline.run(readings)

# Snapshot after
after = tracemalloc.take_snapshot()

diff = after.compare_to(before, "lineno")
for stat in diff[:10]:
    print(stat)
  

If CloudSyncQueue.enqueue() appears in the top allocators, your queue depth may be growing faster than the uploader can flush. Check pending_count() and consider reducing TTL or increasing flush frequency.

SystemMetrics Class

The edge runtime includes a SystemMetrics class that reads from /proc to provide CPU, RAM, disk, thermal, and uptime data. It is thread-safe and designed for low overhead.


from pyv_edge_agent.health_monitor.metrics import SystemMetrics, MetricsSnapshot

metrics = SystemMetrics()

# Individual readings
print(f"CPU:     {metrics.cpu_percent():.1f}%")
print(f"RAM:     {metrics.ram_percent():.1f}%")
print(f"Disk:    {metrics.disk_percent('/'):.1f}%")
print(f"Thermal: {metrics.thermal_celsius()}°C")
print(f"Uptime:  {metrics.uptime_seconds():.0f}s")

# Full immutable snapshot
snapshot: MetricsSnapshot = metrics.snapshot()
print(snapshot.to_dict())
  

SystemMetrics API Reference

MethodReturn TypeDescription
cpu_percent()floatCPU utilization [0, 100] from /proc/stat. First call returns 0.0 because a delta is required.
ram_percent()floatRAM utilization [0, 100] from /proc/meminfo (MemTotal vs MemAvailable).
disk_percent(path="/")floatDisk utilization [0, 100] for the given mount point.
thermal_celsius()Optional[float]SoC temperature from /sys/class/thermal/thermal_zone0/temp, or None.
uptime_seconds()floatSystem uptime from /proc/uptime.
snapshot()MetricsSnapshotImmutable snapshot of all metrics at the current instant.
to_dict()dict[str, object]JSON-serializable dict of the current snapshot.

Interpreting /health and /metrics Endpoints

When the EdgeAgent starts, it launches a lightweight HTTP server on port 8080 (configurable). Two endpoints are exposed:

GET /health

Returns a comprehensive health document including agent status, system metrics, cloud queue depth, and privacy settings:


{
  "status": "healthy",
  "timestamp": 1717234567.89,
  "metrics": {
    "cpu_percent": 12.5,
    "ram_percent": 34.2,
    "disk_percent": 67.8,
    "thermal_celsius": 52.3,
    "uptime_seconds": 86400.0
  },
  "agent": {
    "running": true,
    "buffer_count": 4,
    "readings_processed": 123456,
    "events_triggered": 42
  },
  "cloud": {
    "queue_depth": 7,
    "last_flush_time": 1717234500.0,
    "messages_sent_today": 1024,
    "endpoint": "https://api.pyvorin.com/v1/ingest"
  },
  "privacy": {
    "enabled": true,
    "rules_active": 3,
    "fields_redacted": ["location"],
    "fields_hashed": ["device_id"]
  },
  "ingest": {
    "adapters_connected": ["simulator", "mqtt"],
    "devices_configured": 4
  }
}
  

GET /metrics

Returns only the SystemMetrics snapshot for lightweight monitoring integrations:


{
  "cpu_percent": 12.5,
  "ram_percent": 34.2,
  "disk_percent": 67.8,
  "thermal_celsius": 52.3,
  "uptime_seconds": 86400.0,
  "timestamp": 1717234567.89
}
  

Identifying Bottlenecks

Use this decision tree when pipeline latency is too high:

  1. Check latency_ms: Is it consistently high or spiky? Spikes suggest GC pauses or thermal throttling.
  2. Check cpu_percent: If CPU is near 100%, the pipeline is compute-bound. Profile with cProfile and compile hot paths with @edge.
  3. Check ram_percent: If RAM is high, look for unbounded queue growth or large window buffers. Reduce RingBuffer capacity.
  4. Check thermal_celsius: On Raspberry Pi, sustained loads above 80°C trigger throttling. Add a heatsink or reduce polling frequency.
  5. Check cloud.queue_depth: If the queue grows monotonically, the uploader is failing or the endpoint is slow. Check nack() retry counts with CloudSyncQueue.get_stats().

End-to-End Profiling Script

Here is a complete script that profiles a pipeline, collects system metrics, and prints a diagnostic summary:


import time
import cProfile
import pstats
import io
from pyv_edge_agent.health_monitor.metrics import SystemMetrics
from pyvorin_edge.pipeline import Pipeline, RuleConfig, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

# Build pipeline
pipeline = Pipeline(name="profiler_demo")
pipeline.add_sensor(Sensor(name="motor", sensor_type=SensorType.VIBRATION, unit="g"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, sensor_name="motor"))
pipeline.add_rule(RuleConfig(name="high_vibration", condition_expr="ctx.value > 5.0"))

# Synthetic load
readings = [
    SensorReading(sensor_name="motor", timestamp=float(i), value=2.0 + (i % 4))
    for i in range(50_000)
]

# System metrics before
metrics = SystemMetrics()
before = metrics.snapshot()

# Profile
profiler = cProfile.Profile()
profiler.enable()
start = time.perf_counter()

result = pipeline.run(readings)

elapsed = (time.perf_counter() - start) * 1000.0
profiler.disable()

# System metrics after
after = metrics.snapshot()

# Report
stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats("cumtime")
stats.print_stats(10)

print("=== Pipeline Result ===")
print(f"Readings: {result.readings_processed}")
print(f"Latency:  {result.latency_ms:.4f} ms/reading")
print(f"Total:    {elapsed:.2f} ms")
print(f"Events:   {len(result.events)}")

print("\n=== System Metrics Delta ===")
print(f"CPU:   {before.cpu_percent:.1f}% -> {after.cpu_percent:.1f}%")
print(f"RAM:   {before.ram_percent:.1f}% -> {after.ram_percent:.1f}%")
print(f"Temp:  {before.thermal_celsius}°C -> {after.thermal_celsius}°C")

print("\n=== Top 10 by Cumulative Time ===")
print(stream.getvalue())