
Window Aggregation — Rolling, Tumbling, and Custom Functions

Configure sliding, tumbling, and session windows. Compute mean, max, min, sum, count, and stddev with real code examples.

Published Jun 2, 2026

Why Windows Matter

Raw sensor readings are noisy. A single outlier should not wake an on-call engineer at 3 AM. Windows aggregate readings over time, smoothing transient spikes and exposing genuine trends. The Pyvorin Edge SDK provides two built-in window types—RollingWindow and TumblingWindow—plus an extension point for custom aggregation functions.

Window Types at a Glance

Type               Overlap  Use Case
Rolling (Sliding)  Yes      Real-time anomaly detection where recent context matters.
Tumbling           No       Periodic reporting, billing intervals, or discrete snapshots.

RollingWindow

A RollingWindow is meant to cover every reading whose timestamp falls within a fixed duration of the newest sample. The SDK does not auto-expire stale readings, however, so older readings stay in the window until you call slide(cutoff_timestamp) yourself or let the Pipeline hot loop manage it.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

window = RollingWindow(duration_seconds=300.0, sensor_name="turbine_vibration")

# Ingest a batch
for i in range(20):
    window.add_reading(SensorReading(
        sensor_name="turbine_vibration",
        timestamp=1717152000.0 + i * 30.0,
        value=2.5 + (i % 3),
        unit="mm/s",
    ))

print(f"Readings in window: {len(window)}")
print(f"Window full? {window.is_full()}")  # True once span >= 300s

result = window.compute()
print(f"Mean: {result.mean:.2f}")
print(f"Max:  {result.max:.2f}")
print(f"Min:  {result.min:.2f}")
print(f"Std:  {result.std:.2f}")
print(f"Sum:  {result.sum:.2f}")
print(f"Count: {result.count}")
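To make the manual expiry step concrete, here is a minimal, SDK-independent sketch of what sliding by a cutoff timestamp could look like. TinyRollingWindow and its internals are hypothetical stand-ins, not the SDK's implementation. The sketch assumes that slide(cutoff_timestamp) discards readings strictly older than the cutoff and that readings arrive in timestamp order.

```python
from collections import deque


class TinyRollingWindow:
    """Hypothetical stand-in for a rolling window; not the SDK class."""

    def __init__(self, duration_seconds: float) -> None:
        self.duration_seconds = duration_seconds
        # Each entry is a (timestamp, value) pair, oldest first.
        self._readings = deque()

    def add_reading(self, timestamp: float, value: float) -> None:
        # Assumes readings arrive in non-decreasing timestamp order.
        self._readings.append((timestamp, value))

    def slide(self, cutoff_timestamp: float) -> int:
        """Drop readings strictly older than the cutoff; return how many were dropped."""
        dropped = 0
        while self._readings and self._readings[0][0] < cutoff_timestamp:
            self._readings.popleft()
            dropped += 1
        return dropped

    def __len__(self) -> int:
        return len(self._readings)


window = TinyRollingWindow(duration_seconds=300.0)
for i in range(20):
    window.add_reading(1717152000.0 + i * 30.0, 2.5 + (i % 3))

# Keep only readings within 300 s of the newest sample.
newest = 1717152000.0 + 19 * 30.0
dropped = window.slide(newest - window.duration_seconds)
remaining = len(window)
print(f"dropped={dropped}, remaining={remaining}")  # dropped=9, remaining=11
```

With the same 20 readings as the example above (30 s apart, spanning 570 s), a cutoff of newest minus 300 s leaves the 11 most recent readings.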
  

TumblingWindow

A TumblingWindow is non-overlapping. Its start time is anchored to the first reading it receives. Once the time span from that anchor exceeds duration_seconds, the window is considered full. After computing aggregates, call reset() to begin the next tumbling bucket.


from pyvorin_edge.windows import TumblingWindow
from pyvorin_edge.sensors import SensorReading

window = TumblingWindow(duration_seconds=60.0, sensor_name="flow_rate")

readings = [
    SensorReading("flow_rate", 1717152000.0 + i * 10, 45.0 + i, "L/min")
    for i in range(12)
]

for r in readings:
    window.add_reading(r)
    if window.is_full():
        result = window.compute()
        print(f"Bucket complete: mean={result.mean:.1f}, max={result.max:.1f}")
        window.reset()
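The bucket boundaries in that loop can be traced with a small stand-alone sketch. It does not use the SDK; tumble is a hypothetical helper that assumes the semantics described above: the bucket is anchored at its first reading, and it closes once a reading arrives more than duration_seconds past the anchor (a strict comparison, which is an assumption).

```python
from statistics import mean


def tumble(readings, duration_seconds):
    """Yield one list of (timestamp, value) pairs per completed bucket.

    Assumption: a bucket is anchored at its first reading and closes as soon
    as a reading arrives whose timestamp is more than duration_seconds past
    the anchor. That reading is included in the bucket it closes.
    """
    bucket = []
    for ts, value in readings:
        bucket.append((ts, value))
        if ts - bucket[0][0] > duration_seconds:
            yield bucket
            bucket = []
    # Anything left in `bucket` belongs to a window that has not filled yet.


readings = [(1717152000.0 + i * 10, 45.0 + i) for i in range(12)]
buckets = list(tumble(readings, 60.0))
bucket_means = [mean(v for _, v in b) for b in buckets]
print(len(buckets), bucket_means)  # 1 [48.5]
```

Under these assumptions, the first bucket closes on the eighth reading (70 s after the anchor), and the last four readings stay in an unfinished bucket. If the real comparison is inclusive, the bucket would close one reading earlier.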
  

WindowResult

Both window types return a WindowResult dataclass with the following fields:

Field             Type          Description
sensor_name       str | None    Filter sensor, or None for all.
window_type       str           "rolling" or "tumbling".
duration_seconds  float         Configured window duration.
count             int           Number of readings aggregated.
mean              float | None  Arithmetic mean.
max               float | None  Maximum value.
min               float | None  Minimum value.
std               float | None  Sample standard deviation (0.0 if count == 1).
sum               float | None  Sum of all values.
custom            dict          User-defined aggregation outputs.
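Because several fields are typed float | None, code that formats a result should not assume they are populated. The sketch below uses ResultStandIn, a hypothetical dataclass that mirrors only a few of the fields listed above. That an empty window produces None values is an assumption inferred from the types, not something the SDK documentation here confirms.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ResultStandIn:
    """Hypothetical mirror of a few WindowResult fields, for illustration only."""
    count: int
    mean: Optional[float] = None
    std: Optional[float] = None
    custom: dict = field(default_factory=dict)


def describe(result: ResultStandIn) -> str:
    """Format a result without assuming the optional fields are populated."""
    if result.count == 0 or result.mean is None:
        return "no data"
    std_text = "n/a" if result.std is None else f"{result.std:.2f}"
    return f"n={result.count} mean={result.mean:.2f} std={std_text}"


empty_text = describe(ResultStandIn(count=0))
full_text = describe(ResultStandIn(count=5, mean=14.0, std=2.7386))
print(empty_text)  # no data
print(full_text)   # n=5 mean=14.00 std=2.74
```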

Aggregation Functions

Built-in aggregates are computed automatically by compute(). You do not pass the function name as a string; instead, the result object simply contains every built-in metric. If you only need one metric, ignore the rest—overhead is negligible for window sizes under 10,000 readings.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

window = RollingWindow(duration_seconds=120.0)
values = [12.0, 15.0, 14.0, 18.0, 11.0]
for i, v in enumerate(values):
    window.add_reading(SensorReading("demo", 1717152000.0 + i, v, "V"))

res = window.compute()
print(f"mean={res.mean}, max={res.max}, min={res.min}, "
      f"sum={res.sum}, count={res.count}, std={res.std}")
# mean=14.0, max=18.0, min=11.0, sum=70.0, count=5, std=2.7386...
  

Custom Aggregation

For domain-specific metrics—such as vibration RMS, kurtosis, or energy cost estimates—supply a dictionary of callables to the window constructor. Each function receives a list[float] of windowed values and can return any JSON-serialisable type. Errors are caught and stored as strings in the custom dict so that one failing aggregation does not crash the entire window.


import math
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

def rms(values: list[float]) -> float:
    """Root-mean-square of a vibration signal."""
    if not values:
        return 0.0
    return math.sqrt(sum(v * v for v in values) / len(values))

def cost_estimate(values: list[float]) -> float:
    """Rough energy cost at £0.15 per kWh assuming values are kW."""
    if not values:
        return 0.0
    avg_kw = sum(values) / len(values)
    hours = 1.0  # 1-hour window assumed
    kwh = avg_kw * hours
    return round(kwh * 0.15, 2)

window = RollingWindow(
    duration_seconds=3600.0,
    sensor_name="mains_power",
    custom_aggregation={"rms": rms, "cost_gbp": cost_estimate},
)

for i in range(60):
    window.add_reading(SensorReading(
        sensor_name="mains_power",
        timestamp=1717152000.0 + i * 60.0,
        value=4.5 + math.sin(i / 10.0),
        unit="kW",
    ))

result = window.compute()
print(result.custom)
# Approximately: {'rms': 4.566..., 'cost_gbp': 0.68}
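The error-isolation behaviour described above can be pictured with a stand-alone sketch. run_custom is a hypothetical helper, not an SDK function, and the exact error string the SDK stores is not specified here; the "ExceptionName: message" format below is an assumption.

```python
import math


def run_custom(aggregations, values):
    """Run each callable; store its result, or the error text if it raises."""
    out = {}
    for name, fn in aggregations.items():
        try:
            out[name] = fn(values)
        except Exception as exc:  # isolate failures per aggregation
            out[name] = f"{type(exc).__name__}: {exc}"
    return out


def rms(values):
    """Root-mean-square; raises ZeroDivisionError on an empty list."""
    return math.sqrt(sum(v * v for v in values) / len(values))


def broken(values):
    """Deliberately faulty aggregation used to show the failure path."""
    return values[0] / 0  # always raises ZeroDivisionError


custom = run_custom({"rms": rms, "broken": broken}, [3.0, 4.0])
print(custom)
```

One consequence of storing errors as strings is that a consumer cannot tell a failure from a legitimate string result by type alone, so it may be worth reserving string return values for errors in your own aggregations.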
  

Session Windows

The SDK does not provide a dedicated SessionWindow class, but you can emulate session semantics with a RollingWindow and a gap-detection wrapper. A session ends when the gap between consecutive readings exceeds a threshold.


from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading

class SessionWindow:
    def __init__(self, gap_seconds: float, sensor_name: str | None = None):
        self.gap_seconds = gap_seconds
        self.sensor_name = sensor_name
        self.window = RollingWindow(duration_seconds=gap_seconds, sensor_name=sensor_name)
        self._last_ts: float | None = None

    def add_reading(self, reading: SensorReading) -> bool:
        """Returns True if a new session was started."""
        if self._last_ts is not None and (reading.timestamp - self._last_ts) > self.gap_seconds:
            self.window.reset()
            new_session = True
        else:
            new_session = False
        self.window.add_reading(reading)
        self._last_ts = reading.timestamp
        return new_session

    def compute(self):
        return self.window.compute()

session = SessionWindow(gap_seconds=300.0, sensor_name="pir_motion")
for i, ts in enumerate([0, 60, 120, 600, 660]):
    reading = SensorReading("pir_motion", 1717152000.0 + ts, 1.0, "count")
    if session.add_reading(reading):
        print(f"New session started at reading {i}")
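The gap rule itself can be checked in isolation with plain Python. split_sessions is a hypothetical helper, independent of the SDK, that assumes timestamps are already sorted. It keeps every finished session, whereas the wrapper above resets its window on a new session, so any aggregate for the finished session would need to be computed before that reset.

```python
def split_sessions(timestamps, gap_seconds):
    """Group sorted timestamps into sessions separated by gaps > gap_seconds."""
    sessions = []
    for ts in timestamps:
        if sessions and ts - sessions[-1][-1] <= gap_seconds:
            sessions[-1].append(ts)   # still within the current session
        else:
            sessions.append([ts])     # first reading, or gap exceeded
    return sessions


sessions = split_sessions([0, 60, 120, 600, 660], gap_seconds=300.0)
print(sessions)  # [[0, 60, 120], [600, 660]]
```

The 480 s gap between offsets 120 and 600 exceeds the 300 s threshold, which is why the wrapper example reports a new session at reading 3.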
  

Window Lifecycle in a Pipeline

When you call pipeline.add_window(), the pipeline instantiates the concrete window object and stores it in _window_objects. During run(), every reading is appended to every window whose sensor_name matches (or is None, meaning "all sensors"). You do not need to manage slide() or reset() manually unless you are using windows outside of a pipeline.


from pyvorin_edge.pipeline import Pipeline, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType

pipeline = Pipeline("factory_floor")
pipeline.add_sensor(Sensor("press_force", SensorType.PRESSURE, "kN"))
pipeline.add_window(WindowConfig(duration_seconds=60.0,  window_type="tumbling", sensor_name="press_force"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling",  sensor_name="press_force"))
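The matching rule described above (a window receives a reading when its sensor_name equals the reading's sensor, or when it is None) can be sketched without the SDK. matching_windows is a hypothetical helper, and the sensor names other than press_force are invented for illustration.

```python
def matching_windows(reading_sensor, window_sensors):
    """Return indices of windows that should receive a reading.

    A window matches when its sensor_name equals the reading's sensor,
    or when its sensor_name is None (meaning "all sensors").
    """
    return [
        i for i, name in enumerate(window_sensors)
        if name is None or name == reading_sensor
    ]


# Two windows on press_force, one catch-all, one on an unrelated sensor.
window_sensors = ["press_force", "press_force", None, "coolant_temp"]
hits_press = matching_windows("press_force", window_sensors)
hits_other = matching_windows("spindle_rpm", window_sensors)
print(hits_press, hits_other)  # [0, 1, 2] [2]
```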
  

Performance Considerations

  • Window memory grows unbounded until you call reset() or slide(). In long-running edge runtimes, slide windows periodically.
  • Standard deviation uses statistics.stdev (sample stddev, N-1 denominator). For populations, supply a custom aggregation.
  • Custom aggregations are executed synchronously inside compute(). Keep them O(N) or offload heavy math to a background thread.
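As a small illustration of the standard deviation point, the standard library offers both variants. population_std_agg is a hypothetical name for a function you could pass through the custom_aggregation dictionary shown earlier; the values are the same five readings used in the Aggregation Functions example.

```python
import statistics

values = [12.0, 15.0, 14.0, 18.0, 11.0]

sample_std = statistics.stdev(values)       # divides by N - 1
population_std = statistics.pstdev(values)  # divides by N


def population_std_agg(values: list[float]) -> float:
    """Candidate custom aggregation: population standard deviation."""
    return statistics.pstdev(values) if values else 0.0


print(f"sample={sample_std:.4f}, population={population_std:.4f}")
# sample=2.7386, population=2.4495
```

The difference shrinks as the window grows, so the choice matters most for short windows with only a handful of readings.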