GDPR compliance requires more than policy documents — it demands technical implementation across data storage, processing, and access systems. This guide provides practical engineering solutions for meeting GDPR requirements in modern applications.
Data Mapping and Discovery
Automated Data Discovery
Build systems to identify and classify personal data:
# data_discovery.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import hashlib
class DataCategory(Enum):
    """Categories of personal data recognised by the discovery engine."""
    IDENTIFIER = "identifier"  # Name, ID numbers
    CONTACT = "contact"  # Email, phone, address
    FINANCIAL = "financial"  # Payment info
    SENSITIVE = "sensitive"  # Health, religion, etc.
    BEHAVIORAL = "behavioral"  # Browsing, preferences
    TECHNICAL = "technical"  # IP, device info


@dataclass
class PersonalDataField:
    """A single database column identified as holding personal data."""
    field_name: str  # column name
    table_name: str  # owning table
    data_category: DataCategory  # classification result
    is_encrypted: bool  # whether the column is stored encrypted
    retention_days: Optional[int]  # retention period in days; None if no policy found
    legal_basis: str  # GDPR Article 6 basis for processing this field


class DataDiscoveryEngine:
    """Automated personal data discovery in databases.

    Scans the schema, classifies columns by name patterns, and builds a
    data map suitable for GDPR Article 30 records of processing.

    NOTE(review): schema introspection (``_get_all_tables`` /
    ``_get_table_columns``) is database-specific and must be implemented
    per backend; the stubs below raise NotImplementedError so misuse fails
    loudly instead of with a confusing AttributeError.
    """

    # Patterns for identifying personal data fields by column name.
    # Matching is deliberately broad (substring-style patterns), so false
    # positives are expected — e.g. any column containing "ip" is flagged
    # TECHNICAL. Discovery results should be human-reviewed.
    FIELD_PATTERNS = {
        DataCategory.IDENTIFIER: [
            r'.*name.*', r'.*_id$', r'.*ssn.*', r'.*passport.*',
            r'.*national.*id.*', r'.*license.*'
        ],
        DataCategory.CONTACT: [
            r'.*email.*', r'.*phone.*', r'.*address.*', r'.*zip.*',
            r'.*postal.*', r'.*city.*', r'.*country.*'
        ],
        DataCategory.FINANCIAL: [
            r'.*card.*', r'.*account.*', r'.*iban.*', r'.*bank.*',
            r'.*payment.*', r'.*billing.*'
        ],
        DataCategory.SENSITIVE: [
            r'.*health.*', r'.*medical.*', r'.*religion.*',
            r'.*ethnicity.*', r'.*political.*', r'.*biometric.*'
        ],
        DataCategory.BEHAVIORAL: [
            r'.*preference.*', r'.*history.*', r'.*activity.*',
            r'.*session.*', r'.*click.*', r'.*view.*'
        ],
        DataCategory.TECHNICAL: [
            r'.*ip.*', r'.*device.*', r'.*browser.*', r'.*user.*agent.*',
            r'.*cookie.*', r'.*fingerprint.*'
        ]
    }

    def __init__(self, db_connection):
        self.db = db_connection

    def discover_fields(self) -> List[PersonalDataField]:
        """Scan the database schema and return all personal-data columns."""
        discovered = []
        for table in self._get_all_tables():
            for column in self._get_table_columns(table):
                category = self._classify_field(column['name'])
                if category:
                    discovered.append(PersonalDataField(
                        field_name=column['name'],
                        table_name=table,
                        data_category=category,
                        is_encrypted=self._check_encryption(table, column['name']),
                        retention_days=self._get_retention_policy(table),
                        legal_basis=self._determine_legal_basis(category)
                    ))
        return discovered

    def _get_all_tables(self) -> List[str]:
        """Return all table names. Database-specific; implement per backend."""
        raise NotImplementedError("schema introspection is database-specific")

    def _get_table_columns(self, table: str) -> List[Dict]:
        """Return column metadata dicts (each with a 'name' key) for *table*."""
        raise NotImplementedError("schema introspection is database-specific")

    def _classify_field(self, field_name: str) -> Optional[DataCategory]:
        """Classify a column by naming patterns; first matching category wins.

        Categories are checked in FIELD_PATTERNS insertion order, so e.g.
        IDENTIFIER patterns take precedence over CONTACT patterns.
        """
        field_lower = field_name.lower()
        for category, patterns in self.FIELD_PATTERNS.items():
            if any(re.match(pattern, field_lower) for pattern in patterns):
                return category
        return None

    def _check_encryption(self, table: str, column: str) -> bool:
        """Check if field is encrypted (implementation specific).

        Always False here; real implementations should inspect column
        metadata, which varies by database system.
        """
        return False

    def _get_retention_policy(self, table: str) -> Optional[int]:
        """Return the retention period in days for *table*, or None if unset."""
        retention_policies = {
            'users': 365 * 3,  # 3 years
            'orders': 365 * 7,  # 7 years (financial)
            'sessions': 30,  # 30 days
            'logs': 90,  # 90 days
        }
        return retention_policies.get(table)

    def _determine_legal_basis(self, category: DataCategory) -> str:
        """Map a data category to its default GDPR Article 6 legal basis."""
        legal_bases = {
            DataCategory.IDENTIFIER: "contract",
            DataCategory.CONTACT: "contract",
            DataCategory.FINANCIAL: "legal_obligation",
            DataCategory.SENSITIVE: "explicit_consent",
            DataCategory.BEHAVIORAL: "consent",
            DataCategory.TECHNICAL: "legitimate_interest"
        }
        return legal_bases.get(category, "consent")

    def generate_data_map(self) -> Dict:
        """Generate a complete data-mapping document as a plain dict."""
        # BUG FIX: datetime was referenced but never imported in this module;
        # imported locally to keep the module's top-level imports untouched.
        from datetime import datetime

        fields = self.discover_fields()
        data_map = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_fields": len(fields),
            "by_category": {},
            "by_table": {},
            "fields": []
        }
        for field in fields:
            cat = field.data_category.value
            # Group by category and by table.
            data_map["by_category"].setdefault(cat, []).append(field.field_name)
            data_map["by_table"].setdefault(field.table_name, []).append(field.field_name)
            # Full per-field detail record.
            data_map["fields"].append({
                "table": field.table_name,
                "field": field.field_name,
                "category": cat,
                "encrypted": field.is_encrypted,
                "retention_days": field.retention_days,
                "legal_basis": field.legal_basis
            })
        return data_map
Consent Management System
Consent Collection and Storage
# consent_manager.py
from datetime import datetime
from typing import List, Optional, Dict
from dataclasses import dataclass
import json
import hashlib
@dataclass
class ConsentRecord:
    """One immutable entry in the append-only consent audit log."""
    user_id: str
    purpose: str  # one of ConsentManager.PURPOSES
    granted: bool
    timestamp: datetime
    ip_address: str  # stored as a truncated SHA-256 hash, never the raw IP
    user_agent: str
    version: str  # Consent form version
    proof_hash: str  # Cryptographic proof


class ConsentManager:
    """GDPR-compliant consent management backed by an append-only log.

    Every change is a new row in consent_log; the latest row per
    (user, purpose) is the current state, and older rows form the audit
    trail required to demonstrate consent (GDPR Art. 7(1)).
    """

    # Closed set of processing purposes a user can consent to.
    PURPOSES = [
        "marketing_email",
        "marketing_sms",
        "analytics",
        "personalization",
        "third_party_sharing",
        "profiling"
    ]

    def __init__(self, db_connection):
        self.db = db_connection

    def record_consent(
        self,
        user_id: str,
        purpose: str,
        granted: bool,
        ip_address: str,
        user_agent: str,
        consent_form_version: str
    ) -> ConsentRecord:
        """Record user consent with cryptographic proof.

        Raises:
            ValueError: if *purpose* is not a known purpose.
        """
        if purpose not in self.PURPOSES:
            raise ValueError(f"Invalid purpose: {purpose}")
        timestamp = datetime.utcnow()
        # Tamper-evident proof: SHA-256 of the canonical (sorted-key) JSON
        # of the consent facts at the moment of recording.
        proof_data = {
            "user_id": user_id,
            "purpose": purpose,
            "granted": granted,
            "timestamp": timestamp.isoformat(),
            "version": consent_form_version
        }
        proof_hash = hashlib.sha256(
            json.dumps(proof_data, sort_keys=True).encode()
        ).hexdigest()
        record = ConsentRecord(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            timestamp=timestamp,
            ip_address=self._hash_ip(ip_address),  # Privacy-preserving
            user_agent=user_agent,
            version=consent_form_version,
            proof_hash=proof_hash
        )
        # Store in database (append-only log)
        self._store_consent(record)
        return record

    def _hash_ip(self, ip_address: str) -> str:
        """Hash IP for privacy while maintaining proof (first 16 hex chars)."""
        return hashlib.sha256(ip_address.encode()).hexdigest()[:16]

    def _store_consent(self, record: ConsentRecord):
        """Append a consent record to the audit log (rows are never updated)."""
        self.db.execute("""
            INSERT INTO consent_log
            (user_id, purpose, granted, timestamp, ip_hash, user_agent, version, proof_hash)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            record.user_id,
            record.purpose,
            record.granted,
            record.timestamp,
            record.ip_address,
            record.user_agent,
            record.version,
            record.proof_hash
        ))

    def _latest_consent(self, user_id: str, purpose: str) -> bool:
        """Return the most recent granted/denied state; False if no record."""
        row = self.db.fetchone("""
            SELECT granted FROM consent_log
            WHERE user_id = %s AND purpose = %s
            ORDER BY timestamp DESC
            LIMIT 1
        """, (user_id, purpose))
        return row['granted'] if row else False

    def get_current_consent(self, user_id: str) -> Dict[str, bool]:
        """Get current consent status for all purposes."""
        return {
            purpose: self._latest_consent(user_id, purpose)
            for purpose in self.PURPOSES
        }

    def check_consent(self, user_id: str, purpose: str) -> bool:
        """Check if user has given consent for a specific purpose."""
        if purpose not in self.PURPOSES:
            return False
        return self._latest_consent(user_id, purpose)

    def get_consent_history(self, user_id: str) -> List[ConsentRecord]:
        """Get full consent history for a data subject access request."""
        rows = self.db.fetchall("""
            SELECT * FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))
        # BUG FIX: the log stores the hashed IP under the column name
        # "ip_hash", so ConsentRecord(**row) raised TypeError (unexpected
        # keyword argument); map columns to dataclass fields explicitly.
        return [
            ConsentRecord(
                user_id=row['user_id'],
                purpose=row['purpose'],
                granted=row['granted'],
                timestamp=row['timestamp'],
                ip_address=row['ip_hash'],  # already hashed at write time
                user_agent=row['user_agent'],
                version=row['version'],
                proof_hash=row['proof_hash'],
            )
            for row in rows
        ]

    def withdraw_all_consent(self, user_id: str, ip_address: str, user_agent: str):
        """Withdraw all consent (right to withdraw, GDPR Art. 7(3))."""
        for purpose in self.PURPOSES:
            self.record_consent(
                user_id=user_id,
                purpose=purpose,
                granted=False,
                ip_address=ip_address,
                user_agent=user_agent,
                consent_form_version="withdrawal"
            )
Consent API Endpoints
# consent_api.py
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Dict, List
# Router mounted at /api/consent. All endpoints assume upstream auth
# middleware has populated request.state.user_id, and that the app stores
# a ConsentManager on app.state.consent_manager.
router = APIRouter(prefix="/api/consent", tags=["consent"])


class ConsentUpdate(BaseModel):
    # Request body: map of purpose name -> granted flag.
    purposes: Dict[str, bool]


class ConsentResponse(BaseModel):
    # Response payload describing a user's current consent state.
    user_id: str
    consents: Dict[str, bool]
    last_updated: str  # ISO-8601 UTC timestamp
@router.get("/", response_model=ConsentResponse)
async def get_consent_status(request: Request):
    """Get current consent status for the authenticated user."""
    # BUG FIX: datetime was referenced but never imported in this module;
    # imported locally to keep the module's top-level imports untouched.
    from datetime import datetime

    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager
    consents = consent_manager.get_current_consent(user_id)
    return ConsentResponse(
        user_id=user_id,
        consents=consents,
        last_updated=datetime.utcnow().isoformat()
    )
@router.post("/", response_model=ConsentResponse)
async def update_consent(
    request: Request,
    consent_update: ConsentUpdate
):
    """Update consent preferences for the authenticated user."""
    # BUG FIX: datetime was referenced but never imported in this module.
    from datetime import datetime

    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager
    ip_address = request.client.host
    user_agent = request.headers.get("user-agent", "unknown")
    # Validate up front so an unknown purpose yields a 400 instead of
    # ConsentManager.record_consent raising ValueError (an unhandled 500),
    # and so no partial batch is recorded before the failure.
    invalid = [p for p in consent_update.purposes if p not in consent_manager.PURPOSES]
    if invalid:
        raise HTTPException(status_code=400, detail=f"Invalid purposes: {invalid}")
    for purpose, granted in consent_update.purposes.items():
        consent_manager.record_consent(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            ip_address=ip_address,
            user_agent=user_agent,
            consent_form_version="v2.0"
        )
    # Return updated status
    consents = consent_manager.get_current_consent(user_id)
    return ConsentResponse(
        user_id=user_id,
        consents=consents,
        last_updated=datetime.utcnow().isoformat()
    )
@router.post("/withdraw-all")
async def withdraw_all_consent(request: Request):
    """Withdraw every consent purpose for the authenticated user."""
    manager = request.app.state.consent_manager
    manager.withdraw_all_consent(
        user_id=request.state.user_id,
        ip_address=request.client.host,
        user_agent=request.headers.get("user-agent", "unknown"),
    )
    return {"status": "success", "message": "All consent withdrawn"}
Right to Erasure Implementation
Data Deletion Service
# erasure_service.py
from datetime import datetime
from typing import List, Dict
from dataclasses import dataclass
import logging
@dataclass
class ErasureRequest:
    """Tracks the lifecycle of a single right-to-erasure request."""
    request_id: str
    user_id: str
    requested_at: datetime
    status: str  # pending, processing, completed, completed_with_errors, failed
    tables_processed: List[str]  # tables successfully handled
    errors: List[str]  # per-table error messages


class ErasureService:
    """Implement right to erasure (right to be forgotten, GDPR Art. 17)."""

    # Tables containing personal data, each with its deletion strategy.
    # "delete" removes rows outright; "anonymize" keeps rows (e.g. for
    # financial/audit obligations) but clears the personal fields.
    DATA_TABLES = {
        "users": {
            "strategy": "delete",
            "cascade_check": ["orders", "comments", "sessions"]
        },
        "orders": {
            "strategy": "anonymize",  # Keep for financial records
            "fields_to_clear": ["customer_name", "email", "address"],
            "anonymize_with": "DELETED_USER"
        },
        "comments": {
            "strategy": "delete"
        },
        "sessions": {
            "strategy": "delete"
        },
        "audit_logs": {
            "strategy": "anonymize",  # Required for compliance
            "fields_to_clear": ["user_email", "ip_address"],
            "anonymize_with": None  # NULL
        },
        "marketing_lists": {
            "strategy": "delete"
        }
    }

    def __init__(self, db_connection):
        self.db = db_connection
        self.logger = logging.getLogger(__name__)

    def create_erasure_request(self, user_id: str) -> ErasureRequest:
        """Create and persist a new erasure request in 'pending' state."""
        request_id = f"ER-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{user_id[:8]}"
        request = ErasureRequest(
            request_id=request_id,
            user_id=user_id,
            requested_at=datetime.utcnow(),
            status="pending",
            tables_processed=[],
            errors=[]
        )
        # Store request
        self.db.execute("""
            INSERT INTO erasure_requests
            (request_id, user_id, requested_at, status)
            VALUES (%s, %s, %s, %s)
        """, (request_id, user_id, request.requested_at, "pending"))
        return request

    def process_erasure(self, request_id: str) -> ErasureRequest:
        """Process an erasure request table by table.

        Per-table failures are collected rather than aborting the whole
        request, so one bad table does not block erasure elsewhere.
        """
        row = self.db.fetchone(
            "SELECT * FROM erasure_requests WHERE request_id = %s",
            (request_id,)
        )
        if not row:
            raise ValueError(f"Request not found: {request_id}")
        # BUG FIX: the stored row only has (request_id, user_id,
        # requested_at, status) — ErasureRequest(**row) raised TypeError for
        # the missing list fields. Build the dataclass explicitly instead.
        request = ErasureRequest(
            request_id=row['request_id'],
            user_id=row['user_id'],
            requested_at=row['requested_at'],
            status=row['status'],
            tables_processed=[],
            errors=[]
        )
        request.status = "processing"
        self._update_request_status(request)
        try:
            # Process each table independently.
            for table_name, config in self.DATA_TABLES.items():
                try:
                    if config["strategy"] == "delete":
                        self._delete_from_table(table_name, request.user_id)
                    elif config["strategy"] == "anonymize":
                        self._anonymize_in_table(
                            table_name,
                            request.user_id,
                            config["fields_to_clear"],
                            config.get("anonymize_with")
                        )
                    request.tables_processed.append(table_name)
                except Exception as e:
                    self.logger.error(f"Error processing {table_name}: {e}")
                    request.errors.append(f"{table_name}: {str(e)}")
            request.status = "completed" if not request.errors else "completed_with_errors"
        except Exception as e:
            request.status = "failed"
            request.errors.append(str(e))
            self.logger.error(f"Erasure failed: {e}")
        self._update_request_status(request)
        self._create_erasure_certificate(request)
        return request

    def _update_request_status(self, request: ErasureRequest):
        """Persist the request's current status.

        BUG FIX: this method was called but never defined (AttributeError
        at runtime). NOTE(review): assumes erasure_requests is keyed by
        request_id with a status column, matching create_erasure_request's
        INSERT — confirm against the real schema.
        """
        self.db.execute(
            "UPDATE erasure_requests SET status = %s WHERE request_id = %s",
            (request.status, request.request_id)
        )

    def _delete_from_table(self, table: str, user_id: str):
        """Delete all of the user's rows from *table*.

        The table name is interpolated into the SQL, which is safe only
        because it comes from the fixed DATA_TABLES keys, never user input.
        """
        self.db.execute(
            f"DELETE FROM {table} WHERE user_id = %s",
            (user_id,)
        )
        self.logger.info(f"Deleted from {table} for user {user_id[:8]}...")

    def _anonymize_in_table(
        self,
        table: str,
        user_id: str,
        fields: List[str],
        replacement  # str to substitute, or None to SET the fields to NULL
    ):
        """Overwrite the given personal-data fields for the user's rows."""
        set_clauses = []
        for field in fields:
            if replacement is None:
                set_clauses.append(f"{field} = NULL")
            else:
                set_clauses.append(f"{field} = %s")
        query = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE user_id = %s"
        params = []
        # BUG FIX: must test "is not None", not truthiness — an empty-string
        # replacement previously produced %s placeholders with no parameters.
        if replacement is not None:
            params.extend([replacement] * len(fields))
        params.append(user_id)
        self.db.execute(query, tuple(params))
        self.logger.info(f"Anonymized {fields} in {table} for user {user_id[:8]}...")

    def _create_erasure_certificate(self, request: ErasureRequest) -> Dict:
        """Create and store a certificate of erasure for compliance records."""
        # BUG FIX: hashlib/json were referenced but never imported in this
        # module; imported locally to keep top-level imports untouched.
        import hashlib
        import json
        certificate = {
            "certificate_id": f"CERT-{request.request_id}",
            "type": "erasure_certificate",
            "request_id": request.request_id,
            # Hash the user id so the certificate itself holds no personal data.
            "user_id_hash": hashlib.sha256(request.user_id.encode()).hexdigest(),
            "requested_at": request.requested_at.isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "tables_processed": request.tables_processed,
            "status": request.status,
            "legal_basis": "GDPR Article 17 - Right to Erasure"
        }
        # Store certificate
        self.db.execute("""
            INSERT INTO erasure_certificates (certificate_id, data)
            VALUES (%s, %s)
        """, (certificate["certificate_id"], json.dumps(certificate)))
        return certificate
Data Subject Access Request (DSAR)
Automated DSAR Processing
# dsar_service.py
from datetime import datetime
from typing import Dict, List
import json
import zipfile
import io
class DSARService:
    """Handle Data Subject Access Requests (GDPR Art. 15).

    Collects all personal data for a user, bundles it with consent history
    and processing-purpose documentation, and produces a downloadable ZIP.
    """

    def __init__(self, db_connection, data_discovery):
        self.db = db_connection
        # Discovery engine supplying the table/field data map (generate_data_map).
        self.discovery = data_discovery

    def create_dsar_request(self, user_id: str, email: str) -> str:
        """Create a new DSAR request and return its request id."""
        # BUG FIX: timedelta was referenced but never imported in this
        # module (only datetime was); imported locally.
        from datetime import timedelta

        request_id = f"DSAR-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}"
        self.db.execute("""
            INSERT INTO dsar_requests
            (request_id, user_id, email, requested_at, status, deadline)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            request_id,
            user_id,
            email,
            datetime.utcnow(),
            "pending",
            datetime.utcnow() + timedelta(days=30)  # GDPR 30-day deadline
        ))
        return request_id

    def process_dsar(self, request_id: str) -> bytes:
        """Process a DSAR and return the data export as ZIP bytes."""
        # Get request details
        request = self.db.fetchone(
            "SELECT * FROM dsar_requests WHERE request_id = %s",
            (request_id,)
        )
        if not request:
            raise ValueError(f"Request not found: {request_id}")
        user_id = request['user_id']
        # Collect all personal data
        data_export = {
            "export_info": {
                "request_id": request_id,
                "generated_at": datetime.utcnow().isoformat(),
                "data_controller": "Your Company Name",
                "contact": "privacy@example.com"
            },
            "personal_data": {},
            "consent_history": [],
            "processing_purposes": []
        }
        # Pull data from every table the discovery engine knows about.
        data_map = self.discovery.generate_data_map()
        for table, fields in data_map["by_table"].items():
            table_data = self._export_table_data(table, fields, user_id)
            if table_data:
                data_export["personal_data"][table] = table_data
        # Get consent history
        data_export["consent_history"] = self._get_consent_history(user_id)
        # Get processing purposes
        data_export["processing_purposes"] = self._get_processing_purposes()
        # Generate downloadable package
        return self._create_export_package(data_export)

    def _export_table_data(
        self,
        table: str,
        fields: List[str],
        user_id: str
    ) -> List[Dict]:
        """Export the user's rows (selected fields only) from one table.

        Table and field names are interpolated as SQL identifiers; they come
        from the discovery engine's schema scan, not user input.
        NOTE(review): assumes every scanned table has a user_id column —
        confirm, or tables without one will error at query time.
        """
        fields_str = ", ".join(fields)
        rows = self.db.fetchall(
            f"SELECT {fields_str} FROM {table} WHERE user_id = %s",
            (user_id,)
        )
        return [dict(row) for row in rows]

    def _get_consent_history(self, user_id: str) -> List[Dict]:
        """Return the user's full consent audit trail, newest first."""
        rows = self.db.fetchall("""
            SELECT purpose, granted, timestamp, version
            FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))
        return [dict(row) for row in rows]

    def _get_processing_purposes(self) -> List[Dict]:
        """Return the static documentation of processing purposes."""
        return [
            {
                "purpose": "Service Delivery",
                "legal_basis": "Contract",
                "data_categories": ["identity", "contact"],
                "retention": "Duration of contract + 3 years"
            },
            {
                "purpose": "Marketing",
                "legal_basis": "Consent",
                "data_categories": ["contact", "preferences"],
                "retention": "Until consent withdrawn"
            },
            {
                "purpose": "Analytics",
                "legal_basis": "Legitimate Interest",
                "data_categories": ["usage", "technical"],
                "retention": "2 years"
            }
        ]

    def _create_export_package(self, data: Dict) -> bytes:
        """Bundle the export dict into an in-memory ZIP and return its bytes."""
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            # Main data file (default=str handles datetimes and other
            # non-JSON types by stringifying them).
            zf.writestr(
                'personal_data.json',
                json.dumps(data, indent=2, default=str)
            )
            # Human-readable summary
            summary = self._generate_summary(data)
            zf.writestr('README.txt', summary)
            # Separate file per table for easier navigation.
            for category, records in data["personal_data"].items():
                zf.writestr(
                    f'data/{category}.json',
                    json.dumps(records, indent=2, default=str)
                )
        buffer.seek(0)
        return buffer.getvalue()

    def _generate_summary(self, data: Dict) -> str:
        """Build the plain-text README included in the export package."""
        summary = f"""
DATA SUBJECT ACCESS REQUEST EXPORT
==================================
Request ID: {data['export_info']['request_id']}
Generated: {data['export_info']['generated_at']}
Data Controller: {data['export_info']['data_controller']}
DATA CATEGORIES INCLUDED:
"""
        for category in data["personal_data"].keys():
            count = len(data["personal_data"][category])
            summary += f"- {category}: {count} records\n"
        summary += """
PROCESSING PURPOSES:
"""
        for purpose in data["processing_purposes"]:
            summary += f"- {purpose['purpose']} (Legal basis: {purpose['legal_basis']})\n"
        summary += """
YOUR RIGHTS:
- Right to rectification (correct inaccurate data)
- Right to erasure (request deletion)
- Right to restrict processing
- Right to data portability
- Right to object to processing
- Right to withdraw consent
To exercise these rights, contact: privacy@example.com
"""
        return summary
Data Encryption Implementation
Field-Level Encryption
# encryption_service.py
import base64
import os
from typing import Dict, List

from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class FieldEncryption:
    """Field-level encryption for personal data.

    Each field name gets its own Fernet key derived from a single master
    key, so compromising one field's key does not expose the others.
    """

    def __init__(self, master_key: bytes):
        # master_key is the secret input to the per-field KDF; it should
        # live in a secrets manager, never in source control.
        self.master_key = master_key
        self._fernet_cache = {}  # field name -> Fernet; avoids re-running the KDF

    def _derive_key(self, field_name: str) -> bytes:
        """Derive a field-specific Fernet key from the master key.

        NOTE(review): the field name is used as the PBKDF2 salt — this is
        deterministic and low-entropy, which is what makes per-field keys
        reproducible, but it is not a secret salt. Confirm this matches the
        project's threat model.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=field_name.encode(),
            iterations=100000,
            backend=default_backend()
        )
        # Fernet requires a urlsafe-base64-encoded 32-byte key.
        return base64.urlsafe_b64encode(kdf.derive(self.master_key))

    def _get_fernet(self, field_name: str) -> Fernet:
        """Get or create the cached Fernet instance for a field."""
        if field_name not in self._fernet_cache:
            key = self._derive_key(field_name)
            self._fernet_cache[field_name] = Fernet(key)
        return self._fernet_cache[field_name]

    def encrypt(self, field_name: str, value: str) -> str:
        """Encrypt a field value; empty/None values pass through unchanged."""
        if not value:
            return value
        fernet = self._get_fernet(field_name)
        # Fernet tokens are already urlsafe-base64; the extra encoding below
        # is redundant but harmless, and decrypt() mirrors it exactly — do
        # not change one side without re-encrypting all stored data.
        encrypted = fernet.encrypt(value.encode())
        return base64.urlsafe_b64encode(encrypted).decode()

    def decrypt(self, field_name: str, encrypted_value: str) -> str:
        """Decrypt a value produced by encrypt(); empty values pass through."""
        if not encrypted_value:
            return encrypted_value
        fernet = self._get_fernet(field_name)
        decoded = base64.urlsafe_b64decode(encrypted_value.encode())
        return fernet.decrypt(decoded).decode()
# Database wrapper with automatic encryption
class EncryptedDatabase:
    """Database wrapper with transparent field encryption.

    insert() encrypts configured fields before writing and select()
    decrypts them after reading; any other access path bypasses
    encryption entirely.
    """

    # table -> list of column names stored encrypted at rest.
    ENCRYPTED_FIELDS = {
        "users": ["email", "phone", "address"],
        "orders": ["customer_name", "billing_address"],
        "payments": ["card_last_four"]
    }

    def __init__(self, db_connection, encryption: "FieldEncryption"):
        # The annotation is a string to avoid an import-time dependency on
        # FieldEncryption; any object exposing encrypt()/decrypt() works.
        self.db = db_connection
        self.encryption = encryption

    def insert(self, table: str, data: Dict) -> None:
        """Insert a row, encrypting configured fields first.

        NOTE(review): *table* and the keys of *data* are interpolated into
        the SQL as identifiers — they must come from trusted code, never
        from user input. Values are parameterized and therefore safe.
        """
        encrypted_data = self._encrypt_fields(table, data)
        columns = ", ".join(encrypted_data.keys())
        placeholders = ", ".join(["%s"] * len(encrypted_data))
        self.db.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
            tuple(encrypted_data.values())
        )

    def select(self, table: str, where: Dict) -> List[Dict]:
        """Select rows matching *where* (ANDed equality), decrypting fields.

        The same identifier-trust caveat as insert() applies to *table* and
        the keys of *where*.
        """
        rows = self.db.fetchall(
            f"SELECT * FROM {table} WHERE " +
            " AND ".join([f"{k} = %s" for k in where.keys()]),
            tuple(where.values())
        )
        return [self._decrypt_fields(table, dict(row)) for row in rows]

    def _encrypt_fields(self, table: str, data: Dict) -> Dict:
        """Return a copy of *data* with the configured fields encrypted."""
        result = data.copy()
        for field in self.ENCRYPTED_FIELDS.get(table, []):
            if result.get(field):
                # Key the encryption by "table.field" so each column gets
                # its own derived key.
                result[field] = self.encryption.encrypt(
                    f"{table}.{field}",
                    result[field]
                )
        return result

    def _decrypt_fields(self, table: str, data: Dict) -> Dict:
        """Return a copy of *data* with the configured fields decrypted."""
        result = data.copy()
        for field in self.ENCRYPTED_FIELDS.get(table, []):
            if result.get(field):
                result[field] = self.encryption.decrypt(
                    f"{table}.{field}",
                    result[field]
                )
        return result
Summary
GDPR compliance requires technical implementation across multiple domains:
- Data mapping: Automated discovery and classification of personal data
- Consent management: Granular, auditable consent with proof
- Right to erasure: Systematic deletion and anonymization
- DSAR processing: Automated data export within 30-day deadline
- Encryption: Field-level encryption for sensitive data
These implementations provide the technical foundation for GDPR compliance while maintaining operational efficiency. Regular audits and updates ensure continued compliance as regulations evolve.