ISO 27701 Privacy Implementation: Building a Privacy Information Management System
ISO 27701 provides a framework for implementing, maintaining, and improving a Privacy Information Management System (PIMS) as an extension to ISO 27001. This guide covers practical implementation of privacy controls for both data controllers and processors.
Understanding ISO 27701
ISO 27701 extends ISO 27001's Information Security Management System (ISMS) to address privacy-specific requirements:
- Clause 5: PIMS-specific requirements extending ISO 27001
- Clause 6: PIMS-specific guidance extending ISO 27002
- Clause 7: Additional guidance for PII controllers
- Clause 8: Additional guidance for PII processors
Privacy Control Implementation
Data Subject Rights Management
# data_subject_rights.py
"""
Implementation of data subject rights per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
from enum import Enum
import json
import hashlib
import secrets
class RightType(Enum):
"""Types of data subject rights."""
ACCESS = "access"
RECTIFICATION = "rectification"
ERASURE = "erasure"
RESTRICTION = "restriction"
PORTABILITY = "portability"
OBJECTION = "objection"
AUTOMATED_DECISION = "automated_decision"
class RequestStatus(Enum):
"""Status of a data subject request."""
RECEIVED = "received"
IDENTITY_VERIFICATION = "identity_verification"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
REJECTED = "rejected"
EXTENDED = "extended"
@dataclass
class DataSubjectRequest:
"""Represents a data subject rights request."""
request_id: str
subject_id: str
subject_email: str
right_type: RightType
status: RequestStatus
created_at: datetime
due_date: datetime
completed_at: Optional[datetime] = None
extension_reason: Optional[str] = None
rejection_reason: Optional[str] = None
verification_token: Optional[str] = None
processing_notes: List[str] = field(default_factory=list)
affected_systems: List[str] = field(default_factory=list)
class DataSubjectRightsManager:
"""
Manages data subject rights requests per ISO 27701 7.3.
"""
# Response timeframes (in days)
STANDARD_RESPONSE_TIME = 30
MAXIMUM_EXTENSION = 60
def __init__(self, storage_backend, notification_service):
self.storage = storage_backend
self.notifications = notification_service
self.requests: Dict[str, DataSubjectRequest] = {}
def create_request(
self,
subject_email: str,
right_type: RightType,
additional_info: Optional[Dict] = None
) -> DataSubjectRequest:
"""
Create a new data subject request.
ISO 27701 7.3.2: Request handling
"""
request_id = f"DSR-{secrets.token_hex(8).upper()}"
subject_id = self._hash_identifier(subject_email)
verification_token = secrets.token_urlsafe(32)
request = DataSubjectRequest(
request_id=request_id,
subject_id=subject_id,
subject_email=subject_email,
right_type=right_type,
status=RequestStatus.RECEIVED,
created_at=datetime.utcnow(),
due_date=datetime.utcnow() + timedelta(days=self.STANDARD_RESPONSE_TIME),
verification_token=verification_token
)
self.requests[request_id] = request
self.storage.save_request(request)
# Send verification email
self._send_verification_email(request)
# Log for audit trail
self._log_request_action(request_id, "created", additional_info)
return request
def verify_identity(
self,
request_id: str,
verification_token: str,
identity_documents: Optional[List[str]] = None
) -> bool:
"""
Verify data subject identity.
ISO 27701 7.3.5: Identity verification
"""
request = self.requests.get(request_id)
if not request:
return False
if request.verification_token != verification_token:
self._log_request_action(request_id, "verification_failed")
return False
# For high-risk requests, require additional verification
high_risk_rights = [RightType.ERASURE, RightType.PORTABILITY]
if request.right_type in high_risk_rights and not identity_documents:
request.status = RequestStatus.IDENTITY_VERIFICATION
request.processing_notes.append(
"Additional identity verification required for high-risk request"
)
return False
request.status = RequestStatus.IN_PROGRESS
self._log_request_action(request_id, "identity_verified")
return True
def process_access_request(
self,
request_id: str,
data_sources: List[str]
) -> Dict[str, Any]:
"""
Process a data access request.
ISO 27701 7.3.3: Access to PII
"""
request = self.requests.get(request_id)
if not request or request.right_type != RightType.ACCESS:
raise ValueError("Invalid access request")
collected_data = {}
for source in data_sources:
try:
data = self._collect_data_from_source(
source,
request.subject_id
)
collected_data[source] = {
"data": self._sanitize_for_export(data),
"categories": self._categorize_data(data),
"purposes": self._get_processing_purposes(source),
"retention_period": self._get_retention_period(source)
}
request.affected_systems.append(source)
except Exception as e:
request.processing_notes.append(
f"Error collecting from {source}: {str(e)}"
)
# Compile response
response = {
"request_id": request_id,
"subject_email": request.subject_email,
"generated_at": datetime.utcnow().isoformat(),
"data_collected": collected_data,
"processing_information": self._get_processing_info(),
"third_party_recipients": self._get_third_party_list()
}
self._complete_request(request_id, response)
return response
def process_erasure_request(
self,
request_id: str,
data_sources: List[str]
) -> Dict[str, Any]:
"""
Process a data erasure request.
ISO 27701 7.3.6: Erasure of PII
"""
request = self.requests.get(request_id)
if not request or request.right_type != RightType.ERASURE:
raise ValueError("Invalid erasure request")
erasure_results = {}
for source in data_sources:
# Check for legal holds or retention requirements
retention_check = self._check_retention_requirements(
source,
request.subject_id
)
if retention_check["must_retain"]:
erasure_results[source] = {
"status": "retained",
"reason": retention_check["reason"],
"retention_until": retention_check["until"]
}
request.processing_notes.append(
f"Data in {source} retained: {retention_check['reason']}"
)
else:
try:
# Perform erasure
self._erase_data_from_source(source, request.subject_id)
erasure_results[source] = {
"status": "erased",
"erased_at": datetime.utcnow().isoformat()
}
request.affected_systems.append(source)
# Notify processors
self._notify_processors_of_erasure(
source,
request.subject_id
)
except Exception as e:
erasure_results[source] = {
"status": "error",
"error": str(e)
}
response = {
"request_id": request_id,
"erasure_results": erasure_results,
"completed_at": datetime.utcnow().isoformat()
}
self._complete_request(request_id, response)
return response
def process_portability_request(
self,
request_id: str,
data_sources: List[str],
output_format: str = "json"
) -> bytes:
"""
Process a data portability request.
ISO 27701 7.3.7: PII portability
"""
request = self.requests.get(request_id)
if not request or request.right_type != RightType.PORTABILITY:
raise ValueError("Invalid portability request")
portable_data = {
"export_metadata": {
"request_id": request_id,
"exported_at": datetime.utcnow().isoformat(),
"format": output_format,
"schema_version": "1.0"
},
"data": {}
}
for source in data_sources:
# Only include data provided by subject or generated through use
data = self._collect_portable_data(source, request.subject_id)
if data:
portable_data["data"][source] = data
request.affected_systems.append(source)
# Convert to requested format
if output_format == "json":
export_data = json.dumps(portable_data, indent=2).encode('utf-8')
elif output_format == "csv":
export_data = self._convert_to_csv(portable_data)
else:
export_data = json.dumps(portable_data).encode('utf-8')
self._complete_request(request_id, {"format": output_format, "size": len(export_data)})
return export_data
def extend_deadline(
self,
request_id: str,
reason: str,
extension_days: int = 30
):
"""
Extend request deadline when complex.
ISO 27701 7.3.2: Complex requests may be extended
"""
request = self.requests.get(request_id)
if not request:
raise ValueError("Request not found")
if extension_days > self.MAXIMUM_EXTENSION - self.STANDARD_RESPONSE_TIME:
raise ValueError("Extension exceeds maximum allowed")
original_due = request.due_date
request.due_date = request.created_at + timedelta(
days=self.STANDARD_RESPONSE_TIME + extension_days
)
request.status = RequestStatus.EXTENDED
request.extension_reason = reason
# Notify subject
self._send_extension_notification(request, original_due)
self._log_request_action(request_id, "extended", {"reason": reason})
def reject_request(
self,
request_id: str,
reason: str,
legal_basis: str
):
"""
Reject a request with proper justification.
ISO 27701 7.3.2: Requests may be rejected in specific circumstances
"""
request = self.requests.get(request_id)
if not request:
raise ValueError("Request not found")
request.status = RequestStatus.REJECTED
request.rejection_reason = f"{reason} (Legal basis: {legal_basis})"
request.completed_at = datetime.utcnow()
# Notify subject with appeal information
self._send_rejection_notification(request)
self._log_request_action(
request_id,
"rejected",
{"reason": reason, "legal_basis": legal_basis}
)
def _complete_request(self, request_id: str, result: Dict):
"""Mark request as completed."""
request = self.requests.get(request_id)
request.status = RequestStatus.COMPLETED
request.completed_at = datetime.utcnow()
self._send_completion_notification(request, result)
self._log_request_action(request_id, "completed", result)
def _hash_identifier(self, identifier: str) -> str:
"""Create consistent hash of identifier."""
return hashlib.sha256(identifier.lower().encode()).hexdigest()
def _log_request_action(
self,
request_id: str,
action: str,
details: Optional[Dict] = None
):
"""Log action for audit trail."""
self.storage.log_audit_event({
"timestamp": datetime.utcnow().isoformat(),
"request_id": request_id,
"action": action,
"details": details
})
# Placeholder methods for system-specific implementation
def _collect_data_from_source(self, source: str, subject_id: str) -> Dict:
"""Collect data from a specific source system."""
raise NotImplementedError()
def _erase_data_from_source(self, source: str, subject_id: str):
"""Erase data from a specific source system."""
raise NotImplementedError()
def _collect_portable_data(self, source: str, subject_id: str) -> Dict:
"""Collect portable data from source."""
raise NotImplementedError()
def _check_retention_requirements(self, source: str, subject_id: str) -> Dict:
"""Check if data must be retained."""
raise NotImplementedError()
def _send_verification_email(self, request: DataSubjectRequest):
"""Send identity verification email."""
raise NotImplementedError()
def _send_extension_notification(self, request: DataSubjectRequest, original_due: datetime):
"""Notify subject of deadline extension."""
raise NotImplementedError()
def _send_rejection_notification(self, request: DataSubjectRequest):
"""Notify subject of request rejection."""
raise NotImplementedError()
def _send_completion_notification(self, request: DataSubjectRequest, result: Dict):
"""Notify subject of request completion."""
raise NotImplementedError()
def _sanitize_for_export(self, data: Dict) -> Dict:
"""Remove sensitive internal fields before export."""
raise NotImplementedError()
def _categorize_data(self, data: Dict) -> List[str]:
"""Categorize data types in response."""
raise NotImplementedError()
def _get_processing_purposes(self, source: str) -> List[str]:
"""Get processing purposes for a data source."""
raise NotImplementedError()
def _get_retention_period(self, source: str) -> str:
"""Get retention period for a data source."""
raise NotImplementedError()
def _get_processing_info(self) -> Dict:
"""Get general processing information."""
raise NotImplementedError()
def _get_third_party_list(self) -> List[Dict]:
"""Get list of third party recipients."""
raise NotImplementedError()
def _notify_processors_of_erasure(self, source: str, subject_id: str):
"""Notify data processors of erasure."""
raise NotImplementedError()
def _convert_to_csv(self, data: Dict) -> bytes:
"""Convert data to CSV format."""
raise NotImplementedError()Consent Management
# consent_management.py
"""
Consent management implementation per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import json
class ConsentPurpose(Enum):
"""Standard consent purposes."""
MARKETING = "marketing"
ANALYTICS = "analytics"
PERSONALIZATION = "personalization"
THIRD_PARTY_SHARING = "third_party_sharing"
PROFILING = "profiling"
AUTOMATED_DECISIONS = "automated_decisions"
RESEARCH = "research"
SERVICE_IMPROVEMENT = "service_improvement"
class ConsentStatus(Enum):
"""Consent status values."""
GRANTED = "granted"
DENIED = "denied"
WITHDRAWN = "withdrawn"
EXPIRED = "expired"
@dataclass
class ConsentRecord:
"""Individual consent record."""
consent_id: str
subject_id: str
purpose: ConsentPurpose
status: ConsentStatus
granted_at: Optional[datetime]
withdrawn_at: Optional[datetime]
expires_at: Optional[datetime]
version: str
collection_point: str
proof_reference: str
@dataclass
class ConsentPreferences:
"""Complete consent preferences for a subject."""
subject_id: str
consents: Dict[ConsentPurpose, ConsentRecord]
last_updated: datetime
preference_history: List[Dict]
class ConsentManager:
"""
Manages consent lifecycle per ISO 27701 7.2.
"""
def __init__(self, storage_backend, audit_logger):
self.storage = storage_backend
self.audit = audit_logger
self.consent_versions: Dict[ConsentPurpose, str] = {}
def record_consent(
self,
subject_id: str,
purpose: ConsentPurpose,
granted: bool,
collection_point: str,
proof: str,
expiry_days: Optional[int] = None
) -> ConsentRecord:
"""
Record a consent decision.
ISO 27701 7.2.3: Consent records
"""
import secrets
consent = ConsentRecord(
consent_id=f"CON-{secrets.token_hex(8)}",
subject_id=subject_id,
purpose=purpose,
status=ConsentStatus.GRANTED if granted else ConsentStatus.DENIED,
granted_at=datetime.utcnow() if granted else None,
withdrawn_at=None,
expires_at=datetime.utcnow() + timedelta(days=expiry_days) if expiry_days else None,
version=self.consent_versions.get(purpose, "1.0"),
collection_point=collection_point,
proof_reference=proof
)
self.storage.save_consent(consent)
self.audit.log({
"event": "consent_recorded",
"subject_id": subject_id,
"purpose": purpose.value,
"status": consent.status.value,
"collection_point": collection_point
})
return consent
def withdraw_consent(
self,
subject_id: str,
purpose: ConsentPurpose,
reason: Optional[str] = None
) -> ConsentRecord:
"""
Withdraw previously granted consent.
ISO 27701 7.2.4: Consent withdrawal
"""
consent = self.storage.get_consent(subject_id, purpose)
if not consent:
raise ValueError("No consent record found")
consent.status = ConsentStatus.WITHDRAWN
consent.withdrawn_at = datetime.utcnow()
self.storage.save_consent(consent)
# Trigger downstream processing
self._handle_consent_withdrawal(subject_id, purpose)
self.audit.log({
"event": "consent_withdrawn",
"subject_id": subject_id,
"purpose": purpose.value,
"reason": reason
})
return consent
def check_consent(
self,
subject_id: str,
purpose: ConsentPurpose
) -> bool:
"""
Check if valid consent exists.
ISO 27701 7.2.5: Consent verification
"""
consent = self.storage.get_consent(subject_id, purpose)
if not consent:
return False
if consent.status != ConsentStatus.GRANTED:
return False
if consent.expires_at and consent.expires_at < datetime.utcnow():
consent.status = ConsentStatus.EXPIRED
self.storage.save_consent(consent)
return False
# Check if consent version is current
current_version = self.consent_versions.get(purpose, "1.0")
if consent.version != current_version:
self.audit.log({
"event": "consent_version_mismatch",
"subject_id": subject_id,
"purpose": purpose.value,
"consent_version": consent.version,
"current_version": current_version
})
# May need re-consent depending on changes
return self._evaluate_version_compatibility(
consent.version,
current_version,
purpose
)
return True
def get_consent_status(
self,
subject_id: str
) -> ConsentPreferences:
"""Get complete consent status for a subject."""
consents = {}
for purpose in ConsentPurpose:
consent = self.storage.get_consent(subject_id, purpose)
if consent:
consents[purpose] = consent
history = self.storage.get_consent_history(subject_id)
return ConsentPreferences(
subject_id=subject_id,
consents=consents,
last_updated=datetime.utcnow(),
preference_history=history
)
def update_consent_version(
self,
purpose: ConsentPurpose,
new_version: str,
changes_description: str,
requires_reconsent: bool = False
):
"""
Update consent version when terms change.
ISO 27701 7.2.2: Changes to consent
"""
old_version = self.consent_versions.get(purpose, "1.0")
self.consent_versions[purpose] = new_version
self.storage.save_version_change({
"purpose": purpose.value,
"old_version": old_version,
"new_version": new_version,
"changes": changes_description,
"requires_reconsent": requires_reconsent,
"effective_date": datetime.utcnow().isoformat()
})
if requires_reconsent:
# Invalidate existing consents
affected = self.storage.get_subjects_with_consent(purpose)
for subject_id in affected:
self._request_reconsent(subject_id, purpose, changes_description)
def generate_consent_receipt(
self,
consent: ConsentRecord
) -> Dict:
"""
Generate a consent receipt for the data subject.
ISO 27701 7.2.6: Consent receipts
"""
return {
"receipt_id": consent.consent_id,
"version": "1.0",
"jurisdiction": "global",
"consent_timestamp": consent.granted_at.isoformat() if consent.granted_at else None,
"collection_method": consent.collection_point,
"consent_receipt_id": consent.consent_id,
"subject": {
"subject_id": consent.subject_id
},
"data_controller": self._get_controller_info(),
"purposes": [{
"purpose": consent.purpose.value,
"purpose_category": self._get_purpose_category(consent.purpose),
"consent_type": "explicit",
"pii_category": self._get_pii_categories(consent.purpose),
"primary_purpose": True,
"termination": consent.expires_at.isoformat() if consent.expires_at else "none",
"third_party_disclosure": self._has_third_party_disclosure(consent.purpose)
}],
"sensitive": self._is_sensitive_purpose(consent.purpose),
"spi_category": [],
"proof": consent.proof_reference
}
def _handle_consent_withdrawal(
self,
subject_id: str,
purpose: ConsentPurpose
):
"""Handle downstream effects of consent withdrawal."""
# Stop processing
self.storage.set_processing_flag(subject_id, purpose, False)
# Notify affected systems
affected_systems = self.storage.get_systems_using_consent(purpose)
for system in affected_systems:
self._notify_system_of_withdrawal(system, subject_id, purpose)
def _evaluate_version_compatibility(
self,
old_version: str,
new_version: str,
purpose: ConsentPurpose
) -> bool:
"""Evaluate if old consent version is compatible with new."""
# Implementation depends on versioning strategy
# By default, require reconsent for major version changes
old_major = int(old_version.split('.')[0])
new_major = int(new_version.split('.')[0])
return old_major == new_major
def _request_reconsent(
self,
subject_id: str,
purpose: ConsentPurpose,
reason: str
):
"""Request reconsent from subject."""
self.audit.log({
"event": "reconsent_requested",
"subject_id": subject_id,
"purpose": purpose.value,
"reason": reason
})
def _get_controller_info(self) -> Dict:
"""Get data controller information."""
return {
"on_behalf": False,
"contact": "privacy@example.com",
"company": "Example Corp",
"address": "123 Privacy St",
"email": "privacy@example.com",
"phone": "+1-555-0123"
}
def _get_purpose_category(self, purpose: ConsentPurpose) -> str:
"""Get purpose category."""
categories = {
ConsentPurpose.MARKETING: "marketing",
ConsentPurpose.ANALYTICS: "core_function",
ConsentPurpose.PERSONALIZATION: "improved_service"
}
return categories.get(purpose, "other")
def _get_pii_categories(self, purpose: ConsentPurpose) -> List[str]:
"""Get PII categories for purpose."""
return ["contact", "demographic", "behavioral"]
def _has_third_party_disclosure(self, purpose: ConsentPurpose) -> bool:
"""Check if purpose involves third party disclosure."""
return purpose == ConsentPurpose.THIRD_PARTY_SHARING
def _is_sensitive_purpose(self, purpose: ConsentPurpose) -> bool:
"""Check if purpose involves sensitive data."""
return purpose in [ConsentPurpose.PROFILING, ConsentPurpose.AUTOMATED_DECISIONS]
def _notify_system_of_withdrawal(
self,
system: str,
subject_id: str,
purpose: ConsentPurpose
):
"""Notify system of consent withdrawal."""
pass
from datetime import timedeltaPrivacy Impact Assessment
# privacy_impact_assessment.py
"""
Privacy Impact Assessment (PIA) implementation per ISO 27701.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class PIAStatus(Enum):
DRAFT = "draft"
IN_REVIEW = "in_review"
APPROVED = "approved"
REQUIRES_CHANGES = "requires_changes"
REJECTED = "rejected"
@dataclass
class DataFlow:
"""Describes a data flow in the system."""
flow_id: str
source: str
destination: str
data_categories: List[str]
purpose: str
legal_basis: str
retention_period: str
safeguards: List[str]
@dataclass
class PrivacyRisk:
"""Identified privacy risk."""
risk_id: str
description: str
likelihood: RiskLevel
impact: RiskLevel
overall_risk: RiskLevel
affected_rights: List[str]
mitigations: List[str]
residual_risk: RiskLevel
@dataclass
class PrivacyImpactAssessment:
"""Complete Privacy Impact Assessment."""
pia_id: str
project_name: str
description: str
status: PIAStatus
created_at: datetime
updated_at: datetime
assessor: str
reviewer: Optional[str]
# Assessment components
data_flows: List[DataFlow] = field(default_factory=list)
risks: List[PrivacyRisk] = field(default_factory=list)
data_subjects: List[str] = field(default_factory=list)
special_categories: List[str] = field(default_factory=list)
# Compliance checks
lawfulness_assessment: Dict = field(default_factory=dict)
necessity_assessment: Dict = field(default_factory=dict)
proportionality_assessment: Dict = field(default_factory=dict)
# Results
recommendation: Optional[str] = None
conditions: List[str] = field(default_factory=list)
class PIAManager:
"""
Manages Privacy Impact Assessments per ISO 27701 7.2.5.
"""
def __init__(self, storage_backend, dpo_email: str):
self.storage = storage_backend
self.dpo_email = dpo_email
def create_pia(
self,
project_name: str,
description: str,
assessor: str
) -> PrivacyImpactAssessment:
"""Create a new PIA."""
import secrets
pia = PrivacyImpactAssessment(
pia_id=f"PIA-{secrets.token_hex(6).upper()}",
project_name=project_name,
description=description,
status=PIAStatus.DRAFT,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow(),
assessor=assessor
)
self.storage.save_pia(pia)
return pia
def add_data_flow(
self,
pia_id: str,
source: str,
destination: str,
data_categories: List[str],
purpose: str,
legal_basis: str,
retention_period: str,
safeguards: List[str]
) -> DataFlow:
"""Add a data flow to the PIA."""
import secrets
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
flow = DataFlow(
flow_id=f"DF-{secrets.token_hex(4)}",
source=source,
destination=destination,
data_categories=data_categories,
purpose=purpose,
legal_basis=legal_basis,
retention_period=retention_period,
safeguards=safeguards
)
pia.data_flows.append(flow)
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
return flow
def identify_risk(
self,
pia_id: str,
description: str,
likelihood: RiskLevel,
impact: RiskLevel,
affected_rights: List[str],
proposed_mitigations: List[str]
) -> PrivacyRisk:
"""Identify and document a privacy risk."""
import secrets
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
overall_risk = self._calculate_overall_risk(likelihood, impact)
risk = PrivacyRisk(
risk_id=f"RISK-{secrets.token_hex(4)}",
description=description,
likelihood=likelihood,
impact=impact,
overall_risk=overall_risk,
affected_rights=affected_rights,
mitigations=proposed_mitigations,
residual_risk=self._calculate_residual_risk(overall_risk, proposed_mitigations)
)
pia.risks.append(risk)
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
return risk
def assess_lawfulness(
self,
pia_id: str,
legal_basis: str,
justification: str,
documentation: List[str]
):
"""Assess lawfulness of processing."""
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
pia.lawfulness_assessment = {
"legal_basis": legal_basis,
"justification": justification,
"documentation": documentation,
"assessed_at": datetime.utcnow().isoformat(),
"compliant": self._validate_legal_basis(legal_basis, pia)
}
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
def assess_necessity(
self,
pia_id: str,
purpose: str,
data_minimization: Dict,
alternatives_considered: List[str]
):
"""Assess necessity and data minimization."""
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
pia.necessity_assessment = {
"purpose": purpose,
"data_minimization": data_minimization,
"alternatives_considered": alternatives_considered,
"assessed_at": datetime.utcnow().isoformat(),
"compliant": self._validate_necessity(pia)
}
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
def submit_for_review(self, pia_id: str) -> bool:
"""Submit PIA for DPO review."""
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
# Validate completeness
if not self._validate_pia_completeness(pia):
return False
pia.status = PIAStatus.IN_REVIEW
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
# Notify DPO
self._notify_dpo_for_review(pia)
return True
def review_pia(
self,
pia_id: str,
reviewer: str,
approved: bool,
recommendation: str,
conditions: Optional[List[str]] = None
):
"""DPO review of PIA."""
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
pia.reviewer = reviewer
pia.recommendation = recommendation
pia.conditions = conditions or []
if approved:
pia.status = PIAStatus.APPROVED if not conditions else PIAStatus.REQUIRES_CHANGES
else:
pia.status = PIAStatus.REJECTED
pia.updated_at = datetime.utcnow()
self.storage.save_pia(pia)
# Notify assessor
self._notify_assessor_of_result(pia)
def generate_pia_report(self, pia_id: str) -> Dict:
"""Generate formal PIA report."""
pia = self.storage.get_pia(pia_id)
if not pia:
raise ValueError("PIA not found")
return {
"report_metadata": {
"pia_id": pia.pia_id,
"generated_at": datetime.utcnow().isoformat(),
"status": pia.status.value
},
"project_information": {
"name": pia.project_name,
"description": pia.description,
"assessor": pia.assessor,
"reviewer": pia.reviewer
},
"data_processing_overview": {
"data_subjects": pia.data_subjects,
"special_categories": pia.special_categories,
"data_flows": [
{
"source": f.source,
"destination": f.destination,
"categories": f.data_categories,
"purpose": f.purpose,
"legal_basis": f.legal_basis
}
for f in pia.data_flows
]
},
"compliance_assessment": {
"lawfulness": pia.lawfulness_assessment,
"necessity": pia.necessity_assessment,
"proportionality": pia.proportionality_assessment
},
"risk_assessment": {
"identified_risks": [
{
"description": r.description,
"overall_risk": r.overall_risk.value,
"mitigations": r.mitigations,
"residual_risk": r.residual_risk.value
}
for r in pia.risks
],
"high_risks": len([r for r in pia.risks if r.overall_risk in [RiskLevel.HIGH, RiskLevel.CRITICAL]])
},
"recommendation": pia.recommendation,
"conditions": pia.conditions
}
def _calculate_overall_risk(
self,
likelihood: RiskLevel,
impact: RiskLevel
) -> RiskLevel:
"""Calculate overall risk from likelihood and impact."""
risk_matrix = {
(RiskLevel.LOW, RiskLevel.LOW): RiskLevel.LOW,
(RiskLevel.LOW, RiskLevel.MEDIUM): RiskLevel.LOW,
(RiskLevel.LOW, RiskLevel.HIGH): RiskLevel.MEDIUM,
(RiskLevel.LOW, RiskLevel.CRITICAL): RiskLevel.HIGH,
(RiskLevel.MEDIUM, RiskLevel.LOW): RiskLevel.LOW,
(RiskLevel.MEDIUM, RiskLevel.MEDIUM): RiskLevel.MEDIUM,
(RiskLevel.MEDIUM, RiskLevel.HIGH): RiskLevel.HIGH,
(RiskLevel.MEDIUM, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
(RiskLevel.HIGH, RiskLevel.LOW): RiskLevel.MEDIUM,
(RiskLevel.HIGH, RiskLevel.MEDIUM): RiskLevel.HIGH,
(RiskLevel.HIGH, RiskLevel.HIGH): RiskLevel.CRITICAL,
(RiskLevel.HIGH, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
(RiskLevel.CRITICAL, RiskLevel.LOW): RiskLevel.HIGH,
(RiskLevel.CRITICAL, RiskLevel.MEDIUM): RiskLevel.CRITICAL,
(RiskLevel.CRITICAL, RiskLevel.HIGH): RiskLevel.CRITICAL,
(RiskLevel.CRITICAL, RiskLevel.CRITICAL): RiskLevel.CRITICAL,
}
return risk_matrix.get((likelihood, impact), RiskLevel.MEDIUM)
def _calculate_residual_risk(
self,
overall_risk: RiskLevel,
mitigations: List[str]
) -> RiskLevel:
"""Calculate residual risk after mitigations."""
# Simplified: each mitigation reduces risk by one level
risk_levels = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
current_index = risk_levels.index(overall_risk)
reduction = min(len(mitigations), 2) # Max 2 level reduction
new_index = max(0, current_index - reduction)
return risk_levels[new_index]
def _validate_legal_basis(self, legal_basis: str, pia: PrivacyImpactAssessment) -> bool:
"""Validate the legal basis."""
valid_bases = ["consent", "contract", "legal_obligation", "vital_interests", "public_task", "legitimate_interests"]
return legal_basis in valid_bases
def _validate_necessity(self, pia: PrivacyImpactAssessment) -> bool:
"""Validate necessity assessment."""
return bool(pia.necessity_assessment.get("data_minimization"))
def _validate_pia_completeness(self, pia: PrivacyImpactAssessment) -> bool:
"""Check if PIA has all required sections."""
return all([
pia.data_flows,
pia.lawfulness_assessment,
pia.necessity_assessment,
pia.data_subjects
])
def _notify_dpo_for_review(self, pia: PrivacyImpactAssessment):
"""Notify DPO that PIA is ready for review."""
pass
def _notify_assessor_of_result(self, pia: PrivacyImpactAssessment):
"""Notify assessor of review result."""
passConclusion
ISO 27701 implementation requires comprehensive privacy controls covering:
- Data subject rights with proper request handling and verification
- Consent management with granular tracking and withdrawal support
- Privacy impact assessments for new processing activities
- Integration with ISO 27001 for unified security and privacy management
These controls enable organizations to demonstrate privacy compliance and build trust with data subjects.