Compliance

NIST Cybersecurity Framework Implementation: A Technical Guide

DeviDevs Team
14 min read
#NIST CSF#cybersecurity#compliance#risk management#security framework

The NIST Cybersecurity Framework provides a comprehensive approach to managing cybersecurity risk. This guide covers technical implementation of CSF 2.0 across all six core functions (Govern, Identify, Protect, Detect, Respond, and Recover) with practical code examples and automation strategies.

Framework Overview

CSF 2.0 Core Functions

# nist_csf_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
 
class CSFFunction(Enum):
    """Core functions of NIST CSF 2.0; each value is the two-letter prefix used in control IDs."""

    GOVERN = "GV"  # New in CSF 2.0
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
 
class ImplementationTier(Enum):
    """CSF implementation tiers, numbered from 1 (Partial) up to 4 (Adaptive)."""

    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
 
@dataclass
class Control:
    """A single CSF subcategory control together with its assessment state."""

    control_id: str  # Catalog identifier, e.g. "ID.AM-01"
    function: CSFFunction  # Core function the control belongs to
    category: str  # Human-readable category name, e.g. "Asset Management"
    subcategory: str  # Subcategory identifier (the built-in catalog sets it equal to control_id)
    description: str  # Outcome statement for the control
    implementation_status: str  # Values used by the assessment code: "not_started", "partial", "implemented"
    evidence: List[str] = field(default_factory=list)  # Evidence references supplied during assessment
    gaps: List[str] = field(default_factory=list)  # Gap descriptions supplied during assessment
    remediation_plan: Optional[str] = None  # Optional free-text plan; defaults to None
 
@dataclass
class CSFProfile:
    """A named collection of controls plus the implementation tier being targeted."""

    profile_id: str
    name: str
    description: str
    target_tier: ImplementationTier  # Tier this profile aims for
    controls: List[Control]  # Controls included in the profile
    created_at: datetime
    last_assessed: Optional[datetime] = None  # Defaults to None (no assessment recorded)
 
class CSFAssessment:
    """NIST CSF compliance assessment engine.

    Keeps an in-memory catalog of controls keyed by control ID, lets callers
    record an assessment result per control, and summarises the results as a
    per-function gap analysis with an overall percentage score.
    """

    def __init__(self):
        self.controls = self._load_controls()
        self.current_profile: Optional[CSFProfile] = None

    def _load_controls(self) -> Dict[str, Control]:
        """Build the control catalog, keyed by control ID.

        Every control starts as "not_started" with no evidence.
        """
        # Abbreviated control set - full implementation would include all 106 subcategories
        # Each row: (control ID, function, category, description).
        catalog = [
            # GOVERN function (new in 2.0)
            ("GV.OC-01", CSFFunction.GOVERN, "Organizational Context",
             "The organizational mission is understood and informs cybersecurity risk management"),

            # IDENTIFY function
            ("ID.AM-01", CSFFunction.IDENTIFY, "Asset Management",
             "Inventories of hardware managed by the organization are maintained"),
            ("ID.AM-02", CSFFunction.IDENTIFY, "Asset Management",
             "Inventories of software, services, and systems managed by the organization are maintained"),
            ("ID.RA-01", CSFFunction.IDENTIFY, "Risk Assessment",
             "Vulnerabilities in assets are identified, validated, and recorded"),

            # PROTECT function
            ("PR.AA-01", CSFFunction.PROTECT, "Identity Management and Access Control",
             "Identities and credentials for authorized users, services, and hardware are managed"),
            ("PR.DS-01", CSFFunction.PROTECT, "Data Security",
             "Data-at-rest is protected"),

            # DETECT function
            ("DE.CM-01", CSFFunction.DETECT, "Continuous Monitoring",
             "Networks and network services are monitored to find potentially adverse events"),

            # RESPOND function
            ("RS.MA-01", CSFFunction.RESPOND, "Incident Management",
             "The incident response plan is executed in coordination with relevant third parties"),

            # RECOVER function
            ("RC.RP-01", CSFFunction.RECOVER, "Incident Recovery Plan Execution",
             "The recovery portion of the incident response plan is executed"),
        ]

        return {
            control_id: Control(
                control_id=control_id,
                function=function,
                category=category,
                subcategory=control_id,
                description=description,
                implementation_status="not_started",
                evidence=[]
            )
            for control_id, function, category, description in catalog
        }

    def assess_control(
        self,
        control_id: str,
        status: str,
        evidence: List[str],
        gaps: Optional[List[str]] = None
    ):
        """Record the assessment result for one control.

        Args:
            control_id: ID of a control in the catalog.
            status: New implementation status. The reporting code recognises
                "implemented" and "partial"; any other value is counted as
                not implemented.
            evidence: Evidence references supporting the status.
            gaps: Known gaps for this control; ``None`` is stored as ``[]``.

        Raises:
            ValueError: If ``control_id`` is not in the catalog.
        """
        if control_id not in self.controls:
            raise ValueError(f"Unknown control: {control_id}")

        control = self.controls[control_id]
        control.implementation_status = status
        control.evidence = evidence
        control.gaps = gaps or []

    def generate_gap_analysis(self) -> Dict:
        """Generate a gap analysis report grouped by CSF function.

        Returns:
            A dict with ``assessment_date`` (ISO timestamp), ``by_function``
            (per-function counts and collected gaps) and ``overall_score``.
        """
        by_function = {}

        for control in self.controls.values():
            summary = by_function.setdefault(control.function.value, {
                "total": 0,
                "implemented": 0,
                "partial": 0,
                "not_implemented": 0,
                "gaps": []
            })
            summary["total"] += 1

            if control.implementation_status == "implemented":
                summary["implemented"] += 1
                continue

            if control.implementation_status == "partial":
                summary["partial"] += 1
            else:
                summary["not_implemented"] += 1

            # A partially implemented control has outstanding gaps too, so
            # report gaps for every control that is not fully implemented.
            summary["gaps"].extend(control.gaps)

        return {
            "assessment_date": datetime.utcnow().isoformat(),
            "by_function": by_function,
            "overall_score": self._calculate_score()
        }

    def _calculate_score(self) -> float:
        """Calculate the overall compliance score as a percentage.

        Implemented controls count fully and partial controls count half.
        Returns 0.0 when the catalog is empty rather than dividing by zero.
        """
        total = len(self.controls)
        if not total:
            return 0.0

        implemented = sum(
            1 for c in self.controls.values()
            if c.implementation_status == "implemented"
        )
        partial = sum(
            1 for c in self.controls.values()
            if c.implementation_status == "partial"
        )

        return (implemented + partial * 0.5) / total * 100

IDENTIFY Function Implementation

Asset Inventory Automation

# asset_inventory.py
import subprocess
import json
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import boto3
import requests
 
@dataclass
class Asset:
    """One inventoried asset (infrastructure, software package, data store or service)."""

    asset_id: str  # Unique key for the asset within the inventory
    asset_type: str  # hardware, software, data, service
    name: str
    owner: str
    criticality: str  # critical, high, medium, low
    classification: str  # public, internal, confidential, restricted
    location: str
    discovered_at: datetime  # When the asset was discovered
    last_seen: datetime  # When the asset was most recently observed
    attributes: Dict  # Type-specific details; keys vary by discovery source
 
class AssetInventory:
    """Automated asset discovery and inventory management.

    Each ``discover_*`` method returns the assets it found and also records
    them in ``self.assets`` (keyed by ``asset_id``), which is what
    ``generate_inventory_report`` summarises.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.assets = {}

    def _register(self, assets: List[Asset]) -> List[Asset]:
        """Store assets in the inventory keyed by asset_id and return them.

        When an ID is already known the new record replaces the old one but
        inherits its ``discovered_at``, so the first-seen time survives
        repeated scans.
        """
        for asset in assets:
            known = self.assets.get(asset.asset_id)
            if known is not None:
                asset.discovered_at = known.discovered_at
            self.assets[asset.asset_id] = asset
        return assets

    def discover_aws_assets(self) -> List[Asset]:
        """Discover AWS infrastructure assets (EC2, RDS, S3).

        Returns:
            The discovered assets; they are also recorded in ``self.assets``.
        """
        assets = (
            self._discover_ec2_instances()
            + self._discover_rds_instances()
            + self._discover_s3_buckets()
        )
        return self._register(assets)

    def _discover_ec2_instances(self) -> List[Asset]:
        """Return one 'hardware' asset per EC2 instance."""
        assets = []
        ec2 = boto3.client('ec2')

        # describe_instances is paginated; a single call would silently
        # truncate the inventory on larger accounts.
        for page in ec2.get_paginator('describe_instances').paginate():
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
                    now = datetime.utcnow()

                    assets.append(Asset(
                        asset_id=instance['InstanceId'],
                        asset_type='hardware',
                        name=tags.get('Name', instance['InstanceId']),
                        owner=tags.get('Owner', 'unknown'),
                        criticality=tags.get('Criticality', 'medium'),
                        classification=tags.get('Classification', 'internal'),
                        location=instance['Placement']['AvailabilityZone'],
                        discovered_at=now,
                        last_seen=now,
                        attributes={
                            'instance_type': instance['InstanceType'],
                            'state': instance['State']['Name'],
                            'private_ip': instance.get('PrivateIpAddress'),
                            'public_ip': instance.get('PublicIpAddress'),
                            'vpc_id': instance.get('VpcId'),
                            'security_groups': [
                                sg['GroupId'] for sg in instance.get('SecurityGroups', [])
                            ]
                        }
                    ))

        return assets

    def _discover_rds_instances(self) -> List[Asset]:
        """Return one 'data' asset per RDS database instance."""
        assets = []
        rds = boto3.client('rds')

        # Paginated for the same reason as the EC2 listing.
        for page in rds.get_paginator('describe_db_instances').paginate():
            for db in page['DBInstances']:
                now = datetime.utcnow()
                assets.append(Asset(
                    asset_id=db['DBInstanceIdentifier'],
                    asset_type='data',
                    name=db['DBInstanceIdentifier'],
                    owner='database-team',
                    criticality='critical',
                    classification='confidential',
                    # The zone can be absent from the response (e.g. while an
                    # instance is still being created).
                    location=db.get('AvailabilityZone', 'unknown'),
                    discovered_at=now,
                    last_seen=now,
                    attributes={
                        'engine': db['Engine'],
                        'engine_version': db['EngineVersion'],
                        'instance_class': db['DBInstanceClass'],
                        'storage_encrypted': db['StorageEncrypted'],
                        'multi_az': db['MultiAZ']
                    }
                ))

        return assets

    def _discover_s3_buckets(self) -> List[Asset]:
        """Return one 'data' asset per S3 bucket, including its encryption status."""
        assets = []
        s3 = boto3.client('s3')

        for bucket in s3.list_buckets()['Buckets']:
            now = datetime.utcnow()
            assets.append(Asset(
                asset_id=bucket['Name'],
                asset_type='data',
                name=bucket['Name'],
                owner='unknown',
                criticality='high',
                classification='internal',
                location='global',
                discovered_at=now,
                last_seen=now,
                attributes={
                    'creation_date': bucket['CreationDate'].isoformat(),
                    'encrypted': self._bucket_encryption_status(s3, bucket['Name'])
                }
            ))

        return assets

    def _bucket_encryption_status(self, s3, bucket_name: str):
        """Return True/False for a bucket's default encryption, or None if unknown.

        Only the "no encryption configuration" error means "not encrypted".
        Other API errors (access denied, throttling, ...) are reported as
        None so a permissions problem is not recorded as an unencrypted
        bucket.
        """
        try:
            s3.get_bucket_encryption(Bucket=bucket_name)
        except s3.exceptions.ClientError as exc:
            code = exc.response.get('Error', {}).get('Code')
            if code == 'ServerSideEncryptionConfigurationNotFoundError':
                return False
            print(f"Could not read encryption settings for bucket {bucket_name}: {code}")
            return None
        return True

    def discover_software_assets(self) -> List[Asset]:
        """Discover software dependencies from package managers (npm and pip).

        Each package manager is queried on a best-effort basis: a failure is
        printed and the other source is still attempted.

        Returns:
            The discovered assets; they are also recorded in ``self.assets``.
        """
        assets = []

        # NPM packages
        try:
            result = subprocess.run(
                ['npm', 'list', '--json', '--all'],
                capture_output=True,
                text=True
            )
            npm_deps = json.loads(result.stdout)
            assets.extend(self._parse_npm_deps(npm_deps.get('dependencies', {})))
        except Exception as e:
            print(f"NPM discovery failed: {e}")

        # Python packages
        try:
            result = subprocess.run(
                ['pip', 'list', '--format=json'],
                capture_output=True,
                text=True
            )
            pip_deps = json.loads(result.stdout)
            for pkg in pip_deps:
                now = datetime.utcnow()
                assets.append(Asset(
                    asset_id=f"pip:{pkg['name']}:{pkg['version']}",
                    asset_type='software',
                    name=pkg['name'],
                    owner='development-team',
                    criticality='medium',
                    classification='internal',
                    location='application',
                    discovered_at=now,
                    last_seen=now,
                    attributes={
                        'version': pkg['version'],
                        'package_manager': 'pip'
                    }
                ))
        except Exception as e:
            print(f"Pip discovery failed: {e}")

        # The same npm package can appear at several depths under one ID.
        # Register the deepest occurrences first so the shallowest (most
        # direct) record is the one the inventory keeps.
        self._register(sorted(
            assets,
            key=lambda a: a.attributes.get('depth', 0),
            reverse=True
        ))

        return assets

    def _parse_npm_deps(self, deps: Dict, depth: int = 0) -> List[Asset]:
        """Flatten an ``npm list --json`` dependency tree into assets.

        Args:
            deps: Mapping of package name to its info dict.
            depth: Nesting level; 0 means a direct dependency.
        """
        assets = []

        for name, info in deps.items():
            version = info.get('version', 'unknown')
            now = datetime.utcnow()
            assets.append(Asset(
                asset_id=f"npm:{name}:{version}",
                asset_type='software',
                name=name,
                owner='development-team',
                criticality='medium' if depth == 0 else 'low',
                classification='internal',
                location='application',
                discovered_at=now,
                last_seen=now,
                attributes={
                    'version': version,
                    'package_manager': 'npm',
                    'depth': depth,
                    'direct_dependency': depth == 0
                }
            ))

            # Recurse for transitive dependencies
            if 'dependencies' in info:
                assets.extend(self._parse_npm_deps(info['dependencies'], depth + 1))

        return assets

    def generate_inventory_report(self) -> Dict:
        """Generate an inventory report over everything in ``self.assets``.

        Returns:
            A dict with the report date, total count, counts grouped by type,
            criticality and classification, and a short record per asset.
        """
        all_assets = list(self.assets.values())

        return {
            "report_date": datetime.utcnow().isoformat(),
            "total_assets": len(all_assets),
            "by_type": self._group_by(all_assets, 'asset_type'),
            "by_criticality": self._group_by(all_assets, 'criticality'),
            "by_classification": self._group_by(all_assets, 'classification'),
            "assets": [
                {
                    "id": a.asset_id,
                    "name": a.name,
                    "type": a.asset_type,
                    "criticality": a.criticality
                }
                for a in all_assets
            ]
        }

    def _group_by(self, assets: List[Asset], field: str) -> Dict[str, int]:
        """Count assets per distinct value of the attribute named ``field``."""
        groups = {}
        for asset in assets:
            value = getattr(asset, field)
            groups[value] = groups.get(value, 0) + 1
        return groups

PROTECT Function Implementation

Access Control Implementation

# access_control.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from enum import Enum
from datetime import datetime, timedelta
import hashlib
import secrets
 
class Permission(Enum):
    """Actions a role can grant on a resource."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
 
