Implementarea Arhitecturii Zero Trust: Ghid Practic
Zero Trust este un framework de securitate care elimină încrederea implicită și validează continuu fiecare etapă a interacțiunii digitale. Acest ghid oferă pattern-uri practice de implementare pentru organizații moderne.
Principiile Zero Trust
Principii Fundamentale
# zero_trust_principles.yaml
zero_trust_principles:
never_trust_always_verify:
- verifica_fiecare_cerere
- presupune_bresa
- trateaza_fiecare_retea_ca_ostila
least_privilege_access:
- acces_just_in_time
- acces_doar_cat_e_necesar
- control_acces_bazat_pe_roluri
- control_acces_bazat_pe_atribute
assume_breach:
- segmenteaza_accesul
- minimizeaza_raza_de_explozie
- cripteaza_toate_datele
- logging_complet
pillars:
identitate:
- autentificare_puternica
- validare_continua
- acces_bazat_pe_risc
dispozitive:
- verificare_sanatate_dispozitiv
- protectie_endpoint
- management_dispozitive_mobile
retele:
- micro_segmentare
- trafic_criptat
- control_acces_retea
aplicatii:
- acces_securizat
- securitate_api
- protectie_workload
date:
- clasificare_date
- criptare
- prevenire_pierdere_date
vizibilitate:
- logging_complet
- analitice
- automatizare
Securitate Centrată pe Identitate
Verificare Continuă a Identității
# identity_verification.py
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
class RiskLevel(Enum):
    """Qualitative risk tiers for an access request.

    Members are declared from least to most severe. Each value is the
    lowercase label of the tier.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
class AuthenticationStrength(Enum):
    """Ordinal strength of the authentication a user has performed.

    Values are increasing integers so two strengths can be compared through
    ``.value`` (a larger value means a stronger authentication).
    """

    WEAK = 1
    STANDARD = 2
    STRONG = 3
    VERY_STRONG = 4
@dataclass
class UserContext:
    """User context consulted when making an access decision."""

    # Identity of the user and of the session the request belongs to.
    user_id: str
    session_id: str
    # Device and network origin of the request.
    device_id: str
    ip_address: str
    # Free-form location data; the schema is not fixed by this listing.
    location: Dict
    user_agent: str
    # How, when and how strongly the user authenticated.
    authentication_method: str
    authentication_time: datetime
    authentication_strength: AuthenticationStrength
class ContinuousVerificationEngine:
    """Engine for continuous identity verification.

    Every access request is scored for risk and then passed through three
    gates in order: authentication strength, session validity, device trust.

    NOTE(review): the hooks ``_determine_risk_level``,
    ``_get_required_strength``, ``_verify_session``, ``_verify_device``,
    ``_get_access_conditions``, ``_assess_location_risk``,
    ``_assess_device_risk``, ``_get_resource_sensitivity`` and
    ``_assess_behavior_anomaly`` are called below but are not defined in this
    listing; they have to be provided elsewhere (for example by a subclass).
    """

    # An authentication older than this many seconds adds a fixed penalty.
    _STALE_AUTH_SECONDS = 3600
    _STALE_AUTH_PENALTY = 0.1
    # Weight applied to the value returned by ``_get_resource_sensitivity``.
    _RESOURCE_SENSITIVITY_WEIGHT = 0.3

    def __init__(self, config: Dict):
        """Store the configuration and resolve the risk thresholds.

        Args:
            config: Engine configuration. The optional ``"risk_thresholds"``
                entry overrides the built-in per-level thresholds.
        """
        self.config = config
        thresholds = config.get("risk_thresholds")
        if thresholds is None:
            # Build the defaults only when they are actually needed.
            thresholds = {
                RiskLevel.LOW: 0.3,
                RiskLevel.MEDIUM: 0.5,
                RiskLevel.HIGH: 0.7,
                RiskLevel.CRITICAL: 0.9,
            }
        self.risk_thresholds = thresholds

    def evaluate_access(self, request) -> Dict:
        """Evaluate an access request with continuous verification.

        Args:
            request: Object exposing ``resource``, ``request_time`` and
                ``user_context`` (with ``authentication_strength``,
                ``authentication_time``, ``location`` and ``device_id``).

        Returns:
            A dict whose ``"decision"`` is one of ``"step_up_required"``,
            ``"reauthentication_required"``, ``"denied"`` or ``"allowed"``.
            Every variant carries ``"risk_score"``; the remaining keys depend
            on the decision.
        """
        # Compute the risk score and map it to a level.
        risk_score = self._calculate_risk_score(request)
        risk_level = self._determine_risk_level(risk_score)

        # Gate 1: is the current authentication strong enough?
        required_strength = self._get_required_strength(request.resource, risk_level)
        current_strength = request.user_context.authentication_strength
        if current_strength.value < required_strength.value:
            return {
                "decision": "step_up_required",
                "required_strength": required_strength.name,
                "current_strength": current_strength.name,
                "risk_score": risk_score,
            }

        # Gate 2: is the session still valid?
        if not self._verify_session(request.user_context):
            return {
                "decision": "reauthentication_required",
                "reason": "Sesiune expirata sau invalida",
                "risk_score": risk_score,
            }

        # Gate 3: is the device trusted?
        if not self._verify_device(request.user_context):
            return {
                "decision": "denied",
                "reason": "Dispozitiv de neincredere",
                "risk_score": risk_score,
            }

        return {
            "decision": "allowed",
            "risk_score": risk_score,
            "risk_level": risk_level.value,
            "conditions": self._get_access_conditions(risk_level),
        }

    def _calculate_risk_score(self, request) -> float:
        """Compute the risk score of an access request.

        The score is the sum of a stale-authentication penalty, the location,
        device and behaviour risks returned by the hooks, and the weighted
        resource sensitivity.

        Returns:
            The summed score clamped to the closed interval [0.0, 1.0].
        """
        context = request.user_context
        score = 0.0

        # Time-based risk: penalise authentications older than one hour.
        auth_age = (request.request_time - context.authentication_time).total_seconds()
        if auth_age > self._STALE_AUTH_SECONDS:
            score += self._STALE_AUTH_PENALTY

        # Location risk.
        score += self._assess_location_risk(context.location)

        # Device risk.
        score += self._assess_device_risk(context.device_id)

        # Resource sensitivity.
        score += self._get_resource_sensitivity(request.resource) * self._RESOURCE_SENSITIVITY_WEIGHT

        # Behavioural anomaly.
        score += self._assess_behavior_anomaly(request)

        # Clamp on both sides so a negative hook result cannot yield a
        # score below zero.
        return max(0.0, min(score, 1.0))


# Section: "Micro-Segmentare" (Micro-Segmentation)
Motor de Politici de Segmentare Rețea
# micro_segmentation.py
from dataclasses import dataclass
from typing import List, Dict, Optional
import ipaddress
class MicroSegmentationEngine:
    """Micro-segmentation policy engine.

    Holds network segments and the policies between them, answers whether a
    flow is permitted, and expands policies into flat firewall rules.

    NOTE(review): ``_find_segment`` and ``_find_matching_policy`` are called
    below but are not defined in this listing, and neither are the
    ``NetworkSegment`` / ``SegmentPolicy`` types named in the annotations.
    """

    def __init__(self):
        """Create an engine with no segments, no policies and default deny."""
        self.segments: Dict[str, NetworkSegment] = {}
        self.policies: List[SegmentPolicy] = []
        self.default_policy = "deny"

    def evaluate_traffic(self, source_ip: str, destination_ip: str,
                         port: int, protocol: str) -> Dict:
        """Decide whether a flow between two addresses is permitted.

        Args:
            source_ip: Address the traffic originates from.
            destination_ip: Address the traffic is sent to.
            port: Destination port.
            protocol: Protocol name, passed through to the policy lookup.

        Returns:
            A dict with ``"allowed"`` and ``"action"``; it also carries
            ``"reason"`` when the flow is rejected before a policy matches,
            or ``"policy"`` (the matching policy id) otherwise.
        """
        # Resolve both endpoints to their segments.
        source_segment = self._find_segment(source_ip)
        dest_segment = self._find_segment(destination_ip)
        if not source_segment or not dest_segment:
            return {
                "allowed": False,
                "reason": "Segment necunoscut",
                "action": "deny",
            }

        # Look up the policy governing this segment pair, port and protocol.
        matching_policy = self._find_matching_policy(
            source_segment.segment_id,
            dest_segment.segment_id,
            port,
            protocol,
        )
        if not matching_policy:
            # NOTE(review): "allowed" is always False here, while "action"
            # echoes default_policy; the two would disagree if
            # default_policy were ever set to something other than "deny".
            return {
                "allowed": False,
                "reason": "Nicio politica potrivita",
                "action": self.default_policy,
            }

        # NOTE(review): any action other than the literal "deny" counts as
        # allowed; confirm the action vocabulary makes that safe.
        return {
            "allowed": matching_policy.action != "deny",
            "policy": matching_policy.policy_id,
            "action": matching_policy.action,
        }

    def generate_firewall_rules(self) -> List[Dict]:
        """Expand every policy into flat firewall rules.

        One rule is produced for each combination of source CIDR,
        destination CIDR, allowed port and allowed protocol of a policy.

        Returns:
            A list of rule dicts with the keys ``"source"``,
            ``"destination"``, ``"port"``, ``"protocol"`` and ``"action"``.

        Raises:
            KeyError: If a policy names a segment that is not registered in
                ``self.segments``.
        """
        rules = []
        for policy in self.policies:
            source_ranges = self.segments[policy.source_segment].cidr_ranges
            dest_ranges = self.segments[policy.destination_segment].cidr_ranges
            for source_cidr in source_ranges:
                for dest_cidr in dest_ranges:
                    for port in policy.allowed_ports:
                        for protocol in policy.allowed_protocols:
                            rules.append({
                                "source": source_cidr,
                                "destination": dest_cidr,
                                "port": port,
                                "protocol": protocol,
                                "action": policy.action,
                            })
        return rules


# Section: "Incredere Dispozitiv" (Device Trust)
Evaluarea Sănătății Dispozitivului
class DeviceTrustManager:
    """Manage device trust for Zero Trust.

    NOTE(review): ``_is_os_outdated`` and ``_get_recommendations`` are called
    below but are not defined in this listing.
    """

    def assess_device(self, device_health) -> Dict:
        """Assess a device's compliance and trust level.

        The score starts at 100 and a fixed penalty is subtracted for every
        failed check (OS patch level, antivirus, disk encryption, firewall,
        critical vulnerabilities).

        Args:
            device_health: Object exposing ``device_id``, ``os_patch_level``,
                ``antivirus_status``, ``encryption_enabled``,
                ``firewall_enabled`` and ``vulnerabilities`` (an iterable of
                dicts with a ``"severity"`` key, or ``None``).

        Returns:
            A dict with ``"device_id"``, ``"trust_score"`` (never below 0),
            ``"violations"``, ``"recommendations"`` and ``"access_level"``.
        """
        violations = []
        trust_score = 100

        def flag(check, severity, message, penalty):
            # Record one failed check and subtract its penalty.
            nonlocal trust_score
            violations.append({
                "check": check,
                "severity": severity,
                "message": message,
            })
            trust_score -= penalty

        # OS patch level.
        if self._is_os_outdated(device_health.os_patch_level):
            flag("os_patch_level", "high", "Nivelul de patch OS este invechit", 20)

        # Antivirus.
        if device_health.antivirus_status != "active":
            flag("antivirus_status", "critical", "Antivirusul nu este activ", 30)

        # Disk encryption.
        if not device_health.encryption_enabled:
            flag("encryption", "critical", "Criptarea discului nu este activata", 25)

        # Firewall.
        if not device_health.firewall_enabled:
            flag("firewall", "high", "Firewall-ul nu este activat", 15)

        # Critical vulnerabilities; a missing list is treated as empty.
        vulnerabilities = device_health.vulnerabilities or []
        critical_vulns = sum(
            1 for vuln in vulnerabilities if vuln.get("severity") == "critical"
        )
        if critical_vulns > 0:
            flag(
                "critical_vulnerabilities",
                "critical",
                f"{critical_vulns} vulnerabilitati critice gasite",
                30,
            )

        # Report and classify the same clamped score.
        trust_score = max(0, trust_score)
        return {
            "device_id": device_health.device_id,
            "trust_score": trust_score,
            "violations": violations,
            "recommendations": self._get_recommendations(violations),
            "access_level": self._determine_access_level(trust_score),
        }

    def _determine_access_level(self, trust_score: int) -> str:
        """Map a trust score to an access level.

        Returns:
            ``"full"`` for scores of 80 and above, ``"restricted"`` from 60,
            ``"limited"`` from 40, otherwise ``"denied"``.
        """
        if trust_score >= 80:
            return "full"
        if trust_score >= 60:
            return "restricted"
        if trust_score >= 40:
            return "limited"
        return "denied"


# Section: "Logging Complet" (Comprehensive Logging)
Logger de Audit Zero Trust
class ZeroTrustAuditLogger:
    """Comprehensive audit logging for Zero Trust.

    Entries are linked into a hash chain: the integrity hash of each entry is
    computed from the entry and the previous entry's hash.

    NOTE(review): this listing defines no ``__init__``. The instance is
    expected to provide ``log_chain`` (a list of hashes), ``storage`` (an
    object with a ``store(entry)`` method), ``_generate_log_id()`` and
    ``_calculate_hash(entry, previous_hash)``.
    """

    def log_access_decision(self, user_id: str, device_id: str,
                            source_ip: str, resource: str, action: str,
                            decision: str, risk_score: float,
                            authentication_method: str, session_id: str,
                            conditions_applied: Optional[List[str]] = None,
                            policy_matched: Optional[str] = None) -> Dict:
        """Record an access decision and return the stored entry.

        Args:
            user_id: User the decision applies to.
            device_id: Device the request came from.
            source_ip: Source address of the request.
            resource: Resource that was requested.
            action: Action that was requested on the resource.
            decision: Outcome of the access evaluation.
            risk_score: Risk score attached to the decision.
            authentication_method: How the user authenticated.
            session_id: Session the request belongs to.
            conditions_applied: Conditions attached to the decision; stored
                as an empty list when omitted.
            policy_matched: Identifier of the policy that matched, if any.

        Returns:
            The log entry, including its ``"integrity_hash"``.
        """
        # Local import keeps the method self-contained.
        from datetime import datetime, timezone

        # The first entry in a chain links to the literal "genesis".
        previous_hash = self.log_chain[-1] if self.log_chain else "genesis"

        entry = {
            "log_id": self._generate_log_id(),
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            # and produces a naive value.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "access_decision",
            "user_id": user_id,
            "device_id": device_id,
            "source_ip": source_ip,
            "resource": resource,
            "action": action,
            "decision": decision,
            "risk_score": risk_score,
            "authentication_method": authentication_method,
            "session_id": session_id,
            # Copy so later caller-side mutation cannot alter the entry.
            "conditions_applied": list(conditions_applied) if conditions_applied else [],
            "policy_matched": policy_matched,
        }

        # Compute the integrity hash over the entry and its predecessor.
        entry["integrity_hash"] = self._calculate_hash(entry, previous_hash)

        # Persist first and extend the chain only on success, so a failed
        # write cannot leave a hash in the chain with no stored entry.
        self.storage.store(entry)
        self.log_chain.append(entry["integrity_hash"])
        return entry


# Section: "Concluzie" (Conclusion)
Implementarea arhitecturii Zero Trust necesită:
- Securitate Centrată pe Identitate - Verificare continuă a utilizatorilor și sesiunilor
- Micro-Segmentare - Politici granulare de rețea care limitează mișcarea laterală
- Încredere Dispozitiv - Verificarea sănătății și conformității dispozitivului
- Logging Complet - Trasee detaliate de audit pentru toate deciziile de acces
- Acces Bazat pe Risc - Control dinamic al accesului bazat pe context
Zero Trust nu este un singur produs, ci o abordare completă a securității care trebuie implementată în toate straturile tehnologice.