Combinarea SAST și DAST oferă acoperire completă a securității aplicațiilor. Acest ghid acoperă integrarea ambelor metodologii de testare în pipeline-uri CI/CD, cu management unificat al vulnerabilităților.
Framework Unificat de Testare a Securității
Construiește un framework care coordonează SAST și DAST:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import hashlib
import json
class VulnerabilityType(Enum):
    """Vulnerability categories that a finding can be classified as.

    The member value is the snake_case label that appears in finding
    titles, descriptions and generated reports.
    """

    SQL_INJECTION = "sql_injection"
    XSS = "cross_site_scripting"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    SSRF = "server_side_request_forgery"
    INSECURE_DESERIALIZATION = "insecure_deserialization"
    BROKEN_AUTH = "broken_authentication"
    SENSITIVE_DATA_EXPOSURE = "sensitive_data_exposure"
    XXE = "xml_external_entity"
    SECURITY_MISCONFIGURATION = "security_misconfiguration"
class Severity(Enum):
    """Severity levels; a larger integer value means a more severe finding.

    Plain ``Enum`` members do not support ``<``/``>``, so callers compare
    or sort on ``.value``.
    """

    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0
class TestingPhase(Enum):
    """Testing methodology that produced a finding."""

    SAST = "static"
    DAST = "dynamic"
    IAST = "interactive"
    SCA = "composition"
@dataclass
class CodeLocation:
    """Position of a finding inside a source file."""

    file_path: str
    line_number: int  # the SAST scanner in this file stores 1-based numbers
    column: Optional[int] = None
    snippet: Optional[str] = None  # offending source text, if captured
    function_name: Optional[str] = None
@dataclass
class HttpLocation:
    """HTTP request coordinates at which a finding was observed."""

    url: str
    method: str
    parameter: Optional[str] = None  # name of the request parameter that was tested
    request_headers: Optional[Dict[str, str]] = None
    response_code: Optional[int] = None
@dataclass
class Vulnerability:
    """A single finding reported by one scanner.

    ``code_location`` and ``http_location`` are both optional; the SAST
    scanner in this file fills the former and the DAST scanner the latter.
    ``correlated_ids`` is filled in by the correlator with the ids of
    findings from the other phase that were matched to this one.
    """

    id: str
    title: str
    vuln_type: VulnerabilityType
    severity: Severity
    phase: TestingPhase
    description: str
    remediation: str
    confidence: float
    code_location: Optional[CodeLocation] = None
    http_location: Optional[HttpLocation] = None
    cwe_id: Optional[str] = None
    cvss_score: Optional[float] = None
    # Mutable defaults go through default_factory so instances never share them.
    evidence: List[str] = field(default_factory=list)
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; consider a timezone-aware timestamp.
    first_seen: datetime = field(default_factory=datetime.utcnow)
    correlated_ids: Set[str] = field(default_factory=set)
class SecurityScanner(ABC):
    """Interface that every scanner in the framework implements."""

    @abstractmethod
    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        """Scan ``target`` using the options in ``config`` and return the findings.

        The meaning of ``target`` and the recognised ``config`` keys are
        defined by each concrete scanner.
        """
        pass

    @abstractmethod
    def get_scanner_name(self) -> str:
        """Return a human-readable name for this scanner."""
        pass
class SASTScanner(SecurityScanner):
    """Static scanner that matches regex rules against source files line by line."""

    def __init__(self):
        self.rules = self._load_rules()

    def _load_rules(self) -> List[Dict]:
        """Return the built-in rule set.

        Each rule is a dict with the keys ``id``, ``pattern`` (a regex that is
        searched in every source line), ``vuln_type``, ``severity``,
        ``message`` and ``remediation``.
        """
        return [
            {
                "id": "SAST-001",
                "pattern": r"execute\s*\(\s*['\"]?\s*\+|executeQuery\s*\(\s*['\"]?\s*\+",
                "vuln_type": VulnerabilityType.SQL_INJECTION,
                "severity": Severity.CRITICAL,
                "message": "Query SQL construit cu concatenare de string-uri",
                "remediation": "Foloseste query-uri parametrizate sau prepared statements"
            },
            {
                "id": "SAST-002",
                "pattern": r"innerHTML\s*=|document\.write\s*\(",
                "vuln_type": VulnerabilityType.XSS,
                "severity": Severity.HIGH,
                "message": "Vulnerabilitate potentiala XSS bazata pe DOM",
                "remediation": "Foloseste textContent sau sanitizeaza inputul inainte de randare"
            },
            {
                "id": "SAST-003",
                "pattern": r"exec\s*\(|system\s*\(|popen\s*\(",
                "vuln_type": VulnerabilityType.COMMAND_INJECTION,
                "severity": Severity.CRITICAL,
                "message": "Executie de comenzi cu potential input de la utilizator",
                "remediation": "Foloseste subprocess cu shell=False si valideaza inputul"
            },
            {
                "id": "SAST-004",
                "pattern": r"open\s*\([^)]*\+|Path\s*\([^)]*\+",
                "vuln_type": VulnerabilityType.PATH_TRAVERSAL,
                "severity": Severity.HIGH,
                "message": "Cale de fisier construita cu input de la utilizator",
                "remediation": "Valideaza si sanitizeaza caile de fisier, foloseste allowlists"
            },
            {
                "id": "SAST-005",
                # Flag yaml.load(...) unless the call names a SafeLoader
                # (CSafeLoader also matches). The previous pattern required
                # the word "Loader", so it reported the safe
                # Loader=SafeLoader form and missed a bare yaml.load(data).
                "pattern": r"pickle\.loads|yaml\.load\s*\((?![^)]*SafeLoader)",
                "vuln_type": VulnerabilityType.INSECURE_DESERIALIZATION,
                "severity": Severity.CRITICAL,
                "message": "Deserializare nesigura detectata",
                "remediation": "Foloseste safe_load pentru YAML, evita pickle pentru date nesigure"
            }
        ]

    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        """Walk the directory ``target`` and report every line matching a rule.

        Args:
            target: Root directory of the source tree to scan.
            config: Options; ``extensions`` (list of file suffixes) selects
                which files are read and defaults to
                ``['.py', '.js', '.ts', '.java']``.

        Returns:
            One ``Vulnerability`` per (rule, matching line) pair, grouped by
            file, then by rule, then by line. Files that cannot be opened or
            read are skipped.
        """
        import re
        import os

        findings = []
        # str.endswith accepts a tuple, so one call covers every suffix.
        extensions = tuple(config.get('extensions', ['.py', '.js', '.ts', '.java']))
        skipped_dirs = {'node_modules', '.git', 'venv'}
        # Compile every rule once instead of once per scanned line.
        compiled_rules = [(rule, re.compile(rule['pattern'])) for rule in self.rules]

        for root, dirs, files in os.walk(target):
            # In-place assignment prunes the walk: os.walk will not descend
            # into directories removed from ``dirs``.
            dirs[:] = [d for d in dirs if d not in skipped_dirs]
            for file_name in files:
                if not file_name.endswith(extensions):
                    continue
                file_path = os.path.join(root, file_name)
                try:
                    # errors='ignore' drops undecodable bytes, so only
                    # I/O failures can occur here.
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        lines = f.read().split('\n')
                except OSError:
                    continue

                for rule, pattern in compiled_rules:
                    for i, line in enumerate(lines):
                        if not pattern.search(line):
                            continue
                        stripped = line.strip()
                        findings.append(Vulnerability(
                            # The id is derived from the 0-based index; the
                            # reported line_number below is 1-based.
                            id=self._generate_id(file_path, i, rule['id']),
                            title=rule['message'],
                            vuln_type=rule['vuln_type'],
                            severity=rule['severity'],
                            phase=TestingPhase.SAST,
                            description=f"Pattern gasit care se potriveste cu {rule['id']}",
                            remediation=rule['remediation'],
                            confidence=0.8,
                            code_location=CodeLocation(
                                file_path=file_path,
                                line_number=i + 1,
                                snippet=stripped[:200]
                            ),
                            cwe_id=self._get_cwe(rule['vuln_type']),
                            evidence=[f"Pattern: {rule['pattern']}", f"Match: {stripped[:100]}"]
                        ))
        return findings

    def _generate_id(self, file_path: str, line: int, rule_id: str) -> str:
        """Return a deterministic finding id built from path, line index and rule id."""
        content = f"{file_path}:{line}:{rule_id}"
        return f"SAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"

    def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
        """Map a vulnerability type to its CWE id, or ``"CWE-Unknown"``."""
        cwe_map = {
            VulnerabilityType.SQL_INJECTION: "CWE-89",
            VulnerabilityType.XSS: "CWE-79",
            VulnerabilityType.COMMAND_INJECTION: "CWE-78",
            VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
            VulnerabilityType.INSECURE_DESERIALIZATION: "CWE-502"
        }
        return cwe_map.get(vuln_type, "CWE-Unknown")

    def get_scanner_name(self) -> str:
        """Return the display name of this scanner."""
        return "Custom SAST Scanner"
class DASTScanner(SecurityScanner):
    """Dynamic scanner that injects payloads into request parameters.

    Detection is heuristic: a finding is reported when the response body
    contains one of a fixed set of indicator strings for the payload's
    vulnerability type.
    """

    def __init__(self):
        self.payloads = self._load_payloads()

    def _load_payloads(self) -> Dict[VulnerabilityType, List[str]]:
        """Return the built-in attack payloads, grouped by vulnerability type."""
        return {
            VulnerabilityType.SQL_INJECTION: [
                "' OR '1'='1",
                "'; DROP TABLE users--",
                "1' AND '1'='1",
                "1 UNION SELECT NULL--"
            ],
            VulnerabilityType.XSS: [
                "<script>alert(1)</script>",
                "<img src=x onerror=alert(1)>",
                "javascript:alert(1)",
                "<svg onload=alert(1)>"
            ],
            VulnerabilityType.COMMAND_INJECTION: [
                "; ls -la",
                "| cat /etc/passwd",
                "`id`",
                "$(whoami)"
            ],
            VulnerabilityType.PATH_TRAVERSAL: [
                "../../../etc/passwd",
                "....//....//etc/passwd",
                "%2e%2e%2f%2e%2e%2fetc/passwd"
            ],
            VulnerabilityType.SSRF: [
                "http://localhost:22",
                "http://169.254.169.254/latest/meta-data/",
                "http://127.0.0.1:6379"
            ]
        }

    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        """Send every payload to every parameter of every configured endpoint.

        Args:
            target: Base URL; each endpoint ``path`` is joined onto it.
            config: Options.
                ``endpoints``: list of dicts with a required ``path`` and
                optional ``method`` (default ``'GET'``, case-insensitive)
                and ``params`` (dict of baseline parameter values).
                ``timeout``: per-request timeout in seconds (default 10).
                ``verify_tls``: passed to requests as ``verify``; defaults
                to ``False``, which matches the previous hard-coded value.

        Returns:
            One ``Vulnerability`` per (parameter, payload) pair whose
            response matched an indicator. Requests that fail with a
            ``requests.RequestException`` are skipped.
        """
        import requests
        from urllib.parse import urljoin

        findings = []
        endpoints = config.get('endpoints', [])
        timeout = config.get('timeout', 10)
        verify_tls = config.get('verify_tls', False)

        for endpoint in endpoints:
            url = urljoin(target, endpoint['path'])
            # Normalise case so that 'get' is not silently sent as a POST.
            method = (endpoint.get('method') or 'GET').upper()
            params = endpoint.get('params', {})

            for vuln_type, payloads in self.payloads.items():
                for param_name in params:
                    for payload in payloads:
                        test_params = params.copy()
                        test_params[param_name] = payload
                        try:
                            if method == 'GET':
                                response = requests.get(
                                    url, params=test_params, timeout=timeout, verify=verify_tls)
                            else:
                                # Every non-GET method is sent as a form POST.
                                response = requests.post(
                                    url, data=test_params, timeout=timeout, verify=verify_tls)
                        except requests.RequestException:
                            # Best effort: an unreachable endpoint must not abort the scan.
                            continue

                        if not self._detect_vulnerability(response, vuln_type, payload):
                            continue

                        findings.append(Vulnerability(
                            id=self._generate_id(url, param_name, payload),
                            title=f"{vuln_type.value} in {param_name}",
                            vuln_type=vuln_type,
                            severity=self._get_severity(vuln_type),
                            phase=TestingPhase.DAST,
                            description=f"Parametrul {param_name} e vulnerabil la {vuln_type.value}",
                            remediation=self._get_remediation(vuln_type),
                            confidence=0.9,
                            http_location=HttpLocation(
                                url=url,
                                method=method,
                                parameter=param_name,
                                response_code=response.status_code
                            ),
                            cwe_id=self._get_cwe(vuln_type),
                            evidence=[f"Payload: {payload}", f"Response: {response.text[:200]}"]
                        ))
        return findings

    def _detect_vulnerability(self, response, vuln_type: VulnerabilityType, payload: str) -> bool:
        """Return True if the response body contains an indicator for ``vuln_type``.

        The comparison is case-insensitive. For XSS the indicator is the
        payload itself, i.e. the check is whether the payload is reflected.
        """
        indicators = {
            VulnerabilityType.SQL_INJECTION: [
                "SQL syntax", "mysql_fetch", "ORA-", "PostgreSQL",
                "sqlite3.OperationalError", "syntax error"
            ],
            VulnerabilityType.XSS: [payload],
            VulnerabilityType.COMMAND_INJECTION: [
                "root:", "uid=", "bin/bash", "Permission denied"
            ],
            VulnerabilityType.PATH_TRAVERSAL: [
                "root:x:", "[boot loader]", "daemon:"
            ],
            VulnerabilityType.SSRF: ["metadata", "localhost", "internal"]
        }
        text = response.text.lower()
        return any(
            indicator.lower() in text
            for indicator in indicators.get(vuln_type, [])
        )

    def _generate_id(self, url: str, param: str, payload: str) -> str:
        """Return a deterministic id from URL, parameter and the first 20 payload chars."""
        content = f"{url}:{param}:{payload[:20]}"
        return f"DAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"

    def _get_severity(self, vuln_type: VulnerabilityType) -> Severity:
        """Map a vulnerability type to its severity; unmapped types are MEDIUM."""
        severity_map = {
            VulnerabilityType.SQL_INJECTION: Severity.CRITICAL,
            VulnerabilityType.COMMAND_INJECTION: Severity.CRITICAL,
            VulnerabilityType.XSS: Severity.HIGH,
            VulnerabilityType.PATH_TRAVERSAL: Severity.HIGH,
            VulnerabilityType.SSRF: Severity.HIGH
        }
        return severity_map.get(vuln_type, Severity.MEDIUM)

    def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
        """Map a vulnerability type to its CWE id, or ``"CWE-Unknown"``."""
        cwe_map = {
            VulnerabilityType.SQL_INJECTION: "CWE-89",
            VulnerabilityType.XSS: "CWE-79",
            VulnerabilityType.COMMAND_INJECTION: "CWE-78",
            VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
            VulnerabilityType.SSRF: "CWE-918"
        }
        return cwe_map.get(vuln_type, "CWE-Unknown")

    def _get_remediation(self, vuln_type: VulnerabilityType) -> str:
        """Return remediation advice for ``vuln_type``, or a generic fallback."""
        remediation_map = {
            VulnerabilityType.SQL_INJECTION: "Foloseste query-uri parametrizate",
            VulnerabilityType.XSS: "Encodeaza outputul si valideaza inputul",
            VulnerabilityType.COMMAND_INJECTION: "Evita comenzile shell, foloseste API-uri sigure",
            VulnerabilityType.PATH_TRAVERSAL: "Valideaza si sanitizeaza caile de fisier",
            VulnerabilityType.SSRF: "Valideaza URL-urile si foloseste allowlists"
        }
        return remediation_map.get(vuln_type, "Revizuieste si remediaza vulnerabilitatea")

    def get_scanner_name(self) -> str:
        """Return the display name of this scanner."""
        return "Custom DAST Scanner"


