CloudSyncQueue API Reference
Complete reference for CloudSyncQueue, including enqueue, dequeue, peek, ack, nack, maybe_flush, SQLite schema, locking, and configuration.
Published Jun 2, 2026
Overview
CloudSyncQueue is a persistent store-and-forward queue for edge-to-cloud data synchronization. It is backed by SQLite and designed to survive process crashes, power losses, and network outages. Items remain in the queue until explicitly acknowledged (ack) after successful upload, or negatively acknowledged (nack) for retry with exponential backoff.
The queue supports priority levels (Critical, Anomaly, Telemetry, Logs) and TTL-based expiration. It is thread-safe via an internal threading.RLock.
SQLite Schema
The queue stores items in a single table with an index on priority and retry time:
CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
The priority column stores the enum value (1 = Critical, 2 = Anomaly, 3 = Telemetry, 4 = Logs). The next_retry_at column ensures that nacked items are not immediately retried.
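To make the column defaults concrete, the following minimal sketch applies the schema above to an in-memory database using only the standard sqlite3 module. It does not use CloudSyncQueue itself; the sample payload and timestamps are illustrative assumptions.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executescript(SCHEMA)  # safe to repeat because of IF NOT EXISTS

# Insert only the required columns; retry_count and next_retry_at use defaults.
conn.execute(
    "INSERT INTO sync_queue (payload, priority, created_at, ttl_seconds) "
    "VALUES (?, ?, ?, ?)",
    ('{"sensor": "boiler"}', 1, 1000.0, 3600),
)
row = conn.execute(
    "SELECT id, priority, retry_count, next_retry_at FROM sync_queue"
).fetchone()
index_names = [
    r[0]
    for r in conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'index' AND tbl_name = 'sync_queue'"
    )
]
conn.close()
```

Because next_retry_at defaults to 0, a freshly inserted row is immediately eligible for transmission; only a nack pushes it into the future.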
Constructor
class CloudSyncQueue:
    def __init__(self, db_path: str = "sync_queue.db") -> None:
        ...
| Parameter | Type | Default | Description |
|---|---|---|---|
| db_path | str | "sync_queue.db" | Path to the SQLite database file. The directory is created automatically if needed. |
On construction, the queue creates the table and index if they do not exist. Each operation opens and closes its own connection; this is slightly less efficient than a persistent connection but safer in multi-process scenarios.
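The connection-per-operation pattern described above can be sketched with the standard library alone. The helper name run_in_fresh_connection and the demo table are hypothetical and are not part of the CloudSyncQueue API; the sketch only illustrates opening, committing, and closing a connection for each call.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def run_in_fresh_connection(db_path, sql, params=()):
    """Open a connection, run one statement in a transaction, then close it."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on an exception
            return conn.execute(sql, params).fetchall()


# SQLite does not create missing parent directories, so create them first.
db_path = os.path.join(tempfile.mkdtemp(), "nested", "sync_queue.db")
os.makedirs(os.path.dirname(db_path), exist_ok=True)

run_in_fresh_connection(db_path, "CREATE TABLE IF NOT EXISTS demo (v INTEGER)")
run_in_fresh_connection(db_path, "INSERT INTO demo (v) VALUES (?)", (42,))
values = run_in_fresh_connection(db_path, "SELECT v FROM demo")
```

Each call sees the data committed by the previous one, since nothing is held in a long-lived connection.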
Priority Levels
class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4
Lower numeric values are higher priority. dequeue() and peek() order by priority ASC, next_retry_at ASC, so among eligible items, critical ones are always returned first.
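The ordering rule can be reproduced in plain Python. This is an illustrative sketch of the sort key (priority value, then next_retry_at), not the library's query code; the sample retry times are made up.

```python
from enum import Enum


class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4


# (priority, next_retry_at) pairs in arbitrary arrival order.
pending = [
    (Priority.LOGS, 0.0),
    (Priority.CRITICAL, 50.0),
    (Priority.TELEMETRY, 0.0),
    (Priority.CRITICAL, 10.0),
]

# Sort by numeric priority first, then by retry time within a priority.
ordered = sorted(pending, key=lambda pair: (pair[0].value, pair[1]))
names = [(priority.name, retry_at) for priority, retry_at in ordered]
```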
Core Methods
enqueue(payload, priority, ttl_seconds)
def enqueue(
    self,
    payload: Dict[str, Any],
    priority: Priority = Priority.TELEMETRY,
    ttl_seconds: int = 86400,
) -> int
| Parameter | Type | Default | Description |
|---|---|---|---|
| payload | Dict[str, Any] | required | JSON-serializable dictionary. Automatically serialized with json.dumps(payload, default=str). |
| priority | Priority | Priority.TELEMETRY | Queue priority level. |
| ttl_seconds | int | 86400 | Time-to-live in seconds. Items older than TTL are silently skipped by dequeue/peek and remain in the table until acked. |
Returns the auto-incremented row ID of the inserted item.
from pyv_edge_agent.cloud_sync import CloudSyncQueue, Priority
queue = CloudSyncQueue(db_path="/data/sync.db")
item_id = queue.enqueue(
    payload={"sensor": "boiler", "value": 85.0, "alert": "overheat"},
    priority=Priority.CRITICAL,
    ttl_seconds=3600,
)
print(f"Enqueued item {item_id}")
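One consequence of serializing with json.dumps(payload, default=str) is that values JSON cannot represent natively, such as datetime objects, are stored as strings and stay strings when read back. The sketch below shows this with the standard library; the read_at field is an illustrative assumption.

```python
import json
from datetime import datetime, timezone

payload = {
    "sensor": "boiler",
    "value": 85.0,
    # datetime is not JSON-serializable, so default=str is applied to it.
    "read_at": datetime(2026, 6, 2, 12, 0, tzinfo=timezone.utc),
}

serialized = json.dumps(payload, default=str)
restored = json.loads(serialized)  # read_at comes back as a plain string
```

If the receiving side needs a specific timestamp format, converting the value (for example with isoformat()) before calling enqueue avoids relying on the str() representation.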
dequeue(batch_size)
def dequeue(self, batch_size: int = 10) -> List[QueueItem]
Retrieves up to batch_size items that are eligible for transmission: next_retry_at <= now and created_at + ttl_seconds > now. Items are ordered by priority and retry time. This method does not remove items from the queue. Call ack() after successful upload.
items = queue.dequeue(batch_size=50)
for item in items:
    print(f"ID={item.id}, Priority={item.priority.name}, Retry={item.retry_count}")
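The eligibility conditions for dequeue() can be expressed as a single SQL query. The following is a sketch under the assumption that the conditions map directly onto a WHERE clause; the actual statement used by the library is not shown in this reference. The sample rows and the fixed value of now are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL, "
    "priority INTEGER NOT NULL, created_at REAL NOT NULL, "
    "ttl_seconds INTEGER NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, "
    "next_retry_at REAL NOT NULL DEFAULT 0)"
)

now = 10_000.0  # fixed clock so the example is deterministic
sample_rows = [
    ("fresh", 3, now - 10, 3600, 0, 0.0),              # eligible
    ("expired", 1, now - 7200, 3600, 0, 0.0),          # TTL has passed
    ("backing-off", 1, now - 10, 3600, 1, now + 60),   # retry not yet due
    ("critical", 1, now - 5, 3600, 0, 0.0),            # eligible
]
conn.executemany(
    "INSERT INTO sync_queue "
    "(payload, priority, created_at, ttl_seconds, retry_count, next_retry_at) "
    "VALUES (?, ?, ?, ?, ?, ?)",
    sample_rows,
)

eligible = [
    r[0]
    for r in conn.execute(
        "SELECT payload FROM sync_queue "
        "WHERE next_retry_at <= ? AND created_at + ttl_seconds > ? "
        "ORDER BY priority ASC, next_retry_at ASC LIMIT ?",
        (now, now, 10),
    )
]
total_rows = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
conn.close()
```

The expired and backing-off rows are skipped by the query but still counted in the table, which is why the total row count can exceed the number of items a dequeue returns.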
peek(n)
def peek(self, n: int = 5) -> List[QueueItem]
Runs the same query as dequeue() but is intended for inspection rather than transmission. Returns up to n eligible items.
up_next = queue.peek(n=3)
for item in up_next:
    print(item.payload)
ack(ids)
def ack(self, ids: Sequence[int]) -> int
Permanently removes items from the queue. Returns the number of rows deleted. Safe to call with an empty sequence (returns 0).
success_ids = [item.id for item in items]
deleted = queue.ack(success_ids)
print(f"Acknowledged {deleted} items")
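The delete-by-ID behavior, including the empty-sequence case, might look like the sketch below. The helper delete_by_ids is hypothetical and works directly on a sqlite3 connection; it is not the library's implementation.

```python
import sqlite3
from typing import Sequence


def delete_by_ids(conn: sqlite3.Connection, ids: Sequence[int]) -> int:
    """Delete the given rows and return how many were actually removed."""
    if not ids:
        return 0  # nothing to delete, so skip the query entirely
    placeholders = ",".join("?" for _ in ids)
    with conn:  # commit the delete as one transaction
        cursor = conn.execute(
            f"DELETE FROM sync_queue WHERE id IN ({placeholders})", list(ids)
        )
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sync_queue (id INTEGER PRIMARY KEY, payload TEXT)")
conn.executemany(
    "INSERT INTO sync_queue (id, payload) VALUES (?, ?)",
    [(1, "a"), (2, "b"), (3, "c")],
)

deleted = delete_by_ids(conn, [1, 3, 99])  # 99 does not exist
deleted_empty = delete_by_ids(conn, [])
remaining = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
```

In this sketch, IDs that are no longer present are ignored, so the return value can be smaller than the number of IDs passed in.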
nack(ids, retry_delay_seconds)
def nack(self, ids: Sequence[int], retry_delay_seconds: float = 60.0) -> int
Increments retry_count and sets next_retry_at = now + retry_delay_seconds. Returns the number of rows updated. Use this when upload fails and the item should be retried later.
failed_ids = [item.id for item in items]
updated = queue.nack(failed_ids, retry_delay_seconds=300.0)
print(f"Scheduled {updated} items for retry in 5 minutes")
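nack() as documented here takes a single fixed delay. If a growing delay is wanted, one option is for the caller to derive it from each item's retry_count. The helper backoff_delay below is a hypothetical sketch of a capped exponential schedule; the base and cap values are assumptions, not library defaults.

```python
def backoff_delay(
    retry_count: int,
    base_seconds: float = 60.0,
    max_seconds: float = 3600.0,
) -> float:
    """Double the delay for each prior retry, capped at max_seconds."""
    return min(base_seconds * (2 ** retry_count), max_seconds)


# Delays for an item that has already been retried 0..7 times.
delays = [backoff_delay(n) for n in range(8)]
```

A caller could then nack items one at a time, for example queue.nack([item.id], retry_delay_seconds=backoff_delay(item.retry_count)), so that repeatedly failing items are retried less often.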
maybe_flush(uploader)
def maybe_flush(self, uploader: Any = None) -> int
Flushes all pending items as a single batch. If uploader is provided, it calls uploader.post_batch() with the payloads. On success, items are acked and counters updated. On failure, items are nacked. Returns the number of items flushed.
from pyv_edge_agent.cloud_sync import HTTPCloudUploader
uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_live_xxxxxxxx",
    timeout=30.0,
)
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed {flushed} items to cloud")
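The ack-on-success, nack-on-failure flow can be sketched without the real queue or uploader. Everything below is hypothetical: RecordingUploader is a stand-in that only provides post_batch(), flush_once is an illustrative helper, and the sketch assumes that post_batch() signals failure by raising an exception, which this reference does not specify.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


class RecordingUploader:
    """Hypothetical stand-in for an uploader; only post_batch() is assumed."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.batches: List[List[Dict[str, Any]]] = []

    def post_batch(self, payloads: Sequence[Dict[str, Any]]) -> None:
        if self.fail:
            raise ConnectionError("simulated outage")
        self.batches.append(list(payloads))


def flush_once(
    items: Sequence[Tuple[int, Dict[str, Any]]],
    uploader: RecordingUploader,
    ack: Callable[[List[int]], Any],
    nack: Callable[[List[int]], Any],
) -> int:
    """Upload all items as one batch; ack on success, nack on failure."""
    if not items:
        return 0
    ids = [item_id for item_id, _ in items]
    try:
        uploader.post_batch([payload for _, payload in items])
    except Exception:
        nack(ids)  # keep the items queued for a later retry
        return 0
    ack(ids)  # upload succeeded, so the items can be removed
    return len(ids)


batch = [(1, {"v": 1}), (2, {"v": 2})]

acked, nacked = [], []
flushed_ok = flush_once(batch, RecordingUploader(), acked.extend, nacked.extend)

acked_fail, nacked_fail = [], []
flushed_fail = flush_once(
    batch, RecordingUploader(fail=True), acked_fail.extend, nacked_fail.extend
)
```

Treating the batch as all-or-nothing keeps the logic simple, at the cost of retrying every item when a single upload fails.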
Utility Methods
pending_count()
def pending_count(self) -> int
Returns the total number of rows in the sync_queue table, regardless of TTL or retry state.
get_stats()
def get_stats(self) -> Dict[str, Any]
Returns queue statistics:
- depth: Total pending count.
- oldest_item_timestamp: Minimum created_at, or None if empty.
- total_retries: Sum of retry_count across all items.
- retrying_items: Count of items with retry_count > 0.
stats = queue.get_stats()
print(f"Depth: {stats['depth']}, Retrying: {stats['retrying_items']}")
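The four statistics can be computed with one aggregate query. The function compute_stats below is a hypothetical sketch against a reduced version of the table; it shows one way the documented values could be derived and is not the library's code.

```python
import sqlite3


def compute_stats(conn: sqlite3.Connection) -> dict:
    """Compute the documented statistics in a single pass over the table."""
    depth, oldest, total_retries, retrying = conn.execute(
        "SELECT COUNT(*), MIN(created_at), "
        "COALESCE(SUM(retry_count), 0), "
        "COALESCE(SUM(retry_count > 0), 0) "
        "FROM sync_queue"
    ).fetchone()
    return {
        "depth": depth,
        "oldest_item_timestamp": oldest,  # None when the table is empty
        "total_retries": total_retries,
        "retrying_items": retrying,
    }


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "created_at REAL NOT NULL, "
    "retry_count INTEGER NOT NULL DEFAULT 0)"
)

empty_stats = compute_stats(conn)
conn.executemany(
    "INSERT INTO sync_queue (created_at, retry_count) VALUES (?, ?)",
    [(100.0, 0), (50.0, 2), (75.0, 1)],
)
stats = compute_stats(conn)
conn.close()
```

COALESCE is needed because SUM over an empty table yields NULL, whereas MIN returning NULL matches the documented None for oldest_item_timestamp.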
reset_daily_counters()
def reset_daily_counters(self) -> None
Resets messages_sent_today to 0. Should be called by a scheduled job at midnight.
from_dict()
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "CloudSyncQueue"
Factory method for configuration-driven construction:
config = {"db_path": "/data/queue.db"}
queue = CloudSyncQueue.from_dict(config)
Locking Behavior
All public methods acquire an internal threading.RLock for the duration of the SQLite transaction. This means:
- Multiple threads can safely enqueue and dequeue concurrently.
- A single thread can recursively acquire the lock (e.g., maybe_flush calling dequeue and ack internally would be safe, though the current implementation keeps all work inside one lock scope).
- The lock does not protect against other processes accessing the same SQLite file. For multi-process deployments, use WAL mode or a separate locking mechanism.
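The points above can be illustrated with the standard library. The first part of this sketch shows that a threading.RLock can be re-acquired by the thread that already holds it; the second shows how WAL mode is enabled on a SQLite file. The function names and the temporary database path are illustrative assumptions, and whether CloudSyncQueue enables WAL itself is not stated in this reference.

```python
import os
import sqlite3
import tempfile
import threading

lock = threading.RLock()
counter = {"value": 0}


def inner() -> None:
    with lock:  # re-acquired by the same thread without deadlocking
        counter["value"] += 1


def outer() -> None:
    with lock:  # held while calling another locked function
        inner()


threads = [threading.Thread(target=outer) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# WAL mode is a property of the database file, so it needs a real file path.
db_path = os.path.join(tempfile.mkdtemp(), "queue.db")
conn = sqlite3.connect(db_path, timeout=5.0)  # wait up to 5 s on a locked file
journal_mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.close()
```

With a plain threading.Lock, outer() would deadlock when inner() tried to take the lock a second time. WAL mode lets readers proceed while one writer is active, but writers from different processes are still serialized by SQLite.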
QueueItem Dataclass
@dataclass
class QueueItem:
    id: int
    payload: Dict[str, Any]
    priority: Priority
    created_at: float
    ttl_seconds: int
    retry_count: int = 0
    next_retry_at: float = field(default_factory=time.time)
Each QueueItem has a to_dict() method for serialization.
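The exact output of to_dict() is not documented here. As a sketch of what such a method could look like, the version below builds on dataclasses.asdict and stores the priority by name; both choices are assumptions made for illustration.

```python
import time
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict


class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4


@dataclass
class QueueItem:
    id: int
    payload: Dict[str, Any]
    priority: Priority
    created_at: float
    ttl_seconds: int
    retry_count: int = 0
    next_retry_at: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        """Hypothetical serialization: plain types only, priority by name."""
        data = asdict(self)
        data["priority"] = self.priority.name  # assumption: name, not value
        return data


item = QueueItem(
    id=1,
    payload={"sensor": "motor"},
    priority=Priority.CRITICAL,
    created_at=100.0,
    ttl_seconds=60,
    next_retry_at=0.0,
)
as_dict = item.to_dict()
```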
Complete Workflow Example
import time
from pyv_edge_agent.cloud_sync import CloudSyncQueue, Priority, HTTPCloudUploader
queue = CloudSyncQueue(db_path="/tmp/edge_queue.db")
# Enqueue readings from multiple sensors
for i in range(100):
    queue.enqueue(
        payload={"sensor": "motor", "value": float(i), "timestamp": time.time()},
        priority=Priority.TELEMETRY,
    )
# Check queue depth
print(f"Pending: {queue.pending_count()}")
# Peek at next batch
print(queue.peek(n=3))
# Simulate upload
uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_test_xxxxxxxx",
    timeout=10.0,
)
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed: {flushed}")
# Final stats
print(queue.get_stats())