Conformitatea HIPAA necesita masuri de protectie tehnice cuprinzatoare pentru a proteja Protected Health Information (PHI). Acest ghid ofera implementari practice pentru cerintele Security Rule in aplicatiile din domeniul sanatatii.
Sistem de Clasificare a Datelor PHI
Implementeaza identificarea si clasificarea sistematica a PHI:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Set
import re
import hashlib
class PHIType(Enum):
"""18 HIPAA PHI identifiers."""
NAME = "name"
ADDRESS = "address"
DATES = "dates" # Except year
PHONE = "phone"
FAX = "fax"
EMAIL = "email"
SSN = "ssn"
MRN = "medical_record_number"
HEALTH_PLAN_ID = "health_plan_id"
ACCOUNT_NUMBER = "account_number"
CERTIFICATE_LICENSE = "certificate_license"
VEHICLE_ID = "vehicle_id"
DEVICE_ID = "device_id"
URL = "url"
IP_ADDRESS = "ip_address"
BIOMETRIC = "biometric"
PHOTO = "photo"
OTHER_UNIQUE = "other_unique_identifier"
class SensitivityLevel(Enum):
HIGH = "high" # Direct identifiers
MEDIUM = "medium" # Quasi-identifiers
LOW = "low" # Non-identifying health data
@dataclass
class PHIElement:
phi_type: PHIType
value: str
sensitivity: SensitivityLevel
location: str # Where in the data
context: Optional[str] = None
@dataclass
class DataClassificationResult:
contains_phi: bool
phi_elements: List[PHIElement]
sensitivity_level: SensitivityLevel
recommended_actions: List[str]
timestamp: datetime = field(default_factory=datetime.utcnow)
class PHIClassifier:
def __init__(self):
self.patterns = self._compile_patterns()
self.high_sensitivity_types = {
PHIType.SSN, PHIType.MRN, PHIType.HEALTH_PLAN_ID,
PHIType.BIOMETRIC, PHIType.PHOTO
}
self.medium_sensitivity_types = {
PHIType.NAME, PHIType.ADDRESS, PHIType.PHONE,
PHIType.EMAIL, PHIType.DATES
}
def _compile_patterns(self) -> Dict[PHIType, List[re.Pattern]]:
"""Compile regex patterns for PHI detection."""
return {
PHIType.SSN: [
re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
re.compile(r'\b\d{9}\b') # No dashes
],
PHIType.PHONE: [
re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),
re.compile(r'\(\d{3}\)\s*\d{3}[-.\s]?\d{4}')
],
PHIType.EMAIL: [
re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b')
],
PHIType.MRN: [
re.compile(r'\bMRN[:\s#]*\d{6,10}\b', re.IGNORECASE),
re.compile(r'\bMedical Record[:\s#]*\d{6,10}\b', re.IGNORECASE)
],
PHIType.DATES: [
re.compile(r'\b\d{1,2}/\d{1,2}/\d{2,4}\b'),
re.compile(r'\b\d{4}-\d{2}-\d{2}\b'),
re.compile(r'\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+\d{4}\b', re.IGNORECASE)
],
PHIType.IP_ADDRESS: [
re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')
],
PHIType.HEALTH_PLAN_ID: [
re.compile(r'\b[A-Z]{3}\d{9}\b'), # Common format
re.compile(r'\bMember\s*ID[:\s#]*[A-Z0-9]{8,12}\b', re.IGNORECASE)
],
PHIType.ADDRESS: [
re.compile(r'\b\d+\s+[A-Za-z]+\s+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr)\b', re.IGNORECASE)
]
}
def classify(self, data: str, context: str = "unknown") -> DataClassificationResult:
"""Clasifica datele pentru continut PHI."""
phi_elements = []
# Detectare bazata pe pattern-uri
for phi_type, patterns in self.patterns.items():
for pattern in patterns:
for match in pattern.finditer(data):
sensitivity = self._get_sensitivity(phi_type)
phi_elements.append(PHIElement(
phi_type=phi_type,
value=match.group(),
sensitivity=sensitivity,
location=f"position {match.start()}-{match.end()}",
context=context
))
# Determina sensibilitatea generala
if not phi_elements:
overall_sensitivity = SensitivityLevel.LOW
elif any(e.sensitivity == SensitivityLevel.HIGH for e in phi_elements):
overall_sensitivity = SensitivityLevel.HIGH
elif any(e.sensitivity == SensitivityLevel.MEDIUM for e in phi_elements):
overall_sensitivity = SensitivityLevel.MEDIUM
else:
overall_sensitivity = SensitivityLevel.LOW
# Genereaza recomandari
recommendations = self._generate_recommendations(phi_elements, overall_sensitivity)
return DataClassificationResult(
contains_phi=len(phi_elements) > 0,
phi_elements=phi_elements,
sensitivity_level=overall_sensitivity,
recommended_actions=recommendations
)
def _get_sensitivity(self, phi_type: PHIType) -> SensitivityLevel:
"""Determina nivelul de sensibilitate pentru tipul PHI."""
if phi_type in self.high_sensitivity_types:
return SensitivityLevel.HIGH
elif phi_type in self.medium_sensitivity_types:
return SensitivityLevel.MEDIUM
return SensitivityLevel.LOW
def _generate_recommendations(
self,
elements: List[PHIElement],
sensitivity: SensitivityLevel
) -> List[str]:
"""Genereaza recomandari de gestionare."""
recommendations = []
if sensitivity == SensitivityLevel.HIGH:
recommendations.extend([
"Cripteaza datele in repaus si in tranzit",
"Implementeaza controale stricte de acces",
"Activeaza audit logging cuprinzator",
"Ia in considerare de-identificarea inainte de stocare"
])
if any(e.phi_type == PHIType.SSN for e in elements):
recommendations.append("SSN detectat - stocheaza doar ultimele 4 cifre daca este posibil")
if any(e.phi_type == PHIType.DATES for e in elements):
recommendations.append("Ia in considerare generalizarea datelor doar la an pentru de-identificare")
if any(e.phi_type == PHIType.ADDRESS for e in elements):
recommendations.append("Ia in considerare folosirea doar a codului postal (primele 3 cifre pentru populatii sub 20.000)")
return recommendations
def de_identify(self, data: str, method: str = "safe_harbor") -> Dict:
"""De-identifica datele folosind metoda specificata."""
if method == "safe_harbor":
return self._safe_harbor_deidentification(data)
elif method == "expert_determination":
return self._expert_determination_placeholder(data)
else:
raise ValueError(f"Unknown de-identification method: {method}")
def _safe_harbor_deidentification(self, data: str) -> Dict:
"""Aplica de-identificarea Safe Harbor (elimina toti cei 18 identificatori)."""
result = data
removed_elements = []
for phi_type, patterns in self.patterns.items():
for pattern in patterns:
matches = pattern.findall(result)
for match in matches:
removed_elements.append({
'type': phi_type.value,
'original': match,
'replacement': f"[{phi_type.value.upper()}_REMOVED]"
})
result = pattern.sub(f"[{phi_type.value.upper()}_REMOVED]", result)
return {
'original_length': len(data),
'deidentified_data': result,
'deidentified_length': len(result),
'elements_removed': len(removed_elements),
'removal_details': removed_elements,
'method': 'safe_harbor',
'hipaa_compliant': True
}
def _expert_determination_placeholder(self, data: str) -> Dict:
"""Placeholder pentru metoda expert determination."""
# In productie, aceasta ar implica analiza statistica
# pentru a determina riscul de re-identificare
return {
'message': 'Expert determination necesita analiza statistica',
'recommendation': 'Consulta un expert statistic calificat'
}Implementarea Controlului Accesului
Implementeaza controale de acces conforme HIPAA:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set, Dict, List
import hashlib
import jwt
class AccessLevel(Enum):
NONE = 0
READ = 1
WRITE = 2
ADMIN = 3
class UserRole(Enum):
PATIENT = "patient"
NURSE = "nurse"
PHYSICIAN = "physician"
ADMIN = "admin"
BILLING = "billing"
IT_SUPPORT = "it_support"
AUDITOR = "auditor"
@dataclass
class User:
user_id: str
username: str
role: UserRole
department: str
active: bool
mfa_enabled: bool
last_login: Optional[datetime]
permissions: Set[str]
access_restrictions: Dict[str, any] = None
@dataclass
class AccessDecision:
allowed: bool
user_id: str
resource: str
action: str
timestamp: datetime
reason: str
requires_break_glass: bool = False
audit_id: str = ""
class HIPAAAccessControl:
def __init__(self):
self.users: Dict[str, User] = {}
self.role_permissions = self._define_role_permissions()
self.break_glass_log: List[Dict] = []
self.session_timeout_minutes = 15 # HIPAA requirement
def _define_role_permissions(self) -> Dict[UserRole, Dict]:
"""Defineste permisiunile bazate pe roluri dupa principiul necesarului minim."""
return {
UserRole.PATIENT: {
'permissions': {'read_own_records', 'request_amendment'},
'phi_access': {'own_only': True},
'restrictions': {'department': None}
},
UserRole.NURSE: {
'permissions': {
'read_patient_records', 'write_vitals', 'write_notes',
'view_medications', 'administer_medications'
},
'phi_access': {'assigned_patients': True, 'department_patients': True},
'restrictions': {'department': 'assigned'}
},
UserRole.PHYSICIAN: {
'permissions': {
'read_patient_records', 'write_diagnosis', 'write_orders',
'prescribe_medications', 'view_all_records', 'write_notes'
},
'phi_access': {'all_patients': True},
'restrictions': None
},
UserRole.BILLING: {
'permissions': {
'read_billing_info', 'write_billing', 'read_insurance',
'read_demographics'
},
'phi_access': {'billing_relevant': True},
'restrictions': {'exclude_clinical_notes': True}
},
UserRole.ADMIN: {
'permissions': {'all'},
'phi_access': {'all': True},
'restrictions': None
},
UserRole.IT_SUPPORT: {
'permissions': {
'system_administration', 'user_management',
'audit_log_access'
},
'phi_access': {'none': True}, # IT nu ar trebui sa acceseze PHI
'restrictions': {'no_phi_access': True}
},
UserRole.AUDITOR: {
'permissions': {
'audit_log_access', 'compliance_reports',
'access_reviews'
},
'phi_access': {'audit_only': True},
'restrictions': {'read_only': True}
}
}
def check_access(
self,
user_id: str,
resource: str,
action: str,
patient_id: Optional[str] = None,
context: Dict = None
) -> AccessDecision:
"""Verifica daca utilizatorul are acces la resursa."""
audit_id = hashlib.md5(
f"{user_id}{resource}{action}{datetime.utcnow().isoformat()}".encode()
).hexdigest()[:12]
if user_id not in self.users:
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Utilizator negasit",
audit_id=audit_id
)
user = self.users[user_id]
# Verifica daca utilizatorul este activ
if not user.active:
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Contul utilizatorului este inactiv",
audit_id=audit_id
)
# Verifica cerinta MFA pentru acces PHI
if self._requires_phi_access(resource) and not user.mfa_enabled:
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="MFA necesar pentru acces PHI",
audit_id=audit_id
)
# Obtine permisiunile rolului
role_config = self.role_permissions.get(user.role)
if not role_config:
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Rol neconfigurat",
audit_id=audit_id
)
# Verifica permisiunea
required_permission = self._get_required_permission(resource, action)
has_permission = (
'all' in role_config['permissions'] or
required_permission in role_config['permissions'] or
required_permission in user.permissions
)
if not has_permission:
# Verifica acces break-glass
if self._is_emergency_context(context):
return AccessDecision(
allowed=True,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Acces break-glass de urgenta",
requires_break_glass=True,
audit_id=audit_id
)
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason=f"Permisiune lipsa: {required_permission}",
audit_id=audit_id
)
# Verifica restrictii specifice pacientului
if patient_id and not self._check_patient_access(user, patient_id, role_config):
return AccessDecision(
allowed=False,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Neautorizat pentru inregistrarile acestui pacient",
audit_id=audit_id
)
return AccessDecision(
allowed=True,
user_id=user_id,
resource=resource,
action=action,
timestamp=datetime.utcnow(),
reason="Acces acordat",
audit_id=audit_id
)
def _requires_phi_access(self, resource: str) -> bool:
"""Determina daca resursa contine PHI."""
phi_resources = [
'patient_records', 'medical_history', 'lab_results',
'prescriptions', 'clinical_notes', 'imaging',
'demographics', 'insurance_info'
]
return any(phi in resource for phi in phi_resources)
def _get_required_permission(self, resource: str, action: str) -> str:
"""Mapeaza resursa/actiunea la permisiunea necesara."""
# Mapare simplificata - extinde in functie de sistemul tau
return f"{action}_{resource}"
def _is_emergency_context(self, context: Optional[Dict]) -> bool:
"""Verifica daca aceasta este o cerere de acces de urgenta."""
if not context:
return False
return context.get('emergency', False) and context.get('justification')
def _check_patient_access(
self,
user: User,
patient_id: str,
role_config: Dict
) -> bool:
"""Verifica daca utilizatorul poate accesa inregistrarile unui pacient specific."""
phi_access = role_config.get('phi_access', {})
if phi_access.get('all_patients') or phi_access.get('all'):
return True
if phi_access.get('own_only') and user.user_id == patient_id:
return True
if phi_access.get('assigned_patients'):
# Verifica asignarea - ar interoga tabelul de asignari in productie
return self._is_assigned_to_patient(user.user_id, patient_id)
if phi_access.get('department_patients'):
# Verifica daca pacientul este in departamentul utilizatorului
return self._is_patient_in_department(patient_id, user.department)
return False
def _is_assigned_to_patient(self, user_id: str, patient_id: str) -> bool:
"""Verifica daca utilizatorul este asignat pacientului - placeholder."""
# In productie, interogheaza baza de date de asignari
return False
def _is_patient_in_department(self, patient_id: str, department: str) -> bool:
"""Verifica daca pacientul este in departament - placeholder."""
# In productie, interogheaza locatia/departamentul pacientului
return False
def log_break_glass(
self,
user_id: str,
patient_id: str,
justification: str,
access_decision: AccessDecision
):
"""Inregistreaza accesul break-glass pentru revizuire."""
self.break_glass_log.append({
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'patient_id': patient_id,
'justification': justification,
'audit_id': access_decision.audit_id,
'reviewed': False,
'review_result': None
})Audit Logging HIPAA
Implementeaza audit logging cuprinzator:
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional, Dict, Any, List
from enum import Enum
import json
import hashlib
import uuid
class AuditEventType(Enum):
# Evenimente de acces
LOGIN_SUCCESS = "login_success"
LOGIN_FAILURE = "login_failure"
LOGOUT = "logout"
SESSION_TIMEOUT = "session_timeout"
# Evenimente acces PHI
PHI_VIEW = "phi_view"
PHI_CREATE = "phi_create"
PHI_UPDATE = "phi_update"
PHI_DELETE = "phi_delete"
PHI_EXPORT = "phi_export"
PHI_PRINT = "phi_print"
# Evenimente administrative
USER_CREATE = "user_create"
USER_MODIFY = "user_modify"
USER_DELETE = "user_delete"
PERMISSION_CHANGE = "permission_change"
# Evenimente de securitate
MFA_ENABLED = "mfa_enabled"
MFA_DISABLED = "mfa_disabled"
PASSWORD_CHANGE = "password_change"
BREAK_GLASS_ACCESS = "break_glass_access"
# Evenimente de sistem
SYSTEM_START = "system_start"
SYSTEM_STOP = "system_stop"
CONFIG_CHANGE = "config_change"
BACKUP_CREATED = "backup_created"
@dataclass
class AuditEvent:
event_id: str
timestamp: datetime
event_type: AuditEventType
user_id: str
patient_id: Optional[str]
resource: str
action: str
outcome: str # success, failure, error
source_ip: str
user_agent: Optional[str]
details: Dict[str, Any]
phi_accessed: bool
integrity_hash: str = ""
def __post_init__(self):
if not self.integrity_hash:
self.integrity_hash = self._compute_hash()
def _compute_hash(self) -> str:
"""Calculeaza hash de integritate pentru detectarea falsificarii."""
data = f"{self.event_id}{self.timestamp.isoformat()}{self.event_type.value}"
data += f"{self.user_id}{self.patient_id}{self.resource}{self.action}{self.outcome}"
return hashlib.sha256(data.encode()).hexdigest()
class HIPAAAuditLogger:
def __init__(self, storage_backend):
self.storage = storage_backend
self.retention_years = 6 # HIPAA minimum
self.previous_hash: Optional[str] = None
def log_event(
self,
event_type: AuditEventType,
user_id: str,
resource: str,
action: str,
outcome: str,
source_ip: str,
patient_id: Optional[str] = None,
user_agent: Optional[str] = None,
details: Dict[str, Any] = None,
phi_accessed: bool = False
) -> AuditEvent:
"""Inregistreaza un eveniment de audit."""
event = AuditEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.utcnow(),
event_type=event_type,
user_id=user_id,
patient_id=patient_id,
resource=resource,
action=action,
outcome=outcome,
source_ip=source_ip,
user_agent=user_agent,
details=details or {},
phi_accessed=phi_accessed
)
# Adauga hash de lant pentru dovada anti-falsificare
if self.previous_hash:
event.details['previous_hash'] = self.previous_hash
self.previous_hash = event.integrity_hash
# Stocheaza evenimentul
self.storage.store(event)
return event
def log_phi_access(
self,
user_id: str,
patient_id: str,
record_type: str,
action: str,
source_ip: str,
fields_accessed: List[str] = None,
purpose: str = None
) -> AuditEvent:
"""Inregistreaza accesul PHI cu urmarire detaliata."""
event_type_map = {
'view': AuditEventType.PHI_VIEW,
'create': AuditEventType.PHI_CREATE,
'update': AuditEventType.PHI_UPDATE,
'delete': AuditEventType.PHI_DELETE,
'export': AuditEventType.PHI_EXPORT,
'print': AuditEventType.PHI_PRINT
}
return self.log_event(
event_type=event_type_map.get(action, AuditEventType.PHI_VIEW),
user_id=user_id,
patient_id=patient_id,
resource=record_type,
action=action,
outcome='success',
source_ip=source_ip,
phi_accessed=True,
details={
'fields_accessed': fields_accessed or [],
'purpose': purpose,
'record_type': record_type
}
)
def generate_access_report(
self,
patient_id: str,
start_date: datetime,
end_date: datetime
) -> Dict:
"""Genereaza raport de acces pentru pacient (necesar pentru contabilizarea dezvaluirilor HIPAA)."""
events = self.storage.query(
patient_id=patient_id,
start_date=start_date,
end_date=end_date,
phi_accessed=True
)
report = {
'patient_id': patient_id,
'report_period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'generated_at': datetime.utcnow().isoformat(),
'total_access_events': len(events),
'access_by_user': {},
'access_by_type': {},
'access_timeline': []
}
for event in events:
# Dupa utilizator
if event.user_id not in report['access_by_user']:
report['access_by_user'][event.user_id] = {
'count': 0,
'actions': []
}
report['access_by_user'][event.user_id]['count'] += 1
report['access_by_user'][event.user_id]['actions'].append(event.action)
# Dupa tip
record_type = event.details.get('record_type', 'unknown')
if record_type not in report['access_by_type']:
report['access_by_type'][record_type] = 0
report['access_by_type'][record_type] += 1
# Cronologie
report['access_timeline'].append({
'timestamp': event.timestamp.isoformat(),
'user_id': event.user_id,
'action': event.action,
'record_type': record_type
})
return report
def verify_log_integrity(self, events: List[AuditEvent]) -> Dict:
"""Verifica integritatea lantului de audit log."""
results = {
'verified': True,
'total_events': len(events),
'integrity_failures': []
}
for i, event in enumerate(events):
# Verifica hash-ul individual al evenimentului
expected_hash = event._compute_hash()
if expected_hash != event.integrity_hash:
results['verified'] = False
results['integrity_failures'].append({
'event_id': event.event_id,
'issue': 'Nepotrivire hash - evenimentul poate fi fost falsificat'
})
# Verifica lantul
if i > 0:
previous_hash = event.details.get('previous_hash')
if previous_hash and previous_hash != events[i-1].integrity_hash:
results['verified'] = False
results['integrity_failures'].append({
'event_id': event.event_id,
'issue': 'Lant intrerupt - evenimentele pot lipsi sau fi reordonate'
})
return resultsImplementarea Criptarii
Implementeaza criptare conforma HIPAA:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import os
import base64
from dataclasses import dataclass
from typing import Optional, Dict
import json
@dataclass
class EncryptedData:
ciphertext: bytes
nonce: bytes
key_id: str
algorithm: str
version: int
class HIPAAEncryption:
"""Criptare conforma HIPAA pentru PHI."""
def __init__(self, key_manager):
self.key_manager = key_manager
self.algorithm = 'AES-256-GCM' # Recomandat NIST
self.current_key_version = 1
def encrypt_phi(
self,
data: str,
patient_id: str,
data_type: str
) -> EncryptedData:
"""Cripteaza date PHI."""
# Obtine cheia de criptare
key, key_id = self.key_manager.get_current_key()
# Genereaza nonce unic
nonce = os.urandom(12)
# Creaza date autentificate aditionale (AAD)
aad = json.dumps({
'patient_id': patient_id,
'data_type': data_type,
'key_version': self.current_key_version
}).encode()
# Cripteaza
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, data.encode(), aad)
return EncryptedData(
ciphertext=ciphertext,
nonce=nonce,
key_id=key_id,
algorithm=self.algorithm,
version=self.current_key_version
)
def decrypt_phi(
self,
encrypted_data: EncryptedData,
patient_id: str,
data_type: str
) -> str:
"""Decripteaza date PHI."""
# Obtine cheia de decriptare
key = self.key_manager.get_key(
encrypted_data.key_id,
encrypted_data.version
)
# Recreeaza AAD
aad = json.dumps({
'patient_id': patient_id,
'data_type': data_type,
'key_version': encrypted_data.version
}).encode()
# Decripteaza
aesgcm = AESGCM(key)
plaintext = aesgcm.decrypt(
encrypted_data.nonce,
encrypted_data.ciphertext,
aad
)
return plaintext.decode()
def encrypt_field(self, value: str, field_name: str) -> str:
"""Cripteaza un camp individual pentru stocare in baza de date."""
key, key_id = self.key_manager.get_current_key()
nonce = os.urandom(12)
aesgcm = AESGCM(key)
ciphertext = aesgcm.encrypt(nonce, value.encode(), field_name.encode())
# Codifica pentru stocare
return base64.b64encode(json.dumps({
'c': base64.b64encode(ciphertext).decode(),
'n': base64.b64encode(nonce).decode(),
'k': key_id,
'v': self.current_key_version
}).encode()).decode()
def decrypt_field(self, encrypted_value: str, field_name: str) -> str:
"""Decripteaza un camp individual din baza de date."""
data = json.loads(base64.b64decode(encrypted_value))
key = self.key_manager.get_key(data['k'], data['v'])
nonce = base64.b64decode(data['n'])
ciphertext = base64.b64decode(data['c'])
aesgcm = AESGCM(key)
plaintext = aesgcm.decrypt(nonce, ciphertext, field_name.encode())
return plaintext.decode()
class KeyManager:
"""Gestioneaza cheile de criptare pentru conformitatea HIPAA."""
def __init__(self, master_key: bytes):
self.master_key = master_key
self.keys: Dict[str, Dict] = {}
self.current_key_id: Optional[str] = None
self._initialize_key()
def _initialize_key(self):
"""Initializeaza sau roteste cheia de criptare."""
key_id = self._generate_key_id()
key = self._derive_key(key_id, 1)
self.keys[key_id] = {
1: key,
'created_at': datetime.utcnow(),
'rotated_at': None
}
self.current_key_id = key_id
def _generate_key_id(self) -> str:
"""Genereaza identificator unic pentru cheie."""
return hashlib.sha256(os.urandom(32)).hexdigest()[:16]
def _derive_key(self, key_id: str, version: int) -> bytes:
"""Deriveaza cheia de criptare din cheia master."""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=f"{key_id}:{version}".encode(),
iterations=100000,
backend=default_backend()
)
return kdf.derive(self.master_key)
def get_current_key(self) -> tuple[bytes, str]:
"""Obtine cheia curenta de criptare."""
key_data = self.keys[self.current_key_id]
current_version = max(key_data.keys() - {'created_at', 'rotated_at'})
return key_data[current_version], self.current_key_id
def get_key(self, key_id: str, version: int) -> bytes:
"""Obtine o versiune specifica a cheii pentru decriptare."""
if key_id not in self.keys:
raise KeyError(f"Key not found: {key_id}")
if version not in self.keys[key_id]:
raise KeyError(f"Key version not found: {version}")
return self.keys[key_id][version]
def rotate_key(self):
"""Roteste cheia de criptare."""
key_data = self.keys[self.current_key_id]
current_version = max(key_data.keys() - {'created_at', 'rotated_at'})
new_version = current_version + 1
new_key = self._derive_key(self.current_key_id, new_version)
self.keys[self.current_key_id][new_version] = new_key
self.keys[self.current_key_id]['rotated_at'] = datetime.utcnow()
return new_versionSistem de Notificare a Breselor
Implementeaza detectarea si notificarea breselor:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Optional, Dict
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class BreachSeverity(Enum):
LOW = "low" # < 500 persoane
MEDIUM = "medium" # 500+ persoane
HIGH = "high" # 500+ cu date sensibile
@dataclass
class BreachIncident:
incident_id: str
discovered_date: datetime
breach_date: Optional[datetime]
description: str
affected_individuals: int
phi_types_affected: List[str]
severity: BreachSeverity
status: str # investigating, confirmed, contained, closed
risk_assessment: Dict
notifications_sent: List[Dict]
class HIPAABreachNotification:
def __init__(self, config: Dict):
self.config = config
self.incidents: List[BreachIncident] = []
self.notification_deadline_days = 60 # Cerinta HIPAA
def report_incident(
self,
description: str,
affected_count: int,
phi_types: List[str],
breach_date: Optional[datetime] = None
) -> BreachIncident:
"""Raporteaza un potential incident de bresa."""
incident = BreachIncident(
incident_id=self._generate_incident_id(),
discovered_date=datetime.utcnow(),
breach_date=breach_date,
description=description,
affected_individuals=affected_count,
phi_types_affected=phi_types,
severity=self._assess_severity(affected_count, phi_types),
status='investigating',
risk_assessment={},
notifications_sent=[]
)
self.incidents.append(incident)
return incident
def _generate_incident_id(self) -> str:
"""Genereaza ID unic pentru incident."""
return f"BREACH-{datetime.utcnow().strftime('%Y%m%d')}-{len(self.incidents)+1:04d}"
def _assess_severity(
self,
affected_count: int,
phi_types: List[str]
) -> BreachSeverity:
"""Evalueaza severitatea bresei."""
sensitive_types = {'ssn', 'financial', 'mental_health', 'substance_abuse', 'hiv_status'}
has_sensitive = any(t in sensitive_types for t in phi_types)
if affected_count >= 500 and has_sensitive:
return BreachSeverity.HIGH
elif affected_count >= 500:
return BreachSeverity.MEDIUM
else:
return BreachSeverity.LOW
def perform_risk_assessment(self, incident_id: str) -> Dict:
"""Efectueaza evaluarea de risc cu 4 factori conform ghidajului HIPAA."""
incident = self._get_incident(incident_id)
# Factorul 1: Natura si amploarea PHI
phi_sensitivity_score = self._score_phi_sensitivity(incident.phi_types_affected)
# Factorul 2: Persoana neautorizata care a folosit/accesat PHI
# Acest lucru se determina in timpul investigatiei
unauthorized_access_risk = 0.5 # Placeholder
# Factorul 3: Daca PHI a fost efectiv achizitionat sau vizualizat
actual_acquisition_risk = 0.5 # Placeholder
# Factorul 4: Masura in care riscul a fost atenuat
mitigation_score = 0.5 # Placeholder
overall_risk = (
phi_sensitivity_score * 0.3 +
unauthorized_access_risk * 0.3 +
actual_acquisition_risk * 0.25 +
(1 - mitigation_score) * 0.15
)
assessment = {
'phi_sensitivity': phi_sensitivity_score,
'unauthorized_access_risk': unauthorized_access_risk,
'actual_acquisition_risk': actual_acquisition_risk,
'mitigation_effectiveness': mitigation_score,
'overall_risk_score': overall_risk,
'breach_confirmed': overall_risk > 0.5,
'notification_required': overall_risk > 0.5,
'assessed_at': datetime.utcnow().isoformat()
}
incident.risk_assessment = assessment
return assessment
def _score_phi_sensitivity(self, phi_types: List[str]) -> float:
"""Scor sensibilitate PHI."""
sensitivity_weights = {
'ssn': 1.0,
'financial': 0.9,
'mental_health': 0.95,
'substance_abuse': 0.95,
'hiv_status': 0.95,
'diagnosis': 0.7,
'medications': 0.6,
'demographics': 0.4,
'contact_info': 0.3
}
if not phi_types:
return 0.3
scores = [sensitivity_weights.get(t, 0.5) for t in phi_types]
return max(scores)
def send_notifications(self, incident_id: str) -> Dict:
"""Trimite notificarile necesare pentru bresa."""
incident = self._get_incident(incident_id)
if not incident.risk_assessment.get('notification_required'):
return {'status': 'not_required', 'reason': 'Evaluarea de risc sub prag'}
notifications = {
'individuals': None,
'hhs': None,
'media': None
}
# Notificari individuale (obligatorii in 60 de zile)
if incident.affected_individuals > 0:
notifications['individuals'] = self._notify_individuals(incident)
# Notificare HHS
if incident.affected_individuals >= 500:
# Notifica HHS in 60 de zile
notifications['hhs'] = self._notify_hhs(incident)
# Notificarea media este obligatorie pentru 500+
notifications['media'] = self._notify_media(incident)
else:
# Inregistreaza pentru raportul anual catre HHS (brese < 500)
notifications['hhs'] = {'status': 'logged_for_annual_report'}
incident.notifications_sent.append({
'timestamp': datetime.utcnow().isoformat(),
'notifications': notifications
})
return notifications
def _notify_individuals(self, incident: BreachIncident) -> Dict:
"""Trimite notificari individuale de bresa."""
notification_content = f"""
NOTIFICARE DE BRESA DE DATE
Stimate Pacient,
Va scriem pentru a va informa despre un incident de securitate care ar fi putut afecta informatiile dvs. personale de sanatate.
CE S-A INTAMPLAT:
{incident.description}
DATA BRESEI:
{incident.breach_date or 'In curs de investigare'}
INFORMATII IMPLICATE:
Urmatoarele tipuri de informatii ar fi putut fi afectate:
{', '.join(incident.phi_types_affected)}
CE FACEM:
Am luat masuri imediate pentru a limita acest incident si lucram cu experti in securitate pentru a investiga si preveni incidentele viitoare.
CE PUTETI FACE:
- Monitorizati rapoartele de credit si extrasele financiare
- Luati in considerare plasarea unei alerte de frauda pe dosarul dvs. de credit
- Verificati orice declaratii de beneficii de la asiguratorul dvs. de sanatate
INFORMATII DE CONTACT:
Daca aveti intrebari, contactati Ofiterul nostru de Confidentialitate la {self.config['privacy_officer_contact']}.
Ne cerem sincer scuze pentru acest incident si orice ingrijorare pe care o poate cauza.
Cu stima,
{self.config['organization_name']}
"""
# In productie, trimite notificari reale
return {
'status': 'sent',
'method': 'first_class_mail', # Cerinta HIPAA
'count': incident.affected_individuals,
'deadline': (incident.discovered_date + timedelta(days=60)).isoformat()
}
def _notify_hhs(self, incident: BreachIncident) -> Dict:
"""Trimite notificare de bresa catre HHS."""
# In productie, trimite catre portalul HHS de brese
return {
'status': 'submitted',
'submission_date': datetime.utcnow().isoformat(),
'portal': 'https://ocrportal.hhs.gov/ocr/breach/wizard_breach.jsf'
}
def _notify_media(self, incident: BreachIncident) -> Dict:
"""Notifica institutii media importante pentru brese mari."""
return {
'status': 'distributed',
'press_release_date': datetime.utcnow().isoformat(),
'outlets_contacted': ['local_news', 'healthcare_publications']
}
def _get_incident(self, incident_id: str) -> BreachIncident:
"""Obtine incidentul dupa ID."""
for incident in self.incidents:
if incident.incident_id == incident_id:
return incident
raise ValueError(f"Incident not found: {incident_id}")
def get_notification_status(self) -> Dict:
"""Obtine statusul tuturor notificarilor in asteptare."""
pending = []
deadline_approaching = []
for incident in self.incidents:
if incident.status not in ['closed']:
deadline = incident.discovered_date + timedelta(days=self.notification_deadline_days)
days_remaining = (deadline - datetime.utcnow()).days
if not incident.notifications_sent:
pending.append({
'incident_id': incident.incident_id,
'deadline': deadline.isoformat(),
'days_remaining': days_remaining
})
if days_remaining <= 14:
deadline_approaching.append(incident.incident_id)
return {
'pending_notifications': pending,
'deadline_approaching': deadline_approaching,
'total_incidents': len(self.incidents)
}Concluzie
Conformitatea HIPAA necesita o abordare cuprinzatoare care combina clasificarea PHI, controlul accesului, audit logging, criptare si capabilitati de notificare a breselor. Implementeaza aceste masuri de protectie tehnice ca parte a programului general de securitate si asigura-te ca evaluarile de risc regulate si instruirea personalului completeaza controalele tehnice. Retine ca conformitatea HIPAA este un proces continuu - monitorizeaza, auditeaza si imbunatateste constant postura de securitate pentru a proteja eficient informatiile de sanatate ale pacientilor.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →