FileReplayAdapter — Time-Travel for Sensor Data
Replay CSV and JSONL log files with timestamp parsing, real-time pacing, speed control, and loop mode for testing and regression.
Published Jun 2, 2026
What Is File Replay?
The FileReplayAdapter turns historical log files into live sensor streams. It reads JSON Lines (.jsonl) or comma-separated values (.csv), parses timestamps, normalises rows into SensorReading dicts, and optionally paces output according to the original inter-arrival times. This makes it well suited to regression testing, demos, and pipeline development when hardware is offline.
Supported Formats
| Format | Extension | Requirements |
|---|---|---|
| JSON Lines | .jsonl | Each line is a JSON object with sensor_name, timestamp, and value. |
| CSV | .csv | Header row containing at least sensor_name, timestamp, and value. |
Timestamp Parsing
The adapter accepts three timestamp representations and normalises them all to epoch seconds (float):
- ISO 8601 strings: e.g., 2024-05-31T12:00:00Z or 2024-05-31T12:00:00+00:00
- Epoch seconds: e.g., 1717152000.0
- Epoch milliseconds: values greater than 1e11 are automatically divided by 1000
from pyv_edge_agent.ingest import FileReplayAdapter
# Direct parsing helper
print(FileReplayAdapter._parse_timestamp("2024-05-31T12:00:00Z")) # 1717156800.0
print(FileReplayAdapter._parse_timestamp(1717152000)) # 1717152000.0
print(FileReplayAdapter._parse_timestamp(1717152000000)) # 1717152000.0
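The three rules above can be approximated with the standard library alone. The following is a minimal sketch, not the adapter's actual implementation: to_epoch_seconds is a hypothetical name, and treating timezone-less ISO strings as UTC is an assumption made for illustration.

```python
from datetime import datetime, timezone

MS_THRESHOLD = 1e11  # numeric values above this are treated as milliseconds


def to_epoch_seconds(raw):
    """Normalise an ISO 8601 string or an epoch number to epoch seconds (float)."""
    if isinstance(raw, str):
        try:
            raw = float(raw)  # numeric strings, e.g. taken from a CSV cell
        except ValueError:
            # fromisoformat() only accepts a trailing "Z" on Python 3.11+,
            # so rewrite it as an explicit UTC offset first.
            parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            if parsed.tzinfo is None:
                # Assumption: naive timestamps are interpreted as UTC.
                parsed = parsed.replace(tzinfo=timezone.utc)
            return parsed.timestamp()
    value = float(raw)
    return value / 1000.0 if value > MS_THRESHOLD else value
```

With a threshold of 1e11, epoch-second values stay unscaled until roughly the year 5138, while any millisecond timestamp from 1973 onwards is detected and divided.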
Reading JSONL
read_jsonl() yields one SensorReading dict per line. It is a generator, so memory usage stays constant regardless of file size.
# /data/readings.jsonl
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:00:00Z", "value": 22.5, "unit": "°C"}
# {"sensor_name": "t1", "timestamp": "2024-05-31T12:01:00Z", "value": 23.0, "unit": "°C"}
adapter = FileReplayAdapter()
for reading in adapter.read_jsonl("/data/readings.jsonl"):
    print(reading["timestamp"], reading["value"])
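To illustrate why a generator keeps memory flat, here is a rough standard-library sketch of lazy JSONL reading. It is not the adapter's code: iter_jsonl is a hypothetical helper, and skipping blank lines is an assumption.

```python
import json


def iter_jsonl(path):
    """Yield one dict per non-blank line, reading the file lazily."""
    with open(path, encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            line = line.strip()
            if not line:
                continue  # assumption: blank lines are skipped, not errors
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(
                    f"{path}:{line_number}: invalid JSON ({exc.msg})"
                ) from exc
```

Only the current line is held in memory at any time, so a multi-gigabyte log costs no more than a small one, provided the consumer does not collect the results into a list.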
Reading CSV
read_csv() uses the standard library csv.DictReader. DictReader returns every field as a string, so the adapter then parses each value into its proper type.
# /data/readings.csv
# sensor_name,timestamp,value,unit
# t1,2024-05-31T12:00:00Z,22.5,°C
# t1,2024-05-31T12:01:00Z,23.0,°C
for reading in adapter.read_csv("/data/readings.csv"):
    pipeline.run([reading])
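The string-then-parse step can be sketched with csv.DictReader directly. This is an illustration under assumptions, not the adapter's implementation: iter_csv is a hypothetical helper, value is assumed to be numeric, and the timestamp is left as a string here.

```python
import csv


def iter_csv(path):
    """Yield reading dicts from a CSV file that has a header row."""
    # newline="" lets the csv module handle embedded line breaks itself.
    with open(path, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            # DictReader returns every cell as str, so convert explicitly.
            # Assumption: the value column always holds a number.
            row["value"] = float(row["value"])
            yield row
```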
Speed Control
The replay() method paces output according to the delta between consecutive timestamps. A speed_multiplier of 1.0 means real-time; 10.0 replays ten times faster. Values between 0 and 1 slow playback down.
import time
adapter = FileReplayAdapter()
# Real-time replay
start = time.monotonic()
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=1.0):
    pipeline.run([reading])
real_duration = time.monotonic() - start
# 10× accelerated — useful for CI
for reading in adapter.replay("/data/readings.jsonl", speed_multiplier=10.0):
    pipeline.run([reading])
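Conceptually, the delay before each reading is the gap to the previous timestamp divided by speed_multiplier. The sketch below shows that arithmetic in isolation. It is an assumption about how such pacing could be written, not the body of replay(): paced is a hypothetical name, and the injectable sleep parameter exists only so the logic can be tested without waiting.

```python
import time


def paced(readings, speed_multiplier=1.0, sleep=time.sleep):
    """Yield readings, sleeping for the scaled gap between timestamps."""
    if speed_multiplier <= 0:
        raise ValueError("speed_multiplier must be positive")
    previous = None
    for reading in readings:
        current = reading["timestamp"]  # assumed to be epoch seconds already
        if previous is not None:
            gap = current - previous
            if gap > 0:  # duplicate or out-of-order timestamps add no delay
                sleep(gap / speed_multiplier)
        previous = current
        yield reading
```

Two readings recorded 60 seconds apart are therefore emitted 6 seconds apart at speed_multiplier=10.0 and 120 seconds apart at 0.5.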
Loop Mode
The adapter does not have a built-in loop flag, but you can wrap the generator in an infinite loop for kiosk demos or soak testing.
def loop_replay(adapter: FileReplayAdapter, path: str, speed_multiplier: float = 1.0):
    """Replay a file forever."""
    while True:
        yield from adapter.replay(path, speed_multiplier=speed_multiplier)
# Demo kiosk: replay one hour of data in a 6-minute loop
for reading in loop_replay(adapter, "/data/one_hour.jsonl", speed_multiplier=10.0):
    update_dashboard(reading)
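For soak tests that should eventually stop, a bounded variant can be useful. The sketch below is hypothetical (loop_bounded and make_iter are illustrative names): it takes a zero-argument factory so that each pass gets a fresh generator, and it exits early if a pass yields nothing, since an unconditional infinite loop over an empty source would spin without ever producing a reading.

```python
def loop_bounded(make_iter, max_loops):
    """Replay an iterable factory up to max_loops times.

    Stops early if a pass produces no items.
    """
    for _ in range(max_loops):
        yielded = False
        for item in make_iter():
            yielded = True
            yield item
        if not yielded:
            return  # empty source: avoid looping with nothing to emit
```

With the adapter, the factory could be a lambda such as lambda: adapter.replay("/data/one_hour.jsonl", speed_multiplier=10.0), passed together with max_loops=3.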
Validation
Before replaying, call validate_format() to catch missing columns, invalid JSON, or empty files. This is especially useful in CI pipelines where test data is generated by earlier build stages.
from pyv_edge_agent.ingest import FileReplayAdapter
try:
    FileReplayAdapter.validate_format("/data/readings.jsonl")
    print("Format OK")
except (FileNotFoundError, ValueError) as exc:
    print(f"Validation failed: {exc}")
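To make the kinds of failure concrete, here is a sketch of comparable checks for a JSONL file using only the standard library. It is an assumption about what such validation might cover, not the source of validate_format(): check_jsonl and REQUIRED_FIELDS are hypothetical names, with the field list taken from the Supported Formats table.

```python
import json

REQUIRED_FIELDS = {"sensor_name", "timestamp", "value"}


def check_jsonl(path):
    """Raise ValueError for an empty file, invalid JSON, or missing fields."""
    seen_any = False
    # open() raises FileNotFoundError on its own if the path does not exist.
    with open(path, encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            seen_any = True
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"line {line_number}: invalid JSON") from exc
            if not isinstance(record, dict):
                raise ValueError(f"line {line_number}: expected a JSON object")
            missing = REQUIRED_FIELDS - set(record)
            if missing:
                raise ValueError(f"line {line_number}: missing {sorted(missing)}")
    if not seen_any:
        raise ValueError("file contains no records")
```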
Use Cases
Regression Testing
Capture a known-bad event sequence from production, save it as JSONL, and assert that your pipeline emits the expected events.
import pytest
from pyvorin_edge.pipeline import Pipeline
from pyv_edge_agent.ingest import FileReplayAdapter
def test_known_anomaly_pipeline():
    pipeline = build_production_pipeline()
    adapter = FileReplayAdapter()
    events = []
    for reading in adapter.replay("tests/fixtures/anomaly_2024-05-31.jsonl", speed_multiplier=100.0):
        result = pipeline.run([reading])
        events.extend(result.events)
    assert any(e.rule_name == "vibration_anomaly" for e in events)
Demos
Replay a rich dataset at 50× speed while a stakeholder watches the dashboard update in real time.
for reading in adapter.replay("demo_dataset.jsonl", speed_multiplier=50.0):
    websocket.broadcast(reading)
Load Testing
Disable pacing entirely by using read_jsonl() or read_csv() directly, then fire readings into the pipeline as fast as the CPU allows.
import time
adapter = FileReplayAdapter()
readings = list(adapter.read_jsonl("/data/large_file.jsonl"))
start = time.perf_counter()
result = pipeline.run(readings)
elapsed = time.perf_counter() - start
throughput = len(readings) / elapsed
print(f"Throughput: {throughput:.0f} readings/sec")
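When the file is too large to hold in a list, the same measurement can be taken in fixed-size batches. The helper below is a generic sketch, not part of the adapter; batched is a hypothetical name and the batch size is an arbitrary choice.

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield lists of up to batch_size items without materialising the input."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Each batch from batched(adapter.read_jsonl("/data/large_file.jsonl"), 1000) could then be passed to pipeline.run(), keeping at most 1000 readings in memory at once.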
Metadata Handling
If the metadata column in a CSV is a JSON-encoded string, the adapter automatically decodes it. If decoding fails, the metadata falls back to an empty dict so the pipeline never crashes on malformed annotations.
# CSV row with JSON metadata string
# sensor_name,timestamp,value,unit,metadata
# t1,1717152000,22.5,°C,"{""calibrated"": true}"
reading = next(adapter.read_csv("/data/meta.csv"))
print(reading["metadata"]) # {'calibrated': True}
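The decode-with-fallback behaviour can be sketched as follows. This illustrates the idea rather than the adapter's code: decode_metadata is a hypothetical name, and returning an empty dict for valid JSON that is not an object (a list, for instance) is an assumption.

```python
import json


def decode_metadata(cell):
    """Decode a JSON metadata cell, falling back to {} on any problem."""
    if not cell:
        return {}  # empty or missing column
    try:
        decoded = json.loads(cell)
    except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
        return {}
    # Assumption: only JSON objects count as metadata.
    return decoded if isinstance(decoded, dict) else {}
```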
Best Practices
- Sort files by timestamp before replaying; the pacing logic assumes monotonic order.
- Keep files under 1 GB for real-time replay. For larger datasets, preprocess with read_jsonl() and batch into the pipeline.
- Commit fixture files to version control alongside regression tests so that pipeline changes are always tested against the same ground truth.
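Because pacing depends on timestamp order, it can help to check a fixture before committing it. The function below is a hypothetical sketch (first_out_of_order is not part of the adapter) that expects timestamps already normalised to epoch seconds and treats equal neighbours as acceptable.

```python
def first_out_of_order(timestamps):
    """Return the index of the first timestamp smaller than its predecessor.

    Returns None when the sequence is in non-decreasing order.
    """
    previous = None
    for index, current in enumerate(timestamps):
        if previous is not None and current < previous:
            return index
        previous = current
    return None
```

A regression test could assert that this returns None for the timestamps of every fixture file.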