HIPAA compliance requires comprehensive technical safeguards to protect Protected Health Information (PHI). This guide provides practical implementations for the Security Rule requirements in healthcare applications.
PHI Data Classification System
Implement systematic PHI identification and classification:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Set
import re
import hashlib
class PHIType(Enum):
    """Identifier categories treated as PHI (18 members).

    Each member's value is the lowercase label string for that category.
    """

    # Personal and contact details
    NAME = "name"
    ADDRESS = "address"
    DATES = "dates"  # Except year
    PHONE = "phone"
    FAX = "fax"
    EMAIL = "email"

    # Issued numbers and account-style identifiers
    SSN = "ssn"
    MRN = "medical_record_number"
    HEALTH_PLAN_ID = "health_plan_id"
    ACCOUNT_NUMBER = "account_number"
    CERTIFICATE_LICENSE = "certificate_license"
    VEHICLE_ID = "vehicle_id"
    DEVICE_ID = "device_id"

    # Network locators
    URL = "url"
    IP_ADDRESS = "ip_address"

    # Physical characteristics and catch-all
    BIOMETRIC = "biometric"
    PHOTO = "photo"
    OTHER_UNIQUE = "other_unique_identifier"
class SensitivityLevel(Enum):
    """Three-step sensitivity scale attached to detected PHI."""

    # Direct identifiers
    HIGH = "high"
    # Quasi-identifiers
    MEDIUM = "medium"
    # Non-identifying health data
    LOW = "low"
@dataclass
class PHIElement:
    """A single PHI occurrence found in a piece of data."""

    # Category of identifier that was matched.
    phi_type: PHIType
    # The matched text itself.
    value: str
    # Sensitivity assigned to this occurrence.
    sensitivity: SensitivityLevel
    # Where in the data
    location: str
    # Optional free-form label describing where the data came from.
    context: Optional[str] = None
@dataclass
class DataClassificationResult:
    """Outcome of classifying one piece of data for PHI content."""

    # True when at least one PHI element was detected.
    contains_phi: bool
    # Every detected element, in detection order.
    phi_elements: List[PHIElement]
    # Highest sensitivity among the detected elements.
    sensitivity_level: SensitivityLevel
    # Human-readable handling advice.
    recommended_actions: List[str]
    # Creation time; naive UTC taken from datetime.utcnow at construction.
    timestamp: datetime = field(default_factory=datetime.utcnow)
class PHIClassifier:
    """Detect, classify and redact PHI in free text using regular expressions.

    Only the identifier types that have an entry in ``_compile_patterns`` can
    be detected; types without a pattern are never matched.
    """

    def __init__(self):
        # Compiled once so classify() and de_identify() share the same patterns.
        self.patterns = self._compile_patterns()
        # Types reported as SensitivityLevel.HIGH.
        self.high_sensitivity_types = {
            PHIType.SSN, PHIType.MRN, PHIType.HEALTH_PLAN_ID,
            PHIType.BIOMETRIC, PHIType.PHOTO
        }
        # Types reported as SensitivityLevel.MEDIUM; everything else is LOW.
        self.medium_sensitivity_types = {
            PHIType.NAME, PHIType.ADDRESS, PHIType.PHONE,
            PHIType.EMAIL, PHIType.DATES
        }

    def _compile_patterns(self) -> Dict[PHIType, List[re.Pattern]]:
        """Compile regex patterns for PHI detection.

        Returns:
            Mapping from PHI type to the list of compiled patterns for it.
            None of the patterns contain capturing groups, so ``findall``
            yields whole matches.
        """
        return {
            PHIType.SSN: [
                re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
                re.compile(r'\b\d{9}\b')  # No dashes
            ],
            PHIType.PHONE: [
                re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
                re.compile(r'\(\d{3}\)\s*\d{3}[-.\s]?\d{4}')
            ],
            PHIType.EMAIL: [
                # The TLD class is letters only. It was previously written
                # "[A-Z|a-z]", which also accepted a literal "|" character.
                re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
            ],
            PHIType.MRN: [
                re.compile(r'\bMRN[:\s#]*\d{6,10}\b', re.IGNORECASE),
                re.compile(r'\bMedical Record[:\s#]*\d{6,10}\b', re.IGNORECASE)
            ],
            PHIType.DATES: [
                re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),
                re.compile(r'\b\d{4}-\d{2}-\d{2}\b'),
                re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b', re.IGNORECASE)
            ],
            PHIType.IP_ADDRESS: [
                re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
            ],
            PHIType.HEALTH_PLAN_ID: [
                re.compile(r'\b[A-Z]{3}\d{9}\b'),  # Common format
                re.compile(r'\bMember\s*ID[:\s#]*[A-Z0-9]{8,12}\b', re.IGNORECASE)
            ],
            PHIType.ADDRESS: [
                re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr)\b', re.IGNORECASE)
            ]
        }

    def classify(self, data: str, context: str = "unknown") -> DataClassificationResult:
        """Classify data for PHI content.

        Args:
            data: Text to scan.
            context: Label copied onto every detected element.

        Returns:
            A result holding every pattern match, the highest sensitivity
            seen (LOW when nothing matched) and handling recommendations.
        """
        phi_elements = []
        # Pattern-based detection: one element per regex match.
        for phi_type, patterns in self.patterns.items():
            sensitivity = self._get_sensitivity(phi_type)
            for pattern in patterns:
                for match in pattern.finditer(data):
                    phi_elements.append(PHIElement(
                        phi_type=phi_type,
                        value=match.group(),
                        sensitivity=sensitivity,
                        location=f"position {match.start()}-{match.end()}",
                        context=context
                    ))

        # Overall sensitivity is the highest level among the matches.
        seen_levels = {element.sensitivity for element in phi_elements}
        if SensitivityLevel.HIGH in seen_levels:
            overall_sensitivity = SensitivityLevel.HIGH
        elif SensitivityLevel.MEDIUM in seen_levels:
            overall_sensitivity = SensitivityLevel.MEDIUM
        else:
            overall_sensitivity = SensitivityLevel.LOW

        recommendations = self._generate_recommendations(phi_elements, overall_sensitivity)
        return DataClassificationResult(
            contains_phi=bool(phi_elements),
            phi_elements=phi_elements,
            sensitivity_level=overall_sensitivity,
            recommended_actions=recommendations
        )

    def _get_sensitivity(self, phi_type: PHIType) -> SensitivityLevel:
        """Return HIGH or MEDIUM per the configured type sets, otherwise LOW."""
        if phi_type in self.high_sensitivity_types:
            return SensitivityLevel.HIGH
        if phi_type in self.medium_sensitivity_types:
            return SensitivityLevel.MEDIUM
        return SensitivityLevel.LOW

    def _generate_recommendations(
        self,
        elements: List[PHIElement],
        sensitivity: SensitivityLevel
    ) -> List[str]:
        """Generate handling recommendations.

        Args:
            elements: Detected PHI elements.
            sensitivity: Overall sensitivity of the data.

        Returns:
            General advice for HIGH sensitivity data followed by advice
            specific to the SSN, DATES and ADDRESS types when present.
        """
        recommendations = []
        if sensitivity == SensitivityLevel.HIGH:
            recommendations.extend([
                "Encrypt data at rest and in transit",
                "Implement strict access controls",
                "Enable comprehensive audit logging",
                "Consider de-identification before storage"
            ])

        detected_types = {element.phi_type for element in elements}
        if PHIType.SSN in detected_types:
            recommendations.append("SSN detected - store only last 4 digits if possible")
        if PHIType.DATES in detected_types:
            recommendations.append("Consider generalizing dates to year only for de-identification")
        if PHIType.ADDRESS in detected_types:
            # Safe Harbor keeps the 3-digit ZIP only for areas with more than
            # 20,000 people; the earlier wording stated the rule backwards.
            recommendations.append(
                "Consider using only the first 3 ZIP code digits "
                "(use 000 where that 3-digit area has 20,000 or fewer people)"
            )
        return recommendations

    def de_identify(self, data: str, method: str = "safe_harbor") -> Dict:
        """De-identify data using specified method.

        Args:
            data: Text to de-identify.
            method: ``"safe_harbor"`` or ``"expert_determination"``.

        Returns:
            The result dict produced by the selected method.

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method == "safe_harbor":
            return self._safe_harbor_deidentification(data)
        if method == "expert_determination":
            return self._expert_determination_placeholder(data)
        raise ValueError(f"Unknown de-identification method: {method}")

    def _safe_harbor_deidentification(self, data: str) -> Dict:
        """Replace every pattern match with a ``[<TYPE>_REMOVED]`` placeholder.

        Patterns are applied in turn to the progressively redacted text, so a
        later pattern only sees what earlier ones left behind.

        Returns:
            Dict with the redacted text, before/after lengths, a count of
            removed elements and a per-element removal record.
        """
        result = data
        removed_elements = []
        for phi_type, patterns in self.patterns.items():
            placeholder = f"[{phi_type.value.upper()}_REMOVED]"
            for pattern in patterns:
                for match in pattern.findall(result):
                    removed_elements.append({
                        'type': phi_type.value,
                        'original': match,
                        'replacement': placeholder
                    })
                result = pattern.sub(placeholder, result)
        return {
            'original_length': len(data),
            'deidentified_data': result,
            'deidentified_length': len(result),
            'elements_removed': len(removed_elements),
            'removal_details': removed_elements,
            'method': 'safe_harbor',
            # NOTE(review): returned unconditionally even though only the
            # identifier types with a regex above are removed. Kept for
            # compatibility; do not treat as proof of de-identification.
            'hipaa_compliant': True
        }

    def _expert_determination_placeholder(self, data: str) -> Dict:
        """Placeholder for expert determination method.

        ``data`` is not inspected; a fixed advisory dict is returned.
        """
        # In production, this would involve statistical analysis
        # to determine re-identification risk
        return {
            'message': 'Expert determination requires statistical analysis',
            'recommendation': 'Consult with qualified statistical expert'
        }


# Access Control Implementation
Implement HIPAA-compliant access controls:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set, Dict, List
import hashlib
import jwt
class AccessLevel(Enum):
    """Access tiers, numbered 0 (none) to 3 (admin)."""

    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
class UserRole(Enum):
    """Roles a user account can hold; values are lowercase label strings."""

    # Care recipients and clinical staff
    PATIENT = "patient"
    NURSE = "nurse"
    PHYSICIAN = "physician"

    # Administrative and back-office staff
    ADMIN = "admin"
    BILLING = "billing"
    IT_SUPPORT = "it_support"
    AUDITOR = "auditor"
@dataclass
class User:
    """Account record consulted when making access decisions."""

    user_id: str
    username: str
    role: UserRole
    department: str
    # Inactive accounts are denied access.
    active: bool
    # Whether multi-factor authentication is enabled for the account.
    mfa_enabled: bool
    last_login: Optional[datetime]
    # Extra permission names granted to this user on top of the role's set.
    permissions: Set[str]
    # Optional per-user restrictions. The value type was previously written
    # as the builtin function ``any``; ``object`` expresses "any value"
    # without a further import, and Optional matches the None default.
    access_restrictions: Optional[Dict[str, object]] = None
@dataclass
class AccessDecision:
    """Result of one access check, suitable for returning and auditing."""

    # True when the request is permitted.
    allowed: bool
    # Who asked, for what, and which action.
    user_id: str
    resource: str
    action: str
    # When the decision was made.
    timestamp: datetime
    # Human-readable explanation of the outcome.
    reason: str
    # Set when access was granted through the emergency override path.
    requires_break_glass: bool = False
    # Short identifier tying the decision to audit records.
    audit_id: str = ""
class HIPAAAccessControl:
    """Role-based access checks for PHI resources with a break-glass path."""

    def __init__(self):
        # Registered users keyed by user id.
        self.users: Dict[str, User] = {}
        self.role_permissions = self._define_role_permissions()
        # In-memory record of emergency accesses awaiting review.
        self.break_glass_log: List[Dict] = []
        # Idle-session limit in minutes. This is a policy value chosen here;
        # confirm it against your organisation's automatic-logoff policy.
        self.session_timeout_minutes = 15

    def _define_role_permissions(self) -> Dict[UserRole, Dict]:
        """Define role-based permissions following minimum necessary principle.

        Returns:
            Mapping from role to a config dict with ``permissions`` (set of
            permission names), ``phi_access`` (scope flags) and
            ``restrictions`` (dict or None).
        """
        return {
            UserRole.PATIENT: {
                'permissions': {'read_own_records', 'request_amendment'},
                'phi_access': {'own_only': True},
                'restrictions': {'department': None}
            },
            UserRole.NURSE: {
                'permissions': {
                    'read_patient_records', 'write_vitals', 'write_notes',
                    'view_medications', 'administer_medications'
                },
                'phi_access': {'assigned_patients': True, 'department_patients': True},
                'restrictions': {'department': 'assigned'}
            },
            UserRole.PHYSICIAN: {
                'permissions': {
                    'read_patient_records', 'write_diagnosis', 'write_orders',
                    'prescribe_medications', 'view_all_records', 'write_notes'
                },
                'phi_access': {'all_patients': True},
                'restrictions': None
            },
            UserRole.BILLING: {
                'permissions': {
                    'read_billing_info', 'write_billing', 'read_insurance',
                    'read_demographics'
                },
                'phi_access': {'billing_relevant': True},
                'restrictions': {'exclude_clinical_notes': True}
            },
            UserRole.ADMIN: {
                'permissions': {'all'},
                'phi_access': {'all': True},
                'restrictions': None
            },
            UserRole.IT_SUPPORT: {
                'permissions': {
                    'system_administration', 'user_management',
                    'audit_log_access'
                },
                'phi_access': {'none': True},  # IT shouldn't access PHI
                'restrictions': {'no_phi_access': True}
            },
            UserRole.AUDITOR: {
                'permissions': {
                    'audit_log_access', 'compliance_reports',
                    'access_reviews'
                },
                'phi_access': {'audit_only': True},
                'restrictions': {'read_only': True}
            }
        }

    def check_access(
        self,
        user_id: str,
        resource: str,
        action: str,
        patient_id: Optional[str] = None,
        context: Optional[Dict] = None
    ) -> AccessDecision:
        """Check if user has access to resource.

        Checks run in order: user exists, user is active, MFA is enabled
        when the resource is PHI, the role is configured, the permission is
        held (with an emergency override when it is not), and finally the
        patient-level scope when ``patient_id`` is given.

        Args:
            user_id: Id looked up in ``self.users``.
            resource: Resource name being accessed.
            action: Action name; combined with ``resource`` to form the
                required permission.
            patient_id: Patient whose data is targeted, if any.
            context: Optional request context; ``emergency`` and
                ``justification`` keys enable break-glass access.

        Returns:
            An AccessDecision describing the outcome and its reason.
        """
        # SHA-256 rather than MD5 so the id can be produced on
        # FIPS-restricted Python builds; still truncated to 12 hex chars.
        audit_id = hashlib.sha256(
            f"{user_id}{resource}{action}{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]

        def decide(allowed: bool, reason: str, requires_break_glass: bool = False) -> AccessDecision:
            # Every outcome shares the request fields and audit id.
            return AccessDecision(
                allowed=allowed,
                user_id=user_id,
                resource=resource,
                action=action,
                timestamp=datetime.utcnow(),
                reason=reason,
                requires_break_glass=requires_break_glass,
                audit_id=audit_id
            )

        if user_id not in self.users:
            return decide(False, "User not found")
        user = self.users[user_id]

        if not user.active:
            return decide(False, "User account is inactive")

        # MFA is mandatory before touching PHI resources.
        if self._requires_phi_access(resource) and not user.mfa_enabled:
            return decide(False, "MFA required for PHI access")

        role_config = self.role_permissions.get(user.role)
        if not role_config:
            return decide(False, "Role not configured")

        required_permission = self._get_required_permission(resource, action)
        has_permission = (
            'all' in role_config['permissions'] or
            required_permission in role_config['permissions'] or
            required_permission in user.permissions
        )
        if not has_permission:
            # Break-glass: a justified emergency overrides a missing permission.
            if self._is_emergency_context(context):
                return decide(True, "Emergency break-glass access", requires_break_glass=True)
            return decide(False, f"Missing permission: {required_permission}")

        # Patient-specific scope applies only when a patient is targeted.
        if patient_id and not self._check_patient_access(user, patient_id, role_config):
            return decide(False, "Not authorized for this patient's records")

        return decide(True, "Access granted")

    def _requires_phi_access(self, resource: str) -> bool:
        """Return True when ``resource`` contains any known PHI resource name."""
        phi_resources = [
            'patient_records', 'medical_history', 'lab_results',
            'prescriptions', 'clinical_notes', 'imaging',
            'demographics', 'insurance_info'
        ]
        return any(phi in resource for phi in phi_resources)

    def _get_required_permission(self, resource: str, action: str) -> str:
        """Map resource/action to required permission (``<action>_<resource>``)."""
        # Simplified mapping - expand based on your system
        return f"{action}_{resource}"

    def _is_emergency_context(self, context: Optional[Dict]) -> bool:
        """Return True when the context flags an emergency and gives a justification."""
        if not context:
            return False
        # bool() so the declared return type holds; the bare ``and``
        # expression would hand back the justification string itself.
        return bool(context.get('emergency', False) and context.get('justification'))

    def _check_patient_access(
        self,
        user: User,
        patient_id: str,
        role_config: Dict
    ) -> bool:
        """Check if user can access specific patient's records.

        Access is granted as soon as any enabled scope in the role's
        ``phi_access`` matches. Scopes that are not handled here grant
        nothing.
        """
        phi_access = role_config.get('phi_access', {})
        if phi_access.get('all_patients') or phi_access.get('all'):
            return True
        if phi_access.get('own_only') and user.user_id == patient_id:
            return True
        # A failed assignment check must fall through to the department
        # check: roles may enable both scopes, and returning the assignment
        # result directly made the department branch unreachable for them.
        if phi_access.get('assigned_patients') and self._is_assigned_to_patient(user.user_id, patient_id):
            return True
        if phi_access.get('department_patients') and self._is_patient_in_department(patient_id, user.department):
            return True
        return False

    def _is_assigned_to_patient(self, user_id: str, patient_id: str) -> bool:
        """Check if user is assigned to patient - placeholder, always False."""
        # In production, query assignment database
        return False

    def _is_patient_in_department(self, patient_id: str, department: str) -> bool:
        """Check if patient is in department - placeholder, always False."""
        # In production, query patient location/department
        return False

    def log_break_glass(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        access_decision: AccessDecision
    ):
        """Log break-glass access for review.

        Appends an unreviewed entry, carrying the decision's audit id, to
        ``self.break_glass_log``.
        """
        self.break_glass_log.append({
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'patient_id': patient_id,
            'justification': justification,
            'audit_id': access_decision.audit_id,
            'reviewed': False,
            'review_result': None
        })


# HIPAA Audit Logging
Implement comprehensive audit logging:
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import json
import hashlib
import uuid
class AuditEventType(Enum):
    """Kinds of events recorded in the audit trail, grouped by area."""

    # --- Access events ---
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    SESSION_TIMEOUT = "session_timeout"

    # --- PHI access events ---
    PHI_VIEW = "phi_view"
    PHI_CREATE = "phi_create"
    PHI_UPDATE = "phi_update"
    PHI_DELETE = "phi_delete"
    PHI_EXPORT = "phi_export"
    PHI_PRINT = "phi_print"

    # --- Administrative events ---
    USER_CREATE = "user_create"
    USER_MODIFY = "user_modify"
    USER_DELETE = "user_delete"
    PERMISSION_CHANGE = "permission_change"

    # --- Security events ---
    MFA_ENABLED = "mfa_enabled"
    MFA_DISABLED = "mfa_disabled"
    PASSWORD_CHANGE = "password_change"
    BREAK_GLASS_ACCESS = "break_glass_access"

    # --- System events ---
    SYSTEM_START = "system_start"
    SYSTEM_STOP = "system_stop"
    CONFIG_CHANGE = "config_change"
    BACKUP_CREATED = "backup_created"
@dataclass
class AuditEvent:
    """One audit-trail record with a SHA-256 integrity hash.

    The hash is filled in automatically after construction unless a
    non-empty ``integrity_hash`` is supplied.
    """

    event_id: str
    timestamp: datetime
    event_type: AuditEventType
    user_id: str
    patient_id: Optional[str]
    resource: str
    action: str
    # success, failure, error
    outcome: str
    source_ip: str
    user_agent: Optional[str]
    details: Dict[str, Any]
    phi_accessed: bool
    integrity_hash: str = ""

    def __post_init__(self):
        # A hash passed in by the caller is kept as-is.
        if self.integrity_hash:
            return
        self.integrity_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """Compute integrity hash for tamper detection.

        The digest covers the fields listed below, concatenated without
        separators. NOTE(review): source_ip, user_agent, details and
        phi_accessed are not covered, so changes to them go undetected.
        """
        covered_fields = (
            self.event_id,
            self.timestamp.isoformat(),
            self.event_type.value,
            self.user_id,
            self.patient_id,
            self.resource,
            self.action,
            self.outcome,
        )
        payload = "".join(f"{part}" for part in covered_fields)
        return hashlib.sha256(payload.encode()).hexdigest()
class HIPAAAuditLogger:
    """Writes hash-chained audit events to a storage backend and reports on them.

    The backend is expected to provide ``store(event)`` and ``query(...)``;
    their exact contracts are not visible here.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.retention_years = 6  # HIPAA minimum
        # Integrity hash of the most recently stored event (chain head).
        self.previous_hash: Optional[str] = None

    def log_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        resource: str,
        action: str,
        outcome: str,
        source_ip: str,
        patient_id: Optional[str] = None,
        user_agent: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
        phi_accessed: bool = False
    ) -> AuditEvent:
        """Log an audit event.

        Builds an AuditEvent with a fresh UUID and the current UTC time,
        links it to the previous event's hash, and stores it.

        Args:
            details: Extra data for the event. A copy is stored, so the
                caller's dict is not modified.

        Returns:
            The stored AuditEvent.
        """
        event = AuditEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow(),
            event_type=event_type,
            user_id=user_id,
            patient_id=patient_id,
            resource=resource,
            action=action,
            outcome=outcome,
            source_ip=source_ip,
            user_agent=user_agent,
            # Copy: previous_hash is written into details below and must
            # not leak into the dict the caller passed in.
            details=dict(details) if details else {},
            phi_accessed=phi_accessed
        )
        # Add chain hash for tamper evidence
        if self.previous_hash:
            event.details['previous_hash'] = self.previous_hash
        self.storage.store(event)
        # Advance the chain only after the store call returned; otherwise a
        # failed write would leave the next event pointing at an event that
        # was never stored.
        self.previous_hash = event.integrity_hash
        return event

    def log_phi_access(
        self,
        user_id: str,
        patient_id: str,
        record_type: str,
        action: str,
        source_ip: str,
        fields_accessed: Optional[List[str]] = None,
        purpose: Optional[str] = None
    ) -> AuditEvent:
        """Log PHI access with detailed tracking.

        ``action`` selects the event type; actions outside the known set
        are recorded as PHI_VIEW. The outcome is always ``'success'``.
        """
        event_type_map = {
            'view': AuditEventType.PHI_VIEW,
            'create': AuditEventType.PHI_CREATE,
            'update': AuditEventType.PHI_UPDATE,
            'delete': AuditEventType.PHI_DELETE,
            'export': AuditEventType.PHI_EXPORT,
            'print': AuditEventType.PHI_PRINT
        }
        return self.log_event(
            event_type=event_type_map.get(action, AuditEventType.PHI_VIEW),
            user_id=user_id,
            patient_id=patient_id,
            resource=record_type,
            action=action,
            outcome='success',
            source_ip=source_ip,
            phi_accessed=True,
            details={
                'fields_accessed': fields_accessed or [],
                'purpose': purpose,
                'record_type': record_type
            }
        )

    def generate_access_report(
        self,
        patient_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Generate access report for patient (required for HIPAA accounting of disclosures).

        Queries storage for PHI-access events for the patient in the given
        period and summarises them per user, per record type and as a
        timeline in the order the backend returned them.
        """
        events = self.storage.query(
            patient_id=patient_id,
            start_date=start_date,
            end_date=end_date,
            phi_accessed=True
        )
        access_by_user: Dict[str, Dict] = {}
        access_by_type: Dict[str, int] = {}
        access_timeline: List[Dict] = []

        for event in events:
            per_user = access_by_user.setdefault(event.user_id, {'count': 0, 'actions': []})
            per_user['count'] += 1
            per_user['actions'].append(event.action)

            record_type = event.details.get('record_type', 'unknown')
            access_by_type[record_type] = access_by_type.get(record_type, 0) + 1

            access_timeline.append({
                'timestamp': event.timestamp.isoformat(),
                'user_id': event.user_id,
                'action': event.action,
                'record_type': record_type
            })

        return {
            'patient_id': patient_id,
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'generated_at': datetime.utcnow().isoformat(),
            'total_access_events': len(events),
            'access_by_user': access_by_user,
            'access_by_type': access_by_type,
            'access_timeline': access_timeline
        }

    def verify_log_integrity(self, events: List[AuditEvent]) -> Dict:
        """Verify integrity of audit log chain.

        Each event's stored hash is recomputed and compared, and each
        event's ``previous_hash`` (when present) is compared with the hash
        of the event before it in ``events``.

        Returns:
            Dict with ``verified``, ``total_events`` and a list of
            ``integrity_failures``.
        """
        results = {
            'verified': True,
            'total_events': len(events),
            'integrity_failures': []
        }

        def record_failure(event: AuditEvent, issue: str) -> None:
            results['verified'] = False
            results['integrity_failures'].append({
                'event_id': event.event_id,
                'issue': issue
            })

        for i, event in enumerate(events):
            # Verify individual event hash
            if event._compute_hash() != event.integrity_hash:
                record_failure(event, 'Hash mismatch - event may have been tampered')
            # Verify chain. An event without previous_hash is accepted
            # (e.g. the first event written by a new logger instance).
            if i > 0:
                previous_hash = event.details.get('previous_hash')
                if previous_hash and previous_hash != events[i - 1].integrity_hash:
                    record_failure(event, 'Chain broken - events may be missing or reordered')
        return results


# Encryption Implementation
Implement HIPAA-compliant encryption:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import base64
from dataclasses import dataclass
from typing import Optional, Dict
import json
@dataclass
class EncryptedData:
    """Ciphertext bundled with the metadata needed to decrypt it."""

    # Encrypted payload.
    ciphertext: bytes
    # Nonce used for this encryption.
    nonce: bytes
    # Identifier of the key that produced the ciphertext.
    key_id: str
    # Name of the cipher used.
    algorithm: str
    # Key version recorded at encryption time.
    version: int
class HIPAAEncryption:
    """HIPAA-compliant encryption for PHI.

    Uses AES-GCM with a fresh 12-byte random nonce per call. Keys come from
    ``key_manager``, which must provide ``get_current_key()`` returning
    ``(key, key_id)`` and ``get_key(key_id, version)``.
    """

    def __init__(self, key_manager):
        self.key_manager = key_manager
        self.algorithm = 'AES-256-GCM'  # NIST recommended
        # Version tag used when the key manager cannot report its own.
        self.current_key_version = 1

    def _active_key_version(self) -> int:
        """Return the version to record alongside newly written ciphertext.

        The tag must name the key that ``get_current_key()`` handed out,
        otherwise decryption later fetches a different key. If the key
        manager exposes ``get_current_version()`` that value is used;
        otherwise this falls back to ``self.current_key_version``.
        """
        getter = getattr(self.key_manager, 'get_current_version', None)
        if callable(getter):
            return getter()
        return self.current_key_version

    @staticmethod
    def _build_aad(patient_id: str, data_type: str, key_version: int) -> bytes:
        """Serialise the additional authenticated data for a PHI record."""
        # Key order is part of the authenticated bytes: encrypt and decrypt
        # must produce the same serialisation, so do not reorder.
        return json.dumps({
            'patient_id': patient_id,
            'data_type': data_type,
            'key_version': key_version
        }).encode()

    def encrypt_phi(
        self,
        data: str,
        patient_id: str,
        data_type: str
    ) -> EncryptedData:
        """Encrypt PHI data.

        The patient id, data type and key version are bound to the
        ciphertext as additional authenticated data, so decryption must be
        given the same values.

        Returns:
            EncryptedData holding ciphertext, nonce, key id, algorithm name
            and key version.
        """
        key, key_id = self.key_manager.get_current_key()
        key_version = self._active_key_version()
        # Generate unique nonce
        nonce = os.urandom(12)
        aad = self._build_aad(patient_id, data_type, key_version)
        ciphertext = AESGCM(key).encrypt(nonce, data.encode(), aad)
        return EncryptedData(
            ciphertext=ciphertext,
            nonce=nonce,
            key_id=key_id,
            algorithm=self.algorithm,
            version=key_version
        )

    def decrypt_phi(
        self,
        encrypted_data: EncryptedData,
        patient_id: str,
        data_type: str
    ) -> str:
        """Decrypt PHI data.

        The key is looked up by the id and version stored on
        ``encrypted_data``; the additional authenticated data is rebuilt
        from the arguments and that stored version.
        """
        key = self.key_manager.get_key(
            encrypted_data.key_id,
            encrypted_data.version
        )
        aad = self._build_aad(patient_id, data_type, encrypted_data.version)
        plaintext = AESGCM(key).decrypt(
            encrypted_data.nonce,
            encrypted_data.ciphertext,
            aad
        )
        return plaintext.decode()

    def encrypt_field(self, value: str, field_name: str) -> str:
        """Encrypt individual field for database storage.

        The field name is used as additional authenticated data.

        Returns:
            Base64 text wrapping a JSON object with keys ``c``
            (ciphertext), ``n`` (nonce), ``k`` (key id) and ``v`` (version).
        """
        key, key_id = self.key_manager.get_current_key()
        key_version = self._active_key_version()
        nonce = os.urandom(12)
        ciphertext = AESGCM(key).encrypt(nonce, value.encode(), field_name.encode())
        # Encode for storage
        envelope = {
            'c': base64.b64encode(ciphertext).decode(),
            'n': base64.b64encode(nonce).decode(),
            'k': key_id,
            'v': key_version
        }
        return base64.b64encode(json.dumps(envelope).encode()).decode()

    def decrypt_field(self, encrypted_value: str, field_name: str) -> str:
        """Decrypt individual field from database.

        ``field_name`` must match the name used at encryption time.
        """
        envelope = json.loads(base64.b64decode(encrypted_value))
        key = self.key_manager.get_key(envelope['k'], envelope['v'])
        nonce = base64.b64decode(envelope['n'])
        ciphertext = base64.b64decode(envelope['c'])
        plaintext = AESGCM(key).decrypt(nonce, ciphertext, field_name.encode())
        return plaintext.decode()
class KeyManager:
    """Manage encryption keys for HIPAA compliance.

    ``self.keys`` maps a key id to a dict holding integer version numbers
    (mapped to key bytes) next to the ``'created_at'`` and ``'rotated_at'``
    metadata entries.
    """

    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self.keys: Dict[str, Dict] = {}
        self.current_key_id: Optional[str] = None
        self._initialize_key()

    def _initialize_key(self):
        """Create a new key id with a version-1 key and make it current."""
        key_id = self._generate_key_id()
        key = self._derive_key(key_id, 1)
        self.keys[key_id] = {
            1: key,
            'created_at': datetime.utcnow(),
            'rotated_at': None
        }
        self.current_key_id = key_id

    def _generate_key_id(self) -> str:
        """Generate unique key identifier (16 hex chars from random bytes)."""
        return hashlib.sha256(os.urandom(32)).hexdigest()[:16]

    def _derive_key(self, key_id: str, version: int) -> bytes:
        """Derive encryption key from master key.

        Uses PBKDF2-HMAC-SHA256 with ``"<key_id>:<version>"`` as the salt
        and returns 32 bytes.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=f"{key_id}:{version}".encode(),
            iterations=100000,
            backend=default_backend()
        )
        return kdf.derive(self.master_key)

    def _latest_version(self, key_id: str) -> int:
        """Return the highest version number stored under ``key_id``."""
        # Version entries are the integer keys; the rest is metadata.
        return max(k for k in self.keys[key_id] if isinstance(k, int))

    def get_current_version(self) -> int:
        """Return the version of the key that ``get_current_key`` hands out.

        Callers that record a version next to ciphertext should use this
        value so that ``get_key`` later returns the same key.
        """
        return self._latest_version(self.current_key_id)

    def get_current_key(self) -> tuple[bytes, str]:
        """Get current encryption key.

        Returns:
            ``(key_bytes, key_id)`` for the newest version of the current key.
        """
        current_version = self._latest_version(self.current_key_id)
        return self.keys[self.current_key_id][current_version], self.current_key_id

    def get_key(self, key_id: str, version: int) -> bytes:
        """Get specific key version for decryption.

        Raises:
            KeyError: If the key id or the version is unknown.
        """
        if key_id not in self.keys:
            raise KeyError(f"Key not found: {key_id}")
        if version not in self.keys[key_id]:
            raise KeyError(f"Key version not found: {version}")
        return self.keys[key_id][version]

    def rotate_key(self):
        """Rotate encryption key.

        Derives the next version under the current key id, keeps earlier
        versions for decryption, stamps ``rotated_at`` and returns the new
        version number.
        """
        new_version = self._latest_version(self.current_key_id) + 1
        key_data = self.keys[self.current_key_id]
        key_data[new_version] = self._derive_key(self.current_key_id, new_version)
        key_data['rotated_at'] = datetime.utcnow()
        return new_version


# Breach Notification System
Implement breach detection and notification:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class BreachSeverity(Enum):
    """Severity tiers for a breach incident."""

    # < 500 individuals
    LOW = "low"
    # 500+ individuals
    MEDIUM = "medium"
    # 500+ with sensitive data
    HIGH = "high"
@dataclass
class BreachIncident:
    """Record of one reported (potential) breach and its handling state."""

    incident_id: str
    # When the incident was reported to the system.
    discovered_date: datetime
    # When the breach itself occurred, if known.
    breach_date: Optional[datetime]
    description: str
    # Number of people whose data is involved.
    affected_individuals: int
    # Labels of the PHI categories involved.
    phi_types_affected: List[str]
    severity: BreachSeverity
    # investigating, confirmed, contained, closed
    status: str
    # Result of the risk assessment; empty until one is performed.
    risk_assessment: Dict
    # One entry per notification run.
    notifications_sent: List[Dict]
class HIPAABreachNotification:
    """Tracks breach incidents, scores their risk and issues notifications.

    ``config`` must contain ``'privacy_officer_contact'`` and
    ``'organization_name'``; both are read when the individual notice is
    rendered.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.incidents: List[BreachIncident] = []
        self.notification_deadline_days = 60  # HIPAA requirement

    def report_incident(
        self,
        description: str,
        affected_count: int,
        phi_types: List[str],
        breach_date: Optional[datetime] = None
    ) -> BreachIncident:
        """Report a potential breach incident.

        Creates an incident in ``'investigating'`` status, discovered now
        (UTC), with severity derived from the count and PHI types, and
        appends it to ``self.incidents``.
        """
        incident = BreachIncident(
            incident_id=self._generate_incident_id(),
            discovered_date=datetime.utcnow(),
            breach_date=breach_date,
            description=description,
            affected_individuals=affected_count,
            phi_types_affected=phi_types,
            severity=self._assess_severity(affected_count, phi_types),
            status='investigating',
            risk_assessment={},
            notifications_sent=[]
        )
        self.incidents.append(incident)
        return incident

    def _generate_incident_id(self) -> str:
        """Generate unique incident ID (``BREACH-<YYYYMMDD>-<NNNN>``)."""
        # The sequence number is the incident count + 1, so ids stay unique
        # only while incidents are never removed from the list.
        return f"BREACH-{datetime.utcnow().strftime('%Y%m%d')}-{len(self.incidents)+1:04d}"

    def _assess_severity(
        self,
        affected_count: int,
        phi_types: List[str]
    ) -> BreachSeverity:
        """Assess breach severity.

        LOW below 500 affected individuals; at 500 or more, HIGH when any
        sensitive PHI type is involved and MEDIUM otherwise.
        """
        sensitive_types = {'ssn', 'financial', 'mental_health', 'substance_abuse', 'hiv_status'}
        if affected_count < 500:
            return BreachSeverity.LOW
        if any(t in sensitive_types for t in phi_types):
            return BreachSeverity.HIGH
        return BreachSeverity.MEDIUM

    def perform_risk_assessment(self, incident_id: str) -> Dict:
        """Perform 4-factor risk assessment per HIPAA guidance.

        Three of the four factors are fixed placeholders (0.5); only the
        PHI sensitivity factor is computed. The assessment is stored on the
        incident and returned.

        Raises:
            ValueError: If no incident has the given id.
        """
        incident = self._get_incident(incident_id)
        # Factor 1: Nature and extent of PHI
        phi_sensitivity_score = self._score_phi_sensitivity(incident.phi_types_affected)
        # Factor 2: Unauthorized person who used/accessed PHI
        # This would be determined during investigation
        unauthorized_access_risk = 0.5  # Placeholder
        # Factor 3: Whether PHI was actually acquired or viewed
        actual_acquisition_risk = 0.5  # Placeholder
        # Factor 4: Extent to which risk has been mitigated
        mitigation_score = 0.5  # Placeholder
        # Weighted sum; better mitigation lowers the overall risk.
        overall_risk = (
            phi_sensitivity_score * 0.3 +
            unauthorized_access_risk * 0.3 +
            actual_acquisition_risk * 0.25 +
            (1 - mitigation_score) * 0.15
        )
        exceeds_threshold = overall_risk > 0.5
        assessment = {
            'phi_sensitivity': phi_sensitivity_score,
            'unauthorized_access_risk': unauthorized_access_risk,
            'actual_acquisition_risk': actual_acquisition_risk,
            'mitigation_effectiveness': mitigation_score,
            'overall_risk_score': overall_risk,
            'breach_confirmed': exceeds_threshold,
            'notification_required': exceeds_threshold,
            'assessed_at': datetime.utcnow().isoformat()
        }
        incident.risk_assessment = assessment
        return assessment

    def _score_phi_sensitivity(self, phi_types: List[str]) -> float:
        """Score PHI sensitivity.

        Returns the highest weight among the given types (0.5 for unknown
        types), or 0.3 when no types are given.
        """
        sensitivity_weights = {
            'ssn': 1.0,
            'financial': 0.9,
            'mental_health': 0.95,
            'substance_abuse': 0.95,
            'hiv_status': 0.95,
            'diagnosis': 0.7,
            'medications': 0.6,
            'demographics': 0.4,
            'contact_info': 0.3
        }
        if not phi_types:
            return 0.3
        return max(sensitivity_weights.get(t, 0.5) for t in phi_types)

    def send_notifications(self, incident_id: str) -> Dict:
        """Send required breach notifications.

        Returns:
            A status dict when nothing is sent (assessment missing or below
            threshold); otherwise a dict with ``individuals``, ``hhs`` and
            ``media`` results, which is also recorded on the incident.

        Raises:
            ValueError: If no incident has the given id.
        """
        incident = self._get_incident(incident_id)
        if not incident.risk_assessment:
            # No assessment has been run yet. Reporting "not required"
            # here would present an unassessed incident as cleared.
            return {
                'status': 'assessment_required',
                'reason': 'Risk assessment has not been performed'
            }
        if not incident.risk_assessment.get('notification_required'):
            return {'status': 'not_required', 'reason': 'Risk assessment below threshold'}

        notifications = {
            'individuals': None,
            'hhs': None,
            'media': None
        }
        # Individual notifications (required within 60 days)
        if incident.affected_individuals > 0:
            notifications['individuals'] = self._notify_individuals(incident)
        # HHS notification
        if incident.affected_individuals >= 500:
            # Notify HHS within 60 days
            notifications['hhs'] = self._notify_hhs(incident)
            # Media notification required for 500+
            notifications['media'] = self._notify_media(incident)
        else:
            # Log for annual report to HHS (breaches < 500)
            notifications['hhs'] = {'status': 'logged_for_annual_report'}

        incident.notifications_sent.append({
            'timestamp': datetime.utcnow().isoformat(),
            'notifications': notifications
        })
        return notifications

    def _notify_individuals(self, incident: BreachIncident) -> Dict:
        """Send individual breach notifications.

        Renders the notice text and returns a status dict. Nothing is
        actually delivered here; the rendered text is returned under
        ``'content'`` so a caller can send it.

        Raises:
            KeyError: If the config lacks the contact or organization entry.
        """
        notification_content = f"""
