Threat Modeling DevSecOps: Integrarea Designului de Securitate in Pipeline-urile CI/CD
Threat modeling identifica amenintarile potentiale de securitate inca din faza de dezvoltare. Acest ghid arata cum sa integrezi threat modeling in pipeline-urile DevSecOps pentru validarea continua a securitatii.
Framework-ul de Threat Modeling STRIDE
Analiza STRIDE Automatizata
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional
import json
class ThreatCategory(Enum):
    """The six STRIDE threat categories.

    Each member's value is the human-readable category name; the analyzer's
    report uses that value as the key when grouping threats.
    """

    SPOOFING = "Spoofing"
    TAMPERING = "Tampering"
    REPUDIATION = "Repudiation"
    INFORMATION_DISCLOSURE = "Information Disclosure"
    DENIAL_OF_SERVICE = "Denial of Service"
    ELEVATION_OF_PRIVILEGE = "Elevation of Privilege"
@dataclass
class Component:
    """One element of the system model examined by the STRIDE analysis."""

    id: str
    name: str
    type: str  # web_app, api, database, external_service, user
    trust_level: int  # 0-10, higher means more trusted
    data_sensitivity: str  # public, internal, confidential, restricted
    authentication_required: bool = True
    # default_factory gives every instance its own list instead of a shared one.
    connections: List[str] = field(default_factory=list)
@dataclass
class DataFlow:
    """A directed transfer of data between two components.

    ``source`` and ``destination`` hold component ids. A flow is treated as
    unencrypted and unauthenticated unless stated otherwise.
    """

    id: str
    source: str
    destination: str
    protocol: str
    data_type: str
    encrypted: bool = False
    authenticated: bool = False
@dataclass
class Threat:
    """A single identified threat together with its suggested mitigations."""

    id: str
    category: "ThreatCategory"
    component_id: str  # id of the affected component (or of the data flow)
    description: str
    risk_score: float
    mitigations: List[str]
    status: str = "identified"
