HIPAA Compliance for Cloud Applications: Technical Implementation Guide
Building HIPAA-compliant cloud applications requires careful attention to Protected Health Information (PHI) handling across every layer of your stack. This guide provides technical implementation patterns for achieving and maintaining HIPAA compliance.
Understanding HIPAA Technical Safeguards
Administrative, Physical, and Technical Controls
# hipaa_controls_framework.yaml
hipaa_safeguards:
administrative:
- security_management_process
- assigned_security_responsibility
- workforce_security
- information_access_management
- security_awareness_training
- security_incident_procedures
- contingency_plan
- evaluation
- business_associate_contracts
physical:
- facility_access_controls
- workstation_use
- workstation_security
- device_and_media_controls
technical:
- access_control:
- unique_user_identification
- emergency_access_procedure
- automatic_logoff
- encryption_and_decryption
- audit_controls:
- audit_logging
- audit_review
- audit_reporting
- integrity:
- mechanism_to_authenticate_phi
- person_or_entity_authentication:
- authentication_methods
- transmission_security:
- integrity_controls
- encryptionPHI Data Classification and Handling
Data Classification System
# phi_classification.py
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
class PHICategory(Enum):
"""Categories of Protected Health Information."""
NAMES = "names"
GEOGRAPHIC = "geographic_data"
DATES = "dates"
PHONE_NUMBERS = "phone_numbers"
FAX_NUMBERS = "fax_numbers"
EMAIL_ADDRESSES = "email_addresses"
SSN = "social_security_numbers"
MEDICAL_RECORD_NUMBERS = "medical_record_numbers"
HEALTH_PLAN_NUMBERS = "health_plan_beneficiary_numbers"
ACCOUNT_NUMBERS = "account_numbers"
CERTIFICATE_NUMBERS = "certificate_license_numbers"
VEHICLE_IDENTIFIERS = "vehicle_identifiers"
DEVICE_IDENTIFIERS = "device_identifiers_serial_numbers"
WEB_URLS = "web_urls"
IP_ADDRESSES = "ip_addresses"
BIOMETRIC_IDENTIFIERS = "biometric_identifiers"
FULL_FACE_PHOTOS = "full_face_photos"
OTHER_UNIQUE_IDENTIFIERS = "other_unique_identifying_numbers"
class SensitivityLevel(Enum):
"""Data sensitivity levels."""
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted" # PHI falls here
@dataclass
class DataClassification:
"""Classification for a data field."""
field_name: str
phi_categories: List[PHICategory]
sensitivity_level: SensitivityLevel
encryption_required: bool
audit_required: bool
retention_days: int
disposal_method: str
class PHIClassifier:
"""Classify and detect PHI in data."""
def __init__(self):
self.classifications = self._load_classifications()
def _load_classifications(self) -> dict:
"""Load field classifications."""
return {
'patient_name': DataClassification(
field_name='patient_name',
phi_categories=[PHICategory.NAMES],
sensitivity_level=SensitivityLevel.RESTRICTED,
encryption_required=True,
audit_required=True,
retention_days=2555, # 7 years
disposal_method='crypto_shred'
),
'date_of_birth': DataClassification(
field_name='date_of_birth',
phi_categories=[PHICategory.DATES],
sensitivity_level=SensitivityLevel.RESTRICTED,
encryption_required=True,
audit_required=True,
retention_days=2555,
disposal_method='crypto_shred'
),
'ssn': DataClassification(
field_name='ssn',
phi_categories=[PHICategory.SSN],
sensitivity_level=SensitivityLevel.RESTRICTED,
encryption_required=True,
audit_required=True,
retention_days=2555,
disposal_method='crypto_shred'
),
'medical_record_number': DataClassification(
field_name='medical_record_number',
phi_categories=[PHICategory.MEDICAL_RECORD_NUMBERS],
sensitivity_level=SensitivityLevel.RESTRICTED,
encryption_required=True,
audit_required=True,
retention_days=2555,
disposal_method='crypto_shred'
),
'diagnosis_code': DataClassification(
field_name='diagnosis_code',
phi_categories=[],
sensitivity_level=SensitivityLevel.CONFIDENTIAL,
encryption_required=True,
audit_required=True,
retention_days=2555,
disposal_method='secure_delete'
)
}
def classify_record(self, record: dict) -> dict:
"""Classify all fields in a record."""
result = {
'fields': {},
'overall_sensitivity': SensitivityLevel.PUBLIC,
'contains_phi': False,
'encryption_required_fields': [],
'audit_required_fields': []
}
for field_name, value in record.items():
classification = self.classifications.get(field_name)
if classification:
result['fields'][field_name] = classification
if classification.sensitivity_level == SensitivityLevel.RESTRICTED:
result['overall_sensitivity'] = SensitivityLevel.RESTRICTED
result['contains_phi'] = True
if classification.encryption_required:
result['encryption_required_fields'].append(field_name)
if classification.audit_required:
result['audit_required_fields'].append(field_name)
return resultPHI Detection Engine
# phi_detection.py
import re
from typing import Dict, List, Tuple
class PHIDetector:
"""Detect PHI in unstructured text."""
def __init__(self):
self.patterns = self._compile_patterns()
def _compile_patterns(self) -> Dict[str, re.Pattern]:
"""Compile regex patterns for PHI detection."""
return {
'ssn': re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'),
'phone': re.compile(r'\b\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})\b'),
'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
'mrn': re.compile(r'\b(?:MRN|Medical Record Number)[:\s]*([A-Z0-9]{6,12})\b', re.IGNORECASE),
'date_of_birth': re.compile(r'\b(?:DOB|Date of Birth)[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b', re.IGNORECASE),
'ip_address': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'),
'credit_card': re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
'health_plan_id': re.compile(r'\b(?:Member ID|Policy Number)[:\s]*([A-Z0-9]{8,15})\b', re.IGNORECASE)
}
def detect(self, text: str) -> List[Dict]:
"""Detect PHI in text."""
findings = []
for phi_type, pattern in self.patterns.items():
for match in pattern.finditer(text):
findings.append({
'type': phi_type,
'value': match.group(),
'start': match.start(),
'end': match.end(),
'redacted': self._redact(match.group(), phi_type)
})
return findings
def _redact(self, value: str, phi_type: str) -> str:
"""Redact PHI value."""
if phi_type == 'ssn':
return 'XXX-XX-' + value[-4:]
elif phi_type == 'phone':
return '(XXX) XXX-' + value[-4:]
elif phi_type == 'email':
parts = value.split('@')
return parts[0][:2] + '***@' + parts[1]
elif phi_type == 'credit_card':
return 'XXXX-XXXX-XXXX-' + value[-4:]
else:
return '[REDACTED]'
def redact_text(self, text: str) -> str:
"""Return text with all PHI redacted."""
findings = self.detect(text)
# Sort by position in reverse to replace from end
findings.sort(key=lambda x: x['start'], reverse=True)
redacted_text = text
for finding in findings:
redacted_text = (
redacted_text[:finding['start']] +
finding['redacted'] +
redacted_text[finding['end']:]
)
return redacted_textAccess Control Implementation
Role-Based Access Control for PHI
# hipaa_access_control.py
from enum import Enum
from typing import List, Optional, Set
from dataclasses import dataclass
from datetime import datetime
class Permission(Enum):
"""PHI access permissions."""
VIEW_DEMOGRAPHICS = "view_demographics"
VIEW_CLINICAL = "view_clinical"
VIEW_BILLING = "view_billing"
EDIT_DEMOGRAPHICS = "edit_demographics"
EDIT_CLINICAL = "edit_clinical"
EDIT_BILLING = "edit_billing"
EXPORT_DATA = "export_data"
DELETE_RECORDS = "delete_records"
ADMIN_ACCESS = "admin_access"
class Role(Enum):
"""Healthcare system roles."""
PATIENT = "patient"
NURSE = "nurse"
PHYSICIAN = "physician"
SPECIALIST = "specialist"
BILLING_STAFF = "billing_staff"
ADMIN = "admin"
AUDITOR = "auditor"
EMERGENCY = "emergency"
@dataclass
class AccessPolicy:
"""Access policy for a role."""
role: Role
permissions: Set[Permission]
phi_access_level: str
requires_mfa: bool
session_timeout_minutes: int
break_glass_eligible: bool
class HIPAAAccessController:
"""HIPAA-compliant access control."""
def __init__(self):
self.policies = self._define_policies()
self.break_glass_active = {}
def _define_policies(self) -> dict:
"""Define role-based access policies."""
return {
Role.PATIENT: AccessPolicy(
role=Role.PATIENT,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_CLINICAL,
Permission.VIEW_BILLING
},
phi_access_level='own_records_only',
requires_mfa=True,
session_timeout_minutes=30,
break_glass_eligible=False
),
Role.NURSE: AccessPolicy(
role=Role.NURSE,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_CLINICAL,
Permission.EDIT_CLINICAL
},
phi_access_level='assigned_patients',
requires_mfa=True,
session_timeout_minutes=15,
break_glass_eligible=True
),
Role.PHYSICIAN: AccessPolicy(
role=Role.PHYSICIAN,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_CLINICAL,
Permission.EDIT_CLINICAL,
Permission.VIEW_BILLING
},
phi_access_level='assigned_patients',
requires_mfa=True,
session_timeout_minutes=20,
break_glass_eligible=True
),
Role.BILLING_STAFF: AccessPolicy(
role=Role.BILLING_STAFF,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_BILLING,
Permission.EDIT_BILLING
},
phi_access_level='billing_minimum_necessary',
requires_mfa=True,
session_timeout_minutes=15,
break_glass_eligible=False
),
Role.ADMIN: AccessPolicy(
role=Role.ADMIN,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_CLINICAL,
Permission.VIEW_BILLING,
Permission.EDIT_DEMOGRAPHICS,
Permission.ADMIN_ACCESS
},
phi_access_level='all_patients',
requires_mfa=True,
session_timeout_minutes=10,
break_glass_eligible=False
),
Role.EMERGENCY: AccessPolicy(
role=Role.EMERGENCY,
permissions={
Permission.VIEW_DEMOGRAPHICS,
Permission.VIEW_CLINICAL,
Permission.EDIT_CLINICAL
},
phi_access_level='all_patients',
requires_mfa=False,
session_timeout_minutes=60,
break_glass_eligible=False
)
}
def check_access(
self,
user_id: str,
user_role: Role,
permission: Permission,
patient_id: str,
context: dict
) -> dict:
"""Check if user has access to perform action."""
policy = self.policies.get(user_role)
if not policy:
return self._deny("Invalid role")
# Check permission
if permission not in policy.permissions:
return self._deny(f"Role {user_role.value} does not have {permission.value}")
# Check PHI access level
access_check = self._check_phi_access_level(
user_id, user_role, patient_id, policy.phi_access_level, context
)
if not access_check['allowed']:
return access_check
# Check for break glass
if self._is_break_glass_required(context):
return self._handle_break_glass(user_id, user_role, patient_id, policy)
return self._allow(user_id, permission, patient_id)
def _check_phi_access_level(
self,
user_id: str,
role: Role,
patient_id: str,
access_level: str,
context: dict
) -> dict:
"""Check PHI access level restrictions."""
if access_level == 'own_records_only':
if user_id != patient_id:
return self._deny("Can only access own records")
elif access_level == 'assigned_patients':
assigned = context.get('assigned_patients', [])
if patient_id not in assigned:
return self._deny("Patient not in assigned list")
elif access_level == 'billing_minimum_necessary':
# Billing staff only sees billing-related PHI
if context.get('requested_data_type') not in ['billing', 'demographics']:
return self._deny("Access limited to billing data")
return {'allowed': True}
def _is_break_glass_required(self, context: dict) -> bool:
"""Check if break glass access is needed."""
return context.get('emergency', False) and not context.get('assigned_patients')
def _handle_break_glass(
self,
user_id: str,
role: Role,
patient_id: str,
policy: AccessPolicy
) -> dict:
"""Handle emergency break glass access."""
if not policy.break_glass_eligible:
return self._deny("Role not eligible for break glass access")
# Log break glass access
self.break_glass_active[f"{user_id}:{patient_id}"] = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'patient_id': patient_id,
'requires_review': True
}
return {
'allowed': True,
'break_glass': True,
'audit_level': 'critical',
'message': 'Break glass access granted - requires review'
}
def _allow(self, user_id: str, permission: Permission, patient_id: str) -> dict:
return {
'allowed': True,
'user_id': user_id,
'permission': permission.value,
'patient_id': patient_id,
'timestamp': datetime.utcnow().isoformat()
}
def _deny(self, reason: str) -> dict:
return {
'allowed': False,
'reason': reason,
'timestamp': datetime.utcnow().isoformat()
}Audit Logging System
HIPAA-Compliant Audit Logger
# hipaa_audit_logger.py
import json
import hashlib
from datetime import datetime
from typing import Dict, Optional, List
from enum import Enum
class AuditEventType(Enum):
"""HIPAA audit event types."""
LOGIN = "login"
LOGOUT = "logout"
LOGIN_FAILED = "login_failed"
PHI_ACCESS = "phi_access"
PHI_CREATE = "phi_create"
PHI_MODIFY = "phi_modify"
PHI_DELETE = "phi_delete"
PHI_EXPORT = "phi_export"
PHI_PRINT = "phi_print"
PHI_FAX = "phi_fax"
BREAK_GLASS = "break_glass"
PERMISSION_CHANGE = "permission_change"
SYSTEM_CONFIG = "system_config"
SECURITY_INCIDENT = "security_incident"
class HIPAAAuditLogger:
"""HIPAA-compliant audit logging system."""
def __init__(self, storage_backend):
self.storage = storage_backend
self.log_chain = []
def log_phi_access(
self,
user_id: str,
patient_id: str,
action: str,
phi_fields: List[str],
reason: str,
context: Optional[Dict] = None
) -> Dict:
"""Log PHI access event."""
event = self._create_audit_event(
event_type=AuditEventType.PHI_ACCESS,
user_id=user_id,
patient_id=patient_id,
action=action,
details={
'phi_fields_accessed': phi_fields,
'reason_for_access': reason,
'minimum_necessary_applied': True
},
context=context
)
return self._store_event(event)
def log_phi_modification(
self,
user_id: str,
patient_id: str,
action: str,
field_name: str,
old_value_hash: str,
new_value_hash: str,
context: Optional[Dict] = None
) -> Dict:
"""Log PHI modification event."""
event = self._create_audit_event(
event_type=AuditEventType.PHI_MODIFY,
user_id=user_id,
patient_id=patient_id,
action=action,
details={
'field_modified': field_name,
'old_value_hash': old_value_hash,
'new_value_hash': new_value_hash
},
context=context
)
return self._store_event(event)
def log_break_glass(
self,
user_id: str,
patient_id: str,
justification: str,
context: Optional[Dict] = None
) -> Dict:
"""Log break glass emergency access."""
event = self._create_audit_event(
event_type=AuditEventType.BREAK_GLASS,
user_id=user_id,
patient_id=patient_id,
action='emergency_access',
details={
'justification': justification,
'requires_review': True,
'review_deadline_hours': 24
},
context=context,
severity='critical'
)
return self._store_event(event)
def _create_audit_event(
self,
event_type: AuditEventType,
user_id: str,
patient_id: str,
action: str,
details: Dict,
context: Optional[Dict] = None,
severity: str = 'info'
) -> Dict:
"""Create audit event record."""
previous_hash = self.log_chain[-1] if self.log_chain else "genesis"
event = {
'event_id': self._generate_event_id(),
'timestamp': datetime.utcnow().isoformat() + 'Z',
'event_type': event_type.value,
'severity': severity,
'actor': {
'user_id': user_id,
'ip_address': context.get('ip_address') if context else None,
'user_agent': context.get('user_agent') if context else None,
'session_id': context.get('session_id') if context else None,
'workstation_id': context.get('workstation_id') if context else None
},
'patient': {
'patient_id': patient_id,
'record_type': context.get('record_type') if context else None
},
'action': action,
'details': details,
'integrity': {
'previous_hash': previous_hash,
'event_hash': None
}
}
# Compute integrity hash
event['integrity']['event_hash'] = self._compute_hash(event)
self.log_chain.append(event['integrity']['event_hash'])
return event
def _generate_event_id(self) -> str:
"""Generate unique event ID."""
import uuid
return str(uuid.uuid4())
def _compute_hash(self, event: Dict) -> str:
"""Compute hash for integrity verification."""
event_copy = event.copy()
event_copy['integrity']['event_hash'] = ""
serialized = json.dumps(event_copy, sort_keys=True)
return hashlib.sha256(serialized.encode()).hexdigest()
def _store_event(self, event: Dict) -> Dict:
"""Store audit event."""
self.storage.store(event)
return event
def query_patient_access_history(
self,
patient_id: str,
start_date: datetime,
end_date: datetime
) -> List[Dict]:
"""Query access history for a patient (for accounting of disclosures)."""
return self.storage.query({
'patient.patient_id': patient_id,
'timestamp': {
'$gte': start_date.isoformat(),
'$lte': end_date.isoformat()
},
'event_type': {
'$in': [
AuditEventType.PHI_ACCESS.value,
AuditEventType.PHI_EXPORT.value,
AuditEventType.BREAK_GLASS.value
]
}
})
def verify_log_integrity(self) -> bool:
"""Verify integrity of audit log chain."""
events = self.storage.retrieve_all()
for i, event in enumerate(events):
expected_previous = events[i-1]['integrity']['event_hash'] if i > 0 else "genesis"
if event['integrity']['previous_hash'] != expected_previous:
return False
computed_hash = self._compute_hash(event)
if computed_hash != event['integrity']['event_hash']:
return False
return TrueEncryption Implementation
Field-Level Encryption for PHI
# phi_encryption.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import base64
import os
from typing import Dict, Any, Optional
import json
class PHIEncryption:
"""Field-level encryption for PHI data."""
def __init__(self, master_key: bytes, key_id: str):
self.master_key = master_key
self.key_id = key_id
self.field_keys = {}
def derive_field_key(self, field_name: str) -> bytes:
"""Derive encryption key for specific field."""
if field_name in self.field_keys:
return self.field_keys[field_name]
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=field_name.encode(),
iterations=100000,
)
key = kdf.derive(self.master_key)
self.field_keys[field_name] = key
return key
def encrypt_field(self, field_name: str, value: Any) -> Dict:
"""Encrypt a single field."""
key = self.derive_field_key(field_name)
nonce = os.urandom(12)
# Serialize value
if isinstance(value, str):
plaintext = value.encode()
else:
plaintext = json.dumps(value).encode()
# Encrypt with AES-GCM
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, plaintext, field_name.encode())
return {
'encrypted': True,
'algorithm': 'AES-256-GCM',
'key_id': self.key_id,
'nonce': base64.b64encode(nonce).decode(),
'ciphertext': base64.b64encode(ciphertext).decode()
}
def decrypt_field(self, field_name: str, encrypted_data: Dict) -> Any:
"""Decrypt a single field."""
key = self.derive_field_key(field_name)
nonce = base64.b64decode(encrypted_data['nonce'])
ciphertext = base64.b64decode(encrypted_data['ciphertext'])
aesgcm = AESGCM(key)
plaintext = aesgcm.decrypt(nonce, ciphertext, field_name.encode())
# Try to deserialize as JSON
try:
return json.loads(plaintext.decode())
except json.JSONDecodeError:
return plaintext.decode()
def encrypt_record(self, record: Dict, phi_fields: list) -> Dict:
"""Encrypt PHI fields in a record."""
encrypted_record = record.copy()
for field in phi_fields:
if field in encrypted_record and encrypted_record[field] is not None:
encrypted_record[field] = self.encrypt_field(
field,
encrypted_record[field]
)
encrypted_record['_encryption_metadata'] = {
'version': '1.0',
'key_id': self.key_id,
'encrypted_fields': phi_fields,
'encrypted_at': datetime.utcnow().isoformat()
}
return encrypted_record
def decrypt_record(self, encrypted_record: Dict) -> Dict:
"""Decrypt PHI fields in a record."""
metadata = encrypted_record.get('_encryption_metadata', {})
phi_fields = metadata.get('encrypted_fields', [])
decrypted_record = encrypted_record.copy()
del decrypted_record['_encryption_metadata']
for field in phi_fields:
if field in decrypted_record and isinstance(decrypted_record[field], dict):
if decrypted_record[field].get('encrypted'):
decrypted_record[field] = self.decrypt_field(
field,
decrypted_record[field]
)
return decrypted_record
class TransmissionEncryption:
"""Ensure encryption in transit for PHI."""
@staticmethod
def verify_tls_connection(conn) -> Dict:
"""Verify TLS configuration meets HIPAA requirements."""
cipher = conn.cipher()
version = conn.version()
requirements = {
'min_tls_version': 'TLSv1.2',
'min_key_size': 2048,
'allowed_ciphers': [
'ECDHE-RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-GCM-SHA256',
'DHE-RSA-AES256-GCM-SHA384',
'DHE-RSA-AES128-GCM-SHA256'
]
}
return {
'compliant': (
version >= requirements['min_tls_version'] and
cipher[0] in requirements['allowed_ciphers']
),
'tls_version': version,
'cipher_suite': cipher[0],
'key_size': cipher[2],
'requirements': requirements
}Business Associate Agreement (BAA) Requirements
BAA Compliance Checker
# baa_compliance.py
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
@dataclass
class VendorBAA:
"""Business Associate Agreement record."""
vendor_name: str
baa_signed_date: datetime
baa_expiry_date: datetime
services_covered: List[str]
phi_access_type: str
subcontractor_clause: bool
breach_notification_hours: int
security_requirements: List[str]
audit_rights: bool
class BAAComplianceManager:
"""Manage BAA compliance for vendors."""
def __init__(self):
self.vendor_baas: Dict[str, VendorBAA] = {}
def register_vendor(self, vendor: VendorBAA) -> Dict:
"""Register vendor BAA."""
# Validate BAA requirements
validation = self._validate_baa(vendor)
if not validation['valid']:
return {
'registered': False,
'errors': validation['errors']
}
self.vendor_baas[vendor.vendor_name] = vendor
return {
'registered': True,
'vendor': vendor.vendor_name,
'expiry': vendor.baa_expiry_date.isoformat()
}
def _validate_baa(self, vendor: VendorBAA) -> Dict:
"""Validate BAA meets HIPAA requirements."""
errors = []
required_security = [
'encryption_at_rest',
'encryption_in_transit',
'access_controls',
'audit_logging',
'incident_response'
]
# Check security requirements
missing_security = [
req for req in required_security
if req not in vendor.security_requirements
]
if missing_security:
errors.append(f"Missing security requirements: {missing_security}")
# Check breach notification
if vendor.breach_notification_hours > 24:
errors.append("Breach notification must be within 24 hours")
# Check audit rights
if not vendor.audit_rights:
errors.append("BAA must include audit rights")
# Check subcontractor clause
if not vendor.subcontractor_clause:
errors.append("BAA must include subcontractor obligations")
return {
'valid': len(errors) == 0,
'errors': errors
}
def check_vendor_compliance(self, vendor_name: str) -> Dict:
"""Check if vendor BAA is compliant and current."""
vendor = self.vendor_baas.get(vendor_name)
if not vendor:
return {
'compliant': False,
'reason': 'No BAA on file'
}
if vendor.baa_expiry_date < datetime.utcnow():
return {
'compliant': False,
'reason': 'BAA expired',
'expired_date': vendor.baa_expiry_date.isoformat()
}
return {
'compliant': True,
'vendor': vendor_name,
'expiry': vendor.baa_expiry_date.isoformat()
}
def get_compliance_report(self) -> Dict:
"""Generate BAA compliance report."""
report = {
'generated_at': datetime.utcnow().isoformat(),
'total_vendors': len(self.vendor_baas),
'compliant': [],
'non_compliant': [],
'expiring_soon': []
}
for vendor_name, vendor in self.vendor_baas.items():
compliance = self.check_vendor_compliance(vendor_name)
if compliance['compliant']:
report['compliant'].append(vendor_name)
# Check if expiring within 90 days
days_until_expiry = (vendor.baa_expiry_date - datetime.utcnow()).days
if days_until_expiry <= 90:
report['expiring_soon'].append({
'vendor': vendor_name,
'days_until_expiry': days_until_expiry
})
else:
report['non_compliant'].append({
'vendor': vendor_name,
'reason': compliance.get('reason')
})
return reportConclusion
HIPAA compliance for cloud applications requires:
- Data Classification - Identify and classify all PHI
- Access Controls - Implement role-based access with minimum necessary
- Encryption - Encrypt PHI at rest and in transit
- Audit Logging - Maintain comprehensive, tamper-evident audit trails
- BAA Management - Ensure all vendors have compliant BAAs
- Breach Response - Have incident response procedures ready
Regular assessments and continuous monitoring are essential to maintain compliance as your application evolves.