Implementarea ISO 27701 pentru confidentialitate: Construirea unui sistem de management al informatiilor de confidentialitate
ISO 27701 ofera un cadru pentru implementarea, mentinerea si imbunatatirea unui Sistem de Management al Informatiilor de Confidentialitate (PIMS) ca extensie a ISO 27001. Acest ghid acopera implementarea practica a controalelor de confidentialitate atat pentru operatorii de date, cat si pentru persoanele imputernicite.
Intelegerea ISO 27701
ISO 27701 extinde Sistemul de Management al Securitatii Informatiilor (ISMS) al ISO 27001 pentru a aborda cerinte specifice de confidentialitate:
- Clauza 5: Cerinte specifice PIMS care extind ISO 27001
- Clauza 6: Indrumari specifice PIMS care extind ISO 27002
- Clauza 7: Indrumari suplimentare pentru operatorii de PII
- Clauza 8: Indrumari suplimentare pentru persoanele imputernicite de PII
Implementarea controalelor de confidentialitate
Gestionarea drepturilor persoanelor vizate
# data_subject_rights.py
"""
Implementation of data subject rights per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
import secrets
class RightType(Enum):
    """Kinds of data subject rights a request can exercise."""
    # ACCESS, ERASURE and PORTABILITY have dedicated process_* handlers in
    # DataSubjectRightsManager; no handler for the remaining members is
    # visible in this file.
    ACCESS = "access"
    RECTIFICATION = "rectification"
    ERASURE = "erasure"
    RESTRICTION = "restriction"
    PORTABILITY = "portability"
    OBJECTION = "objection"
    AUTOMATED_DECISION = "automated_decision"
class RequestStatus(Enum):
    """Lifecycle state of a data subject request."""
    # Set by DataSubjectRightsManager.create_request.
    RECEIVED = "received"
    # Set by verify_identity when a high-risk request lacks identity documents.
    IDENTITY_VERIFICATION = "identity_verification"
    # Set by verify_identity once verification succeeds.
    IN_PROGRESS = "in_progress"
    # Set by _complete_request.
    COMPLETED = "completed"
    # Set by reject_request.
    REJECTED = "rejected"
    # Set by extend_deadline.
    EXTENDED = "extended"
@dataclass
class DataSubjectRequest:
    """A single request to exercise a data subject right."""

    request_id: str
    # Hash of the subject's e-mail, as produced by the manager's
    # _hash_identifier helper.
    subject_id: str
    subject_email: str
    right_type: RightType
    status: RequestStatus
    created_at: datetime
    due_date: datetime
    completed_at: Optional[datetime] = None
    extension_reason: Optional[str] = None
    rejection_reason: Optional[str] = None
    # Secret sent to the subject for identity verification. Excluded from the
    # generated repr() so it cannot leak through logs or tracebacks.
    verification_token: Optional[str] = field(default=None, repr=False)
    # Free-text notes appended while the request is processed.
    processing_notes: List[str] = field(default_factory=list)
    # Names of the data sources touched while processing the request.
    affected_systems: List[str] = field(default_factory=list)
class DataSubjectRightsManager:
    """
    Manage data subject rights requests (labelled ISO 27701 7.3 in the guide).

    Requests live in the in-memory ``requests`` dict and are written to the
    injected storage backend via ``save_request``; every action is recorded
    through ``storage.log_audit_event``.

    NOTE(review): the ISO clause numbers quoted in the method docstrings are
    carried over from the original guide - confirm them against the standard.
    NOTE(review): the ``process_*`` methods do not check that
    ``verify_identity`` succeeded first; callers must enforce that ordering.
    """

    # Response deadlines, in days.
    STANDARD_RESPONSE_TIME = 30
    # Used by extend_deadline as the cap on the total window
    # (standard response time plus extension).
    MAXIMUM_EXTENSION = 60

    def __init__(self, storage_backend, notification_service):
        """Store the collaborators and start with an empty request registry."""
        self.storage = storage_backend
        self.notifications = notification_service
        self.requests: Dict[str, DataSubjectRequest] = {}

    def create_request(
        self,
        subject_email: str,
        right_type: RightType,
        additional_info: Optional[Dict] = None
    ) -> DataSubjectRequest:
        """
        Create, store and acknowledge a new data subject request.

        Guide reference: ISO 27701 7.3.2 (request handling).

        Args:
            subject_email: Address of the data subject; also hashed into
                ``subject_id``.
            right_type: The right being exercised.
            additional_info: Optional details written to the audit trail.

        Returns:
            The newly created request in ``RECEIVED`` state.
        """
        # Read the clock once so created_at and due_date are exactly
        # STANDARD_RESPONSE_TIME days apart.
        now = datetime.utcnow()
        request = DataSubjectRequest(
            request_id=f"DSR-{secrets.token_hex(8).upper()}",
            subject_id=self._hash_identifier(subject_email),
            subject_email=subject_email,
            right_type=right_type,
            status=RequestStatus.RECEIVED,
            created_at=now,
            due_date=now + timedelta(days=self.STANDARD_RESPONSE_TIME),
            verification_token=secrets.token_urlsafe(32)
        )
        self.requests[request.request_id] = request
        self.storage.save_request(request)
        # Send the verification e-mail.
        self._send_verification_email(request)
        # Record the action in the audit trail.
        self._log_request_action(request.request_id, "created", additional_info)
        return request

    def verify_identity(
        self,
        request_id: str,
        verification_token: str,
        identity_documents: Optional[List[str]] = None
    ) -> bool:
        """
        Check the verification token and move the request forward.

        Guide reference: ISO 27701 7.3.5 (identity verification).

        Returns:
            True when the request moved to ``IN_PROGRESS``. False when the
            request is unknown, the token does not match, or a high-risk
            request (erasure/portability) came without identity documents.
        """
        request = self.requests.get(request_id)
        if not request:
            return False
        expected = request.verification_token
        # A request without a token must never verify (the old `!=` check
        # accepted None == None), and the comparison must be constant-time.
        token_ok = (
            bool(expected)
            and isinstance(verification_token, str)
            and secrets.compare_digest(
                expected.encode("utf-8"), verification_token.encode("utf-8")
            )
        )
        if not token_ok:
            self._log_request_action(request_id, "verification_failed")
            return False
        # High-risk requests need additional verification.
        # NOTE(review): the documents are only checked for presence here.
        high_risk_rights = (RightType.ERASURE, RightType.PORTABILITY)
        if request.right_type in high_risk_rights and not identity_documents:
            request.status = RequestStatus.IDENTITY_VERIFICATION
            request.processing_notes.append(
                "Additional identity verification required for high-risk request"
            )
            self._persist_request(request)
            return False
        request.status = RequestStatus.IN_PROGRESS
        self._persist_request(request)
        self._log_request_action(request_id, "identity_verified")
        return True

    def process_access_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Collect the subject's data from each source and complete the request.

        Guide reference: ISO 27701 7.3.3 (access to PII).

        A failing source is recorded in ``processing_notes`` and skipped.

        Raises:
            ValueError: If the request is unknown or is not an access request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ACCESS:
            raise ValueError("Invalid access request")
        collected_data = {}
        for source in data_sources:
            try:
                data = self._collect_data_from_source(source, request.subject_id)
                collected_data[source] = {
                    "data": self._sanitize_for_export(data),
                    "categories": self._categorize_data(data),
                    "purposes": self._get_processing_purposes(source),
                    "retention_period": self._get_retention_period(source)
                }
                request.affected_systems.append(source)
            except Exception as e:
                # Best effort: one broken source must not block the others.
                request.processing_notes.append(
                    f"Error collecting from {source}: {str(e)}"
                )
        # Assemble the response.
        response = {
            "request_id": request_id,
            "subject_email": request.subject_email,
            "generated_at": datetime.utcnow().isoformat(),
            "data_collected": collected_data,
            "processing_information": self._get_processing_info(),
            "third_party_recipients": self._get_third_party_list()
        }
        # The response holds the exported personal data, so only a summary
        # goes to the audit trail.
        audit_summary = {
            "sources": list(collected_data),
            "generated_at": response["generated_at"]
        }
        self._complete_request(request_id, response, audit_details=audit_summary)
        return response

    def process_erasure_request(
        self,
        request_id: str,
        data_sources: List[str]
    ) -> Dict[str, Any]:
        """
        Erase the subject's data from each source unless it must be retained.

        Guide reference: ISO 27701 7.3.6 (erasure of PII).

        Each source ends up as "retained", "erased" or "error" in the
        returned ``erasure_results``. The request is completed even when
        some sources report "error".

        Raises:
            ValueError: If the request is unknown or is not an erasure request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.ERASURE:
            raise ValueError("Invalid erasure request")
        erasure_results = {}
        for source in data_sources:
            # Check for legal holds or retention requirements first.
            retention_check = self._check_retention_requirements(
                source,
                request.subject_id
            )
            if retention_check["must_retain"]:
                erasure_results[source] = {
                    "status": "retained",
                    "reason": retention_check["reason"],
                    "retention_until": retention_check["until"]
                }
                request.processing_notes.append(
                    f"Data in {source} retained: {retention_check['reason']}"
                )
                continue
            try:
                self._erase_data_from_source(source, request.subject_id)
            except Exception as e:
                erasure_results[source] = {
                    "status": "error",
                    "error": str(e)
                }
                request.processing_notes.append(
                    f"Error erasing from {source}: {str(e)}"
                )
                continue
            erasure_results[source] = {
                "status": "erased",
                "erased_at": datetime.utcnow().isoformat()
            }
            request.affected_systems.append(source)
            # Notify processors. A failure here must not overwrite the
            # "erased" status: the data is already gone.
            try:
                self._notify_processors_of_erasure(source, request.subject_id)
            except Exception as e:
                erasure_results[source]["processor_notification_error"] = str(e)
                request.processing_notes.append(
                    f"Error notifying processors for {source}: {str(e)}"
                )
        response = {
            "request_id": request_id,
            "erasure_results": erasure_results,
            "completed_at": datetime.utcnow().isoformat()
        }
        self._complete_request(request_id, response)
        return response

    def process_portability_request(
        self,
        request_id: str,
        data_sources: List[str],
        output_format: str = "json"
    ) -> bytes:
        """
        Export the subject's portable data as bytes.

        Guide reference: ISO 27701 7.3.7 (PII portability).

        Args:
            request_id: Identifier of a portability request.
            data_sources: Sources to export from; empty results are skipped.
            output_format: "json" or "csv". Any other value falls back to
                JSON, and the export metadata then says "json" so the label
                always matches the payload.

        Raises:
            ValueError: If the request is unknown or is not a portability
                request.
        """
        request = self.requests.get(request_id)
        if not request or request.right_type != RightType.PORTABILITY:
            raise ValueError("Invalid portability request")
        export_format = output_format if output_format in ("json", "csv") else "json"
        portable_data = {
            "export_metadata": {
                "request_id": request_id,
                "exported_at": datetime.utcnow().isoformat(),
                "format": export_format,
                "schema_version": "1.0"
            },
            "data": {}
        }
        for source in data_sources:
            # Only data supplied by the subject or generated through use.
            data = self._collect_portable_data(source, request.subject_id)
            if data:
                portable_data["data"][source] = data
                request.affected_systems.append(source)
        # Convert to the effective format.
        if export_format == "csv":
            export_data = self._convert_to_csv(portable_data)
        else:
            export_data = json.dumps(portable_data, indent=2).encode('utf-8')
        self._complete_request(
            request_id,
            {"format": export_format, "size": len(export_data)}
        )
        return export_data

    def extend_deadline(
        self,
        request_id: str,
        reason: str,
        extension_days: int = 30
    ):
        """
        Extend the due date of a complex request.

        Guide reference: ISO 27701 7.3.2 (complex requests may be extended).

        The new due date is ``created_at`` plus the standard response time
        plus ``extension_days``, so repeated calls do not accumulate.

        Raises:
            ValueError: If the request is unknown or already closed, or if
                ``extension_days`` is not positive or exceeds
                ``MAXIMUM_EXTENSION - STANDARD_RESPONSE_TIME``.
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")
        if extension_days <= 0:
            raise ValueError("Extension must be a positive number of days")
        if extension_days > self.MAXIMUM_EXTENSION - self.STANDARD_RESPONSE_TIME:
            raise ValueError("Extension exceeds maximum allowed")
        if request.status in (RequestStatus.COMPLETED, RequestStatus.REJECTED):
            raise ValueError("Cannot extend a closed request")
        original_due = request.due_date
        request.due_date = request.created_at + timedelta(
            days=self.STANDARD_RESPONSE_TIME + extension_days
        )
        request.status = RequestStatus.EXTENDED
        request.extension_reason = reason
        self._persist_request(request)
        # Notify the subject.
        self._send_extension_notification(request, original_due)
        self._log_request_action(request_id, "extended", {"reason": reason})

    def reject_request(
        self,
        request_id: str,
        reason: str,
        legal_basis: str
    ):
        """
        Reject a request and record the justification.

        Guide reference: ISO 27701 7.3.2 (requests may be rejected in
        specific circumstances).

        Raises:
            ValueError: If the request is unknown.
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")
        request.status = RequestStatus.REJECTED
        request.rejection_reason = f"{reason} (Legal basis: {legal_basis})"
        request.completed_at = datetime.utcnow()
        self._persist_request(request)
        # Notify the subject, including how to appeal.
        self._send_rejection_notification(request)
        self._log_request_action(
            request_id,
            "rejected",
            {"reason": reason, "legal_basis": legal_basis}
        )

    def _complete_request(
        self,
        request_id: str,
        result: Dict,
        audit_details: Optional[Dict] = None
    ):
        """
        Mark the request as completed, notify the subject and audit it.

        Args:
            request_id: Identifier of the request to close.
            result: Outcome passed to the completion notification.
            audit_details: What to write to the audit trail instead of
                ``result``; use it when ``result`` contains personal data.

        Raises:
            ValueError: If the request is unknown.
        """
        request = self.requests.get(request_id)
        if not request:
            raise ValueError("Request not found")
        request.status = RequestStatus.COMPLETED
        request.completed_at = datetime.utcnow()
        self._persist_request(request)
        self._send_completion_notification(request, result)
        self._log_request_action(
            request_id,
            "completed",
            result if audit_details is None else audit_details
        )

    def _persist_request(self, request: DataSubjectRequest):
        """Write the current state of the request to the storage backend."""
        # assumes save_request overwrites an existing record, as save_consent
        # and save_pia are used elsewhere in this guide - TODO confirm
        self.storage.save_request(request)

    def _hash_identifier(self, identifier: str) -> str:
        """Return the SHA-256 hex digest of the lower-cased identifier."""
        return hashlib.sha256(identifier.lower().encode()).hexdigest()

    def _log_request_action(
        self,
        request_id: str,
        action: str,
        details: Optional[Dict] = None
    ):
        """Write one timestamped event to the audit trail."""
        self.storage.log_audit_event({
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": request_id,
            "action": action,
            "details": details
        })

    # Placeholder methods for system-specific implementations.

    def _collect_data_from_source(self, source: str, subject_id: str) -> Dict:
        """Collect the subject's data from one source system."""
        raise NotImplementedError()

    def _erase_data_from_source(self, source: str, subject_id: str):
        """Erase the subject's data from one source system."""
        raise NotImplementedError()

    def _collect_portable_data(self, source: str, subject_id: str) -> Dict:
        """Collect the portable subset of the subject's data from a source."""
        raise NotImplementedError()

    def _check_retention_requirements(self, source: str, subject_id: str) -> Dict:
        """
        Report whether the data must be retained.

        The result is read with the keys "must_retain", "reason" and "until".
        """
        raise NotImplementedError()

    def _send_verification_email(self, request: DataSubjectRequest):
        """Send the identity verification e-mail."""
        raise NotImplementedError()

    def _send_extension_notification(self, request: DataSubjectRequest, original_due: datetime):
        """Tell the subject that the deadline was extended."""
        raise NotImplementedError()

    def _send_rejection_notification(self, request: DataSubjectRequest):
        """Tell the subject that the request was rejected."""
        raise NotImplementedError()

    def _send_completion_notification(self, request: DataSubjectRequest, result: Dict):
        """Tell the subject that the request was completed."""
        raise NotImplementedError()

    def _sanitize_for_export(self, data: Dict) -> Dict:
        """Strip sensitive internal fields before export."""
        raise NotImplementedError()

    def _categorize_data(self, data: Dict) -> List[str]:
        """List the categories of data present in the response."""
        raise NotImplementedError()

    def _get_processing_purposes(self, source: str) -> List[str]:
        """Return the processing purposes for a data source."""
        raise NotImplementedError()

    def _get_retention_period(self, source: str) -> str:
        """Return the retention period for a data source."""
        raise NotImplementedError()

    def _get_processing_info(self) -> Dict:
        """Return general information about the processing."""
        raise NotImplementedError()

    def _get_third_party_list(self) -> List[Dict]:
        """Return the list of third-party recipients."""
        raise NotImplementedError()

    def _notify_processors_of_erasure(self, source: str, subject_id: str):
        """Notify processors that the subject's data was erased."""
        raise NotImplementedError()
    def _convert_to_csv(self, data: Dict) -> bytes:
        """Converteste datele in format CSV."""
        raise NotImplementedError()

Gestionarea consimtamantului
# consent_management.py
"""
Consent management implementation per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import json
class ConsentPurpose(Enum):
    """Standard purposes for which consent can be recorded."""
    MARKETING = "marketing"
    ANALYTICS = "analytics"
    PERSONALIZATION = "personalization"
    THIRD_PARTY_SHARING = "third_party_sharing"
    PROFILING = "profiling"
    AUTOMATED_DECISIONS = "automated_decisions"
    RESEARCH = "research"
    SERVICE_IMPROVEMENT = "service_improvement"
class ConsentStatus(Enum):
    """State of a consent record."""
    # GRANTED / DENIED are chosen by ConsentManager.record_consent.
    GRANTED = "granted"
    DENIED = "denied"
    # Set by ConsentManager.withdraw_consent.
    WITHDRAWN = "withdrawn"
    # Set by ConsentManager.check_consent once expires_at has passed.
    EXPIRED = "expired"
@dataclass
class ConsentRecord:
    """A single consent decision for one subject and one purpose."""
    consent_id: str
    subject_id: str
    purpose: ConsentPurpose
    status: ConsentStatus
    # None when the consent was denied (see ConsentManager.record_consent).
    granted_at: Optional[datetime]
    # Filled in by ConsentManager.withdraw_consent.
    withdrawn_at: Optional[datetime]
    # None means the consent has no expiry.
    expires_at: Optional[datetime]
    # Version of the consent terms in force when the decision was recorded.
    version: str
    # Where the consent was collected, as supplied by the caller.
    collection_point: str
    # Caller-supplied reference to the proof of consent.
    proof_reference: str
@dataclass
class ConsentPreferences:
    """All consent preferences of one subject."""
    subject_id: str
    # Only purposes with a stored record appear here
    # (see ConsentManager.get_consent_status).
    consents: Dict[ConsentPurpose, ConsentRecord]
    # get_consent_status sets this to the time the snapshot was built.
    last_updated: datetime
    # History entries as returned by storage.get_consent_history.
    preference_history: List[Dict]
class ConsentManager:
    """
    Manage the consent lifecycle (labelled ISO 27701 7.2 in the guide).

    Consent records are read and written through the storage backend; every
    decision is reported to the audit logger. The current version of the
    consent terms per purpose is held in memory in ``consent_versions``.

    NOTE(review): the ISO clause numbers quoted in the method docstrings are
    carried over from the original guide - confirm them against the standard.
    """

    def __init__(self, storage_backend, audit_logger):
        """Store the collaborators; no consent versions are registered yet."""
        self.storage = storage_backend
        self.audit = audit_logger
        self.consent_versions: Dict[ConsentPurpose, str] = {}

    def record_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        granted: bool,
        collection_point: str,
        proof: str,
        expiry_days: Optional[int] = None
    ) -> ConsentRecord:
        """
        Record a consent decision (grant or denial).

        Guide reference: ISO 27701 7.2.3 (consent records).

        Args:
            subject_id: Identifier of the data subject.
            purpose: Purpose the decision applies to.
            granted: True for a grant, False for a denial.
            collection_point: Where the decision was collected.
            proof: Reference to the proof of consent.
            expiry_days: Lifetime in days; None means no expiry. Zero is
                honoured and yields a record that expires immediately.

        Returns:
            The stored consent record.
        """
        import secrets
        # Imported locally: this module only imports timedelta after the
        # class definition.
        from datetime import timedelta
        now = datetime.utcnow()
        expires_at = None
        if expiry_days is not None:
            expires_at = now + timedelta(days=expiry_days)
        consent = ConsentRecord(
            consent_id=f"CON-{secrets.token_hex(8)}",
            subject_id=subject_id,
            purpose=purpose,
            status=ConsentStatus.GRANTED if granted else ConsentStatus.DENIED,
            granted_at=now if granted else None,
            withdrawn_at=None,
            expires_at=expires_at,
            version=self.consent_versions.get(purpose, "1.0"),
            collection_point=collection_point,
            proof_reference=proof
        )
        self.storage.save_consent(consent)
        self.audit.log({
            "event": "consent_recorded",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "status": consent.status.value,
            "collection_point": collection_point
        })
        return consent

    def withdraw_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: Optional[str] = None
    ) -> ConsentRecord:
        """
        Withdraw a previously recorded consent.

        Guide reference: ISO 27701 7.2.4 (withdrawal of consent).

        Raises:
            ValueError: If no consent record exists for subject and purpose.
        """
        consent = self.storage.get_consent(subject_id, purpose)
        if not consent:
            raise ValueError("No consent record found")
        consent.status = ConsentStatus.WITHDRAWN
        consent.withdrawn_at = datetime.utcnow()
        self.storage.save_consent(consent)
        # Trigger downstream processing.
        self._handle_consent_withdrawal(subject_id, purpose)
        self.audit.log({
            "event": "consent_withdrawn",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })
        return consent

    def check_consent(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ) -> bool:
        """
        Report whether a valid consent exists for subject and purpose.

        Guide reference: ISO 27701 7.2.5 (consent verification).

        A consent whose ``expires_at`` has passed is marked ``EXPIRED`` and
        saved. A consent recorded under a different version of the terms is
        audited and accepted only if the versions are compatible.
        """
        consent = self.storage.get_consent(subject_id, purpose)
        if not consent:
            return False
        if consent.status != ConsentStatus.GRANTED:
            return False
        if consent.expires_at and consent.expires_at < datetime.utcnow():
            consent.status = ConsentStatus.EXPIRED
            self.storage.save_consent(consent)
            return False
        # Check that the consent was given under the current terms.
        current_version = self.consent_versions.get(purpose, "1.0")
        if consent.version != current_version:
            self.audit.log({
                "event": "consent_version_mismatch",
                "subject_id": subject_id,
                "purpose": purpose.value,
                "consent_version": consent.version,
                "current_version": current_version
            })
            # Re-consent may be needed depending on what changed.
            return self._evaluate_version_compatibility(
                consent.version,
                current_version,
                purpose
            )
        return True

    def get_consent_status(
        self,
        subject_id: str
    ) -> ConsentPreferences:
        """Return every stored consent of the subject plus its history."""
        consents = {}
        for purpose in ConsentPurpose:
            consent = self.storage.get_consent(subject_id, purpose)
            if consent:
                consents[purpose] = consent
        history = self.storage.get_consent_history(subject_id)
        return ConsentPreferences(
            subject_id=subject_id,
            consents=consents,
            last_updated=datetime.utcnow(),
            preference_history=history
        )

    def update_consent_version(
        self,
        purpose: ConsentPurpose,
        new_version: str,
        changes_description: str,
        requires_reconsent: bool = False
    ):
        """
        Register a new version of the consent terms for a purpose.

        Guide reference: ISO 27701 7.2.2 (changes to consent).

        Args:
            purpose: Purpose whose terms changed.
            new_version: New version label.
            changes_description: Human-readable summary of the change.
            requires_reconsent: When True, every currently granted consent
                for the purpose is marked ``EXPIRED`` and re-consent is
                requested, so ``check_consent`` stops accepting it even if
                the major version is unchanged.
        """
        old_version = self.consent_versions.get(purpose, "1.0")
        self.consent_versions[purpose] = new_version
        self.storage.save_version_change({
            "purpose": purpose.value,
            "old_version": old_version,
            "new_version": new_version,
            "changes": changes_description,
            "requires_reconsent": requires_reconsent,
            "effective_date": datetime.utcnow().isoformat()
        })
        if requires_reconsent:
            # Invalidate existing consents, then ask for new ones.
            affected = self.storage.get_subjects_with_consent(purpose)
            for subject_id in affected:
                consent = self.storage.get_consent(subject_id, purpose)
                if consent and consent.status == ConsentStatus.GRANTED:
                    consent.status = ConsentStatus.EXPIRED
                    self.storage.save_consent(consent)
                self._request_reconsent(subject_id, purpose, changes_description)

    def generate_consent_receipt(
        self,
        consent: ConsentRecord
    ) -> Dict:
        """
        Build a consent receipt dictionary for the data subject.

        Guide reference: ISO 27701 7.2.6 (consent receipts).
        """
        granted_at = consent.granted_at.isoformat() if consent.granted_at else None
        termination = consent.expires_at.isoformat() if consent.expires_at else "none"
        return {
            "receipt_id": consent.consent_id,
            "version": "1.0",
            "jurisdiction": "global",
            "consent_timestamp": granted_at,
            "collection_method": consent.collection_point,
            "consent_receipt_id": consent.consent_id,
            "subject": {
                "subject_id": consent.subject_id
            },
            "data_controller": self._get_controller_info(),
            "purposes": [{
                "purpose": consent.purpose.value,
                "purpose_category": self._get_purpose_category(consent.purpose),
                "consent_type": "explicit",
                "pii_category": self._get_pii_categories(consent.purpose),
                "primary_purpose": True,
                "termination": termination,
                "third_party_disclosure": self._has_third_party_disclosure(consent.purpose)
            }],
            "sensitive": self._is_sensitive_purpose(consent.purpose),
            "spi_category": [],
            "proof": consent.proof_reference
        }

    def _handle_consent_withdrawal(
        self,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Stop processing for the purpose and notify the affected systems."""
        # Stop processing.
        self.storage.set_processing_flag(subject_id, purpose, False)
        # Notify affected systems.
        affected_systems = self.storage.get_systems_using_consent(purpose)
        for system in affected_systems:
            self._notify_system_of_withdrawal(system, subject_id, purpose)

    def _evaluate_version_compatibility(
        self,
        old_version: str,
        new_version: str,
        purpose: ConsentPurpose
    ) -> bool:
        """
        Report whether consent given under the old version is still valid.

        Versions are compatible when their major components (the integer
        before the first dot) are equal. A version that cannot be parsed is
        treated as incompatible, which forces re-consent instead of raising.
        """
        try:
            old_major = int(old_version.split('.')[0])
            new_major = int(new_version.split('.')[0])
        except (AttributeError, ValueError):
            return False
        return old_major == new_major

    def _request_reconsent(
        self,
        subject_id: str,
        purpose: ConsentPurpose,
        reason: str
    ):
        """Record in the audit log that re-consent was requested."""
        self.audit.log({
            "event": "reconsent_requested",
            "subject_id": subject_id,
            "purpose": purpose.value,
            "reason": reason
        })

    def _get_controller_info(self) -> Dict:
        """Return the data controller details (hard-coded sample values)."""
        return {
            "on_behalf": False,
            "contact": "privacy@example.com",
            "company": "Example Corp",
            "address": "123 Privacy St",
            "email": "privacy@example.com",
            "phone": "+1-555-0123"
        }

    def _get_purpose_category(self, purpose: ConsentPurpose) -> str:
        """Map a purpose to its receipt category; unmapped ones are "other"."""
        categories = {
            ConsentPurpose.MARKETING: "marketing",
            ConsentPurpose.ANALYTICS: "core_function",
            ConsentPurpose.PERSONALIZATION: "improved_service"
        }
        return categories.get(purpose, "other")

    def _get_pii_categories(self, purpose: ConsentPurpose) -> List[str]:
        """Return the PII categories; currently the same list for all purposes."""
        return ["contact", "demographic", "behavioral"]

    def _has_third_party_disclosure(self, purpose: ConsentPurpose) -> bool:
        """Report whether the purpose is third-party sharing."""
        return purpose == ConsentPurpose.THIRD_PARTY_SHARING

    def _is_sensitive_purpose(self, purpose: ConsentPurpose) -> bool:
        """Report whether the purpose is profiling or automated decisions."""
        return purpose in [ConsentPurpose.PROFILING, ConsentPurpose.AUTOMATED_DECISIONS]

    def _notify_system_of_withdrawal(
        self,
        system: str,
        subject_id: str,
        purpose: ConsentPurpose
    ):
        """Notify one system of the withdrawal (no-op hook)."""
        pass
from datetime import timedelta

Evaluarea impactului asupra confidentialitatii
# privacy_impact_assessment.py
"""
Privacy Impact Assessment (PIA) implementation per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class RiskLevel(Enum):
    # Risk rating used for likelihood, impact, overall and residual risk.
    # PIAManager._calculate_residual_risk orders the members
    # LOW < MEDIUM < HIGH < CRITICAL.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class PIAStatus(Enum):
    # Workflow state of a privacy impact assessment.
    # Set by PIAManager.create_pia.
    DRAFT = "draft"
    # Set by PIAManager.submit_for_review.
    IN_REVIEW = "in_review"
    # The next three are set by PIAManager.review_pia.
    APPROVED = "approved"
    REQUIRES_CHANGES = "requires_changes"
    REJECTED = "rejected"
@dataclass
class DataFlow:
    """One data flow inside the assessed system."""
    # Generated by PIAManager.add_data_flow; the other fields are supplied
    # by the caller.
    flow_id: str
    source: str
    destination: str
    data_categories: List[str]
    purpose: str
    legal_basis: str
    retention_period: str
    safeguards: List[str]
@dataclass
class PrivacyRisk:
    """A privacy risk identified during the assessment."""
    # Generated by PIAManager.identify_risk.
    risk_id: str
    description: str
    likelihood: RiskLevel
    impact: RiskLevel
    # Derived from likelihood and impact by PIAManager._calculate_overall_risk.
    overall_risk: RiskLevel
    affected_rights: List[str]
    mitigations: List[str]
    # Derived from overall_risk and the mitigations by
    # PIAManager._calculate_residual_risk.
    residual_risk: RiskLevel
@dataclass
class PrivacyImpactAssessment:
    """A complete privacy impact assessment (PIA)."""

    pia_id: str
    project_name: str
    description: str
    status: PIAStatus
    created_at: datetime
    updated_at: datetime
    assessor: str
    # Unknown until the PIA is reviewed. It defaults to None so a draft can
    # be created without a reviewer; without the default,
    # PIAManager.create_pia fails with TypeError.
    reviewer: Optional[str] = None
    # Assessment components.
    data_flows: List[DataFlow] = field(default_factory=list)
    risks: List[PrivacyRisk] = field(default_factory=list)
    data_subjects: List[str] = field(default_factory=list)
    special_categories: List[str] = field(default_factory=list)
    # Compliance checks.
    lawfulness_assessment: Dict = field(default_factory=dict)
    necessity_assessment: Dict = field(default_factory=dict)
    proportionality_assessment: Dict = field(default_factory=dict)
    # Outcome of the review.
    recommendation: Optional[str] = None
    conditions: List[str] = field(default_factory=list)
class PIAManager:
    """
    Manage privacy impact assessments (labelled ISO 27701 7.2.5 in the guide).

    Assessments are loaded from and saved to the storage backend on every
    operation. ``dpo_email`` is stored for the notification hooks, which are
    no-ops in this class.
    """

    def __init__(self, storage_backend, dpo_email: str):
        """Store the storage backend and the DPO contact address."""
        self.storage = storage_backend
        self.dpo_email = dpo_email

    def _require_pia(self, pia_id: str) -> PrivacyImpactAssessment:
        """Load a PIA from storage or raise ValueError("PIA not found")."""
        pia = self.storage.get_pia(pia_id)
        if not pia:
            raise ValueError("PIA not found")
        return pia

    def _touch_and_save(self, pia: PrivacyImpactAssessment):
        """Refresh ``updated_at`` and save the PIA."""
        pia.updated_at = datetime.utcnow()
        self.storage.save_pia(pia)

    def create_pia(
        self,
        project_name: str,
        description: str,
        assessor: str
    ) -> PrivacyImpactAssessment:
        """Create and store a new PIA in ``DRAFT`` state."""
        import secrets
        now = datetime.utcnow()
        pia = PrivacyImpactAssessment(
            pia_id=f"PIA-{secrets.token_hex(6).upper()}",
            project_name=project_name,
            description=description,
            status=PIAStatus.DRAFT,
            created_at=now,
            updated_at=now,
            assessor=assessor,
            # Passed explicitly: the dataclass field has no default in the
            # original definition, so omitting it raises TypeError.
            reviewer=None
        )
        self.storage.save_pia(pia)
        return pia

    def set_data_subjects(
        self,
        pia_id: str,
        data_subjects: List[str],
        special_categories: Optional[List[str]] = None
    ):
        """
        Record who is affected by the processing.

        ``submit_for_review`` requires ``data_subjects`` to be non-empty, and
        no other method of this class sets it.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        pia.data_subjects = list(data_subjects)
        if special_categories is not None:
            pia.special_categories = list(special_categories)
        self._touch_and_save(pia)

    def add_data_flow(
        self,
        pia_id: str,
        source: str,
        destination: str,
        data_categories: List[str],
        purpose: str,
        legal_basis: str,
        retention_period: str,
        safeguards: List[str]
    ) -> DataFlow:
        """
        Append a data flow to the PIA and return it.

        Raises:
            ValueError: If the PIA does not exist.
        """
        import secrets
        pia = self._require_pia(pia_id)
        flow = DataFlow(
            flow_id=f"DF-{secrets.token_hex(4)}",
            source=source,
            destination=destination,
            data_categories=data_categories,
            purpose=purpose,
            legal_basis=legal_basis,
            retention_period=retention_period,
            safeguards=safeguards
        )
        pia.data_flows.append(flow)
        self._touch_and_save(pia)
        return flow

    def identify_risk(
        self,
        pia_id: str,
        description: str,
        likelihood: RiskLevel,
        impact: RiskLevel,
        affected_rights: List[str],
        proposed_mitigations: List[str]
    ) -> PrivacyRisk:
        """
        Document a privacy risk, deriving its overall and residual rating.

        Raises:
            ValueError: If the PIA does not exist.
        """
        import secrets
        pia = self._require_pia(pia_id)
        overall_risk = self._calculate_overall_risk(likelihood, impact)
        risk = PrivacyRisk(
            risk_id=f"RISK-{secrets.token_hex(4)}",
            description=description,
            likelihood=likelihood,
            impact=impact,
            overall_risk=overall_risk,
            affected_rights=affected_rights,
            mitigations=proposed_mitigations,
            residual_risk=self._calculate_residual_risk(overall_risk, proposed_mitigations)
        )
        pia.risks.append(risk)
        self._touch_and_save(pia)
        return risk

    def assess_lawfulness(
        self,
        pia_id: str,
        legal_basis: str,
        justification: str,
        documentation: List[str]
    ):
        """
        Record the lawfulness assessment of the processing.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        pia.lawfulness_assessment = {
            "legal_basis": legal_basis,
            "justification": justification,
            "documentation": documentation,
            "assessed_at": datetime.utcnow().isoformat(),
            "compliant": self._validate_legal_basis(legal_basis, pia)
        }
        self._touch_and_save(pia)

    def assess_necessity(
        self,
        pia_id: str,
        purpose: str,
        data_minimization: Dict,
        alternatives_considered: List[str]
    ):
        """
        Record the necessity and data-minimisation assessment.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        pia.necessity_assessment = {
            "purpose": purpose,
            "data_minimization": data_minimization,
            "alternatives_considered": alternatives_considered,
            "assessed_at": datetime.utcnow().isoformat(),
            "compliant": False
        }
        # Validate only after the new assessment is attached: the validator
        # reads pia.necessity_assessment, so validating earlier would judge
        # the previous (initially empty) assessment.
        pia.necessity_assessment["compliant"] = self._validate_necessity(pia)
        self._touch_and_save(pia)

    def submit_for_review(self, pia_id: str) -> bool:
        """
        Submit the PIA for DPO review.

        Returns:
            False when the PIA is incomplete, True once it moved to
            ``IN_REVIEW`` and the DPO hook was called.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        # Validate completeness.
        if not self._validate_pia_completeness(pia):
            return False
        pia.status = PIAStatus.IN_REVIEW
        self._touch_and_save(pia)
        # Notify the DPO.
        self._notify_dpo_for_review(pia)
        return True

    def review_pia(
        self,
        pia_id: str,
        reviewer: str,
        approved: bool,
        recommendation: str,
        conditions: Optional[List[str]] = None
    ):
        """
        Record the DPO review outcome.

        An approval with conditions results in ``REQUIRES_CHANGES``, an
        approval without them in ``APPROVED``, anything else in ``REJECTED``.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        pia.reviewer = reviewer
        pia.recommendation = recommendation
        pia.conditions = conditions or []
        if not approved:
            pia.status = PIAStatus.REJECTED
        elif conditions:
            pia.status = PIAStatus.REQUIRES_CHANGES
        else:
            pia.status = PIAStatus.APPROVED
        self._touch_and_save(pia)
        # Notify the assessor.
        self._notify_assessor_of_result(pia)

    def generate_pia_report(self, pia_id: str) -> Dict:
        """
        Build the formal PIA report as a dictionary.

        Raises:
            ValueError: If the PIA does not exist.
        """
        pia = self._require_pia(pia_id)
        flows = [
            {
                "source": f.source,
                "destination": f.destination,
                "categories": f.data_categories,
                "purpose": f.purpose,
                "legal_basis": f.legal_basis
            }
            for f in pia.data_flows
        ]
        risks = [
            {
                "description": r.description,
                "overall_risk": r.overall_risk.value,
                "mitigations": r.mitigations,
                "residual_risk": r.residual_risk.value
            }
            for r in pia.risks
        ]
        high_risks = sum(
            1 for r in pia.risks
            if r.overall_risk in (RiskLevel.HIGH, RiskLevel.CRITICAL)
        )
        return {
            "report_metadata": {
                "pia_id": pia.pia_id,
                "generated_at": datetime.utcnow().isoformat(),
                "status": pia.status.value
            },
            "project_information": {
                "name": pia.project_name,
                "description": pia.description,
                "assessor": pia.assessor,
                "reviewer": pia.reviewer
            },
            "data_processing_overview": {
                "data_subjects": pia.data_subjects,
                "special_categories": pia.special_categories,
                "data_flows": flows
            },
            "compliance_assessment": {
                "lawfulness": pia.lawfulness_assessment,
                "necessity": pia.necessity_assessment,
                "proportionality": pia.proportionality_assessment
            },
            "risk_assessment": {
                "identified_risks": risks,
                "high_risks": high_risks
            },
            "recommendation": pia.recommendation,
            "conditions": pia.conditions
        }

    def _calculate_overall_risk(
        self,
        likelihood: RiskLevel,
        impact: RiskLevel
    ) -> RiskLevel:
        """Look up the overall risk for a (likelihood, impact) pair."""
        risk_matrix = {
            (RiskLevel.LOW, RiskLevel.LOW): RiskLevel.LOW,
            (RiskLevel.LOW, RiskLevel.MEDIUM): RiskLevel.LOW,
            (RiskLevel.LOW, RiskLevel.HIGH): RiskLevel.MEDIUM,
            (RiskLevel.LOW, RiskLevel.CRITICAL): RiskLevel.HIGH,
            (RiskLevel.MEDIUM, RiskLevel.LOW): RiskLevel.LOW,
            (RiskLevel.MEDIUM, RiskLevel.MEDIUM): RiskLevel.MEDIUM,
            (RiskLevel.MEDIUM, RiskLevel.HIGH): RiskLevel.HIGH,
            (RiskLevel.MEDIUM, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
            (RiskLevel.HIGH, RiskLevel.LOW): RiskLevel.MEDIUM,
            (RiskLevel.HIGH, RiskLevel.MEDIUM): RiskLevel.HIGH,
            (RiskLevel.HIGH, RiskLevel.HIGH): RiskLevel.CRITICAL,
            (RiskLevel.HIGH, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.LOW): RiskLevel.HIGH,
            (RiskLevel.CRITICAL, RiskLevel.MEDIUM): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.HIGH): RiskLevel.CRITICAL,
            (RiskLevel.CRITICAL, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
        }
        # Pairs missing from the matrix fall back to MEDIUM.
        return risk_matrix.get((likelihood, impact), RiskLevel.MEDIUM)

    def _calculate_residual_risk(
        self,
        overall_risk: RiskLevel,
        mitigations: List[str]
    ) -> RiskLevel:
        """
        Estimate the risk left after mitigations.

        Simplified model: each mitigation lowers the risk by one level, by at
        most two levels in total and never below LOW.
        """
        risk_levels = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        current_index = risk_levels.index(overall_risk)
        reduction = min(len(mitigations), 2)
        return risk_levels[max(0, current_index - reduction)]

    def _validate_legal_basis(self, legal_basis: str, pia: PrivacyImpactAssessment) -> bool:
        """Report whether the legal basis is one of the six recognised values."""
        valid_bases = ["consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests"]
        return legal_basis in valid_bases

    def _validate_necessity(self, pia: PrivacyImpactAssessment) -> bool:
        """Report whether the necessity assessment documents data minimisation."""
        return bool(pia.necessity_assessment.get("data_minimization"))

    def _validate_pia_completeness(self, pia: PrivacyImpactAssessment) -> bool:
        """Report whether every section required for review is filled in."""
        return all([
            pia.data_flows,
            pia.lawfulness_assessment,
            pia.necessity_assessment,
            pia.data_subjects
        ])

    def _notify_dpo_for_review(self, pia: PrivacyImpactAssessment):
        """Notify the DPO that the PIA is ready for review (no-op hook)."""
        pass
    def _notify_assessor_of_result(self, pia: PrivacyImpactAssessment):
        """Notifica evaluatorul despre rezultatul revizuirii."""
        pass

Concluzie
Implementarea ISO 27701 necesita controale complete de confidentialitate care acopera:
- Drepturile persoanelor vizate - cu gestionarea corespunzatoare a cererilor si verificarea identitatii
- Gestionarea consimtamantului - cu urmarire granulara si suport pentru retragere
- Evaluarile impactului asupra confidentialitatii - pentru activitati noi de prelucrare
- Integrarea cu ISO 27001 - pentru un management unificat al securitatii si confidentialitatii
Aceste controale permit organizatiilor sa demonstreze conformitatea in materie de confidentialitate si sa construiasca incredere cu persoanele vizate.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →