
CloudSyncQueue API Reference

Complete reference for CloudSyncQueue, including enqueue, dequeue, peek, ack, nack, maybe_flush, SQLite schema, locking, and configuration.

Published Jun 2, 2026

Overview

CloudSyncQueue is a persistent store-and-forward queue for edge-to-cloud data synchronization. It is backed by SQLite and designed to survive process crashes, power losses, and network outages. Items remain in the queue until explicitly acknowledged (ack) after successful upload, or negatively acknowledged (nack) for retry with exponential backoff.

The queue supports priority levels (Critical, Anomaly, Telemetry, Logs) and TTL-based expiration. It is thread-safe via an internal threading.RLock.

SQLite Schema

The queue stores items in a single table with an index on priority and retry time:


CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);

CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
  

The priority column stores the enum value (1 = Critical, 2 = Anomaly, 3 = Telemetry, 4 = Logs). The next_retry_at column ensures that nacked items are not immediately retried.
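
To see the column defaults in action, the schema above can be loaded into an in-memory SQLite database. This is a minimal sketch using only the standard-library sqlite3 module; the sample payload and timestamp are made up for illustration.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS sync_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    priority INTEGER NOT NULL,
    created_at REAL NOT NULL,
    ttl_seconds INTEGER NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    next_retry_at REAL NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sync_queue_priority
    ON sync_queue(priority, next_retry_at);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# Insert one Critical item (priority 1) without specifying the retry columns.
conn.execute(
    "INSERT INTO sync_queue (payload, priority, created_at, ttl_seconds) "
    "VALUES (?, ?, ?, ?)",
    ('{"sensor": "boiler"}', 1, 1000.0, 3600),
)

# retry_count and next_retry_at fall back to their declared defaults.
row = conn.execute(
    "SELECT id, priority, retry_count, next_retry_at FROM sync_queue"
).fetchone()
print(row)
conn.close()
```

A freshly inserted row has next_retry_at = 0, so it is immediately eligible for transmission.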

Constructor


class CloudSyncQueue:
    def __init__(self, db_path: str = "sync_queue.db") -> None:
        ...
  
Parameter | Type | Default | Description
db_path | str | "sync_queue.db" | Path to the SQLite database file. The directory is created automatically if needed.

On construction, the queue creates the table and index if they do not exist. Each operation opens and closes its own connection; this is slightly less efficient than a persistent connection but safer in multi-process scenarios.
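
The connection-per-operation pattern described above might look roughly like the following. This is a sketch, not the library's code: run_once is a hypothetical helper, and the table t exists only for the demonstration.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def run_once(db_path, sql, params=()):
    """Open a connection, run one statement in a transaction, then close."""
    directory = os.path.dirname(db_path)
    if directory:
        # Create the parent directory if it is missing.
        os.makedirs(directory, exist_ok=True)
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on an exception
            return conn.execute(sql, params).fetchall()


# The "nested" directory does not exist yet; run_once creates it.
db_path = os.path.join(tempfile.mkdtemp(), "nested", "sync_queue.db")
run_once(db_path, "CREATE TABLE IF NOT EXISTS t (x INTEGER)")
run_once(db_path, "INSERT INTO t (x) VALUES (?)", (42,))
rows = run_once(db_path, "SELECT x FROM t")
print(rows)
```

Because no connection outlives a call, a crashed process leaves no long-lived handle behind, at the cost of reopening the file on every operation.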

Priority Levels


from enum import Enum


class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4
  

Lower numeric values mean higher priority. dequeue() and peek() order by priority ASC, next_retry_at ASC, so among eligible items, Critical ones are always returned first.
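
The ordering rule can be reproduced in plain Python by sorting on the pair (priority value, next_retry_at). The candidate tuples here are illustrative and are not part of the API.

```python
from enum import Enum


class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4


# (priority, next_retry_at, label) for a few hypothetical queue rows.
candidates = [
    (Priority.LOGS, 0.0, "log line"),
    (Priority.CRITICAL, 50.0, "overheat (retried)"),
    (Priority.TELEMETRY, 0.0, "reading"),
    (Priority.CRITICAL, 0.0, "overheat"),
]

# Same ordering as ORDER BY priority ASC, next_retry_at ASC.
ordered = sorted(candidates, key=lambda c: (c[0].value, c[1]))
labels = [c[2] for c in ordered]
print(labels)
```

Within the same priority, the item with the earlier next_retry_at comes first, so fresh items precede ones that were pushed back by a nack.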

Core Methods

enqueue(payload, priority, ttl_seconds)


def enqueue(
    self,
    payload: Dict[str, Any],
    priority: Priority = Priority.TELEMETRY,
    ttl_seconds: int = 86400,
) -> int
  
Parameter | Type | Default | Description
payload | Dict[str, Any] | required | JSON-serializable dictionary. Automatically serialized with json.dumps(payload, default=str).
priority | Priority | Priority.TELEMETRY | Queue priority level.
ttl_seconds | int | 86400 | Time-to-live in seconds. Items older than TTL are silently skipped by dequeue/peek and remain in the table until acked.

Returns the auto-incremented row ID of the inserted item.


from pyv_edge_agent.cloud_sync import CloudSyncQueue, Priority

queue = CloudSyncQueue(db_path="/data/sync.db")
item_id = queue.enqueue(
    payload={"sensor": "boiler", "value": 85.0, "alert": "overheat"},
    priority=Priority.CRITICAL,
    ttl_seconds=3600,
)
print(f"Enqueued item {item_id}")
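
One consequence of serializing with json.dumps(payload, default=str) is that values JSON cannot represent natively are stored as their str() form and come back as plain strings. The sketch below shows this with a datetime; the read_at key is a made-up field.

```python
import json
from datetime import datetime, timezone

payload = {
    "sensor": "boiler",
    "value": 85.0,
    # datetime is not JSON-serializable, so default=str is applied to it.
    "read_at": datetime(2026, 6, 2, 12, 0, tzinfo=timezone.utc),
}

encoded = json.dumps(payload, default=str)
decoded = json.loads(encoded)

print(encoded)
print(type(decoded["read_at"]).__name__)
```

If the receiving side needs a typed timestamp, it may be safer to convert such values to an explicit format (for example an ISO 8601 string or a Unix timestamp) before enqueueing.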
  

dequeue(batch_size)


def dequeue(self, batch_size: int = 10) -> List[QueueItem]
  

Retrieves up to batch_size items that are eligible for transmission: next_retry_at <= now and created_at + ttl_seconds > now. Items are ordered by priority and retry time. This method does not remove items from the queue, so a repeated call before ack() or nack() can return the same items again. Call ack() after successful upload.


items = queue.dequeue(batch_size=50)
for item in items:
    print(f"ID={item.id}, Priority={item.priority.name}, Retry={item.retry_count}")
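
The eligibility rule can be expressed as a single SQL query. The sketch below runs it against an in-memory table with a fixed value for now; the exact SQL used by the library is an assumption, and only the conditions and ordering come from the description above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE sync_queue (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        priority INTEGER NOT NULL,
        created_at REAL NOT NULL,
        ttl_seconds INTEGER NOT NULL,
        retry_count INTEGER NOT NULL DEFAULT 0,
        next_retry_at REAL NOT NULL DEFAULT 0
    )
    """
)

now = 10_000.0  # fixed clock so the result is reproducible
rows = [
    # payload, priority, created_at, ttl_seconds, next_retry_at
    ("eligible-log", 4, now - 10, 3600, 0.0),
    ("eligible-critical", 1, now - 10, 3600, 0.0),
    ("expired", 1, now - 7200, 3600, 0.0),        # TTL has elapsed
    ("backing-off", 1, now - 10, 3600, now + 60),  # retry not due yet
]
conn.executemany(
    "INSERT INTO sync_queue "
    "(payload, priority, created_at, ttl_seconds, next_retry_at) "
    "VALUES (?, ?, ?, ?, ?)",
    rows,
)

eligible = [
    r[0]
    for r in conn.execute(
        "SELECT payload FROM sync_queue "
        "WHERE next_retry_at <= ? AND created_at + ttl_seconds > ? "
        "ORDER BY priority ASC, next_retry_at ASC LIMIT ?",
        (now, now, 10),
    )
]
print(eligible)
conn.close()
```

The expired and backing-off rows stay in the table; they are simply filtered out of the result.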
  

peek(n)


def peek(self, n: int = 5) -> List[QueueItem]
  

Runs the same query as dequeue() but is intended for inspection rather than transmission. Returns up to n eligible items.


up_next = queue.peek(n=3)
for item in up_next:
    print(item.payload)
  

ack(ids)


def ack(self, ids: Sequence[int]) -> int
  

Permanently removes items from the queue. Returns the number of rows deleted. Safe to call with an empty sequence (returns 0).


success_ids = [item.id for item in items]
deleted = queue.ack(success_ids)
print(f"Acknowledged {deleted} items")
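
An ack over a sequence of IDs maps naturally onto a parameterized DELETE. The following is a sketch of that idea, not the library's implementation; ack_rows is a hypothetical helper and the table is reduced to two columns for brevity.

```python
import sqlite3
from typing import Sequence


def ack_rows(conn: sqlite3.Connection, ids: Sequence[int]) -> int:
    """Delete the given row IDs and return how many rows were removed."""
    if not ids:
        return 0  # an empty IN () list is not valid SQL, so short-circuit
    placeholders = ",".join("?" for _ in ids)
    with conn:  # commit the delete as one transaction
        cur = conn.execute(
            f"DELETE FROM sync_queue WHERE id IN ({placeholders})",
            list(ids),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)"
)
conn.executemany(
    "INSERT INTO sync_queue (payload) VALUES (?)", [("a",), ("b",), ("c",)]
)

deleted = ack_rows(conn, [1, 2, 99])  # ID 99 does not exist
remaining = conn.execute("SELECT COUNT(*) FROM sync_queue").fetchone()[0]
print(deleted, remaining)
```

IDs that are not in the table are ignored, which is why the returned count can be lower than the number of IDs passed in.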
  

nack(ids, retry_delay_seconds)


def nack(self, ids: Sequence[int], retry_delay_seconds: float = 60.0) -> int
  

Increments retry_count and sets next_retry_at = now + retry_delay_seconds. Returns the number of rows updated. Use this when upload fails and the item should be retried later.


failed_ids = [item.id for item in items]
updated = queue.nack(failed_ids, retry_delay_seconds=300.0)
print(f"Scheduled {updated} items for retry in 5 minutes")
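
As documented, nack() applies whatever delay it is given. If the delay should grow with each failed attempt, one option is for the caller to derive it from the item's retry_count. The helper below is a hypothetical sketch; backoff_delay, base, and cap are illustrative names and values, not part of the API.

```python
def backoff_delay(retry_count: int, base: float = 60.0, cap: float = 3600.0) -> float:
    """Exponential backoff: base * 2**retry_count, limited to cap seconds."""
    return min(cap, base * (2 ** retry_count))


delays = [backoff_delay(n) for n in range(8)]
print(delays)

# Possible use with the queue (not executed here):
#   for item in failed_items:
#       queue.nack([item.id], retry_delay_seconds=backoff_delay(item.retry_count))
```

The cap keeps a long outage from pushing retries arbitrarily far into the future, which matters because items also expire by TTL.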
  

maybe_flush(uploader)


def maybe_flush(self, uploader: Any = None) -> int
  

Flushes all pending items as a single batch. If uploader is provided, it calls uploader.post_batch() with the payloads. On success, items are acked and counters updated. On failure, items are nacked. Returns the number of items flushed.


from pyv_edge_agent.cloud_sync import HTTPCloudUploader

uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_live_xxxxxxxx",
    timeout=30.0,
)
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed {flushed} items to cloud")
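
The flush flow (upload, then ack on success or nack on failure) can be sketched without a network connection by using a stand-in uploader. Everything here is an assumption except the post_batch method name: FakeUploader, flush_once, the truthy return value of post_batch, and returning 0 on failure are all illustrative choices.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


class FakeUploader:
    """Hypothetical stand-in that records batches instead of sending them."""

    def __init__(self, succeed: bool) -> None:
        self.succeed = succeed
        self.batches: List[List[Dict[str, Any]]] = []

    def post_batch(self, payloads: Sequence[Dict[str, Any]]) -> bool:
        self.batches.append(list(payloads))
        return self.succeed


def flush_once(
    pending: Sequence[Tuple[int, Dict[str, Any]]],
    uploader: Any,
    ack: Callable[[List[int]], Any],
    nack: Callable[[List[int]], Any],
) -> int:
    """Send all pending (id, payload) pairs as one batch; ack or nack them."""
    if not pending:
        return 0
    ids = [item_id for item_id, _ in pending]
    payloads = [payload for _, payload in pending]
    try:
        ok = bool(uploader.post_batch(payloads))
    except Exception:
        ok = False  # treat any upload error as a failed batch
    if ok:
        ack(ids)
        return len(ids)
    nack(ids)
    return 0


acked: List[int] = []
nacked: List[int] = []
pending = [(1, {"sensor": "motor", "value": 1.0}),
           (2, {"sensor": "motor", "value": 2.0})]

sent = flush_once(pending, FakeUploader(succeed=True), acked.extend, nacked.extend)
failed = flush_once(pending, FakeUploader(succeed=False), acked.extend, nacked.extend)
print(sent, acked, failed, nacked)
```

A duck-typed uploader like this can also be useful in unit tests, since maybe_flush() only needs an object that exposes post_batch().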
  

Utility Methods

pending_count()


def pending_count(self) -> int
  

Returns the total number of rows in the sync_queue table, regardless of TTL or retry state.

get_stats()


def get_stats(self) -> Dict[str, Any]
  

Returns queue statistics:

  • depth: Total pending count.
  • oldest_item_timestamp: Minimum created_at, or None if empty.
  • total_retries: Sum of retry_count across all items.
  • retrying_items: Count of items with retry_count > 0.

stats = queue.get_stats()
print(f"Depth: {stats['depth']}, Retrying: {stats['retrying_items']}")
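
The four statistics correspond to simple SQL aggregates over the table. The query below is one way to compute them and is not necessarily how get_stats() is implemented; the table is trimmed to the columns the aggregates need.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sync_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "created_at REAL NOT NULL, "
    "retry_count INTEGER NOT NULL DEFAULT 0)"
)

STATS_SQL = (
    "SELECT COUNT(*), MIN(created_at), "
    "COALESCE(SUM(retry_count), 0), "
    "COALESCE(SUM(retry_count > 0), 0) "
    "FROM sync_queue"
)

# On an empty table MIN() yields NULL, which Python sees as None.
empty = conn.execute(STATS_SQL).fetchone()

conn.executemany(
    "INSERT INTO sync_queue (created_at, retry_count) VALUES (?, ?)",
    [(100.0, 0), (50.0, 2), (75.0, 1)],
)
depth, oldest, total_retries, retrying = conn.execute(STATS_SQL).fetchone()
stats = {
    "depth": depth,
    "oldest_item_timestamp": oldest,
    "total_retries": total_retries,
    "retrying_items": retrying,
}
print(empty, stats)
conn.close()
```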
  

reset_daily_counters()


def reset_daily_counters(self) -> None
  

Resets messages_sent_today to 0. Should be called by a scheduled job at midnight.
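
A scheduler that triggers the reset needs to know how long to wait until the next local midnight. The helper below is a small sketch of that calculation; seconds_until_midnight is a hypothetical name, and the example uses naive datetimes, so it does not account for daylight-saving transitions.

```python
from datetime import datetime, timedelta


def seconds_until_midnight(now: datetime) -> float:
    """Return the number of seconds from now until the next 00:00:00."""
    next_midnight = (now + timedelta(days=1)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return (next_midnight - now).total_seconds()


late_evening = seconds_until_midnight(datetime(2026, 6, 2, 23, 0, 0))
at_midnight = seconds_until_midnight(datetime(2026, 6, 2, 0, 0, 0))
print(late_evening, at_midnight)

# Possible use (not executed here):
#   time.sleep(seconds_until_midnight(datetime.now()))
#   queue.reset_daily_counters()
```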

from_dict()


@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "CloudSyncQueue"
  

Factory method for configuration-driven construction:


config = {"db_path": "/data/queue.db"}
queue = CloudSyncQueue.from_dict(config)
  

Locking Behavior

All public methods acquire an internal threading.RLock for the duration of the SQLite transaction. This means:

  • Multiple threads can safely enqueue and dequeue concurrently.
  • A single thread can recursively acquire the lock (e.g., maybe_flush calling dequeue and ack internally would be safe, though the current implementation keeps all work inside one lock scope).
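  • The lock does not coordinate with other processes accessing the same SQLite file; across processes, only SQLite's own file locking applies. For multi-process deployments, enable WAL mode to reduce reader/writer contention, or add a separate inter-process locking mechanism.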
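
The reentrancy point can be demonstrated with a small class that is unrelated to the queue itself. In this sketch, Counter is a hypothetical example: increment_twice holds the lock and calls increment, which acquires the same lock again. With a plain threading.Lock the nested acquire would deadlock; with threading.RLock it succeeds.

```python
import threading


class Counter:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.value = 0

    def increment(self) -> None:
        with self._lock:
            self.value += 1

    def increment_twice(self) -> None:
        with self._lock:      # outer acquisition
            self.increment()  # same thread re-acquires the lock
            self.increment()


def worker(counter: Counter, rounds: int) -> None:
    for _ in range(rounds):
        counter.increment_twice()


counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 500)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)
```

Four threads each perform 500 double increments, and the lock ensures that no update is lost.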

QueueItem Dataclass


import time
from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class QueueItem:
    id: int
    payload: Dict[str, Any]
    priority: Priority
    created_at: float
    ttl_seconds: int
    retry_count: int = 0
    next_retry_at: float = field(default_factory=time.time)
  

Each QueueItem has a to_dict() method for serialization.
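
The reference does not show the output format of to_dict(). One plausible shape, sketched below as an assumption, uses dataclasses.asdict and replaces the Priority member with its name so that the result is JSON-serializable.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict


class Priority(Enum):
    CRITICAL = 1
    ANOMALY = 2
    TELEMETRY = 3
    LOGS = 4


@dataclass
class QueueItem:
    id: int
    payload: Dict[str, Any]
    priority: Priority
    created_at: float
    ttl_seconds: int
    retry_count: int = 0
    next_retry_at: float = field(default_factory=time.time)

    def to_dict(self) -> Dict[str, Any]:
        # Assumed behavior: emit the priority as its name, e.g. "CRITICAL".
        d = asdict(self)
        d["priority"] = self.priority.name
        return d


item = QueueItem(
    id=1,
    payload={"sensor": "boiler", "value": 85.0},
    priority=Priority.CRITICAL,
    created_at=1000.0,
    ttl_seconds=3600,
    next_retry_at=0.0,
)
as_dict = item.to_dict()
as_json = json.dumps(as_dict)
print(as_json)
```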

Complete Workflow Example


import time
from pyv_edge_agent.cloud_sync import CloudSyncQueue, Priority, HTTPCloudUploader

queue = CloudSyncQueue(db_path="/tmp/edge_queue.db")

# Enqueue readings from multiple sensors
for i in range(100):
    queue.enqueue(
        payload={"sensor": "motor", "value": float(i), "timestamp": time.time()},
        priority=Priority.TELEMETRY,
    )

# Check queue depth
print(f"Pending: {queue.pending_count()}")

# Peek at next batch
print(queue.peek(n=3))

# Flush pending items through the uploader
uploader = HTTPCloudUploader(
    endpoint="https://api.pyvorin.com/v1/ingest",
    api_key="pyv_test_xxxxxxxx",
    timeout=10.0,
)
flushed = queue.maybe_flush(uploader=uploader)
print(f"Flushed: {flushed}")

# Final stats
print(queue.get_stats())