Vulnerability Management in DevSecOps: From Detection to Remediation
Effective vulnerability management requires more than running scanners — it demands a systematic approach to detection, prioritization, and remediation that is integrated into your development workflow.
Vulnerability Detection Pipeline
Multi-Scanner Integration
# .github/workflows/vulnerability-scan.yml
#
# Runs SAST, SCA, container and IaC scanners as separate jobs, uploads their
# JSON reports as artifacts, then merges the reports and posts the result to
# the vulnerability management platform.
#
# NOTE(review): several actions are referenced by a moving branch (@master /
# @main); pin them to a release tag or commit SHA before production use.
name: Comprehensive Security Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 6 * * *'  # Daily at 06:00 UTC

jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        # Defines matrix.language for the CodeQL steps below. Adjust the list
        # to the languages actually present in the repository.
        language: [javascript]
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0  # fetch full history instead of a shallow clone
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
      - name: Semgrep
        uses: semgrep/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/cwe-top-25
      # CodeQL has to be initialised before `analyze` can run.
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v3
        with:
          languages: ${{ matrix.language }}
      - name: CodeQL Analysis
        uses: github/codeql-action/analyze@v3
        with:
          category: "/language:${{ matrix.language }}"
      # NOTE(review): none of the steps above is configured to write a JSON
      # report, and no `sast-results` artifact is uploaded, so the
      # `sast-results/*.json` glob in the aggregate job has nothing to match.
      # Add a report output plus an upload step here once the format is chosen.

  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Snyk SCA
        uses: snyk/actions/node@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=medium --json-file-output=snyk-results.json
      - name: OWASP Dependency Check
        uses: dependency-check/Dependency-Check_Action@main
        with:
          project: 'my-project'
          path: '.'
          format: 'JSON'
      - name: Trivy Filesystem Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-fs-results.json'
      # Upload even when a scanner step failed, so partial results survive.
      - name: Upload SCA Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: sca-results
          path: |
            snyk-results.json
            trivy-fs-results.json

  container:
    name: Container Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Image
        run: docker build -t app:${{ github.sha }} .
      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'app:${{ github.sha }}'
          format: 'json'
          output: 'trivy-container-results.json'
          severity: 'CRITICAL,HIGH,MEDIUM'
      - name: Grype Scan
        id: grype
        uses: anchore/scan-action@v3
        with:
          image: 'app:${{ github.sha }}'
          fail-build: false
          output-format: json
      - name: Upload Container Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: container-results
          # NOTE(review): assumes scan-action exposes the report path as the
          # `json` step output - confirm for the pinned version.
          path: |
            trivy-container-results.json
            ${{ steps.grype.outputs.json }}

  iac:
    name: Infrastructure as Code Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: tfsec
        uses: aquasecurity/tfsec-action@v1.0.3
        with:
          additional_args: --format json --out tfsec-results.json
      - name: Checkov
        uses: bridgecrewio/checkov-action@v12
        with:
          framework: terraform,kubernetes,dockerfile
          output_format: json
          output_file_path: checkov-results.json
      - name: Upload IaC Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: iac-results
          path: |
            tfsec-results.json
            checkov-results.json

  aggregate:
    name: Aggregate Results
    needs: [sast, sca, container, iac]
    # Without this, one failed scanner job would skip aggregation entirely.
    if: ${{ !cancelled() }}
    runs-on: ubuntu-latest
    steps:
      # scripts/aggregate_vulns.py lives in the repository.
      - uses: actions/checkout@v4
      # With no `name` input, every artifact of this run is downloaded into a
      # directory named after the artifact (sca-results/, iac-results/, ...).
      - name: Download Artifacts
        uses: actions/download-artifact@v4
      - name: Normalize and Aggregate
        run: |
          python scripts/aggregate_vulns.py \
            --sast sast-results/*.json \
            --sca sca-results/*.json \
            --container container-results/*.json \
            --iac iac-results/*.json \
            --output aggregated-vulns.json
      - name: Upload to Vulnerability Management Platform
        # Pass secrets through the environment instead of expanding them into
        # the script text; --fail makes HTTP errors fail the step.
        env:
          VULN_PLATFORM_URL: ${{ secrets.VULN_PLATFORM_URL }}
          VULN_PLATFORM_TOKEN: ${{ secrets.VULN_PLATFORM_TOKEN }}
        run: |
          curl --fail -X POST "$VULN_PLATFORM_URL/api/import" \
            -H "Authorization: Bearer $VULN_PLATFORM_TOKEN" \
            -F "file=@aggregated-vulns.json"

# Vulnerability Aggregation and Normalization
# aggregate_vulns.py
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import hashlib
class Severity(Enum):
    """Normalized severity levels, listed from most to least severe.

    Each member's value is the lowercase form of its name.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class Vulnerability:
    """Normalized vulnerability record."""

    # Identity and description
    id: str
    title: str
    description: str

    # Scoring and classification
    severity: Severity
    cvss_score: Optional[float]
    cve_ids: List[str]
    cwe_ids: List[str]

    # What is affected and where it was found
    affected_component: str
    affected_version: str
    file_path: Optional[str]
    line_number: Optional[int]

    # Provenance
    source_scanner: str
    first_detected: str  # timestamp kept as a string

    # Fix information
    remediation: Optional[str]
    references: List[str]
class VulnerabilityAggregator:
    """Aggregate and normalize vulnerabilities from multiple scanners.

    Each ``normalize_*`` method turns one scanner's parsed JSON report into a
    list of ``Vulnerability`` records. ``deduplicate`` filters out records
    whose dedup key this instance has already seen.
    """

    def __init__(self):
        self.vulnerabilities: List[Vulnerability] = []
        # Hashes of dedup keys seen so far; kept across deduplicate() calls.
        self.dedup_hashes = set()

    def normalize_semgrep(self, results: Dict) -> List[Vulnerability]:
        """Normalize Semgrep results.

        Reads the ``results`` list of the report. Semgrep findings carry no
        package version or CVE, so ``affected_version`` is ``'N/A'`` and
        ``cve_ids`` is empty.
        """
        vulns = []
        for result in results.get('results', []):
            extra = result.get('extra', {})
            metadata = extra.get('metadata', {})
            vulns.append(Vulnerability(
                id=self._generate_id(result),
                title=result.get('check_id', 'Unknown'),
                description=extra.get('message', ''),
                severity=self._map_severity(extra.get('severity', 'INFO')),
                cvss_score=None,
                cve_ids=[],
                # Rule metadata may hold a single string instead of a list.
                cwe_ids=self._as_list(metadata.get('cwe')),
                affected_component=result.get('path', ''),
                affected_version='N/A',
                file_path=result.get('path'),
                line_number=result.get('start', {}).get('line'),
                source_scanner='semgrep',
                first_detected=datetime.utcnow().isoformat(),
                remediation=extra.get('fix'),
                references=self._as_list(metadata.get('references')),
            ))
        return vulns

    def normalize_snyk(self, results: Dict) -> List[Vulnerability]:
        """Normalize Snyk SCA results.

        Reads the ``vulnerabilities`` list of the report. The first entry of
        ``from`` becomes ``file_path`` and the first entry of ``fixedIn``
        becomes ``remediation``; both are ``None`` when missing or empty.
        """
        vulns = []
        for vuln_data in results.get('vulnerabilities', []):
            identifiers = vuln_data.get('identifiers', {})
            dependency_path = vuln_data.get('from')
            fixed_in = vuln_data.get('fixedIn')
            url = vuln_data.get('url')
            vulns.append(Vulnerability(
                # Hash the record only when the scanner supplied no id.
                id=vuln_data.get('id') or self._generate_id(vuln_data),
                title=vuln_data.get('title', 'Unknown'),
                description=vuln_data.get('description', ''),
                severity=self._map_severity(vuln_data.get('severity', 'low')),
                cvss_score=vuln_data.get('cvssScore'),
                cve_ids=self._as_list(identifiers.get('CVE')),
                cwe_ids=self._as_list(identifiers.get('CWE')),
                affected_component=vuln_data.get('packageName', ''),
                affected_version=vuln_data.get('version', ''),
                file_path=dependency_path[0] if dependency_path else None,
                line_number=None,
                source_scanner='snyk',
                first_detected=datetime.utcnow().isoformat(),
                remediation=fixed_in[0] if fixed_in else None,
                # Avoid a [''] placeholder when the report has no URL.
                references=[url] if url else [],
            ))
        return vulns

    def normalize_trivy(self, results: Dict) -> List[Vulnerability]:
        """Normalize Trivy results.

        Walks every entry of ``Results``; the entry's ``Target`` is used as
        ``file_path`` for all vulnerabilities listed under it. A missing or
        null ``Vulnerabilities`` value is treated as "no findings".
        """
        vulns = []
        for result in results.get('Results') or []:
            target = result.get('Target', '')
            for vuln_data in result.get('Vulnerabilities') or []:
                vuln_id = vuln_data.get('VulnerabilityID') or ''
                vulns.append(Vulnerability(
                    id=vuln_id or self._generate_id(vuln_data),
                    title=vuln_data.get('Title', 'Unknown'),
                    description=vuln_data.get('Description', ''),
                    severity=self._map_severity(vuln_data.get('Severity', 'UNKNOWN')),
                    cvss_score=self._extract_cvss(vuln_data),
                    # Only CVE-prefixed ids count as CVEs.
                    cve_ids=[vuln_id] if vuln_id.startswith('CVE') else [],
                    cwe_ids=self._as_list(vuln_data.get('CweIDs')),
                    affected_component=vuln_data.get('PkgName', ''),
                    affected_version=vuln_data.get('InstalledVersion', ''),
                    file_path=target,
                    line_number=None,
                    source_scanner='trivy',
                    first_detected=datetime.utcnow().isoformat(),
                    remediation=vuln_data.get('FixedVersion'),
                    references=self._as_list(vuln_data.get('References')),
                ))
        return vulns

    def deduplicate(self, vulnerabilities: List[Vulnerability]) -> List[Vulnerability]:
        """Remove duplicate vulnerabilities, keeping the first occurrence.

        Records with CVE ids are keyed on component, version and the sorted
        CVE list, so the same CVE reported by two scanners collapses into one
        record. Records without CVE ids are keyed on component, version,
        title, file path and line number; otherwise every CVE-less finding in
        one file would share a single key and all but the first would be lost.

        Seen keys are remembered in ``self.dedup_hashes``, so later calls also
        drop records that an earlier call already returned.
        """
        unique_vulns = []
        for vuln in vulnerabilities:
            if vuln.cve_ids:
                identity = ','.join(sorted(vuln.cve_ids))
            else:
                identity = f"{vuln.title}:{vuln.file_path}:{vuln.line_number}"
            dedup_key = f"{vuln.affected_component}:{vuln.affected_version}:{identity}"
            # sha256 for consistency with _generate_id.
            dedup_hash = hashlib.sha256(dedup_key.encode()).hexdigest()
            if dedup_hash in self.dedup_hashes:
                continue
            self.dedup_hashes.add(dedup_hash)
            unique_vulns.append(vuln)
        return unique_vulns

    def _generate_id(self, data: Dict) -> str:
        """Return a 12-hex-character id derived from *data*.

        The id is the start of the SHA-256 digest of the key-sorted JSON
        dump, so equal dicts always produce the same id.
        """
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    def _map_severity(self, severity: str) -> Severity:
        """Map a scanner severity label to the normalized ``Severity``.

        Matching is case-insensitive. Missing or unrecognized labels map to
        ``Severity.INFO``.
        """
        mapping = {
            'CRITICAL': Severity.CRITICAL,
            'HIGH': Severity.HIGH,
            # Semgrep's legacy labels: ERROR ~ high, WARNING ~ medium.
            'ERROR': Severity.HIGH,
            'MEDIUM': Severity.MEDIUM,
            'MODERATE': Severity.MEDIUM,
            'WARNING': Severity.MEDIUM,
            'LOW': Severity.LOW,
            'INFO': Severity.INFO,
            'INFORMATIONAL': Severity.INFO,
        }
        if not severity:
            return Severity.INFO
        return mapping.get(str(severity).upper(), Severity.INFO)

    def _extract_cvss(self, vuln_data: Dict) -> Optional[float]:
        """Return the first available CVSS v3 score, or ``None``.

        Sources under the ``CVSS`` key are tried in the order nvd, ghsa,
        redhat; a source without a ``V3Score`` is skipped.
        """
        cvss = vuln_data.get('CVSS') or {}
        for source in ('nvd', 'ghsa', 'redhat'):
            score = (cvss.get(source) or {}).get('V3Score')
            if score is not None:
                return score
        return None

    @staticmethod
    def _as_list(value) -> List[str]:
        """Return *value* as a list: ``None`` -> ``[]``, scalar -> ``[value]``."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]


# Vulnerability Prioritization
Risk-Based Prioritization Framework
# vulnerability_prioritization.py
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class ExploitMaturity(Enum):
    """Exploit maturity levels; a larger value means a more mature exploit."""

    UNPROVEN = 1
    POC = 2
    FUNCTIONAL = 3
    HIGH = 4
    WEAPONIZED = 5
class AssetCriticality(Enum):
    """Criticality of the affected asset; a larger value is more critical."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class PrioritizationContext:
    """Context for vulnerability prioritization."""

    # Properties of the affected asset
    asset_criticality: AssetCriticality
    is_internet_facing: bool
    has_sensitive_data: bool

    # Exploitation signals
    exploit_available: bool
    exploit_maturity: ExploitMaturity
    in_kev: bool  # CISA Known Exploited Vulnerabilities
    epss_score: Optional[float]  # Exploit Prediction Scoring System
class VulnerabilityPrioritizer:
    """Prioritize vulnerabilities based on risk context."""

    def __init__(self):
        self.kev_catalog = self._load_kev_catalog()
        self.epss_scores = self._load_epss_scores()

    def calculate_priority_score(
        self,
        vulnerability: Vulnerability,
        context: PrioritizationContext
    ) -> float:
        """Calculate a priority score clamped to the range 0-100.

        The score is the sum of:
          * CVSS score x 4 (nothing is added when the score is missing or 0);
          * exploit maturity value x 4;
          * asset criticality value x 5;
          * environmental boosts: +10 internet facing, +5 sensitive data,
            +15 listed in KEV;
          * EPSS adjustment: +10 above 0.9, +5 above 0.7, -10 below 0.1.
        """
        score = 0.0

        # Base score from CVSS.
        if vulnerability.cvss_score:
            score += vulnerability.cvss_score * 4

        # Exploit maturity and asset criticality.
        score += context.exploit_maturity.value * 4
        score += context.asset_criticality.value * 5

        # Environmental factors (up to 30 points in total).
        if context.is_internet_facing:
            score += 10
        if context.has_sensitive_data:
            score += 5
        if context.in_kev:
            score += 15  # Significant boost for known exploited

        # EPSS adjustment. Compare against None so that a score of exactly
        # 0.0 still receives the low-probability penalty.
        epss = context.epss_score
        if epss is not None:
            if epss > 0.9:
                score += 10
            elif epss > 0.7:
                score += 5
            elif epss < 0.1:
                score -= 10

        return float(min(max(score, 0.0), 100.0))

    def prioritize_vulnerabilities(
        self,
        vulnerabilities: List[Vulnerability],
        asset_context: Dict
    ) -> List[Dict]:
        """Score every vulnerability and return entries sorted by priority.

        Each entry is a dict with the keys ``vulnerability``,
        ``priority_score``, ``context``, ``sla`` and ``recommended_action``.
        The list is ordered by ``priority_score``, highest first.
        """
        prioritized = []
        for vuln in vulnerabilities:
            context = self._build_context(vuln, asset_context)
            score = self.calculate_priority_score(vuln, context)
            prioritized.append({
                'vulnerability': vuln,
                'priority_score': score,
                'context': context,
                'sla': self._determine_sla(score, context),
                'recommended_action': self._recommend_action(vuln, score),
            })
        prioritized.sort(key=lambda entry: entry['priority_score'], reverse=True)
        return prioritized

    def _build_context(
        self,
        vuln: Vulnerability,
        asset_context: Dict
    ) -> PrioritizationContext:
        """Build prioritization context for vulnerability.

        ``asset_context`` may supply ``criticality`` (an ``AssetCriticality``
        member name, case-insensitive, default ``'MEDIUM'``),
        ``internet_facing`` and ``sensitive_data`` (both default ``False``).

        Raises:
            KeyError: if ``criticality`` is not an ``AssetCriticality`` name.
        """
        in_kev = any(cve in self.kev_catalog for cve in vuln.cve_ids)

        # EPSS score of the first CVE that has one, if any.
        epss_score = next(
            (self.epss_scores[cve] for cve in vuln.cve_ids if cve in self.epss_scores),
            None,
        )

        criticality = str(asset_context.get('criticality', 'MEDIUM')).upper()
        return PrioritizationContext(
            asset_criticality=AssetCriticality[criticality],
            is_internet_facing=asset_context.get('internet_facing', False),
            has_sensitive_data=asset_context.get('sensitive_data', False),
            # in_kev can only be True when the record has CVE ids, so it is
            # already the boolean the field expects.
            exploit_available=in_kev,
            exploit_maturity=self._get_exploit_maturity(vuln),
            in_kev=in_kev,
            epss_score=epss_score,
        )

    def _determine_sla(self, score: float, context: PrioritizationContext) -> Dict:
        """Determine remediation SLA based on priority.

        KEV-listed vulnerabilities are always P1, regardless of score.
        """
        if score >= 90 or context.in_kev:
            return {'days': 1, 'priority': 'P1', 'description': 'Critical - Immediate action required'}
        if score >= 70:
            return {'days': 7, 'priority': 'P2', 'description': 'High - Remediate within 1 week'}
        if score >= 50:
            return {'days': 30, 'priority': 'P3', 'description': 'Medium - Remediate within 1 month'}
        if score >= 30:
            return {'days': 90, 'priority': 'P4', 'description': 'Low - Remediate within quarter'}
        return {'days': 180, 'priority': 'P5', 'description': 'Informational - Address as able'}

    def _recommend_action(self, vuln: Vulnerability, score: float) -> str:
        """Recommend remediation action.

        When the record carries a remediation, the advice is to upgrade to
        it (marked URGENT at a score of 70 or more). Otherwise the advice
        depends on the score alone.
        """
        if vuln.remediation:
            if score >= 70:
                return f"URGENT: Upgrade to {vuln.remediation}"
            return f"Upgrade to {vuln.remediation}"
        if score >= 90:
            return "Apply emergency mitigation - see references"
        if score >= 70:
            return "Investigate and apply vendor patch or workaround"
        return "Monitor for updates and plan remediation"

    def _get_exploit_maturity(self, vuln: Vulnerability) -> ExploitMaturity:
        """Determine exploit maturity level (placeholder: always UNPROVEN)."""
        # In practice, query exploit databases
        return ExploitMaturity.UNPROVEN

    def _load_kev_catalog(self) -> set:
        """Load CISA KEV catalog (placeholder: returns an empty set)."""
        # Fetch from https://www.cisa.gov/known-exploited-vulnerabilities-catalog
        return set()

    def _load_epss_scores(self) -> Dict[str, float]:
        """Load EPSS scores (placeholder: returns an empty dict)."""
        # Fetch from https://www.first.org/epss
        return {}


# Automated Remediation
Dependency Update Automation
# auto_remediation.py
import subprocess
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class RemediationResult:
    """Result of remediation attempt."""

    # Which vulnerability the attempt was for and how it went
    vulnerability_id: str
    success: bool
    action_taken: str

    # Version before the attempt and the version targeted, if any
    old_version: str
    new_version: Optional[str]

    # Failure detail and whether a human needs to look at it
    error: Optional[str]
    requires_review: bool
class DependencyRemediator:
    """Automated dependency vulnerability remediation.

    Config keys read by ``__init__``:
      * ``dry_run`` (default ``True``): when true, no install command is run
        and every upgrade is reported with ``action_taken='dry_run'``.
      * ``max_major_upgrade`` (default ``False``): the value returned by the
        upgrade check when the target's major version is higher than the
        current one.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.dry_run = config.get('dry_run', True)
        self.max_major_upgrade = config.get('max_major_upgrade', False)

    def remediate_npm(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate npm vulnerabilities.

        Records whose ``source_scanner`` is not snyk, npm_audit or trivy are
        ignored. One result is returned for every other record: a failure
        when no target version is known, a skip when the upgrade is not
        acceptable, otherwise the outcome of the (possibly dry-run) upgrade.
        """
        results = []
        for vuln in vulnerabilities:
            if vuln.source_scanner not in ('snyk', 'npm_audit', 'trivy'):
                continue

            current = self._get_npm_version(vuln.affected_component)
            target = self._find_safe_version(
                vuln.affected_component,
                vuln.remediation,
                current
            )

            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No safe upgrade path found',
                    requires_review=True
                ))
                continue

            if not self._is_upgrade_acceptable(current, target):
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='skipped',
                    old_version=current or 'unknown',
                    new_version=target,
                    error='Major version upgrade requires manual review',
                    requires_review=True
                ))
                continue

            results.append(self._apply_upgrade(vuln, current, target, self._npm_upgrade))
        return results

    def remediate_python(
        self,
        vulnerabilities: List[Vulnerability]
    ) -> List[RemediationResult]:
        """Remediate Python vulnerabilities.

        The target version is the record's ``remediation``; records without
        one produce a failure result. Unlike ``remediate_npm``, no scanner
        filter or major-version check is applied.
        """
        results = []
        for vuln in vulnerabilities:
            current = self._get_pip_version(vuln.affected_component)
            target = vuln.remediation

            if not target:
                results.append(RemediationResult(
                    vulnerability_id=vuln.id,
                    success=False,
                    action_taken='none',
                    old_version=current or 'unknown',
                    new_version=None,
                    error='No fix version available',
                    requires_review=True
                ))
                continue

            results.append(self._apply_upgrade(vuln, current, target, self._pip_upgrade))
        return results

    def _apply_upgrade(self, vuln, current, target, upgrade) -> RemediationResult:
        """Run *upgrade* (skipped in dry-run mode) and describe the outcome.

        *upgrade* is called as ``upgrade(package, version)`` and must return
        a bool. A failed upgrade is reported with an error message and is
        flagged for review instead of looking like a clean result.
        """
        if self.dry_run:
            succeeded, action = True, 'dry_run'
        else:
            succeeded, action = upgrade(vuln.affected_component, target), 'upgrade'
        return RemediationResult(
            vulnerability_id=vuln.id,
            success=succeeded,
            action_taken=action,
            old_version=current or 'unknown',
            new_version=target,
            error=None if succeeded else 'Upgrade command failed',
            requires_review=not succeeded
        )

    def create_remediation_pr(
        self,
        results: List[RemediationResult],
        branch_name: str
    ) -> Dict:
        """Build the pull-request payload for the successful upgrades.

        Only results with ``success`` set and ``action_taken == 'upgrade'``
        are included, so dry-run results never produce a PR. Returns
        ``{'created': False, 'reason': ...}`` when nothing qualifies,
        otherwise a dict with ``title``, ``body``, ``branch``, ``base`` and
        ``labels``. The payload is only returned; no request is sent here.
        """
        successful = [r for r in results if r.success and r.action_taken == 'upgrade']
        if not successful:
            return {'created': False, 'reason': 'No successful remediations'}

        return {
            'title': f'Security: Remediate {len(successful)} vulnerabilities',
            'body': self._generate_pr_description(successful),
            'branch': branch_name,
            'base': 'main',
            'labels': ['security', 'automated']
        }

    def _generate_pr_description(self, results: List[RemediationResult]) -> str:
        """Generate the Markdown PR description: one bullet per result."""
        lines = [
            '## Security Vulnerability Remediation',
            '',
            'This PR addresses the following vulnerabilities:',
            ''
        ]
        lines.extend(
            f'- `{result.vulnerability_id}`: {result.old_version} → {result.new_version}'
            for result in results
        )
        lines.extend([
            '',
            '### Testing',
            '- [ ] Unit tests pass',
            '- [ ] Integration tests pass',
            '- [ ] No breaking changes detected',
            '',
            '### Notes',
            'This PR was automatically generated by the vulnerability remediation system.',
            'Please review changes before merging.'
        ])
        return '\n'.join(lines)

    def _get_npm_version(self, package: str) -> Optional[str]:
        """Return the installed version from ``npm list``, or ``None``.

        ``None`` covers every failure: npm cannot be run, its output is not
        JSON, or the package is not among the listed dependencies.
        """
        try:
            result = subprocess.run(
                ['npm', 'list', package, '--json'],
                capture_output=True, text=True
            )
            data = json.loads(result.stdout)
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        if not isinstance(data, dict):
            return None
        entry = (data.get('dependencies') or {}).get(package) or {}
        return entry.get('version')

    def _npm_upgrade(self, package: str, version: str) -> bool:
        """Run ``npm install package@version``; return whether it succeeded."""
        try:
            subprocess.run(
                ['npm', 'install', f'{package}@{version}'],
                check=True
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return True

    def _is_upgrade_acceptable(self, current: str, target: str) -> bool:
        """Check if upgrade is within acceptable bounds.

        Returns ``False`` when either version is missing or its major part
        is not an integer, so such cases go to manual review. A higher major
        version returns ``self.max_major_upgrade``; anything else is
        accepted.
        """
        if not current or not target:
            return False
        try:
            current_major = int(current.split('.')[0])
            target_major = int(target.split('.')[0])
        except ValueError:
            return False
        if target_major > current_major:
            return self.max_major_upgrade
        return True

    def _find_safe_version(
        self,
        package: str,
        fixed_version: Optional[str],
        current_version: Optional[str]
    ) -> Optional[str]:
        """Return *fixed_version* when known, otherwise ``None``."""
        if fixed_version:
            return fixed_version
        # Query npm for latest patch version
        # Implementation depends on your requirements
        return None

    def _get_pip_version(self, package: str) -> Optional[str]:
        """Return the ``Version:`` field of ``pip show``, or ``None``."""
        try:
            result = subprocess.run(
                ['pip', 'show', package],
                capture_output=True, text=True
            )
        except (OSError, subprocess.SubprocessError):
            return None
        for line in result.stdout.split('\n'):
            if line.startswith('Version:'):
                # Split once so a version containing ':' stays intact.
                return line.split(':', 1)[1].strip()
        return None

    def _pip_upgrade(self, package: str, version: str) -> bool:
        """Run ``pip install package==version``; return whether it succeeded."""
        try:
            subprocess.run(
                ['pip', 'install', f'{package}=={version}'],
                check=True
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return True


# Metrics and Reporting
Vulnerability Metrics Dashboard
# vulnerability_metrics.py
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class VulnerabilityMetrics:
    """Vulnerability management metrics."""

    # Open vulnerabilities, in total and per severity
    total_open: int
    critical_open: int
    high_open: int
    medium_open: int
    low_open: int

    # Mean Time to Remediate, per severity
    mttr_critical: float
    mttr_high: float

    # SLA tracking
    sla_compliance_rate: float

    # Activity over the last week
    new_this_week: int
    closed_this_week: int

    # Open vulnerabilities past their SLA
    overdue_count: int
class MetricsCalculator:
    """Calculate vulnerability management metrics.

    The store passed to the constructor must provide ``get_all_open()``,
    ``get_closed(since=...)``, ``get_opened_since(dt)`` and
    ``get_closed_since(dt)``. Records need ``severity`` and
    ``first_detected``; closed records also need ``closed_at``.
    """

    # Remediation SLA in days per normalized severity value.
    _SLA_DAYS = {
        'critical': 1,
        'high': 7,
        'medium': 30,
        'low': 90
    }
    # Applied to any severity value missing from _SLA_DAYS.
    _DEFAULT_SLA_DAYS = 90

    def __init__(self, vulnerability_store):
        self.store = vulnerability_store

    def calculate_metrics(self, as_of: datetime = None) -> VulnerabilityMetrics:
        """Calculate current vulnerability metrics.

        ``as_of`` defaults to the current UTC time. It bounds the 90-day MTTR
        and SLA window, the 7-day activity window and the overdue check.
        """
        as_of = as_of or datetime.utcnow()
        vulns = self.store.get_all_open()

        # Count by severity
        by_severity = defaultdict(int)
        for v in vulns:
            by_severity[v.severity.value] += 1

        # MTTR and SLA compliance over the last 90 days
        closed_vulns = self.store.get_closed(
            since=as_of - timedelta(days=90)
        )
        mttr_critical = self._calculate_mttr(closed_vulns, Severity.CRITICAL)
        mttr_high = self._calculate_mttr(closed_vulns, Severity.HIGH)
        sla_compliance = self._calculate_sla_compliance(closed_vulns)

        # Weekly activity
        week_ago = as_of - timedelta(days=7)
        new_this_week = len(self.store.get_opened_since(week_ago))
        closed_this_week = len(self.store.get_closed_since(week_ago))

        # Overdue
        overdue = sum(1 for v in vulns if self._is_overdue(v, as_of))

        return VulnerabilityMetrics(
            total_open=len(vulns),
            critical_open=by_severity['critical'],
            high_open=by_severity['high'],
            medium_open=by_severity['medium'],
            low_open=by_severity['low'],
            mttr_critical=mttr_critical,
            mttr_high=mttr_high,
            sla_compliance_rate=sla_compliance,
            new_this_week=new_this_week,
            closed_this_week=closed_this_week,
            overdue_count=overdue
        )

    def generate_trend_report(
        self,
        start_date: datetime,
        end_date: datetime,
        interval: str = 'week'
    ) -> List[Dict]:
        """Generate trend report over time period.

        Samples ``calculate_metrics`` from ``start_date`` to ``end_date``
        inclusive. The step is one week for ``'week'``, one day for
        ``'day'`` and 30 days for any other ``interval``.

        NOTE(review): ``calculate_metrics`` calls ``get_all_open()`` without
        a date, so the open counts of every sample presumably reflect the
        store's current state - confirm against the store implementation.
        """
        if interval == 'week':
            step = timedelta(weeks=1)
        elif interval == 'day':
            step = timedelta(days=1)
        else:
            step = timedelta(days=30)

        trends = []
        current = start_date
        while current <= end_date:
            metrics = self.calculate_metrics(as_of=current)
            trends.append({
                'date': current.isoformat(),
                'total_open': metrics.total_open,
                'critical': metrics.critical_open,
                'high': metrics.high_open,
                'mttr_critical': metrics.mttr_critical
            })
            current += step
        return trends

    def _calculate_mttr(
        self,
        closed_vulns: List,
        severity: Severity
    ) -> float:
        """Calculate Mean Time to Remediate, in days, for a severity level.

        Partial days count fractionally. Returns ``0.0`` when no closed
        vulnerability has the given severity.
        """
        durations = [
            self._elapsed(v.first_detected, v.closed_at)
            for v in closed_vulns
            if v.severity == severity
        ]
        if not durations:
            return 0.0
        total = sum(durations, timedelta())
        return (total / timedelta(days=1)) / len(durations)

    def _calculate_sla_compliance(self, closed_vulns: List) -> float:
        """Calculate SLA compliance rate as a percentage.

        An empty list counts as fully compliant (``100.0``).
        """
        if not closed_vulns:
            return 100.0
        compliant = sum(1 for v in closed_vulns if self._was_within_sla(v))
        return (compliant / len(closed_vulns)) * 100

    def _was_within_sla(self, vuln) -> bool:
        """Check if vulnerability was remediated within SLA.

        Compares the full elapsed time with the allowance, so a fix that
        lands 36 hours into a 1-day SLA is not compliant.
        """
        return self._elapsed(vuln.first_detected, vuln.closed_at) <= self._sla_window(vuln)

    def _is_overdue(self, vuln, as_of: datetime) -> bool:
        """Check if an open vulnerability is past its SLA at ``as_of``."""
        return self._elapsed(vuln.first_detected, as_of) > self._sla_window(vuln)

    def _sla_window(self, vuln) -> timedelta:
        """Return the time allowed to remediate *vuln* given its severity."""
        days = self._SLA_DAYS.get(vuln.severity.value, self._DEFAULT_SLA_DAYS)
        return timedelta(days=days)

    @staticmethod
    def _elapsed(start, end) -> timedelta:
        """Return ``end - start``; ISO-8601 strings are parsed first.

        Normalized records keep ``first_detected`` as an ISO string, while
        other timestamps may already be ``datetime`` objects.
        """
        if isinstance(start, str):
            start = datetime.fromisoformat(start)
        if isinstance(end, str):
            end = datetime.fromisoformat(end)
        return end - start


# Conclusion
Effective vulnerability management in DevSecOps requires:
- Comprehensive Detection: multiple scanners covering code, dependencies, containers, and IaC
- Intelligent Prioritization: risk-based scoring that considers exploitability and asset context
- Automated Remediation: auto-patching where safe, with human review for major changes
- Continuous Monitoring: tracking of metrics, trends, and SLA compliance
By implementing these practices, organizations can systematically reduce their vulnerability exposure while maintaining development velocity.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →