Compliance

HIPAA Compliance Technical Guide for Healthcare Applications

DeviDevs Team
16 min read
#hipaa#compliance#healthcare#phi-protection#security

HIPAA compliance requires comprehensive technical safeguards to protect Protected Health Information (PHI). This guide provides practical implementations for the Security Rule requirements in healthcare applications.

PHI Data Classification System

Implement systematic PHI identification and classification:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Set
import re
import hashlib
 
class PHIType(Enum):
    """18 HIPAA PHI identifiers."""
    NAME = "name"
    ADDRESS = "address"
    DATES = "dates"  # Except year
    PHONE = "phone"
    FAX = "fax"
    EMAIL = "email"
    SSN = "ssn"
    MRN = "medical_record_number"
    HEALTH_PLAN_ID = "health_plan_id"
    ACCOUNT_NUMBER = "account_number"
    CERTIFICATE_LICENSE = "certificate_license"
    VEHICLE_ID = "vehicle_id"
    DEVICE_ID = "device_id"
    URL = "url"
    IP_ADDRESS = "ip_address"
    BIOMETRIC = "biometric"
    PHOTO = "photo"
    OTHER_UNIQUE = "other_unique_identifier"
 
class SensitivityLevel(Enum):
    HIGH = "high"      # Direct identifiers
    MEDIUM = "medium"  # Quasi-identifiers
    LOW = "low"        # Non-identifying health data
 
@dataclass
class PHIElement:
    phi_type: PHIType
    value: str
    sensitivity: SensitivityLevel
    location: str  # Where in the data
    context: Optional[str] = None
 
@dataclass
class DataClassificationResult:
    contains_phi: bool
    phi_elements: List[PHIElement]
    sensitivity_level: SensitivityLevel
    recommended_actions: List[str]
    timestamp: datetime = field(default_factory=datetime.utcnow)
 
class PHIClassifier:
    def __init__(self):
        self.patterns = self._compile_patterns()
        self.high_sensitivity_types = {
            PHIType.SSN, PHIType.MRN, PHIType.HEALTH_PLAN_ID,
            PHIType.BIOMETRIC, PHIType.PHOTO
        }
        self.medium_sensitivity_types = {
            PHIType.NAME, PHIType.ADDRESS, PHIType.PHONE,
            PHIType.EMAIL, PHIType.DATES
        }
 
    def _compile_patterns(self) -> Dict[PHIType, List[re.Pattern]]:
        """Compile regex patterns for PHI detection."""
        return {
            PHIType.SSN: [
                re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
                re.compile(r'\b\d{9}\b')  # No dashes
            ],
            PHIType.PHONE: [
                re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
                re.compile(r'\(\d{3}\)\s*\d{3}[-.\s]?\d{4}')
            ],
            PHIType.EMAIL: [
                re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b')
            ],
            PHIType.MRN: [
                re.compile(r'\bMRN[:\s#]*\d{6,10}\b', re.IGNORECASE),
                re.compile(r'\bMedical Record[:\s#]*\d{6,10}\b', re.IGNORECASE)
            ],
            PHIType.DATES: [
                re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),
                re.compile(r'\b\d{4}-\d{2}-\d{2}\b'),
                re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b', re.IGNORECASE)
            ],
            PHIType.IP_ADDRESS: [
                re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
            ],
            PHIType.HEALTH_PLAN_ID: [
                re.compile(r'\b[A-Z]{3}\d{9}\b'),  # Common format
                re.compile(r'\bMember\s*ID[:\s#]*[A-Z0-9]{8,12}\b', re.IGNORECASE)
            ],
            PHIType.ADDRESS: [
                re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr)\b', re.IGNORECASE)
            ]
        }
 
    def classify(self, data: str, context: str = "unknown") -> DataClassificationResult:
        """Classify data for PHI content."""
        phi_elements = []
 
        # Pattern-based detection
        for phi_type, patterns in self.patterns.items():
            for pattern in patterns:
                for match in pattern.finditer(data):
                    sensitivity = self._get_sensitivity(phi_type)
                    phi_elements.append(PHIElement(
                        phi_type=phi_type,
                        value=match.group(),
                        sensitivity=sensitivity,
                        location=f"position {match.start()}-{match.end()}",
                        context=context
                    ))
 
        # Determine overall sensitivity
        if not phi_elements:
            overall_sensitivity = SensitivityLevel.LOW
        elif any(e.sensitivity == SensitivityLevel.HIGH for e in phi_elements):
            overall_sensitivity = SensitivityLevel.HIGH
        elif any(e.sensitivity == SensitivityLevel.MEDIUM for e in phi_elements):
            overall_sensitivity = SensitivityLevel.MEDIUM
        else:
            overall_sensitivity = SensitivityLevel.LOW
 
        # Generate recommendations
        recommendations = self._generate_recommendations(phi_elements, overall_sensitivity)
 
        return DataClassificationResult(
            contains_phi=len(phi_elements) > 0,
            phi_elements=phi_elements,
            sensitivity_level=overall_sensitivity,
            recommended_actions=recommendations
        )
 
    def _get_sensitivity(self, phi_type: PHIType) -> SensitivityLevel:
        """Determine sensitivity level for PHI type."""
        if phi_type in self.high_sensitivity_types:
            return SensitivityLevel.HIGH
        elif phi_type in self.medium_sensitivity_types:
            return SensitivityLevel.MEDIUM
        return SensitivityLevel.LOW
 
    def _generate_recommendations(
        self,
        elements: List[PHIElement],
        sensitivity: SensitivityLevel
    ) -> List[str]:
        """Generate handling recommendations."""
        recommendations = []
 
        if sensitivity == SensitivityLevel.HIGH:
            recommendations.extend([
                "Encrypt data at rest and in transit",
                "Implement strict access controls",
                "Enable comprehensive audit logging",
                "Consider de-identification before storage"
            ])
 
        if any(e.phi_type == PHIType.SSN for e in elements):
            recommendations.append("SSN detected - store only last 4 digits if possible")
 
        if any(e.phi_type == PHIType.DATES for e in elements):
            recommendations.append("Consider generalizing dates to year only for de-identification")
 
        if any(e.phi_type == PHIType.ADDRESS for e in elements):
            recommendations.append("Consider using ZIP code only (first 3 digits for populations <20,000)")
 
        return recommendations
 
    def de_identify(self, data: str, method: str = "safe_harbor") -> Dict:
        """De-identify data using specified method."""
        if method == "safe_harbor":
            return self._safe_harbor_deidentification(data)
        elif method == "expert_determination":
            return self._expert_determination_placeholder(data)
        else:
            raise ValueError(f"Unknown de-identification method: {method}")
 
    def _safe_harbor_deidentification(self, data: str) -> Dict:
        """Apply Safe Harbor de-identification (remove all 18 identifiers)."""
        result = data
        removed_elements = []
 
        for phi_type, patterns in self.patterns.items():
            for pattern in patterns:
                matches = pattern.findall(result)
                for match in matches:
                    removed_elements.append({
                        'type': phi_type.value,
                        'original': match,
                        'replacement': f"[{phi_type.value.upper()}_REMOVED]"
                    })
                result = pattern.sub(f"[{phi_type.value.upper()}_REMOVED]", result)
 
        return {
            'original_length': len(data),
            'deidentified_data': result,
            'deidentified_length': len(result),
            'elements_removed': len(removed_elements),
            'removal_details': removed_elements,
            'method': 'safe_harbor',
            'hipaa_compliant': True
        }
 
    def _expert_determination_placeholder(self, data: str) -> Dict:
        """Placeholder for expert determination method."""
        # In production, this would involve statistical analysis
        # to determine re-identification risk
        return {
            'message': 'Expert determination requires statistical analysis',
            'recommendation': 'Consult with qualified statistical expert'
        }

Access Control Implementation

Implement HIPAA-compliant access controls:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set, Dict, List
import hashlib
import jwt
 
class AccessLevel(Enum):
    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
 
class UserRole(Enum):
    PATIENT = "patient"
    NURSE = "nurse"
    PHYSICIAN = "physician"
    ADMIN = "admin"
    BILLING = "billing"
    IT_SUPPORT = "it_support"
    AUDITOR = "auditor"
 
@dataclass
class User:
    user_id: str
    username: str
    role: UserRole
    department: str
    active: bool
    mfa_enabled: bool
    last_login: Optional[datetime]
    permissions: Set[str]
    access_restrictions: Dict[str, any] = None
 
