Compliance

NIST Cybersecurity Framework 2.0 Implementation Guide

DeviDevs Team
10 min read
#nist-csf#cybersecurity#compliance#risk-management#security-framework

The NIST Cybersecurity Framework 2.0 provides a comprehensive approach to managing cybersecurity risk. This guide covers practical implementation of the six core functions for software organizations.

Framework Core Implementation

Implement the NIST CSF core functions:

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
 
class CoreFunction(Enum):
    GOVERN = "GV"
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
 
class ImplementationTier(Enum):
    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
 
@dataclass
class SubCategory:
    id: str
    description: str
    implementation_status: str  # not_implemented, partial, implemented
    current_tier: ImplementationTier
    target_tier: ImplementationTier
    controls: List[str]
    evidence: List[str]
    gaps: List[str]
    owner: str
    review_date: Optional[datetime] = None
 
@dataclass
class Category:
    id: str
    name: str
    function: CoreFunction
    subcategories: List[SubCategory]
 
class NISTCSFImplementation:
    def __init__(self, organization_name: str):
        self.organization_name = organization_name
        self.categories = self._initialize_framework()
        self.current_profile: Dict = {}
        self.target_profile: Dict = {}
 
    def _initialize_framework(self) -> Dict[str, Category]:
        """Initialize NIST CSF 2.0 categories and subcategories."""
        return {
            # GOVERN Function (New in 2.0)
            "GV.OC": Category(
                id="GV.OC",
                name="Organizational Context",
                function=CoreFunction.GOVERN,
                subcategories=[
                    SubCategory(
                        id="GV.OC-01",
                        description="Mission is understood and informs cybersecurity risk management",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="GV.OC-02",
                        description="Internal and external stakeholders are understood",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "GV.RM": Category(
                id="GV.RM",
                name="Risk Management Strategy",
                function=CoreFunction.GOVERN,
                subcategories=[
                    SubCategory(
                        id="GV.RM-01",
                        description="Risk management objectives are established and agreed upon",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=[],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # IDENTIFY Function
            "ID.AM": Category(
                id="ID.AM",
                name="Asset Management",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    SubCategory(
                        id="ID.AM-01",
                        description="Inventories of hardware managed by the organization",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["CMDB", "Asset discovery tools"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.AM-02",
                        description="Inventories of software and services managed by the organization",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Software inventory", "License management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.AM-07",
                        description="Data classification scheme established and used",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Data classification policy", "DLP"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "ID.RA": Category(
                id="ID.RA",
                name="Risk Assessment",
                function=CoreFunction.IDENTIFY,
                subcategories=[
                    SubCategory(
                        id="ID.RA-01",
                        description="Vulnerabilities in assets are identified, validated, and recorded",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Vulnerability scanning", "Penetration testing"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="ID.RA-03",
                        description="Threats are identified and documented",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=["Threat intelligence", "Threat modeling"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # PROTECT Function
            "PR.AA": Category(
                id="PR.AA",
                name="Identity Management and Access Control",
                function=CoreFunction.PROTECT,
                subcategories=[
                    SubCategory(
                        id="PR.AA-01",
                        description="Identities and credentials for users and services managed",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["IAM", "SSO", "MFA"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="PR.AA-05",
                        description="Access permissions are managed with least privilege",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["RBAC", "Access reviews"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            "PR.DS": Category(
                id="PR.DS",
                name="Data Security",
                function=CoreFunction.PROTECT,
                subcategories=[
                    SubCategory(
                        id="PR.DS-01",
                        description="Data-at-rest is protected",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["Encryption at rest", "Key management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="PR.DS-02",
                        description="Data-in-transit is protected",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["TLS", "VPN", "Certificate management"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # DETECT Function
            "DE.CM": Category(
                id="DE.CM",
                name="Continuous Monitoring",
                function=CoreFunction.DETECT,
                subcategories=[
                    SubCategory(
                        id="DE.CM-01",
                        description="Networks are monitored to detect potential cybersecurity events",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["SIEM", "IDS/IPS", "NDR"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    ),
                    SubCategory(
                        id="DE.CM-06",
                        description="External service provider activity is monitored",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.RISK_INFORMED,
                        controls=["Vendor monitoring", "API monitoring"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # RESPOND Function
            "RS.MA": Category(
                id="RS.MA",
                name="Incident Management",
                function=CoreFunction.RESPOND,
                subcategories=[
                    SubCategory(
                        id="RS.MA-01",
                        description="Incident response plan is executed",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["IR plan", "IR team", "Playbooks"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            ),
            # RECOVER Function
            "RC.RP": Category(
                id="RC.RP",
                name="Incident Recovery Plan Execution",
                function=CoreFunction.RECOVER,
                subcategories=[
                    SubCategory(
                        id="RC.RP-01",
                        description="Recovery plan is executed during or after an incident",
                        implementation_status="not_implemented",
                        current_tier=ImplementationTier.PARTIAL,
                        target_tier=ImplementationTier.REPEATABLE,
                        controls=["BCP", "DR plan", "Backup systems"],
                        evidence=[],
                        gaps=[],
                        owner=""
                    )
                ]
            )
        }
 
    def create_current_profile(self) -> Dict:
        """Create current state profile."""
        profile = {
            "profile_type": "current",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "functions": {}
        }
 
        for func in CoreFunction:
            profile["functions"][func.value] = {
                "categories": {},
                "overall_tier": ImplementationTier.PARTIAL.value
            }
 
        for cat_id, category in self.categories.items():
            func_key = category.function.value
            profile["functions"][func_key]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {}
            }
 
            for subcat in category.subcategories:
                profile["functions"][func_key]["categories"][cat_id]["subcategories"][subcat.id] = {
                    "status": subcat.implementation_status,
                    "tier": subcat.current_tier.value,
                    "controls": subcat.controls,
                    "evidence": subcat.evidence
                }
 
        self.current_profile = profile
        return profile
 
    def create_target_profile(self, target_tier: ImplementationTier = ImplementationTier.REPEATABLE) -> Dict:
        """Create target state profile."""
        profile = {
            "profile_type": "target",
            "created_date": datetime.utcnow().isoformat(),
            "organization": self.organization_name,
            "target_tier": target_tier.value,
            "functions": {}
        }
 
        for func in CoreFunction:
            profile["functions"][func.value] = {
                "target_tier": target_tier.value,
                "categories": {}
            }
 
        for cat_id, category in self.categories.items():
            func_key = category.function.value
            profile["functions"][func_key]["categories"][cat_id] = {
                "name": category.name,
                "subcategories": {}
            }
 
            for subcat in category.subcategories:
                profile["functions"][func_key]["categories"][cat_id]["subcategories"][subcat.id] = {
                    "target_tier": subcat.target_tier.value,
                    "required_controls": subcat.controls,
                    "priority": self._calculate_priority(subcat)
                }
 
        self.target_profile = profile
        return profile
 
    def _calculate_priority(self, subcat: SubCategory) -> str:
        """Calculate implementation priority."""
        tier_gap = subcat.target_tier.value - subcat.current_tier.value
 
        if tier_gap >= 2:
            return "high"
        elif tier_gap == 1:
            return "medium"
        else:
            return "low"
 
    def gap_analysis(self) -> Dict:
        """Perform gap analysis between current and target profiles."""
        gaps = {
            "analysis_date": datetime.utcnow().isoformat(),
            "summary": {
                "total_gaps": 0,
                "high_priority": 0,
                "medium_priority": 0,
                "low_priority": 0
            },
            "by_function": {},
            "detailed_gaps": []
        }
 
        for func in CoreFunction:
            gaps["by_function"][func.value] = {"gaps": 0, "items": []}
 
        for cat_id, category in self.categories.items():
            for subcat in category.subcategories:
                if subcat.current_tier.value < subcat.target_tier.value:
                    gap_item = {
                        "subcategory_id": subcat.id,
                        "description": subcat.description,
                        "category": category.name,
                        "function": category.function.value,
                        "current_tier": subcat.current_tier.value,
                        "target_tier": subcat.target_tier.value,
                        "gap_size": subcat.target_tier.value - subcat.current_tier.value,
                        "priority": self._calculate_priority(subcat),
                        "required_controls": subcat.controls,
                        "current_gaps": subcat.gaps
                    }
 
                    gaps["detailed_gaps"].append(gap_item)
                    gaps["by_function"][category.function.value]["gaps"] += 1
                    gaps["by_function"][category.function.value]["items"].append(subcat.id)
                    gaps["summary"]["total_gaps"] += 1
 
                    priority = self._calculate_priority(subcat)
                    gaps["summary"][f"{priority}_priority"] += 1
 
        return gaps
 
    def generate_implementation_roadmap(self, months: int = 12) -> Dict:
        """Generate implementation roadmap."""
        gap_analysis = self.gap_analysis()
 
        roadmap = {
            "duration_months": months,
            "phases": [],
            "milestones": []
        }
 
        # Sort gaps by priority
        high_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "high"]
        medium_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "medium"]
        low_priority = [g for g in gap_analysis["detailed_gaps"] if g["priority"] == "low"]
 
        # Phase 1: High priority (months 1-4)
        roadmap["phases"].append({
            "phase": 1,
            "name": "Critical Controls",
            "duration": "Months 1-4",
            "items": [g["subcategory_id"] for g in high_priority],
            "focus": "Implement critical security controls"
        })
 
        # Phase 2: Medium priority (months 5-8)
        roadmap["phases"].append({
            "phase": 2,
            "name": "Enhanced Controls",
            "duration": "Months 5-8",
            "items": [g["subcategory_id"] for g in medium_priority],
            "focus": "Enhance security posture"
        })
 
        # Phase 3: Low priority and optimization (months 9-12)
        roadmap["phases"].append({
            "phase": 3,
            "name": "Optimization",
            "duration": "Months 9-12",
            "items": [g["subcategory_id"] for g in low_priority],
            "focus": "Optimize and mature controls"
        })
 
        # Milestones
        roadmap["milestones"] = [
            {"month": 2, "milestone": "Core identity and access controls implemented"},
            {"month": 4, "milestone": "Detection capabilities operational"},
            {"month": 6, "milestone": "Incident response tested"},
            {"month": 8, "milestone": "Recovery capabilities validated"},
            {"month": 10, "milestone": "Governance framework established"},
            {"month": 12, "milestone": "Target profile achieved"}
        ]
 
        return roadmap

Continuous Monitoring Dashboard

Implement CSF metrics and monitoring:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict
import json
 
@dataclass
class SecurityMetric:
    metric_id: str
    name: str
    category: str
    function: CoreFunction
    value: float
    target: float
    trend: str  # improving, stable, declining
    last_updated: datetime
 
class CSFMetricsDashboard:
    def __init__(self, csf_implementation: NISTCSFImplementation):
        self.csf = csf_implementation
        self.metrics: Dict[str, SecurityMetric] = {}
        self._initialize_metrics()
 
    def _initialize_metrics(self):
        """Initialize security metrics aligned with CSF."""
        metrics_config = [
            # GOVERN metrics
            ("GV-01", "Policy Compliance Rate", "GV.PO", CoreFunction.GOVERN, 95.0),
            ("GV-02", "Risk Assessment Coverage", "GV.RM", CoreFunction.GOVERN, 100.0),
 
            # IDENTIFY metrics
            ("ID-01", "Asset Inventory Coverage", "ID.AM", CoreFunction.IDENTIFY, 98.0),
            ("ID-02", "Vulnerability Scan Coverage", "ID.RA", CoreFunction.IDENTIFY, 100.0),
            ("ID-03", "Critical Vulnerability Remediation Time", "ID.RA", CoreFunction.IDENTIFY, 7.0),
 
            # PROTECT metrics
            ("PR-01", "MFA Adoption Rate", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-02", "Privileged Access Review Compliance", "PR.AA", CoreFunction.PROTECT, 100.0),
            ("PR-03", "Encryption Coverage", "PR.DS", CoreFunction.PROTECT, 100.0),
            ("PR-04", "Security Training Completion", "PR.AT", CoreFunction.PROTECT, 95.0),
 
            # DETECT metrics
            ("DE-01", "Mean Time to Detect (hours)", "DE.CM", CoreFunction.DETECT, 1.0),
            ("DE-02", "Alert Response Rate", "DE.AE", CoreFunction.DETECT, 100.0),
            ("DE-03", "False Positive Rate", "DE.AE", CoreFunction.DETECT, 10.0),
 
            # RESPOND metrics
            ("RS-01", "Mean Time to Respond (hours)", "RS.MA", CoreFunction.RESPOND, 4.0),
            ("RS-02", "Incident Documentation Rate", "RS.AN", CoreFunction.RESPOND, 100.0),
            ("RS-03", "Containment Effectiveness", "RS.MI", CoreFunction.RESPOND, 95.0),
 
            # RECOVER metrics
            ("RC-01", "Recovery Time Objective Achievement", "RC.RP", CoreFunction.RECOVER, 100.0),
            ("RC-02", "Backup Success Rate", "RC.RP", CoreFunction.RECOVER, 99.9),
            ("RC-03", "DR Test Frequency (per year)", "RC.RP", CoreFunction.RECOVER, 4.0)
        ]
 
        for metric_id, name, category, function, target in metrics_config:
            self.metrics[metric_id] = SecurityMetric(
                metric_id=metric_id,
                name=name,
                category=category,
                function=function,
                value=0.0,
                target=target,
                trend="stable",
                last_updated=datetime.utcnow()
            )
 
    def update_metric(self, metric_id: str, value: float):
        """Update metric value and calculate trend."""
        if metric_id not in self.metrics:
            raise ValueError(f"Unknown metric: {metric_id}")
 
        metric = self.metrics[metric_id]
        old_value = metric.value
 
        # Calculate trend
        if value > old_value:
            metric.trend = "improving"
        elif value < old_value:
            metric.trend = "declining"
        else:
            metric.trend = "stable"
 
        metric.value = value
        metric.last_updated = datetime.utcnow()
 
    def get_function_score(self, function: CoreFunction) -> Dict:
        """Calculate score for a CSF function."""
        function_metrics = [m for m in self.metrics.values() if m.function == function]
 
        if not function_metrics:
            return {"score": 0, "metrics_count": 0}
 
        # Calculate weighted score based on target achievement
        total_score = 0
        for metric in function_metrics:
            if metric.target > 0:
                achievement = min(metric.value / metric.target, 1.0) * 100
                total_score += achievement
 
        return {
            "function": function.value,
            "score": total_score / len(function_metrics),
            "metrics_count": len(function_metrics),
            "metrics_meeting_target": sum(1 for m in function_metrics if m.value >= m.target)
        }
 
    def generate_dashboard(self) -> Dict:
        """Generate complete dashboard data."""
        dashboard = {
            "generated_at": datetime.utcnow().isoformat(),
            "organization": self.csf.organization_name,
            "overall_score": 0,
            "function_scores": {},
            "metrics_summary": {
                "total": len(self.metrics),
                "meeting_target": 0,
                "improving": 0,
                "declining": 0
            },
            "detailed_metrics": []
        }
 
        # Calculate function scores
        total_function_score = 0
        for function in CoreFunction:
            score_data = self.get_function_score(function)
            dashboard["function_scores"][function.value] = score_data
            total_function_score += score_data["score"]
 
        dashboard["overall_score"] = total_function_score / len(CoreFunction)
 
        # Metrics summary
        for metric in self.metrics.values():
            if metric.value >= metric.target:
                dashboard["metrics_summary"]["meeting_target"] += 1
 
            if metric.trend == "improving":
                dashboard["metrics_summary"]["improving"] += 1
            elif metric.trend == "declining":
                dashboard["metrics_summary"]["declining"] += 1
 
            dashboard["detailed_metrics"].append({
                "id": metric.metric_id,
                "name": metric.name,
                "function": metric.function.value,
                "category": metric.category,
                "value": metric.value,
                "target": metric.target,
                "achievement": (metric.value / metric.target * 100) if metric.target > 0 else 0,
                "trend": metric.trend,
                "last_updated": metric.last_updated.isoformat()
            })
 
        return dashboard
 
    def get_recommendations(self) -> List[Dict]:
        """Generate improvement recommendations."""
        recommendations = []
 
        for metric in self.metrics.values():
            if metric.value < metric.target:
                gap = metric.target - metric.value
                priority = "high" if gap / metric.target > 0.3 else "medium" if gap / metric.target > 0.1 else "low"
 
                recommendations.append({
                    "metric_id": metric.metric_id,
                    "metric_name": metric.name,
                    "function": metric.function.value,
                    "current_value": metric.value,
                    "target_value": metric.target,
                    "gap": gap,
                    "gap_percentage": (gap / metric.target * 100) if metric.target > 0 else 0,
                    "priority": priority,
                    "trend": metric.trend,
                    "recommendation": self._generate_recommendation(metric)
                })
 
        # Sort by priority and gap
        recommendations.sort(key=lambda x: (-{"high": 3, "medium": 2, "low": 1}[x["priority"]], -x["gap_percentage"]))
 
        return recommendations
 
    def _generate_recommendation(self, metric: SecurityMetric) -> str:
        """Generate specific recommendation for metric."""
        recommendations_map = {
            "GV-01": "Review and update security policies, conduct policy awareness training",
            "ID-01": "Deploy asset discovery tools, implement automated inventory updates",
            "ID-02": "Increase vulnerability scan frequency, expand scan coverage",
            "ID-03": "Implement automated patching, prioritize critical vulnerability remediation",
            "PR-01": "Enforce MFA for all users, implement adaptive authentication",
            "PR-02": "Schedule regular access reviews, implement just-in-time access",
            "PR-03": "Deploy encryption for all data stores, implement key rotation",
            "PR-04": "Launch security awareness program, require annual training completion",
            "DE-01": "Tune detection rules, implement behavioral analytics",
            "DE-02": "Review alert triage process, automate response for common alerts",
            "DE-03": "Tune detection signatures, implement machine learning for anomaly detection",
            "RS-01": "Automate incident response, conduct regular IR drills",
            "RS-02": "Implement incident management system, require documentation",
            "RS-03": "Update containment playbooks, automate containment actions",
            "RC-01": "Test recovery procedures, update RTO requirements",
            "RC-02": "Review backup infrastructure, implement backup verification",
            "RC-03": "Schedule regular DR tests, document test results"
        }
 
        return recommendations_map.get(metric.metric_id, "Review and improve control implementation")

Conclusion

NIST CSF 2.0 implementation requires systematic assessment across all six functions, profile development, gap analysis, and continuous monitoring. Start by establishing governance and understanding your organizational context. Create current and target profiles to guide implementation. Use metrics to track progress and identify areas needing attention. Remember that the framework is meant to be adaptive - regularly review and update your implementation as threats evolve and your organization changes.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.