MQTTAdapter — Broker Subscriptions and Resilient Ingest
Connect to MQTT brokers, manage QoS, TLS, authentication, topic wildcards, and automatic reconnection with full working examples.
Published Jun 2, 2026
Overview
MQTT is one of the most widely used protocols for IoT telemetry. The Pyvorin Edge SDK includes an MQTTAdapter built on paho-mqtt that handles connection, subscription, message parsing, and automatic reconnection. It is designed to run in a background thread, feeding SensorReading dicts into your pipeline via a user-supplied callback.
Installation
pip install paho-mqtt
Connecting to a Broker
The connect() method stores broker parameters, instantiates a paho.mqtt.client.Client (compatible with both the paho-mqtt 1.x and 2.x client APIs), and starts the network loop. If the connection drops, an exponential back-off reconnect strategy activates automatically.
from pyv_edge_agent.ingest import MQTTAdapter

adapter = MQTTAdapter()
adapter.connect(
    broker="mqtt.local",
    port=1883,
    topic="sensors/+/temperature",
    qos=1,
)
QoS Levels
| QoS | Guarantee | Use Case |
|---|---|---|
| 0 | At most once (fire and forget) | High-frequency metrics where loss is acceptable. |
| 1 | At least once | Default for sensor telemetry; duplicates are tolerable. |
| 2 | Exactly once | Billing or safety-critical events where duplicates are harmful. |
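With QoS 1 the broker may redeliver a message, so the callback can see the same reading twice. If a downstream consumer cannot tolerate that, one option is a small de-duplication filter in front of the pipeline. The sketch below is a hypothetical helper, not part of the Pyvorin SDK, and it assumes that the pair (sensor_name, timestamp) uniquely identifies a reading.

```python
from collections import OrderedDict


class ReadingDeduplicator:
    """Reject readings whose key was seen recently.

    Hypothetical helper, not part of the Pyvorin SDK. Assumes that
    (sensor_name, timestamp) uniquely identifies a reading.
    """

    def __init__(self, max_keys=10_000):
        self._seen = OrderedDict()
        self._max_keys = max_keys

    def is_new(self, reading):
        key = (reading["sensor_name"], reading["timestamp"])
        if key in self._seen:
            return False
        self._seen[key] = True
        # Evict the oldest key so memory use stays bounded.
        if len(self._seen) > self._max_keys:
            self._seen.popitem(last=False)
        return True


dedup = ReadingDeduplicator(max_keys=2)
first = {"sensor_name": "boiler_stack_temp", "timestamp": 1717152000.0, "value": 145.2}
duplicate = dict(first)  # what a QoS 1 redelivery would look like

accepted = [dedup.is_new(first), dedup.is_new(duplicate)]
```

Calling is_new() at the top of the reading callback and returning early on False keeps duplicates out of the pipeline while still using QoS 1 on the wire.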
Topic Subscriptions and Wildcards
The adapter supports both single-level (+) and multi-level (#) wildcards. The initial topic is subscribed automatically on connect; additional topics can be added later with subscribe().
# Subscribe to building A sensors (topics exactly two levels below building_a)
adapter.connect(broker="mqtt.local", topic="building_a/+/+", qos=1)

# Later, add a specific critical topic
adapter.subscribe("building_a/server_room/+/critical")
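To make the wildcard rules concrete, here is a minimal pure-Python sketch of how a topic filter is matched against a concrete topic. The function topic_matches is written for illustration only and is not part of the adapter; it also ignores the special handling of topics that start with $. paho-mqtt provides its own topic_matches_sub helper in paho.mqtt.client, which is probably the better choice in real code.

```python
def topic_matches(subscription, topic):
    """Return True if an MQTT topic filter matches a concrete topic.

    Illustrative sketch only: '+' matches exactly one level, '#' matches
    the parent level and everything below it and must be the last level.
    """
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == "#":
            # Multi-level wildcard is only valid as the final level.
            return i == len(sub_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without '#', both sides must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


two_levels = topic_matches("building_a/+/+", "building_a/floor1/temp")
three_levels = topic_matches("building_a/+/+", "building_a/floor1/room2/temp")
everything_below = topic_matches("sensors/#", "sensors/a/b/c")
```

Note that "building_a/+/+" only matches topics exactly two levels below building_a, which is why the four-level critical topic needs its own subscription.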
TLS / SSL
For production deployments, always use TLS. MQTTAdapter exposes the raw paho client only after connection, but TLS has to be configured before the client connects. You can do this by subclassing the adapter and overriding connect(), or by reaching into the internal client object. The following pattern is the recommended approach for encrypted brokers:
import ssl

import paho.mqtt.client as mqtt

from pyv_edge_agent.ingest import MQTTAdapter


class TLSMQTTAdapter(MQTTAdapter):
    def connect(self, broker, port=8883, topic="#", qos=1):
        # Store broker parameters, as the base connect() does.
        self._broker = broker
        self._port = port
        self._topic = topic
        self._qos = qos
        self._stop_reconnect.clear()

        # paho-mqtt 2.x requires a callback API version; 1.x does not accept one.
        try:
            client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
        except (AttributeError, TypeError):
            client = mqtt.Client()

        # TLS must be configured before connect().
        client.tls_set(
            ca_certs="/etc/ssl/certs/ca-certificates.crt",
            certfile="/opt/pyvorin/certs/client.crt",
            keyfile="/opt/pyvorin/certs/client.key",
            tls_version=ssl.PROTOCOL_TLS_CLIENT,
        )

        client.on_connect = self._on_connect
        client.on_disconnect = self._on_disconnect
        client.on_message = self._on_message
        client.connect(broker, port)
        client.loop_start()
        self._client = client


adapter = TLSMQTTAdapter()
adapter.connect("secure-broker.example.com", port=8883, topic="sensors/#")
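If you prefer to control TLS settings through the standard library, an alternative is to build an ssl.SSLContext yourself and hand it to the paho client with tls_set_context() instead of tls_set(). The sketch below only builds the context; the helper name build_tls_context and its parameters are hypothetical, and the minimum protocol version of TLS 1.2 is an assumption you should adjust to your broker's policy.

```python
import ssl


def build_tls_context(ca_file=None, cert_file=None, key_file=None):
    """Build a client-side TLS context with certificate verification on.

    Hypothetical helper for illustration. With ca_file=None the
    system's default CA store is used.
    """
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=ca_file)
    # Assumption: refuse anything older than TLS 1.2.
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    if cert_file:
        # Optional client certificate for mutual TLS.
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return context


context = build_tls_context()
# On a paho client, before connect():
#     client.tls_set_context(context)
```

create_default_context() enables hostname checking and requires a valid server certificate by default, so the broker's certificate must match the hostname you connect to.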
Authentication
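Username/password authentication is supported by setting credentials on the underlying paho client before it connects. The same subclassing pattern applies. MQTT sends these credentials unencrypted unless the connection uses TLS, so combine this with the TLS setup above on untrusted networks: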
class AuthMQTTAdapter(MQTTAdapter):
    def __init__(self, username, password):
        super().__init__()
        self._username = username
        self._password = password

    def connect(self, broker, port=1883, topic="#", qos=1):
        # ... client setup as above ...
        client.username_pw_set(self._username, self._password)
        # ... connect and loop_start ...


adapter = AuthMQTTAdapter("edge_device_001", "super_secret_token")
adapter.connect("broker.example.com")
Message Parsing
The adapter expects JSON payloads shaped like a SensorReading dict. Incoming messages are decoded with json.loads and normalised to a dict with keys: sensor_name, timestamp, value, unit, metadata. Non-dict payloads are dropped with a warning.
{
"sensor_name": "boiler_stack_temp",
"timestamp": 1717152000.0,
"value": 145.2,
"unit": "°C",
"metadata": {"firmware": "2.4.1"}
}
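The parsing step described above can be sketched as a standalone function. This is an approximation for illustration, not the adapter's actual implementation: the function name parse_reading and the default values for missing keys are assumptions.

```python
import json


def parse_reading(raw_payload):
    """Decode a JSON payload into a SensorReading-shaped dict, or None.

    Illustrative sketch; defaults for missing keys are assumptions.
    """
    try:
        data = json.loads(raw_payload)
    except ValueError:
        # Covers invalid JSON and undecodable bytes.
        return None
    if not isinstance(data, dict):
        # Mirrors the documented behaviour: non-dict payloads are dropped.
        return None
    try:
        return {
            "sensor_name": str(data.get("sensor_name", "")),
            "timestamp": float(data.get("timestamp", 0)),
            "value": float(data.get("value", 0)),
            "unit": str(data.get("unit", "")),
            "metadata": dict(data.get("metadata") or {}),
        }
    except (TypeError, ValueError):
        # A field could not be coerced to the expected type.
        return None


reading = parse_reading(b'{"sensor_name": "boiler_stack_temp", "value": "145.2"}')
rejected = parse_reading(b"[1, 2, 3]")
```

Coercing each field explicitly means a payload that sends value as the string "145.2" still produces a float, while a payload that cannot be coerced is rejected as a whole.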
Binary Payloads
If your sensors publish binary payloads (e.g., CBOR, MessagePack, or Protobuf), subclass the adapter and override _on_message to decode before dispatching to the callback.
import logging

import msgpack

from pyv_edge_agent.ingest import MQTTAdapter

logger = logging.getLogger(__name__)


class MsgPackMQTTAdapter(MQTTAdapter):
    def _on_message(self, client, userdata, msg):
        try:
            # raw=False decodes MessagePack strings to str instead of bytes.
            payload = msgpack.unpackb(msg.payload, raw=False)
            reading = {
                "sensor_name": str(payload.get("sensor_name", "")),
                "timestamp": float(payload.get("timestamp", 0)),
                "value": float(payload.get("value", 0)),
                "unit": str(payload.get("unit", "")),
                "metadata": dict(payload.get("metadata", {})),
            }
            # Read the callback under the lock, then call it outside the lock.
            with self._lock:
                cb = self._callback
            if cb is not None:
                cb(reading)
        except Exception as exc:
            logger.exception("MsgPack decode error: %s", exc)
Reconnection Logic
On disconnect, the adapter schedules a reconnect with exponential back-off starting at 1 second and capping at 60 seconds. The timer is daemonised so it will not block interpreter shutdown. Call disconnect() to set the stop flag and cleanly tear down the network loop.
adapter.disconnect()
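The back-off schedule can be illustrated with a small generator. The starting delay of 1 second and the cap of 60 seconds come from the description above; the doubling factor is an assumption, since the text only says the back-off is exponential. The name backoff_delays is hypothetical.

```python
from itertools import islice


def backoff_delays(initial=1.0, cap=60.0, factor=2.0):
    """Yield reconnect delays in seconds, growing until they hit the cap.

    Sketch only: the factor of 2 is an assumption, not taken from the SDK.
    """
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


first_eight = list(islice(backoff_delays(), 8))
# first_eight == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Under these assumptions the delay reaches the cap on the seventh attempt, so a broker that stays down for a long time is retried once per minute.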
Publishing Upstream
The adapter can also publish JSON payloads back to the broker. This is useful for sending pipeline results, heartbeats, or acknowledged alerts.
adapter.publish("alerts/server_room", {
"rule": "overheating",
"severity": "critical",
"timestamp": 1717152000.0,
})
Full Working Example with Mosquitto
Below is a complete example. First start a local Mosquitto broker, then run the script, which publishes synthetic temperature data and ingests it through the adapter into a Pyvorin pipeline.
# 1. Start mosquitto (Docker or system package)
docker run -it --rm -p 1883:1883 eclipse-mosquitto:2 \
    mosquitto -c /mosquitto-no-auth.conf
import json
import time
import threading
import paho.mqtt.publish as publish
from pyvorin_edge.pipeline import Pipeline, RuleConfig
from pyv_edge_agent.ingest import MQTTAdapter
# 2. Set up pipeline
pipeline = Pipeline("mqtt_demo")
pipeline.add_rule(RuleConfig(
name="hot",
condition=lambda r: r["value"] > 30.0,
severity="warning",
cooldown_seconds=5.0,
))
events = []
def on_reading(reading):
    result = pipeline.run([reading])
    events.extend(result.events)
# 3. Start adapter
adapter = MQTTAdapter()
adapter.on_message(on_reading)
adapter.connect("localhost", 1883, topic="lab/+/temp", qos=1)
# 4. Publish synthetic data in another thread
def publisher():
    for i in range(20):
        payload = json.dumps({
            "sensor_name": "lab_bench_temp",
            "timestamp": time.time(),
            "value": 20.0 + i,
            "unit": "°C",
            "metadata": {},
        })
        publish.single("lab/bench/temp", payload, hostname="localhost")
        time.sleep(0.5)
threading.Thread(target=publisher, daemon=True).start()
# 5. Let it run, then inspect
time.sleep(12)
adapter.disconnect()
print(f"Total events: {len(events)}")
for e in events:
    print(e.to_dict())
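In the script above, on_reading runs the pipeline directly inside the callback. Since the adapter runs in a background thread, a slow pipeline could hold up message handling. One possible refinement is to have the callback only enqueue readings and let the main thread process them in batches. The sketch below uses the standard library only; ReadingBuffer is a hypothetical helper, and the thread that feeds it here is a stand-in for the adapter's network thread.

```python
import queue
import threading


class ReadingBuffer:
    """Bounded hand-off between the callback thread and a consumer.

    Hypothetical helper, not part of the Pyvorin SDK.
    """

    def __init__(self, maxsize=1000):
        self._queue = queue.Queue(maxsize=maxsize)
        self.dropped = 0

    def on_reading(self, reading):
        # Never block the calling thread; count what does not fit.
        try:
            self._queue.put_nowait(reading)
        except queue.Full:
            self.dropped += 1

    def drain(self, max_items=100):
        # Collect up to max_items readings without waiting.
        batch = []
        while len(batch) < max_items:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch


reading_buffer = ReadingBuffer(maxsize=3)


def fake_network_thread():
    # Stand-in for the adapter invoking the callback five times.
    for i in range(5):
        reading_buffer.on_reading({"sensor_name": "lab_bench_temp", "value": 20.0 + i})


worker = threading.Thread(target=fake_network_thread)
worker.start()
worker.join()

batch = reading_buffer.drain()
```

With the real adapter you would register reading_buffer.on_reading through adapter.on_message() and call pipeline.run(batch) from the main loop. The dropped counter makes overload visible instead of letting the queue grow without bound.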