Conformitatea cu GDPR necesita mai mult decat documente de politica - presupune implementare tehnica la nivelul stocarii, procesarii si sistemelor de acces la date. Acest ghid ofera solutii practice de inginerie pentru indeplinirea cerintelor GDPR in aplicatii moderne.
Maparea si Descoperirea Datelor
Descoperire Automata a Datelor
Construieste sisteme care identifica si clasifica datele personale:
# data_discovery.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import hashlib
class DataCategory(Enum):
    """Broad classes of personal data that the discovery scanner assigns."""

    # Names and ID numbers.
    IDENTIFIER = "identifier"
    # Email, phone, postal address.
    CONTACT = "contact"
    # Payment information.
    FINANCIAL = "financial"
    # Health, religion and similar data.
    SENSITIVE = "sensitive"
    # Browsing behaviour and preferences.
    BEHAVIORAL = "behavioral"
    # IP address and device information.
    TECHNICAL = "technical"
@dataclass
class PersonalDataField:
    """One database column that the scanner flagged as personal data."""

    field_name: str  # column name
    table_name: str  # table that owns the column
    data_category: DataCategory  # category assigned from the column name
    is_encrypted: bool  # result of the engine's encryption check
    retention_days: Optional[int]  # retention in days; None when the table has no policy
    legal_basis: str  # lawful-basis label such as "contract" or "consent"
class DataDiscoveryEngine:
    """Automated personal data discovery in databases.

    Every column name in the schema is matched against ``FIELD_PATTERNS``;
    each hit is reported as a ``PersonalDataField`` carrying the table's
    retention period and a default legal basis for its category.

    Schema access is not implemented here: ``_get_all_tables`` and
    ``_get_table_columns`` must be overridden for the database in use.
    """

    # Patterns for identifying personal data fields. They are applied with
    # ``re.match`` to the lower-cased column name.
    FIELD_PATTERNS = {
        DataCategory.IDENTIFIER: [
            r'.*name.*', r'.*_id$', r'.*ssn.*', r'.*passport.*',
            r'.*national.*id.*', r'.*license.*'
        ],
        DataCategory.CONTACT: [
            r'.*email.*', r'.*phone.*', r'.*address.*', r'.*zip.*',
            r'.*postal.*', r'.*city.*', r'.*country.*'
        ],
        DataCategory.FINANCIAL: [
            r'.*card.*', r'.*account.*', r'.*iban.*', r'.*bank.*',
            r'.*payment.*', r'.*billing.*'
        ],
        DataCategory.SENSITIVE: [
            r'.*health.*', r'.*medical.*', r'.*religion.*',
            r'.*ethnicity.*', r'.*political.*', r'.*biometric.*'
        ],
        DataCategory.BEHAVIORAL: [
            r'.*preference.*', r'.*history.*', r'.*activity.*',
            r'.*session.*', r'.*click.*', r'.*view.*'
        ],
        DataCategory.TECHNICAL: [
            # "ip" has to be a whole underscore-delimited token (ip,
            # client_ip, ip_address, ipv4) or start "ipaddr...". A bare
            # substring test also hits "description", "recipe", etc.
            r'(?:.*_)?ip(?:v[46])?(?:_.*)?$', r'(?:.*_)?ip_?addr.*',
            r'.*device.*', r'.*browser.*', r'.*user.*agent.*',
            r'.*cookie.*', r'.*fingerprint.*'
        ]
    }

    # Retention period in days per table; unlisted tables have no policy.
    RETENTION_POLICIES = {
        'users': 365 * 3,   # 3 years
        'orders': 365 * 7,  # 7 years (financial)
        'sessions': 30,     # 30 days
        'logs': 90,         # 90 days
    }

    # Default legal basis assigned to each category.
    LEGAL_BASES = {
        DataCategory.IDENTIFIER: "contract",
        DataCategory.CONTACT: "contract",
        DataCategory.FINANCIAL: "legal_obligation",
        DataCategory.SENSITIVE: "explicit_consent",
        DataCategory.BEHAVIORAL: "consent",
        DataCategory.TECHNICAL: "legitimate_interest"
    }

    def __init__(self, db_connection):
        """Keep a reference to the database connection."""
        self.db = db_connection

    def _get_all_tables(self) -> List[str]:
        """Return the names of all tables to scan.

        Raises:
            NotImplementedError: always; override for the target database.
        """
        raise NotImplementedError(
            "_get_all_tables() must be implemented for the target database"
        )

    def _get_table_columns(self, table: str) -> List[Dict]:
        """Return the columns of ``table``.

        Each item must expose the column name under the ``'name'`` key,
        which is what ``discover_fields`` reads.

        Raises:
            NotImplementedError: always; override for the target database.
        """
        raise NotImplementedError(
            "_get_table_columns() must be implemented for the target database"
        )

    def discover_fields(self) -> List[PersonalDataField]:
        """Scan the database schema for personal data fields.

        Returns:
            One ``PersonalDataField`` per column whose name matches a
            pattern, in table/column iteration order.
        """
        discovered = []
        for table in self._get_all_tables():
            for column in self._get_table_columns(table):
                name = column['name']
                category = self._classify_field(name)
                if category is None:
                    continue
                discovered.append(PersonalDataField(
                    field_name=name,
                    table_name=table,
                    data_category=category,
                    is_encrypted=self._check_encryption(table, name),
                    retention_days=self._get_retention_policy(table),
                    legal_basis=self._determine_legal_basis(category)
                ))
        return discovered

    def _classify_field(self, field_name: str) -> Optional[DataCategory]:
        """Classify a field by its name.

        SENSITIVE patterns are tried first so that a column such as
        ``health_record_id`` is not captured by the generic ``_id``
        identifier pattern and given a weaker legal basis. The remaining
        categories are tried in ``FIELD_PATTERNS`` order.

        Returns:
            The first matching category, or ``None`` when nothing matches.
        """
        field_lower = field_name.lower()
        # sorted() is stable: SENSITIVE moves to the front, everything else
        # keeps its declared order.
        ordered = sorted(
            self.FIELD_PATTERNS.items(),
            key=lambda item: item[0] is not DataCategory.SENSITIVE,
        )
        for category, patterns in ordered:
            if any(re.match(pattern, field_lower) for pattern in patterns):
                return category
        return None

    def _check_encryption(self, table: str, column: str) -> bool:
        """Check if a field is encrypted (implementation specific).

        This placeholder always returns ``False``; override it to inspect
        column metadata for the database system in use.
        """
        return False

    def _get_retention_policy(self, table: str) -> Optional[int]:
        """Return the retention period in days for ``table``, or ``None``."""
        return self.RETENTION_POLICIES.get(table)

    def _determine_legal_basis(self, category: DataCategory) -> str:
        """Return the default legal basis for ``category`` ("consent" if unknown)."""
        return self.LEGAL_BASES.get(category, "consent")

    def generate_data_map(self) -> Dict:
        """Generate the complete data mapping document.

        Returns:
            A dict with ``generated_at`` (ISO timestamp, naive UTC),
            ``total_fields``, ``by_category`` and ``by_table`` (lists of
            field names keyed by category value / table name) and
            ``fields`` (one detail dict per discovered field).
        """
        # Imported locally: the import header of this module does not
        # bring in datetime.
        from datetime import datetime

        fields = self.discover_fields()
        data_map = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_fields": len(fields),
            "by_category": {},
            "by_table": {},
            "fields": []
        }
        for field in fields:
            cat = field.data_category.value
            data_map["by_category"].setdefault(cat, []).append(field.field_name)
            data_map["by_table"].setdefault(field.table_name, []).append(field.field_name)
            data_map["fields"].append({
                "table": field.table_name,
                "field": field.field_name,
                "category": cat,
                "encrypted": field.is_encrypted,
                "retention_days": field.retention_days,
                "legal_basis": field.legal_basis
            })
        return data_map

## Sistem de Gestionare a Consimtamantului
Colectarea si Stocarea Consimtamantului
# consent_manager.py
from datetime import datetime
from typing import List, Optional, Dict
from dataclasses import dataclass
import json
import hashlib
@dataclass
class ConsentRecord:
    """A single consent decision made by a user for one purpose."""

    user_id: str
    purpose: str
    granted: bool  # True = consent given, False = refused or withdrawn
    timestamp: datetime
    ip_address: str
    user_agent: str
    version: str  # version of the consent form
    proof_hash: str  # cryptographic proof of the decision
class ConsentManager:
    """GDPR-compliant consent management.

    Every decision is inserted as a new row in ``consent_log``; rows are
    never updated by this class. The newest row per (user, purpose) is
    treated as the current consent state.
    """

    PURPOSES = [
        "marketing_email",
        "marketing_sms",
        "analytics",
        "personalization",
        "third_party_sharing",
        "profiling"
    ]

    # Columns read back from consent_log. The table stores the hashed IP in
    # ``ip_hash`` while ConsentRecord calls the field ``ip_address``, so it
    # is aliased; a plain ``SELECT *`` cannot be splatted into ConsentRecord.
    _RECORD_COLUMNS = (
        "user_id, purpose, granted, timestamp, "
        "ip_hash AS ip_address, user_agent, version, proof_hash"
    )

    def __init__(self, db_connection, ip_hash_key: Optional[bytes] = None):
        """Create a manager.

        Args:
            db_connection: object providing ``execute``, ``fetchone`` and
                ``fetchall`` with ``%s``-style parameters.
            ip_hash_key: optional secret. When given, IP addresses are
                hashed with HMAC-SHA256 under this key; without it the
                original unkeyed SHA-256 is used.
        """
        self.db = db_connection
        self._ip_hash_key = ip_hash_key

    def record_consent(
        self,
        user_id: str,
        purpose: str,
        granted: bool,
        ip_address: str,
        user_agent: str,
        consent_form_version: str
    ) -> ConsentRecord:
        """Record a consent decision together with a SHA-256 proof hash.

        The proof hash covers user, purpose, decision, timestamp and form
        version (JSON with sorted keys). The IP address is stored hashed.

        Returns:
            The stored ``ConsentRecord``.

        Raises:
            ValueError: if ``purpose`` is not listed in ``PURPOSES``.
        """
        if purpose not in self.PURPOSES:
            raise ValueError(f"Invalid purpose: {purpose}")
        timestamp = datetime.utcnow()
        proof_data = {
            "user_id": user_id,
            "purpose": purpose,
            "granted": granted,
            "timestamp": timestamp.isoformat(),
            "version": consent_form_version
        }
        proof_hash = hashlib.sha256(
            json.dumps(proof_data, sort_keys=True).encode()
        ).hexdigest()
        record = ConsentRecord(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            timestamp=timestamp,
            ip_address=self._hash_ip(ip_address),
            user_agent=user_agent,
            version=consent_form_version,
            proof_hash=proof_hash
        )
        self._store_consent(record)
        return record

    def _hash_ip(self, ip_address: str) -> str:
        """Return a 16-hex-character digest of the IP address.

        NOTE: an unkeyed hash of an IPv4 address can be reversed by
        enumerating the address space. Pass ``ip_hash_key`` to the
        constructor to use a keyed HMAC instead.
        """
        raw = ip_address.encode()
        if self._ip_hash_key:
            import hmac  # local import: not part of this module's header
            digest = hmac.new(self._ip_hash_key, raw, hashlib.sha256).hexdigest()
        else:
            digest = hashlib.sha256(raw).hexdigest()
        return digest[:16]

    def _store_consent(self, record: ConsentRecord):
        """Insert the record into the ``consent_log`` table."""
        self.db.execute("""
            INSERT INTO consent_log
            (user_id, purpose, granted, timestamp, ip_hash, user_agent, version, proof_hash)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            record.user_id,
            record.purpose,
            record.granted,
            record.timestamp,
            record.ip_address,  # already hashed by record_consent
            record.user_agent,
            record.version,
            record.proof_hash
        ))

    def get_current_consent(self, user_id: str) -> Dict[str, bool]:
        """Return the current consent status for every known purpose.

        Purposes without any log entry are reported as ``False``.
        """
        return {
            purpose: self.check_consent(user_id, purpose)
            for purpose in self.PURPOSES
        }

    def check_consent(self, user_id: str, purpose: str) -> bool:
        """Return whether the user's latest decision for ``purpose`` is a grant.

        Unknown purposes and users without a log entry yield ``False``.
        """
        if purpose not in self.PURPOSES:
            return False
        row = self.db.fetchone("""
            SELECT granted FROM consent_log
            WHERE user_id = %s AND purpose = %s
            ORDER BY timestamp DESC
            LIMIT 1
        """, (user_id, purpose))
        # bool() because some drivers hand back 0/1 for boolean columns.
        return bool(row['granted']) if row else False

    def get_consent_history(self, user_id: str) -> List[ConsentRecord]:
        """Return the full consent history, newest first.

        Intended for data subject access requests. ``ip_address`` on the
        returned records holds the stored IP hash.
        """
        rows = self.db.fetchall(f"""
            SELECT {self._RECORD_COLUMNS} FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))
        return [ConsentRecord(**dict(row)) for row in rows]

    def withdraw_all_consent(self, user_id: str, ip_address: str, user_agent: str):
        """Withdraw consent for every purpose (right to withdraw).

        One ``granted=False`` record with form version ``"withdrawal"`` is
        written per purpose.
        """
        for purpose in self.PURPOSES:
            self.record_consent(
                user_id=user_id,
                purpose=purpose,
                granted=False,
                ip_address=ip_address,
                user_agent=user_agent,
                consent_form_version="withdrawal"
            )

### Endpoint-uri API pentru Consimtamant
# consent_api.py
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Dict, List
router = APIRouter(prefix="/api/consent", tags=["consent"])
class ConsentUpdate(BaseModel):
    # Request body for updating consent: purpose name -> granted flag.
    purposes: Dict[str, bool]
class ConsentResponse(BaseModel):
    # Response body returned by the consent endpoints.
    user_id: str
    consents: Dict[str, bool]  # purpose name -> current consent status
    last_updated: str  # ISO-8601 timestamp string
@router.get("/", response_model=ConsentResponse)
async def get_consent_status(request: Request):
    """Get current consent status for authenticated user"""
    # Imported locally: this module's import header does not bring in
    # datetime, so the bare name would raise NameError.
    from datetime import datetime

    # user_id is read from request.state and the manager from app.state;
    # presumably both are installed elsewhere (auth middleware / app
    # start-up) - verify against the application wiring.
    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager
    consents = consent_manager.get_current_consent(user_id)
    return ConsentResponse(
        user_id=user_id,
        consents=consents,
        # NOTE: this is the response time, not the time of the last change.
        last_updated=datetime.utcnow().isoformat()
    )
@router.post("/", response_model=ConsentResponse)
async def update_consent(
    request: Request,
    consent_update: ConsentUpdate
):
    """Update consent preferences"""
    # Imported locally: this module's import header does not bring in
    # datetime.
    from datetime import datetime

    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager

    # Reject unknown purposes before writing anything. Otherwise
    # record_consent raises ValueError part-way through the loop, which
    # surfaces as a 500 after earlier purposes were already stored.
    # Assumes the manager exposes a PURPOSES collection - TODO confirm.
    unknown = sorted(set(consent_update.purposes) - set(consent_manager.PURPOSES))
    if unknown:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid purpose(s): {', '.join(unknown)}"
        )

    ip_address = request.client.host
    user_agent = request.headers.get("user-agent", "unknown")
    for purpose, granted in consent_update.purposes.items():
        consent_manager.record_consent(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            ip_address=ip_address,
            user_agent=user_agent,
            consent_form_version="v2.0"
        )
    # Return updated status
    consents = consent_manager.get_current_consent(user_id)
    return ConsentResponse(
        user_id=user_id,
        consents=consents,
        last_updated=datetime.utcnow().isoformat()
    )
@router.post("/withdraw-all")
async def withdraw_all_consent(request: Request):
    """Withdraw all consent"""
    # Delegates to the consent manager, which records a withdrawal for
    # every purpose on behalf of the requesting user.
    current_user = request.state.user_id
    manager = request.app.state.consent_manager
    manager.withdraw_all_consent(
        user_id=current_user,
        ip_address=request.client.host,
        user_agent=request.headers.get("user-agent", "unknown"),
    )
    outcome = {"status": "success", "message": "All consent withdrawn"}
    return outcome

## Implementarea Dreptului la Stergere
Serviciu de Stergere a Datelor
# erasure_service.py
from datetime import datetime
from typing import List, Dict
from dataclasses import dataclass
import logging
@dataclass
class ErasureRequest:
    """State of one right-to-erasure request as it moves through processing."""

    request_id: str
    user_id: str
    requested_at: datetime
    status: str  # pending, processing, completed, failed
    tables_processed: List[str]  # tables handled without error so far
    errors: List[str]  # error messages collected while processing
class ErasureService:
    """Implement the right to erasure (right to be forgotten).

    Each table in ``DATA_TABLES`` is either purged of the user's rows
    ("delete") or has selected columns overwritten ("anonymize"). Progress
    is tracked in ``erasure_requests`` and a certificate is stored in
    ``erasure_certificates`` once processing ends.
    """

    # Tables containing personal data with their deletion strategy.
    # NOTE: "cascade_check" is carried as documentation only; nothing in
    # this class reads it.
    DATA_TABLES = {
        "users": {
            "strategy": "delete",
            "cascade_check": ["orders", "comments", "sessions"]
        },
        "orders": {
            "strategy": "anonymize",  # Keep for financial records
            "fields_to_clear": ["customer_name", "email", "address"],
            "anonymize_with": "DELETED_USER"
        },
        "comments": {
            "strategy": "delete"
        },
        "sessions": {
            "strategy": "delete"
        },
        "audit_logs": {
            "strategy": "anonymize",  # Required for compliance
            "fields_to_clear": ["user_email", "ip_address"],
            "anonymize_with": None  # NULL
        },
        "marketing_lists": {
            "strategy": "delete"
        }
    }

    def __init__(self, db_connection):
        """Keep the database connection and set up a module logger."""
        self.db = db_connection
        self.logger = logging.getLogger(__name__)

    def create_erasure_request(self, user_id: str) -> ErasureRequest:
        """Create and persist a new erasure request in ``pending`` state."""
        now = datetime.utcnow()  # single timestamp for both id and record
        request_id = f"ER-{now.strftime('%Y%m%d%H%M%S')}-{user_id[:8]}"
        request = ErasureRequest(
            request_id=request_id,
            user_id=user_id,
            requested_at=now,
            status="pending",
            tables_processed=[],
            errors=[]
        )
        self.db.execute("""
            INSERT INTO erasure_requests
            (request_id, user_id, requested_at, status)
            VALUES (%s, %s, %s, %s)
        """, (request_id, user_id, request.requested_at, "pending"))
        return request

    def process_erasure(self, request_id: str) -> ErasureRequest:
        """Run the erasure for a stored request.

        A failure in one table is logged and recorded in ``errors`` but
        does not stop the remaining tables. The final status is
        ``completed`` (no errors), ``completed_with_errors`` (some tables
        failed) or ``failed`` (no table could be processed).

        Returns:
            The request with its final status, processed tables and errors.

        Raises:
            ValueError: if no request with ``request_id`` exists.
        """
        row = self.db.fetchone(
            "SELECT * FROM erasure_requests WHERE request_id = %s",
            (request_id,)
        )
        if not row:
            raise ValueError(f"Request not found: {request_id}")
        # Build the request field by field: the table stores neither
        # tables_processed nor errors, so ErasureRequest(**row) would fail.
        request = ErasureRequest(
            request_id=row["request_id"],
            user_id=row["user_id"],
            requested_at=row["requested_at"],
            status="processing",
            tables_processed=[],
            errors=[]
        )
        self._update_request_status(request)

        for table_name, config in self.DATA_TABLES.items():
            try:
                if config["strategy"] == "delete":
                    self._delete_from_table(table_name, request.user_id)
                elif config["strategy"] == "anonymize":
                    self._anonymize_in_table(
                        table_name,
                        request.user_id,
                        config["fields_to_clear"],
                        config.get("anonymize_with")
                    )
                request.tables_processed.append(table_name)
            except Exception as e:
                # Deliberately broad: one failing table must not block
                # erasure in the others.
                self.logger.error(f"Error processing {table_name}: {e}")
                request.errors.append(f"{table_name}: {str(e)}")

        if not request.errors:
            request.status = "completed"
        elif request.tables_processed:
            request.status = "completed_with_errors"
        else:
            request.status = "failed"
            self.logger.error(f"Erasure failed: {request_id}")

        self._update_request_status(request)
        self._create_erasure_certificate(request)
        return request

    def _update_request_status(self, request: ErasureRequest):
        """Persist the request's current status to ``erasure_requests``."""
        self.db.execute(
            "UPDATE erasure_requests SET status = %s WHERE request_id = %s",
            (request.status, request.request_id)
        )

    def _delete_from_table(self, table: str, user_id: str):
        """Delete the user's rows from ``table`` (matched on ``user_id``)."""
        # The table name comes from DATA_TABLES, never from user input.
        self.db.execute(
            f"DELETE FROM {table} WHERE user_id = %s",
            (user_id,)
        )
        self.logger.info(f"Deleted from {table} for user {user_id[:8]}...")

    def _anonymize_in_table(
        self,
        table: str,
        user_id: str,
        fields: List[str],
        replacement: Optional[str]
    ):
        """Overwrite ``fields`` on the user's rows in ``table``.

        Args:
            replacement: value written to every field; ``None`` writes SQL
                NULL.

        Raises:
            ValueError: if ``fields`` is empty.
        """
        if not fields:
            raise ValueError(f"No fields to anonymize configured for {table}")
        # Both branches decide on "is None" so that placeholders and
        # parameters always line up, including for an empty-string
        # replacement.
        if replacement is None:
            set_clauses = [f"{field} = NULL" for field in fields]
            params = [user_id]
        else:
            set_clauses = [f"{field} = %s" for field in fields]
            params = [replacement] * len(fields) + [user_id]
        query = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE user_id = %s"
        self.db.execute(query, tuple(params))
        self.logger.info(f"Anonymized {fields} in {table} for user {user_id[:8]}...")

    def _create_erasure_certificate(self, request: ErasureRequest) -> Dict:
        """Create and store a certificate of erasure for compliance.

        The certificate carries a SHA-256 hash of the user id instead of
        the id itself, plus the final status and any recorded errors.

        Returns:
            The certificate dict that was stored as JSON.
        """
        # Imported locally: this module's import header brings in neither
        # hashlib nor json.
        import hashlib
        import json

        certificate = {
            "certificate_id": f"CERT-{request.request_id}",
            "type": "erasure_certificate",
            "request_id": request.request_id,
            "user_id_hash": hashlib.sha256(request.user_id.encode()).hexdigest(),
            "requested_at": request.requested_at.isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "tables_processed": request.tables_processed,
            "status": request.status,
            "errors": request.errors,
            "legal_basis": "GDPR Article 17 - Right to Erasure"
        }
        self.db.execute("""
            INSERT INTO erasure_certificates (certificate_id, data)
            VALUES (%s, %s)
        """, (certificate["certificate_id"], json.dumps(certificate)))
        return certificate

## Cerere de Acces a Persoanei Vizate (DSAR)
Procesare Automata DSAR
# dsar_service.py
from datetime import datetime
from typing import Dict, List
import json
import zipfile
import io
class DSARService:
    """Handle Data Subject Access Requests.

    A request is registered in ``dsar_requests`` and later turned into a
    ZIP export containing the user's rows from every table listed by the
    data discovery component, the consent history and the documented
    processing purposes.
    """

    def __init__(self, db_connection, data_discovery):
        """Store the database connection and the data discovery component.

        ``data_discovery`` must provide ``generate_data_map()`` returning a
        dict with a ``"by_table"`` mapping of table name to field names.
        """
        self.db = db_connection
        self.discovery = data_discovery

    def create_dsar_request(self, user_id: str, email: str) -> str:
        """Register a new DSAR and return its request id.

        The id is ``DSAR-<UTC timestamp>-<random hex>``; the random suffix
        keeps ids unique when several requests arrive in the same second.
        The deadline is stored as 30 days after the request time.
        """
        # Imported locally: the module header imports only ``datetime``
        # from the datetime module, and ``secrets`` not at all.
        import secrets
        from datetime import timedelta

        now = datetime.utcnow()  # one timestamp for id, record and deadline
        request_id = f"DSAR-{now.strftime('%Y%m%d%H%M%S')}-{secrets.token_hex(4)}"
        self.db.execute("""
            INSERT INTO dsar_requests
            (request_id, user_id, email, requested_at, status, deadline)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            request_id,
            user_id,
            email,
            now,
            "pending",
            now + timedelta(days=30)  # response deadline
        ))
        return request_id

    def process_dsar(self, request_id: str) -> bytes:
        """Collect the user's data and return it as ZIP bytes.

        Raises:
            ValueError: if no request with ``request_id`` exists.
        """
        request = self.db.fetchone(
            "SELECT * FROM dsar_requests WHERE request_id = %s",
            (request_id,)
        )
        if not request:
            raise ValueError(f"Request not found: {request_id}")
        user_id = request['user_id']

        data_export = {
            "export_info": {
                "request_id": request_id,
                "generated_at": datetime.utcnow().isoformat(),
                "data_controller": "Your Company Name",
                "contact": "privacy@example.com"
            },
            "personal_data": {},
            "consent_history": [],
            "processing_purposes": []
        }

        # Only tables that actually hold rows for the user are included.
        data_map = self.discovery.generate_data_map()
        for table, fields in data_map["by_table"].items():
            table_data = self._export_table_data(table, fields, user_id)
            if table_data:
                data_export["personal_data"][table] = table_data

        data_export["consent_history"] = self._get_consent_history(user_id)
        data_export["processing_purposes"] = self._get_processing_purposes()
        return self._create_export_package(data_export)

    def _export_table_data(
        self,
        table: str,
        fields: List[str],
        user_id: str
    ) -> List[Dict]:
        """Return the user's rows from ``table`` restricted to ``fields``.

        Table and field names are interpolated into the SQL text, so they
        must come from the schema scan and never from user input. Rows are
        matched on a ``user_id`` column - presumably present in every
        discovered table; verify against the schema.
        """
        if not fields:
            # "SELECT  FROM ..." is not valid SQL; nothing to export.
            return []
        fields_str = ", ".join(fields)
        rows = self.db.fetchall(
            f"SELECT {fields_str} FROM {table} WHERE user_id = %s",
            (user_id,)
        )
        return [dict(row) for row in rows]

    def _get_consent_history(self, user_id: str) -> List[Dict]:
        """Return the user's consent log entries, newest first."""
        rows = self.db.fetchall("""
            SELECT purpose, granted, timestamp, version
            FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))
        return [dict(row) for row in rows]

    def _get_processing_purposes(self) -> List[Dict]:
        """Return the static documentation of processing purposes."""
        return [
            {
                "purpose": "Service Delivery",
                "legal_basis": "Contract",
                "data_categories": ["identity", "contact"],
                "retention": "Duration of contract + 3 years"
            },
            {
                "purpose": "Marketing",
                "legal_basis": "Consent",
                "data_categories": ["contact", "preferences"],
                "retention": "Until consent withdrawn"
            },
            {
                "purpose": "Analytics",
                "legal_basis": "Legitimate Interest",
                "data_categories": ["usage", "technical"],
                "retention": "2 years"
            }
        ]

    def _create_export_package(self, data: Dict) -> bytes:
        """Pack the export into an in-memory ZIP archive.

        The archive holds ``personal_data.json`` (everything),
        ``README.txt`` (human-readable summary) and one
        ``data/<table>.json`` per exported table. Values that JSON cannot
        encode natively are written via ``str``.
        """
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr(
                'personal_data.json',
                json.dumps(data, indent=2, default=str)
            )
            zf.writestr('README.txt', self._generate_summary(data))
            for category, records in data["personal_data"].items():
                zf.writestr(
                    f'data/{category}.json',
                    json.dumps(records, indent=2, default=str)
                )
        # Read the bytes only after the archive is closed, i.e. after the
        # central directory has been written.
        return buffer.getvalue()

    def _generate_summary(self, data: Dict) -> str:
        """Build the human-readable README text for the export."""
        info = data['export_info']
        parts = [
            "\n"
            "EXPORT CERERE DE ACCES PERSOANA VIZATA\n"
            "======================================\n"
            f"ID Cerere: {info['request_id']}\n"
            f"Generat: {info['generated_at']}\n"
            f"Operator de Date: {info['data_controller']}\n"
            "CATEGORII DE DATE INCLUSE:\n"
        ]
        parts.extend(
            f"- {category}: {len(records)} inregistrari\n"
            for category, records in data["personal_data"].items()
        )
        parts.append("\nSCOPURI DE PRELUCRARE:\n")
        parts.extend(
            f"- {purpose['purpose']} (Temeiul legal: {purpose['legal_basis']})\n"
            for purpose in data["processing_purposes"]
        )
        parts.append(
            "\n"
            "DREPTURILE TALE:\n"
            "- Dreptul la rectificare (corectarea datelor inexacte)\n"
            "- Dreptul la stergere (solicitarea stergerii)\n"
            "- Dreptul la restrictionarea prelucrarii\n"
            "- Dreptul la portabilitatea datelor\n"
            "- Dreptul de opozitie la prelucrare\n"
            "- Dreptul de retragere a consimtamantului\n"
            "Pentru a exercita aceste drepturi, contacteaza: privacy@example.com\n"
        )
        return "".join(parts)

## Implementarea Criptarii Datelor
Criptare la Nivel de Camp
# encryption_service.py
import base64
import os
from typing import Dict, List

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class FieldEncryption:
    """Encrypt and decrypt individual field values with per-field keys.

    A separate Fernet key is derived from the master key for every field
    name; the resulting Fernet objects are kept in a per-instance cache.
    """

    def __init__(self, master_key: bytes):
        """Remember the master key and start with an empty Fernet cache."""
        self._fernet_cache = {}
        self.master_key = master_key

    def _derive_key(self, field_name: str) -> bytes:
        """Return the url-safe base64 key for ``field_name``.

        PBKDF2-HMAC-SHA256 (100000 iterations, 32-byte output) is run over
        the master key with the encoded field name as salt.
        """
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=field_name.encode(),
            iterations=100000,
            backend=default_backend(),
        )
        raw_key = derivation.derive(self.master_key)
        return base64.urlsafe_b64encode(raw_key)

    def _get_fernet(self, field_name: str) -> Fernet:
        """Return the cached Fernet for the field, creating it on first use."""
        try:
            return self._fernet_cache[field_name]
        except KeyError:
            cipher = Fernet(self._derive_key(field_name))
            self._fernet_cache[field_name] = cipher
            return cipher

    def encrypt(self, field_name: str, value: str) -> str:
        """Encrypt ``value`` for ``field_name``.

        Falsy values (``None``, empty string) are returned unchanged. The
        Fernet token is base64-encoded once more before being returned.
        """
        if not value:
            return value
        token = self._get_fernet(field_name).encrypt(value.encode())
        return base64.urlsafe_b64encode(token).decode()

    def decrypt(self, field_name: str, encrypted_value: str) -> str:
        """Reverse ``encrypt``; falsy input is returned unchanged."""
        if not encrypted_value:
            return encrypted_value
        token = base64.urlsafe_b64decode(encrypted_value.encode())
        plaintext = self._get_fernet(field_name).decrypt(token)
        return plaintext.decode()
# Database wrapper with automatic encryption
class EncryptedDatabase:
    """Database wrapper with transparent field encryption.

    Columns listed in ``ENCRYPTED_FIELDS`` are encrypted on ``insert`` and
    decrypted on ``select``. The encryption key name for a column is
    ``"<table>.<field>"``.
    """

    ENCRYPTED_FIELDS = {
        "users": ["email", "phone", "address"],
        "orders": ["customer_name", "billing_address"],
        "payments": ["card_last_four"]
    }

    def __init__(self, db_connection, encryption: FieldEncryption):
        """Store the database connection and the field encryption helper."""
        self.db = db_connection
        self.encryption = encryption

    def insert(self, table: str, data: Dict) -> None:
        """Insert a row, encrypting the configured fields first.

        Table and column names are interpolated into the SQL text and must
        therefore never come from untrusted input; values are bound as
        parameters.

        Raises:
            ValueError: if ``data`` is empty.
        """
        if not data:
            raise ValueError(f"Cannot insert an empty row into {table}")
        encrypted_data = self._encrypt_fields(table, data)
        columns = ", ".join(encrypted_data.keys())
        placeholders = ", ".join(["%s"] * len(encrypted_data))
        self.db.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
            tuple(encrypted_data.values())
        )

    def select(self, table: str, where: Dict) -> List[Dict]:
        """Select rows by equality filters and decrypt the configured fields.

        An empty ``where`` selects every row of the table.

        Raises:
            ValueError: if ``where`` filters on an encrypted column. The
                stored value is ciphertext, so comparing it with a
                plaintext parameter would silently match nothing.
        """
        encrypted = set(self.ENCRYPTED_FIELDS.get(table, []))
        blocked = sorted(encrypted.intersection(where))
        if blocked:
            raise ValueError(
                f"Cannot filter {table} on encrypted field(s): {', '.join(blocked)}"
            )
        query = f"SELECT * FROM {table}"
        if where:
            conditions = " AND ".join(f"{column} = %s" for column in where)
            query = f"{query} WHERE {conditions}"
        rows = self.db.fetchall(query, tuple(where.values()))
        return [self._decrypt_fields(table, dict(row)) for row in rows]

    def _encrypt_fields(self, table: str, data: Dict) -> Dict:
        """Return a copy of ``data`` with the table's sensitive fields encrypted.

        Missing and falsy values are left untouched.
        """
        result = data.copy()
        for field in self.ENCRYPTED_FIELDS.get(table, []):
            if result.get(field):
                result[field] = self.encryption.encrypt(
                    f"{table}.{field}",
                    result[field]
                )
        return result

    def _decrypt_fields(self, table: str, data: Dict) -> Dict:
        """Return a copy of ``data`` with the table's sensitive fields decrypted.

        Missing and falsy values are left untouched.
        """
        result = data.copy()
        for field in self.ENCRYPTED_FIELDS.get(table, []):
            if result.get(field):
                result[field] = self.encryption.decrypt(
                    f"{table}.{field}",
                    result[field]
                )
        return result

## Rezumat
Conformitatea cu GDPR necesita implementare tehnica pe mai multe domenii:
- Maparea datelor: Descoperire si clasificare automata a datelor personale
- Gestionarea consimtamantului: Consimtamant granular, auditabil, cu dovada criptografica
- Dreptul la stergere: Stergere si anonimizare sistematica
- Procesare DSAR: Export automat al datelor in termenul legal de o luna de la primirea cererii (implementat in cod ca 30 de zile)
- Criptare: Criptare la nivel de camp pentru datele sensibile
Aceste implementari ofera fundamentul tehnic pentru conformitatea GDPR, mentinand in acelasi timp eficienta operationala. Auditurile si actualizarile regulate asigura continuitatea conformitatii pe masura ce reglementarile evolueaza.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →