Bune Practici pentru Managementul Secretelor: HashiCorp Vault, AWS Secrets Manager si nu numai
Managementul secretelor este o disciplina critica de securitate care previne expunerea credentialelor si permite deploy-ul securizat al aplicatiilor. Acest ghid acopera pattern-uri si implementari de nivel enterprise pentru managementul secretelor.
Intelegerea Managementului Secretelor
Tipuri de Secrete
# secrets_taxonomy.yaml
secret_types:
authentication:
- api_keys
- oauth_tokens
- jwt_signing_keys
- service_account_credentials
infrastructure:
- database_passwords
- ssh_private_keys
- tls_certificates
- encryption_keys
application:
- third_party_api_keys
- webhook_secrets
- session_encryption_keys
- feature_flag_tokens
ci_cd:
- deployment_credentials
- registry_passwords
- signing_keys
- cloud_provider_credentials
Ciclul de Viata al Secretelor
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Creare │───>│ Stocare │───>│ Distributie │
└──────────────┘ └──────────────┘ └──────────────┘
│
v
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Revocare │<───│ Rotatie │<───│ Utilizare │
└──────────────┘ └──────────────┘ └──────────────┘
Implementarea HashiCorp Vault
Configurarea Arhitecturii Vault
# vault_config.hcl
storage "raft" {
path = "/opt/vault/data"
node_id = "vault-1"
retry_join {
leader_api_addr = "https://vault-2.example.com:8200"
}
retry_join {
leader_api_addr = "https://vault-3.example.com:8200"
}
}
listener "tcp" {
address = "0.0.0.0:8200"
tls_cert_file = "/opt/vault/tls/vault.crt"
tls_key_file = "/opt/vault/tls/vault.key"
telemetry {
unauthenticated_metrics_access = false
}
}
seal "awskms" {
region = "us-east-1"
kms_key_id = "alias/vault-auto-unseal"
}
api_addr = "https://vault.example.com:8200"
cluster_addr = "https://vault-1.example.com:8201"
telemetry {
prometheus_retention_time = "30s"
disable_hostname = true
}
Configurarea Secrets Engines
# vault_setup.py
import os
from typing import Dict, List, Optional

