Compliance

NIST Cybersecurity Framework: An Implementation Guide

Nicu Constantin
15 min read
#NIST CSF#cybersecurity#compliance#risk management#security framework

The NIST Cybersecurity Framework provides a comprehensive approach to managing cybersecurity risk. This guide covers the technical implementation of CSF 2.0 across all six core functions, with practical code examples and automation strategies.

Framework Overview

The CSF 2.0 Core Functions

# nist_csf_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime
 
class CSFFunction(Enum):
    GOVERN = "GV"  # New in CSF 2.0
    IDENTIFY = "ID"
    PROTECT = "PR"
    DETECT = "DE"
    RESPOND = "RS"
    RECOVER = "RC"
 
class ImplementationTier(Enum):
    PARTIAL = 1
    RISK_INFORMED = 2
    REPEATABLE = 3
    ADAPTIVE = 4
 
@dataclass
class Control:
    control_id: str
    function: CSFFunction
    category: str
    subcategory: str
    description: str
    implementation_status: str
    evidence: List[str] = field(default_factory=list)
    gaps: List[str] = field(default_factory=list)
    remediation_plan: Optional[str] = None
 
@dataclass
class CSFProfile:
    profile_id: str
    name: str
    description: str
    target_tier: ImplementationTier
    controls: List[Control]
    created_at: datetime
    last_assessed: Optional[datetime] = None
 
class CSFAssessment:
    """NIST CSF compliance assessment engine"""
 
    def __init__(self):
        self.controls = self._load_controls()
        self.current_profile: Optional[CSFProfile] = None
 
    def _load_controls(self) -> Dict[str, Control]:
        """Load CSF control catalog"""
        # Abbreviated control set - full implementation would include all 106 subcategories
        controls = {
            # GOVERN function (new in 2.0)
            "GV.OC-01": Control(
                control_id="GV.OC-01",
                function=CSFFunction.GOVERN,
                category="Organizational Context",
                subcategory="GV.OC-01",
                description="The organizational mission is understood and informs cybersecurity risk management",
                implementation_status="not_started",
                evidence=[]
            ),
 
            # IDENTIFY function
            "ID.AM-01": Control(
                control_id="ID.AM-01",
                function=CSFFunction.IDENTIFY,
                category="Asset Management",
                subcategory="ID.AM-01",
                description="Inventories of hardware managed by the organization are maintained",
                implementation_status="not_started",
                evidence=[]
            ),
            "ID.AM-02": Control(
                control_id="ID.AM-02",
                function=CSFFunction.IDENTIFY,
                category="Asset Management",
                subcategory="ID.AM-02",
                description="Inventories of software, services, and systems managed by the organization are maintained",
                implementation_status="not_started",
                evidence=[]
            ),
            "ID.RA-01": Control(
                control_id="ID.RA-01",
                function=CSFFunction.IDENTIFY,
                category="Risk Assessment",
                subcategory="ID.RA-01",
                description="Vulnerabilities in assets are identified, validated, and recorded",
                implementation_status="not_started",
                evidence=[]
            ),
 
            # PROTECT function
            "PR.AA-01": Control(
                control_id="PR.AA-01",
                function=CSFFunction.PROTECT,
                category="Identity Management and Access Control",
                subcategory="PR.AA-01",
                description="Identities and credentials for authorized users, services, and hardware are managed",
                implementation_status="not_started",
                evidence=[]
            ),
            "PR.DS-01": Control(
                control_id="PR.DS-01",
                function=CSFFunction.PROTECT,
                category="Data Security",
                subcategory="PR.DS-01",
                description="Data-at-rest is protected",
                implementation_status="not_started",
                evidence=[]
            ),
 
            # DETECT function
            "DE.CM-01": Control(
                control_id="DE.CM-01",
                function=CSFFunction.DETECT,
                category="Continuous Monitoring",
                subcategory="DE.CM-01",
                description="Networks and network services are monitored to find potentially adverse events",
                implementation_status="not_started",
                evidence=[]
            ),
 
            # RESPOND function
            "RS.MA-01": Control(
                control_id="RS.MA-01",
                function=CSFFunction.RESPOND,
                category="Incident Management",
                subcategory="RS.MA-01",
                description="The incident response plan is executed in coordination with relevant third parties",
                implementation_status="not_started",
                evidence=[]
            ),
 
            # RECOVER function
            "RC.RP-01": Control(
                control_id="RC.RP-01",
                function=CSFFunction.RECOVER,
                category="Incident Recovery Plan Execution",
                subcategory="RC.RP-01",
                description="The recovery portion of the incident response plan is executed",
                implementation_status="not_started",
                evidence=[]
            )
        }
        return controls
 
    def assess_control(
        self,
        control_id: str,
        status: str,
        evidence: List[str],
        gaps: Optional[List[str]] = None
    ):
        """Assess a specific control"""
        if control_id not in self.controls:
            raise ValueError(f"Unknown control: {control_id}")
 
        control = self.controls[control_id]
        control.implementation_status = status
        control.evidence = evidence
        control.gaps = gaps or []
 
    def generate_gap_analysis(self) -> Dict:
        """Generate gap analysis report"""
        by_function = {}
 
        for control in self.controls.values():
            func = control.function.value
            if func not in by_function:
                by_function[func] = {
                    "total": 0,
                    "implemented": 0,
                    "partial": 0,
                    "not_implemented": 0,
                    "gaps": []
                }
 
            by_function[func]["total"] += 1
 
            if control.implementation_status == "implemented":
                by_function[func]["implemented"] += 1
            elif control.implementation_status == "partial":
                by_function[func]["partial"] += 1
            else:
                by_function[func]["not_implemented"] += 1
                by_function[func]["gaps"].extend(control.gaps)
 
        return {
            "assessment_date": datetime.utcnow().isoformat(),
            "by_function": by_function,
            "overall_score": self._calculate_score()
        }
 
    def _calculate_score(self) -> float:
        """Calculate overall compliance score"""
        total = len(self.controls)
        implemented = sum(
            1 for c in self.controls.values()
            if c.implementation_status == "implemented"
        )
        partial = sum(
            1 for c in self.controls.values()
            if c.implementation_status == "partial"
        )
 
        return (implemented + partial * 0.5) / total * 100
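The scoring logic above reduces to a weighted ratio: fully implemented controls count 1.0, partial ones 0.5. A minimal standalone sketch of that calculation (with hypothetical status values) looks like this:

```python
# Minimal sketch of the gap-analysis scoring above: implemented = 1.0,
# partial = 0.5, anything else = 0. Statuses here are hypothetical.
def compliance_score(statuses):
    weights = {"implemented": 1.0, "partial": 0.5}
    if not statuses:
        return 0.0
    return sum(weights.get(s, 0.0) for s in statuses) / len(statuses) * 100

score = compliance_score(["implemented", "partial", "not_started", "implemented"])
print(round(score, 1))  # 62.5
```

With two implemented, one partial, and one unstarted control, the organization scores (1 + 0.5 + 0 + 1) / 4 = 62.5%.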

Implementing the IDENTIFY Function

Automating the Asset Inventory

# asset_inventory.py
import subprocess
import json
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
import boto3
 
@dataclass
class Asset:
    asset_id: str
    asset_type: str  # hardware, software, data, service
    name: str
    owner: str
    criticality: str  # critical, high, medium, low
    classification: str  # public, internal, confidential, restricted
    location: str
    discovered_at: datetime
    last_seen: datetime
    attributes: Dict
 
class AssetInventory:
    """Automated asset discovery and inventory management"""
 
    def __init__(self, config: Dict):
        self.config = config
        self.assets = {}
 
    def discover_aws_assets(self) -> List[Asset]:
        """Discover AWS infrastructure assets"""
        assets = []
 
        # EC2 instances
        ec2 = boto3.client('ec2')
        instances = ec2.describe_instances()
 
        for reservation in instances['Reservations']:
            for instance in reservation['Instances']:
                tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
 
                asset = Asset(
                    asset_id=instance['InstanceId'],
                    asset_type='hardware',
                    name=tags.get('Name', instance['InstanceId']),
                    owner=tags.get('Owner', 'unknown'),
                    criticality=tags.get('Criticality', 'medium'),
                    classification=tags.get('Classification', 'internal'),
                    location=instance['Placement']['AvailabilityZone'],
                    discovered_at=datetime.utcnow(),
                    last_seen=datetime.utcnow(),
                    attributes={
                        'instance_type': instance['InstanceType'],
                        'state': instance['State']['Name'],
                        'private_ip': instance.get('PrivateIpAddress'),
                        'public_ip': instance.get('PublicIpAddress'),
                        'vpc_id': instance.get('VpcId'),
                        'security_groups': [
                            sg['GroupId'] for sg in instance.get('SecurityGroups', [])
                        ]
                    }
                )
                assets.append(asset)
 
        # RDS databases
        rds = boto3.client('rds')
        databases = rds.describe_db_instances()
 
        for db in databases['DBInstances']:
            asset = Asset(
                asset_id=db['DBInstanceIdentifier'],
                asset_type='data',
                name=db['DBInstanceIdentifier'],
                owner='database-team',
                criticality='critical',
                classification='confidential',
                location=db['AvailabilityZone'],
                discovered_at=datetime.utcnow(),
                last_seen=datetime.utcnow(),
                attributes={
                    'engine': db['Engine'],
                    'engine_version': db['EngineVersion'],
                    'instance_class': db['DBInstanceClass'],
                    'storage_encrypted': db['StorageEncrypted'],
                    'multi_az': db['MultiAZ']
                }
            )
            assets.append(asset)
 
        # S3 buckets
        s3 = boto3.client('s3')
        buckets = s3.list_buckets()
 
        for bucket in buckets['Buckets']:
            # Get bucket encryption status; the API raises ClientError
            # when the bucket has no encryption configuration
            try:
                s3.get_bucket_encryption(Bucket=bucket['Name'])
                encrypted = True
            except s3.exceptions.ClientError:
                encrypted = False
 
            asset = Asset(
                asset_id=bucket['Name'],
                asset_type='data',
                name=bucket['Name'],
                owner='unknown',
                criticality='high',
                classification='internal',
                location='global',
                discovered_at=datetime.utcnow(),
                last_seen=datetime.utcnow(),
                attributes={
                    'creation_date': bucket['CreationDate'].isoformat(),
                    'encrypted': encrypted
                }
            )
            assets.append(asset)
 
        return assets
 
    def discover_software_assets(self) -> List[Asset]:
        """Discover software dependencies from package managers"""
        assets = []
 
        # NPM packages
        try:
            result = subprocess.run(
                ['npm', 'list', '--json', '--all'],
                capture_output=True,
                text=True
            )
            npm_deps = json.loads(result.stdout)
            assets.extend(self._parse_npm_deps(npm_deps.get('dependencies', {})))
        except Exception as e:
            print(f"NPM discovery failed: {e}")
 
        # Python packages
        try:
            result = subprocess.run(
                ['pip', 'list', '--format=json'],
                capture_output=True,
                text=True
            )
            pip_deps = json.loads(result.stdout)
            for pkg in pip_deps:
                asset = Asset(
                    asset_id=f"pip:{pkg['name']}:{pkg['version']}",
                    asset_type='software',
                    name=pkg['name'],
                    owner='development-team',
                    criticality='medium',
                    classification='internal',
                    location='application',
                    discovered_at=datetime.utcnow(),
                    last_seen=datetime.utcnow(),
                    attributes={
                        'version': pkg['version'],
                        'package_manager': 'pip'
                    }
                )
                assets.append(asset)
        except Exception as e:
            print(f"Pip discovery failed: {e}")
 
        return assets
 
    def _parse_npm_deps(self, deps: Dict, depth: int = 0) -> List[Asset]:
        """Parse NPM dependency tree"""
        assets = []
 
        for name, info in deps.items():
            version = info.get('version', 'unknown')
            asset = Asset(
                asset_id=f"npm:{name}:{version}",
                asset_type='software',
                name=name,
                owner='development-team',
                criticality='medium' if depth == 0 else 'low',
                classification='internal',
                location='application',
                discovered_at=datetime.utcnow(),
                last_seen=datetime.utcnow(),
                attributes={
                    'version': version,
                    'package_manager': 'npm',
                    'depth': depth,
                    'direct_dependency': depth == 0
                }
            )
            assets.append(asset)
 
            # Recurse for transitive dependencies
            if 'dependencies' in info:
                assets.extend(self._parse_npm_deps(info['dependencies'], depth + 1))
 
        return assets
 
    def generate_inventory_report(self) -> Dict:
        """Generate comprehensive inventory report"""
        all_assets = list(self.assets.values())
 
        return {
            "report_date": datetime.utcnow().isoformat(),
            "total_assets": len(all_assets),
            "by_type": self._group_by(all_assets, 'asset_type'),
            "by_criticality": self._group_by(all_assets, 'criticality'),
            "by_classification": self._group_by(all_assets, 'classification'),
            "assets": [
                {
                    "id": a.asset_id,
                    "name": a.name,
                    "type": a.asset_type,
                    "criticality": a.criticality
                }
                for a in all_assets
            ]
        }
 
    def _group_by(self, assets: List[Asset], field: str) -> Dict[str, int]:
        """Group assets by field"""
        groups = {}
        for asset in assets:
            value = getattr(asset, field)
            groups[value] = groups.get(value, 0) + 1
        return groups
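The report's `by_type` and `by_criticality` breakdowns are just frequency counts over asset attributes. A standalone sketch with a hypothetical mini-inventory shows the grouping, using `collections.Counter` instead of the manual loop:

```python
from collections import Counter

# Hypothetical mini-inventory illustrating the report grouping above.
assets = [
    {"id": "i-0abc", "type": "hardware", "criticality": "high"},
    {"id": "db-prod", "type": "data", "criticality": "critical"},
    {"id": "npm:lodash", "type": "software", "criticality": "low"},
    {"id": "logs-bucket", "type": "data", "criticality": "high"},
]

by_type = dict(Counter(a["type"] for a in assets))
by_criticality = dict(Counter(a["criticality"] for a in assets))
print(by_type)  # {'hardware': 1, 'data': 2, 'software': 1}
```

These per-dimension counts are what auditors typically ask for first: how many critical assets exist, and what share of the inventory they represent.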

Implementing the PROTECT Function

Access Control Implementation

# access_control.py
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from enum import Enum
from datetime import datetime
 
class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
 
@dataclass
class Role:
    role_id: str
    name: str
    permissions: Set[Permission]
    resources: List[str]  # Resource patterns
    conditions: Dict  # Additional access conditions
 
@dataclass
class User:
    user_id: str
    email: str
    roles: List[str]
    attributes: Dict  # For ABAC
    mfa_enabled: bool
    last_login: Optional[datetime]
    password_changed: Optional[datetime]
 
class AccessControlSystem:
    """Role-based and attribute-based access control"""
 
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.users: Dict[str, User] = {}
        self.access_logs = []
 
    def create_role(self, role: Role):
        """Create a role"""
        self.roles[role.role_id] = role
 
    def assign_role(self, user_id: str, role_id: str):
        """Assign role to user"""
        if user_id not in self.users:
            raise ValueError(f"User not found: {user_id}")
        if role_id not in self.roles:
            raise ValueError(f"Role not found: {role_id}")
 
        user = self.users[user_id]
        if role_id not in user.roles:
            user.roles.append(role_id)
 
    def check_access(
        self,
        user_id: str,
        resource: str,
        permission: Permission,
        context: Optional[Dict] = None
    ) -> bool:
        """Check if user has access to resource"""
        if user_id not in self.users:
            self._log_access(user_id, resource, permission, False, "User not found")
            return False
 
        user = self.users[user_id]
        context = context or {}
 
        # Check each role
        for role_id in user.roles:
            role = self.roles.get(role_id)
            if not role:
                continue
 
            # Check permission
            if permission not in role.permissions:
                continue
 
            # Check resource pattern
            if not self._matches_resource(resource, role.resources):
                continue
 
            # Check conditions (ABAC)
            if not self._evaluate_conditions(role.conditions, user, context):
                continue
 
            # Access granted
            self._log_access(user_id, resource, permission, True, f"Role: {role_id}")
            return True
 
        # Access denied
        self._log_access(user_id, resource, permission, False, "No matching role")
        return False
 
    def _matches_resource(self, resource: str, patterns: List[str]) -> bool:
        """Check if resource matches any pattern"""
        import fnmatch
        for pattern in patterns:
            if fnmatch.fnmatch(resource, pattern):
                return True
        return False
 
    def _evaluate_conditions(
        self,
        conditions: Dict,
        user: User,
        context: Dict
    ) -> bool:
        """Evaluate ABAC conditions"""
        for condition_key, expected_value in conditions.items():
            if condition_key == "mfa_required" and expected_value:
                if not user.mfa_enabled or not context.get("mfa_verified"):
                    return False
 
            elif condition_key == "ip_whitelist":
                # Whitelist entries are CIDR ranges, so a plain membership
                # test would never match; use ipaddress for containment.
                import ipaddress
                client_ip = context.get("ip")
                if not client_ip or not any(
                    ipaddress.ip_address(client_ip) in ipaddress.ip_network(net)
                    for net in expected_value
                ):
                    return False
 
            elif condition_key == "time_range":
                now = datetime.utcnow()
                start = datetime.strptime(expected_value["start"], "%H:%M").time()
                end = datetime.strptime(expected_value["end"], "%H:%M").time()
                if not (start <= now.time() <= end):
                    return False
 
            elif condition_key.startswith("user."):
                attr = condition_key[5:]
                if user.attributes.get(attr) != expected_value:
                    return False
 
        return True
 
    def _log_access(
        self,
        user_id: str,
        resource: str,
        permission: Permission,
        granted: bool,
        reason: str
    ):
        """Log access decision"""
        self.access_logs.append({
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "resource": resource,
            "permission": permission.value,
            "granted": granted,
            "reason": reason
        })
 
 
# NIST-compliant role definitions
def create_nist_compliant_roles() -> List[Role]:
    """Create roles following least privilege principle"""
    return [
        Role(
            role_id="viewer",
            name="Read-Only Viewer",
            permissions={Permission.READ},
            resources=["*"],
            conditions={}
        ),
        Role(
            role_id="operator",
            name="System Operator",
            permissions={Permission.READ, Permission.WRITE},
            resources=["systems/*", "logs/*"],
            conditions={
                "mfa_required": True,
                "time_range": {"start": "06:00", "end": "22:00"}
            }
        ),
        Role(
            role_id="admin",
            name="System Administrator",
            permissions={Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN},
            resources=["*"],
            conditions={
                "mfa_required": True,
                "ip_whitelist": ["10.0.0.0/8", "192.168.0.0/16"]
            }
        ),
        Role(
            role_id="security_analyst",
            name="Security Analyst",
            permissions={Permission.READ},
            resources=["security/*", "logs/*", "alerts/*"],
            conditions={
                "mfa_required": True
            }
        )
    ]
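The access decision combines three gates: permission membership, resource pattern matching, and ABAC conditions. A self-contained sketch of that sequence, using a hypothetical operator-style role and plain dicts instead of the dataclasses above:

```python
import fnmatch

# Hypothetical role and contexts exercising the RBAC+ABAC check above.
role = {
    "permissions": {"read", "write"},
    "resources": ["systems/*", "logs/*"],
    "conditions": {"mfa_required": True},
}

def check(role, permission, resource, context):
    # Gate 1: does the role grant this permission?
    if permission not in role["permissions"]:
        return False
    # Gate 2: does the resource match a granted pattern?
    if not any(fnmatch.fnmatch(resource, p) for p in role["resources"]):
        return False
    # Gate 3: are ABAC conditions satisfied in this context?
    if role["conditions"].get("mfa_required") and not context.get("mfa_verified"):
        return False
    return True

print(check(role, "write", "systems/web-01", {"mfa_verified": True}))   # True
print(check(role, "write", "systems/web-01", {}))                       # False
print(check(role, "delete", "systems/web-01", {"mfa_verified": True}))  # False
```

Ordering the gates from cheapest to most expensive keeps the hot path fast, and each denial maps to a distinct, auditable reason.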

Data Protection Controls

# data_protection.py
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
import base64
import os
from typing import Dict, Optional
from dataclasses import dataclass
 
@dataclass
class DataClassification:
    level: str  # public, internal, confidential, restricted
    retention_days: int
    encryption_required: bool
    access_logging: bool
    dlp_enabled: bool
 
class DataProtection:
    """Data protection controls for NIST PR.DS"""
 
    CLASSIFICATIONS = {
        "public": DataClassification(
            level="public",
            retention_days=365,
            encryption_required=False,
            access_logging=False,
            dlp_enabled=False
        ),
        "internal": DataClassification(
            level="internal",
            retention_days=730,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=False
        ),
        "confidential": DataClassification(
            level="confidential",
            retention_days=2555,  # 7 years
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        ),
        "restricted": DataClassification(
            level="restricted",
            retention_days=2555,
            encryption_required=True,
            access_logging=True,
            dlp_enabled=True
        )
    }
 
    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self._key_cache = {}
 
    def encrypt_data(
        self,
        data: bytes,
        classification: str,
        context: str
    ) -> Dict:
        """Encrypt data based on classification"""
        config = self.CLASSIFICATIONS.get(classification)
        if not config:
            raise ValueError(f"Unknown classification: {classification}")
 
        if not config.encryption_required:
            return {"encrypted": False, "data": data}
 
        # Derive context-specific key
        key = self._derive_key(context)
        fernet = Fernet(key)
 
        encrypted = fernet.encrypt(data)
 
        return {
            "encrypted": True,
            "data": encrypted,
            "context": context,
            "classification": classification,
            "algorithm": "Fernet (AES-128-CBC + HMAC-SHA256)"
        }
 
    def decrypt_data(
        self,
        encrypted_data: Dict
    ) -> bytes:
        """Decrypt data"""
        if not encrypted_data.get("encrypted"):
            return encrypted_data["data"]
 
        key = self._derive_key(encrypted_data["context"])
        fernet = Fernet(key)
 
        return fernet.decrypt(encrypted_data["data"])
 
    def _derive_key(self, context: str) -> bytes:
        """Derive encryption key from master key and context"""
        if context in self._key_cache:
            return self._key_cache[context]
 
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=context.encode(),
            iterations=100000,
            backend=default_backend()
        )
 
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        self._key_cache[context] = key
 
        return key
 
    def scan_for_sensitive_data(self, content: str) -> Dict:
        """DLP scan for sensitive data patterns"""
        import re
 
        patterns = {
            "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
            "phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
            "api_key": r"\b(?:api[_-]?key|apikey|api[_-]?secret)[\"']?\s*[:=]\s*[\"']?[\w-]{20,}",
            "aws_key": r"(?:AKIA|ABIA|ACCA|ASIA)[0-9A-Z]{16}"
        }
 
        findings = {}
        for name, pattern in patterns.items():
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                findings[name] = {
                    "count": len(matches),
                    "samples": [self._mask_finding(m) for m in matches[:3]]
                }
 
        return {
            "has_sensitive_data": len(findings) > 0,
            "findings": findings,
            "risk_level": self._calculate_risk(findings)
        }
 
    def _mask_finding(self, value: str) -> str:
        """Mask sensitive value for logging"""
        if len(value) <= 8:
            return "*" * len(value)
        return value[:4] + "*" * (len(value) - 8) + value[-4:]
 
    def _calculate_risk(self, findings: Dict) -> str:
        """Calculate risk level from findings"""
        high_risk = {"credit_card", "ssn", "api_key", "aws_key"}
 
        if any(f in findings for f in high_risk):
            return "high"
        elif findings:
            return "medium"
        return "low"
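The masking helper keeps the first and last four characters so findings remain traceable in logs without exposing the full value. A standalone sketch of the same logic, applied to a fabricated SSN-format string:

```python
import re

# Standalone sketch of the masking helper above, on a fabricated value.
def mask(value: str) -> str:
    # Short values are fully masked; longer ones keep 4 chars on each end.
    if len(value) <= 8:
        return "*" * len(value)
    return value[:4] + "*" * (len(value) - 8) + value[-4:]

found = re.search(r"\b\d{3}-\d{2}-\d{4}\b", "SSN on file: 123-45-6789").group()
print(mask(found))  # 123-***6789
```

Only the masked form should ever reach the DLP report; the raw match stays in memory just long enough to be redacted.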

Implementing the DETECT Function

Continuous Monitoring

# continuous_monitoring.py
from dataclasses import dataclass
from typing import Dict, List, Callable
from datetime import datetime
import asyncio
 
@dataclass
class MonitoringRule:
    rule_id: str
    name: str
    description: str
    check_function: Callable
    interval_seconds: int
    severity: str
    enabled: bool = True
 
@dataclass
class MonitoringAlert:
    alert_id: str
    rule_id: str
    timestamp: datetime
    severity: str
    message: str
    details: Dict
 
class ContinuousMonitoring:
    """NIST DE.CM continuous monitoring implementation"""
 
    def __init__(self):
        self.rules: Dict[str, MonitoringRule] = {}
        self.alerts: List[MonitoringAlert] = []
        self.metrics = {}
 
    def register_rule(self, rule: MonitoringRule):
        """Register monitoring rule"""
        self.rules[rule.rule_id] = rule
 
    async def start_monitoring(self):
        """Start all monitoring tasks"""
        tasks = []
        for rule in self.rules.values():
            if rule.enabled:
                tasks.append(self._monitor_rule(rule))
 
        await asyncio.gather(*tasks)
 
    async def _monitor_rule(self, rule: MonitoringRule):
        """Run monitoring rule on schedule"""
        while True:
            try:
                result = await rule.check_function()
 
                if result.get("alert"):
                    alert = MonitoringAlert(
                        alert_id=f"{rule.rule_id}-{datetime.utcnow().timestamp()}",
                        rule_id=rule.rule_id,
                        timestamp=datetime.utcnow(),
                        severity=rule.severity,
                        message=result.get("message", "Alert triggered"),
                        details=result.get("details", {})
                    )
                    self.alerts.append(alert)
                    await self._send_alert(alert)
 
                # Update metrics
                self.metrics[rule.rule_id] = {
                    "last_check": datetime.utcnow().isoformat(),
                    "status": result.get("status", "unknown"),
                    "value": result.get("value")
                }
 
            except Exception as e:
                self.metrics[rule.rule_id] = {
                    "last_check": datetime.utcnow().isoformat(),
                    "status": "error",
                    "error": str(e)
                }
 
            await asyncio.sleep(rule.interval_seconds)
 
    async def _send_alert(self, alert: MonitoringAlert):
        """Send alert to notification channels"""
        # Implementation depends on alerting infrastructure
        pass
 
# Example monitoring rules
async def check_failed_logins() -> Dict:
    """Check for excessive failed logins"""
    # Query authentication logs
    failed_count = 0  # Would query actual logs
 
    threshold = 10
    if failed_count > threshold:
        return {
            "alert": True,
            "message": f"High number of failed logins: {failed_count}",
            "details": {"count": failed_count, "threshold": threshold}
        }
 
    return {"status": "ok", "value": failed_count}
 
async def check_unauthorized_access() -> Dict:
    """Check for unauthorized access attempts"""
    # Query authorization logs for denied access
    denied_count = 0  # Would query actual logs
 
    threshold = 5
    if denied_count > threshold:
        return {
            "alert": True,
            "message": f"Multiple unauthorized access attempts: {denied_count}",
            "details": {"count": denied_count}
        }
 
    return {"status": "ok", "value": denied_count}
 
async def check_security_group_changes() -> Dict:
    """Check for security group modifications"""
    # Query CloudTrail for security group changes
    # This is simplified - a real implementation would call
    # cloudtrail.lookup_events via boto3
    return {"status": "ok", "value": 0}
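Each rule returns either an alert payload or an ok/value pair, which is what the monitoring loop dispatches on. A self-contained sketch mirroring `check_failed_logins`, with the count passed in as a hypothetical parameter instead of queried from logs:

```python
import asyncio

# Hypothetical, parameterized version of the check_failed_logins rule above.
async def check_failed_logins(failed_count: int, threshold: int = 10) -> dict:
    if failed_count > threshold:
        return {
            "alert": True,
            "message": f"High number of failed logins: {failed_count}",
            "details": {"count": failed_count, "threshold": threshold},
        }
    return {"status": "ok", "value": failed_count}

result = asyncio.run(check_failed_logins(25))
print(result["alert"])  # True
```

Keeping the return shape uniform across rules means the monitoring loop never needs rule-specific handling; it only inspects the `alert` key.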

The RESPOND and RECOVER Functions

Automating Incident Response

# incident_response.py
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime
from enum import Enum
import uuid
 
class IncidentSeverity(Enum):
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
 
class IncidentStatus(Enum):
    DETECTED = "detected"
    TRIAGED = "triaged"
    CONTAINED = "contained"
    ERADICATED = "eradicated"
    RECOVERED = "recovered"
    CLOSED = "closed"
 
@dataclass
class Incident:
    incident_id: str
    title: str
    description: str
    severity: IncidentSeverity
    status: IncidentStatus
    detected_at: datetime
    affected_systems: List[str]
    indicators: List[Dict]
    timeline: List[Dict] = field(default_factory=list)
    assigned_to: Optional[str] = None
    resolved_at: Optional[datetime] = None
 
class IncidentResponseManager:
    """NIST RS and RC incident response implementation"""
 
    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.playbooks = {}
 
    def create_incident(
        self,
        title: str,
        description: str,
        severity: IncidentSeverity,
        affected_systems: List[str],
        indicators: List[Dict]
    ) -> Incident:
        """Create new incident"""
        incident = Incident(
            incident_id=str(uuid.uuid4()),
            title=title,
            description=description,
            severity=severity,
            status=IncidentStatus.DETECTED,
            detected_at=datetime.utcnow(),
            affected_systems=affected_systems,
            indicators=indicators
        )
 
        # Add to timeline
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "incident_created",
            "details": {"severity": severity.name}
        })
 
        self.incidents[incident.incident_id] = incident
 
        # Auto-assign based on severity
        if severity in [IncidentSeverity.CRITICAL, IncidentSeverity.HIGH]:
            self._escalate(incident)
 
        return incident
 
    def update_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        notes: Optional[str] = None
    ):
        """Update incident status"""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident not found: {incident_id}")
 
        old_status = incident.status
        incident.status = new_status
 
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "status_change",
            "details": {
                "from": old_status.value,
                "to": new_status.value,
                "notes": notes
            }
        })
 
        if new_status == IncidentStatus.CLOSED:
            incident.resolved_at = datetime.utcnow()
 
    def execute_playbook(
        self,
        incident_id: str,
        playbook_name: str
    ) -> List[Dict]:
        """Execute response playbook"""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident not found: {incident_id}")
 
        playbook = self.playbooks.get(playbook_name)
        if not playbook:
            raise ValueError(f"Playbook not found: {playbook_name}")
 
        results = []
        for step in playbook["steps"]:
            result = self._execute_step(incident, step)
            results.append(result)
 
            incident.timeline.append({
                "timestamp": datetime.utcnow().isoformat(),
                "action": "playbook_step",
                "details": {
                    "playbook": playbook_name,
                    "step": step["name"],
                    "result": result
                }
            })
 
        return results
 
    def _execute_step(self, incident: Incident, step: Dict) -> Dict:
        """Execute single playbook step"""
        step_type = step["type"]
 
        if step_type == "isolate_system":
            return self._isolate_systems(incident.affected_systems)
        elif step_type == "block_indicators":
            return self._block_indicators(incident.indicators)
        elif step_type == "collect_evidence":
            return self._collect_evidence(incident)
        elif step_type == "notify_stakeholders":
            return self._notify_stakeholders(incident, step.get("stakeholders", []))
 
        return {"status": "unknown_step_type"}
 
    def _isolate_systems(self, systems: List[str]) -> Dict:
        """Isolate affected systems"""
        # Implementation would call network/EDR APIs
        return {
            "status": "success",
            "isolated_systems": systems
        }
 
    def _block_indicators(self, indicators: List[Dict]) -> Dict:
        """Block malicious indicators"""
        blocked = []
        for indicator in indicators:
            # Block IPs, domains, hashes, etc.
            blocked.append(indicator)
 
        return {
            "status": "success",
            "blocked_indicators": len(blocked)
        }
 
    def _collect_evidence(self, incident: Incident) -> Dict:
        """Collect forensic evidence"""
        evidence = []
        for system in incident.affected_systems:
            # Collect logs, memory dumps, etc.
            evidence.append({
                "system": system,
                "collected_at": datetime.utcnow().isoformat(),
                "types": ["logs", "network_captures"]
            })
 
        return {
            "status": "success",
            "evidence_collected": len(evidence)
        }
 
    def _notify_stakeholders(
        self,
        incident: Incident,
        stakeholders: List[str]
    ) -> Dict:
        """Notify relevant stakeholders"""
        # Send notifications
        return {
            "status": "success",
            "notified": stakeholders
        }
 
    def _escalate(self, incident: Incident):
        """Escalate critical incident"""
        incident.timeline.append({
            "timestamp": datetime.utcnow().isoformat(),
            "action": "escalated",
            "details": {"reason": f"Severity: {incident.severity.name}"}
        })

Summary

Implementing NIST CSF requires systematic coverage of all the core functions:

  1. GOVERN: Establish cybersecurity governance and risk management
  2. IDENTIFY: Maintain an asset inventory and understand risks
  3. PROTECT: Implement access controls and data protection
  4. DETECT: Deploy continuous monitoring and anomaly detection
  5. RESPOND: Establish incident response capabilities
  6. RECOVER: Plan and execute recovery procedures

Use these technical implementations as a foundation, customizing them for your organization's specific risk profile and compliance requirements.
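One practical way to track coverage across the functions listed above is to tally implemented controls per function from assessment data. A minimal sketch, assuming CSF control-ID prefixes (e.g. `GV.OC-01`) and illustrative status strings; `coverage_by_function` is a hypothetical helper, not part of the framework:

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def coverage_by_function(controls: List[Dict]) -> Dict[str, Tuple[int, int]]:
    """Return {function_prefix: (implemented, total)} from a control list."""
    tally: Dict[str, List[int]] = defaultdict(lambda: [0, 0])
    for control in controls:
        # CSF IDs encode the function in their prefix: "GV.OC-01" -> "GV"
        func = control["control_id"].split(".")[0]
        tally[func][1] += 1
        if control["implementation_status"] == "implemented":
            tally[func][0] += 1
    return {func: (done, total) for func, (done, total) in tally.items()}

controls = [
    {"control_id": "GV.OC-01", "implementation_status": "implemented"},
    {"control_id": "PR.AA-01", "implementation_status": "implemented"},
    {"control_id": "PR.AA-02", "implementation_status": "planned"},
]
print(coverage_by_function(controls))  # → {'GV': (1, 1), 'PR': (1, 2)}
```

Feeding the full 106-subcategory catalog through a report like this highlights which functions lag behind the target implementation tier.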

