DevSecOps

Implementarea Arhitecturii Zero Trust: Ghid Practic

Nicu Constantin
--5 min lectura
#zero trust#security architecture#identity#micro-segmentation#DevSecOps

Implementarea Arhitecturii Zero Trust: Ghid Practic

Zero Trust este un framework de securitate care elimina increderea implicita si valideaza continuu fiecare etapa a interactiunii digitale. Acest ghid ofera pattern-uri practice de implementare pentru organizatii moderne.

Principiile Zero Trust

Principii Fundamentale

# zero_trust_principles.yaml
zero_trust_principles:
  never_trust_always_verify:
    - verifica_fiecare_cerere
    - presupune_bresa
    - trateaza_fiecare_retea_ca_ostila
 
  least_privilege_access:
    - acces_just_in_time
    - acces_doar_cat_e_necesar
    - control_acces_bazat_pe_roluri
    - control_acces_bazat_pe_atribute
 
  assume_breach:
    - segmenteaza_accesul
    - minimizeaza_raza_de_explozie
    - cripteaza_toate_datele
    - logging_complet
 
pillars:
  identitate:
    - autentificare_puternica
    - validare_continua
    - acces_bazat_pe_risc
 
  dispozitive:
    - verificare_sanatate_dispozitiv
    - protectie_endpoint
    - management_dispozitive_mobile
 
  retele:
    - micro_segmentare
    - trafic_criptat
    - control_acces_retea
 
  aplicatii:
    - acces_securizat
    - securitate_api
    - protectie_workload
 
  date:
    - clasificare_date
    - criptare
    - prevenire_pierdere_date
 
  vizibilitate:
    - logging_complet
    - analitice
    - automatizare

Securitate Centrata pe Identitate

Verificare Continua a Identitatii

# identity_verification.py
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
 
class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
 
class AuthenticationStrength(Enum):
    WEAK = 1
    STANDARD = 2
    STRONG = 3
    VERY_STRONG = 4
 
@dataclass
class UserContext:
    """Contextul utilizatorului pentru decizia de acces."""
    user_id: str
    session_id: str
    device_id: str
    ip_address: str
    location: Dict
    user_agent: str
    authentication_method: str
    authentication_time: datetime
    authentication_strength: AuthenticationStrength
 
class ContinuousVerificationEngine:
    """Motor pentru verificare continua a identitatii."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.risk_thresholds = config.get("risk_thresholds", {
            RiskLevel.LOW: 0.3,
            RiskLevel.MEDIUM: 0.5,
            RiskLevel.HIGH: 0.7,
            RiskLevel.CRITICAL: 0.9
        })
 
    def evaluate_access(self, request) -> Dict:
        """Evalueaza cererea de acces cu verificare continua."""
 
        # Calculeaza scorul de risc
        risk_score = self._calculate_risk_score(request)
        risk_level = self._determine_risk_level(risk_score)
 
        # Determina puterea de autentificare necesara
        required_strength = self._get_required_strength(request.resource, risk_level)
        current_strength = request.user_context.authentication_strength
 
        if current_strength.value < required_strength.value:
            return {
                "decision": "step_up_required",
                "required_strength": required_strength.name,
                "current_strength": current_strength.name,
                "risk_score": risk_score
            }
 
        # Verifica validitatea sesiunii
        session_valid = self._verify_session(request.user_context)
        if not session_valid:
            return {
                "decision": "reauthentication_required",
                "reason": "Sesiune expirata sau invalida",
                "risk_score": risk_score
            }
 
        # Verifica increderea dispozitivului
        device_trusted = self._verify_device(request.user_context)
        if not device_trusted:
            return {
                "decision": "denied",
                "reason": "Dispozitiv de neincredere",
                "risk_score": risk_score
            }
 
        return {
            "decision": "allowed",
            "risk_score": risk_score,
            "risk_level": risk_level.value,
            "conditions": self._get_access_conditions(risk_level)
        }
 
    def _calculate_risk_score(self, request) -> float:
        """Calculeaza scorul de risc pentru cererea de acces."""
 
        score = 0.0
 
        # Risc bazat pe timp
        auth_age = (request.request_time - request.user_context.authentication_time).total_seconds()
        if auth_age > 3600:  # Mai mult de 1 ora
            score += 0.1
 
        # Risc locatie
        location_risk = self._assess_location_risk(request.user_context.location)
        score += location_risk
 
        # Risc dispozitiv
        device_risk = self._assess_device_risk(request.user_context.device_id)
        score += device_risk
 
        # Sensibilitate resursa
        resource_sensitivity = self._get_resource_sensitivity(request.resource)
        score += resource_sensitivity * 0.3
 
        # Anomalie comportamentala
        behavior_risk = self._assess_behavior_anomaly(request)
        score += behavior_risk
 
        return min(score, 1.0)

Micro-Segmentare

Motor de Politici de Segmentare Retea

# micro_segmentation.py
from dataclasses import dataclass
from typing import List, Dict, Optional
import ipaddress
 
class MicroSegmentationEngine:
    """Motor de politici de micro-segmentare."""
 
    def __init__(self):
        self.segments: Dict[str, NetworkSegment] = {}
        self.policies: List[SegmentPolicy] = []
        self.default_policy = "deny"
 
    def evaluate_traffic(self, source_ip: str, destination_ip: str,
                        port: int, protocol: str) -> Dict:
        """Evalueaza daca traficul este permis."""
 
        # Gaseste segmentele sursa si destinatie
        source_segment = self._find_segment(source_ip)
        dest_segment = self._find_segment(destination_ip)
 
        if not source_segment or not dest_segment:
            return {
                "allowed": False,
                "reason": "Segment necunoscut",
                "action": "deny"
            }
 
        # Gaseste politica potrivita
        matching_policy = self._find_matching_policy(
            source_segment.segment_id,
            dest_segment.segment_id,
            port, protocol
        )
 
        if not matching_policy:
            return {
                "allowed": False,
                "reason": "Nicio politica potrivita",
                "action": self.default_policy
            }
 
        return {
            "allowed": matching_policy.action != "deny",
            "policy": matching_policy.policy_id,
            "action": matching_policy.action
        }
 
    def generate_firewall_rules(self) -> List[Dict]:
        """Genereaza reguli de firewall din politici."""
 
        rules = []
        for policy in self.policies:
            source_segment = self.segments[policy.source_segment]
            dest_segment = self.segments[policy.destination_segment]
 
            for source_cidr in source_segment.cidr_ranges:
                for dest_cidr in dest_segment.cidr_ranges:
                    for port in policy.allowed_ports:
                        for protocol in policy.allowed_protocols:
                            rules.append({
                                "source": source_cidr,
                                "destination": dest_cidr,
                                "port": port,
                                "protocol": protocol,
                                "action": policy.action
                            })
 
        return rules

Incredere Dispozitiv

Evaluarea Sanatatii Dispozitivului

class DeviceTrustManager:
    """Gestioneaza increderea dispozitivelor pentru Zero Trust."""
 
    def assess_device(self, device_health) -> Dict:
        """Evalueaza conformitatea si nivelul de incredere al dispozitivului."""
 
        violations = []
        trust_score = 100
 
        # Verifica nivelul patch OS
        if self._is_os_outdated(device_health.os_patch_level):
            violations.append({
                "check": "os_patch_level",
                "severity": "high",
                "message": "Nivelul de patch OS este invechit"
            })
            trust_score -= 20
 
        # Verifica antivirus
        if device_health.antivirus_status != "active":
            violations.append({
                "check": "antivirus_status",
                "severity": "critical",
                "message": "Antivirusul nu este activ"
            })
            trust_score -= 30
 
        # Verifica criptarea
        if not device_health.encryption_enabled:
            violations.append({
                "check": "encryption",
                "severity": "critical",
                "message": "Criptarea discului nu este activata"
            })
            trust_score -= 25
 
        # Verifica firewall-ul
        if not device_health.firewall_enabled:
            violations.append({
                "check": "firewall",
                "severity": "high",
                "message": "Firewall-ul nu este activat"
            })
            trust_score -= 15
 
        # Verifica vulnerabilitatile
        critical_vulns = len([v for v in device_health.vulnerabilities if v.get("severity") == "critical"])
        if critical_vulns > 0:
            violations.append({
                "check": "critical_vulnerabilities",
                "severity": "critical",
                "message": f"{critical_vulns} vulnerabilitati critice gasite"
            })
            trust_score -= 30
 
        return {
            "device_id": device_health.device_id,
            "trust_score": max(0, trust_score),
            "violations": violations,
            "recommendations": self._get_recommendations(violations),
            "access_level": self._determine_access_level(trust_score)
        }
 
    def _determine_access_level(self, trust_score: int) -> str:
        """Determina nivelul de acces pe baza scorului de incredere."""
        if trust_score >= 80:
            return "full"
        elif trust_score >= 60:
            return "restricted"
        elif trust_score >= 40:
            return "limited"
        else:
            return "denied"

Logging Complet

Logger de Audit Zero Trust

class ZeroTrustAuditLogger:
    """Logging complet pentru Zero Trust."""
 
    def log_access_decision(self, user_id: str, device_id: str,
                           source_ip: str, resource: str, action: str,
                           decision: str, risk_score: float,
                           authentication_method: str, session_id: str,
                           conditions_applied: List[str] = None,
                           policy_matched: str = None) -> Dict:
        """Logheaza decizia de acces."""
 
        previous_hash = self.log_chain[-1] if self.log_chain else "genesis"
 
        entry = {
            "log_id": self._generate_log_id(),
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": "access_decision",
            "user_id": user_id,
            "device_id": device_id,
            "source_ip": source_ip,
            "resource": resource,
            "action": action,
            "decision": decision,
            "risk_score": risk_score,
            "authentication_method": authentication_method,
            "session_id": session_id,
            "conditions_applied": conditions_applied or [],
            "policy_matched": policy_matched,
        }
 
        # Calculeaza hash-ul de integritate
        entry["integrity_hash"] = self._calculate_hash(entry, previous_hash)
        self.log_chain.append(entry["integrity_hash"])
 
        # Stocheaza intrarea
        self.storage.store(entry)
 
        return entry

Concluzie

Implementarea arhitecturii Zero Trust necesita:

  1. Securitate Centrata pe Identitate - Verificare continua a utilizatorilor si sesiunilor
  2. Micro-Segmentare - Politici granulare de retea limitand miscarea laterala
  3. Incredere Dispozitiv - Verificarea sanatatii si conformitatii dispozitivului
  4. Logging Complet - Trasee detaliate de audit pentru toate deciziile de acces
  5. Acces Bazat pe Risc - Control dinamic al accesului bazat pe context

Zero Trust nu este un singur produs, ci o abordare completa a securitatii care trebuie implementata in toate straturile tehnologice.

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.