
Understanding the EdgeAgent

A deep dive into the EdgeAgent class, its lifecycle, threads, health endpoint, ingest adapters, buffers, and cloud queue.

Published Jun 2, 2026

Introduction

The EdgeAgent class is the heart of the Pyvorin Edge Runtime. It is not merely a script that polls sensors; it is a multi-threaded daemon that manages the entire data lifecycle from ingestion to storage to cloud synchronisation. This article explains every attribute, every thread, every lifecycle phase, and every design decision in plain English. By the end, you will know exactly what happens when you call agent.start() and why it matters.

Where EdgeAgent Lives

The class is defined in /var/www/pyvorin/edge_runtime/pyv_edge_agent/main.py. This is the entry point for both the pyv-edge CLI and programmatic usage. When you run pyv-edge --config config.toml, the main() function instantiates an EdgeAgent, registers signal handlers for graceful shutdown, and calls agent.start().

The Constructor

When you create an agent, it starts in a dormant state. No threads are spawned. No files are opened. No network connections are made. This lazy initialisation is deliberate: it lets you construct and configure the agent safely before committing resources.


from pyv_edge_agent.main import EdgeAgent

# Dormant state: no threads, no files, no sockets
agent = EdgeAgent(config_path="/home/pi/pyvorin-edge/config.toml")

# Inspect initial state
print(agent.is_running)  # False
print(agent.health())
# {
#   "running": False,
#   "config_path": "/home/pi/pyvorin-edge/config.toml",
#   "buffer_count": 0,
#   "cloud_pending": 0,
#   "privacy_enabled": True
# }
  

Key Attributes at Construction

Attribute            Type                         Purpose
_config_path         Optional[str]                Path to the TOML/JSON config file
_config              Config                       Configuration object (initially empty defaults)
_store               Optional[SQLiteStore]        SQLite evidence store (initially None)
_privacy             PrivacyPolicy                Privacy firewall rules
_cloud               CloudSyncQueue               Persistent upload queue
_uploader            Optional[HTTPCloudUploader]  HTTP client for cloud batches
_buffers             Dict[str, RingBuffer]        One ring buffer per configured device
_adapters            Dict[str, Any]               Ingest adapter instances per device
_readings_processed  int                          Counter incremented per reading
_events_triggered    int                          Counter incremented when value > 30.0
_running             bool                         Main loop control flag
_shutdown_event      threading.Event              Thread-safe signal for graceful shutdown
_thread              Optional[threading.Thread]   The main agent loop thread
_health_server       Optional[HTTPServer]         Built-in HTTP health server
_health_thread       Optional[threading.Thread]   Daemon thread serving health HTTP
_system_metrics      SystemMetrics                Collector for CPU, RAM, disk, thermal

Lifecycle Diagram


  [Constructed] --start()--> [Starting]
                                   |
                                   v
                           [Config Loaded]
                                   |
                                   v
                           [Logging Setup]
                                   |
                                   v
                           [Components Init]
                                   |
                       +-----------+-----------+
                       |                       |
                       v                       v
               [Health Server]         [Agent Thread]
                       |                       |
                       +-----------+-----------+
                                   |
                                   v
                              [Running]
                                   |
                    +--------------+--------------+
                    |                             |
               run_once()                    run_loop()
           (single cycle)              (poll interval wait)
                    |                             |
                    +--------------+--------------+
                                   |
                           [Shutdown Signal]
                                   |
                                   v
                           [Stopping]
                                   |
                                   v
                    +--------------+--------------+
                    |                             |
               stop()                        SIGINT/SIGTERM
                    |                             |
                    +--------------+--------------+
                                   |
                                   v
                           [Health Stop]
                                   |
                                   v
                           [Thread Join]
                                   |
                                   v
                           [Store Close]
                                   |
                                   v
                              [Stopped]
  

Phase 1: Starting

Calling agent.start() triggers a strict sequence. If the agent is already running, it logs a warning and returns immediately to prevent double initialisation.

1.1 Load Configuration


if self._config_path:
    self._config = Config.from_file(self._config_path)
  

The Config.from_file() method reads TOML or JSON, substitutes environment variables of the form ${VAR} or ${VAR:-default}, and merges user settings over built-in defaults. If the file is missing, a FileNotFoundError is raised immediately so you know the problem is configuration, not runtime.
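
To make the placeholder syntax concrete, here is a minimal sketch of ${VAR} and ${VAR:-default} substitution. The function name substitute_env, the regular expression, and the handling of unresolved placeholders are all assumptions made for illustration; they are not taken from Config.from_file().

```python
import os
import re
from typing import Mapping, Optional

# Matches ${VAR} and ${VAR:-default}. This pattern is an assumption for
# illustration, not the expression used inside Config.from_file().
_ENV_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${VAR} and ${VAR:-default} placeholders in a string."""
    source = os.environ if env is None else env

    def _replace(match: "re.Match[str]") -> str:
        name, default = match.group(1), match.group(2)
        if name in source:
            return source[name]
        if default is not None:
            return default
        # Hypothetical choice: leave unresolved placeholders untouched
        # rather than raising or substituting an empty string.
        return match.group(0)

    return _ENV_PATTERN.sub(_replace, text)


# Example: HOST is set, PORT falls back to its default.
print(substitute_env("${HOST}:${PORT:-1883}", {"HOST": "broker.local"}))
```

Note that this sketch applies the default only when the variable is unset; a shell would also apply it when the variable is set but empty.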

1.2 Setup Logging

The agent supports two log formats: JSON (default, for structured log aggregation) and plain text (for human readability during development). The root logger is reconfigured with a single StreamHandler writing to stdout. Any existing handlers are cleared to prevent duplicate output from third-party libraries.


log_cfg = self._config.get("logging", default={})
level = log_cfg.get("level", "INFO")
fmt = log_cfg.get("format", "json")
  

JSON format includes timestamp, level, logger name, message, and exception tracebacks if present. This makes it trivial to ship logs to Elasticsearch, Loki, or CloudWatch.
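
The logging setup described above can be sketched with the standard logging module. The JsonFormatter class, the setup_logging function, the JSON field names, and the plain-text format string are assumptions for illustration; the agent's own formatter may differ.

```python
import io
import json
import logging
import sys
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line (field names are assumed)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def setup_logging(level: str = "INFO", fmt: str = "json", stream=None) -> None:
    """Reset the root logger to a single stream handler."""
    root = logging.getLogger()
    # Clear existing handlers so messages are not emitted twice.
    for existing in list(root.handlers):
        root.removeHandler(existing)
    handler = logging.StreamHandler(sys.stdout if stream is None else stream)
    if fmt == "json":
        handler.setFormatter(JsonFormatter())
    else:
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    root.addHandler(handler)
    root.setLevel(level)


# Example: capture one JSON line in memory instead of writing to stdout.
buffer = io.StringIO()
setup_logging(level="INFO", fmt="json", stream=buffer)
logging.getLogger("edge.demo").info("agent started")
```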

1.3 Initialise Components

_init_components() is where the heavy lifting happens. It creates, in order:

  1. SQLiteStore — a connection-pooled, WAL-mode SQLite database for readings, events, summaries, and audit logs.
  2. PrivacyPolicy — loaded from the [privacy] config section.
  3. CloudSyncQueue — loaded from the [cloud] config section.
  4. RingBuffers — one per device, configured with capacity and dtype from [windows].
  5. Ingest Adapters — created based on each device's ingest_type (simulator, mqtt, http, file).
  6. HTTPCloudUploader — created only if cloud.enabled is true and an endpoint is provided.

1.4 Start the Health Server

The health server is a minimal HTTP server using Python's built-in http.server module. It runs in a daemon thread so it does not block agent shutdown. It responds to:

  • GET /health — comprehensive status JSON including agent state, system metrics, cloud queue depth, privacy rules, and ingest adapters.
  • GET /metrics — raw system metrics only (CPU, RAM, disk, thermal, uptime).
  • OPTIONS * — CORS preflight responses.

The handler is implemented as _HealthHandler, a subclass of BaseHTTPRequestHandler. It holds a class-level reference to the agent instance so it can inspect live state without global variables.


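from http.server import BaseHTTPRequestHandler
from typing import Optional


class _HealthHandler(BaseHTTPRequestHandler):
    # Class-level reference to the live agent instance
    agent_ref: Optional["EdgeAgent"] = None

    def do_GET(self):
        if self.path == "/health":
            status = "healthy" if (self.agent_ref and self.agent_ref.is_running) else "unhealthy"
            # ... builds comprehensive JSON ...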
  
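
For readers who want to try the pattern in isolation, the following is a self-contained sketch of a health endpoint served from a daemon thread. HealthHandler, start_health_server, the one-field JSON body, and the fixed 200 status code are assumptions for illustration; the real handler returns a far richer payload and also serves /metrics and OPTIONS.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any, Optional, Tuple


class HealthHandler(BaseHTTPRequestHandler):
    # Class-level reference: the server creates a new handler per request,
    # so shared state has to live on the class rather than on an instance.
    agent_ref: Optional[Any] = None

    def do_GET(self) -> None:
        if self.path != "/health":
            self.send_error(404, "Not Found")
            return
        agent = self.agent_ref
        running = bool(agent and agent.is_running)
        body = json.dumps({"status": "healthy" if running else "unhealthy"}).encode("utf-8")
        self.send_response(200)  # assumed status code for this sketch
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: Any) -> None:
        pass  # silence per-request logging in this sketch


def start_health_server(agent: Any, host: str = "127.0.0.1", port: int = 0) -> Tuple[HTTPServer, threading.Thread]:
    """Serve /health in a daemon thread; port 0 lets the OS pick a free port."""
    HealthHandler.agent_ref = agent
    server = HTTPServer((host, port), HealthHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True, name="edge-health")
    thread.start()
    return server, thread
```

To stop the server, call server.shutdown() followed by server.server_close(), then join the thread with a short timeout, which mirrors the shutdown order described later in this article.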

1.5 Start the Main Thread

Finally, the agent spawns its main loop thread as a daemon:


self._thread = threading.Thread(target=self.run_loop, daemon=True, name="edge-agent")
self._thread.start()
  

The thread is a daemon by design: daemon threads do not keep the Python interpreter alive, so if the main thread exits unexpectedly, the agent loop ends with the process instead of leaving it hanging.

Phase 2: Running

2.1 The Main Loop

run_loop() is deceptively simple:


def run_loop(self) -> None:
    interval = float(self._config.get("sensors", "poll_interval_seconds", default=5.0))
    while self._running and not self._shutdown_event.is_set():
        try:
            self.run_once()
        except Exception:
            logger.exception("Error in run_loop iteration")
        self._shutdown_event.wait(timeout=interval)
  

It reads the configured poll interval (default 5 seconds), then repeats until shutdown. The wait uses threading.Event.wait() rather than time.sleep() so that stop() can wake it immediately instead of waiting for the sleep to finish.
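
The difference between Event.wait() and time.sleep() is easy to demonstrate. The sketch below uses hypothetical helper names (poll_until_stopped, measure_stop_latency) and measures how long a loop with a 5-second interval takes to exit once the event is set.

```python
import threading
import time
from typing import List


def poll_until_stopped(stop_event: threading.Event, interval: float, ticks: List[float]) -> None:
    """Loop shaped like run_loop(): do work, then wait on the event."""
    while not stop_event.is_set():
        ticks.append(time.monotonic())      # stand-in for run_once()
        stop_event.wait(timeout=interval)   # returns early when the event is set


def measure_stop_latency(interval: float = 5.0) -> float:
    """Return the seconds between set() and the worker thread exiting."""
    stop_event = threading.Event()
    ticks: List[float] = []
    worker = threading.Thread(
        target=poll_until_stopped, args=(stop_event, interval, ticks), daemon=True
    )
    worker.start()
    time.sleep(0.1)  # give the worker time to enter its wait
    started = time.monotonic()
    stop_event.set()
    worker.join(timeout=interval)
    return time.monotonic() - started


latency = measure_stop_latency(interval=5.0)
print(f"worker stopped after {latency:.3f}s")
```

With time.sleep(interval) in place of the wait, the same worker would only notice the stop request after the current sleep had run its full course.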

2.2 A Single Iteration

run_once() performs the core data lifecycle:

  1. Collect. For each buffer (and therefore each device), the agent asks the corresponding adapter for a value. It tries multiple adapter methods in order: read(), generate_batch(), and generate_reading().
  2. Wrap. The raw value is wrapped in a SensorReading dataclass with the current timestamp.
  3. Filter. The reading is passed through PrivacyPolicy.evaluate(). If the result is None, the reading is dropped and never stored.
  4. Buffer. If allowed, the reading is appended to the device's ring buffer.
  5. Store. The reading is inserted into the SQLite readings table.
  6. Enqueue. The reading is serialised and added to the cloud sync queue.
  7. Event Detection. If the value exceeds 30.0, the _events_triggered counter increments. (In production, you would configure explicit rules in [rules] instead of this hard-coded threshold.)
  8. Flush. CloudSyncQueue.maybe_flush() checks whether the uploader should send a batch. If cloud sync is disabled, items remain in the queue indefinitely until polled manually or the database is purged.
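
The steps above can be sketched with plain Python stand-ins. Everything here is hypothetical scaffolding: the SensorReading fields, the run_cycle function, the privacy callable, and the use of a deque and lists in place of RingBuffer, SQLiteStore, and CloudSyncQueue. The flush step is omitted because it depends on the uploader.

```python
import time
from collections import deque
from dataclasses import asdict, dataclass
from typing import Callable, Deque, Dict, List, Optional


@dataclass
class SensorReading:
    # Field names are assumed for this sketch.
    device: str
    value: float
    timestamp: float


# Stand-in for PrivacyPolicy.evaluate(): return the reading, or None to drop it.
PrivacyFilter = Callable[[SensorReading], Optional[SensorReading]]


def run_cycle(
    sources: Dict[str, Callable[[], float]],
    privacy: PrivacyFilter,
    buffers: Dict[str, Deque[SensorReading]],
    store: List[SensorReading],
    cloud_queue: List[dict],
    threshold: float = 30.0,
) -> int:
    """Run one collect-wrap-filter-buffer-store-enqueue pass; return events seen."""
    events = 0
    for device, read_value in sources.items():
        reading = SensorReading(device, float(read_value()), time.time())  # collect + wrap
        allowed = privacy(reading)                                         # filter
        if allowed is None:
            continue                             # dropped readings are never stored
        buffers[device].append(allowed)          # buffer
        store.append(allowed)                    # store
        cloud_queue.append(asdict(allowed))      # enqueue (serialised as a dict)
        if allowed.value > threshold:            # event detection
            events += 1
    return events
```

Passing a privacy callable that returns None for a given device shows the key property of the pipeline: a filtered reading never reaches the buffer, the store, or the queue.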

2.3 Adapter Resilience

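Adapters are allowed to fail. If an adapter call inside _get_reading_value() raises an exception, the method logs a warning and returns 0.0. This means a flaky MQTT connection or a missing sensor does not crash the entire agent; it simply records a zero for that cycle and tries again at the next interval. The same 0.0 is returned when no adapter is registered for the device, so a stored zero may be a placeholder rather than a genuine measurement.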


def _get_reading_value(self, name: str) -> float:
    adapter = self._adapters.get(name)
    if adapter is None:
        return 0.0
    try:
        # Try read(), generate_batch(), generate_reading() in order
        ...
    except Exception as exc:
        logger.warning("Adapter read failed for %s: %s", name, exc)
    return 0.0
  
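
One way such a fallback chain could look is sketched below. This is not the elided body of _get_reading_value(): the helper name read_first_available, the getattr-based lookup, and the assumption that batch-style methods return a list whose last element is the latest value are all illustrative guesses.

```python
import logging
from typing import Any

logger = logging.getLogger("edge.sketch")

# Method names come from the article; the lookup strategy is an assumption.
_METHODS = ("read", "generate_batch", "generate_reading")


def read_first_available(adapter: Any, name: str) -> float:
    """Call the first supported adapter method; fall back to 0.0 on any failure."""
    try:
        for method_name in _METHODS:
            method = getattr(adapter, method_name, None)
            if not callable(method):
                continue
            result = method()
            # Assumption: batch-style methods return a sequence of values.
            if isinstance(result, (list, tuple)):
                if not result:
                    continue
                result = result[-1]
            return float(result)
    except Exception as exc:
        logger.warning("Adapter read failed for %s: %s", name, exc)
    return 0.0
```

An adapter that raises, returns nothing usable, or exposes none of the three methods yields 0.0, which matches the resilience behaviour described in this section.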

Phase 3: Stopping

Calling agent.stop() (or sending SIGINT/SIGTERM) triggers a graceful shutdown:

  1. Set _running = False and signal _shutdown_event to wake the main loop immediately.
  2. Shut down the health HTTP server and join its thread with a 2-second timeout.
  3. Join the main agent thread with a 10-second timeout.
  4. Close the SQLite store, which closes all pooled connections.
  5. Log "EdgeAgent stopped".

The 10-second timeout on the main thread is important. If run_once() is blocked on a slow adapter or database write, stop() waits up to 10 seconds for the thread and then continues with the rest of the shutdown sequence regardless. This prevents indefinite hangs while still giving in-flight operations a chance to complete.
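
The signal-then-join step can be sketched as follows. The helper name stop_worker and the warning message are assumptions; the point is that Thread.join(timeout=...) always returns, so the caller must check is_alive() to learn whether the thread actually finished.

```python
import logging
import threading

logger = logging.getLogger("edge.sketch")


def stop_worker(shutdown_event: threading.Event, worker: threading.Thread, timeout: float = 10.0) -> bool:
    """Signal the worker, wait up to `timeout` seconds, and report whether it exited."""
    shutdown_event.set()          # wakes a loop blocked in Event.wait()
    worker.join(timeout=timeout)  # returns when the thread ends or the timeout elapses
    if worker.is_alive():
        # Python cannot kill a thread; the caller simply moves on.
        logger.warning("Worker %s did not stop within %.1fs", worker.name, timeout)
        return False
    return True


# Example: a cooperative worker that exits as soon as the event is set.
event = threading.Event()
worker = threading.Thread(target=event.wait, daemon=True, name="demo-worker")
worker.start()
print(stop_worker(event, worker, timeout=2.0))
```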

Context Manager Support

EdgeAgent implements __enter__ and __exit__, so you can use it with Python's with statement for automatic lifecycle management:


import time

from pyv_edge_agent.main import EdgeAgent

with EdgeAgent(config_path="/home/pi/pyvorin-edge/config.toml") as agent:
    # Agent is running. Do work here.
    time.sleep(30)
    # On exit, agent.stop() is called automatically.
  

This pattern is excellent for unit tests and short-lived batch jobs because it guarantees cleanup even if an exception is raised.
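
The cleanup guarantee comes from the context manager protocol itself. The sketch below uses a hypothetical ManagedService class, not EdgeAgent, to show the shape of __enter__ and __exit__ and why stop() still runs when the body raises.

```python
class ManagedService:
    """Hypothetical stand-in with the same start/stop shape as an agent."""

    def __init__(self) -> None:
        self.is_running = False
        self.stop_calls = 0

    def start(self) -> None:
        self.is_running = True

    def stop(self) -> None:
        self.is_running = False
        self.stop_calls += 1

    def __enter__(self) -> "ManagedService":
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.stop()
        return False  # do not swallow exceptions raised inside the with block


# Example: stop() runs even though the body raises.
service = ManagedService()
try:
    with service:
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass
print(service.is_running, service.stop_calls)
```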

Signal Handling

The CLI entry point registers handlers for both SIGINT (Ctrl+C) and SIGTERM (systemd stop, Docker stop):


signal.signal(signal.SIGTERM, lambda s, f: _signal_handler(agent, s, f))
signal.signal(signal.SIGINT, lambda s, f: _signal_handler(agent, s, f))
  

The handler logs the signal, calls agent.stop(), and exits the process with code 0. This makes Pyvorin Edge friendly to process supervisors like systemd and container orchestrators.
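
A handler with that behaviour might look like the sketch below. The names handle_shutdown_signal and install_signal_handlers and the log message are assumptions; only the log, stop, exit-with-0 sequence comes from the description above.

```python
import logging
import signal
import sys
from typing import Any

logger = logging.getLogger("edge.sketch")


def handle_shutdown_signal(agent: Any, signum: int, frame: Any) -> None:
    """Log the signal, stop the agent, and exit with status 0."""
    logger.info("Received %s, shutting down", signal.Signals(signum).name)
    agent.stop()
    sys.exit(0)


def install_signal_handlers(agent: Any) -> None:
    """Register the handler for SIGTERM and SIGINT (main thread only)."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        # The lambda closes over the agent, since handlers only receive (signum, frame).
        signal.signal(sig, lambda s, f: handle_shutdown_signal(agent, s, f))
```

Python only allows signal.signal() to be called from the main thread of the main interpreter, which is one reason registration belongs in the CLI entry point rather than inside the agent's worker thread.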

Thread Safety Summary

Component       Thread Safety  Mechanism
RingBuffer      Thread-safe    threading.Lock around append/peek/pop
SQLiteStore     Thread-safe    Connection pool + per-thread connections
CloudSyncQueue  Thread-safe    threading.RLock around DB operations
PrivacyPolicy   Read-safe      Immutable rules list; no locks needed for reads
SystemMetrics   Thread-safe    threading.Lock around CPU delta state
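
As an illustration of the first row, here is a minimal lock-guarded ring buffer. The class name LockedRingBuffer, the deque-based storage, and the choice that peek() and pop() both operate on the oldest element are assumptions for this sketch, not a description of the RingBuffer class.

```python
import threading
from collections import deque
from typing import Deque, Optional


class LockedRingBuffer:
    """Fixed-capacity buffer; the oldest item is discarded when full."""

    def __init__(self, capacity: int) -> None:
        self._items: Deque[float] = deque(maxlen=capacity)
        self._lock = threading.Lock()

    def append(self, value: float) -> None:
        with self._lock:
            self._items.append(value)

    def peek(self) -> Optional[float]:
        """Return the oldest item without removing it (assumed semantics)."""
        with self._lock:
            return self._items[0] if self._items else None

    def pop(self) -> Optional[float]:
        """Remove and return the oldest item, or None if empty."""
        with self._lock:
            return self._items.popleft() if self._items else None

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)


# Example: capacity 3, so the two oldest of five values are discarded.
buf = LockedRingBuffer(capacity=3)
for value in (1.0, 2.0, 3.0, 4.0, 5.0):
    buf.append(value)
print(len(buf), buf.peek())
```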

Summary

The EdgeAgent is a carefully orchestrated multi-threaded daemon. It loads configuration, sets up logging, initialises a SQLite store, creates ring buffers and ingest adapters, starts a health HTTP server, and enters a poll-evaluate-store-sync loop. It handles adapter failures gracefully, supports clean shutdown via signals or context managers, and guards its shared mutable state with locks (the privacy rules are immutable and need none). Understanding this lifecycle is essential for debugging, extending, and operating Pyvorin Edge in production.