Proper secrets management is fundamental to secure software delivery. This guide covers best practices for handling credentials, API keys, certificates, and other sensitive data across development, CI/CD, and production environments.
Secrets Management Architecture
Design a centralized secrets management system:
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List
from enum import Enum
import hashlib
import json
import os
class SecretType(Enum):
    """Closed set of secret categories understood by the secrets manager.

    The string value of each member is what gets persisted as the ``type``
    entry of a secret's metadata and is used to rebuild the member on read.
    """

    API_KEY = "api_key"
    DATABASE_CREDENTIAL = "database_credential"
    SSH_KEY = "ssh_key"
    TLS_CERTIFICATE = "tls_certificate"
    ENCRYPTION_KEY = "encryption_key"
    SERVICE_ACCOUNT = "service_account"
@dataclass
class Secret:
    """A single version of a stored secret together with its bookkeeping data."""

    # Path/identifier under which the secret is stored.
    name: str
    # Category of the secret; drives type-specific validation on write.
    secret_type: SecretType
    # The sensitive payload itself.
    value: str
    # Version number reported by the backend for this value.
    version: int
    # When this version was created.
    created_at: datetime
    # Moment after which the secret must no longer be handed out;
    # None means the secret does not expire.
    expires_at: Optional[datetime]
    # Free-form extra data stored alongside the value.
    metadata: Dict[str, Any]
@dataclass
class SecretAccessLog:
    """One audit-trail entry describing an attempt to read or write a secret."""

    # When the access attempt was recorded.
    timestamp: datetime
    # Path of the secret that was touched.
    secret_name: str
    # Identity of the caller performing the access.
    accessor: str
    # Kind of operation, e.g. 'read' or 'write'.
    action: str
    # Origin of the request as reported by the caller.
    source_ip: str
    # Whether the operation completed successfully.
    success: bool
class SecretsManager:
    """Front door to a secrets backend that validates writes and audits access.

    The backend is any object exposing ``get_secret(path, version)`` and
    ``set_secret(path, value, metadata)``.  Every read and write attempt,
    successful or not, is appended to ``access_logs`` and echoed to stdout
    as an ``AUDIT:`` JSON line.
    """

    def __init__(self, backend):
        self.backend = backend
        self.access_logs: List[SecretAccessLog] = []

    def get_secret(
        self,
        path: str,
        accessor: str,
        source_ip: str,
        version: Optional[int] = None
    ) -> Optional["Secret"]:
        """Get a secret with full audit logging.

        Args:
            path: Backend path of the secret.
            accessor: Identity of the caller, recorded in the audit trail.
            source_ip: Origin of the request, recorded in the audit trail.
            version: Specific version to read; ``None`` reads the latest.

        Returns:
            The secret, or ``None`` when the backend returned nothing or the
            secret has expired.

        Raises:
            Exception: Whatever the backend raises is logged as a failed
                read and re-raised unchanged.
        """
        try:
            secret = self.backend.get_secret(path, version)
        except Exception:
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='read',
                source_ip=source_ip,
                success=False
            )
            raise

        # Only report success when a usable secret is actually handed out;
        # a missing or expired secret is audited as a failed read.
        usable = secret is not None and not self._is_expired(secret)
        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='read',
            source_ip=source_ip,
            success=usable
        )
        return secret if usable else None

    @staticmethod
    def _is_expired(secret) -> bool:
        """Return True when ``secret.expires_at`` lies in the past.

        Naive timestamps are compared against naive UTC "now" (the form
        ``set_secret`` produces); timezone-aware timestamps are compared
        against an aware "now" so the comparison never raises TypeError.
        """
        expires_at = secret.expires_at
        if not expires_at:
            return False
        if expires_at.tzinfo is not None:
            now = datetime.now(expires_at.tzinfo)
        else:
            now = datetime.utcnow()
        return expires_at < now

    def set_secret(
        self,
        path: str,
        value: str,
        secret_type: "SecretType",
        accessor: str,
        source_ip: str,
        ttl_hours: Optional[int] = None
    ) -> "Secret":
        """Set a secret with validation and audit logging.

        Args:
            path: Backend path to write to.
            value: Secret payload; validated according to ``secret_type``.
            secret_type: Category of the secret.
            accessor: Identity of the caller, stored as ``created_by``.
            source_ip: Origin of the request, recorded in the audit trail.
            ttl_hours: Lifetime in hours; ``None`` or ``0`` means no expiry.

        Returns:
            The secret object returned by the backend.

        Raises:
            ValueError: If ``value`` fails type-specific validation.
            Exception: Backend errors are logged as a failed write and
                re-raised unchanged.
        """
        try:
            self._validate_secret(value, secret_type)
            expires_at = None
            if ttl_hours:
                expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)
            metadata = {
                'type': secret_type.value,
                'created_by': accessor,
                'expires_at': expires_at.isoformat() if expires_at else None
            }
            secret = self.backend.set_secret(path, value, metadata)
        except Exception:
            # Rejected or failed writes belong in the audit trail too.
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='write',
                source_ip=source_ip,
                success=False
            )
            raise

        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='write',
            source_ip=source_ip,
            success=True
        )
        return secret

    def _validate_secret(self, value: str, secret_type: "SecretType"):
        """Validate secret value based on type.

        Raises:
            ValueError: If the value does not satisfy the rules for its type.
        """
        if secret_type == SecretType.API_KEY:
            if len(value) < 20:
                raise ValueError("API key must be at least 20 characters")
        elif secret_type == SecretType.DATABASE_CREDENTIAL:
            try:
                cred = json.loads(value)
            except json.JSONDecodeError as exc:
                raise ValueError("Database credential must be valid JSON") from exc
            # A JSON list/string/number is not a credential object even if an
            # ``in`` test against it happens to work.
            if (
                not isinstance(cred, dict)
                or 'username' not in cred
                or 'password' not in cred
            ):
                raise ValueError("Database credential must have username and password")
        elif secret_type == SecretType.SSH_KEY:
            if not value.startswith('-----BEGIN'):
                raise ValueError("Invalid SSH key format")

    def _log_access(
        self,
        secret_name: str,
        accessor: str,
        action: str,
        source_ip: str,
        success: bool
    ):
        """Log secret access for audit purposes.

        Appends an entry to ``access_logs`` and prints it as a JSON line
        prefixed with ``AUDIT:``.
        """
        log = SecretAccessLog(
            timestamp=datetime.utcnow(),
            secret_name=secret_name,
            accessor=accessor,
            action=action,
            source_ip=source_ip,
            success=success
        )
        self.access_logs.append(log)
        # Build the record outside the f-string: a multi-line expression
        # inside a single-quoted f-string is a SyntaxError before Python 3.12.
        record = {
            'timestamp': log.timestamp.isoformat(),
            'secret': log.secret_name,
            'accessor': log.accessor,
            'action': log.action,
            'source_ip': log.source_ip,
            'success': log.success
        }
        print(f"AUDIT: {json.dumps(record)}")


