Queue Management and Backpressure
Deep dive into CloudSyncQueue: SQLite-backed persistence, enqueue and dequeue semantics, maybe_flush with optional uploader, queue depth monitoring, batch sizing, and ack/nack handling.
Published Jun 2, 2026
The Store-and-Forward Queue
Network connectivity at the edge is not a constant; it is a probabilistic resource that
vanishes without warning. A factory floor may lose its primary fibre link during maintenance,
a delivery van may drive through a cellular dead zone, or a remote weather station may be
isolated by a snowstorm for days. In all of these scenarios, the Pyvorin Edge Runtime must
continue collecting sensor data, apply privacy filtering, and store the results safely until
the network returns. The component responsible for this is CloudSyncQueue in
pyv_edge_agent/cloud_sync/queue.py.
CloudSyncQueue is a persistent, SQLite-backed store-and-forward queue. It
implements priority-ordered dequeuing, time-to-live (TTL) expiration, retry counting with
backoff, and atomic acknowledgement semantics. This article explains every public method,
the underlying schema, the threading model, and how to tune the queue for your workload.
Schema and Persistence
The queue stores items in a single SQLite table named sync_queue:
CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
The payload column stores JSON-serialised dictionaries. The
priority column uses integer values from the Priority enum:
CRITICAL=1, ANOMALY=2, TELEMETRY=3, LOGS=4.
Lower integers are dequeued first. The composite index on (priority, next_retry_at)
ensures that dequeue() and peek() run in O(log n) time even when the
queue depth reaches hundreds of thousands of items.
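To make the storage format concrete, the following sketch writes one row into an in-memory copy of the schema above and reads it back. It uses only the standard library. The `IntEnum` base class for `Priority` and the helper names `insert_row` and `read_row` are assumptions made for illustration; they are not taken from the `pyv_edge_agent` source.

```python
import json
import sqlite3
import time
from enum import IntEnum


class Priority(IntEnum):
    # Values as listed in this article; the IntEnum base class is an assumption.
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4


SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
"""


def insert_row(conn, payload, priority, ttl_seconds, now=None):
    """Hypothetical helper: store one payload as JSON text, return its row ID."""
    now = time.time() if now is None else now
    cur = conn.execute(
        "INSERT INTO sync_queue "
        "(payload, priority, created_at, ttl_seconds, next_retry_at) "
        "VALUES (?, ?, ?, ?, ?)",
        (json.dumps(payload), int(priority), now, ttl_seconds, now),
    )
    conn.commit()
    return cur.lastrowid


def read_row(conn, row_id):
    """Hypothetical helper: decode payload and priority back into Python types."""
    payload, priority = conn.execute(
        "SELECT payload, priority FROM sync_queue WHERE id = ?", (row_id,)
    ).fetchone()
    return json.loads(payload), Priority(priority)


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
row_id = insert_row(
    conn, {"sensor_name": "temp.cold_chain", "value": 2.4}, Priority.TELEMETRY, 86400
)
print(read_row(conn, row_id))
```

Storing the priority as a plain integer is what allows the composite index to order rows without any enum-aware logic in SQL.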
enqueue(): Adding Items
enqueue() is the write path. It accepts a dictionary payload, a priority level,
and a TTL, and returns the auto-generated row ID.
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
from pyv_edge_agent.types import SensorReading

queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
reading = SensorReading(
    sensor_name="temp.cold_chain",
    timestamp=1717000000.0,
    value=2.4,
    unit="celsius",
    metadata={"zone": "A"},
)
item_id = queue.enqueue(
    payload=reading.to_dict(),
    priority=Priority.TELEMETRY,
    ttl_seconds=86400,  # retain for 24 hours
)
print(f"Enqueued item {item_id}")
The next_retry_at field is initialised to the current time, meaning the item is
immediately eligible for dequeuing. If you want to delay the first transmission attempt — for
example, to batch items over a five-minute window — you can set next_retry_at
manually by subclassing CloudSyncQueue and overriding enqueue(), or
by post-processing the row with a raw SQL UPDATE.
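The raw SQL route can be sketched with the standard `sqlite3` module. This is a minimal illustration under stated assumptions: `delay_first_attempt` is a hypothetical helper, not part of `CloudSyncQueue`, and the demo runs against an in-memory table with the schema shown earlier rather than the real database file.

```python
import sqlite3
import time


def delay_first_attempt(conn, item_id, delay_seconds, now=None):
    """Hypothetical helper: push next_retry_at into the future for one row.

    Returns True if a row was updated, False if the ID was not found.
    """
    now = time.time() if now is None else now
    cur = conn.execute(
        "UPDATE sync_queue SET next_retry_at = ? WHERE id = ?",
        (now + delay_seconds, item_id),
    )
    conn.commit()
    return cur.rowcount == 1


# Demo against an in-memory copy of the schema.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL, "
    "priority INTEGER NOT NULL, created_at REAL NOT NULL, "
    "ttl_seconds INTEGER NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, "
    "next_retry_at REAL NOT NULL DEFAULT 0)"
)
cur = conn.execute(
    "INSERT INTO sync_queue "
    "(payload, priority, created_at, ttl_seconds, next_retry_at) "
    "VALUES ('{}', 3, 1000.0, 86400, 1000.0)"
)
item_id = cur.lastrowid

# Hold the item back for a five-minute batching window.
updated = delay_first_attempt(conn, item_id, delay_seconds=300, now=1000.0)
next_retry_at = conn.execute(
    "SELECT next_retry_at FROM sync_queue WHERE id = ?", (item_id,)
).fetchone()[0]
print(updated, next_retry_at)
```

If you apply this to the live database file while the agent is running, a second connection competes with the queue's own writes, so keep the update short and expect to handle SQLite lock errors.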
dequeue(): Retrieving Batches
dequeue(batch_size=10) returns the next batch_size items that are
eligible for transmission. Eligibility is defined by two conditions:
- next_retry_at <= now: the item's retry delay has expired.
- created_at + ttl_seconds > now: the item has not exceeded its TTL.
Items are ordered by priority ASC, next_retry_at ASC. This guarantees that a
critical alarm will always be transmitted before a routine temperature reading, even if the
temperature reading was enqueued earlier.
items = queue.dequeue(batch_size=50)
for item in items:
    print(f"ID={item.id} priority={item.priority.name} "
          f"retries={item.retry_count}")
Note that dequeue() does not remove items from the database. It merely
reads them. Removal happens only after successful transmission, via ack().
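The eligibility conditions and ordering described in this section map onto a single read-only SELECT. The sketch below is an approximation for illustration: the actual SQL inside `dequeue()` is not shown in this article, and `select_eligible` is a hypothetical name.

```python
import sqlite3

# Assumed shape of the dequeue query: filter on retry delay and TTL,
# order by priority then next_retry_at, and cap the batch size.
ELIGIBLE_SQL = """
SELECT id, priority, retry_count
FROM sync_queue
WHERE next_retry_at <= :now
  AND created_at + ttl_seconds > :now
ORDER BY priority ASC, next_retry_at ASC
LIMIT :batch_size
"""


def select_eligible(conn, now, batch_size=10):
    """Return eligible rows without deleting or modifying anything."""
    return conn.execute(
        ELIGIBLE_SQL, {"now": now, "batch_size": batch_size}
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL, "
    "priority INTEGER NOT NULL, created_at REAL NOT NULL, "
    "ttl_seconds INTEGER NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, "
    "next_retry_at REAL NOT NULL DEFAULT 0)"
)
now = 1000.0
rows = [
    # (payload, priority, created_at, ttl_seconds, next_retry_at)
    ('{"kind": "telemetry"}', 3, 900.0, 3600, 900.0),      # id 1: eligible
    ('{"kind": "critical"}', 1, 950.0, 3600, 950.0),       # id 2: eligible
    ('{"kind": "backing_off"}', 1, 900.0, 3600, 1060.0),   # id 3: retry delay pending
    ('{"kind": "expired"}', 3, 0.0, 60, 0.0),              # id 4: TTL exceeded
]
conn.executemany(
    "INSERT INTO sync_queue "
    "(payload, priority, created_at, ttl_seconds, next_retry_at) "
    "VALUES (?, ?, ?, ?, ?)",
    rows,
)
eligible_ids = [row[0] for row in select_eligible(conn, now)]
total_rows = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
print(eligible_ids, total_rows)  # prints [2, 1] 4
```

The critical item (id 2) comes first even though the telemetry item (id 1) was enqueued earlier, and all four rows are still in the table afterwards.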
ack() and nack(): Completion Semantics
The queue uses explicit acknowledgement, similar to AMQP or MQTT QoS 1:
- ack(ids) deletes the specified rows permanently. Call this after the uploader reports success.
- nack(ids, retry_delay_seconds=60.0) increments retry_count and sets next_retry_at = now + retry_delay_seconds. Call this after a failed upload.
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader

uploader = HTTPCloudUploader(endpoint="https://api.pyvorin.com/v1/ingest")
items = queue.dequeue(batch_size=10)
if items:
    payloads = [item.payload for item in items]
    success = uploader.post_batch(payloads)
    if success:
        queue.ack([item.id for item in items])
    else:
        queue.nack([item.id for item in items], retry_delay_seconds=120.0)
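At the storage level, the two operations plausibly reduce to one DELETE and one UPDATE. The following is a sketch under that assumption, written as free functions over a `sqlite3` connection; the real methods may differ in detail. Each call runs inside a transaction so that a batch is acknowledged or rescheduled as a unit.

```python
import sqlite3
import time


def ack(conn, ids):
    """Delete acknowledged rows; all deletes commit together or not at all."""
    with conn:  # commits on success, rolls back on exception
        conn.executemany(
            "DELETE FROM sync_queue WHERE id = ?", [(i,) for i in ids]
        )


def nack(conn, ids, retry_delay_seconds=60.0, now=None):
    """Increment retry_count and schedule the next attempt for each row."""
    now = time.time() if now is None else now
    with conn:
        conn.executemany(
            "UPDATE sync_queue "
            "SET retry_count = retry_count + 1, next_retry_at = ? "
            "WHERE id = ?",
            [(now + retry_delay_seconds, i) for i in ids],
        )


# Demo on a reduced in-memory table (only the columns these calls touch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL, "
    "retry_count INTEGER NOT NULL DEFAULT 0, "
    "next_retry_at REAL NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO sync_queue (payload) VALUES (?)", [("{}",), ("{}",), ("{}",)]
)
ack(conn, [1])
nack(conn, [2, 3], retry_delay_seconds=120.0, now=1000.0)
remaining = conn.execute(
    "SELECT id, retry_count, next_retry_at FROM sync_queue ORDER BY id"
).fetchall()
print(remaining)  # prints [(2, 1, 1120.0), (3, 1, 1120.0)]
```

Because rows are only deleted on ack, a crash between upload and acknowledgement leads to a repeat transmission rather than a loss, which is the at-least-once behaviour the QoS 1 comparison implies.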
The retry_count field is not currently capped by the queue itself. If you want to
dead-letter items after a maximum number of retries, wrap the nack() call in a
conditional that checks item.retry_count:
MAX_RETRIES = 5

items = queue.dequeue(batch_size=10)
# ... attempt upload ...
if not success:
    to_retry = []
    to_dead_letter = []
    for item in items:
        if item.retry_count < MAX_RETRIES:
            to_retry.append(item.id)
        else:
            to_dead_letter.append(item)
    if to_retry:
        queue.nack(to_retry, retry_delay_seconds=300.0)
    for item in to_dead_letter:
        # write_to_local_log is a placeholder for your own dead-letter sink
        write_to_local_log(item.payload)  # preserve for forensic analysis
        queue.ack([item.id])  # remove from queue so it does not block others
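Since `nack()` takes an explicit `retry_delay_seconds`, the caller can also derive the delay from `retry_count` to get exponential backoff. The helper below is a sketch of that idea; `backoff_delay` and its default values are assumptions for illustration, not part of the queue's API.

```python
import random


def backoff_delay(retry_count, base_seconds=60.0, cap_seconds=3600.0,
                  jitter=0.1, rng=random):
    """Hypothetical helper: delay doubles per prior failure, up to a cap.

    retry_count is the number of failures already recorded for the item.
    jitter is the fraction by which the delay is randomly widened or
    narrowed, so that many devices do not retry in lockstep.
    """
    # Clamp the exponent so very large retry counts cannot overflow a float.
    exponent = min(max(retry_count, 0), 32)
    delay = min(cap_seconds, base_seconds * (2 ** exponent))
    return delay * (1.0 + rng.uniform(-jitter, jitter))


# With jitter disabled the schedule is deterministic: 60, 120, 240, ...
schedule = [backoff_delay(n, jitter=0.0) for n in range(8)]
print(schedule)
```

In the dead-letter loop above, this would mean calling `queue.nack([item.id], retry_delay_seconds=backoff_delay(item.retry_count))` per item instead of one `nack()` with a fixed 300-second delay.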
maybe_flush(): Automated Batching
maybe_flush(uploader=None) is the high-level convenience method that most agents
call from their main loop. It checks the total queue depth; if non-zero, it dequeues all
pending items, passes them to the uploader if one is provided, and then acknowledges or
negative-acknowledges the entire batch.
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue
from pyv_edge_agent.cloud_sync.uploader import HTTPCloudUploader
queue = CloudSyncQueue("sync_queue.db")
uploader = HTTPCloudUploader("https://api.pyvorin.com/v1/ingest")
# In your agent's main loop, call every N seconds
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed {flushed} items this cycle")
If uploader is None, maybe_flush() still dequeues and acknowledges the items, which
deletes them without transmitting anything. This is useful during testing, or during shutdown
when you deliberately want to drain the queue, but be aware that the drained data is
discarded. After a successful flush, two counters are updated:
- last_flush_time: Unix timestamp of the most recent successful flush.
- messages_sent_today: incremented by the number of items flushed. Reset this at midnight via reset_daily_counters() to enforce daily egress quotas.
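The control flow described in this section can be sketched against a small in-memory stand-in. Everything here is illustrative: `InMemoryQueue`, `FlushCounters`, `FailingUploader`, and `flush_once` are hypothetical names, and returning 0 on a failed upload is an assumption about what `maybe_flush()` reports.

```python
import time
from dataclasses import dataclass


@dataclass
class Item:
    id: int
    payload: dict
    retry_count: int = 0


class InMemoryQueue:
    """Test double exposing the method names used in this article."""

    def __init__(self):
        self._items = {}
        self._next_id = 1

    def enqueue(self, payload):
        item = Item(self._next_id, payload)
        self._items[item.id] = item
        self._next_id += 1
        return item.id

    def pending_count(self):
        return len(self._items)

    def dequeue(self, batch_size=10):
        return list(self._items.values())[:batch_size]

    def ack(self, ids):
        for item_id in ids:
            self._items.pop(item_id, None)

    def nack(self, ids, retry_delay_seconds=60.0):
        for item_id in ids:
            self._items[item_id].retry_count += 1


@dataclass
class FlushCounters:
    last_flush_time: float = 0.0
    messages_sent_today: int = 0


class FailingUploader:
    def post_batch(self, payloads):
        return False  # simulate an outage


def flush_once(queue, counters, uploader=None, retry_delay_seconds=60.0):
    """Dequeue everything pending, upload if possible, then ack or nack."""
    depth = queue.pending_count()
    if depth == 0:
        return 0
    items = queue.dequeue(batch_size=depth)
    ids = [item.id for item in items]
    payloads = [item.payload for item in items]
    if uploader is not None and not uploader.post_batch(payloads):
        queue.nack(ids, retry_delay_seconds=retry_delay_seconds)
        return 0  # assumption: a failed flush reports zero items
    queue.ack(ids)  # with uploader=None this drains without transmitting
    counters.last_flush_time = time.time()
    counters.messages_sent_today += len(ids)
    return len(ids)


queue = InMemoryQueue()
queue.enqueue({"n": 1})
queue.enqueue({"n": 2})
counters = FlushCounters()
failed = flush_once(queue, counters, uploader=FailingUploader())
depth_after_failure = queue.pending_count()
drained = flush_once(queue, counters, uploader=None)
print(failed, depth_after_failure, drained, counters.messages_sent_today)
```

The failed attempt leaves both items queued with an incremented retry count, while the drain call with `uploader=None` removes them and still advances the counters.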
Queue Depth Monitoring
Operational visibility into the queue is essential for detecting network outages, upstream
saturation, or runaway sensor polling. CloudSyncQueue provides three introspection
methods; two of them, pending_count() and get_stats(), are shown here:
# Simple depth count
depth = queue.pending_count()
print(f"Queue depth: {depth}")
# Full statistics
stats = queue.get_stats()
print(f"Depth: {stats['depth']}")
print(f"Oldest item: {stats['oldest_item_timestamp']}")
print(f"Total retries across all items: {stats['total_retries']}")
print(f"Items currently retrying: {stats['retrying_items']}")
A healthy queue has low depth, a recent oldest item, and few retrying items. If
depth grows steadily over hours, items are being enqueued faster than they are
uploaded and acknowledged, which usually means the uploader is failing or the link cannot
keep up. If retrying_items is high, your network link is flaky or your
upstream server is returning 5xx errors.
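These rules of thumb can be turned into a small check that runs alongside the agent. The sketch below consumes a dictionary with the keys printed by the `get_stats()` example above. The function name, the thresholds, and the assumption that `oldest_item_timestamp` is a Unix timestamp (or `None` when the queue is empty) are all illustrative.

```python
import time


def assess_queue_health(stats, now=None, max_depth=10_000,
                        max_oldest_age_seconds=3600.0,
                        max_retrying_fraction=0.2):
    """Hypothetical helper: return a list of warnings (empty means healthy)."""
    now = time.time() if now is None else now
    warnings = []

    depth = stats["depth"]
    if depth > max_depth:
        warnings.append(f"depth {depth} exceeds {max_depth}")

    # Assumed to be a Unix timestamp, or None when the queue is empty.
    oldest = stats.get("oldest_item_timestamp")
    if oldest is not None and now - oldest > max_oldest_age_seconds:
        warnings.append(f"oldest item is {now - oldest:.0f}s old")

    retrying = stats["retrying_items"]
    if depth > 0 and retrying / depth > max_retrying_fraction:
        warnings.append(f"{retrying} of {depth} items are retrying")

    return warnings


healthy = assess_queue_health(
    {"depth": 5, "oldest_item_timestamp": 990.0,
     "total_retries": 0, "retrying_items": 0},
    now=1000.0,
)
degraded = assess_queue_health(
    {"depth": 20_000, "oldest_item_timestamp": 0.0,
     "total_retries": 90_000, "retrying_items": 15_000},
    now=100_000.0,
)
print(healthy)
print(degraded)
```

In a real agent you would pass `queue.get_stats()` directly and tune the thresholds to your enqueue rate and TTL.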
Batch Sizing
The optimal batch size depends on three variables: link bandwidth, upstream latency, and the urgency of the data. There is no universal constant, but the following guidelines have proven effective in production:
- High-bandwidth, low-latency links (fibre, 5G): batch sizes of 500–1000 items maximise throughput and minimise per-request HTTP overhead.
- Low-bandwidth links (2G, LoRaWAN backhaul): batch sizes of 10–50 items prevent timeouts and reduce the blast radius of a single failed upload.
- Critical alarms: bypass batching entirely. Enqueue critical items with Priority.CRITICAL and call dequeue(batch_size=1) in a dedicated fast path that uploads immediately.
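One way to turn the guidelines above into a number is to size the batch so that it fits comfortably within a single request timeout. The heuristic below is a rough sketch, not a formula from the Pyvorin documentation; `estimate_batch_size`, the safety factor, and the example link speeds are assumptions.

```python
def estimate_batch_size(bandwidth_bytes_per_s, avg_item_bytes,
                        request_timeout_s, safety_factor=0.5,
                        min_batch=1, max_batch=1000):
    """Hypothetical heuristic: items that fit in a fraction of one timeout.

    safety_factor leaves headroom for protocol overhead and link variance.
    The result is clamped to [min_batch, max_batch].
    """
    if avg_item_bytes <= 0:
        raise ValueError("avg_item_bytes must be positive")
    budget_bytes = bandwidth_bytes_per_s * request_timeout_s * safety_factor
    return max(min_batch, min(max_batch, int(budget_bytes // avg_item_bytes)))


# Illustrative numbers only: ~5 kB/s thin link vs ~10 MB/s fat link,
# 500-byte payloads, 10-second request timeout.
thin_link = estimate_batch_size(5_000, 500, 10)
fat_link = estimate_batch_size(10_000_000, 500, 10)
print(thin_link, fat_link)  # prints 50 1000
```

The result feeds straight into `queue.dequeue(batch_size=...)`. Critical alarms should still take the dedicated `batch_size=1` fast path regardless of what this estimate returns.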
What Happens on Upload Failure
When HTTPCloudUploader.post_batch() returns False,
maybe_flush() calls nack() on every item in the batch. The items
remain in SQLite with an incremented retry_count and a future
next_retry_at. They will not be dequeued again until that time passes, which
prevents tight-loop retry storms that could exhaust battery or bandwidth.
If the queue grows large enough to fill the device's storage, you have two escalation options:
- TTL expiration. Items older than their TTL are silently skipped by dequeue(). They still occupy disk space, but they will never be transmitted. Run a periodic purge job (or extend CloudSyncQueue with a purge_expired() method) to reclaim space.
- Priority shedding. When disk usage exceeds a threshold, delete all items with priority >= Priority.LOGS while preserving CRITICAL and ANOMALY items.
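Both escalation options come down to a single DELETE against the sync_queue table. The sketch below shows one possible shape for each, using the standard `sqlite3` module. `purge_expired` and `shed_low_priority` are hypothetical functions written for this example; the article only suggests adding something like them.

```python
import sqlite3
import time

LOGS = 4  # Priority.LOGS, per the enum values listed in this article


def purge_expired(conn, now=None):
    """Delete rows whose TTL has elapsed; return how many were removed."""
    now = time.time() if now is None else now
    with conn:
        cur = conn.execute(
            "DELETE FROM sync_queue WHERE created_at + ttl_seconds <= ?",
            (now,),
        )
    return cur.rowcount


def shed_low_priority(conn, min_priority_to_drop=LOGS):
    """Delete rows at or below the given urgency; return how many were removed."""
    with conn:
        cur = conn.execute(
            "DELETE FROM sync_queue WHERE priority >= ?",
            (min_priority_to_drop,),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL, "
    "priority INTEGER NOT NULL, created_at REAL NOT NULL, "
    "ttl_seconds INTEGER NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, "
    "next_retry_at REAL NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO sync_queue (payload, priority, created_at, ttl_seconds) "
    "VALUES (?, ?, ?, ?)",
    [
        ("{}", 1, 900.0, 3600),  # critical, still within TTL
        ("{}", 4, 900.0, 3600),  # logs, still within TTL
        ("{}", 3, 0.0, 60),      # telemetry, expired long ago
    ],
)
purged = purge_expired(conn, now=1000.0)
shed = shed_low_priority(conn)
survivors = [r[0] for r in conn.execute("SELECT priority FROM sync_queue")]
print(purged, shed, survivors)  # prints 1 1 [1]
```

Deleting rows frees pages inside the SQLite file for reuse but does not by itself shrink the file on disk. If the goal is to return space to the filesystem, run `VACUUM` afterwards or enable `auto_vacuum` when the database is created.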
Tuning Parameters
| Parameter | Default | When to Increase | When to Decrease |
|---|---|---|---|
| ttl_seconds | 86400 (24 h) | Long outages expected (remote, maritime) | Short compliance windows (GDPR data minimisation) |
| batch_size | 10 | Fat pipe, large payloads | Thin pipe, latency-sensitive alarms |
| retry_delay_seconds | 60 | Intermittent connectivity (cellular handover) | Fast-recovery LAN environments |
Summary
CloudSyncQueue transforms unreliable network connectivity into a manageable
operational concern. By persisting every item in SQLite with priority ordering, TTL expiration,
and explicit acknowledgement, it ensures that items still within their TTL are not lost to
transient failures and that critical alarms are not blocked behind routine telemetry. The
maybe_flush() method provides a one-line integration with HTTPCloudUploader, while
get_stats() and pending_count() give you the observability you need
to detect and diagnose backpressure before it becomes a data-loss incident.