Compliance

PCI DSS 4.0 Compliance Guide for Software Development Teams

DeviDevs Team
16 min read
#pci-dss #compliance #payment-security #secure-coding #vulnerability-management

PCI DSS 4.0 introduces significant changes affecting how development teams build and maintain payment card processing systems. This guide provides practical implementations for achieving and maintaining compliance across all relevant requirements.

Secure Coding Standards Framework

Implement requirement 6.2 secure development practices:

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import re
import hashlib
 
class VulnerabilityCategory(Enum):
    """Vulnerability classes that a code-review finding can be filed under.

    Each member's value is the snake_case string that
    ``PCIDSSCodeAnalyzer.generate_compliance_report`` emits as the finding's
    ``category``.
    """
    # NOTE(review): the member names look like the OWASP Top 10 (2017)
    # categories - confirm before relying on that mapping.
    INJECTION = "injection"
    BROKEN_AUTH = "broken_authentication"
    SENSITIVE_DATA = "sensitive_data_exposure"
    XXE = "xml_external_entities"
    BROKEN_ACCESS = "broken_access_control"
    SECURITY_MISCONFIG = "security_misconfiguration"
    XSS = "cross_site_scripting"
    INSECURE_DESERIALIZATION = "insecure_deserialization"
    VULNERABLE_COMPONENTS = "vulnerable_components"
    INSUFFICIENT_LOGGING = "insufficient_logging"
 
@dataclass
class CodeReviewFinding:
    """One pattern match reported by ``PCIDSSCodeAnalyzer.analyze_file``."""
    # Path label of the scanned file, exactly as passed to analyze_file().
    file_path: str
    # 1-based line on which the matching text starts.
    line_number: int
    # Vulnerability class the matching rule belongs to.
    category: VulnerabilityCategory
    # 'CRITICAL', 'HIGH' or 'MEDIUM', as returned by _calculate_severity().
    severity: str
    # Human-readable explanation taken from the rule definition.
    description: str
    # Suggested fix taken from the rule definition.
    remediation: str
    # CWE identifier from the rule, e.g. 'CWE-89'; None when the rule has none.
    cwe_id: Optional[str] = None
    # One or more PCI DSS requirement ids, comma-separated (e.g. '6.2.4, 8.3.1').
    pci_requirement: Optional[str] = None
 
class PCIDSSCodeAnalyzer:
    """Regex-based source scanner that tags risky patterns with PCI DSS requirements.

    Findings from every ``analyze_file`` call accumulate in ``self.findings``
    and feed ``generate_compliance_report``. Matching is purely textual, so
    each finding is a candidate for manual review rather than proof of a
    vulnerability.
    """

    def __init__(self):
        # Findings accumulated across all analyze_file() calls on this instance.
        self.findings: list[CodeReviewFinding] = []
        # category -> list of rule dicts (pattern/description/remediation/cwe/pci).
        self.patterns = self._load_vulnerability_patterns()

    def _load_vulnerability_patterns(self) -> dict:
        """Return the built-in rule table, keyed by ``VulnerabilityCategory``.

        Every rule is a dict with the keys ``pattern`` (regex source),
        ``description``, ``remediation``, ``cwe`` and ``pci`` (one or more
        comma-separated requirement ids).
        """
        return {
            VulnerabilityCategory.INJECTION: [
                {
                    'pattern': r'execute\s*\(\s*["\'].*\%s.*["\']\s*\%',
                    'description': 'SQL injection via string formatting',
                    'remediation': 'Use parameterized queries',
                    'cwe': 'CWE-89',
                    'pci': '6.2.4'
                },
                {
                    'pattern': r'cursor\.execute\s*\(\s*f["\']',
                    'description': 'SQL injection via f-string',
                    'remediation': 'Use parameterized queries with placeholders',
                    'cwe': 'CWE-89',
                    'pci': '6.2.4'
                },
                {
                    'pattern': r'subprocess\.(call|run|Popen)\s*\([^)]*shell\s*=\s*True',
                    'description': 'Command injection risk with shell=True',
                    'remediation': 'Use shell=False and pass arguments as list',
                    'cwe': 'CWE-78',
                    'pci': '6.2.4'
                },
                {
                    'pattern': r'eval\s*\(\s*.*request',
                    'description': 'Code injection via eval on user input',
                    'remediation': 'Never use eval on untrusted input',
                    'cwe': 'CWE-94',
                    'pci': '6.2.4'
                }
            ],
            VulnerabilityCategory.SENSITIVE_DATA: [
                {
                    'pattern': r'(password|secret|api_key|token)\s*=\s*["\'][^"\']+["\']',
                    'description': 'Hardcoded credential detected',
                    'remediation': 'Use environment variables or secrets manager',
                    'cwe': 'CWE-798',
                    'pci': '6.2.4, 8.3.1'
                },
                {
                    'pattern': r'print\s*\(.*password',
                    'description': 'Potential credential logging',
                    'remediation': 'Never log sensitive data',
                    'cwe': 'CWE-532',
                    'pci': '3.3.1'
                },
                {
                    'pattern': r'logging\.(info|debug|warning|error)\s*\(.*card',
                    'description': 'Potential PAN logging',
                    'remediation': 'Mask or exclude card data from logs',
                    'cwe': 'CWE-532',
                    'pci': '3.3.1, 3.4'
                }
            ],
            VulnerabilityCategory.BROKEN_AUTH: [
                {
                    'pattern': r'md5\s*\(',
                    'description': 'Weak hashing algorithm (MD5)',
                    'remediation': 'Use bcrypt, scrypt, or Argon2 for passwords',
                    'cwe': 'CWE-328',
                    'pci': '8.3.2'
                },
                {
                    'pattern': r'sha1\s*\(',
                    'description': 'Weak hashing algorithm (SHA1)',
                    'remediation': 'Use SHA-256 or stronger',
                    'cwe': 'CWE-328',
                    'pci': '8.3.2'
                },
                {
                    'pattern': r'verify\s*=\s*False',
                    'description': 'SSL verification disabled',
                    'remediation': 'Enable SSL certificate verification',
                    'cwe': 'CWE-295',
                    'pci': '4.2.1'
                }
            ],
            VulnerabilityCategory.XSS: [
                {
                    'pattern': r'innerHTML\s*=',
                    'description': 'Potential XSS via innerHTML',
                    'remediation': 'Use textContent or sanitize HTML',
                    'cwe': 'CWE-79',
                    'pci': '6.2.4'
                },
                {
                    'pattern': r'document\.write\s*\(',
                    'description': 'Potential XSS via document.write',
                    'remediation': 'Use DOM manipulation methods',
                    'cwe': 'CWE-79',
                    'pci': '6.2.4'
                },
                {
                    'pattern': r'\{\{\s*.*\|safe\s*\}\}',
                    'description': 'Template XSS via safe filter',
                    'remediation': 'Validate and sanitize before marking safe',
                    'cwe': 'CWE-79',
                    'pci': '6.2.4'
                }
            ],
            VulnerabilityCategory.BROKEN_ACCESS: [
                {
                    'pattern': r'@app\.route.*methods.*POST.*\n(?!.*@login_required)',
                    'description': 'POST endpoint without authentication',
                    'remediation': 'Add authentication decorator',
                    'cwe': 'CWE-306',
                    'pci': '7.2.1'
                },
                {
                    'pattern': r'if\s+request\.user\.id\s*==\s*',
                    'description': 'Client-side authorization check',
                    'remediation': 'Implement server-side authorization',
                    'cwe': 'CWE-639',
                    'pci': '7.2.2'
                }
            ]
        }

    def analyze_file(self, file_path: str, content: str) -> list[CodeReviewFinding]:
        """Scan one file's text against every rule and record the matches.

        Args:
            file_path: Label stored on each finding; the file is not opened.
            content: Full source text to scan.

        Returns:
            The findings produced by this call. They are also appended to
            ``self.findings``, so scanning the same content twice records
            its findings twice.
        """
        flags = re.MULTILINE | re.IGNORECASE
        findings = []

        for category, patterns in self.patterns.items():
            # Severity depends only on the category, so compute it once per group.
            severity = self._calculate_severity(category)
            for pattern_def in patterns:
                for match in re.finditer(pattern_def['pattern'], content, flags):
                    # 1-based line of the match start; counting in place avoids
                    # copying the text that precedes every match.
                    line_num = content.count('\n', 0, match.start()) + 1

                    findings.append(CodeReviewFinding(
                        file_path=file_path,
                        line_number=line_num,
                        category=category,
                        severity=severity,
                        description=pattern_def['description'],
                        remediation=pattern_def['remediation'],
                        cwe_id=pattern_def.get('cwe'),
                        pci_requirement=pattern_def.get('pci')
                    ))

        self.findings.extend(findings)
        return findings

    def _calculate_severity(self, category: VulnerabilityCategory) -> str:
        """Map a category to 'CRITICAL', 'HIGH' or 'MEDIUM'.

        Injection and sensitive-data findings are CRITICAL, broken
        authentication and broken access control are HIGH, and every other
        category is MEDIUM.
        """
        if category in (VulnerabilityCategory.INJECTION,
                        VulnerabilityCategory.SENSITIVE_DATA):
            return 'CRITICAL'
        if category in (VulnerabilityCategory.BROKEN_AUTH,
                        VulnerabilityCategory.BROKEN_ACCESS):
            return 'HIGH'
        return 'MEDIUM'

    def generate_compliance_report(self) -> dict:
        """Summarize all accumulated findings as a JSON-serializable dict.

        Returns:
            A dict with ``scan_date`` (naive UTC ISO timestamp),
            ``total_findings``, ``critical_count``, ``high_count``,
            ``requirements_affected`` (requirement id -> list of finding
            summaries) and ``findings`` (one flat dict per finding).
        """
        requirements_coverage: dict = {}

        for finding in self.findings:
            if not finding.pci_requirement:
                continue
            # A rule may cite several requirements separated by commas; accept
            # any spacing around the comma and ignore empty fragments.
            for req in finding.pci_requirement.split(','):
                req = req.strip()
                if not req:
                    continue
                requirements_coverage.setdefault(req, []).append({
                    'file': finding.file_path,
                    'line': finding.line_number,
                    'severity': finding.severity,
                    'description': finding.description
                })

        return {
            'scan_date': datetime.utcnow().isoformat(),
            'total_findings': len(self.findings),
            'critical_count': sum(1 for f in self.findings if f.severity == 'CRITICAL'),
            'high_count': sum(1 for f in self.findings if f.severity == 'HIGH'),
            'requirements_affected': requirements_coverage,
            'findings': [
                {
                    'file': f.file_path,
                    'line': f.line_number,
                    'category': f.category.value,
                    'severity': f.severity,
                    'description': f.description,
                    'remediation': f.remediation,
                    'cwe': f.cwe_id,
                    'pci_requirement': f.pci_requirement
                }
                for f in self.findings
            ]
        }

Cardholder Data Protection

Implement requirement 3 data protection controls:

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import base64
import re
from typing import Optional
 
class CardDataProtection:
    """Encrypt, mask and tokenize primary account numbers (PANs).

    All operations use a single 256-bit master key supplied by the caller.
    """

    def __init__(self, master_key: bytes):
        """Store the master key.

        Raises:
            ValueError: If ``master_key`` is not exactly 32 bytes long.
        """
        self.master_key = master_key
        self._validate_key()

    def _validate_key(self):
        """Ensure key meets PCI DSS 4.0 requirements."""
        # AES-256 requires 32-byte key
        if len(self.master_key) != 32:
            raise ValueError("Master key must be 256 bits (32 bytes)")

    def encrypt_pan(self, pan: str) -> dict:
        """
        Encrypt PAN using AES-256-GCM.
        PCI DSS Requirement 3.5.1 - Strong cryptography

        Non-digit characters (spaces, dashes) are stripped before validation.

        Returns:
            Dict with base64 ``encrypted_pan`` and ``nonce``, plus
            ``masked_pan``, ``last_four`` and ``bin`` (first six digits).

        Raises:
            ValueError: If the cleaned PAN fails the length or Luhn check.
        """
        # Validate PAN format
        pan_clean = re.sub(r'\D', '', pan)
        if not self._validate_pan(pan_clean):
            raise ValueError("Invalid PAN format")

        # Generate unique nonce for each encryption
        nonce = os.urandom(12)  # 96-bit nonce for GCM

        # Derive encryption key from master key; the nonce doubles as the KDF
        # salt, so decrypt_pan() can re-derive the key from stored data alone.
        dek = self._derive_data_key(nonce)

        # Encrypt
        aesgcm = AESGCM(dek)
        ciphertext = aesgcm.encrypt(nonce, pan_clean.encode(), None)

        return {
            'encrypted_pan': base64.b64encode(ciphertext).decode(),
            'nonce': base64.b64encode(nonce).decode(),
            'masked_pan': self.mask_pan(pan_clean),
            'last_four': pan_clean[-4:],
            'bin': pan_clean[:6]
        }

    def decrypt_pan(self, encrypted_data: dict) -> str:
        """Decrypt PAN - requires appropriate access controls.

        Args:
            encrypted_data: Dict holding the base64 ``encrypted_pan`` and
                ``nonce`` values produced by ``encrypt_pan``.

        Returns:
            The cleartext PAN digits.
        """
        ciphertext = base64.b64decode(encrypted_data['encrypted_pan'])
        nonce = base64.b64decode(encrypted_data['nonce'])

        # Derive same key
        dek = self._derive_data_key(nonce)

        # Decrypt
        aesgcm = AESGCM(dek)
        pan = aesgcm.decrypt(nonce, ciphertext, None)

        return pan.decode()

    def _derive_data_key(self, salt: bytes) -> bytes:
        """Derive a 32-byte data encryption key from the master key.

        Uses PBKDF2-HMAC-SHA256 with 100000 iterations and the given salt.
        """
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000
        )
        return kdf.derive(self.master_key)

    def _validate_pan(self, pan: str) -> bool:
        """Return True when ``pan`` is 13-19 digits and passes the Luhn check."""
        if not pan.isdigit() or len(pan) < 13 or len(pan) > 19:
            return False

        # Luhn check: counting from the rightmost digit, every second digit is
        # doubled and its decimal digits are summed.
        digits = [int(d) for d in pan]
        odd_digits = digits[-1::-2]
        even_digits = digits[-2::-2]

        total = sum(odd_digits)
        for d in even_digits:
            total += sum(divmod(d * 2, 10))

        return total % 10 == 0

    def mask_pan(self, pan: str) -> str:
        """
        Mask PAN per PCI DSS 3.4 requirements.
        Show maximum first 6 and last 4 digits.

        Inputs with fewer than 13 digits are masked completely.
        """
        pan_clean = re.sub(r'\D', '', pan)
        if len(pan_clean) < 13:
            return '*' * len(pan_clean)

        masked_length = len(pan_clean) - 10
        return f"{pan_clean[:6]}{'*' * masked_length}{pan_clean[-4:]}"

    def tokenize_pan(self, pan: str) -> dict:
        """
        Create token for PAN (Requirement 3.4).
        Token is not mathematically reversible to PAN.

        Returns:
            Dict with a random ``token``, a ``pan_hash`` for equality checks,
            and the ``last_four`` and ``bin`` digits.

        Raises:
            ValueError: If the cleaned PAN fails the length or Luhn check
                (same rule as ``encrypt_pan``), so empty or malformed input
                can no longer be tokenized.
        """
        # Imported locally because the import block that accompanies this
        # class does not bring hashlib into scope.
        import hashlib

        pan_clean = re.sub(r'\D', '', pan)
        if not self._validate_pan(pan_clean):
            raise ValueError("Invalid PAN format")

        # Generate random token
        token = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip('=')

        # Create hash for validation (not for retrieving PAN)
        # NOTE(review): this is SHA-256 over PAN + key, not an HMAC, and it
        # reuses the encryption master key. Left unchanged so existing stored
        # hashes still compare equal - consider a dedicated HMAC key.
        pan_hash = hashlib.sha256(
            pan_clean.encode() + self.master_key
        ).hexdigest()

        return {
            'token': token,
            'pan_hash': pan_hash,
            'last_four': pan_clean[-4:],
            'bin': pan_clean[:6]
        }
 
class SensitiveAuthData:
    """Handle sensitive authentication data (SAD) per Requirement 3.2."""

    @staticmethod
    def detect_sad_in_data(data: str) -> list[dict]:
        """Detect any SAD that should not be stored.

        Looks for magnetic-stripe Track 1 and Track 2 layouts and for a run
        of 4-12 digits followed later on the same line by the word "pin"
        (case-insensitive). Card verification codes are not detected: a bare
        3-4 digit number is too ambiguous to flag on its own.

        Returns:
            One dict per detected kind, in the order Track 1, Track 2, PIN,
            each with the keys ``type``, ``requirement`` and ``action``.
        """
        # NOTE(review): the requirement ids below are kept as published;
        # verify them against the PCI DSS version you are assessed on.
        detectors = (
            # (pattern, regex flags, reported type, reported requirement)
            (r'%B\d{13,19}\^[\w\s/]+\^\d{4}\d*\?', 0, 'Track 1 Data', '3.2.1'),
            (r';\d{13,19}=\d{4}\d*\?', 0, 'Track 2 Data', '3.2.1'),
            (r'\b\d{4,12}\b.*pin', re.IGNORECASE, 'PIN Data', '3.2.2'),
        )

        return [
            {
                'type': sad_type,
                'requirement': requirement,
                'action': 'Must not be stored after authorization'
            }
            for pattern, flags, sad_type, requirement in detectors
            if re.search(pattern, data, flags)
        ]

    @staticmethod
    def secure_delete(data: bytearray):
        """Overwrite a mutable buffer in place, leaving it all zero bytes.

        Three passes are written (0x00, 0xFF, 0x00). This clears only the
        buffer passed in; copies of the data held elsewhere are not touched.
        """
        size = len(data)
        # Same-length slice assignment overwrites in place without resizing.
        data[:] = b'\x00' * size
        data[:] = b'\xff' * size
        data[:] = b'\x00' * size
 
# Example usage: these statements run at import time and define the
# module-level names master_key, protection, pan and encrypted.
master_key = os.urandom(32)  # In production, use HSM or key management service
protection = CardDataProtection(master_key)

# Encrypt card data ("4111111111111111" passes the Luhn check in
# CardDataProtection._validate_pan, so encrypt_pan accepts it).
pan = "4111111111111111"
encrypted = protection.encrypt_pan(pan)
# Only the masked form and the BIN / last-four fragments are printed.
print(f"Masked PAN: {encrypted['masked_pan']}")
# These values come from the encrypt_pan() result, not from tokenize_pan().
print(f"Token data available: last_four={encrypted['last_four']}, bin={encrypted['bin']}")

Vulnerability Management System

Implement requirements 6.3 and 11.3 for vulnerability management:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import json
 
class VulnerabilitySeverity(Enum):
    """Severity levels for tracked vulnerabilities; a larger value is more severe."""
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0
 
@dataclass
class Vulnerability:
    """A single vulnerability record tracked by ``PCIDSSVulnerabilityManager``."""
    # Identifier from the scan result; used as the manager's dictionary key.
    id: str
    title: str
    severity: VulnerabilitySeverity
    # Defaults to 0.0 on import when the scan result carries no score.
    cvss_score: float
    # CVE identifier when the scan result provides one, otherwise None.
    cve_id: Optional[str]
    affected_component: str
    # Set to the import time (naive UTC) by import_scan_results().
    discovered_date: datetime
    # discovered_date plus the SLA window for the record's severity.
    remediation_deadline: datetime
    # 'open' on import; the manager's report also counts the value 'closed'.
    status: str
    # Free-text notes supplied through update_vulnerability_status().
    remediation_notes: Optional[str] = None
 
class PCIDSSVulnerabilityManager:
    """Track scanner findings against per-severity remediation deadlines.

    Records live in ``self.vulnerabilities`` keyed by vulnerability id.
    All timestamps are naive UTC values from ``datetime.utcnow()``.
    """

    def __init__(self):
        self.vulnerabilities: dict[str, Vulnerability] = {}
        # Remediation window in days for each severity.
        self.sla_days = {
            VulnerabilitySeverity.CRITICAL: 1,  # PCI DSS 4.0: ASAP, typically same day
            VulnerabilitySeverity.HIGH: 30,
            VulnerabilitySeverity.MEDIUM: 90,
            VulnerabilitySeverity.LOW: 180,
            # INFO is the fallback severity for scan results, so it needs a
            # window too; without one, importing such a result raised KeyError.
            # The value is a policy placeholder - adjust to your own process.
            VulnerabilitySeverity.INFO: 365
        }

    def import_scan_results(self, scan_results: list[dict]) -> list[Vulnerability]:
        """Import vulnerability scan results.

        Args:
            scan_results: Dicts with the required keys ``id``, ``title`` and
                ``component`` and the optional keys ``severity`` (defaults to
                'info'), ``cvss_score`` (defaults to 0.0) and ``cve_id``.

        Returns:
            The ``Vulnerability`` records created, in input order.

        Raises:
            KeyError: If a result lacks one of the required keys.
        """
        # NOTE(review): a result whose id is already tracked replaces the
        # stored record, which resets its discovery date, deadline and status.
        new_vulnerabilities = []

        for result in scan_results:
            severity = self._map_severity(result.get('severity', 'info'))
            discovered = datetime.utcnow()
            deadline = discovered + timedelta(days=self.sla_days[severity])

            vuln = Vulnerability(
                id=result['id'],
                title=result['title'],
                severity=severity,
                cvss_score=result.get('cvss_score', 0.0),
                cve_id=result.get('cve_id'),
                affected_component=result['component'],
                discovered_date=discovered,
                remediation_deadline=deadline,
                status='open'
            )

            self.vulnerabilities[vuln.id] = vuln
            new_vulnerabilities.append(vuln)

        return new_vulnerabilities

    def _map_severity(self, severity_str: str) -> VulnerabilitySeverity:
        """Map severity string to enum (case-insensitive); unknown labels become INFO."""
        mapping = {
            'critical': VulnerabilitySeverity.CRITICAL,
            'high': VulnerabilitySeverity.HIGH,
            'medium': VulnerabilitySeverity.MEDIUM,
            'low': VulnerabilitySeverity.LOW,
            'info': VulnerabilitySeverity.INFO
        }
        return mapping.get(severity_str.lower(), VulnerabilitySeverity.INFO)

    def get_overdue_vulnerabilities(self) -> list[Vulnerability]:
        """Get open vulnerabilities whose remediation deadline has passed."""
        now = datetime.utcnow()
        return [
            v for v in self.vulnerabilities.values()
            if v.status == 'open' and v.remediation_deadline < now
        ]

    def update_vulnerability_status(
        self,
        vuln_id: str,
        status: str,
        notes: Optional[str] = None
    ):
        """Update vulnerability status with audit trail.

        The stored remediation notes are replaced by ``notes``, including
        when ``notes`` is None.

        Raises:
            ValueError: If ``vuln_id`` is not tracked.
        """
        if vuln_id not in self.vulnerabilities:
            raise ValueError(f"Vulnerability {vuln_id} not found")

        vuln = self.vulnerabilities[vuln_id]
        old_status = vuln.status
        vuln.status = status
        vuln.remediation_notes = notes

        # Audit log entry
        self._log_status_change(vuln_id, old_status, status, notes)

    def _log_status_change(
        self,
        vuln_id: str,
        old_status: str,
        new_status: str,
        notes: Optional[str]
    ):
        """Print a JSON audit line describing a status change."""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'vulnerability_id': vuln_id,
            'old_status': old_status,
            'new_status': new_status,
            'notes': notes
        }
        print(f"AUDIT: {json.dumps(log_entry)}")

    def generate_compliance_report(self) -> dict:
        """Generate PCI DSS vulnerability management report.

        Returns:
            Dict with ``report_date``, ``pci_requirements``, ``summary``
            counts, ``severity_breakdown`` of open items keyed by severity
            name, ``sla_compliance`` figures and ``critical_high_details``
            for open CRITICAL/HIGH items.
        """
        now = datetime.utcnow()

        open_vulns = [v for v in self.vulnerabilities.values() if v.status == 'open']
        overdue = self.get_overdue_vulnerabilities()

        severity_counts = {s: 0 for s in VulnerabilitySeverity}
        for v in open_vulns:
            severity_counts[v.severity] += 1

        # Closed items are only counted; no remediation-time statistic is derived.
        closed_vulns = [v for v in self.vulnerabilities.values() if v.status == 'closed']

        return {
            'report_date': now.isoformat(),
            'pci_requirements': ['6.3', '11.3.1', '11.3.2'],
            'summary': {
                'total_vulnerabilities': len(self.vulnerabilities),
                'open_vulnerabilities': len(open_vulns),
                'overdue_vulnerabilities': len(overdue),
                'closed_vulnerabilities': len(closed_vulns)
            },
            'severity_breakdown': {
                s.name: severity_counts[s] for s in VulnerabilitySeverity
            },
            'sla_compliance': {
                'within_sla': len(open_vulns) - len(overdue),
                'overdue': len(overdue),
                # With nothing open there is nothing out of SLA, hence 100.
                'compliance_rate': (
                    (len(open_vulns) - len(overdue)) / len(open_vulns) * 100
                    if open_vulns else 100
                )
            },
            'critical_high_details': [
                {
                    'id': v.id,
                    'title': v.title,
                    'severity': v.severity.name,
                    'days_open': (now - v.discovered_date).days,
                    'deadline': v.remediation_deadline.isoformat(),
                    'overdue': v.remediation_deadline < now
                }
                for v in open_vulns
                if v.severity in [VulnerabilitySeverity.CRITICAL, VulnerabilitySeverity.HIGH]
            ]
        }

    def check_scan_coverage(self, assets: list[str], scanned_assets: list[str]) -> dict:
        """
        Verify quarterly ASV scan coverage.
        PCI DSS Requirement 11.3.2

        Both lists are de-duplicated before comparison, so an asset listed
        twice counts once in the total and in the percentage.

        Returns:
            Dict with ``total_assets``, ``scanned_assets`` (in-scope assets
            that were scanned), ``missing_assets`` (sorted),
            ``coverage_percentage`` and ``compliant``.
        """
        scanned_set = set(scanned_assets)
        assets_set = set(assets)

        covered = assets_set & scanned_set
        missing = assets_set - scanned_set
        # Count unique assets so the denominator matches the set-based numerator.
        total = len(assets_set)

        return {
            'total_assets': total,
            'scanned_assets': len(covered),
            'missing_assets': sorted(missing),
            'coverage_percentage': len(covered) / total * 100 if total else 100,
            'compliant': not missing
        }

Access Control Implementation

Implement requirements 7 and 8 access controls:

from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set
import hashlib
import secrets
import re
 
class AccessLevel(Enum):
    """Ordered access tiers, from no access (0) up to administrative access (3)."""
    # NOTE(review): not referenced by the other code visible in this section;
    # role permissions there are expressed as action names instead.
    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
 
@dataclass
class User:
    """Account state consulted by ``PCIDSSAccessControl``."""
    user_id: str
    username: str
    # Role names; each is looked up in PCIDSSAccessControl.role_permissions.
    roles: Set[str]
    # Time of the last successful login, or None if there is none recorded.
    last_login: Optional[datetime]
    # Time the password was last changed; drives the expiry check.
    password_changed: datetime
    # Failed logins counted by handle_failed_login(); reset on success.
    failed_attempts: int
    # End of the current lockout (compared against datetime.utcnow()), or None.
    locked_until: Optional[datetime]
    # Whether the account has multi-factor authentication enabled.
    mfa_enabled: bool
 
@dataclass
class AccessAttempt:
    """One audit-log entry written by ``PCIDSSAccessControl._log_access``."""
    # When the attempt was logged (naive UTC from datetime.utcnow()).
    timestamp: datetime
    user_id: str
    # Name of the resource the user tried to reach.
    resource: str
    # Action requested on the resource.
    action: str
    # True when access was allowed.
    granted: bool
    # Short explanation of the decision, e.g. "Account locked".
    reason: str
 
class PCIDSSAccessControl:
    """Role-based access checks, lockout handling and password policy.

    Users and roles are held in memory, and every access decision is
    appended to ``self.access_log``. Timestamps are naive UTC values from
    ``datetime.utcnow()``.
    """

    def __init__(self):
        self.users: dict[str, User] = {}
        self.role_permissions: dict[str, dict] = {}
        self.access_log: list[AccessAttempt] = []
        # Requirement ids use PCI DSS v4.0 numbering; two of them were
        # numbered differently in v3.2.1, as noted.
        self.config = {
            'max_failed_attempts': 6,  # Req 8.3.4 (8.1.6 in v3.2.1)
            'lockout_duration_minutes': 30,  # Req 8.3.4 (8.1.7 in v3.2.1)
            'password_max_age_days': 90,  # Req 8.3.9
            'session_timeout_minutes': 15,  # Req 8.2.8 (8.1.8 in v3.2.1)
            'password_min_length': 12,  # Req 8.3.6
            'password_history': 4  # Req 8.3.7
        }

    def define_role(self, role_name: str, permissions: dict):
        """
        Define role with least privilege principle.
        PCI DSS Requirement 7.1

        Args:
            role_name: Key under which the role is stored; an existing role
                with the same name is replaced.
            permissions: Dict with optional ``resources`` (resource name ->
                collection of allowed actions) and ``justification``.
        """
        self.role_permissions[role_name] = {
            'resources': permissions.get('resources', {}),
            'created_at': datetime.utcnow().isoformat(),
            'business_justification': permissions.get('justification', 'Not provided')
        }

    def check_access(
        self,
        user_id: str,
        resource: str,
        action: str
    ) -> tuple[bool, str]:
        """
        Check if user has access to resource.
        PCI DSS Requirements 7.2

        Checks run in order: user exists, account not currently locked, MFA
        enabled when the resource requires it, then role permissions. Every
        outcome is written to the access log.

        Returns:
            ``(granted, message)``.
        """
        if user_id not in self.users:
            self._log_access(user_id, resource, action, False, "User not found")
            return False, "User not found"

        user = self.users[user_id]

        # Check if account is locked
        if user.locked_until and user.locked_until > datetime.utcnow():
            self._log_access(user_id, resource, action, False, "Account locked")
            return False, "Account locked"

        # Check MFA requirement for cardholder data access
        if self._requires_mfa(resource) and not user.mfa_enabled:
            self._log_access(user_id, resource, action, False, "MFA required")
            return False, "MFA required for this resource"

        # Check role-based permissions
        for role in user.roles:
            if role in self.role_permissions:
                role_perms = self.role_permissions[role]['resources']
                if resource in role_perms:
                    allowed_actions = role_perms[resource]
                    if action in allowed_actions or 'all' in allowed_actions:
                        self._log_access(user_id, resource, action, True, f"Granted via role: {role}")
                        return True, f"Access granted via role: {role}"

        self._log_access(user_id, resource, action, False, "No matching permission")
        return False, "Access denied - no matching permission"

    def _requires_mfa(self, resource: str) -> bool:
        """Check if resource requires MFA (Req 8.4.2).

        Patterns are matched with ``re.match``, i.e. anchored at the start of
        the resource name only.
        """
        mfa_required_patterns = [
            r'cardholder_data',
            r'pan_.*',
            r'admin_.*',
            r'key_management'
        ]
        return any(re.match(pattern, resource) for pattern in mfa_required_patterns)

    def _log_access(
        self,
        user_id: str,
        resource: str,
        action: str,
        granted: bool,
        reason: str
    ):
        """Log all access attempts (Req 10.2.1)."""
        attempt = AccessAttempt(
            timestamp=datetime.utcnow(),
            user_id=user_id,
            resource=resource,
            action=action,
            granted=granted,
            reason=reason
        )
        self.access_log.append(attempt)

    def validate_password(
        self,
        password: str,
        user_id: Optional[str] = None
    ) -> tuple[bool, list[str]]:
        """
        Validate password against PCI DSS 4.0 requirements.
        Requirement 8.3.6

        Args:
            password: Candidate password.
            user_id: Optional account identifier. When given, a password
                containing it (case-insensitively) is rejected as weak.

        Returns:
            ``(is_valid, errors)``; ``errors`` lists every rule that failed.
        """
        errors = []

        if len(password) < self.config['password_min_length']:
            errors.append(f"Password must be at least {self.config['password_min_length']} characters")

        if not re.search(r'[A-Z]', password):
            errors.append("Password must contain uppercase letter")

        if not re.search(r'[a-z]', password):
            errors.append("Password must contain lowercase letter")

        if not re.search(r'\d', password):
            errors.append("Password must contain digit")

        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            errors.append("Password must contain special character")

        # Check for common patterns. The user id is now an explicit argument;
        # the previous `'user_id' in dir()` test could never be true, so the
        # user-id check silently never ran.
        weak_fragments = ['password', '123456', 'qwerty']
        if user_id:
            weak_fragments.append(user_id.lower())
        lowered = password.lower()
        if any(fragment in lowered for fragment in weak_fragments):
            errors.append("Password contains common/weak pattern")

        return len(errors) == 0, errors

    def handle_failed_login(self, user_id: str):
        """
        Handle failed login attempt.
        PCI DSS Requirement 8.3.4 (8.1.6 in v3.2.1)

        Increments the failure counter and, once the configured maximum is
        reached, locks the account for the configured duration. Unknown user
        ids are ignored.
        """
        if user_id not in self.users:
            return

        user = self.users[user_id]
        user.failed_attempts += 1

        if user.failed_attempts >= self.config['max_failed_attempts']:
            user.locked_until = datetime.utcnow() + timedelta(
                minutes=self.config['lockout_duration_minutes']
            )
            self._log_access(user_id, 'authentication', 'login', False,
                           f"Account locked after {user.failed_attempts} failed attempts")

    def reset_failed_attempts(self, user_id: str):
        """Reset failed login counter on successful login."""
        if user_id in self.users:
            self.users[user_id].failed_attempts = 0
            self.users[user_id].locked_until = None

    def check_password_expiry(self, user_id: str) -> tuple[bool, int]:
        """
        Check if password needs to be changed.
        PCI DSS Requirement 8.3.9

        Returns:
            ``(expired, days_remaining)``; unknown users yield ``(True, 0)``.
        """
        if user_id not in self.users:
            return True, 0

        user = self.users[user_id]
        days_since_change = (datetime.utcnow() - user.password_changed).days
        days_remaining = self.config['password_max_age_days'] - days_since_change

        return days_remaining <= 0, days_remaining

    def generate_access_review_report(self) -> dict:
        """
        Generate quarterly access review report.
        PCI DSS Requirement 7.2.2

        A user is reported as 'locked' only while ``locked_until`` is still
        in the future, matching the rule ``check_access`` applies. Users who
        have never logged in are not listed as inactive.
        """
        # NOTE(review): confirm the requirement id for periodic access reviews
        # against the standard; the emitted value is kept unchanged.
        now = datetime.utcnow()
        return {
            'review_date': now.isoformat(),
            'requirement': '7.2.2',
            'users': [
                {
                    'user_id': u.user_id,
                    'username': u.username,
                    'roles': list(u.roles),
                    'last_login': u.last_login.isoformat() if u.last_login else None,
                    'mfa_enabled': u.mfa_enabled,
                    'status': 'locked' if u.locked_until and u.locked_until > now else 'active'
                }
                for u in self.users.values()
            ],
            'roles': [
                {
                    'role_name': name,
                    'resources': list(perms['resources'].keys()),
                    'justification': perms['business_justification']
                }
                for name, perms in self.role_permissions.items()
            ],
            'inactive_users': [
                u.user_id for u in self.users.values()
                if u.last_login and (now - u.last_login).days > 90
            ]
        }

Continuous Compliance Monitoring

Implement automated compliance monitoring:

from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import json
 
@dataclass
class ComplianceCheck:
    """A single registered compliance check and the outcome of its last run."""
    # PCI DSS requirement id the check covers, e.g. '3.5.1'.
    requirement_id: str
    # Human-readable statement of what the check verifies.
    description: str
    # Callable that performs the check.
    check_function: Callable
    frequency: str  # daily, weekly, quarterly
    # NOTE(review): both fields default to None although annotated with a
    # non-optional type; None presumably means "never run" - confirm.
    last_run: datetime = None
    last_result: bool = None
 
class PCIDSSComplianceMonitor:
    """
    Register PCI DSS 4.0 compliance checks, run them, and summarize results.

    Attributes:
        checks: Registered ComplianceCheck objects, executed in list order.
        results_history: One result dict per run_all_checks() call,
            oldest first. The list is never trimmed by this class.
    """

    def __init__(self):
        self.checks: list[ComplianceCheck] = []
        self.results_history: list[dict] = []
        self._register_default_checks()

    def _register_default_checks(self):
        """Register PCI DSS 4.0 compliance checks."""
        checks = [
            ComplianceCheck(
                requirement_id='3.4',
                description='PAN is masked when displayed',
                check_function=self._check_pan_masking,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='3.5.1',
                description='Strong cryptography used for stored PAN',
                check_function=self._check_encryption_strength,
                frequency='weekly'
            ),
            ComplianceCheck(
                requirement_id='6.3.3',
                description='Critical patches applied within 30 days',
                check_function=self._check_patch_compliance,
                frequency='weekly'
            ),
            ComplianceCheck(
                requirement_id='8.3.6',
                description='Password complexity requirements enforced',
                check_function=self._check_password_policy,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='10.2',
                description='Audit logs capturing required events',
                check_function=self._check_audit_logging,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='10.4.1',
                description='Audit logs reviewed daily',
                check_function=self._check_log_review,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='11.3.1',
                description='Internal vulnerability scans quarterly',
                check_function=self._check_internal_scans,
                frequency='quarterly'
            ),
            ComplianceCheck(
                requirement_id='11.3.2',
                description='External ASV scans quarterly',
                check_function=self._check_asv_scans,
                frequency='quarterly'
            ),
            # NOTE(review): description says "annually" but the frequency is
            # 'quarterly'; the value is kept as-is because nothing in this
            # class reads `frequency` -- confirm the intended cadence.
            ComplianceCheck(
                requirement_id='12.3.1',
                description='Risk assessment performed annually',
                check_function=self._check_risk_assessment,
                frequency='quarterly'
            )
        ]
        self.checks.extend(checks)

    def run_all_checks(self) -> dict:
        """
        Run every registered check, regardless of its `frequency`.

        Each check's `last_run` and `last_result` are updated on every run.
        A check that raises is recorded as failed, both in the returned
        results and on the check object, so a stale passing state never
        survives an errored run.

        Returns:
            A dict with 'timestamp' (ISO-8601 UTC string), 'checks' (one
            dict per check with requirement_id, description, passed,
            details) and 'overall_compliance' (False if any check failed
            or raised). The same dict is appended to results_history.
        """
        results = {
            'timestamp': datetime.utcnow().isoformat(),
            'checks': [],
            'overall_compliance': True
        }

        for check in self.checks:
            try:
                passed, details = check.check_function()
            except Exception as e:
                # Boundary handler: one broken check must not abort the
                # whole run; it is reported as a failure instead.
                passed = False
                details = f"Check failed with error: {e}"

            check.last_run = datetime.utcnow()
            check.last_result = passed

            results['checks'].append({
                'requirement_id': check.requirement_id,
                'description': check.description,
                'passed': passed,
                'details': details
            })

            if not passed:
                results['overall_compliance'] = False

        self.results_history.append(results)
        return results

    def _check_pan_masking(self) -> tuple[bool, str]:
        """Check that PAN is properly masked in displays/logs."""
        # Implementation would scan logs and UI for unmasked PANs
        # This is a placeholder for the actual implementation
        return True, "PAN masking verified in application logs"

    def _check_encryption_strength(self) -> tuple[bool, str]:
        """Verify AES-256 or equivalent encryption for stored PAN."""
        # Placeholder: a real check would inspect the encryption configuration
        return True, "AES-256-GCM encryption verified"

    def _check_patch_compliance(self) -> tuple[bool, str]:
        """Verify critical patches applied within SLA."""
        # Placeholder: a real check would query the patch management system
        return True, "All critical patches applied within 30-day SLA"

    def _check_password_policy(self) -> tuple[bool, str]:
        """Verify password policy enforcement."""
        # Placeholder: these are the expected settings echoed back in the
        # details string; nothing is compared against a live configuration.
        policy_settings = {
            'min_length': 12,
            'complexity_required': True,
            'max_age_days': 90,
            'history_count': 4
        }
        return True, f"Password policy enforced: {json.dumps(policy_settings)}"

    def _check_audit_logging(self) -> tuple[bool, str]:
        """Verify audit logs capture required events."""
        # Placeholder: the event list is only reported, not verified
        required_events = [
            'authentication_success',
            'authentication_failure',
            'authorization_failure',
            'user_creation',
            'user_modification',
            'privilege_escalation',
            'system_events',
            'cardholder_data_access'
        ]
        return True, f"Logging configured for: {', '.join(required_events)}"

    def _check_log_review(self) -> tuple[bool, str]:
        """Verify daily log review is occurring."""
        # Placeholder: a real check would look up log review records
        return True, "Daily log review completed"

    def _check_internal_scans(self) -> tuple[bool, str]:
        """Verify quarterly internal vulnerability scans."""
        # Placeholder: a real check would look up scan records
        return True, "Internal vulnerability scan completed this quarter"

    def _check_asv_scans(self) -> tuple[bool, str]:
        """Verify quarterly ASV scans with passing status."""
        # Placeholder: always reports success
        return True, "ASV scan completed with passing status"

    def _check_risk_assessment(self) -> tuple[bool, str]:
        """Verify annual risk assessment completion."""
        # Placeholder: always reports success
        return True, "Annual risk assessment completed"

    def generate_compliance_dashboard(self) -> dict:
        """
        Generate a compliance status dashboard from the most recent run.

        If no run has been recorded yet, run_all_checks() is invoked first.

        Returns:
            A dict with 'last_updated', 'overall_status' ('COMPLIANT' or
            'NON-COMPLIANT'), 'compliance_score' ("passed/total"),
            'compliance_percentage' (rounded to one decimal; 0.0 when the
            latest run contained no checks), 'failed_requirements' and
            'next_actions'.
        """
        if not self.results_history:
            self.run_all_checks()

        latest = self.results_history[-1]

        passed_checks = sum(1 for c in latest['checks'] if c['passed'])
        total_checks = len(latest['checks'])

        # Guard against ZeroDivisionError when no checks are registered
        if total_checks:
            compliance_percentage = round(passed_checks / total_checks * 100, 1)
        else:
            compliance_percentage = 0.0

        return {
            'last_updated': latest['timestamp'],
            'overall_status': 'COMPLIANT' if latest['overall_compliance'] else 'NON-COMPLIANT',
            'compliance_score': f"{passed_checks}/{total_checks}",
            'compliance_percentage': compliance_percentage,
            'failed_requirements': [
                c['requirement_id'] for c in latest['checks'] if not c['passed']
            ],
            'next_actions': self._get_remediation_actions(latest)
        }

    def _get_remediation_actions(self, results: dict) -> list[dict]:
        """
        Get remediation actions for failed checks.

        Args:
            results: A result dict as produced by run_all_checks().

        Returns:
            One action dict per failed check. Requirement IDs starting
            with '3.' or '8.' are given priority 'HIGH', all others
            'MEDIUM'.
        """
        actions = []
        for check in results['checks']:
            if not check['passed']:
                actions.append({
                    'requirement': check['requirement_id'],
                    'issue': check['description'],
                    'details': check['details'],
                    'priority': 'HIGH' if check['requirement_id'].startswith(('3.', '8.')) else 'MEDIUM'
                })
        return actions
 
# Initialize and run compliance monitoring.
# Constructing the monitor registers the default checks; because no results
# exist yet, the dashboard call runs all of them first. The resulting summary
# is printed as indented JSON.
monitor = PCIDSSComplianceMonitor()
dashboard = monitor.generate_compliance_dashboard()
print(json.dumps(dashboard, indent=2))

Conclusion

PCI DSS 4.0 compliance requires a comprehensive approach spanning secure development, data protection, vulnerability management, and access controls. Implement automated compliance monitoring to maintain continuous compliance rather than relying on point-in-time assessments. Regular security reviews, penetration testing, and staff training complement technical controls. Remember that compliance is a continuous process — build security into your development lifecycle from the start.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.