Compliance

SOC 2 Compliance Automation for SaaS Companies

DeviDevs Team
11 min read
#soc2#compliance-automation#saas-security#audit-readiness#trust-services

SOC 2 compliance demonstrates your commitment to security, availability, and data protection. This guide covers automating SOC 2 Type II compliance for continuous audit readiness.

SOC 2 Trust Services Criteria Framework

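Implement a framework that models all five Trust Services Categories. The sample control set below seeds Security, Availability, and Confidentiality controls; Processing Integrity and Privacy controls can be added the same way: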

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
from datetime import datetime, timedelta
from abc import ABC, abstractmethod
import json
 
class TrustServiceCategory(Enum):
    """Trust Services category a control is filed under.

    Member values are the lowercase strings used as keys in compliance
    reports (see ``by_category`` in ``get_compliance_status``).
    """
    SECURITY = "security"
    AVAILABILITY = "availability"
    PROCESSING_INTEGRITY = "processing_integrity"
    CONFIDENTIALITY = "confidentiality"
    PRIVACY = "privacy"
 
class ControlStatus(Enum):
    """Implementation state of a single control."""
    IMPLEMENTED = "implemented"
    PARTIALLY_IMPLEMENTED = "partially_implemented"
    # Initial state of every control seeded by the framework.
    NOT_IMPLEMENTED = "not_implemented"
    # Controls in this state are skipped by the monitor's collection cycle
    # and test-frequency check.
    NOT_APPLICABLE = "not_applicable"
 
class EvidenceType(Enum):
    """Kind of artifact that can be attached to a control as evidence."""
    SCREENSHOT = "screenshot"
    LOG_EXPORT = "log_export"
    POLICY_DOCUMENT = "policy_document"
    # The automated AWS and GitHub collectors emit this type exclusively.
    CONFIGURATION = "configuration"
    AUTOMATED_TEST = "automated_test"
    INTERVIEW = "interview"
    OBSERVATION = "observation"
 
@dataclass
class Control:
    """A single control tracked by the compliance framework."""
    # Key under which the control is stored in the framework's registries.
    control_id: str
    category: TrustServiceCategory
    title: str
    description: str
    # Criteria reference; the seeded controls use the same value as control_id.
    criteria_ref: str
    status: ControlStatus
    # Team or function responsible for the control (free text).
    owner: str
    # Free-text notes; set together with `status` by update_control_status().
    implementation_details: str
    # The monitor recognises "monthly", "quarterly" and "annually"; any other
    # value falls back to a 365-day interval there.
    testing_frequency: str
    # Set to the test date by record_test(); None until the first test.
    last_tested: Optional[datetime] = None
    # Evidence kinds expected for this control; defaults to a fresh empty list.
    evidence_types: List[EvidenceType] = field(default_factory=list)
 
@dataclass
class Evidence:
    """One piece of evidence collected for a control."""
    evidence_id: str
    # Must match a registered control; add_evidence() rejects unknown IDs.
    control_id: str
    evidence_type: EvidenceType
    title: str
    description: str
    # The bundled collectors fill this with datetime.utcnow() (naive UTC);
    # the monitor compares it against datetime.utcnow() to compute age.
    collected_at: datetime
    # The bundled collectors use the literal "automated".
    collected_by: str
    file_path: Optional[str] = None
    # Structured payload; the bundled collectors store their findings here.
    data: Optional[Dict] = None
    # NOTE(review): not read by any code visible here -- presumably an expiry
    # timestamp for the evidence; confirm before relying on it.
    valid_until: Optional[datetime] = None
 
@dataclass
class ControlTest:
    """Outcome of one test run against a control."""
    test_id: str
    # Must match a registered control; record_test() rejects unknown IDs.
    control_id: str
    # Copied onto Control.last_tested by record_test().
    test_date: datetime
    tester: str
    # NOTE(review): free-text outcome; how it relates to `passed` is not
    # defined in the visible code -- confirm with the author.
    result: str
    findings: List[str]
    # IDs of Evidence items that support this test.
    evidence_ids: List[str]
    passed: bool
 
class SOC2ComplianceFramework:
    def __init__(self):
        self.controls: Dict[str, Control] = {}
        self.evidence: Dict[str, List[Evidence]] = {}
        self.tests: Dict[str, List[ControlTest]] = {}
        self._initialize_controls()
 
    def _initialize_controls(self):
        """Seed the framework with its default control catalogue.

        Every control starts as NOT_IMPLEMENTED with empty implementation
        details, and gets an empty evidence list and an empty test list.
        """
        security = TrustServiceCategory.SECURITY
        availability = TrustServiceCategory.AVAILABILITY
        confidentiality = TrustServiceCategory.CONFIDENTIALITY

        policy = EvidenceType.POLICY_DOCUMENT
        logs = EvidenceType.LOG_EXPORT
        screenshot = EvidenceType.SCREENSHOT
        config = EvidenceType.CONFIGURATION
        auto_test = EvidenceType.AUTOMATED_TEST

        # NOTE(review): the collectors key off these IDs, so they must not be
        # renamed here in isolation; confirm the ID-to-topic mapping against
        # the criteria numbering your auditor uses.
        # Columns: id, category, title, description, owner, frequency, evidence types
        catalogue = [
            # Security Controls (CC Series)
            ("CC1.1", security, "Security Policies and Procedures",
             "Organization defines and documents security policies",
             "Security Team", "annually", [policy]),
            ("CC2.1", security, "Security Awareness Training",
             "Personnel receive security awareness training",
             "HR", "annually", [logs, screenshot]),
            ("CC3.1", security, "Risk Assessment Process",
             "Organization performs periodic risk assessments",
             "Security Team", "annually", [policy, logs]),
            ("CC5.1", security, "Logical Access Controls",
             "Logical access to systems is restricted",
             "IT Operations", "quarterly", [config, auto_test]),
            ("CC6.1", security, "Change Management",
             "Changes follow formal change management process",
             "Engineering", "monthly", [logs, screenshot]),
            ("CC7.1", security, "Vulnerability Management",
             "Vulnerabilities are identified and remediated",
             "Security Team", "monthly", [logs, auto_test]),
            ("CC8.1", security, "Incident Response",
             "Security incidents are detected and responded to",
             "Security Team", "quarterly", [policy, logs]),
            # Availability Controls (A Series)
            ("A1.1", availability, "System Availability Monitoring",
             "System availability is monitored and maintained",
             "IT Operations", "monthly", [config, logs]),
            ("A1.2", availability, "Disaster Recovery",
             "Disaster recovery procedures are documented and tested",
             "IT Operations", "annually", [policy, logs]),
            # Confidentiality Controls (C Series)
            ("C1.1", confidentiality, "Data Classification",
             "Confidential information is identified and protected",
             "Security Team", "annually", [policy, config]),
            ("C1.2", confidentiality, "Data Encryption",
             "Confidential data is encrypted at rest and in transit",
             "Engineering", "quarterly", [config, auto_test]),
        ]

        for ident, category, title, description, owner, frequency, kinds in catalogue:
            self.controls[ident] = Control(
                control_id=ident,
                category=category,
                title=title,
                description=description,
                criteria_ref=ident,  # every seeded control references its own ID
                status=ControlStatus.NOT_IMPLEMENTED,
                owner=owner,
                implementation_details="",
                testing_frequency=frequency,
                evidence_types=kinds,
            )
            self.evidence[ident] = []
            self.tests[ident] = []
 
    def update_control_status(
        self,
        control_id: str,
        status: ControlStatus,
        implementation_details: str
    ):
        """Update control implementation status."""
        if control_id not in self.controls:
            raise ValueError(f"Control {control_id} not found")
 
        control = self.controls[control_id]
        control.status = status
        control.implementation_details = implementation_details
 
    def add_evidence(self, evidence: Evidence):
        """Add evidence for a control."""
        if evidence.control_id not in self.controls:
            raise ValueError(f"Control {evidence.control_id} not found")
 
        self.evidence[evidence.control_id].append(evidence)
 
    def record_test(self, test: ControlTest):
        """Record control test results."""
        if test.control_id not in self.controls:
            raise ValueError(f"Control {test.control_id} not found")
 
        self.tests[test.control_id].append(test)
        self.controls[test.control_id].last_tested = test.test_date
 
    def get_compliance_status(self) -> Dict:
        """Generate compliance status report.

        Controls marked NOT_APPLICABLE are out of scope: they are reported in
        the ``not_applicable`` counters but excluded from every compliance-rate
        denominator and from the ``audit_ready`` decision. Counting them
        against the rate would make ``audit_ready`` unreachable as soon as one
        control is scoped out.

        Returns:
            A dict with three keys:
              * ``summary`` -- ``total_controls`` (all registered controls),
                ``implemented``, ``partially_implemented``, ``not_applicable``
                and ``compliance_rate`` (implemented / applicable, 0 when no
                control is applicable).
              * ``by_category`` -- for each category that has at least one
                control: ``total``, ``implemented``, ``not_applicable`` and
                ``compliance_rate`` computed the same way.
              * ``audit_ready`` -- True when every applicable control is
                IMPLEMENTED.
        """
        all_controls = list(self.controls.values())
        total_controls = len(all_controls)

        applicable = [c for c in all_controls if c.status != ControlStatus.NOT_APPLICABLE]
        implemented = sum(1 for c in applicable if c.status == ControlStatus.IMPLEMENTED)
        partial = sum(1 for c in applicable if c.status == ControlStatus.PARTIALLY_IMPLEMENTED)

        by_category = {}
        for category in TrustServiceCategory:
            category_controls = [c for c in all_controls if c.category == category]
            if not category_controls:
                continue

            in_scope = [c for c in category_controls if c.status != ControlStatus.NOT_APPLICABLE]
            category_implemented = sum(1 for c in in_scope if c.status == ControlStatus.IMPLEMENTED)
            by_category[category.value] = {
                "total": len(category_controls),
                "implemented": category_implemented,
                "not_applicable": len(category_controls) - len(in_scope),
                # A category whose controls are all out of scope has no rate to report.
                "compliance_rate": category_implemented / len(in_scope) if in_scope else 0
            }

        return {
            "summary": {
                "total_controls": total_controls,
                "implemented": implemented,
                "partially_implemented": partial,
                "not_applicable": total_controls - len(applicable),
                "compliance_rate": implemented / len(applicable) if applicable else 0
            },
            "by_category": by_category,
            "audit_ready": implemented == len(applicable)
        }

Automated Evidence Collection

Build automated evidence collectors:

from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from datetime import datetime
import boto3
import requests
 
class EvidenceCollector(ABC):
    """Interface implemented by every automated evidence source."""

    @abstractmethod
    def collect(self, control_id: str) -> List[Evidence]:
        """Gather and return the evidence items for ``control_id``."""

    @abstractmethod
    def get_supported_controls(self) -> List[str]:
        """Return the control IDs this collector can gather evidence for."""
 
class AWSSecurityEvidenceCollector(EvidenceCollector):
    def __init__(self, region: str = 'us-east-1'):
        """Create one boto3 client per AWS service this collector queries.

        Each client is stored on the instance under the service's own name
        (``self.iam``, ``self.ec2``, ``self.s3``, ``self.cloudtrail``,
        ``self.kms``) and is bound to ``region``.
        """
        for service_name in ('iam', 'ec2', 's3', 'cloudtrail', 'kms'):
            client = boto3.client(service_name, region_name=region)
            setattr(self, service_name, client)
 
    def get_supported_controls(self) -> List[str]:
        """Return the control IDs for which collect() has a handler."""
        supported_ids = ['CC5.1', 'CC6.1', 'CC7.1', 'C1.2', 'A1.1']
        return supported_ids
 
    def collect(self, control_id: str) -> List[Evidence]:
        """Run the handler registered for ``control_id``.

        Returns the handler's evidence list, or an empty list when this
        collector has no handler for the control.
        """
        dispatch = {
            'CC5.1': self._collect_access_control_evidence,
            'CC6.1': self._collect_change_management_evidence,
            'CC7.1': self._collect_vulnerability_evidence,
            'C1.2': self._collect_encryption_evidence,
            'A1.1': self._collect_availability_evidence
        }

        handler = dispatch.get(control_id)
        if not handler:
            return []
        return handler(control_id)
 
    def _collect_access_control_evidence(self, control_id: str) -> List[Evidence]:
        """Collect IAM password-policy and per-user MFA evidence.

        Two independent, best-effort collections are attempted; a failure in
        one is printed and does not prevent the other from being returned.

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            Zero, one or two CONFIGURATION evidence items.
        """
        evidence_list = []

        # IAM Password Policy
        try:
            policy = self.iam.get_account_password_policy()
            # Read the clock once so the ID and collected_at cannot disagree.
            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-IAM-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="IAM Password Policy Configuration",
                description="Current IAM password policy settings",
                collected_at=collected_at,
                collected_by="automated",
                data=policy['PasswordPolicy']
            ))
        except Exception as e:
            print(f"Failed to collect password policy: {e}")

        # MFA Status
        try:
            # list_users returns results in pages; walk all of them so that
            # accounts with many users are not silently truncated.
            users = []
            for page in self.iam.get_paginator('list_users').paginate():
                users.extend(page['Users'])

            mfa_status = []
            for user in users:
                mfa_devices = self.iam.list_mfa_devices(UserName=user['UserName'])
                mfa_status.append({
                    'user': user['UserName'],
                    'mfa_enabled': len(mfa_devices['MFADevices']) > 0
                })

            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-MFA-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="MFA Status for IAM Users",
                description="Multi-factor authentication status for all IAM users",
                collected_at=collected_at,
                collected_by="automated",
                data={'users': mfa_status, 'total': len(users), 'mfa_enabled': sum(1 for u in mfa_status if u['mfa_enabled'])}
            ))
        except Exception as e:
            print(f"Failed to collect MFA status: {e}")

        return evidence_list
 
    def _collect_change_management_evidence(self, control_id: str) -> List[Evidence]:
        """Collect the CloudTrail logging configuration as audit-trail evidence.

        Best-effort: any failure is printed and an empty list is returned.

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            A list with one CONFIGURATION evidence item describing every
            trail, or an empty list on failure.
        """
        evidence_list = []

        # CloudTrail Configuration
        try:
            trails = self.cloudtrail.describe_trails()['trailList']
            trail_configs = []
            for trail in trails:
                # describe_trails also lists shadow trails replicated from
                # other regions; their status can only be looked up by ARN,
                # so prefer the ARN and fall back to the short name.
                trail_ref = trail.get('TrailARN', trail['Name'])
                status = self.cloudtrail.get_trail_status(Name=trail_ref)
                trail_configs.append({
                    'name': trail['Name'],
                    'is_logging': status['IsLogging'],
                    'is_multi_region': trail.get('IsMultiRegionTrail', False),
                    's3_bucket': trail.get('S3BucketName')
                })

            # Read the clock once so the ID and collected_at cannot disagree.
            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-CT-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="CloudTrail Configuration",
                description="AWS CloudTrail logging configuration for audit trail",
                collected_at=collected_at,
                collected_by="automated",
                data={'trails': trail_configs}
            ))
        except Exception as e:
            print(f"Failed to collect CloudTrail config: {e}")

        return evidence_list
 
    def _collect_encryption_evidence(self, control_id: str) -> List[Evidence]:
        """Collect S3 default-encryption and KMS key evidence.

        Two independent, best-effort collections are attempted; a failure in
        one is printed and does not prevent the other from being returned.

        For S3, a bucket is recorded as ``encrypted: False`` only when AWS
        reports that no encryption configuration exists. Any other API error
        (for example access denied) is recorded as ``encrypted: None`` with
        the error code, so a permissions problem is not mistaken for an
        unencrypted bucket.

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            Zero, one or two CONFIGURATION evidence items.
        """
        evidence_list = []

        # S3 Bucket Encryption
        try:
            buckets = self.s3.list_buckets()['Buckets']
            bucket_encryption = []
            for bucket in buckets:
                bucket_name = bucket['Name']
                try:
                    encryption = self.s3.get_bucket_encryption(Bucket=bucket_name)
                except self.s3.exceptions.ClientError as err:
                    error_code = err.response.get('Error', {}).get('Code', '')
                    if error_code == 'ServerSideEncryptionConfigurationNotFoundError':
                        bucket_encryption.append({
                            'bucket': bucket_name,
                            'encrypted': False
                        })
                    else:
                        # Status unknown: keep the bucket visible in the evidence.
                        bucket_encryption.append({
                            'bucket': bucket_name,
                            'encrypted': None,
                            'error': error_code
                        })
                    continue

                rules = encryption['ServerSideEncryptionConfiguration']['Rules']
                bucket_encryption.append({
                    'bucket': bucket_name,
                    'encrypted': True,
                    'algorithm': rules[0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
                })

            # Read the clock once so the ID and collected_at cannot disagree.
            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-S3ENC-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="S3 Bucket Encryption Status",
                description="Server-side encryption configuration for S3 buckets",
                collected_at=collected_at,
                collected_by="automated",
                data={
                    'buckets': bucket_encryption,
                    'encrypted_count': sum(1 for b in bucket_encryption if b['encrypted']),
                    'unknown_count': sum(1 for b in bucket_encryption if b['encrypted'] is None)
                }
            ))
        except Exception as e:
            print(f"Failed to collect S3 encryption: {e}")

        # KMS Keys
        try:
            keys = self.kms.list_keys()['Keys']
            key_details = []
            # Only the first 10 keys are described (cap kept from the original).
            for key in keys[:10]:
                try:
                    key_info = self.kms.describe_key(KeyId=key['KeyId'])
                    key_details.append({
                        'key_id': key['KeyId'],
                        'description': key_info['KeyMetadata'].get('Description', ''),
                        'enabled': key_info['KeyMetadata']['Enabled'],
                        'key_state': key_info['KeyMetadata']['KeyState']
                    })
                except Exception as e:
                    # Still best-effort per key, but no longer silent.
                    print(f"Failed to describe KMS key {key.get('KeyId')}: {e}")
                    continue

            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-KMS-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="KMS Key Configuration",
                description="AWS KMS encryption key status",
                collected_at=collected_at,
                collected_by="automated",
                data={'keys': key_details}
            ))
        except Exception as e:
            print(f"Failed to collect KMS keys: {e}")

        return evidence_list
 
    def _collect_vulnerability_evidence(self, control_id: str) -> List[Evidence]:
        """Placeholder handler: no vulnerability evidence is collected yet.

        ``control_id`` is accepted for signature parity with the other
        handlers but is not used.
        """
        # This would integrate with AWS Inspector or similar
        evidence_list: List[Evidence] = []
        return evidence_list
 
    def _collect_availability_evidence(self, control_id: str) -> List[Evidence]:
        """Collect the state and availability zone of every EC2 instance.

        Best-effort: any failure is printed and an empty list is returned.

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            A list with one CONFIGURATION evidence item, or an empty list on
            failure.
        """
        evidence_list = []

        # EC2 Instance Status
        try:
            # describe_instances returns results in pages; walk all of them so
            # that large fleets are not silently truncated.
            instance_status = []
            for page in self.ec2.get_paginator('describe_instances').paginate():
                for reservation in page['Reservations']:
                    for instance in reservation['Instances']:
                        instance_status.append({
                            'instance_id': instance['InstanceId'],
                            'state': instance['State']['Name'],
                            'availability_zone': instance['Placement']['AvailabilityZone']
                        })

            # Read the clock once so the ID and collected_at cannot disagree.
            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"AWS-EC2-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="EC2 Instance Availability Status",
                description="Current state and distribution of EC2 instances",
                collected_at=collected_at,
                collected_by="automated",
                data={'instances': instance_status}
            ))
        except Exception as e:
            print(f"Failed to collect EC2 status: {e}")

        return evidence_list
 
class GitHubEvidenceCollector(EvidenceCollector):
    def __init__(self, token: str, org: str):
        """Store the credentials and prepare the request headers.

        Args:
            token: GitHub API token, sent in the Authorization header.
            org: organization whose repositories and members are inspected.
        """
        self.token, self.org = token, org

        auth_value = f'token {token}'
        media_type = 'application/vnd.github.v3+json'
        self.headers = {
            'Authorization': auth_value,
            'Accept': media_type
        }

        self.base_url = 'https://api.github.com'
 
    def get_supported_controls(self) -> List[str]:
        """Return the control IDs for which collect() has a handler."""
        supported_ids = ['CC6.1', 'CC5.1']
        return supported_ids
 
    def collect(self, control_id: str) -> List[Evidence]:
        """Run the handler registered for ``control_id``.

        Returns the handler's evidence list, or an empty list when this
        collector has no handler for the control.
        """
        dispatch = {
            'CC6.1': self._collect_change_management_evidence,
            'CC5.1': self._collect_access_control_evidence,
        }
        handler = dispatch.get(control_id)
        if handler is None:
            return []
        return handler(control_id)
 
    def _collect_change_management_evidence(self, control_id: str) -> List[Evidence]:
        """Collect default-branch protection status for the organization's repos.

        Only the first 20 repositories returned by ``_get_repos()`` are
        checked. The evidence records how many repositories exist and how
        many were checked, so the cap is visible to whoever reads it.

        Per repository:
          * HTTP 200 -> ``protected: True`` plus the review / status-check flags.
          * HTTP 404 -> ``protected: False``.
          * any other status -> ``protected: False`` with ``status_code``
            attached, so an API failure can be told apart from a branch that
            is simply unprotected.
          * request or parsing errors are printed and the repository is left
            out of the evidence.

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            A list with exactly one CONFIGURATION evidence item.
        """
        evidence_list = []

        # Branch Protection Rules
        repos = self._get_repos()
        checked_repos = repos[:20]
        protection_status = []

        for repo in checked_repos:
            try:
                response = requests.get(
                    f"{self.base_url}/repos/{self.org}/{repo['name']}/branches/{repo['default_branch']}/protection",
                    headers=self.headers,
                    timeout=30  # never hang the whole collection cycle on one call
                )
                if response.status_code == 200:
                    protection = response.json()
                    protection_status.append({
                        'repo': repo['name'],
                        'protected': True,
                        'require_reviews': protection.get('required_pull_request_reviews') is not None,
                        'require_status_checks': protection.get('required_status_checks') is not None
                    })
                elif response.status_code == 404:
                    protection_status.append({
                        'repo': repo['name'],
                        'protected': False
                    })
                else:
                    protection_status.append({
                        'repo': repo['name'],
                        'protected': False,
                        'status_code': response.status_code
                    })
            except Exception as e:
                # Still best-effort per repository, but no longer silent.
                print(f"Failed to collect branch protection for {repo.get('name')}: {e}")
                continue

        # Read the clock once so the ID and collected_at cannot disagree.
        collected_at = datetime.utcnow()
        evidence_list.append(Evidence(
            evidence_id=f"GH-BP-{collected_at.strftime('%Y%m%d%H%M%S')}",
            control_id=control_id,
            evidence_type=EvidenceType.CONFIGURATION,
            title="GitHub Branch Protection Status",
            description="Branch protection rules for repositories",
            collected_at=collected_at,
            collected_by="automated",
            data={
                'repos': protection_status,
                'protected_count': sum(1 for r in protection_status if r['protected']),
                'total_repos': len(repos),
                'repos_checked': len(checked_repos)
            }
        ))

        return evidence_list
 
    def _collect_access_control_evidence(self, control_id: str) -> List[Evidence]:
        """Collect two-factor-authentication coverage for organization members.

        Both member listings are fetched in full (all pages). If either
        request fails, the failure is printed and no evidence is produced.
        A failed 2FA query is therefore never reported as "every member has
        2FA enabled".

        Args:
            control_id: control the resulting evidence is attached to.

        Returns:
            A list with one CONFIGURATION evidence item, or an empty list on
            failure.
        """
        evidence_list = []

        # Organization Members and 2FA
        try:
            members_without_2fa = self._list_org_members(member_filter='2fa_disabled')
            all_members = self._list_org_members()

            # Read the clock once so the ID and collected_at cannot disagree.
            collected_at = datetime.utcnow()
            evidence_list.append(Evidence(
                evidence_id=f"GH-2FA-{collected_at.strftime('%Y%m%d%H%M%S')}",
                control_id=control_id,
                evidence_type=EvidenceType.CONFIGURATION,
                title="GitHub Organization 2FA Status",
                description="Two-factor authentication status for organization members",
                collected_at=collected_at,
                collected_by="automated",
                data={
                    'total_members': len(all_members),
                    'members_without_2fa': len(members_without_2fa),
                    '2fa_compliance_rate': (len(all_members) - len(members_without_2fa)) / len(all_members) if all_members else 0
                }
            ))
        except Exception as e:
            print(f"Failed to collect GitHub 2FA status: {e}")

        return evidence_list

    def _list_org_members(self, member_filter: Optional[str] = None) -> List[Dict]:
        """Return every member of the organization, following pagination.

        Args:
            member_filter: optional value for the API's ``filter`` query
                parameter (for example ``'2fa_disabled'``).

        Raises:
            requests.RequestException: on a network error or a non-success
                HTTP status.
        """
        url = f"{self.base_url}/orgs/{self.org}/members"
        params = {'per_page': 100}
        if member_filter:
            params['filter'] = member_filter

        members = []
        while url:
            response = requests.get(url, headers=self.headers, params=params, timeout=30)
            response.raise_for_status()
            members.extend(response.json())
            # The "next" link already carries the full query string.
            url = response.links.get('next', {}).get('url')
            params = None
        return members
 
    def _get_repos(self) -> List[Dict]:
        """Return every repository of the organization, following pagination.

        Returns:
            The repository objects from all result pages, or an empty list if
            any page answers with a status other than 200.

        Raises:
            requests.RequestException: on a network error or timeout.
        """
        url = f"{self.base_url}/orgs/{self.org}/repos"
        params = {'per_page': 100}
        repos = []

        while url:
            response = requests.get(
                url,
                headers=self.headers,
                params=params,
                timeout=30  # never hang the whole collection cycle on one call
            )
            if response.status_code != 200:
                return []
            repos.extend(response.json())
            # The "next" link already carries the full query string.
            url = response.links.get('next', {}).get('url')
            params = None

        return repos

Continuous Compliance Monitoring

Implement continuous monitoring:

from typing import List, Dict, Callable
from datetime import datetime, timedelta
import schedule
import threading
 
class ComplianceMonitor:
    def __init__(self, framework: SOC2ComplianceFramework):
        self.framework = framework
        self.collectors: List[EvidenceCollector] = []
        self.alerts: List[Dict] = []
        self.thresholds = {
            'evidence_age_days': 90,
            'min_compliance_rate': 0.95,
            'test_frequency_days': {
                'monthly': 30,
                'quarterly': 90,
                'annually': 365
            }
        }
 
    def register_collector(self, collector: EvidenceCollector):
        self.collectors.append(collector)
 
    def run_collection_cycle(self):
        """Collect evidence for every applicable control from every collector.

        Controls marked NOT_APPLICABLE are skipped. A collector is only asked
        for controls it lists as supported. Any exception raised while
        collecting or storing evidence becomes a high-severity
        ``collection_failure`` alert and the cycle moves on.
        """
        for control_id, control in self.framework.controls.items():
            out_of_scope = control.status == ControlStatus.NOT_APPLICABLE
            if out_of_scope:
                continue

            for collector in self.collectors:
                if control_id not in collector.get_supported_controls():
                    continue

                try:
                    evidence_items = collector.collect(control_id)
                    for item in evidence_items:
                        self.framework.add_evidence(item)
                    print(f"Collected {len(evidence_items)} evidence items for {control_id}")
                except Exception as e:
                    failure_message = f"Failed to collect evidence for {control_id}: {e}"
                    self._create_alert('collection_failure', failure_message, 'high')
 
    def check_compliance_health(self) -> Dict:
        """Check overall compliance health.

        Three checks are run, in this order:
          1. Evidence freshness -- a control with no evidence is a high
             severity issue; evidence older than ``evidence_age_days`` is a
             medium one.
          2. Test frequency -- a never-tested control is a high severity
             issue; an overdue test is a medium one.
          3. Overall compliance rate against ``min_compliance_rate``.

        Controls marked NOT_APPLICABLE are skipped by checks 1 and 2, because
        an out-of-scope control needs neither evidence nor tests.

        Timestamps are compared against ``datetime.utcnow()``, so
        ``collected_at`` and ``last_tested`` are expected to be naive UTC
        datetimes, which is what the bundled collectors produce.

        Returns:
            A dict with ``health_check_time`` (ISO string), ``issues_found``,
            ``issues`` (dicts with ``control``, ``issue`` and ``severity``)
            and ``compliance_status`` (the framework's status report).
        """
        issues = []
        # Read the clock once so every age in this report uses the same "now".
        now = datetime.utcnow()
        controls = self.framework.controls

        # Check evidence freshness
        for control_id, evidence_list in self.framework.evidence.items():
            control = controls.get(control_id)
            if control is not None and control.status == ControlStatus.NOT_APPLICABLE:
                continue

            if not evidence_list:
                issues.append({
                    'control': control_id,
                    'issue': 'No evidence collected',
                    'severity': 'high'
                })
                continue

            latest = max(evidence_list, key=lambda e: e.collected_at)
            age_days = (now - latest.collected_at).days

            if age_days > self.thresholds['evidence_age_days']:
                issues.append({
                    'control': control_id,
                    'issue': f'Evidence is {age_days} days old',
                    'severity': 'medium'
                })

        # Check test frequency
        for control_id, control in controls.items():
            if control.status == ControlStatus.NOT_APPLICABLE:
                continue

            if not control.last_tested:
                issues.append({
                    'control': control_id,
                    'issue': 'Control has never been tested',
                    'severity': 'high'
                })
                continue

            # Unknown frequencies fall back to the annual interval.
            max_days = self.thresholds['test_frequency_days'].get(control.testing_frequency, 365)
            days_since_test = (now - control.last_tested).days

            if days_since_test > max_days:
                issues.append({
                    'control': control_id,
                    'issue': f'Test overdue by {days_since_test - max_days} days',
                    'severity': 'medium'
                })

        # Check compliance rate
        status = self.framework.get_compliance_status()
        if status['summary']['compliance_rate'] < self.thresholds['min_compliance_rate']:
            issues.append({
                'control': 'overall',
                'issue': f"Compliance rate {status['summary']['compliance_rate']:.1%} below threshold",
                'severity': 'high'
            })

        return {
            'health_check_time': now.isoformat(),
            'issues_found': len(issues),
            'issues': issues,
            'compliance_status': status
        }
 
    def _create_alert(self, alert_type: str, message: str, severity: str):
        alert = {
            'type': alert_type,
            'message': message,
            'severity': severity,
            'created_at': datetime.utcnow().isoformat()
        }
        self.alerts.append(alert)
        print(f"ALERT [{severity.upper()}]: {message}")
 
    def schedule_monitoring(self):
        """Schedule regular compliance monitoring.

        Registers a daily evidence collection at 02:00 and an hourly health
        check on the ``schedule`` module's default scheduler, then starts a
        daemon thread that polls for due jobs once a minute.

        An exception raised by a scheduled job is turned into a high-severity
        ``scheduler_failure`` alert instead of ending the polling thread, so
        one failing run cannot silently stop all future monitoring.

        NOTE(review): the hourly health check's return value is discarded by
        the scheduler; wire it to alerting if the results should be acted on.

        Returns:
            The started daemon thread.
        """
        import time  # local import kept from the original, hoisted out of the loop

        schedule.every().day.at("02:00").do(self.run_collection_cycle)
        schedule.every().hour.do(self.check_compliance_health)

        def run_scheduler():
            while True:
                try:
                    schedule.run_pending()
                except Exception as e:
                    self._create_alert(
                        'scheduler_failure',
                        f"Scheduled compliance job failed: {e}",
                        'high'
                    )
                time.sleep(60)

        thread = threading.Thread(target=run_scheduler, daemon=True)
        thread.start()
        return thread

Audit Report Generation

Generate audit-ready reports:

from typing import Dict, List
from datetime import datetime
import json
 
class AuditReportGenerator:
    """Assemble audit-ready SOC 2 reports from a compliance framework.

    The generator only reads from the framework: its ``controls``,
    ``evidence`` and ``tests`` mappings (each keyed by control ID) and the
    dictionary returned by ``get_compliance_status()``.
    """

    # Look-back window used by export_for_auditor when the caller does not
    # supply an explicit audit period start.
    DEFAULT_AUDIT_PERIOD_DAYS = 365

    def __init__(self, framework: "SOC2ComplianceFramework"):
        """Store the framework whose data will be reported on."""
        self.framework = framework

    def generate_audit_package(self, audit_period_start: datetime, audit_period_end: datetime) -> Dict:
        """Generate complete audit package.

        Args:
            audit_period_start: Inclusive start of the audit period.
            audit_period_end: Inclusive end of the audit period.

        Returns:
            A dictionary with report metadata, executive summary, control
            matrix, evidence inventory, test results and identified gaps.
            Only the evidence inventory and test results are filtered by the
            audit period; the other sections reflect the framework's current
            state.
        """
        return {
            "metadata": {
                "report_type": "SOC 2 Type II",
                "audit_period": {
                    "start": audit_period_start.isoformat(),
                    "end": audit_period_end.isoformat()
                },
                "generated_at": datetime.utcnow().isoformat()
            },
            "executive_summary": self._generate_executive_summary(),
            "control_matrix": self._generate_control_matrix(),
            "evidence_inventory": self._generate_evidence_inventory(audit_period_start, audit_period_end),
            "test_results": self._generate_test_summary(audit_period_start, audit_period_end),
            "gaps_and_remediation": self._identify_gaps()
        }

    def _generate_executive_summary(self) -> Dict:
        """Condense the framework's compliance status into headline figures."""
        status = self.framework.get_compliance_status()
        summary = status['summary']
        return {
            "compliance_rate": summary['compliance_rate'],
            "controls_assessed": summary['total_controls'],
            "controls_implemented": summary['implemented'],
            "audit_ready": status['audit_ready'],
            "categories_covered": list(status['by_category'].keys())
        }

    def _generate_control_matrix(self) -> List[Dict]:
        """Return one row per control with its status and evidence/test counts."""
        evidence_by_control = self.framework.evidence
        tests_by_control = self.framework.tests

        return [
            {
                "control_id": control_id,
                "category": control.category.value,
                "title": control.title,
                "criteria_ref": control.criteria_ref,
                "status": control.status.value,
                "owner": control.owner,
                "implementation_details": control.implementation_details,
                "testing_frequency": control.testing_frequency,
                "last_tested": control.last_tested.isoformat() if control.last_tested else None,
                "evidence_count": len(evidence_by_control.get(control_id, [])),
                "test_count": len(tests_by_control.get(control_id, []))
            }
            for control_id, control in self.framework.controls.items()
        ]

    def _generate_evidence_inventory(self, start: datetime, end: datetime) -> List[Dict]:
        """List evidence whose ``collected_at`` falls within [start, end]."""
        inventory = []
        for evidence_list in self.framework.evidence.values():
            for evidence in evidence_list:
                if not (start <= evidence.collected_at <= end):
                    continue
                inventory.append({
                    "evidence_id": evidence.evidence_id,
                    "control_id": evidence.control_id,
                    "type": evidence.evidence_type.value,
                    "title": evidence.title,
                    "collected_at": evidence.collected_at.isoformat(),
                    "collected_by": evidence.collected_by,
                    "file_path": evidence.file_path
                })

        return inventory

    def _generate_test_summary(self, start: datetime, end: datetime) -> Dict:
        """Summarise control tests whose ``test_date`` falls within [start, end].

        Returns:
            Totals, pass/fail counts, the pass rate (0.0 when no test falls in
            the period) and the list of matching tests.
        """
        all_tests = []
        for tests in self.framework.tests.values():
            for test in tests:
                if not (start <= test.test_date <= end):
                    continue
                all_tests.append({
                    "test_id": test.test_id,
                    "control_id": test.control_id,
                    "date": test.test_date.isoformat(),
                    "tester": test.tester,
                    "result": test.result,
                    "passed": test.passed,
                    "findings": test.findings
                })

        total = len(all_tests)
        passed = sum(1 for entry in all_tests if entry["passed"])

        return {
            "total_tests": total,
            "passed": passed,
            "failed": total - passed,
            # Always a float so consumers see one type for the rate.
            "pass_rate": passed / total if total else 0.0,
            "tests": all_tests
        }

    def _identify_gaps(self) -> List[Dict]:
        """Report controls that are not fully implemented or lack evidence.

        The evidence check runs for every control regardless of status and
        considers all stored evidence, not only evidence from a given audit
        period. A single control can therefore produce two gap entries.
        """
        gaps = []

        for control_id, control in self.framework.controls.items():
            if control.status == ControlStatus.NOT_IMPLEMENTED:
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "not_implemented",
                    "description": f"Control {control.title} is not implemented",
                    "remediation": f"Implement {control.description}",
                    "priority": "high"
                })
            elif control.status == ControlStatus.PARTIALLY_IMPLEMENTED:
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "partial_implementation",
                    "description": f"Control {control.title} is only partially implemented",
                    "remediation": "Complete implementation per control requirements",
                    "priority": "medium"
                })

            if not self.framework.evidence.get(control_id, []):
                gaps.append({
                    "control_id": control_id,
                    "gap_type": "missing_evidence",
                    "description": f"No evidence collected for {control.title}",
                    "remediation": "Collect and document evidence",
                    "priority": "high"
                })

        return gaps

    def export_for_auditor(
        self,
        output_dir: str,
        audit_period_start: Optional[datetime] = None,
        audit_period_end: Optional[datetime] = None,
    ):
        """Export audit package for external auditor.

        Writes ``audit_report.json``, ``control_matrix.json`` and
        ``evidence_inventory.json`` into ``output_dir``, creating the
        directory if it does not exist.

        Args:
            output_dir: Directory that receives the JSON files.
            audit_period_start: Start of the audit period. Defaults to
                ``DEFAULT_AUDIT_PERIOD_DAYS`` days before the period end.
            audit_period_end: End of the audit period. Defaults to the
                current UTC time.
        """
        import os

        os.makedirs(output_dir, exist_ok=True)

        # Derive both defaults from a single clock reading so the default
        # window is exactly DEFAULT_AUDIT_PERIOD_DAYS long.
        if audit_period_end is None:
            audit_period_end = datetime.utcnow()
        if audit_period_start is None:
            audit_period_start = audit_period_end - timedelta(days=self.DEFAULT_AUDIT_PERIOD_DAYS)

        package = self.generate_audit_package(audit_period_start, audit_period_end)

        exports = {
            "audit_report.json": package,
            "control_matrix.json": package['control_matrix'],
            "evidence_inventory.json": package['evidence_inventory'],
        }
        for filename, payload in exports.items():
            with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2)

        print(f"Audit package exported to {output_dir}")

Conclusion

SOC 2 compliance automation transforms audit preparation from a periodic scramble into continuous readiness. Implement automated evidence collection, continuous monitoring, and gap tracking to maintain compliance posture. Regular health checks ensure controls remain effective and evidence stays current throughout the audit period.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.