# Section heading from the article: "Motor de Corelatie a Vulnerabilitatilor" (Vulnerability Correlation Engine)
Corelează descoperirile din SAST și DAST:
from typing import List, Dict, Tuple
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class CorrelatedVulnerability:
    """A finding together with the findings from the other phase matched to it."""

    primary: Vulnerability
    correlated: List[Vulnerability]  # empty when nothing was matched
    correlation_score: float  # 0.0 when nothing was matched
    verified: bool
    combined_severity: Severity
class VulnerabilityCorrelator:
    """Pairs SAST findings with DAST findings of the same vulnerability type."""

    def __init__(self):
        self.correlation_rules = self._load_correlation_rules()

    def _load_correlation_rules(self) -> List[Dict]:
        """Return the static correlation rule table.

        NOTE(review): no method of this class reads ``correlation_rules``;
        scoring is done entirely by ``_calculate_correlation_score``.
        """
        return [
            {
                "sast_pattern": "SQL query built with string concatenation",
                "dast_vuln_type": VulnerabilityType.SQL_INJECTION,
                "weight": 1.0
            },
            {
                "sast_pattern": "DOM-based XSS",
                "dast_vuln_type": VulnerabilityType.XSS,
                "weight": 0.9
            },
            {
                "sast_pattern": "Command execution",
                "dast_vuln_type": VulnerabilityType.COMMAND_INJECTION,
                "weight": 1.0
            },
            {
                "sast_pattern": "File path constructed",
                "dast_vuln_type": VulnerabilityType.PATH_TRAVERSAL,
                "weight": 0.85
            }
        ]

    def correlate(
        self,
        sast_findings: "List[Vulnerability]",
        dast_findings: "List[Vulnerability]"
    ) -> "List[CorrelatedVulnerability]":
        """Match each SAST finding with at most one DAST finding of the same type.

        A DAST finding is used for at most one SAST finding. Matched pairs
        are marked verified and get each other's id added to
        ``correlated_ids`` (the input objects are mutated). Unmatched SAST
        findings are emitted unverified; unmatched DAST findings are emitted
        verified.

        Returns:
            All findings wrapped as ``CorrelatedVulnerability``, sorted by
            combined severity and then correlation score, highest first.
        """
        correlated = []
        used_dast = set()

        sast_by_type = defaultdict(list)
        for finding in sast_findings:
            sast_by_type[finding.vuln_type].append(finding)
        dast_by_type = defaultdict(list)
        for finding in dast_findings:
            dast_by_type[finding.vuln_type].append(finding)

        for vuln_type in VulnerabilityType:
            # .get() avoids inserting empty lists into the defaultdicts.
            sast_vulns = sast_by_type.get(vuln_type, [])
            dast_vulns = dast_by_type.get(vuln_type, [])
            for sast_vuln in sast_vulns:
                best_match = None
                best_score = 0.0
                for dast_vuln in dast_vulns:
                    if dast_vuln.id in used_dast:
                        continue
                    score = self._calculate_correlation_score(sast_vuln, dast_vuln)
                    # A bare type match scores exactly 0.5 and is not enough.
                    if score > best_score and score > 0.5:
                        best_score = score
                        best_match = dast_vuln

                if best_match is not None:
                    used_dast.add(best_match.id)
                    sast_vuln.correlated_ids.add(best_match.id)
                    best_match.correlated_ids.add(sast_vuln.id)
                    correlated.append(CorrelatedVulnerability(
                        primary=sast_vuln,
                        correlated=[best_match],
                        correlation_score=best_score,
                        verified=True,
                        combined_severity=self._combine_severity(sast_vuln, best_match)
                    ))
                else:
                    correlated.append(CorrelatedVulnerability(
                        primary=sast_vuln,
                        correlated=[],
                        correlation_score=0.0,
                        verified=False,
                        combined_severity=sast_vuln.severity
                    ))

        for dast_vuln in dast_findings:
            if dast_vuln.id not in used_dast:
                correlated.append(CorrelatedVulnerability(
                    primary=dast_vuln,
                    correlated=[],
                    correlation_score=0.0,
                    verified=True,
                    combined_severity=dast_vuln.severity
                ))

        return sorted(
            correlated,
            key=lambda x: (x.combined_severity.value, x.correlation_score),
            reverse=True
        )

    def _calculate_correlation_score(
        self,
        sast_vuln: "Vulnerability",
        dast_vuln: "Vulnerability"
    ) -> float:
        """Score how likely two findings describe the same issue (0.0 to 1.0).

        Different vulnerability types score 0.0. Otherwise the score starts
        at 0.5, gains 0.3 when the source file name and the last URL path
        segment look related, and gains 0.2 when both carry the same CWE id.
        """
        if sast_vuln.vuln_type != dast_vuln.vuln_type:
            return 0.0

        score = 0.5
        if sast_vuln.code_location and dast_vuln.http_location:
            # Accept both '/' and '\\' as path separators.
            file_name = sast_vuln.code_location.file_path.replace('\\', '/').rsplit('/', 1)[-1]
            # Drop query and fragment BEFORE taking the last segment, so a
            # '/' inside the query string cannot be mistaken for the path.
            url_path = dast_vuln.http_location.url.split('?', 1)[0].split('#', 1)[0]
            # A trailing '/' would otherwise yield an empty segment.
            endpoint = url_path.rstrip('/').rsplit('/', 1)[-1]
            if self._names_related(file_name, endpoint):
                score += 0.3

        # Two missing CWE ids are not evidence of a match.
        if sast_vuln.cwe_id is not None and sast_vuln.cwe_id == dast_vuln.cwe_id:
            score += 0.2
        return min(score, 1.0)

    def _names_related(self, file_name: str, endpoint: str) -> bool:
        """Return True if the file's base name and the endpoint contain one another.

        The file extension (whatever it is) and any query string on
        ``endpoint`` are removed first and the comparison is
        case-insensitive. An empty name on either side is never related:
        the empty string is a substring of everything, which would make
        every pair match.
        """
        file_base = file_name.rsplit('.', 1)[0] if '.' in file_name else file_name
        file_base = file_base.lower()
        endpoint_base = endpoint.split('?')[0].lower()
        if not file_base or not endpoint_base:
            return False
        return file_base in endpoint_base or endpoint_base in file_base

    def _combine_severity(self, v1: "Vulnerability", v2: "Vulnerability") -> "Severity":
        """Return CRITICAL when both findings are at least HIGH, else the higher of the two."""
        if v1.severity.value >= Severity.HIGH.value and v2.severity.value >= Severity.HIGH.value:
            return Severity.CRITICAL
        return max(v1.severity, v2.severity, key=lambda x: x.value)


