DORA Compliance for Software Development: Digital Operational Resilience Act Guide

DeviDevs Team

The Digital Operational Resilience Act (DORA) establishes uniform requirements for ICT risk management in the EU financial sector. This guide shows how software development teams can implement DORA compliance, with Python examples for risk management, incident reporting, resilience testing, and third-party risk.

DORA Requirements Overview

Core Pillars

| Pillar | Description | Key Requirements |
|--------|-------------|------------------|
| ICT Risk Management | Comprehensive risk framework | Risk identification, protection, detection, response, recovery |
| Incident Reporting | Standardized incident classification | Major incidents reported within 24 hours |
| Resilience Testing | Regular testing of ICT systems | Threat-led penetration testing for significant entities |
| Third-Party Risk | ICT service provider management | Due diligence, contracts, exit strategies |
| Information Sharing | Threat intelligence sharing | Voluntary participation in sharing arrangements |

ICT Risk Management Framework

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from enum import Enum
import json
 
class RiskLevel(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
 
class RiskCategory(Enum):
    AVAILABILITY = "availability"
    INTEGRITY = "integrity"
    CONFIDENTIALITY = "confidentiality"
    CONTINUITY = "continuity"
 
class AssetType(Enum):
    CRITICAL_FUNCTION = "critical_function"
    SUPPORTING_SYSTEM = "supporting_system"
    DATA_ASSET = "data_asset"
    THIRD_PARTY_SERVICE = "third_party_service"
 
@dataclass
class ICTAsset:
    id: str
    name: str
    asset_type: AssetType
    description: str
    owner: str
    criticality: RiskLevel
    dependencies: List[str] = field(default_factory=list)
    third_party_providers: List[str] = field(default_factory=list)
    recovery_time_objective: int = 0  # minutes
    recovery_point_objective: int = 0  # minutes
 
@dataclass
class ICTRisk:
    id: str
    title: str
    description: str
    category: RiskCategory
    affected_assets: List[str]
    likelihood: int  # 1-5
    impact: int  # 1-5
    risk_level: RiskLevel
    controls: List[str]
    residual_risk: RiskLevel
    owner: str
    review_date: datetime
    status: str = "open"
 
@dataclass
class Control:
    id: str
    name: str
    description: str
    control_type: str  # preventive, detective, corrective
    implementation_status: str
    effectiveness: str  # effective, partially_effective, ineffective
    last_tested: Optional[datetime] = None
    test_results: Optional[str] = None
 
class DORAICTRiskFramework:
    def __init__(self):
        self.assets: Dict[str, ICTAsset] = {}
        self.risks: Dict[str, ICTRisk] = {}
        self.controls: Dict[str, Control] = {}
        self.risk_appetite: Dict[RiskCategory, RiskLevel] = {}
 
    def register_asset(self, asset: ICTAsset) -> str:
        """Register an ICT asset for risk management."""
        self.assets[asset.id] = asset
 
        # Auto-identify risks based on asset type and criticality
        auto_risks = self._identify_asset_risks(asset)
        for risk in auto_risks:
            self.risks[risk.id] = risk
 
        return asset.id
 
    def _identify_asset_risks(self, asset: ICTAsset) -> List[ICTRisk]:
        """Automatically identify risks for an asset."""
        risks = []
 
        # Availability risks
        if asset.criticality in [RiskLevel.CRITICAL, RiskLevel.HIGH]:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-AVAIL",
                title=f"Service unavailability for {asset.name}",
                description=f"Risk of {asset.name} becoming unavailable",
                category=RiskCategory.AVAILABILITY,
                affected_assets=[asset.id],
                likelihood=3,
                impact=5 if asset.criticality == RiskLevel.CRITICAL else 4,
                risk_level=RiskLevel.HIGH,  # scores 3 x 5 = 15 and 3 x 4 = 12 both map to HIGH in _score_to_level
                controls=[],
                residual_risk=RiskLevel.MEDIUM,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        # Third-party risks
        if asset.third_party_providers:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-3P",
                title=f"Third-party dependency risk for {asset.name}",
                description=f"Risk from third-party provider failure affecting {asset.name}",
                category=RiskCategory.CONTINUITY,
                affected_assets=[asset.id],
                likelihood=2,
                impact=4,
                risk_level=RiskLevel.MEDIUM,
                controls=[],
                residual_risk=RiskLevel.LOW,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        # Data integrity risks
        if asset.asset_type == AssetType.DATA_ASSET:
            risks.append(ICTRisk(
                id=f"RISK-{asset.id}-INTEG",
                title=f"Data integrity risk for {asset.name}",
                description=f"Risk of data corruption or unauthorized modification",
                category=RiskCategory.INTEGRITY,
                affected_assets=[asset.id],
                likelihood=2,
                impact=4,
                risk_level=RiskLevel.MEDIUM,
                controls=[],
                residual_risk=RiskLevel.LOW,
                owner=asset.owner,
                review_date=datetime.utcnow() + timedelta(days=90)
            ))
 
        return risks
 
    def assess_risk(self, risk_id: str) -> Dict:
        """Assess a specific risk."""
        risk = self.risks.get(risk_id)
        if not risk:
            raise ValueError(f"Risk {risk_id} not found")
 
        # Calculate inherent risk score
        inherent_score = risk.likelihood * risk.impact
 
        # Get control effectiveness
        control_effectiveness = self._calculate_control_effectiveness(risk.controls)
 
        # Calculate residual risk
        residual_score = inherent_score * (1 - control_effectiveness)
 
        return {
            "risk_id": risk_id,
            "inherent_risk": {
                "likelihood": risk.likelihood,
                "impact": risk.impact,
                "score": inherent_score,
                "level": self._score_to_level(inherent_score)
            },
            "controls": {
                "count": len(risk.controls),
                "effectiveness": control_effectiveness
            },
            "residual_risk": {
                "score": residual_score,
                "level": self._score_to_level(residual_score)
            },
            "within_appetite": self._check_risk_appetite(risk.category, residual_score)
        }
 
    def _calculate_control_effectiveness(self, control_ids: List[str]) -> float:
        """Calculate aggregate control effectiveness."""
        if not control_ids:
            return 0.0
 
        effectiveness_scores = {
            "effective": 0.8,
            "partially_effective": 0.4,
            "ineffective": 0.0
        }
 
        total = 0
        for control_id in control_ids:
            control = self.controls.get(control_id)
            if control:
                total += effectiveness_scores.get(control.effectiveness, 0)
 
        # control_ids is non-empty here; unknown control IDs count as zero effectiveness
        return total / len(control_ids)
 
    def _score_to_level(self, score: float) -> RiskLevel:
        """Convert numeric score to risk level."""
        if score >= 20:
            return RiskLevel.CRITICAL
        elif score >= 12:
            return RiskLevel.HIGH
        elif score >= 6:
            return RiskLevel.MEDIUM
        return RiskLevel.LOW
 
    def _check_risk_appetite(self, category: RiskCategory, score: float) -> bool:
        """Check if risk is within appetite."""
        appetite_level = self.risk_appetite.get(category, RiskLevel.MEDIUM)
        risk_level = self._score_to_level(score)
 
        level_order = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        return level_order.index(risk_level) <= level_order.index(appetite_level)
 
    def generate_risk_report(self) -> Dict:
        """Generate comprehensive risk report for DORA compliance."""
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_assets": len(self.assets),
                "total_risks": len(self.risks),
                "total_controls": len(self.controls),
                "risks_by_level": {},
                "risks_by_category": {}
            },
            "critical_risks": [],
            "control_gaps": [],
            "third_party_dependencies": [],
            "recommendations": []
        }
 
        # Aggregate risks
        for risk in self.risks.values():
            level = risk.risk_level.value
            category = risk.category.value
 
            report["summary"]["risks_by_level"][level] = \
                report["summary"]["risks_by_level"].get(level, 0) + 1
            report["summary"]["risks_by_category"][category] = \
                report["summary"]["risks_by_category"].get(category, 0) + 1
 
            # Flag critical risks
            if risk.risk_level == RiskLevel.CRITICAL:
                report["critical_risks"].append({
                    "id": risk.id,
                    "title": risk.title,
                    "category": category,
                    "owner": risk.owner
                })
 
            # Identify control gaps
            if not risk.controls:
                report["control_gaps"].append({
                    "risk_id": risk.id,
                    "risk_title": risk.title,
                    "recommendation": "Implement controls for this risk"
                })
 
        # Document third-party dependencies
        for asset in self.assets.values():
            if asset.third_party_providers:
                report["third_party_dependencies"].append({
                    "asset_id": asset.id,
                    "asset_name": asset.name,
                    "providers": asset.third_party_providers,
                    "criticality": asset.criticality.value
                })
 
        return report
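
To make the scoring in `assess_risk` concrete, here is a minimal standalone sketch of the same arithmetic. It is not part of the framework class: the function names `score_to_level` and `residual_score` are hypothetical, and the sketch takes effectiveness labels directly instead of looking up control IDs. The thresholds and effectiveness weights are copied from the listing above.

```python
from typing import List

# Effectiveness weights and level thresholds mirror the framework listing.
EFFECTIVENESS = {"effective": 0.8, "partially_effective": 0.4, "ineffective": 0.0}


def score_to_level(score: float) -> str:
    """Map a likelihood x impact score (1-25) to a risk level name."""
    if score >= 20:
        return "critical"
    if score >= 12:
        return "high"
    if score >= 6:
        return "medium"
    return "low"


def residual_score(likelihood: int, impact: int, controls: List[str]) -> float:
    """Inherent score reduced by the mean effectiveness of the controls."""
    inherent = likelihood * impact
    if not controls:
        return float(inherent)  # no controls, so nothing is mitigated
    mean_eff = sum(EFFECTIVENESS.get(c, 0.0) for c in controls) / len(controls)
    return inherent * (1 - mean_eff)


# Likelihood 3 and impact 5 give an inherent score of 15.
inherent_level = score_to_level(3 * 5)
# A single partially effective control removes 40% of the inherent score.
residual = residual_score(3, 5, ["partially_effective"])
residual_level = score_to_level(residual)
print(inherent_level, round(residual, 2), residual_level)
```

Under these assumptions, an inherent score of 15 (high) drops to 9 (medium) once one partially effective control is attached. Because the aggregate is a simple mean, adding an ineffective control to a risk lowers the overall effectiveness rather than leaving it unchanged.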

ICT Incident Reporting

import json
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
 
class IncidentSeverity(Enum):
    CRITICAL = "critical"
    MAJOR = "major"
    SIGNIFICANT = "significant"
    MINOR = "minor"
 
class IncidentStatus(Enum):
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"
    CLOSED = "closed"
 
@dataclass
class DORAIncident:
    id: str
    title: str
    description: str
    severity: IncidentSeverity
    detected_at: datetime
    affected_services: List[str]
    affected_clients: int
    financial_impact: float
    data_compromised: bool
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    status: IncidentStatus = IncidentStatus.DETECTED
    reported_to_authority: bool = False
    notification_timestamps: Dict[str, datetime] = field(default_factory=dict)
 
class DORAIncidentManager:
    def __init__(self, authority_api_client=None):
        self.incidents: Dict[str, DORAIncident] = {}
        self.authority_api = authority_api_client
 
    def create_incident(self, incident_data: Dict) -> DORAIncident:
        """Create and classify a new ICT incident."""
        incident = DORAIncident(
            id=f"INC-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
            title=incident_data['title'],
            description=incident_data['description'],
            severity=self._classify_severity(incident_data),
            detected_at=datetime.utcnow(),
            affected_services=incident_data.get('affected_services', []),
            affected_clients=incident_data.get('affected_clients', 0),
            financial_impact=incident_data.get('financial_impact', 0),
            data_compromised=incident_data.get('data_compromised', False)
        )
 
        self.incidents[incident.id] = incident
 
        # Check if major incident requiring notification
        if self._is_major_incident(incident):
            self._initiate_notification_process(incident)
 
        return incident
 
    def _classify_severity(self, incident_data: Dict) -> IncidentSeverity:
        """Classify incident severity per DORA criteria."""
        # Illustrative thresholds only; the binding classification criteria
        # are defined in the DORA regulatory technical standards.
        major_criteria = {
            "clients_affected": 10000,
            "duration_hours": 2,
            "financial_impact": 100000
        }
 
        # Check each criterion
        clients_affected = incident_data.get('affected_clients', 0)
        expected_duration = incident_data.get('expected_duration_hours', 0)
        financial_impact = incident_data.get('financial_impact', 0)
        data_compromised = incident_data.get('data_compromised', False)
        critical_service = incident_data.get('affects_critical_service', False)
 
        major_count = 0
        if clients_affected >= major_criteria['clients_affected']:
            major_count += 1
        if expected_duration >= major_criteria['duration_hours']:
            major_count += 1
        if financial_impact >= major_criteria['financial_impact']:
            major_count += 1
        if data_compromised:
            major_count += 2  # Data breach is weighted higher
        if critical_service:
            major_count += 1
 
        if major_count >= 3 or data_compromised:
            return IncidentSeverity.CRITICAL
        elif major_count >= 2:
            return IncidentSeverity.MAJOR
        elif major_count >= 1:
            return IncidentSeverity.SIGNIFICANT
        return IncidentSeverity.MINOR
 
    def _is_major_incident(self, incident: DORAIncident) -> bool:
        """Determine if incident meets DORA major incident threshold."""
        return incident.severity in [IncidentSeverity.CRITICAL, IncidentSeverity.MAJOR]
 
    def _initiate_notification_process(self, incident: DORAIncident):
        """Initiate DORA notification process for major incidents."""
        # Schedule used in this guide: initial notification within 24 hours of
        # detection (with a stricter 4-hour internal target), intermediate
        # report within 72 hours, final report within one month.
        # Deadlines are anchored to detection time, not to when this method runs.
        detected = incident.detected_at

        notification_schedule = {
            "initial": detected + timedelta(hours=4),  # internal target
            "intermediate": detected + timedelta(hours=72),
            "final": detected + timedelta(days=30)
        }
 
        incident.notification_timestamps = notification_schedule
 
        # Send internal alerts
        self._send_internal_alert(incident)
 
    def _send_internal_alert(self, incident: DORAIncident):
        """Send internal notification about major incident."""
        alert = {
            "type": "DORA_MAJOR_INCIDENT",
            "incident_id": incident.id,
            "severity": incident.severity.value,
            "title": incident.title,
            "notification_deadline": incident.notification_timestamps.get('initial').isoformat(),
            "required_actions": [
                "Assess impact scope",
                "Document affected services",
                "Prepare initial notification report",
                "Notify senior management"
            ]
        }
        # Would send to alerting system
        print(f"ALERT: {json.dumps(alert)}")
 
    def submit_initial_notification(self, incident_id: str) -> Dict:
        """Submit initial notification to competent authority."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")
 
        if not self._is_major_incident(incident):
            return {"status": "not_required", "reason": "Not a major incident"}
 
        notification = {
            "notification_type": "initial",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "detected_at": incident.detected_at.isoformat(),
            "classification": incident.severity.value,
            "description": incident.description,
            "affected_services": incident.affected_services,
            "affected_clients_estimate": incident.affected_clients,
            "financial_impact_estimate": incident.financial_impact,
            "data_compromised": incident.data_compromised,
            "status": incident.status.value,
            "preliminary_root_cause": "Under investigation",
            "actions_taken": self._get_actions_taken(incident),
            "expected_resolution": None
        }
 
        # Submit to authority
        if self.authority_api:
            response = self.authority_api.submit_notification(notification)
            incident.reported_to_authority = True
            return response
 
        return {"status": "simulated", "notification": notification}
 
    def submit_intermediate_notification(self, incident_id: str, updates: Dict) -> Dict:
        """Submit intermediate notification with updates."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")
 
        notification = {
            "notification_type": "intermediate",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "status_update": incident.status.value,
            "root_cause_analysis": updates.get('root_cause', 'Ongoing'),
            "actual_impact": {
                "clients_affected": updates.get('actual_clients', incident.affected_clients),
                "financial_impact": updates.get('actual_financial_impact', incident.financial_impact),
                "data_records_affected": updates.get('data_records', 0)
            },
            "containment_measures": updates.get('containment_measures', []),
            "recovery_progress": updates.get('recovery_progress', ''),
            "estimated_resolution": updates.get('estimated_resolution')
        }
 
        return {"status": "submitted", "notification": notification}
 
    def submit_final_notification(self, incident_id: str) -> Dict:
        """Submit final notification with complete analysis."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")
 
        notification = {
            "notification_type": "final",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "incident_timeline": self._build_timeline(incident),
            "root_cause": incident.root_cause,
            "total_impact": {
                "clients_affected": incident.affected_clients,
                "financial_impact": incident.financial_impact,
                "data_compromised": incident.data_compromised,
                "duration_hours": self._calculate_duration(incident)
            },
            "resolution": incident.resolution,
            "lessons_learned": self._get_lessons_learned(incident),
            "preventive_measures": self._get_preventive_measures(incident),
            "regulatory_follow_up": None
        }
 
        return {"status": "submitted", "notification": notification}
 
    def _get_actions_taken(self, incident: DORAIncident) -> List[str]:
        """Get list of actions taken for incident."""
        return [
            "Incident response team activated",
            "Impact assessment initiated",
            "Stakeholders notified",
            "Containment procedures initiated"
        ]
 
    def _build_timeline(self, incident: DORAIncident) -> List[Dict]:
        """Build incident timeline."""
        return [
            {"timestamp": incident.detected_at.isoformat(), "event": "Incident detected"},
            {"timestamp": datetime.utcnow().isoformat(), "event": "Incident resolved"}
        ]
 
    def _calculate_duration(self, incident: DORAIncident) -> float:
        """Calculate incident duration in hours."""
        if incident.status == IncidentStatus.RESOLVED:
            return 0  # Would calculate actual duration
        return (datetime.utcnow() - incident.detected_at).total_seconds() / 3600
 
    def _get_lessons_learned(self, incident: DORAIncident) -> List[str]:
        """Get lessons learned from incident."""
        return []
 
    def _get_preventive_measures(self, incident: DORAIncident) -> List[str]:
        """Get preventive measures to implement."""
        return []
 
    def generate_incident_report(self, period_start: datetime, period_end: datetime) -> Dict:
        """Generate incident report for reporting period."""
        period_incidents = [
            i for i in self.incidents.values()
            if period_start <= i.detected_at <= period_end
        ]
 
        report = {
            "reporting_period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "summary": {
                "total_incidents": len(period_incidents),
                "major_incidents": len([i for i in period_incidents if self._is_major_incident(i)]),
                "by_severity": {},
                "by_status": {},
                "total_financial_impact": sum(i.financial_impact for i in period_incidents),
                "total_clients_affected": sum(i.affected_clients for i in period_incidents)
            },
            "major_incidents_detail": [],
            "notifications_submitted": []
        }
 
        for incident in period_incidents:
            severity = incident.severity.value
            status = incident.status.value
            report["summary"]["by_severity"][severity] = \
                report["summary"]["by_severity"].get(severity, 0) + 1
            report["summary"]["by_status"][status] = \
                report["summary"]["by_status"].get(status, 0) + 1
 
            if self._is_major_incident(incident):
                report["major_incidents_detail"].append({
                    "id": incident.id,
                    "title": incident.title,
                    "severity": severity,
                    "detected_at": incident.detected_at.isoformat(),
                    "reported_to_authority": incident.reported_to_authority
                })
 
        return report
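
The incident manager stores a notification schedule but never checks it. A minimal sketch of such a check follows. The function `overdue_reports` and the constant `REPORT_DEADLINES` are hypothetical names, and the offsets are the ones used in this guide (24 hours, 72 hours, 30 days from detection); treat them as assumptions to verify against the current regulatory technical standards.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

# Offsets from detection time, taken from the schedule used in this guide.
REPORT_DEADLINES = {
    "initial": timedelta(hours=24),
    "intermediate": timedelta(hours=72),
    "final": timedelta(days=30),
}


def overdue_reports(
    detected_at: datetime,
    submitted: Dict[str, datetime],
    now: Optional[datetime] = None,
) -> List[str]:
    """Return report types whose deadline has passed without a submission."""
    now = now or datetime.now(timezone.utc)
    overdue = []
    for report_type, offset in REPORT_DEADLINES.items():
        deadline = detected_at + offset
        if report_type not in submitted and now > deadline:
            overdue.append(report_type)
    return overdue


detected = datetime(2025, 3, 1, 9, 0, tzinfo=timezone.utc)
sent = {"initial": detected + timedelta(hours=3)}
# Four days after detection the intermediate report is late;
# the final report is not yet due.
late = overdue_reports(detected, sent, now=detected + timedelta(days=4))
print(late)
```

The sketch uses timezone-aware timestamps because `datetime.utcnow()` returns naive values and is deprecated as of Python 3.12. Mixing naive and aware datetimes in a comparison raises a `TypeError`, so a real integration would need to pick one convention throughout.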

Digital Resilience Testing

from datetime import datetime
from typing import Dict, List

class DORAResilienceTesting:
    def __init__(self):
        self.test_plans: Dict[str, Dict] = {}
        self.test_results: List[Dict] = []
 
    def create_test_plan(self, test_type: str, scope: Dict) -> Dict:
        """Create a resilience test plan per DORA requirements."""
        test_plan = {
            "id": f"TEST-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
            "type": test_type,
            "scope": scope,
            "created_at": datetime.utcnow().isoformat(),
            "status": "planned",
            "scenarios": [],
            "schedule": None
        }
 
        # DORA requires different testing based on entity type
        if test_type == "threat_led_penetration_test":
            # TLPT required for significant entities
            test_plan["scenarios"] = [
                {
                    "name": "External Attack Simulation",
                    "description": "Simulate sophisticated external threat actor",
                    "target": "critical_infrastructure",
                    "techniques": ["social_engineering", "network_intrusion", "data_exfiltration"]
                },
                {
                    "name": "Insider Threat",
                    "description": "Simulate malicious insider",
                    "target": "internal_systems",
                    "techniques": ["privilege_escalation", "data_theft"]
                }
            ]
            test_plan["requirements"] = {
                "red_team_provider": "certified_threat_intelligence",
                "frequency": "every_3_years",
                "regulator_notification": True
            }
 
        elif test_type == "scenario_based_testing":
            test_plan["scenarios"] = [
                {
                    "name": "Data Center Failure",
                    "description": "Simulate complete data center outage",
                    "rto_target": 240,  # minutes
                    "rpo_target": 60
                },
                {
                    "name": "Cyber Attack",
                    "description": "Simulate ransomware attack",
                    "response_time_target": 30
                },
                {
                    "name": "Third-Party Failure",
                    "description": "Simulate critical third-party service failure",
                    "failover_target": 60
                }
            ]
 
        elif test_type == "vulnerability_assessment":
            test_plan["scenarios"] = [
                {"name": "Infrastructure Scan", "target": "all_infrastructure"},
                {"name": "Application Security Testing", "target": "web_applications"},
                {"name": "Configuration Review", "target": "security_configurations"}
            ]
            test_plan["requirements"] = {
                "frequency": "quarterly",
                "scope": "all_critical_systems"
            }
 
        self.test_plans[test_plan["id"]] = test_plan
        return test_plan
 
    def execute_resilience_test(self, plan_id: str) -> Dict:
        """Execute a resilience test."""
        plan = self.test_plans.get(plan_id)
        if not plan:
            raise ValueError(f"Test plan {plan_id} not found")
 
        results = {
            "plan_id": plan_id,
            "executed_at": datetime.utcnow().isoformat(),
            "status": "completed",
            "scenario_results": [],
            "findings": [],
            "recommendations": []
        }
 
        for scenario in plan["scenarios"]:
            scenario_result = self._execute_scenario(scenario)
            results["scenario_results"].append(scenario_result)
 
            if not scenario_result["passed"]:
                results["findings"].append({
                    "scenario": scenario["name"],
                    "issue": scenario_result["failure_reason"],
                    "severity": "high" if "critical" in scenario["name"].lower() else "medium"
                })
 
        # Generate recommendations based on findings
        results["recommendations"] = self._generate_recommendations(results["findings"])
 
        self.test_results.append(results)
        return results
 
    def _execute_scenario(self, scenario: Dict) -> Dict:
        """Execute a test scenario."""
        # Simplified scenario execution
        return {
            "scenario_name": scenario["name"],
            "started_at": datetime.utcnow().isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "passed": True,
            "metrics": {
                "rto_achieved": scenario.get("rto_target", 0) * 0.8,  # Simulated
                "rpo_achieved": scenario.get("rpo_target", 0) * 0.9
            },
            "observations": []
        }
 
    def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
        """Generate recommendations from test findings."""
        recommendations = []
        for finding in findings:
            recommendations.append(f"Address {finding['issue']} identified in {finding['scenario']}")
        return recommendations
 
    def generate_testing_report(self) -> Dict:
        """Generate annual testing report for DORA compliance."""
        return {
            "reporting_period": "annual",
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_tests": len(self.test_results),
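                # A test passes only when it produced no findings
                "tests_passed": len([r for r in self.test_results if not r["findings"]]),
                # execute_resilience_test only emits "high" and "medium", so count "high" as well
                "critical_findings": len([f for r in self.test_results for f in r["findings"] if f.get("severity") in ("critical", "high")])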
            },
            "test_results": self.test_results,
            "compliance_status": "compliant" if len(self.test_results) > 0 else "non_compliant"
        }
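
In the listing above, `_execute_scenario` always reports `passed: True` and never sets the `failure_reason` key that `execute_resilience_test` reads for failed scenarios. The following sketch shows one way a scenario could be judged against its recovery targets. The function `evaluate_scenario` and the shape of the `measured` dictionary are assumptions for illustration, not part of the original class.

```python
from typing import Dict, List


def evaluate_scenario(scenario: Dict, measured: Dict) -> Dict:
    """Compare measured recovery metrics (minutes) against scenario targets."""
    failures: List[str] = []
    for key in ("rto_target", "rpo_target"):
        target = scenario.get(key)
        if target is None:
            continue  # the scenario does not define this target
        metric = key.replace("_target", "_achieved")
        achieved = measured.get(metric)
        if achieved is None:
            failures.append(f"{metric} was not measured")
        elif achieved > target:
            failures.append(f"{metric} {achieved} min exceeds target {target} min")
    return {
        "scenario_name": scenario["name"],
        "passed": not failures,
        "failure_reason": "; ".join(failures) or None,
    }


scenario = {"name": "Data Center Failure", "rto_target": 240, "rpo_target": 60}
ok = evaluate_scenario(scenario, {"rto_achieved": 200, "rpo_achieved": 45})
missed = evaluate_scenario(scenario, {"rto_achieved": 300, "rpo_achieved": 45})
print(ok["passed"], missed["failure_reason"])
```

A missing measurement is treated as a failure here rather than a pass, on the reasoning that an untested recovery objective should not count as evidence of resilience.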

Third-Party Risk Management

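from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

# RiskLevel is the enum defined in the ICT Risk Management Framework listing

@dataclass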
class ThirdPartyProvider:
    id: str
    name: str
    service_type: str
    criticality: RiskLevel
    contract_start: datetime
    contract_end: datetime
    services_provided: List[str]
    data_access: bool
    location: str
    subcontractors: List[str] = field(default_factory=list)
    last_assessment: Optional[datetime] = None
    risk_rating: Optional[RiskLevel] = None
 
class DORAThirdPartyRiskManager:
    def __init__(self):
        self.providers: Dict[str, ThirdPartyProvider] = {}
        self.assessments: Dict[str, List[Dict]] = {}
        self.contracts: Dict[str, Dict] = {}
 
    def register_provider(self, provider: ThirdPartyProvider) -> str:
        """Register a third-party ICT service provider."""
        self.providers[provider.id] = provider
 
        # Check if critical provider requires additional controls
        if provider.criticality == RiskLevel.CRITICAL:
            self._flag_critical_provider(provider)
 
        return provider.id
 
    def _flag_critical_provider(self, provider: ThirdPartyProvider):
        """Flag critical provider for enhanced monitoring."""
        print(f"ALERT: Critical third-party provider registered: {provider.name}")
        # Would trigger enhanced due diligence workflow
 
    def perform_due_diligence(self, provider_id: str) -> Dict:
        """Perform DORA-required due diligence on provider."""
        provider = self.providers.get(provider_id)
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")
 
        assessment = {
            "provider_id": provider_id,
            "assessment_date": datetime.utcnow().isoformat(),
            "assessment_type": "due_diligence",
            "areas_assessed": [],
            "findings": [],
            "risk_rating": None,
            "recommendations": []
        }
 
        # DORA due diligence requirements
        due_diligence_areas = [
            {
                "area": "financial_stability",
                "checks": ["financial_statements", "credit_rating", "insurance_coverage"]
            },
            {
                "area": "security_controls",
                "checks": ["certifications", "security_audit_reports", "incident_history"]
            },
            {
                "area": "business_continuity",
                "checks": ["bcp_documentation", "dr_capabilities", "rto_rpo_guarantees"]
            },
            {
                "area": "compliance",
                "checks": ["regulatory_compliance", "data_protection", "subcontractor_oversight"]
            },
            {
                "area": "exit_strategy",
                "checks": ["data_portability", "transition_support", "termination_procedures"]
            }
        ]
 
        for area in due_diligence_areas:
            area_result = self._assess_area(provider, area)
            assessment["areas_assessed"].append(area_result)
 
            if not area_result["satisfactory"]:
                assessment["findings"].append({
                    "area": area["area"],
                    "issue": area_result["issues"],
                    "severity": "high" if provider.criticality == RiskLevel.CRITICAL else "medium"
                })
 
        # Calculate overall risk rating
        assessment["risk_rating"] = self._calculate_provider_risk(assessment)
 
        # Update provider record
        provider.last_assessment = datetime.utcnow()
        provider.risk_rating = RiskLevel(assessment["risk_rating"])
 
        # Store assessment
        if provider_id not in self.assessments:
            self.assessments[provider_id] = []
        self.assessments[provider_id].append(assessment)
 
        return assessment
 
    def _assess_area(self, provider: ThirdPartyProvider, area: Dict) -> Dict:
        """Assess a specific due diligence area."""
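        # Placeholder: a real implementation would evaluate evidence for each
        # check and derive "satisfactory", "score", and "issues" from the results.
        return {
            "area": area["area"],
            "checks_performed": area["checks"],
            "satisfactory": True,  # hard-coded in this stub
            "score": 85,  # hard-coded in this stub
            "issues": []
        }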
 
    def _calculate_provider_risk(self, assessment: Dict) -> str:
        """Calculate overall provider risk rating."""
        finding_count = len(assessment["findings"])
        high_findings = len([f for f in assessment["findings"] if f["severity"] == "high"])
 
        if high_findings > 2:
            return "critical"
        elif high_findings > 0 or finding_count > 3:
            return "high"
        elif finding_count > 0:
            return "medium"
        return "low"
 
    def verify_contract_requirements(self, provider_id: str, contract: Dict) -> Dict:
        """Verify contract meets DORA requirements."""
        required_clauses = [
            "service_level_agreements",
            "audit_rights",
            "data_location",
            "subcontracting_approval",
            "incident_notification",
            "business_continuity",
            "exit_provisions",
            "data_portability",
            "termination_rights",
            "liability_and_indemnification"
        ]
 
        verification = {
            "provider_id": provider_id,
            "verified_at": datetime.utcnow().isoformat(),
            "compliant": True,
            "clauses_verified": [],
            "missing_clauses": [],
            "recommendations": []
        }
 
        for clause in required_clauses:
            if clause in contract.get("clauses", []):
                verification["clauses_verified"].append(clause)
            else:
                verification["missing_clauses"].append(clause)
                verification["compliant"] = False
 
        if not verification["compliant"]:
            verification["recommendations"] = [
                f"Add {clause} clause to contract"
                for clause in verification["missing_clauses"]
            ]
 
        return verification
 
    def generate_register(self) -> Dict:
        """Generate third-party register for regulatory reporting."""
        register = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_providers": len(self.providers),
            "critical_providers": len([p for p in self.providers.values() if p.criticality == RiskLevel.CRITICAL]),
            "providers": []
        }
 
        for provider in self.providers.values():
            provider_entry = {
                "id": provider.id,
                "name": provider.name,
                "service_type": provider.service_type,
                "criticality": provider.criticality.value,
                "services": provider.services_provided,
                "data_access": provider.data_access,
                "location": provider.location,
                "contract_end": provider.contract_end.isoformat(),
                "last_assessment": provider.last_assessment.isoformat() if provider.last_assessment else None,
                "risk_rating": provider.risk_rating.value if provider.risk_rating else "not_assessed",
                "subcontractors": provider.subcontractors
            }
            register["providers"].append(provider_entry)
 
        return register
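
A register in this shape can also be screened between reporting cycles. The sketch below is a minimal example, assuming the dictionary layout produced by `generate_register` above; the helper name `providers_needing_review` and the 365-day reassessment interval are hypothetical choices for illustration, not DORA figures.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional


def providers_needing_review(
    register: Dict,
    now: datetime,
    max_assessment_age: timedelta = timedelta(days=365),  # assumed internal policy
) -> List[str]:
    """Return IDs of providers that were never assessed or whose assessment is stale."""
    flagged: List[str] = []
    for entry in register["providers"]:
        last: Optional[str] = entry.get("last_assessment")
        if last is None:
            # Never assessed: always needs review
            flagged.append(entry["id"])
            continue
        # last_assessment is stored as an ISO 8601 string in the register
        if now - datetime.fromisoformat(last) > max_assessment_age:
            flagged.append(entry["id"])
    return flagged


# Hypothetical register excerpt with only the fields this check needs
sample_register = {
    "providers": [
        {"id": "p1", "last_assessment": None},
        {"id": "p2", "last_assessment": "2023-01-10T09:00:00"},
        {"id": "p3", "last_assessment": "2024-05-01T09:00:00"},
    ]
}

stale = providers_needing_review(sample_register, now=datetime(2024, 6, 1))
print(stale)  # ['p1', 'p2']
```

Passing `now` explicitly keeps the check deterministic and easy to test; a scheduled job could supply the current time instead.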

Best Practices

DORA Implementation

  1. Start with an asset inventory: Identify and classify all ICT assets by type and criticality
  2. Map dependencies: Document internal dependencies and third-party providers for each asset
  3. Implement continuous monitoring: Track risks and incidents in real time
  4. Test regularly: Run resilience tests on a defined schedule, including threat-led penetration testing for significant entities
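
Dependency mapping becomes useful once it can answer "what breaks if this asset fails?". The following is a minimal sketch, assuming dependencies are recorded as asset ID to list of dependency IDs (as in the `dependencies` field of `ICTAsset`); the function name `affected_assets` and the sample inventory are hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def affected_assets(dependencies: Dict[str, List[str]], failed_asset: str) -> Set[str]:
    """Return every asset that depends, directly or transitively, on failed_asset."""
    # Invert the map: dependency -> assets that rely on it
    dependents: Dict[str, List[str]] = {}
    for asset, deps in dependencies.items():
        for dep in deps:
            dependents.setdefault(dep, []).append(asset)

    # Breadth-first walk upward through the dependents graph;
    # the visited check keeps the walk finite even if the map has cycles
    affected: Set[str] = set()
    queue = deque([failed_asset])
    while queue:
        current = queue.popleft()
        for parent in dependents.get(current, []):
            if parent not in affected:
                affected.add(parent)
                queue.append(parent)
    return affected


# Hypothetical inventory: asset ID -> IDs it depends on
dependency_map = {
    "payments-api": ["ledger-db", "cloud-kms"],
    "ledger-db": ["cloud-storage"],
    "reporting": ["ledger-db"],
    "cloud-kms": [],
    "cloud-storage": [],
}

impact = affected_assets(dependency_map, "cloud-storage")
print(sorted(impact))  # ['ledger-db', 'payments-api', 'reporting']
```

Running this for each third-party service gives a rough view of which internal systems share a single provider as a point of failure.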

Compliance Maintenance

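  • Keep risk registers and the third-party register up to date
  • Document every incident with the detail required for regulatory reporting
  • Review third-party contracts for the required clauses, such as audit rights, exit provisions, and incident notification
  • Report major incidents within the required timeframes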
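
Reporting deadlines are easier to meet when they are computed at detection time. The sketch below is illustrative only: it assumes the 24-hour window for major incidents listed in the pillar overview of this guide, and the names `INITIAL_REPORT_WINDOW`, `initial_report_deadline`, and `time_remaining` are hypothetical. Confirm the exact deadlines against the applicable regulatory technical standards before relying on them.

```python
from datetime import datetime, timedelta

# Assumption: 24 hours, taken from this guide's pillar overview.
# Keep it configurable so it can follow the applicable standards.
INITIAL_REPORT_WINDOW = timedelta(hours=24)


def initial_report_deadline(detected_at: datetime) -> datetime:
    """Latest time the initial report may be submitted."""
    return detected_at + INITIAL_REPORT_WINDOW


def time_remaining(detected_at: datetime, now: datetime) -> timedelta:
    """Time left before the deadline; negative if the report is overdue."""
    return initial_report_deadline(detected_at) - now


detected = datetime(2025, 3, 1, 14, 30)
deadline = initial_report_deadline(detected)
remaining = time_remaining(detected, now=datetime(2025, 3, 2, 8, 0))

print(deadline.isoformat())  # 2025-03-02T14:30:00
print(remaining)             # 6:30:00
```

A negative `time_remaining` value is a natural trigger for escalation in an incident workflow.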

DORA compliance requires a comprehensive approach to digital operational resilience that spans risk management, incident handling, testing, and third-party oversight.