@dataclass
class AccessDecision:
    allowed: bool
    user_id: str
    resource: str
    action: str
    timestamp: datetime
    reason: str
    requires_break_glass: bool = False
    audit_id: str = ""
 
class HIPAAAccessControl:
    def __init__(self):
        self.users: Dict[str, User] = {}
        self.role_permissions = self._define_role_permissions()
        self.break_glass_log: List[Dict] = []
        self.session_timeout_minutes = 15  # HIPAA requirement
 
    def _define_role_permissions(self) -> Dict[UserRole, Dict]:
        """Define role-based permissions following minimum necessary principle."""
        return {
            UserRole.PATIENT: {
                'permissions': {'read_own_records', 'request_amendment'},
                'phi_access': {'own_only': True},
                'restrictions': {'department': None}
            },
            UserRole.NURSE: {
                'permissions': {
                    'read_patient_records', 'write_vitals', 'write_notes',
                    'view_medications', 'administer_medications'
                },
                'phi_access': {'assigned_patients': True, 'department_patients': True},
                'restrictions': {'department': 'assigned'}
            },
            UserRole.PHYSICIAN: {
                'permissions': {
                    'read_patient_records', 'write_diagnosis', 'write_orders',
                    'prescribe_medications', 'view_all_records', 'write_notes'
                },
                'phi_access': {'all_patients': True},
                'restrictions': None
            },
            UserRole.BILLING: {
                'permissions': {
                    'read_billing_info', 'write_billing', 'read_insurance',
                    'read_demographics'
                },
                'phi_access': {'billing_relevant': True},
                'restrictions': {'exclude_clinical_notes': True}
            },
            UserRole.ADMIN: {
                'permissions': {'all'},
                'phi_access': {'all': True},
                'restrictions': None
            },
            UserRole.IT_SUPPORT: {
                'permissions': {
                    'system_administration', 'user_management',
                    'audit_log_access'
                },
                'phi_access': {'none': True},  # IT shouldn't access PHI
                'restrictions': {'no_phi_access': True}
            },
            UserRole.AUDITOR: {
                'permissions': {
                    'audit_log_access', 'compliance_reports',
                    'access_reviews'
                },
                'phi_access': {'audit_only': True},
                'restrictions': {'read_only': True}
            }
        }
 
    def check_access(
        self,
        user_id: str,
        resource: str,
        action: str,
        patient_id: Optional[str] = None,
        context: Dict = None
    ) -> AccessDecision:
        """Check if user has access to resource."""
        audit_id = hashlib.md5(
            f"{user_id}{resource}{action}{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]
 
        if user_id not in self.users:
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason="User not found",
                audit_id=audit_id
            )
 
        user = self.users[user_id]
 
        # Check if user is active
        if not user.active:
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason="User account is inactive",
                audit_id=audit_id
            )
 
        # Check MFA requirement for PHI access
        if self._requires_phi_access(resource) and not user.mfa_enabled:
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason="MFA required for PHI access",
                audit_id=audit_id
            )
 
        # Get role permissions
        role_config = self.role_permissions.get(user.role)
        if not role_config:
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason="Role not configured",
                audit_id=audit_id
            )
 
        # Check permission
        required_permission = self._get_required_permission(resource, action)
        has_permission = (
            'all' in role_config['permissions'] or
            required_permission in role_config['permissions'] or
            required_permission in user.permissions
        )
 
        if not has_permission:
            # Check for break-glass access
            if self._is_emergency_context(context):
                return AccessDecision(
                    allowed=True,
                    user_id=user_id,
                    resource=resource,
                    action=action,
                    timestamp=datetime.utcnow(),
                    reason="Emergency break-glass access",
                    requires_break_glass=True,
                    audit_id=audit_id
                )
 
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason=f"Missing permission: {required_permission}",
                audit_id=audit_id
            )
 
        # Check patient-specific restrictions
        if patient_id and not self._check_patient_access(user, patient_id, role_config):
            return AccessDecision(
                allowed=False,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason="Not authorized for this patient's records",
                audit_id=audit_id
            )
 
        return AccessDecision(
            allowed=True,
            user_id=user_id,
            resource=resource,
            action=action,
            timestamp=datetime.utcnow(),
            reason="Access granted",
            audit_id=audit_id
        )
 
    def _requires_phi_access(self, resource: str) -> bool:
        """Determine if resource contains PHI."""
        phi_resources = [
            'patient_records', 'medical_history', 'lab_results',
            'prescriptions', 'clinical_notes', 'imaging',
            'demographics', 'insurance_info'
        ]
        return any(phi in resource for phi in phi_resources)
 
    def _get_required_permission(self, resource: str, action: str) -> str:
        """Map resource/action to required permission."""
        # Simplified mapping - expand based on your system
        return f"{action}_{resource}"
 
    def _is_emergency_context(self, context: Optional[Dict]) -> bool:
        """Check if this is an emergency access request."""
        if not context:
            return False
        return context.get('emergency', False) and context.get('justification')
 
    def _check_patient_access(
        self,
        user: User,
        patient_id: str,
        role_config: Dict
    ) -> bool:
        """Check if user can access specific patient's records."""
        phi_access = role_config.get('phi_access', {})
 
        if phi_access.get('all_patients') or phi_access.get('all'):
            return True
 
        if phi_access.get('own_only') and user.user_id == patient_id:
            return True
 
        if phi_access.get('assigned_patients'):
            # Check assignment - would query assignment table in production
            return self._is_assigned_to_patient(user.user_id, patient_id)
 
        if phi_access.get('department_patients'):
            # Check if patient is in user's department
            return self._is_patient_in_department(patient_id, user.department)
 
        return False
 
    def _is_assigned_to_patient(self, user_id: str, patient_id: str) -> bool:
        """Check if user is assigned to patient - placeholder."""
        # In production, query assignment database
        return False
 
    def _is_patient_in_department(self, patient_id: str, department: str) -> bool:
        """Check if patient is in department - placeholder."""
        # In production, query patient location/department
        return False
 
    def log_break_glass(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        access_decision: AccessDecision
    ):
        """Log break-glass access for review."""
        self.break_glass_log.append({
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'patient_id': patient_id,
            'justification': justification,
            'audit_id': access_decision.audit_id,
            'reviewed': False,
            'review_result': None
        })

