
MQTTAdapter — Broker Subscriptions and Resilient Ingest

Connect to MQTT brokers, manage QoS, TLS, authentication, topic wildcards, and automatic reconnection with full working examples.

Published Jun 2, 2026

Overview

MQTT is the dominant protocol for IoT telemetry. The Pyvorin Edge SDK includes an MQTTAdapter built on paho-mqtt that handles connection, subscription, message parsing, and automatic reconnection. It is designed to run in a background thread, feeding SensorReading dicts into your pipeline via a user-supplied callback.

Installation


pip install paho-mqtt
  

Connecting to a Broker

The connect() method stores broker parameters, instantiates a paho.mqtt.client.Client (compatible with both v1 and v2 APIs), and starts the network loop. If the connection drops, an exponential back-off reconnect strategy activates automatically.


from pyv_edge_agent.ingest import MQTTAdapter

adapter = MQTTAdapter()
adapter.connect(
    broker="mqtt.local",
    port=1883,
    topic="sensors/+/temperature",
    qos=1,
)
  

QoS Levels

| QoS | Guarantee | Use Case |
|-----|-----------|----------|
| 0 | At most once (fire and forget) | High-frequency metrics where loss is acceptable. |
| 1 | At least once | Default for sensor telemetry; duplicates are tolerable. |
| 2 | Exactly once | Billing or safety-critical events where duplicates are harmful. |
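
Because QoS 1 can deliver the same message more than once, the reading callback may see duplicates. The following is a minimal sketch of one way to make a callback idempotent. DedupingCallback is a hypothetical helper, not part of the SDK, and it assumes that the pair (sensor_name, timestamp) uniquely identifies a reading.

```python
from collections import OrderedDict


class DedupingCallback:
    """Wrap a reading callback and drop readings that were seen recently.

    Hypothetical helper, not part of the SDK. Assumes that
    (sensor_name, timestamp) uniquely identifies a reading.
    """

    def __init__(self, callback, max_keys=10_000):
        self._callback = callback
        self._max_keys = max_keys
        self._seen = OrderedDict()  # insertion-ordered set of recent keys

    def __call__(self, reading):
        key = (reading["sensor_name"], reading["timestamp"])
        if key in self._seen:
            return False  # duplicate delivery, skip it
        self._seen[key] = None
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)  # evict the oldest key
        self._callback(reading)
        return True


received = []
dedupe = DedupingCallback(received.append)
reading = {"sensor_name": "boiler_stack_temp", "timestamp": 1717152000.0, "value": 145.2}
dedupe(reading)
dedupe(dict(reading))  # simulated QoS 1 redelivery of the same reading
```

A wrapper like this could be registered in place of the bare callback, for example adapter.on_message(DedupingCallback(on_reading)). The bounded key cache keeps memory use flat on long-running edge devices.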

Topic Subscriptions and Wildcards

The adapter supports both single-level (+) and multi-level (#) wildcards. The initial topic is subscribed automatically on connect; additional topics can be added later with subscribe().


# Subscribe to every topic exactly two levels below building_a
adapter.connect(broker="mqtt.local", topic="building_a/+/+", qos=1)

# Later, add a specific critical topic
adapter.subscribe("building_a/server_room/+/critical")
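
To see why the second subscription is needed, it helps to spell out how the two wildcards match. The function below is a simplified sketch of MQTT topic-filter matching written for illustration; topic_matches is a hypothetical name, and the sketch ignores the special handling of topics that begin with $.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False  # the topic has fewer levels than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level does not match
    # Every filter level matched; the topic must not have extra levels.
    return len(filter_levels) == len(topic_levels)


# "+" matches exactly one level, so a three-level filter misses four-level topics.
three_level = topic_matches("building_a/+/+", "building_a/lobby/temp")
four_level = topic_matches("building_a/+/+", "building_a/server_room/rack1/critical")
```

Here three_level is True and four_level is False, which is why the four-level critical topic needs its own subscribe() call. For production checks, paho-mqtt ships its own matcher, topic_matches_sub, in paho.mqtt.client.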
  

TLS / SSL

For production deployments, always use TLS. MQTTAdapter creates its paho client inside connect(), so TLS has to be configured on that client before it connects to the broker. The recommended pattern for encrypted brokers is to subclass the adapter and override connect(), as in the following example:


import ssl

import paho.mqtt.client as mqtt
from pyv_edge_agent.ingest import MQTTAdapter

class TLSMQTTAdapter(MQTTAdapter):
    """MQTTAdapter variant that enables TLS on the client before connecting."""

    def connect(self, broker, port=8883, topic="#", qos=1):
        # Store the broker parameters, as the base connect() does.
        self._broker = broker
        self._port = port
        self._topic = topic
        self._qos = qos
        self._stop_reconnect.clear()

        # paho-mqtt 2.x takes a callback API version; 1.x has no such argument.
        try:
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        except (AttributeError, TypeError):
            client = mqtt.Client()

        # TLS must be configured before connect(). Adjust the paths to your certificates.
        client.tls_set(
            ca_certs="/etc/ssl/certs/ca-certificates.crt",
            certfile="/opt/pyvorin/certs/client.crt",
            keyfile="/opt/pyvorin/certs/client.key",
            tls_version=ssl.PROTOCOL_TLS_CLIENT,
        )
        # Reuse the adapter's own handlers so parsing and reconnection still apply.
        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        client.connect(broker, port)
        client.loop_start()
        self._client = client

adapter = TLSMQTTAdapter()
adapter.connect("secure-broker.example.com", port=8883, topic="sensors/#")
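
As an alternative to passing file paths to tls_set(), the TLS settings can be built as a standard-library ssl.SSLContext. The sketch below uses only the ssl module; build_tls_context is a hypothetical helper name, and the TLS 1.2 minimum is an assumed policy rather than an SDK requirement.

```python
import ssl


def build_tls_context(ca_file=None, cert_file=None, key_file=None):
    """Build a client-side TLS context that verifies the broker certificate."""
    # create_default_context() turns on certificate and hostname verification.
    # With ca_file=None it falls back to the system's default CA store.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2  # assumed policy
    if cert_file and key_file:
        # Client certificate for mutual TLS, if the broker requires one.
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return context


context = build_tls_context()
```

Inside a subclass such as TLSMQTTAdapter, the context can be handed to paho with client.tls_set_context(context) in place of the tls_set() call. Keeping the context in one function makes the verification policy easy to review.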
  

Authentication

Username/password authentication is supported by setting credentials on the underlying paho client before connect. The same subclassing pattern applies:


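import paho.mqtt.client as mqtt
from pyv_edge_agent.ingest import MQTTAdapter

class AuthMQTTAdapter(MQTTAdapter):
    def __init__(self, username, password):
        super().__init__()
        self._username = username
        self._password = password

    def connect(self, broker, port=1883, topic="#", qos=1):
        # Same client setup as in the TLS example, minus the tls_set() call.
        self._broker, self._port, self._topic, self._qos = broker, port, topic, qos
        self._stop_reconnect.clear()
        try:
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        except (AttributeError, TypeError):
            client = mqtt.Client()
        client.username_pw_set(self._username, self._password)  # before connect()
        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        client.connect(broker, port)
        client.loop_start()
        self._client = client

adapter = AuthMQTTAdapter("edge_device_001", "super_secret_token")
adapter.connect("broker.example.com")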
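
Hardcoded credentials like those in the example are fine for a demo but awkward to rotate on deployed devices. One common option is to read them from the environment, sketched below. The function name and the variable names PYVORIN_MQTT_USERNAME and PYVORIN_MQTT_PASSWORD are hypothetical, not something the SDK defines.

```python
import os


def load_mqtt_credentials(environ=None):
    """Read broker credentials from environment variables.

    The variable names are hypothetical; use whatever your deployment defines.
    """
    environ = os.environ if environ is None else environ
    try:
        return environ["PYVORIN_MQTT_USERNAME"], environ["PYVORIN_MQTT_PASSWORD"]
    except KeyError as exc:
        # Fail early with a clear message instead of connecting anonymously.
        raise RuntimeError(f"Missing MQTT credential variable: {exc.args[0]}") from None


# A plain dict stands in for os.environ here so the example is self-contained.
username, password = load_mqtt_credentials(
    {"PYVORIN_MQTT_USERNAME": "edge_device_001", "PYVORIN_MQTT_PASSWORD": "example"}
)
```

The resulting pair can be passed straight to a subclass constructor such as AuthMQTTAdapter(username, password).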
  

Message Parsing

The adapter expects JSON payloads shaped like a SensorReading dict. Incoming messages are decoded with json.loads and normalised to a dict with keys: sensor_name, timestamp, value, unit, metadata. Non-dict payloads are dropped with a warning.


{
  "sensor_name": "boiler_stack_temp",
  "timestamp": 1717152000.0,
  "value": 145.2,
  "unit": "°C",
  "metadata": {"firmware": "2.4.1"}
}
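
The parsing step described above can be sketched as a standalone function. This is an illustration, not the adapter's actual source: parse_reading is a hypothetical name, and the default values for missing keys are an assumption modelled on the MessagePack example in the Binary Payloads section.

```python
import json
import logging

logger = logging.getLogger(__name__)


def parse_reading(raw):
    """Decode a JSON payload into a SensorReading-shaped dict, or return None."""
    try:
        payload = json.loads(raw)  # accepts str or UTF-8 encoded bytes
    except ValueError:
        # Covers both malformed JSON and undecodable bytes.
        logger.warning("Dropping payload that is not valid JSON")
        return None
    if not isinstance(payload, dict):
        logger.warning("Dropping non-dict payload of type %s", type(payload).__name__)
        return None
    # Normalise to the five expected keys (defaults are assumed, see above).
    return {
        "sensor_name": str(payload.get("sensor_name", "")),
        "timestamp": float(payload.get("timestamp", 0)),
        "value": float(payload.get("value", 0)),
        "unit": str(payload.get("unit", "")),
        "metadata": dict(payload.get("metadata", {})),
    }


reading = parse_reading(b'{"sensor_name": "boiler_stack_temp", "timestamp": 1717152000, "value": 145.2}')
```

Note that the integer timestamp in the input comes back as a float, and the missing unit and metadata keys are filled with empty defaults.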
  

Binary Payloads

If your sensors publish binary payloads (e.g., CBOR, MessagePack, or Protobuf), subclass the adapter and override _on_message to decode before dispatching to the callback.


import logging

import msgpack
from pyv_edge_agent.ingest import MQTTAdapter

logger = logging.getLogger(__name__)

class MsgPackMQTTAdapter(MQTTAdapter):
    def _on_message(self, client, userdata, msg):
        try:
            # Decode MessagePack instead of JSON; raw=False returns str, not bytes.
            payload = msgpack.unpackb(msg.payload, raw=False)
            reading = {
                "sensor_name": str(payload.get("sensor_name", "")),
                "timestamp": float(payload.get("timestamp", 0)),
                "value": float(payload.get("value", 0)),
                "unit": str(payload.get("unit", "")),
                "metadata": dict(payload.get("metadata", {})),
            }
            # Read the callback under the lock, but invoke it outside the lock.
            with self._lock:
                cb = self._callback
            if cb is not None:
                cb(reading)
        except Exception as exc:
            logger.exception("MsgPack decode error: %s", exc)
  

Reconnection Logic

On disconnect, the adapter schedules a reconnect with exponential back-off starting at 1 second and capping at 60 seconds. The timer is daemonised so it will not block interpreter shutdown. Call disconnect() to set the stop flag and cleanly tear down the network loop.


adapter.disconnect()
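
The back-off schedule described above can be sketched in a few lines. This is an illustration of the technique, not the adapter's source: the function names are hypothetical, and the doubling factor is an assumption, since the text only states that the delay grows exponentially from 1 second to a 60 second cap.

```python
import threading


def backoff_delays(initial=1.0, cap=60.0, factor=2.0):
    """Yield reconnect delays in seconds: 1, 2, 4, ... and then `cap` forever."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


def schedule_reconnect(delay, reconnect, stop_event):
    """Run `reconnect` after `delay` seconds unless `stop_event` is set by then."""
    def attempt():
        if not stop_event.is_set():
            reconnect()

    timer = threading.Timer(delay, attempt)
    timer.daemon = True  # a pending timer must not block interpreter shutdown
    timer.start()
    return timer


delays = backoff_delays()
first_eight = [next(delays) for _ in range(8)]
```

With the assumed doubling factor, first_eight is [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]. The stop event plays the role of the flag that disconnect() sets, so a timer that fires after shutdown does nothing.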
  

Publishing Upstream

The adapter can also publish JSON payloads back to the broker. This is useful for sending pipeline results, heartbeats, or acknowledged alerts.


adapter.publish("alerts/server_room", {
    "rule": "overheating",
    "severity": "critical",
    "timestamp": 1717152000.0,
})
  

Full Working Example with Mosquitto

Below is a complete example in two parts: a shell command that starts a local Mosquitto broker, followed by a Python script that publishes synthetic temperature data and ingests it through the adapter into a Pyvorin pipeline.


# 1. Start mosquitto (Docker or system package)
docker run -it --rm -p 1883:1883 eclipse-mosquitto:2 \
  mosquitto -c /mosquitto-no-auth.conf
  

import json
import time
import threading
import paho.mqtt.publish as publish
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyv_edge_agent.ingest import MQTTAdapter

# 2. Set up pipeline
pipeline = Pipeline("mqtt_demo")
pipeline.add_rule(RuleConfig(
    name="hot",
    condition=lambda r: r["value"] > 30.0,
    severity="warning",
    cooldown_seconds=5.0,
))

events = []
def on_reading(reading):
    result = pipeline.run([reading])
    events.extend(result.events)

# 3. Start adapter
adapter = MQTTAdapter()
adapter.on_message(on_reading)
adapter.connect("localhost", 1883, topic="lab/+/temp", qos=1)

# 4. Publish synthetic data in another thread
def publisher():
    for i in range(20):
        payload = json.dumps({
            "sensor_name": "lab_bench_temp",
            "timestamp": time.time(),
            "value": 20.0 + i,
            "unit": "°C",
            "metadata": {},
        })
        publish.single("lab/bench/temp", payload, hostname="localhost")
        time.sleep(0.5)

threading.Thread(target=publisher, daemon=True).start()

# 5. Let it run, then inspect
time.sleep(12)
adapter.disconnect()
print(f"Total events: {len(events)}")
for e in events:
    print(e.to_dict())