
Custom Cloud Endpoints

Building custom cloud receivers for Pyvorin Edge: expected JSON payload format, Bearer token authentication, batch schema, and webhook handler examples in Flask and FastAPI.

Published Jun 2, 2026

Beyond Managed Ingestion

Not every deployment sends data to Pyvorin's managed cloud. Enterprises with existing data lakes, regulated industries with air-gapped analysis clusters, and OEMs white-labelling the Edge Runtime all need to receive batches on their own infrastructure. Because the HTTPCloudUploader speaks plain HTTPS and JSON, any server that can accept a POST request and parse JSON can act as a Pyvorin Edge receiver.

This article documents the exact payload format, authentication scheme, and response codes that the uploader expects. It then provides minimal webhook handlers in Flask and FastAPI that you can run behind nginx or a cloud load balancer and harden for production.

Expected JSON Payload Format

When HTTPCloudUploader.post_batch(items) is called, it serialises the following envelope:


{
  "batch_id": "pyv-1717000000000",
  "timestamp": 1717000000.123,
  "count": 3,
  "items": [
    {
      "sensor_name": "temp.warehouse.a",
      "timestamp": 1717000000.0,
      "value": 18.5,
      "unit": "celsius",
      "metadata": {"zone": "north", "floor": "2"}
    },
    {
      "sensor_name": "pressure.hydraulic",
      "timestamp": 1717000001.0,
      "value": 101325.0,
      "unit": "pascal",
      "metadata": {"safety_zone": "A"}
    },
    {
      "sensor_name": "motion.lobby",
      "timestamp": 1717000002.0,
      "value": 1.0,
      "unit": "count",
      "metadata": {}
    }
  ]
}
  

Field semantics:

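  • batch_id — Identifier used for idempotency. Derived from the millisecond timestamp. Format: pyv-{milliseconds_since_epoch}. The ID carries no device component, so two devices that build a batch in the same millisecond produce the same value; treat it as unique per device rather than across a fleet.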
  • timestamp — Unix epoch seconds (with fractional precision) when the batch was constructed on the edge device.
  • count — Redundant count of items for quick validation.
  • items — Array of dictionaries. Each dictionary is typically the output of SensorReading.to_dict() or a raw event dict that has passed through the privacy engine.

The payload is sent with Content-Type: application/json and a User-Agent: Pyvorin-Edge/1.0 header. The body is UTF-8 encoded.
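To exercise a receiver without an edge device, the envelope described above can be reproduced in a few lines. This is a minimal sketch, not the uploader's actual code: build_envelope and encode_envelope are hypothetical helper names, and the only facts taken from the text are the field names, the pyv-{milliseconds_since_epoch} format, and UTF-8 JSON encoding.

```python
import json
import time


def build_envelope(items, now=None):
    """Wrap a list of item dicts in the batch envelope (hypothetical helper)."""
    now = time.time() if now is None else now
    return {
        "batch_id": f"pyv-{int(now * 1000)}",  # pyv-{milliseconds_since_epoch}
        "timestamp": now,                      # epoch seconds with a fraction
        "count": len(items),                   # redundant count for validation
        "items": list(items),
    }


def encode_envelope(envelope):
    """Serialise the envelope to the UTF-8 JSON body sent over the wire."""
    return json.dumps(envelope).encode("utf-8")


# A fixed clock value keeps the example reproducible.
sample = build_envelope(
    [
        {
            "sensor_name": "temp.warehouse.a",
            "timestamp": 1717000000.0,
            "value": 18.5,
            "unit": "celsius",
            "metadata": {"zone": "north", "floor": "2"},
        }
    ],
    now=1717000000.5,
)
body = encode_envelope(sample)
```

Passing now explicitly is only a testing convenience; leaving it out falls back to the current wall-clock time.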

Authentication: Bearer Token

The uploader supports API key authentication via the Authorization: Bearer {token} header. If api_key is non-empty when the HTTPCloudUploader is constructed, the header is added automatically. Your receiver must validate this token before processing the payload.


# Excerpt from uploader.py
headers = {
    "Content-Type": "application/json",
    "User-Agent": "Pyvorin-Edge/1.0",
}
if self.api_key:
    headers["Authorization"] = f"Bearer {self.api_key}"
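For testing a receiver by hand, the same headers can be attached to a request with nothing but the standard library. The sketch below is an illustration rather than the uploader's implementation; build_request is a hypothetical name and the URL is a placeholder.

```python
import json
import urllib.request


def build_request(url, payload, api_key=""):
    """Build a POST request that mirrors the uploader's documented headers."""
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Pyvorin-Edge/1.0",
    }
    if api_key:
        # Only sent when a key is configured, as in the excerpt above.
        headers["Authorization"] = f"Bearer {api_key}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers=headers,
        method="POST",
    )


req = build_request(
    "https://receiver.example.com/v1/ingest",  # placeholder URL
    {"batch_id": "pyv-1717000000000", "timestamp": 1717000000.123,
     "count": 0, "items": []},
    api_key="test-token",
)
# To actually send it: urllib.request.urlopen(req, timeout=10)
```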
  

Token management recommendations:

  • Use long random strings (256 bits / 43 base64url characters minimum).
  • Store tokens in a database or secret manager, not in source code.
  • Support multiple active tokens so that fleet rotation does not cause a global outage.
  • Return 401 Unauthorized for missing or invalid tokens, and 403 Forbidden if the token is valid but does not have permission for the requested endpoint.
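The recommendations above can be sketched with the standard library alone. The function names here are hypothetical; the sketch assumes tokens are opaque strings and that the set of active tokens has already been loaded from a database or secret manager.

```python
import hmac
import secrets


def generate_token():
    """Return a 256-bit token: 32 random bytes as 43 base64url characters."""
    return secrets.token_urlsafe(32)


def extract_bearer(header_value):
    """Return the token from an Authorization header, or None if malformed."""
    scheme, _, token = header_value.partition(" ")
    token = token.strip()
    if scheme != "Bearer" or not token:
        return None
    return token


def is_active(token, active_tokens):
    """Check the token against every active token.

    Each comparison is constant-time, and the loop never exits early, so the
    response time does not reveal which candidate (if any) matched.
    """
    matched = False
    for candidate in active_tokens:
        if hmac.compare_digest(token.encode("utf-8"), candidate.encode("utf-8")):
            matched = True
    return matched


# Two active tokens at once allow a fleet to rotate gradually.
old_token, new_token = generate_token(), generate_token()
active = {old_token, new_token}
```

A receiver would answer 401 when extract_bearer returns None or is_active returns False, and reserve 403 for a recognised token that lacks permission for the endpoint.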

Response Codes and Retry Behaviour

The uploader's retry logic is driven entirely by HTTP status:

| Status | Uploader Behaviour | Receiver Semantics |
|---|---|---|
| 2xx | Return True; items are acknowledged. | Batch accepted and persisted. Safe to deduplicate by batch_id. |
| 400 Bad Request | Return False after 3 attempts. Items are nacked. | Payload malformed or schema violation. Client will retry with same payload, so do not partially process. |
| 401 / 403 | Return False after 3 attempts. Items are nacked. | Authentication or authorisation failure. Client may never succeed without operator intervention. |
| 429 Too Many Requests | Return False after 3 attempts. Items are nacked. | Rate limit exceeded. Include Retry-After header if possible; the uploader does not parse it yet, but future versions may. |
| 5xx Server Error | Return False after 3 attempts. Items are nacked. | Transient server failure. Client will retry with exponential backoff. |
| Network failure (0) | Return False after 3 attempts. Items are nacked. | DNS, TCP, or TLS failure. Client treats as transient. |
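When load-testing a receiver it helps to have a stand-in for the client side of this contract. The sketch below is an approximation built only from the behaviour listed above (three attempts, any non-2xx result retried, exponential backoff); it is not the uploader's source. The function name, the base_delay value, and the injectable send and sleep callables are all assumptions.

```python
import time


def post_with_retry(send, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call send() until it reports a 2xx status or attempts run out.

    send must return an HTTP status code, or 0 for a network failure.
    Returns True (ack) on 2xx and False (nack) otherwise.
    """
    for attempt in range(max_attempts):
        status = send()
        if 200 <= status < 300:
            return True
        if attempt < max_attempts - 1:
            # Exponential backoff: base_delay, then 2x, then 4x, and so on.
            sleep(base_delay * (2 ** attempt))
    return False


# Simulate a receiver that fails twice and then accepts the batch.
responses = iter([500, 0, 201])
delays = []
acked = post_with_retry(lambda: next(responses), sleep=delays.append)
```

Injecting sleep keeps the simulation instant; in a real client it would default to time.sleep.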

Flask Webhook Handler

Below is a minimal Flask receiver. It validates the Bearer token, checks the payload shape, deduplicates by batch_id, and writes items to a local SQLite database for downstream processing. Treat the hard-coded tokens and the SQLite file as placeholders to replace before production use.


import sqlite3
import json
from functools import wraps
from flask import Flask, request, jsonify

app = Flask(__name__)

# In production, load from environment or secret vault
VALID_TOKENS = {"pyv_live_abc123xyz", "pyv_live_backup_456"}

DB_PATH = "edge_ingest.db"


def init_db():
    conn = sqlite3.connect(DB_PATH)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS batches (
            batch_id TEXT PRIMARY KEY,
            received_at REAL NOT NULL,
            payload_json TEXT NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS items (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_id TEXT NOT NULL,
            sensor_name TEXT NOT NULL,
            timestamp REAL NOT NULL,
            value REAL,
            unit TEXT,
            metadata_json TEXT
        )
    """)
    conn.commit()
    conn.close()


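def require_auth(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.headers.get("Authorization", "")
        # partition() never raises, even for a bare "Bearer " with no token
        scheme, _, token = auth.partition(" ")
        token = token.strip()
        if scheme != "Bearer" or not token:
            return jsonify({"error": "Missing bearer token"}), 401
        if token not in VALID_TOKENS:
            # Unknown token: 401. 403 is reserved for valid tokens that lack permission.
            return jsonify({"error": "Invalid token"}), 401
        return f(*args, **kwargs)
    return decorated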


@app.route("/v1/ingest", methods=["POST"])
@require_auth
def ingest():
    # silent=True turns an unparseable body into None so it gets the JSON 400 below
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Expected JSON object"}), 400

    batch_id = data.get("batch_id")
    if not isinstance(batch_id, str) or not batch_id:
        return jsonify({"error": "Missing batch_id"}), 400

    items = data.get("items")
    if not isinstance(items, list) or not all(isinstance(i, dict) for i in items):
        return jsonify({"error": "items must be a list of objects"}), 400
    if data.get("count") != len(items):
        return jsonify({"error": "count does not match len(items)"}), 400

    conn = sqlite3.connect(DB_PATH)
    try:
        try:
            # The PRIMARY KEY on batch_id makes this insert an atomic idempotency
            # check. received_at is the server clock; the edge timestamp is kept
            # inside payload_json.
            conn.execute(
                "INSERT INTO batches (batch_id, received_at, payload_json) "
                "VALUES (?, strftime('%s', 'now'), ?)",
                (batch_id, json.dumps(data)),
            )
        except sqlite3.IntegrityError:
            return jsonify({"status": "duplicate", "batch_id": batch_id}), 200

        # Persist individual items
        for item in items:
            conn.execute(
                """
                INSERT INTO items (batch_id, sensor_name, timestamp, value, unit, metadata_json)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    batch_id,
                    item.get("sensor_name", ""),
                    item.get("timestamp", 0.0),
                    item.get("value"),
                    item.get("unit", ""),
                    json.dumps(item.get("metadata", {})),
                ),
            )

        # Batch row and item rows are committed together or not at all
        conn.commit()
        return jsonify({"status": "ok", "batch_id": batch_id, "count": len(items)}), 201
    finally:
        conn.close()


@app.route("/v1/health", methods=["GET"])
def health():
    return jsonify({"status": "healthy"}), 200


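# Create the tables at import time so they also exist when the app is served
# by a WSGI server such as gunicorn, which never runs the block below.
init_db()


if __name__ == "__main__":
    # Flask's built-in server is intended for local testing only.
    app.run(host="0.0.0.0", port=5000)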
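The handler above deduplicates on batch_id alone. Because that ID is derived only from a millisecond timestamp, a receiver that serves many devices may prefer to scope the check per sender. The following is a sketch under that assumption: the seen_batches table, the source column (for example a device ID or a token label), and the function names are all hypothetical and not part of the Pyvorin payload.

```python
import sqlite3


def open_store(path=":memory:"):
    """Open a SQLite store with a composite key of (source, batch_id)."""
    conn = sqlite3.connect(path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS seen_batches (
            source   TEXT NOT NULL,
            batch_id TEXT NOT NULL,
            PRIMARY KEY (source, batch_id)
        )
        """
    )
    return conn


def claim_batch(conn, source, batch_id):
    """Return True the first time a (source, batch_id) pair is seen.

    INSERT OR IGNORE is a single atomic statement, so two concurrent
    deliveries of the same batch cannot both be treated as new.
    """
    cur = conn.execute(
        "INSERT OR IGNORE INTO seen_batches (source, batch_id) VALUES (?, ?)",
        (source, batch_id),
    )
    conn.commit()
    return cur.rowcount == 1


store = open_store()
first = claim_batch(store, "device-a", "pyv-1717000000000")
retry = claim_batch(store, "device-a", "pyv-1717000000000")
other = claim_batch(store, "device-b", "pyv-1717000000000")
```

Here the retry from device-a is recognised as a duplicate, while the identical batch_id from device-b is still accepted.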
  

FastAPI Webhook Handler

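FastAPI offers automatic request validation, async handling, and OpenAPI documentation. The following example uses Pydantic models to enforce the expected schema at the HTTP boundary. Note that by default FastAPI rejects a body that fails model validation with 422 Unprocessable Entity rather than 400 Bad Request.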


import sqlite3
import json
from typing import List, Dict, Any
from fastapi import FastAPI, HTTPException, Depends, Header, status
from pydantic import BaseModel

app = FastAPI(title="Pyvorin Edge Receiver")

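# Placeholder tokens; in production, load from environment or secret vault
VALID_TOKENS = {"pyv_live_abc123xyz", "pyv_live_backup_456"}
# Reuses the schema created by init_db() in the Flask example; the batches and
# items tables must exist before the first request.
DB_PATH = "edge_ingest.db"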


class SensorItem(BaseModel):
    sensor_name: str
    timestamp: float
    value: float
    unit: str = ""
    metadata: Dict[str, Any] = {}


class BatchEnvelope(BaseModel):
    batch_id: str
    timestamp: float
    count: int
    items: List[SensorItem]


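def get_db():
    # FastAPI may run this dependency and the endpoint on different threadpool
    # threads, so the connection must be allowed to cross threads.
    conn = sqlite3.connect(DB_PATH, check_same_thread=False)
    try:
        yield conn
    finally:
        conn.close()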


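def verify_token(authorization: str = Header("")):
    # Defaulting to "" lets a missing header produce 401 instead of FastAPI's 422
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme != "Bearer" or not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing bearer token")
    if token not in VALID_TOKENS:
        # Unknown token: 401. 403 is reserved for valid tokens that lack permission.
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    return token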


@app.post("/v1/ingest", status_code=201)
def ingest_batch(batch: BatchEnvelope, token: str = Depends(verify_token), conn: sqlite3.Connection = Depends(get_db)):
    if batch.count != len(batch.items):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="count does not match len(items)")

    try:
        # The PRIMARY KEY on batch_id makes this insert an atomic idempotency check.
        # received_at is the server clock; the edge timestamp stays in payload_json.
        conn.execute(
            "INSERT INTO batches (batch_id, received_at, payload_json) "
            "VALUES (?, strftime('%s', 'now'), ?)",
            # .json() is the Pydantic v1 API; on Pydantic v2 use model_dump_json()
            (batch.batch_id, batch.json()),
        )
    except sqlite3.IntegrityError:
        return {"status": "duplicate", "batch_id": batch.batch_id}

    for item in batch.items:
        conn.execute(
            """
            INSERT INTO items (batch_id, sensor_name, timestamp, value, unit, metadata_json)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (batch.batch_id, item.sensor_name, item.timestamp, item.value, item.unit, json.dumps(item.metadata)),
        )
    conn.commit()
    return {"status": "ok", "batch_id": batch.batch_id, "count": len(batch.items)}


@app.get("/v1/health")
def health():
    return {"status": "healthy"}
  

Batch Schema Validation

Regardless of framework, your receiver should validate the following invariants before persisting data:

  • batch_id is present and is a string.
  • count equals len(items). Mismatch suggests truncation or corruption.
  • Every item has a sensor_name and timestamp. These are the minimum fields required for downstream time-series indexing.
  • value is numeric (int or float). Non-numeric values may indicate a schema version mismatch.

If validation fails, return 400 Bad Request with a descriptive error message. Do not return 2xx; the uploader will treat 2xx as success and drop the items from the local queue, causing permanent data loss if your receiver rejected them silently.
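The invariants listed above can be collected into one framework-neutral function. This is a minimal sketch: validate_batch is a hypothetical name, the error strings are illustrative, and it takes the strict reading that a missing value counts as a violation, which you may want to relax for event-style items.

```python
def validate_batch(payload):
    """Return a list of error strings; an empty list means the batch is valid."""
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    errors = []
    batch_id = payload.get("batch_id")
    if not isinstance(batch_id, str) or not batch_id:
        errors.append("batch_id must be a non-empty string")

    items = payload.get("items")
    if not isinstance(items, list):
        errors.append("items must be an array")
        return errors

    if payload.get("count") != len(items):
        errors.append("count does not equal len(items)")

    for index, item in enumerate(items):
        if not isinstance(item, dict):
            errors.append(f"items[{index}] must be an object")
            continue
        for field in ("sensor_name", "timestamp"):
            if field not in item:
                errors.append(f"items[{index}] is missing {field}")
        value = item.get("value")
        # bool is a subclass of int in Python, so exclude it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            errors.append(f"items[{index}].value must be numeric")
    return errors


good = {
    "batch_id": "pyv-1717000000000",
    "timestamp": 1717000000.123,
    "count": 1,
    "items": [{"sensor_name": "motion.lobby", "timestamp": 1717000002.0, "value": 1.0}],
}
bad = {"batch_id": "", "count": 2, "items": [{"sensor_name": "x", "value": "hot"}]}
```

A handler would respond 400 with the returned messages whenever the list is non-empty, and only proceed to persistence when it is empty.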

Scaling Considerations

  • Use connection pooling. Both Flask and FastAPI examples above open a new SQLite connection per request. For production traffic, switch to PostgreSQL or MySQL with SQLAlchemy connection pooling.
  • Accept and queue. Do not perform heavy synchronous processing inside the HTTP handler. Write the batch to a message queue (Redis Streams, RabbitMQ, AWS SQS) and return 201 immediately. This keeps the uploader's timeout window short and prevents queue backpressure on the edge device.
  • Implement backpressure signalling. If your downstream pipeline is saturated, return 503 Service Unavailable or 429 Too Many Requests. The edge device will retry with exponential backoff, giving your pipeline time to recover.
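The accept-and-queue and backpressure points can be illustrated together with a bounded in-process queue. This sketch uses the standard library's queue.Queue purely as a stand-in for a durable broker such as Redis Streams, RabbitMQ, or SQS; an in-memory queue loses its contents if the process dies, so it is not a replacement for one. The IngestBuffer class and its method names are hypothetical.

```python
import queue
import threading


class IngestBuffer:
    """Bounded hand-off between the HTTP handler and a background worker."""

    def __init__(self, max_batches=1000):
        self._queue = queue.Queue(maxsize=max_batches)

    def accept(self, batch):
        """Return the HTTP status the handler should send for this batch."""
        try:
            self._queue.put_nowait(batch)
        except queue.Full:
            return 503  # downstream saturated: signal the edge device to retry
        return 201

    def start_worker(self, process):
        """Start a thread that passes queued batches to process()."""
        worker = threading.Thread(target=self._drain, args=(process,), daemon=True)
        worker.start()
        return worker

    def stop(self):
        """Ask the worker to exit once everything queued so far is processed."""
        self._queue.put(None)

    def _drain(self, process):
        while True:
            batch = self._queue.get()
            if batch is None:  # sentinel placed by stop()
                break
            process(batch)


buffer = IngestBuffer(max_batches=2)
statuses = [buffer.accept({"batch_id": f"pyv-{n}"}) for n in range(3)]
```

With a capacity of two, the third call is refused with 503 until a worker drains the queue, which is the signal the edge device needs to back off and retry.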

Summary

Building a custom cloud receiver for Pyvorin Edge is straightforward because the wire protocol is intentionally simple: HTTPS POST, JSON envelope, Bearer token auth, and standard HTTP status codes. By validating the schema, deduplicating by batch_id, and returning precise status codes, you create a reliable contract that lets the edge device make correct decisions about retries, acknowledgements, and failure handling. The Flask and FastAPI examples in this article are drop-in starting points that you can extend with your own persistence, analytics, and alerting layers.