Secrets Management Best Practices: HashiCorp Vault, AWS Secrets Manager, and Beyond

DeviDevs Team

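Secrets management is the discipline of creating, storing, distributing, rotating, and revoking credentials so that they never end up in source code, images, or logs. This guide walks through enterprise patterns with HashiCorp Vault and AWS Secrets Manager, then covers Kubernetes and SDK integration, secret scanning, and zero-trust distribution.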

Understanding Secrets Management

Types of Secrets

# secrets_taxonomy.yaml
secret_types:
  authentication:
    - api_keys
    - oauth_tokens
    - jwt_signing_keys
    - service_account_credentials
 
  infrastructure:
    - database_passwords
    - ssh_private_keys
    - tls_certificates
    - encryption_keys
 
  application:
    - third_party_api_keys
    - webhook_secrets
    - session_encryption_keys
    - feature_flag_tokens
 
  ci_cd:
    - deployment_credentials
    - registry_passwords
    - signing_keys
    - cloud_provider_credentials

Secret Lifecycle

┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│   Creation   │───>│   Storage    │───>│Distribution  │
└──────────────┘    └──────────────┘    └──────────────┘
                                               │
                                               v
┌──────────────┐    ┌──────────────┐    ┌──────────────┐
│  Revocation  │<───│   Rotation   │<───│   Usage      │
└──────────────┘    └──────────────┘    └──────────────┘
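
Read in order, the stages are creation, storage, distribution, usage, rotation, and revocation. A minimal sketch of that ordering as a state machine follows; the `Stage` enum and `advance` helper are hypothetical names, and a real system would probably also allow loops (for example, rotation feeding a new version back into storage).

```python
from enum import Enum

class Stage(Enum):
    CREATION = "creation"
    STORAGE = "storage"
    DISTRIBUTION = "distribution"
    USAGE = "usage"
    ROTATION = "rotation"
    REVOCATION = "revocation"

# Allowed transitions, one step at a time through the lifecycle.
TRANSITIONS = {
    Stage.CREATION: {Stage.STORAGE},
    Stage.STORAGE: {Stage.DISTRIBUTION},
    Stage.DISTRIBUTION: {Stage.USAGE},
    Stage.USAGE: {Stage.ROTATION},
    Stage.ROTATION: {Stage.REVOCATION},
    Stage.REVOCATION: set(),  # terminal stage
}

def advance(current: Stage, target: Stage) -> Stage:
    """Move to `target` if the lifecycle allows it, otherwise raise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```

Modelling the lifecycle this way makes skipped stages, such as a secret used before it was ever stored centrally, show up as explicit errors.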

HashiCorp Vault Implementation

Vault Architecture Setup

# vault_config.hcl
storage "raft" {
  path    = "/opt/vault/data"
  node_id = "vault-1"
 
  retry_join {
    leader_api_addr = "https://vault-2.example.com:8200"
  }
  retry_join {
    leader_api_addr = "https://vault-3.example.com:8200"
  }
}
 
listener "tcp" {
  address       = "0.0.0.0:8200"
  tls_cert_file = "/opt/vault/tls/vault.crt"
  tls_key_file  = "/opt/vault/tls/vault.key"
 
  telemetry {
    unauthenticated_metrics_access = false
  }
}
 
seal "awskms" {
  region     = "us-east-1"
  kms_key_id = "alias/vault-auto-unseal"
}
 
api_addr      = "https://vault.example.com:8200"
cluster_addr  = "https://vault-1.example.com:8201"
 
telemetry {
  prometheus_retention_time = "30s"
  disable_hostname          = true
}
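
The configuration above joins three nodes (vault-1, vault-2, vault-3) into one Raft cluster. Raft stays available only while a majority of voting nodes is reachable, so the cluster size determines how many failures it can absorb. The arithmetic, as a small sketch with illustrative function names:

```python
def quorum(voters: int) -> int:
    """Smallest majority of a Raft cluster with `voters` voting nodes."""
    if voters < 1:
        raise ValueError("a cluster needs at least one voter")
    return voters // 2 + 1

def failure_tolerance(voters: int) -> int:
    """How many voters can be lost while a majority remains."""
    return voters - quorum(voters)

for n in (1, 3, 4, 5):
    print(f"{n} nodes: quorum={quorum(n)}, tolerates {failure_tolerance(n)} failure(s)")
```

Three nodes tolerate one failure and five tolerate two. A fourth node adds no tolerance over three, which is why odd cluster sizes are the usual choice.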

Secrets Engines Configuration

# vault_setup.py
import hvac
from typing import Dict, List
 
class VaultSecretsManager:
    def __init__(self, vault_addr: str, token: str):
        self.client = hvac.Client(url=vault_addr, token=token)
 
    def setup_kv_engine(self, path: str = "secret"):
        """Enable and configure KV secrets engine v2."""
 
        # Enable KV v2 secrets engine
        self.client.sys.enable_secrets_engine(
            backend_type='kv',
            path=path,
            options={'version': '2'}
        )
 
        # Configure engine settings
        self.client.secrets.kv.v2.configure(
            mount_point=path,
            max_versions=10,
            cas_required=False,
            delete_version_after='90d'
        )
 
    def setup_database_engine(self):
        """Configure dynamic database credentials."""
 
        # Enable database secrets engine
        self.client.sys.enable_secrets_engine(
            backend_type='database',
            path='database'
        )
 
        # Configure PostgreSQL connection.
        # The password below is a placeholder: supply the real value from a
        # secure source and rotate the root credential once Vault is connected.
        self.client.secrets.database.configure(
            name='postgresql',
            plugin_name='postgresql-database-plugin',
            connection_url='postgresql://{{username}}:{{password}}@db.example.com:5432/mydb',
            allowed_roles=['readonly', 'readwrite', 'admin'],
            username='vault_admin',
            password='initial_password'
        )
 
        # Create readonly role
        self.client.secrets.database.create_role(
            name='readonly',
            db_name='postgresql',
            creation_statements=[
                "CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}';",
                "GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";"
            ],
            revocation_statements=[
                "REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA public FROM \"{{name}}\";",
                "DROP ROLE IF EXISTS \"{{name}}\";"
            ],
            default_ttl='1h',
            max_ttl='24h'
        )
 
    def setup_aws_engine(self):
        """Configure AWS secrets engine for dynamic IAM credentials."""
 
        self.client.sys.enable_secrets_engine(
            backend_type='aws',
            path='aws'
        )
 
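        # Configure AWS root credentials.
        # The values shown are AWS's documentation placeholders; never commit
        # real keys, load them from the environment or an operator prompt.
        self.client.secrets.aws.configure_root_iam_credentials(
            access_key='AKIAIOSFODNN7EXAMPLE',
            secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
            region='us-east-1'
        )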
 
        # Create assumed role
        self.client.secrets.aws.create_or_update_role(
            name='deploy-role',
            credential_type='assumed_role',
            role_arns=['arn:aws:iam::123456789012:role/VaultDeployRole'],
            default_sts_ttl='3600',
            max_sts_ttl='43200'
        )
 
    def setup_pki_engine(self):
        """Configure PKI secrets engine for certificate management."""
 
        # Enable PKI engine
        self.client.sys.enable_secrets_engine(
            backend_type='pki',
            path='pki',
            config={'max_lease_ttl': '87600h'}  # 10 years
        )
 
        # Generate root CA (hvac passes optional fields through extra_params)
        root_cert = self.client.secrets.pki.generate_root(
            type='internal',
            common_name='Example Root CA',
            extra_params={'ttl': '87600h'}
        )
 
        # Configure CA and CRL URLs (hvac expects a single params dict)
        self.client.secrets.pki.set_urls(
            params={
                'issuing_certificates': ['https://vault.example.com:8200/v1/pki/ca'],
                'crl_distribution_points': ['https://vault.example.com:8200/v1/pki/crl']
            }
        )
 
        # Create role for issuing certificates
        self.client.secrets.pki.create_or_update_role(
            name='server-cert',
            extra_params={
                'allowed_domains': ['example.com'],
                'allow_subdomains': True,
                'max_ttl': '720h',  # 30 days
                'key_type': 'ec',
                'key_bits': 256
            }
        )
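
The TTLs above are written as duration strings, with comments giving the human-readable equivalent (`87600h` as 10 years, `720h` as 30 days). A small helper can check such comments. This is a sketch that handles only a single integer followed by `s`, `m`, `h`, or `d`; it is not Vault's own duration parser, and `ttl_seconds` and `ttl_days` are hypothetical names.

```python
import re

_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def ttl_seconds(ttl: str) -> int:
    """Convert a simple duration string such as '720h' or '90d' to seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", ttl.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {ttl!r}")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]

def ttl_days(ttl: str) -> float:
    """The same duration expressed in days."""
    return ttl_seconds(ttl) / 86400

print(ttl_days("720h"))    # 30.0
print(ttl_days("87600h"))  # 3650.0, i.e. ten 365-day years
```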

Vault Policies and Authentication

# policies/app-readonly.hcl
path "secret/data/app/*" {
  capabilities = ["read"]
}
 
path "database/creds/readonly" {
  capabilities = ["read"]
}
 
path "aws/creds/deploy-role" {
  capabilities = ["read"]
}
 
# Deny access to sensitive paths
path "secret/data/admin/*" {
  capabilities = ["deny"]
}
 
# policies/app-admin.hcl
path "secret/data/app/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}
 
path "secret/metadata/app/*" {
  capabilities = ["list", "read", "delete"]
}
 
path "database/creds/*" {
  capabilities = ["read"]
}
 
# policies/operator.hcl
path "sys/health" {
  capabilities = ["read", "sudo"]
}
 
path "sys/policies/acl/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}
 
path "auth/*" {
  capabilities = ["create", "read", "update", "delete", "list", "sudo"]
}
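
To see how the `app-readonly` rules interact, it helps to model path matching. The sketch below is a simplification, not Vault's actual resolver: it supports only exact paths and a trailing `*` glob, picks the longest matching rule, and treats `deny` as overriding every other capability in that rule. The `capabilities_for` helper is hypothetical.

```python
from typing import Dict, List

# Rules from policies/app-readonly.hcl, expressed as path -> capabilities.
APP_READONLY: Dict[str, List[str]] = {
    "secret/data/app/*": ["read"],
    "database/creds/readonly": ["read"],
    "aws/creds/deploy-role": ["read"],
    "secret/data/admin/*": ["deny"],
}

def _matches(rule_path: str, request_path: str) -> bool:
    """Exact match, or prefix match when the rule ends with '*'."""
    if rule_path.endswith("*"):
        return request_path.startswith(rule_path[:-1])
    return rule_path == request_path

def capabilities_for(policy: Dict[str, List[str]], request_path: str) -> List[str]:
    """Return the capabilities granted on request_path (empty list = no access)."""
    candidates = [path for path in policy if _matches(path, request_path)]
    if not candidates:
        return []  # no matching rule: access is denied by default
    best = max(candidates, key=len)  # most specific (longest) rule wins
    caps = policy[best]
    return [] if "deny" in caps else caps
```

Because access is denied by default, the explicit `deny` on `secret/data/admin/*` changes nothing for this policy alone. It matters when another policy attached to the same token would otherwise grant access to that path.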

# vault_auth.py
import hvac
 
class VaultAuthManager:
    def __init__(self, client: hvac.Client):
        self.client = client
 
    def setup_kubernetes_auth(self, k8s_host: str, k8s_ca_cert: str):
        """Configure Kubernetes authentication method."""
 
        # Enable Kubernetes auth
        self.client.sys.enable_auth_method(
            method_type='kubernetes',
            path='kubernetes'
        )
 
        # Configure Kubernetes auth
        self.client.auth.kubernetes.configure(
            kubernetes_host=k8s_host,
            kubernetes_ca_cert=k8s_ca_cert
        )
 
        # Create role for application pods
        self.client.auth.kubernetes.create_role(
            name='app-role',
            bound_service_account_names=['app-sa'],
            bound_service_account_namespaces=['production'],
            policies=['app-readonly'],
            ttl='1h'
        )
 
    def setup_approle_auth(self):
        """Configure AppRole authentication for CI/CD."""
 
        # Enable AppRole auth
        self.client.sys.enable_auth_method(
            method_type='approle',
            path='approle'
        )
 
        # Create AppRole for CI/CD
        self.client.auth.approle.create_or_update_approle(
            role_name='cicd',
            token_policies=['app-admin'],
            token_ttl='10m',
            token_max_ttl='30m',
            secret_id_ttl='5m',
            secret_id_num_uses=1,  # One-time use
            bind_secret_id=True
        )
 
    def setup_jwt_auth(self, oidc_discovery_url: str):
        """Configure JWT/OIDC authentication."""
 
        # Enable JWT auth
        self.client.sys.enable_auth_method(
            method_type='jwt',
            path='jwt'
        )
 
        # Configure OIDC provider
        self.client.auth.jwt.configure(
            oidc_discovery_url=oidc_discovery_url,
            bound_issuer=oidc_discovery_url
        )
 
        # Create role for users
        self.client.auth.jwt.create_role(
            name='user',
            role_type='jwt',
            allowed_redirect_uris=[],  # required by hvac; only used by OIDC roles
            bound_audiences=['vault'],
            user_claim='email',
            groups_claim='groups',
            token_policies=['default'],
            token_ttl='1h'
        )

AWS Secrets Manager Implementation

Secrets Manager Setup

# aws_secrets_manager.py
import boto3
import json
from datetime import datetime
from typing import Dict, Optional
 
class AWSSecretsManager:
    def __init__(self, region: str = 'us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region)
        self.region = region
 
    def create_secret(
        self,
        name: str,
        value: Dict,
        description: str = '',
        kms_key_id: Optional[str] = None,
        tags: Optional[Dict] = None
    ) -> Dict:
        """Create a new secret."""
 
        params = {
            'Name': name,
            'Description': description,
            'SecretString': json.dumps(value)
        }
 
        if kms_key_id:
            params['KmsKeyId'] = kms_key_id
 
        if tags:
            params['Tags'] = [
                {'Key': k, 'Value': v} for k, v in tags.items()
            ]
 
        response = self.client.create_secret(**params)
 
        return {
            'arn': response['ARN'],
            'name': response['Name'],
            'version_id': response['VersionId']
        }
 
    def get_secret(self, secret_id: str, version: Optional[str] = None) -> Dict:
        """Retrieve secret value."""
 
        params = {'SecretId': secret_id}
 
        if version:
            params['VersionId'] = version
 
        response = self.client.get_secret_value(**params)
 
        return {
            'name': response['Name'],
            'value': json.loads(response['SecretString']),
            'version_id': response['VersionId'],
            'created_date': response['CreatedDate']
        }
 
    def update_secret(self, secret_id: str, value: Dict) -> Dict:
        """Update secret value."""
 
        response = self.client.update_secret(
            SecretId=secret_id,
            SecretString=json.dumps(value)
        )
 
        return {
            'arn': response['ARN'],
            'name': response['Name'],
            'version_id': response['VersionId']
        }
 
    def setup_rotation(
        self,
        secret_id: str,
        rotation_lambda_arn: str,
        rotation_days: int = 30
    ):
        """Configure automatic rotation."""
 
        self.client.rotate_secret(
            SecretId=secret_id,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={
                'AutomaticallyAfterDays': rotation_days
            }
        )
 
    def create_resource_policy(self, secret_id: str, principals: list) -> Dict:
        """Attach resource policy to secret."""
 
        policy = {
            'Version': '2012-10-17',
            'Statement': [
                {
                    'Sid': 'AllowCrossAccountAccess',
                    'Effect': 'Allow',
                    'Principal': {
                        'AWS': principals
                    },
                    'Action': [
                        'secretsmanager:GetSecretValue',
                        'secretsmanager:DescribeSecret'
                    ],
                    'Resource': '*'
                }
            ]
        }
 
        self.client.put_resource_policy(
            SecretId=secret_id,
            ResourcePolicy=json.dumps(policy)
        )
 
        return policy
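
`get_secret` above assumes every secret is a JSON document stored in `SecretString`. A `GetSecretValue` response can instead carry `SecretBinary`, and a string secret need not be JSON. A defensive parser might look like the sketch below; `parse_secret_value` is a hypothetical helper that operates on the response dict, so it runs without AWS access.

```python
import json
from typing import Any, Dict, Union

def parse_secret_value(response: Dict[str, Any]) -> Union[Dict[str, Any], str, bytes]:
    """Extract the payload from a GetSecretValue-style response dict."""
    if "SecretString" in response:
        raw = response["SecretString"]
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return raw  # plain-text secret, return unchanged
        # Only JSON objects are treated as structured secrets
        return parsed if isinstance(parsed, dict) else raw
    if "SecretBinary" in response:
        return response["SecretBinary"]  # boto3 delivers this as bytes
    raise KeyError("response has neither SecretString nor SecretBinary")
```

Callers can then branch on the returned type instead of catching a `KeyError` or `JSONDecodeError` deep inside the client.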

Rotation Lambda Function

# rotation_lambda.py
import boto3
import json
import logging
from typing import Dict
 
logger = logging.getLogger()
logger.setLevel(logging.INFO)
 
secrets_client = boto3.client('secretsmanager')
 
def lambda_handler(event: Dict, context) -> None:
    """Handle secret rotation."""
 
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
 
    logger.info(f"Rotating secret {arn}, step: {step}")
 
    if step == 'createSecret':
        create_secret(arn, token)
    elif step == 'setSecret':
        set_secret(arn, token)
    elif step == 'testSecret':
        test_secret(arn, token)
    elif step == 'finishSecret':
        finish_secret(arn, token)
    else:
        raise ValueError(f"Unknown step: {step}")
 
def create_secret(arn: str, token: str) -> None:
    """Create new version of the secret."""
 
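    # Get current secret
    current = secrets_client.get_secret_value(
        SecretId=arn,
        VersionStage='AWSCURRENT'
    )
 
    # If a pending version already exists for this token (a retried
    # invocation), reuse it instead of generating a second password
    try:
        secrets_client.get_secret_value(
            SecretId=arn,
            VersionId=token,
            VersionStage='AWSPENDING'
        )
        logger.info(f"Pending version already exists for {arn}")
        return
    except secrets_client.exceptions.ResourceNotFoundException:
        pass
 
    current_value = json.loads(current['SecretString'])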
 
    # Generate new password
    import secrets
    import string
 
    new_password = ''.join(
        secrets.choice(string.ascii_letters + string.digits + '!@#$%^&*()')
        for _ in range(32)
    )
 
    # Create new secret value
    new_value = current_value.copy()
    new_value['password'] = new_password
 
    # Put new secret version
    secrets_client.put_secret_value(
        SecretId=arn,
        ClientRequestToken=token,
        SecretString=json.dumps(new_value),
        VersionStages=['AWSPENDING']
    )
 
    logger.info(f"Created new secret version for {arn}")
 
def set_secret(arn: str, token: str) -> None:
    """Set the secret in the target service."""
 
    # Get pending secret
    pending = secrets_client.get_secret_value(
        SecretId=arn,
        VersionId=token,
        VersionStage='AWSPENDING'
    )
 
    pending_value = json.loads(pending['SecretString'])
 
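    # Update password in target service
    # This is service-specific (database, API, etc.)
    if pending_value.get('engine') == 'postgres':
        update_postgres_password(pending_value)
    elif 'service' in pending_value:
        # update_service_credential is not defined in this file; supply your own
        update_service_credential(pending_value)
    else:
        # Fail loudly so an unapplied password is never promoted to AWSCURRENT
        raise ValueError(f"No rotation handler for secret {arn}")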
 
    logger.info(f"Set secret in target service for {arn}")
 
def test_secret(arn: str, token: str) -> None:
    """Test that the new secret works."""
 
    pending = secrets_client.get_secret_value(
        SecretId=arn,
        VersionId=token,
        VersionStage='AWSPENDING'
    )
 
    pending_value = json.loads(pending['SecretString'])
 
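    # Test connection with new credentials
    if pending_value.get('engine') == 'postgres':
        test_postgres_connection(pending_value)
    elif 'service' in pending_value:
        # test_service_connection is not defined in this file; supply your own
        test_service_connection(pending_value)
    else:
        # An untested credential must not pass this step silently
        raise ValueError(f"No connection test for secret {arn}")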
 
    logger.info(f"Tested new secret for {arn}")
 
def finish_secret(arn: str, token: str) -> None:
    """Finalize the rotation."""
 
    # Find the version that currently holds AWSCURRENT
    metadata = secrets_client.describe_secret(SecretId=arn)
 
    current_version = None
    for version_id, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            if version_id == token:
                # Already promoted (e.g. a retried invocation); nothing to do
                logger.info(f"Version {token} is already AWSCURRENT for {arn}")
                return
            current_version = version_id
            break
 
    # Move AWSCURRENT to the new version
    secrets_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage='AWSCURRENT',
        MoveToVersionId=token,
        RemoveFromVersionId=current_version
    )
 
    logger.info(f"Finished rotation for {arn}")
 
def update_postgres_password(secret: Dict) -> None:
    """Update PostgreSQL password."""
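    import psycopg2
    from psycopg2 import sql
 
    conn = psycopg2.connect(
        host=secret['host'],
        port=secret.get('port', 5432),
        dbname=secret['dbname'],
        user=secret['masteruser'],
        password=secret['masterpassword']
    )
 
    with conn.cursor() as cur:
        # Quote the role name as an identifier instead of interpolating it
        # into the statement, which would allow SQL injection
        cur.execute(
            sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
                sql.Identifier(secret['username'])
            ),
            (secret['password'],)
        )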
    conn.commit()
    conn.close()
 
def test_postgres_connection(secret: Dict) -> None:
    """Test PostgreSQL connection with new password."""
    import psycopg2
 
    conn = psycopg2.connect(
        host=secret['host'],
        port=secret.get('port', 5432),
        dbname=secret['dbname'],
        user=secret['username'],
        password=secret['password']
    )
 
    with conn.cursor() as cur:
        cur.execute('SELECT 1')
 
    conn.close()
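
The four rotation steps are easier to follow when the staging labels are tracked explicitly. The sketch below simulates them with an in-memory dict instead of calling AWS. `FakeSecret` and its methods are hypothetical and model only the label bookkeeping: the new version starts as `AWSPENDING`, and promoting it to `AWSCURRENT` leaves the old version labelled `AWSPREVIOUS`.

```python
from typing import Dict, Optional, Set

class FakeSecret:
    """In-memory model of version IDs and their staging labels."""

    def __init__(self, version_id: str, value: str):
        self.values: Dict[str, str] = {version_id: value}
        self.stages: Dict[str, Set[str]] = {version_id: {"AWSCURRENT"}}

    def _holder(self, stage: str) -> Optional[str]:
        """Return the version ID that currently carries `stage`, if any."""
        for version_id, labels in self.stages.items():
            if stage in labels:
                return version_id
        return None

    def put_pending(self, token: str, value: str) -> None:
        """createSecret: stage a new version; repeated calls are no-ops."""
        if token in self.values:
            return
        self.values[token] = value
        self.stages[token] = {"AWSPENDING"}

    def finish(self, token: str) -> None:
        """finishSecret: promote the pending version to AWSCURRENT."""
        current = self._holder("AWSCURRENT")
        if current == token:
            return  # already promoted, e.g. on a retried invocation
        previous = self._holder("AWSPREVIOUS")
        if previous is not None:
            self.stages[previous].discard("AWSPREVIOUS")
        if current is not None:
            self.stages[current] = {"AWSPREVIOUS"}
        self.stages[token] = {"AWSCURRENT"}

secret = FakeSecret("v1", "old-password")
secret.put_pending("v2", "new-password")   # createSecret
# setSecret and testSecret would act on the target service here
secret.finish("v2")                        # finishSecret
print(secret.stages)
```

Both mutating steps return early when their work is already done, which mirrors the idempotency a rotation function needs because any step may be retried.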

Application Integration Patterns

Kubernetes External Secrets

# external-secrets.yaml
apiVersion: external-secrets.io/v1beta1
kind: ClusterSecretStore
metadata:
  name: vault-backend
spec:
  provider:
    vault:
      server: "https://vault.example.com:8200"
      path: "secret"
      version: "v2"
      auth:
        kubernetes:
          mountPath: "kubernetes"
          # The Vault role must bind the service account and namespace below
          role: "app-role"
          serviceAccountRef:
            name: "external-secrets-sa"
            namespace: "external-secrets"
 
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: app-secrets
  namespace: production
spec:
  refreshInterval: "15m"
  secretStoreRef:
    name: vault-backend
    kind: ClusterSecretStore
  target:
    name: app-secrets
    creationPolicy: Owner
  data:
    - secretKey: database-password
      remoteRef:
        key: app/database
        property: password
    - secretKey: api-key
      remoteRef:
        key: app/api
        property: key
 
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: aws-secrets
  namespace: production
spec:
  refreshInterval: "1h"
  secretStoreRef:
    # Assumes a ClusterSecretStore named aws-secrets-manager exists (not shown here)
    name: aws-secrets-manager
    kind: ClusterSecretStore
  target:
    name: aws-credentials
    creationPolicy: Owner
  dataFrom:
    - extract:
        key: production/app/credentials

SDK Integration

# secrets_client.py
import os
from functools import lru_cache
from typing import Optional, Dict
import hvac
import boto3
 
class SecretsClient:
    """Unified secrets client supporting multiple backends."""
 
    def __init__(self, backend: str = 'auto'):
        self.backend = self._detect_backend() if backend == 'auto' else backend
        self._client = self._init_client()
 
    def _detect_backend(self) -> str:
        """Auto-detect secrets backend based on environment."""
        if os.getenv('VAULT_ADDR'):
            return 'vault'
        elif os.getenv('AWS_REGION'):
            return 'aws'
        else:
            return 'env'
 
    def _init_client(self):
        """Initialize backend-specific client."""
        if self.backend == 'vault':
            return hvac.Client(
                url=os.getenv('VAULT_ADDR'),
                token=os.getenv('VAULT_TOKEN')
            )
        elif self.backend == 'aws':
            return boto3.client('secretsmanager')
        else:
            return None
 
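    # Caveat: lru_cache keys on (self, path, key) and never expires entries,
    # so call invalidate_cache() after a rotation to avoid serving stale values.
    @lru_cache(maxsize=100)
    def get_secret(self, path: str, key: Optional[str] = None) -> str:
        """Get secret value with caching (returns the full secret dict when key is None)."""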
 
        if self.backend == 'vault':
            response = self._client.secrets.kv.v2.read_secret_version(
                path=path
            )
            data = response['data']['data']
            return data.get(key) if key else data
 
        elif self.backend == 'aws':
            response = self._client.get_secret_value(SecretId=path)
            import json
            data = json.loads(response['SecretString'])
            return data.get(key) if key else data
 
        else:
            # Environment variable fallback
            env_key = path.replace('/', '_').upper()
            if key:
                env_key = f"{env_key}_{key.upper()}"
            return os.getenv(env_key)
 
    def get_database_credentials(self) -> Dict[str, str]:
        """Get dynamic database credentials."""
 
        if self.backend == 'vault':
            response = self._client.secrets.database.generate_credentials(
                name='readonly'
            )
            return {
                'username': response['data']['username'],
                'password': response['data']['password'],
                'ttl': response['lease_duration']
            }
        else:
            return {
                'username': self.get_secret('database', 'username'),
                'password': self.get_secret('database', 'password')
            }
 
    def invalidate_cache(self):
        """Clear cached secrets."""
        self.get_secret.cache_clear()
 
 
# Usage example
secrets = SecretsClient()
 
# Get static secret
api_key = secrets.get_secret('app/api', 'key')
 
# Get dynamic database credentials
db_creds = secrets.get_database_credentials()
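
Because `lru_cache` never expires entries, a rotated secret stays stale until `invalidate_cache()` is called. One alternative is a small time-bounded cache. The sketch below is an illustration under stated assumptions, not part of the client above: `TTLCache`, its `loader` callback, and the 300-second default are hypothetical, and the clock is injectable so the expiry logic can be tested without sleeping.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple

class TTLCache:
    """Cache loader results for at most `ttl_seconds`."""

    def __init__(
        self,
        loader: Callable[[Hashable], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get(self, key: Hashable) -> Any:
        """Return the cached value, reloading it when missing or expired."""
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh
        value = self._loader(key)
        self._entries[key] = (now, value)
        return value

    def invalidate(self) -> None:
        """Drop all cached values."""
        self._entries.clear()
```

A client could wrap its backend read in such a cache, for example `TTLCache(lambda path: backend_read(path), ttl_seconds=60)` with `backend_read` standing in for the real lookup. Choosing a TTL shorter than the rotation interval bounds how long a stale value can be served.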

Security Best Practices

Secret Hygiene

# secret_scanner.py
import re
from typing import List, Dict, Tuple
from pathlib import Path
 
class SecretScanner:
    """Scan code for exposed secrets."""
 
    PATTERNS = [
        # API Keys
        (r'(?i)(api[_-]?key|apikey)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'API Key'),
        # AWS
        (r'AKIA[0-9A-Z]{16}', 'AWS Access Key'),
        (r'(?i)aws[_-]?secret[_-]?access[_-]?key\s*[=:]\s*["\']([a-zA-Z0-9/+=]{40})["\']', 'AWS Secret Key'),
        # Passwords
        (r'(?i)(password|passwd|pwd)\s*[=:]\s*["\']([^"\']{8,})["\']', 'Password'),
        # Private Keys
        (r'-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----', 'Private Key'),
        # JWT
        (r'eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*', 'JWT Token'),
        # Generic Secrets
        (r'(?i)(secret|token)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'Generic Secret'),
    ]
 
    EXCLUDE_PATTERNS = [
        r'\.git/',
        r'node_modules/',
        r'\.env\.example',
        r'__pycache__/',
    ]
 
    def __init__(self, root_path: str):
        self.root_path = Path(root_path)
        self.findings: List[Dict] = []
 
    def scan(self) -> List[Dict]:
        """Scan directory for secrets."""
 
        for file_path in self.root_path.rglob('*'):
            if file_path.is_file() and not self._should_exclude(file_path):
                self._scan_file(file_path)
 
        return self.findings
 
    def _should_exclude(self, path: Path) -> bool:
        """Check if path should be excluded."""
        path_str = str(path)
        return any(
            re.search(pattern, path_str)
            for pattern in self.EXCLUDE_PATTERNS
        )
 
    def _scan_file(self, file_path: Path):
        """Scan single file for secrets."""
 
        try:
            content = file_path.read_text(encoding='utf-8', errors='ignore')
        except Exception:
            return
 
        for line_num, line in enumerate(content.split('\n'), 1):
            for pattern, secret_type in self.PATTERNS:
                if re.search(pattern, line):
                    self.findings.append({
                        'file': str(file_path.relative_to(self.root_path)),
                        'line': line_num,
                        'type': secret_type,
                        'preview': self._redact_preview(line)
                    })
 
    def _redact_preview(self, line: str, max_length: int = 80) -> str:
        """Create redacted preview of line."""
        # Replace potential secrets with asterisks
        redacted = re.sub(
            r'(["\'])[a-zA-Z0-9_/+=.-]{10,}(["\'])',
            r'\1****REDACTED****\2',
            line
        )
        return redacted[:max_length] + ('...' if len(redacted) > max_length else '')
 
 
# Pre-commit hook
if __name__ == '__main__':
    import sys
 
    scanner = SecretScanner('.')
    findings = scanner.scan()
 
    if findings:
        print("⚠️  Potential secrets detected:")
        for finding in findings:
            print(f"  {finding['file']}:{finding['line']} - {finding['type']}")
            print(f"    {finding['preview']}")
        sys.exit(1)
    else:
        print("✅ No secrets detected")
        sys.exit(0)
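
To get a feel for what the patterns catch and what they let through, it helps to run two of them against sample lines. The sketch below copies the AWS access key and password regexes from `SecretScanner.PATTERNS`; the sample lines and the `classify` helper are made up for illustration.

```python
import re
from typing import List

# Two patterns copied from SecretScanner.PATTERNS
AWS_ACCESS_KEY = re.compile(r'AKIA[0-9A-Z]{16}')
PASSWORD = re.compile(r'(?i)(password|passwd|pwd)\s*[=:]\s*["\']([^"\']{8,})["\']')

samples = [
    'aws_key = "AKIAIOSFODNN7EXAMPLE"',      # AWS documentation example key
    'password = "correct-horse-battery"',    # hard-coded literal
    'password = os.environ["DB_PASSWORD"]',  # read from the environment, not a literal
    'pwd: "short"',                          # under 8 characters
]

def classify(line: str) -> List[str]:
    """Return the names of the patterns that match a line."""
    hits = []
    if AWS_ACCESS_KEY.search(line):
        hits.append('AWS Access Key')
    if PASSWORD.search(line):
        hits.append('Password')
    return hits

for line in samples:
    print(classify(line), line)
```

The first two lines are flagged and the last two are not. The final sample shows a limitation: the password pattern only matches quoted values of eight or more characters, so short hard-coded passwords slip through.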

Zero-Trust Secret Distribution

# zero_trust_secrets.yaml
architecture:
  principles:
    - never_trust_network
    - verify_explicitly
    - least_privilege
    - assume_breach
 
  implementation:
    authentication:
      method: mutual_tls
      certificate_rotation: automatic
      certificate_ttl: 24h
 
    authorization:
      model: attribute_based_access_control
      context:
        - identity
        - device_posture
        - location
        - time
        - resource_sensitivity
 
    encryption:
      in_transit: tls_1_3
      at_rest: aes_256_gcm
      key_management: hsm_backed
 
    audit:
      log_all_access: true
      retention: 90_days
      alerting: real_time
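
The `attribute_based_access_control` model above decides each request from its context rather than from network location. A toy decision function makes that concrete. Everything specific here is an assumption for illustration: the `RequestContext` fields mirror the `context` list, but the rules (allowed identities and locations, a posture and time-window requirement for sensitive resources) are invented and are not a recommended policy.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RequestContext:
    identity: str              # authenticated workload or user
    device_posture: str        # "compliant" or "unknown"
    location: str              # e.g. a region or site label
    hour_utc: int              # 0-23
    resource_sensitivity: str  # "low" or "high"

# Hypothetical allow-lists for the example
ALLOWED_IDENTITIES = {"app-sa", "cicd"}
ALLOWED_LOCATIONS = {"us-east-1"}

def is_allowed(ctx: RequestContext) -> bool:
    """Deny unless every applicable attribute check passes (default deny)."""
    if ctx.identity not in ALLOWED_IDENTITIES:
        return False
    if ctx.location not in ALLOWED_LOCATIONS:
        return False
    if ctx.resource_sensitivity == "high":
        # Sensitive resources additionally require a compliant device
        # and access inside an assumed 08:00-18:00 UTC window.
        if ctx.device_posture != "compliant":
            return False
        if not 8 <= ctx.hour_utc < 18:
            return False
    return True
```

The point of the sketch is the shape of the decision: every request is evaluated against identity and context together, and anything not explicitly permitted is refused.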

Conclusion

Effective secrets management requires:

  1. Centralized storage with proper access controls
  2. Automated rotation to limit exposure windows
  3. Dynamic credentials where possible
  4. Comprehensive auditing of all access
  5. Defense in depth with multiple security layers

Choose the right secrets management solution based on your infrastructure and compliance requirements, and always follow the principle of least privilege.