@dataclass
class Role:
    """A named bundle of permissions over a set of resource patterns."""

    role_id: str  # Key under which the role is stored
    name: str  # Display name
    permissions: Set[Permission]  # Actions this role grants
    resources: List[str]  # Resource patterns
    conditions: Dict  # Additional access conditions
 
@dataclass
class User:
    """A principal whose access is decided from assigned roles and attributes."""

    user_id: str
    email: str
    roles: List[str]  # IDs of the roles assigned to this user
    attributes: Dict  # For ABAC
    mfa_enabled: bool  # Whether the user has MFA enrolled
    last_login: Optional[datetime]
    password_changed: Optional[datetime]
 
class AccessControlSystem:
    """Role-based and attribute-based access control.

    Access is granted when any of the user's roles holds the requested
    permission, matches the resource by glob pattern, and has all of its
    conditions satisfied. Every decision is appended to ``access_logs``.
    """

    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
        self.access_logs = []

    def create_role(self, role: Role):
        """Register a role, replacing any existing role with the same ID."""
        self.roles[role.role_id] = role

    def assign_role(self, user_id: str, role_id: str):
        """Assign a role to a user; assigning an already-held role is a no-op.

        Raises:
            ValueError: If the user or the role is unknown.
        """
        if user_id not in self.users:
            raise ValueError(f"User not found: {user_id}")
        if role_id not in self.roles:
            raise ValueError(f"Role not found: {role_id}")

        user = self.users[user_id]
        if role_id not in user.roles:
            user.roles.append(role_id)

    def check_access(
        self,
        user_id: str,
        resource: str,
        permission: Permission,
        context: Optional[Dict] = None
    ) -> bool:
        """Check whether a user may perform ``permission`` on ``resource``.

        Args:
            user_id: ID of the requesting user.
            resource: Resource name, matched against role glob patterns.
            permission: The action being requested.
            context: Request facts used by conditions, e.g. ``ip`` and
                ``mfa_verified``. ``None`` is treated as an empty dict.

        Returns:
            True if some role grants access, False otherwise (including for
            unknown users). The decision is logged either way.
        """
        if user_id not in self.users:
            self._log_access(user_id, resource, permission, False, "User not found")
            return False

        user = self.users[user_id]
        context = context or {}

        # Check each role; the first one that satisfies everything wins.
        for role_id in user.roles:
            role = self.roles.get(role_id)
            if not role:
                # Role ID assigned to the user but no longer defined.
                continue

            if permission not in role.permissions:
                continue

            if not self._matches_resource(resource, role.resources):
                continue

            # Check conditions (ABAC)
            if not self._evaluate_conditions(role.conditions, user, context):
                continue

            self._log_access(user_id, resource, permission, True, f"Role: {role_id}")
            return True

        self._log_access(user_id, resource, permission, False, "No matching role")
        return False

    def _matches_resource(self, resource: str, patterns: List[str]) -> bool:
        """Return True if ``resource`` matches any of the glob ``patterns``."""
        import fnmatch
        # fnmatchcase, not fnmatch: the latter applies OS-specific case
        # folding, which would make authorization depend on the platform.
        return any(fnmatch.fnmatchcase(resource, pattern) for pattern in patterns)

    def _evaluate_conditions(
        self,
        conditions: Dict,
        user: User,
        context: Dict
    ) -> bool:
        """Evaluate ABAC conditions; all recognised conditions must hold.

        Recognised keys:
            ``mfa_required``: when truthy, the user must have MFA enabled and
                the context must carry a truthy ``mfa_verified``.
            ``ip_whitelist``: ``context["ip"]`` must equal an entry or fall
                inside an entry given as a CIDR network.
            ``time_range``: current UTC time must lie between ``start`` and
                ``end`` ("HH:MM"); a window may wrap past midnight.
            ``user.<attr>``: the user's attribute must equal the value.

        NOTE(review): keys that are not recognised are ignored, so a
        misspelled condition does not restrict access.
        """
        for condition_key, expected_value in conditions.items():
            if condition_key == "mfa_required" and expected_value:
                if not user.mfa_enabled or not context.get("mfa_verified"):
                    return False

            elif condition_key == "ip_whitelist":
                if not self._ip_allowed(context.get("ip"), expected_value):
                    return False

            elif condition_key == "time_range":
                start = datetime.strptime(expected_value["start"], "%H:%M").time()
                end = datetime.strptime(expected_value["end"], "%H:%M").time()
                if not self._time_in_range(datetime.utcnow().time(), start, end):
                    return False

            elif condition_key.startswith("user."):
                attr = condition_key[5:]
                if user.attributes.get(attr) != expected_value:
                    return False

        return True

    @staticmethod
    def _ip_allowed(ip, allowed) -> bool:
        """Return True if ``ip`` equals an allowed entry or lies in an allowed CIDR block."""
        import ipaddress

        # Exact match first: keeps plain-string entries working as before.
        if ip in allowed:
            return True

        try:
            address = ipaddress.ip_address(ip)
        except (ValueError, TypeError):
            # Missing or malformed client address: deny.
            return False

        for entry in allowed:
            try:
                network = ipaddress.ip_network(entry, strict=False)
            except (ValueError, TypeError):
                # Entry is not an address or network; exact match was
                # already tried above.
                continue
            if address in network:
                return True

        return False

    @staticmethod
    def _time_in_range(current, start, end) -> bool:
        """Return True if ``current`` lies in the inclusive window [start, end].

        When ``start`` is later than ``end`` the window is taken to wrap past
        midnight (e.g. 22:00-06:00).
        """
        if start <= end:
            return start <= current <= end
        return current >= start or current <= end

    def _log_access(
        self,
        user_id: str,
        resource: str,
        permission: Permission,
        granted: bool,
        reason: str
    ):
        """Append an access decision record to ``access_logs``."""
        self.access_logs.append({
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "resource": resource,
            "permission": permission.value,
            "granted": granted,
            "reason": reason
        })
 
 
