NIST Cybersecurity Framework ofera o abordare completa pentru gestionarea riscurilor de securitate cibernetica. Acest ghid acopera implementarea tehnica a CSF 2.0 pe toate cele sase functii de baza (GOVERN, IDENTIFY, PROTECT, DETECT, RESPOND, RECOVER), cu exemple practice de cod si strategii de automatizare.
Privire de Ansamblu asupra Framework-ului
Functiile de Baza CSF 2.0
# nist_csf_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
class CSFFunction(Enum):
    """Core functions of NIST CSF 2.0; each value is the function's ID prefix."""

    GOVERN = "GV"  # Added in CSF 2.0
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
class ImplementationTier(Enum):
    """CSF implementation tiers, numbered 1 (Partial) through 4 (Adaptive)."""

    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
@dataclass
class Control:
    """One CSF subcategory together with its current assessment state."""

    # Identification
    control_id: str
    function: CSFFunction
    category: str
    subcategory: str
    description: str

    # Assessment state; the status is a free-form string
    implementation_status: str
    evidence: List[str] = field(default_factory=list)
    gaps: List[str] = field(default_factory=list)
    remediation_plan: Optional[str] = None
@dataclass
class CSFProfile:
    """A named set of controls assessed against a target implementation tier."""

    profile_id: str
    name: str
    description: str
    target_tier: ImplementationTier
    controls: List[Control]
    created_at: datetime
    # Stays None until an assessment date is recorded
    last_assessed: Optional[datetime] = None
class CSFAssessment:
    """NIST CSF compliance assessment engine.

    Holds an in-memory catalog of CSF controls keyed by control ID, records
    assessment results against them, and summarizes the results per CSF
    function together with an overall percentage score.
    """

    def __init__(self):
        self.controls = self._load_controls()
        self.current_profile: Optional[CSFProfile] = None

    def _load_controls(self) -> "Dict[str, Control]":
        """Build the control catalog; every control starts as "not_started".

        Abbreviated control set - a full implementation would include all
        106 subcategories.

        Returns:
            Mapping of control ID to its ``Control`` record, in catalog order.
        """
        # Each row: (function, control ID, category, description)
        catalog = (
            # GOVERN function (new in 2.0)
            (CSFFunction.GOVERN, "GV.OC-01", "Organizational Context",
             "The organizational mission is understood and informs cybersecurity risk management"),
            # IDENTIFY function
            (CSFFunction.IDENTIFY, "ID.AM-01", "Asset Management",
             "Inventories of hardware managed by the organization are maintained"),
            (CSFFunction.IDENTIFY, "ID.AM-02", "Asset Management",
             "Inventories of software, services, and systems managed by the organization are maintained"),
            (CSFFunction.IDENTIFY, "ID.RA-01", "Risk Assessment",
             "Vulnerabilities in assets are identified, validated, and recorded"),
            # PROTECT function
            # CSF 2.0 names the PR.AA category "Identity Management,
            # Authentication, and Access Control"; the shorter name used
            # previously belonged to PR.AC in CSF 1.1.
            (CSFFunction.PROTECT, "PR.AA-01", "Identity Management, Authentication, and Access Control",
             "Identities and credentials for authorized users, services, and hardware are managed"),
            (CSFFunction.PROTECT, "PR.DS-01", "Data Security",
             "Data-at-rest is protected"),
            # DETECT function
            (CSFFunction.DETECT, "DE.CM-01", "Continuous Monitoring",
             "Networks and network services are monitored to find potentially adverse events"),
            # RESPOND function
            (CSFFunction.RESPOND, "RS.MA-01", "Incident Management",
             "The incident response plan is executed in coordination with relevant third parties"),
            # RECOVER function
            (CSFFunction.RECOVER, "RC.RP-01", "Incident Recovery Plan Execution",
             "The recovery portion of the incident response plan is executed"),
        )
        return {
            control_id: Control(
                control_id=control_id,
                function=function,
                category=category,
                subcategory=control_id,
                description=description,
                implementation_status="not_started",
                evidence=[],
            )
            for function, control_id, category, description in catalog
        }

    def assess_control(
        self,
        control_id: str,
        status: str,
        evidence: List[str],
        gaps: Optional[List[str]] = None
    ):
        """Record an assessment result for one control.

        Args:
            control_id: ID of a control present in the catalog.
            status: New implementation status. "implemented" and "partial"
                are counted as such by the reports; any other value is
                counted as not implemented.
            evidence: Evidence references supporting the status.
            gaps: Identified gaps, if any.

        Raises:
            ValueError: If ``control_id`` is not in the catalog.
        """
        if control_id not in self.controls:
            raise ValueError(f"Unknown control: {control_id}")
        control = self.controls[control_id]
        control.implementation_status = status
        # Copy the lists so later mutation by the caller cannot silently
        # change the recorded assessment.
        control.evidence = list(evidence or [])
        control.gaps = list(gaps or [])

    def generate_gap_analysis(self) -> Dict:
        """Summarize implementation status and gaps per CSF function.

        Returns:
            Dict with "assessment_date" (ISO string), "by_function" (per
            function value: total / implemented / partial / not_implemented
            counts plus the collected gaps) and "overall_score".
        """
        by_function = {}
        for control in self.controls.values():
            summary = by_function.setdefault(control.function.value, {
                "total": 0,
                "implemented": 0,
                "partial": 0,
                "not_implemented": 0,
                "gaps": []
            })
            summary["total"] += 1
            status = control.implementation_status
            # Anything other than the two recognized statuses is a gap
            bucket = status if status in ("implemented", "partial") else "not_implemented"
            summary[bucket] += 1
            summary["gaps"].extend(control.gaps or [])
        return {
            "assessment_date": datetime.utcnow().isoformat(),
            "by_function": by_function,
            "overall_score": self._calculate_score()
        }

    def _calculate_score(self) -> float:
        """Return the overall compliance score as a percentage (0-100).

        Implemented controls count fully, partial ones count half. An empty
        catalog scores 0.0 instead of raising ZeroDivisionError.
        """
        total = len(self.controls)
        if total == 0:
            return 0.0
        statuses = [c.implementation_status for c in self.controls.values()]
        implemented = statuses.count("implemented")
        partial = statuses.count("partial")
        return (implemented + partial * 0.5) / total * 100


# Section heading (kept from the original line): "Implementarea Functiei IDENTIFY"
Automatizarea Inventarului de Active
# asset_inventory.py
import subprocess
import json
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import boto3
import requests
@dataclass
class Asset:
    """Inventory record describing one discovered asset."""

    asset_id: str
    asset_type: str  # hardware, software, data, service
    name: str
    owner: str
    criticality: str  # critical, high, medium, low
    classification: str  # public, internal, confidential, restricted
    location: str
    discovered_at: datetime
    last_seen: datetime
    # Source-specific details (instance type, package version, ...)
    attributes: Dict
class AssetInventory:
    """Automated asset discovery and inventory management.

    Discovery methods return the assets they find and also record them in
    ``self.assets`` (keyed by asset ID), so ``generate_inventory_report``
    reflects everything discovered so far.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.assets = {}

    def discover_aws_assets(self) -> "List[Asset]":
        """Discover AWS infrastructure assets (EC2, RDS, S3).

        Returns:
            The discovered assets, EC2 instances first, then RDS databases,
            then S3 buckets. They are also stored in ``self.assets``.
        """
        assets = [
            *self._discover_ec2_instances(),
            *self._discover_rds_instances(),
            *self._discover_s3_buckets(),
        ]
        self._register(assets)
        return assets

    def _discover_ec2_instances(self) -> "List[Asset]":
        """List every EC2 instance; paginated so large accounts are not truncated."""
        ec2 = boto3.client('ec2')
        found = []
        for page in ec2.get_paginator('describe_instances').paginate():
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
                    now = datetime.utcnow()
                    found.append(Asset(
                        asset_id=instance['InstanceId'],
                        asset_type='hardware',
                        name=tags.get('Name', instance['InstanceId']),
                        owner=tags.get('Owner', 'unknown'),
                        criticality=tags.get('Criticality', 'medium'),
                        classification=tags.get('Classification', 'internal'),
                        location=instance['Placement']['AvailabilityZone'],
                        discovered_at=now,
                        last_seen=now,
                        attributes={
                            'instance_type': instance['InstanceType'],
                            'state': instance['State']['Name'],
                            'private_ip': instance.get('PrivateIpAddress'),
                            'public_ip': instance.get('PublicIpAddress'),
                            'vpc_id': instance.get('VpcId'),
                            'security_groups': [
                                sg['GroupId'] for sg in instance.get('SecurityGroups', [])
                            ]
                        }
                    ))
        return found

    def _discover_rds_instances(self) -> "List[Asset]":
        """List every RDS database instance (paginated)."""
        rds = boto3.client('rds')
        found = []
        for page in rds.get_paginator('describe_db_instances').paginate():
            for db in page['DBInstances']:
                now = datetime.utcnow()
                found.append(Asset(
                    asset_id=db['DBInstanceIdentifier'],
                    asset_type='data',
                    name=db['DBInstanceIdentifier'],
                    owner='database-team',
                    criticality='critical',
                    classification='confidential',
                    # Guarded lookup so a response without an availability
                    # zone cannot abort the whole discovery run
                    location=db.get('AvailabilityZone', 'unknown'),
                    discovered_at=now,
                    last_seen=now,
                    attributes={
                        'engine': db['Engine'],
                        'engine_version': db['EngineVersion'],
                        'instance_class': db['DBInstanceClass'],
                        'storage_encrypted': db['StorageEncrypted'],
                        'multi_az': db['MultiAZ']
                    }
                ))
        return found

    def _discover_s3_buckets(self) -> "List[Asset]":
        """List S3 buckets together with their default-encryption status."""
        s3 = boto3.client('s3')
        found = []
        for bucket in s3.list_buckets()['Buckets']:
            now = datetime.utcnow()
            found.append(Asset(
                asset_id=bucket['Name'],
                asset_type='data',
                name=bucket['Name'],
                owner='unknown',
                criticality='high',
                classification='internal',
                location='global',
                discovered_at=now,
                last_seen=now,
                attributes={
                    'creation_date': bucket['CreationDate'].isoformat(),
                    'encrypted': self._bucket_encrypted(s3, bucket['Name'])
                }
            ))
        return found

    def _bucket_encrypted(self, s3, bucket_name: str):
        """Return True/False for the bucket's encryption config, None if unknown.

        Only the "no encryption configuration" error means "not encrypted".
        Any other failure (access denied, network error, ...) is reported and
        yields None, so a lookup problem is not recorded as an unencrypted
        bucket. Discovery stays best-effort: nothing is re-raised.
        """
        try:
            s3.get_bucket_encryption(Bucket=bucket_name)
        except Exception as exc:
            response = getattr(exc, 'response', None) or {}
            code = response.get('Error', {}).get('Code')
            if code == 'ServerSideEncryptionConfigurationNotFoundError':
                return False
            print(f"S3 encryption lookup failed for {bucket_name}: {exc}")
            return None
        return True

    def discover_software_assets(self) -> "List[Asset]":
        """Discover software dependencies from package managers (npm, pip).

        Each package manager is queried independently; a failure of one is
        printed and does not prevent the other from being inventoried.

        Returns:
            The discovered packages, which are also stored in ``self.assets``.
        """
        assets = []
        # NPM packages
        try:
            result = subprocess.run(
                ['npm', 'list', '--json', '--all'],
                capture_output=True,
                text=True
            )
            npm_deps = json.loads(result.stdout)
            assets.extend(self._parse_npm_deps(npm_deps.get('dependencies', {})))
        except Exception as e:
            print(f"NPM discovery failed: {e}")
        # Python packages
        try:
            result = subprocess.run(
                ['pip', 'list', '--format=json'],
                capture_output=True,
                text=True
            )
            for pkg in json.loads(result.stdout):
                now = datetime.utcnow()
                assets.append(Asset(
                    asset_id=f"pip:{pkg['name']}:{pkg['version']}",
                    asset_type='software',
                    name=pkg['name'],
                    owner='development-team',
                    criticality='medium',
                    classification='internal',
                    location='application',
                    discovered_at=now,
                    last_seen=now,
                    attributes={
                        'version': pkg['version'],
                        'package_manager': 'pip'
                    }
                ))
        except Exception as e:
            print(f"Pip discovery failed: {e}")
        self._register(assets)
        return assets

    def _parse_npm_deps(self, deps: Dict, depth: int = 0) -> "List[Asset]":
        """Flatten an ``npm list --json`` dependency tree into assets.

        Args:
            deps: Mapping of package name to its npm info dict.
            depth: Nesting level; 0 means a direct dependency.
        """
        assets = []
        for name, info in deps.items():
            version = info.get('version', 'unknown')
            now = datetime.utcnow()
            assets.append(Asset(
                asset_id=f"npm:{name}:{version}",
                asset_type='software',
                name=name,
                owner='development-team',
                # Direct dependencies are rated higher than transitive ones
                criticality='medium' if depth == 0 else 'low',
                classification='internal',
                location='application',
                discovered_at=now,
                last_seen=now,
                attributes={
                    'version': version,
                    'package_manager': 'npm',
                    'depth': depth,
                    'direct_dependency': depth == 0
                }
            ))
            # Recurse for transitive dependencies
            if 'dependencies' in info:
                assets.extend(self._parse_npm_deps(info['dependencies'], depth + 1))
        return assets

    def _register(self, assets) -> None:
        """Store assets in the inventory, keeping the first discovery time.

        When an asset ID is already known, the new record replaces the old
        one but inherits its ``discovered_at``.
        """
        for asset in assets:
            known = self.assets.get(asset.asset_id)
            if known is not None:
                asset.discovered_at = known.discovered_at
            self.assets[asset.asset_id] = asset

    def generate_inventory_report(self) -> Dict:
        """Generate a report over every asset currently in the inventory.

        Returns:
            Dict with the report date, total count, counts grouped by type,
            criticality and classification, and a short listing per asset.
        """
        all_assets = list(self.assets.values())
        return {
            "report_date": datetime.utcnow().isoformat(),
            "total_assets": len(all_assets),
            "by_type": self._group_by(all_assets, 'asset_type'),
            "by_criticality": self._group_by(all_assets, 'criticality'),
            "by_classification": self._group_by(all_assets, 'classification'),
            "assets": [
                {
                    "id": a.asset_id,
                    "name": a.name,
                    "type": a.asset_type,
                    "criticality": a.criticality
                }
                for a in all_assets
            ]
        }

    def _group_by(self, assets: "List[Asset]", field: str) -> Dict[str, int]:
        """Count assets per distinct value of the attribute named ``field``."""
        groups = {}
        for asset in assets:
            value = getattr(asset, field)
            groups[value] = groups.get(value, 0) + 1
        return groups


# Section heading (kept from the original line): "Implementarea Functiei PROTECT"
Implementarea Controlului Accesului
# access_control.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from enum import Enum
from datetime import datetime, timedelta
import hashlib
import secrets
class Permission(Enum):
    """Actions that a role can grant on a resource."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
@dataclass
class Role:
    """A named bundle of permissions scoped to resource patterns."""

    role_id: str
    name: str
    permissions: Set[Permission]
    resources: List[str]  # Resource patterns
    conditions: Dict  # Additional access conditions
@dataclass
class User:
    """An account known to the access control system."""

    user_id: str
    email: str
    roles: List[str]  # IDs of the roles assigned to this user
    attributes: Dict  # For ABAC
    mfa_enabled: bool
    last_login: Optional[datetime]
    password_changed: Optional[datetime]
class AccessControlSystem:
    """Role-based and attribute-based access control.

    Access is granted when at least one of the user's roles carries the
    requested permission, matches the resource, and has all of its
    conditions satisfied. Every decision is appended to ``access_logs``.
    """

    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
        self.access_logs = []

    def create_role(self, role: "Role"):
        """Register a role, replacing any existing role with the same ID."""
        self.roles[role.role_id] = role

    def assign_role(self, user_id: str, role_id: str):
        """Assign a role to a user (no-op if the user already has it).

        Raises:
            ValueError: If the user or the role does not exist.
        """
        if user_id not in self.users:
            raise ValueError(f"User not found: {user_id}")
        if role_id not in self.roles:
            raise ValueError(f"Role not found: {role_id}")
        user = self.users[user_id]
        if role_id not in user.roles:
            user.roles.append(role_id)

    def check_access(
        self,
        user_id: str,
        resource: str,
        permission: "Permission",
        context: Optional[Dict] = None
    ) -> bool:
        """Decide whether a user may perform ``permission`` on ``resource``.

        Args:
            user_id: ID of the requesting user.
            resource: Resource identifier matched against role patterns.
            permission: Requested permission.
            context: Request attributes used by role conditions, e.g.
                ``{"ip": "10.1.2.3", "mfa_verified": True}``.

        Returns:
            True if any role grants access, False otherwise (including for
            unknown users).
        """
        if user_id not in self.users:
            self._log_access(user_id, resource, permission, False, "User not found")
            return False
        user = self.users[user_id]
        context = context or {}
        for role_id in user.roles:
            role = self.roles.get(role_id)
            if not role:
                # Stale role reference on the user: skip it
                continue
            if permission not in role.permissions:
                continue
            if not self._matches_resource(resource, role.resources):
                continue
            # Check conditions (ABAC)
            if not self._evaluate_conditions(role.conditions, user, context):
                continue
            self._log_access(user_id, resource, permission, True, f"Role: {role_id}")
            return True
        self._log_access(user_id, resource, permission, False, "No matching role")
        return False

    def _matches_resource(self, resource: str, patterns: List[str]) -> bool:
        """Return True if ``resource`` matches any shell-style pattern.

        ``fnmatchcase`` is used so matching is case-sensitive on every
        platform; plain ``fnmatch`` folds case (and slashes) on Windows,
        which would make authorization depend on the host OS.
        """
        import fnmatch
        return any(fnmatch.fnmatchcase(resource, pattern) for pattern in patterns)

    def _evaluate_conditions(
        self,
        conditions: Dict,
        user: "User",
        context: Dict
    ) -> bool:
        """Evaluate a role's ABAC conditions; all of them must hold.

        Supported keys: "mfa_required", "ip_whitelist" (addresses and/or
        CIDR ranges), "time_range" ({"start": "HH:MM", "end": "HH:MM"},
        compared against the current UTC time) and "user.<attribute>".
        """
        for condition_key, expected_value in conditions.items():
            if condition_key == "mfa_required" and expected_value:
                if not user.mfa_enabled or not context.get("mfa_verified"):
                    return False
            elif condition_key == "ip_whitelist":
                if not self._ip_allowed(context.get("ip"), expected_value):
                    return False
            elif condition_key == "time_range":
                start = datetime.strptime(expected_value["start"], "%H:%M").time()
                end = datetime.strptime(expected_value["end"], "%H:%M").time()
                if not self._time_in_range(datetime.utcnow().time(), start, end):
                    return False
            elif condition_key.startswith("user."):
                attr = condition_key[5:]
                if user.attributes.get(attr) != expected_value:
                    return False
            # NOTE(review): unrecognized condition keys are ignored, i.e. they
            # do not restrict access. Consider failing closed instead.
        return True

    def _ip_allowed(self, ip, allowed) -> bool:
        """Return True if ``ip`` is covered by an entry of ``allowed``.

        Entries may be single addresses or CIDR ranges such as "10.0.0.0/8".
        A plain membership test would never match a client address against
        a CIDR entry. Values that cannot be parsed fall back to exact
        comparison.
        """
        import ipaddress
        if ip is None:
            return False
        try:
            address = ipaddress.ip_address(ip)
        except ValueError:
            return ip in allowed
        for entry in allowed:
            try:
                if address in ipaddress.ip_network(entry, strict=False):
                    return True
            except (TypeError, ValueError):
                if entry == ip:
                    return True
        return False

    @staticmethod
    def _time_in_range(now, start, end) -> bool:
        """Return True if ``now`` lies within [start, end], inclusive.

        A window whose start is later than its end (e.g. 22:00-06:00) is
        treated as crossing midnight.
        """
        if start <= end:
            return start <= now <= end
        return now >= start or now <= end

    def _log_access(
        self,
        user_id: str,
        resource: str,
        permission: "Permission",
        granted: bool,
        reason: str
    ):
        """Append one access decision to the in-memory audit log."""
        self.access_logs.append({
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "resource": resource,
            "permission": permission.value,
            "granted": granted,
            "reason": reason
        })
# NIST-compliant role definitions
def create_nist_compliant_roles() -> List[Role]:
    """Build the default role set, following the least-privilege principle.

    Returns:
        The viewer, operator, admin and security analyst roles, in that order.
    """
    viewer = Role(
        role_id="viewer",
        name="Read-Only Viewer",
        permissions={Permission.READ},
        resources=["*"],
        conditions={},
    )
    # Operators may write, but only with MFA and inside the allowed hours
    operator = Role(
        role_id="operator",
        name="System Operator",
        permissions={Permission.READ, Permission.WRITE},
        resources=["systems/*", "logs/*"],
        conditions={
            "mfa_required": True,
            "time_range": {"start": "06:00", "end": "22:00"},
        },
    )
    # Full access is restricted to MFA-verified requests from private ranges
    admin = Role(
        role_id="admin",
        name="System Administrator",
        permissions={Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
        resources=["*"],
        conditions={
            "mfa_required": True,
            "ip_whitelist": ["10.0.0.0/8", "192.168.0.0/16"],
        },
    )
    security_analyst = Role(
        role_id="security_analyst",
        name="Security Analyst",
        permissions={Permission.READ},
        resources=["security/*", "logs/*", "alerts/*"],
        conditions={"mfa_required": True},
    )
    return [viewer, operator, admin, security_analyst]


# Section heading (kept from the original line): "Controale de Protectie a Datelor"
# data_protection.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import base64
import os
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class DataClassification:
    """Handling requirements attached to one data classification level."""

    level: str  # public, internal, confidential, restricted
    retention_days: int
    encryption_required: bool
    access_logging: bool
    dlp_enabled: bool
class DataProtection:
    """Data protection controls for NIST PR.DS.

    Provides classification-driven encryption (Fernet, with one key derived
    per context from the master key) and a regex-based DLP scan.
    """

    CLASSIFICATIONS = {
        "public": DataClassification(
            level="public",
            retention_days=365,
            encryption_required=False,
            access_logging=False,
            dlp_enabled=False
        ),
        "internal": DataClassification(
            level="internal",
            retention_days=730,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=False
        ),
        "confidential": DataClassification(
            level="confidential",
            retention_days=2555,  # 7 years
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        ),
        "restricted": DataClassification(
            level="restricted",
            retention_days=2555,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        )
    }

    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self._key_cache = {}  # context -> derived Fernet key

    def encrypt_data(
        self,
        data: bytes,
        classification: str,
        context: str
    ) -> Dict:
        """Encrypt data if its classification requires it.

        Args:
            data: Plaintext bytes.
            classification: Key of ``CLASSIFICATIONS``.
            context: Key-derivation context; the same context must be used
                to decrypt.

        Returns:
            An envelope dict. For classifications without an encryption
            requirement it is ``{"encrypted": False, "data": data}``.

        Raises:
            ValueError: If the classification is unknown.
        """
        config = self.CLASSIFICATIONS.get(classification)
        if not config:
            raise ValueError(f"Unknown classification: {classification}")
        if not config.encryption_required:
            return {"encrypted": False, "data": data}
        # Derive context-specific key
        token = Fernet(self._derive_key(context)).encrypt(data)
        return {
            "encrypted": True,
            "data": token,
            "context": context,
            "classification": classification,
            # Fernet is AES-128 in CBC mode authenticated with HMAC-SHA256;
            # the previous "AES-256-GCM" label misdescribed the ciphertext.
            "algorithm": "Fernet (AES-128-CBC + HMAC-SHA256)"
        }

    def decrypt_data(
        self,
        encrypted_data: Dict
    ) -> bytes:
        """Decrypt an envelope produced by ``encrypt_data``.

        Envelopes flagged as not encrypted are returned as-is.
        """
        if not encrypted_data.get("encrypted"):
            return encrypted_data["data"]
        key = self._derive_key(encrypted_data["context"])
        return Fernet(key).decrypt(encrypted_data["data"])

    def _derive_key(self, context: str) -> bytes:
        """Derive (and cache) the Fernet key for a context.

        The key is PBKDF2-HMAC-SHA256 over the master key with the context
        as salt, so a given context always yields the same key.
        """
        if context in self._key_cache:
            return self._key_cache[context]
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=context.encode(),
            iterations=100000,
            backend=default_backend()
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        self._key_cache[context] = key
        return key

    def scan_for_sensitive_data(self, content: str) -> Dict:
        """DLP scan for sensitive data patterns.

        Returns:
            Dict with "has_sensitive_data", "findings" (per pattern name: the
            match count and up to three masked samples) and "risk_level".
        """
        import re
        # name -> (pattern, flags). Only the api_key pattern needs case
        # folding; applying it to aws_key made ordinary lowercase text match.
        patterns = {
            "credit_card": (r"\b(?:\d{4}[-\s]?){3}\d{4}\b", 0),
            "ssn": (r"\b\d{3}-\d{2}-\d{4}\b", 0),
            # TLD class is [A-Za-z]; the former [A-Z|a-z] also accepted "|"
            "email": (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", 0),
            "phone": (r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b", 0),
            "api_key": (
                r"\b(?:api[_-]?key|apikey|api[_-]?secret)[\"']?\s*[:=]\s*[\"']?[\w-]{20,}",
                re.IGNORECASE,
            ),
            "aws_key": (r"(?:AKIA|ABIA|ACCA|ASIA)[0-9A-Z]{16}", 0),
        }
        findings = {}
        for name, (pattern, flags) in patterns.items():
            matches = re.findall(pattern, content, flags)
            if matches:
                findings[name] = {
                    "count": len(matches),
                    "samples": [self._mask_finding(m) for m in matches[:3]]
                }
        return {
            "has_sensitive_data": len(findings) > 0,
            "findings": findings,
            "risk_level": self._calculate_risk(findings)
        }

    def _mask_finding(self, value: str) -> str:
        """Mask a matched value for logging, keeping at most 4 chars per end."""
        if len(value) <= 8:
            return "*" * len(value)
        return value[:4] + "*" * (len(value) - 8) + value[-4:]

    def _calculate_risk(self, findings: Dict) -> str:
        """Map findings to "high", "medium" or "low" risk."""
        high_risk = {"credit_card", "ssn", "api_key", "aws_key"}
        if any(f in findings for f in high_risk):
            return "high"
        elif findings:
            return "medium"
        return "low"


# Section heading (kept from the original line): "Implementarea Functiei DETECT"
Monitorizare Continua
# continuous_monitoring.py
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime, timedelta
import asyncio
@dataclass
class MonitoringRule:
    """A scheduled check and the severity of the alerts it raises."""

    rule_id: str
    name: str
    description: str
    check_function: Callable  # Invoked without arguments on every run
    interval_seconds: int
    severity: str
    enabled: bool = True
@dataclass
class MonitoringAlert:
    """An alert raised by a monitoring rule."""

    alert_id: str
    rule_id: str  # ID of the rule that produced the alert
    timestamp: datetime
    severity: str
    message: str
    details: Dict
class ContinuousMonitoring:
    """NIST DE.CM continuous monitoring implementation.

    Runs each enabled rule on its own interval, collects the alerts the
    rules raise and keeps the latest check result per rule in ``metrics``.
    """

    def __init__(self):
        self.rules: Dict[str, MonitoringRule] = {}
        self.alerts: List[MonitoringAlert] = []
        self.metrics = {}

    def register_rule(self, rule: "MonitoringRule"):
        """Register a rule, replacing any rule with the same ID."""
        self.rules[rule.rule_id] = rule

    async def start_monitoring(self):
        """Run every currently enabled rule concurrently; does not return."""
        tasks = [
            self._monitor_rule(rule)
            for rule in self.rules.values()
            if rule.enabled
        ]
        await asyncio.gather(*tasks)

    async def _monitor_rule(self, rule: "MonitoringRule"):
        """Run one rule forever, sleeping ``interval_seconds`` between checks."""
        while True:
            await self._run_check_once(rule)
            await asyncio.sleep(rule.interval_seconds)

    async def _run_check_once(self, rule: "MonitoringRule"):
        """Execute a rule's check once and record alert and metrics.

        The check function may be synchronous or asynchronous. It must
        return a dict; "alert", "message", "details", "status" and "value"
        are the keys read from it. Any exception is recorded as an "error"
        metric instead of stopping the monitoring loop.
        """
        import inspect
        try:
            result = rule.check_function()
            if inspect.isawaitable(result):
                result = await result
            if result.get("alert"):
                # One clock read, so the alert ID and timestamp agree
                raised_at = datetime.utcnow()
                alert = MonitoringAlert(
                    alert_id=f"{rule.rule_id}-{raised_at.timestamp()}",
                    rule_id=rule.rule_id,
                    timestamp=raised_at,
                    severity=rule.severity,
                    message=result.get("message", "Alert triggered"),
                    details=result.get("details", {})
                )
                self.alerts.append(alert)
                await self._send_alert(alert)
            # Update metrics
            self.metrics[rule.rule_id] = {
                "last_check": datetime.utcnow().isoformat(),
                "status": result.get("status", "unknown"),
                "value": result.get("value")
            }
        except Exception as e:
            self.metrics[rule.rule_id] = {
                "last_check": datetime.utcnow().isoformat(),
                "status": "error",
                "error": str(e)
            }

    async def _send_alert(self, alert: "MonitoringAlert"):
        """Send alert to notification channels (placeholder: does nothing)."""
        # Implementation depends on alerting infrastructure
        pass
# Example monitoring rules
async def check_failed_logins() -> Dict:
    """Check for excessive failed logins.

    Returns an alert dict when the count exceeds the threshold, otherwise an
    "ok" status carrying the count.
    """
    threshold = 10
    # Placeholder value: a real implementation would query the
    # authentication logs here.
    failed_count = 0
    if not failed_count > threshold:
        return {"status": "ok", "value": failed_count}
    return {
        "alert": True,
        "message": f"High number of failed logins: {failed_count}",
        "details": {"count": failed_count, "threshold": threshold}
    }
async def check_unauthorized_access() -> Dict:
    """Check for unauthorized access attempts.

    Returns an alert dict when the number of denied requests exceeds the
    threshold, otherwise an "ok" status carrying the count.
    """
    threshold = 5
    # Placeholder value: a real implementation would query the
    # authorization logs for denied access here.
    denied_count = 0
    if not denied_count > threshold:
        return {"status": "ok", "value": denied_count}
    return {
        "alert": True,
        "message": f"Multiple unauthorized access attempts: {denied_count}",
        "details": {"count": denied_count}
    }
async def check_security_group_changes() -> Dict:
    """Check for security group modifications.

    Placeholder: always reports "ok" with a value of 0. A real
    implementation would query CloudTrail for security group changes.
    The unused function-scope ``import boto3`` was dropped so the stub no
    longer raises ImportError where boto3 is not installed.
    """
    return {"status": "ok", "value": 0}


# Section heading (kept from the original line): "Functiile RESPOND si RECOVER"
Automatizarea Raspunsului la Incidente
# incident_response.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import uuid
class IncidentSeverity(Enum):
    """Incident severity levels; a lower number is more severe."""

    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
class IncidentStatus(Enum):
    """Lifecycle stages of an incident, from detection to closure."""

    DETECTED = "detected"
    TRIAGED = "triaged"
    CONTAINED = "contained"
    ERADICATED = "eradicated"
    RECOVERED = "recovered"
    CLOSED = "closed"
@dataclass
class Incident:
    """A security incident and the record of actions taken on it."""

    incident_id: str
    title: str
    description: str
    severity: IncidentSeverity
    status: IncidentStatus
    detected_at: datetime
    affected_systems: List[str]
    indicators: List[Dict]
    # Chronological list of action entries appended by the response manager
    timeline: List[Dict] = field(default_factory=list)
    assigned_to: Optional[str] = None
    resolved_at: Optional[datetime] = None
class IncidentResponseManager:
    """NIST RS and RC incident response implementation.

    Tracks incidents in memory, records every action on the incident's
    timeline and executes playbooks stored in ``self.playbooks`` (a mapping
    of playbook name to ``{"steps": [{"name": ..., "type": ...}, ...]}``).
    """

    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.playbooks = {}

    def create_incident(
        self,
        title: str,
        description: str,
        severity: "IncidentSeverity",
        affected_systems: List[str],
        indicators: List[Dict]
    ) -> "Incident":
        """Create and register a new incident in the DETECTED state.

        Critical and high severity incidents are escalated immediately.

        Returns:
            The newly created incident.
        """
        incident = Incident(
            incident_id=str(uuid.uuid4()),
            title=title,
            description=description,
            severity=severity,
            status=IncidentStatus.DETECTED,
            detected_at=datetime.utcnow(),
            # Copies, so later changes to the caller's lists do not alter
            # the incident record
            affected_systems=list(affected_systems),
            indicators=list(indicators)
        )
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "incident_created",
            "details": {"severity": severity.name}
        })
        self.incidents[incident.incident_id] = incident
        if severity in (IncidentSeverity.CRITICAL, IncidentSeverity.HIGH):
            self._escalate(incident)
        return incident

    def update_status(
        self,
        incident_id: str,
        new_status: "IncidentStatus",
        notes: Optional[str] = None
    ):
        """Change an incident's status and record the transition.

        Moving to CLOSED also sets ``resolved_at``.

        Raises:
            ValueError: If the incident does not exist.
        """
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident not found: {incident_id}")
        old_status = incident.status
        incident.status = new_status
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "status_change",
            "details": {
                "from": old_status.value,
                "to": new_status.value,
                "notes": notes
            }
        })
        if new_status == IncidentStatus.CLOSED:
            incident.resolved_at = datetime.utcnow()

    def execute_playbook(
        self,
        incident_id: str,
        playbook_name: str
    ) -> List[Dict]:
        """Execute every step of a playbook against an incident.

        Each step's result is appended to the incident timeline. A step
        without a "name" is logged under its "type", so a sparsely defined
        step cannot raise after its action has already run.

        Returns:
            The per-step result dicts, in execution order.

        Raises:
            ValueError: If the incident or the playbook does not exist.
        """
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident not found: {incident_id}")
        playbook = self.playbooks.get(playbook_name)
        if not playbook:
            raise ValueError(f"Playbook not found: {playbook_name}")
        results = []
        for step in playbook["steps"]:
            result = self._execute_step(incident, step)
            results.append(result)
            incident.timeline.append({
                "timestamp": datetime.utcnow().isoformat(),
                "action": "playbook_step",
                "details": {
                    "playbook": playbook_name,
                    "step": step.get("name", step.get("type")),
                    "result": result
                }
            })
        return results

    def _execute_step(self, incident: "Incident", step: Dict) -> Dict:
        """Dispatch one playbook step by its "type".

        Unknown or missing types yield ``{"status": "unknown_step_type"}``.
        """
        handlers = {
            "isolate_system": lambda: self._isolate_systems(incident.affected_systems),
            "block_indicators": lambda: self._block_indicators(incident.indicators),
            "collect_evidence": lambda: self._collect_evidence(incident),
            "notify_stakeholders": lambda: self._notify_stakeholders(
                incident, step.get("stakeholders", [])
            ),
        }
        handler = handlers.get(step.get("type"))
        if handler is None:
            return {"status": "unknown_step_type"}
        return handler()

    def _isolate_systems(self, systems: List[str]) -> Dict:
        """Isolate affected systems (placeholder: reports success only)."""
        # Implementation would call network/EDR APIs
        return {
            "status": "success",
            "isolated_systems": systems
        }

    def _block_indicators(self, indicators: List[Dict]) -> Dict:
        """Block malicious indicators (placeholder: only counts them)."""
        # Implementation would block IPs, domains, hashes, etc.
        blocked = list(indicators)
        return {
            "status": "success",
            "blocked_indicators": len(blocked)
        }

    def _collect_evidence(self, incident: "Incident") -> Dict:
        """Collect forensic evidence (placeholder: one record per system)."""
        # Implementation would collect logs, memory dumps, etc.
        evidence = [
            {
                "system": system,
                "collected_at": datetime.utcnow().isoformat(),
                "types": ["logs", "network_captures"]
            }
            for system in incident.affected_systems
        ]
        return {
            "status": "success",
            "evidence_collected": len(evidence)
        }

    def _notify_stakeholders(
        self,
        incident: "Incident",
        stakeholders: List[str]
    ) -> Dict:
        """Notify relevant stakeholders (placeholder: reports success only)."""
        return {
            "status": "success",
            "notified": stakeholders
        }

    def _escalate(self, incident: "Incident"):
        """Record an escalation entry on the incident timeline."""
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "escalated",
            "details": {"reason": f"Severity: {incident.severity.name}"}
        })


# Section heading (kept from the original line): "Rezumat"
Implementarea NIST CSF necesita acoperire sistematica a tuturor functiilor de baza:
- GOVERN: Stabileste guvernanta securitatii cibernetice si managementul riscurilor
- IDENTIFY: Mentine inventarul activelor si intelege riscurile
- PROTECT: Implementeaza controale de acces si protectia datelor
- DETECT: Desfasoara monitorizarea continua si detectia anomaliilor
- RESPOND: Stabileste capabilitati de raspuns la incidente
- RECOVER: Planifica si executa proceduri de recuperare
Foloseste aceste implementari tehnice ca fundatie, personalizandu-le pentru profilul de risc si cerintele de conformitate specifice organizatiei tale.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →