Compliance

DORA Compliance for Development Teams

Nicu Constantin
14 min read
#DORA #compliance #financial services #ICT risk #resilience

DORA Compliance for Software Development: A Guide to the Digital Operational Resilience Act

The Digital Operational Resilience Act (DORA) sets uniform requirements for ICT risk management across the EU financial sector. This guide covers how software development teams can implement DORA compliance.

Overview of DORA Requirements

Core Pillars

| Pillar | Description | Key Requirements |
|--------|-------------|------------------|
| ICT Risk Management | Comprehensive risk framework | Identification, protection, detection, response, and recovery |
| Incident Reporting | Standardized incident classification | Major incidents reported within 24 hours |
| Resilience Testing | Regular testing of ICT systems | Threat-led penetration testing for significant entities |
| Third-Party Risk | Management of ICT service providers | Due diligence, contracts, exit strategies |
| Information Sharing | Sharing of threat information | Voluntary participation in sharing arrangements |

ICT Risk Management Framework

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from enum import Enum
import json
 
class RiskLevel(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
 
class RiskCategory(Enum):
    AVAILABILITY = "availability"
    INTEGRITY = "integrity"
    CONFIDENTIALITY = "confidentiality"
    CONTINUITY = "continuity"
 
class AssetType(Enum):
    CRITICAL_FUNCTION = "critical_function"
    SUPPORTING_SYSTEM = "supporting_system"
    DATA_ASSET = "data_asset"
    THIRD_PARTY_SERVICE = "third_party_service"
 
@dataclass
class ICTAsset:
    id: str
    name: str
    asset_type: AssetType
    description: str
    owner: str
    criticality: RiskLevel
    dependencies: List[str] = field(default_factory=list)
    third_party_providers: List[str] = field(default_factory=list)
    recovery_time_objective: int = 0  # minutes
    recovery_point_objective: int = 0  # minutes
 
@dataclass
class ICTRisk:
    id: str
    title: str
    description: str
    category: RiskCategory
    affected_assets: List[str]
    likelihood: int  # 1-5
    impact: int  # 1-5
    risk_level: RiskLevel
    controls: List[str]
    residual_risk: RiskLevel
    owner: str
    review_date: datetime
    status: str = "open"
 
@dataclass
class Control:
    id: str
    name: str
    description: str
    control_type: str  # preventive, detective, corrective
    implementation_status: str
    effectiveness: str  # effective, partially_effective, ineffective
    last_tested: Optional[datetime] = None
    test_results: Optional[str] = None
 
class DORAICTRiskFramework:
    def __init__(self):
        self.assets: Dict[str, ICTAsset] = {}
        self.risks: Dict[str, ICTRisk] = {}
        self.controls: Dict[str, Control] = {}
        self.risk_appetite: Dict[RiskCategory, RiskLevel] = {}
 
    def register_asset(self, asset: ICTAsset) -> str:
        """Register an ICT asset for risk management."""
        self.assets[asset.id] = asset

        # Automatically identify risks based on the asset's type and criticality
        auto_risks = self._identify_asset_risks(asset)
        for risk in auto_risks:
            self.risks[risk.id] = risk
 
        return asset.id
 
    def _identify_asset_risks(self, asset: ICTAsset) -> List[ICTRisk]:
        """Automatically identify risks for an asset."""
        risks = []

        # Availability risks
        if asset.criticality in [RiskLevel.CRITICAL, RiskLevel.HIGH]:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-AVAIL",
                title=f"Service unavailability for {asset.name}",
                description=f"Risk that {asset.name} becomes unavailable",
                category=RiskCategory.AVAILABILITY,
                affected_assets=[asset.id],
                likelihood=3,
                impact=5 if asset.criticality == RiskLevel.CRITICAL else 4,
                risk_level=RiskLevel.HIGH if asset.criticality == RiskLevel.CRITICAL else RiskLevel.MEDIUM,
                controls=[],
                residual_risk=RiskLevel.MEDIUM,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        # Third-party risks
        if asset.third_party_providers:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-3P",
                title=f"Third-party dependency risk for {asset.name}",
                description=f"Risk of a third-party provider failure affecting {asset.name}",
                category=RiskCategory.CONTINUITY,
                affected_assets=[asset.id],
                likelihood=2,
                impact=4,
                risk_level=RiskLevel.MEDIUM,
                controls=[],
                residual_risk=RiskLevel.LOW,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        # Data integrity risks
        if asset.asset_type == AssetType.DATA_ASSET:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-INTEG",
                title=f"Data integrity risk for {asset.name}",
                description="Risk of data corruption or unauthorized modification",
                category=RiskCategory.INTEGRITY,
                affected_assets=[asset.id],
                likelihood=2,
                impact=4,
                risk_level=RiskLevel.MEDIUM,
                controls=[],
                residual_risk=RiskLevel.LOW,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        return risks
 
    def assess_risk(self, risk_id: str) -> Dict:
        """Assess a specific risk."""
        risk = self.risks.get(risk_id)
        if not risk:
            raise ValueError(f"Risk {risk_id} not found")

        # Compute the inherent risk score
        inherent_score = risk.likelihood * risk.impact

        # Get the aggregate effectiveness of the controls
        control_effectiveness = self._calculate_control_effectiveness(risk.controls)

        # Compute the residual risk
        residual_score = inherent_score * (1 - control_effectiveness)
 
        return {
            "risk_id": risk_id,
            "inherent_risk": {
                "likelihood": risk.likelihood,
                "impact": risk.impact,
                "score": inherent_score,
                "level": self._score_to_level(inherent_score)
            },
            "controls": {
                "count": len(risk.controls),
                "effectiveness": control_effectiveness
            },
            "residual_risk": {
                "score": residual_score,
                "level": self._score_to_level(residual_score)
            },
            "within_appetite": self._check_risk_appetite(risk.category, residual_score)
        }
 
    def _calculate_control_effectiveness(self, control_ids: List[str]) -> float:
        """Compute the aggregate effectiveness of the controls."""
        if not control_ids:
            return 0.0
 
        effectiveness_scores = {
            "effective": 0.8,
            "partially_effective": 0.4,
            "ineffective": 0.0
        }
 
        total = 0
        for control_id in control_ids:
            control = self.controls.get(control_id)
            if control:
                total += effectiveness_scores.get(control.effectiveness, 0)
 
        return total / len(control_ids) if control_ids else 0
 
    def _score_to_level(self, score: float) -> RiskLevel:
        """Convert a numeric score into a risk level."""
        if score >= 20:
            return RiskLevel.CRITICAL
        elif score >= 12:
            return RiskLevel.HIGH
        elif score >= 6:
            return RiskLevel.MEDIUM
        return RiskLevel.LOW
 
    def _check_risk_appetite(self, category: RiskCategory, score: float) -> bool:
        """Check whether the risk is within the risk appetite."""
        appetite_level = self.risk_appetite.get(category, RiskLevel.MEDIUM)
        risk_level = self._score_to_level(score)
 
        level_order = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        return level_order.index(risk_level) <= level_order.index(appetite_level)
 
    def generate_risk_report(self) -> Dict:
        """Generate the comprehensive risk report for DORA compliance."""
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_assets": len(self.assets),
                "total_risks": len(self.risks),
                "total_controls": len(self.controls),
                "risks_by_level": {},
                "risks_by_category": {}
            },
            "critical_risks": [],
            "control_gaps": [],
            "third_party_dependencies": [],
            "recommendations": []
        }
 
        # Aggregate risks by level and category
        for risk in self.risks.values():
            level = risk.risk_level.value
            category = risk.category.value
 
            report["summary"]["risks_by_level"][level] = \
                report["summary"]["risks_by_level"].get(level, 0) + 1
            report["summary"]["risks_by_category"][category] = \
                report["summary"]["risks_by_category"].get(category, 0) + 1
 
            # Flag critical risks
            if risk.risk_level == RiskLevel.CRITICAL:
                report["critical_risks"].append({
                    "id": risk.id,
                    "title": risk.title,
                    "category": category,
                    "owner": risk.owner
                })
 
            # Identify control gaps
            if not risk.controls:
                report["control_gaps"].append({
                    "risk_id": risk.id,
                    "risk_title": risk.title,
                    "recommendation": "Implement controls for this risk"
                })

        # Document third-party dependencies
        for asset in self.assets.values():
            if asset.third_party_providers:
                report["third_party_dependencies"].append({
                    "asset_id": asset.id,
                    "asset_name": asset.name,
                    "providers": asset.third_party_providers,
                    "criticality": asset.criticality.value
                })
 
        return report
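
The scoring arithmetic in `assess_risk` is easy to check by hand with a standalone sketch. The helper names below (`score_to_level`, `residual_score`) are hypothetical and not part of the framework class; the thresholds and effectiveness weights are copied from the code above, and levels are plain strings here to keep the example self-contained.

```python
# Standalone sketch of the likelihood x impact scoring used above.
# Thresholds (20 / 12 / 6) and weights (0.8 / 0.4 / 0.0) mirror the
# framework code; the function names are illustrative assumptions.

EFFECTIVENESS_WEIGHTS = {
    "effective": 0.8,
    "partially_effective": 0.4,
    "ineffective": 0.0,
}


def score_to_level(score: float) -> str:
    """Map a numeric score onto the four risk levels."""
    if score >= 20:
        return "critical"
    if score >= 12:
        return "high"
    if score >= 6:
        return "medium"
    return "low"


def residual_score(likelihood: int, impact: int, control_ratings: list) -> float:
    """Scale the inherent score by the average control effectiveness."""
    inherent = likelihood * impact
    if not control_ratings:
        return float(inherent)
    avg = sum(EFFECTIVENESS_WEIGHTS.get(r, 0.0) for r in control_ratings) / len(control_ratings)
    return inherent * (1 - avg)


inherent = 4 * 5  # likelihood 4, impact 5
residual = residual_score(4, 5, ["effective"])
print(score_to_level(inherent), round(residual, 2), score_to_level(residual))
```

With no controls mapped, the residual score equals the inherent score, which is why the report above lists every risk without controls as a control gap.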

ICT Incident Reporting

import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
 
class IncidentSeverity(Enum):
    CRITICAL = "critical"
    MAJOR = "major"
    SIGNIFICANT = "significant"
    MINOR = "minor"
 
class IncidentStatus(Enum):
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"
    CLOSED = "closed"
 
@dataclass
class DORAIncident:
    id: str
    title: str
    description: str
    severity: IncidentSeverity
    detected_at: datetime
    affected_services: List[str]
    affected_clients: int
    financial_impact: float
    data_compromised: bool
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    status: IncidentStatus = IncidentStatus.DETECTED
    reported_to_authority: bool = False
    notification_timestamps: Dict[str, datetime] = field(default_factory=dict)
 
class DORAIncidentManager:
    def __init__(self, authority_api_client=None):
        self.incidents: Dict[str, DORAIncident] = {}
        self.authority_api = authority_api_client
 
    def create_incident(self, incident_data: Dict) -> DORAIncident:
        """Create and classify a new ICT incident."""
        incident = DORAIncident(
            id=f"INC-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
            title=incident_data['title'],
            description=incident_data['description'],
            severity=self._classify_severity(incident_data),
            detected_at=datetime.utcnow(),
            affected_services=incident_data.get('affected_services', []),
            affected_clients=incident_data.get('affected_clients', 0),
            financial_impact=incident_data.get('financial_impact', 0),
            data_compromised=incident_data.get('data_compromised', False)
        )
 
        self.incidents[incident.id] = incident
 
        # Check whether this is a major incident that requires notification
        if self._is_major_incident(incident):
            self._initiate_notification_process(incident)
 
        return incident
 
    def _classify_severity(self, incident_data: Dict) -> IncidentSeverity:
        """Classify incident severity according to DORA criteria."""
        # DORA criteria for major incidents (example thresholds)
        major_criteria = {
            "clients_affected": 10000,
            "duration_hours": 2,
            "financial_impact": 100000,
            "data_breach": True,
            "critical_services": True
        }

        # Check each criterion
        clients_affected = incident_data.get('affected_clients', 0)
        expected_duration = incident_data.get('expected_duration_hours', 0)
        financial_impact = incident_data.get('financial_impact', 0)
        data_compromised = incident_data.get('data_compromised', False)
        critical_service = incident_data.get('affects_critical_service', False)
 
        major_count = 0
        if clients_affected >= major_criteria['clients_affected']:
            major_count += 1
        if expected_duration >= major_criteria['duration_hours']:
            major_count += 1
        if financial_impact >= major_criteria['financial_impact']:
            major_count += 1
        if data_compromised:
            major_count += 2  # A data breach carries more weight
        if critical_service:
            major_count += 1
 
        if major_count >= 3 or data_compromised:
            return IncidentSeverity.CRITICAL
        elif major_count >= 2:
            return IncidentSeverity.MAJOR
        elif major_count >= 1:
            return IncidentSeverity.SIGNIFICANT
        return IncidentSeverity.MINOR
 
    def _is_major_incident(self, incident: DORAIncident) -> bool:
        """Determine whether the incident meets the DORA major-incident threshold."""
        return incident.severity in [IncidentSeverity.CRITICAL, IncidentSeverity.MAJOR]
 
    def _initiate_notification_process(self, incident: DORAIncident):
        """Start the DORA notification process for major incidents."""
        # DORA requires an initial notification within 24 hours,
        # an intermediate report within 72 hours,
        # and a final report within 1 month.

        notification_schedule = {
            "initial": datetime.utcnow() + timedelta(hours=4),  # 4-hour target
            "intermediate": datetime.utcnow() + timedelta(hours=72),
            "final": datetime.utcnow() + timedelta(days=30)
        }

        incident.notification_timestamps = notification_schedule

        # Send internal alerts
        self._send_internal_alert(incident)
 
    def _send_internal_alert(self, incident: DORAIncident):
        """Send an internal notification about the major incident."""
        alert = {
            "type": "DORA_MAJOR_INCIDENT",
            "incident_id": incident.id,
            "severity": incident.severity.value,
            "title": incident.title,
            "notification_deadline": incident.notification_timestamps.get('initial').isoformat(),
            "required_actions": [
                "Assess the scope of the impact",
                "Document the affected services",
                "Prepare the initial notification report",
                "Notify senior management"
            ]
        }
        # A real implementation would send this to the alerting system
        print(f"ALERT: {json.dumps(alert)}")
 
    def submit_initial_notification(self, incident_id: str) -> Dict:
        """Submit the initial notification to the competent authority."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")

        if not self._is_major_incident(incident):
            return {"status": "not_required", "reason": "Not a major incident"}
 
        notification = {
            "notification_type": "initial",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "detected_at": incident.detected_at.isoformat(),
            "classification": incident.severity.value,
            "description": incident.description,
            "affected_services": incident.affected_services,
            "affected_clients_estimate": incident.affected_clients,
            "financial_impact_estimate": incident.financial_impact,
            "data_compromised": incident.data_compromised,
            "status": incident.status.value,
            "preliminary_root_cause": "Under investigation",
            "actions_taken": self._get_actions_taken(incident),
            "expected_resolution": None
        }

        # Submit to the authority
        if self.authority_api:
            response = self.authority_api.submit_notification(notification)
            incident.reported_to_authority = True
            return response
 
        return {"status": "simulated", "notification": notification}
 
    def submit_intermediate_notification(self, incident_id: str, updates: Dict) -> Dict:
        """Submit the intermediate notification with updates."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")

        notification = {
            "notification_type": "intermediate",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "status_update": incident.status.value,
            "root_cause_analysis": updates.get('root_cause', 'In progress'),
            "actual_impact": {
                "clients_affected": updates.get('actual_clients', incident.affected_clients),
                "financial_impact": updates.get('actual_financial_impact', incident.financial_impact),
                "data_records_affected": updates.get('data_records', 0)
            },
            "containment_measures": updates.get('containment_measures', []),
            "recovery_progress": updates.get('recovery_progress', ''),
            "estimated_resolution": updates.get('estimated_resolution')
        }
 
        return {"status": "submitted", "notification": notification}
 
    def submit_final_notification(self, incident_id: str) -> Dict:
        """Submit the final notification with the complete analysis."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")
 
        notification = {
            "notification_type": "final",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "incident_timeline": self._build_timeline(incident),
            "root_cause": incident.root_cause,
            "total_impact": {
                "clients_affected": incident.affected_clients,
                "financial_impact": incident.financial_impact,
                "data_compromised": incident.data_compromised,
                "duration_hours": self._calculate_duration(incident)
            },
            "resolution": incident.resolution,
            "lessons_learned": self._get_lessons_learned(incident),
            "preventive_measures": self._get_preventive_measures(incident),
            "regulatory_follow_up": None
        }
 
        return {"status": "submitted", "notification": notification}
 
    def _get_actions_taken(self, incident: DORAIncident) -> List[str]:
        """Return the list of actions taken for the incident."""
        return [
            "Incident response team activated",
            "Impact assessment started",
            "Stakeholders notified",
            "Containment procedures initiated"
        ]
 
    def _build_timeline(self, incident: DORAIncident) -> List[Dict]:
        """Build the incident timeline."""
        return [
            {"timestamp": incident.detected_at.isoformat(), "event": "Incident detected"},
            {"timestamp": datetime.utcnow().isoformat(), "event": "Incident resolved"}
        ]
 
    def _calculate_duration(self, incident: DORAIncident) -> float:
        """Compute the incident duration in hours."""
        if incident.status == IncidentStatus.RESOLVED:
            return 0  # Placeholder: a real implementation would compute the actual duration
        return (datetime.utcnow() - incident.detected_at).total_seconds() / 3600
 
    def _get_lessons_learned(self, incident: DORAIncident) -> List[str]:
        """Return the lessons learned from the incident."""
        return []

    def _get_preventive_measures(self, incident: DORAIncident) -> List[str]:
        """Return the preventive measures to implement."""
        return []
 
    def generate_incident_report(self, period_start: datetime, period_end: datetime) -> Dict:
        """Generate the incident report for the reporting period."""
        period_incidents = [
            i for i in self.incidents.values()
            if period_start <= i.detected_at <= period_end
        ]
 
        report = {
            "reporting_period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "summary": {
                "total_incidents": len(period_incidents),
                "major_incidents": len([i for i in period_incidents if self._is_major_incident(i)]),
                "by_severity": {},
                "by_status": {},
                "total_financial_impact": sum(i.financial_impact for i in period_incidents),
                "total_clients_affected": sum(i.affected_clients for i in period_incidents)
            },
            "major_incidents_detail": [],
            "notifications_submitted": []
        }
 
        for incident in period_incidents:
            severity = incident.severity.value
            status = incident.status.value
            report["summary"]["by_severity"][severity] = \
                report["summary"]["by_severity"].get(severity, 0) + 1
            report["summary"]["by_status"][status] = \
                report["summary"]["by_status"].get(status, 0) + 1
 
            if self._is_major_incident(incident):
                report["major_incidents_detail"].append({
                    "id": incident.id,
                    "title": incident.title,
                    "severity": severity,
                    "detected_at": incident.detected_at.isoformat(),
                    "reported_to_authority": incident.reported_to_authority
                })
 
        return report
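
The notification schedule stored on each incident lends itself to a simple overdue check. The sketch below is a standalone illustration, not part of `DORAIncidentManager`: `REPORT_WINDOWS`, `notification_deadlines`, and `overdue_stages` are hypothetical names, and the windows are copied from the schedule in the code above (the 4-hour value is that code's own target). It uses timezone-aware datetimes rather than `datetime.utcnow()`.

```python
from datetime import datetime, timedelta, timezone

# Reporting windows copied from the notification schedule above.
# All names in this sketch are illustrative assumptions.
REPORT_WINDOWS = {
    "initial": timedelta(hours=4),
    "intermediate": timedelta(hours=72),
    "final": timedelta(days=30),
}


def notification_deadlines(detected_at: datetime) -> dict:
    """Compute the deadline for each report stage from the detection time."""
    return {stage: detected_at + window for stage, window in REPORT_WINDOWS.items()}


def overdue_stages(detected_at: datetime, now: datetime, submitted: set) -> list:
    """List the stages whose deadline has passed without a submission."""
    deadlines = notification_deadlines(detected_at)
    return [
        stage for stage, due in deadlines.items()
        if stage not in submitted and now > due
    ]


detected = datetime(2025, 1, 10, 8, 0, tzinfo=timezone.utc)
now = detected + timedelta(hours=80)
print(overdue_stages(detected, now, submitted={"initial"}))
```

A check like this could run on a timer and feed the same internal alerting path used for major incidents.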

Digital Resilience Testing

from datetime import datetime
from typing import Dict, List

class DORAResilienceTesting:
    def __init__(self):
        self.test_plans: Dict[str, Dict] = {}
        self.test_results: List[Dict] = []
 
    def create_test_plan(self, test_type: str, scope: Dict) -> Dict:
        """Create a resilience test plan in line with DORA requirements."""
        test_plan = {
            "id": f"TEST-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
            "type": test_type,
            "scope": scope,
            "created_at": datetime.utcnow().isoformat(),
            "status": "planned",
            "scenarios": [],
            "schedule": None
        }
 
        # DORA requires different testing depending on the entity type
        if test_type == "threat_led_penetration_test":
            # TLPT is mandatory for significant entities
            test_plan["scenarios"] = [
                {
                    "name": "External Attack Simulation",
                    "description": "Simulates a sophisticated external threat actor",
                    "target": "critical_infrastructure",
                    "techniques": ["social_engineering", "network_intrusion", "data_exfiltration"]
                },
                {
                    "name": "Insider Threat",
                    "description": "Simulates a malicious insider",
                    "target": "internal_systems",
                    "techniques": ["privilege_escalation", "data_theft"]
                }
            ]
            test_plan["requirements"] = {
                "red_team_provider": "certified_threat_intelligence",
                "frequency": "every_3_years",
                "regulator_notification": True
            }
 
        elif test_type == "scenario_based_testing":
            test_plan["scenarios"] = [
                {
                    "name": "Data Center Failure",
                    "description": "Simulates a complete data center outage",
                    "rto_target": 240,  # minutes
                    "rpo_target": 60
                },
                {
                    "name": "Cyberattack",
                    "description": "Simulates a ransomware attack",
                    "response_time_target": 30
                },
                {
                    "name": "Third-Party Provider Failure",
                    "description": "Simulates the failure of a critical third-party service",
                    "failover_target": 60
                }
            ]
 
        elif test_type == "vulnerability_assessment":
            test_plan["scenarios"] = [
                {"name": "Infrastructure Scan", "target": "all_infrastructure"},
                {"name": "Application Security Testing", "target": "web_applications"},
                {"name": "Configuration Review", "target": "security_configurations"}
            ]
            test_plan["requirements"] = {
                "frequency": "quarterly",
                "scope": "all_critical_systems"
            }
 
        self.test_plans[test_plan["id"]] = test_plan
        return test_plan
 
    def execute_resilience_test(self, plan_id: str) -> Dict:
        """Execute a resilience test."""
        plan = self.test_plans.get(plan_id)
        if not plan:
            raise ValueError(f"Test plan {plan_id} not found")
 
        results = {
            "plan_id": plan_id,
            "executed_at": datetime.utcnow().isoformat(),
            "status": "completed",
            "scenario_results": [],
            "findings": [],
            "recommendations": []
        }
 
        for scenario in plan["scenarios"]:
            scenario_result = self._execute_scenario(scenario)
            results["scenario_results"].append(scenario_result)
 
            if not scenario_result["passed"]:
                results["findings"].append({
                    "scenario": scenario["name"],
                    "issue": scenario_result["failure_reason"],
                    "severity": "high" if "critical" in scenario["name"].lower() else "medium"
                })
 
        # Generate recommendations based on the findings
        results["recommendations"] = self._generate_recommendations(results["findings"])
 
        self.test_results.append(results)
        return results
 
    def _execute_scenario(self, scenario: Dict) -> Dict:
        """Execute a test scenario."""
        # Simplified scenario execution
        return {
            "scenario_name": scenario["name"],
            "started_at": datetime.utcnow().isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "passed": True,
            "metrics": {
                "rto_achieved": scenario.get("rto_target", 0) * 0.8,  # Simulated
                "rpo_achieved": scenario.get("rpo_target", 0) * 0.9
            },
            "observations": []
        }
 
    def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
        """Generate recommendations from test findings."""
        recommendations = []
        for finding in findings:
            recommendations.append(f"Address {finding['issue']} identified in {finding['scenario']}")
        return recommendations
 
    def generate_testing_report(self) -> Dict:
        """Generate the annual testing report for DORA compliance."""
        return {
            "reporting_period": "annual",
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_tests": len(self.test_results),
                # A test counts as passed only if every scenario in it passed
                "tests_passed": len([
                    r for r in self.test_results
                    if all(s["passed"] for s in r["scenario_results"])
                ]),
                # execute_resilience_test rates findings only as "high" or "medium",
                # so "high" is counted here alongside "critical"
                "critical_findings": len([f for r in self.test_results for f in r["findings"] if f.get("severity") in ("critical", "high")])
            },
            "test_results": self.test_results,
            "compliance_status": "compliant" if len(self.test_results) > 0 else "non_compliant"
        }
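
In the code above, `_execute_scenario` always reports `passed: True` and never sets the `failure_reason` key that `execute_resilience_test` reads when a scenario fails. One way to close that gap is to compare measured values against each scenario's `*_target` fields. The sketch below is a standalone illustration under that assumption; `evaluate_scenario` and the `measured` dictionary are hypothetical and not part of the original class.

```python
def evaluate_scenario(scenario: dict, measured: dict) -> dict:
    """Compare measured values (minutes) against the scenario's *_target fields."""
    failures = []
    for key, target in scenario.items():
        if not key.endswith("_target"):
            continue  # skip name, description, and other non-target fields
        metric = key[: -len("_target")]
        actual = measured.get(metric)
        if actual is None:
            failures.append(f"{metric}: no measurement recorded")
        elif actual > target:
            failures.append(f"{metric}: {actual} min exceeds target of {target} min")
    return {
        "scenario_name": scenario["name"],
        "passed": not failures,
        "failure_reason": "; ".join(failures) or None,
    }


scenario = {"name": "Data Center Failure", "rto_target": 240, "rpo_target": 60}
result = evaluate_scenario(scenario, {"rto": 300, "rpo": 45})
print(result["passed"], result["failure_reason"])
```

Treating a missing measurement as a failure is a deliberate choice in this sketch: a test that did not record its recovery time gives no evidence that the target was met.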

Third-Party Risk Management

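from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

# RiskLevel is the enum defined in the ICT risk management framework above

@dataclass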
class ThirdPartyProvider:
    id: str
    name: str
    service_type: str
    criticality: RiskLevel
    contract_start: datetime
    contract_end: datetime
    services_provided: List[str]
    data_access: bool
    location: str
    subcontractors: List[str] = field(default_factory=list)
    last_assessment: Optional[datetime] = None
    risk_rating: Optional[RiskLevel] = None
 
class DORAThirdPartyRiskManager:
    def __init__(self):
        self.providers: Dict[str, ThirdPartyProvider] = {}
        self.assessments: Dict[str, List[Dict]] = {}
        self.contracts: Dict[str, Dict] = {}
 
    def register_provider(self, provider: ThirdPartyProvider) -> str:
        """Register a third-party ICT service provider."""
        self.providers[provider.id] = provider

        # Critical providers require additional controls
        if provider.criticality == RiskLevel.CRITICAL:
            self._flag_critical_provider(provider)
 
        return provider.id
 
    def _flag_critical_provider(self, provider: ThirdPartyProvider):
        """Flag a critical provider for enhanced monitoring."""
        print(f"ALERT: Critical third-party provider registered: {provider.name}")
        # A real implementation would trigger an enhanced due diligence workflow
 
    def perform_due_diligence(self, provider_id: str) -> Dict:
        """Perform due diligence on the provider in line with DORA requirements."""
        provider = self.providers.get(provider_id)
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")
 
        assessment = {
            "provider_id": provider_id,
            "assessment_date": datetime.utcnow().isoformat(),
            "assessment_type": "due_diligence",
            "areas_assessed": [],
            "findings": [],
            "risk_rating": None,
            "recommendations": []
        }
 
        # DORA due diligence requirements
        due_diligence_areas = [
            {
                "area": "financial_stability",
                "checks": ["financial_statements", "credit_rating", "insurance_coverage"]
            },
            {
                "area": "security_controls",
                "checks": ["certifications", "security_audit_reports", "incident_history"]
            },
            {
                "area": "business_continuity",
                "checks": ["bcp_documentation", "dr_capabilities", "rto_rpo_guarantees"]
            },
            {
                "area": "compliance",
                "checks": ["regulatory_compliance", "data_protection", "subcontractor_oversight"]
            },
            {
                "area": "exit_strategy",
                "checks": ["data_portability", "transition_support", "termination_procedures"]
            }
        ]
 
        for area in due_diligence_areas:
            area_result = self._assess_area(provider, area)
            assessment["areas_assessed"].append(area_result)
 
            if not area_result["satisfactory"]:
                assessment["findings"].append({
                    "area": area["area"],
                    "issue": area_result["issues"],
                    "severity": "high" if provider.criticality == RiskLevel.CRITICAL else "medium"
                })
 
        # Calculate the overall risk rating
        assessment["risk_rating"] = self._calculate_provider_risk(assessment)
 
        # Update the provider record
        provider.last_assessment = datetime.utcnow()
        provider.risk_rating = RiskLevel(assessment["risk_rating"])
 
        # Store the assessment
        if provider_id not in self.assessments:
            self.assessments[provider_id] = []
        self.assessments[provider_id].append(assessment)
 
        return assessment
 
    def _assess_area(self, provider: ThirdPartyProvider, area: Dict) -> Dict:
        """Assess one due diligence area."""
        # Simplified placeholder: a real implementation would run the listed checks
        return {
            "area": area["area"],
            "checks_performed": area["checks"],
            "satisfactory": True,  # Placeholder; should come from the real assessment
            "score": 85,
            "issues": []
        }
 
    def _calculate_provider_risk(self, assessment: Dict) -> str:
        """Calculate the provider's overall risk rating."""
        finding_count = len(assessment["findings"])
        high_findings = len([f for f in assessment["findings"] if f["severity"] == "high"])
 
        if high_findings > 2:
            return "critical"
        elif high_findings > 0 or finding_count > 3:
            return "high"
        elif finding_count > 0:
            return "medium"
        return "low"
 
    def verify_contract_requirements(self, provider_id: str, contract: Dict) -> Dict:
        """Verify that the contract meets DORA requirements."""
        required_clauses = [
            "service_level_agreements",
            "audit_rights",
            "data_location",
            "subcontracting_approval",
            "incident_notification",
            "business_continuity",
            "exit_provisions",
            "data_portability",
            "termination_rights",
            "liability_and_indemnification"
        ]
 
        verification = {
            "provider_id": provider_id,
            "verified_at": datetime.utcnow().isoformat(),
            "compliant": True,
            "clauses_verified": [],
            "missing_clauses": [],
            "recommendations": []
        }
 
        for clause in required_clauses:
            if clause in contract.get("clauses", []):
                verification["clauses_verified"].append(clause)
            else:
                verification["missing_clauses"].append(clause)
                verification["compliant"] = False
 
        if not verification["compliant"]:
            verification["recommendations"] = [
                f"Add clause {clause} to the contract"
                for clause in verification["missing_clauses"]
            ]
 
        return verification
 
    def generate_register(self) -> Dict:
        """Generate the third-party register for regulatory reporting."""
        register = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_providers": len(self.providers),
            "critical_providers": len([p for p in self.providers.values() if p.criticality == RiskLevel.CRITICAL]),
            "providers": []
        }
 
        for provider in self.providers.values():
            provider_entry = {
                "id": provider.id,
                "name": provider.name,
                "service_type": provider.service_type,
                "criticality": provider.criticality.value,
                "services": provider.services_provided,
                "data_access": provider.data_access,
                "location": provider.location,
                "contract_end": provider.contract_end.isoformat(),
                "last_assessment": provider.last_assessment.isoformat() if provider.last_assessment else None,
                "risk_rating": provider.risk_rating.value if provider.risk_rating else "not_assessed",
                "subcontractors": provider.subcontractors
            }
            register["providers"].append(provider_entry)
 
        return register
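
The register produced by generate_register can also drive periodic reviews. The sketch below is one possible way to flag entries that need attention, assuming the register has the shape shown above (ISO-formatted `last_assessment` and `contract_end` values). The function name `find_review_candidates` and the thresholds (365 days between assessments, 90 days of contract notice) are illustrative assumptions, not values taken from DORA.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_review_candidates(
    register: Dict,
    now: datetime,
    max_assessment_age: timedelta = timedelta(days=365),  # assumed review cycle
    contract_notice: timedelta = timedelta(days=90),      # assumed notice period
) -> List[Dict]:
    """Return register entries that need attention, with the reasons why."""
    flagged = []
    for entry in register["providers"]:
        reasons = []

        # A provider with no assessment on record is always flagged.
        last = entry.get("last_assessment")
        if last is None:
            reasons.append("never_assessed")
        elif now - datetime.fromisoformat(last) > max_assessment_age:
            reasons.append("assessment_overdue")

        # Contracts ending soon need an exit or renewal decision.
        if datetime.fromisoformat(entry["contract_end"]) - now <= contract_notice:
            reasons.append("contract_ending")

        if reasons:
            flagged.append({"id": entry["id"], "reasons": reasons})
    return flagged
```

Passing `now` explicitly keeps the function deterministic and easy to test; in production it would typically be the same naive UTC timestamp the register itself uses.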

Best Practices

Implementing DORA

  1. Start with an asset inventory: Identify and classify all ICT assets
  2. Map dependencies: Document internal and third-party dependencies
  3. Implement continuous monitoring: Track risks and incidents in real time
  4. Test regularly: Run resilience tests as required by DORA
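
Dependency mapping (step 2) becomes useful once it can answer "what breaks if this fails?". The following is a minimal sketch, assuming dependencies are kept as a plain dictionary from each asset to the assets it needs. The helper name `affected_assets` and the asset names are hypothetical and only serve the illustration.

```python
from collections import deque
from typing import Dict, List, Set


def affected_assets(dependencies: Dict[str, List[str]], failed: str) -> Set[str]:
    """Return every asset that depends, directly or transitively, on `failed`."""
    # Invert the map: for each asset, record which assets depend on it.
    dependents: Dict[str, Set[str]] = {}
    for asset, needs in dependencies.items():
        for needed in needs:
            dependents.setdefault(needed, set()).add(asset)

    # Breadth-first walk over the inverted map, starting at the failed asset.
    seen: Set[str] = set()
    queue = deque([failed])
    while queue:
        current = queue.popleft()
        for dependent in dependents.get(current, ()):
            if dependent not in seen:
                seen.add(dependent)
                queue.append(dependent)

    # The failed asset itself is not reported, even if it sits in a cycle.
    seen.discard(failed)
    return seen


# Hypothetical inventory: asset -> assets it depends on.
dependency_map = {
    "payments-api": ["core-ledger", "cloud-kms"],
    "core-ledger": ["postgres-cluster"],
    "reporting": ["core-ledger"],
    "postgres-cluster": [],
    "cloud-kms": [],
}
print(sorted(affected_assets(dependency_map, "postgres-cluster")))
# prints ['core-ledger', 'payments-api', 'reporting']
```

The same lookup works for third-party services: passing "cloud-kms" shows which internal assets are exposed to that provider.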

Maintaining Compliance

  • Keep risk registers up to date
  • Document every incident with the required details
  • Review third-party contracts for DORA compliance
  • Report major incidents within the mandated deadlines
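
Meeting reporting deadlines is easier when the deadline is computed at detection time rather than remembered. The sketch below assumes the 24-hour window for major incidents from the overview table earlier in this guide. The exact timelines depend on the applicable regulatory technical standards, so the window is a parameter. The function name `initial_report_status` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict

# Assumption: 24 hours, as stated in the overview table of this guide.
INITIAL_REPORT_WINDOW = timedelta(hours=24)


def initial_report_status(
    detected_at: datetime,
    now: datetime,
    window: timedelta = INITIAL_REPORT_WINDOW,
) -> Dict:
    """Report the deadline for the initial notification and the time left."""
    deadline = detected_at + window
    remaining = deadline - now
    return {
        "deadline": deadline.isoformat(),
        "overdue": remaining < timedelta(0),
        # Clamp at zero so dashboards never show negative hours.
        "hours_remaining": max(remaining.total_seconds() / 3600, 0.0),
    }


detected = datetime(2025, 3, 1, 8, 0, tzinfo=timezone.utc)
status = initial_report_status(detected, datetime(2025, 3, 1, 20, 0, tzinfo=timezone.utc))
print(status["overdue"], status["hours_remaining"])
# prints False 12.0
```

Both timestamps should be timezone-aware and in the same zone; mixing naive and aware datetimes raises a TypeError in Python.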

DORA compliance requires a comprehensive approach to digital operational resilience that covers ICT risk management, incident handling, resilience testing, and third-party oversight.

