GDPR compliance requires systematic technical implementation across data processing systems. This guide covers building privacy-respecting systems that meet regulatory requirements.
Data Subject Rights Automation
Build systems to handle data subject requests:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
import hashlib
import asyncio
class RequestType(Enum):
    """Kinds of data subject request the system accepts.

    Each member's value is the lowercase string used to identify the
    request type outside the program (for example in serialized form).
    """

    ACCESS = "access"
    RECTIFICATION = "rectification"
    ERASURE = "erasure"
    PORTABILITY = "portability"
    RESTRICTION = "restriction"
    OBJECTION = "objection"
class RequestStatus(Enum):
    """Lifecycle states of a data subject request.

    New requests start in AWAITING_VERIFICATION, move to IN_PROGRESS once
    the subject's token is confirmed, and end in COMPLETED when a handler
    finishes.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    AWAITING_VERIFICATION = "awaiting_verification"
    COMPLETED = "completed"
    REJECTED = "rejected"
@dataclass
class DataSubjectRequest:
    """A single data subject request and its processing state."""

    # Identity of the request and of the person it concerns.
    request_id: str
    request_type: RequestType
    subject_email: str
    subject_id: Optional[str]

    # Timing: when the request arrived and when it must be answered by.
    submitted_at: datetime
    deadline: datetime

    status: RequestStatus

    # Verification and completion bookkeeping; unset until the step happens.
    verification_token: Optional[str] = None
    verified_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None

    # Free-text processing log, appended to by the request handlers.
    notes: List[str] = field(default_factory=list)
    # Names of the data sources this request applies to.
    data_sources: List[str] = field(default_factory=list)
@dataclass
class PersonalData:
    """One item of personal data together with its processing metadata."""

    source: str  # name of the system holding the data
    category: str
    data: Dict
    collected_at: datetime
    legal_basis: str
    retention_period: int  # NOTE(review): unit is not stated here; presumably days - confirm
    encrypted: bool
class DataSubjectRightsManager:
    """Track data subject requests and fulfil them across registered data sources.

    A request is created by ``submit_request`` (status AWAITING_VERIFICATION),
    confirmed by ``verify_request`` with the emailed token, and then processed
    by the handler that matches its ``request_type``.
    """

    def __init__(self, data_sources: List['DataSource']):
        """Index the given sources by name and start with no requests.

        Args:
            data_sources: Sources to query; each must expose ``name`` plus the
                attributes and coroutines the handlers below call.
        """
        self.data_sources = {ds.name: ds for ds in data_sources}
        self.requests: Dict[str, 'DataSubjectRequest'] = {}
        self.deadline_days = 30
        # The event loop only keeps weak references to tasks, so in-flight
        # processing tasks are held here until they finish.
        self._background_tasks: set = set()

    def submit_request(self, request_type: 'RequestType', email: str) -> 'DataSubjectRequest':
        """Submit a new data subject request.

        The request is stored, a verification token is emailed to ``email``,
        and the request waits in AWAITING_VERIFICATION until verified.

        Args:
            request_type: Which right the subject is exercising.
            email: Address identifying the data subject.

        Returns:
            The newly created request.
        """
        request_id = self._generate_request_id()
        verification_token = self._generate_verification_token()
        # One clock reading so the deadline is exactly deadline_days after
        # the recorded submission time.
        now = datetime.utcnow()
        request = DataSubjectRequest(
            request_id=request_id,
            request_type=request_type,
            subject_email=email,
            subject_id=None,
            submitted_at=now,
            deadline=now + timedelta(days=self.deadline_days),
            status=RequestStatus.AWAITING_VERIFICATION,
            verification_token=verification_token,
            data_sources=list(self.data_sources.keys()),
        )
        self.requests[request_id] = request
        self._send_verification_email(email, verification_token)
        return request

    def verify_request(self, request_id: str, token: str) -> bool:
        """Verify data subject identity and start processing.

        Args:
            request_id: Identifier returned by ``submit_request``.
            token: Verification token presented by the subject.

        Returns:
            True when the token matches a request that is still awaiting
            verification; False for unknown requests, missing or wrong
            tokens, and requests that were already verified.
        """
        import hmac

        request = self.requests.get(request_id)
        if request is None:
            return False

        expected = request.verification_token
        # A request without a token, or a non-string candidate, can never
        # verify (guards against None matching None).
        if not expected or not isinstance(token, str):
            return False
        # Constant-time comparison so timing does not leak the token.
        if not hmac.compare_digest(expected.encode(), token.encode()):
            return False
        # A token is single-use: replaying it must not re-run processing
        # (for example a second erasure pass).
        if request.status != RequestStatus.AWAITING_VERIFICATION:
            return False

        request.status = RequestStatus.IN_PROGRESS
        request.verified_at = datetime.utcnow()
        self._start_processing(request)
        return True

    def _start_processing(self, request: 'DataSubjectRequest') -> None:
        """Run ``_process_request`` on the running loop, or synchronously if none."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called from synchronous code: there is no loop to schedule on,
            # so process the request to completion right here.
            asyncio.run(self._process_request(request))
            return
        task = loop.create_task(self._process_request(request))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _process_request(self, request: 'DataSubjectRequest'):
        """Process verified request based on type."""
        handlers = {
            RequestType.ACCESS: self._handle_access_request,
            RequestType.ERASURE: self._handle_erasure_request,
            RequestType.PORTABILITY: self._handle_portability_request,
            RequestType.RECTIFICATION: self._handle_rectification_request,
        }
        handler = handlers.get(request.request_type)
        if handler is None:
            # Leave a trace instead of silently stalling in IN_PROGRESS.
            request.notes.append(
                f"No automated handler for {request.request_type.value} request; manual processing required"
            )
            return
        await handler(request)

    async def _handle_access_request(self, request: 'DataSubjectRequest') -> Dict:
        """Handle right of access request (Article 15).

        Collects the subject's data from every source, builds a report,
        sends it, and then marks the request completed. Per-source failures
        are recorded in ``request.notes`` and do not abort the run.

        Returns:
            The access report that was sent.
        """
        all_data = []
        for source_name, source in self.data_sources.items():
            try:
                data = await source.get_subject_data(request.subject_email)
                if data:
                    all_data.append({
                        'source': source_name,
                        'categories': source.data_categories,
                        'data': data,
                        'legal_basis': source.legal_basis,
                        'retention_period': source.retention_days,
                        'recipients': source.data_recipients,
                    })
                    request.notes.append(f"Retrieved data from {source_name}")
            except Exception as e:
                request.notes.append(f"Error retrieving from {source_name}: {str(e)}")

        # Generate access report
        report = {
            'request_id': request.request_id,
            'generated_at': datetime.utcnow().isoformat(),
            'data_subject': request.subject_email,
            'data_collected': all_data,
            'processing_purposes': self._get_processing_purposes(),
            'data_sources': list(self.data_sources.keys()),
            'third_party_recipients': self._get_all_recipients(),
            'retention_policies': self._get_retention_policies(),
            'rights_information': self._get_rights_info(),
        }

        # Send first: the request only counts as completed once the subject
        # has actually been sent the report.
        await self._send_access_report(request.subject_email, report)
        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()
        return report

    async def _handle_erasure_request(self, request: 'DataSubjectRequest') -> Dict:
        """Handle right to erasure request (Article 17).

        Returns:
            Dict with ``deleted`` (source names), ``retained`` (source and
            reason) and ``errors`` (source and error text).
        """
        results = {'deleted': [], 'retained': [], 'errors': []}
        for source_name, source in self.data_sources.items():
            try:
                # Check if deletion is legally permitted
                can_delete, reason = source.can_delete_data(request.subject_email)
                if can_delete:
                    await source.delete_subject_data(request.subject_email)
                    results['deleted'].append(source_name)
                    request.notes.append(f"Deleted data from {source_name}")
                else:
                    results['retained'].append({
                        'source': source_name,
                        'reason': reason,
                    })
                    request.notes.append(f"Retained data in {source_name}: {reason}")
            except Exception as e:
                results['errors'].append({
                    'source': source_name,
                    'error': str(e),
                })
                request.notes.append(f"Error deleting from {source_name}: {str(e)}")

        # Notify the data subject before recording completion.
        await self._send_erasure_confirmation(request.subject_email, results)
        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()
        return results

    async def _handle_portability_request(self, request: 'DataSubjectRequest') -> Dict:
        """Handle data portability request (Article 20).

        Only sources with ``supports_portability`` set are exported.

        Returns:
            The export package that was sent.
        """
        portable_data = []
        for source_name, source in self.data_sources.items():
            if not source.supports_portability:
                continue
            try:
                data = await source.get_portable_data(request.subject_email)
                if data:
                    portable_data.append({
                        'source': source_name,
                        'format': 'JSON',
                        'data': data,
                    })
            except Exception as e:
                request.notes.append(f"Error exporting from {source_name}: {str(e)}")

        # Package as machine-readable format
        export_package = {
            'export_id': request.request_id,
            'created_at': datetime.utcnow().isoformat(),
            'format_version': '1.0',
            'data_subject': request.subject_email,
            'data': portable_data,
        }

        # Deliver the package before recording completion.
        await self._send_portability_package(request.subject_email, export_package)
        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()
        return export_package

    async def _handle_rectification_request(self, request: 'DataSubjectRequest'):
        """Handle rectification request (Article 16).

        Not automated yet: records a note so the request is visibly waiting
        on manual work rather than silently stuck.
        """
        # Would include logic to update incorrect data
        request.notes.append("Rectification is not automated; manual processing required")

    def _generate_request_id(self) -> str:
        """Return an id of the form ``DSR-<YYYYmmddHHMMSS>-<8 hex chars>``."""
        import secrets

        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        # Random suffix: unpredictable, and unique even for requests created
        # in the same instant.
        return f"DSR-{stamp}-{secrets.token_hex(4)}"

    def _generate_verification_token(self) -> str:
        """Return a fresh URL-safe random verification token."""
        import secrets

        return secrets.token_urlsafe(32)

    def _send_verification_email(self, email: str, token: str):
        """Send the verification token to the subject (placeholder)."""
        # Implementation would send actual email
        pass

    async def _send_access_report(self, email: str, report: Dict):
        """Deliver the access report to the subject (placeholder)."""
        pass

    async def _send_erasure_confirmation(self, email: str, results: Dict):
        """Deliver the erasure outcome to the subject (placeholder)."""
        pass

    async def _send_portability_package(self, email: str, package: Dict):
        """Deliver the portability export to the subject (placeholder)."""
        pass

    def _get_processing_purposes(self) -> List[str]:
        """Return the fixed list of processing purposes included in reports."""
        return ['Service delivery', 'Communication', 'Analytics', 'Legal compliance']

    def _get_all_recipients(self) -> List[str]:
        """Return every distinct recipient across all sources, sorted."""
        # Sorted so repeated reports list recipients in a stable order.
        return sorted({r for s in self.data_sources.values() for r in s.data_recipients})

    def _get_retention_policies(self) -> Dict:
        """Map each source name to its retention period as ``"<n> days"``."""
        return {name: f"{source.retention_days} days" for name, source in self.data_sources.items()}

    def _get_rights_info(self) -> Dict:
        """Return the short description of each data subject right."""
        return {
            'access': 'Right to access your personal data',
            'rectification': 'Right to correct inaccurate data',
            'erasure': 'Right to request deletion',
            'portability': 'Right to receive data in portable format',
            'restriction': 'Right to restrict processing',
            'objection': 'Right to object to processing',
            'complaint': 'Right to lodge complaint with supervisory authority',
        }
class DataSource(ABC):
    """Abstract interface to one system that stores personal data.

    Subclasses implement the three coroutines that read, delete and export
    a subject's data; ``can_delete_data`` may be overridden to veto erasure.
    """

    def __init__(self, name: str, data_categories: List[str], legal_basis: str, retention_days: int):
        """Store the source's descriptive metadata.

        Recipients start empty and portability support starts enabled;
        both are plain attributes that can be changed afterwards.
        """
        self.name = name
        self.data_categories = data_categories
        self.legal_basis = legal_basis
        self.retention_days = retention_days
        self.data_recipients: List[str] = []
        self.supports_portability = True

    @abstractmethod
    async def get_subject_data(self, identifier: str) -> Optional[Dict]:
        """Return the data held for ``identifier``, or None if there is none."""

    @abstractmethod
    async def delete_subject_data(self, identifier: str) -> bool:
        """Delete the data held for ``identifier``."""

    @abstractmethod
    async def get_portable_data(self, identifier: str) -> Optional[Dict]:
        """Return the exportable data for ``identifier``, or None if there is none."""

    def can_delete_data(self, identifier: str) -> tuple:
        """Check if data can be deleted. Returns (can_delete, reason)."""
        # Default policy: deletion is always allowed, so no reason is given.
        allowed, reason = True, None
        return allowed, reason


# Consent Management System
Build a comprehensive consent management platform:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from datetime import datetime
from enum import Enum
import json
class ConsentPurpose(Enum):
    """Processing purposes for which consent is tracked.

    ESSENTIAL is treated specially by the consent platform: it is always
    considered granted and cannot be withdrawn.
    """

    ESSENTIAL = "essential"
    ANALYTICS = "analytics"
    MARKETING = "marketing"
    PERSONALIZATION = "personalization"
    THIRD_PARTY = "third_party"
class ConsentAction(Enum):
    """Kinds of change recorded in the consent audit log."""

    GRANTED = "granted"
    WITHDRAWN = "withdrawn"
    UPDATED = "updated"
@dataclass
class ConsentRecord:
    """A subject's current consent decision for one purpose."""

    consent_id: str
    subject_id: str
    purpose: ConsentPurpose

    # The decision itself and when it was made in each direction.
    granted: bool
    granted_at: Optional[datetime]
    withdrawn_at: Optional[datetime]

    # Evidence of how the decision was captured.
    version: str  # consent text version in force when recorded
    source: str
    ip_address: str
    user_agent: str
@dataclass
class ConsentAuditLog:
    """One audit entry describing a change to a consent record."""

    log_id: str
    consent_id: str  # the ConsentRecord this entry refers to
    action: ConsentAction
    timestamp: datetime
    old_value: Optional[bool]  # None when there was no earlier decision
    new_value: bool
    metadata: Dict
