Vulnerability Management in DevSecOps: From Detection to Remediation
Effective vulnerability management requires more than running scanners — it demands a systematic approach to detection, prioritization, and remediation that is integrated into your development workflow.
Vulnerability Detection Pipeline
Multi-Scanner Integration
# .github/workflows/vulnerability-scan.yml
#
# Runs SAST, SCA, container and IaC scanners as parallel jobs, publishes the
# JSON reports each job writes as an artifact, then merges them in the
# `aggregate` job and posts the result to the vulnerability platform.
#
# NOTE(review): several actions are referenced by a moving branch
# (`@master` / `@main`); pin them to a release tag or commit SHA once the
# versions in use are confirmed.
name: Comprehensive Security Scan

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 6 * * *' # Daily at 06:00 UTC

jobs:
  sast:
    name: Static Application Security Testing
    runs-on: ubuntu-latest
    permissions:
      actions: read
      contents: read
      security-events: write # lets the CodeQL analyze step upload its results
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0 # full history instead of a shallow clone
      - name: SonarQube Scan
        uses: sonarsource/sonarqube-scan-action@master
        env:
          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
          SONAR_HOST_URL: ${{ secrets.SONAR_HOST_URL }}
      - name: Semgrep
        uses: semgrep/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/cwe-top-25
      # CodeQL has to be initialized before it can analyze. The language is
      # fixed here because this job defines no `strategy.matrix`, so
      # `matrix.language` would expand to an empty string.
      # NOTE(review): assumes a JavaScript/TypeScript code base (the SCA job
      # uses snyk/actions/node) - adjust if that is wrong.
      - name: Initialize CodeQL
        uses: github/codeql-action/init@v3
        with:
          languages: javascript-typescript
      - name: CodeQL Analysis
        uses: github/codeql-action/analyze@v3
        with:
          category: "/language:javascript-typescript"
      # NOTE(review): no step in this job is configured to write a
      # `*-results.json` file yet, so this artifact stays empty until one is.
      - name: Upload SAST results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: sast-results
          path: '*-results.json'
          if-no-files-found: warn

  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Snyk SCA
        uses: snyk/actions/node@master
        env:
          SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
        with:
          args: --severity-threshold=medium --json-file-output=snyk-results.json
      - name: OWASP Dependency Check
        uses: dependency-check/Dependency-Check_Action@main
        with:
          project: 'my-project'
          path: '.'
          format: 'JSON'
      - name: Trivy Filesystem Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'fs'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-fs-results.json'
      # `if: always()` keeps the reports even when a scanner step fails the job.
      # NOTE(review): the Dependency-Check step sets no `*-results.json`
      # output path, so its report is not picked up here - confirm where the
      # action writes it and add that path.
      - name: Upload SCA results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: sca-results
          path: '*-results.json'
          if-no-files-found: warn

  container:
    name: Container Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Image
        run: docker build -t app:${{ github.sha }} .
      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'app:${{ github.sha }}'
          format: 'json'
          output: 'trivy-container-results.json'
          severity: 'CRITICAL,HIGH,MEDIUM'
      - name: Grype Scan
        uses: anchore/scan-action@v3
        with:
          image: 'app:${{ github.sha }}'
          fail-build: false
          output-format: json
      # NOTE(review): the Grype step names no output file, so only the Trivy
      # report matches the pattern below - confirm how its report is exposed.
      - name: Upload container results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: container-results
          path: '*-results.json'
          if-no-files-found: warn

  iac:
    name: Infrastructure as Code Security
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: tfsec
        uses: aquasecurity/tfsec-action@v1.0.3
        with:
          additional_args: --format json --out tfsec-results.json
      - name: Checkov
        uses: bridgecrewio/checkov-action@v12
        with:
          framework: terraform,kubernetes,dockerfile
          output_format: json
          output_file_path: checkov-results.json
      - name: Upload IaC results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: iac-results
          path: '*-results.json'
          if-no-files-found: warn

  aggregate:
    name: Aggregate Results
    needs: [sast, sca, container, iac]
    # Still run when a scanner job failed so its findings are reported.
    if: ${{ !cancelled() }}
    runs-on: ubuntu-latest
    steps:
      # The aggregation script is read from the repository.
      - uses: actions/checkout@v4
      # With no `name` input every artifact of the run is downloaded into a
      # directory named after the artifact (sast-results/, sca-results/, ...).
      - name: Download Artifacts
        uses: actions/download-artifact@v4
      - name: Normalize and Aggregate
        run: |
          python scripts/aggregate_vulns.py \
            --sast sast-results/*.json \
            --sca sca-results/*.json \
            --container container-results/*.json \
            --iac iac-results/*.json \
            --output aggregated-vulns.json
      - name: Upload to Vulnerability Management Platform
        # Secrets are passed through the environment instead of being
        # interpolated into the script text.
        env:
          VULN_PLATFORM_URL: ${{ secrets.VULN_PLATFORM_URL }}
          VULN_PLATFORM_TOKEN: ${{ secrets.VULN_PLATFORM_TOKEN }}
        run: |
          curl --fail -X POST "${VULN_PLATFORM_URL}/api/import" \
            -H "Authorization: Bearer ${VULN_PLATFORM_TOKEN}" \
            -F "file=@aggregated-vulns.json"

# Vulnerability Aggregation and Normalization
# aggregate_vulns.py
import json
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
import hashlib
class Severity(Enum):
    """Normalized severity scale that scanner-specific levels are mapped onto.

    Each member's value is the lowercase form of its name.
    """

    # Declared from most to least severe.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class Vulnerability:
    """Scanner-independent record describing a single finding.

    The ``normalize_*`` adapters of ``VulnerabilityAggregator`` build these
    from each scanner's JSON report. All fields are required and positional
    order matches the declaration order below.
    """

    # --- identity and description ---
    id: str                       # scanner-supplied ID or a generated hash
    title: str
    description: str

    # --- severity and classification ---
    severity: Severity            # normalized level
    cvss_score: Optional[float]   # None when the scanner reports no score
    cve_ids: List[str]
    cwe_ids: List[str]

    # --- what is affected and where ---
    affected_component: str       # package name, or file path for code findings
    affected_version: str         # 'N/A' for code findings
    file_path: Optional[str]
    line_number: Optional[int]

    # --- provenance ---
    source_scanner: str           # e.g. 'semgrep', 'snyk', 'trivy'
    first_detected: str           # ISO-8601 timestamp string

    # --- guidance ---
    remediation: Optional[str]    # fixed version or suggested fix, if any
    references: List[str]
class VulnerabilityAggregator:
    """Aggregate and normalize vulnerabilities from multiple scanners.

    Each ``normalize_*`` method converts one scanner's parsed JSON report
    into a list of ``Vulnerability`` records. ``deduplicate`` then drops
    records that describe a finding which has already been seen by this
    aggregator instance.
    """

    def __init__(self):
        self.vulnerabilities: "List[Vulnerability]" = []
        # Digests of dedup keys already emitted; kept across deduplicate()
        # calls so later batches are filtered against earlier ones.
        self.dedup_hashes = set()

    def normalize_semgrep(self, results: Dict) -> "List[Vulnerability]":
        """Normalize Semgrep results.

        Args:
            results: Parsed Semgrep JSON report (findings under ``results``).

        Returns:
            One ``Vulnerability`` per finding; empty when there are none.
        """
        # One timestamp per report so all findings of a scan agree.
        detected_at = datetime.utcnow().isoformat()
        vulns = []
        for result in results.get('results') or []:
            extra = result.get('extra') or {}
            metadata = extra.get('metadata') or {}
            vulns.append(Vulnerability(
                id=self._generate_id(result),
                title=result.get('check_id', 'Unknown'),
                description=extra.get('message', ''),
                severity=self._map_severity(extra.get('severity', 'INFO')),
                cvss_score=None,
                cve_ids=[],
                # Rule metadata may hold a single string or a list.
                cwe_ids=self._as_list(metadata.get('cwe')),
                affected_component=result.get('path', ''),
                affected_version='N/A',
                file_path=result.get('path'),
                line_number=(result.get('start') or {}).get('line'),
                source_scanner='semgrep',
                first_detected=detected_at,
                remediation=extra.get('fix'),
                references=self._as_list(metadata.get('references')),
            ))
        return vulns

    def normalize_snyk(self, results: Dict) -> "List[Vulnerability]":
        """Normalize Snyk SCA results.

        Args:
            results: Parsed Snyk JSON report (findings under
                ``vulnerabilities``).

        Returns:
            One ``Vulnerability`` per reported issue.
        """
        detected_at = datetime.utcnow().isoformat()
        vulns = []
        for vuln_data in results.get('vulnerabilities') or []:
            identifiers = vuln_data.get('identifiers') or {}
            dependency_path = vuln_data.get('from') or []
            fixed_in = vuln_data.get('fixedIn') or []
            url = vuln_data.get('url')
            vulns.append(Vulnerability(
                id=vuln_data.get('id') or self._generate_id(vuln_data),
                title=vuln_data.get('title', 'Unknown'),
                description=vuln_data.get('description', ''),
                severity=self._map_severity(vuln_data.get('severity', 'low')),
                cvss_score=vuln_data.get('cvssScore'),
                cve_ids=self._as_list(identifiers.get('CVE')),
                cwe_ids=self._as_list(identifiers.get('CWE')),
                affected_component=vuln_data.get('packageName', ''),
                affected_version=vuln_data.get('version', ''),
                file_path=dependency_path[0] if dependency_path else None,
                line_number=None,
                source_scanner='snyk',
                first_detected=detected_at,
                remediation=fixed_in[0] if fixed_in else None,
                # No URL means no reference, not a list holding ''.
                references=[url] if url else [],
            ))
        return vulns

    def normalize_trivy(self, results: Dict) -> "List[Vulnerability]":
        """Normalize Trivy results.

        Args:
            results: Parsed Trivy JSON report (targets under ``Results``,
                each with an optional ``Vulnerabilities`` list).

        Returns:
            One ``Vulnerability`` per entry across all targets. Targets
            whose ``Vulnerabilities`` is missing or null contribute nothing.
        """
        detected_at = datetime.utcnow().isoformat()
        vulns = []
        for result in results.get('Results') or []:
            target = result.get('Target', '')
            for vuln_data in result.get('Vulnerabilities') or []:
                vuln_id = vuln_data.get('VulnerabilityID') or ''
                vulns.append(Vulnerability(
                    id=vuln_id or self._generate_id(vuln_data),
                    title=vuln_data.get('Title', 'Unknown'),
                    description=vuln_data.get('Description', ''),
                    severity=self._map_severity(vuln_data.get('Severity', 'UNKNOWN')),
                    cvss_score=self._extract_cvss(vuln_data),
                    # Only IDs that are CVEs go into cve_ids.
                    cve_ids=[vuln_id] if vuln_id.startswith('CVE') else [],
                    cwe_ids=self._as_list(vuln_data.get('CweIDs')),
                    affected_component=vuln_data.get('PkgName', ''),
                    affected_version=vuln_data.get('InstalledVersion', ''),
                    file_path=target,
                    line_number=None,
                    source_scanner='trivy',
                    first_detected=detected_at,
                    remediation=vuln_data.get('FixedVersion'),
                    references=self._as_list(vuln_data.get('References')),
                ))
        return vulns

    def deduplicate(self, vulnerabilities: "List[Vulnerability]") -> "List[Vulnerability]":
        """Remove duplicate vulnerabilities.

        Records are compared by the key built in ``_dedup_key``. The first
        record with a given key is kept, in input order. Keys seen in
        earlier calls on this instance also count as duplicates.

        Args:
            vulnerabilities: Records to filter.

        Returns:
            The records whose key had not been seen before.
        """
        unique_vulns = []
        for vuln in vulnerabilities:
            digest = hashlib.sha256(self._dedup_key(vuln).encode()).hexdigest()
            if digest in self.dedup_hashes:
                continue
            self.dedup_hashes.add(digest)
            unique_vulns.append(vuln)
        return unique_vulns

    @staticmethod
    def _dedup_key(vuln) -> str:
        """Build the string that identifies a finding for deduplication."""
        if vuln.cve_ids:
            # Sorted so the same CVE set matches regardless of scanner order.
            identity = ','.join(sorted(vuln.cve_ids))
        else:
            # Findings without a CVE (e.g. code findings) share component and
            # version, so rule, file and line are needed to tell them apart.
            identity = f"{vuln.title}:{vuln.file_path}:{vuln.line_number}"
        return f"{vuln.affected_component}:{vuln.affected_version}:{identity}"

    @staticmethod
    def _as_list(value) -> List:
        """Return *value* as a list: None -> [], scalar -> [scalar]."""
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def _generate_id(self, data: Dict) -> str:
        """Generate a 12-hex-character ID from the finding's JSON content.

        Keys are sorted before hashing, so equal dicts give equal IDs.
        """
        content = json.dumps(data, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()[:12]

    def _map_severity(self, severity: Optional[str]) -> "Severity":
        """Map scanner severity to normalized severity.

        Matching is case-insensitive. Unknown or non-string values map to
        ``Severity.INFO``.
        """
        mapping = {
            'CRITICAL': Severity.CRITICAL,
            'HIGH': Severity.HIGH,
            'ERROR': Severity.HIGH,       # Semgrep's legacy level
            'MEDIUM': Severity.MEDIUM,
            'MODERATE': Severity.MEDIUM,
            'WARNING': Severity.MEDIUM,   # Semgrep's legacy level
            'LOW': Severity.LOW,
            'INFO': Severity.INFO,
            'INFORMATIONAL': Severity.INFO,
        }
        if not isinstance(severity, str):
            return Severity.INFO
        return mapping.get(severity.strip().upper(), Severity.INFO)

    def _extract_cvss(self, vuln_data: Dict) -> Optional[float]:
        """Extract a CVSS v3 score from a Trivy vulnerability entry.

        Sources are tried in the order nvd, ghsa, redhat.

        Returns:
            The first ``V3Score`` found, or None when no source has one.
        """
        cvss = vuln_data.get('CVSS') or {}
        for source in ('nvd', 'ghsa', 'redhat'):
            score = (cvss.get(source) or {}).get('V3Score')
            if score is not None:
                return score
        return None

# Vulnerability Prioritization
Risk-Based Prioritization Framework
# vulnerability_prioritization.py
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class ExploitMaturity(Enum):
    """How mature the known exploitation of a vulnerability is.

    Members carry integer values 1-5, least to most mature; the prioritizer
    multiplies the value into its score.
    """

    UNPROVEN = 1
    POC = 2
    FUNCTIONAL = 3
    HIGH = 4
    WEAPONIZED = 5
class AssetCriticality(Enum):
    """Business criticality of the asset a vulnerability affects.

    Members carry integer values 1-4, least to most critical, and are looked
    up by name when the prioritization context is built.
    """

    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class PrioritizationContext:
    """Risk inputs used to score one vulnerability.

    Combines facts about the affected asset with exploitation intelligence
    for the vulnerability's CVEs.
    """

    # --- asset facts ---
    asset_criticality: AssetCriticality
    is_internet_facing: bool
    has_sensitive_data: bool

    # --- exploitation intelligence ---
    exploit_available: bool
    exploit_maturity: ExploitMaturity
    in_kev: bool  # listed in CISA Known Exploited Vulnerabilities
    epss_score: Optional[float]  # Exploit Prediction Scoring System; None if unknown
class VulnerabilityPrioritizer:
    """Prioritize vulnerabilities based on risk context.

    Combines CVSS, exploit maturity, asset criticality, exposure flags,
    CISA KEV membership and EPSS into a 0-100 score, then derives a
    remediation SLA and a recommended action from that score.
    """

    def __init__(self):
        self.kev_catalog = self._load_kev_catalog()
        self.epss_scores = self._load_epss_scores()

    def calculate_priority_score(
        self,
        vulnerability: "Vulnerability",
        context: "PrioritizationContext"
    ) -> float:
        """Calculate priority score (0-100).

        Args:
            vulnerability: Finding to score; only ``cvss_score`` is read.
            context: Risk context for the finding.

        Returns:
            The summed components, clamped to the range 0-100.
        """
        score = 0.0
        # Base score from CVSS (0-40 points)
        if vulnerability.cvss_score:
            score += vulnerability.cvss_score * 4
        # Exploit maturity (4-20 points)
        score += context.exploit_maturity.value * 4
        # Asset criticality (5-20 points)
        score += context.asset_criticality.value * 5
        # Environmental factors (0-30 points)
        if context.is_internet_facing:
            score += 10
        if context.has_sensitive_data:
            score += 5
        if context.in_kev:
            score += 15  # Significant boost for known exploited
        # EPSS adjustment (-10 to +10). Compared against None so that a
        # score of exactly 0.0 still receives the low-probability reduction.
        epss = context.epss_score
        if epss is not None:
            if epss > 0.9:
                score += 10
            elif epss > 0.7:
                score += 5
            elif epss < 0.1:
                score -= 10
        return min(max(score, 0.0), 100.0)

    def prioritize_vulnerabilities(
        self,
        vulnerabilities: "List[Vulnerability]",
        asset_context: Dict
    ) -> List[Dict]:
        """Prioritize list of vulnerabilities.

        Args:
            vulnerabilities: Findings to rank.
            asset_context: Asset facts shared by all findings; the keys
                ``criticality``, ``internet_facing`` and ``sensitive_data``
                are read.

        Returns:
            One dict per finding with keys ``vulnerability``,
            ``priority_score``, ``context``, ``sla`` and
            ``recommended_action``, sorted by score, highest first.
        """
        prioritized = []
        for vuln in vulnerabilities:
            context = self._build_context(vuln, asset_context)
            score = self.calculate_priority_score(vuln, context)
            prioritized.append({
                'vulnerability': vuln,
                'priority_score': score,
                'context': context,
                'sla': self._determine_sla(score, context),
                'recommended_action': self._recommend_action(vuln, score)
            })
        # Sort by priority score descending
        prioritized.sort(key=lambda x: x['priority_score'], reverse=True)
        return prioritized

    def _build_context(
        self,
        vuln: "Vulnerability",
        asset_context: Dict
    ) -> "PrioritizationContext":
        """Build prioritization context for vulnerability.

        Raises:
            KeyError: If ``asset_context['criticality']`` is not the name of
                an ``AssetCriticality`` member (matched case-insensitively).
        """
        # Check KEV catalog
        in_kev = any(cve in self.kev_catalog for cve in vuln.cve_ids)
        # Use the highest EPSS among the finding's CVEs: the most likely
        # exploited CVE determines the risk.
        known_scores = [
            self.epss_scores[cve]
            for cve in vuln.cve_ids
            if cve in self.epss_scores
        ]
        epss_score = max(known_scores) if known_scores else None
        criticality = str(asset_context.get('criticality', 'MEDIUM')).upper()
        return PrioritizationContext(
            asset_criticality=AssetCriticality[criticality],
            is_internet_facing=asset_context.get('internet_facing', False),
            has_sensitive_data=asset_context.get('sensitive_data', False),
            # in_kev is only True when at least one CVE is present, and is
            # always a bool (the previous expression could yield a list).
            exploit_available=in_kev,
            exploit_maturity=self._get_exploit_maturity(vuln),
            in_kev=in_kev,
            epss_score=epss_score
        )

    def _determine_sla(self, score: float, context: "PrioritizationContext") -> Dict:
        """Determine remediation SLA based on priority.

        KEV-listed findings are always P1 regardless of score.

        Returns:
            Dict with ``days``, ``priority`` and ``description``.
        """
        if score >= 90 or context.in_kev:
            return {'days': 1, 'priority': 'P1', 'description': 'Critical - Immediate action required'}
        elif score >= 70:
            return {'days': 7, 'priority': 'P2', 'description': 'High - Remediate within 1 week'}
        elif score >= 50:
            return {'days': 30, 'priority': 'P3', 'description': 'Medium - Remediate within 1 month'}
        elif score >= 30:
            return {'days': 90, 'priority': 'P4', 'description': 'Low - Remediate within quarter'}
        else:
            return {'days': 180, 'priority': 'P5', 'description': 'Informational - Address as able'}

    def _recommend_action(self, vuln: "Vulnerability", score: float) -> str:
        """Recommend remediation action.

        When the finding carries a remediation value an upgrade is
        recommended (marked URGENT at score >= 70); otherwise the advice
        depends on the score alone.
        """
        if vuln.remediation:
            if score >= 70:
                return f"URGENT: Upgrade to {vuln.remediation}"
            return f"Upgrade to {vuln.remediation}"
        if score >= 90:
            return "Apply emergency mitigation - see references"
        elif score >= 70:
            return "Investigate and apply vendor patch or workaround"
        else:
            return "Monitor for updates and plan remediation"

    def _get_exploit_maturity(self, vuln: "Vulnerability") -> "ExploitMaturity":
        """Determine exploit maturity level (placeholder: always UNPROVEN)."""
        # In practice, query exploit databases
        return ExploitMaturity.UNPROVEN

    def _load_kev_catalog(self) -> set:
        """Load CISA KEV catalog (placeholder: returns an empty set)."""
        # Fetch from https://www.cisa.gov/known-exploited-vulnerabilities-catalog
        return set()

    def _load_epss_scores(self) -> Dict[str, float]:
        """Load EPSS scores keyed by CVE ID (placeholder: returns an empty dict)."""
        # Fetch from https://www.first.org/epss
        return {}

# Automated Remediation
Dependency Update Automation
# auto_remediation.py
import subprocess
import json
from typing import List, Dict, Optional
from dataclasses import dataclass
@dataclass
class RemediationResult:
    """Outcome of one attempt to remediate a single vulnerability."""

    vulnerability_id: str
    success: bool
    action_taken: str            # 'upgrade', 'dry_run', 'skipped' or 'none'
    old_version: str
    new_version: Optional[str]   # None when no target version was found
    error: Optional[str]         # None when nothing went wrong
    requires_review: bool        # True when a human has to follow up
class DependencyRemediator:
    """Automated dependency vulnerability remediation.

    Config keys read:
        dry_run: When true (the default) no package manager command that
            changes the environment is run; results are reported as
            ``'dry_run'``.
        max_major_upgrade: When true, upgrades that raise the major version
            are allowed; otherwise (the default) they are skipped and
            flagged for review.
    """

    # Scanners whose findings the npm path acts on.
    _NPM_SCANNERS = ('snyk', 'npm_audit', 'trivy')

    def __init__(self, config: Dict):
        self.config = config
        self.dry_run = config.get('dry_run', True)
        self.max_major_upgrade = config.get('max_major_upgrade', False)

    def remediate_npm(
        self,
        vulnerabilities: "List[Vulnerability]"
    ) -> "List[RemediationResult]":
        """Remediate npm vulnerabilities.

        Findings from scanners other than snyk, npm_audit and trivy are
        ignored. Every remaining finding yields exactly one result.
        """
        results = []
        for vuln in vulnerabilities:
            if vuln.source_scanner not in self._NPM_SCANNERS:
                continue
            current = self._get_npm_version(vuln.affected_component)
            target = self._find_safe_version(
                vuln.affected_component, vuln.remediation, current
            )
            results.append(self._attempt_upgrade(
                vuln, current, target,
                self._npm_upgrade, 'No safe upgrade path found'
            ))
        return results

    def remediate_python(
        self,
        vulnerabilities: "List[Vulnerability]"
    ) -> "List[RemediationResult]":
        """Remediate Python vulnerabilities.

        Follows the same steps as ``remediate_npm``, including the
        major-version gate, so both ecosystems apply one review policy.
        """
        results = []
        for vuln in vulnerabilities:
            current = self._get_pip_version(vuln.affected_component)
            target = self._find_safe_version(
                vuln.affected_component, vuln.remediation, current
            )
            results.append(self._attempt_upgrade(
                vuln, current, target,
                self._pip_upgrade, 'No fix version available'
            ))
        return results

    def _attempt_upgrade(self, vuln, current, target, upgrade, no_target_error) -> "RemediationResult":
        """Run the shared decide-then-upgrade flow for one finding.

        Args:
            vuln: Finding being remediated.
            current: Installed version, or None when unknown.
            target: Version to upgrade to, or None when there is none.
            upgrade: Callable ``(package, version) -> bool`` that installs.
            no_target_error: Error text to report when *target* is missing.
        """
        old_version = current or 'unknown'
        if not target:
            return RemediationResult(
                vulnerability_id=vuln.id,
                success=False,
                action_taken='none',
                old_version=old_version,
                new_version=None,
                error=no_target_error,
                requires_review=True
            )
        if not self._is_upgrade_acceptable(current, target):
            return RemediationResult(
                vulnerability_id=vuln.id,
                success=False,
                action_taken='skipped',
                old_version=old_version,
                new_version=target,
                error=self._rejection_reason(current, target),
                requires_review=True
            )
        if self.dry_run:
            return RemediationResult(
                vulnerability_id=vuln.id,
                success=True,
                action_taken='dry_run',
                old_version=old_version,
                new_version=target,
                error=None,
                requires_review=False
            )
        succeeded = upgrade(vuln.affected_component, target)
        # A failed install is surfaced instead of looking like a clean result.
        return RemediationResult(
            vulnerability_id=vuln.id,
            success=succeeded,
            action_taken='upgrade',
            old_version=old_version,
            new_version=target,
            error=None if succeeded else 'Upgrade command failed',
            requires_review=not succeeded
        )

    def _rejection_reason(self, current: Optional[str], target: str) -> str:
        """Explain why ``_is_upgrade_acceptable`` rejected an upgrade."""
        if not current:
            return 'Installed version could not be determined'
        if self._major_version(current) is None or self._major_version(target) is None:
            return 'Version could not be parsed; manual review required'
        return 'Major version upgrade requires manual review'

    def create_remediation_pr(
        self,
        results: "List[RemediationResult]",
        branch_name: str
    ) -> Dict:
        """Build the pull request payload for the applied upgrades.

        Only results that succeeded with action ``'upgrade'`` are included,
        so dry-run results never produce a pull request.

        Returns:
            ``{'created': False, 'reason': ...}`` when nothing qualifies,
            otherwise a dict with ``title``, ``body``, ``branch``, ``base``
            and ``labels``. No request is sent by this method.
        """
        successful = [r for r in results if r.success and r.action_taken == 'upgrade']
        if not successful:
            return {'created': False, 'reason': 'No successful remediations'}
        return {
            'title': f'Security: Remediate {len(successful)} vulnerabilities',
            'body': self._generate_pr_description(successful),
            'branch': branch_name,
            'base': 'main',
            'labels': ['security', 'automated']
        }

    def _generate_pr_description(self, results: "List[RemediationResult]") -> str:
        """Generate the Markdown PR body listing each version change."""
        lines = [
            '## Security Vulnerability Remediation',
            '',
            'This PR addresses the following vulnerabilities:',
            ''
        ]
        lines.extend(
            f'- `{result.vulnerability_id}`: {result.old_version} → {result.new_version}'
            for result in results
        )
        lines.extend([
            '',
            '### Testing',
            '- [ ] Unit tests pass',
            '- [ ] Integration tests pass',
            '- [ ] No breaking changes detected',
            '',
            '### Notes',
            'This PR was automatically generated by the vulnerability remediation system.',
            'Please review changes before merging.'
        ])
        return '\n'.join(lines)

    def _get_npm_version(self, package: str) -> Optional[str]:
        """Get current npm package version via ``npm list --json``.

        Returns:
            The installed version, or None when npm cannot be run, its
            output is not valid JSON, or the package is not listed.
        """
        try:
            result = subprocess.run(
                ['npm', 'list', package, '--json'],
                capture_output=True, text=True
            )
            data = json.loads(result.stdout)
        except (OSError, subprocess.SubprocessError, ValueError):
            return None
        if not isinstance(data, dict):
            return None
        entry = (data.get('dependencies') or {}).get(package) or {}
        return entry.get('version')

    def _npm_upgrade(self, package: str, version: str) -> bool:
        """Install ``package@version`` with npm; True on success."""
        try:
            subprocess.run(
                ['npm', 'install', f'{package}@{version}'],
                check=True
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return True

    @staticmethod
    def _major_version(version: str) -> Optional[int]:
        """Return the leading major number of *version*, or None.

        A leading ``v``/``V`` is ignored, so ``'v1.2.3'`` gives 1.
        """
        digits = []
        for ch in str(version).strip().lstrip('vV'):
            if ch not in '0123456789':
                break
            digits.append(ch)
        return int(''.join(digits)) if digits else None

    def _is_upgrade_acceptable(self, current: str, target: str) -> bool:
        """Check if upgrade is within acceptable bounds.

        Returns:
            False when either version is missing or has no parseable major
            number; ``self.max_major_upgrade`` when the major version would
            increase; True otherwise.
        """
        if not current or not target:
            return False
        current_major = self._major_version(current)
        target_major = self._major_version(target)
        if current_major is None or target_major is None:
            # Unparseable versions go to a human rather than raising.
            return False
        if target_major > current_major:
            return self.max_major_upgrade
        return True

    def _find_safe_version(
        self,
        package: str,
        fixed_version: Optional[str],
        current_version: Optional[str]
    ) -> Optional[str]:
        """Find safe version to upgrade to.

        *fixed_version* may hold several comma-separated versions. When it
        does, the one sharing the installed major version is preferred;
        otherwise the first listed version is used.

        Returns:
            The chosen version, or None when no fixed version is known.
        """
        if not fixed_version:
            return None
        candidates = [
            part.strip()
            for part in str(fixed_version).split(',')
            if part.strip()
        ]
        if not candidates:
            return None
        if len(candidates) > 1 and current_version:
            current_major = self._major_version(current_version)
            if current_major is not None:
                for candidate in candidates:
                    if self._major_version(candidate) == current_major:
                        return candidate
        # Query npm for latest patch version
        # Implementation depends on your requirements
        return candidates[0]

    def _get_pip_version(self, package: str) -> Optional[str]:
        """Get current pip package version from ``pip show`` output.

        Returns:
            The value of the ``Version:`` line, or None when pip cannot be
            run or prints no such line.
        """
        try:
            result = subprocess.run(
                ['pip', 'show', package],
                capture_output=True, text=True
            )
        except (OSError, subprocess.SubprocessError):
            return None
        for line in result.stdout.splitlines():
            if line.startswith('Version:'):
                return line.split(':', 1)[1].strip()
        return None

    def _pip_upgrade(self, package: str, version: str) -> bool:
        """Install ``package==version`` with pip; True on success."""
        try:
            subprocess.run(
                ['pip', 'install', f'{package}=={version}'],
                check=True
            )
        except (OSError, subprocess.SubprocessError):
            return False
        return True

# Metrics and Reporting
Vulnerability Metrics Dashboard
# vulnerability_metrics.py
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime, timedelta
from collections import defaultdict
@dataclass
class VulnerabilityMetrics:
    """Snapshot of vulnerability management metrics."""

    # --- open findings ---
    total_open: int
    critical_open: int
    high_open: int
    medium_open: int
    low_open: int

    # --- remediation speed ---
    mttr_critical: float  # Mean Time to Remediate, critical findings
    mttr_high: float      # Mean Time to Remediate, high findings
    sla_compliance_rate: float

    # --- recent activity ---
    new_this_week: int
    closed_this_week: int
    overdue_count: int
class MetricsCalculator:
    """Calculate vulnerability management metrics.

    Reads vulnerabilities from the store passed to the constructor. The
    store methods called are ``get_all_open``, ``get_closed``,
    ``get_opened_since`` and ``get_closed_since``.
    """

    # Remediation SLA in days, keyed by severity value. Severities not
    # listed here use DEFAULT_SLA_DAYS.
    SLA_DAYS = {
        'critical': 1,
        'high': 7,
        'medium': 30,
        'low': 90
    }
    DEFAULT_SLA_DAYS = 90

    def __init__(self, vulnerability_store):
        self.store = vulnerability_store

    def calculate_metrics(self, as_of: datetime = None) -> "VulnerabilityMetrics":
        """Calculate current vulnerability metrics.

        Args:
            as_of: Reference time for the 90-day MTTR window, the 7-day
                activity window and the overdue check. Defaults to now (UTC).

        Returns:
            A populated ``VulnerabilityMetrics``.
        """
        as_of = as_of or datetime.utcnow()
        # NOTE(review): get_all_open() takes no date, so the open counts
        # reflect whatever the store returns now, not the state at as_of.
        vulns = self.store.get_all_open()
        # Count by severity
        by_severity = defaultdict(int)
        for v in vulns:
            by_severity[v.severity.value] += 1
        # MTTR and SLA compliance over findings closed in the last 90 days
        closed_vulns = self.store.get_closed(
            since=as_of - timedelta(days=90)
        )
        # Weekly activity
        week_ago = as_of - timedelta(days=7)
        return VulnerabilityMetrics(
            total_open=len(vulns),
            critical_open=by_severity['critical'],
            high_open=by_severity['high'],
            medium_open=by_severity['medium'],
            low_open=by_severity['low'],
            mttr_critical=self._calculate_mttr(closed_vulns, Severity.CRITICAL),
            mttr_high=self._calculate_mttr(closed_vulns, Severity.HIGH),
            sla_compliance_rate=self._calculate_sla_compliance(closed_vulns),
            new_this_week=len(self.store.get_opened_since(week_ago)),
            closed_this_week=len(self.store.get_closed_since(week_ago)),
            overdue_count=sum(1 for v in vulns if self._is_overdue(v, as_of))
        )

    def generate_trend_report(
        self,
        start_date: datetime,
        end_date: datetime,
        interval: str = 'week'
    ) -> List[Dict]:
        """Generate trend report over time period.

        Args:
            start_date: First data point (inclusive).
            end_date: Last possible data point (inclusive).
            interval: ``'week'`` or ``'day'``; any other value steps by
                30 days.

        Returns:
            One dict per step with ``date``, ``total_open``, ``critical``,
            ``high`` and ``mttr_critical``.
        """
        if interval == 'week':
            step = timedelta(weeks=1)
        elif interval == 'day':
            step = timedelta(days=1)
        else:
            step = timedelta(days=30)
        trends = []
        current = start_date
        while current <= end_date:
            metrics = self.calculate_metrics(as_of=current)
            trends.append({
                'date': current.isoformat(),
                'total_open': metrics.total_open,
                'critical': metrics.critical_open,
                'high': metrics.high_open,
                'mttr_critical': metrics.mttr_critical
            })
            current += step
        return trends

    @staticmethod
    def _as_datetime(value):
        """Return *value* as a datetime, parsing ISO-8601 strings."""
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        return value

    def _sla_window(self, vuln) -> timedelta:
        """Return the remediation time allowed for *vuln*'s severity."""
        allowed_days = self.SLA_DAYS.get(vuln.severity.value, self.DEFAULT_SLA_DAYS)
        return timedelta(days=allowed_days)

    def _calculate_mttr(
        self,
        closed_vulns: List,
        severity: "Severity"
    ) -> float:
        """Calculate Mean Time to Remediate for severity level.

        Returns:
            Mean time from detection to closure in days, including
            fractions of a day; 0.0 when no closed finding has that
            severity.
        """
        durations = [
            self._as_datetime(v.closed_at) - self._as_datetime(v.first_detected)
            for v in closed_vulns
            if v.severity == severity
        ]
        if not durations:
            return 0.0
        total = sum(durations, timedelta())
        return total / timedelta(days=1) / len(durations)

    def _calculate_sla_compliance(self, closed_vulns: List) -> float:
        """Calculate SLA compliance rate as a percentage.

        Returns:
            Share of closed findings remediated within SLA, 0-100;
            100.0 when there are no closed findings.
        """
        if not closed_vulns:
            return 100.0
        compliant = sum(
            1 for v in closed_vulns
            if self._was_within_sla(v)
        )
        return (compliant / len(closed_vulns)) * 100

    def _was_within_sla(self, vuln) -> bool:
        """Check if vulnerability was remediated within SLA.

        Compares the full elapsed time, so a fix taking 1 day 12 hours does
        not satisfy a 1-day SLA.
        """
        elapsed = self._as_datetime(vuln.closed_at) - self._as_datetime(vuln.first_detected)
        return elapsed <= self._sla_window(vuln)

    def _is_overdue(self, vuln, as_of: datetime) -> bool:
        """Check if vulnerability is past SLA at *as_of*.

        Compares the full elapsed time, so a finding is overdue as soon as
        its SLA window has passed rather than a whole day later.
        """
        time_open = as_of - self._as_datetime(vuln.first_detected)
        return time_open > self._sla_window(vuln)

# Conclusion
Effective vulnerability management in DevSecOps requires:
- Comprehensive Detection - Multiple scanners covering code, dependencies, containers, and IaC
- Intelligent Prioritization - Risk-based scoring considering exploitability and asset context
- Automated Remediation - Auto-patching where safe, with human review for major changes
- Continuous Monitoring - Track metrics, trends, and SLA compliance
By implementing these practices, organizations can systematically reduce their vulnerability exposure while maintaining development velocity.