edge 14 min read

EdgeAgent API Reference

Complete reference for the EdgeAgent class, including constructor, lifecycle methods, health checks, threading model, and all public properties.

Published Jun 2, 2026

Overview

EdgeAgent is the main daemon class of the Pyvorin Edge Runtime. It orchestrates sensor ingestion, pipeline evaluation, local storage, privacy filtering, cloud synchronization, and health monitoring. The agent is designed to run as a long-lived background thread with an optional embedded HTTP health server.

This article provides a complete reference of every public method, property, and internal hook in EdgeAgent, with signatures, parameters, return types, exceptions, and usage examples.

Constructor


class EdgeAgent:
    def __init__(self, config_path: Optional[str] = None) -> None:
        ...
  
ParameterTypeDefaultDescription
config_pathOptional[str]NonePath to a TOML or JSON configuration file. If None, the agent starts with an empty default config and must be configured programmatically.

The constructor initializes all internal state but does not start any threads or open network connections. Call start() to begin operation.


from pyv_edge_agent.main import EdgeAgent

# With a config file
agent = EdgeAgent(config_path="/etc/pyvorin/edge_config.toml")

# Without a config file (programmatic setup)
agent = EdgeAgent()
  

Lifecycle Methods

start()


def start(self) -> None
  

Starts the agent. This method is idempotent: if the agent is already running, it logs a warning and returns immediately. The sequence of operations is:

  1. Load configuration from config_path if provided.
  2. Set up logging (JSON or plain text) based on config.
  3. Initialize components: SQLite store, privacy policy, cloud sync queue, ring buffers, and ingest adapters.
  4. Start the health HTTP server if enabled.
  5. Spawn the main agent thread running run_loop().

stop()


def stop(self) -> None
  

Gracefully shuts down the agent. Sets the running flag to False, signals the shutdown event, stops the health server, joins the main thread (with a 10-second timeout), and closes the SQLite store. Safe to call multiple times.

run_once()


def run_once(self) -> None
  

Executes a single iteration of the agent's main loop: collect readings from all adapters, apply privacy filtering, store locally, enqueue for cloud sync, and evaluate triggers. This is useful for:

  • Unit testing the agent without starting a background thread.
  • Cron-style scheduling where an external system drives the loop.
  • Debugging — step through one iteration at a time.

agent = EdgeAgent(config_path="edge_config.toml")
agent.start()
agent.run_once()   # Process one tick
agent.stop()
  

run_loop()


def run_loop(self) -> None
  

The internal background loop. Reads sensors.poll_interval_seconds from config (default 5.0), then repeatedly calls run_once() and waits on the shutdown event. You should not call this directly in production; use start() instead. Exceptions inside the loop are caught and logged to prevent the agent from crashing.

Health and Status

health()


def health(self) -> Dict[str, Any]
  

Returns a dictionary with current agent health:


{
    "running": True,
    "config_path": "/etc/pyvorin/edge_config.toml",
    "buffer_count": 4,
    "cloud_pending": 12,
    "privacy_enabled": True,
}
  

is_running (property)


@property
def is_running(self) -> bool
  

Thread-safe read of the internal running flag. Returns True after start() succeeds and before stop() completes.

Internal Hooks

_init_ingest()


def _init_ingest(self) -> None
  

Initializes ingest adapters based on the sensors.devices configuration. Supported ingest types:

  • "simulator"SimulatorAdapter with synthetic data generation.
  • "mqtt"MQTTAdapter for MQTT broker subscriptions.
  • "http"HTTPAdapter for REST polling endpoints.
  • "file"FileReplayAdapter for replaying recorded CSV/JSON logs.

Adapters are stored in self._adapters keyed by device name. Ingest type is tracked in self._adapter_types. Import errors are logged as warnings and the device is skipped.

_get_reading_value()


def _get_reading_value(self, name: str) -> float
  
ParameterTypeDescription
namestrDevice name to read from.

Returns the current sensor value as a float. The method probes the adapter in three ways:

  1. If the adapter has a read() method, call it and extract "value" from the returned dict, or cast to float.
  2. If the adapter has generate_batch(), call it with a 1-second window and extract the first reading.
  3. If the adapter has generate_reading(name), call it and extract the value.

If the adapter is missing or any step raises an exception, the method logs a warning and returns 0.0.


value = agent._get_reading_value("boiler_temp")
print(value)  # e.g., 82.5
  

Context Manager Support

EdgeAgent implements __enter__ and __exit__, allowing use in with statements for guaranteed cleanup:


with EdgeAgent(config_path="edge_config.toml") as agent:
    # Agent is running in a background thread
    time.sleep(60)
    print(f"Processed {agent._readings_processed} readings")
# Agent.stop() is called automatically on exit
  

Threading Model

EdgeAgent uses a multi-threaded architecture:

  • Main thread: Calls start() / stop() and handles signals.
  • Agent thread (self._thread): Daemon thread executing run_loop(). Named "edge-agent".
  • Health server thread (self._health_thread): Daemon thread serving _HealthHandler on the configured port. Named "health-server".

All shared mutable state (_readings_processed, _events_triggered, _running) is either atomic (int assignments in CPython) or protected by the GIL. The CloudSyncQueue and SystemMetrics classes use their own internal locks.

Configuration Schema

The agent expects a hierarchical configuration. Key sections:


[logging]
level = "INFO"
format = "json"

[sensors]
poll_interval_seconds = 5.0

[[sensors.devices]]
name = "boiler"
ingest_type = "simulator"
sensor_type = "temperature"
unit = "C"
min_value = 0.0
max_value = 120.0
noise_std = 0.5
baseline = 60.0

[windows]
default_size = 100
default_dtype = "float"

[store]
path = "edge_store.db"

[privacy]
enabled = true

[cloud]
enabled = true
endpoint = "https://api.pyvorin.com/v1/ingest"
api_key = "pyv_live_xxxxxxxx"
timeout = 30.0

[health]
enabled = true
port = 8080
  

Signal Handling

When run via main(), EdgeAgent registers handlers for SIGTERM and SIGINT that call agent.stop() and exit cleanly. In embedded or library usage, you should register your own handlers or call stop() explicitly.


import signal

agent = EdgeAgent(config_path="edge_config.toml")
signal.signal(signal.SIGTERM, lambda s, f: agent.stop())
signal.signal(signal.SIGINT, lambda s, f: agent.stop())
agent.start()
  

Complete Usage Example


import time
from pyv_edge_agent.main import EdgeAgent

agent = EdgeAgent(config_path="edge_config.toml")
agent.start()

try:
    while agent.is_running:
        # Poll health every 10 seconds
        h = agent.health()
        print(f"Running={h['running']}, Pending={h['cloud_pending']}")
        time.sleep(10)
except KeyboardInterrupt:
    pass
finally:
    agent.stop()