Compliance

GDPR Data Protection Engineering: Technical Implementation Guide

DeviDevs Team
12 min read
#GDPR #data protection #privacy #compliance #data engineering

GDPR compliance requires more than policy documents — it demands technical implementation across data storage, processing, and access systems. This guide provides practical engineering solutions for meeting GDPR requirements in modern applications.

Data Mapping and Discovery

Automated Data Discovery

Build systems to identify and classify personal data:

# data_discovery.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import hashlib
 
class DataCategory(Enum):
    """Broad classes of personal data used to tag discovered fields."""

    # Name, ID numbers
    IDENTIFIER = "identifier"
    # Email, phone, address
    CONTACT = "contact"
    # Payment info
    FINANCIAL = "financial"
    # Health, religion, etc.
    SENSITIVE = "sensitive"
    # Browsing, preferences
    BEHAVIORAL = "behavioral"
    # IP, device info
    TECHNICAL = "technical"
 
@dataclass
class PersonalDataField:
    """One database column that has been classified as personal data."""

    # Column name as reported by the schema scan
    field_name: str
    # Table that owns the column
    table_name: str
    # Category the column name was matched to
    data_category: DataCategory
    # Whether the column is reported as encrypted
    is_encrypted: bool
    # Retention period in days; None when no policy is known for the table
    retention_days: Optional[int]
    # Legal basis label recorded for processing this field
    legal_basis: str
 
class DataDiscoveryEngine:
    """Automated personal data discovery in databases.

    Column names from the schema are matched against ``FIELD_PATTERNS`` and
    every match is reported as a ``PersonalDataField``. Classification looks
    at column names only, so the result is a starting point for review, not a
    verified inventory.

    ``_get_all_tables`` and ``_get_table_columns`` are database-specific and
    must be provided by a subclass.
    """

    # Patterns for identifying personal data fields. Categories are checked in
    # the order listed here and the first matching category wins.
    # NOTE(review): the patterns are broad substring matches, e.g. '.*ip.*'
    # also matches "description" and '.*_id$' matches every foreign key, so
    # expect false positives that need manual triage.
    FIELD_PATTERNS = {
        DataCategory.IDENTIFIER: [
            r'.*name.*', r'.*_id$', r'.*ssn.*', r'.*passport.*',
            r'.*national.*id.*', r'.*license.*'
        ],
        DataCategory.CONTACT: [
            r'.*email.*', r'.*phone.*', r'.*address.*', r'.*zip.*',
            r'.*postal.*', r'.*city.*', r'.*country.*'
        ],
        DataCategory.FINANCIAL: [
            r'.*card.*', r'.*account.*', r'.*iban.*', r'.*bank.*',
            r'.*payment.*', r'.*billing.*'
        ],
        DataCategory.SENSITIVE: [
            r'.*health.*', r'.*medical.*', r'.*religion.*',
            r'.*ethnicity.*', r'.*political.*', r'.*biometric.*'
        ],
        DataCategory.BEHAVIORAL: [
            r'.*preference.*', r'.*history.*', r'.*activity.*',
            r'.*session.*', r'.*click.*', r'.*view.*'
        ],
        DataCategory.TECHNICAL: [
            r'.*ip.*', r'.*device.*', r'.*browser.*', r'.*user.*agent.*',
            r'.*cookie.*', r'.*fingerprint.*'
        ]
    }

    def __init__(self, db_connection):
        """Keep the database connection used by the schema lookups."""
        self.db = db_connection

    def _get_all_tables(self) -> List[str]:
        """Return the names of all tables to scan (implementation specific).

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        raise NotImplementedError(
            "_get_all_tables must be implemented for the target database"
        )

    def _get_table_columns(self, table: str) -> List[Dict]:
        """Return the columns of ``table`` (implementation specific).

        Each entry must be a mapping with at least a ``'name'`` key, which is
        the only key ``discover_fields`` reads.

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        raise NotImplementedError(
            "_get_table_columns must be implemented for the target database"
        )

    def discover_fields(self) -> List[PersonalDataField]:
        """Scan database schema for personal data fields.

        Returns:
            One ``PersonalDataField`` per column whose name matches a pattern
            in ``FIELD_PATTERNS``, in table/column iteration order.
        """
        discovered = []

        for table in self._get_all_tables():
            # The retention policy is per table, so look it up once.
            retention_days = self._get_retention_policy(table)

            for column in self._get_table_columns(table):
                column_name = column['name']
                category = self._classify_field(column_name)
                if category is None:
                    continue

                discovered.append(PersonalDataField(
                    field_name=column_name,
                    table_name=table,
                    data_category=category,
                    is_encrypted=self._check_encryption(table, column_name),
                    retention_days=retention_days,
                    legal_basis=self._determine_legal_basis(category)
                ))

        return discovered

    def _classify_field(self, field_name: str) -> Optional[DataCategory]:
        """Classify field based on naming patterns.

        Matching is case-insensitive. Returns the first category with a
        matching pattern, or None when nothing matches.
        """
        field_lower = field_name.lower()

        for category, patterns in self.FIELD_PATTERNS.items():
            if any(re.match(pattern, field_lower) for pattern in patterns):
                return category

        return None

    def _check_encryption(self, table: str, column: str) -> bool:
        """Check if field is encrypted (implementation specific).

        This placeholder always returns False.
        """
        # Check for encryption indicators in column metadata
        # This varies by database system
        return False

    def _get_retention_policy(self, table: str) -> Optional[int]:
        """Get retention policy for table, in days (None when not configured)."""
        retention_policies = {
            'users': 365 * 3,  # 3 years
            'orders': 365 * 7,  # 7 years (financial)
            'sessions': 30,  # 30 days
            'logs': 90,  # 90 days
        }
        return retention_policies.get(table)

    def _determine_legal_basis(self, category: DataCategory) -> str:
        """Determine the legal basis label for a category.

        Falls back to "consent" for a category that is not in the table.
        """
        legal_bases = {
            DataCategory.IDENTIFIER: "contract",
            DataCategory.CONTACT: "contract",
            DataCategory.FINANCIAL: "legal_obligation",
            DataCategory.SENSITIVE: "explicit_consent",
            DataCategory.BEHAVIORAL: "consent",
            DataCategory.TECHNICAL: "legitimate_interest"
        }
        return legal_bases.get(category, "consent")

    def generate_data_map(self) -> Dict:
        """Generate complete data mapping document.

        Returns:
            A dict with ``generated_at`` (ISO-8601 UTC timestamp without an
            offset), ``total_fields``, ``by_category`` and ``by_table``
            (each mapping a key to a list of field names) and ``fields``
            (one detail dict per discovered field).
        """
        # Imported locally because this module's import block does not bring
        # in datetime.
        from datetime import datetime, timezone

        fields = self.discover_fields()

        # Naive UTC timestamp: same format utcnow() produced, without relying
        # on the deprecated utcnow().
        generated_at = datetime.now(timezone.utc).replace(tzinfo=None)

        data_map = {
            "generated_at": generated_at.isoformat(),
            "total_fields": len(fields),
            "by_category": {},
            "by_table": {},
            "fields": []
        }

        for field in fields:
            cat = field.data_category.value

            # Group by category
            data_map["by_category"].setdefault(cat, []).append(field.field_name)

            # Group by table
            data_map["by_table"].setdefault(field.table_name, []).append(field.field_name)

            # Add field details
            data_map["fields"].append({
                "table": field.table_name,
                "field": field.field_name,
                "category": cat,
                "encrypted": field.is_encrypted,
                "retention_days": field.retention_days,
                "legal_basis": field.legal_basis
            })

        return data_map
# consent_manager.py
from datetime import datetime
from typing import List, Optional, Dict
from dataclasses import dataclass
import json
import hashlib
 
@dataclass
class ConsentRecord:
    """A single consent decision for one user and one purpose."""

    user_id: str
    purpose: str
    # True for a grant, False for a refusal or withdrawal
    granted: bool
    timestamp: datetime
    ip_address: str
    user_agent: str
    # Consent form version
    version: str
    # Cryptographic proof
    proof_hash: str
 
class ConsentManager:
    """GDPR-compliant consent management.

    Every decision is inserted as a new row in ``consent_log``. The current
    state for a purpose is whatever the most recent row for that user and
    purpose says; no row means no consent.
    """

    # Purposes a user can grant or refuse individually
    PURPOSES = [
        "marketing_email",
        "marketing_sms",
        "analytics",
        "personalization",
        "third_party_sharing",
        "profiling"
    ]

    def __init__(self, db_connection):
        """Keep the database connection used for all consent queries."""
        self.db = db_connection

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value utcnow() returned, without using the deprecated call.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_consent(
        self,
        user_id: str,
        purpose: str,
        granted: bool,
        ip_address: str,
        user_agent: str,
        consent_form_version: str
    ) -> ConsentRecord:
        """Record user consent with cryptographic proof.

        Args:
            user_id: Data subject the decision belongs to.
            purpose: One of ``PURPOSES``.
            granted: True to grant, False to refuse or withdraw.
            ip_address: Client IP; only its hash is stored.
            user_agent: Client user agent string, stored as given.
            consent_form_version: Version of the form the user saw.

        Returns:
            The record that was written to the log.

        Raises:
            ValueError: if ``purpose`` is not in ``PURPOSES``.
        """
        if purpose not in self.PURPOSES:
            raise ValueError(f"Invalid purpose: {purpose}")

        timestamp = self._utcnow()

        # Create cryptographic proof: SHA-256 over the canonical (sorted-key)
        # JSON of the decision, so the stored row can be re-checked later.
        proof_data = {
            "user_id": user_id,
            "purpose": purpose,
            "granted": granted,
            "timestamp": timestamp.isoformat(),
            "version": consent_form_version
        }
        proof_hash = hashlib.sha256(
            json.dumps(proof_data, sort_keys=True).encode()
        ).hexdigest()

        record = ConsentRecord(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            timestamp=timestamp,
            ip_address=self._hash_ip(ip_address),  # Privacy-preserving
            user_agent=user_agent,
            version=consent_form_version,
            proof_hash=proof_hash
        )

        # Store in database (append-only log)
        self._store_consent(record)

        return record

    def _hash_ip(self, ip_address: str) -> str:
        """Hash IP for privacy while maintaining proof.

        Returns the first 16 hex characters of the SHA-256 digest.
        NOTE(review): the hash is unsalted, so the small IPv4 space can be
        enumerated to recover the address; treat the value as pseudonymised
        rather than anonymous.
        """
        return hashlib.sha256(ip_address.encode()).hexdigest()[:16]

    def _store_consent(self, record: ConsentRecord):
        """Store consent in append-only audit log.

        ``record.ip_address`` already holds the hashed IP and is written to
        the ``ip_hash`` column.
        """
        self.db.execute("""
            INSERT INTO consent_log
            (user_id, purpose, granted, timestamp, ip_hash, user_agent, version, proof_hash)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            record.user_id,
            record.purpose,
            record.granted,
            record.timestamp,
            record.ip_address,
            record.user_agent,
            record.version,
            record.proof_hash
        ))

    def get_current_consent(self, user_id: str) -> Dict[str, bool]:
        """Get current consent status for all purposes.

        Returns:
            A dict with one entry per purpose in ``PURPOSES``; purposes with
            no logged decision map to False.
        """
        return {
            purpose: self.check_consent(user_id, purpose)
            for purpose in self.PURPOSES
        }

    def check_consent(self, user_id: str, purpose: str) -> bool:
        """Check if user has given consent for specific purpose.

        Returns False for an unknown purpose or when no decision is logged.
        """
        if purpose not in self.PURPOSES:
            return False

        # Most recent decision wins
        row = self.db.fetchone("""
            SELECT granted FROM consent_log
            WHERE user_id = %s AND purpose = %s
            ORDER BY timestamp DESC
            LIMIT 1
        """, (user_id, purpose))

        return bool(row['granted']) if row else False

    def get_consent_history(self, user_id: str) -> List[ConsentRecord]:
        """Get full consent history for data subject access request.

        Returns:
            All logged decisions for the user, newest first.
        """
        # Columns are listed and mapped explicitly: the table stores the
        # hashed IP as ``ip_hash`` while the dataclass field is
        # ``ip_address``, so unpacking ``SELECT *`` rows into ConsentRecord
        # would raise TypeError.
        rows = self.db.fetchall("""
            SELECT user_id, purpose, granted, timestamp, ip_hash, user_agent, version, proof_hash
            FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))

        return [
            ConsentRecord(
                user_id=row['user_id'],
                purpose=row['purpose'],
                granted=row['granted'],
                timestamp=row['timestamp'],
                ip_address=row['ip_hash'],
                user_agent=row['user_agent'],
                version=row['version'],
                proof_hash=row['proof_hash']
            )
            for row in rows
        ]

    def withdraw_all_consent(self, user_id: str, ip_address: str, user_agent: str):
        """Withdraw all consent (right to withdraw).

        Logs a ``granted=False`` decision for every purpose, using
        "withdrawal" as the form version.
        """
        for purpose in self.PURPOSES:
            self.record_consent(
                user_id=user_id,
                purpose=purpose,
                granted=False,
                ip_address=ip_address,
                user_agent=user_agent,
                consent_form_version="withdrawal"
            )
# consent_api.py
from fastapi import APIRouter, Request, HTTPException
from pydantic import BaseModel
from typing import Dict, List
 
# Router for the consent endpoints; every route registered on it is served
# under the /api/consent prefix and tagged "consent".
router = APIRouter(prefix="/api/consent", tags=["consent"])
 
class ConsentUpdate(BaseModel):
    """Request body for a consent update."""

    # Purpose name mapped to whether consent is granted for it
    purposes: Dict[str, bool]
 
class ConsentResponse(BaseModel):
    """Response body describing a user's consent state."""

    user_id: str
    # Purpose name mapped to the current granted flag
    consents: Dict[str, bool]
    # Timestamp string; the endpoints fill it with an ISO-8601 value
    last_updated: str
 
@router.get("/", response_model=ConsentResponse)
async def get_consent_status(request: Request):
    """Get current consent status for authenticated user.

    Reads the user id from ``request.state.user_id`` and the consent manager
    from ``request.app.state.consent_manager``.
    """
    # Imported locally because consent_api.py's import block does not bring
    # in datetime.
    from datetime import datetime, timezone

    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager

    # Naive UTC timestamp, same format utcnow() produced (utcnow is deprecated).
    # NOTE(review): this is the time of the response, not the time the
    # user's consent last changed.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    return ConsentResponse(
        user_id=user_id,
        consents=consent_manager.get_current_consent(user_id),
        last_updated=now.isoformat()
    )
 
@router.post("/", response_model=ConsentResponse)
async def update_consent(
    request: Request,
    consent_update: ConsentUpdate
):
    """Update consent preferences.

    Records one decision per submitted purpose and returns the resulting
    consent state.

    Raises:
        HTTPException: 400 when the body names a purpose the consent manager
            does not know; nothing is recorded in that case.
    """
    # Imported locally because consent_api.py's import block does not bring
    # in datetime.
    from datetime import datetime, timezone

    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager

    # Validate everything before writing anything: record_consent raises
    # ValueError on an unknown purpose, which would otherwise surface as a
    # 500 after earlier purposes in the same request were already logged.
    unknown = sorted(
        purpose for purpose in consent_update.purposes
        if purpose not in consent_manager.PURPOSES
    )
    if unknown:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid purpose(s): {', '.join(unknown)}"
        )

    # request.client is None when the server cannot supply the peer address.
    ip_address = request.client.host if request.client else "unknown"
    user_agent = request.headers.get("user-agent", "unknown")

    for purpose, granted in consent_update.purposes.items():
        consent_manager.record_consent(
            user_id=user_id,
            purpose=purpose,
            granted=granted,
            ip_address=ip_address,
            user_agent=user_agent,
            consent_form_version="v2.0"
        )

    # Return updated status
    consents = consent_manager.get_current_consent(user_id)

    # Naive UTC timestamp, same format utcnow() produced (utcnow is deprecated).
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    return ConsentResponse(
        user_id=user_id,
        consents=consents,
        last_updated=now.isoformat()
    )
 
@router.post("/withdraw-all")
async def withdraw_all_consent(request: Request):
    """Withdraw all consent.

    Delegates to the consent manager's ``withdraw_all_consent`` for the
    authenticated user and returns a fixed success payload.
    """
    user_id = request.state.user_id
    consent_manager = request.app.state.consent_manager

    # request.client is None when the server cannot supply the peer address.
    ip_address = request.client.host if request.client else "unknown"

    consent_manager.withdraw_all_consent(
        user_id=user_id,
        ip_address=ip_address,
        user_agent=request.headers.get("user-agent", "unknown")
    )

    return {"status": "success", "message": "All consent withdrawn"}

Right to Erasure Implementation

Data Deletion Service

# erasure_service.py
from datetime import datetime
from typing import List, Dict
from dataclasses import dataclass
import logging
 
@dataclass
class ErasureRequest:
    """State of one right-to-erasure request while it is being handled."""

    request_id: str
    user_id: str
    requested_at: datetime
    # pending, processing, completed, failed
    status: str
    # Names of the tables handled so far
    tables_processed: List[str]
    # Error messages collected while processing
    errors: List[str]
 
class ErasureService:
    """Implement right to erasure (right to be forgotten).

    Each table in ``DATA_TABLES`` is either deleted from or anonymized for
    the requesting user. Tables are handled independently: a failure in one
    is recorded on the request and the remaining tables are still processed.
    NOTE(review): the statements are not wrapped in a transaction here, so a
    partially processed request is possible; confirm how the connection
    handles commits.
    """

    # Tables containing personal data with deletion strategy
    DATA_TABLES = {
        "users": {
            "strategy": "delete",
            "cascade_check": ["orders", "comments", "sessions"]
        },
        "orders": {
            "strategy": "anonymize",  # Keep for financial records
            "fields_to_clear": ["customer_name", "email", "address"],
            "anonymize_with": "DELETED_USER"
        },
        "comments": {
            "strategy": "delete"
        },
        "sessions": {
            "strategy": "delete"
        },
        "audit_logs": {
            "strategy": "anonymize",  # Required for compliance
            "fields_to_clear": ["user_email", "ip_address"],
            "anonymize_with": None  # NULL
        },
        "marketing_lists": {
            "strategy": "delete"
        }
    }

    def __init__(self, db_connection):
        """Keep the database connection and set up the module logger."""
        self.db = db_connection
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value utcnow() returned, without using the deprecated call.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def create_erasure_request(self, user_id: str) -> ErasureRequest:
        """Create new erasure request.

        The request id combines a second-resolution UTC timestamp with the
        first 8 characters of the user id.

        Returns:
            The new request, in "pending" state, after it has been stored.
        """
        # One timestamp for both the id and the stored time so they agree.
        requested_at = self._utcnow()
        request_id = f"ER-{requested_at.strftime('%Y%m%d%H%M%S')}-{user_id[:8]}"

        request = ErasureRequest(
            request_id=request_id,
            user_id=user_id,
            requested_at=requested_at,
            status="pending",
            tables_processed=[],
            errors=[]
        )

        # Store request
        self.db.execute("""
            INSERT INTO erasure_requests
            (request_id, user_id, requested_at, status)
            VALUES (%s, %s, %s, %s)
        """, (request_id, user_id, request.requested_at, "pending"))

        return request

    def process_erasure(self, request_id: str) -> ErasureRequest:
        """Process erasure request.

        Returns:
            The request with its final status: "completed" when every table
            succeeded, "completed_with_errors" when some failed, "failed"
            when none succeeded.

        Raises:
            ValueError: if no request with ``request_id`` exists.
        """
        # Get request
        row = self.db.fetchone(
            "SELECT * FROM erasure_requests WHERE request_id = %s",
            (request_id,)
        )

        if not row:
            raise ValueError(f"Request not found: {request_id}")

        # Built field by field: the table has no tables_processed/errors
        # columns, so unpacking the row into ErasureRequest would raise
        # TypeError for the missing arguments.
        request = ErasureRequest(
            request_id=row['request_id'],
            user_id=row['user_id'],
            requested_at=row['requested_at'],
            status="processing",
            tables_processed=[],
            errors=[]
        )
        self._update_request_status(request)

        # Process each table
        for table_name, config in self._tables_in_processing_order():
            try:
                if config["strategy"] == "delete":
                    self._delete_from_table(table_name, request.user_id)
                elif config["strategy"] == "anonymize":
                    self._anonymize_in_table(
                        table_name,
                        request.user_id,
                        config["fields_to_clear"],
                        config.get("anonymize_with")
                    )

                request.tables_processed.append(table_name)

            except Exception as e:
                # Best effort per table: record the failure and keep going.
                self.logger.error(f"Error processing {table_name}: {e}")
                request.errors.append(f"{table_name}: {str(e)}")

        if not request.errors:
            request.status = "completed"
        elif request.tables_processed:
            request.status = "completed_with_errors"
        else:
            request.status = "failed"
            self.logger.error(f"Erasure failed: {'; '.join(request.errors)}")

        self._update_request_status(request)
        self._create_erasure_certificate(request)

        return request

    def _tables_in_processing_order(self):
        """Return ``DATA_TABLES`` items with parent tables moved to the end.

        A table that declares ``cascade_check`` is referenced by the tables
        it lists, so it is handled after them. If foreign keys exist,
        deleting the parent row first could either fail or cascade into rows
        that are meant to be anonymized and kept. The sort is stable, so the
        configured order is otherwise preserved.
        """
        return sorted(
            self.DATA_TABLES.items(),
            key=lambda item: "cascade_check" in item[1]
        )

    def _update_request_status(self, request: ErasureRequest):
        """Persist the request's current status to ``erasure_requests``."""
        self.db.execute(
            "UPDATE erasure_requests SET status = %s WHERE request_id = %s",
            (request.status, request.request_id)
        )

    def _delete_from_table(self, table: str, user_id: str):
        """Delete user data from table.

        ``table`` is interpolated into the SQL text, so it must come from
        ``DATA_TABLES`` and never from user input.
        """
        self.db.execute(
            f"DELETE FROM {table} WHERE user_id = %s",
            (user_id,)
        )
        self.logger.info(f"Deleted from {table} for user {user_id[:8]}...")

    def _anonymize_in_table(
        self,
        table: str,
        user_id: str,
        fields: List[str],
        replacement: str
    ):
        """Anonymize user data in table.

        Args:
            table: Table name; interpolated into the SQL text, so it must
                come from ``DATA_TABLES``.
            user_id: User whose rows are updated.
            fields: Column names to overwrite; also interpolated.
            replacement: Value written to every field, or None to set the
                fields to NULL.

        Raises:
            ValueError: if ``fields`` is empty.
        """
        if not fields:
            raise ValueError(f"No fields configured to anonymize in {table}")

        # Decide on ``is None`` for both the SET clauses and the parameters.
        # Testing truthiness for the parameters would drop them for an empty
        # string replacement while the placeholders remained in the query.
        if replacement is None:
            set_clauses = [f"{field} = NULL" for field in fields]
            params = [user_id]
        else:
            set_clauses = [f"{field} = %s" for field in fields]
            params = [replacement] * len(fields) + [user_id]

        query = f"UPDATE {table} SET {', '.join(set_clauses)} WHERE user_id = %s"

        self.db.execute(query, tuple(params))
        self.logger.info(f"Anonymized {fields} in {table} for user {user_id[:8]}...")

    def _create_erasure_certificate(self, request: ErasureRequest) -> Dict:
        """Create certificate of erasure for compliance.

        The certificate stores a SHA-256 hash of the user id rather than the
        id itself and is saved as JSON in ``erasure_certificates``.

        Returns:
            The certificate dict that was stored.
        """
        # Imported locally because this module's import block does not bring
        # in hashlib or json.
        import hashlib
        import json

        certificate = {
            "certificate_id": f"CERT-{request.request_id}",
            "type": "erasure_certificate",
            "request_id": request.request_id,
            "user_id_hash": hashlib.sha256(request.user_id.encode()).hexdigest(),
            "requested_at": request.requested_at.isoformat(),
            "completed_at": self._utcnow().isoformat(),
            "tables_processed": request.tables_processed,
            "status": request.status,
            "legal_basis": "GDPR Article 17 - Right to Erasure"
        }

        # Store certificate
        self.db.execute("""
            INSERT INTO erasure_certificates (certificate_id, data)
            VALUES (%s, %s)
        """, (certificate["certificate_id"], json.dumps(certificate)))

        return certificate

Data Subject Access Request (DSAR)

Automated DSAR Processing

# dsar_service.py
from datetime import datetime
from typing import Dict, List
import json
import zipfile
import io
 
class DSARService:
    """Handle Data Subject Access Requests.

    A request is first registered with a response deadline, then processed
    into a ZIP package containing the user's data from every table reported
    by the data discovery component, the consent history and the documented
    processing purposes.
    """

    # Days allowed for answering a request, counted from its creation
    RESPONSE_DEADLINE_DAYS = 30

    def __init__(self, db_connection, data_discovery):
        """Keep the database connection and the data discovery component.

        ``data_discovery`` must offer ``generate_data_map()`` returning a
        dict with a ``"by_table"`` mapping of table name to field names.
        """
        self.db = db_connection
        self.discovery = data_discovery

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value utcnow() returned, without using the deprecated call.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def create_dsar_request(self, user_id: str, email: str) -> str:
        """Create new DSAR request.

        Returns:
            The id of the stored request ("DSAR-" plus a UTC timestamp).
            NOTE(review): the id has one-second resolution, so two requests
            created within the same second get the same id.
        """
        # Imported locally because this module's import block only brings in
        # ``datetime`` from the datetime module.
        from datetime import timedelta

        # One timestamp for the id, the request time and the deadline, so the
        # deadline is exactly RESPONSE_DEADLINE_DAYS after requested_at.
        requested_at = self._utcnow()
        request_id = f"DSAR-{requested_at.strftime('%Y%m%d%H%M%S')}"
        deadline = requested_at + timedelta(days=self.RESPONSE_DEADLINE_DAYS)

        self.db.execute("""
            INSERT INTO dsar_requests
            (request_id, user_id, email, requested_at, status, deadline)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (
            request_id,
            user_id,
            email,
            requested_at,
            "pending",
            deadline
        ))

        return request_id

    def process_dsar(self, request_id: str) -> bytes:
        """Process DSAR and generate data export.

        Returns:
            The bytes of a ZIP archive (see ``_create_export_package``).

        Raises:
            ValueError: if no request with ``request_id`` exists.
        """
        # Get request details
        request = self.db.fetchone(
            "SELECT * FROM dsar_requests WHERE request_id = %s",
            (request_id,)
        )

        if not request:
            raise ValueError(f"Request not found: {request_id}")

        user_id = request['user_id']

        # Collect all personal data
        data_export = {
            "export_info": {
                "request_id": request_id,
                "generated_at": self._utcnow().isoformat(),
                "data_controller": "Your Company Name",
                "contact": "privacy@example.com"
            },
            "personal_data": {},
            "consent_history": [],
            "processing_purposes": []
        }

        # Get data from each table
        data_map = self.discovery.generate_data_map()

        for table, fields in data_map["by_table"].items():
            table_data = self._export_table_data(table, fields, user_id)
            if table_data:
                data_export["personal_data"][table] = table_data

        # Get consent history
        data_export["consent_history"] = self._get_consent_history(user_id)

        # Get processing purposes
        data_export["processing_purposes"] = self._get_processing_purposes()

        # Generate downloadable package
        return self._create_export_package(data_export)

    def _export_table_data(
        self,
        table: str,
        fields: List[str],
        user_id: str
    ) -> List[Dict]:
        """Export data from specific table.

        ``table`` and ``fields`` are interpolated into the SQL text, so they
        must come from the schema scan, never from user input.
        NOTE(review): every table is filtered on a ``user_id`` column; a
        table without that column will make the query fail. Confirm against
        the schema.

        Returns:
            One dict per matching row; an empty list when ``fields`` is empty.
        """
        if not fields:
            # Nothing to select; an empty column list would be invalid SQL.
            return []

        fields_str = ", ".join(fields)

        rows = self.db.fetchall(
            f"SELECT {fields_str} FROM {table} WHERE user_id = %s",
            (user_id,)
        )

        return [dict(row) for row in rows]

    def _get_consent_history(self, user_id: str) -> List[Dict]:
        """Get consent history for the user, newest first."""
        rows = self.db.fetchall("""
            SELECT purpose, granted, timestamp, version
            FROM consent_log
            WHERE user_id = %s
            ORDER BY timestamp DESC
        """, (user_id,))

        return [dict(row) for row in rows]

    def _get_processing_purposes(self) -> List[Dict]:
        """Get data processing purposes documentation (static list)."""
        return [
            {
                "purpose": "Service Delivery",
                "legal_basis": "Contract",
                "data_categories": ["identity", "contact"],
                "retention": "Duration of contract + 3 years"
            },
            {
                "purpose": "Marketing",
                "legal_basis": "Consent",
                "data_categories": ["contact", "preferences"],
                "retention": "Until consent withdrawn"
            },
            {
                "purpose": "Analytics",
                "legal_basis": "Legitimate Interest",
                "data_categories": ["usage", "technical"],
                "retention": "2 years"
            }
        ]

    def _create_export_package(self, data: Dict) -> bytes:
        """Create downloadable ZIP package.

        The archive holds ``personal_data.json`` (the full export),
        ``README.txt`` (human-readable summary) and one ``data/<name>.json``
        per entry of ``data["personal_data"]``. Values that JSON cannot
        encode natively are written via ``str()``.
        """
        buffer = io.BytesIO()

        with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            # Main data file
            zf.writestr(
                'personal_data.json',
                json.dumps(data, indent=2, default=str)
            )

            # Human-readable summary
            zf.writestr('README.txt', self._generate_summary(data))

            # Separate files per data category
            for category, records in data["personal_data"].items():
                zf.writestr(
                    f'data/{category}.json',
                    json.dumps(records, indent=2, default=str)
                )

        # getvalue() returns the whole buffer regardless of stream position.
        return buffer.getvalue()

    def _generate_summary(self, data: Dict) -> str:
        """Generate human-readable summary of the export.

        Lists the request details, the record count per exported entry, the
        processing purposes and the data subject's rights.
        """
        info = data['export_info']
        contact = info.get('contact', 'privacy@example.com')

        lines = [
            "",
            "DATA SUBJECT ACCESS REQUEST EXPORT",
            "==================================",
            "",
            f"Request ID: {info['request_id']}",
            f"Generated: {info['generated_at']}",
            f"Data Controller: {info['data_controller']}",
            "",
            "DATA CATEGORIES INCLUDED:",
        ]
        lines.extend(
            f"- {category}: {len(records)} records"
            for category, records in data["personal_data"].items()
        )

        lines.extend(["", "PROCESSING PURPOSES:"])
        lines.extend(
            f"- {purpose['purpose']} (Legal basis: {purpose['legal_basis']})"
            for purpose in data["processing_purposes"]
        )

        lines.extend([
            "",
            "YOUR RIGHTS:",
            "- Right to rectification (correct inaccurate data)",
            "- Right to erasure (request deletion)",
            "- Right to restrict processing",
            "- Right to data portability",
            "- Right to object to processing",
            "- Right to withdraw consent",
            "",
            f"To exercise these rights, contact: {contact}",
            "",
        ])

        return "\n".join(lines)

Data Encryption Implementation

Field-Level Encryption

# encryption_service.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import base64
import os
 
class FieldEncryption:
    """Field-level encryption for personal data.

    Every field name gets its own Fernet key, derived from the master key
    with PBKDF2-HMAC-SHA256 and the field name as salt. Derived ciphers are
    cached per instance.
    """

    def __init__(self, master_key: bytes):
        """Store the master key and start with an empty cipher cache."""
        self._fernet_cache = {}
        self.master_key = master_key

    def _derive_key(self, field_name: str) -> bytes:
        """Derive field-specific key from master key.

        Returns the 32 derived bytes in URL-safe base64, the form the Fernet
        constructor takes.
        """
        derivation = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=field_name.encode(),
            iterations=100000,
            backend=default_backend()
        )
        raw_key = derivation.derive(self.master_key)
        return base64.urlsafe_b64encode(raw_key)

    def _get_fernet(self, field_name: str) -> Fernet:
        """Get or create Fernet instance for field."""
        try:
            return self._fernet_cache[field_name]
        except KeyError:
            # First use of this field: derive its key and remember the cipher.
            cipher = Fernet(self._derive_key(field_name))
            self._fernet_cache[field_name] = cipher
            return cipher

    def encrypt(self, field_name: str, value: str) -> str:
        """Encrypt a field value.

        Falsy values (None, empty string) are returned unchanged. The Fernet
        token is wrapped in one more layer of URL-safe base64 before being
        returned as text.
        """
        if value:
            token = self._get_fernet(field_name).encrypt(value.encode())
            return base64.urlsafe_b64encode(token).decode()

        return value

    def decrypt(self, field_name: str, encrypted_value: str) -> str:
        """Decrypt a field value.

        Falsy values are returned unchanged; anything else must be output of
        ``encrypt`` for the same field name.
        """
        if encrypted_value:
            token = base64.urlsafe_b64decode(encrypted_value.encode())
            return self._get_fernet(field_name).decrypt(token).decode()

        return encrypted_value
 
 
# Database wrapper with automatic encryption
class EncryptedDatabase:
    """Database wrapper with transparent field encryption.

    Fields listed in ``ENCRYPTED_FIELDS`` are encrypted on insert and
    decrypted on select; every other field passes through unchanged.
    Table and column names are interpolated into the SQL text, so they must
    come from trusted code, never from user input.
    """

    # Table name mapped to the columns that are stored encrypted
    ENCRYPTED_FIELDS = {
        "users": ["email", "phone", "address"],
        "orders": ["customer_name", "billing_address"],
        "payments": ["card_last_four"]
    }

    def __init__(self, db_connection, encryption: FieldEncryption):
        """Keep the database connection and the field encryption helper."""
        self.db = db_connection
        self.encryption = encryption

    def insert(self, table: str, data: Dict) -> None:
        """Insert with automatic encryption.

        ``data`` maps column names to values; it is not modified.
        """
        encrypted_data = self._encrypt_fields(table, data)

        columns = ", ".join(encrypted_data.keys())
        placeholders = ", ".join(["%s"] * len(encrypted_data))

        self.db.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
            tuple(encrypted_data.values())
        )

    def select(self, table: str, where: Dict) -> List[Dict]:
        """Select with automatic decryption.

        Args:
            table: Table to read from.
            where: Column/value pairs combined with AND; an empty dict
                selects every row.

        NOTE(review): ``where`` values are sent as given, so a filter on an
        encrypted column compares plaintext with stored ciphertext and is
        not expected to match any row.
        """
        query = f"SELECT * FROM {table}"
        if where:
            # Only add the clause when there is something to filter on;
            # a bare "WHERE" with no condition is invalid SQL.
            conditions = " AND ".join(f"{column} = %s" for column in where)
            query += f" WHERE {conditions}"

        rows = self.db.fetchall(query, tuple(where.values()))

        return [self._decrypt_fields(table, dict(row)) for row in rows]

    def _transform_fields(self, table: str, data: Dict, transform) -> Dict:
        """Return a copy of ``data`` with ``transform`` applied to the table's
        encrypted fields that are present and non-empty.

        The key name handed to ``transform`` is "<table>.<field>".
        """
        result = data.copy()

        for field in self.ENCRYPTED_FIELDS.get(table, []):
            if result.get(field):
                result[field] = transform(f"{table}.{field}", result[field])

        return result

    def _encrypt_fields(self, table: str, data: Dict) -> Dict:
        """Encrypt sensitive fields"""
        return self._transform_fields(table, data, self.encryption.encrypt)

    def _decrypt_fields(self, table: str, data: Dict) -> Dict:
        """Decrypt sensitive fields"""
        return self._transform_fields(table, data, self.encryption.decrypt)

Summary

GDPR compliance requires technical implementation across multiple domains:

  1. Data mapping: Automated discovery and classification of personal data
  2. Consent management: Granular, auditable consent with proof
  3. Right to erasure: Systematic deletion and anonymization
  4. DSAR processing: Automated data export within the one-month response deadline (tracked here as 30 days)
  5. Encryption: Field-level encryption for sensitive data

These implementations provide the technical foundation for GDPR compliance while maintaining operational efficiency. Regular audits and updates ensure continued compliance as regulations evolve.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.