
HTTPCloudUploader Configuration

Complete guide to the HTTPCloudUploader class: endpoint configuration, API key management, timeouts, retries, exponential backoff, and the post_batch() method with production-ready code examples.

Published Jun 2, 2026

From Edge to Cloud

After sensor readings have been ingested, windowed, rule-evaluated, and privacy-filtered, the final stage of the pipeline is egress: moving the sanitised data to a cloud endpoint where it can be aggregated, visualised, and acted upon. The Pyvorin Edge Runtime provides the HTTPCloudUploader class in pyv_edge_agent/cloud_sync/uploader.py for this purpose. It is a thin wrapper around Python's standard-library urllib that adds batching, authentication, and exponential backoff while keeping the dependency footprint minimal for ARM64 gateways.

The HTTPCloudUploader Class

HTTPCloudUploader is designed to be instantiated once per agent lifetime and shared across threads. It has three configuration parameters:

  • endpoint: str — The HTTPS URL to which batches are POSTed. If empty, uploads are silently skipped.
  • api_key: str = "" — An optional Bearer token sent in the Authorization header.
  • timeout: float = 30.0 — Per-request socket timeout in seconds.

from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader

uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_live_abc123xyz",
    timeout=30.0,
)
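
As a rough illustration of what these parameters could turn into on the wire, the sketch below builds a urllib request by hand. The helper name build_request and the Content-Type header are assumptions made for this example; it is not an excerpt from uploader.py.

```python
import json
import urllib.request


def build_request(endpoint: str, api_key: str, payload: dict) -> urllib.request.Request:
    """Hypothetical helper: build a JSON POST request with an optional Bearer token."""
    headers = {"Content-Type": "application/json"}  # assumed; not confirmed by the source
    if api_key:
        # api_key is documented as a Bearer token sent in the Authorization header.
        headers["Authorization"] = f"Bearer {api_key}"
    body = json.dumps(payload).encode("utf-8")
    # Supplying data already implies POST; method= just makes that explicit.
    return urllib.request.Request(endpoint, data=body, headers=headers, method="POST")


req = build_request("https://api.pyvorin.com/v1/ingest", "pyv_live_abc123xyz", {"count": 0})
```

The timeout is not part of the request object. It is supplied at send time, as in urllib.request.urlopen(req, timeout=30.0).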
  

The post_batch() Method

post_batch(items) accepts a list of dictionaries (typically the to_dict() output from SensorReading objects, or the payloads dequeued from CloudSyncQueue) and returns a boolean indicating success.


from pyv_edge_agent.types import SensorReading

readings = [
    SensorReading("temp.warehouse", 1717000000.0, 18.5, "celsius", {"zone": "A"}),
    SensorReading("pressure.hydraulic", 1717000001.0, 101325.0, "pascal", {"zone": "A"}),
]

# Convert to dicts for serialisation
items = [r.to_dict() for r in readings]

success = uploader.post_batch(items)
if success:
    print("Batch uploaded successfully.")
else:
    print("Upload failed — items will be retried by the queue.")
  

The method constructs a JSON payload with the following envelope:


{
  "batch_id": "pyv-1717000000000",
  "timestamp": 1717000000.123,
  "count": 2,
  "items": [
    {"sensor_name": "temp.warehouse", "timestamp": 1717000000.0, ...},
    {"sensor_name": "pressure.hydraulic", "timestamp": 1717000001.0, ...}
  ]
}
  

The batch_id is derived from the current millisecond timestamp, so it increases over time (as long as the device clock is not stepped backwards) and is roughly unique per device. The cloud receiver can use it for idempotency: if the same batch_id arrives twice (for example, because the response to a successful upload was lost and the client retried), the server can discard the duplicate safely.
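To make the envelope and the idempotency idea concrete, here is a minimal sketch. Both build_envelope and BatchDeduplicator are hypothetical names, and the exact batch_id format (the "pyv-" prefix followed by epoch milliseconds) is inferred from the example above rather than taken from the uploader's source.

```python
import time
from typing import Optional


def build_envelope(items: list, now: Optional[float] = None) -> dict:
    """Hypothetical reconstruction of the envelope shown above."""
    now = time.time() if now is None else now
    return {
        "batch_id": f"pyv-{int(now * 1000)}",  # assumed format: prefix + epoch milliseconds
        "timestamp": now,
        "count": len(items),
        "items": items,
    }


class BatchDeduplicator:
    """Hypothetical receiver-side guard that accepts each batch_id only once."""

    def __init__(self) -> None:
        self._seen = set()

    def accept(self, envelope: dict) -> bool:
        batch_id = envelope["batch_id"]
        if batch_id in self._seen:
            return False  # duplicate delivery, e.g. a retry after a lost response
        self._seen.add(batch_id)
        return True
```

A real receiver would need to bound the set of remembered IDs and would probably key it by device as well, since millisecond timestamps from different devices can collide.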

Exponential Backoff and Retries

Network conditions at the edge are unpredictable. A cellular backhaul may drop packets during rush hour, or a warehouse Wi-Fi access point may reboot for firmware updates. The uploader implements a simple but effective retry strategy:

  • 3 attempts total. The first attempt is immediate.
  • Exponential delay. After attempt 1, wait 1 second. After attempt 2, wait 2 seconds. After attempt 3, give up.
  • Jitter is not applied. Within the uploader, the delay is deterministic (sleep_time = 2 ** attempt, where attempt is the zero-based loop index). For deployments with many devices, you may want to add jitter to avoid thundering-herd effects. The ExponentialBackoff class in cloud_sync/retry.py provides this; see the Queue Management article for integration details.
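
For readers who want to see what jitter looks like in practice, the sketch below shows one common variant, often called full jitter. It is a standalone illustration and not the ExponentialBackoff class from cloud_sync/retry.py; the function name, base, and cap parameters are assumptions.

```python
import random


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, rng=random) -> float:
    """Hypothetical helper: full-jitter delay for a zero-based attempt index."""
    ceiling = min(cap, base * (2 ** attempt))  # the 1 s, 2 s, 4 s... schedule, capped
    return rng.uniform(0.0, ceiling)           # pick a random point below the ceiling


rng = random.Random(42)  # seeded only to make the example repeatable
delays = [backoff_delay(attempt, rng=rng) for attempt in range(3)]
```

Because each device draws a different delay below the same ceiling, a fleet that lost connectivity at the same moment does not reconnect in lockstep.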

# Excerpt from uploader.py — retry loop
for attempt in range(3):
    try:
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            if 200 <= resp.status < 300:
                logger.info("Uploaded batch %s (%d items) — HTTP %d",
                            payload["batch_id"], len(items), resp.status)
                return True
            else:
                logger.warning("Upload failed — HTTP %d", resp.status)
    except urllib.error.HTTPError as exc:
        logger.warning("Upload attempt %d failed — HTTP %d: %s",
                       attempt + 1, exc.code, exc.reason)
    except Exception as exc:
        logger.warning("Upload attempt %d failed — %s", attempt + 1, exc)

    if attempt < 2:
        sleep_time = 2 ** attempt
        logger.info("Retrying in %.1fs...", sleep_time)
        time.sleep(sleep_time)

logger.error("Upload failed after 3 attempts")
return False
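
The loop above also implies a rough upper bound on how long a single post_batch() call can block, which is worth knowing when sizing the flush interval. The helper below is a hypothetical back-of-the-envelope calculation based only on the schedule in the excerpt.

```python
def worst_case_seconds(timeout: float, attempts: int = 3) -> float:
    """Rough upper bound on how long one post_batch() call may block."""
    # Sleeps happen after every attempt except the last: 2**0 + 2**1 + ...
    backoff = sum(2 ** attempt for attempt in range(attempts - 1))
    return attempts * timeout + backoff


default_bound = worst_case_seconds(30.0)  # 3 * 30 + (1 + 2) = 93.0
```

Treat the result as an estimate: the timeout passed to urlopen applies to individual blocking socket operations, so a slow but steadily progressing transfer can take longer than one timeout period.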
  

Success and Failure Handling

post_batch() returns True only if an HTTP 2xx response is received on one of the three attempts. Every other outcome (4xx client errors, 5xx server errors, DNS failures, TCP timeouts, SSL handshake errors) is logged and retried until the attempts are exhausted, at which point the method returns False. The caller (typically CloudSyncQueue.maybe_flush()) is responsible for deciding what to do with a failed batch.

This design separates concerns cleanly:

  • The uploader knows how to speak HTTP and how to retry transient failures.
  • The queue knows how to persist items, schedule reattempts, and manage backpressure.

When post_batch() returns False, the queue calls nack() on the items, incrementing their retry_count and setting next_retry_at into the future. The items remain in SQLite until the next flush cycle or until they exceed their TTL and are garbage-collected.
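The bookkeeping described above can be pictured with a small in-memory model. Everything in this sketch is an assumption made for illustration: the QueuedItem fields beyond retry_count and next_retry_at, the doubling delay schedule, and the default TTL are not taken from CloudSyncQueue.

```python
from dataclasses import dataclass


@dataclass
class QueuedItem:
    """Hypothetical in-memory stand-in for a row in the SQLite queue."""
    payload: dict
    enqueued_at: float
    retry_count: int = 0
    next_retry_at: float = 0.0


def nack(item: QueuedItem, now: float, base_delay: float = 5.0, max_delay: float = 300.0) -> None:
    """Record a failed upload: bump the counter and push the next attempt into the future."""
    item.retry_count += 1
    delay = min(max_delay, base_delay * (2 ** (item.retry_count - 1)))  # assumed schedule
    item.next_retry_at = now + delay


def is_expired(item: QueuedItem, now: float, ttl: float = 86400.0) -> bool:
    """True once the item has outlived its TTL and can be garbage-collected."""
    return now - item.enqueued_at > ttl
```

A flush cycle in this model would skip items whose next_retry_at is still in the future and drop those for which is_expired() returns True.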

Configuration in config.toml

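The Edge Runtime configuration schema includes a top-level cloud section. Its endpoint and api_key keys map directly to the uploader's constructor arguments; the other keys shown here (enabled, batch_size, flush_interval_seconds) are not constructor arguments of HTTPCloudUploader: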


[cloud]
enabled = true
endpoint = "https://api.pyvorin.com/v1/ingest"
api_key = "${PYV_API_KEY}"
batch_size = 100
flush_interval_seconds = 60.0
  

Notice the use of ${PYV_API_KEY}. The configuration loader performs environment variable substitution for any value matching ${VAR} or ${VAR:-default}. This keeps secrets out of your configuration repository.
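The substitution rule can be approximated in a few lines. This is a hypothetical sketch, not the runtime's configuration loader: the function name, the regular expression, and the decision to raise on a missing variable without a default are all assumptions.

```python
import os
import re

_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(value: str, env=None) -> str:
    """Hypothetical sketch: expand ${VAR} and ${VAR:-default} inside a config string."""
    env = os.environ if env is None else env

    def replace(match):
        name, default = match.group(1), match.group(2)
        if name in env:
            return env[name]
        if default is not None:
            return default
        # Assumed behaviour: fail loudly rather than send an empty API key.
        raise KeyError(f"environment variable {name} is not set")

    return _VAR_PATTERN.sub(replace, value)
```

For example, substitute_env("${PYV_API_KEY}", {"PYV_API_KEY": "secret"}) yields "secret", while substitute_env("${PYV_REGION:-eu}", {}) falls back to "eu".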


from pyv_edge_agent.config import Config
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader

cfg = Config.from_file("/etc/pyvorin-edge/config.toml")

uploader = HTTPCloudUploader(
    endpoint=cfg.get("cloud", "endpoint", ""),
    api_key=cfg.get("cloud", "api_key", ""),
    timeout=30.0,
)
  

Using HTTPCloudClient for Advanced Control

If you need lower-level control — custom headers, per-request timeouts, or raw response inspection — use HTTPCloudClient in cloud_sync/http_client.py. It exposes post() and post_batch() methods that return a detailed response dictionary instead of a simple boolean.


from pyv_edge_agent.cloud_sync.http_client import HTTPCloudClient

client = HTTPCloudClient(default_headers={"X-Device-ID": "pi5-warehouse-42"})

result = client.post_batch(
    url="https://api.pyvorin.com/v1/ingest",
    payloads=[{"sensor_name": "temp.1", "value": 22.0}],
    headers={"Authorization": "Bearer pyv_live_abc123"},
    timeout=15,
)

print(result["status"])   # HTTP status code, or 0 for network failure
print(result["data"])     # Parsed JSON response body
print(result["error"])    # Human-readable error string, if any
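
One way to act on that dictionary is to branch on the status field. The helper below is a hypothetical sketch that relies only on the three keys printed above (status, data, error); the wording of the returned strings is invented for the example.

```python
def summarise_result(result: dict) -> str:
    """Hypothetical helper: turn a response dictionary into a one-line outcome."""
    status = result.get("status", 0)
    if status == 0:
        return f"network failure: {result.get('error')}"  # no HTTP response at all
    if 200 <= status < 300:
        return f"ok (HTTP {status})"
    if 400 <= status < 500:
        return f"rejected (HTTP {status}): {result.get('error')}"
    if status >= 500:
        return f"server error (HTTP {status}): {result.get('error')}"
    return f"unexpected response (HTTP {status})"
```

Unlike the boolean returned by HTTPCloudUploader.post_batch(), this lets the caller tell a rejected payload apart from an unreachable server.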
  

Production Checklist

  • Use HTTPS only. The uploader will accept HTTP URLs, but doing so transmits your API key and sensor data in plaintext. Configure TLS 1.2+ on your receiver and pin the certificate if possible.
  • Rotate API keys. Issue short-lived tokens (e.g., 30-day expiry) and refresh them via a secure side channel. Never embed long-lived keys in device images.
  • Monitor upload latency. If post_batch() consistently triggers retries, check whether your batch size is too large for the available bandwidth. A batch of 100 readings is typically 20–40 KB of JSON; on a 2G connection, that may take several seconds.
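  • Handle 4xx gracefully. A 400 Bad Request usually indicates a schema mismatch (e.g., you added a new metadata field that the server does not expect). The retry loop shown earlier treats an HTTPError like any other failure, so a 4xx response is retried up to three times and then reported as False. Do not let the queue retry such batches indefinitely; log them and alert an operator.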
  • Match timeouts to the link. The default 30 seconds is appropriate for broadband, but on satellite or LPWAN links you may need 120 seconds or more. Conversely, on a wired LAN, 5 seconds is plenty and reduces queue backpressure during transient outages.
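
The advice on 4xx handling can be expressed as a small retry policy. This is a hypothetical sketch: the function name and the choice of 408 and 429 as retryable client errors are assumptions, and it presumes the caller can see the status code (for example via HTTPCloudClient, which reports 0 for a network failure).

```python
RETRYABLE_CLIENT_CODES = {408, 429}  # request timeout and rate limiting are usually transient


def should_retry(status: int) -> bool:
    """Hypothetical policy: status 0 stands for a network failure with no HTTP response."""
    if status == 0 or status >= 500:
        return True  # connectivity problems and server errors may clear up
    if 400 <= status < 500:
        return status in RETRYABLE_CLIENT_CODES  # most 4xx need a fix, not a retry
    return False  # 2xx succeeded; anything else is unexpected
```

Batches for which should_retry() returns False are candidates for logging and operator alerts rather than another trip through the queue.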

Summary

HTTPCloudUploader is the final stage of the Pyvorin Edge pipeline. It wraps urllib in a minimal, zero-dependency interface that supports Bearer token authentication, JSON batch envelopes, and exponential backoff across up to three attempts. Configuration is declarative via TOML with environment-variable substitution for secrets. For advanced use cases, HTTPCloudClient provides full response introspection. By separating upload semantics from queue persistence, the runtime gives you reliable, observable, and secure cloud egress without bloating the agent footprint.