PCI DSS 4.0 Compliance Implementation Guide for Modern Applications

DeviDevs Team

PCI DSS 4.0 introduces significant changes to payment card security requirements. It shifts toward outcome-based objectives and allows a customized approach as an alternative to the defined approach for meeting a requirement. This guide provides technical implementation patterns, in Python, for achieving and maintaining PCI DSS compliance.

Understanding PCI DSS 4.0 Requirements

Requirement Categories Overview

# pci_dss_requirements.yaml
pci_dss_4_0_requirements:
  build_and_maintain_secure_network:
    1: "Install and maintain network security controls"
    2: "Apply secure configurations to all system components"
 
  protect_cardholder_data:
    3: "Protect stored account data"
    4: "Protect cardholder data with strong cryptography during transmission over open, public networks"
 
  maintain_vulnerability_management:
    5: "Protect all systems and networks from malicious software"
    6: "Develop and maintain secure systems and software"
 
  implement_access_control:
    7: "Restrict access to system components and cardholder data by business need to know"
    8: "Identify users and authenticate access to system components"
    9: "Restrict physical access to cardholder data"
 
  monitor_and_test:
    10: "Log and monitor all access to system components and cardholder data"
    11: "Test security of systems and networks regularly"
 
  maintain_security_policy:
    12: "Support information security with organizational policies and programs"

Cardholder Data Protection

Data Classification and Scope

# cardholder_data_protection.py
from dataclasses import dataclass
from typing import Dict, Optional
from enum import Enum
import re
 
class DataCategory(Enum):
    """PCI DSS data categories."""
    PAN = "primary_account_number"
    CARDHOLDER_NAME = "cardholder_name"
    EXPIRATION_DATE = "expiration_date"
    SERVICE_CODE = "service_code"
    # Sensitive authentication data (SAD): must not be stored after authorization
    CVV = "cvv"
    PIN = "pin"
    TRACK_DATA = "track_data"
 
class StoragePolicy(Enum):
    """Data storage policies."""
    ALLOWED_ENCRYPTED = "allowed_encrypted"
    ALLOWED_MASKED = "allowed_masked"
    PROHIBITED = "prohibited"
    ALLOWED_TOKENIZED = "allowed_tokenized"
 
@dataclass
class DataClassification:
    """Classification for cardholder data."""
    category: DataCategory
    storage_policy: StoragePolicy
    retention_period_days: Optional[int]
    encryption_required: bool
    masking_rules: Optional[Dict]
 
class CardholderDataManager:
    """Manage cardholder data according to PCI DSS."""
 
    CLASSIFICATIONS = {
        DataCategory.PAN: DataClassification(
            category=DataCategory.PAN,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,  # Business-specific
            encryption_required=True,
            masking_rules={'show_first': 6, 'show_last': 4}
        ),
        DataCategory.CARDHOLDER_NAME: DataClassification(
            category=DataCategory.CARDHOLDER_NAME,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,
            encryption_required=True,
            masking_rules=None
        ),
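        DataCategory.EXPIRATION_DATE: DataClassification(
            category=DataCategory.EXPIRATION_DATE,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,
            encryption_required=True,
            masking_rules=None
        ),
        # Service code is storable cardholder data; without an entry here,
        # prepare_for_storage() would reject it as an unknown category.
        DataCategory.SERVICE_CODE: DataClassification(
            category=DataCategory.SERVICE_CODE,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,
            encryption_required=True,
            masking_rules=None
        ),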
        DataCategory.CVV: DataClassification(
            category=DataCategory.CVV,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,  # Cannot store
            masking_rules=None
        ),
        DataCategory.PIN: DataClassification(
            category=DataCategory.PIN,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,
            masking_rules=None
        ),
        DataCategory.TRACK_DATA: DataClassification(
            category=DataCategory.TRACK_DATA,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,
            masking_rules=None
        )
    }
 
    def __init__(self, encryption_service, tokenization_service):
        self.encryption = encryption_service
        self.tokenization = tokenization_service
 
    def validate_pan(self, pan: str) -> Dict:
        """Validate PAN using Luhn algorithm."""
 
        # Remove spaces and dashes
        clean_pan = re.sub(r'[\s-]', '', pan)
 
        if not clean_pan.isdigit():
            return {'valid': False, 'error': 'PAN must contain only digits'}
 
        if len(clean_pan) < 13 or len(clean_pan) > 19:
            return {'valid': False, 'error': 'Invalid PAN length'}
 
        # Luhn algorithm
        digits = [int(d) for d in clean_pan]
        checksum = 0
 
        for i, digit in enumerate(reversed(digits)):
            if i % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit
 
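        is_valid = checksum % 10 == 0
 
        return {
            'valid': is_valid,
            # Always include 'error' so callers can read it on a Luhn failure
            'error': None if is_valid else 'Luhn checksum failed',
            'card_brand': self._identify_brand(clean_pan),
            'issuer_bin': clean_pan[:6]
        }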
 
    def _identify_brand(self, pan: str) -> str:
        """Identify card brand from PAN."""
 
        if pan.startswith('4'):
            return 'Visa'
        elif pan.startswith(('51', '52', '53', '54', '55')) or \
             (2221 <= int(pan[:4]) <= 2720):
            return 'Mastercard'
        elif pan.startswith(('34', '37')):
            return 'American Express'
        elif pan.startswith('6011') or pan.startswith('65'):
            return 'Discover'
        else:
            return 'Unknown'
 
    def mask_pan(self, pan: str) -> str:
        """Mask PAN according to PCI DSS requirements."""
 
        clean_pan = re.sub(r'[\s-]', '', pan)
        rules = self.CLASSIFICATIONS[DataCategory.PAN].masking_rules
 
        first_digits = clean_pan[:rules['show_first']]
        last_digits = clean_pan[-rules['show_last']:]
        masked_length = len(clean_pan) - rules['show_first'] - rules['show_last']
 
        return f"{first_digits}{'*' * masked_length}{last_digits}"
 
    def tokenize_pan(self, pan: str) -> Dict:
        """Tokenize PAN for storage."""
 
        validation = self.validate_pan(pan)
        if not validation['valid']:
            raise ValueError(
                f"Invalid PAN: {validation.get('error', 'Luhn check failed')}"
            )
 
        # Normalize once so the vault and last_four never see separators
        clean_pan = re.sub(r'[\s-]', '', pan)
 
        # Generate token
        token = self.tokenization.create_token(
            data=clean_pan,
            format='preserving',  # Maintains format for legacy systems
            token_vault='pci_vault'
        )
 
        return {
            'token': token,
            'masked_pan': self.mask_pan(clean_pan),
            'card_brand': validation['card_brand'],
            'last_four': clean_pan[-4:]
        }
 
    def can_store(self, category: DataCategory) -> bool:
        """Check if data category can be stored."""
 
        classification = self.CLASSIFICATIONS.get(category)
        if not classification:
            return False
 
        return classification.storage_policy != StoragePolicy.PROHIBITED
 
    def prepare_for_storage(self, data: Dict[DataCategory, str]) -> Dict:
        """Prepare cardholder data for secure storage."""
 
        prepared = {}
 
        for category, value in data.items():
            classification = self.CLASSIFICATIONS.get(category)
 
            if not classification:
                raise ValueError(f"Unknown data category: {category}")
 
            if classification.storage_policy == StoragePolicy.PROHIBITED:
                raise ValueError(
                    f"Cannot store {category.value} - prohibited by PCI DSS"
                )
 
            if category == DataCategory.PAN:
                # Always tokenize PAN
                token_data = self.tokenize_pan(value)
                prepared['pan_token'] = token_data['token']
                prepared['masked_pan'] = token_data['masked_pan']
                prepared['last_four'] = token_data['last_four']
            elif classification.encryption_required:
                # Encrypt other sensitive data
                prepared[category.value] = self.encryption.encrypt(value)
            else:
                prepared[category.value] = value
 
        return prepared
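
To make the Luhn and masking steps concrete, here is a minimal standalone sketch run against the widely published Visa test number 4111 1111 1111 1111. The helper names `luhn_checksum` and `mask_pan` are illustrative only and are not part of the class above; the defaults of six leading and four trailing digits mirror the masking rules in the listing.

```python
import re


def luhn_checksum(pan: str) -> int:
    """Return the Luhn checksum (mod 10) of a digit string; 0 means valid."""
    total = 0
    # Walk from the rightmost digit and double every second digit.
    for position, char in enumerate(reversed(pan)):
        digit = int(char)
        if position % 2 == 1:
            digit *= 2
            if digit > 9:
                digit -= 9
        total += digit
    return total % 10


def mask_pan(pan: str, show_first: int = 6, show_last: int = 4) -> str:
    """Mask the middle digits, keeping the leading and trailing digits."""
    hidden = max(len(pan) - show_first - show_last, 0)
    return pan[:show_first] + "*" * hidden + pan[-show_last:]


test_pan = re.sub(r"[\s-]", "", "4111 1111 1111 1111")
print(luhn_checksum(test_pan))            # 0, so the number passes
print(luhn_checksum("4111111111111112"))  # 1, so the number fails
print(mask_pan(test_pan))                 # 411111******1111
```

Changing only the last digit changes the checksum, which is why the Luhn check catches most single-digit typing errors. It is not a security control and says nothing about whether the account exists.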

Encryption Implementation

# pci_encryption.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import os
import base64
from typing import Dict, Optional
from datetime import datetime, timedelta
 
class PCIEncryptionService:
    """PCI DSS compliant encryption service."""
 
    def __init__(self, key_management_service):
        self.kms = key_management_service
        self.key_rotation_interval = timedelta(days=365)
 
    def encrypt(self, plaintext: str, key_id: Optional[str] = None) -> Dict:
        """Encrypt data with authenticated encryption."""
 
        if not key_id:
            key_id = self.kms.get_current_key_id()
 
        # Get encryption key from KMS
        key = self.kms.get_key(key_id)
 
        # Generate random nonce
        nonce = os.urandom(12)
 
        # Encrypt using AES-256-GCM
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
 
        return {
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'nonce': base64.b64encode(nonce).decode(),
            'key_id': key_id,
            'algorithm': 'AES-256-GCM',
            'encrypted_at': datetime.utcnow().isoformat()
        }
 
    def decrypt(self, encrypted_data: Dict) -> str:
        """Decrypt data."""
 
        key = self.kms.get_key(encrypted_data['key_id'])
        nonce = base64.b64decode(encrypted_data['nonce'])
        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
 
        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, None)
 
        return plaintext.decode()
 
    def rotate_encryption_key(self) -> str:
        """Rotate encryption key."""
 
        # Capture the outgoing key first, before a second active key exists
        old_key_id = self.kms.get_current_key_id()
 
        # Generate new key
        new_key_id = self.kms.create_key(
            key_type='AES-256',
            purpose='cardholder_data_encryption'
        )
 
        # Mark old key for retirement (kept so existing data can be decrypted)
        self.kms.schedule_key_retirement(
            key_id=old_key_id,
            retain_for_decryption=True
        )
 
        # Set new key as current
        self.kms.set_current_key(new_key_id)
 
        return new_key_id
 
 
class KeyManagementService:
    """Key management service for PCI compliance."""
 
    def __init__(self, hsm_connection):
        self.hsm = hsm_connection
        self.key_metadata = {}
 
    def create_key(self, key_type: str, purpose: str) -> str:
        """Create new encryption key in HSM."""
 
        key_id = self.hsm.generate_key(
            algorithm=key_type,
            extractable=False
        )
 
        self.key_metadata[key_id] = {
            'created_at': datetime.utcnow().isoformat(),
            'purpose': purpose,
            'status': 'active',
            'version': 1
        }
 
        return key_id
 
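    def get_key(self, key_id: str) -> bytes:
        """Get key material for use by PCIEncryptionService."""
 
        # NOTE: AESGCM needs raw 32-byte key material, so an opaque handle to
        # a non-extractable HSM key cannot be passed to it directly. One common
        # pattern is envelope encryption: the HSM key wraps a data-encryption
        # key, and only the unwrapped data key is returned from this method.
        return self.hsm.get_key_handle(key_id)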
 
    def get_current_key_id(self) -> str:
        """Get current active key ID."""
 
        for key_id, metadata in self.key_metadata.items():
            if metadata['status'] == 'active' and \
               metadata['purpose'] == 'cardholder_data_encryption':
                return key_id
 
        raise ValueError("No active encryption key found")
 
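    def schedule_key_retirement(
        self,
        key_id: str,
        retain_for_decryption: bool = True
    ):
        """Schedule key for retirement."""
 
        self.key_metadata[key_id]['status'] = 'retiring'
        self.key_metadata[key_id]['decrypt_only'] = retain_for_decryption
        self.key_metadata[key_id]['retired_at'] = datetime.utcnow().isoformat()
 
    def set_current_key(self, key_id: str):
        """Mark a key as the current active key (used during rotation)."""
 
        if key_id not in self.key_metadata:
            raise KeyError(f"Unknown key: {key_id}")
 
        self.key_metadata[key_id]['status'] = 'active'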
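
The rotation flow above depends on one invariant: exactly one key is used for new encryptions, while retired keys stay available for decrypting older records. The following is a minimal in-memory sketch of that bookkeeping using only the standard library. The `KeyRegistry` class and its method names are hypothetical, and no key material is handled at all; in a real deployment this state would live in the KMS or HSM.

```python
from typing import Dict, Optional


class KeyRegistry:
    """Hypothetical in-memory stand-in for KMS key metadata."""

    def __init__(self) -> None:
        self._status: Dict[str, str] = {}  # key_id -> 'active' | 'decrypt_only'
        self._current: Optional[str] = None

    def rotate(self) -> str:
        """Create a new current key and demote the previous one."""
        new_id = f"key-{len(self._status) + 1}"
        if self._current is not None:
            # The old key remains usable for decrypting existing ciphertexts.
            self._status[self._current] = "decrypt_only"
        self._status[new_id] = "active"
        self._current = new_id
        return new_id

    def key_for_encrypt(self) -> str:
        """Return the single key ID that new encryptions should use."""
        if self._current is None:
            raise LookupError("No active encryption key")
        return self._current

    def can_decrypt(self, key_id: str) -> bool:
        """Both active and decrypt-only keys may be used for decryption."""
        return key_id in self._status


registry = KeyRegistry()
first = registry.rotate()
second = registry.rotate()
print(registry.key_for_encrypt())   # key-2
print(registry.can_decrypt(first))  # True
```

Tracking the current key explicitly, rather than searching for the first key marked active, avoids ambiguity during the short period when two keys exist.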

Network Security Controls

Network Segmentation

# network_segmentation.py
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
 
class NetworkZone(Enum):
    """Network security zones."""
    CDE = "cardholder_data_environment"
    INTERNAL = "internal_network"
    DMZ = "demilitarized_zone"
    EXTERNAL = "external_network"
    MANAGEMENT = "management_network"
 
@dataclass
class FirewallRule:
    """Firewall rule definition."""
    rule_id: str
    source_zone: NetworkZone
    destination_zone: NetworkZone
    source_ips: List[str]
    destination_ips: List[str]
    ports: List[int]
    protocol: str
    action: str  # allow/deny
    logging: bool
    justification: str
 
class NetworkSecurityController:
    """Manage network security for PCI compliance."""
 
    def __init__(self):
        self.firewall_rules = []
        self.zone_definitions = self._define_zones()
 
    def _define_zones(self) -> Dict[NetworkZone, Dict]:
        """Define network zones and their properties."""
 
        return {
            NetworkZone.CDE: {
                'cidr': '10.0.1.0/24',
                'description': 'Cardholder Data Environment',
                'security_level': 'critical',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.INTERNAL],
                'allowed_outbound_zones': []
            },
            NetworkZone.INTERNAL: {
                'cidr': '10.0.2.0/24',
                'description': 'Internal Corporate Network',
                'security_level': 'high',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.DMZ],
                'allowed_outbound_zones': [NetworkZone.CDE, NetworkZone.DMZ]
            },
            NetworkZone.DMZ: {
                'cidr': '10.0.3.0/24',
                'description': 'DMZ for public-facing services',
                'security_level': 'medium',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.EXTERNAL],
                'allowed_outbound_zones': [NetworkZone.INTERNAL]
            },
            NetworkZone.MANAGEMENT: {
                'cidr': '10.0.4.0/24',
                'description': 'Management and monitoring',
                'security_level': 'critical',
                'requires_segmentation': True,
                'allowed_inbound_zones': [],
                'allowed_outbound_zones': [
                    NetworkZone.CDE,
                    NetworkZone.INTERNAL,
                    NetworkZone.DMZ
                ]
            }
        }
 
    def add_firewall_rule(self, rule: FirewallRule) -> Dict:
        """Add and validate firewall rule."""
 
        # Validate rule against zone policies
        validation = self._validate_rule(rule)
        if not validation['valid']:
            return validation
 
        self.firewall_rules.append(rule)
 
        return {
            'valid': True,
            'rule_id': rule.rule_id,
            'message': 'Rule added successfully'
        }
 
    def _validate_rule(self, rule: FirewallRule) -> Dict:
        """Validate rule against PCI requirements."""
 
        errors = []
 
        # Check zone access policies
        source_zone_config = self.zone_definitions.get(rule.source_zone)
        dest_zone_config = self.zone_definitions.get(rule.destination_zone)
 
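        if rule.action == 'allow':
            # Verify allowed zone transitions. Zones without a definition
            # (e.g. EXTERNAL) fall back to the destination's inbound policy.
            if source_zone_config is not None:
                permitted = rule.destination_zone in \
                    source_zone_config['allowed_outbound_zones']
            elif dest_zone_config is not None:
                permitted = rule.source_zone in \
                    dest_zone_config['allowed_inbound_zones']
            else:
                permitted = False
 
            if not permitted:
                errors.append(
                    f"Traffic from {rule.source_zone.value} to "
                    f"{rule.destination_zone.value} not allowed by policy"
                )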
 
        # Require justification for CDE access
        if rule.destination_zone == NetworkZone.CDE and not rule.justification:
            errors.append("Justification required for CDE access rules")
 
        # Require logging for all CDE-related rules
        if (rule.source_zone == NetworkZone.CDE or
            rule.destination_zone == NetworkZone.CDE) and not rule.logging:
            errors.append("Logging must be enabled for CDE traffic")
 
        # Check for overly permissive rules (exact match, not substring)
        wildcards = {'any', '0.0.0.0/0', '::/0'}
        if wildcards & set(rule.source_ips + rule.destination_ips):
            if rule.destination_zone in [NetworkZone.CDE, NetworkZone.MANAGEMENT]:
                errors.append(
                    "Wildcards not allowed for CDE/Management destinations"
                )
 
        return {
            'valid': len(errors) == 0,
            'errors': errors
        }
 
    def generate_segmentation_test_plan(self) -> List[Dict]:
        """Generate network segmentation test plan."""
 
        tests = []
 
        for source_zone, source_config in self.zone_definitions.items():
            for dest_zone, dest_config in self.zone_definitions.items():
                if source_zone == dest_zone:
                    continue
 
                # Determine expected result
                should_allow = dest_zone in source_config['allowed_outbound_zones']
 
                tests.append({
                    'test_id': f"seg_test_{source_zone.value}_{dest_zone.value}",
                    'source_zone': source_zone.value,
                    'destination_zone': dest_zone.value,
                    'expected_result': 'allow' if should_allow else 'deny',
                    'test_ports': [22, 443, 3306, 5432],
                    'priority': 'critical' if dest_zone == NetworkZone.CDE else 'high'
                })
 
        return tests
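
Segmentation tests ultimately run between concrete IP addresses, so it helps to map an address back to its zone. Below is a small sketch using the standard library `ipaddress` module, with the CIDR ranges copied from the zone definitions above. `ZONE_CIDRS` and `zone_for_ip` are illustrative names, and treating every unmatched address as external is an assumption of this sketch.

```python
import ipaddress

# CIDR ranges taken from the zone definitions in the listing above.
ZONE_CIDRS = {
    "cardholder_data_environment": "10.0.1.0/24",
    "internal_network": "10.0.2.0/24",
    "demilitarized_zone": "10.0.3.0/24",
    "management_network": "10.0.4.0/24",
}


def zone_for_ip(ip: str) -> str:
    """Return the zone whose CIDR contains the address."""
    address = ipaddress.ip_address(ip)
    for zone, cidr in ZONE_CIDRS.items():
        if address in ipaddress.ip_network(cidr):
            return zone
    # Assumption: anything outside the defined ranges is external.
    return "external_network"


print(zone_for_ip("10.0.1.25"))    # cardholder_data_environment
print(zone_for_ip("203.0.113.7"))  # external_network
```

A test runner could use a lookup like this to label each probe result with its source and destination zone before comparing it with the expected allow or deny outcome.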

Access Control Implementation

Multi-Factor Authentication

# mfa_implementation.py
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime, timedelta
import pyotp
import secrets
 
@dataclass
class MFAConfig:
    """MFA configuration for user."""
    user_id: str
    mfa_type: str  # totp, sms, hardware_token
    secret_key: Optional[str]
    backup_codes: List[str]
    enrolled_at: datetime
    last_used: Optional[datetime]
 
class MFAService:
    """Multi-factor authentication for PCI compliance."""
 
    def __init__(self, user_store, sms_provider):
        self.user_store = user_store
        self.sms = sms_provider
        self.rate_limiter = RateLimiter()
 
    def enroll_totp(self, user_id: str) -> Dict:
        """Enroll user in TOTP MFA."""
 
        # Generate secret
        secret = pyotp.random_base32()
 
        # Generate backup codes
        backup_codes = [secrets.token_hex(4) for _ in range(10)]
 
        # Create provisioning URI for authenticator apps
        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user_id,
            issuer_name='SecurePayments'
        )
 
        # Store configuration
        config = MFAConfig(
            user_id=user_id,
            mfa_type='totp',
            secret_key=secret,  # Encrypt before storing!
            backup_codes=[self._hash_code(c) for c in backup_codes],
            enrolled_at=datetime.utcnow(),
            last_used=None
        )
 
        self.user_store.save_mfa_config(config)
 
        return {
            'provisioning_uri': provisioning_uri,
            'secret': secret,
            'backup_codes': backup_codes,
            'message': 'Save backup codes securely - they cannot be recovered'
        }
 
    def verify_totp(self, user_id: str, code: str) -> Dict:
        """Verify TOTP code."""
 
        # Rate limiting
        if not self.rate_limiter.allow(f"mfa:{user_id}"):
            return {
                'valid': False,
                'error': 'Too many attempts. Please wait.'
            }
 
        config = self.user_store.get_mfa_config(user_id)
        if not config or config.mfa_type != 'totp':
            return {'valid': False, 'error': 'MFA not configured'}
 
        totp = pyotp.TOTP(config.secret_key)
 
        # Verify with time window for clock drift
        if totp.verify(code, valid_window=1):
            config.last_used = datetime.utcnow()
            self.user_store.save_mfa_config(config)
            return {'valid': True}
 
        return {'valid': False, 'error': 'Invalid code'}
 
    def verify_backup_code(self, user_id: str, code: str) -> Dict:
        """Verify backup code (one-time use)."""
 
        # Rate limit backup codes too, sharing the TOTP attempt budget,
        # so they cannot be brute-forced around the TOTP limiter
        if not self.rate_limiter.allow(f"mfa:{user_id}"):
            return {
                'valid': False,
                'error': 'Too many attempts. Please wait.'
            }
 
        config = self.user_store.get_mfa_config(user_id)
        if not config:
            return {'valid': False, 'error': 'MFA not configured'}
 
        code_hash = self._hash_code(code)
 
        if code_hash in config.backup_codes:
            # Remove used code
            config.backup_codes.remove(code_hash)
            self.user_store.save_mfa_config(config)
 
            return {
                'valid': True,
                'remaining_codes': len(config.backup_codes),
                'warning': 'Backup code used. Consider regenerating codes.'
            }
 
        return {'valid': False, 'error': 'Invalid backup code'}
 
    def _hash_code(self, code: str) -> str:
        """Hash backup code for storage."""
        import hashlib
        return hashlib.sha256(code.encode()).hexdigest()
 
 
class RateLimiter:
    """Rate limiter for MFA attempts."""
 
    def __init__(self):
        self.attempts = {}
        self.max_attempts = 5
        self.window_seconds = 300  # 5 minutes
 
    def allow(self, key: str) -> bool:
        """Check if request should be allowed."""
 
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
 
        if key not in self.attempts:
            self.attempts[key] = []
 
        # Clean old attempts
        self.attempts[key] = [
            t for t in self.attempts[key] if t > window_start
        ]
 
        if len(self.attempts[key]) >= self.max_attempts:
            return False
 
        self.attempts[key].append(now)
        return True
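
PCI DSS 4.0 Requirement 8.5.1 expects MFA systems not to be susceptible to replay attacks. Accepting a drift window, as the TOTP check above does, means the same code can validate more than once unless used time steps are tracked. The sketch below shows one way to do that with the standard library only. `totp_at` and `ReplayGuard` are hypothetical names, the 30-second step and HMAC-SHA1 follow RFC 6238 defaults, and the in-memory dictionary is a stand-in for persistent storage.

```python
import hashlib
import hmac
import struct
from typing import Dict


def totp_at(secret: bytes, step: int, digits: int = 6) -> str:
    """RFC 6238 TOTP (HMAC-SHA1) for a given 30-second time step."""
    mac = hmac.new(secret, struct.pack(">Q", step), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F  # dynamic truncation offset
    value = struct.unpack(">I", mac[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


class ReplayGuard:
    """Reject any code whose time step was already accepted for the user."""

    def __init__(self) -> None:
        self._last_step: Dict[str, int] = {}

    def verify(self, user_id: str, secret: bytes, code: str, now: float,
               window: int = 1) -> bool:
        current = int(now // 30)
        for step in range(current - window, current + window + 1):
            if hmac.compare_digest(totp_at(secret, step), code):
                if step <= self._last_step.get(user_id, -1):
                    return False  # same or older step: treat as a replay
                self._last_step[user_id] = step
                return True
        return False


secret = b"12345678901234567890"  # RFC test secret, not for production use
guard = ReplayGuard()
code = totp_at(secret, 59 // 30)
print(code)                                         # 287082
print(guard.verify("alice", secret, code, now=59))  # True
print(guard.verify("alice", secret, code, now=61))  # False (replayed)
```

The second call falls inside the drift window, so the code still matches, but it is refused because its time step has already been consumed.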

Audit Logging

PCI-Compliant Audit Logger

# pci_audit_logger.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional
from enum import Enum
import json
import hashlib
 
class AuditEventType(Enum):
    """PCI DSS audit event types."""
    # Authentication events
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    MFA_SUCCESS = "mfa_success"
    MFA_FAILURE = "mfa_failure"
 
    # Cardholder data access
    CHD_ACCESS = "cardholder_data_access"
    CHD_CREATE = "cardholder_data_create"
    CHD_MODIFY = "cardholder_data_modify"
    CHD_DELETE = "cardholder_data_delete"
 
    # System events
    PRIVILEGE_USE = "privilege_use"
    CONFIG_CHANGE = "configuration_change"
    SECURITY_EVENT = "security_event"
 
    # Administrative events
    USER_CREATE = "user_create"
    USER_MODIFY = "user_modify"
    USER_DELETE = "user_delete"
    ROLE_CHANGE = "role_change"
 
@dataclass
class AuditEvent:
    """PCI DSS compliant audit event."""
    event_id: str
    timestamp: str
    event_type: AuditEventType
    user_id: str
    source_ip: str
    success: bool
    resource_type: Optional[str]
    resource_id: Optional[str]
    action: str
    details: Dict
    integrity_hash: str
 
class PCIAuditLogger:
    """PCI DSS compliant audit logging system."""
 
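    # Minimum log fields. PCI DSS 4.0 Requirement 10.2.2 also expects each
    # entry to identify the affected data, system component, or resource.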
    REQUIRED_FIELDS = [
        'event_id',
        'timestamp',
        'user_id',
        'event_type',
        'success',
        'source_ip'
    ]
 
    def __init__(self, storage_backend, retention_days: int = 365):
        self.storage = storage_backend
        self.retention_days = retention_days
        self.log_chain = []
 
    def log_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        source_ip: str,
        success: bool,
        action: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        details: Optional[Dict] = None
    ) -> AuditEvent:
        """Log an audit event."""
 
        previous_hash = self.log_chain[-1] if self.log_chain else "genesis"
 
        event = AuditEvent(
            event_id=self._generate_event_id(),
            timestamp=datetime.utcnow().isoformat() + 'Z',
            event_type=event_type,
            user_id=user_id,
            source_ip=source_ip,
            success=success,
            resource_type=resource_type,
            resource_id=resource_id,
            action=action,
            details=details or {},
            integrity_hash=""
        )
 
        # Calculate integrity hash
        event.integrity_hash = self._calculate_hash(event, previous_hash)
        self.log_chain.append(event.integrity_hash)
 
        # Store event
        self.storage.store(self._serialize_event(event))
 
        return event
 
    def log_chd_access(
        self,
        user_id: str,
        source_ip: str,
        card_token: str,
        access_type: str,
        reason: str
    ) -> AuditEvent:
        """Log cardholder data access."""
 
        return self.log_event(
            event_type=AuditEventType.CHD_ACCESS,
            user_id=user_id,
            source_ip=source_ip,
            success=True,
            action=access_type,
            resource_type='cardholder_data',
            resource_id=card_token,
            details={
                'access_reason': reason,
                'data_masked': True
            }
        )
 
    def log_authentication(
        self,
        user_id: str,
        source_ip: str,
        success: bool,
        auth_method: str,
        failure_reason: Optional[str] = None
    ) -> AuditEvent:
        """Log authentication attempt."""
 
        event_type = AuditEventType.LOGIN_SUCCESS if success else \
                     AuditEventType.LOGIN_FAILURE
 
        return self.log_event(
            event_type=event_type,
            user_id=user_id,
            source_ip=source_ip,
            success=success,
            action='authenticate',
            details={
                'auth_method': auth_method,
                'failure_reason': failure_reason
            }
        )
 
    def _generate_event_id(self) -> str:
        """Generate unique event ID."""
        import uuid
        return str(uuid.uuid4())
 
    def _calculate_hash(self, event: AuditEvent, previous_hash: str) -> str:
        """Calculate integrity hash for event."""
 
        event_data = {
            'event_id': event.event_id,
            'timestamp': event.timestamp,
            'event_type': event.event_type.value,
            'user_id': event.user_id,
            'success': event.success,
            'previous_hash': previous_hash
        }
 
        serialized = json.dumps(event_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()
 
    def _serialize_event(self, event: AuditEvent) -> Dict:
        """Serialize event for storage."""
 
        return {
            'event_id': event.event_id,
            'timestamp': event.timestamp,
            'event_type': event.event_type.value,
            'user_id': event.user_id,
            'source_ip': event.source_ip,
            'success': event.success,
            'resource_type': event.resource_type,
            'resource_id': event.resource_id,
            'action': event.action,
            'details': event.details,
            'integrity_hash': event.integrity_hash
        }
 
    def verify_log_integrity(self) -> Dict:
        """Verify integrity of audit log chain."""
 
        events = self.storage.retrieve_all()
        issues = []
 
        for i, event in enumerate(events):
            expected_previous = events[i-1]['integrity_hash'] if i > 0 else "genesis"
 
            # Recalculate hash
            event_data = {
                'event_id': event['event_id'],
                'timestamp': event['timestamp'],
                'event_type': event['event_type'],
                'user_id': event['user_id'],
                'success': event['success'],
                'previous_hash': expected_previous
            }
 
            calculated_hash = hashlib.sha256(
                json.dumps(event_data, sort_keys=True).encode()
            ).hexdigest()
 
            if calculated_hash != event['integrity_hash']:
                issues.append({
                    'event_id': event['event_id'],
                    'issue': 'Hash mismatch - potential tampering'
                })
 
        return {
            'valid': len(issues) == 0,
            'events_checked': len(events),
            'issues': issues
        }
 
    def generate_compliance_report(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Generate PCI DSS compliance report."""
 
        events = self.storage.query({
            'timestamp': {
                '$gte': start_date.isoformat(),
                '$lte': end_date.isoformat()
            }
        })
 
        return {
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'total_events': len(events),
            'authentication_events': {
                'successful': len([e for e in events
                    if e['event_type'] == 'login_success']),
                'failed': len([e for e in events
                    if e['event_type'] == 'login_failure'])
            },
            'chd_access_events': len([e for e in events
                if e['event_type'] == 'cardholder_data_access']),
            'security_events': len([e for e in events
                if e['event_type'] == 'security_event']),
            'log_integrity': self.verify_log_integrity()
        }
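
The integrity chain is easier to reason about in isolation. The sketch below is a simplified, standalone version that hashes the entire record together with the previous hash, then shows how a single altered field is detected. The function names are illustrative. Note one deliberate difference: the logger above hashes only a subset of fields, whereas this sketch covers every field, which is a design choice made here and not something the listing does.

```python
import hashlib
import json
from typing import Dict, List


def chain_hash(record: Dict, previous_hash: str) -> str:
    """Hash the full record together with the previous entry's hash."""
    payload = json.dumps(
        {"record": record, "previous_hash": previous_hash}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()


def append_entry(log: List[Dict], record: Dict) -> None:
    """Append a record, linking it to the last entry (or a genesis marker)."""
    previous = log[-1]["integrity_hash"] if log else "genesis"
    log.append({**record, "integrity_hash": chain_hash(record, previous)})


def first_broken_index(log: List[Dict]) -> int:
    """Return the index of the first entry that fails verification, or -1."""
    previous = "genesis"
    for index, entry in enumerate(log):
        record = {k: v for k, v in entry.items() if k != "integrity_hash"}
        if chain_hash(record, previous) != entry["integrity_hash"]:
            return index
        previous = entry["integrity_hash"]
    return -1


log: List[Dict] = []
append_entry(log, {"user_id": "u1", "action": "view", "source_ip": "10.0.2.5"})
append_entry(log, {"user_id": "u2", "action": "export", "source_ip": "10.0.2.9"})
print(first_broken_index(log))  # -1: chain intact

log[0]["source_ip"] = "10.0.2.99"  # simulate tampering with one field
print(first_broken_index(log))  # 0: the altered entry is detected
```

A hash chain only detects tampering by someone who cannot also recompute the hashes, so the chain head still needs to be anchored somewhere the log writer cannot modify.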

Conclusion

PCI DSS 4.0 compliance requires comprehensive controls across multiple domains:

  1. Protect Cardholder Data - Encrypt, tokenize, and minimize data retention
  2. Implement Strong Access Controls - MFA for all CDE access
  3. Maintain Secure Networks - Proper segmentation and firewall rules
  4. Monitor and Log - Comprehensive audit trails with integrity protection
  5. Regular Testing - Vulnerability scans at least every three months and penetration tests at least annually

By implementing these technical controls, organizations can achieve and maintain PCI DSS compliance while protecting cardholder data from unauthorized access.
