DevSecOps

Secrets Management Best Practices for DevSecOps Teams

DeviDevs Team
6 min read
#secrets-management#vault#devsecops#security#credential-management

Proper secrets management is fundamental to secure software delivery. This guide covers best practices for handling credentials, API keys, certificates, and other sensitive data across development, CI/CD, and production environments.

Secrets Management Architecture

Design a centralized secrets management system:

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from enum import Enum
import hashlib
import json
import os
 
class SecretType(Enum):
    API_KEY = "api_key"
    DATABASE_CREDENTIAL = "database_credential"
    SSH_KEY = "ssh_key"
    TLS_CERTIFICATE = "tls_certificate"
    ENCRYPTION_KEY = "encryption_key"
    SERVICE_ACCOUNT = "service_account"
 
@dataclass
class Secret:
    name: str
    secret_type: SecretType
    value: str
    version: int
    created_at: datetime
    expires_at: Optional[datetime]
    metadata: Dict[str, Any]
 
@dataclass
class SecretAccessLog:
    timestamp: datetime
    secret_name: str
    accessor: str
    action: str
    source_ip: str
    success: bool
 
class SecretsManager:
    def __init__(self, backend):
        self.backend = backend
        self.access_logs: List[SecretAccessLog] = []
 
    def get_secret(
        self,
        path: str,
        accessor: str,
        source_ip: str,
        version: Optional[int] = None
    ) -> Optional[Secret]:
        """Get a secret with full audit logging."""
        try:
            secret = self.backend.get_secret(path, version)
 
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='read',
                source_ip=source_ip,
                success=True
            )
 
            if secret.expires_at and secret.expires_at < datetime.utcnow():
                return None
 
            return secret
 
        except Exception as e:
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='read',
                source_ip=source_ip,
                success=False
            )
            raise
 
    def set_secret(
        self,
        path: str,
        value: str,
        secret_type: SecretType,
        accessor: str,
        source_ip: str,
        ttl_hours: Optional[int] = None
    ) -> Secret:
        """Set a secret with validation and audit logging."""
        self._validate_secret(value, secret_type)
 
        expires_at = None
        if ttl_hours:
            expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)
 
        metadata = {
            'type': secret_type.value,
            'created_by': accessor,
            'expires_at': expires_at.isoformat() if expires_at else None
        }
 
        secret = self.backend.set_secret(path, value, metadata)
 
        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='write',
            source_ip=source_ip,
            success=True
        )
 
        return secret
 
    def _validate_secret(self, value: str, secret_type: SecretType):
        """Validate secret value based on type."""
        if secret_type == SecretType.API_KEY:
            if len(value) < 20:
                raise ValueError("API key must be at least 20 characters")
 
        elif secret_type == SecretType.DATABASE_CREDENTIAL:
            cred = json.loads(value)
            if 'username' not in cred or 'password' not in cred:
                raise ValueError("Database credential must have username and password")
 
        elif secret_type == SecretType.SSH_KEY:
            if not value.startswith('-----BEGIN'):
                raise ValueError("Invalid SSH key format")
 
    def _log_access(
        self,
        secret_name: str,
        accessor: str,
        action: str,
        source_ip: str,
        success: bool
    ):
        """Log secret access for audit purposes."""
        log = SecretAccessLog(
            timestamp=datetime.utcnow(),
            secret_name=secret_name,
            accessor=accessor,
            action=action,
            source_ip=source_ip,
            success=success
        )
        self.access_logs.append(log)
        print(f"AUDIT: {json.dumps({
            'timestamp': log.timestamp.isoformat(),
            'secret': log.secret_name,
            'accessor': log.accessor,
            'action': log.action,
            'success': log.success
        })}")

HashiCorp Vault Integration

Implement Vault as the secrets backend:

import hvac
from typing import Optional, Dict
 
class VaultBackend:
    def __init__(self, vault_addr: str, auth_method: str = 'token', **kwargs):
        self.client = hvac.Client(url=vault_addr)
        self._authenticate(auth_method, **kwargs)
        self.mount_point = 'secret'
 
    def _authenticate(self, method: str, **kwargs):
        """Authenticate to Vault."""
        if method == 'token':
            self.client.token = kwargs.get('token')
        elif method == 'approle':
            response = self.client.auth.approle.login(
                role_id=kwargs['role_id'],
                secret_id=kwargs['secret_id']
            )
            self.client.token = response['auth']['client_token']
        elif method == 'kubernetes':
            with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
                jwt = f.read()
            response = self.client.auth.kubernetes.login(
                role=kwargs['role'],
                jwt=jwt
            )
            self.client.token = response['auth']['client_token']
 
        if not self.client.is_authenticated():
            raise Exception("Vault authentication failed")
 
    def get_secret(self, path: str, version: Optional[int] = None) -> Secret:
        """Retrieve secret from Vault KV v2."""
        if version:
            response = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                version=version,
                mount_point=self.mount_point
            )
        else:
            response = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=self.mount_point
            )
 
        data = response['data']
        metadata = data['metadata']
 
        return Secret(
            name=path,
            secret_type=SecretType(data['data'].get('type', 'api_key')),
            value=data['data'].get('value', ''),
            version=metadata['version'],
            created_at=datetime.fromisoformat(metadata['created_time'].replace('Z', '+00:00')),
            expires_at=None,
            metadata=data['data']
        )
 
    def set_secret(self, path: str, value: str, metadata: Dict = None) -> Secret:
        """Write secret to Vault KV v2."""
        secret_data = {'value': value, **(metadata or {})}
        self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data,
            mount_point=self.mount_point
        )
        return self.get_secret(path)
 
    def delete_secret(self, path: str) -> bool:
        """Delete secret."""
        self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=path,
            mount_point=self.mount_point
        )
        return True
 
    def list_secrets(self, path: str) -> List[str]:
        """List secrets at path."""
        response = self.client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point
        )
        return response['data']['keys']

CI/CD Secrets Injection

Securely inject secrets into CI/CD pipelines:

from dataclasses import dataclass
from typing import Optional, List, Dict
import subprocess
import os
 
@dataclass
class PipelineSecretConfig:
    secret_path: str
    env_var_name: str
    file_path: Optional[str] = None
    required: bool = True
 
class CICDSecretsInjector:
    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        self.injected_secrets: List[str] = []
        self.temp_files: List[str] = []
 
    def inject_secrets(
        self,
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str
    ) -> Dict[str, str]:
        """Inject secrets as environment variables."""
        env_vars = {}
 
        for config in configs:
            secret = self.secrets_manager.get_secret(
                path=config.secret_path,
                accessor=accessor,
                source_ip=source_ip
            )
 
            if secret is None:
                if config.required:
                    raise ValueError(f"Required secret not found: {config.secret_path}")
                continue
 
            env_vars[config.env_var_name] = secret.value
            self.injected_secrets.append(config.env_var_name)
 
            if config.file_path:
                self._write_secret_file(config.file_path, secret.value)
 
        return env_vars
 
    def _write_secret_file(self, path: str, content: str):
        """Write secret to file with restricted permissions."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        try:
            os.write(fd, content.encode())
        finally:
            os.close(fd)
        self.temp_files.append(path)
 
    def run_with_secrets(
        self,
        command: List[str],
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str,
        mask_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run command with injected secrets."""
        secret_env = self.inject_secrets(configs, accessor, source_ip)
        env = os.environ.copy()
        env.update(secret_env)
 
        try:
            result = subprocess.run(command, env=env, capture_output=True, text=True)
            if mask_output:
                result.stdout = self._mask_secrets(result.stdout, secret_env)
                result.stderr = self._mask_secrets(result.stderr, secret_env)
            return result
        finally:
            self.cleanup()
 
    def _mask_secrets(self, text: str, secrets: Dict[str, str]) -> str:
        """Mask secret values in output."""
        for name, value in secrets.items():
            if value and len(value) > 4:
                text = text.replace(value, f"***{name}***")
        return text
 
    def cleanup(self):
        """Clean up injected secrets and temporary files."""
        for var_name in self.injected_secrets:
            if var_name in os.environ:
                del os.environ[var_name]
 
        for file_path in self.temp_files:
            if os.path.exists(file_path):
                with open(file_path, 'wb') as f:
                    f.write(b'\x00' * os.path.getsize(file_path))
                os.remove(file_path)
 
        self.injected_secrets.clear()
        self.temp_files.clear()

Secrets Rotation Automation

Automate secrets rotation:

from abc import ABC, abstractmethod
from typing import Dict
import psycopg2
import secrets as py_secrets
 
class RotationHandler(ABC):
    @abstractmethod
    def rotate(self, current_secret: Secret) -> str:
        pass
 
    @abstractmethod
    def verify(self, new_value: str) -> bool:
        pass
 
class DatabasePasswordRotationHandler(RotationHandler):
    def __init__(self, config: Dict[str, str]):
        self.config = config
 
    def rotate(self, current_secret: Secret) -> str:
        """Rotate database password."""
        cred = json.loads(current_secret.value)
        new_password = py_secrets.token_urlsafe(24)
 
        conn = psycopg2.connect(
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['admin_user'],
            password=self.config['admin_password']
        )
 
        try:
            with conn.cursor() as cur:
                cur.execute(
                    f"ALTER USER {cred['username']} WITH PASSWORD %s",
                    (new_password,)
                )
            conn.commit()
        finally:
            conn.close()
 
        cred['password'] = new_password
        return json.dumps(cred)
 
    def verify(self, new_value: str) -> bool:
        """Verify new password works."""
        cred = json.loads(new_value)
        try:
            conn = psycopg2.connect(
                host=self.config['host'],
                database=self.config['database'],
                user=cred['username'],
                password=cred['password']
            )
            conn.close()
            return True
        except Exception:
            return False
 
class SecretsRotationOrchestrator:
    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        self.handlers: Dict[str, RotationHandler] = {}
 
    def register_handler(self, secret_path: str, handler: RotationHandler):
        self.handlers[secret_path] = handler
 
    def rotate_secret(self, secret_path: str, accessor: str) -> bool:
        """Perform secret rotation with verification."""
        handler = self.handlers.get(secret_path)
        if not handler:
            raise ValueError(f"No handler registered for {secret_path}")
 
        current = self.secrets_manager.get_secret(
            path=secret_path,
            accessor=accessor,
            source_ip='rotation-orchestrator'
        )
 
        previous_value = current.value
 
        try:
            new_value = handler.rotate(current)
 
            if not handler.verify(new_value):
                raise Exception("Verification failed")
 
            self.secrets_manager.set_secret(
                path=secret_path,
                value=new_value,
                secret_type=current.secret_type,
                accessor=accessor,
                source_ip='rotation-orchestrator'
            )
 
            return True
        except Exception as e:
            print(f"Rotation failed: {e}")
            return False

Conclusion

Effective secrets management requires centralized storage, strict access controls, comprehensive audit logging, and automated rotation. Implement HashiCorp Vault or similar solutions as your source of truth, integrate with CI/CD pipelines for secure injection, and establish rotation policies. Regular audits of secret access patterns help maintain security posture.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.