class STRIDEAnalyzer:
    """Rule-based STRIDE threat analysis over a small system model.

    Components and data flows are registered first. ``analyze`` then applies
    one rule set per STRIDE category to every component, followed by a
    flow-level rule set, and ``generate_report`` summarises what was found.
    """

    # Data classifications that the rules below treat as sensitive.
    _SENSITIVE = ("confidential", "restricted")

    def __init__(self):
        self.components: Dict[str, "Component"] = {}
        self.data_flows: List["DataFlow"] = []
        self.threats: List["Threat"] = []
        self.threat_counter = 0

    def add_component(self, component: "Component"):
        """Register a component; an existing entry with the same id is replaced."""
        self.components[component.id] = component

    def add_data_flow(self, flow: "DataFlow"):
        """Register a data flow between two components."""
        self.data_flows.append(flow)

    def analyze(self) -> List["Threat"]:
        """Run the complete STRIDE analysis.

        Returns:
            The identified threats sorted by descending risk score.
            ``self.threats`` keeps them in discovery order.
        """
        self.threats = []
        # Restart numbering together with the threat list so that running the
        # analysis again on the same instance yields the same ids.
        self.threat_counter = 0

        for component in self.components.values():
            self._analyze_spoofing(component)
            self._analyze_tampering(component)
            self._analyze_repudiation(component)
            self._analyze_information_disclosure(component)
            self._analyze_denial_of_service(component)
            self._analyze_elevation_of_privilege(component)

        for flow in self.data_flows:
            self._analyze_data_flow_threats(flow)

        return sorted(self.threats, key=lambda t: t.risk_score, reverse=True)

    def _generate_threat_id(self) -> str:
        """Return the next sequential id, formatted as ``THR-0001``."""
        self.threat_counter += 1
        return f"THR-{self.threat_counter:04d}"

    def _add_threat(self, category: "ThreatCategory", component_id: str,
                    description: str, risk_score: float,
                    mitigations: List[str]) -> None:
        """Create a threat with a fresh id and append it to ``self.threats``."""
        self.threats.append(Threat(
            id=self._generate_threat_id(),
            category=category,
            component_id=component_id,
            description=description,
            risk_score=risk_score,
            mitigations=mitigations,
        ))

    def _analyze_spoofing(self, component: "Component"):
        """Apply the spoofing rules to one component."""
        if component.type == "user":
            self._add_threat(
                ThreatCategory.SPOOFING,
                component.id,
                f"Un atacator ar putea impersona utilizatorul legitim '{component.name}'",
                self._calculate_risk(component, 8),
                [
                    "Implementeaza autentificare multi-factor",
                    "Foloseste politici puternice de parole",
                    "Implementeaza mecanisme de blocare a contului",
                    "Monitorizeaza pattern-uri suspecte de autentificare",
                ],
            )

        if component.type == "api" and not component.authentication_required:
            self._add_threat(
                ThreatCategory.SPOOFING,
                component.id,
                f"API-ul '{component.name}' nu are autentificare, permitand spoofing-ul cererilor",
                self._calculate_risk(component, 9),
                [
                    "Implementeaza autentificare API (OAuth2, chei API)",
                    "Foloseste mutual TLS pentru comunicarea service-to-service",
                    "Valideaza semnaturile cererilor",
                ],
            )

        if component.type == "external_service":
            self._add_threat(
                ThreatCategory.SPOOFING,
                component.id,
                f"Serviciul extern '{component.name}' ar putea fi falsificat de un atacator",
                self._calculate_risk(component, 7),
                [
                    "Verifica certificatele SSL/TLS",
                    "Foloseste certificate pinning",
                    "Implementeaza validarea semnaturilor webhook",
                ],
            )

    def _analyze_tampering(self, component: "Component"):
        """Apply the tampering rules to one component."""
        if component.type == "database":
            self._add_threat(
                ThreatCategory.TAMPERING,
                component.id,
                f"Datele din '{component.name}' ar putea fi modificate fara autorizare",
                self._calculate_risk(component, 9),
                [
                    "Implementeaza securitate la nivel de rand",
                    "Foloseste logarea de audit a bazei de date",
                    "Cripteaza datele sensibile in repaus",
                    "Implementeaza verificari de integritate (checksum-uri, semnaturi)",
                ],
            )

        if component.type == "api":
            self._add_threat(
                ThreatCategory.TAMPERING,
                component.id,
                f"Cererile API catre '{component.name}' ar putea fi alterate in tranzit",
                self._calculate_risk(component, 7),
                [
                    "Foloseste HTTPS/TLS pentru toate comunicatiile",
                    "Implementeaza semnarea cererilor",
                    "Valideaza datele de input in detaliu",
                    "Foloseste HMAC pentru integritatea mesajelor",
                ],
            )

    def _analyze_repudiation(self, component: "Component"):
        """Apply the repudiation rules to one component."""
        if component.trust_level < 5:
            self._add_threat(
                ThreatCategory.REPUDIATION,
                component.id,
                f"Actiunile efectuate de '{component.name}' s-ar putea sa nu fie logate corespunzator",
                self._calculate_risk(component, 6),
                [
                    "Implementeaza logare completa de audit",
                    "Foloseste logare rezistenta la manipulare (append-only)",
                    "Include timestamp-uri si identificatori de utilizator",
                    "Stocheaza logurile intr-o locatie centralizata si securizata",
                ],
            )

        if component.data_sensitivity in self._SENSITIVE:
            self._add_threat(
                ThreatCategory.REPUDIATION,
                component.id,
                f"Accesul la datele sensibile din '{component.name}' ar putea fi negat de utilizatori",
                self._calculate_risk(component, 7),
                [
                    "Implementeaza semnaturi digitale pentru operatiuni critice",
                    "Foloseste loguri de tranzactii non-repudiabile",
                    "Cere confirmare pentru actiunile sensibile",
                ],
            )

    def _analyze_information_disclosure(self, component: "Component"):
        """Apply the information-disclosure rules to one component."""
        if component.data_sensitivity in self._SENSITIVE:
            self._add_threat(
                ThreatCategory.INFORMATION_DISCLOSURE,
                component.id,
                f"Datele sensibile din '{component.name}' ar putea fi expuse",
                self._calculate_risk(component, 9),
                [
                    "Cripteaza datele in repaus si in tranzit",
                    "Implementeaza controale de acces adecvate",
                    "Foloseste mascare de date pentru mediile non-productie",
                    "Implementeaza controale DLP (Data Loss Prevention)",
                ],
            )

        if component.type == "api":
            self._add_threat(
                ThreatCategory.INFORMATION_DISCLOSURE,
                component.id,
                f"API-ul '{component.name}' ar putea scurge date sensibile in raspunsuri sau erori",
                self._calculate_risk(component, 7),
                [
                    "Implementeaza filtrarea raspunsurilor",
                    "Foloseste mesaje de eroare generice",
                    "Elimina informatiile de debug in productie",
                    "Valideaza autorizarea inainte de accesul la date",
                ],
            )

    def _analyze_denial_of_service(self, component: "Component"):
        """Apply the denial-of-service rules to one component."""
        if component.type in ["api", "web_app"]:
            self._add_threat(
                ThreatCategory.DENIAL_OF_SERVICE,
                component.id,
                f"'{component.name}' ar putea fi coplesit de cereri malitioase",
                self._calculate_risk(component, 7),
                [
                    "Implementeaza rate limiting",
                    "Foloseste servicii de protectie DDoS",
                    "Implementeaza coada de cereri",
                    "Seteaza limite de resurse si timeout-uri",
                    "Foloseste infrastructura cu auto-scaling",
                ],
            )

        if component.type == "database":
            self._add_threat(
                ThreatCategory.DENIAL_OF_SERVICE,
                component.id,
                f"Baza de date '{component.name}' ar putea fi epuizata de query-uri costisitoare",
                self._calculate_risk(component, 8),
                [
                    "Implementeaza timeout-uri pentru query-uri",
                    "Foloseste connection pooling",
                    "Limiteaza complexitatea query-urilor",
                    "Implementeaza straturi de caching",
                ],
            )

    def _analyze_elevation_of_privilege(self, component: "Component"):
        """Apply the elevation-of-privilege rules to one component."""
        if component.type == "api":
            self._add_threat(
                ThreatCategory.ELEVATION_OF_PRIVILEGE,
                component.id,
                f"Un atacator ar putea obtine acces elevat prin '{component.name}'",
                self._calculate_risk(component, 9),
                [
                    "Implementeaza principiul privilegiului minim",
                    "Foloseste control al accesului bazat pe roluri (RBAC)",
                    "Valideaza permisiunile la fiecare cerere",
                    "Implementeaza managementul corect al sesiunilor",
                ],
            )

        if component.type == "web_app":
            self._add_threat(
                ThreatCategory.ELEVATION_OF_PRIVILEGE,
                component.id,
                f"Atacurile client-side ar putea escalada privilegiile in '{component.name}'",
                self._calculate_risk(component, 8),
                [
                    "Implementeaza verificari de autorizare pe server",
                    "Foloseste Content Security Policy (CSP)",
                    "Sanitizeaza toate input-urile utilizatorilor",
                    "Implementeaza politici CORS corecte",
                ],
            )

    def _analyze_data_flow_threats(self, flow: "DataFlow"):
        """Apply the flow-level rules to one data flow.

        Flows whose source or destination id is not a registered component
        are skipped without recording anything.
        """
        source = self.components.get(flow.source)
        dest = self.components.get(flow.destination)
        if not source or not dest:
            return

        # Unencrypted flow into a component that holds sensitive data.
        if not flow.encrypted and dest.data_sensitivity in self._SENSITIVE:
            self._add_threat(
                ThreatCategory.INFORMATION_DISCLOSURE,
                flow.id,
                f"Datele sensibile curg necriptate de la {source.name} la {dest.name}",
                9.0,
                [
                    "Activeaza criptarea TLS/SSL",
                    "Foloseste VPN pentru comunicatiile interne",
                    "Implementeaza criptare end-to-end pentru datele sensibile",
                ],
            )

        # Trust-boundary crossing: the endpoints differ by more than 3 levels.
        if abs(source.trust_level - dest.trust_level) > 3:
            self._add_threat(
                ThreatCategory.TAMPERING,
                flow.id,
                f"Datele traverseaza o limita semnificativa de incredere: {source.name} -> {dest.name}",
                7.5,
                [
                    "Implementeaza validare stricta a input-urilor la granita",
                    "Foloseste semnarea/verificarea mesajelor",
                    "Adauga autentificare suplimentara la granita",
                ],
            )

    def _calculate_risk(self, component: "Component", base_score: float) -> float:
        """Scale ``base_score`` by the component's sensitivity and trust level.

        Returns:
            The adjusted score rounded to one decimal and capped at 10.0.
        """
        score = base_score

        # Adjust for data sensitivity; unknown classifications leave the score as is.
        sensitivity_multiplier = {
            "public": 0.5,
            "internal": 0.75,
            "confidential": 1.0,
            "restricted": 1.25,
        }
        score *= sensitivity_multiplier.get(component.data_sensitivity, 1.0)

        # Adjust for trust level (lower trust = higher risk).
        score *= (11 - component.trust_level) / 10

        return min(10.0, round(score, 1))

    def generate_report(self) -> Dict:
        """Build the threat-modeling report from ``self.threats``.

        Risk buckets: high is >= 8.0, medium is 5.0 up to (not including) 8.0,
        low is below 5.0.
        """
        threats_by_category = {}
        for threat in self.threats:
            threats_by_category.setdefault(threat.category.value, []).append({
                "id": threat.id,
                "component": threat.component_id,
                "description": threat.description,
                "risk_score": threat.risk_score,
                "mitigations": threat.mitigations,
                "status": threat.status,
            })

        high_risk = [t for t in self.threats if t.risk_score >= 8.0]
        medium_risk = [t for t in self.threats if 5.0 <= t.risk_score < 8.0]
        low_risk = [t for t in self.threats if t.risk_score < 5.0]

        return {
            "summary": {
                "total_threats": len(self.threats),
                "high_risk": len(high_risk),
                "medium_risk": len(medium_risk),
                "low_risk": len(low_risk),
                "components_analyzed": len(self.components),
                "data_flows_analyzed": len(self.data_flows),
            },
            "threats_by_category": threats_by_category,
            "high_risk_threats": [
                {"id": t.id, "description": t.description, "score": t.risk_score}
                for t in high_risk
            ],
            "recommended_priorities": self._prioritize_mitigations(),
        }

    def _prioritize_mitigations(self) -> List[Dict]:
        """Rank mitigations by the summed risk score of the threats they address.

        Returns:
            At most the ten highest-ranked mitigations.
        """
        mitigation_impact = {}
        for threat in self.threats:
            for mitigation in threat.mitigations:
                entry = mitigation_impact.setdefault(mitigation, {
                    "count": 0,
                    "total_risk_reduced": 0,
                    "threats": [],
                })
                entry["count"] += 1
                entry["total_risk_reduced"] += threat.risk_score
                entry["threats"].append(threat.id)

        prioritized = [
            {
                "mitigation": m,
                "threats_addressed": data["count"],
                "risk_reduction": round(data["total_risk_reduced"], 1),
                "threat_ids": data["threats"],
            }
            for m, data in mitigation_impact.items()
        ]

        return sorted(prioritized, key=lambda x: x["risk_reduction"], reverse=True)[:10]
# Exemplu de utilizare
def analyze_web_application():
    """Model a sample web application, run the analysis and print the report.

    Returns:
        The report dictionary produced by ``STRIDEAnalyzer.generate_report``.
    """
    analyzer = STRIDEAnalyzer()

    # Define the components.
    analyzer.add_component(Component(
        id="user",
        name="End User",
        type="user",
        trust_level=2,
        data_sensitivity="public",
        authentication_required=False
    ))
    analyzer.add_component(Component(
        id="web_app",
        name="Web Application",
        type="web_app",
        trust_level=6,
        data_sensitivity="confidential",
        authentication_required=True
    ))
    analyzer.add_component(Component(
        id="api",
        name="Backend API",
        type="api",
        trust_level=7,
        data_sensitivity="confidential",
        authentication_required=True
    ))
    analyzer.add_component(Component(
        id="database",
        name="PostgreSQL Database",
        type="database",
        trust_level=9,
        data_sensitivity="restricted"
    ))

    # Define the data flows.
    analyzer.add_data_flow(DataFlow(
        id="flow_1",
        source="user",
        destination="web_app",
        protocol="HTTPS",
        data_type="user_credentials",
        encrypted=True,
        authenticated=False
    ))
    analyzer.add_data_flow(DataFlow(
        id="flow_2",
        source="web_app",
        destination="api",
        protocol="HTTPS",
        data_type="api_request",
        encrypted=True,
        authenticated=True
    ))

    # Run the analysis; the report is built from the analyzer's stored
    # threats, so the sorted list returned by analyze() is not needed here.
    analyzer.analyze()
    report = analyzer.generate_report()

    print(json.dumps(report, indent=2))
    return report
