Combining SAST and DAST provides complementary application security coverage: static analysis inspects source code before it ever runs, while dynamic testing probes the deployed application for exploitable behavior. This guide covers integrating both testing methodologies into CI/CD pipelines with unified vulnerability management.
Unified Security Testing Framework
Start by building a framework that coordinates SAST and DAST scanners around a shared vulnerability model:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import hashlib
import json
class VulnerabilityType(Enum):
SQL_INJECTION = "sql_injection"
XSS = "cross_site_scripting"
COMMAND_INJECTION = "command_injection"
PATH_TRAVERSAL = "path_traversal"
SSRF = "server_side_request_forgery"
INSECURE_DESERIALIZATION = "insecure_deserialization"
BROKEN_AUTH = "broken_authentication"
SENSITIVE_DATA_EXPOSURE = "sensitive_data_exposure"
XXE = "xml_external_entity"
SECURITY_MISCONFIGURATION = "security_misconfiguration"
class Severity(Enum):
CRITICAL = 4
HIGH = 3
MEDIUM = 2
LOW = 1
INFO = 0
class TestingPhase(Enum):
SAST = "static"
DAST = "dynamic"
IAST = "interactive"
SCA = "composition"
@dataclass
class CodeLocation:
file_path: str
line_number: int
column: Optional[int] = None
snippet: Optional[str] = None
function_name: Optional[str] = None
@dataclass
class HttpLocation:
url: str
method: str
parameter: Optional[str] = None
request_headers: Optional[Dict[str, str]] = None
response_code: Optional[int] = None
@dataclass
class Vulnerability:
id: str
title: str
vuln_type: VulnerabilityType
severity: Severity
phase: TestingPhase
description: str
remediation: str
confidence: float
code_location: Optional[CodeLocation] = None
http_location: Optional[HttpLocation] = None
cwe_id: Optional[str] = None
cvss_score: Optional[float] = None
evidence: List[str] = field(default_factory=list)
first_seen: datetime = field(default_factory=datetime.utcnow)
correlated_ids: Set[str] = field(default_factory=set)
class SecurityScanner(ABC):
@abstractmethod
def scan(self, target: str, config: Dict) -> List[Vulnerability]:
pass
@abstractmethod
def get_scanner_name(self) -> str:
pass
class SASTScanner(SecurityScanner):
def __init__(self):
self.rules = self._load_rules()
def _load_rules(self) -> List[Dict]:
return [
{
"id": "SAST-001",
"pattern": r"execute\s*\(\s*['\"]?\s*\+|executeQuery\s*\(\s*['\"]?\s*\+",
"vuln_type": VulnerabilityType.SQL_INJECTION,
"severity": Severity.CRITICAL,
"message": "SQL query built with string concatenation",
"remediation": "Use parameterized queries or prepared statements"
},
{
"id": "SAST-002",
"pattern": r"innerHTML\s*=|document\.write\s*\(",
"vuln_type": VulnerabilityType.XSS,
"severity": Severity.HIGH,
"message": "Potential DOM-based XSS vulnerability",
"remediation": "Use textContent or sanitize input before rendering"
},
{
"id": "SAST-003",
"pattern": r"exec\s*\(|system\s*\(|popen\s*\(",
"vuln_type": VulnerabilityType.COMMAND_INJECTION,
"severity": Severity.CRITICAL,
"message": "Command execution with potential user input",
"remediation": "Use subprocess with shell=False and validate input"
},
{
"id": "SAST-004",
"pattern": r"open\s*\([^)]*\+|Path\s*\([^)]*\+",
"vuln_type": VulnerabilityType.PATH_TRAVERSAL,
"severity": Severity.HIGH,
"message": "File path constructed with user input",
"remediation": "Validate and sanitize file paths, use allowlists"
},
{
"id": "SAST-005",
"pattern": r"pickle\.loads|yaml\.load\s*\([^)]*Loader",
"vuln_type": VulnerabilityType.INSECURE_DESERIALIZATION,
"severity": Severity.CRITICAL,
"message": "Insecure deserialization detected",
"remediation": "Use safe_load for YAML, avoid pickle for untrusted data"
}
]
def scan(self, target: str, config: Dict) -> List[Vulnerability]:
import re
import os
findings = []
extensions = config.get('extensions', ['.py', '.js', '.ts', '.java'])
for root, dirs, files in os.walk(target):
dirs[:] = [d for d in dirs if d not in ['node_modules', '.git', 'venv']]
for file in files:
if not any(file.endswith(ext) for ext in extensions):
continue
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
lines = content.split('\n')
except Exception:
continue
for rule in self.rules:
for i, line in enumerate(lines):
if re.search(rule['pattern'], line):
vuln_id = self._generate_id(file_path, i, rule['id'])
findings.append(Vulnerability(
id=vuln_id,
title=rule['message'],
vuln_type=rule['vuln_type'],
severity=rule['severity'],
phase=TestingPhase.SAST,
description=f"Found pattern matching {rule['id']}",
remediation=rule['remediation'],
confidence=0.8,
code_location=CodeLocation(
file_path=file_path,
line_number=i + 1,
snippet=line.strip()[:200]
),
cwe_id=self._get_cwe(rule['vuln_type']),
evidence=[f"Pattern: {rule['pattern']}", f"Match: {line.strip()[:100]}"]
))
return findings
def _generate_id(self, file_path: str, line: int, rule_id: str) -> str:
content = f"{file_path}:{line}:{rule_id}"
return f"SAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"
def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
cwe_map = {
VulnerabilityType.SQL_INJECTION: "CWE-89",
VulnerabilityType.XSS: "CWE-79",
VulnerabilityType.COMMAND_INJECTION: "CWE-78",
VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
VulnerabilityType.INSECURE_DESERIALIZATION: "CWE-502"
}
return cwe_map.get(vuln_type, "CWE-Unknown")
def get_scanner_name(self) -> str:
return "Custom SAST Scanner"
class DASTScanner(SecurityScanner):
def __init__(self):
self.payloads = self._load_payloads()
def _load_payloads(self) -> Dict[VulnerabilityType, List[str]]:
return {
VulnerabilityType.SQL_INJECTION: [
"' OR '1'='1",
"'; DROP TABLE users--",
"1' AND '1'='1",
"1 UNION SELECT NULL--"
],
VulnerabilityType.XSS: [
"<script>alert(1)</script>",
"<img src=x onerror=alert(1)>",
"javascript:alert(1)",
"<svg onload=alert(1)>"
],
VulnerabilityType.COMMAND_INJECTION: [
"; ls -la",
"| cat /etc/passwd",
"`id`",
"$(whoami)"
],
VulnerabilityType.PATH_TRAVERSAL: [
"../../../etc/passwd",
"....//....//etc/passwd",
"%2e%2e%2f%2e%2e%2fetc/passwd"
],
VulnerabilityType.SSRF: [
"http://localhost:22",
"http://169.254.169.254/latest/meta-data/",
"http://127.0.0.1:6379"
]
}
def scan(self, target: str, config: Dict) -> List[Vulnerability]:
import requests
from urllib.parse import urljoin, urlparse, parse_qs
findings = []
endpoints = config.get('endpoints', [])
timeout = config.get('timeout', 10)
for endpoint in endpoints:
url = urljoin(target, endpoint['path'])
method = endpoint.get('method', 'GET')
params = endpoint.get('params', {})
for vuln_type, payloads in self.payloads.items():
for param_name in params.keys():
for payload in payloads:
test_params = params.copy()
test_params[param_name] = payload
try:
if method == 'GET':
response = requests.get(url, params=test_params, timeout=timeout, verify=False)
else:
response = requests.post(url, data=test_params, timeout=timeout, verify=False)
if self._detect_vulnerability(response, vuln_type, payload):
vuln_id = self._generate_id(url, param_name, payload)
findings.append(Vulnerability(
id=vuln_id,
title=f"{vuln_type.value} in {param_name}",
vuln_type=vuln_type,
severity=self._get_severity(vuln_type),
phase=TestingPhase.DAST,
description=f"Parameter {param_name} vulnerable to {vuln_type.value}",
remediation=self._get_remediation(vuln_type),
confidence=0.9,
http_location=HttpLocation(
url=url,
method=method,
parameter=param_name,
response_code=response.status_code
),
cwe_id=self._get_cwe(vuln_type),
evidence=[f"Payload: {payload}", f"Response: {response.text[:200]}"]
))
except requests.RequestException:
continue
return findings
def _detect_vulnerability(self, response, vuln_type: VulnerabilityType, payload: str) -> bool:
indicators = {
VulnerabilityType.SQL_INJECTION: [
"SQL syntax", "mysql_fetch", "ORA-", "PostgreSQL",
"sqlite3.OperationalError", "syntax error"
],
VulnerabilityType.XSS: [payload],
VulnerabilityType.COMMAND_INJECTION: [
"root:", "uid=", "bin/bash", "Permission denied"
],
VulnerabilityType.PATH_TRAVERSAL: [
"root:x:", "[boot loader]", "daemon:"
],
VulnerabilityType.SSRF: ["metadata", "localhost", "internal"]
}
text = response.text.lower()
for indicator in indicators.get(vuln_type, []):
if indicator.lower() in text:
return True
return False
def _generate_id(self, url: str, param: str, payload: str) -> str:
content = f"{url}:{param}:{payload[:20]}"
return f"DAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"
def _get_severity(self, vuln_type: VulnerabilityType) -> Severity:
severity_map = {
VulnerabilityType.SQL_INJECTION: Severity.CRITICAL,
VulnerabilityType.COMMAND_INJECTION: Severity.CRITICAL,
VulnerabilityType.XSS: Severity.HIGH,
VulnerabilityType.PATH_TRAVERSAL: Severity.HIGH,
VulnerabilityType.SSRF: Severity.HIGH
}
return severity_map.get(vuln_type, Severity.MEDIUM)
def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
cwe_map = {
VulnerabilityType.SQL_INJECTION: "CWE-89",
VulnerabilityType.XSS: "CWE-79",
VulnerabilityType.COMMAND_INJECTION: "CWE-78",
VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
VulnerabilityType.SSRF: "CWE-918"
}
return cwe_map.get(vuln_type, "CWE-Unknown")
def _get_remediation(self, vuln_type: VulnerabilityType) -> str:
remediation_map = {
VulnerabilityType.SQL_INJECTION: "Use parameterized queries",
VulnerabilityType.XSS: "Encode output and validate input",
VulnerabilityType.COMMAND_INJECTION: "Avoid shell commands, use safe APIs",
VulnerabilityType.PATH_TRAVERSAL: "Validate and sanitize file paths",
VulnerabilityType.SSRF: "Validate URLs and use allowlists"
}
return remediation_map.get(vuln_type, "Review and fix the vulnerability")
def get_scanner_name(self) -> str:
return "Custom DAST Scanner"Vulnerability Correlation Engine
Vulnerability Correlation Engine
Correlate findings from SAST and DAST:
from typing import List, Dict, Tuple
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class CorrelatedVulnerability:
primary: Vulnerability
correlated: List[Vulnerability]
correlation_score: float
verified: bool
combined_severity: Severity
class VulnerabilityCorrelator:
def __init__(self):
self.correlation_rules = self._load_correlation_rules()
def _load_correlation_rules(self) -> List[Dict]:
return [
{
"sast_pattern": "SQL query built with string concatenation",
"dast_vuln_type": VulnerabilityType.SQL_INJECTION,
"weight": 1.0
},
{
"sast_pattern": "DOM-based XSS",
"dast_vuln_type": VulnerabilityType.XSS,
"weight": 0.9
},
{
"sast_pattern": "Command execution",
"dast_vuln_type": VulnerabilityType.COMMAND_INJECTION,
"weight": 1.0
},
{
"sast_pattern": "File path constructed",
"dast_vuln_type": VulnerabilityType.PATH_TRAVERSAL,
"weight": 0.85
}
]
def correlate(
self,
sast_findings: List[Vulnerability],
dast_findings: List[Vulnerability]
) -> List[CorrelatedVulnerability]:
correlated = []
used_dast = set()
sast_by_type = defaultdict(list)
for finding in sast_findings:
sast_by_type[finding.vuln_type].append(finding)
dast_by_type = defaultdict(list)
for finding in dast_findings:
dast_by_type[finding.vuln_type].append(finding)
for vuln_type in VulnerabilityType:
sast_vulns = sast_by_type.get(vuln_type, [])
dast_vulns = dast_by_type.get(vuln_type, [])
for sast_vuln in sast_vulns:
best_match = None
best_score = 0.0
for dast_vuln in dast_vulns:
if dast_vuln.id in used_dast:
continue
score = self._calculate_correlation_score(sast_vuln, dast_vuln)
if score > best_score and score > 0.5:
best_score = score
best_match = dast_vuln
if best_match:
used_dast.add(best_match.id)
sast_vuln.correlated_ids.add(best_match.id)
best_match.correlated_ids.add(sast_vuln.id)
correlated.append(CorrelatedVulnerability(
primary=sast_vuln,
correlated=[best_match],
correlation_score=best_score,
verified=True,
combined_severity=self._combine_severity(sast_vuln, best_match)
))
else:
correlated.append(CorrelatedVulnerability(
primary=sast_vuln,
correlated=[],
correlation_score=0.0,
verified=False,
combined_severity=sast_vuln.severity
))
for dast_vuln in dast_findings:
if dast_vuln.id not in used_dast:
correlated.append(CorrelatedVulnerability(
primary=dast_vuln,
correlated=[],
correlation_score=0.0,
verified=True,
combined_severity=dast_vuln.severity
))
return sorted(correlated, key=lambda x: (x.combined_severity.value, x.correlation_score), reverse=True)
    def _calculate_correlation_score(
        self,
        sast_vuln: Vulnerability,
        dast_vuln: Vulnerability
    ) -> float:
        if sast_vuln.vuln_type != dast_vuln.vuln_type:
            return 0.0
        score = 0.5
        if sast_vuln.code_location and dast_vuln.http_location:
            file_name = sast_vuln.code_location.file_path.split('/')[-1]
            endpoint = dast_vuln.http_location.url.split('/')[-1]
            if self._names_related(file_name, endpoint):
                score += 0.3
        if sast_vuln.cwe_id == dast_vuln.cwe_id:
            score += 0.2
        # Weight the score by the matching correlation rule, if one applies
        weight = 1.0
        for rule in self.correlation_rules:
            if (rule["dast_vuln_type"] == dast_vuln.vuln_type
                    and rule["sast_pattern"].lower() in sast_vuln.title.lower()):
                weight = rule["weight"]
                break
        return min(score * weight, 1.0)
def _names_related(self, file_name: str, endpoint: str) -> bool:
file_base = file_name.replace('.py', '').replace('.js', '').lower()
endpoint_base = endpoint.split('?')[0].lower()
return file_base in endpoint_base or endpoint_base in file_base
def _combine_severity(self, v1: Vulnerability, v2: Vulnerability) -> Severity:
if v1.severity.value >= Severity.HIGH.value and v2.severity.value >= Severity.HIGH.value:
return Severity.CRITICAL
        return max(v1.severity, v2.severity, key=lambda x: x.value)
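Continuing the scanner example from the previous section, correlation is a single call; the correlator escalates severity when both phases agree and sorts results by combined severity and correlation score:

# Sketch: correlate the SAST and DAST findings gathered above.
correlator = VulnerabilityCorrelator()
correlated = correlator.correlate(sast_findings, dast_findings)

for item in correlated:
    status = "verified" if item.verified else "unverified"
    print(f"[{item.combined_severity.name}] {item.primary.title} "
          f"({status}, score={item.correlation_score:.2f})")
    for linked in item.correlated:
        print(f"    confirmed by {linked.phase.value} finding {linked.id}")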
CI/CD Pipeline Integration
Integrate security testing into pipelines:
# .github/workflows/security-testing.yml
name: Application Security Testing
on:
pull_request:
branches: [main, develop]
push:
branches: [main]
jobs:
sast:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run Semgrep SAST
uses: returntocorp/semgrep-action@v1
with:
config: >-
p/security-audit
p/owasp-top-ten
p/python
p/javascript
- name: Run Bandit (Python)
run: |
pip install bandit
bandit -r src/ -f json -o bandit-results.json || true
- name: Run ESLint Security
run: |
npm install eslint-plugin-security
npx eslint --ext .js,.ts src/ -f json -o eslint-results.json || true
- name: Upload SAST Results
uses: actions/upload-artifact@v4
with:
name: sast-results
path: |
bandit-results.json
eslint-results.json
dast:
runs-on: ubuntu-latest
    needs: build  # assumes a "build" job (not shown) that publishes the app image and exposes it via outputs.image
services:
app:
image: ${{ needs.build.outputs.image }}
ports:
- 8080:8080
steps:
- uses: actions/checkout@v4
- name: Wait for Application
run: |
timeout 60 bash -c 'until curl -s http://localhost:8080/health; do sleep 2; done'
- name: Run OWASP ZAP
uses: zaproxy/action-full-scan@v0.8.0
with:
target: 'http://localhost:8080'
rules_file_name: '.zap/rules.tsv'
cmd_options: '-a -j'
- name: Run Nuclei
run: |
docker run --network host projectdiscovery/nuclei:latest \
-u http://localhost:8080 \
-t cves/ -t vulnerabilities/ \
-json -o nuclei-results.json
- name: Upload DAST Results
uses: actions/upload-artifact@v4
with:
name: dast-results
path: |
zap-report.json
nuclei-results.json
correlate:
runs-on: ubuntu-latest
needs: [sast, dast]
steps:
- uses: actions/checkout@v4
- name: Download All Results
uses: actions/download-artifact@v4
- name: Run Correlation Engine
run: |
python scripts/correlate_findings.py \
--sast-dir sast-results/ \
--dast-dir dast-results/ \
--output correlated-report.json
- name: Check Thresholds
run: |
python scripts/check_thresholds.py \
--report correlated-report.json \
--max-critical 0 \
--max-high 5
- name: Upload Correlated Report
uses: actions/upload-artifact@v4
with:
name: security-report
          path: correlated-report.json
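The workflow calls scripts/correlate_findings.py and scripts/check_thresholds.py, which are not shown here. correlate_findings.py would load the tool outputs, map them onto the Vulnerability model, and run the correlator; a minimal check_thresholds.py might look like the hypothetical sketch below, which assumes the report carries the summary.by_severity counts produced by the report generator in the next section.

# scripts/check_thresholds.py — hypothetical sketch; assumes the correlated report
# contains a summary.by_severity map keyed by severity name (CRITICAL, HIGH, ...).
import argparse
import json
import sys


def main() -> int:
    parser = argparse.ArgumentParser(description="Fail the build when severity thresholds are exceeded")
    parser.add_argument("--report", required=True)
    parser.add_argument("--max-critical", type=int, default=0)
    parser.add_argument("--max-high", type=int, default=5)
    args = parser.parse_args()

    with open(args.report) as f:
        report = json.load(f)

    counts = report.get("summary", {}).get("by_severity", {})
    critical, high = counts.get("CRITICAL", 0), counts.get("HIGH", 0)

    print(f"Critical: {critical} (max {args.max_critical}), High: {high} (max {args.max_high})")
    if critical > args.max_critical or high > args.max_high:
        print("Security thresholds exceeded; failing the pipeline")
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())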
Unified Reporting Dashboard
Generate comprehensive security reports:
from typing import List, Dict
from datetime import datetime
import json
class SecurityReportGenerator:
def __init__(self):
self.report_data = {}
def generate_report(
self,
correlated_findings: List[CorrelatedVulnerability],
project_name: str,
scan_config: Dict
) -> Dict:
report = {
"metadata": {
"project": project_name,
"generated_at": datetime.utcnow().isoformat(),
"scan_config": scan_config
},
"summary": self._generate_summary(correlated_findings),
"findings": self._format_findings(correlated_findings),
"trends": self._calculate_trends(correlated_findings),
"recommendations": self._generate_recommendations(correlated_findings)
}
self.report_data = report
return report
def _generate_summary(self, findings: List[CorrelatedVulnerability]) -> Dict:
severity_counts = {s.name: 0 for s in Severity}
verified_count = 0
correlated_count = 0
for finding in findings:
severity_counts[finding.combined_severity.name] += 1
if finding.verified:
verified_count += 1
if finding.correlation_score > 0:
correlated_count += 1
return {
"total_findings": len(findings),
"by_severity": severity_counts,
"verified_findings": verified_count,
"correlated_findings": correlated_count,
"verification_rate": verified_count / len(findings) if findings else 0,
"risk_score": self._calculate_risk_score(findings)
}
def _calculate_risk_score(self, findings: List[CorrelatedVulnerability]) -> float:
weights = {
Severity.CRITICAL: 10,
Severity.HIGH: 5,
Severity.MEDIUM: 2,
Severity.LOW: 1,
Severity.INFO: 0
}
total_score = sum(
weights[f.combined_severity] * (1.5 if f.verified else 1.0)
for f in findings
)
return min(total_score / 10, 100)
def _format_findings(self, findings: List[CorrelatedVulnerability]) -> List[Dict]:
formatted = []
for finding in findings:
primary = finding.primary
entry = {
"id": primary.id,
"title": primary.title,
"severity": finding.combined_severity.name,
"type": primary.vuln_type.value,
"phase": primary.phase.value,
"verified": finding.verified,
"correlation_score": finding.correlation_score,
"cwe": primary.cwe_id,
"description": primary.description,
"remediation": primary.remediation,
"evidence": primary.evidence
}
if primary.code_location:
entry["code_location"] = {
"file": primary.code_location.file_path,
"line": primary.code_location.line_number,
"snippet": primary.code_location.snippet
}
if primary.http_location:
entry["http_location"] = {
"url": primary.http_location.url,
"method": primary.http_location.method,
"parameter": primary.http_location.parameter
}
if finding.correlated:
entry["correlated_findings"] = [
{"id": c.id, "phase": c.phase.value}
for c in finding.correlated
]
formatted.append(entry)
return formatted
def _calculate_trends(self, findings: List[CorrelatedVulnerability]) -> Dict:
by_type = {}
for finding in findings:
vuln_type = finding.primary.vuln_type.value
if vuln_type not in by_type:
by_type[vuln_type] = {"count": 0, "severities": []}
by_type[vuln_type]["count"] += 1
by_type[vuln_type]["severities"].append(finding.combined_severity.name)
return {
"by_vulnerability_type": by_type,
"most_common": max(by_type.items(), key=lambda x: x[1]["count"])[0] if by_type else None
}
def _generate_recommendations(self, findings: List[CorrelatedVulnerability]) -> List[Dict]:
recommendations = []
critical_count = sum(1 for f in findings if f.combined_severity == Severity.CRITICAL)
if critical_count > 0:
recommendations.append({
"priority": "IMMEDIATE",
"action": f"Address {critical_count} critical vulnerabilities before deployment",
"impact": "Prevent potential data breach or system compromise"
})
verified_high = [f for f in findings if f.verified and f.combined_severity.value >= Severity.HIGH.value]
if verified_high:
recommendations.append({
"priority": "HIGH",
"action": f"Fix {len(verified_high)} verified high-severity issues",
"impact": "Reduce attack surface significantly"
})
sqli_findings = [f for f in findings if f.primary.vuln_type == VulnerabilityType.SQL_INJECTION]
if sqli_findings:
recommendations.append({
"priority": "HIGH",
"action": "Implement parameterized queries across all database interactions",
"impact": "Eliminate SQL injection attack vector"
})
return recommendations
def export_sarif(self) -> Dict:
"""Export findings in SARIF format for GitHub integration."""
sarif = {
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
"version": "2.1.0",
"runs": [{
"tool": {
"driver": {
"name": "Unified Security Scanner",
"version": "1.0.0",
"rules": []
}
},
"results": []
}]
}
rules_map = {}
for finding in self.report_data.get("findings", []):
rule_id = f"{finding['type']}-{finding['severity']}"
if rule_id not in rules_map:
rules_map[rule_id] = {
"id": rule_id,
"name": finding["title"],
"shortDescription": {"text": finding["description"]},
"defaultConfiguration": {"level": self._sarif_level(finding["severity"])}
}
sarif["runs"][0]["tool"]["driver"]["rules"].append(rules_map[rule_id])
result = {
"ruleId": rule_id,
"message": {"text": finding["description"]},
"level": self._sarif_level(finding["severity"])
}
if "code_location" in finding:
result["locations"] = [{
"physicalLocation": {
"artifactLocation": {"uri": finding["code_location"]["file"]},
"region": {"startLine": finding["code_location"]["line"]}
}
}]
sarif["runs"][0]["results"].append(result)
return sarif
def _sarif_level(self, severity: str) -> str:
level_map = {
"CRITICAL": "error",
"HIGH": "error",
"MEDIUM": "warning",
"LOW": "note",
"INFO": "note"
}
        return level_map.get(severity, "warning")
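Putting the reporting pieces together, a final pipeline step might generate the unified report from the correlated list built earlier and write a SARIF file for upload to GitHub code scanning; the project name and output file names below are placeholders.

# Sketch: produce the unified JSON report and a SARIF export (names are placeholders).
import json

generator = SecurityReportGenerator()
report = generator.generate_report(
    correlated_findings=correlated,
    project_name="example-app",
    scan_config={"sast_target": "./src", "dast_target": "http://localhost:8080"},
)

with open("correlated-report.json", "w") as f:
    json.dump(report, f, indent=2, default=str)

with open("security-results.sarif", "w") as f:
    json.dump(generator.export_sarif(), f, indent=2)

summary = report["summary"]
print(f"Risk score: {summary['risk_score']:.1f}, "
      f"verified {summary['verified_findings']}/{summary['total_findings']} findings")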
Conclusion
Combining SAST and DAST yields stronger coverage than either technique alone: SAST finds issues early in the code, while DAST validates exploitability at runtime. Correlating the two sets of findings increases confidence in each result and helps prioritize remediation. Integrate both into CI/CD pipelines with unified reporting for continuous security assurance.