Window Aggregation — Rolling, Tumbling, and Custom Functions
Configure sliding, tumbling, and session windows. Compute mean, max, min, sum, count, and stddev with real code examples.
Published Jun 2, 2026
Why Windows Matter
Raw sensor readings are noisy. A single outlier should not wake an on-call engineer at 3 AM. Windows aggregate readings over time, smoothing transient spikes and exposing genuine trends. The Pyvorin Edge SDK provides two built-in window types—RollingWindow and TumblingWindow—plus an extension point for custom aggregation functions.
Window Types at a Glance
| Type | Overlap | Use Case |
|---|---|---|
| Rolling (Sliding) | Yes | Real-time anomaly detection where recent context matters. |
| Tumbling | No | Periodic reporting, billing intervals, or discrete snapshots. |
RollingWindow
A RollingWindow accumulates every reading whose timestamp falls within a fixed duration from the newest sample. Because the SDK does not auto-expire stale readings, you must call slide(cutoff_timestamp) manually or let the Pipeline hot loop manage it.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
window = RollingWindow(duration_seconds=300.0, sensor_name="turbine_vibration")
# Ingest a batch
for i in range(20):
    window.add_reading(SensorReading(
        sensor_name="turbine_vibration",
        timestamp=1717152000.0 + i * 30.0,
        value=2.5 + (i % 3),
        unit="mm/s",
    ))
print(f"Readings in window: {len(window)}")
print(f"Window full? {window.is_full()}") # True once span >= 300s
result = window.compute()
print(f"Mean: {result.mean:.2f}")
print(f"Max: {result.max:.2f}")
print(f"Min: {result.min:.2f}")
print(f"Std: {result.std:.2f}")
print(f"Sum: {result.sum:.2f}")
print(f"Count: {result.count}")
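To make the slide(cutoff_timestamp) step concrete, here is a minimal standard-library sketch of what expiring stale readings could look like. SlidingBuffer and its methods are hypothetical stand-ins written for illustration, not the SDK's RollingWindow, and the sketch assumes readings arrive in timestamp order.

```python
from collections import deque


class SlidingBuffer:
    """Hypothetical stand-in for a rolling window (not the SDK class)."""

    def __init__(self, duration_seconds: float) -> None:
        self.duration_seconds = duration_seconds
        self._items = deque()  # (timestamp, value) pairs, oldest first

    def add(self, timestamp: float, value: float) -> None:
        self._items.append((timestamp, value))

    def slide(self, cutoff_timestamp: float) -> int:
        """Drop readings strictly older than the cutoff; return the number dropped."""
        dropped = 0
        while self._items and self._items[0][0] < cutoff_timestamp:
            self._items.popleft()
            dropped += 1
        return dropped

    def __len__(self) -> int:
        return len(self._items)


buf = SlidingBuffer(duration_seconds=300.0)
for i in range(20):
    buf.add(1717152000.0 + i * 30.0, 2.5 + (i % 3))

# Keep only readings within 300 s of the newest sample.
newest = 1717152000.0 + 19 * 30.0
dropped = buf.slide(newest - buf.duration_seconds)
print(f"dropped={dropped}, kept={len(buf)}")
```

With the same 20 readings used above, the nine oldest fall before the cutoff and eleven remain, spanning exactly 300 seconds.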
TumblingWindow
A TumblingWindow is non-overlapping. Its start time is anchored to the first reading it receives. Once the time span from that anchor exceeds duration_seconds, the window is considered full. After computing aggregates, call reset() to begin the next tumbling bucket.
from pyvorin_edge.windows import TumblingWindow
from pyvorin_edge.sensors import SensorReading
window = TumblingWindow(duration_seconds=60.0, sensor_name="flow_rate")
readings = [
    SensorReading("flow_rate", 1717152000.0 + i * 10, 45.0 + i, "L/min")
    for i in range(12)
]
for r in readings:
    window.add_reading(r)
    if window.is_full():
        result = window.compute()
        print(f"Bucket complete: mean={result.mean:.1f}, max={result.max:.1f}")
        window.reset()
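For comparison, non-overlapping buckets can also be laid out on a fixed grid, where each timestamp maps to exactly one bucket index. The sketch below uses only the standard library; bucket_index is a hypothetical helper, and fixed-grid bucketing is a different anchoring policy from the SDK behaviour described above, where each new bucket is anchored to the first reading after reset().

```python
def bucket_index(timestamp: float, anchor: float, duration_seconds: float) -> int:
    """Index of the half-open bucket [anchor + k*d, anchor + (k+1)*d) holding timestamp."""
    return int((timestamp - anchor) // duration_seconds)


anchor = 1717152000.0
timestamps = [anchor + i * 10 for i in range(12)]

# Group timestamps by bucket; no reading can land in two buckets.
buckets = {}
for ts in timestamps:
    buckets.setdefault(bucket_index(ts, anchor, 60.0), []).append(ts)

for idx, members in sorted(buckets.items()):
    print(f"bucket {idx}: {len(members)} readings")
```

On a fixed grid the twelve readings split evenly into two 60-second buckets. The anchored TumblingWindow may group them differently, because the reading that fills a window is added before is_full() is checked.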
WindowResult
Both window types return a WindowResult dataclass with the following fields:
| Field | Type | Description |
|---|---|---|
| sensor_name | str \| None | Filter sensor, or None for all. |
| window_type | str | "rolling" or "tumbling". |
| duration_seconds | float | Configured window duration. |
| count | int | Number of readings aggregated. |
| mean | float \| None | Arithmetic mean. |
| max | float \| None | Maximum value. |
| min | float \| None | Minimum value. |
| std | float \| None | Sample standard deviation (0.0 if count == 1). |
| sum | float \| None | Sum of all values. |
| custom | dict | User-defined aggregation outputs. |
Aggregation Functions
Built-in aggregates are computed automatically by compute(). You do not pass the function name as a string; instead, the result object simply contains every built-in metric. If you only need one metric, ignore the rest—overhead is negligible for window sizes under 10,000 readings.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
window = RollingWindow(duration_seconds=120.0)
values = [12.0, 15.0, 14.0, 18.0, 11.0]
for i, v in enumerate(values):
    window.add_reading(SensorReading("demo", 1717152000.0 + i, v, "V"))
res = window.compute()
print(f"mean={res.mean}, max={res.max}, min={res.min}, "
      f"sum={res.sum}, count={res.count}, std={res.std}")
# mean=14.0, max=18.0, min=11.0, sum=70.0, count=5, std=2.7386...
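The figures in that comment can be cross-checked without the SDK. The sketch below recomputes the same six metrics with Python's standard library; the summary dict is an illustrative structure, not the SDK's WindowResult.

```python
import statistics

values = [12.0, 15.0, 14.0, 18.0, 11.0]

summary = {
    "count": len(values),
    "sum": sum(values),
    "mean": statistics.mean(values),
    "max": max(values),
    "min": min(values),
    # Sample standard deviation (N-1 denominator); statistics.stdev needs
    # at least two points, so fall back to 0.0 for a single reading.
    "std": statistics.stdev(values) if len(values) > 1 else 0.0,
}
print(summary)
```

The squared deviations from the mean sum to 30, so the sample variance is 30 / 4 = 7.5 and the standard deviation is about 2.7386.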
Custom Aggregation
For domain-specific metrics—such as vibration RMS, kurtosis, or energy cost estimates—supply a dictionary of callables to the window constructor. Each function receives a list[float] of windowed values and can return any JSON-serialisable type. Errors are caught and stored as strings in the custom dict so that one failing aggregation does not crash the entire window.
import math
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
def rms(values: list[float]) -> float:
    """Root-mean-square of a vibration signal."""
    if not values:
        return 0.0
    return math.sqrt(sum(v * v for v in values) / len(values))

def cost_estimate(values: list[float]) -> float:
    """Rough energy cost at £0.15 per kWh assuming values are kW."""
    if not values:
        return 0.0
    avg_kw = sum(values) / len(values)
    hours = 1.0  # 1-hour window assumed
    kwh = avg_kw * hours
    return round(kwh * 0.15, 2)
window = RollingWindow(
duration_seconds=3600.0,
sensor_name="mains_power",
custom_aggregation={"rms": rms, "cost_gbp": cost_estimate},
)
for i in range(60):
    window.add_reading(SensorReading(
        sensor_name="mains_power",
        timestamp=1717152000.0 + i * 60.0,
        value=4.5 + math.sin(i / 10.0),
        unit="kW",
    ))
result = window.compute()
print(result.custom)
# approximately {'rms': 4.566, 'cost_gbp': 0.68}
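The error-isolation behaviour described above can be sketched in a few lines. This is an illustration under assumptions, not the SDK's implementation: run_custom is a hypothetical helper, and the exact format of the stored error string is a guess.

```python
from typing import Any, Callable


def run_custom(
    aggregations: dict[str, Callable[[list[float]], Any]],
    values: list[float],
) -> dict[str, Any]:
    """Run each aggregation; store a failure as a string instead of raising."""
    results: dict[str, Any] = {}
    for name, fn in aggregations.items():
        try:
            results[name] = fn(values)
        except Exception as exc:  # one bad function must not abort the others
            results[name] = f"error: {exc!r}"  # assumed format
    return results


def peak_to_peak(values: list[float]) -> float:
    return max(values) - min(values)


def broken(values: list[float]) -> float:
    return 1 / 0  # deliberately fails


out = run_custom({"p2p": peak_to_peak, "broken": broken}, [4.0, 6.5, 5.0])
print(out)
```

Because failures arrive as strings, downstream code should check the type of each custom value before treating it as a number.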
Session Windows
The SDK does not provide a dedicated SessionWindow class, but you can emulate session semantics with a RollingWindow and a gap-detection wrapper. A session ends when the gap between consecutive readings exceeds a threshold.
from pyvorin_edge.windows import RollingWindow
from pyvorin_edge.sensors import SensorReading
class SessionWindow:
    def __init__(self, gap_seconds: float, sensor_name: str | None = None):
        self.gap_seconds = gap_seconds
        self.sensor_name = sensor_name
        # The wrapper never calls slide(), so readings persist until reset().
        self.window = RollingWindow(duration_seconds=gap_seconds, sensor_name=sensor_name)
        self._last_ts: float | None = None

    def add_reading(self, reading: SensorReading) -> bool:
        """Return True if a gap ended the previous session and this reading starts a new one."""
        if self._last_ts is not None and (reading.timestamp - self._last_ts) > self.gap_seconds:
            self.window.reset()
            new_session = True
        else:
            new_session = False
        self.window.add_reading(reading)
        self._last_ts = reading.timestamp
        return new_session

    def compute(self):
        return self.window.compute()

session = SessionWindow(gap_seconds=300.0, sensor_name="pir_motion")
for i, ts in enumerate([0, 60, 120, 600, 660]):
    reading = SensorReading("pir_motion", 1717152000.0 + ts, 1.0, "count")
    if session.add_reading(reading):
        print(f"New session started at reading {i}")
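The gap rule itself is independent of the SDK. As a minimal sketch, the hypothetical helper split_sessions below applies the same "gap greater than threshold" test to a plain list of timestamps and keeps every session, rather than discarding the previous one on reset.

```python
def split_sessions(timestamps: list[float], gap_seconds: float) -> list[list[float]]:
    """Group ordered timestamps into sessions separated by gaps > gap_seconds."""
    sessions: list[list[float]] = []
    for ts in timestamps:
        if sessions and ts - sessions[-1][-1] <= gap_seconds:
            sessions[-1].append(ts)  # still within the current session
        else:
            sessions.append([ts])    # first reading, or gap exceeded
    return sessions


offsets = [0, 60, 120, 600, 660]
sessions = split_sessions([1717152000.0 + o for o in offsets], gap_seconds=300.0)
for n, s in enumerate(sessions):
    print(f"session {n}: {len(s)} readings")
```

The 480-second gap between offsets 120 and 600 exceeds the 300-second threshold, giving two sessions of three and two readings. If you need per-session aggregates from the wrapper, call compute() before feeding the reading that triggers the reset.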
Window Lifecycle in a Pipeline
When you call pipeline.add_window(), the pipeline instantiates the concrete window object and stores it in _window_objects. During run(), every reading is appended to every window whose sensor_name matches (or is None, meaning "all sensors"). You do not need to manage slide() or reset() manually unless you are using windows outside of a pipeline.
from pyvorin_edge.pipeline import Pipeline, WindowConfig
from pyvorin_edge.sensors import Sensor, SensorType
pipeline = Pipeline("factory_floor")
pipeline.add_sensor(Sensor("press_force", SensorType.PRESSURE, "kN"))
pipeline.add_window(WindowConfig(duration_seconds=60.0, window_type="tumbling", sensor_name="press_force"))
pipeline.add_window(WindowConfig(duration_seconds=300.0, window_type="rolling", sensor_name="press_force"))
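The matching rule described above (a window receives a reading when its sensor_name equals the reading's sensor, or is None) can be sketched as follows. WindowStub and route are hypothetical names for illustration only, and "oil_temp" is an invented sensor; the pipeline's real dispatch code may differ.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class WindowStub:
    """Hypothetical stand-in for a window; stores raw values only."""
    sensor_name: Optional[str]
    values: list = field(default_factory=list)


def route(windows: list, sensor_name: str, value: float) -> int:
    """Append value to every matching window; return how many matched."""
    matched = 0
    for w in windows:
        if w.sensor_name is None or w.sensor_name == sensor_name:
            w.values.append(value)
            matched += 1
    return matched


windows = [
    WindowStub("press_force"),  # e.g. the 60 s tumbling window
    WindowStub("press_force"),  # e.g. the 300 s rolling window
    WindowStub(None),           # catch-all: receives every sensor
    WindowStub("oil_temp"),     # invented second sensor
]
hits_press = route(windows, "press_force", 41.5)
hits_oil = route(windows, "oil_temp", 70.0)
print(hits_press, hits_oil)
```

A single press_force reading lands in three windows here, which is why each extra window on a busy sensor adds memory and compute cost.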
Performance Considerations
- Window memory grows unbounded until you call reset() or slide(). In long-running edge runtimes, slide windows periodically.
- Standard deviation uses statistics.stdev (sample stddev, N-1 denominator). For populations, supply a custom aggregation.
- Custom aggregations are executed synchronously inside compute(). Keep them O(N) or offload heavy math to a background thread.
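As an example of the population case, a custom aggregation can wrap statistics.pstdev. The function name population_std and the key "pstd" are illustrative choices; the sketch runs standalone and only shows in a comment how it might be passed to a window.

```python
import statistics


def population_std(values: list[float]) -> float:
    """Population standard deviation (N denominator); 0.0 for an empty window."""
    if not values:
        return 0.0
    return statistics.pstdev(values)


values = [12.0, 15.0, 14.0, 18.0, 11.0]
sample = statistics.stdev(values)    # N-1 denominator
population = population_std(values)  # N denominator
print(f"sample={sample:.4f}, population={population:.4f}")

# Assumed usage with the constructor shown earlier:
#   RollingWindow(duration_seconds=120.0,
#                 custom_aggregation={"pstd": population_std})
```

The population figure is always the smaller of the two, and the difference shrinks as the window fills.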