DevSecOps

Vulnerability Management in DevSecOps: From Detection to Remediation

Nicu Constantin
12 min read
#vulnerability management#DevSecOps#security scanning#CVSS#remediation

Effective vulnerability management requires more than running scanners: it demands a systematic approach to detection, prioritization, and remediation, integrated into your development workflow.

Vulnerability Detection Pipeline

Multi-Scanner Integration

# .github/workflows/vulnerability-scan.yml
name: Comprehensive Security Scan
 
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 6 * * *'  # Daily at 6 AM
 
jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    strategy:
      matrix:
        language: [javascript, python]  # adjust to the languages in your repo
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}

      - name: Semgrep
        uses: semgrep/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/cwe-top-25

      # CodeQL requires an init step before analyze
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v3
        with:
          languages: ${{ matrix.language }}

      - name: CodeQL Analysis
        uses: github/codeql-action/analyze@v3
        with:
          category: "/language:${{ matrix.language }}"
 
  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Snyk SCA
        uses: snyk/actions/node@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=medium --json-file-output=snyk-results.json
 
      - name: OWASP Dependency Check
        uses: dependency-check/Dependency-Check_Action@main
        with:
          project: 'my-project'
          path: '.'
          format: 'JSON'
 
      - name: Trivy Filesystem Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-fs-results.json'
 
  container:
    name: Container Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Build Image
        run: docker build -t app:${{ github.sha }} .
 
      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'app:${{ github.sha }}'
          format: 'json'
          output: 'trivy-container-results.json'
          severity: 'CRITICAL,HIGH,MEDIUM'
 
      - name: Grype Scan
        uses: anchore/scan-action@v3
        with:
          image: 'app:${{ github.sha }}'
          fail-build: false
          output-format: json
 
  iac:
    name: Infrastructure as Code Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: tfsec
        uses: aquasecurity/tfsec-action@v1.0.3
        with:
          additional_args: --format json --out tfsec-results.json
 
      - name: Checkov
        uses: bridgecrewio/checkov-action@v12
        with:
          framework: terraform,kubernetes,dockerfile
          output_format: json
          output_file_path: checkov-results.json
 
  aggregate:
    name: Aggregate Results
    needs: [sast, sca, container, iac]
    runs-on: ubuntu-latest
    steps:
      # Assumes each scan job uploads its results via actions/upload-artifact
      - name: Download Artifacts
        uses: actions/download-artifact@v4
 
      - name: Normalize and Aggregate
        run: |
          python scripts/aggregate_vulns.py \
            --sast sast-results/*.json \
            --sca sca-results/*.json \
            --container container-results/*.json \
            --iac iac-results/*.json \
            --output aggregated-vulns.json
 
      - name: Upload to Vulnerability Management Platform
        run: |
          curl -X POST ${{ secrets.VULN_PLATFORM_URL }}/api/import \
            -H "Authorization: Bearer ${{ secrets.VULN_PLATFORM_TOKEN }}" \
            -F "file=@aggregated-vulns.json"

Vulnerability Aggregation and Normalization

# aggregate_vulns.py
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import hashlib
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
 
@dataclass
class Vulnerability:
    """Normalized vulnerability record."""
    id: str
    title: str
    description: str
    severity: Severity
    cvss_score: Optional[float]
    cve_ids: List[str]
    cwe_ids: List[str]
    affected_component: str
    affected_version: str
    file_path: Optional[str]
    line_number: Optional[int]
    source_scanner: str
    first_detected: str
    remediation: Optional[str]
    references: List[str]
 
class VulnerabilityAggregator:
    """Aggregate and normalize vulnerabilities from multiple scanners."""
 
    def __init__(self):
        self.vulnerabilities: List[Vulnerability] = []
        self.dedup_hashes = set()
 
    def normalize_semgrep(self, results: Dict) -> List[Vulnerability]:
        """Normalize Semgrep results."""
        vulns = []
 
        for result in results.get('results', []):
            vuln = Vulnerability(
                id=self._generate_id(result),
                title=result.get('check_id', 'Unknown'),
                description=result.get('extra', {}).get('message', ''),
                severity=self._map_severity(result.get('extra', {}).get('severity', 'INFO')),
                cvss_score=None,
                cve_ids=[],
                cwe_ids=result.get('extra', {}).get('metadata', {}).get('cwe', []),
                affected_component=result.get('path', ''),
                affected_version='N/A',
                file_path=result.get('path'),
                line_number=result.get('start', {}).get('line'),
                source_scanner='semgrep',
                first_detected=datetime.utcnow().isoformat(),
                remediation=result.get('extra', {}).get('fix'),
                references=result.get('extra', {}).get('metadata', {}).get('references', [])
            )
            vulns.append(vuln)
 
        return vulns
 
    def normalize_snyk(self, results: Dict) -> List[Vulnerability]:
        """Normalize Snyk SCA results."""
        vulns = []
 
        for vuln_data in results.get('vulnerabilities', []):
            vuln = Vulnerability(
                id=vuln_data.get('id', self._generate_id(vuln_data)),
                title=vuln_data.get('title', 'Unknown'),
                description=vuln_data.get('description', ''),
                severity=self._map_severity(vuln_data.get('severity', 'low')),
                cvss_score=vuln_data.get('cvssScore'),
                cve_ids=vuln_data.get('identifiers', {}).get('CVE', []),
                cwe_ids=vuln_data.get('identifiers', {}).get('CWE', []),
                affected_component=vuln_data.get('packageName', ''),
                affected_version=vuln_data.get('version', ''),
                file_path=vuln_data.get('from', [''])[0] if vuln_data.get('from') else None,
                line_number=None,
                source_scanner='snyk',
                first_detected=datetime.utcnow().isoformat(),
                remediation=vuln_data.get('fixedIn', [None])[0] if vuln_data.get('fixedIn') else None,
                references=[vuln_data.get('url', '')]
            )
            vulns.append(vuln)
 
        return vulns
 
    def normalize_trivy(self, results: Dict) -> List[Vulnerability]:
        """Normalize Trivy results."""
        vulns = []
 
        for result in results.get('Results', []):
            target = result.get('Target', '')
 
            for vuln_data in result.get('Vulnerabilities', []):
                vuln = Vulnerability(
                    id=vuln_data.get('VulnerabilityID', self._generate_id(vuln_data)),
                    title=vuln_data.get('Title', 'Unknown'),
                    description=vuln_data.get('Description', ''),
                    severity=self._map_severity(vuln_data.get('Severity', 'UNKNOWN')),
                    cvss_score=self._extract_cvss(vuln_data),
                    cve_ids=[vuln_data.get('VulnerabilityID', '')] if vuln_data.get('VulnerabilityID', '').startswith('CVE') else [],
                    cwe_ids=vuln_data.get('CweIDs', []),
                    affected_component=vuln_data.get('PkgName', ''),
                    affected_version=vuln_data.get('InstalledVersion', ''),
                    file_path=target,
                    line_number=None,
                    source_scanner='trivy',
                    first_detected=datetime.utcnow().isoformat(),
                    remediation=vuln_data.get('FixedVersion'),
                    references=vuln_data.get('References', [])
                )
                vulns.append(vuln)
 
        return vulns
 
    def deduplicate(self, vulnerabilities: List[Vulnerability]) -> List[Vulnerability]:
        """Remove duplicate vulnerabilities."""
 
        unique_vulns = []
 
        for vuln in vulnerabilities:
            # Create dedup key
            dedup_key = f"{vuln.affected_component}:{vuln.affected_version}:{','.join(vuln.cve_ids)}"
            dedup_hash = hashlib.md5(dedup_key.encode()).hexdigest()
 
            if dedup_hash not in self.dedup_hashes:
                self.dedup_hashes.add(dedup_hash)
                unique_vulns.append(vuln)
 
        return unique_vulns
 
    def _generate_id(self, data: Dict) -> str:
        """Generate unique ID for vulnerability."""
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()[:12]
 
    def _map_severity(self, severity: str) -> Severity:
        """Map scanner severity to normalized severity."""
        mapping = {
            'CRITICAL': Severity.CRITICAL,
            'HIGH': Severity.HIGH,
            'MEDIUM': Severity.MEDIUM,
            'MODERATE': Severity.MEDIUM,
            'LOW': Severity.LOW,
            'INFO': Severity.INFO,
            'INFORMATIONAL': Severity.INFO
        }
        return mapping.get(severity.upper(), Severity.INFO)
 
    def _extract_cvss(self, vuln_data: Dict) -> Optional[float]:
        """Extract CVSS score from vulnerability data."""
        cvss = vuln_data.get('CVSS', {})
 
        # Try different CVSS sources
        for source in ['nvd', 'ghsa', 'redhat']:
            if source in cvss and 'V3Score' in cvss[source]:
                return cvss[source]['V3Score']
 
        return None
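The deduplication step keys each finding on component, version, and CVE list. A standalone sketch of the same scheme, with one refinement over the code above: the CVE IDs are sorted so that ordering differences between scanners do not produce distinct keys for the same finding.

```python
import hashlib

def dedup_key(component: str, version: str, cve_ids: list) -> str:
    # Same keying scheme as VulnerabilityAggregator.deduplicate,
    # with CVE IDs sorted so scanner-dependent ordering does not
    # yield distinct keys for the same finding.
    raw = f"{component}:{version}:{','.join(sorted(cve_ids))}"
    return hashlib.md5(raw.encode()).hexdigest()

findings = [
    ("lodash", "4.17.20", ["CVE-2021-23337"]),    # reported by Snyk
    ("lodash", "4.17.20", ["CVE-2021-23337"]),    # same finding from Trivy
    ("openssl", "1.1.1k", ["CVE-2022-0778"]),
]

seen, unique = set(), []
for f in findings:
    key = dedup_key(*f)
    if key not in seen:
        seen.add(key)
        unique.append(f)
# unique holds 2 findings: the duplicate lodash report is dropped
```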

Vulnerability Prioritization

Risk-Based Prioritization Framework

# vulnerability_prioritization.py
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

from aggregate_vulns import Vulnerability  # normalized record from the aggregation step
 
class ExploitMaturity(Enum):
    UNPROVEN = 1
    POC = 2
    FUNCTIONAL = 3
    HIGH = 4
    WEAPONIZED = 5
 
class AssetCriticality(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
 
@dataclass
class PrioritizationContext:
    """Context for vulnerability prioritization."""
    asset_criticality: AssetCriticality
    is_internet_facing: bool
    has_sensitive_data: bool
    exploit_available: bool
    exploit_maturity: ExploitMaturity
    in_kev: bool  # CISA Known Exploited Vulnerabilities
    epss_score: Optional[float]  # Exploit Prediction Scoring System
 
class VulnerabilityPrioritizer:
    """Prioritize vulnerabilities based on risk context."""
 
    def __init__(self):
        self.kev_catalog = self._load_kev_catalog()
        self.epss_scores = self._load_epss_scores()
 
    def calculate_priority_score(
        self,
        vulnerability: Vulnerability,
        context: PrioritizationContext
    ) -> float:
        """Calculate priority score (0-100)."""
 
        score = 0
 
        # Base score from CVSS (0-40 points)
        if vulnerability.cvss_score:
            score += vulnerability.cvss_score * 4
 
        # Exploit maturity (0-20 points)
        score += context.exploit_maturity.value * 4
 
        # Asset criticality (0-20 points)
        score += context.asset_criticality.value * 5
 
        # Environmental factors (0-30 points)
        if context.is_internet_facing:
            score += 10
        if context.has_sensitive_data:
            score += 5
        if context.in_kev:
            score += 15  # Significant boost for known exploited
 
        # EPSS adjustment (-10 to +10)
        if context.epss_score:
            if context.epss_score > 0.9:
                score += 10
            elif context.epss_score > 0.7:
                score += 5
            elif context.epss_score < 0.1:
                score -= 10
 
        return min(max(score, 0), 100)
 
    def prioritize_vulnerabilities(
        self,
        vulnerabilities: List[Vulnerability],
        asset_context: Dict
    ) -> List[Dict]:
        """Prioritize list of vulnerabilities."""
 
        prioritized = []
 
        for vuln in vulnerabilities:
            # Build context
            context = self._build_context(vuln, asset_context)
 
            # Calculate score
            score = self.calculate_priority_score(vuln, context)
 
            # Determine SLA
            sla = self._determine_sla(score, context)
 
            prioritized.append({
                'vulnerability': vuln,
                'priority_score': score,
                'context': context,
                'sla': sla,
                'recommended_action': self._recommend_action(vuln, score)
            })
 
        # Sort by priority score descending
        prioritized.sort(key=lambda x: x['priority_score'], reverse=True)
 
        return prioritized
 
    def _build_context(
        self,
        vuln: Vulnerability,
        asset_context: Dict
    ) -> PrioritizationContext:
        """Build prioritization context for vulnerability."""
 
        # Check KEV catalog
        in_kev = any(cve in self.kev_catalog for cve in vuln.cve_ids)
 
        # Get EPSS score
        epss_score = None
        for cve in vuln.cve_ids:
            if cve in self.epss_scores:
                epss_score = self.epss_scores[cve]
                break
 
        return PrioritizationContext(
            asset_criticality=AssetCriticality[asset_context.get('criticality', 'MEDIUM')],
            is_internet_facing=asset_context.get('internet_facing', False),
            has_sensitive_data=asset_context.get('sensitive_data', False),
            exploit_available=bool(vuln.cve_ids) and in_kev,
            exploit_maturity=self._get_exploit_maturity(vuln),
            in_kev=in_kev,
            epss_score=epss_score
        )
 
    def _determine_sla(self, score: float, context: PrioritizationContext) -> Dict:
        """Determine remediation SLA based on priority."""
 
        if score >= 90 or context.in_kev:
            return {'days': 1, 'priority': 'P1', 'description': 'Critical - Immediate action required'}
        elif score >= 70:
            return {'days': 7, 'priority': 'P2', 'description': 'High - Remediate within 1 week'}
        elif score >= 50:
            return {'days': 30, 'priority': 'P3', 'description': 'Medium - Remediate within 1 month'}
        elif score >= 30:
            return {'days': 90, 'priority': 'P4', 'description': 'Low - Remediate within quarter'}
        else:
            return {'days': 180, 'priority': 'P5', 'description': 'Informational - Address as capacity allows'}
 
    def _recommend_action(self, vuln: Vulnerability, score: float) -> str:
        """Recommend remediation action."""
 
        if vuln.remediation:
            if score >= 70:
                return f"URGENT: Upgrade to {vuln.remediation}"
            else:
                return f"Upgrade to {vuln.remediation}"
 
        if score >= 90:
            return "Apply emergency mitigation - see references"
        elif score >= 70:
            return "Investigate and apply vendor patch or workaround"
        else:
            return "Monitor for updates and plan remediation"
 
    def _get_exploit_maturity(self, vuln: Vulnerability) -> ExploitMaturity:
        """Determine exploit maturity level."""
        # In practice, query exploit databases
        return ExploitMaturity.UNPROVEN
 
    def _load_kev_catalog(self) -> set:
        """Load CISA KEV catalog."""
        # Fetch from https://www.cisa.gov/known-exploited-vulnerabilities-catalog
        return set()
 
    def _load_epss_scores(self) -> Dict[str, float]:
        """Load EPSS scores."""
        # Fetch from https://www.first.org/epss
        return {}
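To make the scoring arithmetic concrete, here is the same formula as a standalone function with two worked examples; the inputs are hypothetical findings, not real scanner output.

```python
def priority_score(cvss, maturity, criticality,
                   internet_facing, sensitive_data, in_kev, epss):
    score = (cvss or 0) * 4      # CVSS 0-10 contributes up to 40 points
    score += maturity * 4        # exploit maturity 1-5 contributes up to 20
    score += criticality * 5     # asset criticality 1-4 contributes up to 20
    if internet_facing:
        score += 10
    if sensitive_data:
        score += 5
    if in_kev:
        score += 15              # known-exploited boost
    if epss is not None:         # EPSS adjustment: -10 to +10
        if epss > 0.9:
            score += 10
        elif epss > 0.7:
            score += 5
        elif epss < 0.1:
            score -= 10
    return min(max(score, 0), 100)

# KEV-listed CVSS 9.8 on a critical internet-facing asset with sensitive data:
# 39.2 + 20 + 20 + 10 + 5 + 15 + 10 = 119.2, clamped to 100 -> P1
worst = priority_score(9.8, 5, 4, True, True, True, 0.95)
# CVSS 4.0, unproven exploit, low-value internal asset, low EPSS:
# 16 + 4 + 5 - 10 = 15 -> P5
mild = priority_score(4.0, 1, 1, False, False, False, 0.05)
```

The clamp at 100 means several distinct "worst case" findings tie at the top, which is fine: anything in KEV already gets the P1 SLA regardless of score.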

Automated Remediation

Dependency Update Automation

# auto_remediation.py
import subprocess
import json
from typing import List, Dict, Optional
from dataclasses import dataclass

from aggregate_vulns import Vulnerability  # normalized record from the aggregation step
 
@dataclass
class RemediationResult:
    """Result of remediation attempt."""
    vulnerability_id: str
    success: bool
    action_taken: str
    old_version: str
    new_version: Optional[str]
    error: Optional[str]
    requires_review: bool
 
class DependencyRemediator:
    """Automated dependency vulnerability remediation."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.dry_run = config.get('dry_run', True)
        self.max_major_upgrade = config.get('max_major_upgrade', False)
 
    def remediate_npm(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate npm vulnerabilities."""
 
        results = []
 
        for vuln in vulnerabilities:
            if vuln.source_scanner not in ['snyk', 'npm_audit', 'trivy']:
                continue
 
            # Get current version
            current = self._get_npm_version(vuln.affected_component)
 
            # Determine safe upgrade
            target = self._find_safe_version(
                vuln.affected_component,
                vuln.remediation,
                current
            )
 
            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No safe upgrade path found',
                    requires_review=True
                ))
                continue
 
            # Check if upgrade is acceptable
            if not self._is_upgrade_acceptable(current, target):
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='skipped',
                    old_version=current,
                    new_version=target,
                    error='Major version upgrade requires manual review',
                    requires_review=True
                ))
                continue
 
            # Perform upgrade
            if not self.dry_run:
                success = self._npm_upgrade(vuln.affected_component, target)
            else:
                success = True
 
            results.append(RemediationResult(
                vulnerability_id=vuln.id,
                success=success,
                action_taken='upgrade' if not self.dry_run else 'dry_run',
                old_version=current,
                new_version=target,
                error=None,
                requires_review=False
            ))
 
        return results
 
    def remediate_python(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate Python vulnerabilities."""
 
        results = []
 
        for vuln in vulnerabilities:
            current = self._get_pip_version(vuln.affected_component)
            target = vuln.remediation
 
            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No fix version available',
                    requires_review=True
                ))
                continue
 
            if not self.dry_run:
                success = self._pip_upgrade(vuln.affected_component, target)
            else:
                success = True
 
            results.append(RemediationResult(
                vulnerability_id=vuln.id,
                success=success,
                action_taken='upgrade' if not self.dry_run else 'dry_run',
                old_version=current,
                new_version=target,
                error=None,
                requires_review=False
            ))
 
        return results
 
    def create_remediation_pr(
        self,
        results: List[RemediationResult],
        branch_name: str
    ) -> Dict:
        """Create pull request with remediation changes."""
 
        successful = [r for r in results if r.success and r.action_taken == 'upgrade']
 
        if not successful:
            return {'created': False, 'reason': 'No successful remediations'}
 
        # Generate PR description
        description = self._generate_pr_description(successful)
 
        # Create branch and PR
        pr_data = {
            'title': f'Security: Remediate {len(successful)} vulnerabilities',
            'body': description,
            'branch': branch_name,
            'base': 'main',
            'labels': ['security', 'automated']
        }
 
        return pr_data
 
    def _generate_pr_description(self, results: List[RemediationResult]) -> str:
        """Generate PR description for remediation."""
 
        lines = [
            '## Security Vulnerability Remediation',
            '',
            'This PR addresses the following vulnerabilities:',
            ''
        ]
 
        for result in results:
            lines.append(f'- `{result.vulnerability_id}`: {result.old_version} -> {result.new_version}')
 
        lines.extend([
            '',
            '### Testing',
            '- [ ] Unit tests pass',
            '- [ ] Integration tests pass',
            '- [ ] No breaking changes detected',
            '',
            '### Notes',
            'This PR was automatically generated by the vulnerability remediation system.',
            'Please review changes before merging.'
        ])
 
        return '\n'.join(lines)
 
    def _get_npm_version(self, package: str) -> Optional[str]:
        """Get current npm package version."""
        try:
            result = subprocess.run(
                ['npm', 'list', package, '--json'],
                capture_output=True, text=True
            )
            data = json.loads(result.stdout)
            return data.get('dependencies', {}).get(package, {}).get('version')
        except (OSError, json.JSONDecodeError):
            return None
 
    def _npm_upgrade(self, package: str, version: str) -> bool:
        """Upgrade npm package."""
        try:
            subprocess.run(
                ['npm', 'install', f'{package}@{version}'],
                check=True
            )
            return True
        except (OSError, subprocess.CalledProcessError):
            return False
 
    def _is_upgrade_acceptable(self, current: str, target: str) -> bool:
        """Check if upgrade is within acceptable bounds."""
        if not current or not target:
            return False

        try:
            current_major = int(current.split('.')[0])
            target_major = int(target.split('.')[0])
        except ValueError:
            # Non-numeric versions (e.g. "1.2.3-beta") need manual review
            return False

        # Major version bumps require explicit opt-in
        if target_major > current_major:
            return self.max_major_upgrade

        return True
 
    def _find_safe_version(
        self,
        package: str,
        fixed_version: Optional[str],
        current_version: Optional[str]
    ) -> Optional[str]:
        """Find safe version to upgrade to."""
 
        if fixed_version:
            return fixed_version
 
        # Query npm for latest patch version
        # Implementation depends on your requirements
        return None
 
    def _get_pip_version(self, package: str) -> Optional[str]:
        """Get current pip package version."""
        try:
            result = subprocess.run(
                ['pip', 'show', package],
                capture_output=True, text=True
            )
            for line in result.stdout.split('\n'):
                if line.startswith('Version:'):
                    return line.split(':', 1)[1].strip()
        except OSError:
            pass
        return None
 
    def _pip_upgrade(self, package: str, version: str) -> bool:
        """Upgrade pip package."""
        try:
            subprocess.run(
                ['pip', 'install', f'{package}=={version}'],
                check=True
            )
            return True
        except (OSError, subprocess.CalledProcessError):
            return False
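A policy commonly layered on top of the remediator is to classify each proposed bump and route it accordingly: patch updates auto-merge, major updates always go to review. A minimal sketch; the classifier and the policy table are illustrative, not part of the original code.

```python
def classify_bump(current: str, target: str) -> str:
    """Classify a version change as 'major', 'minor', 'patch', or 'unknown'."""
    try:
        cur = [int(p) for p in current.split('.')[:3]]
        tgt = [int(p) for p in target.split('.')[:3]]
    except (ValueError, AttributeError):
        return 'unknown'   # pre-release tags etc. need a real semver parser
    cur += [0] * (3 - len(cur))   # pad short versions like "2.1" to 3 parts
    tgt += [0] * (3 - len(tgt))
    if tgt[0] != cur[0]:
        return 'major'
    if tgt[1] != cur[1]:
        return 'minor'
    return 'patch'

# Illustrative routing policy: patch bumps auto-merge, minor bumps merge once
# CI is green, major bumps and unparseable versions always get human review.
POLICY = {'patch': 'auto-merge', 'minor': 'merge-after-ci',
          'major': 'manual-review', 'unknown': 'manual-review'}
```

For example, `classify_bump("4.17.20", "5.0.0")` returns `'major'`, which the policy maps to manual review, matching the `requires_review` flag the remediator sets for major upgrades.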

Metrics and Reporting

Vulnerability Metrics Dashboard

# vulnerability_metrics.py
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from collections import defaultdict

from aggregate_vulns import Severity  # shared severity enum from the aggregation step
 
@dataclass
class VulnerabilityMetrics:
    """Vulnerability management metrics."""
    total_open: int
    critical_open: int
    high_open: int
    medium_open: int
    low_open: int
    mttr_critical: float  # Mean Time to Remediate
    mttr_high: float
    sla_compliance_rate: float
    new_this_week: int
    closed_this_week: int
    overdue_count: int
 
class MetricsCalculator:
    """Calculate vulnerability management metrics."""
 
    def __init__(self, vulnerability_store):
        self.store = vulnerability_store
 
    def calculate_metrics(self, as_of: "datetime | None" = None) -> VulnerabilityMetrics:
        """Calculate current vulnerability metrics."""
 
        as_of = as_of or datetime.utcnow()
        vulns = self.store.get_all_open()
 
        # Count by severity
        by_severity = defaultdict(int)
        for v in vulns:
            by_severity[v.severity.value] += 1
 
        # Calculate MTTR
        closed_vulns = self.store.get_closed(
            since=as_of - timedelta(days=90)
        )
 
        mttr_critical = self._calculate_mttr(closed_vulns, Severity.CRITICAL)
        mttr_high = self._calculate_mttr(closed_vulns, Severity.HIGH)
 
        # SLA compliance
        sla_compliance = self._calculate_sla_compliance(closed_vulns)
 
        # Weekly activity
        week_ago = as_of - timedelta(days=7)
        new_this_week = len(self.store.get_opened_since(week_ago))
        closed_this_week = len(self.store.get_closed_since(week_ago))
 
        # Overdue
        overdue = len([v for v in vulns if self._is_overdue(v, as_of)])
 
        return VulnerabilityMetrics(
            total_open=len(vulns),
            critical_open=by_severity['critical'],
            high_open=by_severity['high'],
            medium_open=by_severity['medium'],
            low_open=by_severity['low'],
            mttr_critical=mttr_critical,
            mttr_high=mttr_high,
            sla_compliance_rate=sla_compliance,
            new_this_week=new_this_week,
            closed_this_week=closed_this_week,
            overdue_count=overdue
        )
 
    def generate_trend_report(
        self,
        start_date: datetime,
        end_date: datetime,
        interval: str = 'week'
    ) -> List[Dict]:
        """Generate trend report over time period."""
 
        trends = []
        current = start_date
 
        while current <= end_date:
            metrics = self.calculate_metrics(as_of=current)
            trends.append({
                'date': current.isoformat(),
                'total_open': metrics.total_open,
                'critical': metrics.critical_open,
                'high': metrics.high_open,
                'mttr_critical': metrics.mttr_critical
            })
 
            if interval == 'week':
                current += timedelta(weeks=1)
            elif interval == 'day':
                current += timedelta(days=1)
            else:
                current += timedelta(days=30)
 
        return trends
 
    def _calculate_mttr(
        self,
        closed_vulns: List,
        severity: Severity
    ) -> float:
        """Calculate Mean Time to Remediate for severity level."""
 
        relevant = [
            v for v in closed_vulns
            if v.severity == severity
        ]
 
        if not relevant:
            return 0
 
        total_days = sum(
            (v.closed_at - v.first_detected).days
            for v in relevant
        )
 
        return total_days / len(relevant)
 
    def _calculate_sla_compliance(self, closed_vulns: List) -> float:
        """Calculate SLA compliance rate."""
 
        if not closed_vulns:
            return 100.0
 
        compliant = sum(
            1 for v in closed_vulns
            if self._was_within_sla(v)
        )
 
        return (compliant / len(closed_vulns)) * 100
 
    def _was_within_sla(self, vuln) -> bool:
        """Check if vulnerability was remediated within SLA."""
 
        sla_days = {
            'critical': 1,
            'high': 7,
            'medium': 30,
            'low': 90
        }
 
        allowed_days = sla_days.get(vuln.severity.value, 90)
        actual_days = (vuln.closed_at - vuln.first_detected).days
 
        return actual_days <= allowed_days
 
    def _is_overdue(self, vuln, as_of: datetime) -> bool:
        """Check if vulnerability is past SLA."""
 
        sla_days = {
            'critical': 1,
            'high': 7,
            'medium': 30,
            'low': 90
        }
 
        allowed_days = sla_days.get(vuln.severity.value, 90)
        days_open = (as_of - vuln.first_detected).days
 
        return days_open > allowed_days
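The MTTR computation above reduces to averaging open-to-close durations over the closed findings of a given severity. Isolated as a standalone helper that takes plain (detected, closed) datetime pairs rather than the store objects above:

```python
from datetime import datetime

def mttr_days(windows):
    """Mean time to remediate, in days, over (detected, closed) pairs."""
    if not windows:
        return 0.0
    total = sum((closed - detected).days for detected, closed in windows)
    return total / len(windows)

closed_criticals = [
    (datetime(2024, 1, 1), datetime(2024, 1, 4)),    # 3 days open
    (datetime(2024, 1, 10), datetime(2024, 1, 17)),  # 7 days open
]
# mttr_days(closed_criticals) averages to 5.0 days: within the 7-day
# high-severity SLA, but well over the 1-day critical SLA.
```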

Conclusion

Effective vulnerability management in DevSecOps requires:

  1. Comprehensive Detection - Multiple scanners covering code, dependencies, containers, and IaC
  2. Intelligent Prioritization - Risk-based scoring considering exploitability and asset context
  3. Automated Remediation - Auto-patching where safe, with human review for major changes
  4. Continuous Monitoring - Track metrics, trends, and SLA compliance

By implementing these practices, organizations can systematically reduce their vulnerability exposure while maintaining development velocity.
