
HIPAA Compliance for Cloud Applications

Nicu Constantin
12 min read
#HIPAA#compliance#healthcare#cloud security#PHI

HIPAA Compliance for Cloud Applications: A Technical Implementation Guide

Building HIPAA-compliant cloud applications requires careful handling of Protected Health Information (PHI) at every layer of the technology stack. This guide presents technical implementation patterns for achieving and maintaining HIPAA compliance.

Understanding HIPAA Technical Safeguards

Administrative, Physical, and Technical Controls

# hipaa_controls_framework.yaml
hipaa_safeguards:
  administrative:
    - security_management_process
    - assigned_security_responsibility
    - workforce_security
    - information_access_management
    - security_awareness_training
    - security_incident_procedures
    - contingency_plan
    - evaluation
    - business_associate_contracts
 
  physical:
    - facility_access_controls
    - workstation_use
    - workstation_security
    - device_and_media_controls
 
  technical:
    - access_control:
        - unique_user_identification
        - emergency_access_procedure
        - automatic_logoff
        - encryption_and_decryption
    - audit_controls:
        - audit_logging
        - audit_review
        - audit_reporting
    - integrity:
        - mechanism_to_authenticate_phi
    - person_or_entity_authentication:
        - authentication_methods
    - transmission_security:
        - integrity_controls
        - encryption
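
One way to put the safeguard list to work is a simple gap check. The sketch below is illustrative only: it copies the technical safeguards from the framework file into a Python dict and reports which specifications are not yet marked as implemented. The `find_gaps` helper and the `standard.specification` naming are assumptions made for this example, not part of any HIPAA tooling.

```python
from typing import Dict, List, Set

# Technical safeguard standards, copied from the framework file above.
TECHNICAL_SAFEGUARDS: Dict[str, List[str]] = {
    "access_control": [
        "unique_user_identification",
        "emergency_access_procedure",
        "automatic_logoff",
        "encryption_and_decryption",
    ],
    "audit_controls": ["audit_logging", "audit_review", "audit_reporting"],
    "integrity": ["mechanism_to_authenticate_phi"],
    "person_or_entity_authentication": ["authentication_methods"],
    "transmission_security": ["integrity_controls", "encryption"],
}


def find_gaps(implemented: Set[str]) -> Dict[str, List[str]]:
    """Return, per standard, the specifications not yet implemented.

    `implemented` holds "standard.specification" strings
    (a hypothetical naming scheme chosen for this sketch).
    """
    gaps: Dict[str, List[str]] = {}
    for standard, specs in TECHNICAL_SAFEGUARDS.items():
        missing = [s for s in specs if f"{standard}.{s}" not in implemented]
        if missing:
            gaps[standard] = missing
    return gaps


# Example: a project that has only covered four specifications so far.
done = {
    "access_control.unique_user_identification",
    "access_control.automatic_logoff",
    "audit_controls.audit_logging",
    "transmission_security.encryption",
}
gaps = find_gaps(done)
for standard, missing in gaps.items():
    print(f"{standard}: missing {', '.join(missing)}")
```

A report like this does not establish compliance by itself, but it makes the remaining work visible per standard.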

PHI Data Classification and Handling

Data Classification System

# phi_classification.py
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
 
class PHICategory(Enum):
    """Categories of Protected Health Information."""
    NAMES = "names"
    GEOGRAPHIC = "geographic_data"
    DATES = "dates"
    PHONE_NUMBERS = "phone_numbers"
    FAX_NUMBERS = "fax_numbers"
    EMAIL_ADDRESSES = "email_addresses"
    SSN = "social_security_numbers"
    MEDICAL_RECORD_NUMBERS = "medical_record_numbers"
    HEALTH_PLAN_NUMBERS = "health_plan_beneficiary_numbers"
    ACCOUNT_NUMBERS = "account_numbers"
    CERTIFICATE_NUMBERS = "certificate_license_numbers"
    VEHICLE_IDENTIFIERS = "vehicle_identifiers"
    DEVICE_IDENTIFIERS = "device_identifiers_serial_numbers"
    WEB_URLS = "web_urls"
    IP_ADDRESSES = "ip_addresses"
    BIOMETRIC_IDENTIFIERS = "biometric_identifiers"
    FULL_FACE_PHOTOS = "full_face_photos"
    OTHER_UNIQUE_IDENTIFIERS = "other_unique_identifying_numbers"
 
class SensitivityLevel(Enum):
    """Data sensitivity levels."""
    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"  # PHI falls here
 
@dataclass
class DataClassification:
    """Classification for a data field."""
    field_name: str
    phi_categories: List[PHICategory]
    sensitivity_level: SensitivityLevel
    encryption_required: bool
    audit_required: bool
    retention_days: int
    disposal_method: str
 
class PHIClassifier:
    """Classify and detect PHI in data."""
 
    def __init__(self):
        self.classifications = self._load_classifications()
 
    def _load_classifications(self) -> dict:
        """Load field classifications."""
        return {
            'patient_name': DataClassification(
                field_name='patient_name',
                phi_categories=[PHICategory.NAMES],
                sensitivity_level=SensitivityLevel.RESTRICTED,
                encryption_required=True,
                audit_required=True,
                retention_days=2555,  # 7 years
                disposal_method='crypto_shred'
            ),
            'date_of_birth': DataClassification(
                field_name='date_of_birth',
                phi_categories=[PHICategory.DATES],
                sensitivity_level=SensitivityLevel.RESTRICTED,
                encryption_required=True,
                audit_required=True,
                retention_days=2555,
                disposal_method='crypto_shred'
            ),
            'ssn': DataClassification(
                field_name='ssn',
                phi_categories=[PHICategory.SSN],
                sensitivity_level=SensitivityLevel.RESTRICTED,
                encryption_required=True,
                audit_required=True,
                retention_days=2555,
                disposal_method='crypto_shred'
            ),
            'medical_record_number': DataClassification(
                field_name='medical_record_number',
                phi_categories=[PHICategory.MEDICAL_RECORD_NUMBERS],
                sensitivity_level=SensitivityLevel.RESTRICTED,
                encryption_required=True,
                audit_required=True,
                retention_days=2555,
                disposal_method='crypto_shred'
            ),
            'diagnosis_code': DataClassification(
                field_name='diagnosis_code',
                phi_categories=[],
                sensitivity_level=SensitivityLevel.CONFIDENTIAL,
                encryption_required=True,
                audit_required=True,
                retention_days=2555,
                disposal_method='secure_delete'
            )
        }
 
    def classify_record(self, record: dict) -> dict:
        """Classify all fields in a record."""
        result = {
            'fields': {},
            'overall_sensitivity': SensitivityLevel.PUBLIC,
            'contains_phi': False,
            'encryption_required_fields': [],
            'audit_required_fields': []
        }
 
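        # Rank the levels so the record reports its highest sensitivity
        # (a record holding only CONFIDENTIAL fields must not stay PUBLIC)
        rank = {
            SensitivityLevel.PUBLIC: 0,
            SensitivityLevel.INTERNAL: 1,
            SensitivityLevel.CONFIDENTIAL: 2,
            SensitivityLevel.RESTRICTED: 3
        }

        for field_name, value in record.items():
            classification = self.classifications.get(field_name)

            if classification:
                result['fields'][field_name] = classification

                level = classification.sensitivity_level
                if rank[level] > rank[result['overall_sensitivity']]:
                    result['overall_sensitivity'] = level

                if level == SensitivityLevel.RESTRICTED:
                    result['contains_phi'] = True

                if classification.encryption_required:
                    result['encryption_required_fields'].append(field_name)

                if classification.audit_required:
                    result['audit_required_fields'].append(field_name)

        return result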
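
The `retention_days` and `disposal_method` fields only matter if something acts on them. As a rough sketch, the helpers below compute when a record becomes eligible for disposal. The function names are hypothetical, and the example assumes retention is counted from the record's creation date, which your own retention policy may define differently.

```python
from datetime import date, timedelta


def disposal_due_date(created_on: date, retention_days: int) -> date:
    """Return the first date on which the record may be disposed of."""
    return created_on + timedelta(days=retention_days)


def is_due_for_disposal(created_on: date, retention_days: int, today: date) -> bool:
    """True once the retention period has fully elapsed."""
    return today >= disposal_due_date(created_on, retention_days)


created = date(2020, 1, 1)
due = disposal_due_date(created, 2555)
print(due)  # 2026-12-30
print(is_due_for_disposal(created, 2555, date(2026, 12, 29)))  # False
print(is_due_for_disposal(created, 2555, date(2026, 12, 30)))  # True
```

Note that 2555 days is 7 x 365, so it falls a day or two short of seven calendar years whenever leap years intervene. If the policy is written in calendar years, storing the period in years (or adding a small margin) avoids disposing of records early.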

PHI Detection Engine

# phi_detection.py
import re
from typing import Dict, List, Tuple
 
class PHIDetector:
    """Detect PHI in unstructured text."""
 
    def __init__(self):
        self.patterns = self._compile_patterns()
 
    def _compile_patterns(self) -> Dict[str, re.Pattern]:
        """Compile regex patterns for PHI detection."""
        return {
            'ssn': re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'),
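            # (?<!\w) instead of \b so a leading "(" is part of the match
            'phone': re.compile(r'(?<!\w)\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})\b'),
            # [A-Za-z], not [A-Z|a-z]: inside a character class "|" is a literal pipe
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),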
            'mrn': re.compile(r'\b(?:MRN|Medical Record Number)[:\s]*([A-Z0-9]{6,12})\b', re.IGNORECASE),
            'date_of_birth': re.compile(r'\b(?:DOB|Date of Birth)[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b', re.IGNORECASE),
            'ip_address': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),
            'credit_card': re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
            'health_plan_id': re.compile(r'\b(?:Member ID|Policy Number)[:\s]*([A-Z0-9]{8,15})\b', re.IGNORECASE)
        }
 
    def detect(self, text: str) -> List[Dict]:
        """Detect PHI in text."""
        findings = []
 
        for phi_type, pattern in self.patterns.items():
            for match in pattern.finditer(text):
                findings.append({
                    'type': phi_type,
                    'value': match.group(),
                    'start': match.start(),
                    'end': match.end(),
                    'redacted': self._redact(match.group(), phi_type)
                })
 
        return findings
 
    def _redact(self, value: str, phi_type: str) -> str:
        """Redact PHI value."""
        if phi_type == 'ssn':
            return 'XXX-XX-' + value[-4:]
        elif phi_type == 'phone':
            return '(XXX) XXX-' + value[-4:]
        elif phi_type == 'email':
            parts = value.split('@')
            return parts[0][:2] + '***@' + parts[1]
        elif phi_type == 'credit_card':
            return 'XXXX-XXXX-XXXX-' + value[-4:]
        else:
            return '[REDACTED]'
 
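    def redact_text(self, text: str) -> str:
        """Return text with all PHI redacted."""
        findings = self.detect(text)

        # Merge overlapping findings (e.g. an MRN whose digits also look like
        # an SSN); replacing both would corrupt offsets and leak characters
        merged = []
        for finding in sorted(findings, key=lambda x: x['start']):
            if merged and finding['start'] < merged[-1]['end']:
                merged[-1]['end'] = max(merged[-1]['end'], finding['end'])
                merged[-1]['redacted'] = '[REDACTED]'
            else:
                merged.append(dict(finding))

        # Replace from the end so earlier offsets remain valid
        redacted_text = text
        for finding in reversed(merged):
            redacted_text = (
                redacted_text[:finding['start']] +
                finding['redacted'] +
                redacted_text[finding['end']:]
            )

        return redacted_text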
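
Regex detection of this kind over-matches: the `ip_address` pattern accepts `999.999.999.999`, and the `credit_card` pattern accepts any 16 digits. A possible refinement, sketched here under the assumption that fewer false positives are worth an extra validation pass, is to check each candidate before reporting it. The helper names are invented for this example; `ipaddress` is the Python standard-library module, and the Luhn checksum is the standard check-digit scheme used by payment card numbers.

```python
import ipaddress


def is_valid_ipv4(candidate: str) -> bool:
    """True if the string is a well-formed IPv4 address (each octet 0-255)."""
    try:
        ipaddress.IPv4Address(candidate)
        return True
    except ValueError:
        return False


def passes_luhn(candidate: str) -> bool:
    """True if the digits in the string satisfy the Luhn checksum."""
    digits = [int(c) for c in candidate if c.isdigit()]
    if len(digits) < 13:  # shorter than any common card number
        return False
    total = 0
    # Walk from the rightmost digit, doubling every second one.
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


print(is_valid_ipv4("192.168.1.10"))       # True
print(is_valid_ipv4("999.999.999.999"))    # False
print(passes_luhn("4111 1111 1111 1111"))  # True (a well-known test number)
print(passes_luhn("1234-5678-9012-3456"))  # False
```

When the goal is redaction rather than reporting, it may be safer to keep a doubtful match than to drop it, since an unnecessary redaction costs less than a leaked identifier.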

Implementing Access Control

Role-Based Access Control for PHI

# hipaa_access_control.py
from enum import Enum
from typing import List, Optional, Set
from dataclasses import dataclass
from datetime import datetime
 
class Permission(Enum):
    """PHI access permissions."""
    VIEW_DEMOGRAPHICS = "view_demographics"
    VIEW_CLINICAL = "view_clinical"
    VIEW_BILLING = "view_billing"
    EDIT_DEMOGRAPHICS = "edit_demographics"
    EDIT_CLINICAL = "edit_clinical"
    EDIT_BILLING = "edit_billing"
    EXPORT_DATA = "export_data"
    DELETE_RECORDS = "delete_records"
    ADMIN_ACCESS = "admin_access"
 
class Role(Enum):
    """Healthcare system roles."""
    PATIENT = "patient"
    NURSE = "nurse"
    PHYSICIAN = "physician"
    SPECIALIST = "specialist"
    BILLING_STAFF = "billing_staff"
    ADMIN = "admin"
    AUDITOR = "auditor"
    EMERGENCY = "emergency"
 
@dataclass
class AccessPolicy:
    """Access policy for a role."""
    role: Role
    permissions: Set[Permission]
    phi_access_level: str
    requires_mfa: bool
    session_timeout_minutes: int
    break_glass_eligible: bool
 
class HIPAAAccessController:
    """HIPAA-compliant access control."""
 
    def __init__(self):
        self.policies = self._define_policies()
        self.break_glass_active = {}
 
    def _define_policies(self) -> dict:
        """Define role-based access policies."""
        return {
            Role.PATIENT: AccessPolicy(
                role=Role.PATIENT,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_CLINICAL,
                    Permission.VIEW_BILLING
                },
                phi_access_level='own_records_only',
                requires_mfa=True,
                session_timeout_minutes=30,
                break_glass_eligible=False
            ),
            Role.NURSE: AccessPolicy(
                role=Role.NURSE,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_CLINICAL,
                    Permission.EDIT_CLINICAL
                },
                phi_access_level='assigned_patients',
                requires_mfa=True,
                session_timeout_minutes=15,
                break_glass_eligible=True
            ),
            Role.PHYSICIAN: AccessPolicy(
                role=Role.PHYSICIAN,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_CLINICAL,
                    Permission.EDIT_CLINICAL,
                    Permission.VIEW_BILLING
                },
                phi_access_level='assigned_patients',
                requires_mfa=True,
                session_timeout_minutes=20,
                break_glass_eligible=True
            ),
            Role.BILLING_STAFF: AccessPolicy(
                role=Role.BILLING_STAFF,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_BILLING,
                    Permission.EDIT_BILLING
                },
                phi_access_level='billing_minimum_necessary',
                requires_mfa=True,
                session_timeout_minutes=15,
                break_glass_eligible=False
            ),
            Role.ADMIN: AccessPolicy(
                role=Role.ADMIN,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_CLINICAL,
                    Permission.VIEW_BILLING,
                    Permission.EDIT_DEMOGRAPHICS,
                    Permission.ADMIN_ACCESS
                },
                phi_access_level='all_patients',
                requires_mfa=True,
                session_timeout_minutes=10,
                break_glass_eligible=False
            ),
            Role.EMERGENCY: AccessPolicy(
                role=Role.EMERGENCY,
                permissions={
                    Permission.VIEW_DEMOGRAPHICS,
                    Permission.VIEW_CLINICAL,
                    Permission.EDIT_CLINICAL
                },
                phi_access_level='all_patients',
                requires_mfa=False,
                session_timeout_minutes=60,
                break_glass_eligible=False
            )
        }
 
    def check_access(
        self,
        user_id: str,
        user_role: Role,
        permission: Permission,
        patient_id: str,
        context: dict
    ) -> dict:
        """Check if user has access to perform action."""
 
        policy = self.policies.get(user_role)
        if not policy:
            return self._deny("Invalid role")

        # Check the permission against the role's permission set
        if permission not in policy.permissions:
            return self._deny(f"Role {user_role.value} does not have permission {permission.value}")

        # Check the PHI access level
        access_check = self._check_phi_access_level(
            user_id, user_role, patient_id, policy.phi_access_level, context
        )

        if access_check['allowed']:
            return self._allow(user_id, permission, patient_id)

        # Normal access was refused: consider break glass before denying,
        # otherwise the emergency path could never be reached
        if self._is_break_glass_required(patient_id, context):
            return self._handle_break_glass(user_id, user_role, patient_id, policy)

        return access_check

    def _check_phi_access_level(
        self,
        user_id: str,
        role: Role,
        patient_id: str,
        access_level: str,
        context: dict
    ) -> dict:
        """Check PHI access level restrictions."""

        if access_level == 'own_records_only':
            if user_id != patient_id:
                return self._deny("Can only access own records")

        elif access_level == 'assigned_patients':
            assigned = context.get('assigned_patients', [])
            if patient_id not in assigned:
                return self._deny("Patient is not in the assigned list")

        elif access_level == 'billing_minimum_necessary':
            # Billing staff see only billing-relevant PHI
            if context.get('requested_data_type') not in ['billing', 'demographics']:
                return self._deny("Access limited to billing data")

        return {'allowed': True}

    def _is_break_glass_required(self, patient_id: str, context: dict) -> bool:
        """Return True for a declared emergency involving an unassigned patient."""
        return (
            bool(context.get('emergency', False))
            and patient_id not in (context.get('assigned_patients') or [])
        )
 
    def _handle_break_glass(
        self,
        user_id: str,
        role: Role,
        patient_id: str,
        policy: AccessPolicy
    ) -> dict:
        """Handle emergency break-glass access."""

        if not policy.break_glass_eligible:
            return self._deny("Role is not eligible for break-glass access")

        # Record the break-glass access
        self.break_glass_active[f"{user_id}:{patient_id}"] = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'patient_id': patient_id,
            'requires_review': True
        }

        return {
            'allowed': True,
            'break_glass': True,
            'audit_level': 'critical',
            'message': 'Break-glass access granted - review required'
        }
 
    def _allow(self, user_id: str, permission: Permission, patient_id: str) -> dict:
        return {
            'allowed': True,
            'user_id': user_id,
            'permission': permission.value,
            'patient_id': patient_id,
            'timestamp': datetime.utcnow().isoformat()
        }
 
    def _deny(self, reason: str) -> dict:
        return {
            'allowed': False,
            'reason': reason,
            'timestamp': datetime.utcnow().isoformat()
        }
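
Each policy above carries a `session_timeout_minutes` value, which corresponds to the `automatic_logoff` safeguard, but the controller does not show how it is enforced. The following is a minimal sketch of one way to do it, assuming the application records a last-activity timestamp per session. The `ROLE_TIMEOUT_MINUTES` table repeats the values from the policies, and falling back to the strictest timeout for unknown roles is a design assumption of this example.

```python
from datetime import datetime, timedelta, timezone

# Timeouts per role, repeated from the access policies above.
ROLE_TIMEOUT_MINUTES = {
    "patient": 30,
    "nurse": 15,
    "physician": 20,
    "billing_staff": 15,
    "admin": 10,
    "emergency": 60,
}


def timeout_for(role: str) -> int:
    """Return the role's timeout, or the strictest one for unknown roles."""
    return ROLE_TIMEOUT_MINUTES.get(role, min(ROLE_TIMEOUT_MINUTES.values()))


def is_session_expired(last_activity: datetime, timeout_minutes: int, now: datetime) -> bool:
    """True once the idle time reaches the timeout."""
    return now - last_activity >= timedelta(minutes=timeout_minutes)


last_seen = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
now = datetime(2025, 1, 1, 12, 16, tzinfo=timezone.utc)
print(is_session_expired(last_seen, timeout_for("nurse"), now))      # True
print(is_session_expired(last_seen, timeout_for("physician"), now))  # False
```

Passing `now` in explicitly keeps the check easy to test. In a real service this would typically run server-side on every request, so that an expired session cannot be revived by the client.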

Audit Logging System

HIPAA-Compliant Audit Logger

# hipaa_audit_logger.py
import json
import hashlib
from datetime import datetime
from typing import Dict, Optional, List
from enum import Enum
 
class AuditEventType(Enum):
    """HIPAA audit event types."""
    LOGIN = "login"
    LOGOUT = "logout"
    LOGIN_FAILED = "login_failed"
    PHI_ACCESS = "phi_access"
    PHI_CREATE = "phi_create"
    PHI_MODIFY = "phi_modify"
    PHI_DELETE = "phi_delete"
    PHI_EXPORT = "phi_export"
    PHI_PRINT = "phi_print"
    PHI_FAX = "phi_fax"
    BREAK_GLASS = "break_glass"
    PERMISSION_CHANGE = "permission_change"
    SYSTEM_CONFIG = "system_config"
    SECURITY_INCIDENT = "security_incident"
 
class HIPAAAuditLogger:
    """HIPAA-compliant audit logging system."""
 
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.log_chain = []
 
    def log_phi_access(
        self,
        user_id: str,
        patient_id: str,
        action: str,
        phi_fields: List[str],
        reason: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Log a PHI access event."""
 
        event = self._create_audit_event(
            event_type=AuditEventType.PHI_ACCESS,
            user_id=user_id,
            patient_id=patient_id,
            action=action,
            details={
                'phi_fields_accessed': phi_fields,
                'reason_for_access': reason,
                'minimum_necessary_applied': True
            },
            context=context
        )
 
        return self._store_event(event)
 
    def log_phi_modification(
        self,
        user_id: str,
        patient_id: str,
        action: str,
        field_name: str,
        old_value_hash: str,
        new_value_hash: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Log a PHI modification event."""
 
        event = self._create_audit_event(
            event_type=AuditEventType.PHI_MODIFY,
            user_id=user_id,
            patient_id=patient_id,
            action=action,
            details={
                'field_modified': field_name,
                'old_value_hash': old_value_hash,
                'new_value_hash': new_value_hash
            },
            context=context
        )
 
        return self._store_event(event)
 
    def log_break_glass(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        context: Optional[Dict] = None
    ) -> Dict:
        """Log an emergency break-glass access."""
 
        event = self._create_audit_event(
            event_type=AuditEventType.BREAK_GLASS,
            user_id=user_id,
            patient_id=patient_id,
            action='emergency_access',
            details={
                'justification': justification,
                'requires_review': True,
                'review_deadline_hours': 24
            },
            context=context,
            severity='critical'
        )
 
        return self._store_event(event)
 
    def _create_audit_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        patient_id: str,
        action: str,
        details: Dict,
        context: Optional[Dict] = None,
        severity: str = 'info'
    ) -> Dict:
        """Create an audit event record."""
 
        previous_hash = self.log_chain[-1] if self.log_chain else "genesis"
 
        event = {
            'event_id': self._generate_event_id(),
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'event_type': event_type.value,
            'severity': severity,
            'actor': {
                'user_id': user_id,
                'ip_address': context.get('ip_address') if context else None,
                'user_agent': context.get('user_agent') if context else None,
                'session_id': context.get('session_id') if context else None,
                'workstation_id': context.get('workstation_id') if context else None
            },
            'patient': {
                'patient_id': patient_id,
                'record_type': context.get('record_type') if context else None
            },
            'action': action,
            'details': details,
            'integrity': {
                'previous_hash': previous_hash,
                'event_hash': None
            }
        }
 
        # Compute the integrity hash
        event['integrity']['event_hash'] = self._compute_hash(event)
        self.log_chain.append(event['integrity']['event_hash'])
 
        return event
 
    def _generate_event_id(self) -> str:
        """Generate a unique event ID."""
        import uuid
        return str(uuid.uuid4())

    def _compute_hash(self, event: Dict) -> str:
        """Compute the hash used for integrity verification."""
        # Build a copy with the hash field blanked. A shallow event.copy()
        # shares the nested 'integrity' dict, so blanking it there would
        # overwrite the stored hash and make verification always fail.
        event_copy = {
            **event,
            'integrity': {**event['integrity'], 'event_hash': ""}
        }
        serialized = json.dumps(event_copy, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()

    def _store_event(self, event: Dict) -> Dict:
        """Store the audit event."""
        self.storage.store(event)
        return event
 
    def query_patient_access_history(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> List[Dict]:
        """Query the access history for a patient (for accounting of disclosures)."""
 
        return self.storage.query({
            'patient.patient_id': patient_id,
            'timestamp': {
                '$gte': start_date.isoformat(),
                '$lte': end_date.isoformat()
            },
            'event_type': {
                '$in': [
                    AuditEventType.PHI_ACCESS.value,
                    AuditEventType.PHI_EXPORT.value,
                    AuditEventType.BREAK_GLASS.value
                ]
            }
        })
 
    def verify_log_integrity(self) -> bool:
        """Verify the integrity of the audit log chain."""
        events = self.storage.retrieve_all()
 
        for i, event in enumerate(events):
            expected_previous = events[i-1]['integrity']['event_hash'] if i > 0 else "genesis"
 
            if event['integrity']['previous_hash'] != expected_previous:
                return False
 
            computed_hash = self._compute_hash(event)
            if computed_hash != event['integrity']['event_hash']:
                return False
 
        return True
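
The tamper evidence in this logger comes from hash chaining: each event stores the hash of the previous one, so altering any stored event breaks every link after it. The stripped-down sketch below shows just that mechanism using only the standard library. The entry layout and function names are simplified assumptions for illustration and do not mirror the logger's event schema.

```python
import hashlib
import json
from typing import Dict, List

GENESIS = "genesis"  # placeholder "previous hash" for the first entry


def entry_hash(payload: Dict, previous_hash: str) -> str:
    """Hash the payload together with the previous entry's hash."""
    body = json.dumps(
        {"payload": payload, "previous_hash": previous_hash}, sort_keys=True
    )
    return hashlib.sha256(body.encode()).hexdigest()


def append_entry(chain: List[Dict], payload: Dict) -> None:
    """Append a new entry linked to the current tail of the chain."""
    previous = chain[-1]["hash"] if chain else GENESIS
    chain.append({
        "payload": payload,
        "previous_hash": previous,
        "hash": entry_hash(payload, previous),
    })


def first_broken_index(chain: List[Dict]) -> int:
    """Return the index of the first entry that fails verification, or -1."""
    previous = GENESIS
    for i, entry in enumerate(chain):
        if entry["previous_hash"] != previous:
            return i
        if entry["hash"] != entry_hash(entry["payload"], entry["previous_hash"]):
            return i
        previous = entry["hash"]
    return -1


chain: List[Dict] = []
append_entry(chain, {"user": "u1", "action": "view"})
append_entry(chain, {"user": "u2", "action": "export"})
append_entry(chain, {"user": "u1", "action": "modify"})
print(first_broken_index(chain))  # -1, the chain is intact
```

Changing any stored payload afterwards makes `first_broken_index` return that entry's position. A chain like this detects edits but does not prevent someone with write access from recomputing every hash, so it is usually combined with write-once storage or periodic anchoring of the latest hash somewhere the application cannot modify.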

Implementing Encryption

Field-Level Encryption for PHI

# phi_encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64
import os
from datetime import datetime  # used by encrypt_record for 'encrypted_at'
from typing import Dict, Any, Optional
import json
 
class PHIEncryption:
    """Field-level encryption for PHI data."""
 
    def __init__(self, master_key: bytes, key_id: str):
        self.master_key = master_key
        self.key_id = key_id
        self.field_keys = {}
 
    def derive_field_key(self, field_name: str) -> bytes:
        """Derive the encryption key for a specific field."""
 
        if field_name in self.field_keys:
            return self.field_keys[field_name]
 
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=field_name.encode(),
            iterations=100000,
        )
 
        key = kdf.derive(self.master_key)
        self.field_keys[field_name] = key
        return key
 
    def encrypt_field(self, field_name: str, value: Any) -> Dict:
        """Encrypt a single field."""

        key = self.derive_field_key(field_name)
        nonce = os.urandom(12)

        # Always serialize as JSON so the original type survives the round
        # trip (the string "123" must not come back as the integer 123)
        plaintext = json.dumps(value).encode()

        # Encrypt with AES-GCM, binding the field name as associated data
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, plaintext, field_name.encode())

        return {
            'encrypted': True,
            'algorithm': 'AES-256-GCM',
            'key_id': self.key_id,
            'nonce': base64.b64encode(nonce).decode(),
            'ciphertext': base64.b64encode(ciphertext).decode()
        }

    def decrypt_field(self, field_name: str, encrypted_data: Dict) -> Any:
        """Decrypt a single field."""

        key = self.derive_field_key(field_name)
        nonce = base64.b64decode(encrypted_data['nonce'])
        ciphertext = base64.b64decode(encrypted_data['ciphertext'])

        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, field_name.encode())

        # Reverse the JSON serialization applied in encrypt_field
        return json.loads(plaintext.decode())
 
    def encrypt_record(self, record: Dict, phi_fields: list) -> Dict:
        """Encrypt the PHI fields in a record."""
 
        encrypted_record = record.copy()
 
        for field in phi_fields:
            if field in encrypted_record and encrypted_record[field] is not None:
                encrypted_record[field] = self.encrypt_field(
                    field,
                    encrypted_record[field]
                )
 
        encrypted_record['_encryption_metadata'] = {
            'version': '1.0',
            'key_id': self.key_id,
            'encrypted_fields': phi_fields,
            'encrypted_at': datetime.utcnow().isoformat()
        }
 
        return encrypted_record
 
    def decrypt_record(self, encrypted_record: Dict) -> Dict:
        """Decrypt the PHI fields in a record."""

        metadata = encrypted_record.get('_encryption_metadata', {})
        phi_fields = metadata.get('encrypted_fields', [])

        decrypted_record = encrypted_record.copy()
        # pop() tolerates records that carry no encryption metadata
        decrypted_record.pop('_encryption_metadata', None)
 
        for field in phi_fields:
            if field in decrypted_record and isinstance(decrypted_record[field], dict):
                if decrypted_record[field].get('encrypted'):
                    decrypted_record[field] = self.decrypt_field(
                        field,
                        decrypted_record[field]
                    )
 
        return decrypted_record
 
 
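class TransmissionEncryption:
    """Ensure encryption in transit for PHI."""

    @staticmethod
    def verify_tls_connection(conn) -> Dict:
        """Check whether the TLS configuration meets a TLS 1.2+ baseline."""

        # conn is expected to behave like ssl.SSLSocket
        cipher = conn.cipher()    # (name, protocol, secret_bits) or None
        version = conn.version()  # e.g. 'TLSv1.3', or None if not connected

        requirements = {
            'min_tls_version': 'TLSv1.2',
            'allowed_tls_versions': ['TLSv1.2', 'TLSv1.3'],
            'min_key_size': 2048,  # certificate key size; not checked here
            'allowed_ciphers': [
                # TLS 1.3 suites
                'TLS_AES_256_GCM_SHA384',
                'TLS_AES_128_GCM_SHA256',
                'TLS_CHACHA20_POLY1305_SHA256',
                # TLS 1.2 suites
                'ECDHE-RSA-AES256-GCM-SHA384',
                'ECDHE-RSA-AES128-GCM-SHA256',
                'DHE-RSA-AES256-GCM-SHA384',
                'DHE-RSA-AES128-GCM-SHA256'
            ]
        }

        cipher_name = cipher[0] if cipher else None

        return {
            'compliant': (
                # Membership test instead of comparing version strings
                version in requirements['allowed_tls_versions'] and
                cipher_name in requirements['allowed_ciphers']
            ),
            'tls_version': version,
            'cipher_suite': cipher_name,
            # Symmetric secret bits of the negotiated cipher
            'key_size': cipher[2] if cipher else None,
            'requirements': requirements
        }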
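
Checking a connection after the handshake is useful for auditing, but it is simpler to refuse weak protocol versions up front. The sketch below uses Python's standard `ssl` module to build a client context with a TLS 1.2 floor. The `build_client_context` name is an assumption for this example, and the TLS 1.2 minimum mirrors the `min_tls_version` value used above rather than a figure stated in HIPAA itself.

```python
import ssl


def build_client_context() -> ssl.SSLContext:
    """Create a client-side TLS context that rejects anything below TLS 1.2."""
    # create_default_context() enables certificate and hostname verification.
    context = ssl.create_default_context()
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


ctx = build_client_context()
print(ctx.minimum_version)
print(ctx.verify_mode == ssl.CERT_REQUIRED)
```

The context can then be passed to standard-library clients that accept one, for example `http.client.HTTPSConnection(host, context=ctx)`.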

Business Associate Agreement (BAA) Requirements

BAA Compliance Checker

# baa_compliance.py
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
 
@dataclass
class VendorBAA:
    """Business Associate Agreement record."""
    vendor_name: str
    baa_signed_date: datetime
    baa_expiry_date: datetime
    services_covered: List[str]
    phi_access_type: str
    subcontractor_clause: bool
    breach_notification_hours: int
    security_requirements: List[str]
    audit_rights: bool
 
class BAAComplianceManager:
    """Manage BAA compliance for vendors."""
 
    def __init__(self):
        self.vendor_baas: Dict[str, VendorBAA] = {}
 
    def register_vendor(self, vendor: VendorBAA) -> Dict:
        """Register a vendor's BAA."""

        # Validate the BAA requirements
        validation = self._validate_baa(vendor)
 
        if not validation['valid']:
            return {
                'registered': False,
                'errors': validation['errors']
            }
 
        self.vendor_baas[vendor.vendor_name] = vendor
 
        return {
            'registered': True,
            'vendor': vendor.vendor_name,
            'expiry': vendor.baa_expiry_date.isoformat()
        }
 
    def _validate_baa(self, vendor: VendorBAA) -> Dict:
        """Validate that the BAA meets HIPAA requirements and internal policy."""

        errors = []
        required_security = [
            'encryption_at_rest',
            'encryption_in_transit',
            'access_controls',
            'audit_logging',
            'incident_response'
        ]

        # Check the security requirements
        missing_security = [
            req for req in required_security
            if req not in vendor.security_requirements
        ]
        if missing_security:
            errors.append(f"Missing security requirements: {missing_security}")

        # Check the breach notification window. 24 hours is a contractual
        # target; the HIPAA rule (45 CFR 164.410) requires notice without
        # unreasonable delay and no later than 60 days after discovery.
        if vendor.breach_notification_hours > 24:
            errors.append("Breach notification must occur within 24 hours")

        # Check audit rights
        if not vendor.audit_rights:
            errors.append("BAA must include audit rights")

        # Check the subcontractor clause
        if not vendor.subcontractor_clause:
            errors.append("BAA must include obligations for subcontractors")

        return {
            'valid': len(errors) == 0,
            'errors': errors
        }
 
    def check_vendor_compliance(self, vendor_name: str) -> Dict:
        """Check whether the vendor's BAA is compliant and current."""

        vendor = self.vendor_baas.get(vendor_name)

        if not vendor:
            return {
                'compliant': False,
                'reason': 'No BAA on record'
            }

        if vendor.baa_expiry_date < datetime.utcnow():
            return {
                'compliant': False,
                'reason': 'BAA expired',
                'expired_date': vendor.baa_expiry_date.isoformat()
            }
 
        return {
            'compliant': True,
            'vendor': vendor_name,
            'expiry': vendor.baa_expiry_date.isoformat()
        }
 
    def get_compliance_report(self) -> Dict:
        """Generate a BAA compliance report."""
 
        report = {
            'generated_at': datetime.utcnow().isoformat(),
            'total_vendors': len(self.vendor_baas),
            'compliant': [],
            'non_compliant': [],
            'expiring_soon': []
        }
 
        for vendor_name, vendor in self.vendor_baas.items():
            compliance = self.check_vendor_compliance(vendor_name)
 
            if compliance['compliant']:
                report['compliant'].append(vendor_name)
 
                # Check whether it expires within 90 days
                days_until_expiry = (vendor.baa_expiry_date - datetime.utcnow()).days
                if days_until_expiry <= 90:
                    report['expiring_soon'].append({
                        'vendor': vendor_name,
                        'days_until_expiry': days_until_expiry
                    })
            else:
                report['non_compliant'].append({
                    'vendor': vendor_name,
                    'reason': compliance.get('reason')
                })
 
        return report
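
The report flags every BAA that expires within 90 days, but a renewal process usually wants escalating reminders rather than a single bucket. The sketch below is one possible extension: the 30/60/90-day thresholds and the `renewal_bucket` function are illustrative assumptions, not something the manager class provides.

```python
from datetime import date
from typing import Optional

# Reminder thresholds in days, checked from most to least urgent
# (hypothetical values chosen for this example).
REMINDER_THRESHOLDS = (30, 60, 90)


def renewal_bucket(expiry: date, today: date) -> Optional[str]:
    """Classify a BAA by how soon it expires, or None if no reminder is due."""
    days_left = (expiry - today).days
    if days_left < 0:
        return "expired"
    for threshold in REMINDER_THRESHOLDS:
        if days_left <= threshold:
            return f"within_{threshold}_days"
    return None


today = date(2025, 1, 1)
print(renewal_bucket(date(2025, 1, 15), today))   # within_30_days
print(renewal_bucket(date(2025, 3, 1), today))    # within_60_days
print(renewal_bucket(date(2024, 12, 31), today))  # expired
print(renewal_bucket(date(2026, 1, 1), today))    # None
```

Passing `today` explicitly keeps the function deterministic, which makes it straightforward to test and to run as a scheduled job.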

Conclusion

HIPAA compliance for cloud applications requires:

  1. Data classification - Identify and classify all PHI
  2. Access control - Implement role-based access with the minimum necessary principle
  3. Encryption - Encrypt PHI at rest and in transit
  4. Audit logging - Maintain complete, tamper-resistant audit trails
  5. BAA management - Ensure all vendors have compliant BAAs
  6. Breach response - Prepare incident response procedures

Regular assessments and continuous monitoring are essential to maintaining compliance as the application evolves.

