"Tutorial: Telecom Tower Monitoring and Safety"
Advanced multi-sensor edge pipeline for telecom towers featuring SNMP adapters, tower-climb safety rules, and MQTT integration to the NOC.
Published Jun 2, 2026
Introduction
Telecom towers are unmanned, remote, and high-value assets. A single overheated RF amplifier or an unauthorised door opening can lead to service outages, regulatory fines, or physical danger. This tutorial builds an advanced edge pipeline that ingests temperature, vibration, door-state, and power metrics; polls tower equipment via SNMP; enforces tower-climb safety rules; and streams alerts to the Network Operations Centre (NOC) over MQTT.
Hardware Setup
A ruggedised ARM64 gateway (e.g., Raspberry Pi Compute Module 4 in an industrial DIN-rail case) is installed in the tower equipment cabinet. The following sensors are connected:
- Temperature: DS18B20 probe on the heatsink of the primary RF amplifier.
- Vibration: ADXL345 on the tower leg to detect structural resonance or climbing.
- Door Sensor: Magnetic reed switch on the cabinet door.
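- Power: INA219 I2C current sensor on the 48 V DC bus. Note that the INA219 itself is rated for bus voltages up to 26 V, so sense on the low-side (return) conductor or substitute a current monitor rated for the full bus voltage.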
SNMP Adapter Pattern
Most tower equipment (rectifiers, battery monitors, environmental probes) exposes data via
SNMP v2c or v3. The Pyvorin Edge SDK does not ship a built-in SNMP adapter, but the
duck-typing contract from edge-custom-adapters.html makes it trivial to add one.
import asyncio
import time
from typing import Any, Dict, Optional

try:
    from pysnmp.hlapi.v3arch.asyncio import get_cmd, SnmpEngine, CommunityData
    from pysnmp.hlapi.v3arch.asyncio import UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
    _SNMP_AVAILABLE = True
except ImportError:
    # pysnmp is optional: without it every poll reports "no value".
    _SNMP_AVAILABLE = False


class SNMPAdapter:
    """Poll SNMP OIDs and emit SensorReading-compatible dicts.

    The adapter authenticates with a community string (``CommunityData``)
    and sends each request over UDP to ``host:port``.
    """

    def __init__(self, host: str, community: str = "public", port: int = 161) -> None:
        self.host = host
        self.community = community
        self.port = port

    async def _get_oid_async(self, oid: str) -> Optional[float]:
        """Send a single SNMP GET for *oid* and coerce the answer to float."""
        # The asyncio flavour of the pysnmp high-level API is coroutine
        # based: the transport target is built by the awaited ``create``
        # factory and ``get_cmd`` must be awaited, not iterated.
        target = await UdpTransportTarget.create((self.host, self.port))
        error_indication, error_status, _, var_binds = await get_cmd(
            SnmpEngine(),
            CommunityData(self.community),
            target,
            ContextData(),
            ObjectType(ObjectIdentity(oid)),
        )
        if error_indication or error_status:
            return None
        # One OID was requested, so only the first binding is of interest.
        for var_bind in var_binds:
            try:
                return float(var_bind[1])
            except (ValueError, TypeError):
                return None
        return None

    def get_oid(self, oid: str) -> Optional[float]:
        """Return the numeric value of *oid*, or ``None`` when unavailable.

        ``None`` is returned when pysnmp is not installed, when the agent
        reports an error, or when the value cannot be converted to float.

        This is a blocking call that drives its own event loop through
        ``asyncio.run``; do not call it from code that is already running
        inside an asyncio event loop.
        """
        if not _SNMP_AVAILABLE:
            return None
        return asyncio.run(self._get_oid_async(oid))

    def generate_reading(self, sensor_name: str, oid: str, unit: str) -> Dict[str, Any]:
        """Poll *oid* and wrap the result in a reading dict.

        Returns a dict with the keys ``sensor_name``, ``timestamp``
        (``time.time()`` at the moment of the call), ``value`` and ``unit``.
        """
        value = self.get_oid(oid)
        return {
            "sensor_name": sensor_name,
            "timestamp": time.time(),
            # NOTE(review): a failed poll is reported as 0.0, which cannot be
            # told apart from a genuine zero reading downstream.
            "value": value if value is not None else 0.0,
            "unit": unit,
        }
Multi-Sensor Pipeline
We register all physical sensors and instantiate a Pipeline with rolling windows
for trend analysis.
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
pipeline = Pipeline(name="telecom_tower")

# Register the four physical sensors: RF amplifier temperature, tower-leg
# vibration, cabinet door state and DC bus current.
for _sensor in (
    Sensor(name="rf_temp", sensor_type=SensorType.TEMPERATURE, unit="C",
           normal_range=(20.0, 65.0), location="RF_Amplifier"),
    Sensor(name="leg_vibration", sensor_type=SensorType.VIBRATION, unit="g",
           normal_range=(0.0, 0.5), location="Tower_Leg"),
    Sensor(name="cabinet_door", sensor_type=SensorType.GENERIC, unit="bool",
           normal_range=(0.0, 1.0), location="Cabinet"),
    Sensor(name="dc_current", sensor_type=SensorType.CURRENT, unit="A",
           normal_range=(0.0, 50.0), location="DC_Bus"),
):
    pipeline.add_sensor(_sensor)

# 5-minute rolling windows for trend detection; the door sensor gets no
# window because only the three analogue channels are listed here.
for sensor_name in ("rf_temp", "leg_vibration", "dc_current"):
    window = WindowConfig(
        duration_seconds=300.0,
        sensor_name=sensor_name,
        window_type="rolling",
    )
    pipeline.add_window(window)
Tower-Climb Safety Rules
A tower climb is authorised only during a scheduled maintenance window, and the cabinet door is expected to be opened only during such a window.
If the door opens outside a maintenance window, the pipeline must emit a
CRITICAL alert immediately. Because RuleConfig.condition accepts a
callable, we can implement complex multi-sensor logic in pure Python.
from dataclasses import dataclass
from typing import List
# Maintenance windows are expressed as (start_hour, end_hour) tuples.
# unauthorised_climb compares them against TowerContext.hour_of_day as
# half-open intervals: start_hour is inside the window, end_hour is not.
MAINTENANCE_WINDOWS = [(9, 11), (14, 16)]
@dataclass
class TowerContext:
    """Snapshot of tower state consumed by the multi-sensor safety rules."""

    # Truthy while the cabinet door is open; read by unauthorised_climb.
    door_open: bool
    # Hour that unauthorised_climb compares against the (start_hour,
    # end_hour) pairs in MAINTENANCE_WINDOWS.
    hour_of_day: int
    # Vibration level that structural_overload compares against its limit;
    # presumably in g, like the leg_vibration sensor - confirm.
    vibration_g: float
def unauthorised_climb(ctx: "TowerContext", windows: "list[tuple[int, int]] | None" = None) -> bool:
    """Return True when the door is open outside every maintenance window.

    Args:
        ctx: Object exposing ``door_open`` and ``hour_of_day``.
        windows: ``(start_hour, end_hour)`` pairs to check against. Defaults
            to the module-level ``MAINTENANCE_WINDOWS``.

    Each window is half-open (``start_hour`` inclusive, ``end_hour``
    exclusive). A window whose start is later than its end, e.g.
    ``(22, 2)``, is treated as wrapping past midnight.
    """
    if windows is None:
        windows = MAINTENANCE_WINDOWS
    hour = ctx.hour_of_day
    in_window = any(
        (start <= hour < end) if start <= end else (hour >= start or hour < end)
        for start, end in windows
    )
    # bool() keeps the declared return type even if door_open is a 0/1 number.
    return bool(ctx.door_open) and not in_window
def structural_overload(ctx: "TowerContext", threshold_g: float = 0.3) -> bool:
    """Return True when the measured vibration exceeds the limit.

    Args:
        ctx: Object exposing ``vibration_g``.
        threshold_g: Limit that ``ctx.vibration_g`` must strictly exceed
            for the rule to fire. Defaults to 0.3.
    """
    return ctx.vibration_g > threshold_g
# Register safety rules
# Door opened outside a maintenance window: no cooldown, so every
# evaluation that matches produces an alert.
unauthorised_climb_rule = RuleConfig(
    name="unauthorised_climb",
    condition=unauthorised_climb,
    severity="critical",
    cooldown_seconds=0.0,
    action=lambda ctx: print("ALERT: Unauthorised cabinet access!"),
)

# Excess tower-leg vibration: configured with a 5-second cooldown and no
# explicit action.
structural_overload_rule = RuleConfig(
    name="structural_overload",
    condition=structural_overload,
    severity="critical",
    cooldown_seconds=5.0,
)

for _rule in (unauthorised_climb_rule, structural_overload_rule):
    pipeline.add_rule(_rule)
Edge-to-NOC Integration (MQTT)
Alerts are streamed to the central NOC over MQTT using MQTTAdapter from
pyv_edge_agent.ingest. In egress mode, we reuse the same transport to publish
events upstream.
from pyv_edge_agent.ingest import MQTTAdapter
import json
mqtt = MQTTAdapter()
mqtt.connect(
    broker="noc-broker.pyvorin.com",
    port=8883,
    # This gateway only publishes its own alerts, so bind the adapter to
    # this tower's concrete topic. A "+" wildcard is only meaningful in a
    # subscription filter and would match every tower's alerts.
    # NOTE(review): assumes `topic` is what the adapter subscribes to - confirm.
    topic="towers/TWR-42/alerts",
    qos=2,
)
def publish_alert(event, tower_id: str = "TWR-42", client=None):
    """Serialise *event* to JSON and publish it on the tower's alert topic.

    Args:
        event: Object exposing ``rule_name``, ``severity``, ``timestamp``
            and ``sensor_readings`` (each reading must offer ``to_dict()``).
        tower_id: Identifier placed in the payload and used to build the
            topic ``towers/<tower_id>/alerts``.
        client: Object with a ``publish(topic, payload, qos=...)`` method.
            Defaults to the module-level ``mqtt`` adapter.
    """
    publisher = mqtt if client is None else client
    # Payload and topic are derived from the same id so they cannot drift.
    topic = f"towers/{tower_id}/alerts"
    payload = json.dumps({
        "tower_id": tower_id,
        "rule": event.rule_name,
        "severity": event.severity,
        "timestamp": event.timestamp,
        "readings": [r.to_dict() for r in event.sensor_readings],
    })
    publisher.publish(topic, payload, qos=2)
    print(f"Published alert: {event.rule_name}")
# Attach the publisher to the pipeline as a rule action
def _publish_rf_overheat(ctx):
    """Wrap the triggering context in an ad-hoc event and publish it."""
    # In production, construct the Event object properly
    event_cls = type("Event", (), {
        "rule_name": "rf_overheat",
        "severity": "warning",
        "timestamp": time.time(),
        "sensor_readings": [ctx],
    })
    return publish_alert(event_cls())


rf_overheat_rule = RuleConfig(
    name="rf_overheat",
    condition_expr="ctx.value > 60.0",
    severity="warning",
    cooldown_seconds=60.0,
    action=_publish_rf_overheat,
)
pipeline.add_rule(rf_overheat_rule)
Complete Working Script
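Save the following as telecom_tower.py. It combines the SNMP adapter, multi-sensor
ingestion, safety rules, and MQTT egress in a single runnable file. The adapter class is defined but not polled in main(), so the script runs without any SNMP equipment attached.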
#!/usr/bin/env python3
"""Telecom Tower Monitoring — complete working example."""
import time
from typing import Any, Dict, Optional
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
from pyvorin_edge.windows import RollingWindow
from pyv_edge_agent.ingest import MQTTAdapter
import json
class SNMPAdapter:
    """Minimal SNMP v2c adapter for tower equipment."""

    def __init__(self, host: str, community: str = "public", port: int = 161):
        self.host = host
        self.community = community
        self.port = port

    def get_oid(self, oid: str) -> Optional[float]:
        """Return the numeric value of *oid*, or ``None`` when unavailable.

        ``None`` is returned when pysnmp is not installed, when the agent
        reports an error, or when the value cannot be converted to float.

        This is a blocking call that drives its own event loop through
        ``asyncio.run``; do not call it from code that is already running
        inside an asyncio event loop.
        """
        try:
            from pysnmp.hlapi.v3arch.asyncio import (
                get_cmd, SnmpEngine, CommunityData,
                UdpTransportTarget, ContextData, ObjectType, ObjectIdentity,
            )
        except ImportError:
            return None
        # Imported locally so the script's import header stays untouched.
        import asyncio

        async def _poll():
            # The asyncio flavour of the pysnmp high-level API is coroutine
            # based: the transport target comes from the awaited ``create``
            # factory and ``get_cmd`` must be awaited, not iterated.
            target = await UdpTransportTarget.create((self.host, self.port))
            return await get_cmd(
                SnmpEngine(),
                CommunityData(self.community),
                target,
                ContextData(),
                ObjectType(ObjectIdentity(oid)),
            )

        error_indication, error_status, _, var_binds = asyncio.run(_poll())
        if error_indication or error_status:
            return None
        # One OID was requested, so only the first binding is of interest.
        for var_bind in var_binds:
            try:
                return float(var_bind[1])
            except (ValueError, TypeError):
                return None
        return None

    def generate_reading(self, sensor_name: str, oid: str, unit: str) -> Dict[str, Any]:
        """Poll *oid* and wrap the result in a reading dict.

        Returns a dict with the keys ``sensor_name``, ``timestamp``
        (``time.time()`` at the moment of the call), ``value`` and ``unit``.
        """
        value = self.get_oid(oid)
        return {
            "sensor_name": sensor_name,
            "timestamp": time.time(),
            # NOTE(review): a failed poll is reported as 0.0, which cannot be
            # told apart from a genuine zero reading downstream.
            "value": value if value is not None else 0.0,
            "unit": unit,
        }
def _build_pipeline():
    """Create the tower pipeline with its four sensors and rolling windows."""
    pipeline = Pipeline(name="telecom_tower")
    # Sensors
    pipeline.add_sensor(
        Sensor(name="rf_temp", sensor_type=SensorType.TEMPERATURE, unit="C",
               normal_range=(20.0, 65.0), location="RF_Amplifier")
    )
    pipeline.add_sensor(
        Sensor(name="leg_vibration", sensor_type=SensorType.VIBRATION, unit="g",
               normal_range=(0.0, 0.5), location="Tower_Leg")
    )
    pipeline.add_sensor(
        Sensor(name="cabinet_door", sensor_type=SensorType.GENERIC, unit="bool",
               normal_range=(0.0, 1.0), location="Cabinet")
    )
    pipeline.add_sensor(
        Sensor(name="dc_current", sensor_type=SensorType.CURRENT, unit="A",
               normal_range=(0.0, 50.0), location="DC_Bus")
    )
    # Windows
    for name in ["rf_temp", "leg_vibration", "dc_current"]:
        pipeline.add_window(
            WindowConfig(duration_seconds=300.0, sensor_name=name, window_type="rolling")
        )
    return pipeline


def _register_safety_rules(pipeline):
    """Attach the unauthorised-climb and structural-overload rules."""
    # (start_hour, end_hour) pairs; start inclusive, end exclusive.
    MAINTENANCE_WINDOWS = [(9, 11), (14, 16)]

    class TowerContext:
        """Tower state that the two rule callables below read."""

        def __init__(self, door_open: bool, hour_of_day: int, vibration_g: float):
            self.door_open = door_open
            self.hour_of_day = hour_of_day
            self.vibration_g = vibration_g

    def unauthorised_climb(ctx: TowerContext) -> bool:
        in_window = any(start <= ctx.hour_of_day < end for start, end in MAINTENANCE_WINDOWS)
        return ctx.door_open and not in_window

    def structural_overload(ctx: TowerContext) -> bool:
        return ctx.vibration_g > 0.3

    pipeline.add_rule(
        RuleConfig(
            name="unauthorised_climb",
            condition=unauthorised_climb,
            severity="critical",
            cooldown_seconds=0.0,
            action=lambda ctx: print("ALERT: Unauthorised cabinet access!"),
        )
    )
    pipeline.add_rule(
        RuleConfig(
            name="structural_overload",
            condition=structural_overload,
            severity="critical",
            cooldown_seconds=5.0,
        )
    )


def _simulated_readings(now):
    """Return one reading per sensor, all stamped with *now*.

    The scenario is a door opening at 13:00, i.e. outside every
    maintenance window. The hour is not carried by any SensorReading.
    """
    return [
        SensorReading(sensor_name="rf_temp", timestamp=now, value=62.0, unit="C"),
        SensorReading(sensor_name="leg_vibration", timestamp=now, value=0.35, unit="g"),
        SensorReading(sensor_name="cabinet_door", timestamp=now, value=1.0, unit="bool"),
        SensorReading(sensor_name="dc_current", timestamp=now, value=28.0, unit="A"),
    ]


def _publish_events(events, tower_id="TWR-42"):
    """Publish each event to the tower's alert topic, tolerating a missing broker."""
    # Built once so the connect topic and the publish topic cannot drift.
    topic = f"towers/{tower_id}/alerts"
    mqtt = MQTTAdapter()
    try:
        mqtt.connect(broker="noc-broker.pyvorin.com", port=8883,
                     topic=topic, qos=2)
        for ev in events:
            payload = json.dumps({
                "tower_id": tower_id,
                "rule": ev.rule_name,
                "severity": ev.severity,
                "timestamp": ev.timestamp,
            })
            mqtt.publish(topic, payload, qos=2)
    except Exception as exc:
        # Deliberately broad: the demo must finish even with no broker.
        print(f"MQTT not available in demo: {exc}")


def main():
    """Run the demo: build the pipeline, evaluate simulated readings, publish."""
    pipeline = _build_pipeline()
    # Safety rules
    _register_safety_rules(pipeline)
    # Simulate readings
    readings = _simulated_readings(time.time())
    # Note: The multi-sensor callable rules expect a TowerContext, not a SensorReading.
    # In production, the EdgeAgent assembles the context object before rule evaluation.
    # Here we demonstrate the pipeline running the scalar rules against readings.
    result = pipeline.run(readings)
    print(f"Processed {result.readings_processed} readings, {len(result.events)} events")
    for ev in result.events:
        print(f"  [{ev.severity}] {ev.rule_name}")
    # MQTT egress (connection skipped if broker unavailable)
    _publish_events(result.events)
# Run the demo only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Summary
You now have a telecom-tower pipeline that polls SNMP equipment, ingests multi-sensor data, enforces life-safety rules with zero-cooldown critical alerts, and streams events to the NOC over MQTT. In production, harden the gateway with IPsec to the NOC, use SNMP v3 with AES-256 encryption, and mount vibration sensors with Loctite to prevent loosening under wind load.