Container security in Kubernetes requires a defense-in-depth approach spanning image builds, deployment admission, runtime protection, and network segmentation. This guide provides reference implementations for each of these layers that you can adapt to harden your Kubernetes workloads.
Image Security Scanner
Scan container images for vulnerabilities before deployment:
import subprocess
import json
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class Severity(Enum):
CRITICAL = "CRITICAL"
HIGH = "HIGH"
MEDIUM = "MEDIUM"
LOW = "LOW"
UNKNOWN = "UNKNOWN"
@dataclass
class Vulnerability:
id: str
package: str
installed_version: str
fixed_version: Optional[str]
severity: Severity
title: str
description: str
@dataclass
class ScanResult:
image: str
vulnerabilities: list[Vulnerability]
passed: bool
critical_count: int
high_count: int
class ContainerSecurityScanner:
def __init__(self, config: dict):
self.config = config
self.severity_threshold = config.get('severity_threshold', 'HIGH')
self.max_critical = config.get('max_critical', 0)
self.max_high = config.get('max_high', 5)
self.ignore_unfixed = config.get('ignore_unfixed', False)
def scan_image(self, image: str) -> ScanResult:
"""Scan container image using Trivy."""
cmd = [
'trivy', 'image',
'--format', 'json',
'--severity', 'CRITICAL,HIGH,MEDIUM,LOW',
image
]
if self.ignore_unfixed:
cmd.append('--ignore-unfixed')
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
    raise RuntimeError(f"Trivy scan failed for {image}: {result.stderr.strip()}")
scan_data = json.loads(result.stdout)
vulnerabilities = []
critical_count = 0
high_count = 0
for result_item in scan_data.get('Results', []):
for vuln in result_item.get('Vulnerabilities', []):
severity = Severity(vuln.get('Severity', 'UNKNOWN'))
vulnerability = Vulnerability(
id=vuln['VulnerabilityID'],
package=vuln['PkgName'],
installed_version=vuln['InstalledVersion'],
fixed_version=vuln.get('FixedVersion'),
severity=severity,
title=vuln.get('Title', ''),
description=vuln.get('Description', '')
)
vulnerabilities.append(vulnerability)
if severity == Severity.CRITICAL:
critical_count += 1
elif severity == Severity.HIGH:
high_count += 1
passed = (
critical_count <= self.max_critical and
high_count <= self.max_high
)
return ScanResult(
image=image,
vulnerabilities=vulnerabilities,
passed=passed,
critical_count=critical_count,
high_count=high_count
)
def generate_sbom(self, image: str) -> dict:
"""Generate Software Bill of Materials for image."""
cmd = [
'syft', image,
'-o', 'cyclonedx-json'
]
result = subprocess.run(cmd, capture_output=True, text=True)
return json.loads(result.stdout)
def check_base_image(self, dockerfile_path: str) -> dict:
"""Analyze base image security."""
with open(dockerfile_path, 'r') as f:
content = f.read()
issues = []
# Check for latest tag
if ':latest' in content or ('FROM ' in content and ':' not in content.split('FROM ')[1].split()[0]):
issues.append({
'severity': 'HIGH',
'message': 'Using latest or untagged base image - pin to specific version'
})
# Check for root user
if 'USER' not in content:
issues.append({
'severity': 'MEDIUM',
'message': 'No USER instruction - container may run as root'
})
# Check for COPY vs ADD
if 'ADD ' in content and 'http' not in content:
issues.append({
'severity': 'LOW',
'message': 'Use COPY instead of ADD for local files'
})
# Check for secrets in build
secret_patterns = ['PASSWORD', 'SECRET', 'API_KEY', 'TOKEN']
for pattern in secret_patterns:
if f'ENV {pattern}' in content or f'ARG {pattern}' in content:
issues.append({
'severity': 'CRITICAL',
'message': f'Potential secret in Dockerfile: {pattern}'
})
return {
'dockerfile': dockerfile_path,
'issues': issues,
'passed': not any(i['severity'] == 'CRITICAL' for i in issues)
}
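A minimal sketch of how the scanner might gate a CI pipeline, assuming Trivy (and Syft, for SBOMs) is installed on the build agent; the image reference and thresholds below are illustrative:

# Hypothetical CI gate: block deployment when an image exceeds the vulnerability thresholds
scanner = ContainerSecurityScanner({
    'max_critical': 0,      # illustrative thresholds
    'max_high': 5,
    'ignore_unfixed': True
})
result = scanner.scan_image('ghcr.io/my-org/myapp:1.2.3')  # hypothetical image reference
print(f"{result.image}: critical={result.critical_count}, high={result.high_count}")
if not result.passed:
    raise SystemExit("Image failed vulnerability policy - blocking deployment")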
Kubernetes Admission Controller
Implement policy enforcement at admission time:
from flask import Flask, request, jsonify
import base64
import json
from typing import Optional
app = Flask(__name__)
class AdmissionController:
def __init__(self):
self.policies = self._load_policies()
def _load_policies(self) -> dict:
return {
'require_resource_limits': True,
'require_security_context': True,
'deny_privileged': True,
'deny_host_network': True,
'deny_host_pid': True,
'allowed_registries': [
'gcr.io/my-project',
'docker.io/library',
'ghcr.io/my-org'
],
'required_labels': ['app', 'owner', 'environment'],
'max_cpu_limit': '4',
'max_memory_limit': '8Gi',
'deny_latest_tag': True
}
def validate_pod(self, pod_spec: dict, metadata: dict) -> tuple[bool, list[str]]:
"""Validate pod against security policies."""
violations = []
# Check required labels
labels = metadata.get('labels', {})
for required_label in self.policies['required_labels']:
if required_label not in labels:
violations.append(f"Missing required label: {required_label}")
containers = pod_spec.get('containers', [])
init_containers = pod_spec.get('initContainers', [])
all_containers = containers + init_containers
for container in all_containers:
name = container.get('name', 'unknown')
# Check image registry
image = container.get('image', '')
if self.policies['allowed_registries']:
allowed = any(
image.startswith(reg)
for reg in self.policies['allowed_registries']
)
if not allowed:
violations.append(
f"Container {name}: Image from unauthorized registry: {image}"
)
# Check for latest tag
if self.policies['deny_latest_tag']:
if ':latest' in image or ':' not in image.split('/')[-1]:
violations.append(
f"Container {name}: Using latest or untagged image"
)
# Check resource limits
if self.policies['require_resource_limits']:
resources = container.get('resources', {})
limits = resources.get('limits', {})
requests = resources.get('requests', {})
if not limits.get('cpu') or not limits.get('memory'):
violations.append(
f"Container {name}: Missing resource limits"
)
if not requests.get('cpu') or not requests.get('memory'):
violations.append(
f"Container {name}: Missing resource requests"
)
# Check security context
security_context = container.get('securityContext', {})
if self.policies['require_security_context']:
if security_context.get('runAsNonRoot') is not True:
violations.append(
f"Container {name}: Must set runAsNonRoot: true"
)
if security_context.get('readOnlyRootFilesystem') is not True:
violations.append(
f"Container {name}: Must set readOnlyRootFilesystem: true"
)
if security_context.get('allowPrivilegeEscalation') is not False:
violations.append(
f"Container {name}: Must set allowPrivilegeEscalation: false"
)
if self.policies['deny_privileged']:
if security_context.get('privileged'):
violations.append(
f"Container {name}: Privileged containers not allowed"
)
# Check pod-level security
if self.policies['deny_host_network']:
if pod_spec.get('hostNetwork'):
violations.append("Host network access not allowed")
if self.policies['deny_host_pid']:
if pod_spec.get('hostPID'):
violations.append("Host PID namespace not allowed")
return len(violations) == 0, violations
admission_controller = AdmissionController()
@app.route('/validate', methods=['POST'])
def validate():
"""Kubernetes admission webhook endpoint."""
admission_review = request.get_json()
request_obj = admission_review['request']
uid = request_obj['uid']
# Extract pod spec based on resource type
kind = request_obj['kind']['kind']
obj = request_obj.get('object', {})
if kind == 'Pod':
pod_spec = obj.get('spec', {})
metadata = obj.get('metadata', {})
elif kind in ['Deployment', 'StatefulSet', 'DaemonSet', 'ReplicaSet']:
pod_spec = obj.get('spec', {}).get('template', {}).get('spec', {})
metadata = obj.get('spec', {}).get('template', {}).get('metadata', {})
else:
# Allow non-pod resources
return jsonify({
'apiVersion': 'admission.k8s.io/v1',
'kind': 'AdmissionReview',
'response': {
'uid': uid,
'allowed': True
}
})
allowed, violations = admission_controller.validate_pod(pod_spec, metadata)
response = {
'apiVersion': 'admission.k8s.io/v1',
'kind': 'AdmissionReview',
'response': {
'uid': uid,
'allowed': allowed
}
}
if not allowed:
response['response']['status'] = {
'code': 403,
'message': 'Policy violations: ' + '; '.join(violations)
}
return jsonify(response)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8443, ssl_context='adhoc')
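Before registering the webhook in a cluster (which requires a ValidatingWebhookConfiguration whose caBundle trusts the serving certificate, rather than the ad-hoc TLS context shown above), the policy logic can be exercised directly. A minimal sketch with an illustrative pod spec and labels:

# Hypothetical local check of a pod spec against the same policies the webhook enforces
sample_spec = {
    'containers': [{
        'name': 'api',
        'image': 'ghcr.io/my-org/api:1.4.2',  # illustrative image in an allowed registry
        'resources': {
            'limits': {'cpu': '500m', 'memory': '512Mi'},
            'requests': {'cpu': '250m', 'memory': '256Mi'}
        },
        'securityContext': {
            'runAsNonRoot': True,
            'readOnlyRootFilesystem': True,
            'allowPrivilegeEscalation': False
        }
    }]
}
sample_metadata = {'labels': {'app': 'api', 'owner': 'platform-team', 'environment': 'production'}}
allowed, violations = admission_controller.validate_pod(sample_spec, sample_metadata)
print(f"Allowed: {allowed}")
for violation in violations:
    print(f"  - {violation}")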
Network Policy Generator
Create comprehensive network policies:
from dataclasses import dataclass
from typing import Optional
import yaml
@dataclass
class NetworkPolicyRule:
namespace: str
pod_selector: dict
ingress_from: Optional[list] = None
egress_to: Optional[list] = None
ports: Optional[list] = None
class NetworkPolicyGenerator:
def __init__(self, namespace: str):
self.namespace = namespace
def generate_default_deny_all(self) -> dict:
"""Generate default deny-all policy."""
return {
'apiVersion': 'networking.k8s.io/v1',
'kind': 'NetworkPolicy',
'metadata': {
'name': 'default-deny-all',
'namespace': self.namespace
},
'spec': {
'podSelector': {},
'policyTypes': ['Ingress', 'Egress']
}
}
def generate_allow_dns(self) -> dict:
"""Allow DNS resolution for all pods."""
return {
'apiVersion': 'networking.k8s.io/v1',
'kind': 'NetworkPolicy',
'metadata': {
'name': 'allow-dns',
'namespace': self.namespace
},
'spec': {
'podSelector': {},
'policyTypes': ['Egress'],
'egress': [{
'to': [{
'namespaceSelector': {
'matchLabels': {
'kubernetes.io/metadata.name': 'kube-system'
}
},
'podSelector': {
'matchLabels': {
'k8s-app': 'kube-dns'
}
}
}],
'ports': [
{'protocol': 'UDP', 'port': 53},
{'protocol': 'TCP', 'port': 53}
]
}]
}
}
def generate_microservice_policy(
self,
service_name: str,
allowed_ingress: list[dict],
allowed_egress: list[dict],
ingress_ports: list[int],
egress_ports: list[dict]
) -> dict:
"""Generate policy for a microservice."""
policy = {
'apiVersion': 'networking.k8s.io/v1',
'kind': 'NetworkPolicy',
'metadata': {
'name': f'{service_name}-policy',
'namespace': self.namespace
},
'spec': {
'podSelector': {
'matchLabels': {
'app': service_name
}
},
'policyTypes': ['Ingress', 'Egress'],
'ingress': [],
'egress': []
}
}
# Build ingress rules
if allowed_ingress:
ingress_rule = {
'from': [],
'ports': [{'port': p, 'protocol': 'TCP'} for p in ingress_ports]
}
for source in allowed_ingress:
if source.get('namespace'):
ingress_rule['from'].append({
'namespaceSelector': {
'matchLabels': {
'kubernetes.io/metadata.name': source['namespace']
}
},
'podSelector': {
'matchLabels': source.get('labels', {})
}
})
elif source.get('cidr'):
ingress_rule['from'].append({
'ipBlock': {
'cidr': source['cidr']
}
})
policy['spec']['ingress'].append(ingress_rule)
# Build egress rules
if allowed_egress:
for dest in allowed_egress:
egress_rule = {'to': [], 'ports': []}
if dest.get('namespace'):
egress_rule['to'].append({
'namespaceSelector': {
'matchLabels': {
'kubernetes.io/metadata.name': dest['namespace']
}
},
'podSelector': {
'matchLabels': dest.get('labels', {})
}
})
elif dest.get('cidr'):
egress_rule['to'].append({
'ipBlock': {
'cidr': dest['cidr'],
'except': dest.get('except', [])
}
})
for port_spec in egress_ports:
if port_spec.get('service') == dest.get('service'):
egress_rule['ports'].append({
'port': port_spec['port'],
'protocol': port_spec.get('protocol', 'TCP')
})
policy['spec']['egress'].append(egress_rule)
return policy
def generate_namespace_isolation(self) -> list[dict]:
"""Generate complete namespace isolation policies."""
policies = [
self.generate_default_deny_all(),
self.generate_allow_dns()
]
# Allow monitoring namespace to scrape metrics
policies.append({
'apiVersion': 'networking.k8s.io/v1',
'kind': 'NetworkPolicy',
'metadata': {
'name': 'allow-prometheus-scrape',
'namespace': self.namespace
},
'spec': {
'podSelector': {},
'policyTypes': ['Ingress'],
'ingress': [{
'from': [{
'namespaceSelector': {
'matchLabels': {
'kubernetes.io/metadata.name': 'monitoring'
}
},
'podSelector': {
'matchLabels': {
'app': 'prometheus'
}
}
}],
'ports': [{'port': 9090, 'protocol': 'TCP'}]
}]
}
})
return policies
# Example usage
generator = NetworkPolicyGenerator('production')
# Generate policies for a typical 3-tier application
api_policy = generator.generate_microservice_policy(
service_name='api-gateway',
allowed_ingress=[
{'namespace': 'ingress-nginx', 'labels': {'app': 'ingress-nginx'}},
{'cidr': '10.0.0.0/8'} # Internal load balancer
],
allowed_egress=[
{'namespace': 'production', 'labels': {'app': 'user-service'}, 'service': 'user-service'},
{'namespace': 'production', 'labels': {'app': 'order-service'}, 'service': 'order-service'}
],
ingress_ports=[8080, 8443],
egress_ports=[
{'service': 'user-service', 'port': 8080},
{'service': 'order-service', 'port': 8080}
]
)
print(yaml.dump(api_policy, default_flow_style=False))
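The baseline isolation policies can be rendered the same way as a single multi-document manifest; a short sketch, with an illustrative output path:

# Write the namespace isolation policies (deny-all, DNS, Prometheus scrape) as one manifest
isolation_policies = generator.generate_namespace_isolation()
with open('production-isolation.yaml', 'w') as f:  # hypothetical output path
    yaml.dump_all(isolation_policies, f, default_flow_style=False)
print(f"Wrote {len(isolation_policies)} policies to production-isolation.yaml")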
Runtime Security Monitor
Detect anomalous container behavior at runtime:
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Callable
import json
@dataclass
class SecurityEvent:
timestamp: datetime
pod_name: str
namespace: str
container: str
event_type: str
severity: str
description: str
raw_data: dict
class RuntimeSecurityMonitor:
def __init__(self):
self.rules = self._load_rules()
self.event_handlers: list[Callable] = []
self.baseline_behaviors: dict = {}
def _load_rules(self) -> list[dict]:
"""Load Falco-style runtime rules."""
return [
{
'name': 'Terminal shell in container',
'condition': lambda e: e.get('syscall') == 'execve' and
e.get('proc.name') in ['bash', 'sh', 'zsh'],
'severity': 'WARNING',
'description': 'Interactive shell spawned in container'
},
{
'name': 'Write below /etc',
'condition': lambda e: e.get('syscall') in ['open', 'openat'] and
e.get('fd.name', '').startswith('/etc/') and
'O_WRONLY' in e.get('flags', ''),
'severity': 'ERROR',
'description': 'Write to /etc directory detected'
},
{
'name': 'Sensitive file access',
'condition': lambda e: e.get('fd.name') in [
'/etc/shadow', '/etc/passwd', '/etc/sudoers',
'/root/.ssh/id_rsa', '/root/.bash_history'
],
'severity': 'CRITICAL',
'description': 'Access to sensitive file detected'
},
{
'name': 'Network tool execution',
'condition': lambda e: e.get('proc.name') in [
'nc', 'netcat', 'ncat', 'nmap', 'tcpdump', 'wireshark'
],
'severity': 'WARNING',
'description': 'Network reconnaissance tool executed'
},
{
'name': 'Privilege escalation attempt',
'condition': lambda e: e.get('proc.name') in ['sudo', 'su', 'pkexec'] or
e.get('syscall') == 'setuid',
'severity': 'CRITICAL',
'description': 'Privilege escalation attempt detected'
},
{
'name': 'Cryptocurrency miner indicators',
'condition': lambda e: any(
indicator in str(e.get('proc.args', ''))
for indicator in ['stratum+tcp', 'xmrig', 'minerd', 'cryptonight']
),
'severity': 'CRITICAL',
'description': 'Cryptocurrency mining activity detected'
},
{
'name': 'Container drift',
'condition': lambda e: e.get('syscall') == 'execve' and
e.get('proc.name') not in e.get('baseline_processes', []),
'severity': 'WARNING',
'description': 'New process not in container baseline'
},
{
'name': 'Outbound connection to unusual port',
'condition': lambda e: e.get('syscall') == 'connect' and
e.get('fd.rport') not in [80, 443, 53, 8080, 8443, 5432, 3306, 6379],
'severity': 'WARNING',
'description': 'Outbound connection to non-standard port'
}
]
def add_handler(self, handler: Callable):
"""Add event handler for security events."""
self.event_handlers.append(handler)
def set_baseline(self, pod_name: str, baseline: dict):
"""Set behavioral baseline for a pod."""
self.baseline_behaviors[pod_name] = baseline
async def process_event(self, raw_event: dict):
"""Process a syscall event from audit log."""
# Enrich event with baseline data
pod_name = raw_event.get('k8s.pod.name', '')
if pod_name in self.baseline_behaviors:
raw_event['baseline_processes'] = self.baseline_behaviors[pod_name].get('processes', [])
# Check against rules
for rule in self.rules:
try:
if rule['condition'](raw_event):
security_event = SecurityEvent(
timestamp=datetime.now(timezone.utc),
pod_name=pod_name,
namespace=raw_event.get('k8s.ns.name', ''),
container=raw_event.get('container.name', ''),
event_type=rule['name'],
severity=rule['severity'],
description=rule['description'],
raw_data=raw_event
)
# Notify handlers
for handler in self.event_handlers:
await handler(security_event)
except Exception as e:
print(f"Rule evaluation error: {e}")
async def start_monitoring(self, event_source):
"""Start processing events from source."""
async for event in event_source:
await self.process_event(event)
async def alert_handler(event: SecurityEvent):
"""Send alerts for security events."""
alert = {
'timestamp': event.timestamp.isoformat(),
'severity': event.severity,
'pod': f"{event.namespace}/{event.pod_name}",
'container': event.container,
'event': event.event_type,
'description': event.description
}
print(f"SECURITY ALERT: {json.dumps(alert, indent=2)}")
# In production, send to SIEM/alerting system
if event.severity == 'CRITICAL':
# Trigger immediate response
pass
# Initialize monitor
monitor = RuntimeSecurityMonitor()
monitor.add_handler(alert_handler)
# Set baseline for known good pod
monitor.set_baseline('api-server-abc123', {
'processes': ['node', 'npm', 'tini'],
'network_ports': [8080],
'file_paths': ['/app', '/tmp']
})
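In a real cluster the event stream would come from an agent such as Falco or an eBPF collector; the sketch below drives the monitor with synthetic events from a hand-rolled async generator to show the rule matching end to end:

# Feed the monitor synthetic syscall events to demonstrate rule matching
async def mock_event_source():
    events = [
        {   # interactive shell in the baselined pod - matches two WARNING rules (shell, drift)
            'syscall': 'execve', 'proc.name': 'bash',
            'k8s.pod.name': 'api-server-abc123', 'k8s.ns.name': 'production',
            'container.name': 'api'
        },
        {   # read of /etc/shadow - matches the CRITICAL sensitive-file rule
            'syscall': 'openat', 'fd.name': '/etc/shadow',
            'k8s.pod.name': 'api-server-abc123', 'k8s.ns.name': 'production',
            'container.name': 'api'
        }
    ]
    for event in events:
        yield event

asyncio.run(monitor.start_monitoring(mock_event_source()))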
Pod Security Standards Validator
Validate pods against Kubernetes Pod Security Standards:
from enum import Enum
from typing import Optional
class PSSLevel(Enum):
PRIVILEGED = "privileged"
BASELINE = "baseline"
RESTRICTED = "restricted"
class PodSecurityStandardsValidator:
def __init__(self, level: PSSLevel = PSSLevel.RESTRICTED):
self.level = level
def validate(self, pod_spec: dict) -> tuple[bool, list[str]]:
"""Validate pod spec against PSS level."""
if self.level == PSSLevel.PRIVILEGED:
return True, []
violations = []
if self.level in [PSSLevel.BASELINE, PSSLevel.RESTRICTED]:
violations.extend(self._check_baseline(pod_spec))
if self.level == PSSLevel.RESTRICTED:
violations.extend(self._check_restricted(pod_spec))
return len(violations) == 0, violations
def _check_baseline(self, pod_spec: dict) -> list[str]:
"""Check baseline level requirements."""
violations = []
# Host namespaces
if pod_spec.get('hostNetwork'):
violations.append("hostNetwork must be false")
if pod_spec.get('hostPID'):
violations.append("hostPID must be false")
if pod_spec.get('hostIPC'):
violations.append("hostIPC must be false")
for container in self._get_all_containers(pod_spec):
name = container.get('name', 'unknown')
sc = container.get('securityContext', {})
# Privileged
if sc.get('privileged'):
violations.append(f"{name}: privileged must be false")
# Capabilities
caps = sc.get('capabilities', {})
add_caps = caps.get('add', [])
allowed_caps = ['AUDIT_WRITE', 'CHOWN', 'DAC_OVERRIDE', 'FOWNER',
'FSETID', 'KILL', 'MKNOD', 'NET_BIND_SERVICE',
'SETFCAP', 'SETGID', 'SETPCAP', 'SETUID', 'SYS_CHROOT']
for cap in add_caps:
if cap not in allowed_caps:
violations.append(f"{name}: capability {cap} not allowed")
# HostPath volumes
for volume in pod_spec.get('volumes', []):
if volume.get('hostPath'):
path = volume['hostPath'].get('path', '')
violations.append(f"hostPath volume not allowed: {path}")
# Host ports
for port in container.get('ports', []):
if port.get('hostPort'):
violations.append(f"{name}: hostPort not allowed")
# Proc mount
if sc.get('procMount') and sc['procMount'] != 'Default':
violations.append(f"{name}: procMount must be Default")
# Seccomp
seccomp = sc.get('seccompProfile', {})
pod_seccomp = pod_spec.get('securityContext', {}).get('seccompProfile', {})
if not seccomp and not pod_seccomp:
pass # Will be checked in restricted
elif seccomp.get('type') == 'Unconfined' or pod_seccomp.get('type') == 'Unconfined':
violations.append(f"{name}: seccomp Unconfined not allowed")
return violations
def _check_restricted(self, pod_spec: dict) -> list[str]:
"""Check restricted level requirements."""
violations = []
pod_sc = pod_spec.get('securityContext', {})
for container in self._get_all_containers(pod_spec):
name = container.get('name', 'unknown')
sc = container.get('securityContext', {})
# Must run as non-root
run_as_non_root = sc.get('runAsNonRoot', pod_sc.get('runAsNonRoot'))
if run_as_non_root is not True:
violations.append(f"{name}: runAsNonRoot must be true")
# Run as user > 0
run_as_user = sc.get('runAsUser', pod_sc.get('runAsUser'))
if run_as_user is not None and run_as_user == 0:
violations.append(f"{name}: runAsUser must not be 0")
# Seccomp profile required
seccomp = sc.get('seccompProfile', {})
pod_seccomp = pod_sc.get('seccompProfile', {})
valid_types = ['RuntimeDefault', 'Localhost']
if seccomp.get('type') not in valid_types and pod_seccomp.get('type') not in valid_types:
violations.append(f"{name}: seccompProfile must be RuntimeDefault or Localhost")
# Capabilities must drop ALL
caps = sc.get('capabilities', {})
drop_caps = caps.get('drop', [])
if 'ALL' not in drop_caps:
violations.append(f"{name}: must drop ALL capabilities")
# Only NET_BIND_SERVICE can be added
add_caps = caps.get('add', [])
for cap in add_caps:
if cap != 'NET_BIND_SERVICE':
violations.append(f"{name}: only NET_BIND_SERVICE can be added")
# Privilege escalation
if sc.get('allowPrivilegeEscalation') is not False:
violations.append(f"{name}: allowPrivilegeEscalation must be false")
# Volume types restricted
allowed_volume_types = [
'configMap', 'csi', 'downwardAPI', 'emptyDir', 'ephemeral',
'persistentVolumeClaim', 'projected', 'secret'
]
for volume in pod_spec.get('volumes', []):
for vol_type in volume.keys():
if vol_type != 'name' and vol_type not in allowed_volume_types:
violations.append(f"Volume type {vol_type} not allowed")
return violations
def _get_all_containers(self, pod_spec: dict) -> list[dict]:
"""Get all containers including init and ephemeral."""
containers = pod_spec.get('containers', [])
init_containers = pod_spec.get('initContainers', [])
ephemeral_containers = pod_spec.get('ephemeralContainers', [])
return containers + init_containers + ephemeral_containers
# Example usage
validator = PodSecurityStandardsValidator(PSSLevel.RESTRICTED)
pod_spec = {
'securityContext': {
'runAsNonRoot': True,
'seccompProfile': {'type': 'RuntimeDefault'}
},
'containers': [{
'name': 'app',
'image': 'myapp:1.0',
'securityContext': {
'allowPrivilegeEscalation': False,
'readOnlyRootFilesystem': True,
'capabilities': {
'drop': ['ALL']
}
}
}]
}
passed, violations = validator.validate(pod_spec)
print(f"PSS Restricted Compliant: {passed}")
for v in violations:
print(f"  - {v}")
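For contrast, a spec that runs as root and omits the seccomp profile, capability drops, and privilege-escalation settings fails several restricted checks; the spec below is illustrative:

# Illustrative non-compliant spec: triggers runAsNonRoot, runAsUser, seccomp,
# capabilities, and allowPrivilegeEscalation violations
bad_spec = {
    'containers': [{
        'name': 'legacy',
        'image': 'legacy-app:2.1',
        'securityContext': {'runAsUser': 0}
    }]
}
passed, violations = validator.validate(bad_spec)
print(f"PSS Restricted Compliant: {passed}")  # expected: False
for v in violations:
    print(f"  - {v}")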
Conclusion
Container security in Kubernetes requires multiple layers of defense. Implement image scanning during CI/CD, enforce policies at admission time, apply network segmentation, and monitor runtime behavior. Use Pod Security Standards to establish baseline security postures, and regularly audit your clusters for compliance. Security is an ongoing process: continuously update your policies as new threats emerge and your applications evolve.