# HashiCorp Vault Integration
Implement Vault as the secrets backend:
import hvac
from typing import Optional, Dict
class VaultBackend:
    """Secrets backend that stores values in a HashiCorp Vault KV v2 engine.

    Each secret is written as a KV document whose ``value`` key holds the
    payload; any metadata supplied on write is stored as sibling keys.
    """

    def __init__(
        self,
        vault_addr: str,
        auth_method: str = 'token',
        *,
        mount_point: str = 'secret',
        **kwargs
    ):
        """Connect to Vault and authenticate.

        Args:
            vault_addr: Base URL of the Vault server.
            auth_method: One of ``'token'``, ``'approle'`` or ``'kubernetes'``.
            mount_point: Mount path of the KV v2 engine.
            **kwargs: Credentials for the chosen method (``token``;
                ``role_id`` and ``secret_id``; or ``role``).

        Raises:
            Exception: If the client is not authenticated afterwards.
        """
        self.client = hvac.Client(url=vault_addr)
        self._authenticate(auth_method, **kwargs)
        self.mount_point = mount_point

    def _authenticate(self, method: str, **kwargs):
        """Authenticate to Vault."""
        if method == 'token':
            self.client.token = kwargs.get('token')
        elif method == 'approle':
            response = self.client.auth.approle.login(
                role_id=kwargs['role_id'],
                secret_id=kwargs['secret_id']
            )
            self.client.token = response['auth']['client_token']
        elif method == 'kubernetes':
            # Token projected into the pod for its service account.
            with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:
                jwt = f.read().strip()
            response = self.client.auth.kubernetes.login(
                role=kwargs['role'],
                jwt=jwt
            )
            self.client.token = response['auth']['client_token']
        if not self.client.is_authenticated():
            raise Exception("Vault authentication failed")

    @staticmethod
    def _parse_timestamp(raw: str) -> datetime:
        """Parse an ISO-8601 timestamp, tolerating ``Z`` and odd precision.

        Fractional seconds are trimmed or zero-padded to exactly six digits,
        because ``datetime.fromisoformat`` before Python 3.11 rejects any
        other precision (e.g. nanoseconds).
        """
        text = raw.replace('Z', '+00:00')
        head, dot, tail = text.partition('.')
        if dot:
            digit_count = len(tail) - len(tail.lstrip('0123456789'))
            fraction = tail[:digit_count][:6].ljust(6, '0')
            text = f"{head}.{fraction}{tail[digit_count:]}"
        return datetime.fromisoformat(text)

    @staticmethod
    def _parse_expiry(raw: Optional[str]) -> Optional[datetime]:
        """Turn a stored ``expires_at`` string into a naive UTC datetime.

        Returns ``None`` when no expiry was stored.  Aware values are
        converted to UTC and stripped of tzinfo so they compare cleanly with
        the naive ``datetime.utcnow()`` timestamps used elsewhere in this file.
        """
        if not raw:
            return None
        from datetime import timezone

        parsed = VaultBackend._parse_timestamp(raw)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    def get_secret(self, path: str, version: Optional[int] = None) -> "Secret":
        """Retrieve secret from Vault KV v2.

        Args:
            path: Secret path below the mount point.
            version: Version to read; a falsy value reads the latest.

        Returns:
            A ``Secret`` built from the stored document, including the
            expiry recorded at write time (if any).
        """
        response = self.client.secrets.kv.v2.read_secret_version(
            path=path,
            version=version or None,
            mount_point=self.mount_point
        )
        data = response['data']
        payload = data['data']
        metadata = data['metadata']
        return Secret(
            name=path,
            secret_type=SecretType(payload.get('type', 'api_key')),
            value=payload.get('value', ''),
            version=metadata['version'],
            created_at=self._parse_timestamp(metadata['created_time']),
            # Surface the stored expiry so callers can actually enforce TTLs.
            expires_at=self._parse_expiry(payload.get('expires_at')),
            metadata=payload
        )

    def set_secret(self, path: str, value: str, metadata: Optional[Dict] = None) -> "Secret":
        """Write secret to Vault KV v2.

        Args:
            path: Secret path below the mount point.
            value: Secret payload, stored under the ``value`` key.
            metadata: Extra keys stored next to the value.

        Returns:
            The secret as read back from Vault after the write.
        """
        # ``value`` goes last so a metadata key of the same name can never
        # replace the actual secret.
        secret_data = {**(metadata or {}), 'value': value}
        response = self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data,
            mount_point=self.mount_point
        )
        # Read back the exact version just written when Vault reports it, so
        # a concurrent writer's newer version is not returned by mistake.
        written_version = None
        if isinstance(response, dict):
            written_version = (response.get('data') or {}).get('version')
        return self.get_secret(path, written_version)

    def delete_secret(self, path: str) -> bool:
        """Delete secret, including its metadata and every stored version."""
        self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=path,
            mount_point=self.mount_point
        )
        return True

    def list_secrets(self, path: str) -> List[str]:
        """List secrets at path."""
        response = self.client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point
        )
        return response['data']['keys']