class ConsentManagementPlatform:
    """In-memory store of consent decisions with an append-only audit log.

    Current decisions are kept per subject and purpose; every change is also
    written to ``audit_logs`` so the history can be exported as proof.
    """

    def __init__(self):
        """Start with no consents, an empty audit log and consent version 2.0."""
        self.consents: Dict[str, Dict['ConsentPurpose', 'ConsentRecord']] = {}
        self.audit_logs: List['ConsentAuditLog'] = []
        self.consent_version = "2.0"
        # Every consent_id ever issued to a subject. Re-recording a consent
        # creates a new id, so the audit trail must remember the old ones.
        self._consent_ids_by_subject: Dict[str, Set[str]] = {}

    def record_consent(
        self,
        subject_id: str,
        purposes: Dict['ConsentPurpose', bool],
        source: str,
        ip_address: str,
        user_agent: str
    ) -> Dict[str, 'ConsentRecord']:
        """Record consent decisions for multiple purposes.

        Args:
            subject_id: The data subject the decisions belong to.
            purposes: Mapping of purpose to granted (True) / refused (False).
            source: Where the decision was captured.
            ip_address: Client address stored as evidence.
            user_agent: Client user agent stored as evidence.

        Returns:
            The new records, keyed by purpose value.
        """
        subject_consents = self.consents.setdefault(subject_id, {})
        issued_ids = self._consent_ids_by_subject.setdefault(subject_id, set())

        records = {}
        timestamp = datetime.utcnow()
        for purpose, granted in purposes.items():
            # Essential processing cannot be declined: it is always recorded
            # as granted, whatever the caller passed.
            if purpose == ConsentPurpose.ESSENTIAL:
                granted = True

            existing = subject_consents.get(purpose)
            old_value = existing.granted if existing else None

            consent_id = f"consent_{subject_id}_{purpose.value}_{timestamp.strftime('%Y%m%d%H%M%S')}"
            record = ConsentRecord(
                consent_id=consent_id,
                subject_id=subject_id,
                purpose=purpose,
                granted=granted,
                granted_at=timestamp if granted else None,
                withdrawn_at=None if granted else timestamp,
                version=self.consent_version,
                source=source,
                ip_address=ip_address,
                user_agent=user_agent
            )
            subject_consents[purpose] = record
            issued_ids.add(consent_id)
            records[purpose.value] = record

            # Audit log: a change to an existing decision is UPDATED;
            # otherwise the action mirrors the decision itself.
            action = ConsentAction.GRANTED if granted else ConsentAction.WITHDRAWN
            if old_value is not None and old_value != granted:
                action = ConsentAction.UPDATED
            self._log_audit(consent_id, action, old_value, granted, {
                'source': source,
                'ip': ip_address,
                'version': self.consent_version
            })
        return records

    def check_consent(self, subject_id: str, purpose: 'ConsentPurpose') -> bool:
        """Check if subject has granted consent for a purpose.

        ESSENTIAL is always True; any purpose with no record is False.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return True
        consent = self.consents.get(subject_id, {}).get(purpose)
        return consent.granted if consent else False

    def withdraw_consent(self, subject_id: str, purpose: 'ConsentPurpose') -> bool:
        """Withdraw consent for a specific purpose.

        Returns:
            True if a granted consent was withdrawn; False for ESSENTIAL,
            for unknown subjects/purposes, and for consent not currently
            granted.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return False
        consent = self.consents.get(subject_id, {}).get(purpose)
        if not (consent and consent.granted):
            return False

        old_value = consent.granted
        consent.granted = False
        consent.withdrawn_at = datetime.utcnow()
        self._log_audit(
            consent.consent_id,
            ConsentAction.WITHDRAWN,
            old_value,
            False,
            {'reason': 'user_withdrawal'}
        )
        return True

    def get_consent_status(self, subject_id: str) -> Dict:
        """Get current consent status for a subject.

        Returns:
            One entry per ``ConsentPurpose`` value with ``granted``,
            ``granted_at`` (ISO string or None) and ``version``. Purposes
            without a record report not granted, except ESSENTIAL.
        """
        subject_consents = self.consents.get(subject_id, {})
        status = {}
        for purpose in ConsentPurpose:
            consent = subject_consents.get(purpose)
            status[purpose.value] = {
                'granted': consent.granted if consent else (purpose == ConsentPurpose.ESSENTIAL),
                'granted_at': consent.granted_at.isoformat() if consent and consent.granted_at else None,
                'version': consent.version if consent else None
            }
        return status

    def get_audit_trail(self, subject_id: str) -> List[Dict]:
        """Get consent audit trail for a subject.

        Includes entries for superseded consent records, not only the
        records currently stored for the subject.
        """
        consent_ids = set(self._consent_ids_by_subject.get(subject_id, ()))
        consent_ids.update(
            c.consent_id for c in self.consents.get(subject_id, {}).values()
        )
        return [
            {
                'action': log.action.value,
                'timestamp': log.timestamp.isoformat(),
                'old_value': log.old_value,
                'new_value': log.new_value,
                'metadata': log.metadata
            }
            for log in self.audit_logs
            if log.consent_id in consent_ids
        ]

    def export_consent_proof(self, subject_id: str) -> Dict:
        """Export consent records as proof for compliance.

        Each consent's ``timestamp`` is the time of its current decision:
        ``granted_at`` while granted, ``withdrawn_at`` once withdrawn.
        """
        subject_consents = self.consents.get(subject_id, {})
        return {
            'subject_id': subject_id,
            'exported_at': datetime.utcnow().isoformat(),
            'consents': {
                purpose.value: {
                    'consent_id': record.consent_id,
                    'granted': record.granted,
                    'timestamp': self._decision_timestamp(record),
                    'version': record.version,
                    'source': record.source,
                    'proof_hash': self._generate_proof_hash(record)
                }
                for purpose, record in subject_consents.items()
            },
            'audit_trail': self.get_audit_trail(subject_id)
        }

    @staticmethod
    def _decision_timestamp(record: 'ConsentRecord') -> Optional[str]:
        """Return the ISO time of the record's current decision, or None."""
        # A withdrawn record still carries its original granted_at, so pick
        # by the current state first and only then fall back.
        preferred = record.granted_at if record.granted else record.withdrawn_at
        moment = preferred or record.granted_at or record.withdrawn_at
        return moment.isoformat() if moment else None

    def _log_audit(
        self,
        consent_id: str,
        action: 'ConsentAction',
        old_value: Optional[bool],
        new_value: bool,
        metadata: Dict
    ):
        """Append one entry to the audit log."""
        # Single clock reading so log_id and timestamp agree.
        now = datetime.utcnow()
        log = ConsentAuditLog(
            log_id=f"audit_{now.strftime('%Y%m%d%H%M%S%f')}",
            consent_id=consent_id,
            action=action,
            timestamp=now,
            old_value=old_value,
            new_value=new_value,
            metadata=metadata
        )
        self.audit_logs.append(log)

    def _generate_proof_hash(self, record: 'ConsentRecord') -> str:
        """Return the SHA-256 hex digest of the record's id, state, grant time and version."""
        # Imported here because this section's import list does not
        # include hashlib.
        import hashlib

        data = f"{record.consent_id}:{record.granted}:{record.granted_at}:{record.version}"
        return hashlib.sha256(data.encode()).hexdigest()


# Data Breach Notification System
Implement breach detection and notification:
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
class BreachSeverity(Enum):
    """Severity levels assigned to a data breach, lowest to highest."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class BreachStatus(Enum):
    """Stages a breach passes through from detection to closure."""

    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    NOTIFIED_DPA = "notified_dpa"
    NOTIFIED_SUBJECTS = "notified_subjects"
    CLOSED = "closed"
@dataclass
class DataBreach:
    """A recorded data breach and the state of its response."""

    # What happened and when it was detected.
    breach_id: str
    detected_at: datetime
    description: str
    severity: BreachSeverity
    status: BreachStatus

    # Scope of the breach.
    affected_subjects_count: int
    data_categories_affected: List[str]

    # Latest time by which the DPA must be notified.
    notification_deadline: datetime

    # Notification times; unset until the notification is recorded.
    dpa_notified_at: Optional[datetime] = None
    subjects_notified_at: Optional[datetime] = None

    # Response bookkeeping.
    containment_actions: List[str] = field(default_factory=list)
    root_cause: Optional[str] = None
    remediation_steps: List[str] = field(default_factory=list)
class BreachNotificationSystem:
    """Register data breaches, assess notification duties and report on them."""

    def __init__(self):
        """Start with no breaches and a 72-hour DPA notification window."""
        self.breaches: Dict[str, 'DataBreach'] = {}
        self.dpa_notification_hours = 72

    def report_breach(
        self,
        description: str,
        severity: 'BreachSeverity',
        affected_count: int,
        data_categories: List[str]
    ) -> 'DataBreach':
        """Report a new data breach.

        Args:
            description: Free-text description of the incident.
            severity: Assessed severity.
            affected_count: Approximate number of affected data subjects.
            data_categories: Categories of data involved.

        Returns:
            The stored breach, with its DPA notification deadline set to
            ``dpa_notification_hours`` after detection.
        """
        # One clock reading: the id, the detection time and the deadline all
        # derive from the same instant.
        detected_at = datetime.utcnow()
        base_id = f"BREACH-{detected_at.strftime('%Y%m%d%H%M%S')}"

        # Ids have one-second resolution; add a numeric suffix rather than
        # overwrite a breach reported in the same second.
        breach_id = base_id
        sequence = 1
        while breach_id in self.breaches:
            sequence += 1
            breach_id = f"{base_id}-{sequence}"

        breach = DataBreach(
            breach_id=breach_id,
            detected_at=detected_at,
            description=description,
            severity=severity,
            status=BreachStatus.DETECTED,
            affected_subjects_count=affected_count,
            # Copy so later changes to the caller's list do not alter the record.
            data_categories_affected=list(data_categories),
            notification_deadline=detected_at + timedelta(hours=self.dpa_notification_hours)
        )
        self.breaches[breach_id] = breach

        # Trigger immediate response workflow
        self._trigger_incident_response(breach)
        return breach

    def assess_notification_requirement(self, breach: 'DataBreach') -> Dict:
        """Assess whether DPA and subject notification is required.

        Returns:
            Dict with ``dpa_notification_required`` (always True in this
            implementation), ``subject_notification_required`` and a
            ``reasoning`` list explaining the outcome.
        """
        # Risk factors
        high_risk_categories = ['financial', 'health', 'biometric', 'criminal']
        high_risk_data = any(cat in breach.data_categories_affected for cat in high_risk_categories)

        assessment = {
            'dpa_notification_required': True,
            'subject_notification_required': False,
            'reasoning': []
        }
        reasoning = assessment['reasoning']

        # DPA notification (Article 33) - required unless unlikely to result in risk
        if breach.severity in [BreachSeverity.CRITICAL, BreachSeverity.HIGH]:
            reasoning.append("High severity breach - DPA notification required within 72 hours")
        if breach.affected_subjects_count > 100:
            reasoning.append(f"Large scale breach ({breach.affected_subjects_count} subjects)")

        # Subject notification (Article 34) - required if high risk to rights
        if high_risk_data:
            assessment['subject_notification_required'] = True
            reasoning.append("High-risk data categories affected - subject notification required")
        if breach.severity == BreachSeverity.CRITICAL:
            assessment['subject_notification_required'] = True
            reasoning.append("Critical severity - subject notification required")

        # Mitigating factor, checked last so it overrides the findings above.
        if self._has_encryption_protection(breach):
            assessment['subject_notification_required'] = False
            reasoning.append("Data was encrypted - subject notification may not be required")
        return assessment

    def _has_encryption_protection(self, breach: 'DataBreach') -> bool:
        """Report whether the breached data was encrypted (placeholder: always False)."""
        # Would check if breached data was encrypted
        return False

    def generate_dpa_notification(self, breach: 'DataBreach') -> Dict:
        """Generate notification for Data Protection Authority.

        Controller and DPO details are placeholder values.
        """
        return {
            'notification_type': 'Article 33 Breach Notification',
            'controller_details': {
                'name': 'Organization Name',
                'contact': 'dpo@organization.com',
                'address': 'Organization Address'
            },
            'breach_details': {
                'breach_id': breach.breach_id,
                'nature_of_breach': breach.description,
                'date_detected': breach.detected_at.isoformat(),
                'categories_of_data': breach.data_categories_affected,
                'approximate_subjects': breach.affected_subjects_count,
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': breach.containment_actions,
                'measures_proposed': breach.remediation_steps
            },
            'dpo_contact': {
                'name': 'Data Protection Officer',
                'email': 'dpo@organization.com',
                'phone': '+1-XXX-XXX-XXXX'
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def generate_subject_notification(self, breach: 'DataBreach') -> Dict:
        """Generate notification for affected data subjects."""
        return {
            'subject': 'Important: Security Incident Affecting Your Data',
            'body': {
                'greeting': 'Dear User,',
                'breach_description': f"We are writing to inform you of a security incident that may have affected your personal data. {breach.description}",
                'data_affected': f"The following categories of data may have been affected: {', '.join(breach.data_categories_affected)}",
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': "We have taken immediate steps to contain the incident and prevent further unauthorized access.",
                'recommendations': [
                    "Monitor your accounts for suspicious activity",
                    "Consider changing your password",
                    "Be cautious of phishing attempts"
                ],
                'contact': "If you have any questions, please contact our Data Protection Officer at dpo@organization.com",
                'apology': "We sincerely apologize for any inconvenience this may cause."
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def _assess_consequences(self, breach: 'DataBreach') -> List[str]:
        """List likely consequences based on the affected data categories.

        Falls back to a generic privacy-impact entry when no known category
        is involved.
        """
        by_category = (
            ('financial', "Potential financial fraud or identity theft"),
            ('credentials', "Potential unauthorized account access"),
            ('health', "Potential disclosure of sensitive health information"),
        )
        consequences = [
            text for category, text in by_category
            if category in breach.data_categories_affected
        ]
        return consequences or ["Potential privacy impact"]

    def _trigger_incident_response(self, breach: 'DataBreach'):
        """Trigger automated incident response workflow."""
        # Would integrate with incident management system
        pass

    def update_breach_status(self, breach_id: str, status: 'BreachStatus', notes: Optional[str] = None):
        """Update breach status.

        Unknown ``breach_id`` values are ignored. Moving to NOTIFIED_DPA or
        NOTIFIED_SUBJECTS stamps the matching notification time once.

        Args:
            breach_id: Id returned by ``report_breach``.
            status: New status.
            notes: Accepted for interface compatibility; not stored.
        """
        breach = self.breaches.get(breach_id)
        if breach is None:
            return

        breach.status = status
        # Keep the first notification time: it is what the deadline check in
        # the compliance report compares against, so a repeated status
        # update must not move it.
        if status == BreachStatus.NOTIFIED_DPA and breach.dpa_notified_at is None:
            breach.dpa_notified_at = datetime.utcnow()
        if status == BreachStatus.NOTIFIED_SUBJECTS and breach.subjects_notified_at is None:
            breach.subjects_notified_at = datetime.utcnow()

    def get_compliance_report(self) -> Dict:
        """Generate GDPR compliance report for breaches.

        Every recorded breach counts as requiring notification, so a breach
        that is not yet notified lowers the compliance rate even while its
        deadline is still open. With no breaches the rate is 1.0.
        """
        breaches = list(self.breaches.values())
        total = len(breaches)
        notified_on_time = sum(
            1 for b in breaches
            if b.dpa_notified_at and b.dpa_notified_at <= b.notification_deadline
        )
        return {
            'report_period': {
                'start': min(b.detected_at for b in breaches).isoformat() if breaches else None,
                'end': datetime.utcnow().isoformat()
            },
            'total_breaches': total,
            'by_severity': {
                severity.value: sum(1 for b in breaches if b.severity == severity)
                for severity in BreachSeverity
            },
            'notification_compliance': {
                'total_requiring_notification': total,
                'notified_within_deadline': notified_on_time,
                'compliance_rate': notified_on_time / total if total > 0 else 1.0
            },
            'breaches': [
                {
                    'id': b.breach_id,
                    'detected': b.detected_at.isoformat(),
                    'severity': b.severity.value,
                    'status': b.status.value,
                    'affected_subjects': b.affected_subjects_count
                }
                for b in breaches
            ]
        }


# Conclusion
GDPR compliance requires systematic technical implementation across all data processing systems. Automate data subject rights handling, implement robust consent management with audit trails, and maintain breach notification capabilities. Regular testing and documentation ensure ongoing compliance. Privacy by design should guide all system architecture decisions.