# Section heading (was fused onto the return line above): Integrare CI/CD
Threat Model as Code
# threat-model.yaml
# Declarative threat model: components, the data flows between them,
# trust boundaries and the security controls in place.
version: "1.0"
application: "ecommerce-platform"
team: "platform-engineering"

components:
  - id: web_frontend
    name: "Web Frontend"
    type: web_app
    trust_level: 5
    data_sensitivity: internal
    technologies:
      - React
      - TypeScript
  - id: api_gateway
    name: "API Gateway"
    type: api
    trust_level: 6
    data_sensitivity: confidential
    authentication:
      type: OAuth2
      mfa_enabled: true
  - id: user_service
    name: "User Service"
    type: api
    trust_level: 7
    data_sensitivity: restricted
  - id: payment_service
    name: "Payment Service"
    type: api
    trust_level: 8
    data_sensitivity: restricted
    pci_scope: true
  - id: postgres_db
    name: "Primary Database"
    type: database
    trust_level: 9
    data_sensitivity: restricted
    encryption_at_rest: true

data_flows:
  - from: web_frontend
    to: api_gateway
    protocol: HTTPS
    data_types:
      - user_input
      - session_token
    encrypted: true
  - from: api_gateway
    to: user_service
    protocol: gRPC
    data_types:
      - user_data
      - authentication
    encrypted: true
    mutual_tls: true
  - from: api_gateway
    to: payment_service
    protocol: gRPC
    data_types:
      - payment_info
      - pii
    encrypted: true
    mutual_tls: true

trust_boundaries:
  - name: "Internet Boundary"
    components:
      - web_frontend
    external: true
  - name: "Service Mesh"
    components:
      - api_gateway
      - user_service
      - payment_service
  - name: "Data Layer"
    components:
      - postgres_db

