Compliance

Ghid Tehnic de Conformitate HIPAA pentru Aplicatii in Sanatate

Nicu Constantin
17 min lectura
#hipaa #compliance #healthcare #phi-protection #security

Conformitatea HIPAA necesita masuri tehnice de protectie cuprinzatoare pentru Protected Health Information (PHI). Acest ghid ofera implementari practice ale cerintelor din Security Rule pentru aplicatiile din domeniul sanatatii.

Sistem de Clasificare a Datelor PHI

Implementeaza identificarea si clasificarea sistematica a PHI:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Set
import re
import hashlib
 
class PHIType(Enum):
    """The 18 HIPAA identifier categories used to tag detected PHI.

    Each member's value is a snake_case label. PHIClassifier reports it as
    the element type and upper-cases it to build redaction placeholders.
    """
    NAME = "name"
    ADDRESS = "address"
    DATES = "dates"  # All date elements except the year
    PHONE = "phone"
    FAX = "fax"
    EMAIL = "email"
    SSN = "ssn"
    MRN = "medical_record_number"
    HEALTH_PLAN_ID = "health_plan_id"
    ACCOUNT_NUMBER = "account_number"
    CERTIFICATE_LICENSE = "certificate_license"
    VEHICLE_ID = "vehicle_id"
    DEVICE_ID = "device_id"
    URL = "url"
    IP_ADDRESS = "ip_address"
    BIOMETRIC = "biometric"
    PHOTO = "photo"
    OTHER_UNIQUE = "other_unique_identifier"
 
class SensitivityLevel(Enum):
    """Coarse sensitivity tier assigned to a PHI element or to a whole text."""
    HIGH = "high"      # Direct identifiers
    MEDIUM = "medium"  # Quasi-identifiers
    LOW = "low"        # Non-identifying health data
 
@dataclass
class PHIElement:
    """One PHI occurrence found in a piece of text."""
    phi_type: PHIType  # Identifier category the match belongs to
    value: str  # The matched text itself (raw PHI - handle with care)
    sensitivity: SensitivityLevel  # Tier derived from phi_type
    location: str  # Where in the data
    context: Optional[str] = None  # Caller-supplied label for the data source
 
@dataclass
class DataClassificationResult:
    """Outcome of classifying one piece of text for PHI content."""
    contains_phi: bool  # True when at least one element was detected
    phi_elements: List[PHIElement]  # Every detected occurrence
    sensitivity_level: SensitivityLevel  # Highest tier among the elements
    recommended_actions: List[str]  # Human-readable handling advice
    # Creation time; datetime.utcnow returns a naive (tz-unaware) UTC value.
    timestamp: datetime = field(default_factory=datetime.utcnow)
 
class PHIClassifier:
    """Detect, classify and redact PHI in free text using regular expressions.

    Only identifier types that have an entry in ``self.patterns`` can be
    detected. Every other PHIType member (names, photos, biometrics, ...) is
    invisible to this class and has to be handled by other means.
    """

    def __init__(self):
        self.patterns = self._compile_patterns()
        # Types reported as HIGH sensitivity (direct identifiers).
        self.high_sensitivity_types = {
            PHIType.SSN, PHIType.MRN, PHIType.HEALTH_PLAN_ID,
            PHIType.BIOMETRIC, PHIType.PHOTO
        }
        # Types reported as MEDIUM sensitivity (quasi-identifiers).
        self.medium_sensitivity_types = {
            PHIType.NAME, PHIType.ADDRESS, PHIType.PHONE,
            PHIType.EMAIL, PHIType.DATES
        }

    def _compile_patterns(self) -> Dict[PHIType, List[re.Pattern]]:
        """Compile the regex patterns used for PHI detection.

        Returns:
            Mapping from PHI type to the compiled patterns that detect it.
            The dict order is the order in which ``classify`` reports types.
        """
        return {
            PHIType.SSN: [
                re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
                re.compile(r'\b\d{9}\b')  # No dashes
            ],
            PHIType.PHONE: [
                re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
                re.compile(r'\(\d{3}\)\s*\d{3}[-.\s]?\d{4}')
            ],
            PHIType.EMAIL: [
                # TLD class is [A-Za-z]; a "|" inside a character class is a
                # literal pipe, not an alternation.
                re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
            ],
            PHIType.MRN: [
                re.compile(r'\bMRN[:\s#]*\d{6,10}\b', re.IGNORECASE),
                re.compile(r'\bMedical Record[:\s#]*\d{6,10}\b', re.IGNORECASE)
            ],
            PHIType.DATES: [
                re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),
                re.compile(r'\b\d{4}-\d{2}-\d{2}\b'),
                re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b', re.IGNORECASE)
            ],
            PHIType.IP_ADDRESS: [
                re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
            ],
            PHIType.HEALTH_PLAN_ID: [
                re.compile(r'\b[A-Z]{3}\d{9}\b'),  # Common format
                re.compile(r'\bMember\s*ID[:\s#]*[A-Z0-9]{8,12}\b', re.IGNORECASE)
            ],
            PHIType.ADDRESS: [
                re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr)\b', re.IGNORECASE)
            ]
        }

    def classify(self, data: str, context: str = "unknown") -> DataClassificationResult:
        """Classify a text for PHI content.

        Args:
            data: Text to scan.
            context: Free-form label copied onto every detected element.

        Returns:
            A DataClassificationResult listing every pattern match. The same
            span can be reported under several types when patterns overlap
            (for example a 9-digit MRN also matches the dashless SSN pattern).
        """
        phi_elements = []

        # Pattern-based detection
        for phi_type, patterns in self.patterns.items():
            sensitivity = self._get_sensitivity(phi_type)
            for pattern in patterns:
                for match in pattern.finditer(data):
                    phi_elements.append(PHIElement(
                        phi_type=phi_type,
                        value=match.group(),
                        sensitivity=sensitivity,
                        location=f"position {match.start()}-{match.end()}",
                        context=context
                    ))

        # Overall sensitivity is the highest tier among the elements
        if any(e.sensitivity == SensitivityLevel.HIGH for e in phi_elements):
            overall_sensitivity = SensitivityLevel.HIGH
        elif any(e.sensitivity == SensitivityLevel.MEDIUM for e in phi_elements):
            overall_sensitivity = SensitivityLevel.MEDIUM
        else:
            overall_sensitivity = SensitivityLevel.LOW

        recommendations = self._generate_recommendations(phi_elements, overall_sensitivity)

        return DataClassificationResult(
            contains_phi=bool(phi_elements),
            phi_elements=phi_elements,
            sensitivity_level=overall_sensitivity,
            recommended_actions=recommendations
        )

    def _get_sensitivity(self, phi_type: PHIType) -> SensitivityLevel:
        """Return the sensitivity tier for a PHI type (LOW when unlisted)."""
        if phi_type in self.high_sensitivity_types:
            return SensitivityLevel.HIGH
        if phi_type in self.medium_sensitivity_types:
            return SensitivityLevel.MEDIUM
        return SensitivityLevel.LOW

    def _generate_recommendations(
        self,
        elements: List[PHIElement],
        sensitivity: SensitivityLevel
    ) -> List[str]:
        """Build handling recommendations for the detected elements.

        Args:
            elements: Elements found by ``classify``.
            sensitivity: Overall sensitivity of the text.

        Returns:
            User-facing recommendation strings (in Romanian).
        """
        recommendations = []
        found_types = {e.phi_type for e in elements}

        if sensitivity == SensitivityLevel.HIGH:
            recommendations.extend([
                "Cripteaza datele in repaus si in tranzit",
                "Implementeaza controale stricte de acces",
                "Activeaza audit logging cuprinzator",
                "Ia in considerare de-identificarea inainte de stocare"
            ])

        if PHIType.SSN in found_types:
            recommendations.append("SSN detectat - stocheaza doar ultimele 4 cifre daca este posibil")

        if PHIType.DATES in found_types:
            recommendations.append("Ia in considerare generalizarea datelor doar la an pentru de-identificare")

        if PHIType.ADDRESS in found_types:
            # Safe Harbor keeps the first three ZIP digits only when the area
            # they cover has MORE than 20,000 residents; smaller areas must
            # be replaced with 000.
            recommendations.append(
                "Ia in considerare pastrarea doar a primelor 3 cifre ale codului postal "
                "(permis doar pentru zone cu peste 20.000 de locuitori; altfel foloseste 000)"
            )

        return recommendations

    def de_identify(self, data: str, method: str = "safe_harbor") -> Dict:
        """De-identify a text with the requested method.

        Args:
            data: Text to de-identify.
            method: ``"safe_harbor"`` or ``"expert_determination"``.

        Returns:
            The result dict of the selected method.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == "safe_harbor":
            return self._safe_harbor_deidentification(data)
        elif method == "expert_determination":
            return self._expert_determination_placeholder(data)
        else:
            raise ValueError(f"Unknown de-identification method: {method}")

    def _safe_harbor_deidentification(self, data: str) -> Dict:
        """Redact every pattern-detectable identifier from ``data``.

        This is a partial Safe Harbor pass: only the types present in
        ``self.patterns`` are redacted. The result therefore reports which
        identifier types were not checked, and ``hipaa_compliant`` is True
        only when no type is left unchecked.

        Returns:
            Dict with the redacted text, per-match removal details and the
            coverage information described above.
        """
        result = data
        removed_elements = []

        # Label-anchored patterns ("MRN: 123456789", "Member ID ...") must run
        # before the generic digit patterns, otherwise the bare SSN/phone
        # regexes consume the number first and the value is redacted under
        # the wrong label.
        prioritized = (PHIType.MRN, PHIType.HEALTH_PLAN_ID)
        ordered_types = [t for t in prioritized if t in self.patterns]
        ordered_types += [t for t in self.patterns if t not in prioritized]

        for phi_type in ordered_types:
            placeholder = f"[{phi_type.value.upper()}_REMOVED]"

            # Record each match while substituting, so the text is scanned
            # once per pattern instead of once for findall and once for sub.
            def _redact(match, phi_type=phi_type, placeholder=placeholder):
                removed_elements.append({
                    'type': phi_type.value,
                    'original': match.group(),
                    'replacement': placeholder
                })
                return placeholder

            for pattern in self.patterns[phi_type]:
                result = pattern.sub(_redact, result)

        unchecked_types = sorted(t.value for t in PHIType if t not in self.patterns)

        return {
            'original_length': len(data),
            'deidentified_data': result,
            'deidentified_length': len(result),
            'elements_removed': len(removed_elements),
            'removal_details': removed_elements,
            'method': 'safe_harbor',
            'unchecked_identifier_types': unchecked_types,
            # Never claim compliance while identifier types remain unchecked.
            'hipaa_compliant': not unchecked_types
        }

    def _expert_determination_placeholder(self, data: str) -> Dict:
        """Placeholder for the expert-determination method.

        A real implementation would run a statistical analysis of the
        re-identification risk; this stub only returns an advisory message.
        """
        return {
            'message': 'Expert determination necesita analiza statistica',
            'recommendation': 'Consulta un expert statistic calificat'
        }

Implementarea Controlului Accesului

Implementeaza controale de acces conforme HIPAA:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set, Dict, List
import hashlib
import jwt
 
class AccessLevel(Enum):
    """Access tiers numbered from NONE (0) to ADMIN (3).

    NOTE(review): not referenced by the access-control code in this section.
    """
    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
 
class UserRole(Enum):
    """Roles that HIPAAAccessControl maps to permission sets."""
    PATIENT = "patient"
    NURSE = "nurse"
    PHYSICIAN = "physician"
    ADMIN = "admin"
    BILLING = "billing"
    IT_SUPPORT = "it_support"
    AUDITOR = "auditor"
 
@dataclass
class User:
    """An account known to the access-control system."""
    user_id: str
    username: str
    role: UserRole  # Selects the role permission table entry
    department: str  # Used for department-scoped patient access
    active: bool  # Inactive accounts are always denied
    mfa_enabled: bool  # Must be True to reach PHI resources
    last_login: Optional[datetime]
    permissions: Set[str]  # Extra permissions granted on top of the role's
    # Defaults to None; annotated Optional with object values because the
    # lowercase ``any`` used previously is the builtin function, not a type.
    access_restrictions: Optional[Dict[str, object]] = None
 
@dataclass
class AccessDecision:
    """Result of one access check, suitable for writing to an audit trail."""
    allowed: bool
    user_id: str
    resource: str
    action: str
    timestamp: datetime  # When the decision was made
    reason: str  # Human-readable explanation of the outcome
    requires_break_glass: bool = False  # True when granted via emergency override
    audit_id: str = ""  # Short identifier correlating the decision with logs
 
class HIPAAAccessControl:
    """Role-based access decisions for PHI with an emergency override.

    ``check_access`` runs a fixed sequence of gates (known user, active
    account, MFA for PHI resources, role permission, patient scope) and
    returns an AccessDecision describing the first gate that failed, or a
    grant. Recording break-glass grants is the caller's job via
    ``log_break_glass``.
    """

    def __init__(self):
        self.users: Dict[str, User] = {}
        self.role_permissions = self._define_role_permissions()
        self.break_glass_log: List[Dict] = []
        # The HIPAA automatic-logoff specification (45 CFR 164.312(a)(2)(iii))
        # does not prescribe a duration; 15 minutes is a policy choice.
        self.session_timeout_minutes = 15

    def _define_role_permissions(self) -> Dict[UserRole, Dict]:
        """Return the role permission table (minimum-necessary principle).

        Each entry holds the role's ``permissions`` set, a ``phi_access``
        scope consulted by ``_check_patient_access`` and declarative
        ``restrictions`` (not enforced by ``check_access``).
        """
        return {
            UserRole.PATIENT: {
                'permissions': {'read_own_records', 'request_amendment'},
                'phi_access': {'own_only': True},
                'restrictions': {'department': None}
            },
            UserRole.NURSE: {
                'permissions': {
                    'read_patient_records', 'write_vitals', 'write_notes',
                    'view_medications', 'administer_medications'
                },
                'phi_access': {'assigned_patients': True, 'department_patients': True},
                'restrictions': {'department': 'assigned'}
            },
            UserRole.PHYSICIAN: {
                'permissions': {
                    'read_patient_records', 'write_diagnosis', 'write_orders',
                    'prescribe_medications', 'view_all_records', 'write_notes'
                },
                'phi_access': {'all_patients': True},
                'restrictions': None
            },
            UserRole.BILLING: {
                'permissions': {
                    'read_billing_info', 'write_billing', 'read_insurance',
                    'read_demographics'
                },
                'phi_access': {'billing_relevant': True},
                'restrictions': {'exclude_clinical_notes': True}
            },
            UserRole.ADMIN: {
                'permissions': {'all'},
                'phi_access': {'all': True},
                'restrictions': None
            },
            UserRole.IT_SUPPORT: {
                'permissions': {
                    'system_administration', 'user_management',
                    'audit_log_access'
                },
                'phi_access': {'none': True},  # IT should not access PHI
                'restrictions': {'no_phi_access': True}
            },
            UserRole.AUDITOR: {
                'permissions': {
                    'audit_log_access', 'compliance_reports',
                    'access_reviews'
                },
                'phi_access': {'audit_only': True},
                'restrictions': {'read_only': True}
            }
        }

    def check_access(
        self,
        user_id: str,
        resource: str,
        action: str,
        patient_id: Optional[str] = None,
        context: Optional[Dict] = None
    ) -> AccessDecision:
        """Decide whether a user may perform an action on a resource.

        Args:
            user_id: Key into ``self.users``.
            resource: Resource name, e.g. ``"patient_records"``.
            action: Action verb, e.g. ``"read"``.
            patient_id: When given, patient-scope rules are applied as well.
            context: Optional request context; ``emergency`` together with a
                ``justification`` enables break-glass access when the
                permission check fails.

        Returns:
            An AccessDecision; ``requires_break_glass`` is True when access
            was granted only because of the emergency override.
        """
        # SHA-256 rather than MD5: MD5 is unavailable on FIPS-mode systems.
        audit_id = hashlib.sha256(
            f"{user_id}{resource}{action}{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]

        def decide(allowed: bool, reason: str, break_glass: bool = False) -> AccessDecision:
            # Single construction point so every outcome carries the same
            # request fields and audit id.
            return AccessDecision(
                allowed=allowed,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason=reason,
                requires_break_glass=break_glass,
                audit_id=audit_id
            )

        if user_id not in self.users:
            return decide(False, "Utilizator negasit")

        user = self.users[user_id]

        if not user.active:
            return decide(False, "Contul utilizatorului este inactiv")

        # MFA is mandatory for anything that touches PHI
        if self._requires_phi_access(resource) and not user.mfa_enabled:
            return decide(False, "MFA necesar pentru acces PHI")

        role_config = self.role_permissions.get(user.role)
        if not role_config:
            return decide(False, "Rol neconfigurat")

        required_permission = self._get_required_permission(resource, action)
        has_permission = (
            'all' in role_config['permissions'] or
            required_permission in role_config['permissions'] or
            required_permission in user.permissions
        )

        if not has_permission:
            # Emergency override: grant, but flag the decision for review
            if self._is_emergency_context(context):
                return decide(True, "Acces break-glass de urgenta", break_glass=True)
            return decide(False, f"Permisiune lipsa: {required_permission}")

        if patient_id and not self._check_patient_access(user, patient_id, role_config):
            return decide(False, "Neautorizat pentru inregistrarile acestui pacient")

        return decide(True, "Acces acordat")

    def _requires_phi_access(self, resource: str) -> bool:
        """Return True when the resource name contains a known PHI resource."""
        phi_resources = [
            'patient_records', 'medical_history', 'lab_results',
            'prescriptions', 'clinical_notes', 'imaging',
            'demographics', 'insurance_info'
        ]
        return any(phi in resource for phi in phi_resources)

    def _get_required_permission(self, resource: str, action: str) -> str:
        """Map a resource/action pair to a permission name.

        Simplified mapping (``"<action>_<resource>"``); extend it to match
        the permission vocabulary of the real system.
        """
        return f"{action}_{resource}"

    def _is_emergency_context(self, context: Optional[Dict]) -> bool:
        """Return True for an emergency request that carries a justification."""
        if not context:
            return False
        # bool(): the raw ``and`` would leak the justification string or None.
        return bool(context.get('emergency', False) and context.get('justification'))

    def _check_patient_access(
        self,
        user: User,
        patient_id: str,
        role_config: Dict
    ) -> bool:
        """Return True when the user's role scope covers the given patient.

        Scopes are alternatives: the first one that matches grants access.
        """
        phi_access = role_config.get('phi_access', {})

        if phi_access.get('all_patients') or phi_access.get('all'):
            return True

        if phi_access.get('own_only') and user.user_id == patient_id:
            return True

        # A failed assignment lookup must not end the check: roles such as
        # NURSE also have department scope, which has to be tried next.
        if phi_access.get('assigned_patients') and self._is_assigned_to_patient(
            user.user_id, patient_id
        ):
            return True

        if phi_access.get('department_patients') and self._is_patient_in_department(
            patient_id, user.department
        ):
            return True

        return False

    def _is_assigned_to_patient(self, user_id: str, patient_id: str) -> bool:
        """Placeholder: whether the user is assigned to the patient.

        Always returns False here; production code would query the
        assignment store.
        """
        return False

    def _is_patient_in_department(self, patient_id: str, department: str) -> bool:
        """Placeholder: whether the patient belongs to the department.

        Always returns False here; production code would query the patient's
        location/department.
        """
        return False

    def log_break_glass(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        access_decision: AccessDecision
    ) -> None:
        """Append a break-glass access to the in-memory review log.

        The entry starts unreviewed and carries the decision's ``audit_id``.
        """
        self.break_glass_log.append({
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'patient_id': patient_id,
            'justification': justification,
            'audit_id': access_decision.audit_id,
            'reviewed': False,
            'review_result': None
        })

Audit Logging HIPAA

Implementeaza audit logging cuprinzator:

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import json
import hashlib
import uuid
 
class AuditEventType(Enum):
    """Categories of events recorded in the audit log."""
    # Session / access events
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    SESSION_TIMEOUT = "session_timeout"
 
    # PHI access events
    PHI_VIEW = "phi_view"
    PHI_CREATE = "phi_create"
    PHI_UPDATE = "phi_update"
    PHI_DELETE = "phi_delete"
    PHI_EXPORT = "phi_export"
    PHI_PRINT = "phi_print"
 
    # Administrative events
    USER_CREATE = "user_create"
    USER_MODIFY = "user_modify"
    USER_DELETE = "user_delete"
    PERMISSION_CHANGE = "permission_change"
 
    # Security events
    MFA_ENABLED = "mfa_enabled"
    MFA_DISABLED = "mfa_disabled"
    PASSWORD_CHANGE = "password_change"
    BREAK_GLASS_ACCESS = "break_glass_access"
 
    # System events
    SYSTEM_START = "system_start"
    SYSTEM_STOP = "system_stop"
    CONFIG_CHANGE = "config_change"
    BACKUP_CREATED = "backup_created"
 
@dataclass
class AuditEvent:
    """A single audit record carrying a SHA-256 integrity hash."""
    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: str
    patient_id: Optional[str]
    resource: str
    action: str
    outcome: str  # success, failure, error
    source_ip: str
    user_agent: Optional[str]
    details: Dict[str, Any]
    phi_accessed: bool
    integrity_hash: str = ""

    def __post_init__(self):
        # Keep a hash supplied by the caller; otherwise derive it now.
        self.integrity_hash = self.integrity_hash or self._compute_hash()

    def _compute_hash(self) -> str:
        """Return the SHA-256 hex digest over the identifying fields.

        The covered fields are concatenated without separators, in this
        order: event_id, timestamp, event_type, user_id, patient_id,
        resource, action, outcome. ``source_ip``, ``user_agent``,
        ``details`` and ``phi_accessed`` are not part of the hash.
        """
        covered_fields = (
            self.event_id,
            self.timestamp.isoformat(),
            self.event_type.value,
            self.user_id,
            self.patient_id,
            self.resource,
            self.action,
            self.outcome,
        )
        payload = "".join(f"{part}" for part in covered_fields)
        return hashlib.sha256(payload.encode()).hexdigest()
 
class HIPAAAuditLogger:
    """Write hash-chained audit events to a storage backend.

    The backend must provide ``store(event)`` and
    ``query(patient_id=..., start_date=..., end_date=..., phi_accessed=...)``.
    Each event after the first one logged by this instance carries the
    previous event's integrity hash in ``details['previous_hash']``.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.retention_years = 6  # HIPAA minimum
        self.previous_hash: Optional[str] = None

    def log_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        resource: str,
        action: str,
        outcome: str,
        source_ip: str,
        patient_id: Optional[str] = None,
        user_agent: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
        phi_accessed: bool = False
    ) -> AuditEvent:
        """Create, chain and store one audit event.

        Args:
            event_type: Category of the event.
            user_id: Acting user.
            resource: Resource that was touched.
            action: Action performed on the resource.
            outcome: ``"success"``, ``"failure"`` or ``"error"``.
            source_ip: Client address of the request.
            patient_id: Affected patient, if any.
            user_agent: Client user agent, if known.
            details: Extra key/value data; copied, never modified in place.
            phi_accessed: Whether PHI was touched.

        Returns:
            The stored AuditEvent.
        """
        event = AuditEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            user_id=user_id,
            patient_id=patient_id,
            resource=resource,
            action=action,
            outcome=outcome,
            source_ip=source_ip,
            user_agent=user_agent,
            # Copy: the chain link below is written into this dict and must
            # not leak into the dict object owned by the caller.
            details=dict(details) if details else {},
            phi_accessed=phi_accessed
        )

        # Chain link for tamper evidence. It lives in ``details``, which the
        # event's own integrity hash does not cover.
        if self.previous_hash:
            event.details['previous_hash'] = self.previous_hash
        self.previous_hash = event.integrity_hash

        self.storage.store(event)

        return event

    def log_phi_access(
        self,
        user_id: str,
        patient_id: str,
        record_type: str,
        action: str,
        source_ip: str,
        fields_accessed: Optional[List[str]] = None,
        purpose: Optional[str] = None
    ) -> AuditEvent:
        """Log a PHI access with field-level detail.

        ``action`` is mapped to the matching PHI_* event type; an unknown
        action is recorded as PHI_VIEW. The outcome is always ``"success"``.
        """
        event_type_map = {
            'view': AuditEventType.PHI_VIEW,
            'create': AuditEventType.PHI_CREATE,
            'update': AuditEventType.PHI_UPDATE,
            'delete': AuditEventType.PHI_DELETE,
            'export': AuditEventType.PHI_EXPORT,
            'print': AuditEventType.PHI_PRINT
        }

        return self.log_event(
            event_type=event_type_map.get(action, AuditEventType.PHI_VIEW),
            user_id=user_id,
            patient_id=patient_id,
            resource=record_type,
            action=action,
            outcome='success',
            source_ip=source_ip,
            phi_accessed=True,
            details={
                'fields_accessed': fields_accessed or [],
                'purpose': purpose,
                'record_type': record_type
            }
        )

    def generate_access_report(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Summarise PHI access to one patient's data within a period.

        Supports the HIPAA accounting-of-disclosures obligation.

        Returns:
            Dict with totals, per-user and per-record-type counts and a
            timeline, in the order the storage backend returned the events.
        """
        events = self.storage.query(
            patient_id=patient_id,
            start_date=start_date,
            end_date=end_date,
            phi_accessed=True
        )

        access_by_user: Dict[str, Dict] = {}
        access_by_type: Dict[str, int] = {}
        access_timeline: List[Dict] = []

        for event in events:
            record_type = event.details.get('record_type', 'unknown')

            per_user = access_by_user.setdefault(
                event.user_id, {'count': 0, 'actions': []}
            )
            per_user['count'] += 1
            per_user['actions'].append(event.action)

            access_by_type[record_type] = access_by_type.get(record_type, 0) + 1

            access_timeline.append({
                'timestamp': event.timestamp.isoformat(),
                'user_id': event.user_id,
                'action': event.action,
                'record_type': record_type
            })

        return {
            'patient_id': patient_id,
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'generated_at': datetime.utcnow().isoformat(),
            'total_access_events': len(events),
            'access_by_user': access_by_user,
            'access_by_type': access_by_type,
            'access_timeline': access_timeline
        }

    def verify_log_integrity(self, events: List[AuditEvent]) -> Dict:
        """Verify per-event hashes and the chain links of an event sequence.

        Args:
            events: Events in the order they were logged.

        Returns:
            Dict with ``verified``, ``total_events`` and a list of
            ``integrity_failures``. An event without a ``previous_hash``
            entry is not treated as a chain break.
        """
        results = {
            'verified': True,
            'total_events': len(events),
            'integrity_failures': []
        }

        for i, event in enumerate(events):
            # Per-event hash
            if event._compute_hash() != event.integrity_hash:
                results['verified'] = False
                results['integrity_failures'].append({
                    'event_id': event.event_id,
                    'issue': 'Nepotrivire hash - evenimentul poate fi fost falsificat'
                })

            # Chain link to the preceding event
            if i > 0:
                previous_hash = event.details.get('previous_hash')
                if previous_hash and previous_hash != events[i-1].integrity_hash:
                    results['verified'] = False
                    results['integrity_failures'].append({
                        'event_id': event.event_id,
                        'issue': 'Lant intrerupt - evenimentele pot lipsi sau fi reordonate'
                    })

        return results

Implementarea Criptarii

Implementeaza criptare conforma HIPAA:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import base64
from dataclasses import dataclass
from typing import Optional, Dict
import json
 
@dataclass
class EncryptedData:
    """Encrypted payload plus the metadata needed to decrypt it."""
    ciphertext: bytes  # Output of AESGCM.encrypt
    nonce: bytes  # Nonce used for this encryption
    key_id: str  # Identifier of the key in the key manager
    algorithm: str  # Algorithm label, e.g. 'AES-256-GCM'
    version: int  # Key version to request from the key manager
 
class HIPAAEncryption:
    """AES-256-GCM encryption of PHI with context-bound authentication.

    The key manager must provide ``get_current_key() -> (key, key_id)`` and
    ``get_key(key_id, version) -> key`` raising KeyError for unknown
    versions.
    """

    def __init__(self, key_manager):
        self.key_manager = key_manager
        self.algorithm = 'AES-256-GCM'  # NIST recommended
        # Last key version used for encryption; refreshed on every encrypt.
        self.current_key_version = 1

    def _resolve_current_key(self) -> tuple[bytes, str, int]:
        """Return the current key, its id and its actual version.

        ``get_current_key`` hands out the newest key but not its version, so
        the version is found by probing ``get_key`` upwards from 1. Without
        this, data written after a key rotation would be labelled with a
        stale version and could no longer be decrypted.

        Raises:
            KeyError: If no version of the key id yields the current key.
        """
        key, key_id = self.key_manager.get_current_key()
        version = 1
        try:
            while self.key_manager.get_key(key_id, version) != key:
                version += 1
        except KeyError as exc:
            raise KeyError(
                f"Cannot determine the version of current key {key_id}"
            ) from exc
        self.current_key_version = version
        return key, key_id, version

    def encrypt_phi(
        self,
        data: str,
        patient_id: str,
        data_type: str
    ) -> EncryptedData:
        """Encrypt PHI, binding the ciphertext to patient and data type.

        Args:
            data: Plaintext to encrypt.
            patient_id: Patient the data belongs to (part of the AAD).
            data_type: Kind of data (part of the AAD).

        Returns:
            EncryptedData labelled with the key id and version actually used.
        """
        key, key_id, version = self._resolve_current_key()

        # Fresh random 12-byte nonce per message
        nonce = os.urandom(12)

        # Additional authenticated data: decryption fails unless the same
        # patient, data type and key version are presented again.
        aad = json.dumps({
            'patient_id': patient_id,
            'data_type': data_type,
            'key_version': version
        }).encode()

        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, data.encode(), aad)

        return EncryptedData(
            ciphertext=ciphertext,
            nonce=nonce,
            key_id=key_id,
            algorithm=self.algorithm,
            version=version
        )

    def decrypt_phi(
        self,
        encrypted_data: EncryptedData,
        patient_id: str,
        data_type: str
    ) -> str:
        """Decrypt PHI produced by ``encrypt_phi``.

        ``patient_id`` and ``data_type`` must equal the values used for
        encryption, because they are part of the authenticated data.
        """
        key = self.key_manager.get_key(
            encrypted_data.key_id,
            encrypted_data.version
        )

        # Rebuild the AAD exactly as it was built during encryption
        aad = json.dumps({
            'patient_id': patient_id,
            'data_type': data_type,
            'key_version': encrypted_data.version
        }).encode()

        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )

        return plaintext.decode()

    def encrypt_field(self, value: str, field_name: str) -> str:
        """Encrypt a single field value for database storage.

        The field name is used as authenticated data.

        Returns:
            Base64 text wrapping a JSON envelope with ciphertext (``c``),
            nonce (``n``), key id (``k``) and key version (``v``).
        """
        key, key_id, version = self._resolve_current_key()
        nonce = os.urandom(12)

        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, value.encode(), field_name.encode())

        # Encode for storage
        return base64.b64encode(json.dumps({
            'c': base64.b64encode(ciphertext).decode(),
            'n': base64.b64encode(nonce).decode(),
            'k': key_id,
            'v': version
        }).encode()).decode()

    def decrypt_field(self, encrypted_value: str, field_name: str) -> str:
        """Decrypt a value produced by ``encrypt_field`` for the same field."""
        data = json.loads(base64.b64decode(encrypted_value))

        key = self.key_manager.get_key(data['k'], data['v'])
        nonce = base64.b64decode(data['n'])
        ciphertext = base64.b64decode(data['c'])

        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, field_name.encode())

        return plaintext.decode()
 
class KeyManager:
    """Derive and track versioned encryption keys from a master key.

    ``self.keys`` maps a key id to a dict holding one entry per integer
    version plus the bookkeeping entries ``created_at`` and ``rotated_at``.
    """

    # Bookkeeping entries stored next to the integer version entries.
    _METADATA_FIELDS = frozenset({'created_at', 'rotated_at'})

    def __init__(self, master_key: bytes):
        """Create the manager and derive version 1 of a fresh key.

        Raises:
            ValueError: If ``master_key`` is empty.
        """
        # An empty master key would make every derived key publicly
        # computable from the (non-secret) key id and version.
        if not master_key:
            raise ValueError("master_key must not be empty")
        self.master_key = master_key
        self.keys: Dict[str, Dict] = {}
        self.current_key_id: Optional[str] = None
        self._initialize_key()

    def _initialize_key(self):
        """Create a new key id with version 1 and make it current."""
        key_id = self._generate_key_id()
        key = self._derive_key(key_id, 1)

        self.keys[key_id] = {
            1: key,
            'created_at': datetime.utcnow(),
            'rotated_at': None
        }
        self.current_key_id = key_id

    def _generate_key_id(self) -> str:
        """Return a random 16-character hexadecimal key identifier."""
        # 8 random bytes give the same 64 bits of identifier as hashing a
        # random value and truncating the digest.
        return os.urandom(8).hex()

    def _derive_key(self, key_id: str, version: int) -> bytes:
        """Derive the 32-byte key for a key id and version from the master key.

        The salt is ``"<key_id>:<version>"``, so each version gets its own key.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=f"{key_id}:{version}".encode(),
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(self.master_key)

    def _latest_version(self, key_id: str) -> int:
        """Return the highest version stored for ``key_id``."""
        return max(v for v in self.keys[key_id] if v not in self._METADATA_FIELDS)

    def get_current_key(self) -> tuple[bytes, str]:
        """Return the newest version of the current key and the key id."""
        key_id = self.current_key_id
        return self.keys[key_id][self._latest_version(key_id)], key_id

    def get_key(self, key_id: str, version: int) -> bytes:
        """Return a specific key version for decryption.

        Raises:
            KeyError: If the key id or the version is unknown.
        """
        if key_id not in self.keys:
            raise KeyError(f"Key not found: {key_id}")

        key_data = self.keys[key_id]
        # Metadata entries share the dict with the versions; never hand one
        # out as if it were key material.
        if version in self._METADATA_FIELDS or version not in key_data:
            raise KeyError(f"Key version not found: {version}")

        return key_data[version]

    def rotate_key(self):
        """Derive the next version of the current key.

        Returns:
            The new version number. Earlier versions stay available through
            ``get_key`` for decrypting existing data.
        """
        key_id = self.current_key_id
        new_version = self._latest_version(key_id) + 1

        self.keys[key_id][new_version] = self._derive_key(key_id, new_version)
        self.keys[key_id]['rotated_at'] = datetime.utcnow()

        return new_version

Sistem de Notificare a Breselor

Implementeaza detectarea si notificarea breselor:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
 
class BreachSeverity(Enum):
    """Severity tiers assigned to a breach incident."""
    LOW = "low"           # < 500 individuals
    MEDIUM = "medium"     # 500+ individuals
    HIGH = "high"         # 500+ with sensitive data
 
@dataclass
class BreachIncident:
    """Record of a single reported (potential) breach incident."""
    incident_id: str
    discovered_date: datetime  # start of the notification deadline window
    breach_date: Optional[datetime]  # None when the breach date is not known
    description: str
    affected_individuals: int  # number of people affected
    phi_types_affected: List[str]
    severity: BreachSeverity
    status: str  # investigating, confirmed, contained, closed
    risk_assessment: Dict  # empty until a risk assessment result is stored
    notifications_sent: List[Dict]  # one record per notification run
 
class HIPAABreachNotification:
    """Track breach incidents and drive the breach-notification workflow.

    Incidents are held in memory in ``self.incidents``. The ``_notify_*``
    methods are stubs: they return a record describing the notification but
    do not deliver anything.
    """

    def __init__(self, config: Dict):
        """Store the configuration and start with no incidents.

        Args:
            config: Settings mapping. ``_notify_individuals`` reads the
                ``'privacy_officer_contact'`` and ``'organization_name'``
                keys from it.
        """
        self.config = config
        self.incidents: List[BreachIncident] = []
        self.notification_deadline_days = 60  # HIPAA requirement

    def report_incident(
        self,
        description: str,
        affected_count: int,
        phi_types: List[str],
        breach_date: Optional[datetime] = None
    ) -> BreachIncident:
        """Report a potential breach incident.

        Args:
            description: Free-text description of what happened.
            affected_count: Number of affected individuals.
            phi_types: Labels of the PHI categories involved.
            breach_date: When the breach occurred, if known.

        Returns:
            The newly recorded incident, in status ``'investigating'`` with
            an empty risk assessment and no notifications.
        """
        incident = BreachIncident(
            incident_id=self._generate_incident_id(),
            discovered_date=datetime.utcnow(),
            breach_date=breach_date,
            description=description,
            affected_individuals=affected_count,
            phi_types_affected=phi_types,
            severity=self._assess_severity(affected_count, phi_types),
            status='investigating',
            risk_assessment={},
            notifications_sent=[]
        )

        self.incidents.append(incident)
        return incident

    def _generate_incident_id(self) -> str:
        """Generate a unique incident ID.

        The ID combines today's UTC date with a 1-based, zero-padded
        sequence number taken from the number of incidents recorded so far.
        """
        return f"BREACH-{datetime.utcnow().strftime('%Y%m%d')}-{len(self.incidents)+1:04d}"

    def _assess_severity(
        self,
        affected_count: int,
        phi_types: List[str]
    ) -> BreachSeverity:
        """Assess the severity of a breach.

        Returns:
            ``HIGH`` for 500+ individuals with at least one sensitive PHI
            type, ``MEDIUM`` for 500+ individuals otherwise, ``LOW`` below
            500 individuals.
        """
        sensitive_types = {'ssn', 'financial', 'mental_health', 'substance_abuse', 'hiv_status'}

        has_sensitive = any(t in sensitive_types for t in phi_types)

        if affected_count >= 500 and has_sensitive:
            return BreachSeverity.HIGH
        elif affected_count >= 500:
            return BreachSeverity.MEDIUM
        else:
            return BreachSeverity.LOW

    def perform_risk_assessment(self, incident_id: str) -> Dict:
        """Perform the 4-factor risk assessment described in HIPAA guidance.

        The result is stored on the incident and returned. Factors 2-4 are
        placeholders fixed at 0.5.

        Args:
            incident_id: ID of a previously reported incident.

        Returns:
            Dict with the individual factor scores, ``'overall_risk_score'``,
            the ``'breach_confirmed'`` / ``'notification_required'`` flags
            (both true when the overall score exceeds 0.5) and an
            ``'assessed_at'`` ISO timestamp.

        Raises:
            ValueError: If no incident has the given ID.
        """
        incident = self._get_incident(incident_id)

        # Factor 1: nature and extent of the PHI involved
        phi_sensitivity_score = self._score_phi_sensitivity(incident.phi_types_affected)

        # Factor 2: the unauthorized person who used/accessed the PHI
        # This is determined during the investigation
        unauthorized_access_risk = 0.5  # Placeholder

        # Factor 3: whether the PHI was actually acquired or viewed
        actual_acquisition_risk = 0.5  # Placeholder

        # Factor 4: the extent to which the risk has been mitigated
        mitigation_score = 0.5  # Placeholder

        # Better mitigation lowers the risk, hence the (1 - score) term.
        overall_risk = (
            phi_sensitivity_score * 0.3 +
            unauthorized_access_risk * 0.3 +
            actual_acquisition_risk * 0.25 +
            (1 - mitigation_score) * 0.15
        )

        assessment = {
            'phi_sensitivity': phi_sensitivity_score,
            'unauthorized_access_risk': unauthorized_access_risk,
            'actual_acquisition_risk': actual_acquisition_risk,
            'mitigation_effectiveness': mitigation_score,
            'overall_risk_score': overall_risk,
            'breach_confirmed': overall_risk > 0.5,
            'notification_required': overall_risk > 0.5,
            'assessed_at': datetime.utcnow().isoformat()
        }

        incident.risk_assessment = assessment
        return assessment

    def _score_phi_sensitivity(self, phi_types: List[str]) -> float:
        """Score PHI sensitivity.

        Returns:
            The highest weight among the given PHI types (unknown types
            count as 0.5), or 0.3 when no types are given.
        """
        sensitivity_weights = {
            'ssn': 1.0,
            'financial': 0.9,
            'mental_health': 0.95,
            'substance_abuse': 0.95,
            'hiv_status': 0.95,
            'diagnosis': 0.7,
            'medications': 0.6,
            'demographics': 0.4,
            'contact_info': 0.3
        }

        if not phi_types:
            return 0.3

        return max(sensitivity_weights.get(t, 0.5) for t in phi_types)

    def send_notifications(self, incident_id: str) -> Dict:
        """Send the notifications required for a breach.

        Args:
            incident_id: ID of a previously reported incident.

        Returns:
            ``{'status': 'assessment_required', ...}`` when no risk
            assessment has been stored for the incident yet;
            ``{'status': 'not_required', ...}`` when the stored assessment
            says notification is not required; otherwise a dict with the
            ``'individuals'``, ``'hhs'`` and ``'media'`` notification
            records (``None`` where a channel was not used).

        Raises:
            ValueError: If no incident has the given ID.
        """
        incident = self._get_incident(incident_id)

        # An empty assessment means "not evaluated yet", which must not be
        # reported as "risk below threshold": that would silently suppress
        # notification for an incident nobody has assessed.
        if not incident.risk_assessment:
            return {
                'status': 'assessment_required',
                'reason': 'Evaluarea de risc nu a fost efectuata'
            }

        if not incident.risk_assessment.get('notification_required'):
            return {'status': 'not_required', 'reason': 'Evaluarea de risc sub prag'}

        notifications = {
            'individuals': None,
            'hhs': None,
            'media': None
        }

        # Individual notifications (mandatory within the deadline window)
        if incident.affected_individuals > 0:
            notifications['individuals'] = self._notify_individuals(incident)

        # HHS notification
        if incident.affected_individuals >= 500:
            # Notify HHS within the deadline window
            notifications['hhs'] = self._notify_hhs(incident)

            # Media notification is mandatory for 500+
            notifications['media'] = self._notify_media(incident)
        else:
            # Log for the annual report to HHS (breaches < 500)
            notifications['hhs'] = {'status': 'logged_for_annual_report'}

        incident.notifications_sent.append({
            'timestamp': datetime.utcnow().isoformat(),
            'notifications': notifications
        })

        return notifications

    def _notify_individuals(self, incident: BreachIncident) -> Dict:
        """Build the individual breach notice and return its send record.

        Nothing is delivered here; the rendered letter is returned under
        ``'content'`` so the caller can dispatch it.

        Returns:
            Dict with ``'status'``, ``'method'``, ``'count'``, the ISO
            ``'deadline'`` (discovery date plus
            ``self.notification_deadline_days``) and the letter ``'content'``.

        Raises:
            KeyError: If the config lacks ``'privacy_officer_contact'`` or
                ``'organization_name'``.
        """
        notification_content = f"""
NOTIFICARE DE BRESA DE DATE
 
Stimate Pacient,
 
Va scriem pentru a va informa despre un incident de securitate care ar fi putut afecta informatiile dvs. personale de sanatate.
 
CE S-A INTAMPLAT:
{incident.description}
 
DATA BRESEI:
{incident.breach_date or 'In curs de investigare'}
 
INFORMATII IMPLICATE:
Urmatoarele tipuri de informatii ar fi putut fi afectate:
{', '.join(incident.phi_types_affected)}
 
CE FACEM:
Am luat masuri imediate pentru a limita acest incident si lucram cu experti in securitate pentru a investiga si preveni incidentele viitoare.
 
CE PUTETI FACE:
- Monitorizati rapoartele de credit si extrasele financiare
- Luati in considerare plasarea unei alerte de frauda pe dosarul dvs. de credit
- Verificati orice declaratii de beneficii de la asiguratorul dvs. de sanatate
 
INFORMATII DE CONTACT:
Daca aveti intrebari, contactati Ofiterul nostru de Confidentialitate la {self.config['privacy_officer_contact']}.
 
Ne cerem sincer scuze pentru acest incident si orice ingrijorare pe care o poate cauza.
 
Cu stima,
{self.config['organization_name']}
"""

        # Use the configured window so this deadline always agrees with the
        # one reported by get_notification_status.
        deadline = incident.discovered_date + timedelta(days=self.notification_deadline_days)

        # In production, send real notifications
        return {
            'status': 'sent',
            'method': 'first_class_mail',  # HIPAA requirement
            'count': incident.affected_individuals,
            'deadline': deadline.isoformat(),
            'content': notification_content
        }

    def _notify_hhs(self, incident: BreachIncident) -> Dict:
        """Return the submission record for the HHS breach notification."""
        # In production, submit to the HHS breach portal
        return {
            'status': 'submitted',
            'submission_date': datetime.utcnow().isoformat(),
            'portal': 'https://ocrportal.hhs.gov/ocr/breach/wizard_breach.jsf'
        }

    def _notify_media(self, incident: BreachIncident) -> Dict:
        """Return the record of media outreach for large breaches."""
        return {
            'status': 'distributed',
            'press_release_date': datetime.utcnow().isoformat(),
            'outlets_contacted': ['local_news', 'healthcare_publications']
        }

    def _get_incident(self, incident_id: str) -> BreachIncident:
        """Look up an incident by ID.

        Raises:
            ValueError: If no incident has the given ID.
        """
        for incident in self.incidents:
            if incident.incident_id == incident_id:
                return incident
        raise ValueError(f"Incident not found: {incident_id}")

    def get_notification_status(self) -> Dict:
        """Summarize incidents that still await notification.

        An incident is pending when it is not closed, has no notification
        record, and has not been assessed as "notification not required".

        Returns:
            Dict with ``'pending_notifications'`` (incident ID, ISO deadline
            and whole days remaining), ``'deadline_approaching'`` (IDs with
            14 or fewer days remaining, overdue ones included),
            ``'overdue'`` (IDs whose deadline has already passed) and
            ``'total_incidents'``.
        """
        # Take one timestamp so every incident is measured against the
        # same instant.
        now = datetime.utcnow()
        window = timedelta(days=self.notification_deadline_days)

        pending = []
        deadline_approaching = []
        overdue = []

        for incident in self.incidents:
            if incident.status == 'closed' or incident.notifications_sent:
                continue

            # A completed assessment that rules out notification settles the
            # incident; an un-assessed incident (no flag stored) stays pending.
            if incident.risk_assessment.get('notification_required') is False:
                continue

            deadline = incident.discovered_date + window
            days_remaining = (deadline - now).days

            pending.append({
                'incident_id': incident.incident_id,
                'deadline': deadline.isoformat(),
                'days_remaining': days_remaining
            })

            if days_remaining <= 14:
                deadline_approaching.append(incident.incident_id)

            if now > deadline:
                overdue.append(incident.incident_id)

        return {
            'pending_notifications': pending,
            'deadline_approaching': deadline_approaching,
            'overdue': overdue,
            'total_incidents': len(self.incidents)
        }

Concluzie

Conformitatea HIPAA necesita o abordare cuprinzatoare care combina clasificarea PHI, controlul accesului, audit logging, criptare si capabilitati de notificare a breselor. Implementeaza aceste masuri de protectie tehnice ca parte a programului general de securitate si asigura-te ca evaluarile de risc regulate si instruirea personalului completeaza controalele tehnice. Retine: conformitatea HIPAA este un proces continuu - monitorizeaza, auditeaza si imbunatateste constant postura de securitate pentru a proteja eficient informatiile de sanatate ale pacientilor.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.