Compliance

PCI DSS 4.0 Compliance Implementation Guide for Modern Applications

Nicu Constantin
--13 min lectura
#PCI DSS#compliance#payment security#data protection#security

PCI DSS 4.0 Compliance Implementation Guide for Modern Applications

PCI DSS 4.0 introduces significant changes to payment card security requirements, with a focus on outcome-based security and customized approaches. This guide provides technical implementation patterns for achieving and maintaining PCI DSS compliance.

Understanding PCI DSS 4.0 Requirements

Requirement Categories Overview

# pci_dss_requirements.yaml
pci_dss_4_0_requirements:
  build_and_maintain_secure_network:
    1: "Install and maintain network security controls"
    2: "Apply secure configurations to all system components"
 
  protect_cardholder_data:
    3: "Protect stored account data"
    4: "Protect cardholder data with strong cryptography during transmission"
 
  maintain_vulnerability_management:
    5: "Protect all systems and networks from malicious software"
    6: "Develop and maintain secure systems and software"
 
  implement_access_control:
    7: "Restrict access to system components and cardholder data"
    8: "Identify users and authenticate access to system components"
    9: "Restrict physical access to cardholder data"
 
  monitor_and_test:
    10: "Log and monitor all access to system components and cardholder data"
    11: "Test security of systems and networks regularly"
 
  maintain_security_policy:
    12: "Support information security with organizational policies and programs"

Cardholder Data Protection

Data Classification and Scope

# cardholder_data_protection.py
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
import re
import hashlib
 
class DataCategory(Enum):
    """PCI DSS data categories."""
    PAN = "primary_account_number"
    CARDHOLDER_NAME = "cardholder_name"
    EXPIRATION_DATE = "expiration_date"
    SERVICE_CODE = "service_code"
    CVV = "cvv"  # Cannot be stored
    PIN = "pin"  # Cannot be stored
    TRACK_DATA = "track_data"  # Cannot be stored
 
class StoragePolicy(Enum):
    """Data storage policies."""
    ALLOWED_ENCRYPTED = "allowed_encrypted"
    ALLOWED_MASKED = "allowed_masked"
    PROHIBITED = "prohibited"
    ALLOWED_TOKENIZED = "allowed_tokenized"
 
@dataclass
class DataClassification:
    """Classification for cardholder data."""
    category: DataCategory
    storage_policy: StoragePolicy
    retention_period_days: Optional[int]
    encryption_required: bool
    masking_rules: Optional[Dict]
 
class CardholderDataManager:
    """Manage cardholder data according to PCI DSS."""
 
    CLASSIFICATIONS = {
        DataCategory.PAN: DataClassification(
            category=DataCategory.PAN,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,  # Business-specific
            encryption_required=True,
            masking_rules={'show_first': 6, 'show_last': 4}
        ),
        DataCategory.CARDHOLDER_NAME: DataClassification(
            category=DataCategory.CARDHOLDER_NAME,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,
            encryption_required=True,
            masking_rules=None
        ),
        DataCategory.EXPIRATION_DATE: DataClassification(
            category=DataCategory.EXPIRATION_DATE,
            storage_policy=StoragePolicy.ALLOWED_ENCRYPTED,
            retention_period_days=365,
            encryption_required=True,
            masking_rules=None
        ),
        DataCategory.CVV: DataClassification(
            category=DataCategory.CVV,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,  # Cannot store
            masking_rules=None
        ),
        DataCategory.PIN: DataClassification(
            category=DataCategory.PIN,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,
            masking_rules=None
        ),
        DataCategory.TRACK_DATA: DataClassification(
            category=DataCategory.TRACK_DATA,
            storage_policy=StoragePolicy.PROHIBITED,
            retention_period_days=None,
            encryption_required=False,
            masking_rules=None
        )
    }
 
    def __init__(self, encryption_service, tokenization_service):
        self.encryption = encryption_service
        self.tokenization = tokenization_service
 
    def validate_pan(self, pan: str) -> Dict:
        """Validate PAN using Luhn algorithm."""
 
        # Remove spaces and dashes
        clean_pan = re.sub(r'[\s-]', '', pan)
 
        if not clean_pan.isdigit():
            return {'valid': False, 'error': 'PAN must contain only digits'}
 
        if len(clean_pan) < 13 or len(clean_pan) > 19:
            return {'valid': False, 'error': 'Invalid PAN length'}
 
        # Luhn algorithm
        digits = [int(d) for d in clean_pan]
        checksum = 0
 
        for i, digit in enumerate(reversed(digits)):
            if i % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit
 
        is_valid = checksum % 10 == 0
 
        return {
            'valid': is_valid,
            'card_brand': self._identify_brand(clean_pan),
            'issuer_bin': clean_pan[:6]
        }
 
    def _identify_brand(self, pan: str) -> str:
        """Identify card brand from PAN."""
 
        if pan.startswith('4'):
            return 'Visa'
        elif pan.startswith(('51', '52', '53', '54', '55')) or \
             (2221 <= int(pan[:4]) <= 2720):
            return 'Mastercard'
        elif pan.startswith(('34', '37')):
            return 'American Express'
        elif pan.startswith('6011') or pan.startswith('65'):
            return 'Discover'
        else:
            return 'Unknown'
 
    def mask_pan(self, pan: str) -> str:
        """Mask PAN according to PCI DSS requirements."""
 
        clean_pan = re.sub(r'[\s-]', '', pan)
        rules = self.CLASSIFICATIONS[DataCategory.PAN].masking_rules
 
        first_digits = clean_pan[:rules['show_first']]
        last_digits = clean_pan[-rules['show_last']:]
        masked_length = len(clean_pan) - rules['show_first'] - rules['show_last']
 
        return f"{first_digits}{'*' * masked_length}{last_digits}"
 
    def tokenize_pan(self, pan: str) -> Dict:
        """Tokenize PAN for storage."""
 
        validation = self.validate_pan(pan)
        if not validation['valid']:
            raise ValueError(f"Invalid PAN: {validation['error']}")
 
        # Generate token
        token = self.tokenization.create_token(
            data=pan,
            format='preserving',  # Maintains format for legacy systems
            token_vault='pci_vault'
        )
 
        return {
            'token': token,
            'masked_pan': self.mask_pan(pan),
            'card_brand': validation['card_brand'],
            'last_four': pan[-4:]
        }
 
    def can_store(self, category: DataCategory) -> bool:
        """Check if data category can be stored."""
 
        classification = self.CLASSIFICATIONS.get(category)
        if not classification:
            return False
 
        return classification.storage_policy != StoragePolicy.PROHIBITED
 
    def prepare_for_storage(self, data: Dict[DataCategory, str]) -> Dict:
        """Prepare cardholder data for secure storage."""
 
        prepared = {}
 
        for category, value in data.items():
            classification = self.CLASSIFICATIONS.get(category)
 
            if not classification:
                raise ValueError(f"Unknown data category: {category}")
 
            if classification.storage_policy == StoragePolicy.PROHIBITED:
                raise ValueError(
                    f"Cannot store {category.value} - prohibited by PCI DSS"
                )
 
            if category == DataCategory.PAN:
                # Always tokenize PAN
                token_data = self.tokenize_pan(value)
                prepared['pan_token'] = token_data['token']
                prepared['masked_pan'] = token_data['masked_pan']
                prepared['last_four'] = token_data['last_four']
            elif classification.encryption_required:
                # Encrypt other sensitive data
                prepared[category.value] = self.encryption.encrypt(value)
            else:
                prepared[category.value] = value
 
        return prepared

Encryption Implementation

# pci_encryption.py
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import base64
from typing import Dict, Optional
from datetime import datetime, timedelta
 
class PCIEncryptionService:
    """PCI DSS compliant encryption service."""
 
    def __init__(self, key_management_service):
        self.kms = key_management_service
        self.key_rotation_interval = timedelta(days=365)
 
    def encrypt(self, plaintext: str, key_id: Optional[str] = None) -> Dict:
        """Encrypt data with authenticated encryption."""
 
        if not key_id:
            key_id = self.kms.get_current_key_id()
 
        # Get encryption key from KMS
        key = self.kms.get_key(key_id)
 
        # Generate random nonce
        nonce = os.urandom(12)
 
        # Encrypt using AES-256-GCM
        aesgcm = AESGCM(key)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode(), None)
 
        return {
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'nonce': base64.b64encode(nonce).decode(),
            'key_id': key_id,
            'algorithm': 'AES-256-GCM',
            'encrypted_at': datetime.utcnow().isoformat()
        }
 
    def decrypt(self, encrypted_data: Dict) -> str:
        """Decrypt data."""
 
        key = self.kms.get_key(encrypted_data['key_id'])
        nonce = base64.b64decode(encrypted_data['nonce'])
        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
 
        aesgcm = AESGCM(key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, None)
 
        return plaintext.decode()
 
    def rotate_encryption_key(self) -> str:
        """Rotate encryption key."""
 
        # Generate new key
        new_key_id = self.kms.create_key(
            key_type='AES-256',
            purpose='cardholder_data_encryption'
        )
 
        # Mark old key for retirement
        old_key_id = self.kms.get_current_key_id()
        self.kms.schedule_key_retirement(
            key_id=old_key_id,
            retain_for_decryption=True
        )
 
        # Set new key as current
        self.kms.set_current_key(new_key_id)
 
        return new_key_id
 
 
class KeyManagementService:
    """Key management service for PCI compliance."""
 
    def __init__(self, hsm_connection):
        self.hsm = hsm_connection
        self.key_metadata = {}
 
    def create_key(self, key_type: str, purpose: str) -> str:
        """Create new encryption key in HSM."""
 
        key_id = self.hsm.generate_key(
            algorithm=key_type,
            extractable=False
        )
 
        self.key_metadata[key_id] = {
            'created_at': datetime.utcnow().isoformat(),
            'purpose': purpose,
            'status': 'active',
            'version': 1
        }
 
        return key_id
 
    def get_key(self, key_id: str) -> bytes:
        """Get key from HSM for use."""
 
        # Keys never leave HSM - this returns a key handle
        return self.hsm.get_key_handle(key_id)
 
    def get_current_key_id(self) -> str:
        """Get current active key ID."""
 
        for key_id, metadata in self.key_metadata.items():
            if metadata['status'] == 'active' and \
               metadata['purpose'] == 'cardholder_data_encryption':
                return key_id
 
        raise ValueError("No active encryption key found")
 
    def schedule_key_retirement(
        self,
        key_id: str,
        retain_for_decryption: bool = True
    ):
        """Schedule key for retirement."""
 
        self.key_metadata[key_id]['status'] = 'retiring'
        self.key_metadata[key_id]['decrypt_only'] = retain_for_decryption
        self.key_metadata[key_id]['retired_at'] = datetime.utcnow().isoformat()

Network Security Controls

Network Segmentation

# network_segmentation.py
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
 
class NetworkZone(Enum):
    """Network security zones."""
    CDE = "cardholder_data_environment"
    INTERNAL = "internal_network"
    DMZ = "demilitarized_zone"
    EXTERNAL = "external_network"
    MANAGEMENT = "management_network"
 
@dataclass
class FirewallRule:
    """Firewall rule definition."""
    rule_id: str
    source_zone: NetworkZone
    destination_zone: NetworkZone
    source_ips: List[str]
    destination_ips: List[str]
    ports: List[int]
    protocol: str
    action: str  # allow/deny
    logging: bool
    justification: str
 
class NetworkSecurityController:
    """Manage network security for PCI compliance."""
 
    def __init__(self):
        self.firewall_rules = []
        self.zone_definitions = self._define_zones()
 
    def _define_zones(self) -> Dict[NetworkZone, Dict]:
        """Define network zones and their properties."""
 
        return {
            NetworkZone.CDE: {
                'cidr': '10.0.1.0/24',
                'description': 'Cardholder Data Environment',
                'security_level': 'critical',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.INTERNAL],
                'allowed_outbound_zones': []
            },
            NetworkZone.INTERNAL: {
                'cidr': '10.0.2.0/24',
                'description': 'Internal Corporate Network',
                'security_level': 'high',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.DMZ],
                'allowed_outbound_zones': [NetworkZone.CDE, NetworkZone.DMZ]
            },
            NetworkZone.DMZ: {
                'cidr': '10.0.3.0/24',
                'description': 'DMZ for public-facing services',
                'security_level': 'medium',
                'requires_segmentation': True,
                'allowed_inbound_zones': [NetworkZone.EXTERNAL],
                'allowed_outbound_zones': [NetworkZone.INTERNAL]
            },
            NetworkZone.MANAGEMENT: {
                'cidr': '10.0.4.0/24',
                'description': 'Management and monitoring',
                'security_level': 'critical',
                'requires_segmentation': True,
                'allowed_inbound_zones': [],
                'allowed_outbound_zones': [
                    NetworkZone.CDE,
                    NetworkZone.INTERNAL,
                    NetworkZone.DMZ
                ]
            }
        }
 
    def add_firewall_rule(self, rule: FirewallRule) -> Dict:
        """Add and validate firewall rule."""
 
        # Validate rule against zone policies
        validation = self._validate_rule(rule)
        if not validation['valid']:
            return validation
 
        self.firewall_rules.append(rule)
 
        return {
            'valid': True,
            'rule_id': rule.rule_id,
            'message': 'Rule added successfully'
        }
 
    def _validate_rule(self, rule: FirewallRule) -> Dict:
        """Validate rule against PCI requirements."""
 
        errors = []
 
        # Check zone access policies
        source_zone_config = self.zone_definitions.get(rule.source_zone)
        dest_zone_config = self.zone_definitions.get(rule.destination_zone)
 
        if rule.action == 'allow':
            # Verify allowed zone transitions
            if rule.destination_zone not in \
               source_zone_config['allowed_outbound_zones']:
                errors.append(
                    f"Traffic from {rule.source_zone.value} to "
                    f"{rule.destination_zone.value} not allowed by policy"
                )
 
        # Require justification for CDE access
        if rule.destination_zone == NetworkZone.CDE and not rule.justification:
            errors.append("Justification required for CDE access rules")
 
        # Require logging for all CDE-related rules
        if (rule.source_zone == NetworkZone.CDE or
            rule.destination_zone == NetworkZone.CDE) and not rule.logging:
            errors.append("Logging must be enabled for CDE traffic")
 
        # Check for overly permissive rules
        if 'any' in str(rule.source_ips) or 'any' in str(rule.destination_ips):
            if rule.destination_zone in [NetworkZone.CDE, NetworkZone.MANAGEMENT]:
                errors.append(
                    "Wildcards not allowed for CDE/Management destinations"
                )
 
        return {
            'valid': len(errors) == 0,
            'errors': errors
        }
 
    def generate_segmentation_test_plan(self) -> List[Dict]:
        """Generate network segmentation test plan."""
 
        tests = []
 
        for source_zone, source_config in self.zone_definitions.items():
            for dest_zone, dest_config in self.zone_definitions.items():
                if source_zone == dest_zone:
                    continue
 
                # Determine expected result
                should_allow = dest_zone in source_config['allowed_outbound_zones']
 
                tests.append({
                    'test_id': f"seg_test_{source_zone.value}_{dest_zone.value}",
                    'source_zone': source_zone.value,
                    'destination_zone': dest_zone.value,
                    'expected_result': 'allow' if should_allow else 'deny',
                    'test_ports': [22, 443, 3306, 5432],
                    'priority': 'critical' if dest_zone == NetworkZone.CDE else 'high'
                })
 
        return tests

Access Control Implementation

Multi-Factor Authentication

# mfa_implementation.py
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime, timedelta
import pyotp
import secrets
 
@dataclass
class MFAConfig:
    """MFA configuration for user."""
    user_id: str
    mfa_type: str  # totp, sms, hardware_token
    secret_key: Optional[str]
    backup_codes: List[str]
    enrolled_at: datetime
    last_used: Optional[datetime]
 
class MFAService:
    """Multi-factor authentication for PCI compliance."""
 
    def __init__(self, user_store, sms_provider):
        self.user_store = user_store
        self.sms = sms_provider
        self.rate_limiter = RateLimiter()
 
    def enroll_totp(self, user_id: str) -> Dict:
        """Enroll user in TOTP MFA."""
 
        # Generate secret
        secret = pyotp.random_base32()
 
        # Generate backup codes
        backup_codes = [secrets.token_hex(4) for _ in range(10)]
 
        # Create provisioning URI for authenticator apps
        totp = pyotp.TOTP(secret)
        provisioning_uri = totp.provisioning_uri(
            name=user_id,
            issuer_name='SecurePayments'
        )
 
        # Store configuration
        config = MFAConfig(
            user_id=user_id,
            mfa_type='totp',
            secret_key=secret,  # Encrypt before storing!
            backup_codes=[self._hash_code(c) for c in backup_codes],
            enrolled_at=datetime.utcnow(),
            last_used=None
        )
 
        self.user_store.save_mfa_config(config)
 
        return {
            'provisioning_uri': provisioning_uri,
            'secret': secret,
            'backup_codes': backup_codes,
            'message': 'Save backup codes securely - they cannot be recovered'
        }
 
    def verify_totp(self, user_id: str, code: str) -> Dict:
        """Verify TOTP code."""
 
        # Rate limiting
        if not self.rate_limiter.allow(f"mfa:{user_id}"):
            return {
                'valid': False,
                'error': 'Too many attempts. Please wait.'
            }
 
        config = self.user_store.get_mfa_config(user_id)
        if not config or config.mfa_type != 'totp':
            return {'valid': False, 'error': 'MFA not configured'}
 
        totp = pyotp.TOTP(config.secret_key)
 
        # Verify with time window for clock drift
        if totp.verify(code, valid_window=1):
            config.last_used = datetime.utcnow()
            self.user_store.save_mfa_config(config)
            return {'valid': True}
 
        return {'valid': False, 'error': 'Invalid code'}
 
    def verify_backup_code(self, user_id: str, code: str) -> Dict:
        """Verify backup code (one-time use)."""
 
        config = self.user_store.get_mfa_config(user_id)
        if not config:
            return {'valid': False, 'error': 'MFA not configured'}
 
        code_hash = self._hash_code(code)
 
        if code_hash in config.backup_codes:
            # Remove used code
            config.backup_codes.remove(code_hash)
            self.user_store.save_mfa_config(config)
 
            return {
                'valid': True,
                'remaining_codes': len(config.backup_codes),
                'warning': 'Backup code used. Consider regenerating codes.'
            }
 
        return {'valid': False, 'error': 'Invalid backup code'}
 
    def _hash_code(self, code: str) -> str:
        """Hash backup code for storage."""
        import hashlib
        return hashlib.sha256(code.encode()).hexdigest()
 
 
class RateLimiter:
    """Rate limiter for MFA attempts."""
 
    def __init__(self):
        self.attempts = {}
        self.max_attempts = 5
        self.window_seconds = 300  # 5 minutes
 
    def allow(self, key: str) -> bool:
        """Check if request should be allowed."""
 
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.window_seconds)
 
        if key not in self.attempts:
            self.attempts[key] = []
 
        # Clean old attempts
        self.attempts[key] = [
            t for t in self.attempts[key] if t > window_start
        ]
 
        if len(self.attempts[key]) >= self.max_attempts:
            return False
 
        self.attempts[key].append(now)
        return True

Audit Logging

PCI-Compliant Audit Logger

# pci_audit_logger.py
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional
from enum import Enum
import json
import hashlib
 
class AuditEventType(Enum):
    """PCI DSS audit event types."""
    # Authentication events
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    MFA_SUCCESS = "mfa_success"
    MFA_FAILURE = "mfa_failure"
 
    # Cardholder data access
    CHD_ACCESS = "cardholder_data_access"
    CHD_CREATE = "cardholder_data_create"
    CHD_MODIFY = "cardholder_data_modify"
    CHD_DELETE = "cardholder_data_delete"
 
    # System events
    PRIVILEGE_USE = "privilege_use"
    CONFIG_CHANGE = "configuration_change"
    SECURITY_EVENT = "security_event"
 
    # Administrative events
    USER_CREATE = "user_create"
    USER_MODIFY = "user_modify"
    USER_DELETE = "user_delete"
    ROLE_CHANGE = "role_change"
 
@dataclass
class AuditEvent:
    """PCI DSS compliant audit event."""
    event_id: str
    timestamp: str
    event_type: AuditEventType
    user_id: str
    source_ip: str
    success: bool
    resource_type: Optional[str]
    resource_id: Optional[str]
    action: str
    details: Dict
    integrity_hash: str
 
class PCIAuditLogger:
    """PCI DSS compliant audit logging system."""
 
    # Required log fields per PCI DSS 10.2
    REQUIRED_FIELDS = [
        'event_id',
        'timestamp',
        'user_id',
        'event_type',
        'success',
        'source_ip'
    ]
 
    def __init__(self, storage_backend, retention_days: int = 365):
        self.storage = storage_backend
        self.retention_days = retention_days
        self.log_chain = []
 
    def log_event(
        self,
        event_type: AuditEventType,
        user_id: str,
        source_ip: str,
        success: bool,
        action: str,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        details: Optional[Dict] = None
    ) -> AuditEvent:
        """Log an audit event."""
 
        previous_hash = self.log_chain[-1] if self.log_chain else "genesis"
 
        event = AuditEvent(
            event_id=self._generate_event_id(),
            timestamp=datetime.utcnow().isoformat() + 'Z',
            event_type=event_type,
            user_id=user_id,
            source_ip=source_ip,
            success=success,
            resource_type=resource_type,
            resource_id=resource_id,
            action=action,
            details=details or {},
            integrity_hash=""
        )
 
        # Calculate integrity hash
        event.integrity_hash = self._calculate_hash(event, previous_hash)
        self.log_chain.append(event.integrity_hash)
 
        # Store event
        self.storage.store(self._serialize_event(event))
 
        return event
 
    def log_chd_access(
        self,
        user_id: str,
        source_ip: str,
        card_token: str,
        access_type: str,
        reason: str
    ) -> AuditEvent:
        """Log cardholder data access."""
 
        return self.log_event(
            event_type=AuditEventType.CHD_ACCESS,
            user_id=user_id,
            source_ip=source_ip,
            success=True,
            action=access_type,
            resource_type='cardholder_data',
            resource_id=card_token,
            details={
                'access_reason': reason,
                'data_masked': True
            }
        )
 
    def log_authentication(
        self,
        user_id: str,
        source_ip: str,
        success: bool,
        auth_method: str,
        failure_reason: Optional[str] = None
    ) -> AuditEvent:
        """Log authentication attempt."""
 
        event_type = AuditEventType.LOGIN_SUCCESS if success else \
                     AuditEventType.LOGIN_FAILURE
 
        return self.log_event(
            event_type=event_type,
            user_id=user_id,
            source_ip=source_ip,
            success=success,
            action='authenticate',
            details={
                'auth_method': auth_method,
                'failure_reason': failure_reason
            }
        )
 
    def _generate_event_id(self) -> str:
        """Generate unique event ID."""
        import uuid
        return str(uuid.uuid4())
 
    def _calculate_hash(self, event: AuditEvent, previous_hash: str) -> str:
        """Calculate integrity hash for event."""
 
        event_data = {
            'event_id': event.event_id,
            'timestamp': event.timestamp,
            'event_type': event.event_type.value,
            'user_id': event.user_id,
            'success': event.success,
            'previous_hash': previous_hash
        }
 
        serialized = json.dumps(event_data, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()
 
    def _serialize_event(self, event: AuditEvent) -> Dict:
        """Serialize event for storage."""
 
        return {
            'event_id': event.event_id,
            'timestamp': event.timestamp,
            'event_type': event.event_type.value,
            'user_id': event.user_id,
            'source_ip': event.source_ip,
            'success': event.success,
            'resource_type': event.resource_type,
            'resource_id': event.resource_id,
            'action': event.action,
            'details': event.details,
            'integrity_hash': event.integrity_hash
        }
 
    def verify_log_integrity(self) -> Dict:
        """Verify integrity of audit log chain."""
 
        events = self.storage.retrieve_all()
        issues = []
 
        for i, event in enumerate(events):
            expected_previous = events[i-1]['integrity_hash'] if i > 0 else "genesis"
 
            # Recalculate hash
            event_data = {
                'event_id': event['event_id'],
                'timestamp': event['timestamp'],
                'event_type': event['event_type'],
                'user_id': event['user_id'],
                'success': event['success'],
                'previous_hash': expected_previous
            }
 
            calculated_hash = hashlib.sha256(
                json.dumps(event_data, sort_keys=True).encode()
            ).hexdigest()
 
            if calculated_hash != event['integrity_hash']:
                issues.append({
                    'event_id': event['event_id'],
                    'issue': 'Hash mismatch - potential tampering'
                })
 
        return {
            'valid': len(issues) == 0,
            'events_checked': len(events),
            'issues': issues
        }
 
    def generate_compliance_report(
        self,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Generate PCI DSS compliance report."""
 
        events = self.storage.query({
            'timestamp': {
                '$gte': start_date.isoformat(),
                '$lte': end_date.isoformat()
            }
        })
 
        return {
            'report_period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'total_events': len(events),
            'authentication_events': {
                'successful': len([e for e in events
                    if e['event_type'] == 'login_success']),
                'failed': len([e for e in events
                    if e['event_type'] == 'login_failure'])
            },
            'chd_access_events': len([e for e in events
                if e['event_type'] == 'cardholder_data_access']),
            'security_events': len([e for e in events
                if e['event_type'] == 'security_event']),
            'log_integrity': self.verify_log_integrity()
        }

Conclusion

PCI DSS 4.0 compliance requires comprehensive controls across multiple domains:

  1. Protect Cardholder Data - Encrypt, tokenize, and minimize data retention
  2. Implement Strong Access Controls - MFA for all CDE access
  3. Maintain Secure Networks - Proper segmentation and firewall rules
  4. Monitor and Log - Comprehensive audit trails with integrity protection
  5. Regular Testing - Vulnerability scans and penetration tests

By implementing these technical controls, organizations can achieve and maintain PCI DSS compliance while protecting cardholder data from unauthorized access.

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.