DORA Compliance for Software Development: Digital Operational Resilience Act Guide
The Digital Operational Resilience Act (DORA) establishes uniform requirements for ICT risk management in the EU financial sector. This guide covers implementing DORA compliance for software development teams.
DORA Requirements Overview
Core Pillars
| Pillar | Description | Key Requirements |
|--------|-------------|------------------|
| ICT Risk Management | Comprehensive risk framework | Risk identification, protection, detection, response, recovery |
| Incident Reporting | Standardized incident classification | Major incidents reported within 24 hours |
| Resilience Testing | Regular testing of ICT systems | Threat-led penetration testing for significant entities |
| Third-Party Risk | ICT service provider management | Due diligence, contracts, exit strategies |
| Information Sharing | Threat intelligence sharing | Voluntary participation in sharing arrangements |
ICT Risk Management Framework
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from enum import Enum
import json
class RiskLevel(Enum):
    """Four-step severity scale, declared from most to least severe.

    The string values are what the reports in this file emit via ``.value``.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
class RiskCategory(Enum):
    """Category a risk is filed under; also the key type of the risk appetite map."""

    AVAILABILITY = "availability"
    INTEGRITY = "integrity"
    CONFIDENTIALITY = "confidentiality"
    CONTINUITY = "continuity"
class AssetType(Enum):
    """Kind of ICT asset held in the inventory."""

    CRITICAL_FUNCTION = "critical_function"
    SUPPORTING_SYSTEM = "supporting_system"
    DATA_ASSET = "data_asset"
    THIRD_PARTY_SERVICE = "third_party_service"
@dataclass
class ICTAsset:
    """One entry in the ICT asset inventory."""

    id: str  # key under which the framework stores the asset
    name: str
    asset_type: AssetType  # DATA_ASSET triggers an auto-generated integrity risk
    description: str
    owner: str  # copied onto every auto-generated risk for this asset
    criticality: RiskLevel  # CRITICAL/HIGH triggers an auto-generated availability risk
    dependencies: List[str] = field(default_factory=list)
    # A non-empty list triggers an auto-generated third-party risk.
    third_party_providers: List[str] = field(default_factory=list)
    recovery_time_objective: int = 0  # minutes
    recovery_point_objective: int = 0  # minutes
@dataclass
class ICTRisk:
    """One entry in the risk register."""

    id: str
    title: str
    description: str
    category: RiskCategory
    affected_assets: List[str]  # asset ids
    likelihood: int  # 1-5
    impact: int  # 1-5
    risk_level: RiskLevel  # inherent level recorded for the risk
    controls: List[str]  # control ids; an empty list is reported as a control gap
    residual_risk: RiskLevel
    owner: str
    review_date: datetime
    status: str = "open"
@dataclass
class Control:
    """A risk-mitigating control and the outcome of its latest test."""

    id: str
    name: str
    description: str
    control_type: str  # preventive, detective, corrective
    implementation_status: str
    effectiveness: str  # effective, partially_effective, ineffective
    last_tested: Optional[datetime] = None  # None until a test is recorded
    test_results: Optional[str] = None
class DORAICTRiskFramework:
    """In-memory register of ICT assets, risks and controls.

    Registering an asset derives baseline risks from its criticality, its
    third-party providers and its type. A risk can then be scored
    (likelihood x impact, reduced by control effectiveness) and the whole
    register summarised in a report.
    """

    # Days until an automatically identified risk is due for review.
    _AUTO_RISK_REVIEW_DAYS = 90

    def __init__(self):
        self.assets: Dict[str, ICTAsset] = {}
        self.risks: Dict[str, ICTRisk] = {}
        self.controls: Dict[str, Control] = {}
        # Highest acceptable residual level per category. Categories that are
        # not configured fall back to MEDIUM in _check_risk_appetite.
        self.risk_appetite: Dict[RiskCategory, RiskLevel] = {}

    def register_asset(self, asset: ICTAsset) -> str:
        """Store *asset* and add the risks auto-identified for it.

        Auto-identified risks use deterministic ids, so registering the same
        asset id again replaces the previously generated risk records.

        Returns:
            The id of the registered asset.
        """
        self.assets[asset.id] = asset
        for risk in self._identify_asset_risks(asset):
            self.risks[risk.id] = risk
        return asset.id

    def _new_auto_risk(self, asset: ICTAsset, suffix: str, *, title: str,
                       description: str, category: RiskCategory,
                       likelihood: int, impact: int,
                       residual_risk: RiskLevel) -> ICTRisk:
        """Build one auto-identified risk record for *asset*."""
        return ICTRisk(
            id=f"RISK-{asset.id}-{suffix}",
            title=title,
            description=description,
            category=category,
            affected_assets=[asset.id],
            likelihood=likelihood,
            impact=impact,
            # Derived from the same matrix assess_risk() uses, so the stored
            # level and the assessed inherent level cannot disagree.
            risk_level=self._score_to_level(likelihood * impact),
            controls=[],
            residual_risk=residual_risk,
            owner=asset.owner,
            review_date=datetime.utcnow() + timedelta(days=self._AUTO_RISK_REVIEW_DAYS),
        )

    def _identify_asset_risks(self, asset: ICTAsset) -> List[ICTRisk]:
        """Return the baseline risks implied by *asset*'s attributes."""
        risks: List[ICTRisk] = []

        # Availability risk: only for CRITICAL and HIGH assets.
        if asset.criticality in (RiskLevel.CRITICAL, RiskLevel.HIGH):
            risks.append(self._new_auto_risk(
                asset, "AVAIL",
                title=f"Service unavailability for {asset.name}",
                description=f"Risk of {asset.name} becoming unavailable",
                category=RiskCategory.AVAILABILITY,
                likelihood=3,
                impact=5 if asset.criticality == RiskLevel.CRITICAL else 4,
                residual_risk=RiskLevel.MEDIUM,
            ))

        # Third-party risk: whenever at least one provider is listed.
        if asset.third_party_providers:
            risks.append(self._new_auto_risk(
                asset, "3P",
                title=f"Third-party dependency risk for {asset.name}",
                description=f"Risk from third-party provider failure affecting {asset.name}",
                category=RiskCategory.CONTINUITY,
                likelihood=2,
                impact=4,
                residual_risk=RiskLevel.LOW,
            ))

        # Data integrity risk: only for data assets.
        if asset.asset_type == AssetType.DATA_ASSET:
            risks.append(self._new_auto_risk(
                asset, "INTEG",
                title=f"Data integrity risk for {asset.name}",
                description="Risk of data corruption or unauthorized modification",
                category=RiskCategory.INTEGRITY,
                likelihood=2,
                impact=4,
                residual_risk=RiskLevel.LOW,
            ))

        return risks

    def assess_risk(self, risk_id: str) -> Dict:
        """Score one registered risk.

        The inherent score is ``likelihood * impact``; the residual score is
        the inherent score scaled by ``1 - control effectiveness``.

        Raises:
            ValueError: if *risk_id* is not in the register.
        """
        risk = self.risks.get(risk_id)
        if not risk:
            raise ValueError(f"Risk {risk_id} not found")

        inherent_score = risk.likelihood * risk.impact
        control_effectiveness = self._calculate_control_effectiveness(risk.controls)
        residual_score = inherent_score * (1 - control_effectiveness)

        return {
            "risk_id": risk_id,
            "inherent_risk": {
                "likelihood": risk.likelihood,
                "impact": risk.impact,
                "score": inherent_score,
                "level": self._score_to_level(inherent_score)
            },
            "controls": {
                "count": len(risk.controls),
                "effectiveness": control_effectiveness
            },
            "residual_risk": {
                "score": residual_score,
                "level": self._score_to_level(residual_score)
            },
            "within_appetite": self._check_risk_appetite(risk.category, residual_score)
        }

    def _calculate_control_effectiveness(self, control_ids: List[str]) -> float:
        """Return the mean effectiveness weight of the referenced controls.

        Ids that are not registered, and unrecognised effectiveness labels,
        contribute 0 but still count in the denominator.
        """
        if not control_ids:
            return 0.0

        effectiveness_scores = {
            "effective": 0.8,
            "partially_effective": 0.4,
            "ineffective": 0.0
        }
        total = 0
        for control_id in control_ids:
            control = self.controls.get(control_id)
            if control:
                total += effectiveness_scores.get(control.effectiveness, 0)
        return total / len(control_ids)

    def _score_to_level(self, score: float) -> RiskLevel:
        """Map a numeric score to a level (>=20 critical, >=12 high, >=6 medium)."""
        if score >= 20:
            return RiskLevel.CRITICAL
        if score >= 12:
            return RiskLevel.HIGH
        if score >= 6:
            return RiskLevel.MEDIUM
        return RiskLevel.LOW

    def _check_risk_appetite(self, category: RiskCategory, score: float) -> bool:
        """Return True when *score*'s level does not exceed the category's appetite."""
        appetite_level = self.risk_appetite.get(category, RiskLevel.MEDIUM)
        risk_level = self._score_to_level(score)
        level_order = [RiskLevel.LOW, RiskLevel.MEDIUM, RiskLevel.HIGH, RiskLevel.CRITICAL]
        return level_order.index(risk_level) <= level_order.index(appetite_level)

    def generate_risk_report(self) -> Dict:
        """Summarise assets, risks and controls in a single report dict.

        The report counts risks by level and category, lists CRITICAL risks,
        lists risks without any control, and lists assets that rely on
        third-party providers. ``recommendations`` is returned empty.
        """
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_assets": len(self.assets),
                "total_risks": len(self.risks),
                "total_controls": len(self.controls),
                "risks_by_level": {},
                "risks_by_category": {}
            },
            "critical_risks": [],
            "control_gaps": [],
            "third_party_dependencies": [],
            "recommendations": []
        }
        by_level = report["summary"]["risks_by_level"]
        by_category = report["summary"]["risks_by_category"]

        for risk in self.risks.values():
            level = risk.risk_level.value
            category = risk.category.value
            by_level[level] = by_level.get(level, 0) + 1
            by_category[category] = by_category.get(category, 0) + 1

            if risk.risk_level == RiskLevel.CRITICAL:
                report["critical_risks"].append({
                    "id": risk.id,
                    "title": risk.title,
                    "category": category,
                    "owner": risk.owner
                })

            # A risk with no controls at all is reported as a gap.
            if not risk.controls:
                report["control_gaps"].append({
                    "risk_id": risk.id,
                    "risk_title": risk.title,
                    "recommendation": "Implement controls for this risk"
                })

        for asset in self.assets.values():
            if asset.third_party_providers:
                report["third_party_dependencies"].append({
                    "asset_id": asset.id,
                    "asset_name": asset.name,
                    "providers": asset.third_party_providers,
                    "criticality": asset.criticality.value
                })

        return report


# ICT Incident Reporting
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from enum import Enum
class IncidentSeverity(Enum):
    """Incident severity, declared from most to least severe.

    CRITICAL and MAJOR are the two levels the incident manager treats as
    major incidents.
    """

    CRITICAL = "critical"
    MAJOR = "major"
    SIGNIFICANT = "significant"
    MINOR = "minor"
class IncidentStatus(Enum):
    """Lifecycle state of an incident; new incidents start as DETECTED."""

    DETECTED = "detected"
    INVESTIGATING = "investigating"
    CONTAINED = "contained"
    RESOLVED = "resolved"
    POST_MORTEM = "post_mortem"
    CLOSED = "closed"
@dataclass
class DORAIncident:
    """Record of a single ICT incident."""

    id: str
    title: str
    description: str
    severity: IncidentSeverity
    detected_at: datetime
    affected_services: List[str]
    affected_clients: int
    financial_impact: float
    data_compromised: bool
    root_cause: Optional[str] = None
    resolution: Optional[str] = None
    status: IncidentStatus = IncidentStatus.DETECTED
    reported_to_authority: bool = False
    # For major incidents the manager stores the "initial", "intermediate"
    # and "final" notification deadlines here.
    notification_timestamps: Dict[str, datetime] = field(default_factory=dict)
class DORAIncidentManager:
    """Creates, classifies and reports ICT incidents.

    Incidents are kept in memory, keyed by id. When an authority API client
    is supplied, the initial notification is sent through its
    ``submit_notification`` method; otherwise the submission is simulated.
    """

    # Numeric thresholds at or above which a criterion counts towards a
    # major-incident classification.
    _MAJOR_THRESHOLDS = {
        "clients_affected": 10000,
        "duration_hours": 2,
        "financial_impact": 100000,
    }

    def __init__(self, authority_api_client=None):
        self.incidents: Dict[str, DORAIncident] = {}
        self.authority_api = authority_api_client

    def _next_incident_id(self, timestamp: datetime) -> str:
        """Return an unused incident id derived from *timestamp*.

        The base id has one-second resolution. A numeric suffix is appended
        when that id is already taken, so incidents created within the same
        second no longer overwrite each other.
        """
        base = f"INC-{timestamp.strftime('%Y%m%d%H%M%S')}"
        candidate = base
        sequence = 1
        while candidate in self.incidents:
            sequence += 1
            candidate = f"{base}-{sequence}"
        return candidate

    def create_incident(self, incident_data: Dict) -> DORAIncident:
        """Create, classify and store a new incident.

        ``incident_data`` must contain ``title`` and ``description``; the
        remaining keys are optional and default to empty/zero/False. Major
        incidents additionally get a notification schedule and an internal
        alert.

        Raises:
            KeyError: if ``title`` or ``description`` is missing.
        """
        # One clock reading feeds both the id and detected_at so they agree.
        detected_at = datetime.utcnow()
        incident = DORAIncident(
            id=self._next_incident_id(detected_at),
            title=incident_data['title'],
            description=incident_data['description'],
            severity=self._classify_severity(incident_data),
            detected_at=detected_at,
            affected_services=incident_data.get('affected_services', []),
            affected_clients=incident_data.get('affected_clients', 0),
            financial_impact=incident_data.get('financial_impact', 0),
            data_compromised=incident_data.get('data_compromised', False)
        )
        self.incidents[incident.id] = incident

        if self._is_major_incident(incident):
            self._initiate_notification_process(incident)

        return incident

    def _classify_severity(self, incident_data: Dict) -> IncidentSeverity:
        """Classify severity from the criteria present in *incident_data*.

        Compromised data is always CRITICAL. Otherwise the number of met
        criteria (clients affected, expected duration, financial impact,
        critical service affected) decides: three or more is CRITICAL, two
        is MAJOR, one is SIGNIFICANT, none is MINOR.
        """
        if incident_data.get('data_compromised', False):
            return IncidentSeverity.CRITICAL

        thresholds = self._MAJOR_THRESHOLDS
        criteria_met = [
            incident_data.get('affected_clients', 0) >= thresholds['clients_affected'],
            incident_data.get('expected_duration_hours', 0) >= thresholds['duration_hours'],
            incident_data.get('financial_impact', 0) >= thresholds['financial_impact'],
            bool(incident_data.get('affects_critical_service', False)),
        ]
        major_count = sum(criteria_met)

        if major_count >= 3:
            return IncidentSeverity.CRITICAL
        if major_count >= 2:
            return IncidentSeverity.MAJOR
        if major_count >= 1:
            return IncidentSeverity.SIGNIFICANT
        return IncidentSeverity.MINOR

    def _is_major_incident(self, incident: DORAIncident) -> bool:
        """Return True for CRITICAL and MAJOR incidents."""
        return incident.severity in (IncidentSeverity.CRITICAL, IncidentSeverity.MAJOR)

    def _initiate_notification_process(self, incident: DORAIncident):
        """Attach the notification deadlines to *incident* and alert internally.

        Deadlines are measured from the detection time: an internal 4-hour
        target for the initial notification, 72 hours for the intermediate
        report and 30 days for the final report.
        """
        # NOTE(review): the original comments cite 24 h / 72 h / 1 month as
        # the regulatory limits; confirm the offsets against the current
        # reporting standards before relying on them.
        detected = incident.detected_at
        incident.notification_timestamps = {
            "initial": detected + timedelta(hours=4),  # Target 4 hours
            "intermediate": detected + timedelta(hours=72),
            "final": detected + timedelta(days=30)
        }
        self._send_internal_alert(incident)

    def _send_internal_alert(self, incident: DORAIncident):
        """Print an internal alert describing a major incident."""
        # Tolerate an incident without a schedule instead of raising
        # AttributeError on None.
        deadline = incident.notification_timestamps.get('initial')
        alert = {
            "type": "DORA_MAJOR_INCIDENT",
            "incident_id": incident.id,
            "severity": incident.severity.value,
            "title": incident.title,
            "notification_deadline": deadline.isoformat() if deadline else None,
            "required_actions": [
                "Assess impact scope",
                "Document affected services",
                "Prepare initial notification report",
                "Notify senior management"
            ]
        }
        # Would send to alerting system
        print(f"ALERT: {json.dumps(alert)}")

    def _require_incident(self, incident_id: str) -> DORAIncident:
        """Return the stored incident or raise ValueError if it is unknown."""
        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")
        return incident

    def submit_initial_notification(self, incident_id: str) -> Dict:
        """Build and submit the initial notification for a major incident.

        Returns:
            ``{"status": "not_required", ...}`` for non-major incidents, the
            authority API's response when a client is configured, or a
            ``"simulated"`` payload otherwise.

        Raises:
            ValueError: if *incident_id* is unknown.
        """
        incident = self._require_incident(incident_id)
        if not self._is_major_incident(incident):
            return {"status": "not_required", "reason": "Not a major incident"}

        notification = {
            "notification_type": "initial",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "detected_at": incident.detected_at.isoformat(),
            "classification": incident.severity.value,
            "description": incident.description,
            "affected_services": incident.affected_services,
            "affected_clients_estimate": incident.affected_clients,
            "financial_impact_estimate": incident.financial_impact,
            "data_compromised": incident.data_compromised,
            "status": incident.status.value,
            "preliminary_root_cause": "Under investigation",
            "actions_taken": self._get_actions_taken(incident),
            "expected_resolution": None
        }

        if self.authority_api:
            response = self.authority_api.submit_notification(notification)
            incident.reported_to_authority = True
            return response
        return {"status": "simulated", "notification": notification}

    def submit_intermediate_notification(self, incident_id: str, updates: Dict) -> Dict:
        """Build the intermediate notification from *updates*.

        Values missing from *updates* fall back to the figures stored on the
        incident. The payload is returned; it is not sent to the authority
        API.

        Raises:
            ValueError: if *incident_id* is unknown.
        """
        incident = self._require_incident(incident_id)
        notification = {
            "notification_type": "intermediate",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "status_update": incident.status.value,
            "root_cause_analysis": updates.get('root_cause', 'Ongoing'),
            "actual_impact": {
                "clients_affected": updates.get('actual_clients', incident.affected_clients),
                "financial_impact": updates.get('actual_financial_impact', incident.financial_impact),
                "data_records_affected": updates.get('data_records', 0)
            },
            "containment_measures": updates.get('containment_measures', []),
            "recovery_progress": updates.get('recovery_progress', ''),
            "estimated_resolution": updates.get('estimated_resolution')
        }
        return {"status": "submitted", "notification": notification}

    def submit_final_notification(self, incident_id: str) -> Dict:
        """Build the final notification with timeline, impact and resolution.

        The payload is returned; it is not sent to the authority API.

        Raises:
            ValueError: if *incident_id* is unknown.
        """
        incident = self._require_incident(incident_id)
        notification = {
            "notification_type": "final",
            "incident_id": incident.id,
            "reported_at": datetime.utcnow().isoformat(),
            "incident_timeline": self._build_timeline(incident),
            "root_cause": incident.root_cause,
            "total_impact": {
                "clients_affected": incident.affected_clients,
                "financial_impact": incident.financial_impact,
                "data_compromised": incident.data_compromised,
                "duration_hours": self._calculate_duration(incident)
            },
            "resolution": incident.resolution,
            "lessons_learned": self._get_lessons_learned(incident),
            "preventive_measures": self._get_preventive_measures(incident),
            "regulatory_follow_up": None
        }
        return {"status": "submitted", "notification": notification}

    def _get_actions_taken(self, incident: DORAIncident) -> List[str]:
        """Return the fixed list of response actions used in notifications."""
        return [
            "Incident response team activated",
            "Impact assessment initiated",
            "Stakeholders notified",
            "Containment procedures initiated"
        ]

    def _build_timeline(self, incident: DORAIncident) -> List[Dict]:
        """Return a two-entry timeline: detection time and the current time."""
        # NOTE(review): the "resolved" entry is stamped with the current
        # time regardless of the incident's status.
        return [
            {"timestamp": incident.detected_at.isoformat(), "event": "Incident detected"},
            {"timestamp": datetime.utcnow().isoformat(), "event": "Incident resolved"}
        ]

    def _calculate_duration(self, incident: DORAIncident) -> float:
        """Return hours elapsed since detection, or 0 for RESOLVED incidents."""
        if incident.status == IncidentStatus.RESOLVED:
            # Placeholder kept from the original: the incident record holds
            # no resolution timestamp, so no actual duration can be derived.
            return 0  # Would calculate actual duration
        return (datetime.utcnow() - incident.detected_at).total_seconds() / 3600

    def _get_lessons_learned(self, incident: DORAIncident) -> List[str]:
        """Return lessons learned for the incident (currently always empty)."""
        return []

    def _get_preventive_measures(self, incident: DORAIncident) -> List[str]:
        """Return preventive measures for the incident (currently always empty)."""
        return []

    def generate_incident_report(self, period_start: datetime, period_end: datetime) -> Dict:
        """Summarise incidents detected within the inclusive period.

        The report holds totals, counts by severity and status, and a detail
        entry per major incident. ``notifications_submitted`` is returned
        empty.
        """
        period_incidents = [
            i for i in self.incidents.values()
            if period_start <= i.detected_at <= period_end
        ]

        report = {
            "reporting_period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "summary": {
                "total_incidents": len(period_incidents),
                "major_incidents": len([i for i in period_incidents if self._is_major_incident(i)]),
                "by_severity": {},
                "by_status": {},
                "total_financial_impact": sum(i.financial_impact for i in period_incidents),
                "total_clients_affected": sum(i.affected_clients for i in period_incidents)
            },
            "major_incidents_detail": [],
            "notifications_submitted": []
        }
        by_severity = report["summary"]["by_severity"]
        by_status = report["summary"]["by_status"]

        for incident in period_incidents:
            severity = incident.severity.value
            status = incident.status.value
            by_severity[severity] = by_severity.get(severity, 0) + 1
            by_status[status] = by_status.get(status, 0) + 1

            if self._is_major_incident(incident):
                report["major_incidents_detail"].append({
                    "id": incident.id,
                    "title": incident.title,
                    "severity": severity,
                    "detected_at": incident.detected_at.isoformat(),
                    "reported_to_authority": incident.reported_to_authority
                })

        return report


# Digital Resilience Testing
class DORAResilienceTesting:
    """Plans and runs simulated resilience tests and reports on them.

    Plans are kept in ``test_plans`` keyed by id; every executed run is
    appended to ``test_results``.
    """

    def __init__(self):
        self.test_plans: Dict[str, Dict] = {}
        self.test_results: List[Dict] = []

    def _next_plan_id(self, timestamp: datetime) -> str:
        """Return an unused plan id derived from *timestamp*.

        The base id has one-second resolution. A numeric suffix is appended
        when that id is already taken, so plans created within the same
        second no longer overwrite each other.
        """
        base = f"TEST-{timestamp.strftime('%Y%m%d%H%M%S')}"
        candidate = base
        sequence = 1
        while candidate in self.test_plans:
            sequence += 1
            candidate = f"{base}-{sequence}"
        return candidate

    def create_test_plan(self, test_type: str, scope: Dict) -> Dict:
        """Create and store a test plan with the scenarios for *test_type*.

        Recognised types are ``threat_led_penetration_test``,
        ``scenario_based_testing`` and ``vulnerability_assessment``. Any
        other type yields a plan with no scenarios.

        Returns:
            The stored plan dict.
        """
        created = datetime.utcnow()
        test_plan = {
            "id": self._next_plan_id(created),
            "type": test_type,
            "scope": scope,
            "created_at": created.isoformat(),
            "status": "planned",
            "scenarios": [],
            "schedule": None
        }

        if test_type == "threat_led_penetration_test":
            # TLPT required for significant entities
            test_plan["scenarios"] = [
                {
                    "name": "External Attack Simulation",
                    "description": "Simulate sophisticated external threat actor",
                    "target": "critical_infrastructure",
                    "techniques": ["social_engineering", "network_intrusion", "data_exfiltration"]
                },
                {
                    "name": "Insider Threat",
                    "description": "Simulate malicious insider",
                    "target": "internal_systems",
                    "techniques": ["privilege_escalation", "data_theft"]
                }
            ]
            test_plan["requirements"] = {
                "red_team_provider": "certified_threat_intelligence",
                "frequency": "every_3_years",
                "regulator_notification": True
            }
        elif test_type == "scenario_based_testing":
            test_plan["scenarios"] = [
                {
                    "name": "Data Center Failure",
                    "description": "Simulate complete data center outage",
                    "rto_target": 240,  # minutes
                    "rpo_target": 60
                },
                {
                    "name": "Cyber Attack",
                    "description": "Simulate ransomware attack",
                    "response_time_target": 30
                },
                {
                    "name": "Third-Party Failure",
                    "description": "Simulate critical third-party service failure",
                    "failover_target": 60
                }
            ]
        elif test_type == "vulnerability_assessment":
            test_plan["scenarios"] = [
                {"name": "Infrastructure Scan", "target": "all_infrastructure"},
                {"name": "Application Security Testing", "target": "web_applications"},
                {"name": "Configuration Review", "target": "security_configurations"}
            ]
            test_plan["requirements"] = {
                "frequency": "quarterly",
                "scope": "all_critical_systems"
            }

        self.test_plans[test_plan["id"]] = test_plan
        return test_plan

    def execute_resilience_test(self, plan_id: str) -> Dict:
        """Run every scenario of a stored plan and record the outcome.

        Failed scenarios become findings, and each finding produces one
        recommendation. The result is appended to ``test_results``.

        Raises:
            ValueError: if *plan_id* is unknown.
        """
        plan = self.test_plans.get(plan_id)
        if not plan:
            raise ValueError(f"Test plan {plan_id} not found")

        results = {
            "plan_id": plan_id,
            "executed_at": datetime.utcnow().isoformat(),
            "status": "completed",
            "scenario_results": [],
            "findings": [],
            "recommendations": []
        }

        for scenario in plan["scenarios"]:
            scenario_result = self._execute_scenario(scenario)
            results["scenario_results"].append(scenario_result)
            if not scenario_result["passed"]:
                results["findings"].append({
                    "scenario": scenario["name"],
                    # A failing result may not carry a reason; do not let
                    # that turn the run into a KeyError.
                    "issue": scenario_result.get("failure_reason", "unspecified failure"),
                    "severity": "high" if "critical" in scenario["name"].lower() else "medium"
                })

        results["recommendations"] = self._generate_recommendations(results["findings"])
        self.test_results.append(results)
        return results

    def _execute_scenario(self, scenario: Dict) -> Dict:
        """Simulate one scenario; the simulation always passes."""
        # Simplified scenario execution
        return {
            "scenario_name": scenario["name"],
            "started_at": datetime.utcnow().isoformat(),
            "completed_at": datetime.utcnow().isoformat(),
            "passed": True,
            "metrics": {
                "rto_achieved": scenario.get("rto_target", 0) * 0.8,  # Simulated
                "rpo_achieved": scenario.get("rpo_target", 0) * 0.9
            },
            "observations": []
        }

    def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
        """Return one remediation sentence per finding."""
        return [
            f"Address {finding['issue']} identified in {finding['scenario']}"
            for finding in findings
        ]

    @staticmethod
    def _run_passed(result: Dict) -> bool:
        """Return True when a run completed and none of its scenarios failed."""
        if result.get("status") != "completed":
            return False
        return all(s.get("passed") for s in result.get("scenario_results", []))

    def generate_testing_report(self) -> Dict:
        """Summarise all executed runs.

        ``tests_passed`` counts runs that completed without a failed
        scenario. ``compliance_status`` is ``"compliant"`` as soon as at
        least one run has been recorded.
        """
        critical_findings = [
            finding
            for result in self.test_results
            for finding in result["findings"]
            if finding.get("severity") == "critical"
        ]
        return {
            "reporting_period": "annual",
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total_tests": len(self.test_results),
                "tests_passed": sum(1 for r in self.test_results if self._run_passed(r)),
                "critical_findings": len(critical_findings)
            },
            "test_results": self.test_results,
            "compliance_status": "compliant" if len(self.test_results) > 0 else "non_compliant"
        }


# Third-Party Risk Management
@dataclass
class ThirdPartyProvider:
    """A third-party ICT service provider tracked in the provider register."""

    id: str
    name: str
    service_type: str
    criticality: RiskLevel  # CRITICAL providers are flagged on registration
    contract_start: datetime
    contract_end: datetime
    services_provided: List[str]
    data_access: bool
    location: str
    subcontractors: List[str] = field(default_factory=list)
    last_assessment: Optional[datetime] = None  # set by due diligence
    risk_rating: Optional[RiskLevel] = None  # set by due diligence
class DORAThirdPartyRiskManager:
    """Registers third-party providers, assesses them and checks contracts.

    Providers and their assessment history are kept in memory, keyed by
    provider id.
    """

    def __init__(self):
        self.providers: Dict[str, ThirdPartyProvider] = {}
        self.assessments: Dict[str, List[Dict]] = {}
        self.contracts: Dict[str, Dict] = {}

    def register_provider(self, provider: ThirdPartyProvider) -> str:
        """Store *provider*, flagging it when its criticality is CRITICAL.

        Returns:
            The id of the registered provider.
        """
        self.providers[provider.id] = provider
        if provider.criticality == RiskLevel.CRITICAL:
            self._flag_critical_provider(provider)
        return provider.id

    def _flag_critical_provider(self, provider: ThirdPartyProvider):
        """Print an alert for a newly registered critical provider."""
        print(f"ALERT: Critical third-party provider registered: {provider.name}")
        # Would trigger enhanced due diligence workflow

    def perform_due_diligence(self, provider_id: str) -> Dict:
        """Assess a provider across the five due-diligence areas.

        The assessment is appended to the provider's history, and the
        provider's ``last_assessment`` and ``risk_rating`` are updated.

        Raises:
            ValueError: if *provider_id* is unknown.
        """
        provider = self.providers.get(provider_id)
        if not provider:
            raise ValueError(f"Provider {provider_id} not found")

        # One clock reading so the assessment date and the provider's
        # last_assessment always agree.
        assessed_at = datetime.utcnow()
        assessment = {
            "provider_id": provider_id,
            "assessment_date": assessed_at.isoformat(),
            "assessment_type": "due_diligence",
            "areas_assessed": [],
            "findings": [],
            "risk_rating": None,
            "recommendations": []
        }

        due_diligence_areas = [
            {
                "area": "financial_stability",
                "checks": ["financial_statements", "credit_rating", "insurance_coverage"]
            },
            {
                "area": "security_controls",
                "checks": ["certifications", "security_audit_reports", "incident_history"]
            },
            {
                "area": "business_continuity",
                "checks": ["bcp_documentation", "dr_capabilities", "rto_rpo_guarantees"]
            },
            {
                "area": "compliance",
                "checks": ["regulatory_compliance", "data_protection", "subcontractor_oversight"]
            },
            {
                "area": "exit_strategy",
                "checks": ["data_portability", "transition_support", "termination_procedures"]
            }
        ]
        # Findings against a critical provider are rated one step higher.
        finding_severity = "high" if provider.criticality == RiskLevel.CRITICAL else "medium"

        for area in due_diligence_areas:
            area_result = self._assess_area(provider, area)
            assessment["areas_assessed"].append(area_result)
            if not area_result["satisfactory"]:
                assessment["findings"].append({
                    "area": area["area"],
                    "issue": area_result["issues"],
                    "severity": finding_severity
                })

        assessment["risk_rating"] = self._calculate_provider_risk(assessment)

        provider.last_assessment = assessed_at
        provider.risk_rating = RiskLevel(assessment["risk_rating"])

        self.assessments.setdefault(provider_id, []).append(assessment)
        return assessment

    def _assess_area(self, provider: ThirdPartyProvider, area: Dict) -> Dict:
        """Return a placeholder result for one area; always satisfactory."""
        # Simplified assessment - would include actual checks
        return {
            "area": area["area"],
            "checks_performed": area["checks"],
            "satisfactory": True,  # Would be based on actual assessment
            "score": 85,
            "issues": []
        }

    def _calculate_provider_risk(self, assessment: Dict) -> str:
        """Derive the rating string from the number and severity of findings."""
        findings = assessment["findings"]
        finding_count = len(findings)
        high_findings = sum(1 for f in findings if f["severity"] == "high")

        if high_findings > 2:
            return "critical"
        if high_findings > 0 or finding_count > 3:
            return "high"
        if finding_count > 0:
            return "medium"
        return "low"

    def verify_contract_requirements(self, provider_id: str, contract: Dict) -> Dict:
        """Check *contract* for the ten required clauses.

        ``contract["clauses"]`` is the collection of clause names present.
        A missing key or a ``None`` value is treated as no clauses.

        Returns:
            A verification dict listing verified and missing clauses, with
            one recommendation per missing clause.
        """
        required_clauses = [
            "service_level_agreements",
            "audit_rights",
            "data_location",
            "subcontracting_approval",
            "incident_notification",
            "business_continuity",
            "exit_provisions",
            "data_portability",
            "termination_rights",
            "liability_and_indemnification"
        ]
        # Read once; `or []` also covers an explicit None value, which
        # previously raised TypeError on the membership test.
        present_clauses = contract.get("clauses") or []

        verified = [c for c in required_clauses if c in present_clauses]
        missing = [c for c in required_clauses if c not in present_clauses]

        return {
            "provider_id": provider_id,
            "verified_at": datetime.utcnow().isoformat(),
            "compliant": not missing,
            "clauses_verified": verified,
            "missing_clauses": missing,
            "recommendations": [f"Add {clause} clause to contract" for clause in missing]
        }

    def generate_register(self) -> Dict:
        """Return the provider register with one entry per provider."""
        all_providers = list(self.providers.values())
        register = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_providers": len(all_providers),
            "critical_providers": sum(
                1 for p in all_providers if p.criticality == RiskLevel.CRITICAL
            ),
            "providers": []
        }

        for provider in all_providers:
            register["providers"].append({
                "id": provider.id,
                "name": provider.name,
                "service_type": provider.service_type,
                "criticality": provider.criticality.value,
                "services": provider.services_provided,
                "data_access": provider.data_access,
                "location": provider.location,
                "contract_end": provider.contract_end.isoformat(),
                "last_assessment": provider.last_assessment.isoformat() if provider.last_assessment else None,
                "risk_rating": provider.risk_rating.value if provider.risk_rating else "not_assessed",
                "subcontractors": provider.subcontractors
            })

        return register


# Best Practices
DORA Implementation
- Start with asset inventory: Identify and classify all ICT assets
- Map dependencies: Document third-party and internal dependencies
- Implement continuous monitoring: Real-time risk and incident monitoring
- Regular testing: Conduct resilience tests per DORA requirements
Compliance Maintenance
- Maintain up-to-date risk registers
- Document all incidents with required detail
- Review third-party contracts for DORA compliance
- Report major incidents within required timeframes
DORA compliance requires a comprehensive approach to digital operational resilience that spans risk management, incident handling, testing, and third-party oversight.