Managementul corect al secretelor este fundamental pentru livrarea sigura a software-ului. Acest ghid acopera bunele practici pentru gestionarea credentialelor, cheilor API, certificatelor si altor date sensibile in mediile de dezvoltare, CI/CD si productie.
Arhitectura Managementului Secretelor
Proiecteaza un sistem centralizat de management al secretelor:
import hashlib
import json
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional
class SecretType(Enum):
    """Closed set of secret categories handled by the secrets manager.

    Each member's value is the string stored alongside the secret (under the
    ``type`` metadata key) and used to rebuild the member when reading back.
    """

    # Opaque tokens for third-party or internal APIs.
    API_KEY = "api_key"
    # JSON document carrying at least ``username`` and ``password``.
    DATABASE_CREDENTIAL = "database_credential"
    # PEM-style private key material (expected to start with ``-----BEGIN``).
    SSH_KEY = "ssh_key"
    TLS_CERTIFICATE = "tls_certificate"
    ENCRYPTION_KEY = "encryption_key"
    SERVICE_ACCOUNT = "service_account"
@dataclass
class Secret:
    """A single version of a stored secret together with its bookkeeping data.

    The plaintext ``value`` is deliberately excluded from the generated
    ``repr`` so that logging or printing a ``Secret`` (for example in a
    traceback or a debug statement) cannot leak the credential itself.
    """

    # Path/identifier under which the secret is stored.
    name: str
    secret_type: SecretType
    # Plaintext secret material; kept out of repr() on purpose.
    value: str = field(repr=False)
    # Backend version number of this value.
    version: int = 0
    # Creation time as reported by the backend.
    created_at: Optional[datetime] = None
    # Moment after which the secret must no longer be handed out; None = never.
    expires_at: Optional[datetime] = None
    # Free-form metadata stored next to the value.
    metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SecretAccessLog:
    """One audit-trail entry describing a single attempt to touch a secret."""

    # When the attempt was recorded.
    timestamp: datetime
    # Path of the secret that was accessed.
    secret_name: str
    # Identity of the caller performing the access.
    accessor: str
    # Kind of operation; the manager records 'read' and 'write'.
    action: str
    # Origin of the request as reported by the caller.
    source_ip: str
    # Whether the operation completed successfully.
    success: bool
class SecretsManager:
    """Validate, store and fetch secrets through a backend, auditing every access.

    ``backend`` is any object providing ``get_secret(path, version)`` and
    ``set_secret(path, value, metadata)``. Every read or write attempt is
    appended to ``access_logs`` and echoed to stdout as an ``AUDIT:`` JSON line.
    """

    def __init__(self, backend):
        self.backend = backend
        # In-memory audit trail; it grows for the lifetime of the manager.
        self.access_logs: List[SecretAccessLog] = []

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Naive UTC is what this class stores and compares against;
        ``datetime.utcnow()`` is deprecated, so the value is derived from an
        aware timestamp instead.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @classmethod
    def _is_expired(cls, expires_at: Optional[datetime]) -> bool:
        """Tell whether ``expires_at`` lies in the past.

        ``None`` means "never expires". Aware datetimes are converted to naive
        UTC first so they can be compared with the naive clock without raising
        ``TypeError``; naive datetimes are taken to be UTC already.
        """
        if expires_at is None:
            return False
        if expires_at.tzinfo is not None:
            expires_at = expires_at.astimezone(timezone.utc).replace(tzinfo=None)
        return expires_at < cls._utcnow()

    def get_secret(
        self,
        path: str,
        accessor: str,
        source_ip: str,
        version: Optional[int] = None
    ) -> Optional[Secret]:
        """Fetch a secret and record the attempt in the audit trail.

        Args:
            path: Location of the secret in the backend.
            accessor: Identity of the caller, stored in the audit entry.
            source_ip: Origin of the request, stored in the audit entry.
            version: Specific version to read; ``None`` lets the backend decide.

        Returns:
            The secret, or ``None`` when the backend has no secret at ``path``
            or the secret has expired.

        Raises:
            Exception: Whatever the backend raises is re-raised after a failed
                read has been logged.
        """
        try:
            secret = self.backend.get_secret(path, version)
        except Exception:
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='read',
                source_ip=source_ip,
                success=False
            )
            raise

        # Exactly one audit entry per read: it is a success only when a live
        # (present and unexpired) secret is actually handed to the caller.
        delivered = secret is not None and not self._is_expired(secret.expires_at)
        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='read',
            source_ip=source_ip,
            success=delivered
        )
        return secret if delivered else None

    def set_secret(
        self,
        path: str,
        value: str,
        secret_type: SecretType,
        accessor: str,
        source_ip: str,
        ttl_hours: Optional[int] = None
    ) -> Secret:
        """Validate and store a secret, recording the attempt in the audit trail.

        Args:
            path: Location of the secret in the backend.
            value: Plaintext secret material.
            secret_type: Category used to pick the validation rules.
            accessor: Identity of the caller, stored as ``created_by``.
            source_ip: Origin of the request, stored in the audit entry.
            ttl_hours: Lifetime in hours. A falsy value (``None`` or ``0``)
                stores the secret without an expiry.

        Returns:
            Whatever the backend returns for the stored secret.

        Raises:
            ValueError: If ``value`` fails validation for ``secret_type``.
            Exception: Backend errors are re-raised after the failed write has
                been logged.
        """
        try:
            self._validate_secret(value, secret_type)
            expires_at = None
            if ttl_hours:
                expires_at = self._utcnow() + timedelta(hours=ttl_hours)
            metadata = {
                'type': secret_type.value,
                'created_by': accessor,
                'expires_at': expires_at.isoformat() if expires_at else None
            }
            secret = self.backend.set_secret(path, value, metadata)
        except Exception:
            # Rejected and failed writes belong in the audit trail as well.
            self._log_access(
                secret_name=path,
                accessor=accessor,
                action='write',
                source_ip=source_ip,
                success=False
            )
            raise

        self._log_access(
            secret_name=path,
            accessor=accessor,
            action='write',
            source_ip=source_ip,
            success=True
        )
        return secret

    def _validate_secret(self, value: str, secret_type: SecretType):
        """Check ``value`` against the rules of its ``secret_type``.

        Raises:
            ValueError: If the value is empty, an API key is shorter than 20
                characters, a database credential is not a JSON object with
                ``username`` and ``password``, or an SSH key does not start
                with ``-----BEGIN``.
        """
        if not value:
            raise ValueError("Valoarea secretului nu poate fi goala")
        if secret_type == SecretType.API_KEY:
            if len(value) < 20:
                raise ValueError("Cheia API trebuie sa aiba cel putin 20 de caractere")
        elif secret_type == SecretType.DATABASE_CREDENTIAL:
            # json.JSONDecodeError is a ValueError, so malformed JSON already
            # surfaces with the right exception type.
            cred = json.loads(value)
            # Guard against valid JSON that is not an object (list, number...),
            # where the membership test would misbehave or raise TypeError.
            if not isinstance(cred, dict) or 'username' not in cred or 'password' not in cred:
                raise ValueError("Credentialele bazei de date trebuie sa contina username si password")
        elif secret_type == SecretType.SSH_KEY:
            if not value.startswith('-----BEGIN'):
                raise ValueError("Format invalid de cheie SSH")

    def _log_access(
        self,
        secret_name: str,
        accessor: str,
        action: str,
        source_ip: str,
        success: bool
    ):
        """Append an audit entry and echo it to stdout as an ``AUDIT:`` JSON line."""
        log = SecretAccessLog(
            timestamp=self._utcnow(),
            secret_name=secret_name,
            accessor=accessor,
            action=action,
            source_ip=source_ip,
            success=success
        )
        self.access_logs.append(log)
        # Build the payload outside the f-string: a multi-line expression inside
        # a single-quoted f-string is a SyntaxError before Python 3.12.
        record = {
            'timestamp': log.timestamp.isoformat(),
            'secret': log.secret_name,
            'accessor': log.accessor,
            'action': log.action,
            'success': log.success
        }
        print(f"AUDIT: {json.dumps(record)}")


# Integrare HashiCorp Vault
Implementeaza Vault ca backend pentru secrete:
import hvac
from typing import Optional, Dict
class VaultBackend:
    """Secrets backend that stores values in a HashiCorp Vault KV v2 engine.

    Extra keyword arguments are forwarded to the chosen authentication method
    (``token``; ``role_id``/``secret_id``; ``role``). An optional
    ``mount_point`` keyword selects the KV mount and defaults to ``'secret'``.
    """

    # Location of the service-account token used by the kubernetes auth method.
    _K8S_JWT_PATH = '/var/run/secrets/kubernetes.io/serviceaccount/token'

    def __init__(self, vault_addr: str, auth_method: str = 'token', **kwargs):
        self.client = hvac.Client(url=vault_addr)
        self.mount_point = kwargs.pop('mount_point', 'secret')
        self._authenticate(auth_method, **kwargs)

    def _authenticate(self, method: str, **kwargs):
        """Log in to Vault with ``method`` and store the resulting client token.

        Raises:
            ValueError: If ``method`` is not one of ``token``, ``approle`` or
                ``kubernetes``.
            Exception: If the client is still unauthenticated after login.
        """
        if method == 'token':
            self.client.token = kwargs.get('token')
        elif method == 'approle':
            response = self.client.auth.approle.login(
                role_id=kwargs['role_id'],
                secret_id=kwargs['secret_id']
            )
            self.client.token = response['auth']['client_token']
        elif method == 'kubernetes':
            with open(self._K8S_JWT_PATH) as f:
                # Drop a trailing newline so it is not sent as part of the JWT.
                jwt = f.read().strip()
            response = self.client.auth.kubernetes.login(
                role=kwargs['role'],
                jwt=jwt
            )
            self.client.token = response['auth']['client_token']
        else:
            # Fail loudly instead of falling through with no login attempted.
            raise ValueError(f"Metoda de autentificare necunoscuta: {method}")
        if not self.client.is_authenticated():
            raise Exception("Autentificarea la Vault a esuat")

    @staticmethod
    def _parse_vault_time(raw: str) -> datetime:
        """Parse an ISO-8601 timestamp, tolerating ``Z`` and long fractions.

        The fractional part is normalised to exactly six digits and a trailing
        ``Z`` is rewritten as ``+00:00`` because older
        ``datetime.fromisoformat`` versions reject both forms.
        """
        text = raw.strip()
        if text.endswith(('Z', 'z')):
            text = text[:-1] + '+00:00'
        head, dot, tail = text.partition('.')
        if dot:
            digit_count = len(tail) - len(tail.lstrip('0123456789'))
            fraction = tail[:digit_count][:6].ljust(6, '0')
            text = f"{head}.{fraction}{tail[digit_count:]}"
        return datetime.fromisoformat(text)

    @classmethod
    def _parse_expiry(cls, raw) -> Optional[datetime]:
        """Turn the stored ``expires_at`` field into a naive UTC datetime.

        Returns ``None`` when the field is absent, empty or unparsable, which
        keeps such secrets readable exactly as before expiry was honoured.
        Aware values are converted to naive UTC.
        """
        if not raw:
            return None
        try:
            parsed = cls._parse_vault_time(str(raw))
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    def get_secret(self, path: str, version: Optional[int] = None) -> Secret:
        """Read a secret from Vault KV v2.

        Args:
            path: Secret path below the mount point.
            version: Version to read; a falsy value reads the latest one.

        Returns:
            A ``Secret`` built from the stored fields and the version metadata,
            including the ``expires_at`` recorded when the secret was written.
        """
        params = {'path': path, 'mount_point': self.mount_point}
        if version:
            params['version'] = version
        response = self.client.secrets.kv.v2.read_secret_version(**params)

        payload = response['data']
        stored = payload['data']
        info = payload['metadata']
        return Secret(
            name=path,
            secret_type=SecretType(stored.get('type', 'api_key')),
            value=stored.get('value', ''),
            version=info['version'],
            created_at=self._parse_vault_time(info['created_time']),
            # Previously hard-coded to None, which made every TTL ineffective.
            expires_at=self._parse_expiry(stored.get('expires_at')),
            metadata=stored
        )

    def set_secret(self, path: str, value: str, metadata: Optional[Dict] = None) -> Secret:
        """Write a secret to Vault KV v2 and return the freshly stored version.

        ``metadata`` entries are stored next to the value. ``value`` is applied
        last so that a metadata key named ``value`` cannot replace the secret.
        """
        secret_data = {**(metadata or {}), 'value': value}
        self.client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=secret_data,
            mount_point=self.mount_point
        )
        return self.get_secret(path)

    def delete_secret(self, path: str) -> bool:
        """Delete the secret's metadata and all of its versions; return True."""
        self.client.secrets.kv.v2.delete_metadata_and_all_versions(
            path=path,
            mount_point=self.mount_point
        )
        return True

    def list_secrets(self, path: str) -> List[str]:
        """Return the keys stored under ``path``."""
        response = self.client.secrets.kv.v2.list_secrets(
            path=path,
            mount_point=self.mount_point
        )
        return response['data']['keys']