security_controls:
  authentication:
    - OAuth2 with PKCE
    - JWT validation
    - API key management
  authorization:
    - RBAC
    - Policy-based access control
  encryption:
    - TLS 1.3 minimum
    - AES-256 at rest
  monitoring:
    - Centralized logging
    - SIEM integration
    - Anomaly detection
# Section heading (was fused onto the last list item above): Analiza Amenintarilor cu GitHub Actions
# .github/workflows/threat-model.yml
# Runs the STRIDE analysis on pull requests, on pushes to main and weekly;
# on pull requests it also comments a summary and compares against main.
name: Threat Model Analysis

on:
  pull_request:
    paths:
      - 'threat-model.yaml'
      - 'src/**'
      - 'infrastructure/**'
  push:
    branches: [main]
  schedule:
    - cron: '0 6 * * 1'  # Weekly, Mondays at 06:00 (GitHub evaluates cron in UTC)

# The PR-comment step needs write access to pull requests; everything else
# only reads the repository.
permissions:
  contents: read
  pull-requests: write

jobs:
  analyze-threats:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pyyaml jsonschema

      - name: Validate threat model schema
        run: |
          python scripts/validate_threat_model.py threat-model.yaml

      - name: Run STRIDE analysis
        id: stride
        run: |
          python scripts/stride_analyzer.py threat-model.yaml > threat-report.json
          # Extract the summary for the PR comment and the threshold check
          HIGH_RISK=$(jq '.summary.high_risk' threat-report.json)
          MEDIUM_RISK=$(jq '.summary.medium_risk' threat-report.json)
          TOTAL=$(jq '.summary.total_threats' threat-report.json)
          echo "high_risk=$HIGH_RISK" >> "$GITHUB_OUTPUT"
          echo "medium_risk=$MEDIUM_RISK" >> "$GITHUB_OUTPUT"
          echo "total=$TOTAL" >> "$GITHUB_OUTPUT"

      - name: Upload threat report
        uses: actions/upload-artifact@v4
        with:
          name: threat-report
          path: threat-report.json

      - name: Comment on PR
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const report = JSON.parse(fs.readFileSync('threat-report.json', 'utf8'));
            const body = `## 🔒 Threat Model Analysis

            | Metric | Count |
            |--------|-------|
            | Total Threats | ${report.summary.total_threats} |
            | 🔴 High Risk | ${report.summary.high_risk} |
            | 🟡 Medium Risk | ${report.summary.medium_risk} |
            | 🟢 Low Risk | ${report.summary.low_risk} |

            ### Top Mitigari Recomandate
            ${report.recommended_priorities.slice(0, 5).map((m, i) =>
              `${i + 1}. **${m.mitigation}** - Adreseaza ${m.threats_addressed} amenintari`
            ).join('\n')}

            <details>
            <summary>Amenintari de Risc Ridicat</summary>

            ${report.high_risk_threats.map(t =>
              `- **${t.id}** (Scor: ${t.score}): ${t.description}`
            ).join('\n')}

            </details>
            `;
            // Awaited so that a failed API call fails the step.
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: body
            });

      # Kept last: when the gate fails, the report has already been uploaded
      # and the PR comment posted, so reviewers can see why it failed.
      - name: Check threat thresholds
        env:
          HIGH_RISK: ${{ steps.stride.outputs.high_risk }}
        run: |
          if [ "$HIGH_RISK" -gt 5 ]; then
            echo "::error::Prea multe amenintari de risc ridicat identificate ($HIGH_RISK)"
            exit 1
          fi

  compare-baseline:
    runs-on: ubuntu-latest
    needs: analyze-threats
    if: github.event_name == 'pull_request'
    steps:
      - uses: actions/checkout@v4
        with:
          ref: main

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install pyyaml jsonschema

      - name: Get baseline threat report
        run: |
          # Written outside the workspace: the checkout below cleans untracked
          # files, which would delete a report stored in the repository dir.
          python scripts/stride_analyzer.py threat-model.yaml > "$RUNNER_TEMP/baseline-report.json" || echo '{"summary":{"total_threats":0}}' > "$RUNNER_TEMP/baseline-report.json"

      - uses: actions/checkout@v4

      - name: Download current report
        uses: actions/download-artifact@v4
        with:
          name: threat-report

      - name: Compare reports
        run: |
          python scripts/compare_threats.py "$RUNNER_TEMP/baseline-report.json" threat-report.json
# Section heading (was fused onto the last command above): Script de Comparatie
# scripts/compare_threats.py
import json
import sys
def _threat_fingerprint(category: str, threat: dict) -> tuple:
    """Return a key that identifies a threat independently of its numbering."""
    component = threat.get('component')
    description = threat.get('description')
    if component is None and description is None:
        # Nothing but the id to go on; fall back to it.
        return ('id', threat.get('id'))
    return (category, component, description)


