DevSecOps

Gestionarea vulnerabilitatilor: detectie si remediere

Nicu Constantin
--5 min lectura
#vulnerability management#DevSecOps#security scanning#CVSS#remediation

Managementul Vulnerabilitatilor in DevSecOps: De la Detectie la Remediere

Managementul eficient al vulnerabilitatilor necesita mai mult decat rularea de scanere - cere o abordare sistematica de detectie, prioritizare si remediere integrata in fluxul tau de dezvoltare.

Pipeline de Detectie a Vulnerabilitatilor

Integrare Multi-Scanner

# .github/workflows/vulnerability-scan.yml
name: Comprehensive Security Scan
 
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 6 * * *'  # Zilnic la 6 AM
 
jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
 
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
 
      - name: Semgrep
        uses: semgrep/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/cwe-top-25
 
      - name: CodeQL Analysis
        uses: github/codeql-action/analyze@v3
        with:
          category: "/language:${{ matrix.language }}"
 
  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Snyk SCA
        uses: snyk/actions/node@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=medium --json-file-output=snyk-results.json
 
      - name: Trivy Filesystem Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-fs-results.json'
 
  container:
    name: Securitate Containere
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Build Image
        run: docker build -t app:${{ github.sha }} .
 
      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'app:${{ github.sha }}'
          format: 'json'
          output: 'trivy-container-results.json'
          severity: 'CRITICAL,HIGH,MEDIUM'
 
  iac:
    name: Securitate Infrastructure as Code
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Checkov
        uses: bridgecrewio/checkov-action@v12
        with:
          framework: terraform,kubernetes,dockerfile
          output_format: json
 
  aggregate:
    name: Agregate Rezultate
    needs: [sast, sca, container, iac]
    runs-on: ubuntu-latest
    steps:
      - name: Normalizeaza si Agregeaza
        run: |
          python scripts/aggregate_vulns.py \
            --sast sast-results/*.json \
            --sca sca-results/*.json \
            --container container-results/*.json \
            --iac iac-results/*.json \
            --output aggregated-vulns.json

Agregare si Normalizare Vulnerabilitati

# aggregate_vulns.py
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import hashlib
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
 
@dataclass
class Vulnerability:
    """Inregistrare de vulnerabilitate normalizata."""
    id: str
    title: str
    description: str
    severity: Severity
    cvss_score: Optional[float]
    cve_ids: List[str]
    cwe_ids: List[str]
    affected_component: str
    affected_version: str
    file_path: Optional[str]
    line_number: Optional[int]
    source_scanner: str
    first_detected: str
    remediation: Optional[str]
    references: List[str]
 
class VulnerabilityAggregator:
    """Agregeaza si normalizeaza vulnerabilitati din scanere multiple."""
 
    def __init__(self):
        self.vulnerabilities: List[Vulnerability] = []
        self.dedup_hashes = set()
 
    def deduplicate(self, vulnerabilities: List[Vulnerability]) -> List[Vulnerability]:
        """Elimina vulnerabilitatile duplicate."""
 
        unique_vulns = []
 
        for vuln in vulnerabilities:
            # Creeaza cheie deduplicare
            dedup_key = f"{vuln.affected_component}:{vuln.affected_version}:{','.join(vuln.cve_ids)}"
            dedup_hash = hashlib.md5(dedup_key.encode()).hexdigest()
 
            if dedup_hash not in self.dedup_hashes:
                self.dedup_hashes.add(dedup_hash)
                unique_vulns.append(vuln)
 
        return unique_vulns

Prioritizarea Vulnerabilitatilor

Framework de Prioritizare Bazat pe Risc

class VulnerabilityPrioritizer:
    """Prioritizeaza vulnerabilitatile pe baza contextului de risc."""
 
    def calculate_priority_score(
        self,
        vulnerability: Vulnerability,
        context: PrioritizationContext
    ) -> float:
        """Calculeaza scor de prioritate (0-100)."""
 
        score = 0
 
        # Scor de baza din CVSS (0-40 puncte)
        if vulnerability.cvss_score:
            score += vulnerability.cvss_score * 4
 
        # Maturitate exploit (0-20 puncte)
        score += context.exploit_maturity.value * 4
 
        # Criticitate asset (0-20 puncte)
        score += context.asset_criticality.value * 5
 
        # Factori de mediu (0-20 puncte)
        if context.is_internet_facing:
            score += 10
        if context.has_sensitive_data:
            score += 5
        if context.in_kev:
            score += 15  # Boost semnificativ pentru exploitate cunoscute
 
        # Ajustare EPSS (-10 la +10)
        if context.epss_score:
            if context.epss_score > 0.9:
                score += 10
            elif context.epss_score > 0.7:
                score += 5
            elif context.epss_score < 0.1:
                score -= 10
 
        return min(max(score, 0), 100)
 
    def _determine_sla(self, score: float, context) -> Dict:
        """Determina SLA de remediere pe baza prioritatii."""
 
        if score >= 90 or context.in_kev:
            return {'days': 1, 'priority': 'P1', 'description': 'Critica - Actiune imediata necesara'}
        elif score >= 70:
            return {'days': 7, 'priority': 'P2', 'description': 'Mare - Remediaza in 1 saptamana'}
        elif score >= 50:
            return {'days': 30, 'priority': 'P3', 'description': 'Medie - Remediaza in 1 luna'}
        elif score >= 30:
            return {'days': 90, 'priority': 'P4', 'description': 'Scazuta - Remediaza in trimestru'}
        else:
            return {'days': 180, 'priority': 'P5', 'description': 'Informationala - Adreseaza cand posibil'}

Remediere Automata

Automatizarea Actualizarii Dependentelor

class DependencyRemediator:
    """Remediere automata a vulnerabilitatilor de dependente."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.dry_run = config.get('dry_run', True)
        self.max_major_upgrade = config.get('max_major_upgrade', False)
 
    def remediate_npm(self, vulnerabilities: List[Vulnerability]) -> List[RemediationResult]:
        """Remediaza vulnerabilitati npm."""
 
        results = []
 
        for vuln in vulnerabilities:
            # Obtine versiunea curenta
            current = self._get_npm_version(vuln.affected_component)
 
            # Determina upgrade sigur
            target = self._find_safe_version(
                vuln.affected_component, vuln.remediation, current
            )
 
            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='Nu s-a gasit cale sigura de upgrade',
                    requires_review=True
                ))
                continue
 
            # Verifica daca upgrade-ul este acceptabil
            if not self._is_upgrade_acceptable(current, target):
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='skipped',
                    old_version=current,
                    new_version=target,
                    error='Upgrade versiune major necesita review manual',
                    requires_review=True
                ))
                continue
 
            # Efectueaza upgrade-ul
            if not self.dry_run:
                success = self._npm_upgrade(vuln.affected_component, target)
            else:
                success = True
 
            results.append(RemediationResult(
                vulnerability_id=vuln.id,
                success=success,
                action_taken='upgrade' if not self.dry_run else 'dry_run',
                old_version=current,
                new_version=target,
                error=None,
                requires_review=False
            ))
 
        return results

Metrici si Raportare

Dashboard Metrici Vulnerabilitati

class MetricsCalculator:
    """Calculeaza metrici management vulnerabilitati."""
 
    def calculate_metrics(self, as_of: datetime = None) -> VulnerabilityMetrics:
        """Calculeaza metricile curente de vulnerabilitati."""
 
        as_of = as_of or datetime.utcnow()
        vulns = self.store.get_all_open()
 
        # Numara dupa severitate
        by_severity = defaultdict(int)
        for v in vulns:
            by_severity[v.severity.value] += 1
 
        # Calculeaza MTTR
        closed_vulns = self.store.get_closed(since=as_of - timedelta(days=90))
        mttr_critical = self._calculate_mttr(closed_vulns, Severity.CRITICAL)
        mttr_high = self._calculate_mttr(closed_vulns, Severity.HIGH)
 
        # Conformitate SLA
        sla_compliance = self._calculate_sla_compliance(closed_vulns)
 
        # Activitate saptamanala
        week_ago = as_of - timedelta(days=7)
        new_this_week = len(self.store.get_opened_since(week_ago))
        closed_this_week = len(self.store.get_closed_since(week_ago))
 
        # Intarziate
        overdue = len([v for v in vulns if self._is_overdue(v, as_of)])
 
        return VulnerabilityMetrics(
            total_open=len(vulns),
            critical_open=by_severity['critical'],
            high_open=by_severity['high'],
            medium_open=by_severity['medium'],
            low_open=by_severity['low'],
            mttr_critical=mttr_critical,
            mttr_high=mttr_high,
            sla_compliance_rate=sla_compliance,
            new_this_week=new_this_week,
            closed_this_week=closed_this_week,
            overdue_count=overdue
        )

Concluzie

Managementul eficient al vulnerabilitatilor in DevSecOps necesita:

  1. Detectie Completa - Scanere multiple acoperind cod, dependente, containere si IaC
  2. Prioritizare Inteligenta - Scorizare bazata pe risc considerand exploitabilitatea si contextul asset-ului
  3. Remediere Automata - Auto-patching unde e sigur, cu review uman pentru modificari majore
  4. Monitorizare Continua - Urmareste metrici, tendinte si conformitate SLA

Prin implementarea acestor practici, organizatiile pot reduce sistematic expunerea la vulnerabilitati mentinand viteza de dezvoltare.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.