
"Tutorial: Telecom Tower Monitoring and Safety"

Advanced multi-sensor edge pipeline for telecom towers featuring SNMP adapters, tower-climb safety rules, and MQTT integration to the NOC.

Published Jun 2, 2026

Introduction

Telecom towers are unmanned, remote, and high-value assets. A single overheated RF amplifier or an unauthorised door opening can lead to service outages, regulatory fines, or physical danger. This tutorial builds an advanced edge pipeline that ingests temperature, vibration, door-state, and power metrics; polls tower equipment via SNMP; enforces tower-climb safety rules; and streams alerts to the Network Operations Centre (NOC) over MQTT.

Hardware Setup

A ruggedised ARM64 gateway (e.g., Raspberry Pi Compute Module 4 in an industrial DIN-rail case) is installed in the tower equipment cabinet. The following sensors are connected:

  • Temperature: DS18B20 probe on the heatsink of the primary RF amplifier.
  • Vibration: ADXL345 on the tower leg to detect structural resonance or climbing.
  • Door Sensor: Magnetic reed switch on the cabinet door.
  • Power: INA219 I2C current sensor on the 48 V DC bus.
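
On a Linux gateway, the DS18B20 is commonly read through the kernel's 1-Wire sysfs interface, which returns two lines of text: a CRC status ending in YES or NO, and the temperature in millidegrees Celsius after "t=". The sketch below parses that text. The function name and the sample string are illustrative, and it assumes the w1-gpio and w1-therm drivers are enabled on the gateway.

```python
from typing import Optional


def parse_w1_slave(text: str) -> Optional[float]:
    """Return degrees Celsius from w1_slave text, or None if the read is invalid."""
    lines = text.strip().splitlines()
    if len(lines) < 2 or not lines[0].strip().endswith("YES"):
        return None  # CRC check failed or the file was truncated
    _, sep, raw = lines[1].partition("t=")
    if not sep:
        return None  # no temperature field present
    try:
        return int(raw.strip()) / 1000.0  # millidegrees to degrees
    except ValueError:
        return None


# Illustrative sample of the two-line sysfs format (not captured from real hardware)
SAMPLE = (
    "4b 01 4b 46 7f ff 05 10 e1 : crc=e1 YES\n"
    "4b 01 4b 46 7f ff 05 10 e1 t=20687\n"
)
BAD_CRC = SAMPLE.replace("YES", "NO")

print(parse_w1_slave(SAMPLE))
print(parse_w1_slave(BAD_CRC))
```

Rejecting reads with a failed CRC keeps electrical noise on a long probe cable from reaching the rf_temp rules as a bogus temperature.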

SNMP Adapter Pattern

Most tower equipment (rectifiers, battery monitors, environmental probes) exposes data via SNMP v2c or v3. The Pyvorin Edge SDK does not ship a built-in SNMP adapter, but the duck-typing contract from edge-custom-adapters.html makes it trivial to add one.


import asyncio
import time
from typing import Any, Dict, Optional

try:
    from pysnmp.hlapi.v3arch.asyncio import get_cmd, SnmpEngine, CommunityData
    from pysnmp.hlapi.v3arch.asyncio import UdpTransportTarget, ContextData, ObjectType, ObjectIdentity
    _SNMP_AVAILABLE = True
except ImportError:
    _SNMP_AVAILABLE = False


class SNMPAdapter:
    """Poll SNMP OIDs and emit SensorReading-compatible dicts."""

    def __init__(self, host: str, community: str = "public", port: int = 161) -> None:
        self.host = host
        self.community = community
        self.port = port

    async def _get_oid_async(self, oid: str) -> Optional[float]:
        # In pysnmp 7.x, get_cmd is a coroutine and the transport target
        # is built with the async factory UdpTransportTarget.create().
        error_indication, error_status, _, var_binds = await get_cmd(
            SnmpEngine(),
            CommunityData(self.community),
            await UdpTransportTarget.create((self.host, self.port)),
            ContextData(),
            ObjectType(ObjectIdentity(oid)),
        )
        if error_indication or error_status:
            return None
        for var_bind in var_binds:
            try:
                # Only numeric values are usable as sensor readings
                return float(var_bind[1])
            except (ValueError, TypeError):
                return None
        return None

    def get_oid(self, oid: str) -> Optional[float]:
        """Synchronous wrapper; returns None if pysnmp is missing or the poll fails."""
        if not _SNMP_AVAILABLE:
            return None
        # asyncio.run() must not be called from inside a running event loop
        return asyncio.run(self._get_oid_async(oid))

    def generate_reading(self, sensor_name: str, oid: str, unit: str) -> Dict[str, Any]:
        value = self.get_oid(oid)
        return {
            "sensor_name": sensor_name,
            "timestamp": time.time(),
            "value": value if value is not None else 0.0,
            "unit": unit,
        }
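
Because generate_reading substitutes 0.0 when a poll fails, an unreachable rectifier can look like a healthy one reporting zero. One way to keep that distinction is to poll through get_oid directly and track failures separately. The sketch below works with any object that exposes get_oid(oid). The poll_many helper, the FakeAdapter stand-in, the sensor names, and the OIDs are all placeholders for illustration, not real MIB entries.

```python
import time
from typing import Any, Dict, List, Optional, Protocol, Tuple


class OidSource(Protocol):
    """Anything with a get_oid method, e.g. the SNMPAdapter above."""

    def get_oid(self, oid: str) -> Optional[float]: ...


def poll_many(
    source: OidSource, oid_map: Dict[str, Tuple[str, str]]
) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Poll each (oid, unit) entry; return (readings, names of failed polls)."""
    readings: List[Dict[str, Any]] = []
    failed: List[str] = []
    for sensor_name, (oid, unit) in oid_map.items():
        value = source.get_oid(oid)
        if value is None:
            failed.append(sensor_name)  # report the gap instead of emitting 0.0
            continue
        readings.append({
            "sensor_name": sensor_name,
            "timestamp": time.time(),
            "value": value,
            "unit": unit,
        })
    return readings, failed


class FakeAdapter:
    """Stand-in for SNMPAdapter so the example runs without a network."""

    def __init__(self, values: Dict[str, float]) -> None:
        self._values = values

    def get_oid(self, oid: str) -> Optional[float]:
        return self._values.get(oid)


# Placeholder OIDs under a made-up enterprise number
OID_MAP = {
    "rectifier_voltage": ("1.3.6.1.4.1.99999.1.1.0", "V"),
    "battery_temp": ("1.3.6.1.4.1.99999.1.2.0", "C"),
}

fake = FakeAdapter({"1.3.6.1.4.1.99999.1.1.0": 53.8})
readings, failed = poll_many(fake, OID_MAP)
print([r["sensor_name"] for r in readings], failed)
```

The list of failed names can itself feed an alert, since a device that stops answering SNMP is often worth a warning of its own.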
  

Multi-Sensor Pipeline

We register all physical sensors and instantiate a Pipeline with rolling windows for trend analysis.


from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading

pipeline = Pipeline(name="telecom_tower")

pipeline.add_sensor(
    Sensor(name="rf_temp", sensor_type=SensorType.TEMPERATURE, unit="C",
           normal_range=(20.0, 65.0), location="RF_Amplifier")
)
pipeline.add_sensor(
    Sensor(name="leg_vibration", sensor_type=SensorType.VIBRATION, unit="g",
           normal_range=(0.0, 0.5), location="Tower_Leg")
)
pipeline.add_sensor(
    Sensor(name="cabinet_door", sensor_type=SensorType.GENERIC, unit="bool",
           normal_range=(0.0, 1.0), location="Cabinet")
)
pipeline.add_sensor(
    Sensor(name="dc_current", sensor_type=SensorType.CURRENT, unit="A",
           normal_range=(0.0, 50.0), location="DC_Bus")
)

# 5-minute rolling windows for trend detection
for sensor_name in ["rf_temp", "leg_vibration", "dc_current"]:
    pipeline.add_window(
        WindowConfig(duration_seconds=300.0, sensor_name=sensor_name, window_type="rolling")
    )
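
To make the idea of a rolling window concrete, here is a minimal pure-Python sketch of what a 300-second window might compute. It is not the SDK's implementation: the RollingStats class and its methods are hypothetical and only illustrate the eviction and trend logic.

```python
from collections import deque
from typing import Deque, Optional, Tuple


class RollingStats:
    """Keep (timestamp, value) samples from the last `duration_seconds`."""

    def __init__(self, duration_seconds: float) -> None:
        self.duration_seconds = duration_seconds
        self._samples: Deque[Tuple[float, float]] = deque()

    def add(self, timestamp: float, value: float) -> None:
        """Append a sample and evict anything older than the window."""
        self._samples.append((timestamp, value))
        cutoff = timestamp - self.duration_seconds
        while self._samples and self._samples[0][0] < cutoff:
            self._samples.popleft()

    def mean(self) -> Optional[float]:
        if not self._samples:
            return None
        return sum(v for _, v in self._samples) / len(self._samples)

    def slope_per_minute(self) -> Optional[float]:
        """Average rate of change between the oldest and newest sample."""
        if len(self._samples) < 2:
            return None
        (t0, v0), (t1, v1) = self._samples[0], self._samples[-1]
        if t1 == t0:
            return None
        return (v1 - v0) * 60.0 / (t1 - t0)


window = RollingStats(duration_seconds=300.0)
# Synthetic rf_temp samples: (seconds, degrees C)
for t, temp in [(0.0, 50.0), (120.0, 52.0), (240.0, 54.0), (360.0, 56.0)]:
    window.add(t, temp)

print(window.mean(), window.slope_per_minute())
```

A slope of this kind lets a rule react to an amplifier that is heating steadily before the absolute temperature threshold is crossed.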
  

Tower-Climb Safety Rules

Cabinet access, and any climb that follows it, is treated as authorised only during a scheduled maintenance window. If the door opens outside a maintenance window, the pipeline must emit a CRITICAL alert immediately, which is why that rule uses a zero-second cooldown. A second rule flags tower-leg vibration above 0.3 g. Because RuleConfig.condition accepts a callable, we can implement complex multi-sensor logic in pure Python.


from dataclasses import dataclass

# Maintenance windows are expressed as (start_hour, end_hour) tuples
MAINTENANCE_WINDOWS = [(9, 11), (14, 16)]


@dataclass
class TowerContext:
    door_open: bool
    hour_of_day: int
    vibration_g: float


def unauthorised_climb(ctx: TowerContext) -> bool:
    in_window = any(start <= ctx.hour_of_day < end for start, end in MAINTENANCE_WINDOWS)
    return ctx.door_open and not in_window


def structural_overload(ctx: TowerContext) -> bool:
    return ctx.vibration_g > 0.3


# Register safety rules
pipeline.add_rule(
    RuleConfig(
        name="unauthorised_climb",
        condition=unauthorised_climb,
        severity="critical",
        cooldown_seconds=0.0,
        action=lambda ctx: print("ALERT: Unauthorised cabinet access!"),
    )
)
pipeline.add_rule(
    RuleConfig(
        name="structural_overload",
        condition=structural_overload,
        severity="critical",
        cooldown_seconds=5.0,
    )
)
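
The rules above take a TowerContext rather than a single reading, so something has to assemble that context from the latest sensor values. The sketch below shows one way to do it. The build_context helper is hypothetical, and it assumes that the door sensor reports 1.0 when the cabinet is open and that maintenance windows are expressed in the tower's local time.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Tuple

# Same (start_hour, end_hour) windows as in the tutorial
MAINTENANCE_WINDOWS: List[Tuple[int, int]] = [(9, 11), (14, 16)]


@dataclass
class TowerContext:
    door_open: bool
    hour_of_day: int
    vibration_g: float


def build_context(latest: Dict[str, float], now: datetime) -> TowerContext:
    """Assemble a TowerContext from the most recent value of each sensor."""
    return TowerContext(
        # Assumption: 1.0 means open, 0.0 means closed
        door_open=latest.get("cabinet_door", 0.0) >= 0.5,
        # Assumption: `now` is already in the tower's local time
        hour_of_day=now.hour,
        vibration_g=latest.get("leg_vibration", 0.0),
    )


def unauthorised_climb(ctx: TowerContext) -> bool:
    in_window = any(start <= ctx.hour_of_day < end for start, end in MAINTENANCE_WINDOWS)
    return ctx.door_open and not in_window


def structural_overload(ctx: TowerContext) -> bool:
    return ctx.vibration_g > 0.3


latest_values = {"cabinet_door": 1.0, "leg_vibration": 0.35}
ctx_outside = build_context(latest_values, datetime(2026, 6, 2, 13, 0))
ctx_inside = build_context(latest_values, datetime(2026, 6, 2, 10, 0))

print(unauthorised_climb(ctx_outside), unauthorised_climb(ctx_inside))
print(structural_overload(ctx_outside))
```

Passing the clock in as an argument, rather than calling datetime.now() inside the helper, keeps the rules easy to test against any hour of the day.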
  

Edge-to-NOC Integration (MQTT)

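Alerts are streamed to the central NOC over MQTT using MQTTAdapter from pyv_edge_agent.ingest. In egress mode, we reuse the same transport to publish events upstream. The example connects on port 8883, the standard port for MQTT over TLS, and publishes at QoS 2 (exactly-once delivery) so that alerts are neither lost nor duplicated.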


from pyv_edge_agent.ingest import MQTTAdapter
import json

mqtt = MQTTAdapter()
mqtt.connect(
    broker="noc-broker.pyvorin.com",
    port=8883,
    topic="towers/+/alerts",
    qos=2,
)


def publish_alert(event):
    payload = json.dumps({
        "tower_id": "TWR-42",
        "rule": event.rule_name,
        "severity": event.severity,
        "timestamp": event.timestamp,
        "readings": [r.to_dict() for r in event.sensor_readings],
    })
    mqtt.publish("towers/TWR-42/alerts", payload, qos=2)
    print(f"Published alert: {event.rule_name}")


# Attach the publisher to the pipeline as a rule action
pipeline.add_rule(
    RuleConfig(
        name="rf_overheat",
        condition_expr="ctx.value > 60.0",
        severity="warning",
        cooldown_seconds=60.0,
        action=lambda ctx: publish_alert(
            # In production, construct the Event object properly
            type("Event", (), {
                "rule_name": "rf_overheat",
                "severity": "warning",
                "timestamp": time.time(),
                "sensor_readings": [ctx],
            })()
        ),
    )
)
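
Remote towers often have intermittent backhaul, so an alert raised while the broker is unreachable should not simply be lost. The sketch below shows a small store-and-forward buffer that could sit in front of the publisher. AlertBuffer and flaky_publish are hypothetical names, and the publish callable is injected so the example runs without a broker.

```python
import json
from collections import deque
from typing import Callable, Deque, Tuple


class AlertBuffer:
    """Bounded FIFO that retries publishing; the oldest alerts drop first when full."""

    def __init__(self, publish: Callable[[str, str], None], max_items: int = 1000) -> None:
        self._publish = publish
        self._queue: Deque[Tuple[str, str]] = deque(maxlen=max_items)

    def enqueue(self, topic: str, alert: dict) -> None:
        self._queue.append((topic, json.dumps(alert)))

    def flush(self) -> int:
        """Send queued alerts in order; stop at the first failure. Returns the count sent."""
        sent = 0
        while self._queue:
            topic, payload = self._queue[0]
            try:
                self._publish(topic, payload)
            except Exception:
                break  # leave the alert queued for the next flush
            self._queue.popleft()
            sent += 1
        return sent

    def __len__(self) -> int:
        return len(self._queue)


attempts = {"n": 0}


def flaky_publish(topic: str, payload: str) -> None:
    """Hypothetical publisher that fails on its first call, then recovers."""
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise ConnectionError("broker unreachable")


buffer = AlertBuffer(flaky_publish, max_items=100)
buffer.enqueue("towers/TWR-42/alerts", {"rule": "rf_overheat", "severity": "warning"})
buffer.enqueue("towers/TWR-42/alerts", {"rule": "unauthorised_climb", "severity": "critical"})

first = buffer.flush()   # the first publish raises, so nothing is removed
second = buffer.flush()  # the publisher has recovered, so both alerts go out
print(first, second, len(buffer))
```

With the adapter from this section, the injected callable could be as simple as lambda topic, payload: mqtt.publish(topic, payload, qos=2), with flush() called on a timer or after each reconnect.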
  

Complete Working Script

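Save the following as telecom_tower.py. It brings together the SNMP adapter, multi-sensor ingestion, safety rules, and MQTT egress in a single file. The SNMPAdapter class is defined but not called from main(), so the demo runs on simulated readings without network access to tower equipment.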


#!/usr/bin/env python3
"""Telecom Tower Monitoring — complete working example."""

import time
from typing import Any, Dict, Optional
from pyvorin_edge.pipeline import Pipeline, WindowConfig, RuleConfig
from pyvorin_edge.sensors import Sensor, SensorType, SensorReading
from pyv_edge_agent.ingest import MQTTAdapter
import json


class SNMPAdapter:
    """Minimal SNMP v2c adapter for tower equipment."""

    def __init__(self, host: str, community: str = "public", port: int = 161):
        self.host = host
        self.community = community
        self.port = port

    def get_oid(self, oid: str) -> Optional[float]:
        try:
            import asyncio
            from pysnmp.hlapi.v3arch.asyncio import (
                get_cmd, SnmpEngine, CommunityData,
                UdpTransportTarget, ContextData, ObjectType, ObjectIdentity,
            )
        except ImportError:
            return None

        async def _poll():
            # pysnmp 7.x: get_cmd is a coroutine and the transport target
            # comes from the async factory UdpTransportTarget.create().
            return await get_cmd(
                SnmpEngine(),
                CommunityData(self.community),
                await UdpTransportTarget.create((self.host, self.port)),
                ContextData(),
                ObjectType(ObjectIdentity(oid)),
            )

        # asyncio.run() must not be called from inside a running event loop
        error_indication, error_status, _, var_binds = asyncio.run(_poll())
        if error_indication or error_status:
            return None
        for var_bind in var_binds:
            try:
                return float(var_bind[1])
            except (ValueError, TypeError):
                return None
        return None

    def generate_reading(self, sensor_name: str, oid: str, unit: str) -> Dict[str, Any]:
        value = self.get_oid(oid)
        return {
            "sensor_name": sensor_name,
            "timestamp": time.time(),
            "value": value if value is not None else 0.0,
            "unit": unit,
        }


def main():
    pipeline = Pipeline(name="telecom_tower")

    # Sensors
    pipeline.add_sensor(
        Sensor(name="rf_temp", sensor_type=SensorType.TEMPERATURE, unit="C",
               normal_range=(20.0, 65.0), location="RF_Amplifier")
    )
    pipeline.add_sensor(
        Sensor(name="leg_vibration", sensor_type=SensorType.VIBRATION, unit="g",
               normal_range=(0.0, 0.5), location="Tower_Leg")
    )
    pipeline.add_sensor(
        Sensor(name="cabinet_door", sensor_type=SensorType.GENERIC, unit="bool",
               normal_range=(0.0, 1.0), location="Cabinet")
    )
    pipeline.add_sensor(
        Sensor(name="dc_current", sensor_type=SensorType.CURRENT, unit="A",
               normal_range=(0.0, 50.0), location="DC_Bus")
    )

    # Windows
    for name in ["rf_temp", "leg_vibration", "dc_current"]:
        pipeline.add_window(
            WindowConfig(duration_seconds=300.0, sensor_name=name, window_type="rolling")
        )

    # Safety rules
    MAINTENANCE_WINDOWS = [(9, 11), (14, 16)]

    class TowerContext:
        def __init__(self, door_open: bool, hour_of_day: int, vibration_g: float):
            self.door_open = door_open
            self.hour_of_day = hour_of_day
            self.vibration_g = vibration_g

    def unauthorised_climb(ctx: TowerContext) -> bool:
        in_window = any(start <= ctx.hour_of_day < end for start, end in MAINTENANCE_WINDOWS)
        return ctx.door_open and not in_window

    def structural_overload(ctx: TowerContext) -> bool:
        return ctx.vibration_g > 0.3

    pipeline.add_rule(
        RuleConfig(
            name="unauthorised_climb",
            condition=unauthorised_climb,
            severity="critical",
            cooldown_seconds=0.0,
            action=lambda ctx: print("ALERT: Unauthorised cabinet access!"),
        )
    )
    pipeline.add_rule(
        RuleConfig(
            name="structural_overload",
            condition=structural_overload,
            severity="critical",
            cooldown_seconds=5.0,
        )
    )

    # Simulate readings
    now = time.time()
    hour = 13  # Outside maintenance window
    readings = [
        SensorReading(sensor_name="rf_temp", timestamp=now, value=62.0, unit="C"),
        SensorReading(sensor_name="leg_vibration", timestamp=now, value=0.35, unit="g"),
        SensorReading(sensor_name="cabinet_door", timestamp=now, value=1.0, unit="bool"),
        SensorReading(sensor_name="dc_current", timestamp=now, value=28.0, unit="A"),
    ]

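    # Note: The multi-sensor callable rules expect a TowerContext, not a SensorReading.
    # In production, the EdgeAgent assembles the context object before rule evaluation.
    # For the demo, build the context by hand and evaluate the rules directly,
    # then pass the raw readings through the pipeline.
    ctx = TowerContext(door_open=True, hour_of_day=hour, vibration_g=0.35)
    print(f"unauthorised_climb={unauthorised_climb(ctx)}, "
          f"structural_overload={structural_overload(ctx)}")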
    result = pipeline.run(readings)
    print(f"Processed {result.readings_processed} readings, {len(result.events)} events")
    for ev in result.events:
        print(f"  [{ev.severity}] {ev.rule_name}")

    # MQTT egress (connection skipped if broker unavailable)
    mqtt = MQTTAdapter()
    try:
        mqtt.connect(broker="noc-broker.pyvorin.com", port=8883,
                     topic="towers/TWR-42/alerts", qos=2)
        for ev in result.events:
            payload = json.dumps({
                "tower_id": "TWR-42",
                "rule": ev.rule_name,
                "severity": ev.severity,
                "timestamp": ev.timestamp,
            })
            mqtt.publish("towers/TWR-42/alerts", payload, qos=2)
    except Exception as exc:
        print(f"MQTT not available in demo: {exc}")


if __name__ == "__main__":
    main()
  

Summary

You now have a telecom-tower pipeline that polls SNMP equipment, ingests multi-sensor data, enforces life-safety rules with zero-cooldown critical alerts, and streams events to the NOC over MQTT. In production, harden the gateway with IPsec to the NOC, use SNMP v3 with authentication and privacy enabled (AES-256 where the equipment supports it) instead of v2c community strings, and secure vibration-sensor fasteners with a threadlocker such as Loctite to prevent loosening under wind load.