# NIST-compliant role definitions
def create_nist_compliant_roles() -> List[Role]:
    """Build the predefined role set.

    Returns the viewer, operator, admin and security analyst roles, in that
    order. All roles except the viewer require MFA.
    """
    # Read-only access to everything, with no extra conditions.
    viewer = Role(
        role_id="viewer",
        name="Read-Only Viewer",
        permissions={Permission.READ},
        resources=["*"],
        conditions={}
    )

    # Read/write on systems and logs, limited to a daily time window.
    operator = Role(
        role_id="operator",
        name="System Operator",
        permissions={Permission.READ, Permission.WRITE},
        resources=["systems/*", "logs/*"],
        conditions={
            "mfa_required": True,
            "time_range": {"start": "06:00", "end": "22:00"}
        }
    )

    # Every permission on every resource; whitelist entries are written in
    # CIDR notation.
    admin = Role(
        role_id="admin",
        name="System Administrator",
        permissions={Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
        resources=["*"],
        conditions={
            "mfa_required": True,
            "ip_whitelist": ["10.0.0.0/8", "192.168.0.0/16"]
        }
    )

    # Read-only access to security data, logs and alerts.
    security_analyst = Role(
        role_id="security_analyst",
        name="Security Analyst",
        permissions={Permission.READ},
        resources=["security/*", "logs/*", "alerts/*"],
        conditions={
            "mfa_required": True
        }
    )

    return [viewer, operator, admin, security_analyst]

Data Protection Controls

# data_protection.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import base64
import os
from typing import Dict, Optional
from dataclasses import dataclass
 
@dataclass
class DataClassification:
    """Handling policy attached to one data classification level."""

    level: str  # public, internal, confidential, restricted
    retention_days: int  # Retention period in days
    encryption_required: bool  # Whether data at this level must be encrypted
    access_logging: bool  # Whether access to data at this level is to be logged
    dlp_enabled: bool  # Whether DLP scanning applies at this level
 
class DataProtection:
    """Data protection controls for NIST PR.DS.

    Provides classification-driven encryption (Fernet with a key derived per
    context from the master key) and a regex-based DLP scan for common
    sensitive-data patterns.
    """

    # Handling policy per classification level.
    CLASSIFICATIONS = {
        "public": DataClassification(
            level="public",
            retention_days=365,
            encryption_required=False,
            access_logging=False,
            dlp_enabled=False
        ),
        "internal": DataClassification(
            level="internal",
            retention_days=730,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=False
        ),
        "confidential": DataClassification(
            level="confidential",
            retention_days=2555,  # 7 years
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        ),
        "restricted": DataClassification(
            level="restricted",
            retention_days=2555,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        )
    }

    def __init__(self, master_key: bytes):
        self.master_key = master_key
        # Derived Fernet keys, cached per context string.
        self._key_cache = {}

    def encrypt_data(
        self,
        data: bytes,
        classification: str,
        context: str
    ) -> Dict:
        """Encrypt data if its classification requires it.

        Args:
            data: Plaintext bytes.
            classification: Key into ``CLASSIFICATIONS``.
            context: Label used to derive the encryption key; the same
                context is needed to decrypt.

        Returns:
            ``{"encrypted": False, "data": data}`` when encryption is not
            required, otherwise a dict holding the Fernet token and the
            metadata needed by ``decrypt_data``.

        Raises:
            ValueError: If ``classification`` is unknown.
        """
        config = self.CLASSIFICATIONS.get(classification)
        if not config:
            raise ValueError(f"Unknown classification: {classification}")

        if not config.encryption_required:
            return {"encrypted": False, "data": data}

        # Derive context-specific key
        key = self._derive_key(context)
        fernet = Fernet(key)

        encrypted = fernet.encrypt(data)

        return {
            "encrypted": True,
            "data": encrypted,
            "context": context,
            "classification": classification,
            # Fernet is AES-128 in CBC mode authenticated with HMAC-SHA256;
            # the label must describe what was actually used.
            "algorithm": "AES-128-CBC-HMAC-SHA256"
        }

    def decrypt_data(
        self,
        encrypted_data: Dict
    ) -> bytes:
        """Decrypt a dict produced by ``encrypt_data``.

        Unencrypted payloads (``"encrypted"`` falsy or absent) are returned
        as stored.
        """
        if not encrypted_data.get("encrypted"):
            return encrypted_data["data"]

        key = self._derive_key(encrypted_data["context"])
        fernet = Fernet(key)

        return fernet.decrypt(encrypted_data["data"])

    def _derive_key(self, context: str) -> bytes:
        """Derive (and cache) the Fernet key for ``context`` from the master key.

        NOTE(review): the context string doubles as the PBKDF2 salt, so the
        derivation is deterministic per context. Changing the salt or the
        iteration count would make existing ciphertexts undecryptable.
        """
        if context in self._key_cache:
            return self._key_cache[context]

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=context.encode(),
            iterations=100000,
            backend=default_backend()
        )

        # Fernet expects a url-safe base64 encoding of the 32 raw key bytes.
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        self._key_cache[context] = key

        return key

    def scan_for_sensitive_data(self, content: str) -> Dict:
        """DLP scan for sensitive data patterns.

        Returns:
            A dict with ``has_sensitive_data``, per-pattern ``findings``
            (match count and up to three masked samples) and ``risk_level``.
        """
        import re

        patterns = {
            "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            # TLD class is letters only; "[A-Z|a-z]" also accepted a literal "|".
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
            "api_key": r"\b(?:api[_-]?key|apikey|api[_-]?secret)[\"']?\s*[:=]\s*[\"']?[\w-]{20,}",
            "aws_key": r"(?:AKIA|ABIA|ACCA|ASIA)[0-9A-Z]{16}"
        }

        findings = {}
        for name, pattern in patterns.items():
            # All groups are non-capturing, so findall yields whole matches.
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                findings[name] = {
                    "count": len(matches),
                    "samples": [self._mask_finding(m) for m in matches[:3]]
                }

        return {
            "has_sensitive_data": len(findings) > 0,
            "findings": findings,
            "risk_level": self._calculate_risk(findings)
        }

    def _mask_finding(self, value: str) -> str:
        """Mask a sensitive value for logging.

        Values of 8 characters or fewer are fully masked; longer values keep
        their first and last four characters.
        """
        if len(value) <= 8:
            return "*" * len(value)
        return value[:4] + "*" * (len(value) - 8) + value[-4:]

    def _calculate_risk(self, findings: Dict) -> str:
        """Return "high", "medium" or "low" for a set of findings.

        Any credit card, SSN, API key or AWS key finding is high risk; any
        other finding is medium; no findings is low.
        """
        high_risk = {"credit_card", "ssn", "api_key", "aws_key"}

        if any(f in findings for f in high_risk):
            return "high"
        elif findings:
            return "medium"
        return "low"

DETECT Function Implementation

Continuous Monitoring

# continuous_monitoring.py
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime, timedelta
import asyncio
 
@dataclass
class MonitoringRule:
    """A periodic check run by ContinuousMonitoring."""

    rule_id: str  # Key under which the rule and its metrics are stored
    name: str
    description: str
    check_function: Callable  # Awaited with no arguments; its result is read with .get()
    interval_seconds: int  # Sleep between consecutive runs of the check
    severity: str  # Copied onto alerts raised by this rule
    enabled: bool = True  # Disabled rules are not started
 
@dataclass
class MonitoringAlert:
    """An alert raised when a monitoring rule's check reports a problem."""

    alert_id: str
    rule_id: str  # ID of the rule that produced the alert
    timestamp: datetime  # When the alert was created
    severity: str
    message: str
    details: Dict  # Check-specific context for the alert
 
class ContinuousMonitoring:
    """NIST DE.CM continuous monitoring implementation.

    Runs every enabled rule in its own endless loop, collecting alerts in
    ``alerts`` and the latest check outcome per rule in ``metrics``.
    """

    def __init__(self):
        self.rules: Dict[str, MonitoringRule] = {}
        self.alerts: List[MonitoringAlert] = []
        self.metrics = {}

    def register_rule(self, rule: MonitoringRule):
        """Add a rule, replacing any rule already stored under the same ID."""
        self.rules[rule.rule_id] = rule

    async def start_monitoring(self):
        """Run the monitoring loops of all enabled rules concurrently."""
        active_loops = [
            self._monitor_rule(candidate)
            for candidate in self.rules.values()
            if candidate.enabled
        ]
        await asyncio.gather(*active_loops)

    async def _monitor_rule(self, rule: MonitoringRule):
        """Repeatedly run one rule's check, sleeping ``interval_seconds`` between runs.

        A failing check is recorded as an "error" metric and the loop
        carries on.
        """
        while True:
            try:
                outcome = await rule.check_function()

                if outcome.get("alert"):
                    raised = MonitoringAlert(
                        alert_id=f"{rule.rule_id}-{datetime.utcnow().timestamp()}",
                        rule_id=rule.rule_id,
                        timestamp=datetime.utcnow(),
                        severity=rule.severity,
                        message=outcome.get("message", "Alert triggered"),
                        details=outcome.get("details", {})
                    )
                    self.alerts.append(raised)
                    await self._send_alert(raised)

                # Record the latest outcome for this rule.
                latest = {
                    "last_check": datetime.utcnow().isoformat(),
                    "status": outcome.get("status", "unknown"),
                    "value": outcome.get("value")
                }
                self.metrics[rule.rule_id] = latest

            except Exception as failure:
                failed = {
                    "last_check": datetime.utcnow().isoformat(),
                    "status": "error",
                    "error": str(failure)
                }
                self.metrics[rule.rule_id] = failed

            await asyncio.sleep(rule.interval_seconds)

    async def _send_alert(self, alert: MonitoringAlert):
        """Send alert to notification channels (placeholder; does nothing)."""
        # Implementation depends on alerting infrastructure
        pass
 
# Example monitoring rules
async def check_failed_logins() -> Dict:
    """Report whether failed logins exceed the alert threshold.

    The count is a placeholder (always 0) until real authentication logs
    are queried.
    """
    threshold = 10
    # Query authentication logs
    failed_count = 0  # Would query actual logs

    if failed_count <= threshold:
        return {"status": "ok", "value": failed_count}

    return {
        "alert": True,
        "message": f"High number of failed logins: {failed_count}",
        "details": {"count": failed_count, "threshold": threshold}
    }
 
async def check_unauthorized_access() -> Dict:
    """Report whether denied access attempts exceed the alert threshold.

    The count is a placeholder (always 0) until real authorization logs
    are queried.
    """
    threshold = 5
    # Query authorization logs for denied access
    denied_count = 0  # Would query actual logs

    if denied_count <= threshold:
        return {"status": "ok", "value": denied_count}

    return {
        "alert": True,
        "message": f"Multiple unauthorized access attempts: {denied_count}",
        "details": {"count": denied_count}
    }
 
async def check_security_group_changes() -> Dict:
    """Check for security group modifications.

    Placeholder: always reports "ok" with a value of 0. A real
    implementation would query CloudTrail for security group changes (and
    import boto3 at that point); the stub itself needs no AWS SDK, so it no
    longer imports one.
    """
    # Query CloudTrail for security group changes
    # This is simplified - real implementation would use CloudTrail

    return {"status": "ok", "value": 0}

RESPOND and RECOVER Functions

Incident Response Automation

# incident_response.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import uuid
 
class IncidentSeverity(Enum):
    """Incident severity levels; a lower number is a more severe level."""

    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
 
class IncidentStatus(Enum):
    """Stages an incident can be in, listed from detection through closure."""

    DETECTED = "detected"
    TRIAGED = "triaged"
    CONTAINED = "contained"
    ERADICATED = "eradicated"
    RECOVERED = "recovered"
    CLOSED = "closed"
 
@dataclass
class Incident:
    """A security incident and the history of actions taken on it."""

    incident_id: str
    title: str
    description: str
    severity: IncidentSeverity
    status: IncidentStatus  # Current stage of the incident
    detected_at: datetime
    affected_systems: List[str]
    indicators: List[Dict]  # Indicator records; schema is defined by the caller
    timeline: List[Dict] = field(default_factory=list)  # Entries with "timestamp", "action" and "details"
    assigned_to: Optional[str] = None
    resolved_at: Optional[datetime] = None  # Set when the incident is closed
 
class IncidentResponseManager:
    """NIST RS and RC incident response implementation.

    Keeps incidents in memory, records every action on the incident's
    timeline, and runs response playbooks. The isolation, blocking,
    evidence and notification steps are placeholders that report success
    without calling any external system.

    Attributes:
        incidents: Incidents keyed by ``incident_id``.
        playbooks: Playbook definitions keyed by name. Each playbook is a
            dict with a ``"steps"`` list; each step is a dict with a
            ``"type"``, an optional ``"name"`` and, for
            ``notify_stakeholders`` steps, an optional ``"stakeholders"``
            list.
    """

    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.playbooks = {}

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. This yields
        the same naive value, so stored timestamps keep their format.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    def _record(
        self,
        incident: Incident,
        action: str,
        details: Dict,
        at: Optional[datetime] = None
    ) -> None:
        """Append one audit entry to the incident's timeline.

        Args:
            incident: Incident whose timeline is extended.
            action: Short action label stored under ``"action"``.
            details: Action-specific payload stored under ``"details"``.
            at: Timestamp to record; defaults to the current UTC time.
        """
        moment = at if at is not None else self._utc_now()
        incident.timeline.append({
            "timestamp": moment.isoformat(),
            "action": action,
            "details": details
        })

    def _get_incident(self, incident_id: str) -> Incident:
        """Return the stored incident or raise ``ValueError`` if unknown."""
        incident = self.incidents.get(incident_id)
        if incident is None:
            raise ValueError(f"Incident not found: {incident_id}")
        return incident

    def create_incident(
        self,
        title: str,
        description: str,
        severity: IncidentSeverity,
        affected_systems: List[str],
        indicators: List[Dict]
    ) -> Incident:
        """Create, store and (if severe) escalate a new incident.

        Args:
            title: Short incident title.
            description: Free-text description.
            severity: Incident severity; CRITICAL and HIGH are escalated.
            affected_systems: Identifiers of the systems involved.
            indicators: Indicator records attached to the incident.

        Returns:
            The newly created incident, in ``DETECTED`` status.
        """
        # One clock reading so detected_at and the creation entry agree.
        now = self._utc_now()

        incident = Incident(
            incident_id=str(uuid.uuid4()),
            title=title,
            description=description,
            severity=severity,
            status=IncidentStatus.DETECTED,
            detected_at=now,
            affected_systems=affected_systems,
            indicators=indicators
        )

        self._record(
            incident,
            "incident_created",
            {"severity": severity.name},
            at=now
        )

        self.incidents[incident.incident_id] = incident

        # Escalate (not assign) the two highest severities.
        if severity in (IncidentSeverity.CRITICAL, IncidentSeverity.HIGH):
            self._escalate(incident)

        return incident

    def update_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        notes: Optional[str] = None
    ):
        """Move an incident to a new status and log the transition.

        Args:
            incident_id: Identifier of the incident to update.
            new_status: Status to assign; the previous status is not
                checked, so any transition is accepted.
            notes: Optional free-text note stored with the timeline entry.

        Raises:
            ValueError: If no incident with ``incident_id`` exists.
        """
        incident = self._get_incident(incident_id)

        # One clock reading so resolved_at matches the timeline entry.
        now = self._utc_now()

        old_status = incident.status
        incident.status = new_status

        self._record(
            incident,
            "status_change",
            {
                "from": old_status.value,
                "to": new_status.value,
                "notes": notes
            },
            at=now
        )

        if new_status == IncidentStatus.CLOSED:
            incident.resolved_at = now

    def execute_playbook(
        self,
        incident_id: str,
        playbook_name: str
    ) -> List[Dict]:
        """Run every step of a playbook against an incident.

        Each step result is appended to the incident's timeline.

        Args:
            incident_id: Identifier of the target incident.
            playbook_name: Key of the playbook in ``self.playbooks``.

        Returns:
            The step results, in execution order.

        Raises:
            ValueError: If the incident or the playbook is not found.
            KeyError: If the playbook has no ``"steps"`` or a step has
                no ``"type"``.
        """
        incident = self._get_incident(incident_id)

        playbook = self.playbooks.get(playbook_name)
        if not playbook:
            raise ValueError(f"Playbook not found: {playbook_name}")

        results = []
        for step in playbook["steps"]:
            result = self._execute_step(incident, step)
            results.append(result)

            self._record(
                incident,
                "playbook_step",
                {
                    "playbook": playbook_name,
                    # The step has already run at this point; fall back to
                    # its type so an unnamed step is still audited instead
                    # of raising KeyError and losing the entry.
                    "step": step.get("name", step["type"]),
                    "result": result
                }
            )

        return results

    def _execute_step(self, incident: Incident, step: Dict) -> Dict:
        """Dispatch one playbook step to its handler by ``step["type"]``.

        Returns:
            The handler's result, or ``{"status": "unknown_step_type"}``
            when the type is not recognised.

        Raises:
            KeyError: If the step has no ``"type"`` key.
        """
        step_type = step["type"]

        if step_type == "isolate_system":
            return self._isolate_systems(incident.affected_systems)
        if step_type == "block_indicators":
            return self._block_indicators(incident.indicators)
        if step_type == "collect_evidence":
            return self._collect_evidence(incident)
        if step_type == "notify_stakeholders":
            return self._notify_stakeholders(
                incident, step.get("stakeholders", [])
            )

        return {"status": "unknown_step_type"}

    def _isolate_systems(self, systems: List[str]) -> Dict:
        """Isolate affected systems (placeholder: reports success only)."""
        # Implementation would call network/EDR APIs
        return {
            "status": "success",
            "isolated_systems": systems
        }

    def _block_indicators(self, indicators: List[Dict]) -> Dict:
        """Block malicious indicators (placeholder: only counts them)."""
        blocked = []
        for indicator in indicators:
            # Block IPs, domains, hashes, etc.
            blocked.append(indicator)

        return {
            "status": "success",
            "blocked_indicators": len(blocked)
        }

    def _collect_evidence(self, incident: Incident) -> Dict:
        """Collect forensic evidence (placeholder: one record per system).

        Only the number of records is returned; the records themselves
        are not stored.
        """
        # Collect logs, memory dumps, etc.
        evidence = [
            {
                "system": system,
                "collected_at": self._utc_now().isoformat(),
                "types": ["logs", "network_captures"]
            }
            for system in incident.affected_systems
        ]

        return {
            "status": "success",
            "evidence_collected": len(evidence)
        }

    def _notify_stakeholders(
        self,
        incident: Incident,
        stakeholders: List[str]
    ) -> Dict:
        """Notify stakeholders (placeholder: echoes the recipient list)."""
        # Send notifications
        return {
            "status": "success",
            "notified": stakeholders
        }

    def _escalate(self, incident: Incident):
        """Record an escalation entry on the incident's timeline."""
        self._record(
            incident,
            "escalated",
            {"reason": f"Severity: {incident.severity.name}"}
        )

Summary

NIST CSF 2.0 implementation requires systematic coverage of all six core functions:

  1. GOVERN: Establish cybersecurity governance and risk management
  2. IDENTIFY: Maintain asset inventory and understand risk
  3. PROTECT: Implement access controls and data protection
  4. DETECT: Deploy continuous monitoring and anomaly detection
  5. RESPOND: Establish incident response capabilities
  6. RECOVER: Plan and execute recovery procedures

Use these technical implementations as a foundation, customizing for your organization's specific risk profile and compliance requirements.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.