Compliance

ISO 27701: privacy information management system

Nicu Constantin
16 min read
#iso-27701#privacy#pims#gdpr#compliance#data-protection#iso-27001

Implementing ISO 27701 for privacy: Building a privacy information management system

ISO 27701 provides a framework for implementing, maintaining, and improving a Privacy Information Management System (PIMS) as an extension of ISO 27001. This guide covers the practical implementation of privacy controls for both PII controllers and PII processors.

Understanding ISO 27701

ISO 27701 extends the ISO 27001 Information Security Management System (ISMS) to address privacy-specific requirements:

  • Clause 5: PIMS-specific requirements that extend ISO 27001
  • Clause 6: PIMS-specific guidance that extends ISO 27002
  • Clause 7: Additional guidance for PII controllers
  • Clause 8: Additional guidance for PII processors
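
As a rough illustration of how this clause structure drives scoping, the sketch below maps an organization's role to the clauses it would need to address. The names `Role` and `clauses_in_scope` are hypothetical and do not come from the standard; the mapping only restates the list above, assuming Clauses 5 and 6 apply to every PIMS while Clauses 7 and 8 depend on the role.

```python
from enum import Flag, auto
from typing import List


class Role(Flag):
    """Hypothetical role flags; an organization can hold both roles."""
    CONTROLLER = auto()
    PROCESSOR = auto()


def clauses_in_scope(role: Role) -> List[int]:
    """Return the ISO 27701 clause numbers relevant to the given role(s)."""
    clauses = [5, 6]  # PIMS-specific extensions to ISO 27001 / ISO 27002
    if Role.CONTROLLER in role:
        clauses.append(7)  # additional guidance for PII controllers
    if Role.PROCESSOR in role:
        clauses.append(8)  # additional guidance for PII processors
    return clauses


if __name__ == "__main__":
    print(clauses_in_scope(Role.CONTROLLER))
    print(clauses_in_scope(Role.CONTROLLER | Role.PROCESSOR))
```

An organization acting in both capacities passes `Role.CONTROLLER | Role.PROCESSOR` and gets all four clauses back.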

Implementing privacy controls

Managing data subject rights

# data_subject_rights.py
"""
Implementation of data subject rights per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
import secrets
 
 
class RightType(Enum):
    """Types of data subject rights."""
    ACCESS = "access"
    RECTIFICATION = "rectification"
    ERASURE = "erasure"
    RESTRICTION = "restriction"
    PORTABILITY = "portability"
    OBJECTION = "objection"
    AUTOMATED_DECISION = "automated_decision"
 
 
class RequestStatus(Enum):
    """Status of a data subject request."""
    RECEIVED = "received"
    IDENTITY_VERIFICATION = "identity_verification"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    REJECTED = "rejected"
    EXTENDED = "extended"
 
 
@dataclass
class DataSubjectRequest:
    """Represents a request to exercise a data subject right."""
    request_id: str
    subject_id: str
    subject_email: str
    right_type: RightType
    status: RequestStatus
    created_at: datetime
    due_date: datetime
    completed_at: Optional[datetime] = None
    extension_reason: Optional[str] = None
    rejection_reason: Optional[str] = None
    verification_token: Optional[str] = None
    processing_notes: List[str] = field(default_factory=list)
    affected_systems: List[str] = field(default_factory=list)
 
 
class DataSubjectRightsManager:
    """
    Manages data subject rights requests per ISO 27701 7.3.
    """

    # Response deadlines (in days). MAXIMUM_EXTENSION caps the total time
    # from receipt, including any extension (see extend_deadline).
    STANDARD_RESPONSE_TIME = 30
    MAXIMUM_EXTENSION = 60
 
    def __init__(self, storage_backend, notification_service):
        self.storage = storage_backend
        self.notifications = notification_service
        self.requests: Dict[str, DataSubjectRequest] = {}
 
    def create_request(
        self,
        subject_email: str,
        right_type: RightType,
        additional_info: Optional[Dict] = None
    ) -> DataSubjectRequest:
        """
        Create a new data subject request.

        ISO 27701 7.3.2: Handling requests
        """
        request_id = f"DSR-{secrets.token_hex(8).upper()}"
        subject_id = self._hash_identifier(subject_email)
        verification_token = secrets.token_urlsafe(32)
 
        request = DataSubjectRequest(
            request_id=request_id,
            subject_id=subject_id,
            subject_email=subject_email,
            right_type=right_type,
            status=RequestStatus.RECEIVED,
            created_at=datetime.utcnow(),
            due_date=datetime.utcnow() + timedelta(days=self.STANDARD_RESPONSE_TIME),
            verification_token=verification_token
        )
 
        self.requests[request_id] = request
        self.storage.save_request(request)
 
        # Send the verification email
        self._send_verification_email(request)

        # Log for the audit trail
        self._log_request_action(request_id, "created", additional_info)
 
        return request
 
    def verify_identity(
        self,
        request_id: str,
        verification_token: str,
        identity_documents: Optional[List[str]] = None
    ) -> bool:
        """
        Verify the data subject's identity.

        ISO 27701 7.3.5: Identity verification
        """
        request = self.requests.get(request_id)
        if not request:
            return False
 
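        # Constant-time comparison avoids leaking token contents via timing
        expected = request.verification_token
        if expected is None or not secrets.compare_digest(
            expected.encode(), verification_token.encode()
        ):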
            self._log_request_action(request_id, "verification_failed")
            return False
 
        # High-risk requests require additional verification
        high_risk_rights = [RightType.ERASURE, RightType.PORTABILITY]
        if request.right_type in high_risk_rights and not identity_documents:
            request.status = RequestStatus.IDENTITY_VERIFICATION
            request.processing_notes.append(
                "Additional identity verification required for high-risk request"
            )
            return False
 
        request.status = RequestStatus.IN_PROGRESS
        self._log_request_action(request_id, "identity_verified")
 
        return True
 
    def process_access_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Process a data access request.

        ISO 27701 7.3.3: Access to PII
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ACCESS:
            raise ValueError("Invalid access request")
 
        collected_data = {}
 
        for source in data_sources:
            try:
                data = self._collect_data_from_source(
                    source,
                    request.subject_id
                )
                collected_data[source] = {
                    "data": self._sanitize_for_export(data),
                    "categories": self._categorize_data(data),
                    "purposes": self._get_processing_purposes(source),
                    "retention_period": self._get_retention_period(source)
                }
                request.affected_systems.append(source)
            except Exception as e:
                request.processing_notes.append(
                    f"Error collecting from {source}: {str(e)}"
                )
 
        # Compile the response
        response = {
            "request_id": request_id,
            "subject_email": request.subject_email,
            "generated_at": datetime.utcnow().isoformat(),
            "data_collected": collected_data,
            "processing_information": self._get_processing_info(),
            "third_party_recipients": self._get_third_party_list()
        }
 
        self._complete_request(request_id, response)
        return response
 
    def process_erasure_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Process a data erasure request.

        ISO 27701 7.3.6: Erasure of PII
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ERASURE:
            raise ValueError("Invalid erasure request")
 
        erasure_results = {}
 
        for source in data_sources:
            # Check for legal holds or retention requirements
            retention_check = self._check_retention_requirements(
                source,
                request.subject_id
            )
 
            if retention_check["must_retain"]:
                erasure_results[source] = {
                    "status": "retained",
                    "reason": retention_check["reason"],
                    "retention_until": retention_check["until"]
                }
                request.processing_notes.append(
                    f"Data in {source} retained: {retention_check['reason']}"
                )
            else:
                try:
                    # Perform the erasure
                    self._erase_data_from_source(source, request.subject_id)
                    erasure_results[source] = {
                        "status": "erased",
                        "erased_at": datetime.utcnow().isoformat()
                    }
                    request.affected_systems.append(source)
 
                    # Notify processors
                    self._notify_processors_of_erasure(
                        source,
                        request.subject_id
                    )
 
                except Exception as e:
                    erasure_results[source] = {
                        "status": "error",
                        "error": str(e)
                    }
 
        response = {
            "request_id": request_id,
            "erasure_results": erasure_results,
            "completed_at": datetime.utcnow().isoformat()
        }
 
        self._complete_request(request_id, response)
        return response
 
    def process_portability_request(
        self,
        request_id: str,
        data_sources: List[str],
        output_format: str = "json"
    ) -> bytes:
        """
        Process a data portability request.

        ISO 27701 7.3.7: PII portability
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.PORTABILITY:
            raise ValueError("Invalid portability request")
 
        portable_data = {
            "export_metadata": {
                "request_id": request_id,
                "exported_at": datetime.utcnow().isoformat(),
                "format": output_format,
                "schema_version": "1.0"
            },
            "data": {}
        }
 
        for source in data_sources:
            # Include only data the subject provided or generated through use
            data = self._collect_portable_data(source, request.subject_id)
            if data:
                portable_data["data"][source] = data
                request.affected_systems.append(source)
 
        # Convert to the requested format
        if output_format == "json":
            export_data = json.dumps(portable_data, indent=2).encode('utf-8')
        elif output_format == "csv":
            export_data = self._convert_to_csv(portable_data)
        else:
            export_data = json.dumps(portable_data).encode('utf-8')
 
        self._complete_request(request_id, {"format": output_format, "size": len(export_data)})
 
        return export_data
 
    def extend_deadline(
        self,
        request_id: str,
        reason: str,
        extension_days: int = 30
    ):
        """
        Extend the request deadline when the request is complex.

        ISO 27701 7.3.2: Complex requests may be extended
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")
 
        if extension_days > self.MAXIMUM_EXTENSION - self.STANDARD_RESPONSE_TIME:
            raise ValueError("Extension exceeds maximum allowed")
 
        original_due = request.due_date
        request.due_date = request.created_at + timedelta(
            days=self.STANDARD_RESPONSE_TIME + extension_days
        )
        request.status = RequestStatus.EXTENDED
        request.extension_reason = reason
 
        # Notify the subject
        self._send_extension_notification(request, original_due)
 
        self._log_request_action(request_id, "extended", {"reason": reason})
 
    def reject_request(
        self,
        request_id: str,
        reason: str,
        legal_basis: str
    ):
        """
        Reject a request with appropriate justification.

        ISO 27701 7.3.2: Requests may be rejected in specific circumstances
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")
 
        request.status = RequestStatus.REJECTED
        request.rejection_reason = f"{reason} (Legal basis: {legal_basis})"
        request.completed_at = datetime.utcnow()
 
        # Notify the subject, including information on how to appeal
        self._send_rejection_notification(request)
 
        self._log_request_action(
            request_id,
            "rejected",
            {"reason": reason, "legal_basis": legal_basis}
        )
 
    def _complete_request(self, request_id: str, result: Dict):
        """Mark the request as completed."""
        request = self.requests.get(request_id)
        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()
 
        self._send_completion_notification(request, result)
        self._log_request_action(request_id, "completed", result)
 
    def _hash_identifier(self, identifier: str) -> str:
        """Create a consistent hash of the identifier."""
        return hashlib.sha256(identifier.lower().encode()).hexdigest()
 
    def _log_request_action(
        self,
        request_id: str,
        action: str,
        details: Optional[Dict] = None
    ):
        """Log the action for the audit trail."""
        self.storage.log_audit_event({
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "action": action,
            "details": details
        })
 
    # Placeholder methods for system-specific implementation
    def _collect_data_from_source(self, source: str, subject_id: str) -> Dict:
        """Collect data from a specific source system."""
        raise NotImplementedError()

    def _erase_data_from_source(self, source: str, subject_id: str):
        """Erase data from a specific source system."""
        raise NotImplementedError()

    def _collect_portable_data(self, source: str, subject_id: str) -> Dict:
        """Collect portable data from the source."""
        raise NotImplementedError()

    def _check_retention_requirements(self, source: str, subject_id: str) -> Dict:
        """Check whether the data must be retained."""
        raise NotImplementedError()

    def _send_verification_email(self, request: DataSubjectRequest):
        """Send the identity verification email."""
        raise NotImplementedError()

    def _send_extension_notification(self, request: DataSubjectRequest, original_due: datetime):
        """Notify the subject that the deadline has been extended."""
        raise NotImplementedError()

    def _send_rejection_notification(self, request: DataSubjectRequest):
        """Notify the subject that the request was rejected."""
        raise NotImplementedError()

    def _send_completion_notification(self, request: DataSubjectRequest, result: Dict):
        """Notify the subject that the request was completed."""
        raise NotImplementedError()

    def _sanitize_for_export(self, data: Dict) -> Dict:
        """Remove sensitive internal fields before export."""
        raise NotImplementedError()

    def _categorize_data(self, data: Dict) -> List[str]:
        """Categorize the types of data in the response."""
        raise NotImplementedError()

    def _get_processing_purposes(self, source: str) -> List[str]:
        """Get the processing purposes for a data source."""
        raise NotImplementedError()

    def _get_retention_period(self, source: str) -> str:
        """Get the retention period for a data source."""
        raise NotImplementedError()

    def _get_processing_info(self) -> Dict:
        """Get general information about the processing."""
        raise NotImplementedError()

    def _get_third_party_list(self) -> List[Dict]:
        """Get the list of third-party recipients."""
        raise NotImplementedError()

    def _notify_processors_of_erasure(self, source: str, subject_id: str):
        """Notify processors of the erasure."""
        raise NotImplementedError()

    def _convert_to_csv(self, data: Dict) -> bytes:
        """Convert the data to CSV format."""
        raise NotImplementedError()
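
The manager above only calls two methods on its `storage_backend`: `save_request` and `log_audit_event`. For unit tests, a minimal in-memory stand-in could look like the sketch below. The class name `InMemoryStorage` and the helper `events_for` are assumptions made for illustration; a production backend would persist to a database and protect the audit log against tampering.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


class InMemoryStorage:
    """Hypothetical test double for the `storage_backend` argument."""

    def __init__(self) -> None:
        self.saved_requests: Dict[str, Any] = {}
        self.audit_log: List[Dict[str, Any]] = []

    def save_request(self, request: Any) -> None:
        # Keyed by request_id, so a repeated save overwrites the earlier state.
        self.saved_requests[request.request_id] = request

    def log_audit_event(self, event: Dict[str, Any]) -> None:
        # Store a shallow copy so later mutation by the caller does not
        # alter what was recorded.
        self.audit_log.append(dict(event))

    def events_for(self, request_id: str) -> List[str]:
        """Return the actions recorded for one request, in order."""
        return [
            event["action"]
            for event in self.audit_log
            if event["request_id"] == request_id
        ]


if __name__ == "__main__":
    storage = InMemoryStorage()
    # SimpleNamespace stands in for a DataSubjectRequest in this demo.
    storage.save_request(SimpleNamespace(request_id="DSR-EXAMPLE"))
    storage.log_audit_event(
        {"request_id": "DSR-EXAMPLE", "action": "created", "details": None}
    )
    print(storage.events_for("DSR-EXAMPLE"))
```

Passing an instance of this class as `storage_backend` lets a test assert on the exact sequence of audit actions a request produced.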

Consent management

# consent_management.py
"""
Consent management implementation per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from enum import Enum
import json
 
 
class ConsentPurpose(Enum):
    """Standard consent purposes."""
    MARKETING = "marketing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    THIRD_PARTY_SHARING = "third_party_sharing"
    PROFILING = "profiling"
    AUTOMATED_DECISIONS = "automated_decisions"
    RESEARCH = "research"
    SERVICE_IMPROVEMENT = "service_improvement"
 
 
class ConsentStatus(Enum):
    """Consent status values."""
    GRANTED = "granted"
    DENIED = "denied"
    WITHDRAWN = "withdrawn"
    EXPIRED = "expired"
 
 
@dataclass
class ConsentRecord:
    """Individual consent record."""
    consent_id: str
    subject_id: str
    purpose: ConsentPurpose
    status: ConsentStatus
    granted_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    expires_at: Optional[datetime]
    version: str
    collection_point: str
    proof_reference: str
 
 
@dataclass
class ConsentPreferences:
    """Complete consent preferences for a subject."""
    subject_id: str
    consents: Dict[ConsentPurpose, ConsentRecord]
    last_updated: datetime
    preference_history: List[Dict]
 
 
class ConsentManager:
    """
    Manages the consent lifecycle per ISO 27701 7.2.
    """
 
    def __init__(self, storage_backend, audit_logger):
        self.storage = storage_backend
        self.audit = audit_logger
        self.consent_versions: Dict[ConsentPurpose, str] = {}
 
    def record_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        collection_point: str,
        proof: str,
        expiry_days: Optional[int] = None
    ) -> ConsentRecord:
        """
        Record a consent decision.

        ISO 27701 7.2.3: Consent records
        """
        import secrets
 
        consent = ConsentRecord(
            consent_id=f"CON-{secrets.token_hex(8)}",
            subject_id=subject_id,
            purpose=purpose,
            status=ConsentStatus.GRANTED if granted else ConsentStatus.DENIED,
            granted_at=datetime.utcnow() if granted else None,
            withdrawn_at=None,
            expires_at=datetime.utcnow() + timedelta(days=expiry_days) if expiry_days else None,
            version=self.consent_versions.get(purpose, "1.0"),
            collection_point=collection_point,
            proof_reference=proof
        )
 
        self.storage.save_consent(consent)
 
        self.audit.log({
            "event": "consent_recorded",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "status": consent.status.value,
            "collection_point": collection_point
        })
 
        return consent
 
    def withdraw_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: Optional[str] = None
    ) -> ConsentRecord:
        """
        Withdraw a previously granted consent.

        ISO 27701 7.2.4: Withdrawal of consent
        """
        consent = self.storage.get_consent(subject_id, purpose)
        if not consent:
            raise ValueError("No consent record found")
 
        consent.status = ConsentStatus.WITHDRAWN
        consent.withdrawn_at = datetime.utcnow()
 
        self.storage.save_consent(consent)
 
        # Trigger downstream processing
        self._handle_consent_withdrawal(subject_id, purpose)
 
        self.audit.log({
            "event": "consent_withdrawn",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })
 
        return consent
 
    def check_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ) -> bool:
        """
        Check whether valid consent exists.

        ISO 27701 7.2.5: Consent verification
        """
        consent = self.storage.get_consent(subject_id, purpose)
 
        if not consent:
            return False
 
        if consent.status != ConsentStatus.GRANTED:
            return False
 
        if consent.expires_at and consent.expires_at < datetime.utcnow():
            consent.status = ConsentStatus.EXPIRED
            self.storage.save_consent(consent)
            return False
 
        # Check whether the consent version is current
        current_version = self.consent_versions.get(purpose, "1.0")
        if consent.version != current_version:
            self.audit.log({
                "event": "consent_version_mismatch",
                "subject_id": subject_id,
                "purpose": purpose.value,
                "consent_version": consent.version,
                "current_version": current_version
            })
            # May require re-consent, depending on the changes
            return self._evaluate_version_compatibility(
                consent.version,
                current_version,
                purpose
            )
 
        return True
 
    def get_consent_status(
        self,
        subject_id: str
    ) -> ConsentPreferences:
        """Get the complete consent status for a subject."""
        consents = {}
 
        for purpose in ConsentPurpose:
            consent = self.storage.get_consent(subject_id, purpose)
            if consent:
                consents[purpose] = consent
 
        history = self.storage.get_consent_history(subject_id)
 
        return ConsentPreferences(
            subject_id=subject_id,
            consents=consents,
            last_updated=datetime.utcnow(),
            preference_history=history
        )
 
    def update_consent_version(
        self,
        purpose: ConsentPurpose,
        new_version: str,
        changes_description: str,
        requires_reconsent: bool = False
    ):
        """
        Update the consent version when the terms change.

        ISO 27701 7.2.2: Changes to consent
        """
        old_version = self.consent_versions.get(purpose, "1.0")
        self.consent_versions[purpose] = new_version
 
        self.storage.save_version_change({
            "purpose": purpose.value,
            "old_version": old_version,
            "new_version": new_version,
            "changes": changes_description,
            "requires_reconsent": requires_reconsent,
            "effective_date": datetime.utcnow().isoformat()
        })
 
        if requires_reconsent:
            # Request re-consent from every subject with an existing consent
            affected = self.storage.get_subjects_with_consent(purpose)
            for subject_id in affected:
                self._request_reconsent(subject_id, purpose, changes_description)
 
    def generate_consent_receipt(
        self,
        consent: ConsentRecord
    ) -> Dict:
        """
        Generate a consent receipt for the data subject.

        ISO 27701 7.2.6: Consent receipts
        """
        return {
            "receipt_id": consent.consent_id,
            "version": "1.0",
            "jurisdiction": "global",
            "consent_timestamp": consent.granted_at.isoformat() if consent.granted_at else None,
            "collection_method": consent.collection_point,
            "consent_receipt_id": consent.consent_id,
            "subject": {
                "subject_id": consent.subject_id
            },
            "data_controller": self._get_controller_info(),
            "purposes": [{
                "purpose": consent.purpose.value,
                "purpose_category": self._get_purpose_category(consent.purpose),
                "consent_type": "explicit",
                "pii_category": self._get_pii_categories(consent.purpose),
                "primary_purpose": True,
                "termination": consent.expires_at.isoformat() if consent.expires_at else "none",
                "third_party_disclosure": self._has_third_party_disclosure(consent.purpose)
            }],
            "sensitive": self._is_sensitive_purpose(consent.purpose),
            "spi_category": [],
            "proof": consent.proof_reference
        }
 
    def _handle_consent_withdrawal(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Handle the downstream effects of a consent withdrawal."""
        # Stop processing
        self.storage.set_processing_flag(subject_id, purpose, False)

        # Notify affected systems
        affected_systems = self.storage.get_systems_using_consent(purpose)
        for system in affected_systems:
            self._notify_system_of_withdrawal(system, subject_id, purpose)
 
    def _evaluate_version_compatibility(
        self,
        old_version: str,
        new_version: str,
        purpose: ConsentPurpose
    ) -> bool:
        """Evaluate whether the old consent version is compatible with the new one."""
        # The implementation depends on the versioning strategy
        # By default, a major version change requires re-consent
        old_major = int(old_version.split('.')[0])
        new_major = int(new_version.split('.')[0])
        return old_major == new_major
 
    def _request_reconsent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: str
    ):
        """Request re-consent from the subject."""
        self.audit.log({
            "event": "reconsent_requested",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })
 
    def _get_controller_info(self) -> Dict:
        """Get the data controller's details."""
        return {
            "on_behalf": False,
            "contact": "privacy@example.com",
            "company": "Example Corp",
            "address": "123 Privacy St",
            "email": "privacy@example.com",
            "phone": "+1-555-0123"
        }
 
    def _get_purpose_category(self, purpose: ConsentPurpose) -> str:
        """Get the purpose category."""
        categories = {
            ConsentPurpose.MARKETING: "marketing",
            ConsentPurpose.ANALYTICS: "core_function",
            ConsentPurpose.PERSONALIZATION: "improved_service"
        }
        return categories.get(purpose, "other")
 
    def _get_pii_categories(self, purpose: ConsentPurpose) -> List[str]:
        """Get the PII categories for the purpose."""
        return ["contact", "demographic", "behavioral"]

    def _has_third_party_disclosure(self, purpose: ConsentPurpose) -> bool:
        """Check whether the purpose involves disclosure to third parties."""
        return purpose == ConsentPurpose.THIRD_PARTY_SHARING

    def _is_sensitive_purpose(self, purpose: ConsentPurpose) -> bool:
        """Check whether the purpose involves sensitive data."""
        return purpose in [ConsentPurpose.PROFILING, ConsentPurpose.AUTOMATED_DECISIONS]
 
    def _notify_system_of_withdrawal(
        self,
        system: str,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Notify the system of the consent withdrawal."""
        pass
 
 
from datetime import timedelta
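
The decision made by `check_consent` boils down to three checks applied in order: status, expiry, and major-version compatibility. The sketch below restates that logic as a pure function so it can be tested without a storage backend. The function name `is_consent_valid` and its parameters are hypothetical, and it assumes timezone-aware datetimes rather than the naive `datetime.utcnow()` values used above. Unlike `check_consent`, it does not write an `EXPIRED` status back to storage.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_consent_valid(
    status: str,
    expires_at: Optional[datetime],
    recorded_version: str,
    current_version: str,
    now: Optional[datetime] = None,
) -> bool:
    """Return True only if consent is granted, unexpired, and version-compatible."""
    now = now or datetime.now(timezone.utc)

    # 1. Only an explicitly granted consent can be valid.
    if status != "granted":
        return False

    # 2. A consent with an expiry date in the past is no longer valid.
    if expires_at is not None and expires_at < now:
        return False

    # 3. Same rule as _evaluate_version_compatibility: only a change of
    #    major version invalidates an existing consent.
    recorded_major = int(recorded_version.split(".")[0])
    current_major = int(current_version.split(".")[0])
    return recorded_major == current_major


if __name__ == "__main__":
    reference = datetime(2024, 1, 1, tzinfo=timezone.utc)
    yesterday = reference - timedelta(days=1)
    print(is_consent_valid("granted", None, "1.0", "1.2", reference))
    print(is_consent_valid("granted", yesterday, "1.0", "1.0", reference))
```

Injecting `now` as a parameter keeps the function deterministic in tests; callers in production code would simply omit it.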

Privacy impact assessment

# privacy_impact_assessment.py
"""
Privacy Impact Assessment (PIA) implementation per ISO 27701.
"""
 
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
 
 
class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
 
 
class PIAStatus(Enum):
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    REQUIRES_CHANGES = "requires_changes"
    REJECTED = "rejected"
 
 
@dataclass
class DataFlow:
    """Describes a data flow in the system."""
    flow_id: str
    source: str
    destination: str
    data_categories: List[str]
    purpose: str
    legal_basis: str
    retention_period: str
    safeguards: List[str]
 
 
@dataclass
class PrivacyRisk:
    """An identified privacy risk."""
    risk_id: str
    description: str
    likelihood: RiskLevel
    impact: RiskLevel
    overall_risk: RiskLevel
    affected_rights: List[str]
    mitigations: List[str]
    residual_risk: RiskLevel
 
 
@dataclass
class PrivacyImpactAssessment:
    """Complete privacy impact assessment."""
    pia_id: str
    project_name: str
    description: str
    status: PIAStatus
    created_at: datetime
    updated_at: datetime
    assessor: str
    # Defaults to None so create_pia can build a draft before a reviewer is assigned
    reviewer: Optional[str] = None

    # Assessment components
    data_flows: List[DataFlow] = field(default_factory=list)
    risks: List[PrivacyRisk] = field(default_factory=list)
    data_subjects: List[str] = field(default_factory=list)
    special_categories: List[str] = field(default_factory=list)
 
    # Compliance checks
    lawfulness_assessment: Dict = field(default_factory=dict)
    necessity_assessment: Dict = field(default_factory=dict)
    proportionality_assessment: Dict = field(default_factory=dict)
 
    # Outcomes
    recommendation: Optional[str] = None
    conditions: List[str] = field(default_factory=list)
 
 
class PIAManager:
    """
    Manages Privacy Impact Assessments per ISO 27701 7.2.5.
    """
 
    def __init__(self, storage_backend, dpo_email: str):
        self.storage = storage_backend
        self.dpo_email = dpo_email
 
    def create_pia(
        self,
        project_name: str,
        description: str,
        assessor: str
    ) -> PrivacyImpactAssessment:
        """Create a new PIA."""
        import secrets
 
        pia = PrivacyImpactAssessment(
            pia_id=f"PIA-{secrets.token_hex(6).upper()}",
            project_name=project_name,
            description=description,
            status=PIAStatus.DRAFT,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
            assessor=assessor
        )
 
        self.storage.save_pia(pia)
        return pia
 
    def add_data_flow(
        self,
        pia_id: str,
        source: str,
        destination: str,
        data_categories: List[str],
        purpose: str,
        legal_basis: str,
        retention_period: str,
        safeguards: List[str]
    ) -> DataFlow:
        """Add a data flow to the PIA."""
        import secrets
 
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        flow = DataFlow(
            flow_id=f"DF-{secrets.token_hex(4)}",
            source=source,
            destination=destination,
            data_categories=data_categories,
            purpose=purpose,
            legal_basis=legal_basis,
            retention_period=retention_period,
            safeguards=safeguards
        )
 
        pia.data_flows.append(flow)
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        return flow
 
    def identify_risk(
        self,
        pia_id: str,
        description: str,
        likelihood: RiskLevel,
        impact: RiskLevel,
        affected_rights: List[str],
        proposed_mitigations: List[str]
    ) -> PrivacyRisk:
        """Identify and document a privacy risk."""
        import secrets
 
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        overall_risk = self._calculate_overall_risk(likelihood, impact)
 
        risk = PrivacyRisk(
            risk_id=f"RISK-{secrets.token_hex(4)}",
            description=description,
            likelihood=likelihood,
            impact=impact,
            overall_risk=overall_risk,
            affected_rights=affected_rights,
            mitigations=proposed_mitigations,
            residual_risk=self._calculate_residual_risk(overall_risk, proposed_mitigations)
        )
 
        pia.risks.append(risk)
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        return risk
 
    def assess_lawfulness(
        self,
        pia_id: str,
        legal_basis: str,
        justification: str,
        documentation: List[str]
    ):
        """Assess the lawfulness of the processing."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        pia.lawfulness_assessment = {
            "legal_basis": legal_basis,
            "justification": justification,
            "documentation": documentation,
            "assessed_at": datetime.utcnow().isoformat(),
            "compliant": self._validate_legal_basis(legal_basis, pia)
        }
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
    def assess_necessity(
        self,
        pia_id: str,
        purpose: str,
        data_minimization: Dict,
        alternatives_considered: List[str]
    ):
        """Assess necessity and data minimization."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
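        pia.necessity_assessment = {
            "purpose": purpose,
            "data_minimization": data_minimization,
            "alternatives_considered": alternatives_considered,
            "assessed_at": datetime.utcnow().isoformat()
        }
        # Validate only after assignment, so the check sees this assessment
        # rather than whatever was stored before it.
        pia.necessity_assessment["compliant"] = self._validate_necessity(pia)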
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
    def submit_for_review(self, pia_id: str) -> bool:
        """Submit the PIA for DPO review."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        # Validate completeness before the PIA leaves draft
        if not self._validate_pia_completeness(pia):
            return False
 
        pia.status = PIAStatus.IN_REVIEW
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        # Notify the DPO
        self._notify_dpo_for_review(pia)
 
        return True
 
    def review_pia(
        self,
        pia_id: str,
        reviewer: str,
        approved: bool,
        recommendation: str,
        conditions: Optional[List[str]] = None
    ):
        """Record the DPO's review of the PIA."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        pia.reviewer = reviewer
        pia.recommendation = recommendation
        pia.conditions = conditions or []
 
        if approved:
            pia.status = PIAStatus.APPROVED if not conditions else PIAStatus.REQUIRES_CHANGES
        else:
            pia.status = PIAStatus.REJECTED
 
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)
 
        # Notify the assessor
        self._notify_assessor_of_result(pia)
 
    def generate_pia_report(self, pia_id: str) -> Dict:
        """Generate the formal PIA report."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
 
        return {
            "report_metadata": {
                "pia_id": pia.pia_id,
                "generated_at": datetime.utcnow().isoformat(),
                "status": pia.status.value
            },
            "project_information": {
                "name": pia.project_name,
                "description": pia.description,
                "assessor": pia.assessor,
                "reviewer": pia.reviewer
            },
            "data_processing_overview": {
                "data_subjects": pia.data_subjects,
                "special_categories": pia.special_categories,
                "data_flows": [
                    {
                        "source": f.source,
                        "destination": f.destination,
                        "categories": f.data_categories,
                        "purpose": f.purpose,
                        "legal_basis": f.legal_basis
                    }
                    for f in pia.data_flows
                ]
            },
            "compliance_assessment": {
                "lawfulness": pia.lawfulness_assessment,
                "necessity": pia.necessity_assessment,
                "proportionality": pia.proportionality_assessment
            },
            "risk_assessment": {
                "identified_risks": [
                    {
                        "description": r.description,
                        "overall_risk": r.overall_risk.value,
                        "mitigations": r.mitigations,
                        "residual_risk": r.residual_risk.value
                    }
                    for r in pia.risks
                ],
                "high_risks": len([r for r in pia.risks if r.overall_risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]])
            },
            "recommendation": pia.recommendation,
            "conditions": pia.conditions
        }
 
    def _calculate_overall_risk(
        self,
        likelihood: RiskLevel,
        impact: RiskLevel
    ) -> RiskLevel:
        """Compute the overall risk from likelihood and impact."""
        risk_matrix = {
            (RiskLevel.LOW, RiskLevel.LOW): RiskLevel.LOW,
            (RiskLevel.LOW, RiskLevel.MEDIUM): RiskLevel.LOW,
            (RiskLevel.LOW, RiskLevel.HIGH): RiskLevel.MEDIUM,
            (RiskLevel.LOW, RiskLevel.CRITICAL): RiskLevel.HIGH,
            (RiskLevel.MEDIUM, RiskLevel.LOW): RiskLevel.LOW,
            (RiskLevel.MEDIUM, RiskLevel.MEDIUM): RiskLevel.MEDIUM,
            (RiskLevel.MEDIUM, RiskLevel.HIGH): RiskLevel.HIGH,
            (RiskLevel.MEDIUM, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
            (RiskLevel.HIGH, RiskLevel.LOW): RiskLevel.MEDIUM,
            (RiskLevel.HIGH, RiskLevel.MEDIUM): RiskLevel.HIGH,
            (RiskLevel.HIGH, RiskLevel.HIGH): RiskLevel.CRITICAL,
            (RiskLevel.HIGH, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.LOW): RiskLevel.HIGH,
            (RiskLevel.CRITICAL, RiskLevel.MEDIUM): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.HIGH): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
        }
        return risk_matrix.get((likelihood, impact), RiskLevel.MEDIUM)
 
    def _calculate_residual_risk(
        self,
        overall_risk: RiskLevel,
        mitigations: List[str]
    ) -> RiskLevel:
        """Compute the residual risk after mitigations."""
        # Simplified: each mitigation lowers the risk by one level
        risk_levels = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        current_index = risk_levels.index(overall_risk)
 
        reduction = min(len(mitigations), 2)  # Maximum reduction of 2 levels
        new_index = max(0, current_index - reduction)
 
        return risk_levels[new_index]
 
    def _validate_legal_basis(self, legal_basis: str, pia: PrivacyImpactAssessment) -> bool:
        """Validate the legal basis."""
        valid_bases = ["consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests"]
        return legal_basis in valid_bases
 
    def _validate_necessity(self, pia: PrivacyImpactAssessment) -> bool:
        """Validate the necessity assessment."""
        # Tolerate a missing assessment instead of raising AttributeError on None
        return bool((pia.necessity_assessment or {}).get("data_minimization"))
 
    def _validate_pia_completeness(self, pia: PrivacyImpactAssessment) -> bool:
        """Check that the PIA has all required sections."""
        return all([
            pia.data_flows,
            pia.lawfulness_assessment,
            pia.necessity_assessment,
            pia.data_subjects
        ])
 
    def _notify_dpo_for_review(self, pia: PrivacyImpactAssessment):
        """Notify the DPO that the PIA is ready for review."""
        pass
 
    def _notify_assessor_of_result(self, pia: PrivacyImpactAssessment):
        """Notify the assessor of the review outcome."""
        pass
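
The simplified residual-risk rule used in `_calculate_residual_risk` can be tried in isolation. The sketch below is a standalone approximation, not the class above: the `RiskLevel` enum is a hypothetical stand-in (its string values are assumed), and `residual_risk` and `max_reduction` are illustrative names. It shows that each mitigation lowers the risk by one level, that the reduction is capped at two levels, and that the result never drops below the lowest level.

```python
from enum import Enum
from typing import List


class RiskLevel(Enum):
    # Hypothetical stand-in for the article's RiskLevel enum; values are assumed.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


# Levels ordered from least to most severe, so list positions can be compared.
ORDERED_LEVELS = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]


def residual_risk(
    overall: RiskLevel,
    mitigations: List[str],
    max_reduction: int = 2,
) -> RiskLevel:
    """Lower the risk one level per mitigation, capped at max_reduction levels."""
    reduction = min(len(mitigations), max_reduction)
    # Clamp at index 0 so the result never falls below LOW.
    index = max(0, ORDERED_LEVELS.index(overall) - reduction)
    return ORDERED_LEVELS[index]


# One mitigation: critical drops a single level.
print(residual_risk(RiskLevel.CRITICAL, ["encryption at rest"]).value)  # high

# Three mitigations: the cap limits the drop to two levels.
print(residual_risk(
    RiskLevel.CRITICAL,
    ["encryption at rest", "pseudonymization", "access reviews"],
).value)  # medium

# The result is clamped at the lowest level.
print(residual_risk(RiskLevel.MEDIUM, ["pseudonymization", "access reviews"]).value)  # low
```

Because of the cap, a critical risk can never be rated below medium under this rule, no matter how many mitigations are listed. A count-based rule also treats every mitigation as equally effective, so in practice the residual rating would likely need a reviewer's judgment on top of it.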

Conclusion

Implementing ISO 27701 requires comprehensive privacy controls covering:

  1. Data subject rights - with proper request handling and identity verification
  2. Consent management - with granular tracking and support for withdrawal
  3. Privacy impact assessments - for new processing activities
  4. Integration with ISO 27001 - for unified security and privacy management

These controls let organizations demonstrate privacy compliance and build trust with data subjects.