def compare_reports(baseline_path: str, current_path: str):
    """Compare two threat reports and exit with status 1 on a high-risk increase.

    Prints the change in total and high-risk counts, then lists the threats
    present in the current report but not in the baseline.

    Args:
        baseline_path: Path to the baseline report (JSON).
        current_path: Path to the current report (JSON).

    Raises:
        SystemExit: With code 1 when the current report has more high-risk
            threats than the baseline.
    """
    with open(baseline_path, encoding="utf-8") as f:
        baseline = json.load(f)
    with open(current_path, encoding="utf-8") as f:
        current = json.load(f)

    baseline_summary = baseline.get('summary', {})
    current_summary = current.get('summary', {})
    baseline_total = baseline_summary.get('total_threats', 0)
    current_total = current_summary.get('total_threats', 0)
    baseline_high = baseline_summary.get('high_risk', 0)
    current_high = current_summary.get('high_risk', 0)

    diff_total = current_total - baseline_total
    diff_high = current_high - baseline_high

    print("Comparatie Amenintari:")
    print(f" Total: {baseline_total} -> {current_total} ({'+' if diff_total > 0 else ''}{diff_total})")
    print(f" Risc Ridicat: {baseline_high} -> {current_high} ({'+' if diff_high > 0 else ''}{diff_high})")

    # Identify new threats. The ids are sequential, so adding one threat
    # renumbers every later one; matching on content avoids reporting the
    # renumbered threats as new (or missing a genuinely new one).
    baseline_keys = {
        _threat_fingerprint(category, threat)
        for category, threats in baseline.get('threats_by_category', {}).items()
        for threat in threats
    }
    new_threats = [
        threat
        for category, threats in current.get('threats_by_category', {}).items()
        for threat in threats
        if _threat_fingerprint(category, threat) not in baseline_keys
    ]

    if new_threats:
        print(f"\nAmenintari Noi Identificate ({len(new_threats)}):")
        for threat in new_threats:
            description = threat.get('description', '')
            # Only mark the text as shortened when it really was.
            shown = f"{description[:60]}..." if len(description) > 60 else description
            print(f" - {threat.get('id')}: {shown}")

    # Fail if the number of high-risk threats has increased.
    if diff_high > 0:
        print(f"\nAmenintarile de risc ridicat au crescut cu {diff_high}")
        sys.exit(1)

    print("\nNicio crestere a amenintarilor de risc ridicat")
if __name__ == "__main__":
    # Exactly two positional arguments are expected: baseline and current report.
    if len(sys.argv) != 3:
        print("Utilizare: compare_threats.py <baseline.json> <current.json>")
        sys.exit(1)

    compare_reports(sys.argv[1], sys.argv[2])
# Section heading (was fused onto the call above): Generarea Arborilor de Atac
from dataclasses import dataclass, field
from typing import List, Dict, Optional
import json
@dataclass
class AttackNode:
    """One node of an attack tree."""

    id: str
    description: str
    node_type: str  # goal, sub_goal, attack, defense
    probability: float = 0.0
    cost: float = 0.0
    # default_factory gives every node its own child list.
    children: List['AttackNode'] = field(default_factory=list)
    operator: str = "OR"  # AND, OR
class AttackTreeGenerator:
    """Builds attack trees from a threat report and renders them as Mermaid."""

    def __init__(self, threat_report: Dict):
        """Store the report; its ``threats_by_category`` mapping is read later."""
        self.threats = threat_report
        self.node_counter = 0

    def _generate_node_id(self) -> str:
        """Return the next sequential node id, formatted as ``ATK-0001``."""
        self.node_counter += 1
        return f"ATK-{self.node_counter:04d}"

    def generate_tree(self, goal: str) -> "AttackNode":
        """Generate the attack tree for a specific attacker goal.

        The tree has the goal at the root, one sub-goal per related threat
        category, one attack node per threat and up to three defense nodes
        (the threat's first mitigations) under each attack.
        """
        root = AttackNode(
            id=self._generate_node_id(),
            description=goal,
            node_type="goal",
            operator="OR"
        )

        # Group the related threats by category.
        related_threats = self._find_related_threats(goal)

        for category, threats in related_threats.items():
            sub_goal = AttackNode(
                id=self._generate_node_id(),
                description=f"Exploatare prin {category}",
                node_type="sub_goal",
                operator="OR"
            )

            for threat in threats:
                attack = AttackNode(
                    id=self._generate_node_id(),
                    description=threat['description'],
                    node_type="attack",
                    probability=threat['risk_score'] / 10,
                    cost=self._estimate_attack_cost(threat)
                )

                # Add the defense nodes.
                for mitigation in threat['mitigations'][:3]:
                    attack.children.append(AttackNode(
                        id=self._generate_node_id(),
                        description=mitigation,
                        node_type="defense"
                    ))

                sub_goal.children.append(attack)

            # Categories without any threat are left out of the tree.
            if sub_goal.children:
                root.children.append(sub_goal)

        return root

    def _find_related_threats(self, goal: str) -> Dict[str, List]:
        """Find the threats related to the attack goal.

        Matching is a case-insensitive substring test of each keyword against
        the goal. A goal that matches no keyword gets every category present
        in the report.
        """
        goal_lower = goal.lower()
        by_category = self.threats.get('threats_by_category', {})

        keywords = {
            "data breach": ["Information Disclosure", "Tampering"],
            "account takeover": ["Spoofing", "Elevation of Privilege"],
            "service disruption": ["Denial of Service"],
            "unauthorized access": ["Spoofing", "Elevation of Privilege"],
            "data manipulation": ["Tampering", "Repudiation"]
        }

        target_categories = []
        for key, categories in keywords.items():
            if key in goal_lower:
                target_categories.extend(categories)

        if not target_categories:
            target_categories = list(by_category.keys())

        # Repeated categories collapse naturally because the result is a dict.
        return {
            category: by_category[category]
            for category in target_categories
            if category in by_category
        }

    def _estimate_attack_cost(self, threat: Dict) -> float:
        """Estimate the cost/effort of carrying out the attack."""
        base_cost = 5.0
        # Higher risk usually means easier exploitation, hence a lower cost.
        risk_adjustment = (10 - threat['risk_score']) * 0.5
        return base_cost + risk_adjustment

    def to_mermaid(self, root: "AttackNode") -> str:
        """Convert the attack tree into a Mermaid ``graph TD`` diagram.

        Descriptions longer than 40 characters are shortened, and double
        quotes are replaced with Mermaid's ``#quot;`` entity so they cannot
        end the quoted label early.
        """
        lines = ["graph TD"]

        # Style class per node type.
        styles = {
            "goal": ":::goal",
            "sub_goal": ":::subgoal",
            "attack": ":::attack",
            "defense": ":::defense"
        }

        def process_node(node: "AttackNode", parent: Optional["AttackNode"] = None):
            text = node.description
            if len(text) > 40:
                text = f"{text[:40]}..."
            text = text.replace('"', "#quot;")

            lines.append(f' {node.id}["{text}"]{styles.get(node.node_type, "")}')

            if parent is not None:
                # The operator describes how a node's children combine, so the
                # edge label comes from the parent rather than from the child.
                connector = "-->|AND|" if parent.operator == "AND" else "-->"
                lines.append(f" {parent.id} {connector} {node.id}")

            for child in node.children:
                process_node(child, node)

        process_node(root)

        # Append the style definitions.
        lines.extend([
            "",
            " classDef goal fill:#ff6b6b,stroke:#333,stroke-width:2px",
            " classDef subgoal fill:#ffd93d,stroke:#333",
            " classDef attack fill:#ff8c42,stroke:#333",
            " classDef defense fill:#6bcb77,stroke:#333"
        ])

        return "\n".join(lines)
# Genereaza arbori de atac din raportul de amenintari
def generate_attack_trees(threat_report: Dict):
    """Generate one attack tree per predefined attacker goal.

    Returns:
        A dict keyed by goal text; each value holds the tree under ``"tree"``
        and its Mermaid rendering under ``"mermaid"``.
    """
    generator = AttackTreeGenerator(threat_report)

    attack_goals = [
        "Realizarea unei brese de date a informatiilor clientilor",
        "Obtinerea accesului administrativ neautorizat",
        "Perturbarea disponibilitatii serviciului"
    ]

    trees = {}
    for goal in attack_goals:
        tree = generator.generate_tree(goal)
        trees[goal] = {
            "tree": tree,
            "mermaid": generator.to_mermaid(tree)
        }

    return trees