NOTICE OF DATA BREACH
Dear Patient,
We are writing to inform you of a security incident that may have affected your personal health information.
WHAT HAPPENED:
{incident.description}
DATE OF BREACH:
{incident.breach_date or 'Under investigation'}
INFORMATION INVOLVED:
The following types of information may have been affected:
{', '.join(incident.phi_types_affected)}
WHAT WE ARE DOING:
We have taken immediate steps to contain this incident and are working with security experts to investigate and prevent future occurrences.
WHAT YOU CAN DO:
- Monitor your credit reports and financial statements
- Consider placing a fraud alert on your credit file
- Review any explanation of benefits statements from your health insurer
CONTACT INFORMATION:
If you have questions, please contact our Privacy Officer at {self.config['privacy_officer_contact']}.
We sincerely apologize for this incident and any concern it may cause.
Sincerely,
{self.config['organization_name']}
"""
        # In production, send actual notifications
        deadline = incident.discovered_date + timedelta(days=self.notification_deadline_days)
        return {
            'status': 'sent',
            'method': 'first_class_mail',  # HIPAA requirement
            'count': incident.affected_individuals,
            'deadline': deadline.isoformat(),
            'content': notification_content
        }

    def _notify_hhs(self, incident: BreachIncident) -> Dict:
        """Submit breach notification to HHS (placeholder; returns a status dict)."""
        # In production, submit to HHS breach portal
        return {
            'status': 'submitted',
            'submission_date': datetime.utcnow().isoformat(),
            'portal': 'https://ocrportal.hhs.gov/ocr/breach/wizard_breach.jsf'
        }

    def _notify_media(self, incident: BreachIncident) -> Dict:
        """Notify prominent media outlets for large breaches (placeholder)."""
        return {
            'status': 'distributed',
            'press_release_date': datetime.utcnow().isoformat(),
            'outlets_contacted': ['local_news', 'healthcare_publications']
        }

    def _get_incident(self, incident_id: str) -> BreachIncident:
        """Get incident by ID.

        Raises:
            ValueError: If no incident has the given id.
        """
        for incident in self.incidents:
            if incident.incident_id == incident_id:
                return incident
        raise ValueError(f"Incident not found: {incident_id}")

    def get_notification_status(self) -> Dict:
        """Get status of all pending notifications.

        Considers every incident that is not closed. An incident is pending
        when no notification run has been recorded for it; pending
        incidents with 14 or fewer days left are also listed as approaching
        their deadline.
        """
        pending = []
        deadline_approaching = []
        now = datetime.utcnow()
        for incident in self.incidents:
            if incident.status == 'closed':
                continue
            if incident.notifications_sent:
                continue
            deadline = incident.discovered_date + timedelta(days=self.notification_deadline_days)
            days_remaining = (deadline - now).days
            pending.append({
                'incident_id': incident.incident_id,
                'deadline': deadline.isoformat(),
                'days_remaining': days_remaining
            })
            if days_remaining <= 14:
                deadline_approaching.append(incident.incident_id)
        return {
            'pending_notifications': pending,
            'deadline_approaching': deadline_approaching,
            'total_incidents': len(self.incidents)
        }


# Conclusion
HIPAA compliance requires a comprehensive approach combining PHI classification, access controls, audit logging, encryption, and breach notification capabilities. Implement these technical safeguards as part of your overall security program, and ensure regular risk assessments and staff training complement your technical controls. Remember that HIPAA compliance is ongoing - continuously monitor, audit, and improve your security posture to protect patient health information effectively.