HTTPCloudUploader Configuration
Complete guide to the HTTPCloudUploader class: endpoint configuration, API key management, timeouts, retries, exponential backoff, and the post_batch() method with production-ready code examples.
Published Jun 2, 2026
From Edge to Cloud
After sensor readings have been ingested, windowed, rule-evaluated, and privacy-filtered, the
final stage of the pipeline is egress: moving the sanitised data to a cloud endpoint where it
can be aggregated, visualised, and acted upon. The Pyvorin Edge Runtime provides the
HTTPCloudUploader class in pyv_edge_agent/cloud_sync/uploader.py for
this purpose. It is a thin, reliable wrapper around Python's standard-library
urllib that adds batching, authentication, and exponential backoff — everything you
need and nothing you do not, keeping the dependency footprint minimal for ARM64 gateways.
The HTTPCloudUploader Class
HTTPCloudUploader is designed to be instantiated once per agent lifetime and shared
across threads. It has three configuration parameters:
- endpoint: str. The HTTPS URL to which batches are POSTed. If empty, uploads are silently skipped.
- api_key: str = "". An optional Bearer token sent in the Authorization header.
- timeout: float = 30.0. Per-request socket timeout in seconds.
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader
uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_live_abc123xyz",
    timeout=30.0,
)
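To make the role of api_key concrete, here is a minimal sketch of how a JSON POST request with an optional Bearer token can be assembled using only urllib. The build_request helper is hypothetical and is not taken from uploader.py; it only illustrates the header convention described above.

```python
import json
import urllib.request


def build_request(endpoint: str, api_key: str, body: dict) -> urllib.request.Request:
    """Build a JSON POST request (hypothetical helper, not the uploader's code)."""
    headers = {"Content-Type": "application/json"}
    # Only attach the Authorization header when a key is configured.
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"
    data = json.dumps(body).encode("utf-8")
    return urllib.request.Request(endpoint, data=data, headers=headers, method="POST")


req = build_request("https://api.pyvorin.com/v1/ingest", "pyv_live_abc123xyz", {"count": 0})
print(req.get_method(), req.get_header("Authorization"))
```

The request object is only constructed here, not sent, so the snippet can be run without network access.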
The post_batch() Method
post_batch(items) accepts a list of dictionaries (typically the
to_dict() output from SensorReading objects, or the payloads
dequeued from CloudSyncQueue) and returns a boolean indicating success.
from pyv_edge_agent.types import SensorReading
readings = [
    SensorReading("temp.warehouse", 1717000000.0, 18.5, "celsius", {"zone": "A"}),
    SensorReading("pressure.hydraulic", 1717000001.0, 101325.0, "pascal", {"zone": "A"}),
]
# Convert to dicts for serialisation
items = [r.to_dict() for r in readings]
success = uploader.post_batch(items)
if success:
    print("Batch uploaded successfully.")
else:
    print("Upload failed — items will be retried by the queue.")
The method constructs a JSON payload with the following envelope:
{
  "batch_id": "pyv-1717000000000",
  "timestamp": 1717000000.123,
  "count": 2,
  "items": [
    {"sensor_name": "temp.warehouse", "timestamp": 1717000000.0, ...},
    {"sensor_name": "pressure.hydraulic", "timestamp": 1717000001.0, ...}
  ]
}
The batch_id is derived from the current millisecond timestamp, so it increases
over time (provided the device clock does not step backwards) and is roughly unique per
device. The cloud receiver can use it for idempotency: if two requests carry the same
batch_id (for example, because a network timeout caused the client to resend a batch the
server had already accepted), the server can deduplicate safely.
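The envelope can be reproduced in a few lines. The sketch below assumes the batch_id is simply the prefix "pyv-" followed by the wall-clock time in whole milliseconds, which matches the example above. The build_envelope function and its now parameter are hypothetical, added so the result is reproducible.

```python
import time
from typing import Optional


def build_envelope(items: list, now: Optional[float] = None) -> dict:
    """Wrap items in a batch envelope (illustrative, not the uploader's own code)."""
    now = time.time() if now is None else now
    return {
        "batch_id": f"pyv-{int(now * 1000)}",  # millisecond timestamp
        "timestamp": now,
        "count": len(items),
        "items": items,
    }


envelope = build_envelope(
    [{"sensor_name": "temp.warehouse"}, {"sensor_name": "pressure.hydraulic"}],
    now=1717000000.0,
)
print(envelope["batch_id"], envelope["count"])  # pyv-1717000000000 2
```

Because the identifier depends only on the clock, two batches built within the same millisecond on one device would collide, which is why the text calls it roughly unique.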
Exponential Backoff and Retries
Network conditions at the edge are unpredictable. A cellular backhaul may drop packets during rush hour, or a warehouse Wi-Fi access point may reboot for firmware updates. The uploader implements a simple but effective retry strategy:
- 3 attempts total. The first attempt is immediate.
- Exponential delay. After attempt 1, wait 1 second. After attempt 2, wait 2 seconds. After attempt 3, give up.
- Jitter is not applied. In the uploader itself, the delay is deterministic (sleep_time = 2 ** attempt). For deployments with many devices, you may want to add jitter to avoid thundering-herd effects. The ExponentialBackoff class in cloud_sync/retry.py provides this; see the Queue Management article for integration details.
# Excerpt from uploader.py — retry loop
for attempt in range(3):
    try:
        with urllib.request.urlopen(req, timeout=self.timeout) as resp:
            if 200 <= resp.status < 300:
                logger.info("Uploaded batch %s (%d items) — HTTP %d",
                            payload["batch_id"], len(items), resp.status)
                return True
            else:
                logger.warning("Upload failed — HTTP %d", resp.status)
    except urllib.error.HTTPError as exc:
        logger.warning("Upload attempt %d failed — HTTP %d: %s",
                       attempt + 1, exc.code, exc.reason)
    except Exception as exc:
        logger.warning("Upload attempt %d failed — %s", attempt + 1, exc)
    if attempt < 2:
        sleep_time = 2 ** attempt
        logger.info("Retrying in %.1fs...", sleep_time)
        time.sleep(sleep_time)
logger.error("Upload failed after 3 attempts")
return False
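For fleets where many devices may retry at the same moment, the deterministic delay can be replaced with a randomised one. The following is a minimal sketch of the common "full jitter" variant. It is not the ExponentialBackoff class from cloud_sync/retry.py, whose interface is not shown here; the backoff_delay function and its base, cap, and rng parameters are hypothetical.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Return a delay drawn uniformly from [0, min(cap, base * 2 ** attempt)]."""
    source = rng if rng is not None else random
    ceiling = min(cap, base * (2 ** attempt))
    return source.uniform(0.0, ceiling)


# Delays for three consecutive failed attempts (values vary from run to run).
for attempt in range(3):
    print(f"attempt {attempt + 1}: sleep {backoff_delay(attempt):.2f}s")
```

With base=1.0 the upper bounds are 1, 2, and 4 seconds, the same exponential schedule as 2 ** attempt, but each device picks a different point below the bound, so retries spread out instead of arriving together.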
Success and Failure Handling
post_batch() returns True only if an HTTP 2xx response is received on
one of the three attempts. Any other outcome — including 4xx client errors, 5xx server errors,
DNS failures, TCP timeouts, or SSL handshake errors — returns False. The caller
(typically CloudSyncQueue.maybe_flush()) is responsible for deciding what to do
with a failed batch.
This design separates concerns cleanly:
- The uploader knows how to speak HTTP and how to retry transient failures.
- The queue knows how to persist items, schedule reattempts, and manage backpressure.
When post_batch() returns False, the queue calls nack()
on the items, incrementing their retry_count and setting
next_retry_at to a future time. The items remain in SQLite until the next flush
cycle or until they exceed their TTL and are garbage-collected.
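The bookkeeping described above can be modelled in memory to see how the fields interact. This is a sketch under stated assumptions: the QueuedItem class, the doubling delay schedule, and the default TTL are hypothetical, and the real CloudSyncQueue persists items in SQLite rather than in Python objects.

```python
from dataclasses import dataclass


@dataclass
class QueuedItem:
    payload: dict
    enqueued_at: float
    retry_count: int = 0
    next_retry_at: float = 0.0


def nack(item: QueuedItem, now: float, base_delay: float = 60.0,
         max_delay: float = 3600.0) -> None:
    """Record a failed upload: bump the counter and push the next attempt out."""
    item.retry_count += 1
    # Assumed schedule: 60 s, 120 s, 240 s, ... capped at max_delay.
    delay = min(max_delay, base_delay * 2 ** (item.retry_count - 1))
    item.next_retry_at = now + delay


def is_due(item: QueuedItem, now: float) -> bool:
    """True once the item may be included in a flush again."""
    return now >= item.next_retry_at


def is_expired(item: QueuedItem, now: float, ttl_seconds: float = 86400.0) -> bool:
    """True once the item is older than its TTL and can be garbage-collected."""
    return now - item.enqueued_at > ttl_seconds


item = QueuedItem(payload={"sensor_name": "temp.1"}, enqueued_at=1000.0)
nack(item, now=1000.0)
print(item.retry_count, item.next_retry_at)  # 1 1060.0
```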
Configuration in config.toml
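The Edge Runtime configuration schema includes a top-level cloud section. Its endpoint
and api_key keys map directly to the uploader's constructor arguments; the other keys
shown here (enabled, batch_size, flush_interval_seconds) are not constructor arguments,
and the section as shown has no timeout key, so the example below passes the timeout explicitly: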
[cloud]
enabled = true
endpoint = "https://api.pyvorin.com/v1/ingest"
api_key = "${PYV_API_KEY}"
batch_size = 100
flush_interval_seconds = 60.0
Notice the use of ${PYV_API_KEY}. The configuration loader performs environment
variable substitution for any value matching ${VAR} or ${VAR:-default}.
This keeps secrets out of your configuration repository.
from pyv_edge_agent.config import Config
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader
cfg = Config.from_file("/etc/pyvorin-edge/config.toml")
uploader = HTTPCloudUploader(
    endpoint=cfg.get("cloud", "endpoint", ""),
    api_key=cfg.get("cloud", "api_key", ""),
    timeout=30.0,
)
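The ${VAR} and ${VAR:-default} substitution mentioned above can be approximated with a single regular expression. This is a minimal sketch, not the configuration loader's actual code: the substitute_env function is hypothetical, and raising KeyError for an unset variable without a default is an assumption about the desired behaviour.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}.
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def substitute_env(value: str, env=None) -> str:
    """Expand ${VAR} and ${VAR:-default} placeholders in a config string."""
    env = os.environ if env is None else env

    def _replace(match):
        name, default = match.group(1), match.group(2)
        current = env.get(name)
        if default is not None:
            # Shell-style ":-": fall back when the variable is unset or empty.
            return current if current else default
        if current is None:
            raise KeyError(f"environment variable {name} is not set")
        return current

    return _VAR_PATTERN.sub(_replace, value)


print(substitute_env("${PYV_API_KEY}", {"PYV_API_KEY": "pyv_live_abc123xyz"}))
print(substitute_env("${PYV_REGION:-eu-west}", {}))
```

Passing an explicit env mapping, as in the two calls above, keeps the function easy to test without touching the real process environment.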
Using HTTPCloudClient for Advanced Control
If you need lower-level control — custom headers, per-request timeouts, or raw response
inspection — use HTTPCloudClient in cloud_sync/http_client.py. It
exposes post() and post_batch() methods that return a detailed
response dictionary instead of a simple boolean.
from pyv_edge_agent.cloud_sync.http_client import HTTPCloudClient
client = HTTPCloudClient(default_headers={"X-Device-ID": "pi5-warehouse-42"})
result = client.post_batch(
    url="https://api.pyvorin.com/v1/ingest",
    payloads=[{"sensor_name": "temp.1", "value": 22.0}],
    headers={"Authorization": "Bearer pyv_live_abc123"},
    timeout=15,
)
print(result["status"]) # HTTP status code, or 0 for network failure
print(result["data"]) # Parsed JSON response body
print(result["error"]) # Human-readable error string, if any
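Because the result dictionary exposes the status code, the caller can decide whether a failure is worth retrying. The sketch below works on a plain dictionary with the status key described above (0 for a network failure). The classify_result function and the choice of which codes count as transient are assumptions, not part of HTTPCloudClient.

```python
def classify_result(result: dict) -> str:
    """Map a response dictionary to 'ok', 'retry', or 'drop' (illustrative policy)."""
    status = result.get("status", 0)
    if 200 <= status < 300:
        return "ok"
    if status == 0:
        return "retry"   # network-level failure, no HTTP response received
    if status in (408, 429) or 500 <= status < 600:
        return "retry"   # timeouts, rate limiting, and server errors are often transient
    return "drop"        # other responses, e.g. 400 or 401, will not fix themselves


for status in (200, 0, 503, 429, 400):
    print(status, classify_result({"status": status}))
```

Items classified as "drop" would typically be logged and surfaced to an operator rather than requeued.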
Production Checklist
- Use HTTPS only. The uploader will accept HTTP URLs, but doing so transmits your API key and sensor data in plaintext. Configure TLS 1.2+ on your receiver and pin the certificate if possible.
- Rotate API keys. Issue short-lived tokens (e.g., 30-day expiry) and refresh them via a secure side channel. Never embed long-lived keys in device images.
- Monitor upload latency. If post_batch() consistently triggers retries, check whether your batch size is too large for the available bandwidth. A batch of 100 readings is typically 20–40 KB of JSON; on a 2G connection, that may take several seconds.
- Handle 4xx gracefully. A 400 Bad Request usually indicates a schema mismatch (e.g., you added a new metadata field that the server does not expect). Do not retry 4xx errors indefinitely; log them and alert an operator.
- Set conservative timeouts. The default 30 seconds is appropriate for broadband, but on satellite or LPWAN links you may need 120 seconds or more. Conversely, on a wired LAN, 5 seconds is plenty and reduces queue backpressure during transient outages.
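When choosing a batch size and timeout, it can help to estimate how long a batch takes to transmit on a given link. The sketch below serialises a batch and divides its size by the link bandwidth. The estimate_upload function is hypothetical, and the figure it returns is a lower bound, since it ignores latency, TLS handshakes, and protocol overhead.

```python
import json


def estimate_upload(items: list, bandwidth_bps: float) -> tuple:
    """Return (payload size in bytes, ideal transfer time in seconds)."""
    body = json.dumps({"count": len(items), "items": items}).encode("utf-8")
    size_bytes = len(body)
    seconds = size_bytes * 8 / bandwidth_bps
    return size_bytes, seconds


# 100 synthetic readings over an assumed 50 kbit/s uplink.
sample = [
    {"sensor_name": "temp.warehouse", "timestamp": 1717000000.0 + i,
     "value": 18.5, "unit": "celsius", "metadata": {"zone": "A"}}
    for i in range(100)
]
size_bytes, seconds = estimate_upload(sample, bandwidth_bps=50_000)
print(f"{size_bytes} bytes, at least {seconds:.1f}s on the wire")
```

If the estimate approaches the configured timeout, reducing batch_size is usually a safer fix than raising the timeout further.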
Summary
HTTPCloudUploader is the final stage of the Pyvorin Edge pipeline. It wraps
urllib in a minimal interface with no third-party dependencies that supports Bearer token
authentication, JSON batch envelopes, and exponential backoff across up to three attempts.
Configuration is declarative via TOML with environment-variable substitution for secrets.
For advanced use cases, HTTPCloudClient provides full response introspection.
By separating upload semantics from queue persistence, the runtime gives you reliable,
observable, and secure cloud egress without bloating the agent footprint.