import hvac
class VaultSecretsManager:
def __init__(self, vault_addr: str, token: str):
self.client = hvac.Client(url=vault_addr, token=token)
def setup_kv_engine(self, path: str = "secret"):
"""Activeaza si configureaza KV secrets engine v2."""
# Activeaza KV v2 secrets engine
self.client.sys.enable_secrets_engine(
backend_type='kv',
path=path,
options={'version': '2'}
)
# Configureaza setarile engine-ului
self.client.secrets.kv.v2.configure(
mount_point=path,
max_versions=10,
cas_required=False,
delete_version_after='90d'
)
def setup_database_engine(self):
"""Configureaza credentiale dinamice pentru baza de date."""
# Activeaza database secrets engine
self.client.sys.enable_secrets_engine(
backend_type='database',
path='database'
)
# Configureaza conexiunea PostgreSQL
self.client.secrets.database.configure(
name='postgresql',
plugin_name='postgresql-database-plugin',
connection_url='postgresql://{{username}}:{{password}}@db.example.com:5432/mydb',
allowed_roles=['readonly', 'readwrite', 'admin'],
username='vault_admin',
password='initial_password'
)
# Creeaza rolul readonly
self.client.secrets.database.create_role(
name='readonly',
db_name='postgresql',
creation_statements=[
"CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}';",
"GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";"
],
revocation_statements=[
"REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA public FROM \"{{name}}\";",
"DROP ROLE IF EXISTS \"{{name}}\";"
],
default_ttl='1h',
max_ttl='24h'
)
def setup_aws_engine(self):
"""Configureaza AWS secrets engine pentru credentiale IAM dinamice."""
self.client.sys.enable_secrets_engine(
backend_type='aws',
path='aws'
)
# Configureaza credentialele root AWS
self.client.secrets.aws.configure_root_iam_credentials(
access_key='AKIAIOSFODNN7EXAMPLE',
secret_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
region='us-east-1'
)
# Creeaza rolul assumed
self.client.secrets.aws.create_or_update_role(
name='deploy-role',
credential_type='assumed_role',
role_arns=['arn:aws:iam::123456789012:role/VaultDeployRole'],
default_sts_ttl='3600',
max_sts_ttl='43200'
)
def setup_pki_engine(self):
"""Configureaza PKI secrets engine pentru managementul certificatelor."""
# Activeaza PKI engine
self.client.sys.enable_secrets_engine(
backend_type='pki',
path='pki',
config={'max_lease_ttl': '87600h'} # 10 ani
)
# Genereaza Root CA
root_cert = self.client.secrets.pki.generate_root(
type='internal',
common_name='Example Root CA',
ttl='87600h'
)
# Configureaza URL-urile CA si CRL
self.client.secrets.pki.set_urls(
issuing_certificates='https://vault.example.com:8200/v1/pki/ca',
crl_distribution_points='https://vault.example.com:8200/v1/pki/crl'
)
# Creeaza rol pentru emiterea certificatelor
self.client.secrets.pki.create_or_update_role(
name='server-cert',
allowed_domains=['example.com'],
allow_subdomains=True,
max_ttl='720h', # 30 zile
key_type='ec',
key_bits=256
)Politici si Autentificare Vault
# policies/app-readonly.hcl
path "secret/data/app/*" {
capabilities = ["read"]
}
path "database/creds/readonly" {
capabilities = ["read"]
}
path "aws/creds/deploy-role" {
capabilities = ["read"]
}
# Interzice accesul la cai sensibile
path "secret/data/admin/*" {
capabilities = ["deny"]
}
# policies/app-admin.hcl
path "secret/data/app/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "secret/metadata/app/*" {
capabilities = ["list", "read", "delete"]
}
path "database/creds/*" {
capabilities = ["read"]
}
# policies/operator.hcl
path "sys/health" {
capabilities = ["read", "sudo"]
}
path "sys/policies/acl/*" {
capabilities = ["create", "read", "update", "delete", "list"]
}
path "auth/*" {
capabilities = ["create", "read", "update", "delete", "list", "sudo"]
}
# vault_auth.py
class VaultAuthManager:
def __init__(self, client: hvac.Client):
self.client = client
def setup_kubernetes_auth(self, k8s_host: str, k8s_ca_cert: str):
"""Configureaza metoda de autentificare Kubernetes."""
# Activeaza autentificarea Kubernetes
self.client.sys.enable_auth_method(
method_type='kubernetes',
path='kubernetes'
)
# Configureaza autentificarea Kubernetes
self.client.auth.kubernetes.configure(
kubernetes_host=k8s_host,
kubernetes_ca_cert=k8s_ca_cert
)
# Creeaza rol pentru pod-urile aplicatiei
self.client.auth.kubernetes.create_role(
name='app-role',
bound_service_account_names=['app-sa'],
bound_service_account_namespaces=['production'],
policies=['app-readonly'],
ttl='1h'
)
def setup_approle_auth(self):
"""Configureaza autentificarea AppRole pentru CI/CD."""
# Activeaza autentificarea AppRole
self.client.sys.enable_auth_method(
method_type='approle',
path='approle'
)
# Creeaza AppRole pentru CI/CD
self.client.auth.approle.create_or_update_approle(
role_name='cicd',
token_policies=['app-admin'],
token_ttl='10m',
token_max_ttl='30m',
secret_id_ttl='5m',
secret_id_num_uses=1, # Utilizare unica
bind_secret_id=True
)
def setup_jwt_auth(self, oidc_discovery_url: str):
"""Configureaza autentificarea JWT/OIDC."""
# Activeaza autentificarea JWT
self.client.sys.enable_auth_method(
method_type='jwt',
path='jwt'
)
# Configureaza provider-ul OIDC
self.client.auth.jwt.configure(
oidc_discovery_url=oidc_discovery_url,
bound_issuer=oidc_discovery_url
)
# Creeaza rol pentru utilizatori
self.client.auth.jwt.create_role(
name='user',
bound_audiences=['vault'],
user_claim='email',
groups_claim='groups',
token_policies=['default'],
ttl='1h'
)Implementarea AWS Secrets Manager
Configurarea Secrets Manager
# aws_secrets_manager.py
import boto3
import json
from datetime import datetime
from typing import Dict, Optional
class AWSSecretsManager:
def __init__(self, region: str = 'us-east-1'):
self.client = boto3.client('secretsmanager', region_name=region)
self.region = region
def create_secret(
self,
name: str,
value: Dict,
description: str = '',
kms_key_id: Optional[str] = None,
tags: Optional[Dict] = None
) -> Dict:
"""Creeaza un secret nou."""
params = {
'Name': name,
'Description': description,
'SecretString': json.dumps(value)
}
if kms_key_id:
params['KmsKeyId'] = kms_key_id
if tags:
params['Tags'] = [
{'Key': k, 'Value': v} for k, v in tags.items()
]
response = self.client.create_secret(**params)
return {
'arn': response['ARN'],
'name': response['Name'],
'version_id': response['VersionId']
}
def get_secret(self, secret_id: str, version: Optional[str] = None) -> Dict:
"""Obtine valoarea secretului."""
params = {'SecretId': secret_id}
if version:
params['VersionId'] = version
response = self.client.get_secret_value(**params)
return {
'name': response['Name'],
'value': json.loads(response['SecretString']),
'version_id': response['VersionId'],
'created_date': response['CreatedDate']
}
def update_secret(self, secret_id: str, value: Dict) -> Dict:
"""Actualizeaza valoarea secretului."""
response = self.client.update_secret(
SecretId=secret_id,
SecretString=json.dumps(value)
)
return {
'arn': response['ARN'],
'name': response['Name'],
'version_id': response['VersionId']
}
def setup_rotation(
self,
secret_id: str,
rotation_lambda_arn: str,
rotation_days: int = 30
):
"""Configureaza rotatia automata."""
self.client.rotate_secret(
SecretId=secret_id,
RotationLambdaARN=rotation_lambda_arn,
RotationRules={
'AutomaticallyAfterDays': rotation_days
}
)
def create_resource_policy(self, secret_id: str, principals: list) -> Dict:
"""Ataseaza o politica de resurse secretului."""
policy = {
'Version': '2012-10-17',
'Statement': [
{
'Sid': 'AllowCrossAccountAccess',
'Effect': 'Allow',
'Principal': {
'AWS': principals
},
'Action': [
'secretsmanager:GetSecretValue',
'secretsmanager:DescribeSecret'
],
'Resource': '*'
}
]
}
self.client.put_resource_policy(
SecretId=secret_id,
ResourcePolicy=json.dumps(policy)
)
return policyFunctia Lambda de Rotatie
# rotation_lambda.py
import boto3
import json
import logging
from typing import Dict
logger = logging.getLogger()
logger.setLevel(logging.INFO)
secrets_client = boto3.client('secretsmanager')
def lambda_handler(event: Dict, context) -> None:
    """Route a Secrets Manager rotation event to the handler for its step.

    Args:
        event: Rotation event; ``SecretId``, ``ClientRequestToken`` and ``Step``
            are read from it.
        context: Lambda context object (unused).

    Raises:
        ValueError: If ``Step`` is not one of the four rotation steps.
    """
    arn, token, step = event['SecretId'], event['ClientRequestToken'], event['Step']
    logger.info(f"Rotare secret {arn}, pas: {step}")

    # One handler per rotation step, all sharing the (arn, token) signature.
    step_handlers = {
        'createSecret': create_secret,
        'setSecret': set_secret,
        'testSecret': test_secret,
        'finishSecret': finish_secret,
    }
    handler = step_handlers.get(step)
    if handler is None:
        raise ValueError(f"Pas necunoscut: {step}")
    handler(arn, token)
def create_secret(arn: str, token: str) -> None:
    """Stage a new secret version with a freshly generated password.

    The step is idempotent: when a version labelled AWSPENDING already exists
    for *token* (for example because the invocation was retried), it is reused
    instead of generating a different password for the same token.

    Args:
        arn: Identifier of the secret being rotated.
        token: Client request token; becomes the id of the pending version.
    """
    import secrets
    import string

    # The current value supplies every field except the password.
    current = secrets_client.get_secret_value(
        SecretId=arn,
        VersionStage='AWSCURRENT'
    )
    current_value = json.loads(current['SecretString'])

    try:
        secrets_client.get_secret_value(
            SecretId=arn,
            VersionId=token,
            VersionStage='AWSPENDING'
        )
    except secrets_client.exceptions.ResourceNotFoundException:
        # No pending version for this token yet: fall through and create it.
        pass
    else:
        logger.info(f"Versiunea in asteptare exista deja pentru {arn}")
        return

    # 32 characters drawn with a cryptographically secure generator.
    alphabet = string.ascii_letters + string.digits + '!@#$%^&*()'
    new_password = ''.join(secrets.choice(alphabet) for _ in range(32))

    new_value = current_value.copy()
    new_value['password'] = new_password

    secrets_client.put_secret_value(
        SecretId=arn,
        ClientRequestToken=token,
        SecretString=json.dumps(new_value),
        VersionStages=['AWSPENDING']
    )
    logger.info(f"Versiune noua creata pentru {arn}")
def set_secret(arn: str, token: str) -> None:
    """Apply the pending credential to the system it protects.

    Reads the AWSPENDING version identified by *token* and hands it to the
    PostgreSQL updater when ``engine`` is ``postgres``, or to the generic
    service updater when a ``service`` key is present. Other payloads are
    only logged.
    """
    staged = secrets_client.get_secret_value(
        SecretId=arn,
        VersionId=token,
        VersionStage='AWSPENDING',
    )
    payload = json.loads(staged['SecretString'])

    # The target system decides which updater runs.
    targets_postgres = 'engine' in payload and payload['engine'] == 'postgres'
    if targets_postgres:
        update_postgres_password(payload)
    elif 'service' in payload:
        update_service_credential(payload)

    logger.info(f"Secret setat in serviciul tinta pentru {arn}")
def test_secret(arn: str, token: str) -> None:
    """Check that the pending credential can actually be used.

    Reads the AWSPENDING version identified by *token* and runs the PostgreSQL
    connection test when ``engine`` is ``postgres``, or the generic service
    test when a ``service`` key is present.
    """
    staged = secrets_client.get_secret_value(
        SecretId=arn,
        VersionId=token,
        VersionStage='AWSPENDING',
    )
    payload = json.loads(staged['SecretString'])

    # Same target selection as the setSecret step.
    targets_postgres = 'engine' in payload and payload['engine'] == 'postgres'
    if targets_postgres:
        test_postgres_connection(payload)
    elif 'service' in payload:
        test_service_connection(payload)

    logger.info(f"Noul secret testat pentru {arn}")
def finish_secret(arn: str, token: str) -> None:
    """Promote the version identified by *token* to AWSCURRENT.

    The step is safe to retry: if *token* already carries AWSCURRENT nothing is
    changed, and ``RemoveFromVersionId`` is only sent when another version
    currently holds the label.

    Args:
        arn: Identifier of the secret being rotated.
        token: Version id that should become current.
    """
    metadata = secrets_client.describe_secret(SecretId=arn)

    # Find which version currently holds AWSCURRENT.
    current_version = None
    for version_id, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            current_version = version_id
            break

    if current_version == token:
        # A previous attempt already finished; moving the label onto the
        # version it is removed from would be rejected.
        logger.info(f"Versiunea {token} este deja AWSCURRENT pentru {arn}")
        return

    request = {
        'SecretId': arn,
        'VersionStage': 'AWSCURRENT',
        'MoveToVersionId': token,
    }
    if current_version is not None:
        request['RemoveFromVersionId'] = current_version
    secrets_client.update_secret_version_stage(**request)

    logger.info(f"Rotatie finalizata pentru {arn}")
def update_postgres_password(secret: Dict) -> None:
    """Set the PostgreSQL password of ``secret['username']`` to ``secret['password']``.

    Connects with the ``masteruser``/``masterpassword`` credentials from the
    same mapping. The connection is always closed, including on failure.

    Args:
        secret: Mapping with ``host``, ``dbname``, ``masteruser``,
            ``masterpassword``, ``username``, ``password`` and optionally
            ``port`` (default 5432).
    """
    import psycopg2
    from psycopg2 import sql

    conn = psycopg2.connect(
        host=secret['host'],
        port=secret.get('port', 5432),
        dbname=secret['dbname'],
        user=secret['masteruser'],
        password=secret['masterpassword']
    )
    try:
        # Compose the role name as a quoted identifier instead of pasting it
        # into the SQL text. NOTE: quoting makes the name case-sensitive.
        statement = sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
            sql.Identifier(secret['username'])
        )
        with conn.cursor() as cur:
            cur.execute(statement, (secret['password'],))
        conn.commit()
    finally:
        conn.close()
def test_postgres_connection(secret: Dict) -> None:
"""Testeaza conexiunea PostgreSQL cu noua parola."""
import psycopg2
conn = psycopg2.connect(
host=secret['host'],
port=secret.get('port', 5432),
dbname=secret['dbname'],
user=secret['username'],
password=secret['password']
)
with conn.cursor() as cur:
cur.execute('SELECT 1')
conn.close()Pattern-uri de Integrare in Aplicatii
Kubernetes External Secrets
# external-secrets.yaml
apiVersion: external-secrets.io/v1beta1
kind: ClusterSecretStore
metadata:
name: vault-backend
spec:
provider:
vault:
server: "https://vault.example.com:8200"
path: "secret"
version: "v2"
auth:
kubernetes:
mountPath: "kubernetes"
role: "app-role"
serviceAccountRef:
name: "external-secrets-sa"
namespace: "external-secrets"
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: app-secrets
namespace: production
spec:
refreshInterval: "15m"
secretStoreRef:
name: vault-backend
kind: ClusterSecretStore
target:
name: app-secrets
creationPolicy: Owner
data:
- secretKey: database-password
remoteRef:
key: app/database
property: password
- secretKey: api-key
remoteRef:
key: app/api
property: key
---
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: aws-secrets
namespace: production
spec:
refreshInterval: "1h"
secretStoreRef:
name: aws-secrets-manager
kind: ClusterSecretStore
target:
name: aws-credentials
creationPolicy: Owner
dataFrom:
- extract:
key: production/app/credentials
Integrare SDK
# secrets_client.py
import os
from functools import lru_cache
from typing import Optional, Dict
import hvac
import boto3
class SecretsClient:
    """Unified secrets reader backed by Vault, AWS Secrets Manager or environment variables.

    Lookups made through ``get_secret`` are cached per instance (up to 100
    entries) until ``invalidate_cache`` is called.
    """

    def __init__(self, backend: str = 'auto'):
        """Select the backend and create its client.

        Args:
            backend: ``'vault'``, ``'aws'``, ``'env'`` or ``'auto'`` to detect
                the backend from environment variables.
        """
        self.backend = self._detect_backend() if backend == 'auto' else backend
        self._client = self._init_client()
        # The cache wraps a bound method and lives on the instance, so it is
        # released together with the client and never shared between instances.
        self._cached_fetch = lru_cache(maxsize=100)(self._fetch_secret)

    def _detect_backend(self) -> str:
        """Pick the backend: Vault if VAULT_ADDR is set, AWS if AWS_REGION is set, else env."""
        if os.getenv('VAULT_ADDR'):
            return 'vault'
        if os.getenv('AWS_REGION'):
            return 'aws'
        return 'env'

    def _init_client(self):
        """Build the backend client, or return None for the environment backend."""
        if self.backend == 'vault':
            return hvac.Client(
                url=os.getenv('VAULT_ADDR'),
                token=os.getenv('VAULT_TOKEN')
            )
        if self.backend == 'aws':
            return boto3.client('secretsmanager')
        return None

    def _fetch_secret(self, path: str, key: Optional[str]) -> object:
        """Read a secret from the active backend without consulting the cache."""
        if self.backend == 'vault':
            response = self._client.secrets.kv.v2.read_secret_version(
                path=path
            )
            data = response['data']['data']
            return data.get(key) if key else data
        if self.backend == 'aws':
            import json

            response = self._client.get_secret_value(SecretId=path)
            data = json.loads(response['SecretString'])
            return data.get(key) if key else data
        # Environment fallback: "app/api" + "key" -> APP_API_KEY
        env_key = path.replace('/', '_').upper()
        if key:
            env_key = f"{env_key}_{key.upper()}"
        return os.getenv(env_key)

    def get_secret(self, path: str, key: Optional[str] = None) -> object:
        """Return a secret, served from this instance's cache when available.

        Args:
            path: Secret path (Vault KV path, AWS secret id, or the prefix of
                the environment variable name).
            key: Field inside the secret. When omitted, the Vault and AWS
                backends return the whole mapping.

        Returns:
            The field value, the whole secret mapping, or None when the field
            or environment variable does not exist.
        """
        return self._cached_fetch(path, key)

    def get_database_credentials(self) -> Dict[str, str]:
        """Return database credentials.

        On the Vault backend, credentials are generated for the ``readonly``
        database role and the lease duration is returned as ``ttl``. On other
        backends, the ``database`` secret's ``username`` and ``password``
        fields are read.
        """
        if self.backend == 'vault':
            response = self._client.secrets.database.generate_credentials(
                name='readonly'
            )
            return {
                'username': response['data']['username'],
                'password': response['data']['password'],
                'ttl': response['lease_duration']
            }
        return {
            'username': self.get_secret('database', 'username'),
            'password': self.get_secret('database', 'password')
        }

    def invalidate_cache(self):
        """Drop every cached secret held by this instance."""
        self._cached_fetch.cache_clear()
# Usage example
secrets = SecretsClient()
# Fetch a static secret
api_key = secrets.get_secret('app/api', 'key')
# Fetch dynamic database credentials
db_creds = secrets.get_database_credentials()
Bune Practici de Securitate
Igiena Secretelor
# secret_scanner.py
import re
from typing import List, Dict, Tuple
from pathlib import Path
class SecretScanner:
    """Scan a directory tree for text that looks like a committed secret.

    Every file below the root (minus excluded paths) is checked line by line
    against ``PATTERNS``; each hit is recorded with a redacted preview.
    """

    # (regex, label) pairs; a line yields at most one finding per pair.
    PATTERNS = [
        # API keys
        (r'(?i)(api[_-]?key|apikey)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'API Key'),
        # AWS
        (r'AKIA[0-9A-Z]{16}', 'AWS Access Key'),
        (r'(?i)aws[_-]?secret[_-]?access[_-]?key\s*[=:]\s*["\']([a-zA-Z0-9/+=]{40})["\']', 'AWS Secret Key'),
        # Passwords
        (r'(?i)(password|passwd|pwd)\s*[=:]\s*["\']([^"\']{8,})["\']', 'Password'),
        # Private keys
        (r'-----BEGIN (RSA |EC |DSA )?PRIVATE KEY-----', 'Private Key'),
        # JWT
        (r'eyJ[a-zA-Z0-9_-]*\.eyJ[a-zA-Z0-9_-]*\.[a-zA-Z0-9_-]*', 'JWT Token'),
        # Generic secrets
        (r'(?i)(secret|token)\s*[=:]\s*["\']([a-zA-Z0-9_-]{20,})["\']', 'Generic Secret'),
    ]
    # Regexes matched against the POSIX form of each path; a hit skips the file.
    EXCLUDE_PATTERNS = [
        r'\.git/',
        r'node_modules/',
        r'\.env\.example',
        r'__pycache__/',
    ]
    # Text substituted for anything that looks like a secret in previews.
    REDACTION_MARKER = '****REDACTAT****'

    def __init__(self, root_path: str):
        """Prepare a scanner rooted at *root_path*.

        Args:
            root_path: Directory whose files are scanned recursively.
        """
        self.root_path = Path(root_path)
        self.findings: List[Dict] = []
        # Compile once instead of on every line of every file.
        self._compiled_patterns = [
            (re.compile(pattern), secret_type)
            for pattern, secret_type in self.PATTERNS
        ]
        self._compiled_excludes = [
            re.compile(pattern) for pattern in self.EXCLUDE_PATTERNS
        ]

    def scan(self) -> List[Dict]:
        """Scan the tree and return the findings of this run.

        Findings from a previous call are discarded, so repeated scans do not
        accumulate duplicates.

        Returns:
            List of dicts with ``file`` (relative path), ``line`` (1-based),
            ``type`` (pattern label) and ``preview`` (redacted line).
        """
        self.findings = []
        for file_path in self.root_path.rglob('*'):
            if file_path.is_file() and not self._should_exclude(file_path):
                self._scan_file(file_path)
        return self.findings

    def _should_exclude(self, path: Path) -> bool:
        """Return True when *path* matches any exclusion pattern."""
        # Forward slashes on every platform so patterns like ``\.git/`` match.
        path_str = path.as_posix()
        return any(
            exclude.search(path_str) for exclude in self._compiled_excludes
        )

    def _scan_file(self, file_path: Path):
        """Append one finding per (line, pattern) hit in *file_path*.

        Files that cannot be read are skipped; undecodable bytes are ignored.
        """
        try:
            content = file_path.read_text(encoding='utf-8', errors='ignore')
        except OSError:
            return
        relative_name = str(file_path.relative_to(self.root_path))
        for line_num, line in enumerate(content.split('\n'), 1):
            for compiled, secret_type in self._compiled_patterns:
                if compiled.search(line):
                    self.findings.append({
                        'file': relative_name,
                        'line': line_num,
                        'type': secret_type,
                        'preview': self._redact_preview(line)
                    })

    def _mask_match(self, match) -> str:
        """Return the matched text with its secret part replaced by the marker.

        When the pattern captured groups, only the last captured group (the
        value in ``name = "value"`` patterns) is masked; otherwise the whole
        match is masked.
        """
        group_index = match.lastindex
        if group_index is None:
            return self.REDACTION_MARKER
        whole = match.group(0)
        start = match.start(group_index) - match.start(0)
        end = match.end(group_index) - match.start(0)
        return whole[:start] + self.REDACTION_MARKER + whole[end:]

    def _redact_preview(self, line: str, max_length: int = 80) -> str:
        """Build a preview of *line* that is safe to print.

        Everything matched by a detection pattern is masked first, so the
        secret that triggered the finding never reaches the output; remaining
        long quoted tokens are masked as well. The result is truncated to
        *max_length* characters with a trailing ``...`` when shortened.
        """
        redacted = line
        for compiled, _ in self._compiled_patterns:
            redacted = compiled.sub(self._mask_match, redacted)
        redacted = re.sub(
            r'(["\'])[a-zA-Z0-9_/+=.-]{10,}(["\'])',
            r'\1****REDACTAT****\2',
            redacted
        )
        return redacted[:max_length] + ('...' if len(redacted) > max_length else '')
# Hook pre-commit
if __name__ == '__main__':
import sys
scanner = SecretScanner('.')
findings = scanner.scan()
if findings:
print("Atentie! Potentiale secrete detectate:")
for finding in findings:
print(f" {finding['file']}:{finding['line']} - {finding['type']}")
print(f" {finding['preview']}")
sys.exit(1)
else:
print("Niciun secret detectat")
sys.exit(0)Distributia Secretelor cu Zero-Trust
# zero_trust_secrets.yaml
architecture:
principles:
- never_trust_network
- verify_explicitly
- least_privilege
- assume_breach
implementation:
authentication:
method: mutual_tls
certificate_rotation: automatic
certificate_ttl: 24h
authorization:
model: attribute_based_access_control
context:
- identity
- device_posture
- location
- time
- resource_sensitivity
encryption:
in_transit: tls_1_3
at_rest: aes_256_gcm
key_management: hsm_backed
audit:
log_all_access: true
retention: 90_days
alerting: real_time
Concluzie
Managementul eficient al secretelor necesita:
- Stocare centralizata cu controale de acces adecvate
- Rotatie automata pentru a limita ferestrele de expunere
- Credentiale dinamice acolo unde este posibil
- Auditare completa a tuturor accesarilor
- Aparare in adancime cu mai multe straturi de securitate
Alege solutia potrivita de management al secretelor in functie de infrastructura si cerintele de conformitate, si urmeaza intotdeauna principiul privilegiului minim.
Resurse Conexe
- Securitatea MLOps: Securizarea Pipeline-ului ML: Managementul secretelor in antrenament si servire ML
- ML CI/CD: Integrare si Deploy Continuu pentru Machine Learning: Injectarea securizata a credentialelor in pipeline-uri ML
- Securitatea Pipeline-urilor DevSecOps: Integrarea securitatii in CI/CD
- Securitate CI/CD pentru GitHub Actions: OIDC si secrete in workflow-uri GitHub
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →