DevSecOps

Container Security Best Practices for Kubernetes Environments

DeviDevs Team
13 min read
#kubernetes #container-security #devsecops #docker #pod-security

Container security in Kubernetes requires a defense-in-depth approach spanning image build, deployment admission, runtime protection, and network segmentation. This guide provides production-ready implementations for securing your Kubernetes workloads.

Image Security Scanner

Scan container images for vulnerabilities before deployment:

import subprocess
import json
from dataclasses import dataclass
from typing import Optional
from enum import Enum
 
class Severity(Enum):
    """Severity labels attached to vulnerability findings."""
    # Each value is the upper-case string that scan_image passes to
    # Severity(<str>) when parsing a finding's 'Severity' field.
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    UNKNOWN = "UNKNOWN"
 
@dataclass
class Vulnerability:
    """One vulnerability finding, as built by ContainerSecurityScanner.scan_image."""
    id: str  # taken from the finding's 'VulnerabilityID' key
    package: str  # taken from 'PkgName'
    installed_version: str  # taken from 'InstalledVersion'
    fixed_version: Optional[str]  # taken from 'FixedVersion'; None when the key is absent
    severity: Severity  # parsed from the finding's 'Severity' string
    title: str  # taken from 'Title'; '' when absent
    description: str  # taken from 'Description'; '' when absent
 
@dataclass
class ScanResult:
    """Outcome of scanning a single image with ContainerSecurityScanner.scan_image."""
    image: str  # the image reference that was scanned
    vulnerabilities: list[Vulnerability]  # every parsed finding, regardless of severity
    passed: bool  # True when both counts below are within the scanner's max_critical / max_high
    critical_count: int  # number of findings with Severity.CRITICAL
    high_count: int  # number of findings with Severity.HIGH
 
class ContainerSecurityScanner:
    """Scan images with Trivy, generate SBOMs with Syft, and lint Dockerfiles.

    Args:
        config: Optional settings. Recognised keys:
            ``severity_threshold`` (default ``'HIGH'``) - stored on the
            instance; no method in this class consults it.
            ``max_critical`` (default 0) and ``max_high`` (default 5) - the
            largest CRITICAL / HIGH finding counts for which a scan passes.
            ``ignore_unfixed`` (default False) - pass ``--ignore-unfixed``
            to Trivy.
    """

    # Name fragments that mark an ENV/ARG variable as a likely secret.
    _SECRET_PATTERNS = ('PASSWORD', 'SECRET', 'API_KEY', 'TOKEN')

    def __init__(self, config: dict):
        self.config = config
        self.severity_threshold = config.get('severity_threshold', 'HIGH')
        self.max_critical = config.get('max_critical', 0)
        self.max_high = config.get('max_high', 5)
        self.ignore_unfixed = config.get('ignore_unfixed', False)

    @staticmethod
    def _run_json(cmd: list[str]) -> dict:
        """Run *cmd* and decode its stdout as JSON.

        Raises:
            json.JSONDecodeError: if stdout is not valid JSON. The message
                names the tool, its exit status and its stderr output.
        """
        result = subprocess.run(cmd, capture_output=True, text=True)
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            # Same exception type as a bare json.loads failure, but say which
            # tool failed and why instead of "Expecting value" on empty stdout.
            detail = result.stderr.strip() or 'no error output'
            raise json.JSONDecodeError(
                f"{cmd[0]} (exit status {result.returncode}) did not produce "
                f"valid JSON: {detail}",
                exc.doc,
                exc.pos,
            ) from exc

    @staticmethod
    def _parse_severity(value) -> Severity:
        """Map a scanner severity string to ``Severity``; unknown -> UNKNOWN."""
        try:
            return Severity(value)
        except ValueError:
            return Severity.UNKNOWN

    def scan_image(self, image: str) -> ScanResult:
        """Scan container image using Trivy.

        Args:
            image: Image reference handed to ``trivy image``.

        Returns:
            A ``ScanResult`` listing every finding. ``passed`` is True when
            the CRITICAL and HIGH counts are within ``max_critical`` and
            ``max_high``.

        Raises:
            json.JSONDecodeError: if Trivy does not print valid JSON.
            KeyError: if a finding lacks ``VulnerabilityID``, ``PkgName`` or
                ``InstalledVersion``.
        """
        cmd = [
            'trivy', 'image',
            '--format', 'json',
            '--severity', 'CRITICAL,HIGH,MEDIUM,LOW',
        ]
        if self.ignore_unfixed:
            cmd.append('--ignore-unfixed')
        # Keep the positional image argument after every flag.
        cmd.append(image)

        scan_data = self._run_json(cmd)

        vulnerabilities = []
        critical_count = 0
        high_count = 0

        # "or []" also covers keys that are present but null in the report.
        for result_item in scan_data.get('Results') or []:
            for vuln in result_item.get('Vulnerabilities') or []:
                severity = self._parse_severity(vuln.get('Severity', 'UNKNOWN'))

                vulnerabilities.append(Vulnerability(
                    id=vuln['VulnerabilityID'],
                    package=vuln['PkgName'],
                    installed_version=vuln['InstalledVersion'],
                    fixed_version=vuln.get('FixedVersion'),
                    severity=severity,
                    title=vuln.get('Title', ''),
                    description=vuln.get('Description', ''),
                ))

                if severity == Severity.CRITICAL:
                    critical_count += 1
                elif severity == Severity.HIGH:
                    high_count += 1

        passed = (
            critical_count <= self.max_critical and
            high_count <= self.max_high
        )

        return ScanResult(
            image=image,
            vulnerabilities=vulnerabilities,
            passed=passed,
            critical_count=critical_count,
            high_count=high_count
        )

    def generate_sbom(self, image: str) -> dict:
        """Generate Software Bill of Materials for image.

        Runs ``syft <image> -o cyclonedx-json`` and returns the decoded JSON.

        Raises:
            json.JSONDecodeError: if Syft does not print valid JSON.
        """
        return self._run_json(['syft', image, '-o', 'cyclonedx-json'])

    @staticmethod
    def _parse_instructions(content: str) -> list[tuple[str, str]]:
        """Split Dockerfile text into ``(KEYWORD, arguments)`` pairs.

        Blank lines and ``#`` comment lines are skipped and backslash line
        continuations are joined. Heredoc bodies are not recognised.
        """
        instructions = []
        pending = ''
        for raw_line in content.splitlines():
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            if line.endswith('\\'):
                pending += line[:-1] + ' '
                continue
            parts = (pending + line).split(None, 1)
            pending = ''
            if parts:
                instructions.append(
                    (parts[0].upper(), parts[1].strip() if len(parts) > 1 else '')
                )
        # A trailing continuation with nothing after it still counts.
        parts = pending.split(None, 1)
        if parts:
            instructions.append(
                (parts[0].upper(), parts[1].strip() if len(parts) > 1 else '')
            )
        return instructions

    @staticmethod
    def _is_unpinned(image_ref: str, stage_names) -> bool:
        """Return True when *image_ref* has no tag/digest or uses ``latest``."""
        if image_ref == 'scratch' or image_ref.lower() in stage_names:
            return False  # empty base image or an earlier build stage
        if '@' in image_ref:
            return False  # pinned by digest
        # Only the last path segment can carry a tag; a ':' earlier in the
        # reference is a registry port (e.g. localhost:5000/app).
        _, has_tag, tag = image_ref.rsplit('/', 1)[-1].partition(':')
        return not has_tag or tag == 'latest'

    @staticmethod
    def _declared_names(keyword: str, tokens: list[str]) -> list[str]:
        """Return the variable names declared by one ENV or ARG instruction."""
        if not tokens:
            return []
        if keyword == 'ARG':
            return [tokens[0].split('=', 1)[0]]
        if '=' not in tokens[0]:
            return [tokens[0]]  # legacy "ENV NAME value" form
        return [tok.split('=', 1)[0] for tok in tokens if '=' in tok]

    def check_base_image(self, dockerfile_path: str) -> dict:
        """Analyze base image security.

        The Dockerfile is inspected instruction by instruction, so every
        ``FROM`` (and external ``COPY --from=``) is checked, not only the
        first, and keywords are not confused with substrings elsewhere.

        Args:
            dockerfile_path: Path of the Dockerfile to lint.

        Returns:
            ``{'dockerfile': path, 'issues': [...], 'passed': bool}`` where
            each issue has ``severity`` and ``message`` keys and ``passed``
            is False only when a CRITICAL issue was found.
        """
        with open(dockerfile_path, 'r', encoding='utf-8') as f:
            instructions = self._parse_instructions(f.read())

        stage_users = {}  # stage alias -> USER in effect at the end of that stage
        current_stage = None
        final_user = None
        unpinned_image = False
        local_add = False
        declared_names = []

        for keyword, args in instructions:
            tokens = args.split()
            if keyword == 'FROM':
                positional = [t for t in tokens if not t.startswith('--')]
                if not positional:
                    continue
                base = positional[0]
                if self._is_unpinned(base, stage_users):
                    unpinned_image = True
                # A stage built FROM an earlier stage inherits its USER;
                # any other base starts without one as far as we can tell.
                final_user = stage_users.get(base.lower())
                has_alias = len(positional) >= 3 and positional[1].upper() == 'AS'
                current_stage = positional[2].lower() if has_alias else None
                if current_stage:
                    stage_users[current_stage] = final_user
            elif keyword == 'COPY':
                for token in tokens:
                    if not token.startswith('--from='):
                        continue
                    source = token[len('--from='):]
                    # Numeric values are stage indexes, not images.
                    if source and not source.isdigit() and \
                            self._is_unpinned(source, stage_users):
                        unpinned_image = True
            elif keyword == 'USER':
                final_user = tokens[0] if tokens else None
                if current_stage:
                    stage_users[current_stage] = final_user
            elif keyword == 'ADD':
                if 'http://' not in args and 'https://' not in args:
                    local_add = True
            elif keyword in ('ENV', 'ARG'):
                declared_names.extend(self._declared_names(keyword, tokens))

        issues = []

        # Check for latest tag
        if unpinned_image:
            issues.append({
                'severity': 'HIGH',
                'message': 'Using latest or untagged base image - pin to specific version'
            })

        # Check for root user (only the final stage's USER matters at runtime)
        if final_user is None:
            issues.append({
                'severity': 'MEDIUM',
                'message': 'No USER instruction - container may run as root'
            })
        elif final_user.split(':', 1)[0] in ('root', '0'):
            issues.append({
                'severity': 'MEDIUM',
                'message': 'Final USER is root - container will run as root'
            })

        # Check for COPY vs ADD
        if local_add:
            issues.append({
                'severity': 'LOW',
                'message': 'Use COPY instead of ADD for local files'
            })

        # Check for secrets in build
        upper_names = [name.upper() for name in declared_names]
        for pattern in self._SECRET_PATTERNS:
            if any(pattern in name for name in upper_names):
                issues.append({
                    'severity': 'CRITICAL',
                    'message': f'Potential secret in Dockerfile: {pattern}'
                })

        return {
            'dockerfile': dockerfile_path,
            'issues': issues,
            'passed': not any(i['severity'] == 'CRITICAL' for i in issues)
        }

Kubernetes Admission Controller

Implement policy enforcement at admission time:

from flask import Flask, request, jsonify
import base64
import json
from typing import Optional
 
# Flask application object; the /validate webhook route below is registered on it.
app = Flask(__name__)
 
class AdmissionController:
    """Evaluate pod specs against a fixed set of security policies.

    The policy table is built once in ``__init__`` by ``_load_policies``.
    ``max_cpu_limit`` and ``max_memory_limit`` are part of that table but
    are not enforced by ``validate_pod``.
    """

    def __init__(self):
        self.policies = self._load_policies()

    def _load_policies(self) -> dict:
        """Return the hard-coded policy table used by ``validate_pod``."""
        return {
            'require_resource_limits': True,
            'require_security_context': True,
            'deny_privileged': True,
            'deny_host_network': True,
            'deny_host_pid': True,
            'allowed_registries': [
                'gcr.io/my-project',
                'docker.io/library',
                'ghcr.io/my-org'
            ],
            'required_labels': ['app', 'owner', 'environment'],
            'max_cpu_limit': '4',
            'max_memory_limit': '8Gi',
            'deny_latest_tag': True
        }

    @staticmethod
    def _from_allowed_registry(image: str, registries: list[str]) -> bool:
        """Return True when *image* lives under one of *registries*.

        The prefix must end on a path boundary; a bare ``startswith`` would
        let ``gcr.io/my-project-evil/app`` through for ``gcr.io/my-project``.
        """
        return any(
            image.startswith(reg.rstrip('/') + '/')
            for reg in registries
        )

    def validate_pod(self, pod_spec: dict, metadata: dict) -> tuple[bool, list[str]]:
        """Validate pod against security policies.

        Args:
            pod_spec: The pod ``spec`` mapping (containers, initContainers,
                hostNetwork, hostPID, ...).
            metadata: The pod ``metadata`` mapping; only ``labels`` is read.

        Returns:
            ``(allowed, violations)`` where ``allowed`` is True only when
            ``violations`` is empty.
        """
        violations = []

        # Check required labels ("or {}" tolerates an explicit null)
        labels = metadata.get('labels') or {}
        for required_label in self.policies['required_labels']:
            if required_label not in labels:
                violations.append(f"Missing required label: {required_label}")

        containers = pod_spec.get('containers') or []
        init_containers = pod_spec.get('initContainers') or []

        for container in containers + init_containers:
            name = container.get('name', 'unknown')
            image = container.get('image') or ''

            # Check image registry
            registries = self.policies['allowed_registries']
            if registries and not self._from_allowed_registry(image, registries):
                violations.append(
                    f"Container {name}: Image from unauthorized registry: {image}"
                )

            # Check for latest tag; only the last path segment can carry a tag
            if self.policies['deny_latest_tag']:
                if ':latest' in image or ':' not in image.split('/')[-1]:
                    violations.append(
                        f"Container {name}: Using latest or untagged image"
                    )

            # Check resource limits
            if self.policies['require_resource_limits']:
                resources = container.get('resources') or {}
                limits = resources.get('limits') or {}
                requested = resources.get('requests') or {}

                if not limits.get('cpu') or not limits.get('memory'):
                    violations.append(
                        f"Container {name}: Missing resource limits"
                    )

                if not requested.get('cpu') or not requested.get('memory'):
                    violations.append(
                        f"Container {name}: Missing resource requests"
                    )

            # Check security context
            security_context = container.get('securityContext') or {}

            if self.policies['require_security_context']:
                # Each flag must be set explicitly; an absent key is a violation.
                if security_context.get('runAsNonRoot') is not True:
                    violations.append(
                        f"Container {name}: Must set runAsNonRoot: true"
                    )

                if security_context.get('readOnlyRootFilesystem') is not True:
                    violations.append(
                        f"Container {name}: Must set readOnlyRootFilesystem: true"
                    )

                if security_context.get('allowPrivilegeEscalation') is not False:
                    violations.append(
                        f"Container {name}: Must set allowPrivilegeEscalation: false"
                    )

            if self.policies['deny_privileged'] and security_context.get('privileged'):
                violations.append(
                    f"Container {name}: Privileged containers not allowed"
                )

        # Check pod-level security
        if self.policies['deny_host_network'] and pod_spec.get('hostNetwork'):
            violations.append("Host network access not allowed")

        if self.policies['deny_host_pid'] and pod_spec.get('hostPID'):
            violations.append("Host PID namespace not allowed")

        return len(violations) == 0, violations
 
# Single module-level controller instance used by the /validate route below.
admission_controller = AdmissionController()
 
def _admission_response(uid, allowed, message=None):
    """Build an ``admission.k8s.io/v1`` AdmissionReview response body."""
    response = {
        'uid': uid,
        'allowed': allowed
    }
    if message is not None:
        response['status'] = {
            'code': 403,
            'message': message
        }
    return {
        'apiVersion': 'admission.k8s.io/v1',
        'kind': 'AdmissionReview',
        'response': response
    }


@app.route('/validate', methods=['POST'])
def validate():
    """Kubernetes admission webhook endpoint.

    Expects an AdmissionReview JSON body. Pods and the workload kinds that
    embed a pod template are checked with ``admission_controller``; every
    other kind is allowed unchanged.

    Returns:
        An AdmissionReview response, or an HTTP 400 error when the body is
        not an AdmissionReview carrying ``request.uid``.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    admission_review = request.get_json(silent=True)
    request_obj = (
        admission_review.get('request')
        if isinstance(admission_review, dict) else None
    )
    if not isinstance(request_obj, dict) or 'uid' not in request_obj:
        return jsonify({
            'error': 'Request body must be an AdmissionReview with request.uid'
        }), 400

    uid = request_obj['uid']

    # Extract pod spec based on resource type. "or {}" covers keys that are
    # present but null (for example "object" on DELETE operations).
    kind = (request_obj.get('kind') or {}).get('kind')
    obj = request_obj.get('object') or {}

    if kind == 'Pod':
        pod_spec = obj.get('spec') or {}
        metadata = obj.get('metadata') or {}
    elif kind in ['Deployment', 'StatefulSet', 'DaemonSet', 'ReplicaSet']:
        template = (obj.get('spec') or {}).get('template') or {}
        pod_spec = template.get('spec') or {}
        metadata = template.get('metadata') or {}
    else:
        # Allow non-pod resources
        return jsonify(_admission_response(uid, True))

    allowed, violations = admission_controller.validate_pod(pod_spec, metadata)

    message = None
    if not allowed:
        message = 'Policy violations: ' + '; '.join(violations)

    return jsonify(_admission_response(uid, allowed, message))
 
# Start Flask's built-in server on all interfaces, port 8443, when run as a script.
# NOTE(review): ssl_context='adhoc' presumably makes the server generate a
# throwaway self-signed certificate at startup - confirm; the API server
# would need a CA bundle it can verify before trusting this webhook.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8443, ssl_context='adhoc')

Network Policy Generator

Create comprehensive network policies:

from dataclasses import dataclass
from typing import Optional
import yaml
 
@dataclass
class NetworkPolicyRule:
    """Plain record describing one network policy rule.

    Not referenced by any other code in this listing.
    """
    namespace: str  # presumably the namespace the rule applies to - TODO confirm
    pod_selector: dict  # presumably a label-selector mapping - TODO confirm
    ingress_from: Optional[list] = None  # optional; defaults to None
    egress_to: Optional[list] = None  # optional; defaults to None
    ports: Optional[list] = None  # optional; defaults to None
 
class NetworkPolicyGenerator:
    """Build ``networking.k8s.io/v1`` NetworkPolicy manifests as dicts.

    Args:
        namespace: Namespace written into every generated policy's metadata.
    """

    def __init__(self, namespace: str):
        self.namespace = namespace

    def generate_default_deny_all(self) -> dict:
        """Generate default deny-all policy.

        The empty ``podSelector`` selects every pod in the namespace and no
        ingress or egress rules are listed for either policy type.
        """
        return {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'default-deny-all',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Ingress', 'Egress']
            }
        }

    def generate_allow_dns(self) -> dict:
        """Allow DNS resolution for all pods.

        Permits egress on UDP and TCP port 53 to pods labelled
        ``k8s-app: kube-dns`` in the ``kube-system`` namespace.
        """
        return {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'allow-dns',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Egress'],
                'egress': [{
                    'to': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': 'kube-system'
                            }
                        },
                        'podSelector': {
                            'matchLabels': {
                                'k8s-app': 'kube-dns'
                            }
                        }
                    }],
                    'ports': [
                        {'protocol': 'UDP', 'port': 53},
                        {'protocol': 'TCP', 'port': 53}
                    ]
                }]
            }
        }

    @staticmethod
    def _build_peer(spec: dict, include_except: bool = False) -> Optional[dict]:
        """Translate one source/destination spec into a NetworkPolicy peer.

        Returns None when *spec* names neither a ``namespace`` nor a ``cidr``.
        """
        if spec.get('namespace'):
            return {
                'namespaceSelector': {
                    'matchLabels': {
                        'kubernetes.io/metadata.name': spec['namespace']
                    }
                },
                'podSelector': {
                    'matchLabels': spec.get('labels', {})
                }
            }
        if spec.get('cidr'):
            ip_block = {'cidr': spec['cidr']}
            if include_except:
                ip_block['except'] = spec.get('except', [])
            return {'ipBlock': ip_block}
        return None

    def generate_microservice_policy(
        self,
        service_name: str,
        allowed_ingress: list[dict],
        allowed_egress: list[dict],
        ingress_ports: list[int],
        egress_ports: list[dict]
    ) -> dict:
        """Generate policy for a microservice.

        Args:
            service_name: Value of the ``app`` label selecting the pods; also
                used to name the policy ``<service_name>-policy``.
            allowed_ingress: Sources, each with either ``namespace`` (plus
                optional ``labels``) or ``cidr``.
            allowed_egress: Destinations in the same form; ``cidr`` entries
                may add ``except`` and any entry may carry a ``service`` key.
            ingress_ports: TCP ports opened to every ingress source.
            egress_ports: ``{'service', 'port', 'protocol'}`` dicts; a port
                is attached to a destination whose ``service`` value matches.

        Returns:
            The NetworkPolicy manifest as a dict.

        Entries that name neither ``namespace`` nor ``cidr`` are skipped, and
        a rule is only emitted when it has at least one peer: an empty
        ``from``/``to`` list matches every peer, which would open the service
        to all traffic instead of restricting it.
        """
        policy = {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': f'{service_name}-policy',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {
                    'matchLabels': {
                        'app': service_name
                    }
                },
                'policyTypes': ['Ingress', 'Egress'],
                'ingress': [],
                'egress': []
            }
        }

        # Build ingress rules: one rule shared by all sources.
        ingress_peers = [
            peer
            for peer in (self._build_peer(source) for source in allowed_ingress or [])
            if peer is not None
        ]
        if ingress_peers:
            policy['spec']['ingress'].append({
                'from': ingress_peers,
                'ports': [{'port': p, 'protocol': 'TCP'} for p in ingress_ports]
            })

        # Build egress rules: one rule per destination so ports stay scoped
        # to the service they belong to.
        for dest in allowed_egress or []:
            peer = self._build_peer(dest, include_except=True)
            if peer is None:
                continue

            ports = [
                {
                    'port': port_spec['port'],
                    'protocol': port_spec.get('protocol', 'TCP')
                }
                for port_spec in egress_ports
                if port_spec.get('service') == dest.get('service')
            ]
            policy['spec']['egress'].append({'to': [peer], 'ports': ports})

        return policy

    def generate_namespace_isolation(self) -> list[dict]:
        """Generate complete namespace isolation policies.

        Returns the default-deny and allow-DNS policies plus one allowing
        ingress on TCP 9090 from pods labelled ``app: prometheus`` in the
        ``monitoring`` namespace.
        """
        policies = [
            self.generate_default_deny_all(),
            self.generate_allow_dns()
        ]

        # Allow monitoring namespace to scrape metrics
        policies.append({
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'allow-prometheus-scrape',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Ingress'],
                'ingress': [{
                    'from': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': 'monitoring'
                            }
                        },
                        'podSelector': {
                            'matchLabels': {
                                'app': 'prometheus'
                            }
                        }
                    }],
                    'ports': [{'port': 9090, 'protocol': 'TCP'}]
                }]
            }
        })

        return policies
 
# Example usage: every generated policy is placed in the 'production' namespace.
generator = NetworkPolicyGenerator('production')

# Generate policies for a typical 3-tier application.
# The 'service' key on each egress destination is matched against the
# 'service' key of the egress_ports entries to decide which ports it gets.
api_policy = generator.generate_microservice_policy(
    service_name='api-gateway',
    allowed_ingress=[
        {'namespace': 'ingress-nginx', 'labels': {'app': 'ingress-nginx'}},
        {'cidr': '10.0.0.0/8'}  # Internal load balancer
    ],
    allowed_egress=[
        {'namespace': 'production', 'labels': {'app': 'user-service'}, 'service': 'user-service'},
        {'namespace': 'production', 'labels': {'app': 'order-service'}, 'service': 'order-service'}
    ],
    ingress_ports=[8080, 8443],
    egress_ports=[
        {'service': 'user-service', 'port': 8080},
        {'service': 'order-service', 'port': 8080}
    ]
)

# Print the manifest as block-style YAML.
print(yaml.dump(api_policy, default_flow_style=False))

Runtime Security Monitor

Detect anomalous container behavior at runtime:

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import json
 
@dataclass
class SecurityEvent:
    """A rule match produced by RuntimeSecurityMonitor.process_event."""
    timestamp: datetime  # time the match was recorded (datetime.utcnow() in process_event)
    pod_name: str  # raw event's 'k8s.pod.name'; '' when absent
    namespace: str  # raw event's 'k8s.ns.name'; '' when absent
    container: str  # raw event's 'container.name'; '' when absent
    event_type: str  # 'name' of the rule that matched
    severity: str  # 'severity' of the rule that matched
    description: str  # 'description' of the rule that matched
    raw_data: dict  # the raw event dict the rule was evaluated against
 
class RuntimeSecurityMonitor:
    """Match syscall-style event dicts against rules and notify handlers.

    Rules are dicts with ``name``, ``condition`` (a callable taking the
    event dict), ``severity`` and ``description``. Handlers are awaited
    with a ``SecurityEvent`` for every rule that matches.
    """

    def __init__(self):
        self.rules = self._load_rules()
        self.event_handlers: list[Callable] = []
        self.baseline_behaviors: dict = {}

    def _load_rules(self) -> list[dict]:
        """Load Falco-style runtime rules."""
        return [
            {
                'name': 'Terminal shell in container',
                'condition': lambda e: e.get('syscall') == 'execve' and
                            e.get('proc.name') in ['bash', 'sh', 'zsh'],
                'severity': 'WARNING',
                'description': 'Interactive shell spawned in container'
            },
            {
                'name': 'Write below /etc',
                # O_RDWR opens the file for writing just as O_WRONLY does.
                'condition': lambda e: e.get('syscall') in ['open', 'openat'] and
                            e.get('fd.name', '').startswith('/etc/') and
                            any(flag in e.get('flags', '')
                                for flag in ('O_WRONLY', 'O_RDWR')),
                'severity': 'ERROR',
                'description': 'Write to /etc directory detected'
            },
            {
                'name': 'Sensitive file access',
                'condition': lambda e: e.get('fd.name') in [
                    '/etc/shadow', '/etc/passwd', '/etc/sudoers',
                    '/root/.ssh/id_rsa', '/root/.bash_history'
                ],
                'severity': 'CRITICAL',
                'description': 'Access to sensitive file detected'
            },
            {
                'name': 'Network tool execution',
                'condition': lambda e: e.get('proc.name') in [
                    'nc', 'netcat', 'ncat', 'nmap', 'tcpdump', 'wireshark'
                ],
                'severity': 'WARNING',
                'description': 'Network reconnaissance tool executed'
            },
            {
                'name': 'Privilege escalation attempt',
                'condition': lambda e: e.get('proc.name') in ['sudo', 'su', 'pkexec'] or
                            e.get('syscall') == 'setuid',
                'severity': 'CRITICAL',
                'description': 'Privilege escalation attempt detected'
            },
            {
                'name': 'Cryptocurrency miner indicators',
                'condition': lambda e: any(
                    indicator in str(e.get('proc.args', ''))
                    for indicator in ['stratum+tcp', 'xmrig', 'minerd', 'cryptonight']
                ),
                'severity': 'CRITICAL',
                'description': 'Cryptocurrency mining activity detected'
            },
            {
                'name': 'Container drift',
                # Only meaningful once a baseline exists; without this guard
                # every execve in an un-baselined pod is reported as drift.
                'condition': lambda e: e.get('syscall') == 'execve' and
                            'baseline_processes' in e and
                            e.get('proc.name') not in e['baseline_processes'],
                'severity': 'WARNING',
                'description': 'New process not in container baseline'
            },
            {
                'name': 'Outbound connection to unusual port',
                'condition': lambda e: e.get('syscall') == 'connect' and
                            e.get('fd.rport') not in [80, 443, 53, 8080, 8443, 5432, 3306, 6379],
                'severity': 'WARNING',
                'description': 'Outbound connection to non-standard port'
            }
        ]

    def add_handler(self, handler: Callable):
        """Add event handler for security events.

        The handler is awaited, so it must return an awaitable when called
        with a ``SecurityEvent``.
        """
        self.event_handlers.append(handler)

    def set_baseline(self, pod_name: str, baseline: dict):
        """Set behavioral baseline for a pod.

        Only the baseline's ``processes`` list is read by ``process_event``.
        """
        self.baseline_behaviors[pod_name] = baseline

    async def process_event(self, raw_event: dict):
        """Process a syscall event from audit log.

        Evaluates every rule against the event and awaits each registered
        handler once per matching rule. A failing rule or handler is
        reported with ``print`` and does not stop the remaining ones.
        """
        # Work on a shallow copy so enrichment never leaks into the caller's dict.
        event = dict(raw_event)

        # Enrich event with baseline data
        pod_name = event.get('k8s.pod.name', '')
        if pod_name in self.baseline_behaviors:
            event['baseline_processes'] = self.baseline_behaviors[pod_name].get('processes', [])

        # Check against rules
        for rule in self.rules:
            try:
                matched = rule['condition'](event)
            except Exception as e:
                # Rules run on arbitrary event dicts; one malformed field
                # must not abort evaluation of the other rules.
                print(f"Rule evaluation error: {e}")
                continue
            if not matched:
                continue

            security_event = SecurityEvent(
                # NOTE(review): utcnow() yields a naive datetime and is
                # deprecated since Python 3.12; an aware timestamp would
                # change the isoformat() text handlers emit, so it is kept.
                timestamp=datetime.utcnow(),
                pod_name=pod_name,
                namespace=event.get('k8s.ns.name', ''),
                container=event.get('container.name', ''),
                event_type=rule['name'],
                severity=rule['severity'],
                description=rule['description'],
                raw_data=event
            )

            # Notify handlers; isolate each so one failure cannot silence the rest.
            for handler in self.event_handlers:
                try:
                    await handler(security_event)
                except Exception as e:
                    print(f"Event handler error: {e}")

    async def start_monitoring(self, event_source):
        """Start processing events from source.

        Args:
            event_source: Async iterable yielding raw event dicts.
        """
        async for event in event_source:
            await self.process_event(event)
 
async def alert_handler(event: SecurityEvent):
    """Print a JSON-formatted alert for one security event.

    Forwarding to a SIEM or alerting system is not implemented here; the
    CRITICAL branch is a placeholder for an immediate-response hook.
    """
    payload = dict(
        timestamp=event.timestamp.isoformat(),
        severity=event.severity,
        pod=f"{event.namespace}/{event.pod_name}",
        container=event.container,
        event=event.event_type,
        description=event.description,
    )
    rendered = json.dumps(payload, indent=2)
    print("SECURITY ALERT: " + rendered)

    # In production, send to SIEM/alerting system
    needs_immediate_response = event.severity == 'CRITICAL'
    if needs_immediate_response:
        # Trigger immediate response
        pass
 
# Initialize monitor and register the printing alert handler
monitor = RuntimeSecurityMonitor()
monitor.add_handler(alert_handler)

# Set baseline for known good pod.
# Only 'processes' is read by process_event; 'network_ports' and
# 'file_paths' are stored but not consulted by any rule in this listing.
monitor.set_baseline('api-server-abc123', {
    'processes': ['node', 'npm', 'tini'],
    'network_ports': [8080],
    'file_paths': ['/app', '/tmp']
})

Pod Security Standards Validator

Validate pods against Kubernetes Pod Security Standards:

from enum import Enum
from typing import Optional
 
class PSSLevel(Enum):
    """Pod Security Standards profiles accepted by the validator."""
    PRIVILEGED = "privileged"
    BASELINE = "baseline"
    RESTRICTED = "restricted"


class PodSecurityStandardsValidator:
    """Check a pod spec against a Pod Security Standards profile.

    Args:
        level: Profile to enforce. PRIVILEGED accepts everything, BASELINE
            runs the baseline checks, RESTRICTED runs baseline plus the
            restricted checks.
    """

    # Capabilities a container may add under the baseline profile.
    _BASELINE_CAPABILITIES = (
        'AUDIT_WRITE', 'CHOWN', 'DAC_OVERRIDE', 'FOWNER',
        'FSETID', 'KILL', 'MKNOD', 'NET_BIND_SERVICE',
        'SETFCAP', 'SETGID', 'SETPCAP', 'SETUID', 'SYS_CHROOT',
    )
    # Volume sources permitted under the restricted profile.
    _RESTRICTED_VOLUME_TYPES = (
        'configMap', 'csi', 'downwardAPI', 'emptyDir', 'ephemeral',
        'persistentVolumeClaim', 'projected', 'secret',
    )
    # Seccomp profile types that satisfy the restricted profile.
    _RESTRICTED_SECCOMP_TYPES = ('RuntimeDefault', 'Localhost')

    def __init__(self, level: PSSLevel = PSSLevel.RESTRICTED):
        self.level = level

    def validate(self, pod_spec: dict) -> tuple[bool, list[str]]:
        """Validate pod spec against PSS level.

        Returns:
            ``(compliant, violations)``; ``compliant`` is True only when
            ``violations`` is empty.
        """
        if self.level == PSSLevel.PRIVILEGED:
            return True, []

        violations = []

        if self.level in [PSSLevel.BASELINE, PSSLevel.RESTRICTED]:
            violations.extend(self._check_baseline(pod_spec))

        if self.level == PSSLevel.RESTRICTED:
            violations.extend(self._check_restricted(pod_spec))

        return len(violations) == 0, violations

    def _check_baseline(self, pod_spec: dict) -> list[str]:
        """Check baseline level requirements."""
        violations = []

        # Host namespaces
        if pod_spec.get('hostNetwork'):
            violations.append("hostNetwork must be false")
        if pod_spec.get('hostPID'):
            violations.append("hostPID must be false")
        if pod_spec.get('hostIPC'):
            violations.append("hostIPC must be false")

        # HostPath volumes are a pod-level setting: report each one once,
        # independent of how many containers the pod has.
        for volume in pod_spec.get('volumes') or []:
            host_path = volume.get('hostPath')
            if host_path:
                path = host_path.get('path', '')
                violations.append(f"hostPath volume not allowed: {path}")

        pod_sc = pod_spec.get('securityContext') or {}
        pod_seccomp = pod_sc.get('seccompProfile') or {}

        for container in self._get_all_containers(pod_spec):
            name = container.get('name', 'unknown')
            sc = container.get('securityContext') or {}

            # Privileged
            if sc.get('privileged'):
                violations.append(f"{name}: privileged must be false")

            # Capabilities
            caps = sc.get('capabilities') or {}
            for cap in caps.get('add') or []:
                if cap not in self._BASELINE_CAPABILITIES:
                    violations.append(f"{name}: capability {cap} not allowed")

            # Host ports
            for port in container.get('ports') or []:
                if port.get('hostPort'):
                    violations.append(f"{name}: hostPort not allowed")

            # Proc mount
            if sc.get('procMount') and sc['procMount'] != 'Default':
                violations.append(f"{name}: procMount must be Default")

            # Seccomp: an unset profile is left to the restricted check.
            seccomp = sc.get('seccompProfile') or {}
            if seccomp.get('type') == 'Unconfined' or pod_seccomp.get('type') == 'Unconfined':
                violations.append(f"{name}: seccomp Unconfined not allowed")

        return violations

    def _check_restricted(self, pod_spec: dict) -> list[str]:
        """Check restricted level requirements."""
        violations = []

        pod_sc = pod_spec.get('securityContext') or {}
        pod_seccomp = pod_sc.get('seccompProfile') or {}

        for container in self._get_all_containers(pod_spec):
            name = container.get('name', 'unknown')
            sc = container.get('securityContext') or {}

            # Must run as non-root (container setting overrides the pod's)
            run_as_non_root = sc.get('runAsNonRoot', pod_sc.get('runAsNonRoot'))
            if run_as_non_root is not True:
                violations.append(f"{name}: runAsNonRoot must be true")

            # Run as user > 0
            run_as_user = sc.get('runAsUser', pod_sc.get('runAsUser'))
            if run_as_user is not None and run_as_user == 0:
                violations.append(f"{name}: runAsUser must not be 0")

            # Seccomp profile required at container or pod level
            seccomp = sc.get('seccompProfile') or {}
            valid_types = self._RESTRICTED_SECCOMP_TYPES
            if seccomp.get('type') not in valid_types and pod_seccomp.get('type') not in valid_types:
                violations.append(f"{name}: seccompProfile must be RuntimeDefault or Localhost")

            # Capabilities must drop ALL
            caps = sc.get('capabilities') or {}
            if 'ALL' not in (caps.get('drop') or []):
                violations.append(f"{name}: must drop ALL capabilities")

            # Only NET_BIND_SERVICE can be added
            for cap in caps.get('add') or []:
                if cap != 'NET_BIND_SERVICE':
                    violations.append(f"{name}: only NET_BIND_SERVICE can be added")

            # Privilege escalation must be disabled explicitly
            if sc.get('allowPrivilegeEscalation') is not False:
                violations.append(f"{name}: allowPrivilegeEscalation must be false")

        # Volume types restricted; every key except 'name' is a volume source
        for volume in pod_spec.get('volumes') or []:
            for vol_type in volume:
                if vol_type != 'name' and vol_type not in self._RESTRICTED_VOLUME_TYPES:
                    violations.append(f"Volume type {vol_type} not allowed")

        return violations

    def _get_all_containers(self, pod_spec: dict) -> list[dict]:
        """Get all containers including init and ephemeral."""
        containers = pod_spec.get('containers') or []
        init_containers = pod_spec.get('initContainers') or []
        ephemeral_containers = pod_spec.get('ephemeralContainers') or []
        return containers + init_containers + ephemeral_containers
 
# Example usage: validate against the strictest profile
validator = PodSecurityStandardsValidator(PSSLevel.RESTRICTED)

# runAsNonRoot and the seccomp profile are set once at pod level; the
# validator falls back to these when a container does not set its own.
pod_spec = {
    'securityContext': {
        'runAsNonRoot': True,
        'seccompProfile': {'type': 'RuntimeDefault'}
    },
    'containers': [{
        'name': 'app',
        'image': 'myapp:1.0',
        'securityContext': {
            'allowPrivilegeEscalation': False,
            'readOnlyRootFilesystem': True,
            'capabilities': {
                'drop': ['ALL']
            }
        }
    }]
}

# This spec satisfies every check above, so the summary line reports True
# and no violation lines follow.
passed, violations = validator.validate(pod_spec)
print(f"PSS Restricted Compliant: {passed}")
for v in violations:
    print(f"  - {v}")

Conclusion

Container security in Kubernetes requires multiple layers of defense. Implement image scanning during CI/CD, enforce policies at admission time, apply network segmentation, and monitor runtime behavior. Use Pod Security Standards to establish baseline security postures and regularly audit your clusters for compliance. Remember that security is an ongoing process: continuously update your policies as new threats emerge and your applications evolve.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.