Compliance

NIST Cybersecurity Framework 2.0 Implementation Guide

Nicu Constantin
10 min read
#nist-csf #cybersecurity #compliance #risk-management #security-framework

NIST Cybersecurity Framework 2.0 offers a comprehensive approach to managing cybersecurity risk. This guide covers the practical implementation of its six core functions (Govern, Identify, Protect, Detect, Respond, Recover) for software organizations.

Implementing the Framework Core

Implement the NIST CSF core functions:

from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
 
class CoreFunction(Enum):
    GOVERN = "GV"
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
 
class ImplementationTier(Enum):
    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
 
@dataclass
class SubCategory:
    id: str
    description: str
    implementation_status: str  # not_implemented, partial, implemented
    current_tier: ImplementationTier
    target_tier: ImplementationTier
    controls: List[str]
    evidence: List[str]
    gaps: List[str]
    owner: str
    review_date: Optional[datetime] = None
 
@dataclass
class Category:
    id: str
    name: str
    function: CoreFunction
    subcategories: List[SubCategory]
 
class NISTCSFImplementation:
    def __init__(self, organization_name: str):
        self.organization_name = organization_name
        self.categories = self._initialize_framework()
        self.current_profile: Dict = {}
        self.target_profile: Dict = {}
 
    def _initialize_framework(self) -> Dict[str, Category]:
        """Initialize a representative subset of NIST CSF 2.0 categories and subcategories."""
        return {
            # GOVERN Function (New in 2.0)
            "GV.OC": Category(
                id="GV.OC",
                name="Organizational Context",
                function=CoreFunction.GOVERN,
                subcategories=[
                    SubCategory(
                        id="GV.OC-01",
                        description="Mission is understood and informs cybersecurity risk management",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="GV.OC-02",
                        description="Internal and external stakeholders are understood",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "GV.RM": Category(
                id="GV.RM",
                name="Risk Management Strategy",
                function=CoreFunction.GOVERN,
                subcategories=[
                    SubCategory(
                        id="GV.RM-01",
                        description="Risk management objectives are established and agreed upon",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # IDENTIFY Function
            "ID.AM": Category(
                id="ID.AM",
                name="Asset Management",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    SubCategory(
                        id="ID.AM-01",
                        description="Inventories of hardware managed by the organization",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["CMDB", "Asset discovery tools"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.AM-02",
                        description="Inventories of software and services managed by the organization",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Software inventory", "License management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.AM-07",
                        description="Inventories of data and corresponding metadata for designated data types are maintained",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Data classification policy", "DLP"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "ID.RA": Category(
                id="ID.RA",
                name="Risk Assessment",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    SubCategory(
                        id="ID.RA-01",
                        description="Vulnerabilities in assets are identified, validated, and recorded",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Vulnerability scanning", "Penetration testing"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.RA-03",
                        description="Threats are identified and documented",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=["Threat intelligence", "Threat modeling"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # PROTECT Function
            "PR.AA": Category(
                id="PR.AA",
                name="Identity Management, Authentication, and Access Control",
                function=CoreFunction.PROTECT,
                subcategories=[
                    SubCategory(
                        id="PR.AA-01",
                        description="Identities and credentials for users and services managed",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["IAM", "SSO", "MFA"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="PR.AA-05",
                        description="Access permissions are managed with least privilege",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["RBAC", "Access reviews"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "PR.DS": Category(
                id="PR.DS",
                name="Data Security",
                function=CoreFunction.PROTECT,
                subcategories=[
                    SubCategory(
                        id="PR.DS-01",
                        description="Data-at-rest is protected",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Encryption at rest", "Key management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="PR.DS-02",
                        description="Data-in-transit is protected",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["TLS", "VPN", "Certificate management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # DETECT Function
            "DE.CM": Category(
                id="DE.CM",
                name="Continuous Monitoring",
                function=CoreFunction.DETECT,
                subcategories=[
                    SubCategory(
                        id="DE.CM-01",
                        description="Networks are monitored to detect potential cybersecurity events",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["SIEM", "IDS/IPS", "NDR"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="DE.CM-06",
                        description="External service provider activity is monitored",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=["Vendor monitoring", "API monitoring"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # RESPOND Function
            "RS.MA": Category(
                id="RS.MA",
                name="Incident Management",
                function=CoreFunction.RESPOND,
                subcategories=[
                    SubCategory(
                        id="RS.MA-01",
                        description="Incident response plan is executed",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["IR plan", "IR team", "Playbooks"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # RECOVER Function
            "RC.RP": Category(
                id="RC.RP",
                name="Incident Recovery Plan Execution",
                function=CoreFunction.RECOVER,
                subcategories=[
                    SubCategory(
                        id="RC.RP-01",
                        description="Recovery plan is executed during or after an incident",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["BCP", "DR plan", "Backup systems"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            )
        }
 
    def create_current_profile(self) -> Dict:
        """Create current state profile."""
        profile = {
            "profile_type": "current",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "functions": {}
        }
 
        for func in CoreFunction:
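            profile["functions"][func.value] = {
                "categories": {},
                # Placeholder: every function starts at Tier 1 and is not
                # derived from the subcategory tiers recorded below.
                "overall_tier": ImplementationTier.PARTIAL.value
            }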
 
        for cat_id, category in self.categories.items():
            func_key = category.function.value
            profile["functions"][func_key]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {}
            }
 
            for subcat in category.subcategories:
                profile["functions"][func_key]["categories"][cat_id]["subcategories"][subcat.id] = {
                    "status": subcat.implementation_status,
                    "tier": subcat.current_tier.value,
                    "controls": subcat.controls,
                    "evidence": subcat.evidence
                }
 
        self.current_profile = profile
        return profile
 
    def create_target_profile(self, target_tier: ImplementationTier = ImplementationTier.REPEATABLE) -> Dict:
        """Create target state profile."""
        profile = {
            "profile_type": "target",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "target_tier": target_tier.value,
            "functions": {}
        }
 
        for func in CoreFunction:
            profile["functions"][func.value] = {
                "target_tier": target_tier.value,
                "categories": {}
            }
 
        for cat_id, category in self.categories.items():
            func_key = category.function.value
            profile["functions"][func_key]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {}
            }
 
            for subcat in category.subcategories:
                profile["functions"][func_key]["categories"][cat_id]["subcategories"][subcat.id] = {
                    "target_tier": subcat.target_tier.value,
                    "required_controls": subcat.controls,
                    "priority": self._calculate_priority(subcat)
                }
 
        self.target_profile = profile
        return profile
 
    def _calculate_priority(self, subcat: SubCategory) -> str:
        """Calculate implementation priority."""
        tier_gap = subcat.target_tier.value - subcat.current_tier.value
 
        if tier_gap >= 2:
            return "high"
        elif tier_gap == 1:
            return "medium"
        else:
            return "low"
 
    def gap_analysis(self) -> Dict:
        """Perform gap analysis between current and target profiles."""
        gaps = {
            "analysis_date": datetime.utcnow().isoformat(),
            "summary": {
                "total_gaps": 0,
                "high_priority": 0,
                "medium_priority": 0,
                "low_priority": 0
            },
            "by_function": {},
            "detailed_gaps": []
        }
 
        for func in CoreFunction:
            gaps["by_function"][func.value] = {"gaps": 0, "items": []}
 
        for cat_id, category in self.categories.items():
            for subcat in category.subcategories:
                if subcat.current_tier.value < subcat.target_tier.value:
                    gap_item = {
                        "subcategory_id": subcat.id,
                        "description": subcat.description,
                        "category": category.name,
                        "function": category.function.value,
                        "current_tier": subcat.current_tier.value,
                        "target_tier": subcat.target_tier.value,
                        "gap_size": subcat.target_tier.value - subcat.current_tier.value,
                        "priority": self._calculate_priority(subcat),
                        "required_controls": subcat.controls,
                        "current_gaps": subcat.gaps
                    }
 
                    gaps["detailed_gaps"].append(gap_item)
                    gaps["by_function"][category.function.value]["gaps"] += 1
                    gaps["by_function"][category.function.value]["items"].append(subcat.id)
                    gaps["summary"]["total_gaps"] += 1
 
                    # Reuse the priority already computed for this gap
                    gaps["summary"][f"{gap_item['priority']}_priority"] += 1
 
        return gaps
 
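    def generate_implementation_roadmap(self, months: int = 12) -> Dict:
        """Generate a three-phase implementation roadmap.

        The phase windows and milestone months are fixed for a 12-month plan;
        `months` is only recorded in the output and does not rescale them.
        """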
        gap_analysis = self.gap_analysis()
 
        roadmap = {
            "duration_months": months,
            "phases": [],
            "milestones": []
        }
 
        # Sort gaps by priority
        high_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "high"]
        medium_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "medium"]
        low_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "low"]
 
        # Phase 1: High priority (months 1-4)
        roadmap["phases"].append({
            "phase": 1,
            "name": "Critical Controls",
            "duration": "Months 1-4",
            "items": [g["subcategory_id"] for g in high_priority],
            "focus": "Implement critical security controls"
        })
 
        # Phase 2: Medium priority (months 5-8)
        roadmap["phases"].append({
            "phase": 2,
            "name": "Enhanced Controls",
            "duration": "Months 5-8",
            "items": [g["subcategory_id"] for g in medium_priority],
            "focus": "Enhance security posture"
        })
 
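        # Phase 3: Low priority and optimization (months 9-12).
        # gap_analysis() only reports tier gaps of 1 or more, which map to
        # "medium" or "high", so this list is empty as written.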
        roadmap["phases"].append({
            "phase": 3,
            "name": "Optimization",
            "duration": "Months 9-12",
            "items": [g["subcategory_id"] for g in low_priority],
            "focus": "Optimize and mature controls"
        })
 
        # Milestones
        roadmap["milestones"] = [
            {"month": 2, "milestone": "Core identity and access controls implemented"},
            {"month": 4, "milestone": "Detection capabilities operational"},
            {"month": 6, "milestone": "Incident response tested"},
            {"month": 8, "milestone": "Recovery capabilities validated"},
            {"month": 10, "milestone": "Governance framework established"},
            {"month": 12, "milestone": "Target profile achieved"}
        ]
 
        return roadmap
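
The `create_current_profile` method above leaves each function's `overall_tier` at Tier 1. One way to fill it in is sketched below, assuming that a function is only as mature as its weakest subcategory. The helper name `rollup_function_tiers` and the minimum rule are illustrative choices made here, not something the NIST CSF prescribes; an average or a weighted rule would be equally defensible.

```python
from typing import Dict


def rollup_function_tiers(profile: Dict) -> Dict[str, int]:
    """Return the lowest subcategory tier per function (hypothetical helper).

    Expects the dict shape produced by create_current_profile().
    Functions without any subcategories default to tier 1.
    """
    result: Dict[str, int] = {}
    for func_key, func_data in profile["functions"].items():
        # Flatten every subcategory tier recorded under this function.
        tiers = [
            sub["tier"]
            for cat in func_data["categories"].values()
            for sub in cat["subcategories"].values()
        ]
        result[func_key] = min(tiers) if tiers else 1
    return result


# Hand-written sample in the same shape as the current profile.
sample_profile = {
    "functions": {
        "PR": {
            "categories": {
                "PR.AA": {"subcategories": {"PR.AA-01": {"tier": 3}, "PR.AA-05": {"tier": 2}}},
                "PR.DS": {"subcategories": {"PR.DS-01": {"tier": 4}}},
            }
        },
        "DE": {"categories": {}},
    }
}

function_tiers = rollup_function_tiers(sample_profile)
print(function_tiers)
```

With a real profile, the result could be written back into `profile["functions"][key]["overall_tier"]` before the profile is stored.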

Continuous Monitoring Dashboard

Implement CSF metrics and monitoring:

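# CoreFunction and NISTCSFImplementation come from the previous listing;
# import them from that module if the two listings live in separate files.
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict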
 
@dataclass
class SecurityMetric:
    metric_id: str
    name: str
    category: str
    function: CoreFunction
    value: float
    target: float
    trend: str  # improving, stable, declining
    last_updated: datetime
 
class CSFMetricsDashboard:
    def __init__(self, csf_implementation: NISTCSFImplementation):
        self.csf = csf_implementation
        self.metrics: Dict[str, SecurityMetric] = {}
        self._initialize_metrics()
 
    def _initialize_metrics(self):
        """Initialize security metrics aligned with CSF."""
        metrics_config = [
            # GOVERN metrics
            ("GV-01", "Policy Compliance Rate", "GV.PO", CoreFunction.GOVERN, 95.0),
            ("GV-02", "Risk Assessment Coverage", "GV.RM", CoreFunction.GOVERN, 100.0),
 
            # IDENTIFY metrics
            ("ID-01", "Asset Inventory Coverage", "ID.AM", CoreFunction.IDENTIFY, 98.0),
            ("ID-02", "Vulnerability Scan Coverage", "ID.RA", CoreFunction.IDENTIFY, 100.0),
            ("ID-03", "Critical Vulnerability Remediation Time", "ID.RA", CoreFunction.IDENTIFY, 7.0),
 
            # PROTECT metrics
            ("PR-01", "MFA Adoption Rate", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-02", "Privileged Access Review Compliance", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-03", "Encryption Coverage", "PR.DS", CoreFunction.PROTECT, 100.0),
            ("PR-04", "Security Training Completion", "PR.AT", CoreFunction.PROTECT, 95.0),
 
            # DETECT metrics
            ("DE-01", "Mean Time to Detect (hours)", "DE.CM", CoreFunction.DETECT, 1.0),
            ("DE-02", "Alert Response Rate", "DE.AE", CoreFunction.DETECT, 100.0),
            ("DE-03", "False Positive Rate", "DE.AE", CoreFunction.DETECT, 10.0),
 
            # RESPOND metrics
            ("RS-01", "Mean Time to Respond (hours)", "RS.MA", CoreFunction.RESPOND, 4.0),
            ("RS-02", "Incident Documentation Rate", "RS.AN", CoreFunction.RESPOND, 100.0),
            ("RS-03", "Containment Effectiveness", "RS.MI", CoreFunction.RESPOND, 95.0),
 
            # RECOVER metrics
            ("RC-01", "Recovery Time Objective Achievement", "RC.RP", CoreFunction.RECOVER, 100.0),
            ("RC-02", "Backup Success Rate", "RC.RP", CoreFunction.RECOVER, 99.9),
            ("RC-03", "DR Test Frequency (per year)", "RC.RP", CoreFunction.RECOVER, 4.0)
        ]
 
        for metric_id, name, category, function, target in metrics_config:
            self.metrics[metric_id] = SecurityMetric(
                metric_id=metric_id,
                name=name,
                category=category,
                function=function,
                value=0.0,
                target=target,
                trend="stable",
                last_updated=datetime.utcnow()
            )
 
    def update_metric(self, metric_id: str, value: float):
        """Update metric value and calculate trend."""
        if metric_id not in self.metrics:
            raise ValueError(f"Unknown metric: {metric_id}")
 
        metric = self.metrics[metric_id]
        old_value = metric.value
 
        # Calculate trend (assumes a higher value is an improvement)
        if value > old_value:
            metric.trend = "improving"
        elif value < old_value:
            metric.trend = "declining"
        else:
            metric.trend = "stable"
 
        metric.value = value
        metric.last_updated = datetime.utcnow()
 
    def get_function_score(self, function: CoreFunction) -> Dict:
        """Calculate score for a CSF function."""
        function_metrics = [m for m in self.metrics.values() if m.function == function]
 
        if not function_metrics:
            return {"score": 0, "metrics_count": 0}
 
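        # Average target achievement across the function's metrics (unweighted).
        # Note: this treats higher values as better, so lower-is-better metrics
        # (ID-03, DE-01, DE-03, RS-01) are not scored meaningfully as written.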
        total_score = 0
        for metric in function_metrics:
            if metric.target > 0:
                achievement = min(metric.value / metric.target, 1.0) * 100
                total_score += achievement
 
        return {
            "function": function.value,
            "score": total_score / len(function_metrics),
            "metrics_count": len(function_metrics),
            "metrics_meeting_target": sum(1 for m in function_metrics if m.value >= m.target)
        }
 
    def generate_dashboard(self) -> Dict:
        """Generate complete dashboard data."""
        dashboard = {
            "generated_at": datetime.utcnow().isoformat(),
            "organization": self.csf.organization_name,
            "overall_score": 0,
            "function_scores": {},
            "metrics_summary": {
                "total": len(self.metrics),
                "meeting_target": 0,
                "improving": 0,
                "declining": 0
            },
            "detailed_metrics": []
        }
 
        # Calculate function scores
        total_function_score = 0
        for function in CoreFunction:
            score_data = self.get_function_score(function)
            dashboard["function_scores"][function.value] = score_data
            total_function_score += score_data["score"]
 
        dashboard["overall_score"] = total_function_score / len(CoreFunction)
 
        # Metrics summary
        for metric in self.metrics.values():
            if metric.value >= metric.target:
                dashboard["metrics_summary"]["meeting_target"] += 1
 
            if metric.trend == "improving":
                dashboard["metrics_summary"]["improving"] += 1
            elif metric.trend == "declining":
                dashboard["metrics_summary"]["declining"] += 1
 
            dashboard["detailed_metrics"].append({
                "id": metric.metric_id,
                "name": metric.name,
                "function": metric.function.value,
                "category": metric.category,
                "value": metric.value,
                "target": metric.target,
                "achievement": (metric.value / metric.target * 100) if metric.target > 0 else 0,
                "trend": metric.trend,
                "last_updated": metric.last_updated.isoformat()
            })
 
        return dashboard
 
    def get_recommendations(self) -> List[Dict]:
        """Generate improvement recommendations."""
        recommendations = []
 
        for metric in self.metrics.values():
            if metric.value < metric.target:
                gap = metric.target - metric.value
                # Guard against a zero target before dividing
                gap_ratio = gap / metric.target if metric.target > 0 else 0.0
                if gap_ratio > 0.3:
                    priority = "high"
                elif gap_ratio > 0.1:
                    priority = "medium"
                else:
                    priority = "low"

                recommendations.append({
                    "metric_id": metric.metric_id,
                    "metric_name": metric.name,
                    "function": metric.function.value,
                    "current_value": metric.value,
                    "target_value": metric.target,
                    "gap": gap,
                    "gap_percentage": gap_ratio * 100,
                    "priority": priority,
                    "trend": metric.trend,
                    "recommendation": self._generate_recommendation(metric)
                })
 
        # Sort by priority and gap
        recommendations.sort(key=lambda x: (-{"high": 3, "medium": 2, "low": 1}[x["priority"]], -x["gap_percentage"]))
 
        return recommendations
 
    def _generate_recommendation(self, metric: SecurityMetric) -> str:
        """Generate specific recommendation for metric."""
        recommendations_map = {
            "GV-01": "Review and update security policies, conduct policy awareness training",
            "ID-01": "Deploy asset discovery tools, implement automated inventory updates",
            "ID-02": "Increase vulnerability scan frequency, expand scan coverage",
            "ID-03": "Implement automated patching, prioritize critical vulnerability remediation",
            "PR-01": "Enforce MFA for all users, implement adaptive authentication",
            "PR-02": "Schedule regular access reviews, implement just-in-time access",
            "PR-03": "Deploy encryption for all data stores, implement key rotation",
            "PR-04": "Launch security awareness program, require annual training completion",
            "DE-01": "Tune detection rules, implement behavioral analytics",
            "DE-02": "Review alert triage process, automate response for common alerts",
            "DE-03": "Tune detection signatures, implement machine learning for anomaly detection",
            "RS-01": "Automate incident response, conduct regular IR drills",
            "RS-02": "Implement incident management system, require documentation",
            "RS-03": "Update containment playbooks, automate containment actions",
            "RC-01": "Test recovery procedures, update RTO requirements",
            "RC-02": "Review backup infrastructure, implement backup verification",
            "RC-03": "Schedule regular DR tests, document test results"
        }
 
        return recommendations_map.get(metric.metric_id, "Review and improve control implementation")
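
Several of the metrics above, such as Mean Time to Detect, False Positive Rate, and Critical Vulnerability Remediation Time, are better when lower, while the dashboard scores every metric as `value / target`. A minimal sketch of a direction-aware calculation follows. The `lower_is_better` flag and the `achievement` function are assumptions introduced here for illustration; they are not part of the dashboard class.

```python
def achievement(value: float, target: float, lower_is_better: bool = False) -> float:
    """Return target achievement as a percentage capped at 100 (hypothetical helper)."""
    if target <= 0:
        return 0.0
    if lower_is_better:
        # At or under the target counts as fully achieved; above it, the
        # score shrinks in proportion to how far the value overshoots.
        if value <= target:
            return 100.0
        return target / value * 100
    return min(value / target, 1.0) * 100


# Higher-is-better: 80% MFA adoption against a 100% target.
mfa_score = achievement(80.0, 100.0)
# Lower-is-better: 2 hours to detect against a 1-hour target.
mttd_score = achievement(2.0, 1.0, lower_is_better=True)
# Lower-is-better and already under the target.
fp_score = achievement(5.0, 10.0, lower_is_better=True)

print(mfa_score, mttd_score, fp_score)
```

The same flag would also need to drive the trend calculation and the "meeting target" counts if it were adopted, since those compare values in the same higher-is-better direction.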

Conclusion

Implementing NIST CSF 2.0 requires a systematic assessment of all six functions, profile development, gap analysis, and continuous monitoring. Start by establishing governance and understanding the organizational context. Create current and target profiles to guide implementation. Use metrics to track progress and identify areas that need attention. Keep in mind that the framework is designed to be adaptive: review and update your implementation periodically as threats evolve and your organization changes.
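
Periodic review can be tracked with the `review_date` field on `SubCategory`. The sketch below assumes that `review_date` holds the date of the last review and that 90 days is an acceptable interval; both are assumptions, as are the names `ReviewItem` and `overdue_reviews`. It uses a simplified stand-in record rather than the full dataclass so that it runs on its own.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class ReviewItem:
    """Simplified stand-in for SubCategory (hypothetical)."""
    id: str
    review_date: Optional[datetime] = None


def overdue_reviews(items: List[ReviewItem], now: datetime,
                    max_age: timedelta = timedelta(days=90)) -> List[str]:
    """Return IDs never reviewed or last reviewed more than max_age ago."""
    return [
        item.id
        for item in items
        if item.review_date is None or now - item.review_date > max_age
    ]


now = datetime(2025, 1, 1)
items = [
    ReviewItem("GV.OC-01", datetime(2024, 12, 1)),  # reviewed 31 days earlier
    ReviewItem("ID.AM-01", datetime(2024, 6, 1)),   # reviewed 214 days earlier
    ReviewItem("PR.AA-01"),                         # never reviewed
]
due = overdue_reviews(items, now)
print(due)
```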

