DevSecOps

Bune Practici de Management al Secretelor pentru Echipe DevSecOps

Nicu Constantin
— 7 min de lectura
#secrets-management #vault #devsecops #security #credential-management

Managementul corect al secretelor este fundamental pentru livrarea sigura a software-ului. Acest ghid acopera bunele practici pentru gestionarea credentialelor, cheilor API, certificatelor si altor date sensibile in mediile de dezvoltare, CI/CD si productie.

Arhitectura Managementului Secretelor

Proiecteaza un sistem centralizat de management al secretelor:

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from enum import Enum
import hashlib
import json
import os
 
class SecretType(Enum):
    """Closed set of secret categories understood by the secrets tooling.

    The member value is the plain string tag that gets stored in a secret's
    metadata under the ``type`` key and is used to rebuild the member when
    the secret is read back.
    """

    # Opaque API tokens.
    API_KEY = "api_key"
    # JSON documents carrying database login details.
    DATABASE_CREDENTIAL = "database_credential"
    # PEM-style SSH key material.
    SSH_KEY = "ssh_key"
    # TLS certificates.
    TLS_CERTIFICATE = "tls_certificate"
    # Symmetric or asymmetric encryption keys.
    ENCRYPTION_KEY = "encryption_key"
    # Service account credentials.
    SERVICE_ACCOUNT = "service_account"
 
@dataclass
class Secret:
    """A single version of a stored secret together with its bookkeeping data.

    ``value`` and ``metadata`` are deliberately left out of the generated
    ``repr`` so that logging or printing a ``Secret`` (or a traceback that
    includes one) cannot leak the sensitive material. Both remain ordinary
    attributes and still take part in equality comparison.
    """

    # Path / identifier of the secret in the backend.
    name: str
    # Category of the secret.
    secret_type: SecretType
    # The sensitive payload itself; hidden from repr().
    value: str = field(repr=False)
    # Backend version number of this value.
    version: int
    # When this version was created.
    created_at: datetime
    # When the secret stops being valid, or None for no expiry.
    expires_at: Optional[datetime]
    # Extra key/value data stored with the secret. Hidden from repr() because
    # a backend may place the raw payload (including the value) here.
    metadata: Dict[str, Any] = field(repr=False)
 
@dataclass
class SecretAccessLog:
    """One audit record describing a single attempt to read or write a secret."""

    # Moment the access attempt was recorded.
    timestamp: datetime
    # Path / name of the secret that was touched.
    secret_name: str
    # Identity of the caller performing the access.
    accessor: str
    # Kind of operation, e.g. 'read' or 'write'.
    action: str
    # Origin of the request as reported by the caller.
    source_ip: str
    # Whether the operation succeeded.
    success: bool
 
class SecretsManager:
    """Facade over a secrets backend that validates writes and audits every access.

    The backend is any object exposing ``get_secret(path, version)`` and
    ``set_secret(path, value, metadata)``. Every read and write attempt,
    successful or not, is appended to ``access_logs`` and echoed to stdout
    as an ``AUDIT: {json}`` line.
    """

    def __init__(self, backend):
        self.backend = backend
        self.access_logs: List[SecretAccessLog] = []

    def get_secret(
        self,
        path: str,
        accessor: str,
        source_ip: str,
        version: Optional[int] = None
    ) -> Optional[Secret]:
        """Fetch a secret and record the attempt in the audit log.

        Args:
            path: Backend path of the secret.
            accessor: Identity of the caller, recorded for audit.
            source_ip: Origin of the request, recorded for audit.
            version: Specific version to read; ``None`` lets the backend choose.

        Returns:
            The secret, or ``None`` when the backend returned nothing or the
            secret has expired. Both of those cases are audited as failed
            reads because no secret was handed to the caller.

        Raises:
            Exception: Whatever the backend raises is re-raised after a
                failed-read audit record has been written.
        """
        try:
            secret = self.backend.get_secret(path, version)
        except Exception:
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='read',
                source_ip=source_ip,
                success=False
            )
            raise

        delivered = secret is not None and not self._is_expired(secret)

        # Log after the expiry check so the audit trail reflects whether the
        # caller actually received the secret.
        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='read',
            source_ip=source_ip,
            success=delivered
        )

        return secret if delivered else None

    def set_secret(
        self,
        path: str,
        value: str,
        secret_type: SecretType,
        accessor: str,
        source_ip: str,
        ttl_hours: Optional[int] = None
    ) -> Secret:
        """Validate and store a secret, auditing both success and failure.

        Args:
            path: Backend path to write to.
            value: The secret payload.
            secret_type: Category used to pick the validation rule.
            accessor: Identity of the caller, recorded for audit and stored
                in the secret's metadata as ``created_by``.
            source_ip: Origin of the request, recorded for audit.
            ttl_hours: Lifetime in hours. ``None`` or ``0`` means no expiry.

        Returns:
            Whatever the backend's ``set_secret`` returns.

        Raises:
            ValueError: If ``value`` fails validation for ``secret_type``.
            Exception: Any backend error, re-raised after being audited.
        """
        try:
            self._validate_secret(value, secret_type)

            expires_at = None
            if ttl_hours:
                # Stored as a naive UTC timestamp.
                expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)

            metadata = {
                'type': secret_type.value,
                'created_by': accessor,
                'expires_at': expires_at.isoformat() if expires_at else None
            }

            secret = self.backend.set_secret(path, value, metadata)
        except Exception:
            # Rejected or failed writes are just as relevant to an audit as
            # successful ones.
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='write',
                source_ip=source_ip,
                success=False
            )
            raise

        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='write',
            source_ip=source_ip,
            success=True
        )

        return secret

    @staticmethod
    def _is_expired(secret) -> bool:
        """Return True when ``secret.expires_at`` lies in the past.

        Naive timestamps are compared against naive UTC "now" (the form
        ``set_secret`` produces); timezone-aware timestamps are compared
        against "now" in their own zone so the comparison never mixes naive
        and aware values.
        """
        expires_at = secret.expires_at
        if expires_at is None:
            return False
        if expires_at.tzinfo is None:
            now = datetime.utcnow()
        else:
            now = datetime.now(expires_at.tzinfo)
        return expires_at < now

    def _validate_secret(self, value: str, secret_type: SecretType):
        """Check that ``value`` has the shape expected for ``secret_type``.

        Only API keys, database credentials and SSH keys are checked; other
        types are accepted as-is.

        Raises:
            ValueError: If the value does not satisfy the rule for its type
                (including malformed JSON for database credentials).
        """
        if secret_type == SecretType.API_KEY:
            if len(value) < 20:
                raise ValueError("Cheia API trebuie sa aiba cel putin 20 de caractere")

        elif secret_type == SecretType.DATABASE_CREDENTIAL:
            cred = json.loads(value)
            # Valid JSON that is not an object (a number, list, ...) must be
            # rejected with the same ValueError rather than a TypeError.
            if (
                not isinstance(cred, dict)
                or 'username' not in cred
                or 'password' not in cred
            ):
                raise ValueError("Credentialele bazei de date trebuie sa contina username si password")

        elif secret_type == SecretType.SSH_KEY:
            if not value.startswith('-----BEGIN'):
                raise ValueError("Format invalid de cheie SSH")

    def _log_access(
        self,
        secret_name: str,
        accessor: str,
        action: str,
        source_ip: str,
        success: bool
    ):
        """Append an audit record and print it as a JSON line.

        The secret's value is never part of the record; only its name is.
        """
        log = SecretAccessLog(
            timestamp=datetime.utcnow(),
            secret_name=secret_name,
            accessor=accessor,
            action=action,
            source_ip=source_ip,
            success=success
        )
        self.access_logs.append(log)

        # Built outside the f-string: a multi-line expression inside a
        # single-quoted f-string only parses on Python 3.12+.
        payload = {
            'timestamp': log.timestamp.isoformat(),
            'secret': log.secret_name,
            'accessor': log.accessor,
            'action': log.action,
            'source_ip': log.source_ip,
            'success': log.success
        }
        print(f"AUDIT: {json.dumps(payload)}")

Integrare HashiCorp Vault

Implementeaza Vault ca backend pentru secrete:

import hvac
from typing import Optional, Dict
 
class VaultBackend:
    """Secrets backend that stores secrets in a HashiCorp Vault KV v2 engine.

    Secrets are written as a flat key/value document whose ``value`` key holds
    the payload; any metadata passed to ``set_secret`` is stored beside it.
    """

    def __init__(self, vault_addr: str, auth_method: str = 'token', **kwargs):
        """Create the hvac client and authenticate.

        Args:
            vault_addr: URL of the Vault server.
            auth_method: One of ``'token'``, ``'approle'`` or ``'kubernetes'``.
            **kwargs: Credentials for the chosen method (``token``;
                ``role_id`` and ``secret_id``; or ``role``).
        """
        self.client = hvac.Client(url=vault_addr)
        self._authenticate(auth_method, **kwargs)
        self.mount_point = 'secret'

    def _authenticate(self, method: str, **kwargs):
        """Authenticate the client against Vault.

        Raises:
            ValueError: If ``method`` is not a supported auth method.
            KeyError: If a credential required by the method is missing.
            Exception: If Vault does not consider the client authenticated.
        """
        if method == 'token':
            self.client.token = kwargs.get('token')
        elif method == 'approle':
            response = self.client.auth.approle.login(
                role_id=kwargs['role_id'],
                secret_id=kwargs['secret_id']
            )
            self.client.token = response['auth']['client_token']
        elif method == 'kubernetes':
            with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
                # Strip so a trailing newline in the token file is not sent.
                jwt = f.read().strip()
            response = self.client.auth.kubernetes.login(
                role=kwargs['role'],
                jwt=jwt
            )
            self.client.token = response['auth']['client_token']
        else:
            # Fail loudly on a typo instead of reporting a generic
            # authentication failure.
            raise ValueError(f"Metoda de autentificare necunoscuta: {method}")

        if not self.client.is_authenticated():
            raise Exception("Autentificarea la Vault a esuat")

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Parse an ISO-8601 timestamp, tolerating ``Z`` and long fractions.

        Fractional seconds are cut or padded to six digits because
        ``datetime.fromisoformat`` on Python < 3.11 rejects anything else
        (Vault reports nanosecond precision).
        """
        text = raw.replace('Z', '+00:00')
        head, dot, tail = text.partition('.')
        if dot:
            digits = 0
            while digits < len(tail) and tail[digits].isdigit():
                digits += 1
            fraction, suffix = tail[:digits], tail[digits:]
            text = f"{head}.{fraction[:6].ljust(6, '0')}{suffix}"
        return datetime.fromisoformat(text)

    def get_secret(self, path: str, version: Optional[int] = None) -> Secret:
        """Read a secret from Vault KV v2.

        Args:
            path: Path of the secret below the mount point.
            version: Version to read; a falsy value reads the latest one.

        Returns:
            A ``Secret`` built from the stored document. ``expires_at`` is
            taken from the document's ``expires_at`` key when present and is
            always returned as a naive UTC datetime.

        Raises:
            ValueError: If the stored ``type`` is not a known ``SecretType``
                or a stored timestamp cannot be parsed.
        """
        params = {'path': path, 'mount_point': self.mount_point}
        if version:
            params['version'] = version
        response = self.client.secrets.kv.v2.read_secret_version(**params)

        envelope = response['data']
        payload = envelope['data']
        vault_meta = envelope['metadata']

        expires_at = None
        raw_expiry = payload.get('expires_at')
        if raw_expiry:
            expires_at = self._parse_timestamp(raw_expiry)
            offset = expires_at.utcoffset()
            if offset is not None:
                # Normalise aware values to naive UTC so they compare cleanly
                # with naive UTC "now".
                expires_at = (expires_at - offset).replace(tzinfo=None)

        return Secret(
            name=path,
            secret_type=SecretType(payload.get('type', 'api_key')),
            value=payload.get('value', ''),
            version=vault_meta['version'],
            created_at=self._parse_timestamp(vault_meta['created_time']),
            expires_at=expires_at,
            metadata=payload
        )

    def set_secret(self, path: str, value: str, metadata: Optional[Dict] = None) -> Secret:
        """Write a secret to Vault KV v2 and return the stored version.

        ``value`` is applied last so a ``value`` key inside ``metadata`` can
        never replace the real payload.
        """
        secret_data = {**(metadata or {}), 'value': value}
        self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data,
            mount_point=self.mount_point
        )
        return self.get_secret(path)

    def delete_secret(self, path: str) -> bool:
        """Permanently delete the secret, including all versions and metadata."""
        self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=path,
            mount_point=self.mount_point
        )
        return True

    def list_secrets(self, path: str) -> List[str]:
        """Return the keys found directly under ``path``."""
        response = self.client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point
        )
        return response['data']['keys']

Injectarea Secretelor in CI/CD

Injecteaza secretele in mod securizat in pipeline-urile CI/CD:

from dataclasses import dataclass
from typing import Optional, List, Dict
import subprocess
import os
 
@dataclass
class PipelineSecretConfig:
    """Describes how one secret is exposed to a pipeline step."""

    # Path of the secret in the secrets manager.
    secret_path: str
    # Name of the environment variable that receives the secret value.
    env_var_name: str
    # Optional file to additionally write the secret value to.
    file_path: Optional[str] = None
    # When True, a missing secret aborts injection instead of being skipped.
    required: bool = True
 
class CICDSecretsInjector:
    """Exposes secrets to CI/CD commands via environment variables and files.

    Tracks the environment variable names and files it produced so that
    ``cleanup`` can remove them afterwards.
    """

    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        self.injected_secrets: List[str] = []
        self.temp_files: List[str] = []

    def inject_secrets(
        self,
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str
    ) -> Dict[str, str]:
        """Resolve the configured secrets into an environment mapping.

        Secrets with a ``file_path`` are additionally written to that file.
        The current process environment is not modified.

        Returns:
            Mapping of environment variable name to secret value.

        Raises:
            ValueError: If a required secret cannot be obtained. Files
                written for earlier configs stay tracked in ``temp_files``;
                call ``cleanup`` to remove them.
        """
        env_vars = {}

        for config in configs:
            secret = self.secrets_manager.get_secret(
                path=config.secret_path,
                accessor=accessor,
                source_ip=source_ip
            )

            if secret is None:
                if config.required:
                    raise ValueError(f"Secret obligatoriu negasit: {config.secret_path}")
                continue

            env_vars[config.env_var_name] = secret.value
            self.injected_secrets.append(config.env_var_name)

            if config.file_path:
                self._write_secret_file(config.file_path, secret.value)

        return env_vars

    def _write_secret_file(self, path: str, content: str):
        """Write ``content`` to ``path`` readable and writable by the owner only."""
        parent = os.path.dirname(path)
        if parent:
            # A bare filename has no parent; os.makedirs('') would raise.
            os.makedirs(parent, exist_ok=True)

        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        # Track as soon as the file exists so cleanup() also removes a
        # partially written file.
        self.temp_files.append(path)
        with os.fdopen(fd, 'wb') as handle:
            # The mode passed to os.open only applies to newly created files;
            # tighten a pre-existing file before any secret bytes land in it.
            os.chmod(path, 0o600)
            handle.write(content.encode())

    def run_with_secrets(
        self,
        command: List[str],
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str,
        mask_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run ``command`` with the secrets injected into its environment.

        ``cleanup`` always runs afterwards, including when injection itself
        fails part-way through.

        Returns:
            The completed process; stdout/stderr have secret values replaced
            when ``mask_output`` is true.

        Raises:
            ValueError: If a required secret cannot be obtained.
        """
        try:
            # Inside the try block so files written before a failing config
            # are still removed.
            secret_env = self.inject_secrets(configs, accessor, source_ip)
            env = os.environ.copy()
            env.update(secret_env)

            result = subprocess.run(command, env=env, capture_output=True, text=True)
            if mask_output:
                result.stdout = self._mask_secrets(result.stdout, secret_env)
                result.stderr = self._mask_secrets(result.stderr, secret_env)
            return result
        finally:
            self.cleanup()

    def _mask_secrets(self, text: str, secrets: Dict[str, str]) -> str:
        """Replace each secret value in ``text`` with ``***NAME***``.

        Values of four characters or fewer are left alone. Longer values are
        masked first so a secret that contains another secret as a substring
        is replaced whole rather than partially.
        """
        by_length = sorted(secrets.items(), key=lambda item: len(item[1] or ''), reverse=True)
        for name, value in by_length:
            if value and len(value) > 4:
                text = text.replace(value, f"***{name}***")
        return text

    def cleanup(self):
        """Remove tracked environment variables and secret files.

        Files are overwritten with zero bytes before deletion. This is a
        best-effort measure: whether the old bytes are really overwritten
        depends on the storage and filesystem.
        """
        for var_name in self.injected_secrets:
            os.environ.pop(var_name, None)

        for file_path in self.temp_files:
            try:
                # Read the size BEFORE opening: opening with 'wb' would
                # truncate the file and leave nothing to overwrite.
                size = os.path.getsize(file_path)
                with open(file_path, 'r+b') as f:
                    f.write(b'\x00' * size)
                    f.flush()
                    os.fsync(f.fileno())
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone (or never created); nothing to clean.
                continue

        self.injected_secrets.clear()
        self.temp_files.clear()

Automatizarea Rotatiei Secretelor

Automatizeaza rotatia secretelor:

from abc import ABC, abstractmethod
from typing import Dict
import psycopg2
import secrets as py_secrets
 
class RotationHandler(ABC):
    """Interface for objects that know how to rotate one kind of secret."""

    @abstractmethod
    def rotate(self, current_secret: Secret) -> str:
        """Rotate ``current_secret`` and return the replacement value."""

    @abstractmethod
    def verify(self, new_value: str) -> bool:
        """Report whether ``new_value`` is usable."""
 
class DatabasePasswordRotationHandler(RotationHandler):
    """Rotates the password of a PostgreSQL role.

    ``config`` must provide ``host``, ``database``, ``admin_user`` and
    ``admin_password``; the admin account is used to change the password of
    the role named in the secret being rotated.
    """

    def __init__(self, config: Dict[str, str]):
        self.config = config

    def rotate(self, current_secret: Secret) -> str:
        """Set a fresh random password for the role and return the new credential.

        Args:
            current_secret: Secret whose ``value`` is a JSON object with at
                least a ``username`` key.

        Returns:
            The credential JSON with ``password`` replaced by the new one.

        Raises:
            ValueError: If the secret value is not valid JSON.
            KeyError: If the credential has no ``username``.
            psycopg2.Error: If connecting or altering the role fails.
        """
        # Local import keeps this block self-contained; psycopg2 itself is
        # already a dependency of this module.
        from psycopg2 import sql

        cred = json.loads(current_secret.value)
        new_password = py_secrets.token_urlsafe(24)

        # The role name comes from stored data, so it is composed as a quoted
        # identifier instead of being interpolated into the SQL text.
        # NOTE(review): quoting makes the name case-sensitive; confirm roles
        # were created with the exact casing stored in the credential.
        statement = sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
            sql.Identifier(cred['username'])
        )

        conn = psycopg2.connect(
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['admin_user'],
            password=self.config['admin_password']
        )

        try:
            with conn.cursor() as cur:
                cur.execute(statement, (new_password,))
            conn.commit()
        finally:
            conn.close()

        cred['password'] = new_password
        return json.dumps(cred)

    def verify(self, new_value: str) -> bool:
        """Return True if a connection with the new credential can be opened.

        Any failure while connecting is reported as ``False`` rather than
        raised, so callers can treat this as a plain yes/no check.

        Raises:
            ValueError: If ``new_value`` is not valid JSON.
        """
        cred = json.loads(new_value)
        try:
            conn = psycopg2.connect(
                host=self.config['host'],
                database=self.config['database'],
                user=cred['username'],
                password=cred['password']
            )
            conn.close()
            return True
        except Exception:
            # Deliberately broad: verification is a best-effort probe.
            return False
 
class SecretsRotationOrchestrator:
    """Drives secret rotation: rotate at the target, verify, then store."""

    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        self.handlers: Dict[str, RotationHandler] = {}

    def register_handler(self, secret_path: str, handler: RotationHandler):
        """Associate ``handler`` with ``secret_path``, replacing any previous one."""
        self.handlers[secret_path] = handler

    def rotate_secret(self, secret_path: str, accessor: str) -> bool:
        """Rotate the secret at ``secret_path`` and store the verified new value.

        Args:
            secret_path: Path of the secret; a handler must be registered for it.
            accessor: Identity recorded for the read and write.

        Returns:
            ``True`` when the new value was verified and stored, ``False``
            when rotation, verification or storing failed.

        Raises:
            ValueError: If no handler is registered for the path, or the
                current secret cannot be read (missing or expired).
        """
        handler = self.handlers.get(secret_path)
        if not handler:
            raise ValueError(f"Nu exista handler inregistrat pentru {secret_path}")

        current = self.secrets_manager.get_secret(
            path=secret_path,
            accessor=accessor,
            source_ip='rotation-orchestrator'
        )
        if current is None:
            # The manager returns None for expired secrets; without the
            # current value there is nothing to rotate from.
            raise ValueError(f"Secretul {secret_path} nu exista sau a expirat")

        try:
            new_value = handler.rotate(current)

            if not handler.verify(new_value):
                raise Exception("Verificarea a esuat")

            # NOTE(review): no TTL is passed, so the stored secret has no
            # expiry after rotation — confirm this is intended.
            self.secrets_manager.set_secret(
                path=secret_path,
                value=new_value,
                secret_type=current.secret_type,
                accessor=accessor,
                source_ip='rotation-orchestrator'
            )

            return True
        except Exception as e:
            # NOTE(review): if rotate() already changed the credential at the
            # target, the stored secret is now stale; the handler interface
            # offers no rollback, so this needs operator attention.
            print(f"Rotatia a esuat: {e}")
            return False

Concluzie

Managementul eficient al secretelor necesita stocare centralizata, controale stricte de acces, logare completa de audit si rotatie automatizata. Implementeaza HashiCorp Vault sau solutii similare ca sursa de adevar, integreaza-te cu pipeline-urile CI/CD pentru injectare securizata si stabileste politici de rotatie. Auditurile regulate ale pattern-urilor de acces la secrete ajuta la mentinerea posturii de securitate.

Resurse Conexe


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.