Compliance

ISO 27701 Privacy Implementation: Building a Privacy Information Management System

DeviDevs Team
15 min read
#iso-27701 #privacy #pims #gdpr #compliance #data-protection #iso-27001

ISO 27701 Privacy Implementation: Building a Privacy Information Management System

ISO 27701 provides a framework for implementing, maintaining, and improving a Privacy Information Management System (PIMS) as an extension to ISO 27001. This guide covers practical implementation of privacy controls for both data controllers and processors.

Understanding ISO 27701

ISO 27701 extends ISO 27001's Information Security Management System (ISMS) to address privacy-specific requirements:

  • Clause 5: PIMS-specific requirements extending ISO 27001
  • Clause 6: PIMS-specific guidance extending ISO 27002
  • Clause 7: Additional guidance for PII controllers
  • Clause 8: Additional guidance for PII processors

Privacy Control Implementation

Data Subject Rights Management

# data_subject_rights.py
"""
Implementation of data subject rights per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
import secrets
 
 
class RightType(Enum):
    """Types of data subject rights.

    Each member's value is the lower-case string form of the right. Within
    this module, ACCESS, ERASURE and PORTABILITY have dedicated
    ``DataSubjectRightsManager.process_*_request`` methods.
    """
    ACCESS = "access"  # processed by process_access_request
    RECTIFICATION = "rectification"
    ERASURE = "erasure"  # processed by process_erasure_request; treated as high-risk in verify_identity
    RESTRICTION = "restriction"
    PORTABILITY = "portability"  # processed by process_portability_request; treated as high-risk in verify_identity
    OBJECTION = "objection"
    AUTOMATED_DECISION = "automated_decision"
 
 
class RequestStatus(Enum):
    """Status of a data subject request.

    The comments below name the ``DataSubjectRightsManager`` method that
    assigns each status.
    """
    RECEIVED = "received"  # initial status set by create_request
    IDENTITY_VERIFICATION = "identity_verification"  # verify_identity: high-risk request without identity documents
    IN_PROGRESS = "in_progress"  # verify_identity: verification succeeded
    COMPLETED = "completed"  # _complete_request
    REJECTED = "rejected"  # reject_request
    EXTENDED = "extended"  # extend_deadline
 
 
@dataclass
class DataSubjectRequest:
    """Represents a data subject rights request.

    Instances are built by ``DataSubjectRightsManager.create_request`` and
    mutated in place as the request moves through its lifecycle.
    """
    # "DSR-" followed by 16 upper-case hex characters (see create_request)
    request_id: str
    # SHA-256 hex digest of the lower-cased e-mail (see _hash_identifier)
    subject_id: str
    subject_email: str
    right_type: RightType
    status: RequestStatus
    # Naive datetime taken from datetime.utcnow()
    created_at: datetime
    # created_at + STANDARD_RESPONSE_TIME days; moved by extend_deadline
    due_date: datetime
    # Set when the request is completed or rejected
    completed_at: Optional[datetime] = None
    extension_reason: Optional[str] = None
    # "<reason> (Legal basis: <legal_basis>)" as written by reject_request
    rejection_reason: Optional[str] = None
    # URL-safe random token that verify_identity compares against
    verification_token: Optional[str] = None
    # Free-text notes appended while the request is processed
    processing_notes: List[str] = field(default_factory=list)
    # Names of the data sources that were read or erased for this request
    affected_systems: List[str] = field(default_factory=list)
 
 
class DataSubjectRightsManager:
    """
    Manages data subject rights requests per ISO 27701 7.3.

    Requests live in the in-memory ``requests`` dict keyed by request ID and
    are handed to ``storage_backend.save_request`` once, at creation time.
    Every state transition is written to the audit trail through
    ``storage_backend.log_audit_event``.

    The hook methods at the bottom of the class raise ``NotImplementedError``
    and must be overridden with system-specific implementations.
    """

    # Response timeframes (in days)
    STANDARD_RESPONSE_TIME = 30
    # Used as the ceiling on the *total* response time: extend_deadline()
    # allows at most MAXIMUM_EXTENSION - STANDARD_RESPONSE_TIME extra days.
    MAXIMUM_EXTENSION = 60

    # Export formats process_portability_request() can actually produce.
    _SUPPORTED_EXPORT_FORMATS = ("json", "csv")

    def __init__(self, storage_backend, notification_service):
        """
        Args:
            storage_backend: Object providing ``save_request(request)`` and
                ``log_audit_event(event_dict)``.
            notification_service: Stored on ``self.notifications``; not called
                by this class itself.
        """
        self.storage = storage_backend
        self.notifications = notification_service
        self.requests: Dict[str, DataSubjectRequest] = {}

    def create_request(
        self,
        subject_email: str,
        right_type: RightType,
        additional_info: Optional[Dict] = None
    ) -> DataSubjectRequest:
        """
        Create a new data subject request.

        ISO 27701 7.3.2: Request handling

        Args:
            subject_email: E-mail address of the data subject.
            right_type: The right being exercised.
            additional_info: Optional details recorded in the audit trail.

        Returns:
            The stored request, in status ``RECEIVED`` and due
            ``STANDARD_RESPONSE_TIME`` days after creation.
        """
        # One clock reading so that due_date is exactly created_at + 30 days.
        now = datetime.utcnow()

        request = DataSubjectRequest(
            request_id=f"DSR-{secrets.token_hex(8).upper()}",
            subject_id=self._hash_identifier(subject_email),
            subject_email=subject_email,
            right_type=right_type,
            status=RequestStatus.RECEIVED,
            created_at=now,
            due_date=now + timedelta(days=self.STANDARD_RESPONSE_TIME),
            verification_token=secrets.token_urlsafe(32)
        )

        self.requests[request.request_id] = request
        self.storage.save_request(request)

        # Send verification email
        self._send_verification_email(request)

        # Log for audit trail
        self._log_request_action(request.request_id, "created", additional_info)

        return request

    def verify_identity(
        self,
        request_id: str,
        verification_token: str,
        identity_documents: Optional[List[str]] = None
    ) -> bool:
        """
        Verify data subject identity.

        ISO 27701 7.3.5: Identity verification

        Args:
            request_id: ID returned by ``create_request``.
            verification_token: Token presented by the data subject.
            identity_documents: Extra proof of identity; required for
                erasure and portability requests.

        Returns:
            True when the request moved to ``IN_PROGRESS``. False when the
            request is unknown, the token does not match, or additional
            identity documents are still required.
        """
        request = self.requests.get(request_id)
        if not request:
            return False

        if not self._tokens_match(request.verification_token, verification_token):
            self._log_request_action(request_id, "verification_failed")
            return False

        # For high-risk requests, require additional verification
        high_risk_rights = (RightType.ERASURE, RightType.PORTABILITY)
        if request.right_type in high_risk_rights and not identity_documents:
            request.status = RequestStatus.IDENTITY_VERIFICATION
            request.processing_notes.append(
                "Additional identity verification required for high-risk request"
            )
            # Every other outcome is audited; record this one as well.
            self._log_request_action(request_id, "additional_verification_required")
            return False

        request.status = RequestStatus.IN_PROGRESS
        self._log_request_action(request_id, "identity_verified")

        return True

    @staticmethod
    def _tokens_match(expected: Optional[str], supplied: Optional[str]) -> bool:
        """Compare two verification tokens in constant time.

        A missing or non-string token on either side never matches, so a
        request without a stored token cannot be verified by passing ``None``.
        """
        if not isinstance(expected, str) or not isinstance(supplied, str):
            return False
        # Encode first: compare_digest rejects non-ASCII str arguments.
        return secrets.compare_digest(
            expected.encode("utf-8"), supplied.encode("utf-8")
        )

    def process_access_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Process a data access request.

        ISO 27701 7.3.3: Access to PII

        Collection is best-effort: a source that raises is recorded in the
        request's ``processing_notes`` and skipped. The request is marked
        completed afterwards either way.

        Args:
            request_id: ID of an ``ACCESS`` request.
            data_sources: Names of the systems to collect from.

        Returns:
            The response dict sent to the data subject.

        Raises:
            ValueError: If the request is unknown or is not an access request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ACCESS:
            raise ValueError("Invalid access request")

        collected_data = {}
        failed_sources = []

        for source in data_sources:
            try:
                data = self._collect_data_from_source(
                    source,
                    request.subject_id
                )
                collected_data[source] = {
                    "data": self._sanitize_for_export(data),
                    "categories": self._categorize_data(data),
                    "purposes": self._get_processing_purposes(source),
                    "retention_period": self._get_retention_period(source)
                }
            except Exception as e:
                # Deliberate best-effort: one failing system must not block
                # the export from the remaining ones.
                failed_sources.append(source)
                request.processing_notes.append(
                    f"Error collecting from {source}: {str(e)}"
                )
            else:
                request.affected_systems.append(source)

        # Compile response
        response = {
            "request_id": request_id,
            "subject_email": request.subject_email,
            "generated_at": datetime.utcnow().isoformat(),
            "data_collected": collected_data,
            "processing_information": self._get_processing_info(),
            "third_party_recipients": self._get_third_party_list()
        }

        # The audit trail gets a summary only; copying the exported personal
        # data into the log would duplicate the PII this request is about.
        self._complete_request(
            request_id,
            response,
            audit_details={
                "generated_at": response["generated_at"],
                "sources_exported": list(collected_data),
                "sources_failed": failed_sources
            }
        )
        return response

    def process_erasure_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Process a data erasure request.

        ISO 27701 7.3.6: Erasure of PII

        Each source ends up with one of three statuses in the result:
        ``"retained"`` (a retention requirement applies), ``"erased"`` or
        ``"error"`` (the erasure itself failed). A failed processor
        notification does not turn a successful erasure into an error; it is
        reported under ``"processor_notification_error"`` instead.

        Args:
            request_id: ID of an ``ERASURE`` request.
            data_sources: Names of the systems to erase from.

        Returns:
            Dict with ``request_id``, per-source ``erasure_results`` and
            ``completed_at``.

        Raises:
            ValueError: If the request is unknown or is not an erasure request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ERASURE:
            raise ValueError("Invalid erasure request")

        erasure_results = {}

        for source in data_sources:
            # Check for legal holds or retention requirements
            retention_check = self._check_retention_requirements(
                source,
                request.subject_id
            )

            if retention_check["must_retain"]:
                erasure_results[source] = {
                    "status": "retained",
                    "reason": retention_check["reason"],
                    "retention_until": retention_check["until"]
                }
                request.processing_notes.append(
                    f"Data in {source} retained: {retention_check['reason']}"
                )
                continue

            try:
                # Perform erasure
                self._erase_data_from_source(source, request.subject_id)
            except Exception as e:
                erasure_results[source] = {
                    "status": "error",
                    "error": str(e)
                }
                request.processing_notes.append(
                    f"Error erasing from {source}: {str(e)}"
                )
                continue

            erasure_results[source] = {
                "status": "erased",
                "erased_at": datetime.utcnow().isoformat()
            }
            request.affected_systems.append(source)

            # Notify processors separately: the data is already gone at this
            # point, so a notification failure must not be reported as a
            # failed erasure.
            try:
                self._notify_processors_of_erasure(
                    source,
                    request.subject_id
                )
            except Exception as e:
                erasure_results[source]["processor_notification_error"] = str(e)
                request.processing_notes.append(
                    f"Erased {source} but processor notification failed: {str(e)}"
                )

        response = {
            "request_id": request_id,
            "erasure_results": erasure_results,
            "completed_at": datetime.utcnow().isoformat()
        }

        self._complete_request(request_id, response)
        return response

    def process_portability_request(
        self,
        request_id: str,
        data_sources: List[str],
        output_format: str = "json"
    ) -> bytes:
        """
        Process a data portability request.

        ISO 27701 7.3.7: PII portability

        Args:
            request_id: ID of a ``PORTABILITY`` request.
            data_sources: Names of the systems to export from.
            output_format: ``"json"`` or ``"csv"``. Any other value falls back
                to JSON; the export metadata and the completion record then
                say ``"json"`` so the label matches the payload.

        Returns:
            The encoded export.

        Raises:
            ValueError: If the request is unknown or is not a portability
                request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.PORTABILITY:
            raise ValueError("Invalid portability request")

        if output_format in self._SUPPORTED_EXPORT_FORMATS:
            effective_format = output_format
        else:
            effective_format = "json"
            request.processing_notes.append(
                f"Unsupported export format {output_format!r}; exported as json"
            )

        portable_data = {
            "export_metadata": {
                "request_id": request_id,
                "exported_at": datetime.utcnow().isoformat(),
                "format": effective_format,
                "schema_version": "1.0"
            },
            "data": {}
        }

        for source in data_sources:
            # Only include data provided by subject or generated through use
            data = self._collect_portable_data(source, request.subject_id)
            if data:
                portable_data["data"][source] = data
                request.affected_systems.append(source)

        # Convert to requested format
        if effective_format == "csv":
            export_data = self._convert_to_csv(portable_data)
        else:
            # Explicit "json" is pretty-printed; the fallback stays compact,
            # as it always has been.
            indent = 2 if output_format == "json" else None
            export_data = json.dumps(portable_data, indent=indent).encode('utf-8')

        self._complete_request(
            request_id,
            {"format": effective_format, "size": len(export_data)}
        )

        return export_data

    def extend_deadline(
        self,
        request_id: str,
        reason: str,
        extension_days: int = 30
    ):
        """
        Extend request deadline when complex.

        ISO 27701 7.3.2: Complex requests may be extended

        The new due date is always computed from ``created_at``, so repeated
        extensions do not accumulate.

        Args:
            request_id: ID of the request to extend.
            reason: Justification stored on the request and in the audit trail.
            extension_days: Extra days on top of ``STANDARD_RESPONSE_TIME``;
                must be between 1 and
                ``MAXIMUM_EXTENSION - STANDARD_RESPONSE_TIME``.

        Raises:
            ValueError: If the request is unknown, ``extension_days`` is out
                of range, or the request is already completed or rejected.
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")

        # A zero or negative "extension" would silently shorten the deadline.
        if extension_days <= 0:
            raise ValueError("Extension must be a positive number of days")

        if extension_days > self.MAXIMUM_EXTENSION - self.STANDARD_RESPONSE_TIME:
            raise ValueError("Extension exceeds maximum allowed")

        # Extending a closed request would flip it back to an open status
        # while completed_at is still set.
        if request.status in (RequestStatus.COMPLETED, RequestStatus.REJECTED):
            raise ValueError("Cannot extend a closed request")

        original_due = request.due_date
        request.due_date = request.created_at + timedelta(
            days=self.STANDARD_RESPONSE_TIME + extension_days
        )
        request.status = RequestStatus.EXTENDED
        request.extension_reason = reason

        # Notify subject
        self._send_extension_notification(request, original_due)

        self._log_request_action(request_id, "extended", {"reason": reason})

    def reject_request(
        self,
        request_id: str,
        reason: str,
        legal_basis: str
    ):
        """
        Reject a request with proper justification.

        ISO 27701 7.3.2: Requests may be rejected in specific circumstances

        Args:
            request_id: ID of the request to reject.
            reason: Human-readable reason for the rejection.
            legal_basis: Legal ground the rejection relies on.

        Raises:
            ValueError: If the request is unknown.
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")

        request.status = RequestStatus.REJECTED
        request.rejection_reason = f"{reason} (Legal basis: {legal_basis})"
        request.completed_at = datetime.utcnow()

        # Notify subject with appeal information
        self._send_rejection_notification(request)

        self._log_request_action(
            request_id,
            "rejected",
            {"reason": reason, "legal_basis": legal_basis}
        )

    def _complete_request(
        self,
        request_id: str,
        result: Dict,
        audit_details: Optional[Dict] = None
    ):
        """Mark request as completed.

        Args:
            request_id: ID of the request to close.
            result: Payload passed to the completion notification.
            audit_details: What to write to the audit trail instead of
                ``result``; use it when ``result`` carries personal data.
                Defaults to logging ``result`` itself.

        Raises:
            ValueError: If the request is unknown.
        """
        request = self.requests.get(request_id)
        if request is None:
            raise ValueError("Request not found")

        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()

        self._send_completion_notification(request, result)
        self._log_request_action(
            request_id,
            "completed",
            result if audit_details is None else audit_details
        )

    def _hash_identifier(self, identifier: str) -> str:
        """Return the SHA-256 hex digest of the lower-cased identifier.

        The digest is unsalted, so the same identifier always maps to the
        same subject ID.
        """
        return hashlib.sha256(identifier.lower().encode()).hexdigest()

    def _log_request_action(
        self,
        request_id: str,
        action: str,
        details: Optional[Dict] = None
    ):
        """Write one timestamped event to the storage backend's audit trail."""
        self.storage.log_audit_event({
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "action": action,
            "details": details
        })

    # Placeholder methods for system-specific implementation
    def _collect_data_from_source(self, source: str, subject_id: str) -> Dict:
        """Collect data from a specific source system."""
        raise NotImplementedError()

    def _erase_data_from_source(self, source: str, subject_id: str):
        """Erase data from a specific source system."""
        raise NotImplementedError()

    def _collect_portable_data(self, source: str, subject_id: str) -> Dict:
        """Collect portable data from source."""
        raise NotImplementedError()

    def _check_retention_requirements(self, source: str, subject_id: str) -> Dict:
        """Check if data must be retained.

        process_erasure_request reads the keys ``must_retain``, ``reason``
        and ``until`` from the returned dict.
        """
        raise NotImplementedError()

    def _send_verification_email(self, request: DataSubjectRequest):
        """Send identity verification email."""
        raise NotImplementedError()

    def _send_extension_notification(self, request: DataSubjectRequest, original_due: datetime):
        """Notify subject of deadline extension."""
        raise NotImplementedError()

    def _send_rejection_notification(self, request: DataSubjectRequest):
        """Notify subject of request rejection."""
        raise NotImplementedError()

    def _send_completion_notification(self, request: DataSubjectRequest, result: Dict):
        """Notify subject of request completion."""
        raise NotImplementedError()

    def _sanitize_for_export(self, data: Dict) -> Dict:
        """Remove sensitive internal fields before export."""
        raise NotImplementedError()

    def _categorize_data(self, data: Dict) -> List[str]:
        """Categorize data types in response."""
        raise NotImplementedError()

    def _get_processing_purposes(self, source: str) -> List[str]:
        """Get processing purposes for a data source."""
        raise NotImplementedError()

    def _get_retention_period(self, source: str) -> str:
        """Get retention period for a data source."""
        raise NotImplementedError()

    def _get_processing_info(self) -> Dict:
        """Get general processing information."""
        raise NotImplementedError()

    def _get_third_party_list(self) -> List[Dict]:
        """Get list of third party recipients."""
        raise NotImplementedError()

    def _notify_processors_of_erasure(self, source: str, subject_id: str):
        """Notify data processors of erasure."""
        raise NotImplementedError()

    def _convert_to_csv(self, data: Dict) -> bytes:
        """Convert data to CSV format."""
        raise NotImplementedError()
# consent_management.py
"""
Consent management implementation per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import json
 
 
class ConsentPurpose(Enum):
    """Standard consent purposes.

    Member values are the strings written to audit events and consent
    receipts (``purpose.value``). ``ConsentManager.get_consent_status``
    iterates the members in the order declared here.
    """
    MARKETING = "marketing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    THIRD_PARTY_SHARING = "third_party_sharing"  # the only purpose reported as a third-party disclosure
    PROFILING = "profiling"  # flagged as sensitive in consent receipts
    AUTOMATED_DECISIONS = "automated_decisions"  # flagged as sensitive in consent receipts
    RESEARCH = "research"
    SERVICE_IMPROVEMENT = "service_improvement"
 
 
class ConsentStatus(Enum):
    """Consent status values.

    Only ``GRANTED`` can make ``ConsentManager.check_consent`` return True.
    """
    GRANTED = "granted"  # record_consent with granted=True
    DENIED = "denied"  # record_consent with granted=False
    WITHDRAWN = "withdrawn"  # set by withdraw_consent
    EXPIRED = "expired"  # set by check_consent once expires_at has passed
 
 
@dataclass
class ConsentRecord:
    """Individual consent record."""
    consent_id: str
    subject_id: str
    purpose: ConsentPurpose
    status: ConsentStatus
    granted_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    expires_at: Optional[datetime]
    version: str
    collection_point: str
    proof_reference: str
 
 
@dataclass
class ConsentPreferences:
    """Complete consent preferences for a subject."""
    subject_id: str
    consents: Dict[ConsentPurpose, ConsentRecord]
    last_updated: datetime
    preference_history: List[Dict]
 
 
class ConsentManager:
    """
    Manages consent lifecycle per ISO 27701 7.2.

    Consent records are read from and written to ``storage_backend``; every
    decision, withdrawal and version mismatch is reported to ``audit_logger``.
    ``consent_versions`` maps each purpose to the current version of its
    consent text and falls back to ``"1.0"`` for purposes never updated.
    """

    def __init__(self, storage_backend, audit_logger):
        """
        Args:
            storage_backend: Persistence layer for consent records.
            audit_logger: Object with a ``log(event_dict)`` method.
        """
        self.storage = storage_backend
        self.audit = audit_logger
        self.consent_versions: Dict[ConsentPurpose, str] = {}

    def record_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        collection_point: str,
        proof: str,
        expiry_days: Optional[int] = None
    ) -> ConsentRecord:
        """
        Record a consent decision.

        ISO 27701 7.2.3: Consent records

        Args:
            subject_id: Identifier of the data subject.
            purpose: Purpose the decision applies to.
            granted: True for a grant, False for a denial.
            collection_point: Where the decision was collected.
            proof: Reference to the evidence of the decision.
            expiry_days: Days until the record expires. ``None`` means no
                expiry; ``0`` means it expires immediately.

        Returns:
            The stored consent record.
        """
        import secrets
        # Imported locally so the method does not depend on an import that
        # sits below the class in the module.
        from datetime import timedelta

        # One clock reading keeps granted_at and expires_at consistent.
        now = datetime.utcnow()

        # "is not None" rather than truthiness: expiry_days=0 must not be
        # mistaken for "never expires".
        expires_at = None
        if expiry_days is not None:
            expires_at = now + timedelta(days=expiry_days)

        consent = ConsentRecord(
            consent_id=f"CON-{secrets.token_hex(8)}",
            subject_id=subject_id,
            purpose=purpose,
            status=ConsentStatus.GRANTED if granted else ConsentStatus.DENIED,
            granted_at=now if granted else None,
            withdrawn_at=None,
            expires_at=expires_at,
            version=self.consent_versions.get(purpose, "1.0"),
            collection_point=collection_point,
            proof_reference=proof
        )

        self.storage.save_consent(consent)

        self.audit.log({
            "event": "consent_recorded",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "status": consent.status.value,
            "collection_point": collection_point
        })

        return consent

    def withdraw_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: Optional[str] = None
    ) -> ConsentRecord:
        """
        Withdraw previously granted consent.

        ISO 27701 7.2.4: Consent withdrawal

        Args:
            subject_id: Identifier of the data subject.
            purpose: Purpose whose consent is withdrawn.
            reason: Optional reason, recorded in the audit event only.

        Returns:
            The updated consent record.

        Raises:
            ValueError: If storage holds no record for the subject/purpose.
        """
        consent = self.storage.get_consent(subject_id, purpose)
        if not consent:
            raise ValueError("No consent record found")

        consent.status = ConsentStatus.WITHDRAWN
        consent.withdrawn_at = datetime.utcnow()

        self.storage.save_consent(consent)

        # Trigger downstream processing
        self._handle_consent_withdrawal(subject_id, purpose)

        self.audit.log({
            "event": "consent_withdrawn",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })

        return consent

    def check_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ) -> bool:
        """
        Check if valid consent exists.

        ISO 27701 7.2.5: Consent verification

        A record past its ``expires_at`` is marked ``EXPIRED`` and saved as a
        side effect.

        Returns:
            True only for a granted, unexpired record whose version is the
            current one or is judged compatible with it.
        """
        consent = self.storage.get_consent(subject_id, purpose)

        if not consent:
            return False

        if consent.status != ConsentStatus.GRANTED:
            return False

        if consent.expires_at and consent.expires_at < datetime.utcnow():
            consent.status = ConsentStatus.EXPIRED
            self.storage.save_consent(consent)
            return False

        # Check if consent version is current
        current_version = self.consent_versions.get(purpose, "1.0")
        if consent.version != current_version:
            self.audit.log({
                "event": "consent_version_mismatch",
                "subject_id": subject_id,
                "purpose": purpose.value,
                "consent_version": consent.version,
                "current_version": current_version
            })
            # May need re-consent depending on changes
            return self._evaluate_version_compatibility(
                consent.version,
                current_version,
                purpose
            )

        return True

    def get_consent_status(
        self,
        subject_id: str
    ) -> ConsentPreferences:
        """Get complete consent status for a subject.

        Returns:
            A snapshot holding one record per purpose that has one, the
            subject's consent history, and the time the snapshot was taken.
        """
        consents = {}

        for purpose in ConsentPurpose:
            consent = self.storage.get_consent(subject_id, purpose)
            if consent:
                consents[purpose] = consent

        history = self.storage.get_consent_history(subject_id)

        return ConsentPreferences(
            subject_id=subject_id,
            consents=consents,
            last_updated=datetime.utcnow(),
            preference_history=history
        )

    def update_consent_version(
        self,
        purpose: ConsentPurpose,
        new_version: str,
        changes_description: str,
        requires_reconsent: bool = False
    ):
        """
        Update consent version when terms change.

        ISO 27701 7.2.2: Changes to consent

        Args:
            purpose: Purpose whose consent text changed.
            new_version: New current version for the purpose.
            changes_description: Summary of what changed.
            requires_reconsent: When True, a re-consent request is logged for
                every subject that storage reports for the purpose.
        """
        old_version = self.consent_versions.get(purpose, "1.0")
        self.consent_versions[purpose] = new_version

        self.storage.save_version_change({
            "purpose": purpose.value,
            "old_version": old_version,
            "new_version": new_version,
            "changes": changes_description,
            "requires_reconsent": requires_reconsent,
            "effective_date": datetime.utcnow().isoformat()
        })

        if requires_reconsent:
            # Ask affected subjects to consent again.
            # NOTE(review): stored records are not modified here. Whether an
            # old consent still passes check_consent is decided solely by
            # _evaluate_version_compatibility (major-version comparison), so a
            # minor bump with requires_reconsent=True keeps old consents valid.
            affected = self.storage.get_subjects_with_consent(purpose)
            for subject_id in affected:
                self._request_reconsent(subject_id, purpose, changes_description)

    def generate_consent_receipt(
        self,
        consent: ConsentRecord
    ) -> Dict:
        """
        Generate a consent receipt for the data subject.

        ISO 27701 7.2.6: Consent receipts

        Returns:
            A dict describing the consent, the controller and the single
            purpose the record covers.
        """
        return {
            "receipt_id": consent.consent_id,
            "version": "1.0",
            "jurisdiction": "global",
            "consent_timestamp": consent.granted_at.isoformat() if consent.granted_at else None,
            "collection_method": consent.collection_point,
            "consent_receipt_id": consent.consent_id,
            "subject": {
                "subject_id": consent.subject_id
            },
            "data_controller": self._get_controller_info(),
            "purposes": [{
                "purpose": consent.purpose.value,
                "purpose_category": self._get_purpose_category(consent.purpose),
                "consent_type": "explicit",
                "pii_category": self._get_pii_categories(consent.purpose),
                "primary_purpose": True,
                "termination": consent.expires_at.isoformat() if consent.expires_at else "none",
                "third_party_disclosure": self._has_third_party_disclosure(consent.purpose)
            }],
            "sensitive": self._is_sensitive_purpose(consent.purpose),
            "spi_category": [],
            "proof": consent.proof_reference
        }

    def _handle_consent_withdrawal(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Handle downstream effects of consent withdrawal."""
        # Stop processing
        self.storage.set_processing_flag(subject_id, purpose, False)

        # Notify affected systems
        affected_systems = self.storage.get_systems_using_consent(purpose)
        for system in affected_systems:
            self._notify_system_of_withdrawal(system, subject_id, purpose)

    def _evaluate_version_compatibility(
        self,
        old_version: str,
        new_version: str,
        purpose: ConsentPurpose
    ) -> bool:
        """Evaluate if old consent version is compatible with new.

        Versions are compatible when their major components (the part before
        the first ``.``) are equal integers. A version that cannot be parsed
        is treated as incompatible, so the subject is asked again rather than
        processing continuing on a consent of unknown vintage.
        """
        # Implementation depends on versioning strategy
        # By default, require reconsent for major version changes
        try:
            old_major = int(old_version.split('.')[0])
            new_major = int(new_version.split('.')[0])
        except (AttributeError, ValueError):
            return False
        return old_major == new_major

    def _request_reconsent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: str
    ):
        """Request reconsent from subject.

        Currently this only writes a ``reconsent_requested`` audit event.
        """
        self.audit.log({
            "event": "reconsent_requested",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })

    def _get_controller_info(self) -> Dict:
        """Get data controller information (hard-coded example values)."""
        return {
            "on_behalf": False,
            "contact": "privacy@example.com",
            "company": "Example Corp",
            "address": "123 Privacy St",
            "email": "privacy@example.com",
            "phone": "+1-555-0123"
        }

    def _get_purpose_category(self, purpose: ConsentPurpose) -> str:
        """Get purpose category; unmapped purposes are reported as "other"."""
        categories = {
            ConsentPurpose.MARKETING: "marketing",
            ConsentPurpose.ANALYTICS: "core_function",
            ConsentPurpose.PERSONALIZATION: "improved_service"
        }
        return categories.get(purpose, "other")

    def _get_pii_categories(self, purpose: ConsentPurpose) -> List[str]:
        """Get PII categories for purpose (the same list for every purpose)."""
        return ["contact", "demographic", "behavioral"]

    def _has_third_party_disclosure(self, purpose: ConsentPurpose) -> bool:
        """Check if purpose involves third party disclosure."""
        return purpose == ConsentPurpose.THIRD_PARTY_SHARING

    def _is_sensitive_purpose(self, purpose: ConsentPurpose) -> bool:
        """Check if purpose involves sensitive data."""
        return purpose in [ConsentPurpose.PROFILING, ConsentPurpose.AUTOMATED_DECISIONS]

    def _notify_system_of_withdrawal(
        self,
        system: str,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Notify system of consent withdrawal (no-op hook to override)."""
        pass
 
 
from datetime import timedelta

Privacy Impact Assessment

# privacy_impact_assessment.py
"""
Privacy Impact Assessment (PIA) implementation per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
 
 
class RiskLevel(Enum):
    """Rating scale used for the likelihood, impact, overall and residual
    fields of a ``PrivacyRisk``; members are declared from LOW to CRITICAL."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
 
 
class PIAStatus(Enum):
    """Workflow status of a ``PrivacyImpactAssessment``."""
    DRAFT = "draft"  # initial status assigned by PIAManager.create_pia
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    REQUIRES_CHANGES = "requires_changes"
    REJECTED = "rejected"
 
 
@dataclass
class DataFlow:
    """Describes a data flow in the system.

    Created by ``PIAManager.add_data_flow`` and appended to the owning
    assessment's ``data_flows`` list.
    """
    # "DF-" followed by 8 hex characters (see add_data_flow)
    flow_id: str
    source: str
    destination: str
    data_categories: List[str]
    purpose: str
    legal_basis: str
    retention_period: str
    safeguards: List[str]
 
 
@dataclass
class PrivacyRisk:
    """Identified privacy risk.

    All four rating fields use the ``RiskLevel`` scale.
    """
    risk_id: str
    description: str
    likelihood: RiskLevel
    impact: RiskLevel
    overall_risk: RiskLevel
    affected_rights: List[str]
    mitigations: List[str]
    # Risk rating that remains; presumably after the mitigations are applied — TODO confirm
    residual_risk: RiskLevel
 
 
@dataclass
class PrivacyImpactAssessment:
    """Complete Privacy Impact Assessment.

    Only the identifying fields up to ``assessor`` are required. ``reviewer``
    defaults to ``None`` so that an assessment can be created before anyone
    has been assigned to review it; every collection field starts empty.
    """
    pia_id: str
    project_name: str
    description: str
    status: PIAStatus
    created_at: datetime
    updated_at: datetime
    assessor: str
    # Defaulted: PIAManager.create_pia builds the assessment without a
    # reviewer, which raised TypeError while this field was required.
    reviewer: Optional[str] = None

    # Assessment components
    data_flows: List[DataFlow] = field(default_factory=list)
    risks: List[PrivacyRisk] = field(default_factory=list)
    data_subjects: List[str] = field(default_factory=list)
    special_categories: List[str] = field(default_factory=list)

    # Compliance checks
    lawfulness_assessment: Dict = field(default_factory=dict)
    necessity_assessment: Dict = field(default_factory=dict)
    proportionality_assessment: Dict = field(default_factory=dict)

    # Results
    recommendation: Optional[str] = None
    conditions: List[str] = field(default_factory=list)
 
 
class PIAManager:
    """
    Manages Privacy Impact Assessments per ISO 27701 7.2.5.
    """
 
    def __init__(self, storage_backend, dpo_email: str):
        self.storage = storage_backend
        self.dpo_email = dpo_email
 
    def create_pia(
        self,
        project_name: str,
        description: str,
        assessor: str
    ) -> PrivacyImpactAssessment:
        """Create a new PIA in draft state and persist it.

        Args:
            project_name: Name of the project being assessed.
            description: Description of the project or processing.
            assessor: Person carrying out the assessment.

        Returns:
            The newly created assessment, already passed to
            ``storage.save_pia``.
        """
        import secrets

        # One timestamp for both fields, so a fresh record never shows
        # updated_at later than created_at by a few microseconds.
        now = datetime.utcnow()

        pia = PrivacyImpactAssessment(
            pia_id=f"PIA-{secrets.token_hex(6).upper()}",
            project_name=project_name,
            description=description,
            status=PIAStatus.DRAFT,
            created_at=now,
            updated_at=now,
            assessor=assessor,
            # No review has happened yet. Passed explicitly so construction
            # never fails on a missing ``reviewer`` argument.
            reviewer=None,
        )

        self.storage.save_pia(pia)
        return pia
 
    def add_data_flow(
        self,
        pia_id: str,
        source: str,
        destination: str,
        data_categories: List[str],
        purpose: str,
        legal_basis: str,
        retention_period: str,
        safeguards: List[str]
    ) -> DataFlow:
        """Add a data flow to the PIA."""
        import secrets
 
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        flow = DataFlow(
            flow_id=f"DF-{secrets.token_hex(4)}",
            source=source,
            destination=destination,
            data_categories=data_categories,
            purpose=purpose,
            legal_basis=legal_basis,
            retention_period=retention_period,
            safeguards=safeguards
        )
 
        pia.data_flows.append(flow)
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        return flow
 
    def identify_risk(
        self,
        pia_id: str,
        description: str,
        likelihood: RiskLevel,
        impact: RiskLevel,
        affected_rights: List[str],
        proposed_mitigations: List[str]
    ) -> PrivacyRisk:
        """Identify and document a privacy risk."""
        import secrets
 
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        overall_risk = self._calculate_overall_risk(likelihood, impact)
 
        risk = PrivacyRisk(
            risk_id=f"RISK-{secrets.token_hex(4)}",
            description=description,
            likelihood=likelihood,
            impact=impact,
            overall_risk=overall_risk,
            affected_rights=affected_rights,
            mitigations=proposed_mitigations,
            residual_risk=self._calculate_residual_risk(overall_risk, proposed_mitigations)
        )
 
        pia.risks.append(risk)
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        return risk
 
    def assess_lawfulness(
        self,
        pia_id: str,
        legal_basis: str,
        justification: str,
        documentation: List[str]
    ):
        """Assess lawfulness of processing."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        pia.lawfulness_assessment = {
            "legal_basis": legal_basis,
            "justification": justification,
            "documentation": documentation,
            "assessed_at": datetime.utcnow().isoformat(),
            "compliant": self._validate_legal_basis(legal_basis, pia)
        }
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
    def assess_necessity(
        self,
        pia_id: str,
        purpose: str,
        data_minimization: Dict,
        alternatives_considered: List[str]
    ):
        """Assess necessity and data minimization."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        pia.necessity_assessment = {
            "purpose": purpose,
            "data_minimization": data_minimization,
            "alternatives_considered": alternatives_considered,
            "assessed_at": datetime.utcnow().isoformat(),
            "compliant": self._validate_necessity(pia)
        }
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
    def submit_for_review(self, pia_id: str) -> bool:
        """Submit PIA for DPO review."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        # Validate completeness
        if not self._validate_pia_completeness(pia):
            return False
 
        pia.status = PIAStatus.IN_REVIEW
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        # Notify DPO
        self._notify_dpo_for_review(pia)
 
        return True
 
    def review_pia(
        self,
        pia_id: str,
        reviewer: str,
        approved: bool,
        recommendation: str,
        conditions: Optional[List[str]] = None
    ):
        """DPO review of PIA."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        pia.reviewer = reviewer
        pia.recommendation = recommendation
        pia.conditions = conditions or []
 
        if approved:
            pia.status = PIAStatus.APPROVED if not conditions else PIAStatus.REQUIRES_CHANGES
        else:
            pia.status = PIAStatus.REJECTED
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        # Notify assessor
        self._notify_assessor_of_result(pia)
 
    def generate_pia_report(self, pia_id: str) -> Dict:
        """Generate formal PIA report."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        return {
            "report_metadata": {
                "pia_id": pia.pia_id,
                "generated_at": datetime.utcnow().isoformat(),
                "status": pia.status.value
            },
            "project_information": {
                "name": pia.project_name,
                "description": pia.description,
                "assessor": pia.assessor,
                "reviewer": pia.reviewer
            },
            "data_processing_overview": {
                "data_subjects": pia.data_subjects,
                "special_categories": pia.special_categories,
                "data_flows": [
                    {
                        "source": f.source,
                        "destination": f.destination,
                        "categories": f.data_categories,
                        "purpose": f.purpose,
                        "legal_basis": f.legal_basis
                    }
                    for f in pia.data_flows
                ]
            },
            "compliance_assessment": {
                "lawfulness": pia.lawfulness_assessment,
                "necessity": pia.necessity_assessment,
                "proportionality": pia.proportionality_assessment
            },
            "risk_assessment": {
                "identified_risks": [
                    {
                        "description": r.description,
                        "overall_risk": r.overall_risk.value,
                        "mitigations": r.mitigations,
                        "residual_risk": r.residual_risk.value
                    }
                    for r in pia.risks
                ],
                "high_risks": len([r for r in pia.risks if r.overall_risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]])
            },
            "recommendation": pia.recommendation,
            "conditions": pia.conditions
        }
 
    def _calculate_overall_risk(
        self,
        likelihood: RiskLevel,
        impact: RiskLevel
    ) -> RiskLevel:
        """Combine likelihood and impact into one rating via a 4x4 matrix.

        Args:
            likelihood: Rated likelihood of the risk.
            impact: Rated impact of the risk.

        Returns:
            The matrix entry for the pair, or ``RiskLevel.MEDIUM`` when
            the pair is not present in the matrix.
        """
        scale = (RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL)

        # Rows follow likelihood and columns follow impact, both in the
        # order given by ``scale``.
        grid = (
            (RiskLevel.LOW, RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH),
            (RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL),
            (RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL, RiskLevel.CRITICAL),
            (RiskLevel.HIGH, RiskLevel.CRITICAL, RiskLevel.CRITICAL, RiskLevel.CRITICAL),
        )

        lookup = {
            (row_level, col_level): grid[row][col]
            for row, row_level in enumerate(scale)
            for col, col_level in enumerate(scale)
        }
        return lookup.get((likelihood, impact), RiskLevel.MEDIUM)
 
    def _calculate_residual_risk(
        self,
        overall_risk: RiskLevel,
        mitigations: List[str]
    ) -> RiskLevel:
        """Estimate the rating that remains once mitigations are applied.

        Simplified model: every listed mitigation lowers the rating by one
        level, capped at two levels in total and never going below
        ``RiskLevel.LOW``.

        Args:
            overall_risk: Rating before mitigations.
            mitigations: Proposed mitigations; only their count is used.

        Returns:
            The lowered rating.
        """
        ladder = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        start = ladder.index(overall_risk)

        # At most two steps down, however many mitigations are listed.
        mitigation_count = len(mitigations)
        steps_down = mitigation_count if mitigation_count < 2 else 2

        target = start - steps_down
        if target < 0:
            target = 0

        return ladder[target]
 
    def _validate_legal_basis(self, legal_basis: str, pia: PrivacyImpactAssessment) -> bool:
        """Validate the legal basis."""
        valid_bases = ["consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests"]
        return legal_basis in valid_bases
 
    def _validate_necessity(self, pia: PrivacyImpactAssessment) -> bool:
        """Validate necessity assessment."""
        return bool(pia.necessity_assessment.get("data_minimization"))
 
    def _validate_pia_completeness(self, pia: PrivacyImpactAssessment) -> bool:
        """Check if PIA has all required sections."""
        return all([
            pia.data_flows,
            pia.lawfulness_assessment,
            pia.necessity_assessment,
            pia.data_subjects
        ])
 
    def _notify_dpo_for_review(self, pia: PrivacyImpactAssessment):
        """Notify DPO that PIA is ready for review."""
        pass
 
    def _notify_assessor_of_result(self, pia: PrivacyImpactAssessment):
        """Notify assessor of review result."""
        pass

Conclusion

ISO 27701 implementation requires comprehensive privacy controls covering:

  1. Data subject rights with proper request handling and verification
  2. Consent management with granular tracking and withdrawal support
  3. Privacy impact assessments for new processing activities
  4. Integration with ISO 27001 for unified security and privacy management

These controls enable organizations to demonstrate privacy compliance and build trust with data subjects.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.