Compliance

GDPR Data Processing: Technical Implementation Guide

DeviDevs Team
11 min read
#gdpr #data-privacy #compliance #data-protection #privacy-engineering

GDPR compliance requires systematic technical implementation across data processing systems. This guide covers building privacy-respecting systems that meet regulatory requirements.

Data Subject Rights Automation

Build systems to handle data subject requests:

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
import hashlib
import asyncio
 
class RequestType(Enum):
    """Kinds of data subject request that can be submitted to the rights manager."""
    ACCESS = "access"                # right of access (Article 15)
    RECTIFICATION = "rectification"  # correction of inaccurate data (Article 16)
    ERASURE = "erasure"              # right to erasure (Article 17)
    PORTABILITY = "portability"      # data portability (Article 20)
    RESTRICTION = "restriction"      # right to restrict processing
    OBJECTION = "objection"          # right to object to processing
 
class RequestStatus(Enum):
    """Lifecycle states of a data subject request."""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"                      # set by verify_request once the token matches
    AWAITING_VERIFICATION = "awaiting_verification"  # initial state assigned by submit_request
    COMPLETED = "completed"                          # set by the access, erasure and portability handlers
    REJECTED = "rejected"
 
@dataclass
class DataSubjectRequest:
    """A single data subject request together with its processing state."""
    request_id: str  # "DSR-<YYYYmmddHHMMSS>-<8 hex chars>", built by _generate_request_id
    request_type: RequestType
    subject_email: str  # identifier passed to every data source; also where results are sent
    subject_id: Optional[str]  # submit_request always stores None here
    submitted_at: datetime  # naive UTC value from datetime.utcnow()
    deadline: datetime  # submission time plus the manager's deadline_days
    status: RequestStatus
    verification_token: Optional[str] = None  # compared with the caller's token in verify_request
    verified_at: Optional[datetime] = None  # set when verification succeeds
    completed_at: Optional[datetime] = None  # set by the handler that completes the request
    notes: List[str] = field(default_factory=list)  # free-text processing log appended by the handlers
    data_sources: List[str] = field(default_factory=list)  # names of the sources registered at submission time
 
@dataclass
class PersonalData:
    """One item of personal data together with its processing metadata.

    NOTE(review): nothing in the visible code constructs or reads this class.
    """
    source: str
    category: str
    data: Dict
    collected_at: datetime
    legal_basis: str
    retention_period: int  # presumably days, like DataSource.retention_days — TODO confirm
    encrypted: bool
 
class DataSubjectRightsManager:
    """Accept, verify and process data subject requests.

    Requests are held in memory in ``self.requests`` keyed by request id and
    are executed against every registered :class:`DataSource`.
    """

    def __init__(self, data_sources: List['DataSource']):
        """Register the data sources and start with no requests.

        Args:
            data_sources: Sources to read from, export from and erase from.
                They are indexed by ``name``; a later source with the same
                name replaces an earlier one.
        """
        self.data_sources = {ds.name: ds for ds in data_sources}
        self.requests: Dict[str, DataSubjectRequest] = {}
        self.deadline_days = 30
        # The event loop only holds weak references to tasks, so keep strong
        # ones here until each processing task has finished.
        self._background_tasks: set = set()

    def submit_request(self, request_type: RequestType, email: str) -> DataSubjectRequest:
        """Submit a new data subject request.

        The request is stored in the AWAITING_VERIFICATION state and a
        verification token is mailed to ``email``.

        Args:
            request_type: Which right the subject is exercising.
            email: Address of the data subject; also used as the lookup key
                in every data source.

        Returns:
            The newly stored request.
        """
        request_id = self._generate_request_id()
        verification_token = self._generate_verification_token()
        # Read the clock once so the deadline is exactly
        # submitted_at + deadline_days.
        submitted_at = datetime.utcnow()

        request = DataSubjectRequest(
            request_id=request_id,
            request_type=request_type,
            subject_email=email,
            subject_id=None,
            submitted_at=submitted_at,
            deadline=submitted_at + timedelta(days=self.deadline_days),
            status=RequestStatus.AWAITING_VERIFICATION,
            verification_token=verification_token,
            data_sources=list(self.data_sources.keys())
        )

        self.requests[request_id] = request
        self._send_verification_email(email, verification_token)

        return request

    def verify_request(self, request_id: str, token: str) -> bool:
        """Verify data subject identity and start processing the request.

        Args:
            request_id: Id returned by :meth:`submit_request`.
            token: Verification token supplied by the data subject.

        Returns:
            True when the token matched and processing was started; False for
            an unknown request, a wrong token, or a request that has already
            been verified.
        """
        import secrets

        request = self.requests.get(request_id)
        if request is None:
            return False

        expected = request.verification_token
        if not expected or not isinstance(token, str):
            return False

        # Constant-time comparison so response timing does not leak how much
        # of the token was correct.  Comparing bytes keeps non-ASCII input
        # from raising TypeError.
        if not secrets.compare_digest(expected.encode(), token.encode()):
            return False

        # A token is single-use: replaying it must not run the handler (for
        # example an erasure) a second time.
        if request.status is not RequestStatus.AWAITING_VERIFICATION:
            return False

        request.status = RequestStatus.IN_PROGRESS
        request.verified_at = datetime.utcnow()

        self._start_processing(request)

        return True

    def _start_processing(self, request: DataSubjectRequest) -> None:
        """Run ``_process_request`` on the running loop, or inline without one."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called from synchronous code: there is no loop to schedule on,
            # so run the handler to completion here instead of raising.
            asyncio.run(self._process_request(request))
            return

        task = loop.create_task(self._process_request(request))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _process_request(self, request: DataSubjectRequest):
        """Process verified request based on type.

        Request types without an automated handler, and handlers that fail,
        leave the request IN_PROGRESS and record why in ``request.notes``.
        """
        handlers = {
            RequestType.ACCESS: self._handle_access_request,
            RequestType.ERASURE: self._handle_erasure_request,
            RequestType.PORTABILITY: self._handle_portability_request,
            RequestType.RECTIFICATION: self._handle_rectification_request
        }

        handler = handlers.get(request.request_type)
        if handler is None:
            request.notes.append(
                f"No automated handler for {request.request_type.value} requests; manual processing required"
            )
            return

        try:
            await handler(request)
        except Exception as e:
            # This usually runs as a background task nobody awaits; record the
            # failure on the request so it is not lost.
            request.notes.append(f"Error processing request: {str(e)}")

    async def _handle_access_request(self, request: DataSubjectRequest) -> Dict:
        """Handle right of access request (Article 15).

        Collects the subject's data from every source, sends the report to
        the subject and then marks the request COMPLETED.  A source that
        fails is recorded in ``request.notes`` and skipped.

        Returns:
            The access report that was sent.
        """
        all_data = []

        for source_name, source in self.data_sources.items():
            try:
                data = await source.get_subject_data(request.subject_email)
                if data:
                    all_data.append({
                        'source': source_name,
                        'categories': source.data_categories,
                        'data': data,
                        'legal_basis': source.legal_basis,
                        'retention_period': source.retention_days,
                        'recipients': source.data_recipients
                    })
                    request.notes.append(f"Retrieved data from {source_name}")
            except Exception as e:
                request.notes.append(f"Error retrieving from {source_name}: {str(e)}")

        # Generate access report
        report = {
            'request_id': request.request_id,
            'generated_at': datetime.utcnow().isoformat(),
            'data_subject': request.subject_email,
            'data_collected': all_data,
            'processing_purposes': self._get_processing_purposes(),
            'data_sources': list(self.data_sources.keys()),
            'third_party_recipients': self._get_all_recipients(),
            'retention_policies': self._get_retention_policies(),
            'rights_information': self._get_rights_info()
        }

        # Deliver first: the request only counts as completed once the
        # subject has actually been sent the report.
        await self._send_access_report(request.subject_email, report)

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        return report

    async def _handle_erasure_request(self, request: DataSubjectRequest) -> Dict:
        """Handle right to erasure request (Article 17).

        Each source is asked whether deletion is permitted before deleting.
        The request is marked COMPLETED only when no source raised an error;
        otherwise it stays IN_PROGRESS so the failed sources can be followed
        up.

        Returns:
            Dict with ``deleted`` (source names), ``retained`` (source and
            reason) and ``errors`` (source and error text).
        """
        results = {'deleted': [], 'retained': [], 'errors': []}

        for source_name, source in self.data_sources.items():
            try:
                # Check if deletion is legally permitted
                can_delete, reason = source.can_delete_data(request.subject_email)

                if can_delete:
                    await source.delete_subject_data(request.subject_email)
                    results['deleted'].append(source_name)
                    request.notes.append(f"Deleted data from {source_name}")
                else:
                    results['retained'].append({
                        'source': source_name,
                        'reason': reason
                    })
                    request.notes.append(f"Retained data in {source_name}: {reason}")

            except Exception as e:
                results['errors'].append({
                    'source': source_name,
                    'error': str(e)
                })
                request.notes.append(f"Error deleting from {source_name}: {str(e)}")

        if results['errors']:
            # Some sources may still hold the subject's data, so the request
            # must not be reported as done.
            request.notes.append(
                f"Erasure incomplete: {len(results['errors'])} source(s) failed; request left in progress"
            )

        # Notify data subject
        await self._send_erasure_confirmation(request.subject_email, results)

        if not results['errors']:
            request.status = RequestStatus.COMPLETED
            request.completed_at = datetime.utcnow()

        return results

    async def _handle_portability_request(self, request: DataSubjectRequest) -> Dict:
        """Handle data portability request (Article 20).

        Exports data from every source whose ``supports_portability`` is
        true, sends the package to the subject and then marks the request
        COMPLETED.

        Returns:
            The export package that was sent.
        """
        portable_data = []

        for source_name, source in self.data_sources.items():
            if not source.supports_portability:
                continue
            try:
                data = await source.get_portable_data(request.subject_email)
                if data:
                    portable_data.append({
                        'source': source_name,
                        'format': 'JSON',
                        'data': data
                    })
            except Exception as e:
                request.notes.append(f"Error exporting from {source_name}: {str(e)}")

        # Package as machine-readable format
        export_package = {
            'export_id': request.request_id,
            'created_at': datetime.utcnow().isoformat(),
            'format_version': '1.0',
            'data_subject': request.subject_email,
            'data': portable_data
        }

        # Send download link before closing the request.
        await self._send_portability_package(request.subject_email, export_package)

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        return export_package

    async def _handle_rectification_request(self, request: DataSubjectRequest):
        """Handle rectification request (Article 16).

        No automated correction is implemented; the request stays IN_PROGRESS
        and a note flags it for manual handling.
        """
        # Would include logic to update incorrect data
        request.notes.append("Rectification is not automated; manual processing required")

    def _generate_request_id(self) -> str:
        """Return an id of the form ``DSR-<YYYYmmddHHMMSS>-<8 hex chars>``."""
        import secrets

        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        # Random suffix: a hash of the timestamp would add no entropy and
        # would repeat for ids created at the same instant.
        return f"DSR-{stamp}-{secrets.token_hex(4)}"

    def _generate_verification_token(self) -> str:
        """Return a URL-safe random token built from 32 random bytes."""
        import secrets
        return secrets.token_urlsafe(32)

    def _send_verification_email(self, email: str, token: str):
        """Send the verification token to the subject (stub)."""
        # Implementation would send actual email
        pass

    async def _send_access_report(self, email: str, report: Dict):
        """Deliver the access report to the subject (stub)."""
        pass

    async def _send_erasure_confirmation(self, email: str, results: Dict):
        """Deliver the erasure results to the subject (stub)."""
        pass

    async def _send_portability_package(self, email: str, package: Dict):
        """Deliver the portability export to the subject (stub)."""
        pass

    def _get_processing_purposes(self) -> List[str]:
        """Return the fixed list of processing purposes used in access reports."""
        return ['Service delivery', 'Communication', 'Analytics', 'Legal compliance']

    def _get_all_recipients(self) -> List[str]:
        """Return the distinct recipients of all sources, sorted for stable output."""
        return sorted({r for s in self.data_sources.values() for r in s.data_recipients})

    def _get_retention_policies(self) -> Dict:
        """Map each source name to its retention period as ``"<n> days"``."""
        return {name: f"{source.retention_days} days" for name, source in self.data_sources.items()}

    def _get_rights_info(self) -> Dict:
        """Return the short description of each data subject right."""
        return {
            'access': 'Right to access your personal data',
            'rectification': 'Right to correct inaccurate data',
            'erasure': 'Right to request deletion',
            'portability': 'Right to receive data in portable format',
            'restriction': 'Right to restrict processing',
            'objection': 'Right to object to processing',
            'complaint': 'Right to lodge complaint with supervisory authority'
        }
 
class DataSource(ABC):
    """Abstract base for a system that stores personal data.

    Subclasses implement lookup, deletion and export for one data subject,
    identified by the string the rights manager passes in (it passes the
    subject's email address).
    """

    def __init__(
        self,
        name: str,
        data_categories: List[str],
        legal_basis: str,
        retention_days: int,
        data_recipients: Optional[List[str]] = None,
        supports_portability: bool = True,
    ):
        """Describe the data source.

        Args:
            name: Unique name; the rights manager indexes sources by it.
            data_categories: Categories of personal data held by the source.
            legal_basis: Legal basis the processing relies on.
            retention_days: Retention period in days.
            data_recipients: Third parties the data is shared with.  Defaults
                to no recipients; the list is copied so instances never share
                the caller's list.
            supports_portability: Whether portability exports should include
                this source.  Defaults to True.
        """
        self.name = name
        self.data_categories = data_categories
        self.legal_basis = legal_basis
        self.retention_days = retention_days
        self.data_recipients: List[str] = list(data_recipients) if data_recipients is not None else []
        self.supports_portability = supports_portability

    @abstractmethod
    async def get_subject_data(self, identifier: str) -> Optional[Dict]:
        """Return the data held about ``identifier``, or None when there is none."""
        pass

    @abstractmethod
    async def delete_subject_data(self, identifier: str) -> bool:
        """Delete the data held about ``identifier``."""
        pass

    @abstractmethod
    async def get_portable_data(self, identifier: str) -> Optional[Dict]:
        """Return the exportable data for ``identifier``, or None when there is none."""
        pass

    def can_delete_data(self, identifier: str) -> tuple:
        """Check if data can be deleted. Returns (can_delete, reason).

        The base implementation always permits deletion; override it when
        data has to be retained.
        """
        return (True, None)

Consent Management

Build a comprehensive consent management platform:

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from datetime import datetime
from enum import Enum
import json
 
class ConsentPurpose(Enum):
    """Processing purposes for which consent is tracked."""
    ESSENTIAL = "essential"  # always treated as granted; withdraw_consent refuses it
    ANALYTICS = "analytics"
    MARKETING = "marketing"
    PERSONALIZATION = "personalization"
    THIRD_PARTY = "third_party"
 
class ConsentAction(Enum):
    """Action recorded in a consent audit log entry."""
    GRANTED = "granted"
    WITHDRAWN = "withdrawn"
    UPDATED = "updated"  # used by record_consent when an earlier decision existed and the new one differs
 
@dataclass
class ConsentRecord:
    """The consent decision of one subject for one purpose."""
    consent_id: str  # "consent_<subject_id>_<purpose>_<YYYYmmddHHMMSS>"
    subject_id: str
    purpose: ConsentPurpose
    granted: bool
    granted_at: Optional[datetime]  # time of the decision when it was a grant, else None
    withdrawn_at: Optional[datetime]  # set when recorded as not granted, or by withdraw_consent
    version: str  # the platform's consent_version at recording time
    source: str
    ip_address: str
    user_agent: str
 
@dataclass
class ConsentAuditLog:
    """One audit entry describing a change to a consent record."""
    log_id: str  # "audit_<timestamp including microseconds>"
    consent_id: str  # id of the ConsentRecord the entry refers to
    action: ConsentAction
    timestamp: datetime  # naive UTC value from datetime.utcnow()
    old_value: Optional[bool]  # previous decision; None when no earlier record existed
    new_value: bool
    metadata: Dict  # caller-supplied context such as source, ip, version or reason
 
class ConsentManagementPlatform:
    """In-memory store of consent decisions with an audit trail.

    ``self.consents`` maps subject id -> purpose -> current record, and
    ``self.audit_logs`` holds every change in chronological order.
    """

    def __init__(self):
        """Start with no consents, no audit entries and consent version "2.0"."""
        self.consents: Dict[str, Dict[ConsentPurpose, ConsentRecord]] = {}
        self.audit_logs: List[ConsentAuditLog] = []
        self.consent_version = "2.0"
        # Every consent_id ever issued per subject.  record_consent replaces
        # the stored record (and its id) on each call, so the current records
        # alone cannot locate older audit entries.
        self._consent_history: Dict[str, Set[str]] = {}

    def record_consent(
        self,
        subject_id: str,
        purposes: Dict[ConsentPurpose, bool],
        source: str,
        ip_address: str,
        user_agent: str
    ) -> Dict[str, ConsentRecord]:
        """Record consent decisions for multiple purposes.

        Each decision replaces the subject's current record for that purpose
        and is written to the audit log.

        Args:
            subject_id: Subject the decisions belong to.
            purposes: Decision per purpose; ESSENTIAL is always stored as
                granted regardless of the value passed.
            source: Where the decision was collected.
            ip_address: Client address stored as evidence.
            user_agent: Client user agent stored as evidence.

        Returns:
            The new records keyed by purpose value.
        """
        if subject_id not in self.consents:
            self.consents[subject_id] = {}

        records = {}
        timestamp = datetime.utcnow()
        issued_ids = self._consent_history.setdefault(subject_id, set())

        for purpose, granted in purposes.items():
            # Essential processing cannot be declined: force it to granted.
            if purpose == ConsentPurpose.ESSENTIAL:
                granted = True

            existing = self.consents[subject_id].get(purpose)
            old_value = existing.granted if existing else None

            consent_id = f"consent_{subject_id}_{purpose.value}_{timestamp.strftime('%Y%m%d%H%M%S')}"

            record = ConsentRecord(
                consent_id=consent_id,
                subject_id=subject_id,
                purpose=purpose,
                granted=granted,
                granted_at=timestamp if granted else None,
                withdrawn_at=None if granted else timestamp,
                version=self.consent_version,
                source=source,
                ip_address=ip_address,
                user_agent=user_agent
            )

            self.consents[subject_id][purpose] = record
            records[purpose.value] = record
            issued_ids.add(consent_id)

            # Audit log
            action = ConsentAction.GRANTED if granted else ConsentAction.WITHDRAWN
            if old_value is not None and old_value != granted:
                action = ConsentAction.UPDATED

            self._log_audit(consent_id, action, old_value, granted, {
                'source': source,
                'ip': ip_address,
                'version': self.consent_version
            })

        return records

    def check_consent(self, subject_id: str, purpose: ConsentPurpose) -> bool:
        """Check if subject has granted consent for a purpose.

        ESSENTIAL is always True; any purpose without a record is False.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return True

        subject_consents = self.consents.get(subject_id, {})
        consent = subject_consents.get(purpose)

        return consent.granted if consent else False

    def withdraw_consent(self, subject_id: str, purpose: ConsentPurpose) -> bool:
        """Withdraw consent for a specific purpose.

        Returns:
            True when a granted consent was withdrawn; False for ESSENTIAL,
            for an unknown subject or purpose, or when consent was not
            granted.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return False

        subject_consents = self.consents.get(subject_id, {})
        consent = subject_consents.get(purpose)

        if consent and consent.granted:
            old_value = consent.granted
            consent.granted = False
            consent.withdrawn_at = datetime.utcnow()

            self._log_audit(
                consent.consent_id,
                ConsentAction.WITHDRAWN,
                old_value,
                False,
                {'reason': 'user_withdrawal'}
            )
            return True

        return False

    def get_consent_status(self, subject_id: str) -> Dict:
        """Get current consent status for a subject.

        Returns:
            One entry per ConsentPurpose with ``granted``, ``granted_at``
            (ISO string or None) and ``version``.  Purposes without a record
            report not granted, except ESSENTIAL.
        """
        subject_consents = self.consents.get(subject_id, {})

        status = {}
        for purpose in ConsentPurpose:
            consent = subject_consents.get(purpose)
            status[purpose.value] = {
                'granted': consent.granted if consent else (purpose == ConsentPurpose.ESSENTIAL),
                'granted_at': consent.granted_at.isoformat() if consent and consent.granted_at else None,
                'version': consent.version if consent else None
            }

        return status

    def get_audit_trail(self, subject_id: str) -> List[Dict]:
        """Get consent audit trail for a subject.

        Includes entries for records that were later replaced by a newer
        decision, not only those of the subject's current records.
        """
        consent_ids = set(self._consent_history.get(subject_id, ()))
        # Also honour records placed in self.consents directly.
        consent_ids.update(c.consent_id for c in self.consents.get(subject_id, {}).values())

        return [
            {
                'action': log.action.value,
                'timestamp': log.timestamp.isoformat(),
                'old_value': log.old_value,
                'new_value': log.new_value,
                'metadata': log.metadata
            }
            for log in self.audit_logs
            if log.consent_id in consent_ids
        ]

    def export_consent_proof(self, subject_id: str) -> Dict:
        """Export consent records as proof for compliance.

        Returns:
            Dict with the subject id, export time, one entry per recorded
            purpose (including a proof hash) and the subject's audit trail.
            Each entry's ``timestamp`` is the time of the current decision:
            the grant time for a granted consent, the withdrawal time for a
            withdrawn one.
        """
        subject_consents = self.consents.get(subject_id, {})

        return {
            'subject_id': subject_id,
            'exported_at': datetime.utcnow().isoformat(),
            'consents': {
                purpose.value: {
                    'consent_id': record.consent_id,
                    'granted': record.granted,
                    'timestamp': self._decision_timestamp(record),
                    'version': record.version,
                    'source': record.source,
                    'proof_hash': self._generate_proof_hash(record)
                }
                for purpose, record in subject_consents.items()
            },
            'audit_trail': self.get_audit_trail(subject_id)
        }

    @staticmethod
    def _decision_timestamp(record: ConsentRecord) -> Optional[str]:
        """Return the ISO time of the record's current decision, or None."""
        # A withdrawn record keeps its original granted_at, so pick the
        # timestamp that matches the current state first.
        moment = record.granted_at if record.granted else record.withdrawn_at
        if moment is None:
            moment = record.granted_at or record.withdrawn_at
        return moment.isoformat() if moment is not None else None

    def _log_audit(
        self,
        consent_id: str,
        action: ConsentAction,
        old_value: Optional[bool],
        new_value: bool,
        metadata: Dict
    ):
        """Append one entry to the audit log."""
        # One clock read so log_id and timestamp describe the same instant.
        now = datetime.utcnow()
        log = ConsentAuditLog(
            log_id=f"audit_{now.strftime('%Y%m%d%H%M%S%f')}",
            consent_id=consent_id,
            action=action,
            timestamp=now,
            old_value=old_value,
            new_value=new_value,
            metadata=metadata
        )
        self.audit_logs.append(log)

    def _generate_proof_hash(self, record: ConsentRecord) -> str:
        """Return the SHA-256 hex digest of the record's id, decision, grant time and version."""
        # Imported here so this listing does not rely on an import that only
        # appears in an earlier code listing.
        import hashlib

        data = f"{record.consent_id}:{record.granted}:{record.granted_at}:{record.version}"
        return hashlib.sha256(data.encode()).hexdigest()

Data Breach Notification System

Implement breach detection and notification:

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
 
class BreachSeverity(Enum):
    """Severity assigned to a data breach when it is reported."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"  # assess_notification_requirement flags subject notification for this level
 
class BreachStatus(Enum):
    """Stages of handling a data breach."""
    DETECTED = "detected"  # initial status assigned by report_breach
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    NOTIFIED_DPA = "notified_dpa"  # update_breach_status stamps dpa_notified_at
    NOTIFIED_SUBJECTS = "notified_subjects"  # update_breach_status stamps subjects_notified_at
    CLOSED = "closed"
 
@dataclass
class DataBreach:
    """A reported data breach and the state of its handling."""
    breach_id: str  # starts with "BREACH-" followed by the report time as YYYYmmddHHMMSS
    detected_at: datetime  # naive UTC value from datetime.utcnow()
    description: str
    severity: BreachSeverity
    status: BreachStatus
    affected_subjects_count: int
    data_categories_affected: List[str]  # compared against names such as 'financial', 'health', 'credentials'
    notification_deadline: datetime  # detection time plus the system's dpa_notification_hours
    dpa_notified_at: Optional[datetime] = None  # set when the status becomes NOTIFIED_DPA
    subjects_notified_at: Optional[datetime] = None  # set when the status becomes NOTIFIED_SUBJECTS
    containment_actions: List[str] = field(default_factory=list)  # reported to the DPA as measures taken
    root_cause: Optional[str] = None
    remediation_steps: List[str] = field(default_factory=list)  # reported to the DPA as measures proposed
 
class BreachNotificationSystem:
    """Register data breaches, assess notification duties and build notices.

    Breaches are kept in memory in ``self.breaches`` keyed by breach id.
    Every status change is also appended to ``self.status_history``.
    """

    def __init__(self):
        """Start with no breaches and a 72-hour DPA notification window."""
        self.breaches: Dict[str, DataBreach] = {}
        self.dpa_notification_hours = 72
        # breach_id -> list of {'status', 'changed_at', 'notes'} entries.
        self.status_history: Dict[str, List[Dict]] = {}

    def report_breach(
        self,
        description: str,
        severity: BreachSeverity,
        affected_count: int,
        data_categories: List[str]
    ) -> DataBreach:
        """Report a new data breach.

        Args:
            description: What happened.
            severity: Assessed severity.
            affected_count: Approximate number of affected data subjects.
            data_categories: Categories of personal data involved.

        Returns:
            The stored breach, in status DETECTED, with its DPA notification
            deadline set to detection time plus ``dpa_notification_hours``.
        """
        # One clock read so id, detection time and deadline agree.
        detected_at = datetime.utcnow()
        base_id = f"BREACH-{detected_at.strftime('%Y%m%d%H%M%S')}"

        # The id only has one-second resolution; add a counter so a second
        # breach reported within the same second cannot overwrite the first.
        breach_id = base_id
        sequence = 1
        while breach_id in self.breaches:
            sequence += 1
            breach_id = f"{base_id}-{sequence}"

        breach = DataBreach(
            breach_id=breach_id,
            detected_at=detected_at,
            description=description,
            severity=severity,
            status=BreachStatus.DETECTED,
            affected_subjects_count=affected_count,
            data_categories_affected=data_categories,
            notification_deadline=detected_at + timedelta(hours=self.dpa_notification_hours)
        )

        self.breaches[breach_id] = breach

        # Trigger immediate response workflow
        self._trigger_incident_response(breach)

        return breach

    def assess_notification_requirement(self, breach: DataBreach) -> Dict:
        """Assess whether DPA and subject notification is required.

        Returns:
            Dict with ``dpa_notification_required`` (always True in this
            implementation), ``subject_notification_required`` and the list
            of ``reasoning`` strings behind the result.
        """
        # Risk factors
        high_risk_categories = ['financial', 'health', 'biometric', 'criminal']
        # Compare case-insensitively so "Health" is not missed.
        affected = {str(cat).lower() for cat in breach.data_categories_affected}
        high_risk_data = not affected.isdisjoint(high_risk_categories)

        assessment = {
            'dpa_notification_required': True,
            'subject_notification_required': False,
            'reasoning': []
        }

        # DPA notification (Article 33) - required unless unlikely to result in risk.
        # The flag is never cleared here; the checks below only add reasoning.
        if breach.severity in [BreachSeverity.CRITICAL, BreachSeverity.HIGH]:
            assessment['reasoning'].append("High severity breach - DPA notification required within 72 hours")

        if breach.affected_subjects_count > 100:
            assessment['reasoning'].append(f"Large scale breach ({breach.affected_subjects_count} subjects)")

        # Subject notification (Article 34) - required if high risk to rights
        if high_risk_data:
            assessment['subject_notification_required'] = True
            assessment['reasoning'].append("High-risk data categories affected - subject notification required")

        if breach.severity == BreachSeverity.CRITICAL:
            assessment['subject_notification_required'] = True
            assessment['reasoning'].append("Critical severity - subject notification required")

        # Check for mitigating factors
        if self._has_encryption_protection(breach):
            assessment['subject_notification_required'] = False
            assessment['reasoning'].append("Data was encrypted - subject notification may not be required")

        return assessment

    def _has_encryption_protection(self, breach: DataBreach) -> bool:
        """Report whether the breached data was encrypted (stub: always False)."""
        # Would check if breached data was encrypted
        return False

    def generate_dpa_notification(self, breach: DataBreach) -> Dict:
        """Generate notification for Data Protection Authority.

        Controller and DPO details are placeholders to be replaced with the
        organisation's real contact data.
        """
        return {
            'notification_type': 'Article 33 Breach Notification',
            'controller_details': {
                'name': 'Organization Name',
                'contact': 'dpo@organization.com',
                'address': 'Organization Address'
            },
            'breach_details': {
                'breach_id': breach.breach_id,
                'nature_of_breach': breach.description,
                'date_detected': breach.detected_at.isoformat(),
                'categories_of_data': breach.data_categories_affected,
                'approximate_subjects': breach.affected_subjects_count,
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': breach.containment_actions,
                'measures_proposed': breach.remediation_steps
            },
            'dpo_contact': {
                'name': 'Data Protection Officer',
                'email': 'dpo@organization.com',
                'phone': '+1-XXX-XXX-XXXX'
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def generate_subject_notification(self, breach: DataBreach) -> Dict:
        """Generate notification for affected data subjects."""
        return {
            'subject': 'Important: Security Incident Affecting Your Data',
            'body': {
                'greeting': 'Dear User,',
                'breach_description': f"We are writing to inform you of a security incident that may have affected your personal data. {breach.description}",
                'data_affected': f"The following categories of data may have been affected: {', '.join(breach.data_categories_affected)}",
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': "We have taken immediate steps to contain the incident and prevent further unauthorized access.",
                'recommendations': [
                    "Monitor your accounts for suspicious activity",
                    "Consider changing your password",
                    "Be cautious of phishing attempts"
                ],
                'contact': "If you have any questions, please contact our Data Protection Officer at dpo@organization.com",
                'apology': "We sincerely apologize for any inconvenience this may cause."
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def _assess_consequences(self, breach: DataBreach) -> List[str]:
        """List the likely consequences implied by the affected data categories.

        Categories are matched case-insensitively.  Falls back to a generic
        privacy-impact statement when no known category is affected.
        """
        affected = {str(cat).lower() for cat in breach.data_categories_affected}
        consequences = []

        if 'financial' in affected:
            consequences.append("Potential financial fraud or identity theft")

        if 'credentials' in affected:
            consequences.append("Potential unauthorized account access")

        if 'health' in affected:
            consequences.append("Potential disclosure of sensitive health information")

        if not consequences:
            consequences.append("Potential privacy impact")

        return consequences

    def _trigger_incident_response(self, breach: DataBreach):
        """Trigger automated incident response workflow."""
        # Would integrate with incident management system
        pass

    def update_breach_status(self, breach_id: str, status: BreachStatus, notes: Optional[str] = None):
        """Update breach status.

        Unknown breach ids are ignored.  The first time a breach reaches
        NOTIFIED_DPA or NOTIFIED_SUBJECTS the notification time is stamped;
        setting the same status again keeps the original time, so a repeated
        update cannot move the moment used for deadline compliance.

        Args:
            breach_id: Id returned by :meth:`report_breach`.
            status: New status.
            notes: Optional free text stored with the change in
                ``self.status_history``.
        """
        breach = self.breaches.get(breach_id)
        if breach is None:
            return

        now = datetime.utcnow()
        breach.status = status

        if status == BreachStatus.NOTIFIED_DPA and breach.dpa_notified_at is None:
            breach.dpa_notified_at = now

        if status == BreachStatus.NOTIFIED_SUBJECTS and breach.subjects_notified_at is None:
            breach.subjects_notified_at = now

        self.status_history.setdefault(breach_id, []).append({
            'status': status,
            'changed_at': now,
            'notes': notes
        })

    def get_compliance_report(self) -> Dict:
        """Generate GDPR compliance report for breaches.

        Every breach counts as requiring DPA notification.  A breach is
        compliant when ``dpa_notified_at`` is set and not later than its
        deadline; the rate is 1.0 when there are no breaches.
        """
        breaches = list(self.breaches.values())
        total = len(breaches)
        notified_on_time = sum(
            1 for b in breaches
            if b.dpa_notified_at and b.dpa_notified_at <= b.notification_deadline
        )

        return {
            'report_period': {
                'start': min(b.detected_at for b in breaches).isoformat() if breaches else None,
                'end': datetime.utcnow().isoformat()
            },
            'total_breaches': total,
            'by_severity': {
                severity.value: sum(1 for b in breaches if b.severity == severity)
                for severity in BreachSeverity
            },
            'notification_compliance': {
                'total_requiring_notification': total,
                'notified_within_deadline': notified_on_time,
                'compliance_rate': notified_on_time / total if total > 0 else 1.0
            },
            'breaches': [
                {
                    'id': b.breach_id,
                    'detected': b.detected_at.isoformat(),
                    'severity': b.severity.value,
                    'status': b.status.value,
                    'affected_subjects': b.affected_subjects_count
                }
                for b in breaches
            ]
        }

Conclusion

GDPR compliance requires systematic technical implementation across all data processing systems. Automate data subject rights handling, implement robust consent management with audit trails, and maintain breach notification capabilities. Regular testing and documentation ensure ongoing compliance. Privacy by design should guide all system architecture decisions.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.