edge 17 min read

Queue Management and Backpressure

Deep dive into CloudSyncQueue: SQLite-backed persistence, enqueue and dequeue semantics, maybe_flush with optional uploader, queue depth monitoring, batch sizing, and ack/nack handling.

Published Jun 2, 2026

The Store-and-Forward Queue

Network connectivity at the edge is not a constant; it is a probabilistic resource that vanishes without warning. A factory floor may lose its primary fibre link during maintenance, a delivery van may drive through a cellular dead zone, or a remote weather station may be isolated by a snowstorm for days. In all of these scenarios, the Pyvorin Edge Runtime must continue collecting sensor data, apply privacy filtering, and store the results safely until the network returns. The component responsible for this is CloudSyncQueue in pyv_edge_agent/cloud_sync/queue.py.

CloudSyncQueue is a persistent, SQLite-backed store-and-forward queue. It implements priority-ordered dequeuing, time-to-live (TTL) expiration, retry counting with backoff, and atomic acknowledgement semantics. This article explains every public method, the underlying schema, the threading model, and how to tune the queue for your workload.

Schema and Persistence

The queue stores items in a single SQLite table named sync_queue:


CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
  

The payload column stores JSON-serialised dictionaries. The priority column uses integer values from the Priority enum: CRITICAL=1, ANOMALY=2, TELEMETRY=3, LOGS=4. Lower integers are dequeued first. The composite index on (priority, next_retry_at) ensures that dequeue() and peek() run in O(log n) time even when the queue depth reaches hundreds of thousands of items.

enqueue(): Adding Items

enqueue() is the write path. It accepts a dictionary payload, a priority level, and a TTL, and returns the auto-generated row ID.


from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.types import SensorReading

queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")

reading = SensorReading(
    sensor_name="temp.cold_chain",
    timestamp=1717000000.0,
    value=2.4,
    unit="celsius",
    metadata={"zone": "A"},
)

item_id = queue.enqueue(
    payload=reading.to_dict(),
    priority=Priority.TELEMETRY,
    ttl_seconds=86400,  # retain for 24 hours
)
print(f"Enqueued item {item_id}")
  

The next_retry_at field is initialised to the current time, meaning the item is immediately eligible for dequeuing. If you want to delay the first transmission attempt — for example, to batch items over a five-minute window — you can set next_retry_at manually by subclassing CloudSyncQueue and overriding enqueue(), or by post-processing the row with a raw SQL UPDATE.

dequeue(): Retrieving Batches

dequeue(batch_size=10) returns the next batch_size items that are eligible for transmission. Eligibility is defined by two conditions:

  • next_retry_at <= now — the item's retry delay has expired.
  • created_at + ttl_seconds > now — the item has not exceeded its TTL.

Items are ordered by priority ASC, next_retry_at ASC. This guarantees that a critical alarm will always be transmitted before a routine temperature reading, even if the temperature reading was enqueued earlier.


items = queue.dequeue(batch_size=50)
for item in items:
    print(f"ID={item.id} priority={item.priority.name} "
          f"retries={item.retry_count}")
  

Note that dequeue() does not remove items from the database. It merely reads them. Removal happens only after successful transmission, via ack().

ack() and nack(): Completion Semantics

The queue uses explicit acknowledgement, similar to AMQP or MQTT QoS 1:

  • ack(ids) deletes the specified rows permanently. Call this after the uploader reports success.
  • nack(ids, retry_delay_seconds=60.0) increments retry_count and sets next_retry_at = now + retry_delay_seconds. Call this after a failed upload.

from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader

uploader = HTTPCloudUploader(endpoint="https://api.pyvorin.com/v1/ingest")

items = queue.dequeue(batch_size=10)
if items:
    payloads = [item.payload for item in items]
    success = uploader.post_batch(payloads)
    if success:
        queue.ack([item.id for item in items])
    else:
        queue.nack([item.id for item in items], retry_delay_seconds=120.0)
  

The retry_count field is not currently capped by the queue itself. If you want to dead-letter items after a maximum number of retries, wrap the nack() call in a conditional that checks item.retry_count:


MAX_RETRIES = 5

items = queue.dequeue(batch_size=10)
# ... attempt upload ...
if not success:
    to_retry = []
    to_dead_letter = []
    for item in items:
        if item.retry_count < MAX_RETRIES:
            to_retry.append(item.id)
        else:
            to_dead_letter.append(item)

    if to_retry:
        queue.nack(to_retry, retry_delay_seconds=300.0)

    for item in to_dead_letter:
        write_to_local_log(item.payload)  # preserve for forensic analysis
        queue.ack([item.id])  # remove from queue so it does not block others
  

maybe_flush(): Automated Batching

maybe_flush(uploader=None) is the high-level convenience method that most agents call from their main loop. It checks the total queue depth; if non-zero, it dequeues all pending items, passes them to the uploader if one is provided, and then acknowledges or negative-acknowledges the entire batch.


from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader

queue = CloudSyncQueue("sync_queue.db")
uploader = HTTPCloudUploader("https://api.pyvorin.com/v1/ingest")

# In your agent's main loop, call every N seconds
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed {flushed} items this cycle")
  

If uploader is None, maybe_flush() still dequeues and acknowledges the items. This is useful during shutdown or testing, when you want to drain the queue without actually transmitting data. After a successful flush, two counters are updated:

  • last_flush_time — Unix timestamp of the most recent successful flush.
  • messages_sent_today — Incremented by the number of items flushed. Reset this at midnight via reset_daily_counters() to enforce daily egress quotas.

Queue Depth Monitoring

Operational visibility into the queue is essential for detecting network outages, upstream saturation, or runaway sensor polling. CloudSyncQueue provides three introspection methods:


# Simple depth count
depth = queue.pending_count()
print(f"Queue depth: {depth}")

# Full statistics
stats = queue.get_stats()
print(f"Depth: {stats['depth']}")
print(f"Oldest item: {stats['oldest_item_timestamp']}")
print(f"Total retries across all items: {stats['total_retries']}")
print(f"Items currently retrying: {stats['retrying_items']}")
  

A healthy queue has low depth, a recent oldest item, and few retrying items. If depth grows monotonically over hours, your uploader is failing faster than items are expiring. If retrying_items is high, your network link is flaky or your upstream server is returning 5xx errors.

Batch Sizing

The optimal batch size depends on three variables: link bandwidth, upstream latency, and the urgency of the data. There is no universal constant, but the following guidelines have proven effective in production:

  • High-bandwidth, low-latency links (fibre, 5G): batch sizes of 500–1000 items maximise throughput and minimise per-request HTTP overhead.
  • Low-bandwidth links (2G, LoRaWAN backhaul): batch sizes of 10–50 items prevent timeouts and reduce the blast radius of a single failed upload.
  • Critical alarms: bypass batching entirely. Enqueue critical items with Priority.CRITICAL and call dequeue(batch_size=1) in a dedicated fast path that uploads immediately.

What Happens on Upload Failure

When HTTPCloudUploader.post_batch() returns False, maybe_flush() calls nack() on every item in the batch. The items remain in SQLite with an incremented retry_count and a future next_retry_at. They will not be dequeued again until that time passes, which prevents tight-loop retry storms that could exhaust battery or bandwidth.

If the queue grows large enough to fill the device's storage, you have two escalation options:

  1. TTL expiration. Items older than their TTL are silently skipped by dequeue(). They still occupy disk space, but they will never be transmitted. Run a periodic purge job (or extend CloudSyncQueue with a purge_expired() method) to reclaim space.
  2. Priority shedding. When disk usage exceeds a threshold, delete all items with priority >= Priority.LOGS while preserving CRITICAL and ANOMALY items.

Tuning Parameters

Parameter Default When to Increase When to Decrease
ttl_seconds 86400 (24 h) Long outages expected (remote, maritime) Short compliance windows (GDPR data minimisation)
batch_size 10 Fat pipe, large payloads Thin pipe, latency-sensitive alarms
retry_delay_seconds 60 Intermittent connectivity (cellular handover) Fast-recovery LAN environments

Summary

CloudSyncQueue transforms unreliable network connectivity into a manageable operational concern. By persisting every item in SQLite with priority ordering, TTL expiration, and explicit acknowledgement, it ensures that no data is lost to transient failures and that critical alarms are never blocked behind routine telemetry. The maybe_flush() method provides a one-line integration with HTTPCloudUploader, while get_stats() and pending_count() give you the observability you need to detect and diagnose backpressure before it becomes a data-loss incident.