Vulnerability Management in DevSecOps: From Detection to Remediation

DeviDevs Team
12 min read
Tags: vulnerability management, DevSecOps, security scanning, CVSS, remediation

Effective vulnerability management requires more than running scanners. It demands a systematic approach to detection, prioritization, and remediation that is integrated into your development workflow, so that findings from every scanner end up in one normalized, ranked queue.

Vulnerability Detection Pipeline

Multi-Scanner Integration

# .github/workflows/vulnerability-scan.yml
name: Comprehensive Security Scan
 
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 6 * * *'  # Daily at 06:00 UTC (GitHub Actions schedules run in UTC)
 
jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
 
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
 
      - name: Semgrep
        uses: semgrep/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/cwe-top-25
 
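      # CodeQL needs an init step before analyze. This job defines no matrix,
      # so the language below is an assumption; adjust it to your codebase.
      # Uploading results also requires the security-events: write permission.
      - name: CodeQL Init
        uses: github/codeql-action/init@v3
        with:
          languages: javascript

      - name: CodeQL Analysis
        uses: github/codeql-action/analyze@v3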
 
  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Snyk SCA
        uses: snyk/actions/node@master
        continue-on-error: true  # Report findings without blocking the later scanners
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=medium --json-file-output=snyk-results.json
 
      - name: OWASP Dependency Check
        uses: dependency-check/Dependency-Check_Action@main
        with:
          project: 'my-project'
          path: '.'
          format: 'JSON'
 
      - name: Trivy Filesystem Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-fs-results.json'
 
  container:
    name: Container Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Build Image
        run: docker build -t app:${{ github.sha }} .
 
      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'app:${{ github.sha }}'
          format: 'json'
          output: 'trivy-container-results.json'
          severity: 'CRITICAL,HIGH,MEDIUM'
 
      - name: Grype Scan
        uses: anchore/scan-action@v3
        with:
          image: 'app:${{ github.sha }}'
          fail-build: false
          output-format: json
 
  iac:
    name: Infrastructure as Code Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: tfsec
        uses: aquasecurity/tfsec-action@v1.0.3
        with:
          additional_args: --format json --out tfsec-results.json
 
      - name: Checkov
        uses: bridgecrewio/checkov-action@v12
        with:
          framework: terraform,kubernetes,dockerfile
          output_format: json
          output_file_path: checkov-results.json
 
  aggregate:
    name: Aggregate Results
    needs: [sast, sca, container, iac]
    runs-on: ubuntu-latest
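    steps:
      # Checkout is needed so scripts/aggregate_vulns.py exists on the runner
      - uses: actions/checkout@v4

      # Assumes each scan job uploaded its JSON reports with
      # actions/upload-artifact under the names sast-results, sca-results,
      # container-results, and iac-results (upload steps not shown above)
      - name: Download Artifacts
        uses: actions/download-artifact@v4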
 
      - name: Normalize and Aggregate
        run: |
          python scripts/aggregate_vulns.py \
            --sast sast-results/*.json \
            --sca sca-results/*.json \
            --container container-results/*.json \
            --iac iac-results/*.json \
            --output aggregated-vulns.json
 
      - name: Upload to Vulnerability Management Platform
        run: |
          curl --fail -X POST "${{ secrets.VULN_PLATFORM_URL }}/api/import" \
            -H "Authorization: Bearer ${{ secrets.VULN_PLATFORM_TOKEN }}" \
            -F "file=@aggregated-vulns.json"
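
Several scan steps above report findings without failing the build (for example `fail-build: false` on the Grype step), so a final gate on the aggregated file is one way to block merges on severe findings. The sketch below is an illustration rather than part of the workflow above: it assumes `aggregated-vulns.json` decodes to a list of objects with a `severity` field, and the name `count_blocking` is hypothetical.

```python
from typing import Dict, List

# Ordered from most to least severe; mirrors the normalized severity levels
SEVERITY_ORDER = ["critical", "high", "medium", "low", "info"]


def count_blocking(vulns: List[Dict], threshold: str = "high") -> int:
    """Count findings at or above the threshold severity."""
    cutoff = SEVERITY_ORDER.index(threshold)
    lowest_rank = len(SEVERITY_ORDER) - 1
    blocking = 0
    for vuln in vulns:
        severity = str(vuln.get("severity", "info")).lower()
        # Missing or unrecognized severities are treated as informational
        if severity in SEVERITY_ORDER:
            rank = SEVERITY_ORDER.index(severity)
        else:
            rank = lowest_rank
        if rank <= cutoff:
            blocking += 1
    return blocking


# Small in-memory example; in CI the list would come from json.load()
findings = [
    {"severity": "critical"},
    {"severity": "HIGH"},
    {"severity": "low"},
    {},
]
print(count_blocking(findings, "high"))
```

In a pipeline step, the script would load the aggregated file and exit non-zero when the count is greater than zero, which fails the job.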

Vulnerability Aggregation and Normalization

# aggregate_vulns.py
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import hashlib
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
 
@dataclass
class Vulnerability:
    """Normalized vulnerability record."""
    id: str
    title: str
    description: str
    severity: Severity
    cvss_score: Optional[float]
    cve_ids: List[str]
    cwe_ids: List[str]
    affected_component: str
    affected_version: str
    file_path: Optional[str]
    line_number: Optional[int]
    source_scanner: str
    first_detected: str
    remediation: Optional[str]
    references: List[str]
 
class VulnerabilityAggregator:
    """Aggregate and normalize vulnerabilities from multiple scanners."""
 
    def __init__(self):
        self.vulnerabilities: List[Vulnerability] = []
        self.dedup_hashes = set()
 
    def normalize_semgrep(self, results: Dict) -> List[Vulnerability]:
        """Normalize Semgrep results."""
        vulns = []
 
        for result in results.get('results', []):
            vuln = Vulnerability(
                id=self._generate_id(result),
                title=result.get('check_id', 'Unknown'),
                description=result.get('extra', {}).get('message', ''),
                severity=self._map_severity(result.get('extra', {}).get('severity', 'INFO')),
                cvss_score=None,
                cve_ids=[],
                cwe_ids=result.get('extra', {}).get('metadata', {}).get('cwe', []),
                affected_component=result.get('path', ''),
                affected_version='N/A',
                file_path=result.get('path'),
                line_number=result.get('start', {}).get('line'),
                source_scanner='semgrep',
                first_detected=datetime.utcnow().isoformat(),
                remediation=result.get('extra', {}).get('fix'),
                references=result.get('extra', {}).get('metadata', {}).get('references', [])
            )
            vulns.append(vuln)
 
        return vulns
 
    def normalize_snyk(self, results: Dict) -> List[Vulnerability]:
        """Normalize Snyk SCA results."""
        vulns = []
 
        for vuln_data in results.get('vulnerabilities', []):
            vuln = Vulnerability(
                id=vuln_data.get('id', self._generate_id(vuln_data)),
                title=vuln_data.get('title', 'Unknown'),
                description=vuln_data.get('description', ''),
                severity=self._map_severity(vuln_data.get('severity', 'low')),
                cvss_score=vuln_data.get('cvssScore'),
                cve_ids=vuln_data.get('identifiers', {}).get('CVE', []),
                cwe_ids=vuln_data.get('identifiers', {}).get('CWE', []),
                affected_component=vuln_data.get('packageName', ''),
                affected_version=vuln_data.get('version', ''),
                file_path=vuln_data.get('from', [''])[0] if vuln_data.get('from') else None,
                line_number=None,
                source_scanner='snyk',
                first_detected=datetime.utcnow().isoformat(),
                remediation=vuln_data.get('fixedIn', [None])[0] if vuln_data.get('fixedIn') else None,
                references=[vuln_data.get('url', '')]
            )
            vulns.append(vuln)
 
        return vulns
 
    def normalize_trivy(self, results: Dict) -> List[Vulnerability]:
        """Normalize Trivy results."""
        vulns = []
 
        for result in results.get('Results', []):
            target = result.get('Target', '')
 
            # "or []" also guards against an explicit null value for this key
            for vuln_data in result.get('Vulnerabilities') or []:
                vuln = Vulnerability(
                    id=vuln_data.get('VulnerabilityID', self._generate_id(vuln_data)),
                    title=vuln_data.get('Title', 'Unknown'),
                    description=vuln_data.get('Description', ''),
                    severity=self._map_severity(vuln_data.get('Severity', 'UNKNOWN')),
                    cvss_score=self._extract_cvss(vuln_data),
                    cve_ids=[vuln_data.get('VulnerabilityID', '')] if vuln_data.get('VulnerabilityID', '').startswith('CVE') else [],
                    cwe_ids=vuln_data.get('CweIDs', []),
                    affected_component=vuln_data.get('PkgName', ''),
                    affected_version=vuln_data.get('InstalledVersion', ''),
                    file_path=target,
                    line_number=None,
                    source_scanner='trivy',
                    first_detected=datetime.utcnow().isoformat(),
                    remediation=vuln_data.get('FixedVersion'),
                    references=vuln_data.get('References', [])
                )
                vulns.append(vuln)
 
        return vulns
 
    def deduplicate(self, vulnerabilities: List[Vulnerability]) -> List[Vulnerability]:
        """Remove duplicate vulnerabilities."""
 
        unique_vulns = []
 
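        for vuln in vulnerabilities:
            # Key on component, version, and CVE IDs. Findings without a CVE
            # (e.g. SAST results) fall back to rule, file, and line so that
            # distinct findings in the same file are not collapsed into one.
            identity = ','.join(sorted(vuln.cve_ids)) or f"{vuln.title}:{vuln.file_path}:{vuln.line_number}"
            dedup_key = f"{vuln.affected_component}:{vuln.affected_version}:{identity}"
            # MD5 serves only as a fast fingerprint here, not as a security control
            dedup_hash = hashlib.md5(dedup_key.encode()).hexdigest()

            if dedup_hash not in self.dedup_hashes:
                self.dedup_hashes.add(dedup_hash)
                unique_vulns.append(vuln)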
 
        return unique_vulns
 
    def _generate_id(self, data: Dict) -> str:
        """Generate unique ID for vulnerability."""
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()[:12]
 
    def _map_severity(self, severity: str) -> Severity:
        """Map scanner severity to normalized severity."""
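        mapping = {
            'CRITICAL': Severity.CRITICAL,
            'HIGH': Severity.HIGH,
            'ERROR': Severity.HIGH,      # Semgrep
            'MEDIUM': Severity.MEDIUM,
            'MODERATE': Severity.MEDIUM,
            'WARNING': Severity.MEDIUM,  # Semgrep
            'LOW': Severity.LOW,
            'INFO': Severity.INFO,
            'INFORMATIONAL': Severity.INFO
        }
        # Unrecognized values (e.g. UNKNOWN) fall back to INFO
        return mapping.get(severity.upper(), Severity.INFO)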
 
    def _extract_cvss(self, vuln_data: Dict) -> Optional[float]:
        """Extract CVSS score from vulnerability data."""
        cvss = vuln_data.get('CVSS', {})
 
        # Try different CVSS sources
        for source in ['nvd', 'ghsa', 'redhat']:
            if source in cvss and 'V3Score' in cvss[source]:
                return cvss[source]['V3Score']
 
        return None
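
The workflow calls `aggregate_vulns.py` with one flag per scanner family, but the module above shows no entry point or output step. As a minimal sketch of two pieces such an entry point would likely need, the code below guesses the scanner from a report's top-level key (matching the keys the normalizers above read) and serializes dataclass records whose fields include an Enum. The names `detect_scanner`, `to_json`, `Level`, and `Finding` are hypothetical stand-ins, not part of the original module.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


def detect_scanner(report: Dict[str, Any]) -> Optional[str]:
    """Guess the source scanner from the report's top-level keys."""
    if "Results" in report:          # Trivy uses a capitalized key
        return "trivy"
    if "results" in report:          # Semgrep
        return "semgrep"
    if "vulnerabilities" in report:  # Snyk
        return "snyk"
    return None


def _encode(value: Any) -> Any:
    """json.dumps fallback: write Enum members as their values."""
    if isinstance(value, Enum):
        return value.value
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def to_json(records: List[Any]) -> str:
    """Serialize dataclass records (such as Vulnerability) to JSON."""
    payload = [asdict(r) if is_dataclass(r) else r for r in records]
    return json.dumps(payload, default=_encode, indent=2)


# Illustrative stand-ins for the Severity enum and Vulnerability record
class Level(Enum):
    HIGH = "high"


@dataclass
class Finding:
    id: str
    severity: Level


print(detect_scanner({"Results": []}))
print(to_json([Finding(id="example-1", severity=Level.HIGH)]))
```

Dispatching on the detected name to `normalize_trivy`, `normalize_semgrep`, or `normalize_snyk` keeps the command line independent of which scanner produced each file.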

Vulnerability Prioritization

Risk-Based Prioritization Framework

# vulnerability_prioritization.py
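from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum

# Assumes aggregate_vulns.py (above) is importable from the same directory
from aggregate_vulns import Vulnerability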
 
class ExploitMaturity(Enum):
    UNPROVEN = 1
    POC = 2
    FUNCTIONAL = 3
    HIGH = 4
    WEAPONIZED = 5
 
class AssetCriticality(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
 
@dataclass
class PrioritizationContext:
    """Context for vulnerability prioritization."""
    asset_criticality: AssetCriticality
    is_internet_facing: bool
    has_sensitive_data: bool
    exploit_available: bool
    exploit_maturity: ExploitMaturity
    in_kev: bool  # CISA Known Exploited Vulnerabilities
    epss_score: Optional[float]  # Exploit Prediction Scoring System
 
class VulnerabilityPrioritizer:
    """Prioritize vulnerabilities based on risk context."""
 
    def __init__(self):
        self.kev_catalog = self._load_kev_catalog()
        self.epss_scores = self._load_epss_scores()
 
    def calculate_priority_score(
        self,
        vulnerability: Vulnerability,
        context: PrioritizationContext
    ) -> float:
        """Calculate priority score (0-100)."""
 
        score = 0.0

        # Base score from CVSS (0-40 points)
        if vulnerability.cvss_score:
            score += vulnerability.cvss_score * 4

        # Exploit maturity (4-20 points)
        score += context.exploit_maturity.value * 4

        # Asset criticality (5-20 points)
        score += context.asset_criticality.value * 5

        # Environmental factors (up to 30 points)
        if context.is_internet_facing:
            score += 10
        if context.has_sensitive_data:
            score += 5
        if context.in_kev:
            score += 15  # Significant boost for known exploited

        # EPSS adjustment (-10 to +10). Compare against None so that a
        # probability of exactly 0.0 still receives the low-likelihood penalty.
        if context.epss_score is not None:
            if context.epss_score > 0.9:
                score += 10
            elif context.epss_score > 0.7:
                score += 5
            elif context.epss_score < 0.1:
                score -= 10

        # The components can sum past 100, so clamp to the documented range
        return min(max(score, 0.0), 100.0)
 
    def prioritize_vulnerabilities(
        self,
        vulnerabilities: List[Vulnerability],
        asset_context: Dict
    ) -> List[Dict]:
        """Prioritize list of vulnerabilities."""
 
        prioritized = []
 
        for vuln in vulnerabilities:
            # Build context
            context = self._build_context(vuln, asset_context)
 
            # Calculate score
            score = self.calculate_priority_score(vuln, context)
 
            # Determine SLA
            sla = self._determine_sla(score, context)
 
            prioritized.append({
                'vulnerability': vuln,
                'priority_score': score,
                'context': context,
                'sla': sla,
                'recommended_action': self._recommend_action(vuln, score)
            })
 
        # Sort by priority score descending
        prioritized.sort(key=lambda x: x['priority_score'], reverse=True)
 
        return prioritized
 
    def _build_context(
        self,
        vuln: Vulnerability,
        asset_context: Dict
    ) -> PrioritizationContext:
        """Build prioritization context for vulnerability."""
 
        # Check KEV catalog
        in_kev = any(cve in self.kev_catalog for cve in vuln.cve_ids)
 
        # Get EPSS score
        epss_score = None
        for cve in vuln.cve_ids:
            if cve in self.epss_scores:
                epss_score = self.epss_scores[cve]
                break
 
        return PrioritizationContext(
            # Accept 'high' as well as 'HIGH'; unknown names still raise KeyError
            asset_criticality=AssetCriticality[str(asset_context.get('criticality', 'MEDIUM')).upper()],
            is_internet_facing=asset_context.get('internet_facing', False),
            has_sensitive_data=asset_context.get('sensitive_data', False),
            # bool() keeps this a real boolean; `[] and in_kev` would yield []
            exploit_available=bool(vuln.cve_ids) and in_kev,
            exploit_maturity=self._get_exploit_maturity(vuln),
            in_kev=in_kev,
            epss_score=epss_score
        )
 
    def _determine_sla(self, score: float, context: PrioritizationContext) -> Dict:
        """Determine remediation SLA based on priority."""
 
        if score >= 90 or context.in_kev:
            return {'days': 1, 'priority': 'P1', 'description': 'Critical - Immediate action required'}
        elif score >= 70:
            return {'days': 7, 'priority': 'P2', 'description': 'High - Remediate within 1 week'}
        elif score >= 50:
            return {'days': 30, 'priority': 'P3', 'description': 'Medium - Remediate within 1 month'}
        elif score >= 30:
            return {'days': 90, 'priority': 'P4', 'description': 'Low - Remediate within quarter'}
        else:
            return {'days': 180, 'priority': 'P5', 'description': 'Informational - Address as able'}
 
    def _recommend_action(self, vuln: Vulnerability, score: float) -> str:
        """Recommend remediation action."""
 
        if vuln.remediation:
            if score >= 70:
                return f"URGENT: Upgrade to {vuln.remediation}"
            else:
                return f"Upgrade to {vuln.remediation}"
 
        if score >= 90:
            return "Apply emergency mitigation - see references"
        elif score >= 70:
            return "Investigate and apply vendor patch or workaround"
        else:
            return "Monitor for updates and plan remediation"
 
    def _get_exploit_maturity(self, vuln: Vulnerability) -> ExploitMaturity:
        """Determine exploit maturity level."""
        # In practice, query exploit databases
        return ExploitMaturity.UNPROVEN
 
    def _load_kev_catalog(self) -> set:
        """Load CISA KEV catalog."""
        # Fetch from https://www.cisa.gov/known-exploited-vulnerabilities-catalog
        return set()
 
    def _load_epss_scores(self) -> Dict[str, float]:
        """Load EPSS scores."""
        # Fetch from https://www.first.org/epss
        return {}
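
The two loaders above are stubs. As a hedged sketch of how they might be filled in, the functions below parse already-downloaded feed content rather than fetching it. They assume the KEV JSON feed lists entries under `vulnerabilities` with a `cveID` field, and that the EPSS CSV export starts with a `#` comment line followed by a `cve,epss,percentile` header; verify both against the current feed formats before relying on them. The function names and the placeholder IDs are hypothetical.

```python
import csv
import io
from typing import Any, Dict, Set


def parse_kev_catalog(catalog: Dict[str, Any]) -> Set[str]:
    """Collect CVE IDs from a decoded KEV JSON document."""
    return {
        entry["cveID"]
        for entry in catalog.get("vulnerabilities", [])
        if "cveID" in entry
    }


def parse_epss_csv(text: str) -> Dict[str, float]:
    """Map CVE ID to EPSS probability from the CSV export."""
    # Drop blank lines and comment lines such as the leading "#model_version" line
    data_lines = [
        line for line in text.splitlines()
        if line and not line.startswith("#")
    ]
    reader = csv.DictReader(io.StringIO("\n".join(data_lines)))
    scores: Dict[str, float] = {}
    for row in reader:
        try:
            scores[row["cve"]] = float(row["epss"])
        except (KeyError, TypeError, ValueError):
            continue  # Skip malformed rows rather than aborting the load
    return scores


# Placeholder data only; real IDs and scores come from the downloaded feeds
kev_ids = parse_kev_catalog({"vulnerabilities": [{"cveID": "CVE-0000-0001"}]})
epss = parse_epss_csv("#comment\ncve,epss,percentile\nCVE-0000-0001,0.5,0.9\n")
print(kev_ids, epss)
```

Keeping the parsing separate from the download makes it easy to cache the feeds in CI and to unit-test the loaders without network access.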

Automated Remediation

Dependency Update Automation

# auto_remediation.py
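import subprocess
import json
from typing import List, Dict, Optional
from dataclasses import dataclass

# Assumes aggregate_vulns.py (above) is importable from the same directory
from aggregate_vulns import Vulnerability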
 
@dataclass
class RemediationResult:
    """Result of remediation attempt."""
    vulnerability_id: str
    success: bool
    action_taken: str
    old_version: str
    new_version: Optional[str]
    error: Optional[str]
    requires_review: bool
 
class DependencyRemediator:
    """Automated dependency vulnerability remediation."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.dry_run = config.get('dry_run', True)
        # Boolean despite the name: True allows automatic major-version upgrades
        self.max_major_upgrade = config.get('max_major_upgrade', False)
 
    def remediate_npm(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate npm vulnerabilities."""
 
        results = []
 
        for vuln in vulnerabilities:
            if vuln.source_scanner not in ['snyk', 'npm_audit', 'trivy']:
                continue
 
            # Get current version
            current = self._get_npm_version(vuln.affected_component)
 
            # Determine safe upgrade
            target = self._find_safe_version(
                vuln.affected_component,
                vuln.remediation,
                current
            )
 
            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No safe upgrade path found',
                    requires_review=True
                ))
                continue
 
            # Check if upgrade is acceptable
            if not self._is_upgrade_acceptable(current, target):
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='skipped',
                    old_version=current,
                    new_version=target,
                    error='Major version upgrade requires manual review',
                    requires_review=True
                ))
                continue
 
            # Perform upgrade
            if not self.dry_run:
                success = self._npm_upgrade(vuln.affected_component, target)
            else:
                success = True
 
            results.append(RemediationResult(
                vulnerability_id=vuln.id,
                success=success,
                action_taken='upgrade' if not self.dry_run else 'dry_run',
                old_version=current,
                new_version=target,
                # Surface failed installs instead of reporting them silently
                error=None if success else 'npm install failed',
                requires_review=not success
            ))
 
        return results
 
    def remediate_python(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate Python vulnerabilities."""
 
        results = []
 
        for vuln in vulnerabilities:
            current = self._get_pip_version(vuln.affected_component)
            target = vuln.remediation
 
            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No fix version available',
                    requires_review=True
                ))
                continue
 
            if not self.dry_run:
                success = self._pip_upgrade(vuln.affected_component, target)
            else:
                success = True
 
            results.append(RemediationResult(
                vulnerability_id=vuln.id,
                success=success,
                action_taken='upgrade' if not self.dry_run else 'dry_run',
                old_version=current,
                new_version=target,
                # Surface failed installs instead of reporting them silently
                error=None if success else 'pip install failed',
                requires_review=not success
            ))
 
        return results
 
    def create_remediation_pr(
        self,
        results: List[RemediationResult],
        branch_name: str
    ) -> Dict:
        """Create pull request with remediation changes."""
 
        successful = [r for r in results if r.success and r.action_taken == 'upgrade']
 
        if not successful:
            return {'created': False, 'reason': 'No successful remediations'}
 
        # Generate PR description
        description = self._generate_pr_description(successful)
 
        # Assemble the PR payload; creating the branch and opening the PR
        # through your Git host's API is left to the caller
        pr_data = {
            'title': f'Security: Remediate {len(successful)} vulnerabilities',
            'body': description,
            'branch': branch_name,
            'base': 'main',
            'labels': ['security', 'automated']
        }
 
        return pr_data
 
    def _generate_pr_description(self, results: List[RemediationResult]) -> str:
        """Generate PR description for remediation."""
 
        lines = [
            '## Security Vulnerability Remediation',
            '',
            'This PR addresses the following vulnerabilities:',
            ''
        ]
 
        for result in results:
            lines.append(f'- `{result.vulnerability_id}`: {result.old_version} -> {result.new_version}')
 
        lines.extend([
            '',
            '### Testing',
            '- [ ] Unit tests pass',
            '- [ ] Integration tests pass',
            '- [ ] No breaking changes detected',
            '',
            '### Notes',
            'This PR was automatically generated by the vulnerability remediation system.',
            'Please review changes before merging.'
        ])
 
        return '\n'.join(lines)
 
    def _get_npm_version(self, package: str) -> Optional[str]:
        """Get current npm package version."""
        try:
            result = subprocess.run(
                ['npm', 'list', package, '--json'],
                capture_output=True, text=True
            )
            data = json.loads(result.stdout)
            return data.get('dependencies', {}).get(package, {}).get('version')
        except (OSError, json.JSONDecodeError):
            return None
 
    def _npm_upgrade(self, package: str, version: str) -> bool:
        """Upgrade npm package."""
        try:
            subprocess.run(
                ['npm', 'install', f'{package}@{version}'],
                check=True
            )
            return True
        except (subprocess.SubprocessError, OSError):
            return False
 
    def _is_upgrade_acceptable(self, current: str, target: str) -> bool:
        """Check if upgrade is within acceptable bounds."""
        if not current or not target:
            return False
 
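        try:
            # Tolerate a leading "v"; anything non-numeric goes to manual review
            current_major = int(current.lstrip('v').split('.')[0])
            target_major = int(target.lstrip('v').split('.')[0])
        except ValueError:
            return False

        # Major version bumps are allowed only when explicitly enabled
        if target_major > current_major:
            return self.max_major_upgrade

        return True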
 
    def _find_safe_version(
        self,
        package: str,
        fixed_version: Optional[str],
        current_version: Optional[str]
    ) -> Optional[str]:
        """Find safe version to upgrade to."""
 
        if fixed_version:
            return fixed_version
 
        # Query npm for latest patch version
        # Implementation depends on your requirements
        return None
 
    def _get_pip_version(self, package: str) -> Optional[str]:
        """Get current pip package version."""
        try:
            result = subprocess.run(
                ['pip', 'show', package],
                capture_output=True, text=True
            )
            for line in result.stdout.split('\n'):
                if line.startswith('Version:'):
                    return line.split(':', 1)[1].strip()
        except OSError:
            return None
        return None  # Package not installed or no Version line in the output
 
    def _pip_upgrade(self, package: str, version: str) -> bool:
        """Upgrade pip package."""
        try:
            subprocess.run(
                ['pip', 'install', f'{package}=={version}'],
                check=True
            )
            return True
        except (subprocess.SubprocessError, OSError):
            return False
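
`_find_safe_version` above returns the scanner's fix version unchanged. Some scanners may report several fixed versions in one comma-separated field, and a value such as a pre-release tag would break a purely numeric comparison. The sketch below shows one way to choose a target under those assumptions: it picks the lowest listed fix above the current version and prefers one within the same major version. The helper names are hypothetical, and only plain `MAJOR.MINOR.PATCH` versions are handled; anything else is left for manual review.

```python
import re
from typing import List, Optional, Tuple

Version = Tuple[int, ...]


def parse_version(version: str) -> Optional[Version]:
    """Parse 'MAJOR.MINOR.PATCH' with an optional leading 'v'."""
    match = re.fullmatch(r"v?(\d+)\.(\d+)\.(\d+)", version.strip())
    if not match:
        return None  # Pre-release tags and ranges are not handled here
    return tuple(int(part) for part in match.groups())


def pick_fix_version(current: str, fixed_versions: str) -> Optional[str]:
    """Choose the lowest listed fix above current, preferring the same major."""
    current_parsed = parse_version(current)
    if current_parsed is None:
        return None

    candidates: List[Tuple[Version, str]] = []
    for raw in fixed_versions.split(","):
        parsed = parse_version(raw)
        # Keep only parseable versions that are strictly newer than current
        if parsed is not None and parsed > current_parsed:
            candidates.append((parsed, raw.strip()))

    if not candidates:
        return None

    same_major = [c for c in candidates if c[0][0] == current_parsed[0]]
    # Tuples sort by the parsed version first, so min() is the lowest fix
    return min(same_major or candidates)[1]


print(pick_fix_version("1.2.3", "2.0.1, 1.2.5"))
```

Preferring the same major version keeps most automated upgrades inside the range that `_is_upgrade_acceptable` allows without review.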

Metrics and Reporting

Vulnerability Metrics Dashboard

# vulnerability_metrics.py
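from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from collections import defaultdict

# Assumes aggregate_vulns.py (above) is importable from the same directory
from aggregate_vulns import Severity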
 
@dataclass
class VulnerabilityMetrics:
    """Vulnerability management metrics."""
    total_open: int
    critical_open: int
    high_open: int
    medium_open: int
    low_open: int
    mttr_critical: float  # Mean Time to Remediate
    mttr_high: float
    sla_compliance_rate: float
    new_this_week: int
    closed_this_week: int
    overdue_count: int
 
class MetricsCalculator:
    """Calculate vulnerability management metrics."""
 
    def __init__(self, vulnerability_store):
        self.store = vulnerability_store
 
    def calculate_metrics(self, as_of: datetime = None) -> VulnerabilityMetrics:
        """Calculate current vulnerability metrics."""
 
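        as_of = as_of or datetime.utcnow()
        # Note: get_all_open() returns the current open set regardless of as_of,
        # so historical trend points need a store that supports point-in-time queries
        vulns = self.store.get_all_open()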
 
        # Count by severity
        by_severity = defaultdict(int)
        for v in vulns:
            by_severity[v.severity.value] += 1
 
        # Calculate MTTR
        closed_vulns = self.store.get_closed_since(
            as_of - timedelta(days=90)
        )
 
        mttr_critical = self._calculate_mttr(closed_vulns, Severity.CRITICAL)
        mttr_high = self._calculate_mttr(closed_vulns, Severity.HIGH)
 
        # SLA compliance
        sla_compliance = self._calculate_sla_compliance(closed_vulns)
 
        # Weekly activity
        week_ago = as_of - timedelta(days=7)
        new_this_week = len(self.store.get_opened_since(week_ago))
        closed_this_week = len(self.store.get_closed_since(week_ago))
 
        # Overdue
        overdue = len([v for v in vulns if self._is_overdue(v, as_of)])
 
        return VulnerabilityMetrics(
            total_open=len(vulns),
            critical_open=by_severity['critical'],
            high_open=by_severity['high'],
            medium_open=by_severity['medium'],
            low_open=by_severity['low'],
            mttr_critical=mttr_critical,
            mttr_high=mttr_high,
            sla_compliance_rate=sla_compliance,
            new_this_week=new_this_week,
            closed_this_week=closed_this_week,
            overdue_count=overdue
        )
 
    def generate_trend_report(
        self,
        start_date: datetime,
        end_date: datetime,
        interval: str = 'week'
    ) -> List[Dict]:
        """Generate trend report over time period."""
 
        trends = []
        current = start_date
 
        while current <= end_date:
            metrics = self.calculate_metrics(as_of=current)
            trends.append({
                'date': current.isoformat(),
                'total_open': metrics.total_open,
                'critical': metrics.critical_open,
                'high': metrics.high_open,
                'mttr_critical': metrics.mttr_critical
            })
 
            if interval == 'week':
                current += timedelta(weeks=1)
            elif interval == 'day':
                current += timedelta(days=1)
            else:
                current += timedelta(days=30)
 
        return trends
 
    def _calculate_mttr(
        self,
        closed_vulns: List,
        severity: Severity
    ) -> float:
        """Calculate Mean Time to Remediate for severity level."""
 
        relevant = [
            v for v in closed_vulns
            if v.severity == severity
        ]
 
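        if not relevant:
            return 0.0

        # Assumes the store returns datetime objects for first_detected and
        # closed_at (the normalized record stores first_detected as an ISO 8601
        # string, so parse it when loading). Fractional days avoid the
        # truncation of timedelta.days.
        total_days = sum(
            (v.closed_at - v.first_detected).total_seconds() / 86400
            for v in relevant
        )

        return total_days / len(relevant)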
 
    def _calculate_sla_compliance(self, closed_vulns: List) -> float:
        """Calculate SLA compliance rate."""
 
        if not closed_vulns:
            return 100.0
 
        compliant = sum(
            1 for v in closed_vulns
            if self._was_within_sla(v)
        )
 
        return (compliant / len(closed_vulns)) * 100
 
    def _was_within_sla(self, vuln) -> bool:
        """Check if vulnerability was remediated within SLA."""
 
        sla_days = {
            'critical': 1,
            'high': 7,
            'medium': 30,
            'low': 90
        }
 
        allowed_days = sla_days.get(vuln.severity.value, 90)
        # Fractional days: timedelta.days truncates, so 47 hours would count as 1 day
        actual_days = (vuln.closed_at - vuln.first_detected).total_seconds() / 86400

        return actual_days <= allowed_days
 
    def _is_overdue(self, vuln, as_of: datetime) -> bool:
        """Check if vulnerability is past SLA."""
 
        sla_days = {
            'critical': 1,
            'high': 7,
            'medium': 30,
            'low': 90
        }
 
        allowed_days = sla_days.get(vuln.severity.value, 90)
        # Fractional days: timedelta.days truncates, so 47 hours would count as 1 day
        days_open = (as_of - vuln.first_detected).total_seconds() / 86400

        return days_open > allowed_days
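
A trend report is only meaningful if the open count is evaluated at each historical date rather than at the time the report runs. As a minimal sketch of that point-in-time logic, the code below counts findings that were detected on or before a date and not yet closed at that date. The `Record` type and both function names are hypothetical; a real store would more likely run the equivalent query in its database.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional, Tuple


@dataclass
class Record:
    """Hypothetical stored finding with lifecycle timestamps."""
    severity: str
    first_detected: datetime
    closed_at: Optional[datetime] = None


def open_as_of(records: List[Record], as_of: datetime) -> List[Record]:
    """Return findings detected on or before as_of and still open at that time."""
    return [
        r for r in records
        if r.first_detected <= as_of
        and (r.closed_at is None or r.closed_at > as_of)
    ]


def weekly_open_counts(
    records: List[Record], start: datetime, end: datetime
) -> List[Tuple[str, int]]:
    """Open-finding count at each weekly step from start to end, inclusive."""
    counts = []
    current = start
    while current <= end:
        counts.append((current.date().isoformat(), len(open_as_of(records, current))))
        current += timedelta(weeks=1)
    return counts


# Illustrative records: one finding closed after nine days, one still open
history = [
    Record("high", datetime(2024, 1, 1), datetime(2024, 1, 10)),
    Record("low", datetime(2024, 1, 5)),
]
for day, count in weekly_open_counts(history, datetime(2024, 1, 1), datetime(2024, 1, 15)):
    print(day, count)
```

The same filter can be applied per severity to reproduce the `critical` and `high` series in the trend report.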

Conclusion

Effective vulnerability management in DevSecOps requires:

  1. Comprehensive Detection - Multiple scanners covering code, dependencies, containers, and IaC
  2. Intelligent Prioritization - Risk-based scoring considering exploitability and asset context
  3. Automated Remediation - Auto-patching where safe, with human review for major changes
  4. Continuous Monitoring - Track metrics, trends, and SLA compliance

By implementing these practices, organizations can systematically reduce their vulnerability exposure while maintaining development velocity.
