Secrets Management Best Practices: HashiCorp Vault, AWS Secrets Manager, and Beyond
Secrets management is a critical security discipline that prevents credential exposure and enables secure application deployment. This guide covers enterprise-grade secrets management patterns and implementations.
Understanding Secrets Management
Types of Secrets
# secrets_taxonomy.yaml
secret_types:
authentication:
- api_keys
- oauth_tokens
- jwt_signing_keys
- service_account_credentials
infrastructure:
- database_passwords
- ssh_private_keys
- tls_certificates
- encryption_keys
application:
- third_party_api_keys
- webhook_secrets
- session_encryption_keys
- feature_flag_tokens
ci_cd:
- deployment_credentials
- registry_passwords
- signing_keys
- cloud_provider_credentialsSecret Lifecycle
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Creation │───>│ Storage │───>│Distribution │
└──────────────┘ └──────────────┘ └──────────────┘
│
v
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Revocation │<───│ Rotation │<───│ Usage │
└──────────────┘ └──────────────┘ └──────────────┘
HashiCorp Vault Implementation
Vault Architecture Setup
# vault_config.hcl
storage "raft" {
path = "/opt/vault/data"
node_id = "vault-1"
retry_join {
leader_api_addr = "https://vault-2.example.com:8200"
}
retry_join {
leader_api_addr = "https://vault-3.example.com:8200"
}
}
listener "tcp" {
address = "0.0.0.0:8200"
tls_cert_file = "/opt/vault/tls/vault.crt"
tls_key_file = "/opt/vault/tls/vault.key"
telemetry {
unauthenticated_metrics_access = false
}
}
seal "awskms" {
region = "us-east-1"
kms_key_id = "alias/vault-auto-unseal"
}
api_addr = "https://vault.example.com:8200"
cluster_addr = "https://vault-1.example.com:8201"
telemetry {
prometheus_retention_time = "30s"
disable_hostname = true
}Secrets Engines Configuration
# vault_setup.py
import hvac
from typing import Dict, List
class VaultSecretsManager:
def __init__(self, vault_addr: str, token: str):
self.client = hvac.Client(url=vault_addr, token=token)
def setup_kv_engine(self, path: str = "secret"):
"""Enable and configure KV secrets engine v2."""
# Enable KV v2 secrets engine
self.client.sys.enable_secrets_engine(
backend_type='kv',
path=path,
options={'version': '2'}
)
# Configure engine settings
self.client.secrets.kv.v2.configure(
mount_point=path,
max_versions=10,
cas_required=False,
delete_version_after='90d'
)
def setup_database_engine(self):
"""Configure dynamic database credentials."""
# Enable database secrets engine
self.client.sys.enable_secrets_engine(
backend_type='database',
path='database'
)
# Configure PostgreSQL connection
self.client.secrets.database.configure(
name='postgresql',
plugin_name='postgresql-database-plugin',
connection_url='postgresql://{{username}}:{{password}}@db.example.com:5432/mydb',
allowed_roles=['readonly', 'readwrite', 'admin'],
username='vault_admin',
password='initial_password'
)
# Create readonly role
self.client.secrets.database.create_role(
name='readonly',
db_name='postgresql',
creation_statements=[
"CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}';",
"GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";"
],
revocation_statements=[
"REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA public FROM \"{{name}}\";",
"DROP ROLE IF EXISTS \"{{name}}\";"
],
default_ttl='1h',
max_ttl='24h'
)
def setup_aws_engine(self):
"""Configure AWS secrets engine for dynamic IAM credentials."""
self.client.sys.enable_secrets_engine(
backend_type='aws',
path='aws'
)
# Configure AWS root credentials
self.client.secrets.aws.configure_root_iam_credentials(
access_key='AKIAIOSFODNN7EXAMPLE',
secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
region='us-east-1'
)
# Create assumed role
self.client.secrets.aws.create_or_update_role(
name='deploy-role',
credential_type='assumed_role',
role_arns=['arn:aws:iam::123456789012:role/VaultDeployRole'],
default_sts_ttl='3600',
max_sts_ttl='43200'
)
def setup_pki_engine(self):
"""Configure PKI secrets engine for certificate management."""
# Enable PKI engine
self.client.sys.enable_secrets_engine(
backend_type='pki',
path='pki',
config={'max_lease_ttl': '87600h'} # 10 years
)
# Generate root CA
root_cert = self.client.secrets.pki.generate_root(
type='internal',
common_name='Example Root CA',
ttl='87600h'
)
# Configure CA and CRL URLs
self.client.secrets.pki.set_urls(
issuing_certificates='https://vault.example.com:8200/v1/pki/ca',
crl_distribution_points='https://vault.example.com:8200/v1/pki/crl'
)
# Create role for issuing certificates
self.client.secrets.pki.create_or_update_role(
name='server-cert',
allowed_domains=['example.com'],
allow_subdomains=True,
max_ttl='720h', # 30 days
key_type='ec',
key_bits=256
)Vault Policies and Authentication
# policies/app-readonly.hcl
path "secret/data/app/*" {
capabilities = ["read"]
}
path "database/creds/readonly" {
capabilities = ["read"]
}
path "aws/creds/deploy-role" {
capabilities = ["read"]
}
# Deny access to sensitive paths
path "secret/data/admin/*" {
capabilities = ["deny"]
}
# policies/app-admin.hcl
path "secret/data/app/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "secret/metadata/app/*" {
capabilities = ["list", "read", "delete"]
}
path "database/creds/*" {
capabilities = ["read"]
}
# policies/operator.hcl
path "sys/health" {
capabilities = ["read", "sudo"]
}
path "sys/policies/acl/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "auth/*" {
capabilities = ["create", "read", "update", "delete", "list", "sudo"]
}# vault_auth.py
class VaultAuthManager:
def __init__(self, client: hvac.Client):
self.client = client
def setup_kubernetes_auth(self, k8s_host: str, k8s_ca_cert: str):
"""Configure Kubernetes authentication method."""
# Enable Kubernetes auth
self.client.sys.enable_auth_method(
method_type='kubernetes',
path='kubernetes'
)
# Configure Kubernetes auth
self.client.auth.kubernetes.configure(
kubernetes_host=k8s_host,
kubernetes_ca_cert=k8s_ca_cert
)
# Create role for application pods
self.client.auth.kubernetes.create_role(
name='app-role',
bound_service_account_names=['app-sa'],
bound_service_account_namespaces=['production'],
policies=['app-readonly'],
ttl='1h'
)
def setup_approle_auth(self):
"""Configure AppRole authentication for CI/CD."""
# Enable AppRole auth
self.client.sys.enable_auth_method(
method_type='approle',
path='approle'
)
# Create AppRole for CI/CD
self.client.auth.approle.create_or_update_approle(
role_name='cicd',
token_policies=['app-admin'],
token_ttl='10m',
token_max_ttl='30m',
secret_id_ttl='5m',
secret_id_num_uses=1, # One-time use
bind_secret_id=True
)
def setup_jwt_auth(self, oidc_discovery_url: str):
"""Configure JWT/OIDC authentication."""
# Enable JWT auth
self.client.sys.enable_auth_method(
method_type='jwt',
path='jwt'
)
# Configure OIDC provider
self.client.auth.jwt.configure(
oidc_discovery_url=oidc_discovery_url,
bound_issuer=oidc_discovery_url
)
# Create role for users
self.client.auth.jwt.create_role(
name='user',
bound_audiences=['vault'],
user_claim='email',
groups_claim='groups',
token_policies=['default'],
ttl='1h'
)AWS Secrets Manager Implementation
Secrets Manager Setup
# aws_secrets_manager.py
import boto3
import json
from datetime import datetime
from typing import Dict, Optional
class AWSSecretsManager:
def __init__(self, region: str = 'us-east-1'):
self.client = boto3.client('secretsmanager', region_name=region)
self.region = region
def create_secret(
self,
name: str,
value: Dict,
description: str = '',
kms_key_id: Optional[str] = None,
tags: Optional[Dict] = None
) -> Dict:
"""Create a new secret."""
params = {
'Name': name,
'Description': description,
'SecretString': json.dumps(value)
}
if kms_key_id:
params['KmsKeyId'] = kms_key_id
if tags:
params['Tags'] = [
{'Key': k, 'Value': v} for k, v in tags.items()
]
response = self.client.create_secret(**params)
return {
'arn': response['ARN'],
'name': response['Name'],
'version_id': response['VersionId']
}
def get_secret(self, secret_id: str, version: Optional[str] = None) -> Dict:
"""Retrieve secret value."""
params = {'SecretId': secret_id}
if version:
params['VersionId'] = version
response = self.client.get_secret_value(**params)
return {
'name': response['Name'],
'value': json.loads(response['SecretString']),
'version_id': response['VersionId'],
'created_date': response['CreatedDate']
}
def update_secret(self, secret_id: str, value: Dict) -> Dict:
"""Update secret value."""
response = self.client.update_secret(
SecretId=secret_id,
SecretString=json.dumps(value)
)
return {
'arn': response['ARN'],
'name': response['Name'],
'version_id': response['VersionId']
}
def setup_rotation(
self,
secret_id: str,
rotation_lambda_arn: str,
rotation_days: int = 30
):
"""Configure automatic rotation."""
self.client.rotate_secret(
SecretId=secret_id,
RotationLambdaARN=rotation_lambda_arn,
RotationRules={
'AutomaticallyAfterDays': rotation_days
}
)
def create_resource_policy(self, secret_id: str, principals: list) -> Dict:
"""Attach resource policy to secret."""
policy = {
'Version': '2012-10-17',
'Statement': [
{
'Sid': 'AllowCrossAccountAccess',
'Effect': 'Allow',
'Principal': {
'AWS': principals
},
'Action': [
'secretsmanager:GetSecretValue',
'secretsmanager:DescribeSecret'
],
'Resource': '*'
}
]
}
self.client.put_resource_policy(
SecretId=secret_id,
ResourcePolicy=json.dumps(policy)
)
return policyRotation Lambda Function
# rotation_lambda.py
import boto3
import json
import logging
from typing import Dict
logger = logging.getLogger()
logger.setLevel(logging.INFO)
secrets_client = boto3.client('secretsmanager')
def lambda_handler(event: Dict, context) -> None:
"""Handle secret rotation."""
arn = event['SecretId']
token = event['ClientRequestToken']
step = event['Step']
logger.info(f"Rotating secret {arn}, step: {step}")
if step == 'createSecret':
create_secret(arn, token)
elif step == 'setSecret':
set_secret(arn, token)
elif step == 'testSecret':
test_secret(arn, token)
elif step == 'finishSecret':
finish_secret(arn, token)
else:
raise ValueError(f"Unknown step: {step}")
def create_secret(arn: str, token: str) -> None:
"""Create new version of the secret."""
# Get current secret
current = secrets_client.get_secret_value(
SecretId=arn,
VersionStage='AWSCURRENT'
)
current_value = json.loads(current['SecretString'])
# Generate new password
import secrets
import string
new_password = ''.join(
secrets.choice(string.ascii_letters + string.digits + '!@#$%^&*()')
for _ in range(32)
)
# Create new secret value
new_value = current_value.copy()
new_value['password'] = new_password
# Put new secret version
secrets_client.put_secret_value(
SecretId=arn,
ClientRequestToken=token,
SecretString=json.dumps(new_value),
VersionStages=['AWSPENDING']
)
logger.info(f"Created new secret version for {arn}")
def set_secret(arn: str, token: str) -> None:
"""Set the secret in the target service."""
# Get pending secret
pending = secrets_client.get_secret_value(
SecretId=arn,
VersionId=token,
VersionStage='AWSPENDING'
)
pending_value = json.loads(pending['SecretString'])
# Update password in target service
# This is service-specific (database, API, etc.)
if 'engine' in pending_value and pending_value['engine'] == 'postgres':
update_postgres_password(pending_value)
elif 'service' in pending_value:
update_service_credential(pending_value)
logger.info(f"Set secret in target service for {arn}")
def test_secret(arn: str, token: str) -> None:
"""Test that the new secret works."""
pending = secrets_client.get_secret_value(
SecretId=arn,
VersionId=token,
VersionStage='AWSPENDING'
)
pending_value = json.loads(pending['SecretString'])
# Test connection with new credentials
if 'engine' in pending_value and pending_value['engine'] == 'postgres':
test_postgres_connection(pending_value)
elif 'service' in pending_value:
test_service_connection(pending_value)
logger.info(f"Tested new secret for {arn}")
def finish_secret(arn: str, token: str) -> None:
"""Finalize the rotation."""
# Get current version
metadata = secrets_client.describe_secret(SecretId=arn)
current_version = None
for version_id, stages in metadata['VersionIdsToStages'].items():
if 'AWSCURRENT' in stages:
current_version = version_id
break
# Move AWSCURRENT to new version
secrets_client.update_secret_version_stage(
SecretId=arn,
VersionStage='AWSCURRENT',
MoveToVersionId=token,
RemoveFromVersionId=current_version
)
logger.info(f"Finished rotation for {arn}")
def update_postgres_password(secret: Dict) -> None:
"""Update PostgreSQL password."""
import psycopg2
conn = psycopg2.connect(
host=secret['host'],
port=secret.get('port', 5432),
dbname=secret['dbname'],
user=secret['masteruser'],
password=secret['masterpassword']
)
with conn.cursor() as cur:
cur.execute(
f"ALTER USER {secret['username']} WITH PASSWORD %s",
(secret['password'],)
)
conn.commit()
conn.close()
def test_postgres_connection(secret: Dict) -> None:
"""Test PostgreSQL connection with new password."""
import psycopg2
conn = psycopg2.connect(
host=secret['host'],
port=secret.get('port', 5432),
dbname=secret['dbname'],
user=secret['username'],
password=secret['password']
)
with conn.cursor() as cur:
cur.execute('SELECT 1')
conn.close()Application Integration Patterns
Kubernetes External Secrets
# external-secrets.yaml
apiVersion: external-secrets.io/v1beta1
kind: ClusterSecretStore
metadata:
name: vault-backend
spec:
provider:
vault:
server: "https://vault.example.com:8200"
path: "secret"
version: "v2"
auth:
kubernetes:
mountPath: "kubernetes"
role: "app-role"
serviceAccountRef:
name: "external-secrets-sa"
namespace: "external-secrets"
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: app-secrets
namespace: production
spec:
refreshInterval: "15m"
secretStoreRef:
name: vault-backend
kind: ClusterSecretStore
target:
name: app-secrets
creationPolicy: Owner
data:
- secretKey: database-password
remoteRef:
key: app/database
property: password
- secretKey: api-key
remoteRef:
key: app/api
property: key
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: aws-secrets
namespace: production
spec:
refreshInterval: "1h"
secretStoreRef:
name: aws-secrets-manager
kind: ClusterSecretStore
target:
name: aws-credentials
creationPolicy: Owner
dataFrom:
- extract:
key: production/app/credentialsSDK Integration
# secrets_client.py
import os
from functools import lru_cache
from typing import Optional, Dict
import hvac
import boto3
class SecretsClient:
"""Unified secrets client supporting multiple backends."""
def __init__(self, backend: str = 'auto'):
self.backend = self._detect_backend() if backend == 'auto' else backend
self._client = self._init_client()
def _detect_backend(self) -> str:
"""Auto-detect secrets backend based on environment."""
if os.getenv('VAULT_ADDR'):
return 'vault'
elif os.getenv('AWS_REGION'):
return 'aws'
else:
return 'env'
def _init_client(self):
"""Initialize backend-specific client."""
if self.backend == 'vault':
return hvac.Client(
url=os.getenv('VAULT_ADDR'),
token=os.getenv('VAULT_TOKEN')
)
elif self.backend == 'aws':
return boto3.client('secretsmanager')
else:
return None
@lru_cache(maxsize=100)
def get_secret(self, path: str, key: Optional[str] = None) -> str:
"""Get secret value with caching."""
if self.backend == 'vault':
response = self._client.secrets.kv.v2.read_secret_version(
path=path
)
data = response['data']['data']
return data.get(key) if key else data
elif self.backend == 'aws':
response = self._client.get_secret_value(SecretId=path)
import json
data = json.loads(response['SecretString'])
return data.get(key) if key else data
else:
# Environment variable fallback
env_key = path.replace('/', '_').upper()
if key:
env_key = f"{env_key}_{key.upper()}"
return os.getenv(env_key)
def get_database_credentials(self) -> Dict[str, str]:
"""Get dynamic database credentials."""
if self.backend == 'vault':
response = self._client.secrets.database.generate_credentials(
name='readonly'
)
return {
'username': response['data']['username'],
'password': response['data']['password'],
'ttl': response['lease_duration']
}
else:
return {
'username': self.get_secret('database', 'username'),
'password': self.get_secret('database', 'password')
}
def invalidate_cache(self):
"""Clear cached secrets."""
self.get_secret.cache_clear()
# Usage example
secrets = SecretsClient()
# Get static secret
api_key = secrets.get_secret('app/api', 'key')
# Get dynamic database credentials
db_creds = secrets.get_database_credentials()Security Best Practices
Secret Hygiene
# secret_scanner.py
import re
from typing import List, Dict, Tuple
from pathlib import Path
class SecretScanner:
"""Scan code for exposed secrets."""
PATTERNS = [
# API Keys
(r'(?i)(api[_-]?key|apikey)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'API Key'),
# AWS
(r'AKIA[0-9A-Z]{16}', 'AWS Access Key'),
(r'(?i)aws[_-]?secret[_-]?access[_-]?key\s*[=:]\s*["\']([a-zA-Z0-9/+=]{40})["\']', 'AWS Secret Key'),
# Passwords
(r'(?i)(password|passwd|pwd)\s*[=:]\s*["\']([^"\']{8,})["\']', 'Password'),
# Private Keys
(r'-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----', 'Private Key'),
# JWT
(r'eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*', 'JWT Token'),
# Generic Secrets
(r'(?i)(secret|token)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'Generic Secret'),
]
EXCLUDE_PATTERNS = [
r'\.git/',
r'node_modules/',
r'\.env\.example',
r'__pycache__/',
]
def __init__(self, root_path: str):
self.root_path = Path(root_path)
self.findings: List[Dict] = []
def scan(self) -> List[Dict]:
"""Scan directory for secrets."""
for file_path in self.root_path.rglob('*'):
if file_path.is_file() and not self._should_exclude(file_path):
self._scan_file(file_path)
return self.findings
def _should_exclude(self, path: Path) -> bool:
"""Check if path should be excluded."""
path_str = str(path)
return any(
re.search(pattern, path_str)
for pattern in self.EXCLUDE_PATTERNS
)
def _scan_file(self, file_path: Path):
"""Scan single file for secrets."""
try:
content = file_path.read_text(encoding='utf-8', errors='ignore')
except Exception:
return
for line_num, line in enumerate(content.split('\n'), 1):
for pattern, secret_type in self.PATTERNS:
if re.search(pattern, line):
self.findings.append({
'file': str(file_path.relative_to(self.root_path)),
'line': line_num,
'type': secret_type,
'preview': self._redact_preview(line)
})
def _redact_preview(self, line: str, max_length: int = 80) -> str:
"""Create redacted preview of line."""
# Replace potential secrets with asterisks
redacted = re.sub(
r'(["\'])[a-zA-Z0-9_/+=.-]{10,}(["\'])',
r'\1****REDACTED****\2',
line
)
return redacted[:max_length] + ('...' if len(redacted) > max_length else '')
# Pre-commit hook
if __name__ == '__main__':
import sys
scanner = SecretScanner('.')
findings = scanner.scan()
if findings:
print("⚠️ Potential secrets detected:")
for finding in findings:
print(f" {finding['file']}:{finding['line']} - {finding['type']}")
print(f" {finding['preview']}")
sys.exit(1)
else:
print("✅ No secrets detected")
sys.exit(0)Zero-Trust Secret Distribution
# zero_trust_secrets.yaml
architecture:
principles:
- never_trust_network
- verify_explicitly
- least_privilege
- assume_breach
implementation:
authentication:
method: mutual_tls
certificate_rotation: automatic
certificate_ttl: 24h
authorization:
model: attribute_based_access_control
context:
- identity
- device_posture
- location
- time
- resource_sensitivity
encryption:
in_transit: tls_1_3
at_rest: aes_256_gcm
key_management: hsm_backed
audit:
log_all_access: true
retention: 90_days
alerting: real_timeConclusion
Effective secrets management requires:
- Centralized storage with proper access controls
- Automated rotation to limit exposure windows
- Dynamic credentials where possible
- Comprehensive auditing of all access
- Defense in depth with multiple security layers
Choose the right secrets management solution based on your infrastructure and compliance requirements, and always follow the principle of least privilege.
Related Resources
- MLOps Security: Securing Your ML Pipeline — Secrets management in ML training and serving
- ML CI/CD: Continuous Integration and Deployment for Machine Learning — Secure credential injection in ML pipelines
- DevSecOps Pipeline Security — Integrating security into CI/CD
- CI/CD Security for GitHub Actions — OIDC and secrets in GitHub workflows