NIST Cybersecurity Framework 2.0 oferă o abordare completă pentru gestionarea riscurilor de securitate cibernetică. Acest ghid acoperă implementarea practică a celor șase funcții de bază pentru organizațiile software.
Implementarea nucleului Framework-ului
Implementează funcțiile de bază NIST CSF:
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
class CoreFunction(Enum):
    """The six NIST CSF 2.0 core functions, valued by their two-letter prefix.

    The value is the prefix used in category and subcategory identifiers
    (for example ``"GV"`` in ``"GV.OC-01"``) and as the key of the
    ``functions`` mapping in generated profiles.
    """

    GOVERN = "GV"
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
class ImplementationTier(Enum):
    """Implementation tiers, ordered from least (1) to most (4) mature.

    The integer values are subtracted from one another to size the gap
    between a subcategory's current and target tier.
    """

    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
@dataclass
class SubCategory:
    """Implementation state of a single CSF subcategory (e.g. ``GV.OC-01``).

    The list fields and ``owner`` default to empty values so a subcategory
    can be created before any controls, evidence or gaps are recorded; each
    instance gets its own fresh lists.
    """

    id: str  # subcategory identifier, e.g. "ID.AM-01"
    description: str
    implementation_status: str  # not_implemented, partial, implemented
    current_tier: ImplementationTier
    target_tier: ImplementationTier
    controls: List[str] = field(default_factory=list)
    evidence: List[str] = field(default_factory=list)
    gaps: List[str] = field(default_factory=list)
    owner: str = ""
    review_date: Optional[datetime] = None
@dataclass
class Category:
    """A CSF category: a named group of subcategories under one core function."""

    id: str  # category identifier, e.g. "ID.AM"
    name: str
    function: CoreFunction  # core function this category belongs to
    subcategories: List[SubCategory]
class NISTCSFImplementation:
    """Model one organization's NIST CSF 2.0 implementation state.

    Holds a starter set of CSF 2.0 categories (``self.categories``, keyed by
    category id) and derives current/target profiles, a gap analysis and an
    implementation roadmap from the tiers recorded on each subcategory.
    """

    def __init__(self, organization_name: str):
        """Build the starter framework for ``organization_name``.

        Both profiles start as empty dicts and are filled in by
        ``create_current_profile`` / ``create_target_profile``.
        """
        self.organization_name = organization_name
        self.categories = self._initialize_framework()
        self.current_profile: Dict = {}
        self.target_profile: Dict = {}

    @staticmethod
    def _new_subcategory(
        subcategory_id: str,
        description: str,
        target_tier: ImplementationTier,
        controls: Optional[List[str]] = None,
    ) -> SubCategory:
        """Return a not-yet-implemented subcategory starting at the PARTIAL tier.

        Every call creates fresh ``controls``/``evidence``/``gaps`` lists so
        no two subcategories share mutable state.
        """
        return SubCategory(
            id=subcategory_id,
            description=description,
            implementation_status="not_implemented",
            current_tier=ImplementationTier.PARTIAL,
            target_tier=target_tier,
            controls=list(controls) if controls else [],
            evidence=[],
            gaps=[],
            owner="",
        )

    def _initialize_framework(self) -> Dict[str, Category]:
        """Initialize NIST CSF 2.0 categories and subcategories.

        Returns:
            A dict mapping category id to ``Category``, in the order
            GOVERN, IDENTIFY, PROTECT, DETECT, RESPOND, RECOVER.
        """
        new = self._new_subcategory
        repeatable = ImplementationTier.REPEATABLE
        risk_informed = ImplementationTier.RISK_INFORMED

        categories = [
            # GOVERN Function (New in 2.0)
            Category(
                id="GV.OC",
                name="Organizational Context",
                function=CoreFunction.GOVERN,
                subcategories=[
                    new("GV.OC-01", "Mission is understood and informs cybersecurity risk management", repeatable),
                    new("GV.OC-02", "Internal and external stakeholders are understood", repeatable),
                ],
            ),
            Category(
                id="GV.RM",
                name="Risk Management Strategy",
                function=CoreFunction.GOVERN,
                subcategories=[
                    new("GV.RM-01", "Risk management objectives are established and agreed upon", risk_informed),
                ],
            ),
            # IDENTIFY Function
            Category(
                id="ID.AM",
                name="Asset Management",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    new(
                        "ID.AM-01",
                        "Inventories of hardware managed by the organization",
                        repeatable,
                        ["CMDB", "Asset discovery tools"],
                    ),
                    new(
                        "ID.AM-02",
                        "Inventories of software and services managed by the organization",
                        repeatable,
                        ["Software inventory", "License management"],
                    ),
                    new(
                        "ID.AM-07",
                        "Data classification scheme established and used",
                        repeatable,
                        ["Data classification policy", "DLP"],
                    ),
                ],
            ),
            Category(
                id="ID.RA",
                name="Risk Assessment",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    new(
                        "ID.RA-01",
                        "Vulnerabilities in assets are identified, validated, and recorded",
                        repeatable,
                        ["Vulnerability scanning", "Penetration testing"],
                    ),
                    new(
                        "ID.RA-03",
                        "Threats are identified and documented",
                        risk_informed,
                        ["Threat intelligence", "Threat modeling"],
                    ),
                ],
            ),
            # PROTECT Function
            Category(
                id="PR.AA",
                name="Identity Management and Access Control",
                function=CoreFunction.PROTECT,
                subcategories=[
                    new(
                        "PR.AA-01",
                        "Identities and credentials for users and services managed",
                        repeatable,
                        ["IAM", "SSO", "MFA"],
                    ),
                    new(
                        "PR.AA-05",
                        "Access permissions are managed with least privilege",
                        repeatable,
                        ["RBAC", "Access reviews"],
                    ),
                ],
            ),
            Category(
                id="PR.DS",
                name="Data Security",
                function=CoreFunction.PROTECT,
                subcategories=[
                    new(
                        "PR.DS-01",
                        "Data-at-rest is protected",
                        repeatable,
                        ["Encryption at rest", "Key management"],
                    ),
                    new(
                        "PR.DS-02",
                        "Data-in-transit is protected",
                        repeatable,
                        ["TLS", "VPN", "Certificate management"],
                    ),
                ],
            ),
            # DETECT Function
            Category(
                id="DE.CM",
                name="Continuous Monitoring",
                function=CoreFunction.DETECT,
                subcategories=[
                    new(
                        "DE.CM-01",
                        "Networks are monitored to detect potential cybersecurity events",
                        repeatable,
                        ["SIEM", "IDS/IPS", "NDR"],
                    ),
                    new(
                        "DE.CM-06",
                        "External service provider activity is monitored",
                        risk_informed,
                        ["Vendor monitoring", "API monitoring"],
                    ),
                ],
            ),
            # RESPOND Function
            Category(
                id="RS.MA",
                name="Incident Management",
                function=CoreFunction.RESPOND,
                subcategories=[
                    new(
                        "RS.MA-01",
                        "Incident response plan is executed",
                        repeatable,
                        ["IR plan", "IR team", "Playbooks"],
                    ),
                ],
            ),
            # RECOVER Function
            Category(
                id="RC.RP",
                name="Incident Recovery Plan Execution",
                function=CoreFunction.RECOVER,
                subcategories=[
                    new(
                        "RC.RP-01",
                        "Recovery plan is executed during or after an incident",
                        repeatable,
                        ["BCP", "DR plan", "Backup systems"],
                    ),
                ],
            ),
        ]
        return {category.id: category for category in categories}

    def create_current_profile(self) -> Dict:
        """Create current state profile.

        The profile is a point-in-time snapshot: control and evidence lists
        are copied so later edits to a subcategory do not rewrite it. The
        result is also stored on ``self.current_profile``.

        Returns:
            A dict with ``profile_type``, ``created_date`` (naive UTC ISO
            string), ``organization`` and ``functions`` keyed by function
            prefix. ``overall_tier`` is always reported as PARTIAL; it is
            not derived from the subcategory tiers.
        """
        functions = {
            func.value: {
                "categories": {},
                "overall_tier": ImplementationTier.PARTIAL.value,
            }
            for func in CoreFunction
        }
        for cat_id, category in self.categories.items():
            functions[category.function.value]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {
                    subcat.id: {
                        "status": subcat.implementation_status,
                        "tier": subcat.current_tier.value,
                        "controls": list(subcat.controls),
                        "evidence": list(subcat.evidence),
                    }
                    for subcat in category.subcategories
                },
            }

        profile = {
            "profile_type": "current",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "functions": functions,
        }
        self.current_profile = profile
        return profile

    def create_target_profile(self, target_tier: ImplementationTier = ImplementationTier.REPEATABLE) -> Dict:
        """Create target state profile.

        Args:
            target_tier: Tier recorded at profile and function level. Each
                subcategory entry reports its own ``target_tier`` attribute,
                not this argument.

        Returns:
            The target profile dict, also stored on ``self.target_profile``.
            Control lists are copied so the profile is a stable snapshot.
        """
        functions = {
            func.value: {
                "target_tier": target_tier.value,
                "categories": {},
            }
            for func in CoreFunction
        }
        for cat_id, category in self.categories.items():
            functions[category.function.value]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {
                    subcat.id: {
                        "target_tier": subcat.target_tier.value,
                        "required_controls": list(subcat.controls),
                        "priority": self._calculate_priority(subcat),
                    }
                    for subcat in category.subcategories
                },
            }

        profile = {
            "profile_type": "target",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "target_tier": target_tier.value,
            "functions": functions,
        }
        self.target_profile = profile
        return profile

    def _calculate_priority(self, subcat: SubCategory) -> str:
        """Calculate implementation priority.

        Returns ``"high"`` for a tier gap of two or more, ``"medium"`` for a
        gap of exactly one, and ``"low"`` otherwise (target already met).
        """
        tier_gap = subcat.target_tier.value - subcat.current_tier.value
        if tier_gap >= 2:
            return "high"
        if tier_gap == 1:
            return "medium"
        return "low"

    def gap_analysis(self) -> Dict:
        """Perform gap analysis between current and target tiers.

        Only subcategories whose current tier is below their target tier are
        reported, so every reported item has a gap of at least one and its
        priority is ``"medium"`` or ``"high"``; ``low_priority`` stays 0.

        Returns:
            A dict with ``analysis_date``, a ``summary`` of counts,
            per-function counts and ids in ``by_function``, and the full
            items in ``detailed_gaps``.
        """
        gaps = {
            "analysis_date": datetime.utcnow().isoformat(),
            "summary": {
                "total_gaps": 0,
                "high_priority": 0,
                "medium_priority": 0,
                "low_priority": 0,
            },
            "by_function": {func.value: {"gaps": 0, "items": []} for func in CoreFunction},
            "detailed_gaps": [],
        }

        for category in self.categories.values():
            function_key = category.function.value
            for subcat in category.subcategories:
                gap_size = subcat.target_tier.value - subcat.current_tier.value
                if gap_size <= 0:
                    continue

                priority = self._calculate_priority(subcat)
                gaps["detailed_gaps"].append({
                    "subcategory_id": subcat.id,
                    "description": subcat.description,
                    "category": category.name,
                    "function": function_key,
                    "current_tier": subcat.current_tier.value,
                    "target_tier": subcat.target_tier.value,
                    "gap_size": gap_size,
                    "priority": priority,
                    # Copies keep the report stable if the subcategory changes later.
                    "required_controls": list(subcat.controls),
                    "current_gaps": list(subcat.gaps),
                })
                gaps["by_function"][function_key]["gaps"] += 1
                gaps["by_function"][function_key]["items"].append(subcat.id)
                gaps["summary"]["total_gaps"] += 1
                gaps["summary"][f"{priority}_priority"] += 1

        return gaps

    def generate_implementation_roadmap(self, months: int = 12) -> Dict:
        """Generate implementation roadmap.

        The plan is split into three equal phases (high, medium, then low
        priority gaps) and six evenly spaced milestones, all scaled to
        ``months``. With the default of 12 this yields phases
        "Months 1-4", "Months 5-8", "Months 9-12" and milestones at months
        2, 4, 6, 8, 10 and 12.

        Args:
            months: Total roadmap length in months; must be at least 1.

        Returns:
            A dict with ``duration_months``, ``phases`` and ``milestones``.

        Raises:
            ValueError: If ``months`` is less than 1.
        """
        if months < 1:
            raise ValueError(f"months must be at least 1, got {months}")

        detailed_gaps = self.gap_analysis()["detailed_gaps"]

        phase_specs = [
            ("high", "Critical Controls", "Implement critical security controls"),
            ("medium", "Enhanced Controls", "Enhance security posture"),
            ("low", "Optimization", "Optimize and mature controls"),
        ]
        phases = []
        previous_end = 0
        for number, (priority, name, focus) in enumerate(phase_specs, start=1):
            # Ceiling division so the final phase always ends on ``months``.
            end = -(-months * number // len(phase_specs))
            # Very short roadmaps can leave no new month for a phase; it then
            # shares the last month of the previous phase.
            start = min(previous_end + 1, end)
            phases.append({
                "phase": number,
                "name": name,
                "duration": f"Months {start}-{end}",
                "items": [g["subcategory_id"] for g in detailed_gaps if g["priority"] == priority],
                "focus": focus,
            })
            previous_end = end

        milestone_labels = [
            "Core identity and access controls implemented",
            "Detection capabilities operational",
            "Incident response tested",
            "Recovery capabilities validated",
            "Governance framework established",
            "Target profile achieved",
        ]
        milestones = [
            {"month": -(-months * step // len(milestone_labels)), "milestone": label}
            for step, label in enumerate(milestone_labels, start=1)
        ]

        return {
            "duration_months": months,
            "phases": phases,
            "milestones": milestones,
        }


# Section heading from the source article: "Dashboard de Monitorizare Continuă"
# (Continuous Monitoring Dashboard).
Implementează metrici și monitorizare CSF:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict
import json
@dataclass
class SecurityMetric:
    """One measured security indicator tied to a CSF function and category."""

    metric_id: str  # dashboard key, e.g. "PR-01"
    name: str
    category: str  # CSF category id, e.g. "PR.AA"
    function: CoreFunction
    value: float  # latest recorded measurement
    target: float  # value the organization aims for
    trend: str  # improving, stable, declining
    last_updated: datetime
class CSFMetricsDashboard:
    """Track security metrics per CSF function and summarize them.

    Most metrics are "higher is better" (coverage and compliance rates).
    The metric ids in ``_LOWER_IS_BETTER`` are durations or error rates, for
    which a value at or below the target is the goal; scoring, trend and
    recommendations take that direction into account.
    """

    # Remediation time, mean time to detect, false positive rate, mean time
    # to respond: smaller values are better for these.
    _LOWER_IS_BETTER = frozenset({"ID-03", "DE-01", "DE-03", "RS-01"})

    def __init__(self, csf_implementation: NISTCSFImplementation):
        """Attach to ``csf_implementation`` and register the default metrics."""
        self.csf = csf_implementation
        self.metrics: Dict[str, SecurityMetric] = {}
        # Ids that have received a real measurement through update_metric.
        # Needed because the initial value 0.0 would otherwise look like a
        # perfect score for lower-is-better metrics.
        self._measured: set = set()
        self._initialize_metrics()

    def _initialize_metrics(self):
        """Initialize security metrics aligned with CSF.

        Every metric starts with value 0.0, trend "stable" and the target
        listed below.
        """
        metrics_config = [
            # GOVERN metrics
            ("GV-01", "Policy Compliance Rate", "GV.PO", CoreFunction.GOVERN, 95.0),
            ("GV-02", "Risk Assessment Coverage", "GV.RM", CoreFunction.GOVERN, 100.0),
            # IDENTIFY metrics
            ("ID-01", "Asset Inventory Coverage", "ID.AM", CoreFunction.IDENTIFY, 98.0),
            ("ID-02", "Vulnerability Scan Coverage", "ID.RA", CoreFunction.IDENTIFY, 100.0),
            ("ID-03", "Critical Vulnerability Remediation Time", "ID.RA", CoreFunction.IDENTIFY, 7.0),
            # PROTECT metrics
            ("PR-01", "MFA Adoption Rate", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-02", "Privileged Access Review Compliance", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-03", "Encryption Coverage", "PR.DS", CoreFunction.PROTECT, 100.0),
            ("PR-04", "Security Training Completion", "PR.AT", CoreFunction.PROTECT, 95.0),
            # DETECT metrics
            ("DE-01", "Mean Time to Detect (hours)", "DE.CM", CoreFunction.DETECT, 1.0),
            ("DE-02", "Alert Response Rate", "DE.AE", CoreFunction.DETECT, 100.0),
            ("DE-03", "False Positive Rate", "DE.AE", CoreFunction.DETECT, 10.0),
            # RESPOND metrics
            ("RS-01", "Mean Time to Respond (hours)", "RS.MA", CoreFunction.RESPOND, 4.0),
            ("RS-02", "Incident Documentation Rate", "RS.AN", CoreFunction.RESPOND, 100.0),
            ("RS-03", "Containment Effectiveness", "RS.MI", CoreFunction.RESPOND, 95.0),
            # RECOVER metrics
            ("RC-01", "Recovery Time Objective Achievement", "RC.RP", CoreFunction.RECOVER, 100.0),
            ("RC-02", "Backup Success Rate", "RC.RP", CoreFunction.RECOVER, 99.9),
            ("RC-03", "DR Test Frequency (per year)", "RC.RP", CoreFunction.RECOVER, 4.0),
        ]
        for metric_id, name, category, function, target in metrics_config:
            self.metrics[metric_id] = SecurityMetric(
                metric_id=metric_id,
                name=name,
                category=category,
                function=function,
                value=0.0,
                target=target,
                trend="stable",
                last_updated=datetime.utcnow(),
            )

    def _is_lower_better(self, metric: SecurityMetric) -> bool:
        """Return True when a smaller value of ``metric`` is the better one."""
        return metric.metric_id in self._LOWER_IS_BETTER

    def _meets_target(self, metric: SecurityMetric) -> bool:
        """Return True when ``metric`` currently satisfies its target."""
        if self._is_lower_better(metric):
            # An unmeasured duration/error rate must not count as a success.
            return metric.metric_id in self._measured and metric.value <= metric.target
        return metric.value >= metric.target

    def _achievement(self, metric: SecurityMetric, capped: bool):
        """Return target achievement in percent (0 when it cannot be computed).

        Higher-is-better metrics use ``value / target``; lower-is-better
        metrics use ``target / value``. With ``capped`` the result is limited
        to 100.
        """
        if metric.target <= 0:
            return 0
        if self._is_lower_better(metric):
            if metric.metric_id not in self._measured:
                return 0.0
            if metric.value <= 0:
                # A measured zero (or less) cannot be beaten; avoids dividing by zero.
                return 100.0
            ratio = metric.target / metric.value
        else:
            ratio = metric.value / metric.target
        return min(ratio, 1.0) * 100 if capped else ratio * 100

    def update_metric(self, metric_id: str, value: float):
        """Update metric value and calculate trend.

        The trend compares ``value`` with the previous value, respecting the
        metric's direction. The first measurement of a lower-is-better
        metric has no baseline and is reported as "stable".

        Raises:
            ValueError: If ``metric_id`` is not a registered metric.
        """
        if metric_id not in self.metrics:
            raise ValueError(f"Unknown metric: {metric_id}")

        metric = self.metrics[metric_id]
        previous = metric.value

        if self._is_lower_better(metric):
            if metric_id not in self._measured or value == previous:
                metric.trend = "stable"
            elif value < previous:
                metric.trend = "improving"
            else:
                metric.trend = "declining"
        elif value > previous:
            metric.trend = "improving"
        elif value < previous:
            metric.trend = "declining"
        else:
            metric.trend = "stable"

        metric.value = value
        metric.last_updated = datetime.utcnow()
        self._measured.add(metric_id)

    def get_function_score(self, function: CoreFunction) -> Dict:
        """Calculate score for a CSF function.

        The score is the mean capped achievement (0-100) over all metrics of
        ``function``; metrics with a non-positive target contribute 0.

        Returns:
            A dict with ``function``, ``score``, ``metrics_count`` and
            ``metrics_meeting_target`` (all zero when no metric exists).
        """
        function_metrics = [m for m in self.metrics.values() if m.function == function]
        if not function_metrics:
            return {
                "function": function.value,
                "score": 0,
                "metrics_count": 0,
                "metrics_meeting_target": 0,
            }

        total_score = sum(self._achievement(m, capped=True) for m in function_metrics)
        return {
            "function": function.value,
            "score": total_score / len(function_metrics),
            "metrics_count": len(function_metrics),
            "metrics_meeting_target": sum(1 for m in function_metrics if self._meets_target(m)),
        }

    def generate_dashboard(self) -> Dict:
        """Generate complete dashboard data.

        Returns:
            A dict with the overall score (mean of the six function scores),
            per-function scores, a summary of how many metrics meet their
            target / are improving / are declining, and one detail entry per
            metric. The per-metric ``achievement`` is not capped at 100.
        """
        function_scores = {
            function.value: self.get_function_score(function) for function in CoreFunction
        }
        overall_score = sum(s["score"] for s in function_scores.values()) / len(CoreFunction)

        summary = {
            "total": len(self.metrics),
            "meeting_target": 0,
            "improving": 0,
            "declining": 0,
        }
        detailed_metrics = []
        for metric in self.metrics.values():
            if self._meets_target(metric):
                summary["meeting_target"] += 1
            if metric.trend in ("improving", "declining"):
                summary[metric.trend] += 1
            detailed_metrics.append({
                "id": metric.metric_id,
                "name": metric.name,
                "function": metric.function.value,
                "category": metric.category,
                "value": metric.value,
                "target": metric.target,
                "achievement": self._achievement(metric, capped=False),
                "trend": metric.trend,
                "last_updated": metric.last_updated.isoformat(),
            })

        return {
            "generated_at": datetime.utcnow().isoformat(),
            "organization": self.csf.organization_name,
            "overall_score": overall_score,
            "function_scores": function_scores,
            "metrics_summary": summary,
            "detailed_metrics": detailed_metrics,
        }

    def get_recommendations(self) -> List[Dict]:
        """Generate improvement recommendations.

        One entry is produced for every metric that does not meet its
        target. Priority is "high" when the gap exceeds 30% of the target,
        "medium" above 10%, otherwise "low". Entries are sorted by priority
        and then by descending gap percentage.
        """
        priority_rank = {"high": 3, "medium": 2, "low": 1}
        recommendations = []
        for metric in self.metrics.values():
            if self._meets_target(metric):
                continue

            # Absolute distance works for both directions: below target for
            # higher-is-better metrics, above it for lower-is-better ones.
            gap = abs(metric.target - metric.value)
            # Guard the division once so a zero target cannot raise.
            gap_ratio = gap / metric.target if metric.target > 0 else 0
            if gap_ratio > 0.3:
                priority = "high"
            elif gap_ratio > 0.1:
                priority = "medium"
            else:
                priority = "low"

            recommendations.append({
                "metric_id": metric.metric_id,
                "metric_name": metric.name,
                "function": metric.function.value,
                "current_value": metric.value,
                "target_value": metric.target,
                "gap": gap,
                "gap_percentage": gap_ratio * 100,
                "priority": priority,
                "trend": metric.trend,
                "recommendation": self._generate_recommendation(metric),
            })

        # Sort by priority and gap
        recommendations.sort(key=lambda x: (-priority_rank[x["priority"]], -x["gap_percentage"]))
        return recommendations

    def _generate_recommendation(self, metric: SecurityMetric) -> str:
        """Generate specific recommendation for metric.

        Falls back to a generic sentence for metric ids without an entry.
        """
        recommendations_map = {
            "GV-01": "Review and update security policies, conduct policy awareness training",
            "ID-01": "Deploy asset discovery tools, implement automated inventory updates",
            "ID-02": "Increase vulnerability scan frequency, expand scan coverage",
            "ID-03": "Implement automated patching, prioritize critical vulnerability remediation",
            "PR-01": "Enforce MFA for all users, implement adaptive authentication",
            "PR-02": "Schedule regular access reviews, implement just-in-time access",
            "PR-03": "Deploy encryption for all data stores, implement key rotation",
            "PR-04": "Launch security awareness program, require annual training completion",
            "DE-01": "Tune detection rules, implement behavioral analytics",
            "DE-02": "Review alert triage process, automate response for common alerts",
            "DE-03": "Tune detection signatures, implement machine learning for anomaly detection",
            "RS-01": "Automate incident response, conduct regular IR drills",
            "RS-02": "Implement incident management system, require documentation",
            "RS-03": "Update containment playbooks, automate containment actions",
            "RC-01": "Test recovery procedures, update RTO requirements",
            "RC-02": "Review backup infrastructure, implement backup verification",
            "RC-03": "Schedule regular DR tests, document test results",
        }
        return recommendations_map.get(metric.metric_id, "Review and improve control implementation")


# Section heading from the source article: "Concluzie" (Conclusion).
Implementarea NIST CSF 2.0 necesită o evaluare sistematică a tuturor celor șase funcții, dezvoltarea profilurilor, analiza gap-urilor și monitorizare continuă. Începe prin stabilirea guvernanței și înțelegerea contextului organizațional. Creează profiluri curente și țintă pentru a ghida implementarea. Folosește metrici pentru a urmări progresul și a identifica zonele care necesită atenție. Nu uita că framework-ul este conceput să fie adaptiv - revizuiește și actualizează periodic implementarea pe măsură ce amenințările evoluează și organizația ta se schimbă.
Sistemul tău AI e conform cu EU AI Act? Evaluare gratuită de risc - află în 2 minute →