DevSecOps

Securitate containere: bune practici Kubernetes

Nicu Constantin
--13 min lectura
#kubernetes#container-security#devsecops#docker#pod-security

Securitatea containerelor in Kubernetes necesita o abordare de aparare in profunzime care acopera construirea imaginilor, admiterea la deployment, protectia runtime si segmentarea retelei. Acest ghid ofera implementari gata de productie pentru securizarea aplicatiilor Kubernetes.

Scanner de securitate pentru imagini

Scaneaza imaginile de container pentru vulnerabilitati inainte de deployment:

import subprocess
import json
from dataclasses import dataclass
from typing import Optional
from enum import Enum
 
class Severity(Enum):
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    UNKNOWN = "UNKNOWN"
 
@dataclass
class Vulnerability:
    id: str
    package: str
    installed_version: str
    fixed_version: Optional[str]
    severity: Severity
    title: str
    description: str
 
@dataclass
class ScanResult:
    image: str
    vulnerabilities: list[Vulnerability]
    passed: bool
    critical_count: int
    high_count: int
 
class ContainerSecurityScanner:
    def __init__(self, config: dict):
        self.config = config
        self.severity_threshold = config.get('severity_threshold', 'HIGH')
        self.max_critical = config.get('max_critical', 0)
        self.max_high = config.get('max_high', 5)
        self.ignore_unfixed = config.get('ignore_unfixed', False)
 
    def scan_image(self, image: str) -> ScanResult:
        """Scaneaza imaginea de container folosind Trivy."""
        cmd = [
            'trivy', 'image',
            '--format', 'json',
            '--severity', 'CRITICAL,HIGH,MEDIUM,LOW',
            image
        ]
 
        if self.ignore_unfixed:
            cmd.append('--ignore-unfixed')
 
        result = subprocess.run(cmd, capture_output=True, text=True)
        scan_data = json.loads(result.stdout)
 
        vulnerabilities = []
        critical_count = 0
        high_count = 0
 
        for result_item in scan_data.get('Results', []):
            for vuln in result_item.get('Vulnerabilities', []):
                severity = Severity(vuln.get('Severity', 'UNKNOWN'))
 
                vulnerability = Vulnerability(
                    id=vuln['VulnerabilityID'],
                    package=vuln['PkgName'],
                    installed_version=vuln['InstalledVersion'],
                    fixed_version=vuln.get('FixedVersion'),
                    severity=severity,
                    title=vuln.get('Title', ''),
                    description=vuln.get('Description', '')
                )
                vulnerabilities.append(vulnerability)
 
                if severity == Severity.CRITICAL:
                    critical_count += 1
                elif severity == Severity.HIGH:
                    high_count += 1
 
        passed = (
            critical_count <= self.max_critical and
            high_count <= self.max_high
        )
 
        return ScanResult(
            image=image,
            vulnerabilities=vulnerabilities,
            passed=passed,
            critical_count=critical_count,
            high_count=high_count
        )
 
    def generate_sbom(self, image: str) -> dict:
        """Genereaza Software Bill of Materials pentru imagine."""
        cmd = [
            'syft', image,
            '-o', 'cyclonedx-json'
        ]
 
        result = subprocess.run(cmd, capture_output=True, text=True)
        return json.loads(result.stdout)
 
    def check_base_image(self, dockerfile_path: str) -> dict:
        """Analizeaza securitatea imaginii de baza."""
        with open(dockerfile_path, 'r') as f:
            content = f.read()
 
        issues = []
 
        # Verifica tag-ul latest
        if ':latest' in content or 'FROM ' in content and ':' not in content.split('FROM ')[1].split()[0]:
            issues.append({
                'severity': 'HIGH',
                'message': 'Foloseste imagine de baza latest sau fara tag - fixeaza la o versiune specifica'
            })
 
        # Verifica utilizatorul root
        if 'USER' not in content:
            issues.append({
                'severity': 'MEDIUM',
                'message': 'Lipseste instructiunea USER - containerul poate rula ca root'
            })
 
        # Verifica COPY vs ADD
        if 'ADD ' in content and 'http' not in content:
            issues.append({
                'severity': 'LOW',
                'message': 'Foloseste COPY in loc de ADD pentru fisiere locale'
            })
 
        # Verifica secrete in build
        secret_patterns = ['PASSWORD', 'SECRET', 'API_KEY', 'TOKEN']
        for pattern in secret_patterns:
            if f'ENV {pattern}' in content or f'ARG {pattern}' in content:
                issues.append({
                    'severity': 'CRITICAL',
                    'message': f'Potential secret in Dockerfile: {pattern}'
                })
 
        return {
            'dockerfile': dockerfile_path,
            'issues': issues,
            'passed': not any(i['severity'] == 'CRITICAL' for i in issues)
        }

Admission Controller pentru Kubernetes

Implementeaza aplicarea politicilor la momentul admiterii:

from flask import Flask, request, jsonify
import base64
import json
from typing import Optional
 
app = Flask(__name__)
 
class AdmissionController:
    def __init__(self):
        self.policies = self._load_policies()
 
    def _load_policies(self) -> dict:
        return {
            'require_resource_limits': True,
            'require_security_context': True,
            'deny_privileged': True,
            'deny_host_network': True,
            'deny_host_pid': True,
            'allowed_registries': [
                'gcr.io/my-project',
                'docker.io/library',
                'ghcr.io/my-org'
            ],
            'required_labels': ['app', 'owner', 'environment'],
            'max_cpu_limit': '4',
            'max_memory_limit': '8Gi',
            'deny_latest_tag': True
        }
 
    def validate_pod(self, pod_spec: dict, metadata: dict) -> tuple[bool, list[str]]:
        """Valideaza pod-ul conform politicilor de securitate."""
        violations = []
 
        # Verifica etichetele obligatorii
        labels = metadata.get('labels', {})
        for required_label in self.policies['required_labels']:
            if required_label not in labels:
                violations.append(f"Eticheta obligatorie lipsa: {required_label}")
 
        containers = pod_spec.get('containers', [])
        init_containers = pod_spec.get('initContainers', [])
        all_containers = containers + init_containers
 
        for container in all_containers:
            name = container.get('name', 'unknown')
 
            # Verifica registrul imaginii
            image = container.get('image', '')
            if self.policies['allowed_registries']:
                allowed = any(
                    image.startswith(reg)
                    for reg in self.policies['allowed_registries']
                )
                if not allowed:
                    violations.append(
                        f"Container {name}: Imagine din registru neautorizat: {image}"
                    )
 
            # Verifica tag-ul latest
            if self.policies['deny_latest_tag']:
                if ':latest' in image or ':' not in image.split('/')[-1]:
                    violations.append(
                        f"Container {name}: Foloseste imagine latest sau fara tag"
                    )
 
            # Verifica limitele de resurse
            if self.policies['require_resource_limits']:
                resources = container.get('resources', {})
                limits = resources.get('limits', {})
                requests = resources.get('requests', {})
 
                if not limits.get('cpu') or not limits.get('memory'):
                    violations.append(
                        f"Container {name}: Lipsesc limitele de resurse"
                    )
 
                if not requests.get('cpu') or not requests.get('memory'):
                    violations.append(
                        f"Container {name}: Lipsesc cererile de resurse"
                    )
 
            # Verifica contextul de securitate
            security_context = container.get('securityContext', {})
 
            if self.policies['require_security_context']:
                if security_context.get('runAsNonRoot') is not True:
                    violations.append(
                        f"Container {name}: Trebuie setat runAsNonRoot: true"
                    )
 
                if security_context.get('readOnlyRootFilesystem') is not True:
                    violations.append(
                        f"Container {name}: Trebuie setat readOnlyRootFilesystem: true"
                    )
 
                if security_context.get('allowPrivilegeEscalation') is not False:
                    violations.append(
                        f"Container {name}: Trebuie setat allowPrivilegeEscalation: false"
                    )
 
            if self.policies['deny_privileged']:
                if security_context.get('privileged'):
                    violations.append(
                        f"Container {name}: Containerele privilegiate nu sunt permise"
                    )
 
        # Verifica securitatea la nivel de pod
        if self.policies['deny_host_network']:
            if pod_spec.get('hostNetwork'):
                violations.append("Accesul la reteaua gazdei nu este permis")
 
        if self.policies['deny_host_pid']:
            if pod_spec.get('hostPID'):
                violations.append("Namespace-ul PID al gazdei nu este permis")
 
        return len(violations) == 0, violations
 
admission_controller = AdmissionController()
 
@app.route('/validate', methods=['POST'])
def validate():
    """Endpoint webhook de admitere Kubernetes."""
    admission_review = request.get_json()
 
    request_obj = admission_review['request']
    uid = request_obj['uid']
 
    # Extrage specificatia pod-ului in functie de tipul resursei
    kind = request_obj['kind']['kind']
    obj = request_obj.get('object', {})
 
    if kind == 'Pod':
        pod_spec = obj.get('spec', {})
        metadata = obj.get('metadata', {})
    elif kind in ['Deployment', 'StatefulSet', 'DaemonSet', 'ReplicaSet']:
        pod_spec = obj.get('spec', {}).get('template', {}).get('spec', {})
        metadata = obj.get('spec', {}).get('template', {}).get('metadata', {})
    else:
        # Permite resursele care nu sunt pod-uri
        return jsonify({
            'apiVersion': 'admission.k8s.io/v1',
            'kind': 'AdmissionReview',
            'response': {
                'uid': uid,
                'allowed': True
            }
        })
 
    allowed, violations = admission_controller.validate_pod(pod_spec, metadata)
 
    response = {
        'apiVersion': 'admission.k8s.io/v1',
        'kind': 'AdmissionReview',
        'response': {
            'uid': uid,
            'allowed': allowed
        }
    }
 
    if not allowed:
        response['response']['status'] = {
            'code': 403,
            'message': 'Incalcari de politici: ' + '; '.join(violations)
        }
 
    return jsonify(response)
 
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8443, ssl_context='adhoc')

Generator de politici de retea

Creeaza politici de retea complete:

from dataclasses import dataclass
from typing import Optional
import yaml
 
@dataclass
class NetworkPolicyRule:
    namespace: str
    pod_selector: dict
    ingress_from: Optional[list] = None
    egress_to: Optional[list] = None
    ports: Optional[list] = None
 
class NetworkPolicyGenerator:
    def __init__(self, namespace: str):
        self.namespace = namespace
 
    def generate_default_deny_all(self) -> dict:
        """Genereaza politica implicita de blocare totala."""
        return {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'default-deny-all',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Ingress', 'Egress']
            }
        }
 
    def generate_allow_dns(self) -> dict:
        """Permite rezolutia DNS pentru toate pod-urile."""
        return {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'allow-dns',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Egress'],
                'egress': [{
                    'to': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': 'kube-system'
                            }
                        },
                        'podSelector': {
                            'matchLabels': {
                                'k8s-app': 'kube-dns'
                            }
                        }
                    }],
                    'ports': [
                        {'protocol': 'UDP', 'port': 53},
                        {'protocol': 'TCP', 'port': 53}
                    ]
                }]
            }
        }
 
    def generate_microservice_policy(
        self,
        service_name: str,
        allowed_ingress: list[dict],
        allowed_egress: list[dict],
        ingress_ports: list[int],
        egress_ports: list[dict]
    ) -> dict:
        """Genereaza politica pentru un microserviciu."""
        policy = {
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': f'{service_name}-policy',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {
                    'matchLabels': {
                        'app': service_name
                    }
                },
                'policyTypes': ['Ingress', 'Egress'],
                'ingress': [],
                'egress': []
            }
        }
 
        # Construieste regulile de intrare
        if allowed_ingress:
            ingress_rule = {
                'from': [],
                'ports': [{'port': p, 'protocol': 'TCP'} for p in ingress_ports]
            }
 
            for source in allowed_ingress:
                if source.get('namespace'):
                    ingress_rule['from'].append({
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': source['namespace']
                            }
                        },
                        'podSelector': {
                            'matchLabels': source.get('labels', {})
                        }
                    })
                elif source.get('cidr'):
                    ingress_rule['from'].append({
                        'ipBlock': {
                            'cidr': source['cidr']
                        }
                    })
 
            policy['spec']['ingress'].append(ingress_rule)
 
        # Construieste regulile de iesire
        if allowed_egress:
            for dest in allowed_egress:
                egress_rule = {'to': [], 'ports': []}
 
                if dest.get('namespace'):
                    egress_rule['to'].append({
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': dest['namespace']
                            }
                        },
                        'podSelector': {
                            'matchLabels': dest.get('labels', {})
                        }
                    })
                elif dest.get('cidr'):
                    egress_rule['to'].append({
                        'ipBlock': {
                            'cidr': dest['cidr'],
                            'except': dest.get('except', [])
                        }
                    })
 
                for port_spec in egress_ports:
                    if port_spec.get('service') == dest.get('service'):
                        egress_rule['ports'].append({
                            'port': port_spec['port'],
                            'protocol': port_spec.get('protocol', 'TCP')
                        })
 
                policy['spec']['egress'].append(egress_rule)
 
        return policy
 
    def generate_namespace_isolation(self) -> list[dict]:
        """Genereaza politici complete de izolare a namespace-ului."""
        policies = [
            self.generate_default_deny_all(),
            self.generate_allow_dns()
        ]
 
        # Permite namespace-ului de monitorizare sa colecteze metrici
        policies.append({
            'apiVersion': 'networking.k8s.io/v1',
            'kind': 'NetworkPolicy',
            'metadata': {
                'name': 'allow-prometheus-scrape',
                'namespace': self.namespace
            },
            'spec': {
                'podSelector': {},
                'policyTypes': ['Ingress'],
                'ingress': [{
                    'from': [{
                        'namespaceSelector': {
                            'matchLabels': {
                                'kubernetes.io/metadata.name': 'monitoring'
                            }
                        },
                        'podSelector': {
                            'matchLabels': {
                                'app': 'prometheus'
                            }
                        }
                    }],
                    'ports': [{'port': 9090, 'protocol': 'TCP'}]
                }]
            }
        })
 
        return policies
 
# Exemplu de utilizare
generator = NetworkPolicyGenerator('production')
 
# Genereaza politici pentru o aplicatie tipica cu 3 niveluri
api_policy = generator.generate_microservice_policy(
    service_name='api-gateway',
    allowed_ingress=[
        {'namespace': 'ingress-nginx', 'labels': {'app': 'ingress-nginx'}},
        {'cidr': '10.0.0.0/8'}  # Load balancer intern
    ],
    allowed_egress=[
        {'namespace': 'production', 'labels': {'app': 'user-service'}, 'service': 'user-service'},
        {'namespace': 'production', 'labels': {'app': 'order-service'}, 'service': 'order-service'}
    ],
    ingress_ports=[8080, 8443],
    egress_ports=[
        {'service': 'user-service', 'port': 8080},
        {'service': 'order-service', 'port': 8080}
    ]
)
 
print(yaml.dump(api_policy, default_flow_style=False))

Monitor de securitate runtime

Detecteaza comportamentul anormal al containerelor la runtime:

import asyncio
from dataclasses import dataclass
from datetime import datetime
from typing import Callable
import json
 
@dataclass
class SecurityEvent:
    timestamp: datetime
    pod_name: str
    namespace: str
    container: str
    event_type: str
    severity: str
    description: str
    raw_data: dict
 
class RuntimeSecurityMonitor:
    def __init__(self):
        self.rules = self._load_rules()
        self.event_handlers: list[Callable] = []
        self.baseline_behaviors: dict = {}
 
    def _load_rules(self) -> list[dict]:
        """Incarca reguli runtime in stil Falco."""
        return [
            {
                'name': 'Terminal shell in container',
                'condition': lambda e: e.get('syscall') == 'execve' and
                            e.get('proc.name') in ['bash', 'sh', 'zsh'],
                'severity': 'WARNING',
                'description': 'Shell interactiv pornit intr-un container'
            },
            {
                'name': 'Write below /etc',
                'condition': lambda e: e.get('syscall') in ['open', 'openat'] and
                            e.get('fd.name', '').startswith('/etc/') and
                            'O_WRONLY' in e.get('flags', ''),
                'severity': 'ERROR',
                'description': 'Scriere in directorul /etc detectata'
            },
            {
                'name': 'Sensitive file access',
                'condition': lambda e: e.get('fd.name') in [
                    '/etc/shadow', '/etc/passwd', '/etc/sudoers',
                    '/root/.ssh/id_rsa', '/root/.bash_history'
                ],
                'severity': 'CRITICAL',
                'description': 'Acces la fisier sensibil detectat'
            },
            {
                'name': 'Network tool execution',
                'condition': lambda e: e.get('proc.name') in [
                    'nc', 'netcat', 'ncat', 'nmap', 'tcpdump', 'wireshark'
                ],
                'severity': 'WARNING',
                'description': 'Instrument de recunoastere a retelei executat'
            },
            {
                'name': 'Privilege escalation attempt',
                'condition': lambda e: e.get('proc.name') in ['sudo', 'su', 'pkexec'] or
                            e.get('syscall') == 'setuid',
                'severity': 'CRITICAL',
                'description': 'Tentativa de escaladare a privilegiilor detectata'
            },
            {
                'name': 'Cryptocurrency miner indicators',
                'condition': lambda e: any(
                    indicator in str(e.get('proc.args', ''))
                    for indicator in ['stratum+tcp', 'xmrig', 'minerd', 'cryptonight']
                ),
                'severity': 'CRITICAL',
                'description': 'Activitate de crypto mining detectata'
            },
            {
                'name': 'Container drift',
                'condition': lambda e: e.get('syscall') == 'execve' and
                            e.get('proc.name') not in e.get('baseline_processes', []),
                'severity': 'WARNING',
                'description': 'Proces nou care nu este in baseline-ul containerului'
            },
            {
                'name': 'Outbound connection to unusual port',
                'condition': lambda e: e.get('syscall') == 'connect' and
                            e.get('fd.rport') not in [80, 443, 53, 8080, 8443, 5432, 3306, 6379],
                'severity': 'WARNING',
                'description': 'Conexiune de iesire catre port non-standard'
            }
        ]
 
    def add_handler(self, handler: Callable):
        """Adauga handler de evenimente pentru evenimente de securitate."""
        self.event_handlers.append(handler)
 
    def set_baseline(self, pod_name: str, baseline: dict):
        """Seteaza baseline comportamental pentru un pod."""
        self.baseline_behaviors[pod_name] = baseline
 
    async def process_event(self, raw_event: dict):
        """Proceseaza un eveniment syscall din jurnalul de audit."""
        # Imbogateste evenimentul cu date baseline
        pod_name = raw_event.get('k8s.pod.name', '')
        if pod_name in self.baseline_behaviors:
            raw_event['baseline_processes'] = self.baseline_behaviors[pod_name].get('processes', [])
 
        # Verifica regulile
        for rule in self.rules:
            try:
                if rule['condition'](raw_event):
                    security_event = SecurityEvent(
                        timestamp=datetime.utcnow(),
                        pod_name=pod_name,
                        namespace=raw_event.get('k8s.ns.name', ''),
                        container=raw_event.get('container.name', ''),
                        event_type=rule['name'],
                        severity=rule['severity'],
                        description=rule['description'],
                        raw_data=raw_event
                    )
 
                    # Notifica handler-ele
                    for handler in self.event_handlers:
                        await handler(security_event)
            except Exception as e:
                print(f"Eroare la evaluarea regulii: {e}")
 
    async def start_monitoring(self, event_source):
        """Incepe procesarea evenimentelor din sursa."""
        async for event in event_source:
            await self.process_event(event)
 
async def alert_handler(event: SecurityEvent):
    """Trimite alerte pentru evenimente de securitate."""
    alert = {
        'timestamp': event.timestamp.isoformat(),
        'severity': event.severity,
        'pod': f"{event.namespace}/{event.pod_name}",
        'container': event.container,
        'event': event.event_type,
        'description': event.description
    }
 
    print(f"ALERTA DE SECURITATE: {json.dumps(alert, indent=2)}")
 
    # In productie, trimite catre SIEM/sistem de alertare
    if event.severity == 'CRITICAL':
        # Declanseaza raspuns imediat
        pass
 
# Initializeaza monitorul
monitor = RuntimeSecurityMonitor()
monitor.add_handler(alert_handler)
 
# Seteaza baseline pentru un pod cunoscut
monitor.set_baseline('api-server-abc123', {
    'processes': ['node', 'npm', 'tini'],
    'network_ports': [8080],
    'file_paths': ['/app', '/tmp']
})

Validator de standarde de securitate pentru pod-uri

Valideaza pod-urile conform standardelor Kubernetes Pod Security Standards:

from enum import Enum
from typing import Optional
 
class PSSLevel(Enum):
    PRIVILEGED = "privileged"
    BASELINE = "baseline"
    RESTRICTED = "restricted"
 
class PodSecurityStandardsValidator:
    def __init__(self, level: PSSLevel = PSSLevel.RESTRICTED):
        self.level = level
 
    def validate(self, pod_spec: dict) -> tuple[bool, list[str]]:
        """Valideaza specificatia pod-ului conform nivelului PSS."""
        if self.level == PSSLevel.PRIVILEGED:
            return True, []
 
        violations = []
 
        if self.level in [PSSLevel.BASELINE, PSSLevel.RESTRICTED]:
            violations.extend(self._check_baseline(pod_spec))
 
        if self.level == PSSLevel.RESTRICTED:
            violations.extend(self._check_restricted(pod_spec))
 
        return len(violations) == 0, violations
 
    def _check_baseline(self, pod_spec: dict) -> list[str]:
        """Verifica cerintele nivelului baseline."""
        violations = []
 
        # Namespace-uri gazda
        if pod_spec.get('hostNetwork'):
            violations.append("hostNetwork trebuie sa fie false")
        if pod_spec.get('hostPID'):
            violations.append("hostPID trebuie sa fie false")
        if pod_spec.get('hostIPC'):
            violations.append("hostIPC trebuie sa fie false")
 
        for container in self._get_all_containers(pod_spec):
            name = container.get('name', 'unknown')
            sc = container.get('securityContext', {})
 
            # Privilegiat
            if sc.get('privileged'):
                violations.append(f"{name}: privileged trebuie sa fie false")
 
            # Capabilitati
            caps = sc.get('capabilities', {})
            add_caps = caps.get('add', [])
 
            allowed_caps = ['AUDIT_WRITE', 'CHOWN', 'DAC_OVERRIDE', 'FOWNER',
                          'FSETID', 'KILL', 'MKNOD', 'NET_BIND_SERVICE',
                          'SETFCAP', 'SETGID', 'SETPCAP', 'SETUID', 'SYS_CHROOT']
 
            for cap in add_caps:
                if cap not in allowed_caps:
                    violations.append(f"{name}: capabilitatea {cap} nu este permisa")
 
            # Volume HostPath
            for volume in pod_spec.get('volumes', []):
                if volume.get('hostPath'):
                    path = volume['hostPath'].get('path', '')
                    violations.append(f"Volumul hostPath nu este permis: {path}")
 
            # Porturi gazda
            for port in container.get('ports', []):
                if port.get('hostPort'):
                    violations.append(f"{name}: hostPort nu este permis")
 
            # Montare proc
            if sc.get('procMount') and sc['procMount'] != 'Default':
                violations.append(f"{name}: procMount trebuie sa fie Default")
 
            # Seccomp
            seccomp = sc.get('seccompProfile', {})
            pod_seccomp = pod_spec.get('securityContext', {}).get('seccompProfile', {})
 
            if not seccomp and not pod_seccomp:
                pass  # Va fi verificat in restricted
            elif seccomp.get('type') == 'Unconfined' or pod_seccomp.get('type') == 'Unconfined':
                violations.append(f"{name}: seccomp Unconfined nu este permis")
 
        return violations
 
    def _check_restricted(self, pod_spec: dict) -> list[str]:
        """Verifica cerintele nivelului restricted."""
        violations = []
 
        pod_sc = pod_spec.get('securityContext', {})
 
        for container in self._get_all_containers(pod_spec):
            name = container.get('name', 'unknown')
            sc = container.get('securityContext', {})
 
            # Trebuie sa ruleze ca non-root
            run_as_non_root = sc.get('runAsNonRoot', pod_sc.get('runAsNonRoot'))
            if run_as_non_root is not True:
                violations.append(f"{name}: runAsNonRoot trebuie sa fie true")
 
            # Ruleaza ca utilizator > 0
            run_as_user = sc.get('runAsUser', pod_sc.get('runAsUser'))
            if run_as_user is not None and run_as_user == 0:
                violations.append(f"{name}: runAsUser nu trebuie sa fie 0")
 
            # Profil seccomp obligatoriu
            seccomp = sc.get('seccompProfile', {})
            pod_seccomp = pod_sc.get('seccompProfile', {})
 
            valid_types = ['RuntimeDefault', 'Localhost']
            if seccomp.get('type') not in valid_types and pod_seccomp.get('type') not in valid_types:
                violations.append(f"{name}: seccompProfile trebuie sa fie RuntimeDefault sau Localhost")
 
            # Capabilitatile trebuie sa elimine ALL
            caps = sc.get('capabilities', {})
            drop_caps = caps.get('drop', [])
 
            if 'ALL' not in drop_caps:
                violations.append(f"{name}: trebuie sa elimine ALL capabilities")
 
            # Doar NET_BIND_SERVICE poate fi adaugata
            add_caps = caps.get('add', [])
            for cap in add_caps:
                if cap != 'NET_BIND_SERVICE':
                    violations.append(f"{name}: doar NET_BIND_SERVICE poate fi adaugata")
 
            # Escaladarea privilegiilor
            if sc.get('allowPrivilegeEscalation') is not False:
                violations.append(f"{name}: allowPrivilegeEscalation trebuie sa fie false")
 
        # Tipuri de volume restrictionate
        allowed_volume_types = [
            'configMap', 'csi', 'downwardAPI', 'emptyDir', 'ephemeral',
            'persistentVolumeClaim', 'projected', 'secret'
        ]
 
        for volume in pod_spec.get('volumes', []):
            for vol_type in volume.keys():
                if vol_type != 'name' and vol_type not in allowed_volume_types:
                    violations.append(f"Tipul de volum {vol_type} nu este permis")
 
        return violations
 
    def _get_all_containers(self, pod_spec: dict) -> list[dict]:
        """Obtine toate containerele inclusiv init si efemere."""
        containers = pod_spec.get('containers', [])
        init_containers = pod_spec.get('initContainers', [])
        ephemeral_containers = pod_spec.get('ephemeralContainers', [])
        return containers + init_containers + ephemeral_containers
 
# Exemplu de utilizare
validator = PodSecurityStandardsValidator(PSSLevel.RESTRICTED)
 
pod_spec = {
    'securityContext': {
        'runAsNonRoot': True,
        'seccompProfile': {'type': 'RuntimeDefault'}
    },
    'containers': [{
        'name': 'app',
        'image': 'myapp:1.0',
        'securityContext': {
            'allowPrivilegeEscalation': False,
            'readOnlyRootFilesystem': True,
            'capabilities': {
                'drop': ['ALL']
            }
        }
    }]
}
 
passed, violations = validator.validate(pod_spec)
print(f"Conformitate PSS Restricted: {passed}")
for v in violations:
    print(f"  - {v}")

Concluzie

Securitatea containerelor in Kubernetes necesita mai multe straturi de aparare. Implementeaza scanarea imaginilor in CI/CD, aplica politici la admitere, implementeaza segmentarea retelei si monitorizeaza comportamentul la runtime. Foloseste standardele Pod Security Standards pentru a stabili posturi de securitate de baza si auditeaza regulat clusterele pentru conformitate. Tine minte ca securitatea este un proces continuu - actualizeaza-ti constant politicile pe masura ce apar amenintari noi si aplicatiile tale evolueaza.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.