# Section heading from the article: "Integrare Pipeline CI/CD" (CI/CD Pipeline Integration)
Integrează testarea de securitate în pipeline-uri:
# .github/workflows/security-testing.yml
# Runs SAST and DAST in parallel, then correlates the results and enforces
# severity thresholds.
name: Application Security Testing

on:
  pull_request:
    branches: [main, develop]
  push:
    branches: [main]

jobs:
  sast:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Run Semgrep SAST
        uses: returntocorp/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/owasp-top-ten
            p/python
            p/javascript

      - name: Run Bandit (Python)
        # "|| true" keeps the job green; thresholds are enforced in the
        # correlate job instead.
        run: |
          pip install bandit
          bandit -r src/ -f json -o bandit-results.json || true

      - name: Run ESLint Security
        run: |
          npm install eslint-plugin-security
          npx eslint --ext .js,.ts src/ -f json -o eslint-results.json || true

      - name: Upload SAST Results
        uses: actions/upload-artifact@v4
        with:
          name: sast-results
          path: |
            bandit-results.json
            eslint-results.json

  dast:
    runs-on: ubuntu-latest
    # NOTE(review): this snippet does not define the `build` job. A job with
    # that id must exist and expose an `image` output, otherwise the workflow
    # is rejected as invalid.
    needs: build
    services:
      app:
        image: ${{ needs.build.outputs.image }}
        ports:
          - 8080:8080
    steps:
      - uses: actions/checkout@v4

      - name: Wait for Application
        run: |
          timeout 60 bash -c 'until curl -s http://localhost:8080/health; do sleep 2; done'

      - name: Run OWASP ZAP
        uses: zaproxy/action-full-scan@v0.8.0
        with:
          target: 'http://localhost:8080'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'

      - name: Run Nuclei
        # The workspace is bind-mounted so the report is written to the
        # runner's filesystem; without the mount the file stays inside the
        # container and the upload step below cannot find it.
        run: |
          docker run --network host -v "$PWD:/output" projectdiscovery/nuclei:latest \
            -u http://localhost:8080 \
            -t cves/ -t vulnerabilities/ \
            -json -o /output/nuclei-results.json

      - name: Upload DAST Results
        uses: actions/upload-artifact@v4
        with:
          name: dast-results
          path: |
            zap-report.json
            nuclei-results.json

  correlate:
    runs-on: ubuntu-latest
    needs: [sast, dast]
    steps:
      - uses: actions/checkout@v4

      - name: Download All Results
        # With no `name`, every artifact is downloaded into a directory
        # named after it (sast-results/, dast-results/).
        uses: actions/download-artifact@v4

      - name: Run Correlation Engine
        run: |
          python scripts/correlate_findings.py \
            --sast-dir sast-results/ \
            --dast-dir dast-results/ \
            --output correlated-report.json

      - name: Check Thresholds
        run: |
          python scripts/check_thresholds.py \
            --report correlated-report.json \
            --max-critical 0 \
            --max-high 5

      - name: Upload Correlated Report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: correlated-report.json

# Section heading from the article: "Dashboard Unificat de Raportare" (Unified Reporting Dashboard)
Generează rapoarte de securitate complete:
from typing import List, Dict
from datetime import datetime
import json
class SecurityReportGenerator:
    """Builds a dict report from correlated findings and exports it as SARIF."""

    def __init__(self):
        # Holds the most recent report; export_sarif() reads it.
        self.report_data = {}

    def generate_report(
        self,
        correlated_findings: "List[CorrelatedVulnerability]",
        project_name: str,
        scan_config: Dict
    ) -> Dict:
        """Assemble the full report and remember it for ``export_sarif``.

        Returns:
            A dict with the keys ``metadata``, ``summary``, ``findings``,
            ``trends`` and ``recommendations``.
        """
        report = {
            "metadata": {
                "project": project_name,
                "generated_at": datetime.utcnow().isoformat(),
                "scan_config": scan_config
            },
            "summary": self._generate_summary(correlated_findings),
            "findings": self._format_findings(correlated_findings),
            "trends": self._calculate_trends(correlated_findings),
            "recommendations": self._generate_recommendations(correlated_findings)
        }
        self.report_data = report
        return report

    def _generate_summary(self, findings: "List[CorrelatedVulnerability]") -> Dict:
        """Count findings per severity, plus verified and correlated totals."""
        # Start every severity at zero so all of them appear in the output.
        severity_counts = {s.name: 0 for s in Severity}
        verified_count = 0
        correlated_count = 0
        for finding in findings:
            severity_counts[finding.combined_severity.name] += 1
            if finding.verified:
                verified_count += 1
            if finding.correlation_score > 0:
                correlated_count += 1
        return {
            "total_findings": len(findings),
            "by_severity": severity_counts,
            "verified_findings": verified_count,
            "correlated_findings": correlated_count,
            # Guard against division by zero on an empty report.
            "verification_rate": verified_count / len(findings) if findings else 0,
            "risk_score": self._calculate_risk_score(findings)
        }

    def _calculate_risk_score(self, findings: "List[CorrelatedVulnerability]") -> float:
        """Return a severity-weighted score capped at 100.

        Verified findings count 1.5 times their severity weight; the total
        is divided by 10 before the cap is applied.
        """
        weights = {
            Severity.CRITICAL: 10,
            Severity.HIGH: 5,
            Severity.MEDIUM: 2,
            Severity.LOW: 1,
            Severity.INFO: 0
        }
        total_score = sum(
            weights[f.combined_severity] * (1.5 if f.verified else 1.0)
            for f in findings
        )
        return min(total_score / 10, 100)

    def _format_findings(self, findings: "List[CorrelatedVulnerability]") -> List[Dict]:
        """Flatten each correlated finding into a plain dict.

        ``code_location``, ``http_location`` and ``correlated_findings`` are
        only present when the finding has them.
        """
        formatted = []
        for finding in findings:
            primary = finding.primary
            entry = {
                "id": primary.id,
                "title": primary.title,
                "severity": finding.combined_severity.name,
                "type": primary.vuln_type.value,
                "phase": primary.phase.value,
                "verified": finding.verified,
                "correlation_score": finding.correlation_score,
                "cwe": primary.cwe_id,
                "description": primary.description,
                "remediation": primary.remediation,
                # Copy so later edits to the report cannot alter the finding.
                "evidence": list(primary.evidence)
            }
            if primary.code_location:
                entry["code_location"] = {
                    "file": primary.code_location.file_path,
                    "line": primary.code_location.line_number,
                    "snippet": primary.code_location.snippet
                }
            if primary.http_location:
                entry["http_location"] = {
                    "url": primary.http_location.url,
                    "method": primary.http_location.method,
                    "parameter": primary.http_location.parameter
                }
            if finding.correlated:
                entry["correlated_findings"] = [
                    {"id": c.id, "phase": c.phase.value}
                    for c in finding.correlated
                ]
            formatted.append(entry)
        return formatted

    def _calculate_trends(self, findings: "List[CorrelatedVulnerability]") -> Dict:
        """Group findings by vulnerability type and name the most frequent one."""
        by_type = {}
        for finding in findings:
            vuln_type = finding.primary.vuln_type.value
            bucket = by_type.setdefault(vuln_type, {"count": 0, "severities": []})
            bucket["count"] += 1
            bucket["severities"].append(finding.combined_severity.name)
        return {
            "by_vulnerability_type": by_type,
            "most_common": max(by_type.items(), key=lambda x: x[1]["count"])[0] if by_type else None
        }

    def _generate_recommendations(self, findings: "List[CorrelatedVulnerability]") -> List[Dict]:
        """Return prioritised recommendations derived from the findings.

        Up to three entries are produced: one when any finding is CRITICAL,
        one when any verified finding is HIGH or above, and one when any SQL
        injection finding exists.
        """
        recommendations = []

        critical_count = sum(1 for f in findings if f.combined_severity == Severity.CRITICAL)
        if critical_count > 0:
            recommendations.append({
                "priority": "IMEDIAT",
                "action": f"Rezolva {critical_count} vulnerabilitati critice inainte de deployment",
                "impact": "Previne o potentiala bresa de date sau compromiterea sistemului"
            })

        verified_high = [
            f for f in findings
            if f.verified and f.combined_severity.value >= Severity.HIGH.value
        ]
        if verified_high:
            recommendations.append({
                "priority": "RIDICAT",
                "action": f"Remediaza {len(verified_high)} probleme verificate de severitate ridicata",
                "impact": "Reduce semnificativ suprafata de atac"
            })

        sqli_findings = [
            f for f in findings
            if f.primary.vuln_type == VulnerabilityType.SQL_INJECTION
        ]
        if sqli_findings:
            recommendations.append({
                "priority": "RIDICAT",
                "action": "Implementeaza query-uri parametrizate in toate interactiunile cu baza de date",
                "impact": "Elimina vectorul de atac SQL injection"
            })
        return recommendations

    def export_sarif(self) -> Dict:
        """Export the findings of the last generated report as a SARIF 2.1.0 dict.

        One SARIF rule is created per distinct (type, severity) pair, taking
        its name and description from the first finding of that pair. Only
        findings with a ``code_location`` get a ``locations`` entry. If no
        report has been generated yet, the result has no rules or results.
        """
        sarif = {
            "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
            "version": "2.1.0",
            "runs": [{
                "tool": {
                    "driver": {
                        "name": "Unified Security Scanner",
                        "version": "1.0.0",
                        "rules": []
                    }
                },
                "results": []
            }]
        }
        run = sarif["runs"][0]
        rules = run["tool"]["driver"]["rules"]
        results = run["results"]

        rules_map = {}
        for finding in self.report_data.get("findings", []):
            rule_id = f"{finding['type']}-{finding['severity']}"
            level = self._sarif_level(finding["severity"])
            if rule_id not in rules_map:
                rules_map[rule_id] = {
                    "id": rule_id,
                    "name": finding["title"],
                    "shortDescription": {"text": finding["description"]},
                    "defaultConfiguration": {"level": level}
                }
                rules.append(rules_map[rule_id])

            result = {
                "ruleId": rule_id,
                "message": {"text": finding["description"]},
                "level": level
            }
            if "code_location" in finding:
                result["locations"] = [{
                    "physicalLocation": {
                        "artifactLocation": {"uri": finding["code_location"]["file"]},
                        "region": {"startLine": finding["code_location"]["line"]}
                    }
                }]
            results.append(result)
        return sarif

    def _sarif_level(self, severity: str) -> str:
        """Map a severity name to a SARIF level; unknown names map to ``"warning"``."""
        level_map = {
            "CRITICAL": "error",
            "HIGH": "error",
            "MEDIUM": "warning",
            "LOW": "note",
            "INFO": "note"
        }
        return level_map.get(severity, "warning")


# Section heading from the article: "Concluzie" (Conclusion)
Combinarea SAST și DAST oferă acoperire completă de securitate. SAST găsește probleme devreme, în cod, în timp ce DAST validează exploatabilitatea la runtime. Corelarea descoperirilor crește încrederea și prioritizează remedierea. Integrează ambele în pipeline-uri CI/CD, cu raportare unificată, pentru asigurarea continuă a securității.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →