GDPR and NIS2 Compliance
Implement GDPR Article 30 records of processing, data retention policies, right to erasure, breach notification, and NIS2 technical measures using Pyvorin Edge privacy and audit APIs.
Published Jun 2, 2026
Compliance at the Edge
Edge computing changes the compliance landscape. When personal data is collected, processed, and stored on a device inside the European Union, GDPR applies in full. When that device is part of critical infrastructure — a hospital, a water treatment plant, an energy grid — the NIS2 Directive imposes additional security and incident-reporting obligations. The Pyvorin Edge Runtime provides the building blocks you need to satisfy both frameworks: tamper-evident audit chains, configurable data retention, pseudonymisation, and automated breach detection.
This article shows you how to implement GDPR Article 30 records of processing, enforce data retention policies, handle right-to-erasure requests, set up automated breach notification, and map your controls to NIS2 Article 21 technical measures.
GDPR Article 30 — Records of Processing Activities
Article 30 requires controllers and processors to maintain a record of every processing activity
under their responsibility. The record must include: the purpose, categories of data subjects and
data, recipients, retention periods, and security measures. The PrivacyAudit class
in pyv_edge_agent/privacy_firewall/audit.py provides
export_for_gdpr(), which transforms the tamper-evident audit chain into exactly
this format.
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
import json
audit = PrivacyAudit(db_path="/var/lib/pyvorin/edge_store.db")
# Log a processing activity with full GDPR metadata
audit.log_action(
action="sensor_batch_processed",
details={
"purpose": "Environmental monitoring for HVAC optimisation",
"data_subjects": ["building_occupants"],
"data_categories": ["temperature_reading", "humidity_reading", "occupancy_count"],
"recipients": ["cloud_analytics", "facilities_dashboard"],
"retention_period": "90_days",
"security_measures": [
"tls_1_3_mtls",
"sha256_audit_chain",
"field_redaction_for_pii",
"encrypted_local_storage",
],
},
)
# Export all records in Article 30 format
gdpr_records = audit.export_for_gdpr()
with open("/tmp/gdpr_article_30_export.json", "w", encoding="utf-8") as f:
json.dump(gdpr_records, f, indent=2, default=str)
print(f"Exported {len(gdpr_records)} processing records.")
Structure of the GDPR Export
Each record returned by export_for_gdpr() contains:
record_id— The primary key from theprivacy_audittable.processing_activity— Theactionstring (e.g.,sensor_batch_processed).timestamp_iso— UTC timestamp in ISO 8601 format.purpose— Extracted fromdetails["purpose"].data_subjects— List of subject categories.data_categories— List of data category strings.recipients— Internal or external recipients of the data.retention_period— Human-readable retention description.security_measures— List of controls applied during processing.additional_details— The full rawdetailsdict for audit depth.
[
{
"record_id": 42,
"processing_activity": "sensor_batch_processed",
"timestamp_iso": "2024-05-30T14:22:10Z",
"purpose": "Environmental monitoring for HVAC optimisation",
"data_subjects": ["building_occupants"],
"data_categories": ["temperature_reading", "humidity_reading", "occupancy_count"],
"recipients": ["cloud_analytics", "facilities_dashboard"],
"retention_period": "90_days",
"security_measures": [
"tls_1_3_mtls",
"sha256_audit_chain",
"field_redaction_for_pii",
"encrypted_local_storage"
],
"additional_details": { ... }
}
]
Data Retention Policies
GDPR Article 5(1)(e) requires that personal data be kept no longer than necessary for the
purposes for which it is processed. The Pyvorin Edge Runtime implements this through two
mechanisms: the PrivacyPolicyEngine in
pyv_edge_agent/privacy_firewall/policy.py, which classifies and filters data at
ingestion time; and the SQLiteStore.purge_old() method in
pyv_edge_agent/local_store/sqlite_store.py, which physically deletes expired records.
Configuring Retention
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyv_edge_agent.privacy_firewall.policy import PrivacyPolicyEngine, PrivacyRuleset
# Define which fields are sensitive and subject to strict retention
ruleset = PrivacyRuleset(
redact_fields=["patient_id", "ssn", "email"],
mask_fields=["phone_number"],
hash_fields=["device_mac"],
drop_fields=["raw_camera_frame"],
local_only=["biometric_score"],
)
engine = PrivacyPolicyEngine(ruleset=ruleset)
# Open the store and purge records older than 90 days
store = SQLiteStore(db_path="/var/lib/pyvorin/edge_store.db")
deleted = store.purge_old(hours=90 * 24)
print(f"Purged {deleted} expired records from all tables.")
The purge_old() method iterates over readings, events,
summaries, and audit_log, deleting rows whose timestamp is older than
the cutoff. It returns the total number of rows removed. Schedule this via cron or systemd timer
every night at 02:00.
# /etc/cron.d/pyvorin-retention
0 2 * * * pyvorin /usr/bin/python3 -c \
"from pyv_edge_agent.local_store.sqlite_store import SQLiteStore; \
store = SQLiteStore('/var/lib/pyvorin/edge_store.db'); \
store.purge_old(hours=2160)" \
>> /var/log/pyvorin/retention.log 2>&1
Per-Category Retention
For stricter compliance, you may want different retention periods for different data categories.
Query the readings table with a sensor_name filter before purging.
import time
import sqlite3
def purge_by_sensor(db_path: str, sensor_prefix: str, hours: int) -> int:
"""Delete readings from sensors matching prefix older than N hours."""
cutoff = time.time() - (hours * 3600)
conn = sqlite3.connect(db_path)
try:
cur = conn.execute(
"DELETE FROM readings WHERE sensor_name LIKE ? AND timestamp < ?",
(f"{sensor_prefix}%", cutoff),
)
conn.commit()
return cur.rowcount
finally:
conn.close()
# Purge camera data after 7 days, temperature after 90 days
deleted_cam = purge_by_sensor("/var/lib/pyvorin/edge_store.db", "camera_", 7 * 24)
deleted_temp = purge_by_sensor("/var/lib/pyvorin/edge_store.db", "temp_", 90 * 24)
print(f"Purged {deleted_cam} camera records, {deleted_temp} temperature records.")
Right to Erasure (Article 17)
When a data subject requests deletion, you must erase their personal data without undue delay. At the edge, this is complicated by the fact that data may have already been synced upstream. The correct procedure is: erase locally, log the erasure event, and enqueue a deletion request for the cloud.
import time
import sqlite3
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
DB_PATH = "/var/lib/pyvorin/edge_store.db"
def delete_subject_locally(subject_id: str) -> dict:
"""Erase all local records matching subject_id and log the action."""
conn = sqlite3.connect(DB_PATH)
deleted_counts = {}
try:
for table in ("readings", "events", "summaries", "audit_log"):
# Assumes metadata_json contains the subject_id
cur = conn.execute(
f"DELETE FROM {table} WHERE metadata_json LIKE ?",
(f'%"subject_id": "{subject_id}"%',),
)
deleted_counts[table] = cur.rowcount
conn.commit()
finally:
conn.close()
# Log the erasure
audit = PrivacyAudit(db_path=DB_PATH)
audit.log_action(
action="right_to_erasure",
details={
"subject_id": subject_id,
"deleted_counts": deleted_counts,
"legal_basis": "GDPR Article 17",
},
)
return deleted_counts
def enqueue_cloud_deletion(subject_id: str) -> int:
"""Notify upstream systems to delete the subject's data."""
queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
return queue.enqueue(
payload={
"type": "deletion_request",
"subject_id": subject_id,
"requested_at": time.time(),
"legal_basis": "GDPR Article 17",
},
priority=Priority.CRITICAL,
ttl_seconds=86400 * 7,
)
# Execute a deletion request
deleted = delete_subject_locally(subject_id="user-8842")
print(f"Local deletion complete: {deleted}")
queue_id = enqueue_cloud_deletion(subject_id="user-8842")
print(f"Cloud deletion request enqueued as item {queue_id}")
Breach Notification
GDPR Article 33 requires controllers to notify the supervisory authority within 72 hours of becoming aware of a personal data breach. NIS2 tightens this for critical infrastructure, requiring notification within 24 hours for significant incidents. Automated breach detection at the edge is essential because you may not have human operators on site.
Automated Alert Triggers
import time
import logging
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.cloud_sync.queue import CloudSyncQueue, Priority
logger = logging.getLogger("breach_detection")
def check_breach_indicators(audit: PrivacyAudit, queue: CloudSyncQueue) -> list:
"""Scan for conditions that may indicate a data breach."""
alerts = []
# Indicator 1: Audit chain tampering
if not audit.verify_chain():
alerts.append({
"severity": "critical",
"type": "audit_chain_tampered",
"message": "Privacy audit chain integrity check failed.",
})
# Indicator 2: Sudden spike in outbound data volume
stats = queue.get_stats()
if stats["depth"] > 100_000:
alerts.append({
"severity": "high",
"type": "queue_depth_spike",
"message": f"Sync queue depth {stats['depth']} exceeds threshold.",
})
# Indicator 3: Unusual retry patterns (possible exfiltration)
if stats["retrying_items"] > 1000:
alerts.append({
"severity": "high",
"type": "excessive_retries",
"message": f"{stats['retrying_items']} items in retry loop.",
})
return alerts
def send_breach_alert(alert: dict) -> int:
"""Enqueue a critical alert for immediate upstream delivery."""
queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
return queue.enqueue(
payload={
"type": "breach_alert",
"severity": alert["severity"],
"alert_type": alert["type"],
"message": alert["message"],
"timestamp": time.time(),
"device_serial": "pi5-warehouse-a7f3",
},
priority=Priority.CRITICAL,
ttl_seconds=86400,
)
# Run every 5 minutes via systemd timer or cron
audit = PrivacyAudit(db_path="/var/lib/pyvorin/edge_store.db")
queue = CloudSyncQueue(db_path="/var/lib/pyvorin/sync_queue.db")
for alert in check_breach_indicators(audit, queue):
logger.critical("BREACH INDICATOR: %s — %s", alert["type"], alert["message"])
send_breach_alert(alert)
Breach Notification Template
from datetime import datetime, timezone
import json
def generate_breach_report(
breach_type: str,
affected_subjects: list,
data_categories: list,
likely_consequences: str,
measures_taken: list,
) -> dict:
"""Generate a GDPR Article 33 compliant breach notification dict."""
return {
"notification_type": "gdpr_article_33",
"timestamp_utc": datetime.now(timezone.utc).isoformat(),
"breach_type": breach_type,
"affected_data_subjects_count": len(affected_subjects),
"affected_data_categories": data_categories,
"likely_consequences": likely_consequences,
"measures_taken": measures_taken,
"contact_dpo": "dpo@pyvorin.com",
"supervisory_authority": "German DPA (Berlin)",
}
report = generate_breach_report(
breach_type="unauthorised_access",
affected_subjects=["building_occupants"],
data_categories=["temperature_reading", "occupancy_count"],
likely_consequences="Potential inference of building occupancy patterns.",
measures_taken=[
"device_isolated_from_network",
"audit_chain_verified",
"certificates_rotated",
"law_enforcement_notified",
],
)
print(json.dumps(report, indent=2))
NIS2 Compliance for Critical Infrastructure
The NIS2 Directive (EU 2022/2555) applies to essential and important entities in sectors such as energy, transport, banking, health, drinking water, and digital infrastructure. Article 21 requires measures for risk analysis, incident handling, business continuity, supply chain security, encryption, and authentication. Pyvorin Edge provides direct support for most of these.
Mapping Pyvorin Edge Controls to NIS2 Article 21
| NIS2 Article 21 Measure | Pyvorin Edge Implementation |
|---|---|
| Risk analysis and information system security | BundleVerifier.verify_at_runtime(), tamper detection, hardware attestation |
| Incident handling | Automated breach alerts via CloudSyncQueue with Priority.CRITICAL |
| Business continuity and crisis management | SQLite WAL recovery, queue rebuild scripts, automated backups |
| Supply chain security | pyv-edge-sign for signed bundles, Ed25519 trust anchors |
| Security in acquisition and development | Signed OTA updates, SimulatorAdapter for pre-deployment testing |
| Policies and procedures for cryptography | Ed25519 signing, SHA-256 hash chains, mTLS with certificate pinning |
| Authentication and access control | Hardware serial binding, per-device client certificates |
NIS2 Incident Timeline
- T+0 hours: Automated breach detection triggers. Alert enqueued with
Priority.CRITICAL. - T+1 hour: Alert reaches cloud SIEM. Incident response team acknowledges.
- T+12 hours: Preliminary assessment completed using
PrivacyAuditexport and hardware attestation report. - T+24 hours: NIS2 significant incident notification sent to national CSIRT and sector regulator.
- T+72 hours: GDPR breach notification sent to supervisory authority (if personal data involved).
Complete Python Example for GDPR Export
The following script is a complete, copy-paste-ready tool that connects to an edge device, verifies the audit chain, exports GDPR Article 30 records, generates a retention report, and produces an integrity attestation.
#!/usr/bin/env python3
"""Complete GDPR compliance export and verification tool for Pyvorin Edge."""
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from pyv_edge_agent.privacy_firewall.audit import PrivacyAudit
from pyv_edge_agent.local_store.sqlite_store import SQLiteStore
from pyvorin_edge.attestation import collect_hardware_info
DB_PATH = Path("/var/lib/pyvorin/edge_store.db")
OUTPUT_DIR = Path("/tmp/gdpr_export")
def main() -> int:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
audit = PrivacyAudit(db_path=str(DB_PATH))
store = SQLiteStore(db_path=str(DB_PATH))
# 1. Verify audit chain integrity
if not audit.verify_chain():
print("CRITICAL: Audit chain tampered. Export aborted.", file=sys.stderr)
return 1
# 2. Export Article 30 records
gdpr_records = audit.export_for_gdpr()
article_30_path = OUTPUT_DIR / "article_30_records.json"
with open(article_30_path, "w", encoding="utf-8") as f:
json.dump(gdpr_records, f, indent=2, default=str)
print(f"Exported {len(gdpr_records)} Article 30 records to {article_30_path}")
# 3. Retention report
stats = store.get_stats()
retention_report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"db_size_bytes": stats["db_size_bytes"],
"wal_size_bytes": stats["wal_size_bytes"],
"total_rows": stats["total_rows"],
"oldest_record_timestamp": stats["oldest_record_timestamp"],
"row_counts": stats["row_counts"],
}
retention_path = OUTPUT_DIR / "retention_report.json"
with open(retention_path, "w", encoding="utf-8") as f:
json.dump(retention_report, f, indent=2, default=str)
print(f"Retention report written to {retention_path}")
# 4. Hardware attestation
hw = collect_hardware_info()
attestation = {
"device_serial": hw.serial_number,
"hardware_revision": hw.hardware_revision,
"cpu_model": hw.cpu_model,
"exported_at": datetime.now(timezone.utc).isoformat(),
}
attestation_path = OUTPUT_DIR / "device_attestation.json"
with open(attestation_path, "w", encoding="utf-8") as f:
json.dump(attestation, f, indent=2, default=str)
print(f"Device attestation written to {attestation_path}")
# 5. Summary manifest
manifest = {
"export_timestamp": datetime.now(timezone.utc).isoformat(),
"files": {
"article_30_records": str(article_30_path),
"retention_report": str(retention_path),
"device_attestation": str(attestation_path),
},
"audit_chain_integrity": "verified",
"record_count": len(gdpr_records),
}
with open(OUTPUT_DIR / "manifest.json", "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2)
print("GDPR export complete.")
return 0
if __name__ == "__main__":
sys.exit(main())
Operational Best Practices
- Run
export_for_gdpr()monthly. Even if no audit is scheduled, regular exports prove ongoing compliance and detect schema drift early. - Sign the export. Use
pyv-edge-signto sign the JSON export before transferring it to legal. This preserves non-repudiation. - Store exports off-device. rsync or scp the
/tmp/gdpr_exportdirectory to an object-store bucket with object-lock and legal-hold enabled. - Document your retention rationale. GDPR requires you to justify retention
periods. Add a
retention_rationalefield to yourPrivacyAuditdetails for every processing activity. - Test deletion requests quarterly. Simulate a right-to-erasure request on
a staging device, verify that local and upstream data are removed, and confirm that the
PrivacyAuditchain records the deletion event.
Summary
GDPR and NIS2 compliance at the edge is achievable with the tools built into Pyvorin Edge.
PrivacyAudit.export_for_gdpr() gives you Article 30 records in a standard format.
SQLiteStore.purge_old() enforces retention limits. The deletion-request workflow
handles right-to-erasure with full audit logging. Automated breach detection and alerting
satisfy both GDPR Article 33 and NIS2 incident-reporting timelines. By mapping every control
to a specific runtime API or CLI tool, Pyvorin Edge turns regulatory requirements from
paperwork into executable, testable, and verifiable code.