# Injectarea Secretelor in CI/CD
Injecteaza secretele in mod securizat in pipeline-urile CI/CD:
from dataclasses import dataclass
from typing import Optional, List, Dict
import subprocess
import os
@dataclass
class PipelineSecretConfig:
    """Describes how one secret is exposed to a pipeline step."""

    # Path of the secret to fetch from the secrets manager.
    secret_path: str
    # Name of the environment variable that receives the secret value.
    env_var_name: str
    # When set, the value is additionally written to this file.
    file_path: Optional[str] = None
    # When True, a missing secret aborts injection instead of being skipped.
    required: bool = True
class CICDSecretsInjector:
    """Expose secrets to pipeline commands as environment variables and files.

    The injector remembers which variable names and files it produced so that
    ``cleanup()`` can remove them afterwards.
    """

    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        # Environment variable names produced by inject_secrets().
        self.injected_secrets: List[str] = []
        # Files written by _write_secret_file() that cleanup() must destroy.
        self.temp_files: List[str] = []

    def inject_secrets(
        self,
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str
    ) -> Dict[str, str]:
        """Resolve the configured secrets into an environment mapping.

        Secrets with a ``file_path`` are also written to that file.

        Returns:
            Mapping of environment variable name to secret value. The process
            environment itself is not modified here.

        Raises:
            ValueError: If a secret marked ``required`` cannot be obtained.
        """
        env_vars = {}
        for config in configs:
            secret = self.secrets_manager.get_secret(
                path=config.secret_path,
                accessor=accessor,
                source_ip=source_ip
            )
            if secret is None:
                if config.required:
                    raise ValueError(f"Secret obligatoriu negasit: {config.secret_path}")
                continue
            env_vars[config.env_var_name] = secret.value
            self.injected_secrets.append(config.env_var_name)
            if config.file_path:
                self._write_secret_file(config.file_path, secret.value)
        return env_vars

    def _write_secret_file(self, path: str, content: str):
        """Write ``content`` to ``path`` readable and writable by the owner only."""
        directory = os.path.dirname(path)
        # A bare filename has no directory part; os.makedirs('') would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        # Register right away so cleanup() removes the file even if the write
        # below fails halfway.
        self.temp_files.append(path)
        with os.fdopen(fd, 'wb') as handle:
            # The mode passed to os.open only applies to newly created files;
            # tighten a pre-existing file before any secret bytes land in it.
            os.chmod(path, 0o600)
            handle.write(content.encode())

    def run_with_secrets(
        self,
        command: List[str],
        configs: List[PipelineSecretConfig],
        accessor: str,
        source_ip: str,
        mask_output: bool = True
    ) -> subprocess.CompletedProcess:
        """Run ``command`` with the secrets injected into its environment.

        The captured stdout/stderr are masked unless ``mask_output`` is False.
        ``cleanup()`` always runs, including when injection itself fails after
        some secret files were already written.
        """
        try:
            secret_env = self.inject_secrets(configs, accessor, source_ip)
            env = os.environ.copy()
            env.update(secret_env)
            result = subprocess.run(command, env=env, capture_output=True, text=True)
            if mask_output:
                result.stdout = self._mask_secrets(result.stdout, secret_env)
                result.stderr = self._mask_secrets(result.stderr, secret_env)
            return result
        finally:
            self.cleanup()

    def _mask_secrets(self, text: str, secrets: Dict[str, str]) -> str:
        """Replace secret values occurring in ``text`` with ``***NAME***``.

        Values of four characters or fewer are left alone. Longer values are
        replaced first so that a secret which is a prefix or substring of
        another one cannot leave the remainder of the longer secret visible.
        """
        if not text:
            return text
        by_length = sorted(
            secrets.items(),
            key=lambda item: len(item[1] or ''),
            reverse=True
        )
        for name, value in by_length:
            if value and len(value) > 4:
                text = text.replace(value, f"***{name}***")
        return text

    def cleanup(self):
        """Remove injected environment variables and destroy secret files.

        Files are overwritten with zero bytes before deletion. This is a
        best-effort measure; whether the old blocks are really overwritten
        depends on the filesystem and storage device.
        """
        for var_name in self.injected_secrets:
            os.environ.pop(var_name, None)
        for file_path in self.temp_files:
            if not os.path.exists(file_path):
                continue
            # Measure BEFORE opening for writing: opening with 'wb' truncates
            # the file to zero bytes, which made the old overwrite a no-op.
            size = os.path.getsize(file_path)
            with open(file_path, 'r+b') as f:
                f.write(b'\x00' * size)
                f.flush()
                os.fsync(f.fileno())
            os.remove(file_path)
        self.injected_secrets.clear()
        self.temp_files.clear()