HIPAA Audit Logging

Implement comprehensive audit logging:

from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import json
import hashlib
import uuid
 
class AuditEventType(Enum):
    # Access events
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    SESSION_TIMEOUT = "session_timeout"
 
    # PHI access events
    PHI_VIEW = "phi_view"
    PHI_CREATE = "phi_create"
    PHI_UPDATE = "phi_update"
    PHI_DELETE = "phi_delete"
    PHI_EXPORT = "phi_export"
    PHI_PRINT = "phi_print"
 
    # Administrative events
    USER_CREATE = "user_create"
    USER_MODIFY = "user_modify"
    USER_DELETE = "user_delete"
    PERMISSION_CHANGE = "permission_change"
 
    # Security events
    MFA_ENABLED = "mfa_enabled"
    MFA_DISABLED = "mfa_disabled"
    PASSWORD_CHANGE = "password_change"
    BREAK_GLASS_ACCESS = "break_glass_access"
 
    # System events
    SYSTEM_START = "system_start"
    SYSTEM_STOP = "system_stop"
    CONFIG_CHANGE = "config_change"
    BACKUP_CREATED = "backup_created"
 
@dataclass
class AuditEvent:
    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: str
    patient_id: Optional[str]
    resource: str
    action: str
    outcome: str  # success, failure, error
    source_ip: str
    user_agent: Optional[str]
    details: Dict[str, Any]
    phi_accessed: bool
    integrity_hash: str = ""
 
    def __post_init__(self):
        if not self.integrity_hash:
            self.integrity_hash = self._compute_hash()
 
    def _compute_hash(self) -> str:
        """Compute integrity hash for tamper detection."""
        data = f"{self.event_id}{self.timestamp.isoformat()}{self.event_type.value}"
        data += f"{self.user_id}{self.patient_id}{self.resource}{self.action}{self.outcome}"
        return hashlib.sha256(data.encode()).hexdigest()
 
class HIPAAAuditLogger:
    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.retention_years = 6  # HIPAA minimum
        self.previous_hash: Optional[str] = None
 
    def log_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        resource: str,
        action: str,
        outcome: str,
        source_ip: str,
        patient_id: Optional[str] = None,
        user_agent: Optional[str] = None,
        details: Dict[str, Any] = None,
        phi_accessed: bool = False
    ) -> AuditEvent:
        """Log an audit event."""
        event = AuditEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            user_id=user_id,
            patient_id=patient_id,
            resource=resource,
            action=action,
            outcome=outcome,
            source_ip=source_ip,
            user_agent=user_agent,
            details=details or {},
            phi_accessed=phi_accessed
        )
 
        # Add chain hash for tamper evidence
        if self.previous_hash:
            event.details['previous_hash'] = self.previous_hash
        self.previous_hash = event.integrity_hash
 
        # Store event
        self.storage.store(event)
 
        return event
 
    def log_phi_access(
        self,
        user_id: str,
        patient_id: str,
        record_type: str,
        action: str,
        source_ip: str,
        fields_accessed: List[str] = None,
        purpose: str = None
    ) -> AuditEvent:
        """Log PHI access with detailed tracking."""
        event_type_map = {
            'view': AuditEventType.PHI_VIEW,
            'create': AuditEventType.PHI_CREATE,
            'update': AuditEventType.PHI_UPDATE,
            'delete': AuditEventType.PHI_DELETE,
            'export': AuditEventType.PHI_EXPORT,
            'print': AuditEventType.PHI_PRINT
        }
 
        return self.log_event(
            event_type=event_type_map.get(action, AuditEventType.PHI_VIEW),
            user_id=user_id,
            patient_id=patient_id,
            resource=record_type,
            action=action,
            outcome='success',
            source_ip=source_ip,
            phi_accessed=True,
            details={
                'fields_accessed': fields_accessed or [],
                'purpose': purpose,
                'record_type': record_type
            }
        )
 
    def generate_access_report(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Generate access report for patient (required for HIPAA accounting of disclosures)."""
        events = self.storage.query(
            patient_id=patient_id,
            start_date=start_date,
            end_date=end_date,
            phi_accessed=True
        )
 
        report = {
            'patient_id': patient_id,
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'generated_at': datetime.utcnow().isoformat(),
            'total_access_events': len(events),
            'access_by_user': {},
            'access_by_type': {},
            'access_timeline': []
        }
 
        for event in events:
            # By user
            if event.user_id not in report['access_by_user']:
                report['access_by_user'][event.user_id] = {
                    'count': 0,
                    'actions': []
                }
            report['access_by_user'][event.user_id]['count'] += 1
            report['access_by_user'][event.user_id]['actions'].append(event.action)
 
            # By type
            record_type = event.details.get('record_type', 'unknown')
            if record_type not in report['access_by_type']:
                report['access_by_type'][record_type] = 0
            report['access_by_type'][record_type] += 1
 
            # Timeline
            report['access_timeline'].append({
                'timestamp': event.timestamp.isoformat(),
                'user_id': event.user_id,
                'action': event.action,
                'record_type': record_type
            })
 
        return report
 
    def verify_log_integrity(self, events: List[AuditEvent]) -> Dict:
        """Verify integrity of audit log chain."""
        results = {
            'verified': True,
            'total_events': len(events),
            'integrity_failures': []
        }
 
        for i, event in enumerate(events):
            # Verify individual event hash
            expected_hash = event._compute_hash()
            if expected_hash != event.integrity_hash:
                results['verified'] = False
                results['integrity_failures'].append({
                    'event_id': event.event_id,
                    'issue': 'Hash mismatch - event may have been tampered'
                })
 
            # Verify chain
            if i > 0:
                previous_hash = event.details.get('previous_hash')
                if previous_hash and previous_hash != events[i-1].integrity_hash:
                    results['verified'] = False
                    results['integrity_failures'].append({
                        'event_id': event.event_id,
                        'issue': 'Chain broken - events may be missing or reordered'
                    })
 
        return results

Encryption Implementation

Implement HIPAA-compliant encryption:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import base64
from dataclasses import dataclass
from typing import Optional, Dict
import json
 
@dataclass
class EncryptedData:
    ciphertext: bytes
    nonce: bytes
    key_id: str
    algorithm: str
    version: int
 
class HIPAAEncryption:
    """HIPAA-compliant encryption for PHI."""
 
    def __init__(self, key_manager):
        self.key_manager = key_manager
        self.algorithm = 'AES-256-GCM'  # NIST recommended
        self.current_key_version = 1
 
    def encrypt_phi(
        self,
        data: str,
        patient_id: str,
        data_type: str
    ) -> EncryptedData:
        """Encrypt PHI data."""
        # Get encryption key
        key, key_id = self.key_manager.get_current_key()
 
        # Generate unique nonce
        nonce = os.urandom(12)
 
        # Create additional authenticated data (AAD)
        aad = json.dumps({
            'patient_id': patient_id,
            'data_type': data_type,
            'key_version': self.current_key_version
        }).encode()
 
        # Encrypt
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, data.encode(), aad)
 
        return EncryptedData(
            ciphertext=ciphertext,
            nonce=nonce,
            key_id=key_id,
            algorithm=self.algorithm,
            version=self.current_key_version
        )
 
    def decrypt_phi(
        self,
        encrypted_data: EncryptedData,
        patient_id: str,
        data_type: str
    ) -> str:
        """Decrypt PHI data."""
        # Get decryption key
        key = self.key_manager.get_key(
            encrypted_data.key_id,
            encrypted_data.version
        )
 
        # Recreate AAD
        aad = json.dumps({
            'patient_id': patient_id,
            'data_type': data_type,
            'key_version': encrypted_data.version
        }).encode()
 
        # Decrypt
        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
 
        return plaintext.decode()
 
    def encrypt_field(self, value: str, field_name: str) -> str:
        """Encrypt individual field for database storage."""
        key, key_id = self.key_manager.get_current_key()
        nonce = os.urandom(12)
 
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, value.encode(), field_name.encode())
 
        # Encode for storage
        return base64.b64encode(json.dumps({
            'c': base64.b64encode(ciphertext).decode(),
            'n': base64.b64encode(nonce).decode(),
            'k': key_id,
            'v': self.current_key_version
        }).encode()).decode()
 
    def decrypt_field(self, encrypted_value: str, field_name: str) -> str:
        """Decrypt individual field from database."""
        data = json.loads(base64.b64decode(encrypted_value))
 
        key = self.key_manager.get_key(data['k'], data['v'])
        nonce = base64.b64decode(data['n'])
        ciphertext = base64.b64decode(data['c'])
 
        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, field_name.encode())
 
        return plaintext.decode()
 
class KeyManager:
    """Manage encryption keys for HIPAA compliance."""
 
    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self.keys: Dict[str, Dict] = {}
        self.current_key_id: Optional[str] = None
        self._initialize_key()
 
    def _initialize_key(self):
        """Initialize or rotate encryption key."""
        key_id = self._generate_key_id()
        key = self._derive_key(key_id, 1)
 
        self.keys[key_id] = {
            1: key,
            'created_at': datetime.utcnow(),
            'rotated_at': None
        }
        self.current_key_id = key_id
 
    def _generate_key_id(self) -> str:
        """Generate unique key identifier."""
        return hashlib.sha256(os.urandom(32)).hexdigest()[:16]
 
    def _derive_key(self, key_id: str, version: int) -> bytes:
        """Derive encryption key from master key."""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=f"{key_id}:{version}".encode(),
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(self.master_key)
 
    def get_current_key(self) -> tuple[bytes, str]:
        """Get current encryption key."""
        key_data = self.keys[self.current_key_id]
        current_version = max(key_data.keys() - {'created_at', 'rotated_at'})
        return key_data[current_version], self.current_key_id
 
    def get_key(self, key_id: str, version: int) -> bytes:
        """Get specific key version for decryption."""
        if key_id not in self.keys:
            raise KeyError(f"Key not found: {key_id}")
 
        if version not in self.keys[key_id]:
            raise KeyError(f"Key version not found: {version}")
 
        return self.keys[key_id][version]
 
    def rotate_key(self):
        """Rotate encryption key."""
        key_data = self.keys[self.current_key_id]
        current_version = max(key_data.keys() - {'created_at', 'rotated_at'})
        new_version = current_version + 1
 
        new_key = self._derive_key(self.current_key_id, new_version)
        self.keys[self.current_key_id][new_version] = new_key
        self.keys[self.current_key_id]['rotated_at'] = datetime.utcnow()
 
        return new_version

Breach Notification System

Implement breach detection and notification:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
 
class BreachSeverity(Enum):
    LOW = "low"           # < 500 individuals
    MEDIUM = "medium"     # 500+ individuals
    HIGH = "high"         # 500+ with sensitive data
 
@dataclass
class BreachIncident:
    incident_id: str
    discovered_date: datetime
    breach_date: Optional[datetime]
    description: str
    affected_individuals: int
    phi_types_affected: List[str]
    severity: BreachSeverity
    status: str  # investigating, confirmed, contained, closed
    risk_assessment: Dict
    notifications_sent: List[Dict]
 
class HIPAABreachNotification:
    def __init__(self, config: Dict):
        self.config = config
        self.incidents: List[BreachIncident] = []
        self.notification_deadline_days = 60  # HIPAA requirement
 
    def report_incident(
        self,
        description: str,
        affected_count: int,
        phi_types: List[str],
        breach_date: Optional[datetime] = None
    ) -> BreachIncident:
        """Report a potential breach incident."""
        incident = BreachIncident(
            incident_id=self._generate_incident_id(),
            discovered_date=datetime.utcnow(),
            breach_date=breach_date,
            description=description,
            affected_individuals=affected_count,
            phi_types_affected=phi_types,
            severity=self._assess_severity(affected_count, phi_types),
            status='investigating',
            risk_assessment={},
            notifications_sent=[]
        )
 
        self.incidents.append(incident)
        return incident
 
    def _generate_incident_id(self) -> str:
        """Generate unique incident ID."""
        return f"BREACH-{datetime.utcnow().strftime('%Y%m%d')}-{len(self.incidents)+1:04d}"
 
    def _assess_severity(
        self,
        affected_count: int,
        phi_types: List[str]
    ) -> BreachSeverity:
        """Assess breach severity."""
        sensitive_types = {'ssn', 'financial', 'mental_health', 'substance_abuse', 'hiv_status'}
 
        has_sensitive = any(t in sensitive_types for t in phi_types)
 
        if affected_count >= 500 and has_sensitive:
            return BreachSeverity.HIGH
        elif affected_count >= 500:
            return BreachSeverity.MEDIUM
        else:
            return BreachSeverity.LOW
 
    def perform_risk_assessment(self, incident_id: str) -> Dict:
        """Perform 4-factor risk assessment per HIPAA guidance."""
        incident = self._get_incident(incident_id)
 
        # Factor 1: Nature and extent of PHI
        phi_sensitivity_score = self._score_phi_sensitivity(incident.phi_types_affected)
 
        # Factor 2: Unauthorized person who used/accessed PHI
        # This would be determined during investigation
        unauthorized_access_risk = 0.5  # Placeholder
 
        # Factor 3: Whether PHI was actually acquired or viewed
        actual_acquisition_risk = 0.5  # Placeholder
 
        # Factor 4: Extent to which risk has been mitigated
        mitigation_score = 0.5  # Placeholder
 
        overall_risk = (
            phi_sensitivity_score * 0.3 +
            unauthorized_access_risk * 0.3 +
            actual_acquisition_risk * 0.25 +
            (1 - mitigation_score) * 0.15
        )
 
        assessment = {
            'phi_sensitivity': phi_sensitivity_score,
            'unauthorized_access_risk': unauthorized_access_risk,
            'actual_acquisition_risk': actual_acquisition_risk,
            'mitigation_effectiveness': mitigation_score,
            'overall_risk_score': overall_risk,
            'breach_confirmed': overall_risk > 0.5,
            'notification_required': overall_risk > 0.5,
            'assessed_at': datetime.utcnow().isoformat()
        }
 
        incident.risk_assessment = assessment
        return assessment
 
    def _score_phi_sensitivity(self, phi_types: List[str]) -> float:
        """Score PHI sensitivity."""
        sensitivity_weights = {
            'ssn': 1.0,
            'financial': 0.9,
            'mental_health': 0.95,
            'substance_abuse': 0.95,
            'hiv_status': 0.95,
            'diagnosis': 0.7,
            'medications': 0.6,
            'demographics': 0.4,
            'contact_info': 0.3
        }
 
        if not phi_types:
            return 0.3
 
        scores = [sensitivity_weights.get(t, 0.5) for t in phi_types]
        return max(scores)
 
    def send_notifications(self, incident_id: str) -> Dict:
        """Send required breach notifications."""
        incident = self._get_incident(incident_id)
 
        if not incident.risk_assessment.get('notification_required'):
            return {'status': 'not_required', 'reason': 'Risk assessment below threshold'}
 
        notifications = {
            'individuals': None,
            'hhs': None,
            'media': None
        }
 
        # Individual notifications (required within 60 days)
        if incident.affected_individuals > 0:
            notifications['individuals'] = self._notify_individuals(incident)
 
        # HHS notification
        if incident.affected_individuals >= 500:
            # Notify HHS within 60 days
            notifications['hhs'] = self._notify_hhs(incident)
 
            # Media notification required for 500+
            notifications['media'] = self._notify_media(incident)
        else:
            # Log for annual report to HHS (breaches < 500)
            notifications['hhs'] = {'status': 'logged_for_annual_report'}
 
        incident.notifications_sent.append({
            'timestamp': datetime.utcnow().isoformat(),
            'notifications': notifications
        })
 
        return notifications
 
    def _notify_individuals(self, incident: BreachIncident) -> Dict:
        """Send individual breach notifications."""
        notification_content = f"""
NOTICE OF DATA BREACH
 
Dear Patient,
 
We are writing to inform you of a security incident that may have affected your personal health information.
 
WHAT HAPPENED:
{incident.description}
 
DATE OF BREACH:
{incident.breach_date or 'Under investigation'}
 
INFORMATION INVOLVED:
The following types of information may have been affected:
{', '.join(incident.phi_types_affected)}
 
WHAT WE ARE DOING:
We have taken immediate steps to contain this incident and are working with security experts to investigate and prevent future occurrences.
 
WHAT YOU CAN DO:
- Monitor your credit reports and financial statements
- Consider placing a fraud alert on your credit file
- Review any explanation of benefits statements from your health insurer
 
CONTACT INFORMATION:
If you have questions, please contact our Privacy Officer at {self.config['privacy_officer_contact']}.
 
We sincerely apologize for this incident and any concern it may cause.
 
Sincerely,
{self.config['organization_name']}
"""
 
        # In production, send actual notifications
        return {
            'status': 'sent',
            'method': 'first_class_mail',  # HIPAA requirement
            'count': incident.affected_individuals,
            'deadline': (incident.discovered_date + timedelta(days=60)).isoformat()
        }
 
    def _notify_hhs(self, incident: BreachIncident) -> Dict:
        """Submit breach notification to HHS."""
        # In production, submit to HHS breach portal
        return {
            'status': 'submitted',
            'submission_date': datetime.utcnow().isoformat(),
            'portal': 'https://ocrportal.hhs.gov/ocr/breach/wizard_breach.jsf'
        }
 
    def _notify_media(self, incident: BreachIncident) -> Dict:
        """Notify prominent media outlets for large breaches."""
        return {
            'status': 'distributed',
            'press_release_date': datetime.utcnow().isoformat(),
            'outlets_contacted': ['local_news', 'healthcare_publications']
        }
 
    def _get_incident(self, incident_id: str) -> BreachIncident:
        """Get incident by ID."""
        for incident in self.incidents:
            if incident.incident_id == incident_id:
                return incident
        raise ValueError(f"Incident not found: {incident_id}")
 
    def get_notification_status(self) -> Dict:
        """Get status of all pending notifications."""
        pending = []
        deadline_approaching = []
 
        for incident in self.incidents:
            if incident.status not in ['closed']:
                deadline = incident.discovered_date + timedelta(days=self.notification_deadline_days)
                days_remaining = (deadline - datetime.utcnow()).days
 
                if not incident.notifications_sent:
                    pending.append({
                        'incident_id': incident.incident_id,
                        'deadline': deadline.isoformat(),
                        'days_remaining': days_remaining
                    })
 
                    if days_remaining <= 14:
                        deadline_approaching.append(incident.incident_id)
 
        return {
            'pending_notifications': pending,
            'deadline_approaching': deadline_approaching,
            'total_incidents': len(self.incidents)
        }

Conclusion

HIPAA compliance requires a comprehensive approach combining PHI classification, access controls, audit logging, encryption, and breach notification capabilities. Implement these technical safeguards as part of your overall security program, and ensure regular risk assessments and staff training complement your technical controls. Remember that HIPAA compliance is ongoing - continuously monitor, audit, and improve your security posture to protect patient health information effectively.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.