PCI DSS 4.0 introduces significant changes affecting how development teams build and maintain payment card processing systems. This guide provides practical implementations for achieving and maintaining compliance across all relevant requirements.
Secure Coding Standards Framework
Implement requirement 6.2 secure development practices:
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Optional
import re
import hashlib
class VulnerabilityCategory(Enum):
    """Vulnerability categories (mirroring the OWASP Top 10) used to classify findings."""

    INJECTION = "injection"
    BROKEN_AUTH = "broken_authentication"
    SENSITIVE_DATA = "sensitive_data_exposure"
    XXE = "xml_external_entities"
    BROKEN_ACCESS = "broken_access_control"
    SECURITY_MISCONFIG = "security_misconfiguration"
    XSS = "cross_site_scripting"
    INSECURE_DESERIALIZATION = "insecure_deserialization"
    VULNERABLE_COMPONENTS = "vulnerable_components"
    INSUFFICIENT_LOGGING = "insufficient_logging"
@dataclass
class CodeReviewFinding:
    """A single static-analysis finding, mapped to CWE and PCI DSS references."""

    file_path: str  # path of the scanned file, as supplied by the caller
    line_number: int  # 1-based line where the pattern matched
    category: VulnerabilityCategory
    severity: str  # 'CRITICAL' | 'HIGH' | 'MEDIUM'
    description: str  # human-readable summary of the issue
    remediation: str  # suggested fix
    cwe_id: Optional[str] = None  # e.g. 'CWE-89'
    pci_requirement: Optional[str] = None  # comma-separated, e.g. '3.3.1, 3.4'
class PCIDSSCodeAnalyzer:
def __init__(self):
self.findings: list[CodeReviewFinding] = []
self.patterns = self._load_vulnerability_patterns()
def _load_vulnerability_patterns(self) -> dict:
return {
VulnerabilityCategory.INJECTION: [
{
'pattern': r'execute\s*\(\s*["\'].*\%s.*["\']\s*\%',
'description': 'SQL injection via string formatting',
'remediation': 'Use parameterized queries',
'cwe': 'CWE-89',
'pci': '6.2.4'
},
{
'pattern': r'cursor\.execute\s*\(\s*f["\']',
'description': 'SQL injection via f-string',
'remediation': 'Use parameterized queries with placeholders',
'cwe': 'CWE-89',
'pci': '6.2.4'
},
{
'pattern': r'subprocess\.(call|run|Popen)\s*\([^)]*shell\s*=\s*True',
'description': 'Command injection risk with shell=True',
'remediation': 'Use shell=False and pass arguments as list',
'cwe': 'CWE-78',
'pci': '6.2.4'
},
{
'pattern': r'eval\s*\(\s*.*request',
'description': 'Code injection via eval on user input',
'remediation': 'Never use eval on untrusted input',
'cwe': 'CWE-94',
'pci': '6.2.4'
}
],
VulnerabilityCategory.SENSITIVE_DATA: [
{
'pattern': r'(password|secret|api_key|token)\s*=\s*["\'][^"\']+["\']',
'description': 'Hardcoded credential detected',
'remediation': 'Use environment variables or secrets manager',
'cwe': 'CWE-798',
'pci': '6.2.4, 8.3.1'
},
{
'pattern': r'print\s*\(.*password',
'description': 'Potential credential logging',
'remediation': 'Never log sensitive data',
'cwe': 'CWE-532',
'pci': '3.3.1'
},
{
'pattern': r'logging\.(info|debug|warning|error)\s*\(.*card',
'description': 'Potential PAN logging',
'remediation': 'Mask or exclude card data from logs',
'cwe': 'CWE-532',
'pci': '3.3.1, 3.4'
}
],
VulnerabilityCategory.BROKEN_AUTH: [
{
'pattern': r'md5\s*\(',
'description': 'Weak hashing algorithm (MD5)',
'remediation': 'Use bcrypt, scrypt, or Argon2 for passwords',
'cwe': 'CWE-328',
'pci': '8.3.2'
},
{
'pattern': r'sha1\s*\(',
'description': 'Weak hashing algorithm (SHA1)',
'remediation': 'Use SHA-256 or stronger',
'cwe': 'CWE-328',
'pci': '8.3.2'
},
{
'pattern': r'verify\s*=\s*False',
'description': 'SSL verification disabled',
'remediation': 'Enable SSL certificate verification',
'cwe': 'CWE-295',
'pci': '4.2.1'
}
],
VulnerabilityCategory.XSS: [
{
'pattern': r'innerHTML\s*=',
'description': 'Potential XSS via innerHTML',
'remediation': 'Use textContent or sanitize HTML',
'cwe': 'CWE-79',
'pci': '6.2.4'
},
{
'pattern': r'document\.write\s*\(',
'description': 'Potential XSS via document.write',
'remediation': 'Use DOM manipulation methods',
'cwe': 'CWE-79',
'pci': '6.2.4'
},
{
'pattern': r'\{\{\s*.*\|safe\s*\}\}',
'description': 'Template XSS via safe filter',
'remediation': 'Validate and sanitize before marking safe',
'cwe': 'CWE-79',
'pci': '6.2.4'
}
],
VulnerabilityCategory.BROKEN_ACCESS: [
{
'pattern': r'@app\.route.*methods.*POST.*\n(?!.*@login_required)',
'description': 'POST endpoint without authentication',
'remediation': 'Add authentication decorator',
'cwe': 'CWE-306',
'pci': '7.2.1'
},
{
'pattern': r'if\s+request\.user\.id\s*==\s*',
'description': 'Client-side authorization check',
'remediation': 'Implement server-side authorization',
'cwe': 'CWE-639',
'pci': '7.2.2'
}
]
}
def analyze_file(self, file_path: str, content: str) -> list[CodeReviewFinding]:
"""Analyze source code file for PCI DSS violations."""
findings = []
lines = content.split('\n')
for category, patterns in self.patterns.items():
for pattern_def in patterns:
for match in re.finditer(pattern_def['pattern'], content, re.MULTILINE | re.IGNORECASE):
# Find line number
line_num = content[:match.start()].count('\n') + 1
finding = CodeReviewFinding(
file_path=file_path,
line_number=line_num,
category=category,
severity=self._calculate_severity(category),
description=pattern_def['description'],
remediation=pattern_def['remediation'],
cwe_id=pattern_def.get('cwe'),
pci_requirement=pattern_def.get('pci')
)
findings.append(finding)
self.findings.extend(findings)
return findings
def _calculate_severity(self, category: VulnerabilityCategory) -> str:
"""Calculate severity based on vulnerability category."""
critical_categories = [
VulnerabilityCategory.INJECTION,
VulnerabilityCategory.SENSITIVE_DATA
]
high_categories = [
VulnerabilityCategory.BROKEN_AUTH,
VulnerabilityCategory.BROKEN_ACCESS
]
if category in critical_categories:
return 'CRITICAL'
elif category in high_categories:
return 'HIGH'
else:
return 'MEDIUM'
def generate_compliance_report(self) -> dict:
"""Generate PCI DSS compliance report."""
requirements_coverage = {}
for finding in self.findings:
if finding.pci_requirement:
reqs = finding.pci_requirement.split(', ')
for req in reqs:
if req not in requirements_coverage:
requirements_coverage[req] = []
requirements_coverage[req].append({
'file': finding.file_path,
'line': finding.line_number,
'severity': finding.severity,
'description': finding.description
})
return {
'scan_date': datetime.utcnow().isoformat(),
'total_findings': len(self.findings),
'critical_count': sum(1 for f in self.findings if f.severity == 'CRITICAL'),
'high_count': sum(1 for f in self.findings if f.severity == 'HIGH'),
'requirements_affected': requirements_coverage,
'findings': [
{
'file': f.file_path,
'line': f.line_number,
'category': f.category.value,
'severity': f.severity,
'description': f.description,
'remediation': f.remediation,
'cwe': f.cwe_id,
'pci_requirement': f.pci_requirement
}
for f in self.findings
]
}Cardholder Data Protection
Implement requirement 3 data protection controls:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import base64
import re
from typing import Optional
class CardDataProtection:
    """Field-level protection for primary account numbers (PANs).

    Offers AES-256-GCM encryption/decryption, PCI DSS 3.4 masking, and
    non-reversible tokenization. The caller supplies the 256-bit master key
    (ideally sourced from an HSM or key-management service).
    """

    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self._validate_key()

    def _validate_key(self):
        """Ensure key meets PCI DSS 4.0 requirements (AES-256 needs 32 bytes)."""
        if len(self.master_key) != 32:
            raise ValueError("Master key must be 256 bits (32 bytes)")

    def encrypt_pan(self, pan: str) -> dict:
        """
        Encrypt PAN using AES-256-GCM.
        PCI DSS Requirement 3.5.1 - Strong cryptography

        Raises:
            ValueError: if the PAN fails length or Luhn validation.
        """
        digits = re.sub(r'\D', '', pan)
        if not self._validate_pan(digits):
            raise ValueError("Invalid PAN format")
        # A fresh 96-bit nonce per encryption; GCM nonces must never repeat.
        nonce = os.urandom(12)
        # Per-record data-encryption key derived from the master key.
        cipher = AESGCM(self._derive_data_key(nonce))
        sealed = cipher.encrypt(nonce, digits.encode(), None)
        return {
            'encrypted_pan': base64.b64encode(sealed).decode(),
            'nonce': base64.b64encode(nonce).decode(),
            'masked_pan': self.mask_pan(digits),
            'last_four': digits[-4:],
            'bin': digits[:6]
        }

    def decrypt_pan(self, encrypted_data: dict) -> str:
        """Reverse encrypt_pan(); callers must enforce access controls first."""
        sealed = base64.b64decode(encrypted_data['encrypted_pan'])
        nonce = base64.b64decode(encrypted_data['nonce'])
        # The same nonce-derived key decrypts the record.
        cipher = AESGCM(self._derive_data_key(nonce))
        return cipher.decrypt(nonce, sealed, None).decode()

    def _derive_data_key(self, salt: bytes) -> bytes:
        """Derive a per-record data-encryption key from the master key via PBKDF2."""
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000
        ).derive(self.master_key)

    def _validate_pan(self, pan: str) -> bool:
        """Validate PAN length (13-19 digits) and its Luhn check digit."""
        if not pan.isdigit() or not 13 <= len(pan) <= 19:
            return False
        checksum = 0
        for position, ch in enumerate(reversed(pan)):
            digit = int(ch)
            if position % 2 == 1:
                # Every second digit from the right is doubled; 10..18 fold to 1..9.
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit
        return checksum % 10 == 0

    def mask_pan(self, pan: str) -> str:
        """
        Mask PAN per PCI DSS 3.4 requirements.
        Show maximum first 6 and last 4 digits.
        """
        digits = re.sub(r'\D', '', pan)
        if len(digits) < 13:
            return '*' * len(digits)
        return digits[:6] + '*' * (len(digits) - 10) + digits[-4:]

    def tokenize_pan(self, pan: str) -> dict:
        """
        Create token for PAN (Requirement 3.4).
        Token is not mathematically reversible to PAN.
        """
        digits = re.sub(r'\D', '', pan)
        # Random surrogate value; carries no information about the PAN.
        token = base64.urlsafe_b64encode(os.urandom(16)).decode().rstrip('=')
        # Keyed hash supports equality lookups without storing the PAN itself.
        digest = hashlib.sha256(digits.encode() + self.master_key).hexdigest()
        return {
            'token': token,
            'pan_hash': digest,
            'last_four': digits[-4:],
            'bin': digits[:6]
        }
class SensitiveAuthData:
    """Handle sensitive authentication data (SAD) per Requirement 3.2."""

    @staticmethod
    def detect_sad_in_data(data: str) -> list[dict]:
        """Detect SAD (magnetic-stripe track data, PIN data) that must not be stored.

        Note: the original defined an unused CVV regex (any bare 3-4 digit
        group); it was removed because such a pattern matches virtually any
        number and produced no findings anyway (it was never applied).

        Returns:
            One dict per SAD type found, with 'type', 'requirement', 'action'.
        """
        findings = []
        # Track 1 format: %B<PAN>^NAME^<YYMM + discretionary data>?
        track1_pattern = r'%B\d{13,19}\^[\w\s/]+\^\d{4}\d*\?'
        # Track 2 format: ;<PAN>=<YYMM + discretionary data>?
        track2_pattern = r';\d{13,19}=\d{4}\d*\?'
        # 4-12 digit group followed somewhere by the word 'pin'
        pin_pattern = r'\b\d{4,12}\b.*pin'
        if re.search(track1_pattern, data):
            findings.append({
                'type': 'Track 1 Data',
                'requirement': '3.2.1',
                'action': 'Must not be stored after authorization'
            })
        if re.search(track2_pattern, data):
            findings.append({
                'type': 'Track 2 Data',
                'requirement': '3.2.1',
                'action': 'Must not be stored after authorization'
            })
        if re.search(pin_pattern, data, re.IGNORECASE):
            findings.append({
                'type': 'PIN Data',
                'requirement': '3.2.2',
                'action': 'Must not be stored after authorization'
            })
        return findings

    @staticmethod
    def secure_delete(data: bytearray):
        """Best-effort in-place overwrite of a sensitive buffer (zero/0xFF/zero).

        NOTE(review): in CPython other copies of the data may persist (caller-held
        bytes objects, interned strings); this only scrubs this bytearray.
        """
        data[:] = bytes(len(data))
        data[:] = b'\xff' * len(data)
        data[:] = bytes(len(data))
# Example usage
master_key = os.urandom(32)  # In production, use HSM or key management service
protection = CardDataProtection(master_key)
# Encrypt card data
pan = "4111111111111111"  # standard Visa test PAN
encrypted = protection.encrypt_pan(pan)
print(f"Masked PAN: {encrypted['masked_pan']}")
print(f"Token data available: last_four={encrypted['last_four']}, bin={encrypted['bin']}")
Vulnerability Management System
Implement requirements 6.3 and 11.3 vulnerability management:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
import json
class VulnerabilitySeverity(Enum):
    """Vulnerability severity levels; a higher value means more urgent remediation."""

    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
    INFO = 0
@dataclass
class Vulnerability:
    """One tracked vulnerability and its remediation-SLA state."""

    id: str  # scanner-assigned unique identifier
    title: str
    severity: VulnerabilitySeverity
    cvss_score: float  # CVSS base score, 0.0-10.0
    cve_id: Optional[str]  # e.g. 'CVE-2024-1234', when known
    affected_component: str
    discovered_date: datetime
    remediation_deadline: datetime  # discovered_date + severity-specific SLA
    status: str  # lifecycle state, e.g. 'open' / 'closed'
    remediation_notes: Optional[str] = None
class PCIDSSVulnerabilityManager:
def __init__(self):
self.vulnerabilities: dict[str, Vulnerability] = {}
self.sla_days = {
VulnerabilitySeverity.CRITICAL: 1, # PCI DSS 4.0: ASAP, typically same day
VulnerabilitySeverity.HIGH: 30,
VulnerabilitySeverity.MEDIUM: 90,
VulnerabilitySeverity.LOW: 180
}
def import_scan_results(self, scan_results: list[dict]) -> list[Vulnerability]:
"""Import vulnerability scan results."""
new_vulnerabilities = []
for result in scan_results:
severity = self._map_severity(result.get('severity', 'info'))
discovered = datetime.utcnow()
deadline = discovered + timedelta(days=self.sla_days[severity])
vuln = Vulnerability(
id=result['id'],
title=result['title'],
severity=severity,
cvss_score=result.get('cvss_score', 0.0),
cve_id=result.get('cve_id'),
affected_component=result['component'],
discovered_date=discovered,
remediation_deadline=deadline,
status='open'
)
self.vulnerabilities[vuln.id] = vuln
new_vulnerabilities.append(vuln)
return new_vulnerabilities
def _map_severity(self, severity_str: str) -> VulnerabilitySeverity:
"""Map severity string to enum."""
mapping = {
'critical': VulnerabilitySeverity.CRITICAL,
'high': VulnerabilitySeverity.HIGH,
'medium': VulnerabilitySeverity.MEDIUM,
'low': VulnerabilitySeverity.LOW,
'info': VulnerabilitySeverity.INFO
}
return mapping.get(severity_str.lower(), VulnerabilitySeverity.INFO)
def get_overdue_vulnerabilities(self) -> list[Vulnerability]:
"""Get vulnerabilities past their remediation deadline."""
now = datetime.utcnow()
return [
v for v in self.vulnerabilities.values()
if v.status == 'open' and v.remediation_deadline < now
]
def update_vulnerability_status(
self,
vuln_id: str,
status: str,
notes: Optional[str] = None
):
"""Update vulnerability status with audit trail."""
if vuln_id not in self.vulnerabilities:
raise ValueError(f"Vulnerability {vuln_id} not found")
vuln = self.vulnerabilities[vuln_id]
old_status = vuln.status
vuln.status = status
vuln.remediation_notes = notes
# Audit log entry
self._log_status_change(vuln_id, old_status, status, notes)
def _log_status_change(
self,
vuln_id: str,
old_status: str,
new_status: str,
notes: Optional[str]
):
"""Log vulnerability status changes for audit."""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'vulnerability_id': vuln_id,
'old_status': old_status,
'new_status': new_status,
'notes': notes
}
print(f"AUDIT: {json.dumps(log_entry)}")
def generate_compliance_report(self) -> dict:
"""Generate PCI DSS vulnerability management report."""
now = datetime.utcnow()
open_vulns = [v for v in self.vulnerabilities.values() if v.status == 'open']
overdue = self.get_overdue_vulnerabilities()
severity_counts = {s: 0 for s in VulnerabilitySeverity}
for v in open_vulns:
severity_counts[v.severity] += 1
# Calculate average remediation time for closed vulns
closed_vulns = [v for v in self.vulnerabilities.values() if v.status == 'closed']
return {
'report_date': now.isoformat(),
'pci_requirements': ['6.3', '11.3.1', '11.3.2'],
'summary': {
'total_vulnerabilities': len(self.vulnerabilities),
'open_vulnerabilities': len(open_vulns),
'overdue_vulnerabilities': len(overdue),
'closed_vulnerabilities': len(closed_vulns)
},
'severity_breakdown': {
s.name: severity_counts[s] for s in VulnerabilitySeverity
},
'sla_compliance': {
'within_sla': len(open_vulns) - len(overdue),
'overdue': len(overdue),
'compliance_rate': (
(len(open_vulns) - len(overdue)) / len(open_vulns) * 100
if open_vulns else 100
)
},
'critical_high_details': [
{
'id': v.id,
'title': v.title,
'severity': v.severity.name,
'days_open': (now - v.discovered_date).days,
'deadline': v.remediation_deadline.isoformat(),
'overdue': v.remediation_deadline < now
}
for v in open_vulns
if v.severity in [VulnerabilitySeverity.CRITICAL, VulnerabilitySeverity.HIGH]
]
}
def check_scan_coverage(self, assets: list[str], scanned_assets: list[str]) -> dict:
"""
Verify quarterly ASV scan coverage.
PCI DSS Requirement 11.3.2
"""
scanned_set = set(scanned_assets)
assets_set = set(assets)
covered = assets_set.intersection(scanned_set)
missing = assets_set - scanned_set
return {
'total_assets': len(assets),
'scanned_assets': len(covered),
'missing_assets': list(missing),
'coverage_percentage': len(covered) / len(assets) * 100 if assets else 100,
'compliant': len(missing) == 0
}Access Control Implementation
Implement requirements 7 and 8 access controls:
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional, Set
import hashlib
import secrets
import re
class AccessLevel(Enum):
    """Ordered access levels; a higher value implies broader privileges."""

    NONE = 0
    READ = 1
    WRITE = 2
    ADMIN = 3
@dataclass
class User:
    """Account record tracked by the access-control layer."""

    user_id: str
    username: str
    roles: Set[str]  # role names resolved against PCIDSSAccessControl.role_permissions
    last_login: Optional[datetime]
    password_changed: datetime  # basis for Req 8.3.9 password-age checks
    failed_attempts: int  # consecutive failed logins (lockout trigger)
    locked_until: Optional[datetime]  # lockout expiry; None when not locked
    mfa_enabled: bool
@dataclass
class AccessAttempt:
    """Audit-log entry for a single access decision (Req 10.2.1)."""

    timestamp: datetime
    user_id: str
    resource: str
    action: str
    granted: bool  # True when access was allowed
    reason: str  # human-readable grant/deny explanation
class PCIDSSAccessControl:
def __init__(self):
self.users: dict[str, User] = {}
self.role_permissions: dict[str, dict] = {}
self.access_log: list[AccessAttempt] = []
self.config = {
'max_failed_attempts': 6, # Req 8.1.6
'lockout_duration_minutes': 30, # Req 8.1.6
'password_max_age_days': 90, # Req 8.3.9
'session_timeout_minutes': 15, # Req 8.1.8
'password_min_length': 12, # Req 8.3.6
'password_history': 4 # Req 8.3.7
}
def define_role(self, role_name: str, permissions: dict):
"""
Define role with least privilege principle.
PCI DSS Requirement 7.1
"""
self.role_permissions[role_name] = {
'resources': permissions.get('resources', {}),
'created_at': datetime.utcnow().isoformat(),
'business_justification': permissions.get('justification', 'Not provided')
}
def check_access(
self,
user_id: str,
resource: str,
action: str
) -> tuple[bool, str]:
"""
Check if user has access to resource.
PCI DSS Requirements 7.2
"""
if user_id not in self.users:
self._log_access(user_id, resource, action, False, "User not found")
return False, "User not found"
user = self.users[user_id]
# Check if account is locked
if user.locked_until and user.locked_until > datetime.utcnow():
self._log_access(user_id, resource, action, False, "Account locked")
return False, "Account locked"
# Check MFA requirement for cardholder data access
if self._requires_mfa(resource) and not user.mfa_enabled:
self._log_access(user_id, resource, action, False, "MFA required")
return False, "MFA required for this resource"
# Check role-based permissions
for role in user.roles:
if role in self.role_permissions:
role_perms = self.role_permissions[role]['resources']
if resource in role_perms:
allowed_actions = role_perms[resource]
if action in allowed_actions or 'all' in allowed_actions:
self._log_access(user_id, resource, action, True, f"Granted via role: {role}")
return True, f"Access granted via role: {role}"
self._log_access(user_id, resource, action, False, "No matching permission")
return False, "Access denied - no matching permission"
def _requires_mfa(self, resource: str) -> bool:
"""Check if resource requires MFA (Req 8.4.2)."""
mfa_required_patterns = [
r'cardholder_data',
r'pan_.*',
r'admin_.*',
r'key_management'
]
return any(re.match(pattern, resource) for pattern in mfa_required_patterns)
def _log_access(
self,
user_id: str,
resource: str,
action: str,
granted: bool,
reason: str
):
"""Log all access attempts (Req 10.2.1)."""
attempt = AccessAttempt(
timestamp=datetime.utcnow(),
user_id=user_id,
resource=resource,
action=action,
granted=granted,
reason=reason
)
self.access_log.append(attempt)
def validate_password(self, password: str) -> tuple[bool, list[str]]:
"""
Validate password against PCI DSS 4.0 requirements.
Requirement 8.3.6
"""
errors = []
if len(password) < self.config['password_min_length']:
errors.append(f"Password must be at least {self.config['password_min_length']} characters")
if not re.search(r'[A-Z]', password):
errors.append("Password must contain uppercase letter")
if not re.search(r'[a-z]', password):
errors.append("Password must contain lowercase letter")
if not re.search(r'\d', password):
errors.append("Password must contain digit")
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
errors.append("Password must contain special character")
# Check for common patterns
common_patterns = ['password', '123456', 'qwerty', user_id if 'user_id' in dir() else '']
if any(pattern in password.lower() for pattern in common_patterns if pattern):
errors.append("Password contains common/weak pattern")
return len(errors) == 0, errors
def handle_failed_login(self, user_id: str):
"""
Handle failed login attempt.
PCI DSS Requirement 8.1.6
"""
if user_id not in self.users:
return
user = self.users[user_id]
user.failed_attempts += 1
if user.failed_attempts >= self.config['max_failed_attempts']:
user.locked_until = datetime.utcnow() + timedelta(
minutes=self.config['lockout_duration_minutes']
)
self._log_access(user_id, 'authentication', 'login', False,
f"Account locked after {user.failed_attempts} failed attempts")
def reset_failed_attempts(self, user_id: str):
"""Reset failed login counter on successful login."""
if user_id in self.users:
self.users[user_id].failed_attempts = 0
self.users[user_id].locked_until = None
def check_password_expiry(self, user_id: str) -> tuple[bool, int]:
"""
Check if password needs to be changed.
PCI DSS Requirement 8.3.9
"""
if user_id not in self.users:
return True, 0
user = self.users[user_id]
days_since_change = (datetime.utcnow() - user.password_changed).days
days_remaining = self.config['password_max_age_days'] - days_since_change
return days_remaining <= 0, days_remaining
def generate_access_review_report(self) -> dict:
"""
Generate quarterly access review report.
PCI DSS Requirement 7.2.2
"""
return {
'review_date': datetime.utcnow().isoformat(),
'requirement': '7.2.2',
'users': [
{
'user_id': u.user_id,
'username': u.username,
'roles': list(u.roles),
'last_login': u.last_login.isoformat() if u.last_login else None,
'mfa_enabled': u.mfa_enabled,
'status': 'active' if not u.locked_until else 'locked'
}
for u in self.users.values()
],
'roles': [
{
'role_name': name,
'resources': list(perms['resources'].keys()),
'justification': perms['business_justification']
}
for name, perms in self.role_permissions.items()
],
'inactive_users': [
u.user_id for u in self.users.values()
if u.last_login and (datetime.utcnow() - u.last_login).days > 90
]
}Continuous Compliance Monitoring
Implement automated compliance monitoring:
from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import json
@dataclass
class ComplianceCheck:
    """One automated PCI DSS control check and its most recent outcome."""

    requirement_id: str  # PCI DSS requirement, e.g. '3.4', '11.3.2'
    description: str
    check_function: Callable  # zero-arg callable returning (passed: bool, details: str)
    frequency: str  # daily, weekly, quarterly
    last_run: Optional[datetime] = None  # set by the monitor after each run
    last_result: Optional[bool] = None  # True=passed, False=failed, None=never run
class PCIDSSComplianceMonitor:
    """Registry of automated PCI DSS 4.0 compliance checks with result history."""

    def __init__(self):
        self.checks: list[ComplianceCheck] = []
        self.results_history: list[dict] = []
        self._register_default_checks()

    def _register_default_checks(self):
        """Register PCI DSS 4.0 compliance checks."""
        checks = [
            ComplianceCheck(
                requirement_id='3.4',
                description='PAN is masked when displayed',
                check_function=self._check_pan_masking,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='3.5.1',
                description='Strong cryptography used for stored PAN',
                check_function=self._check_encryption_strength,
                frequency='weekly'
            ),
            ComplianceCheck(
                requirement_id='6.3.3',
                description='Critical patches applied within 30 days',
                check_function=self._check_patch_compliance,
                frequency='weekly'
            ),
            ComplianceCheck(
                requirement_id='8.3.6',
                description='Password complexity requirements enforced',
                check_function=self._check_password_policy,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='10.2',
                description='Audit logs capturing required events',
                check_function=self._check_audit_logging,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='10.4.1',
                description='Audit logs reviewed daily',
                check_function=self._check_log_review,
                frequency='daily'
            ),
            ComplianceCheck(
                requirement_id='11.3.1',
                description='Internal vulnerability scans quarterly',
                check_function=self._check_internal_scans,
                frequency='quarterly'
            ),
            ComplianceCheck(
                requirement_id='11.3.2',
                description='External ASV scans quarterly',
                check_function=self._check_asv_scans,
                frequency='quarterly'
            ),
            ComplianceCheck(
                requirement_id='12.3.1',
                description='Risk assessment performed annually',
                check_function=self._check_risk_assessment,
                frequency='quarterly'
            )
        ]
        self.checks.extend(checks)

    def run_all_checks(self) -> dict:
        """Run every registered check and record an aggregate result snapshot.

        A check that raises is treated as failed; its exception message is
        captured in the details instead of aborting the run.
        """
        results = {
            'timestamp': datetime.utcnow().isoformat(),
            'checks': [],
            'overall_compliance': True
        }
        for check in self.checks:
            try:
                passed, details = check.check_function()
            except Exception as e:
                # Failed-to-run counts as a failed check but the run continues.
                passed, details = False, f"Check failed with error: {str(e)}"
            # BUG FIX: the original skipped these on the exception path,
            # leaving last_run/last_result stale for erroring checks.
            check.last_run = datetime.utcnow()
            check.last_result = passed
            results['checks'].append({
                'requirement_id': check.requirement_id,
                'description': check.description,
                'passed': passed,
                'details': details
            })
            if not passed:
                results['overall_compliance'] = False
        self.results_history.append(results)
        return results

    def _check_pan_masking(self) -> tuple[bool, str]:
        """Check that PAN is properly masked in displays/logs.

        Placeholder: a real implementation would scan logs/UI for unmasked PANs.
        """
        return True, "PAN masking verified in application logs"

    def _check_encryption_strength(self) -> tuple[bool, str]:
        """Verify AES-256 or equivalent encryption for stored PAN (placeholder)."""
        return True, "AES-256-GCM encryption verified"

    def _check_patch_compliance(self) -> tuple[bool, str]:
        """Verify critical patches applied within SLA (placeholder: query patch system)."""
        return True, "All critical patches applied within 30-day SLA"

    def _check_password_policy(self) -> tuple[bool, str]:
        """Verify password policy enforcement (placeholder with expected settings)."""
        policy_settings = {
            'min_length': 12,
            'complexity_required': True,
            'max_age_days': 90,
            'history_count': 4
        }
        return True, f"Password policy enforced: {json.dumps(policy_settings)}"

    def _check_audit_logging(self) -> tuple[bool, str]:
        """Verify audit logs capture required events (placeholder)."""
        required_events = [
            'authentication_success',
            'authentication_failure',
            'authorization_failure',
            'user_creation',
            'user_modification',
            'privilege_escalation',
            'system_events',
            'cardholder_data_access'
        ]
        return True, f"Logging configured for: {', '.join(required_events)}"

    def _check_log_review(self) -> tuple[bool, str]:
        """Verify daily log review is occurring (placeholder)."""
        return True, "Daily log review completed"

    def _check_internal_scans(self) -> tuple[bool, str]:
        """Verify quarterly internal vulnerability scans (placeholder)."""
        return True, "Internal vulnerability scan completed this quarter"

    def _check_asv_scans(self) -> tuple[bool, str]:
        """Verify quarterly ASV scans with passing status (placeholder)."""
        return True, "ASV scan completed with passing status"

    def _check_risk_assessment(self) -> tuple[bool, str]:
        """Verify annual risk assessment completion (placeholder)."""
        return True, "Annual risk assessment completed"

    def generate_compliance_dashboard(self) -> dict:
        """Generate compliance status dashboard (runs the checks if none recorded)."""
        if not self.results_history:
            self.run_all_checks()
        latest = self.results_history[-1]
        passed_checks = sum(1 for c in latest['checks'] if c['passed'])
        total_checks = len(latest['checks'])
        # Guard against an empty check registry (original divided by zero).
        percentage = round(passed_checks / total_checks * 100, 1) if total_checks else 100.0
        return {
            'last_updated': latest['timestamp'],
            'overall_status': 'COMPLIANT' if latest['overall_compliance'] else 'NON-COMPLIANT',
            'compliance_score': f"{passed_checks}/{total_checks}",
            'compliance_percentage': percentage,
            'failed_requirements': [
                c['requirement_id'] for c in latest['checks'] if not c['passed']
            ],
            'next_actions': self._get_remediation_actions(latest)
        }

    def _get_remediation_actions(self, results: dict) -> list[dict]:
        """Get remediation actions for failed checks.

        Data-protection (3.x) and authentication (8.x) failures are HIGH priority.
        """
        actions = []
        for check in results['checks']:
            if not check['passed']:
                actions.append({
                    'requirement': check['requirement_id'],
                    'issue': check['description'],
                    'details': check['details'],
                    'priority': 'HIGH' if check['requirement_id'].startswith(('3.', '8.')) else 'MEDIUM'
                })
        return actions
# Initialize and run compliance monitoring
monitor = PCIDSSComplianceMonitor()
dashboard = monitor.generate_compliance_dashboard()  # runs all checks on first call
print(json.dumps(dashboard, indent=2))
Conclusion
PCI DSS 4.0 compliance requires a comprehensive approach spanning secure development, data protection, vulnerability management, and access controls. Implement automated compliance monitoring to maintain continuous compliance rather than point-in-time assessments. Regular security reviews, penetration testing, and staff training complement technical controls. Remember that compliance is a continuous process - build security into your development lifecycle from the start.