# Automatizarea Rotatiei Secretelor
Automatizeaza rotatia secretelor:
from abc import ABC, abstractmethod
from typing import Dict
import psycopg2
import secrets as py_secrets
class RotationHandler(ABC):
    """Interface for strategies that know how to rotate one kind of secret."""

    @abstractmethod
    def rotate(self, current_secret: Secret) -> str:
        """Produce and apply a replacement for ``current_secret``; return the new value."""

    @abstractmethod
    def verify(self, new_value: str) -> bool:
        """Report whether ``new_value`` is usable."""
class DatabasePasswordRotationHandler(RotationHandler):
    """Rotate the password of a PostgreSQL role using an administrative login.

    ``config`` must provide ``host``, ``database``, ``admin_user`` and
    ``admin_password``.
    """

    def __init__(self, config: Dict[str, str]):
        self.config = config

    def rotate(self, current_secret: Secret) -> str:
        """Set a fresh random password for the role named in ``current_secret``.

        Args:
            current_secret: Secret whose value is a JSON object containing at
                least ``username``.

        Returns:
            The credential JSON with ``password`` replaced by the new one.
        """
        # Imported locally: ``import psycopg2`` alone does not guarantee that
        # the ``psycopg2.sql`` submodule is loaded.
        from psycopg2 import sql

        cred = json.loads(current_secret.value)
        new_password = py_secrets.token_urlsafe(24)

        # The role name comes from stored data, so it must never be spliced
        # into the statement as raw text. Identifier() quotes it safely.
        # NOTE(review): a quoted identifier is case-sensitive — confirm that
        # stored usernames match the role names' exact case.
        statement = sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
            sql.Identifier(cred['username'])
        )

        conn = psycopg2.connect(
            host=self.config['host'],
            database=self.config['database'],
            user=self.config['admin_user'],
            password=self.config['admin_password']
        )
        try:
            with conn.cursor() as cur:
                cur.execute(statement, (new_password,))
            conn.commit()
        finally:
            conn.close()

        cred['password'] = new_password
        return json.dumps(cred)

    def verify(self, new_value: str) -> bool:
        """Return True when a login with the credentials in ``new_value`` succeeds."""
        cred = json.loads(new_value)
        try:
            conn = psycopg2.connect(
                host=self.config['host'],
                database=self.config['database'],
                user=cred['username'],
                password=cred['password']
            )
        except Exception:
            # Any failure to log in counts as "not verified" rather than a crash.
            return False
        conn.close()
        return True
class SecretsRotationOrchestrator:
    """Drive secret rotation by pairing secret paths with rotation handlers."""

    def __init__(self, secrets_manager: SecretsManager):
        self.secrets_manager = secrets_manager
        # Secret path -> handler responsible for rotating it.
        self.handlers: Dict[str, RotationHandler] = {}

    def register_handler(self, secret_path: str, handler: RotationHandler):
        """Associate ``handler`` with ``secret_path``, replacing any previous one."""
        self.handlers[secret_path] = handler

    def rotate_secret(self, secret_path: str, accessor: str) -> bool:
        """Rotate one secret, verify the new value, then store it.

        Args:
            secret_path: Path of the secret to rotate.
            accessor: Identity recorded in the audit trail for the read/write.

        Returns:
            True when the new value was verified and stored; False when
            rotation, verification or storing failed (the error is printed).

        Raises:
            ValueError: If no handler is registered for ``secret_path`` or the
                current secret cannot be obtained (missing or expired).
        """
        handler = self.handlers.get(secret_path)
        if not handler:
            raise ValueError(f"Nu exista handler inregistrat pentru {secret_path}")

        current = self.secrets_manager.get_secret(
            path=secret_path,
            accessor=accessor,
            source_ip='rotation-orchestrator'
        )
        # get_secret() returns None for a missing or expired secret; report that
        # clearly instead of failing later with an AttributeError.
        if current is None:
            raise ValueError(f"Secretul curent nu este disponibil pentru {secret_path}")

        try:
            new_value = handler.rotate(current)
            if not handler.verify(new_value):
                raise Exception("Verificarea a esuat")
            self.secrets_manager.set_secret(
                path=secret_path,
                value=new_value,
                secret_type=current.secret_type,
                accessor=accessor,
                source_ip='rotation-orchestrator'
            )
            return True
        except Exception as e:
            # NOTE(review): there is no rollback hook on the handler, so a
            # failure after rotate() has run may leave the target system and
            # the stored secret out of sync — confirm how operators recover.
            print(f"Rotatia a esuat: {e}")
            return False


# Concluzie
Managementul eficient al secretelor necesita stocare centralizata, controale stricte de acces, logare completa de audit si rotatie automatizata. Implementeaza HashiCorp Vault sau solutii similare ca sursa de adevar, integreaza-te cu pipeline-urile CI/CD pentru injectare securizata si stabileste politici de rotatie. Auditurile regulate ale pattern-urilor de acces la secrete ajuta la mentinerea posturii de securitate.
Resurse Conexe
- MLOps Security: Securing Your ML Pipeline: Managementul secretelor in infrastructura ML
- ML CI/CD: Continuous Integration and Deployment for Machine Learning: Pipeline-uri securizate de deployment ML
- DevSecOps Pipeline Security: Integrarea completa a securitatii in pipeline
- CI/CD Security for GitHub Actions: Protejarea workflow-urilor GitHub Actions
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →