Conformitatea GDPR necesită implementare tehnică sistematică în toate sistemele de procesare a datelor. Acest ghid acoperă construirea sistemelor care respectă confidențialitatea și îndeplinesc cerințele de reglementare.
Automatizarea Drepturilor Persoanelor Vizate
Construiește sisteme pentru gestionarea cererilor persoanelor vizate:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
import hashlib
import asyncio
class RequestType(Enum):
    """Kinds of data-subject requests the module distinguishes.

    Each member's value is the lowercase form of its name.
    """

    ACCESS = "access"
    RECTIFICATION = "rectification"
    ERASURE = "erasure"
    PORTABILITY = "portability"
    RESTRICTION = "restriction"
    OBJECTION = "objection"
class RequestStatus(Enum):
    """Lifecycle states a data-subject request can be in.

    Each member's value is the lowercase form of its name.
    """

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    AWAITING_VERIFICATION = "awaiting_verification"
    COMPLETED = "completed"
    REJECTED = "rejected"
@dataclass
class DataSubjectRequest:
    """One data-subject request and its processing state.

    The first seven fields are required, in this order; the rest default to
    "not yet set" (``None`` or a fresh empty list per instance).
    """

    # --- required ---
    request_id: str
    request_type: RequestType
    subject_email: str
    subject_id: Optional[str]
    submitted_at: datetime
    deadline: datetime
    status: RequestStatus

    # --- optional, filled in as the request progresses ---
    verification_token: Optional[str] = None
    verified_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # default_factory gives every request its own list instead of a shared one.
    notes: List[str] = field(default_factory=list)
    data_sources: List[str] = field(default_factory=list)
@dataclass
class PersonalData:
    """A piece of personal data together with its processing metadata.

    All seven fields are required and positional in the order listed.
    """

    source: str
    category: str
    data: Dict
    collected_at: datetime
    legal_basis: str
    # NOTE(review): unit is not stated in this class; DataSource uses
    # ``retention_days``, so days is likely -- confirm.
    retention_period: int
    encrypted: bool
class DataSubjectRightsManager:
    """Coordinate data-subject requests across a set of registered data sources.

    Requests live in memory in ``self.requests`` keyed by request ID. A request
    is created by :meth:`submit_request`, confirmed by :meth:`verify_request`,
    and then dispatched to a handler chosen by its request type.
    """

    def __init__(self, data_sources: List['DataSource']):
        """Index the given sources by name and start with no requests.

        Args:
            data_sources: Sources to query, erase from and export from. They
                are indexed by ``name``; sources sharing a name overwrite each
                other in that index.
        """
        self.data_sources = {ds.name: ds for ds in data_sources}
        self.requests: Dict[str, DataSubjectRequest] = {}
        self.deadline_days = 30
        # Strong references to in-flight processing tasks. The event loop only
        # holds weak references to tasks, so an otherwise unreferenced task can
        # be garbage-collected before it finishes.
        self._background_tasks = set()

    def submit_request(self, request_type: RequestType, email: str) -> DataSubjectRequest:
        """Register a new data-subject request and send a verification token.

        Args:
            request_type: Which right the subject is exercising.
            email: Address identifying the subject; the token is sent here.

        Returns:
            The stored request, in ``AWAITING_VERIFICATION`` status.
        """
        # Read the clock once so deadline is exactly submitted_at + deadline_days.
        now = datetime.utcnow()
        request = DataSubjectRequest(
            request_id=self._generate_request_id(),
            request_type=request_type,
            subject_email=email,
            subject_id=None,
            submitted_at=now,
            deadline=now + timedelta(days=self.deadline_days),
            status=RequestStatus.AWAITING_VERIFICATION,
            verification_token=self._generate_verification_token(),
            data_sources=list(self.data_sources),
        )
        self.requests[request.request_id] = request
        self._send_verification_email(email, request.verification_token)
        return request

    def verify_request(self, request_id: str, token: str) -> bool:
        """Confirm the subject's identity and start processing the request.

        Args:
            request_id: ID returned by :meth:`submit_request`.
            token: Verification token supplied by the subject.

        Returns:
            ``True`` if the token matched and processing was started; ``False``
            for an unknown request, a wrong token, or a request that is no
            longer awaiting verification.
        """
        import secrets

        request = self.requests.get(request_id)
        if request is None:
            return False
        # Verification is single-use: replaying a valid token must not re-run
        # an erasure or re-send an export.
        if request.status is not RequestStatus.AWAITING_VERIFICATION:
            return False
        expected = request.verification_token
        if not isinstance(expected, str) or not isinstance(token, str):
            return False
        # Constant-time comparison on bytes; compare_digest rejects non-ASCII
        # str, and the supplied token is untrusted input.
        if not secrets.compare_digest(
            expected.encode("utf-8", "surrogatepass"),
            token.encode("utf-8", "surrogatepass"),
        ):
            return False

        request.status = RequestStatus.IN_PROGRESS
        request.verified_at = datetime.utcnow()
        self._start_processing(request)
        return True

    def _start_processing(self, request: DataSubjectRequest) -> None:
        """Run ``_process_request`` as a task, or synchronously without a loop."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No event loop in this thread (plain synchronous caller): process
            # the request to completion instead of failing after the status
            # has already been changed.
            asyncio.run(self._process_request(request))
            return
        task = loop.create_task(self._process_request(request))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _process_request(self, request: DataSubjectRequest):
        """Dispatch a verified request to the handler for its type."""
        handlers = {
            RequestType.ACCESS: self._handle_access_request,
            RequestType.ERASURE: self._handle_erasure_request,
            RequestType.PORTABILITY: self._handle_portability_request,
            RequestType.RECTIFICATION: self._handle_rectification_request,
        }
        handler = handlers.get(request.request_type)
        if handler is None:
            # Leave a trace instead of dropping the request silently; it stays
            # IN_PROGRESS until someone handles it.
            request.notes.append(
                f"Niciun handler automat pentru cererea de tip "
                f"{request.request_type.value}; este necesara procesare manuala"
            )
            return
        await handler(request)

    async def _handle_access_request(self, request: DataSubjectRequest) -> Dict:
        """Handle a right-of-access request (Article 15).

        Queries every source for the subject's data, records per-source
        success or failure in ``request.notes``, marks the request completed
        and sends the report.

        Returns:
            The access report that was sent.
        """
        all_data = []
        for source_name, source in self.data_sources.items():
            # Best effort: one failing source must not block the others.
            try:
                data = await source.get_subject_data(request.subject_email)
                if data:
                    all_data.append({
                        'source': source_name,
                        'categories': source.data_categories,
                        'data': data,
                        'legal_basis': source.legal_basis,
                        'retention_period': source.retention_days,
                        'recipients': source.data_recipients,
                    })
                    request.notes.append(f"Date preluate din {source_name}")
            except Exception as e:
                request.notes.append(f"Eroare la preluarea din {source_name}: {str(e)}")

        report = {
            'request_id': request.request_id,
            'generated_at': datetime.utcnow().isoformat(),
            'data_subject': request.subject_email,
            'data_collected': all_data,
            'processing_purposes': self._get_processing_purposes(),
            'data_sources': list(self.data_sources.keys()),
            'third_party_recipients': self._get_all_recipients(),
            'retention_policies': self._get_retention_policies(),
            'rights_information': self._get_rights_info(),
        }

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        await self._send_access_report(request.subject_email, report)
        return report

    async def _handle_erasure_request(self, request: DataSubjectRequest) -> Dict:
        """Handle a right-to-erasure request (Article 17).

        For each source, asks ``can_delete_data`` first; deletes when allowed,
        otherwise records the reason for retention. Failures are collected
        rather than raised.

        Returns:
            Dict with ``deleted`` (source names), ``retained`` (source and
            reason) and ``errors`` (source and error text).
        """
        results = {'deleted': [], 'retained': [], 'errors': []}
        for source_name, source in self.data_sources.items():
            try:
                can_delete, reason = source.can_delete_data(request.subject_email)
                if can_delete:
                    await source.delete_subject_data(request.subject_email)
                    results['deleted'].append(source_name)
                    request.notes.append(f"Date sterse din {source_name}")
                else:
                    results['retained'].append({
                        'source': source_name,
                        'reason': reason,
                    })
                    request.notes.append(f"Date pastrate in {source_name}: {reason}")
            except Exception as e:
                results['errors'].append({
                    'source': source_name,
                    'error': str(e),
                })
                request.notes.append(f"Eroare la stergerea din {source_name}: {str(e)}")

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        await self._send_erasure_confirmation(request.subject_email, results)
        return results

    async def _handle_portability_request(self, request: DataSubjectRequest) -> Dict:
        """Handle a data-portability request (Article 20).

        Collects portable data from every source whose ``supports_portability``
        is truthy, packages it and sends it to the subject.

        Returns:
            The export package that was sent.
        """
        portable_data = []
        for source_name, source in self.data_sources.items():
            if not source.supports_portability:
                continue
            try:
                data = await source.get_portable_data(request.subject_email)
                if data:
                    portable_data.append({
                        'source': source_name,
                        'format': 'JSON',
                        'data': data,
                    })
            except Exception as e:
                request.notes.append(f"Eroare la exportul din {source_name}: {str(e)}")

        export_package = {
            'export_id': request.request_id,
            'created_at': datetime.utcnow().isoformat(),
            'format_version': '1.0',
            'data_subject': request.subject_email,
            'data': portable_data,
        }

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        await self._send_portability_package(request.subject_email, export_package)
        return export_package

    async def _handle_rectification_request(self, request: DataSubjectRequest):
        """Handle a rectification request (Article 16).

        Placeholder: no update logic yet, so the request stays ``IN_PROGRESS``.
        """
        pass

    def _generate_request_id(self) -> str:
        """Return an ID of the form ``DSR-<YYYYmmddHHMMSS>-<8 hex chars>``.

        The suffix is random rather than derived from the clock, so two
        requests created at the same instant do not share an ID.
        """
        import secrets

        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        return f"DSR-{stamp}-{secrets.token_hex(4)}"

    def _generate_verification_token(self) -> str:
        """Return a fresh URL-safe random token."""
        import secrets

        return secrets.token_urlsafe(32)

    def _send_verification_email(self, email: str, token: str):
        """Placeholder for sending the verification email; does nothing."""
        pass

    async def _send_access_report(self, email: str, report: Dict):
        """Placeholder for delivering the access report; does nothing."""
        pass

    async def _send_erasure_confirmation(self, email: str, results: Dict):
        """Placeholder for confirming the erasure outcome; does nothing."""
        pass

    async def _send_portability_package(self, email: str, package: Dict):
        """Placeholder for delivering the export package; does nothing."""
        pass

    def _get_processing_purposes(self) -> List[str]:
        """Return the fixed list of processing purposes shown in reports."""
        return ['Livrarea serviciului', 'Comunicare', 'Analiza', 'Conformitate legala']

    def _get_all_recipients(self) -> List[str]:
        """Return every distinct recipient across sources, in first-seen order."""
        # dict.fromkeys de-duplicates like set() but keeps a stable order, so
        # generated reports are reproducible.
        return list(dict.fromkeys(
            r for s in self.data_sources.values() for r in s.data_recipients
        ))

    def _get_retention_policies(self) -> Dict:
        """Map each source name to its retention period as display text."""
        return {name: f"{source.retention_days} zile" for name, source in self.data_sources.items()}

    def _get_rights_info(self) -> Dict:
        """Return the fixed rights descriptions included in access reports."""
        return {
            'access': 'Dreptul de acces la datele tale personale',
            'rectification': 'Dreptul de a corecta datele inexacte',
            'erasure': 'Dreptul de a solicita stergerea',
            'portability': 'Dreptul de a primi datele in format portabil',
            'restriction': 'Dreptul de a restrictiona procesarea',
            'objection': 'Dreptul de a te opune procesarii',
            'complaint': 'Dreptul de a depune plangere la autoritatea de supraveghere',
        }
class DataSource(ABC):
    """Abstract base for a store that holds personal data about subjects.

    Subclasses implement the three async accessors; ``can_delete_data`` has a
    permissive default that subclasses may override.
    """

    def __init__(self, name: str, data_categories: List[str], legal_basis: str, retention_days: int):
        """Store the descriptive metadata and set default sharing/export flags."""
        self.name = name
        self.data_categories = data_categories
        self.legal_basis = legal_basis
        self.retention_days = retention_days
        # Defaults: no recipients recorded, portability enabled.
        self.data_recipients: List[str] = []
        self.supports_portability = True

    @abstractmethod
    async def get_subject_data(self, identifier: str) -> Optional[Dict]:
        """Return the data held for ``identifier``."""

    @abstractmethod
    async def delete_subject_data(self, identifier: str) -> bool:
        """Delete the data held for ``identifier``."""

    @abstractmethod
    async def get_portable_data(self, identifier: str) -> Optional[Dict]:
        """Return the data held for ``identifier`` for a portability export."""

    def can_delete_data(self, identifier: str) -> tuple:
        """Check whether the data may be deleted. Returns (can_delete, reason)."""
        # Base implementation always allows deletion and gives no reason.
        verdict = (True, None)
        return verdict


# Article section heading (was fused onto the preceding code line):
# "Sistem de Gestionare a Consimtamantului" (Consent Management System)
Construiește o platformă completă de gestionare a consimțământului:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from datetime import datetime
from enum import Enum
import json
class ConsentPurpose(Enum):
    """Processing purposes for which consent is tracked.

    Each member's value is the lowercase form of its name.
    """

    ESSENTIAL = "essential"
    ANALYTICS = "analytics"
    MARKETING = "marketing"
    PERSONALIZATION = "personalization"
    THIRD_PARTY = "third_party"
class ConsentAction(Enum):
    """Kinds of consent change written to the audit log."""

    GRANTED = "granted"
    WITHDRAWN = "withdrawn"
    UPDATED = "updated"
@dataclass
class ConsentRecord:
    """The stored consent decision of one subject for one purpose.

    All ten fields are required and positional in the order listed.
    """

    # Identity of the record
    consent_id: str
    subject_id: str
    purpose: ConsentPurpose

    # The decision and when it was taken
    granted: bool
    granted_at: Optional[datetime]
    withdrawn_at: Optional[datetime]

    # Context captured with the decision
    version: str
    source: str
    ip_address: str
    user_agent: str
@dataclass
class ConsentAuditLog:
    """One audit-trail entry describing a change to a consent record.

    All seven fields are required and positional in the order listed.
    """

    log_id: str
    consent_id: str
    action: ConsentAction
    timestamp: datetime
    # None when there was no earlier decision to compare against.
    old_value: Optional[bool]
    new_value: bool
    metadata: Dict
class ConsentManagementPlatform:
    """In-memory store of consent decisions with an append-only audit log.

    ``self.consents`` holds the current record per subject and purpose;
    ``self.audit_logs`` holds every change ever logged.
    """

    def __init__(self):
        """Start with no consents, no audit entries and consent version "2.0"."""
        self.consents: Dict[str, Dict[ConsentPurpose, ConsentRecord]] = {}
        self.audit_logs: List[ConsentAuditLog] = []
        self.consent_version = "2.0"
        # Every consent ID ever issued per subject. record_consent() replaces
        # the current record (and its ID) on each call, so the audit trail
        # cannot be rebuilt from current records alone.
        self._issued_consent_ids: Dict[str, Set[str]] = {}

    def record_consent(
        self,
        subject_id: str,
        purposes: Dict[ConsentPurpose, bool],
        source: str,
        ip_address: str,
        user_agent: str
    ) -> Dict[str, ConsentRecord]:
        """Record consent decisions for several purposes at once.

        Each purpose gets a new record that replaces any earlier one, plus an
        audit entry. ``ESSENTIAL`` is always stored as granted.

        Args:
            subject_id: Subject the decisions belong to.
            purposes: Decision per purpose (``True`` = granted).
            source: Where the decision was captured.
            ip_address: Client IP stored with the record.
            user_agent: Client user agent stored with the record.

        Returns:
            The new records keyed by purpose value.
        """
        subject_consents = self.consents.setdefault(subject_id, {})
        issued_ids = self._issued_consent_ids.setdefault(subject_id, set())
        records = {}
        timestamp = datetime.utcnow()

        for purpose, granted in purposes.items():
            # Essential processing cannot be declined: force it to granted.
            if purpose == ConsentPurpose.ESSENTIAL:
                granted = True

            existing = subject_consents.get(purpose)
            old_value = existing.granted if existing else None

            consent_id = f"consent_{subject_id}_{purpose.value}_{timestamp.strftime('%Y%m%d%H%M%S')}"
            record = ConsentRecord(
                consent_id=consent_id,
                subject_id=subject_id,
                purpose=purpose,
                granted=granted,
                granted_at=timestamp if granted else None,
                withdrawn_at=None if granted else timestamp,
                version=self.consent_version,
                source=source,
                ip_address=ip_address,
                user_agent=user_agent
            )
            subject_consents[purpose] = record
            issued_ids.add(consent_id)
            records[purpose.value] = record

            # A flipped decision is logged as UPDATED; a first or repeated
            # decision is logged as GRANTED or WITHDRAWN.
            action = ConsentAction.GRANTED if granted else ConsentAction.WITHDRAWN
            if old_value is not None and old_value != granted:
                action = ConsentAction.UPDATED
            self._log_audit(consent_id, action, old_value, granted, {
                'source': source,
                'ip': ip_address,
                'version': self.consent_version
            })

        return records

    def check_consent(self, subject_id: str, purpose: ConsentPurpose) -> bool:
        """Return whether the subject currently consents to ``purpose``.

        ``ESSENTIAL`` is always ``True``; an unknown subject or purpose is
        ``False``.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return True
        consent = self.consents.get(subject_id, {}).get(purpose)
        return consent.granted if consent else False

    def withdraw_consent(self, subject_id: str, purpose: ConsentPurpose) -> bool:
        """Withdraw consent for one purpose.

        Returns:
            ``True`` if a granted consent was withdrawn; ``False`` for
            ``ESSENTIAL``, or when there is no granted consent to withdraw.
        """
        if purpose == ConsentPurpose.ESSENTIAL:
            return False

        consent = self.consents.get(subject_id, {}).get(purpose)
        if not (consent and consent.granted):
            return False

        old_value = consent.granted
        consent.granted = False
        consent.withdrawn_at = datetime.utcnow()
        self._log_audit(
            consent.consent_id,
            ConsentAction.WITHDRAWN,
            old_value,
            False,
            {'reason': 'user_withdrawal'}
        )
        return True

    def get_consent_status(self, subject_id: str) -> Dict:
        """Return the current status for every purpose.

        Purposes without a record are reported as not granted, except
        ``ESSENTIAL``, which is reported as granted.
        """
        subject_consents = self.consents.get(subject_id, {})
        status = {}
        for purpose in ConsentPurpose:
            consent = subject_consents.get(purpose)
            status[purpose.value] = {
                'granted': consent.granted if consent else (purpose == ConsentPurpose.ESSENTIAL),
                'granted_at': consent.granted_at.isoformat() if consent and consent.granted_at else None,
                'version': consent.version if consent else None
            }
        return status

    def get_audit_trail(self, subject_id: str) -> List[Dict]:
        """Return every audit entry for the subject, in logging order.

        Entries for records that were later replaced are included, not just
        those of the current records.
        """
        consent_ids = set(self._issued_consent_ids.get(subject_id, ()))
        # Also honour records placed in self.consents directly, without
        # going through record_consent().
        consent_ids.update(
            c.consent_id for c in self.consents.get(subject_id, {}).values()
        )
        return [
            {
                'action': log.action.value,
                'timestamp': log.timestamp.isoformat(),
                'old_value': log.old_value,
                'new_value': log.new_value,
                'metadata': log.metadata
            }
            for log in self.audit_logs
            if log.consent_id in consent_ids
        ]

    def export_consent_proof(self, subject_id: str) -> Dict:
        """Export the subject's current records and audit trail as evidence.

        Each record's ``timestamp`` is the time of its current decision:
        ``granted_at`` while granted, ``withdrawn_at`` once withdrawn.
        """
        subject_consents = self.consents.get(subject_id, {})
        return {
            'subject_id': subject_id,
            'exported_at': datetime.utcnow().isoformat(),
            'consents': {
                purpose.value: {
                    'consent_id': record.consent_id,
                    'granted': record.granted,
                    'timestamp': self._decision_timestamp(record),
                    'version': record.version,
                    'source': record.source,
                    'proof_hash': self._generate_proof_hash(record)
                }
                for purpose, record in subject_consents.items()
            },
            'audit_trail': self.get_audit_trail(subject_id)
        }

    @staticmethod
    def _decision_timestamp(record: ConsentRecord) -> Optional[str]:
        """Return the ISO timestamp of the record's current decision, if any."""
        # withdraw_consent() leaves granted_at set, so pick by current state
        # instead of "whichever is set first".
        moment = record.granted_at if record.granted else record.withdrawn_at
        if moment is None:
            moment = record.granted_at or record.withdrawn_at
        return moment.isoformat() if moment is not None else None

    def _log_audit(
        self,
        consent_id: str,
        action: ConsentAction,
        old_value: Optional[bool],
        new_value: bool,
        metadata: Dict
    ):
        """Append one entry to ``self.audit_logs``."""
        # One clock read so log_id and timestamp describe the same instant.
        now = datetime.utcnow()
        log = ConsentAuditLog(
            log_id=f"audit_{now.strftime('%Y%m%d%H%M%S%f')}",
            consent_id=consent_id,
            action=action,
            timestamp=now,
            old_value=old_value,
            new_value=new_value,
            metadata=metadata
        )
        self.audit_logs.append(log)

    def _generate_proof_hash(self, record: ConsentRecord) -> str:
        """Return a SHA-256 hex digest over the record's ID, state and version."""
        # Imported here because this section's own import list omits hashlib.
        import hashlib

        data = f"{record.consent_id}:{record.granted}:{record.granted_at}:{record.version}"
        return hashlib.sha256(data.encode()).hexdigest()


# Article section heading (was fused onto the preceding code line):
# "Sistem de Notificare a Incalcarilor de Date" (Data Breach Notification System)
Implementează detecția și notificarea încălcărilor:
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum
class BreachSeverity(Enum):
    """Severity levels assigned to a data breach, listed lowest first."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class BreachStatus(Enum):
    """States a breach record can be in.

    Each member's value is the lowercase form of its name.
    """

    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    NOTIFIED_DPA = "notified_dpa"
    NOTIFIED_SUBJECTS = "notified_subjects"
    CLOSED = "closed"
@dataclass
class DataBreach:
    """A recorded data breach and the state of its handling.

    The first eight fields are required, in this order; the rest default to
    "not yet set" (``None`` or a fresh empty list per instance).
    """

    # --- required at detection time ---
    breach_id: str
    detected_at: datetime
    description: str
    severity: BreachSeverity
    status: BreachStatus
    affected_subjects_count: int
    data_categories_affected: List[str]
    notification_deadline: datetime

    # --- filled in while the breach is handled ---
    dpa_notified_at: Optional[datetime] = None
    subjects_notified_at: Optional[datetime] = None
    # default_factory gives every breach its own list instead of a shared one.
    containment_actions: List[str] = field(default_factory=list)
    root_cause: Optional[str] = None
    remediation_steps: List[str] = field(default_factory=list)
class BreachNotificationSystem:
    """In-memory register of data breaches with notification helpers.

    ``self.breaches`` maps breach ID to record. ``self.status_history`` keeps,
    per breach ID, the status changes made through
    :meth:`update_breach_status` together with their notes.
    """

    def __init__(self):
        """Start with no breaches and a 72-hour DPA notification window."""
        self.breaches: Dict[str, DataBreach] = {}
        self.dpa_notification_hours = 72
        self.status_history: Dict[str, List[Dict]] = {}

    def report_breach(
        self,
        description: str,
        severity: BreachSeverity,
        affected_count: int,
        data_categories: List[str]
    ) -> DataBreach:
        """Register a newly detected breach and trigger incident response.

        Args:
            description: What happened.
            severity: Assessed severity.
            affected_count: Number of affected data subjects.
            data_categories: Categories of data involved.

        Returns:
            The stored breach, in ``DETECTED`` status, with its notification
            deadline set ``dpa_notification_hours`` after detection.
        """
        # One clock read: the ID, detection time and deadline all describe
        # the same instant.
        now = datetime.utcnow()
        base_id = f"BREACH-{now.strftime('%Y%m%d%H%M%S')}"
        # The timestamp has one-second resolution; add a counter rather than
        # overwrite a breach reported in the same second.
        breach_id = base_id
        sequence = 1
        while breach_id in self.breaches:
            sequence += 1
            breach_id = f"{base_id}-{sequence}"

        breach = DataBreach(
            breach_id=breach_id,
            detected_at=now,
            description=description,
            severity=severity,
            status=BreachStatus.DETECTED,
            affected_subjects_count=affected_count,
            data_categories_affected=data_categories,
            notification_deadline=now + timedelta(hours=self.dpa_notification_hours)
        )
        self.breaches[breach_id] = breach
        self._trigger_incident_response(breach)
        return breach

    def assess_notification_requirement(self, breach: DataBreach) -> Dict:
        """Assess whether the DPA and the data subjects must be notified.

        DPA notification is always reported as required. Subject notification
        is required for high-risk data categories or critical severity, unless
        ``_has_encryption_protection`` reports the data as encrypted.

        Returns:
            Dict with ``dpa_notification_required``,
            ``subject_notification_required`` and a ``reasoning`` list.
        """
        high_risk_categories = ['financial', 'health', 'biometric', 'criminal']
        high_risk_data = any(cat in breach.data_categories_affected for cat in high_risk_categories)

        assessment = {
            'dpa_notification_required': True,
            'subject_notification_required': False,
            'reasoning': []
        }

        # DPA notification (Article 33): these checks only add reasoning.
        if breach.severity in [BreachSeverity.CRITICAL, BreachSeverity.HIGH]:
            assessment['reasoning'].append("Incalcare cu severitate ridicata - notificarea DPA necesara in 72 de ore")
        if breach.affected_subjects_count > 100:
            assessment['reasoning'].append(f"Incalcare la scara mare ({breach.affected_subjects_count} subiecti)")

        # Subject notification (Article 34).
        if high_risk_data:
            assessment['subject_notification_required'] = True
            assessment['reasoning'].append("Categorii de date cu risc ridicat afectate - notificarea subiectilor necesara")
        if breach.severity == BreachSeverity.CRITICAL:
            assessment['subject_notification_required'] = True
            assessment['reasoning'].append("Severitate critica - notificarea subiectilor necesara")

        # Mitigating factor: checked last so it overrides the two rules above.
        if self._has_encryption_protection(breach):
            assessment['subject_notification_required'] = False
            assessment['reasoning'].append("Datele erau criptate - notificarea subiectilor poate sa nu fie necesara")

        return assessment

    def _has_encryption_protection(self, breach: DataBreach) -> bool:
        """Placeholder: always reports the breached data as not encrypted."""
        return False

    def generate_dpa_notification(self, breach: DataBreach) -> Dict:
        """Build the notification for the Data Protection Authority.

        Controller and DPO contact details are fixed placeholder values.
        """
        return {
            'notification_type': 'Notificare de Incalcare conform Articolului 33',
            'controller_details': {
                'name': 'Numele Organizatiei',
                'contact': 'dpo@organization.com',
                'address': 'Adresa Organizatiei'
            },
            'breach_details': {
                'breach_id': breach.breach_id,
                'nature_of_breach': breach.description,
                'date_detected': breach.detected_at.isoformat(),
                'categories_of_data': breach.data_categories_affected,
                'approximate_subjects': breach.affected_subjects_count,
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': breach.containment_actions,
                'measures_proposed': breach.remediation_steps
            },
            'dpo_contact': {
                'name': 'Responsabil cu Protectia Datelor',
                'email': 'dpo@organization.com',
                'phone': '+1-XXX-XXX-XXXX'
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def generate_subject_notification(self, breach: DataBreach) -> Dict:
        """Build the notification message for affected data subjects."""
        return {
            'subject': 'Important: Incident de Securitate care Afecteaza Datele Tale',
            'body': {
                'greeting': 'Stimate Utilizator,',
                'breach_description': f"Va scriem pentru a va informa despre un incident de securitate care ar fi putut afecta datele dumneavoastra personale. {breach.description}",
                'data_affected': f"Urmatoarele categorii de date ar fi putut fi afectate: {', '.join(breach.data_categories_affected)}",
                'likely_consequences': self._assess_consequences(breach),
                'measures_taken': "Am luat masuri imediate pentru a contine incidentul si a preveni accesul neautorizat suplimentar.",
                'recommendations': [
                    "Monitorizeaza-ti conturile pentru activitate suspecta",
                    "Ia in considerare schimbarea parolei",
                    "Fii precaut la tentativele de phishing"
                ],
                'contact': "Daca aveti intrebari, va rugam sa contactati Responsabilul nostru cu Protectia Datelor la dpo@organization.com",
                'apology': "Ne cerem sincer scuze pentru orice inconvenient cauzat."
            },
            'generated_at': datetime.utcnow().isoformat()
        }

    def _assess_consequences(self, breach: DataBreach) -> List[str]:
        """List likely consequences based on the affected data categories.

        Falls back to a single generic entry when no known category matches.
        """
        consequences = []
        if 'financial' in breach.data_categories_affected:
            consequences.append("Potential de frauda financiara sau furt de identitate")
        if 'credentials' in breach.data_categories_affected:
            consequences.append("Potential de acces neautorizat la conturi")
        if 'health' in breach.data_categories_affected:
            consequences.append("Potential de divulgare a informatiilor sensibile de sanatate")
        if not consequences:
            consequences.append("Potential impact asupra confidentialitatii")
        return consequences

    def _trigger_incident_response(self, breach: DataBreach):
        """Placeholder for starting the incident-response workflow; does nothing."""
        pass

    def update_breach_status(self, breach_id: str, status: BreachStatus, notes: Optional[str] = None):
        """Set a breach's status and record the change.

        Unknown breach IDs are ignored. The first time a breach reaches
        ``NOTIFIED_DPA`` or ``NOTIFIED_SUBJECTS`` the matching timestamp is
        set; repeating the status later does not move it.

        Args:
            breach_id: ID of the breach to update.
            status: New status.
            notes: Optional free text stored with the change in
                ``self.status_history``.
        """
        breach = self.breaches.get(breach_id)
        if breach is None:
            return

        now = datetime.utcnow()
        breach.status = status
        # Keep the first notification time: get_compliance_report() compares
        # it with the deadline, so a later repeat must not overwrite it.
        if status == BreachStatus.NOTIFIED_DPA and breach.dpa_notified_at is None:
            breach.dpa_notified_at = now
        if status == BreachStatus.NOTIFIED_SUBJECTS and breach.subjects_notified_at is None:
            breach.subjects_notified_at = now

        self.status_history.setdefault(breach_id, []).append({
            'status': status.value,
            'timestamp': now.isoformat(),
            'notes': notes
        })

    def get_compliance_report(self) -> Dict:
        """Summarise all breaches and how many met the DPA deadline.

        ``compliance_rate`` is 1.0 when no breaches are recorded.
        """
        total = len(self.breaches)
        notified_on_time = sum(
            1 for b in self.breaches.values()
            if b.dpa_notified_at and b.dpa_notified_at <= b.notification_deadline
        )

        return {
            'report_period': {
                'start': min(b.detected_at for b in self.breaches.values()).isoformat() if self.breaches else None,
                'end': datetime.utcnow().isoformat()
            },
            'total_breaches': total,
            'by_severity': {
                severity.value: sum(1 for b in self.breaches.values() if b.severity == severity)
                for severity in BreachSeverity
            },
            'notification_compliance': {
                'total_requiring_notification': total,
                'notified_within_deadline': notified_on_time,
                'compliance_rate': notified_on_time / total if total > 0 else 1.0
            },
            'breaches': [
                {
                    'id': b.breach_id,
                    'detected': b.detected_at.isoformat(),
                    'severity': b.severity.value,
                    'status': b.status.value,
                    'affected_subjects': b.affected_subjects_count
                }
                for b in self.breaches.values()
            ]
        }


# Article section heading (was fused onto the preceding code line):
# "Concluzie" (Conclusion)
Conformitatea GDPR necesită implementare tehnică sistematică în toate sistemele de procesare a datelor. Automatizează gestionarea drepturilor persoanelor vizate, implementează o gestionare robustă a consimțământului cu trasee de audit și menține capabilități de notificare a încălcărilor. Testarea și documentarea periodică asigură conformitatea continuă. Principiul „privacy by design” ar trebui să ghideze toate deciziile de arhitectură a sistemului.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →