SOC 2 compliance demonstrates your commitment to security, availability, and data protection. This guide covers automating SOC 2 Type II compliance for continuous audit readiness.
SOC 2 Trust Services Criteria Framework
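Implement a framework modeled on the Trust Services Criteria. The starter control set below covers the Security, Availability, and Confidentiality categories; add Processing Integrity and Privacy controls if they are in scope for your audit: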
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
class TrustServiceCategory(Enum):
    """Trust Services Criteria category that a control is filed under.

    The string values are what reports emit (see ``category.value`` usages).
    """

    SECURITY = "security"
    AVAILABILITY = "availability"
    PROCESSING_INTEGRITY = "processing_integrity"
    CONFIDENTIALITY = "confidentiality"
    PRIVACY = "privacy"
class ControlStatus(Enum):
    """Implementation state of a single control."""

    IMPLEMENTED = "implemented"
    PARTIALLY_IMPLEMENTED = "partially_implemented"
    NOT_IMPLEMENTED = "not_implemented"
    # Out of scope for this organization.
    NOT_APPLICABLE = "not_applicable"
class EvidenceType(Enum):
    """Kind of artifact that can be attached to a control as evidence."""

    SCREENSHOT = "screenshot"
    LOG_EXPORT = "log_export"
    POLICY_DOCUMENT = "policy_document"
    CONFIGURATION = "configuration"
    AUTOMATED_TEST = "automated_test"
    INTERVIEW = "interview"
    OBSERVATION = "observation"
@dataclass
class Control:
    """A single control together with its ownership and testing metadata."""

    control_id: str                  # unique key, e.g. "CC5.1"
    category: TrustServiceCategory
    title: str
    description: str
    criteria_ref: str                # criterion this control maps to
    status: ControlStatus
    owner: str                       # team responsible for the control
    implementation_details: str
    testing_frequency: str           # "monthly", "quarterly" or "annually" in this file
    last_tested: Optional[datetime] = None   # None until a test is recorded
    # default_factory gives every instance its own list.
    evidence_types: List[EvidenceType] = field(default_factory=list)
@dataclass
class Evidence:
    """One piece of evidence collected for a control."""

    evidence_id: str
    control_id: str                  # control this evidence supports
    evidence_type: EvidenceType
    title: str
    description: str
    collected_at: datetime
    collected_by: str                # automated collectors in this file use "automated"
    file_path: Optional[str] = None  # location of an exported artifact, if any
    data: Optional[Dict] = None      # inline payload, if any
    valid_until: Optional[datetime] = None
@dataclass
class ControlTest:
    """Outcome of one test run against a control."""

    test_id: str
    control_id: str          # control that was tested
    test_date: datetime
    tester: str
    result: str              # free-text result description
    findings: List[str]
    evidence_ids: List[str]  # IDs of evidence backing this test
    passed: bool
class SOC2ComplianceFramework:
    """In-memory registry of controls with their evidence and test history.

    Attributes:
        controls: ``Control`` objects keyed by control ID.
        evidence: Evidence collected so far, keyed by control ID.
        tests: Recorded test runs, keyed by control ID.
    """

    def __init__(self):
        self.controls: Dict[str, Control] = {}
        self.evidence: Dict[str, List[Evidence]] = {}
        self.tests: Dict[str, List[ControlTest]] = {}
        self._initialize_controls()

    def _initialize_controls(self):
        """Register the starter control set; every control starts NOT_IMPLEMENTED."""
        security = TrustServiceCategory.SECURITY
        availability = TrustServiceCategory.AVAILABILITY
        confidentiality = TrustServiceCategory.CONFIDENTIALITY

        policy = EvidenceType.POLICY_DOCUMENT
        logs = EvidenceType.LOG_EXPORT
        screenshot = EvidenceType.SCREENSHOT
        config = EvidenceType.CONFIGURATION
        auto_test = EvidenceType.AUTOMATED_TEST

        # (control_id, category, title, description, owner, frequency, evidence types)
        # NOTE(review): the IDs are used as lookup keys by the evidence
        # collectors, so they must not be renumbered here in isolation.
        starter_controls = [
            # Security Controls (CC Series)
            ("CC1.1", security, "Security Policies and Procedures",
             "Organization defines and documents security policies",
             "Security Team", "annually", [policy]),
            ("CC2.1", security, "Security Awareness Training",
             "Personnel receive security awareness training",
             "HR", "annually", [logs, screenshot]),
            ("CC3.1", security, "Risk Assessment Process",
             "Organization performs periodic risk assessments",
             "Security Team", "annually", [policy, logs]),
            ("CC5.1", security, "Logical Access Controls",
             "Logical access to systems is restricted",
             "IT Operations", "quarterly", [config, auto_test]),
            ("CC6.1", security, "Change Management",
             "Changes follow formal change management process",
             "Engineering", "monthly", [logs, screenshot]),
            ("CC7.1", security, "Vulnerability Management",
             "Vulnerabilities are identified and remediated",
             "Security Team", "monthly", [logs, auto_test]),
            ("CC8.1", security, "Incident Response",
             "Security incidents are detected and responded to",
             "Security Team", "quarterly", [policy, logs]),
            # Availability Controls (A Series)
            ("A1.1", availability, "System Availability Monitoring",
             "System availability is monitored and maintained",
             "IT Operations", "monthly", [config, logs]),
            ("A1.2", availability, "Disaster Recovery",
             "Disaster recovery procedures are documented and tested",
             "IT Operations", "annually", [policy, logs]),
            # Confidentiality Controls (C Series)
            ("C1.1", confidentiality, "Data Classification",
             "Confidential information is identified and protected",
             "Security Team", "annually", [policy, config]),
            ("C1.2", confidentiality, "Data Encryption",
             "Confidential data is encrypted at rest and in transit",
             "Engineering", "quarterly", [config, auto_test]),
        ]

        for (control_id, category, title, description,
             owner, frequency, evidence_types) in starter_controls:
            self.controls[control_id] = Control(
                control_id=control_id,
                category=category,
                title=title,
                description=description,
                criteria_ref=control_id,  # starter controls map 1:1 to their ID
                status=ControlStatus.NOT_IMPLEMENTED,
                owner=owner,
                implementation_details="",
                testing_frequency=frequency,
                evidence_types=evidence_types,
            )
            self.evidence[control_id] = []
            self.tests[control_id] = []

    def update_control_status(
        self,
        control_id: str,
        status: ControlStatus,
        implementation_details: str
    ):
        """Set a control's status and implementation notes.

        Raises:
            ValueError: If ``control_id`` is not registered.
        """
        if control_id not in self.controls:
            raise ValueError(f"Control {control_id} not found")
        control = self.controls[control_id]
        control.status = status
        control.implementation_details = implementation_details

    def add_evidence(self, evidence: Evidence):
        """Attach ``evidence`` to the control named by ``evidence.control_id``.

        Raises:
            ValueError: If the control is not registered.
        """
        if evidence.control_id not in self.controls:
            raise ValueError(f"Control {evidence.control_id} not found")
        # setdefault: a control added to ``controls`` directly may have no list yet.
        self.evidence.setdefault(evidence.control_id, []).append(evidence)

    def record_test(self, test: ControlTest):
        """Store a test result and advance the control's ``last_tested`` date.

        Raises:
            ValueError: If the control is not registered.
        """
        if test.control_id not in self.controls:
            raise ValueError(f"Control {test.control_id} not found")
        self.tests.setdefault(test.control_id, []).append(test)
        control = self.controls[test.control_id]
        # Only move forward: recording an older (backfilled) test must not make
        # the control look less recently tested than it really was.
        if control.last_tested is None or test.test_date > control.last_tested:
            control.last_tested = test.test_date

    def get_compliance_status(self) -> Dict:
        """Summarize implementation progress overall and per category.

        NOT_APPLICABLE controls are reported in the totals but excluded from
        ``compliance_rate`` and from the ``audit_ready`` decision, so an
        out-of-scope control cannot block readiness.

        Returns:
            A dict with ``summary``, ``by_category`` (only categories that
            have at least one control) and ``audit_ready``.
        """

        def tally(controls):
            """Return (applicable controls, implemented count, rate)."""
            applicable = [
                c for c in controls if c.status != ControlStatus.NOT_APPLICABLE
            ]
            done = sum(1 for c in applicable if c.status == ControlStatus.IMPLEMENTED)
            rate = done / len(applicable) if applicable else 0
            return applicable, done, rate

        all_controls = list(self.controls.values())
        applicable, implemented, overall_rate = tally(all_controls)
        partial = sum(
            1 for c in applicable if c.status == ControlStatus.PARTIALLY_IMPLEMENTED
        )

        by_category = {}
        for category in TrustServiceCategory:
            members = [c for c in all_controls if c.category == category]
            if not members:
                continue
            cat_applicable, cat_implemented, cat_rate = tally(members)
            by_category[category.value] = {
                "total": len(members),
                "implemented": cat_implemented,
                "not_applicable": len(members) - len(cat_applicable),
                "compliance_rate": cat_rate,
            }

        return {
            "summary": {
                "total_controls": len(all_controls),
                "implemented": implemented,
                "partially_implemented": partial,
                "not_applicable": len(all_controls) - len(applicable),
                "compliance_rate": overall_rate,
            },
            "by_category": by_category,
            "audit_ready": implemented == len(applicable),
        }


# Automated Evidence Collection
Build automated evidence collectors:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from datetime import datetime
import boto3
import requests
class EvidenceCollector(ABC):
    """Interface implemented by every automated evidence source."""

    @abstractmethod
    def collect(self, control_id: str) -> List[Evidence]:
        """Return the evidence gathered for ``control_id``.

        The concrete collectors in this file return an empty list for a
        control they do not support.
        """

    @abstractmethod
    def get_supported_controls(self) -> List[str]:
        """Return the control IDs this collector can gather evidence for."""
class AWSSecurityEvidenceCollector(EvidenceCollector):
    """Collects configuration evidence from an AWS account through boto3.

    Each ``_collect_*`` method is best-effort: a failing AWS call is reported
    with ``print`` and the remaining evidence is still returned.
    """

    # Upper bound on how many KMS keys are described per collection run.
    MAX_KMS_KEYS = 10

    def __init__(self, region: str = 'us-east-1'):
        self.iam = boto3.client('iam', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.s3 = boto3.client('s3', region_name=region)
        self.cloudtrail = boto3.client('cloudtrail', region_name=region)
        self.kms = boto3.client('kms', region_name=region)

    def _handlers(self) -> Dict:
        """Map each supported control ID to the method that collects for it."""
        return {
            'CC5.1': self._collect_access_control_evidence,
            'CC6.1': self._collect_change_management_evidence,
            'CC7.1': self._collect_vulnerability_evidence,
            'C1.2': self._collect_encryption_evidence,
            'A1.1': self._collect_availability_evidence
        }

    def get_supported_controls(self) -> List[str]:
        """Return the control IDs this collector handles."""
        # Derived from the dispatch table so the two can never drift apart.
        return list(self._handlers())

    def collect(self, control_id: str) -> List[Evidence]:
        """Collect evidence for ``control_id``; empty list if unsupported."""
        handler = self._handlers().get(control_id)
        return handler(control_id) if handler else []

    @staticmethod
    def _configuration_evidence(
        id_prefix: str, control_id: str, title: str, description: str, data: Dict
    ) -> Evidence:
        """Build a CONFIGURATION evidence record stamped with the current time."""
        # One clock read so the ID suffix and collected_at always agree.
        # Naive UTC is kept because other code in this file compares
        # collected_at against datetime.utcnow().
        now = datetime.utcnow()
        return Evidence(
            evidence_id=f"{id_prefix}-{now.strftime('%Y%m%d%H%M%S')}",
            control_id=control_id,
            evidence_type=EvidenceType.CONFIGURATION,
            title=title,
            description=description,
            collected_at=now,
            collected_by="automated",
            data=data
        )

    def _collect_access_control_evidence(self, control_id: str) -> List[Evidence]:
        """Collect the IAM password policy and per-user MFA status."""
        evidence_list = []

        # IAM Password Policy
        try:
            policy = self.iam.get_account_password_policy()
            evidence_list.append(self._configuration_evidence(
                "AWS-IAM",
                control_id,
                "IAM Password Policy Configuration",
                "Current IAM password policy settings",
                policy['PasswordPolicy']
            ))
        except Exception as e:
            print(f"Failed to collect password policy: {e}")

        # MFA Status
        try:
            # Paginate: a single list_users call only returns the first page,
            # which would silently drop users from the evidence.
            users = [
                user
                for page in self.iam.get_paginator('list_users').paginate()
                for user in page['Users']
            ]
            mfa_status = []
            for user in users:
                devices = self.iam.list_mfa_devices(UserName=user['UserName'])
                mfa_status.append({
                    'user': user['UserName'],
                    'mfa_enabled': len(devices['MFADevices']) > 0
                })
            evidence_list.append(self._configuration_evidence(
                "AWS-MFA",
                control_id,
                "MFA Status for IAM Users",
                "Multi-factor authentication status for all IAM users",
                {
                    'users': mfa_status,
                    'total': len(users),
                    'mfa_enabled': sum(1 for u in mfa_status if u['mfa_enabled'])
                }
            ))
        except Exception as e:
            print(f"Failed to collect MFA status: {e}")

        return evidence_list

    def _collect_change_management_evidence(self, control_id: str) -> List[Evidence]:
        """Collect CloudTrail trail configuration and logging status."""
        evidence_list = []

        # CloudTrail Configuration
        try:
            trails = self.cloudtrail.describe_trails()['trailList']
            trail_configs = []
            for trail in trails:
                # Prefer the ARN: the status of a trail replicated from another
                # region can only be queried by ARN, not by name.
                status = self.cloudtrail.get_trail_status(
                    Name=trail.get('TrailARN', trail['Name'])
                )
                trail_configs.append({
                    'name': trail['Name'],
                    'is_logging': status['IsLogging'],
                    'is_multi_region': trail.get('IsMultiRegionTrail', False),
                    's3_bucket': trail.get('S3BucketName')
                })
            evidence_list.append(self._configuration_evidence(
                "AWS-CT",
                control_id,
                "CloudTrail Configuration",
                "AWS CloudTrail logging configuration for audit trail",
                {'trails': trail_configs}
            ))
        except Exception as e:
            print(f"Failed to collect CloudTrail config: {e}")

        return evidence_list

    def _bucket_encryption_entry(self, bucket_name: str) -> Dict:
        """Describe one bucket's default encryption for the evidence payload."""
        try:
            encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
        except self.s3.exceptions.ClientError as err:
            code = err.response.get('Error', {}).get('Code')
            if code == 'ServerSideEncryptionConfigurationNotFoundError':
                return {'bucket': bucket_name, 'encrypted': False}
            # Access denied, throttling, etc.: the state is unknown, so do not
            # record the bucket as unencrypted.
            return {'bucket': bucket_name, 'encrypted': None, 'error': code}
        rule = encryption['ServerSideEncryptionConfiguration']['Rules'][0]
        return {
            'bucket': bucket_name,
            'encrypted': True,
            'algorithm': rule['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
        }

    def _collect_encryption_evidence(self, control_id: str) -> List[Evidence]:
        """Collect S3 default-encryption status and KMS key state."""
        evidence_list = []

        # S3 Bucket Encryption
        try:
            buckets = self.s3.list_buckets()['Buckets']
            bucket_encryption = [
                self._bucket_encryption_entry(bucket['Name']) for bucket in buckets
            ]
            evidence_list.append(self._configuration_evidence(
                "AWS-S3ENC",
                control_id,
                "S3 Bucket Encryption Status",
                "Server-side encryption configuration for S3 buckets",
                {
                    'buckets': bucket_encryption,
                    'encrypted_count': sum(1 for b in bucket_encryption if b['encrypted'])
                }
            ))
        except Exception as e:
            print(f"Failed to collect S3 encryption: {e}")

        # KMS Keys
        try:
            keys = self.kms.list_keys()['Keys']
            key_details = []
            for key in keys[:self.MAX_KMS_KEYS]:
                try:
                    metadata = self.kms.describe_key(KeyId=key['KeyId'])['KeyMetadata']
                except Exception as e:
                    # Report instead of skipping silently so gaps are visible.
                    print(f"Failed to describe KMS key {key['KeyId']}: {e}")
                    continue
                key_details.append({
                    'key_id': key['KeyId'],
                    'description': metadata.get('Description', ''),
                    'enabled': metadata['Enabled'],
                    'key_state': metadata['KeyState']
                })
            evidence_list.append(self._configuration_evidence(
                "AWS-KMS",
                control_id,
                "KMS Key Configuration",
                "AWS KMS encryption key status",
                {'keys': key_details}
            ))
        except Exception as e:
            print(f"Failed to collect KMS keys: {e}")

        return evidence_list

    def _collect_vulnerability_evidence(self, control_id: str) -> List[Evidence]:
        """Placeholder: no vulnerability evidence is collected yet."""
        # This would integrate with AWS Inspector or similar
        return []

    def _collect_availability_evidence(self, control_id: str) -> List[Evidence]:
        """Collect the state and availability zone of every EC2 instance."""
        evidence_list = []

        # EC2 Instance Status
        try:
            instance_status = []
            # Paginate so large fleets are not truncated to the first page.
            pages = self.ec2.get_paginator('describe_instances').paginate()
            for page in pages:
                for reservation in page['Reservations']:
                    for instance in reservation['Instances']:
                        instance_status.append({
                            'instance_id': instance['InstanceId'],
                            'state': instance['State']['Name'],
                            'availability_zone': instance['Placement']['AvailabilityZone']
                        })
            evidence_list.append(self._configuration_evidence(
                "AWS-EC2",
                control_id,
                "EC2 Instance Availability Status",
                "Current state and distribution of EC2 instances",
                {'instances': instance_status}
            ))
        except Exception as e:
            print(f"Failed to collect EC2 status: {e}")

        return evidence_list
class GitHubEvidenceCollector(EvidenceCollector):
    """Collects branch-protection and 2FA evidence from a GitHub organization.

    Args:
        token: GitHub API token sent in the ``Authorization`` header.
        org: Organization login whose repositories and members are inspected.
        timeout: Seconds to wait on each HTTP request before giving up.
    """

    # Upper bound on how many repositories are checked per collection run.
    MAX_REPOS = 20

    def __init__(self, token: str, org: str, timeout: float = 30.0):
        self.token = token
        self.org = org
        self.timeout = timeout
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = 'https://api.github.com'

    def get_supported_controls(self) -> List[str]:
        """Return the control IDs this collector handles."""
        return ['CC6.1', 'CC5.1']

    def collect(self, control_id: str) -> List[Evidence]:
        """Collect evidence for ``control_id``; empty list if unsupported."""
        if control_id == 'CC6.1':
            return self._collect_change_management_evidence(control_id)
        elif control_id == 'CC5.1':
            return self._collect_access_control_evidence(control_id)
        return []

    def _get_all_pages(self, url: str, params: Optional[Dict] = None) -> List[Dict]:
        """Fetch every page of a list endpoint.

        Raises:
            requests.RequestException: On a network error or non-2xx response,
                so callers never mistake a failed call for an empty result.
        """
        query = {'per_page': 100, **(params or {})}
        items: List[Dict] = []
        while url:
            response = requests.get(
                url, headers=self.headers, params=query, timeout=self.timeout
            )
            response.raise_for_status()
            items.extend(response.json())
            url = response.links.get('next', {}).get('url')
            query = None  # the "next" link already carries the query string
        return items

    def _configuration_evidence(
        self, id_prefix: str, control_id: str, title: str, description: str, data: Dict
    ) -> Evidence:
        """Build a CONFIGURATION evidence record stamped with the current time."""
        # One clock read so the ID suffix and collected_at always agree.
        now = datetime.utcnow()
        return Evidence(
            evidence_id=f"{id_prefix}-{now.strftime('%Y%m%d%H%M%S')}",
            control_id=control_id,
            evidence_type=EvidenceType.CONFIGURATION,
            title=title,
            description=description,
            collected_at=now,
            collected_by="automated",
            data=data
        )

    def _branch_protection_entry(self, repo: Dict) -> Optional[Dict]:
        """Return the protection summary for a repo's default branch.

        Returns None when the API answer says nothing about protection
        (for example a permission error).
        """
        response = requests.get(
            f"{self.base_url}/repos/{self.org}/{repo['name']}/branches/{repo['default_branch']}/protection",
            headers=self.headers,
            timeout=self.timeout
        )
        if response.status_code == 200:
            protection = response.json()
            return {
                'repo': repo['name'],
                'protected': True,
                'require_reviews': protection.get('required_pull_request_reviews') is not None,
                'require_status_checks': protection.get('required_status_checks') is not None
            }
        if response.status_code == 404:
            return {'repo': repo['name'], 'protected': False}
        # Any other status means the check itself failed; recording the branch
        # as unprotected would be false evidence.
        print(
            f"Failed to read branch protection for {repo['name']}: "
            f"HTTP {response.status_code}"
        )
        return None

    def _collect_change_management_evidence(self, control_id: str) -> List[Evidence]:
        """Collect default-branch protection status for the org's repositories."""
        # Branch Protection Rules
        try:
            repos = self._get_all_pages(f"{self.base_url}/orgs/{self.org}/repos")
        except Exception as e:
            # No evidence is better than evidence claiming the org has no repos.
            print(f"Failed to list GitHub repositories: {e}")
            return []

        protection_status = []
        for repo in repos[:self.MAX_REPOS]:
            try:
                entry = self._branch_protection_entry(repo)
            except Exception as e:
                print(f"Failed to read branch protection for {repo.get('name')}: {e}")
                continue
            if entry is not None:
                protection_status.append(entry)

        return [self._configuration_evidence(
            "GH-BP",
            control_id,
            "GitHub Branch Protection Status",
            "Branch protection rules for repositories",
            {
                'repos': protection_status,
                'protected_count': sum(1 for r in protection_status if r['protected'])
            }
        )]

    def _collect_access_control_evidence(self, control_id: str) -> List[Evidence]:
        """Collect the organization's two-factor authentication coverage."""
        evidence_list = []

        # Organization Members and 2FA
        try:
            members_url = f"{self.base_url}/orgs/{self.org}/members"
            # Both calls raise on failure: a failed "2fa_disabled" query must
            # not be read as "nobody is missing 2FA".
            members_without_2fa = self._get_all_pages(
                members_url, {'filter': '2fa_disabled'}
            )
            all_members = self._get_all_pages(members_url)
            total = len(all_members)
            missing = len(members_without_2fa)
            evidence_list.append(self._configuration_evidence(
                "GH-2FA",
                control_id,
                "GitHub Organization 2FA Status",
                "Two-factor authentication status for organization members",
                {
                    'total_members': total,
                    'members_without_2fa': missing,
                    '2fa_compliance_rate': (total - missing) / total if total else 0
                }
            ))
        except Exception as e:
            print(f"Failed to collect GitHub 2FA status: {e}")

        return evidence_list

    def _get_repos(self) -> List[Dict]:
        """Return the organization's repositories, or [] if the listing fails."""
        try:
            return self._get_all_pages(f"{self.base_url}/orgs/{self.org}/repos")
        except Exception as e:
            print(f"Failed to list GitHub repositories: {e}")
            return []


# Continuous Compliance Monitoring
Implement continuous monitoring:
from typing import List, Dict, Callable
from datetime import datetime, timedelta
import schedule
import threading
class ComplianceMonitor:
    """Runs evidence collectors and health checks against a framework.

    Attributes:
        framework: The control registry being monitored.
        collectors: Registered evidence collectors.
        alerts: Alerts raised so far, oldest first.
        thresholds: Limits used by ``check_compliance_health``.
    """

    def __init__(self, framework: SOC2ComplianceFramework):
        self.framework = framework
        self.collectors: List[EvidenceCollector] = []
        self.alerts: List[Dict] = []
        self.thresholds = {
            'evidence_age_days': 90,
            'min_compliance_rate': 0.95,
            'test_frequency_days': {
                'monthly': 30,
                'quarterly': 90,
                'annually': 365
            }
        }

    def register_collector(self, collector: EvidenceCollector):
        """Add a collector to be used by ``run_collection_cycle``."""
        self.collectors.append(collector)

    def run_collection_cycle(self):
        """Run every supporting collector for every applicable control.

        A collector that raises produces a ``collection_failure`` alert and
        the cycle continues with the next collector.
        """
        for control_id, control in self.framework.controls.items():
            if control.status == ControlStatus.NOT_APPLICABLE:
                continue
            for collector in self.collectors:
                if control_id not in collector.get_supported_controls():
                    continue
                try:
                    evidence_items = collector.collect(control_id)
                    for evidence in evidence_items:
                        self.framework.add_evidence(evidence)
                    print(f"Collected {len(evidence_items)} evidence items for {control_id}")
                except Exception as e:
                    self._create_alert(
                        'collection_failure',
                        f"Failed to collect evidence for {control_id}: {e}",
                        'high'
                    )

    def check_compliance_health(self) -> Dict:
        """Check evidence freshness, test cadence and the overall compliance rate.

        NOT_APPLICABLE controls are skipped by both the freshness and the
        test-cadence checks.

        Returns:
            A dict with ``health_check_time``, ``issues_found``, ``issues``
            and the framework's ``compliance_status``.
        """
        issues = []
        now = datetime.utcnow()  # one reference time for every age computed below

        # Check evidence freshness
        for control_id, evidence_list in self.framework.evidence.items():
            control = self.framework.controls.get(control_id)
            if control is not None and control.status == ControlStatus.NOT_APPLICABLE:
                continue
            if not evidence_list:
                issues.append({
                    'control': control_id,
                    'issue': 'No evidence collected',
                    'severity': 'high'
                })
                continue
            latest = max(evidence_list, key=lambda e: e.collected_at)
            age_days = (now - latest.collected_at).days
            if age_days > self.thresholds['evidence_age_days']:
                issues.append({
                    'control': control_id,
                    'issue': f'Evidence is {age_days} days old',
                    'severity': 'medium'
                })

        # Check test frequency
        for control_id, control in self.framework.controls.items():
            if control.status == ControlStatus.NOT_APPLICABLE:
                continue
            if not control.last_tested:
                issues.append({
                    'control': control_id,
                    'issue': 'Control has never been tested',
                    'severity': 'high'
                })
                continue
            # Unknown frequency strings fall back to the annual limit.
            max_days = self.thresholds['test_frequency_days'].get(control.testing_frequency, 365)
            days_since_test = (now - control.last_tested).days
            if days_since_test > max_days:
                issues.append({
                    'control': control_id,
                    'issue': f'Test overdue by {days_since_test - max_days} days',
                    'severity': 'medium'
                })

        # Check compliance rate
        status = self.framework.get_compliance_status()
        if status['summary']['compliance_rate'] < self.thresholds['min_compliance_rate']:
            issues.append({
                'control': 'overall',
                'issue': f"Compliance rate {status['summary']['compliance_rate']:.1%} below threshold",
                'severity': 'high'
            })

        return {
            'health_check_time': now.isoformat(),
            'issues_found': len(issues),
            'issues': issues,
            'compliance_status': status
        }

    def _create_alert(self, alert_type: str, message: str, severity: str):
        """Record an alert in ``self.alerts`` and print it."""
        alert = {
            'type': alert_type,
            'message': message,
            'severity': severity,
            'created_at': datetime.utcnow().isoformat()
        }
        self.alerts.append(alert)
        print(f"ALERT [{severity.upper()}]: {message}")

    def _run_scheduled_health_check(self) -> Dict:
        """Run the health check and raise one summary alert if it found issues."""
        report = self.check_compliance_health()
        if report['issues_found']:
            has_high = any(i['severity'] == 'high' for i in report['issues'])
            self._create_alert(
                'health_check',
                f"Compliance health check found {report['issues_found']} issue(s)",
                'high' if has_high else 'medium'
            )
        return report

    def schedule_monitoring(self):
        """Start daily collection and hourly health checks on a daemon thread.

        Returns:
            The started scheduler thread.
        """
        import time

        schedule.every().day.at("02:00").do(self.run_collection_cycle)
        # The wrapper turns health-check findings into an alert; calling
        # check_compliance_health directly would discard its report.
        schedule.every().hour.do(self._run_scheduled_health_check)

        def run_scheduler():
            while True:
                try:
                    schedule.run_pending()
                except Exception as e:
                    # A failing job must not end the loop: the thread is a
                    # daemon, so it would otherwise stop without any trace.
                    self._create_alert(
                        'scheduler_failure',
                        f"Scheduled compliance job failed: {e}",
                        'high'
                    )
                time.sleep(60)

        thread = threading.Thread(target=run_scheduler, daemon=True)
        thread.start()
        return thread


# Audit Report Generation
Generate audit-ready reports:
from typing import Dict, List
from datetime import datetime
import json
class AuditReportGenerator:
    """Builds JSON-serializable audit reports from a compliance framework."""

    def __init__(self, framework: SOC2ComplianceFramework):
        self.framework = framework

    def generate_audit_package(self, audit_period_start: datetime, audit_period_end: datetime) -> Dict:
        """Assemble the full audit package for the given period.

        Evidence and tests are filtered to the period (both bounds inclusive);
        the summary, control matrix and gap list reflect the current state.
        """
        return {
            "metadata": {
                "report_type": "SOC 2 Type II",
                "audit_period": {
                    "start": audit_period_start.isoformat(),
                    "end": audit_period_end.isoformat()
                },
                "generated_at": datetime.utcnow().isoformat()
            },
            "executive_summary": self._generate_executive_summary(),
            "control_matrix": self._generate_control_matrix(),
            "evidence_inventory": self._generate_evidence_inventory(audit_period_start, audit_period_end),
            "test_results": self._generate_test_summary(audit_period_start, audit_period_end),
            "gaps_and_remediation": self._identify_gaps()
        }

    def _generate_executive_summary(self) -> Dict:
        """Condense the framework's compliance status into headline figures."""
        status = self.framework.get_compliance_status()
        summary = status['summary']
        return {
            "compliance_rate": summary['compliance_rate'],
            "controls_assessed": summary['total_controls'],
            "controls_implemented": summary['implemented'],
            "audit_ready": status['audit_ready'],
            "categories_covered": list(status['by_category'].keys())
        }

    def _generate_control_matrix(self) -> List[Dict]:
        """Return one row per control with its metadata and evidence/test counts."""
        matrix = []
        for control_id, control in self.framework.controls.items():
            tested = control.last_tested
            matrix.append({
                "control_id": control_id,
                "category": control.category.value,
                "title": control.title,
                "criteria_ref": control.criteria_ref,
                "status": control.status.value,
                "owner": control.owner,
                "implementation_details": control.implementation_details,
                "testing_frequency": control.testing_frequency,
                "last_tested": tested.isoformat() if tested else None,
                "evidence_count": len(self.framework.evidence.get(control_id, [])),
                "test_count": len(self.framework.tests.get(control_id, []))
            })
        return matrix

    def _generate_evidence_inventory(self, start: datetime, end: datetime) -> List[Dict]:
        """List evidence collected between ``start`` and ``end`` inclusive."""
        return [
            {
                "evidence_id": evidence.evidence_id,
                "control_id": evidence.control_id,
                "type": evidence.evidence_type.value,
                "title": evidence.title,
                "collected_at": evidence.collected_at.isoformat(),
                "collected_by": evidence.collected_by,
                "file_path": evidence.file_path
            }
            for evidence_list in self.framework.evidence.values()
            for evidence in evidence_list
            if start <= evidence.collected_at <= end
        ]

    def _generate_test_summary(self, start: datetime, end: datetime) -> Dict:
        """Summarize tests dated between ``start`` and ``end`` inclusive."""
        in_period = [
            test
            for tests in self.framework.tests.values()
            for test in tests
            if start <= test.test_date <= end
        ]
        passed = sum(1 for test in in_period if test.passed)
        return {
            "total_tests": len(in_period),
            "passed": passed,
            "failed": len(in_period) - passed,
            "pass_rate": passed / len(in_period) if in_period else 0,
            "tests": [
                {
                    "test_id": test.test_id,
                    "control_id": test.control_id,
                    "date": test.test_date.isoformat(),
                    "tester": test.tester,
                    "result": test.result,
                    "passed": test.passed,
                    "findings": test.findings
                }
                for test in in_period
            ]
        }

    def _identify_gaps(self) -> List[Dict]:
        """List unimplemented, partially implemented and evidence-less controls.

        NOT_APPLICABLE controls are skipped: an out-of-scope control needs
        neither implementation nor evidence.
        """
        gaps = []
        for control_id, control in self.framework.controls.items():
            if control.status == ControlStatus.NOT_APPLICABLE:
                continue
            if control.status == ControlStatus.NOT_IMPLEMENTED:
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "not_implemented",
                    "description": f"Control {control.title} is not implemented",
                    "remediation": f"Implement {control.description}",
                    "priority": "high"
                })
            elif control.status == ControlStatus.PARTIALLY_IMPLEMENTED:
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "partial_implementation",
                    "description": f"Control {control.title} is only partially implemented",
                    "remediation": "Complete implementation per control requirements",
                    "priority": "medium"
                })
            if not self.framework.evidence.get(control_id, []):
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "missing_evidence",
                    "description": f"No evidence collected for {control.title}",
                    "remediation": "Collect and document evidence",
                    "priority": "high"
                })
        return gaps

    def export_for_auditor(
        self,
        output_dir: str,
        audit_period_start: Optional[datetime] = None,
        audit_period_end: Optional[datetime] = None,
    ):
        """Write the audit package to JSON files under ``output_dir``.

        Args:
            output_dir: Directory to write into; created if missing.
            audit_period_start: Start of the audit period. Defaults to 365
                days before the end of the period.
            audit_period_end: End of the audit period. Defaults to now (UTC).
        """
        import os

        os.makedirs(output_dir, exist_ok=True)
        if audit_period_end is None:
            audit_period_end = datetime.utcnow()
        if audit_period_start is None:
            audit_period_start = audit_period_end - timedelta(days=365)
        package = self.generate_audit_package(audit_period_start, audit_period_end)

        outputs = {
            "audit_report.json": package,
            "control_matrix.json": package['control_matrix'],
            "evidence_inventory.json": package['evidence_inventory'],
        }
        for filename, payload in outputs.items():
            # Explicit encoding keeps the files identical across platforms.
            with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2)
        print(f"Audit package exported to {output_dir}")


# Conclusion
SOC 2 compliance automation transforms audit preparation from a periodic scramble into continuous readiness. Implement automated evidence collection, continuous monitoring, and gap tracking to maintain compliance posture. Regular health checks ensure controls remain effective and evidence stays current throughout the audit period.