The NIST Cybersecurity Framework provides a comprehensive approach to managing cybersecurity risk. This guide covers technical implementation of CSF 2.0 across all five core functions with practical code examples and automation strategies.
Framework Overview
CSF 2.0 Core Functions
# nist_csf_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class CSFFunction(Enum):
GOVERN = "GV" # New in CSF 2.0
IDENTIFY = "ID"
PROTECT = "PR"
DETECT = "DE"
RESPOND = "RS"
RECOVER = "RC"
class ImplementationTier(Enum):
PARTIAL = 1
RISK_INFORMED = 2
REPEATABLE = 3
ADAPTIVE = 4
@dataclass
class Control:
control_id: str
function: CSFFunction
category: str
subcategory: str
description: str
implementation_status: str
evidence: List[str] = field(default_factory=list)
gaps: List[str] = field(default_factory=list)
remediation_plan: Optional[str] = None
@dataclass
class CSFProfile:
profile_id: str
name: str
description: str
target_tier: ImplementationTier
controls: List[Control]
created_at: datetime
last_assessed: Optional[datetime] = None
class CSFAssessment:
"""NIST CSF compliance assessment engine"""
def __init__(self):
self.controls = self._load_controls()
self.current_profile: Optional[CSFProfile] = None
def _load_controls(self) -> Dict[str, Control]:
"""Load CSF control catalog"""
# Abbreviated control set - full implementation would include all 106 subcategories
controls = {
# GOVERN function (new in 2.0)
"GV.OC-01": Control(
control_id="GV.OC-01",
function=CSFFunction.GOVERN,
category="Organizational Context",
subcategory="GV.OC-01",
description="The organizational mission is understood and informs cybersecurity risk management",
implementation_status="not_started",
evidence=[]
),
# IDENTIFY function
"ID.AM-01": Control(
control_id="ID.AM-01",
function=CSFFunction.IDENTIFY,
category="Asset Management",
subcategory="ID.AM-01",
description="Inventories of hardware managed by the organization are maintained",
implementation_status="not_started",
evidence=[]
),
"ID.AM-02": Control(
control_id="ID.AM-02",
function=CSFFunction.IDENTIFY,
category="Asset Management",
subcategory="ID.AM-02",
description="Inventories of software, services, and systems managed by the organization are maintained",
implementation_status="not_started",
evidence=[]
),
"ID.RA-01": Control(
control_id="ID.RA-01",
function=CSFFunction.IDENTIFY,
category="Risk Assessment",
subcategory="ID.RA-01",
description="Vulnerabilities in assets are identified, validated, and recorded",
implementation_status="not_started",
evidence=[]
),
# PROTECT function
"PR.AA-01": Control(
control_id="PR.AA-01",
function=CSFFunction.PROTECT,
category="Identity Management and Access Control",
subcategory="PR.AA-01",
description="Identities and credentials for authorized users, services, and hardware are managed",
implementation_status="not_started",
evidence=[]
),
"PR.DS-01": Control(
control_id="PR.DS-01",
function=CSFFunction.PROTECT,
category="Data Security",
subcategory="PR.DS-01",
description="Data-at-rest is protected",
implementation_status="not_started",
evidence=[]
),
# DETECT function
"DE.CM-01": Control(
control_id="DE.CM-01",
function=CSFFunction.DETECT,
category="Continuous Monitoring",
subcategory="DE.CM-01",
description="Networks and network services are monitored to find potentially adverse events",
implementation_status="not_started",
evidence=[]
),
# RESPOND function
"RS.MA-01": Control(
control_id="RS.MA-01",
function=CSFFunction.RESPOND,
category="Incident Management",
subcategory="RS.MA-01",
description="The incident response plan is executed in coordination with relevant third parties",
implementation_status="not_started",
evidence=[]
),
# RECOVER function
"RC.RP-01": Control(
control_id="RC.RP-01",
function=CSFFunction.RECOVER,
category="Incident Recovery Plan Execution",
subcategory="RC.RP-01",
description="The recovery portion of the incident response plan is executed",
implementation_status="not_started",
evidence=[]
)
}
return controls
def assess_control(
self,
control_id: str,
status: str,
evidence: List[str],
gaps: List[str] = None
):
"""Assess a specific control"""
if control_id not in self.controls:
raise ValueError(f"Unknown control: {control_id}")
control = self.controls[control_id]
control.implementation_status = status
control.evidence = evidence
control.gaps = gaps or []
def generate_gap_analysis(self) -> Dict:
"""Generate gap analysis report"""
by_function = {}
for control in self.controls.values():
func = control.function.value
if func not in by_function:
by_function[func] = {
"total": 0,
"implemented": 0,
"partial": 0,
"not_implemented": 0,
"gaps": []
}
by_function[func]["total"] += 1
if control.implementation_status == "implemented":
by_function[func]["implemented"] += 1
elif control.implementation_status == "partial":
by_function[func]["partial"] += 1
else:
by_function[func]["not_implemented"] += 1
by_function[func]["gaps"].extend(control.gaps)
return {
"assessment_date": datetime.utcnow().isoformat(),
"by_function": by_function,
"overall_score": self._calculate_score()
}
def _calculate_score(self) -> float:
"""Calculate overall compliance score"""
total = len(self.controls)
implemented = sum(
1 for c in self.controls.values()
if c.implementation_status == "implemented"
)
partial = sum(
1 for c in self.controls.values()
if c.implementation_status == "partial"
)
return (implemented + partial * 0.5) / total * 100IDENTIFY Function Implementation
Asset Inventory Automation
# asset_inventory.py
import subprocess
import json
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import boto3
import requests
@dataclass
class Asset:
asset_id: str
asset_type: str # hardware, software, data, service
name: str
owner: str
criticality: str # critical, high, medium, low
classification: str # public, internal, confidential, restricted
location: str
discovered_at: datetime
last_seen: datetime
attributes: Dict
class AssetInventory:
"""Automated asset discovery and inventory management"""
def __init__(self, config: Dict):
self.config = config
self.assets = {}
def discover_aws_assets(self) -> List[Asset]:
"""Discover AWS infrastructure assets"""
assets = []
# EC2 instances
ec2 = boto3.client('ec2')
instances = ec2.describe_instances()
for reservation in instances['Reservations']:
for instance in reservation['Instances']:
tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
asset = Asset(
asset_id=instance['InstanceId'],
asset_type='hardware',
name=tags.get('Name', instance['InstanceId']),
owner=tags.get('Owner', 'unknown'),
criticality=tags.get('Criticality', 'medium'),
classification=tags.get('Classification', 'internal'),
location=instance['Placement']['AvailabilityZone'],
discovered_at=datetime.utcnow(),
last_seen=datetime.utcnow(),
attributes={
'instance_type': instance['InstanceType'],
'state': instance['State']['Name'],
'private_ip': instance.get('PrivateIpAddress'),
'public_ip': instance.get('PublicIpAddress'),
'vpc_id': instance.get('VpcId'),
'security_groups': [
sg['GroupId'] for sg in instance.get('SecurityGroups', [])
]
}
)
assets.append(asset)
# RDS databases
rds = boto3.client('rds')
databases = rds.describe_db_instances()
for db in databases['DBInstances']:
asset = Asset(
asset_id=db['DBInstanceIdentifier'],
asset_type='data',
name=db['DBInstanceIdentifier'],
owner='database-team',
criticality='critical',
classification='confidential',
location=db['AvailabilityZone'],
discovered_at=datetime.utcnow(),
last_seen=datetime.utcnow(),
attributes={
'engine': db['Engine'],
'engine_version': db['EngineVersion'],
'instance_class': db['DBInstanceClass'],
'storage_encrypted': db['StorageEncrypted'],
'multi_az': db['MultiAZ']
}
)
assets.append(asset)
# S3 buckets
s3 = boto3.client('s3')
buckets = s3.list_buckets()
for bucket in buckets['Buckets']:
# Get bucket encryption status
try:
encryption = s3.get_bucket_encryption(Bucket=bucket['Name'])
encrypted = True
except:
encrypted = False
asset = Asset(
asset_id=bucket['Name'],
asset_type='data',
name=bucket['Name'],
owner='unknown',
criticality='high',
classification='internal',
location='global',
discovered_at=datetime.utcnow(),
last_seen=datetime.utcnow(),
attributes={
'creation_date': bucket['CreationDate'].isoformat(),
'encrypted': encrypted
}
)
assets.append(asset)
return assets
def discover_software_assets(self) -> List[Asset]:
"""Discover software dependencies from package managers"""
assets = []
# NPM packages
try:
result = subprocess.run(
['npm', 'list', '--json', '--all'],
capture_output=True,
text=True
)
npm_deps = json.loads(result.stdout)
assets.extend(self._parse_npm_deps(npm_deps.get('dependencies', {})))
except Exception as e:
print(f"NPM discovery failed: {e}")
# Python packages
try:
result = subprocess.run(
['pip', 'list', '--format=json'],
capture_output=True,
text=True
)
pip_deps = json.loads(result.stdout)
for pkg in pip_deps:
asset = Asset(
asset_id=f"pip:{pkg['name']}:{pkg['version']}",
asset_type='software',
name=pkg['name'],
owner='development-team',
criticality='medium',
classification='internal',
location='application',
discovered_at=datetime.utcnow(),
last_seen=datetime.utcnow(),
attributes={
'version': pkg['version'],
'package_manager': 'pip'
}
)
assets.append(asset)
except Exception as e:
print(f"Pip discovery failed: {e}")
return assets
def _parse_npm_deps(self, deps: Dict, depth: int = 0) -> List[Asset]:
"""Parse NPM dependency tree"""
assets = []
for name, info in deps.items():
version = info.get('version', 'unknown')
asset = Asset(
asset_id=f"npm:{name}:{version}",
asset_type='software',
name=name,
owner='development-team',
criticality='medium' if depth == 0 else 'low',
classification='internal',
location='application',
discovered_at=datetime.utcnow(),
last_seen=datetime.utcnow(),
attributes={
'version': version,
'package_manager': 'npm',
'depth': depth,
'direct_dependency': depth == 0
}
)
assets.append(asset)
# Recurse for transitive dependencies
if 'dependencies' in info:
assets.extend(self._parse_npm_deps(info['dependencies'], depth + 1))
return assets
def generate_inventory_report(self) -> Dict:
"""Generate comprehensive inventory report"""
all_assets = list(self.assets.values())
return {
"report_date": datetime.utcnow().isoformat(),
"total_assets": len(all_assets),
"by_type": self._group_by(all_assets, 'asset_type'),
"by_criticality": self._group_by(all_assets, 'criticality'),
"by_classification": self._group_by(all_assets, 'classification'),
"assets": [
{
"id": a.asset_id,
"name": a.name,
"type": a.asset_type,
"criticality": a.criticality
}
for a in all_assets
]
}
def _group_by(self, assets: List[Asset], field: str) -> Dict[str, int]:
"""Group assets by field"""
groups = {}
for asset in assets:
value = getattr(asset, field)
groups[value] = groups.get(value, 0) + 1
return groupsPROTECT Function Implementation
Access Control Implementation
# access_control.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from enum import Enum
from datetime import datetime, timedelta
import hashlib
import secrets
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
@dataclass
class Role:
role_id: str
name: str
permissions: Set[Permission]
resources: List[str] # Resource patterns
conditions: Dict # Additional access conditions
@dataclass
class User:
user_id: str
email: str
roles: List[str]
attributes: Dict # For ABAC
mfa_enabled: bool
last_login: Optional[datetime]
password_changed: Optional[datetime]
class AccessControlSystem:
"""Role-based and attribute-based access control"""
def __init__(self):
self.roles: Dict[str, Role] = {}
self.users: Dict[str, User] = {}
self.access_logs = []
def create_role(self, role: Role):
"""Create a role"""
self.roles[role.role_id] = role
def assign_role(self, user_id: str, role_id: str):
"""Assign role to user"""
if user_id not in self.users:
raise ValueError(f"User not found: {user_id}")
if role_id not in self.roles:
raise ValueError(f"Role not found: {role_id}")
user = self.users[user_id]
if role_id not in user.roles:
user.roles.append(role_id)
def check_access(
self,
user_id: str,
resource: str,
permission: Permission,
context: Dict = None
) -> bool:
"""Check if user has access to resource"""
if user_id not in self.users:
self._log_access(user_id, resource, permission, False, "User not found")
return False
user = self.users[user_id]
context = context or {}
# Check each role
for role_id in user.roles:
role = self.roles.get(role_id)
if not role:
continue
# Check permission
if permission not in role.permissions:
continue
# Check resource pattern
if not self._matches_resource(resource, role.resources):
continue
# Check conditions (ABAC)
if not self._evaluate_conditions(role.conditions, user, context):
continue
# Access granted
self._log_access(user_id, resource, permission, True, f"Role: {role_id}")
return True
# Access denied
self._log_access(user_id, resource, permission, False, "No matching role")
return False
def _matches_resource(self, resource: str, patterns: List[str]) -> bool:
"""Check if resource matches any pattern"""
import fnmatch
for pattern in patterns:
if fnmatch.fnmatch(resource, pattern):
return True
return False
def _evaluate_conditions(
self,
conditions: Dict,
user: User,
context: Dict
) -> bool:
"""Evaluate ABAC conditions"""
for condition_key, expected_value in conditions.items():
if condition_key == "mfa_required" and expected_value:
if not user.mfa_enabled or not context.get("mfa_verified"):
return False
elif condition_key == "ip_whitelist":
if context.get("ip") not in expected_value:
return False
elif condition_key == "time_range":
now = datetime.utcnow()
start = datetime.strptime(expected_value["start"], "%H:%M").time()
end = datetime.strptime(expected_value["end"], "%H:%M").time()
if not (start <= now.time() <= end):
return False
elif condition_key.startswith("user."):
attr = condition_key[5:]
if user.attributes.get(attr) != expected_value:
return False
return True
def _log_access(
self,
user_id: str,
resource: str,
permission: Permission,
granted: bool,
reason: str
):
"""Log access decision"""
self.access_logs.append({
"timestamp": datetime.utcnow().isoformat(),
"user_id": user_id,
"resource": resource,
"permission": permission.value,
"granted": granted,
"reason": reason
})
# NIST-compliant role definitions
def create_nist_compliant_roles() -> List[Role]:
"""Create roles following least privilege principle"""
return [
Role(
role_id="viewer",
name="Read-Only Viewer",
permissions={Permission.READ},
resources=["*"],
conditions={}
),
Role(
role_id="operator",
name="System Operator",
permissions={Permission.READ, Permission.WRITE},
resources=["systems/*", "logs/*"],
conditions={
"mfa_required": True,
"time_range": {"start": "06:00", "end": "22:00"}
}
),
Role(
role_id="admin",
name="System Administrator",
permissions={Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
resources=["*"],
conditions={
"mfa_required": True,
"ip_whitelist": ["10.0.0.0/8", "192.168.0.0/16"]
}
),
Role(
role_id="security_analyst",
name="Security Analyst",
permissions={Permission.READ},
resources=["security/*", "logs/*", "alerts/*"],
conditions={
"mfa_required": True
}
)
]Data Protection Controls
# data_protection.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import base64
import os
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class DataClassification:
level: str # public, internal, confidential, restricted
retention_days: int
encryption_required: bool
access_logging: bool
dlp_enabled: bool
class DataProtection:
"""Data protection controls for NIST PR.DS"""
CLASSIFICATIONS = {
"public": DataClassification(
level="public",
retention_days=365,
encryption_required=False,
access_logging=False,
dlp_enabled=False
),
"internal": DataClassification(
level="internal",
retention_days=730,
encryption_required=True,
access_logging=True,
dlp_enabled=False
),
"confidential": DataClassification(
level="confidential",
retention_days=2555, # 7 years
encryption_required=True,
access_logging=True,
dlp_enabled=True
),
"restricted": DataClassification(
level="restricted",
retention_days=2555,
encryption_required=True,
access_logging=True,
dlp_enabled=True
)
}
def __init__(self, master_key: bytes):
self.master_key = master_key
self._key_cache = {}
def encrypt_data(
self,
data: bytes,
classification: str,
context: str
) -> Dict:
"""Encrypt data based on classification"""
config = self.CLASSIFICATIONS.get(classification)
if not config:
raise ValueError(f"Unknown classification: {classification}")
if not config.encryption_required:
return {"encrypted": False, "data": data}
# Derive context-specific key
key = self._derive_key(context)
fernet = Fernet(key)
encrypted = fernet.encrypt(data)
return {
"encrypted": True,
"data": encrypted,
"context": context,
"classification": classification,
"algorithm": "AES-256-GCM"
}
def decrypt_data(
self,
encrypted_data: Dict
) -> bytes:
"""Decrypt data"""
if not encrypted_data.get("encrypted"):
return encrypted_data["data"]
key = self._derive_key(encrypted_data["context"])
fernet = Fernet(key)
return fernet.decrypt(encrypted_data["data"])
def _derive_key(self, context: str) -> bytes:
"""Derive encryption key from master key and context"""
if context in self._key_cache:
return self._key_cache[context]
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=context.encode(),
iterations=100000,
backend=default_backend()
)
key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
self._key_cache[context] = key
return key
def scan_for_sensitive_data(self, content: str) -> Dict:
"""DLP scan for sensitive data patterns"""
import re
patterns = {
"credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
"api_key": r"\b(?:api[_-]?key|apikey|api[_-]?secret)[\"']?\s*[:=]\s*[\"']?[\w-]{20,}",
"aws_key": r"(?:AKIA|ABIA|ACCA|ASIA)[0-9A-Z]{16}"
}
findings = {}
for name, pattern in patterns.items():
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
findings[name] = {
"count": len(matches),
"samples": [self._mask_finding(m) for m in matches[:3]]
}
return {
"has_sensitive_data": len(findings) > 0,
"findings": findings,
"risk_level": self._calculate_risk(findings)
}
def _mask_finding(self, value: str) -> str:
"""Mask sensitive value for logging"""
if len(value) <= 8:
return "*" * len(value)
return value[:4] + "*" * (len(value) - 8) + value[-4:]
def _calculate_risk(self, findings: Dict) -> str:
"""Calculate risk level from findings"""
high_risk = {"credit_card", "ssn", "api_key", "aws_key"}
if any(f in findings for f in high_risk):
return "high"
elif findings:
return "medium"
return "low"DETECT Function Implementation
Continuous Monitoring
# continuous_monitoring.py
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime, timedelta
import asyncio
@dataclass
class MonitoringRule:
rule_id: str
name: str
description: str
check_function: Callable
interval_seconds: int
severity: str
enabled: bool = True
@dataclass
class MonitoringAlert:
alert_id: str
rule_id: str
timestamp: datetime
severity: str
message: str
details: Dict
class ContinuousMonitoring:
"""NIST DE.CM continuous monitoring implementation"""
def __init__(self):
self.rules: Dict[str, MonitoringRule] = {}
self.alerts: List[MonitoringAlert] = []
self.metrics = {}
def register_rule(self, rule: MonitoringRule):
"""Register monitoring rule"""
self.rules[rule.rule_id] = rule
async def start_monitoring(self):
"""Start all monitoring tasks"""
tasks = []
for rule in self.rules.values():
if rule.enabled:
tasks.append(self._monitor_rule(rule))
await asyncio.gather(*tasks)
async def _monitor_rule(self, rule: MonitoringRule):
"""Run monitoring rule on schedule"""
while True:
try:
result = await rule.check_function()
if result.get("alert"):
alert = MonitoringAlert(
alert_id=f"{rule.rule_id}-{datetime.utcnow().timestamp()}",
rule_id=rule.rule_id,
timestamp=datetime.utcnow(),
severity=rule.severity,
message=result.get("message", "Alert triggered"),
details=result.get("details", {})
)
self.alerts.append(alert)
await self._send_alert(alert)
# Update metrics
self.metrics[rule.rule_id] = {
"last_check": datetime.utcnow().isoformat(),
"status": result.get("status", "unknown"),
"value": result.get("value")
}
except Exception as e:
self.metrics[rule.rule_id] = {
"last_check": datetime.utcnow().isoformat(),
"status": "error",
"error": str(e)
}
await asyncio.sleep(rule.interval_seconds)
async def _send_alert(self, alert: MonitoringAlert):
"""Send alert to notification channels"""
# Implementation depends on alerting infrastructure
pass
# Example monitoring rules
async def check_failed_logins() -> Dict:
"""Check for excessive failed logins"""
# Query authentication logs
failed_count = 0 # Would query actual logs
threshold = 10
if failed_count > threshold:
return {
"alert": True,
"message": f"High number of failed logins: {failed_count}",
"details": {"count": failed_count, "threshold": threshold}
}
return {"status": "ok", "value": failed_count}
async def check_unauthorized_access() -> Dict:
"""Check for unauthorized access attempts"""
# Query authorization logs for denied access
denied_count = 0 # Would query actual logs
threshold = 5
if denied_count > threshold:
return {
"alert": True,
"message": f"Multiple unauthorized access attempts: {denied_count}",
"details": {"count": denied_count}
}
return {"status": "ok", "value": denied_count}
async def check_security_group_changes() -> Dict:
"""Check for security group modifications"""
import boto3
# Query CloudTrail for security group changes
# This is simplified - real implementation would use CloudTrail
return {"status": "ok", "value": 0}RESPOND and RECOVER Functions
Incident Response Automation
# incident_response.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import uuid
class IncidentSeverity(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
class IncidentStatus(Enum):
DETECTED = "detected"
TRIAGED = "triaged"
CONTAINED = "contained"
ERADICATED = "eradicated"
RECOVERED = "recovered"
CLOSED = "closed"
@dataclass
class Incident:
incident_id: str
title: str
description: str
severity: IncidentSeverity
status: IncidentStatus
detected_at: datetime
affected_systems: List[str]
indicators: List[Dict]
timeline: List[Dict] = field(default_factory=list)
assigned_to: Optional[str] = None
resolved_at: Optional[datetime] = None
class IncidentResponseManager:
"""NIST RS and RC incident response implementation"""
def __init__(self):
self.incidents: Dict[str, Incident] = {}
self.playbooks = {}
def create_incident(
self,
title: str,
description: str,
severity: IncidentSeverity,
affected_systems: List[str],
indicators: List[Dict]
) -> Incident:
"""Create new incident"""
incident = Incident(
incident_id=str(uuid.uuid4()),
title=title,
description=description,
severity=severity,
status=IncidentStatus.DETECTED,
detected_at=datetime.utcnow(),
affected_systems=affected_systems,
indicators=indicators
)
# Add to timeline
incident.timeline.append({
"timestamp": datetime.utcnow().isoformat(),
"action": "incident_created",
"details": {"severity": severity.name}
})
self.incidents[incident.incident_id] = incident
# Auto-assign based on severity
if severity in [IncidentSeverity.CRITICAL, IncidentSeverity.HIGH]:
self._escalate(incident)
return incident
def update_status(
self,
incident_id: str,
new_status: IncidentStatus,
notes: str = None
):
"""Update incident status"""
incident = self.incidents.get(incident_id)
if not incident:
raise ValueError(f"Incident not found: {incident_id}")
old_status = incident.status
incident.status = new_status
incident.timeline.append({
"timestamp": datetime.utcnow().isoformat(),
"action": "status_change",
"details": {
"from": old_status.value,
"to": new_status.value,
"notes": notes
}
})
if new_status == IncidentStatus.CLOSED:
incident.resolved_at = datetime.utcnow()
def execute_playbook(
self,
incident_id: str,
playbook_name: str
) -> List[Dict]:
"""Execute response playbook"""
incident = self.incidents.get(incident_id)
if not incident:
raise ValueError(f"Incident not found: {incident_id}")
playbook = self.playbooks.get(playbook_name)
if not playbook:
raise ValueError(f"Playbook not found: {playbook_name}")
results = []
for step in playbook["steps"]:
result = self._execute_step(incident, step)
results.append(result)
incident.timeline.append({
"timestamp": datetime.utcnow().isoformat(),
"action": "playbook_step",
"details": {
"playbook": playbook_name,
"step": step["name"],
"result": result
}
})
return results
def _execute_step(self, incident: Incident, step: Dict) -> Dict:
"""Execute single playbook step"""
step_type = step["type"]
if step_type == "isolate_system":
return self._isolate_systems(incident.affected_systems)
elif step_type == "block_indicators":
return self._block_indicators(incident.indicators)
elif step_type == "collect_evidence":
return self._collect_evidence(incident)
elif step_type == "notify_stakeholders":
return self._notify_stakeholders(incident, step.get("stakeholders", []))
return {"status": "unknown_step_type"}
def _isolate_systems(self, systems: List[str]) -> Dict:
"""Isolate affected systems"""
# Implementation would call network/EDR APIs
return {
"status": "success",
"isolated_systems": systems
}
def _block_indicators(self, indicators: List[Dict]) -> Dict:
"""Block malicious indicators"""
blocked = []
for indicator in indicators:
# Block IPs, domains, hashes, etc.
blocked.append(indicator)
return {
"status": "success",
"blocked_indicators": len(blocked)
}
def _collect_evidence(self, incident: Incident) -> Dict:
"""Collect forensic evidence"""
evidence = []
for system in incident.affected_systems:
# Collect logs, memory dumps, etc.
evidence.append({
"system": system,
"collected_at": datetime.utcnow().isoformat(),
"types": ["logs", "network_captures"]
})
return {
"status": "success",
"evidence_collected": len(evidence)
}
def _notify_stakeholders(
self,
incident: Incident,
stakeholders: List[str]
) -> Dict:
"""Notify relevant stakeholders"""
# Send notifications
return {
"status": "success",
"notified": stakeholders
}
def _escalate(self, incident: Incident):
"""Escalate critical incident"""
incident.timeline.append({
"timestamp": datetime.utcnow().isoformat(),
"action": "escalated",
"details": {"reason": f"Severity: {incident.severity.name}"}
})Summary
NIST CSF implementation requires systematic coverage of all core functions:
- GOVERN: Establish cybersecurity governance and risk management
- IDENTIFY: Maintain asset inventory and understand risk
- PROTECT: Implement access controls and data protection
- DETECT: Deploy continuous monitoring and anomaly detection
- RESPOND: Establish incident response capabilities
- RECOVER: Plan and execute recovery procedures
Use these technical implementations as a foundation, customizing for your organization's specific risk profile and compliance requirements.