DevSecOps

Integrarea SAST si DAST pentru Securitate Completa a Aplicatiilor

Nicu Constantin
--11 min lectura
#sast#dast#appsec#vulnerability-management#ci-cd-security

Combinarea SAST si DAST ofera acoperire completa a securitatii aplicatiilor. Acest ghid acopera integrarea ambelor metodologii de testare in pipeline-uri CI/CD cu management unificat al vulnerabilitatilor.

Framework Unificat de Testare a Securitatii

Construieste un framework care coordoneaza SAST si DAST:

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Set
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import hashlib
import json
 
class VulnerabilityType(Enum):
    SQL_INJECTION = "sql_injection"
    XSS = "cross_site_scripting"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    SSRF = "server_side_request_forgery"
    INSECURE_DESERIALIZATION = "insecure_deserialization"
    BROKEN_AUTH = "broken_authentication"
    SENSITIVE_DATA_EXPOSURE = "sensitive_data_exposure"
    XXE = "xml_external_entity"
    SECURITY_MISCONFIGURATION = "security_misconfiguration"
 
class Severity(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0
 
class TestingPhase(Enum):
    SAST = "static"
    DAST = "dynamic"
    IAST = "interactive"
    SCA = "composition"
 
@dataclass
class CodeLocation:
    file_path: str
    line_number: int
    column: Optional[int] = None
    snippet: Optional[str] = None
    function_name: Optional[str] = None
 
@dataclass
class HttpLocation:
    url: str
    method: str
    parameter: Optional[str] = None
    request_headers: Optional[Dict[str, str]] = None
    response_code: Optional[int] = None
 
@dataclass
class Vulnerability:
    id: str
    title: str
    vuln_type: VulnerabilityType
    severity: Severity
    phase: TestingPhase
    description: str
    remediation: str
    confidence: float
    code_location: Optional[CodeLocation] = None
    http_location: Optional[HttpLocation] = None
    cwe_id: Optional[str] = None
    cvss_score: Optional[float] = None
    evidence: List[str] = field(default_factory=list)
    first_seen: datetime = field(default_factory=datetime.utcnow)
    correlated_ids: Set[str] = field(default_factory=set)
 
class SecurityScanner(ABC):
    @abstractmethod
    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        pass
 
    @abstractmethod
    def get_scanner_name(self) -> str:
        pass
 
class SASTScanner(SecurityScanner):
    def __init__(self):
        self.rules = self._load_rules()
 
    def _load_rules(self) -> List[Dict]:
        return [
            {
                "id": "SAST-001",
                "pattern": r"execute\s*\(\s*['\"]?\s*\+|executeQuery\s*\(\s*['\"]?\s*\+",
                "vuln_type": VulnerabilityType.SQL_INJECTION,
                "severity": Severity.CRITICAL,
                "message": "Query SQL construit cu concatenare de string-uri",
                "remediation": "Foloseste query-uri parametrizate sau prepared statements"
            },
            {
                "id": "SAST-002",
                "pattern": r"innerHTML\s*=|document\.write\s*\(",
                "vuln_type": VulnerabilityType.XSS,
                "severity": Severity.HIGH,
                "message": "Vulnerabilitate potentiala XSS bazata pe DOM",
                "remediation": "Foloseste textContent sau sanitizeaza inputul inainte de randare"
            },
            {
                "id": "SAST-003",
                "pattern": r"exec\s*\(|system\s*\(|popen\s*\(",
                "vuln_type": VulnerabilityType.COMMAND_INJECTION,
                "severity": Severity.CRITICAL,
                "message": "Executie de comenzi cu potential input de la utilizator",
                "remediation": "Foloseste subprocess cu shell=False si valideaza inputul"
            },
            {
                "id": "SAST-004",
                "pattern": r"open\s*\([^)]*\+|Path\s*\([^)]*\+",
                "vuln_type": VulnerabilityType.PATH_TRAVERSAL,
                "severity": Severity.HIGH,
                "message": "Cale de fisier construita cu input de la utilizator",
                "remediation": "Valideaza si sanitizeaza caile de fisier, foloseste allowlists"
            },
            {
                "id": "SAST-005",
                "pattern": r"pickle\.loads|yaml\.load\s*\([^)]*Loader",
                "vuln_type": VulnerabilityType.INSECURE_DESERIALIZATION,
                "severity": Severity.CRITICAL,
                "message": "Deserializare nesigura detectata",
                "remediation": "Foloseste safe_load pentru YAML, evita pickle pentru date nesigure"
            }
        ]
 
    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        import re
        import os
 
        findings = []
        extensions = config.get('extensions', ['.py', '.js', '.ts', '.java'])
 
        for root, dirs, files in os.walk(target):
            dirs[:] = [d for d in dirs if d not in ['node_modules', '.git', 'venv']]
 
            for file in files:
                if not any(file.endswith(ext) for ext in extensions):
                    continue
 
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                        lines = content.split('\n')
                except Exception:
                    continue
 
                for rule in self.rules:
                    for i, line in enumerate(lines):
                        if re.search(rule['pattern'], line):
                            vuln_id = self._generate_id(file_path, i, rule['id'])
                            findings.append(Vulnerability(
                                id=vuln_id,
                                title=rule['message'],
                                vuln_type=rule['vuln_type'],
                                severity=rule['severity'],
                                phase=TestingPhase.SAST,
                                description=f"Pattern gasit care se potriveste cu {rule['id']}",
                                remediation=rule['remediation'],
                                confidence=0.8,
                                code_location=CodeLocation(
                                    file_path=file_path,
                                    line_number=i + 1,
                                    snippet=line.strip()[:200]
                                ),
                                cwe_id=self._get_cwe(rule['vuln_type']),
                                evidence=[f"Pattern: {rule['pattern']}", f"Match: {line.strip()[:100]}"]
                            ))
 
        return findings
 
    def _generate_id(self, file_path: str, line: int, rule_id: str) -> str:
        content = f"{file_path}:{line}:{rule_id}"
        return f"SAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"
 
    def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
        cwe_map = {
            VulnerabilityType.SQL_INJECTION: "CWE-89",
            VulnerabilityType.XSS: "CWE-79",
            VulnerabilityType.COMMAND_INJECTION: "CWE-78",
            VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
            VulnerabilityType.INSECURE_DESERIALIZATION: "CWE-502"
        }
        return cwe_map.get(vuln_type, "CWE-Unknown")
 
    def get_scanner_name(self) -> str:
        return "Custom SAST Scanner"
 
class DASTScanner(SecurityScanner):
    def __init__(self):
        self.payloads = self._load_payloads()
 
    def _load_payloads(self) -> Dict[VulnerabilityType, List[str]]:
        return {
            VulnerabilityType.SQL_INJECTION: [
                "' OR '1'='1",
                "'; DROP TABLE users--",
                "1' AND '1'='1",
                "1 UNION SELECT NULL--"
            ],
            VulnerabilityType.XSS: [
                "<script>alert(1)</script>",
                "<img src=x onerror=alert(1)>",
                "javascript:alert(1)",
                "<svg onload=alert(1)>"
            ],
            VulnerabilityType.COMMAND_INJECTION: [
                "; ls -la",
                "| cat /etc/passwd",
                "`id`",
                "$(whoami)"
            ],
            VulnerabilityType.PATH_TRAVERSAL: [
                "../../../etc/passwd",
                "....//....//etc/passwd",
                "%2e%2e%2f%2e%2e%2fetc/passwd"
            ],
            VulnerabilityType.SSRF: [
                "http://localhost:22",
                "http://169.254.169.254/latest/meta-data/",
                "http://127.0.0.1:6379"
            ]
        }
 
    def scan(self, target: str, config: Dict) -> List[Vulnerability]:
        import requests
        from urllib.parse import urljoin, urlparse, parse_qs
 
        findings = []
        endpoints = config.get('endpoints', [])
        timeout = config.get('timeout', 10)
 
        for endpoint in endpoints:
            url = urljoin(target, endpoint['path'])
            method = endpoint.get('method', 'GET')
            params = endpoint.get('params', {})
 
            for vuln_type, payloads in self.payloads.items():
                for param_name in params.keys():
                    for payload in payloads:
                        test_params = params.copy()
                        test_params[param_name] = payload
 
                        try:
                            if method == 'GET':
                                response = requests.get(url, params=test_params, timeout=timeout, verify=False)
                            else:
                                response = requests.post(url, data=test_params, timeout=timeout, verify=False)
 
                            if self._detect_vulnerability(response, vuln_type, payload):
                                vuln_id = self._generate_id(url, param_name, payload)
                                findings.append(Vulnerability(
                                    id=vuln_id,
                                    title=f"{vuln_type.value} in {param_name}",
                                    vuln_type=vuln_type,
                                    severity=self._get_severity(vuln_type),
                                    phase=TestingPhase.DAST,
                                    description=f"Parametrul {param_name} e vulnerabil la {vuln_type.value}",
                                    remediation=self._get_remediation(vuln_type),
                                    confidence=0.9,
                                    http_location=HttpLocation(
                                        url=url,
                                        method=method,
                                        parameter=param_name,
                                        response_code=response.status_code
                                    ),
                                    cwe_id=self._get_cwe(vuln_type),
                                    evidence=[f"Payload: {payload}", f"Response: {response.text[:200]}"]
                                ))
                        except requests.RequestException:
                            continue
 
        return findings
 
    def _detect_vulnerability(self, response, vuln_type: VulnerabilityType, payload: str) -> bool:
        indicators = {
            VulnerabilityType.SQL_INJECTION: [
                "SQL syntax", "mysql_fetch", "ORA-", "PostgreSQL",
                "sqlite3.OperationalError", "syntax error"
            ],
            VulnerabilityType.XSS: [payload],
            VulnerabilityType.COMMAND_INJECTION: [
                "root:", "uid=", "bin/bash", "Permission denied"
            ],
            VulnerabilityType.PATH_TRAVERSAL: [
                "root:x:", "[boot loader]", "daemon:"
            ],
            VulnerabilityType.SSRF: ["metadata", "localhost", "internal"]
        }
 
        text = response.text.lower()
        for indicator in indicators.get(vuln_type, []):
            if indicator.lower() in text:
                return True
        return False
 
    def _generate_id(self, url: str, param: str, payload: str) -> str:
        content = f"{url}:{param}:{payload[:20]}"
        return f"DAST-{hashlib.sha256(content.encode()).hexdigest()[:12]}"
 
    def _get_severity(self, vuln_type: VulnerabilityType) -> Severity:
        severity_map = {
            VulnerabilityType.SQL_INJECTION: Severity.CRITICAL,
            VulnerabilityType.COMMAND_INJECTION: Severity.CRITICAL,
            VulnerabilityType.XSS: Severity.HIGH,
            VulnerabilityType.PATH_TRAVERSAL: Severity.HIGH,
            VulnerabilityType.SSRF: Severity.HIGH
        }
        return severity_map.get(vuln_type, Severity.MEDIUM)
 
    def _get_cwe(self, vuln_type: VulnerabilityType) -> str:
        cwe_map = {
            VulnerabilityType.SQL_INJECTION: "CWE-89",
            VulnerabilityType.XSS: "CWE-79",
            VulnerabilityType.COMMAND_INJECTION: "CWE-78",
            VulnerabilityType.PATH_TRAVERSAL: "CWE-22",
            VulnerabilityType.SSRF: "CWE-918"
        }
        return cwe_map.get(vuln_type, "CWE-Unknown")
 
    def _get_remediation(self, vuln_type: VulnerabilityType) -> str:
        remediation_map = {
            VulnerabilityType.SQL_INJECTION: "Foloseste query-uri parametrizate",
            VulnerabilityType.XSS: "Encodeaza outputul si valideaza inputul",
            VulnerabilityType.COMMAND_INJECTION: "Evita comenzile shell, foloseste API-uri sigure",
            VulnerabilityType.PATH_TRAVERSAL: "Valideaza si sanitizeaza caile de fisier",
            VulnerabilityType.SSRF: "Valideaza URL-urile si foloseste allowlists"
        }
        return remediation_map.get(vuln_type, "Revizuieste si remediaza vulnerabilitatea")
 
    def get_scanner_name(self) -> str:
        return "Custom DAST Scanner"

Motor de Corelatie a Vulnerabilitatilor

Coreleaza descoperirile din SAST si DAST:

from typing import List, Dict, Tuple
from dataclasses import dataclass
from collections import defaultdict
 
@dataclass
class CorrelatedVulnerability:
    primary: Vulnerability
    correlated: List[Vulnerability]
    correlation_score: float
    verified: bool
    combined_severity: Severity
 
class VulnerabilityCorrelator:
    def __init__(self):
        self.correlation_rules = self._load_correlation_rules()
 
    def _load_correlation_rules(self) -> List[Dict]:
        return [
            {
                "sast_pattern": "SQL query built with string concatenation",
                "dast_vuln_type": VulnerabilityType.SQL_INJECTION,
                "weight": 1.0
            },
            {
                "sast_pattern": "DOM-based XSS",
                "dast_vuln_type": VulnerabilityType.XSS,
                "weight": 0.9
            },
            {
                "sast_pattern": "Command execution",
                "dast_vuln_type": VulnerabilityType.COMMAND_INJECTION,
                "weight": 1.0
            },
            {
                "sast_pattern": "File path constructed",
                "dast_vuln_type": VulnerabilityType.PATH_TRAVERSAL,
                "weight": 0.85
            }
        ]
 
    def correlate(
        self,
        sast_findings: List[Vulnerability],
        dast_findings: List[Vulnerability]
    ) -> List[CorrelatedVulnerability]:
        correlated = []
        used_dast = set()
 
        sast_by_type = defaultdict(list)
        for finding in sast_findings:
            sast_by_type[finding.vuln_type].append(finding)
 
        dast_by_type = defaultdict(list)
        for finding in dast_findings:
            dast_by_type[finding.vuln_type].append(finding)
 
        for vuln_type in VulnerabilityType:
            sast_vulns = sast_by_type.get(vuln_type, [])
            dast_vulns = dast_by_type.get(vuln_type, [])
 
            for sast_vuln in sast_vulns:
                best_match = None
                best_score = 0.0
 
                for dast_vuln in dast_vulns:
                    if dast_vuln.id in used_dast:
                        continue
 
                    score = self._calculate_correlation_score(sast_vuln, dast_vuln)
                    if score > best_score and score > 0.5:
                        best_score = score
                        best_match = dast_vuln
 
                if best_match:
                    used_dast.add(best_match.id)
                    sast_vuln.correlated_ids.add(best_match.id)
                    best_match.correlated_ids.add(sast_vuln.id)
 
                    correlated.append(CorrelatedVulnerability(
                        primary=sast_vuln,
                        correlated=[best_match],
                        correlation_score=best_score,
                        verified=True,
                        combined_severity=self._combine_severity(sast_vuln, best_match)
                    ))
                else:
                    correlated.append(CorrelatedVulnerability(
                        primary=sast_vuln,
                        correlated=[],
                        correlation_score=0.0,
                        verified=False,
                        combined_severity=sast_vuln.severity
                    ))
 
        for dast_vuln in dast_findings:
            if dast_vuln.id not in used_dast:
                correlated.append(CorrelatedVulnerability(
                    primary=dast_vuln,
                    correlated=[],
                    correlation_score=0.0,
                    verified=True,
                    combined_severity=dast_vuln.severity
                ))
 
        return sorted(correlated, key=lambda x: (x.combined_severity.value, x.correlation_score), reverse=True)
 
    def _calculate_correlation_score(
        self,
        sast_vuln: Vulnerability,
        dast_vuln: Vulnerability
    ) -> float:
        if sast_vuln.vuln_type != dast_vuln.vuln_type:
            return 0.0
 
        score = 0.5
 
        if sast_vuln.code_location and dast_vuln.http_location:
            file_name = sast_vuln.code_location.file_path.split('/')[-1]
            endpoint = dast_vuln.http_location.url.split('/')[-1]
 
            if self._names_related(file_name, endpoint):
                score += 0.3
 
        if sast_vuln.cwe_id == dast_vuln.cwe_id:
            score += 0.2
 
        return min(score, 1.0)
 
    def _names_related(self, file_name: str, endpoint: str) -> bool:
        file_base = file_name.replace('.py', '').replace('.js', '').lower()
        endpoint_base = endpoint.split('?')[0].lower()
        return file_base in endpoint_base or endpoint_base in file_base
 
    def _combine_severity(self, v1: Vulnerability, v2: Vulnerability) -> Severity:
        if v1.severity.value >= Severity.HIGH.value and v2.severity.value >= Severity.HIGH.value:
            return Severity.CRITICAL
        return max(v1.severity, v2.severity, key=lambda x: x.value)

Integrare Pipeline CI/CD

Integreaza testarea de securitate in pipeline-uri:

# .github/workflows/security-testing.yml
name: Application Security Testing
 
on:
  pull_request:
    branches: [main, develop]
  push:
    branches: [main]
 
jobs:
  sast:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Run Semgrep SAST
        uses: returntocorp/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/owasp-top-ten
            p/python
            p/javascript
 
      - name: Run Bandit (Python)
        run: |
          pip install bandit
          bandit -r src/ -f json -o bandit-results.json || true
 
      - name: Run ESLint Security
        run: |
          npm install eslint-plugin-security
          npx eslint --ext .js,.ts src/ -f json -o eslint-results.json || true
 
      - name: Upload SAST Results
        uses: actions/upload-artifact@v4
        with:
          name: sast-results
          path: |
            bandit-results.json
            eslint-results.json
 
  dast:
    runs-on: ubuntu-latest
    needs: build
    services:
      app:
        image: ${{ needs.build.outputs.image }}
        ports:
          - 8080:8080
 
    steps:
      - uses: actions/checkout@v4
 
      - name: Wait for Application
        run: |
          timeout 60 bash -c 'until curl -s http://localhost:8080/health; do sleep 2; done'
 
      - name: Run OWASP ZAP
        uses: zaproxy/action-full-scan@v0.8.0
        with:
          target: 'http://localhost:8080'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
 
      - name: Run Nuclei
        run: |
          docker run --network host projectdiscovery/nuclei:latest \
            -u http://localhost:8080 \
            -t cves/ -t vulnerabilities/ \
            -json -o nuclei-results.json
 
      - name: Upload DAST Results
        uses: actions/upload-artifact@v4
        with:
          name: dast-results
          path: |
            zap-report.json
            nuclei-results.json
 
  correlate:
    runs-on: ubuntu-latest
    needs: [sast, dast]
    steps:
      - uses: actions/checkout@v4
 
      - name: Download All Results
        uses: actions/download-artifact@v4
 
      - name: Run Correlation Engine
        run: |
          python scripts/correlate_findings.py \
            --sast-dir sast-results/ \
            --dast-dir dast-results/ \
            --output correlated-report.json
 
      - name: Check Thresholds
        run: |
          python scripts/check_thresholds.py \
            --report correlated-report.json \
            --max-critical 0 \
            --max-high 5
 
      - name: Upload Correlated Report
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: correlated-report.json

Dashboard Unificat de Raportare

Genereaza rapoarte de securitate complete:

from typing import List, Dict
from datetime import datetime
import json
 
class SecurityReportGenerator:
    def __init__(self):
        self.report_data = {}
 
    def generate_report(
        self,
        correlated_findings: List[CorrelatedVulnerability],
        project_name: str,
        scan_config: Dict
    ) -> Dict:
        report = {
            "metadata": {
                "project": project_name,
                "generated_at": datetime.utcnow().isoformat(),
                "scan_config": scan_config
            },
            "summary": self._generate_summary(correlated_findings),
            "findings": self._format_findings(correlated_findings),
            "trends": self._calculate_trends(correlated_findings),
            "recommendations": self._generate_recommendations(correlated_findings)
        }
 
        self.report_data = report
        return report
 
    def _generate_summary(self, findings: List[CorrelatedVulnerability]) -> Dict:
        severity_counts = {s.name: 0 for s in Severity}
        verified_count = 0
        correlated_count = 0
 
        for finding in findings:
            severity_counts[finding.combined_severity.name] += 1
            if finding.verified:
                verified_count += 1
            if finding.correlation_score > 0:
                correlated_count += 1
 
        return {
            "total_findings": len(findings),
            "by_severity": severity_counts,
            "verified_findings": verified_count,
            "correlated_findings": correlated_count,
            "verification_rate": verified_count / len(findings) if findings else 0,
            "risk_score": self._calculate_risk_score(findings)
        }
 
    def _calculate_risk_score(self, findings: List[CorrelatedVulnerability]) -> float:
        weights = {
            Severity.CRITICAL: 10,
            Severity.HIGH: 5,
            Severity.MEDIUM: 2,
            Severity.LOW: 1,
            Severity.INFO: 0
        }
 
        total_score = sum(
            weights[f.combined_severity] * (1.5 if f.verified else 1.0)
            for f in findings
        )
 
        return min(total_score / 10, 100)
 
    def _format_findings(self, findings: List[CorrelatedVulnerability]) -> List[Dict]:
        formatted = []
        for finding in findings:
            primary = finding.primary
            entry = {
                "id": primary.id,
                "title": primary.title,
                "severity": finding.combined_severity.name,
                "type": primary.vuln_type.value,
                "phase": primary.phase.value,
                "verified": finding.verified,
                "correlation_score": finding.correlation_score,
                "cwe": primary.cwe_id,
                "description": primary.description,
                "remediation": primary.remediation,
                "evidence": primary.evidence
            }
 
            if primary.code_location:
                entry["code_location"] = {
                    "file": primary.code_location.file_path,
                    "line": primary.code_location.line_number,
                    "snippet": primary.code_location.snippet
                }
 
            if primary.http_location:
                entry["http_location"] = {
                    "url": primary.http_location.url,
                    "method": primary.http_location.method,
                    "parameter": primary.http_location.parameter
                }
 
            if finding.correlated:
                entry["correlated_findings"] = [
                    {"id": c.id, "phase": c.phase.value}
                    for c in finding.correlated
                ]
 
            formatted.append(entry)
 
        return formatted
 
    def _calculate_trends(self, findings: List[CorrelatedVulnerability]) -> Dict:
        by_type = {}
        for finding in findings:
            vuln_type = finding.primary.vuln_type.value
            if vuln_type not in by_type:
                by_type[vuln_type] = {"count": 0, "severities": []}
            by_type[vuln_type]["count"] += 1
            by_type[vuln_type]["severities"].append(finding.combined_severity.name)
 
        return {
            "by_vulnerability_type": by_type,
            "most_common": max(by_type.items(), key=lambda x: x[1]["count"])[0] if by_type else None
        }
 
    def _generate_recommendations(self, findings: List[CorrelatedVulnerability]) -> List[Dict]:
        recommendations = []
 
        critical_count = sum(1 for f in findings if f.combined_severity == Severity.CRITICAL)
        if critical_count > 0:
            recommendations.append({
                "priority": "IMEDIAT",
                "action": f"Rezolva {critical_count} vulnerabilitati critice inainte de deployment",
                "impact": "Previne o potentiala bresa de date sau compromiterea sistemului"
            })
 
        verified_high = [f for f in findings if f.verified and f.combined_severity.value >= Severity.HIGH.value]
        if verified_high:
            recommendations.append({
                "priority": "RIDICAT",
                "action": f"Remediaza {len(verified_high)} probleme verificate de severitate ridicata",
                "impact": "Reduce semnificativ suprafata de atac"
            })
 
        sqli_findings = [f for f in findings if f.primary.vuln_type == VulnerabilityType.SQL_INJECTION]
        if sqli_findings:
            recommendations.append({
                "priority": "RIDICAT",
                "action": "Implementeaza query-uri parametrizate in toate interactiunile cu baza de date",
                "impact": "Elimina vectorul de atac SQL injection"
            })
 
        return recommendations
 
    def export_sarif(self) -> Dict:
        """Exporta descoperirile in format SARIF pentru integrare GitHub."""
        sarif = {
            "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
            "version": "2.1.0",
            "runs": [{
                "tool": {
                    "driver": {
                        "name": "Unified Security Scanner",
                        "version": "1.0.0",
                        "rules": []
                    }
                },
                "results": []
            }]
        }
 
        rules_map = {}
        for finding in self.report_data.get("findings", []):
            rule_id = f"{finding['type']}-{finding['severity']}"
 
            if rule_id not in rules_map:
                rules_map[rule_id] = {
                    "id": rule_id,
                    "name": finding["title"],
                    "shortDescription": {"text": finding["description"]},
                    "defaultConfiguration": {"level": self._sarif_level(finding["severity"])}
                }
                sarif["runs"][0]["tool"]["driver"]["rules"].append(rules_map[rule_id])
 
            result = {
                "ruleId": rule_id,
                "message": {"text": finding["description"]},
                "level": self._sarif_level(finding["severity"])
            }
 
            if "code_location" in finding:
                result["locations"] = [{
                    "physicalLocation": {
                        "artifactLocation": {"uri": finding["code_location"]["file"]},
                        "region": {"startLine": finding["code_location"]["line"]}
                    }
                }]
 
            sarif["runs"][0]["results"].append(result)
 
        return sarif
 
    def _sarif_level(self, severity: str) -> str:
        level_map = {
            "CRITICAL": "error",
            "HIGH": "error",
            "MEDIUM": "warning",
            "LOW": "note",
            "INFO": "note"
        }
        return level_map.get(severity, "warning")

Concluzie

Combinarea SAST si DAST ofera acoperire completa de securitate. SAST gaseste probleme devreme in cod, in timp ce DAST valideaza exploatabilitatea la runtime. Corelatia dintre descoperiri creste increderea si prioritizeaza remedierea. Integreaza ambele in pipeline-uri CI/CD cu raportare unificata pentru asigurare continua a securitatii.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.