# CI/CD Secrets Injection
Securely inject secrets into CI/CD pipelines:
from dataclasses import dataclass
from typing import Optional, List, Dict
import subprocess
import os
@dataclass
class PipelineSecretConfig:
    """Describes how one secret should be exposed to a pipeline step."""

    # Path of the secret to fetch from the secrets manager.
    secret_path: str
    # Name of the environment variable that receives the secret value.
    env_var_name: str
    # Optional file that additionally receives the secret value.
    file_path: Optional[str] = None
    # When True, a missing secret aborts injection instead of being skipped.
    required: bool = True
class CICDSecretsInjector:
    """Fetches secrets and exposes them to a pipeline command.

    Secrets are handed to the child process through its environment and,
    optionally, through files with owner-only permissions.  Everything that
    was injected is tracked so ``cleanup`` can remove it again.
    """

    def __init__(self, secrets_manager: "SecretsManager"):
        self.secrets_manager = secrets_manager
        self.injected_secrets: List[str] = []
        self.temp_files: List[str] = []
        # Values keyed by variable name, so cleanup only removes environment
        # entries that still hold what this injector supplied.
        self._injected_values: Dict[str, str] = {}

    def inject_secrets(
        self,
        configs: List["PipelineSecretConfig"],
        accessor: str,
        source_ip: str
    ) -> Dict[str, str]:
        """Inject secrets as environment variables.

        Args:
            configs: Secrets to fetch and where to expose them.
            accessor: Identity forwarded to the secrets manager for auditing.
            source_ip: Origin forwarded to the secrets manager for auditing.

        Returns:
            Mapping of environment variable name to secret value.  The
            process environment itself is not modified.

        Raises:
            ValueError: If a secret marked ``required`` is not available.
        """
        env_vars: Dict[str, str] = {}
        for config in configs:
            secret = self.secrets_manager.get_secret(
                path=config.secret_path,
                accessor=accessor,
                source_ip=source_ip
            )
            if secret is None:
                if config.required:
                    raise ValueError(f"Required secret not found: {config.secret_path}")
                continue
            env_vars[config.env_var_name] = secret.value
            self.injected_secrets.append(config.env_var_name)
            self._injected_values[config.env_var_name] = secret.value
            if config.file_path:
                self._write_secret_file(config.file_path, secret.value)
        return env_vars

    def _write_secret_file(self, path: str, content: str):
        """Write secret to file with restricted permissions."""
        parent = os.path.dirname(path)
        # A bare file name has no parent component; makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        # Track as soon as the file exists so a failed write is still cleaned up.
        self.temp_files.append(path)
        with os.fdopen(fd, 'wb') as handle:
            # The mode passed to os.open only applies on creation; tighten
            # permissions explicitly in case the file already existed.
            if hasattr(os, 'fchmod'):
                os.fchmod(handle.fileno(), 0o600)
            else:
                os.chmod(path, 0o600)
            handle.write(content.encode())

    def run_with_secrets(
        self,
        command: List[str],
        configs: List["PipelineSecretConfig"],
        accessor: str,
        source_ip: str,
        mask_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run command with injected secrets.

        Args:
            command: Argument vector to execute (no shell involved).
            configs: Secrets to make available to the command.
            accessor: Identity forwarded to the secrets manager for auditing.
            source_ip: Origin forwarded to the secrets manager for auditing.
            mask_output: Replace secret values in captured stdout/stderr.

        Returns:
            The completed process with captured text output.

        Raises:
            ValueError: If a required secret is not available.
        """
        try:
            # Injection is inside the try so files written before a later
            # secret fails to resolve are still removed.
            secret_env = self.inject_secrets(configs, accessor, source_ip)
            env = os.environ.copy()
            env.update(secret_env)
            result = subprocess.run(command, env=env, capture_output=True, text=True)
            if mask_output:
                result.stdout = self._mask_secrets(result.stdout, secret_env)
                result.stderr = self._mask_secrets(result.stderr, secret_env)
            return result
        finally:
            self.cleanup()

    def _mask_secrets(self, text: str, secrets: Dict[str, str]) -> str:
        """Mask secret values in output.

        Values of four characters or fewer are left alone, since replacing
        them would mangle unrelated output.
        """
        if not text:
            return text
        # Longest first: when one secret contains another, masking the
        # shorter one first would leave the rest of the longer one exposed.
        ordered = sorted(
            secrets.items(),
            key=lambda item: len(item[1] or ''),
            reverse=True
        )
        for name, value in ordered:
            if value and len(value) > 4:
                text = text.replace(value, f"***{name}***")
        return text

    def cleanup(self):
        """Clean up injected secrets and temporary files."""
        for var_name in self.injected_secrets:
            recorded = self._injected_values.get(var_name)
            # Leave a variable alone when it holds a different value: it
            # belongs to the parent process, not to this injector.
            if recorded is None or os.environ.get(var_name) == recorded:
                os.environ.pop(var_name, None)
        for file_path in self.temp_files:
            if os.path.exists(file_path):
                # Measure before opening and open without truncating, so the
                # existing bytes really are overwritten.  Best effort only:
                # the filesystem may still retain old blocks.
                size = os.path.getsize(file_path)
                with open(file_path, 'r+b') as f:
                    f.write(b'\x00' * size)
                    f.flush()
                    os.fsync(f.fileno())
                os.remove(file_path)
        self.injected_secrets.clear()
        self.temp_files.clear()
        self._injected_values.clear()


# Secrets Rotation Automation
Automate secrets rotation:
from abc import ABC, abstractmethod
from typing import Dict
import psycopg2
import secrets as py_secrets
class RotationHandler(ABC):
    """Interface for components that know how to rotate one kind of secret."""

    @abstractmethod
    def rotate(self, current_secret: Secret) -> str:
        """Produce and apply a replacement for ``current_secret``; return the new value."""

    @abstractmethod
    def verify(self, new_value: str) -> bool:
        """Report whether ``new_value`` is usable."""
class DatabasePasswordRotationHandler(RotationHandler):
    """Rotates the password of a PostgreSQL role using an admin connection.

    ``config`` must provide ``host``, ``database``, ``admin_user`` and
    ``admin_password``.  Secret values are JSON objects containing at least
    ``username`` and ``password``.
    """

    def __init__(self, config: Dict[str, str]):
        self.config = config

    def rotate(self, current_secret: Secret) -> str:
        """Rotate database password.

        Generates a random password, applies it to the role named in the
        current credential, and returns the updated credential as JSON.
        """
        # Local import: the file imports ``psycopg2`` but not its ``sql``
        # submodule, which is needed to quote the role name safely.
        from psycopg2 import sql

        cred = json.loads(current_secret.value)
        new_password = py_secrets.token_urlsafe(24)
        # The role name comes from stored data and cannot be sent as a query
        # parameter, so compose it as a quoted identifier instead of
        # interpolating it into the statement text.  Note that a quoted
        # identifier is matched case-sensitively.
        statement = sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
            sql.Identifier(cred['username'])
        )
        conn = psycopg2.connect(
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['admin_user'],
            password=self.config['admin_password']
        )
        try:
            with conn.cursor() as cur:
                cur.execute(statement, (new_password,))
            conn.commit()
        finally:
            conn.close()
        cred['password'] = new_password
        return json.dumps(cred)

    def verify(self, new_value: str) -> bool:
        """Verify new password works.

        Returns True when a connection with the credential in ``new_value``
        can be opened, False on any failure to do so.
        """
        cred = json.loads(new_value)
        try:
            conn = psycopg2.connect(
                host=self.config['host'],
                database=self.config['database'],
                user=cred['username'],
                password=cred['password']
            )
            conn.close()
            return True
        except Exception:
            # Deliberately broad: any failure means the credential is unusable.
            return False
class SecretsRotationOrchestrator:
    """Coordinates rotation of secrets through per-path rotation handlers."""

    def __init__(self, secrets_manager: "SecretsManager"):
        self.secrets_manager = secrets_manager
        self.handlers: Dict[str, RotationHandler] = {}

    def register_handler(self, secret_path: str, handler: "RotationHandler"):
        """Associate ``handler`` with ``secret_path``, replacing any previous one."""
        self.handlers[secret_path] = handler

    def rotate_secret(self, secret_path: str, accessor: str) -> bool:
        """Perform secret rotation with verification.

        Args:
            secret_path: Path of the secret to rotate.
            accessor: Identity recorded for the read and write.

        Returns:
            True when the new value was rotated, verified and stored;
            False when any of those steps failed.

        Raises:
            ValueError: If no handler is registered for the path, or the
                current secret cannot be read (missing or expired).
        """
        handler = self.handlers.get(secret_path)
        if not handler:
            raise ValueError(f"No handler registered for {secret_path}")

        current = self.secrets_manager.get_secret(
            path=secret_path,
            accessor=accessor,
            source_ip='rotation-orchestrator'
        )
        # The manager returns None for missing/expired secrets; fail clearly
        # instead of with an AttributeError further down.
        if current is None:
            raise ValueError(f"Secret not found or expired: {secret_path}")

        applied = False
        try:
            new_value = handler.rotate(current)
            applied = True
            if not handler.verify(new_value):
                raise Exception("Verification failed")
            self.secrets_manager.set_secret(
                path=secret_path,
                value=new_value,
                secret_type=current.secret_type,
                accessor=accessor,
                source_ip='rotation-orchestrator'
            )
            return True
        except Exception as e:
            print(f"Rotation failed: {e}")
            if applied:
                # The handler already changed the credential, but the store
                # still holds the old value; make that impossible to miss.
                print(
                    f"WARNING: {secret_path} was rotated by its handler but the "
                    "stored secret was not updated; manual reconciliation required"
                )
            return False


# Conclusion
Effective secrets management requires centralized storage, strict access controls, comprehensive audit logging, and automated rotation. Implement HashiCorp Vault or similar solutions as your source of truth, integrate with CI/CD pipelines for secure injection, and establish rotation policies. Regular audits of secret access patterns help maintain security posture.
Related Resources
- MLOps Security: Securing Your ML Pipeline — Secrets management in ML infrastructure
- ML CI/CD: Continuous Integration and Deployment for Machine Learning — Secure ML deployment pipelines
- DevSecOps Pipeline Security — Full pipeline security integration
- CI/CD Security for GitHub Actions — Protecting GitHub Actions workflows