SensorReading API Reference
Complete reference for SensorReading, including fields, serialization, batch operations, timestamp handling, and unit conversion helpers.
Published Jun 2, 2026
Overview
SensorReading is the fundamental data carrier in the Pyvorin Edge SDK. It represents a single measurement from a sensor at a specific point in time. The class is implemented as a frozen dataclass in the edge runtime and as a regular dataclass in the edge SDK, ensuring immutability where thread safety is critical.
There are two definitions in the codebase:
pyv_edge_agent.types.SensorReading— frozen, used by the runtime agent.pyvorin_edge.sensors.SensorReading— mutable, used by the SDK for pipeline construction and simulation.
Both have identical field names and semantics. This article covers both implementations.
Fields
@dataclass(frozen=True) # frozen in edge_runtime, regular in edge_sdk
class SensorReading:
sensor_name: str
timestamp: float
value: float
unit: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
| Field | Type | Default | Description |
|---|---|---|---|
sensor_name | str | required | Unique identifier for the sensor that produced this reading. Must match a registered sensor name in the pipeline. |
timestamp | float | required | Unix timestamp in seconds (UTC). Can be generated with time.time(). |
value | float | required | The measured sensor value. Always stored as a float even for integer sensors. |
unit | str | "" | Physical unit of the measurement (e.g., "C", "bar", "g", "V"). |
metadata | Dict[str, Any] | {} | Optional key-value pairs for additional context such as location, firmware version, or diagnostic flags. |
Construction
from pyvorin_edge.sensors import SensorReading
import time
# Minimal reading
reading = SensorReading(
sensor_name="boiler_temp",
timestamp=time.time(),
value=82.5,
)
# Full reading with metadata
reading = SensorReading(
sensor_name="boiler_temp",
timestamp=time.time(),
value=82.5,
unit="C",
metadata={"location": "basement", "firmware": "v2.1.0"},
)
Serialization
to_dict()
def to_dict(self) -> Dict[str, Any]
Returns a JSON-serializable dictionary representation:
reading = SensorReading(sensor_name="tank", timestamp=1717234567.0, value=3.2, unit="bar")
print(reading.to_dict())
# {
# "sensor_name": "tank",
# "timestamp": 1717234567.0,
# "value": 3.2,
# "unit": "bar",
# "metadata": {}
# }
from_dict()
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "SensorReading"
Reconstructs a SensorReading from a dictionary. Missing keys use defaults:
data = {
"sensor_name": "motor_vib",
"timestamp": 1717234567.0,
"value": 0.45,
"unit": "g",
"metadata": {"axis": "z"}
}
reading = SensorReading.from_dict(data)
print(reading.sensor_name) # motor_vib
Batch Operations
When processing high-frequency sensor streams, you will often work with lists of readings. Here are common batch patterns:
Filtering by Sensor Name
readings: list[SensorReading] = [...]
boiler_readings = [r for r in readings if r.sensor_name == "boiler_temp"]
Filtering by Time Range
from_time = 1717234000.0
to_time = 1717235000.0
window = [r for r in readings if from_time <= r.timestamp <= to_time]
Extracting Values for Statistics
values = [r.value for r in readings if r.sensor_name == "boiler_temp"]
mean = sum(values) / len(values) if values else 0.0
max_val = max(values) if values else 0.0
Serializing a Batch
import json
batch_dicts = [r.to_dict() for r in readings]
json_payload = json.dumps(batch_dicts)
Deserializing a Batch
import json
json_payload = '[{"sensor_name":"tank","timestamp":1717234567.0,"value":3.2,"unit":"bar"}]'
readings = [SensorReading.from_dict(d) for d in json.loads(json_payload)]
Working with Timestamps
The timestamp field is a Unix timestamp in seconds (floating point). This format is timezone-agnostic and compatible with Python's time.time(), NumPy's datetime64, and most databases.
Generating Timestamps
import time
from datetime import datetime, timezone
# Current time
now = time.time()
reading = SensorReading(sensor_name="s1", timestamp=now, value=1.0)
# From a datetime object
dt = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
ts = dt.timestamp()
reading = SensorReading(sensor_name="s1", timestamp=ts, value=1.0)
Formatting for Display
from datetime import datetime, timezone
def format_reading_time(reading: SensorReading) -> str:
dt = datetime.fromtimestamp(reading.timestamp, tz=timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
print(format_reading_time(reading)) # 2024-06-01 12:00:00 UTC
Sorting by Timestamp
readings.sort(key=lambda r: r.timestamp)
# or in reverse chronological order
readings.sort(key=lambda r: r.timestamp, reverse=True)
Unit Conversion Helpers
While SensorReading does not embed unit conversion logic, the SDK encourages consistent unit handling via helper functions:
Temperature Conversions
def celsius_to_fahrenheit(reading: SensorReading) -> SensorReading:
if reading.unit == "C":
return SensorReading(
sensor_name=reading.sensor_name,
timestamp=reading.timestamp,
value=reading.value * 9.0 / 5.0 + 32.0,
unit="F",
metadata=reading.metadata,
)
return reading
def fahrenheit_to_celsius(reading: SensorReading) -> SensorReading:
if reading.unit == "F":
return SensorReading(
sensor_name=reading.sensor_name,
timestamp=reading.timestamp,
value=(reading.value - 32.0) * 5.0 / 9.0,
unit="C",
metadata=reading.metadata,
)
return reading
Pressure Conversions
def bar_to_psi(reading: SensorReading) -> SensorReading:
if reading.unit == "bar":
return SensorReading(
sensor_name=reading.sensor_name,
timestamp=reading.timestamp,
value=reading.value * 14.5038,
unit="psi",
metadata=reading.metadata,
)
return reading
Generic Conversion Pipeline
from typing import Callable
Converter = Callable[[SensorReading], SensorReading]
def convert_batch(readings: list[SensorReading], converter: Converter) -> list[SensorReading]:
return [converter(r) for r in readings]
# Usage
readings_c = [SensorReading("t1", 1.0, 25.0, "C")]
readings_f = convert_batch(readings_c, celsius_to_fahrenheit)
print(readings_f[0].value) # 77.0
print(readings_f[0].unit) # F
Validation with SensorRegistry
SensorRegistry can validate that a reading conforms to its registered sensor definition:
from pyvorin_edge.sensors import SensorRegistry, Sensor, SensorType, SensorReading
registry = SensorRegistry()
registry.register(Sensor(name="duct", sensor_type=SensorType.TEMPERATURE, unit="C"))
valid = SensorReading(sensor_name="duct", timestamp=1.0, value=22.0, unit="C")
invalid = SensorReading(sensor_name="duct", timestamp=1.0, value=22.0, unit="F")
print(registry.validate_reading(valid)) # True
print(registry.validate_reading(invalid)) # False (wrong unit)
Integration with EdgeAgent
Inside EdgeAgent.run_once(), readings are created from adapter values, filtered by privacy rules, stored in ring buffers and SQLite, and enqueued for cloud sync:
# From pyv_edge_agent.main — simplified excerpt
value = self._get_reading_value(name)
reading = SensorReading(sensor_name=name, timestamp=now, value=value)
filtered = self._privacy.evaluate(reading)
if filtered:
buf.append(filtered)
self._store.store_reading(filtered)
self._cloud.enqueue(filtered)
self._readings_processed += 1
Immutability Considerations
The edge runtime's SensorReading is frozen. Attempting to modify a field after construction raises dataclasses.FrozenInstanceError:
from pyv_edge_agent.types import SensorReading
reading = SensorReading(sensor_name="s1", timestamp=1.0, value=10.0)
# reading.value = 20.0 # FrozenInstanceError!
To "modify" a frozen reading, create a new instance with the updated field:
updated = SensorReading(
sensor_name=reading.sensor_name,
timestamp=reading.timestamp,
value=20.0,
unit=reading.unit,
metadata=reading.metadata,
)