# Section heading (was fused onto the return line above): Validarea Continua a Amenintarilor
import subprocess
import json
from typing import List, Dict
class ThreatValidator:
    """Checks, by grepping the working tree, whether key mitigations appear implemented.

    The checks are text searches only: a match means the pattern occurs in
    the searched directory, not that the control is configured correctly.
    """

    def __init__(self, threat_model_path: str):
        """Load the threat model.

        Only ``.json`` files are parsed; for any other path the file is still
        opened (so a missing file raises) but the model is left empty.
        """
        with open(threat_model_path, encoding="utf-8") as f:
            self.model = json.load(f) if threat_model_path.endswith('.json') else {}

    def validate_mitigations(self) -> List[Dict]:
        """Run every mitigation check and collect the results.

        Returns:
            One dict per check with ``mitigation``, ``implemented`` and
            ``details``. A check that raises is reported as not implemented.
        """
        results = []

        validation_checks = {
            "Implement multi-factor authentication": self._check_mfa,
            "Use HTTPS/TLS": self._check_tls,
            "Implement rate limiting": self._check_rate_limiting,
            "Enable audit logging": self._check_audit_logging,
            "Encrypt data at rest": self._check_encryption_at_rest
        }

        for mitigation, check_func in validation_checks.items():
            try:
                is_implemented, details = check_func()
            except Exception as e:
                # Boundary handler: one broken check must not abort the rest.
                is_implemented, details = False, f"Verificarea a esuat: {str(e)}"
            results.append({
                "mitigation": mitigation,
                "implemented": is_implemented,
                "details": details
            })

        return results

    def _grep_check(self, pattern: str, path: str, found_msg: str,
                    missing_msg: str, error_msg: str,
                    include: Optional[str] = None) -> tuple:
        """Recursively grep ``path`` for ``pattern``; return ``(found, message)``.

        Any output on stdout counts as a match. If grep cannot be run or its
        output cannot be decoded, the result is ``(False, error_msg)``.
        """
        command = ["grep", "-r", pattern, path]
        if include:
            command.append(f"--include={include}")
        try:
            result = subprocess.run(command, capture_output=True, text=True)
        except (OSError, subprocess.SubprocessError, UnicodeDecodeError):
            return False, error_msg
        found = bool(result.stdout)
        return found, found_msg if found else missing_msg

    def _check_mfa(self) -> tuple:
        """Check whether MFA is configured."""
        return self._grep_check(
            "mfa", "src/auth/",
            "Configurare MFA gasita",
            "Nicio configurare MFA detectata",
            "Nu s-a putut verifica configurarea MFA",
            include="*.ts",
        )

    def _check_tls(self) -> tuple:
        """Check the TLS configuration."""
        return self._grep_check(
            "ssl_certificate", "infrastructure/",
            "Certificate TLS configurate",
            "Nicio configurare TLS gasita",
            "Nu s-a putut verifica configurarea TLS",
        )

    def _check_rate_limiting(self) -> tuple:
        """Check the rate-limiting implementation."""
        return self._grep_check(
            "rateLimit", "src/",
            "Rate limiting implementat",
            "Niciun rate limiting gasit",
            "Nu s-a putut verifica rate limiting",
            include="*.ts",
        )

    def _check_audit_logging(self) -> tuple:
        """Check the audit-logging configuration."""
        return self._grep_check(
            "audit", "src/",
            "Logare de audit gasita",
            "Nicio logare de audit detectata",
            "Nu s-a putut verifica logarea de audit",
            include="*.ts",
        )

    def _check_encryption_at_rest(self) -> tuple:
        """Check the encryption-at-rest configuration."""
        return self._grep_check(
            "encrypt", "infrastructure/database/",
            "Criptare configurata",
            "Nicio configurare de criptare gasita",
            "Nu s-a putut verifica criptarea",
        )

    def generate_validation_report(self) -> Dict:
        """Generate the complete validation report."""
        validations = self.validate_mitigations()

        implemented = [v for v in validations if v['implemented']]
        not_implemented = [v for v in validations if not v['implemented']]
        # Guard the ratio in case the check table is ever emptied.
        coverage = len(implemented) / len(validations) * 100 if validations else 0.0

        return {
            "summary": {
                "total_checks": len(validations),
                "implemented": len(implemented),
                "not_implemented": len(not_implemented),
                "coverage": f"{coverage:.1f}%"
            },
            "implemented_mitigations": implemented,
            "missing_mitigations": not_implemented,
            "recommendations": [
                f"Implementeaza: {v['mitigation']}" for v in not_implemented
            ]
        }
# Section heading (was fused onto the closing brace above): Bune Practici
Integrarea Threat Modeling
- Incepe devreme: Incepe threat modeling in faza de design
- Itereaza continuu: Actualizeaza modelele pe masura ce arhitectura evolueaza
- Automatizeaza validarea: Verifica mitigarile in CI/CD
- Urmareste metricile: Monitorizeaza numarul de amenintari in timp
- Implica toti stakeholder-ii: Include dezvoltatori, securitate si operatiuni
Acoperire STRIDE
- Asigura-te ca toate componentele sunt analizate pentru fiecare categorie STRIDE
- Concentreaza-te pe limitele de incredere si fluxurile de date
- Documenteaza presupunerile si riscurile acceptate
- Revizuieste mitigarile pentru eficacitate
Integrarea threat modeling in pipeline-urile DevSecOps asigura ca securitatea este luata in considerare pe tot parcursul ciclului de viata al dezvoltarii, nu doar la deployment.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →