MLOps Security: Securing the ML Pipeline from Data to Deployment
ML pipelines introduce attack surfaces that do not exist in traditional software. Data can be poisoned, models can be stolen, training infrastructure can be compromised, and adversarial inputs can cause misclassifications. This guide covers the security controls at each stage of the MLOps pipeline.
The ML Pipeline Attack Surface
Data Sources        Training Pipeline           Model Serving
┌──────────┐        ┌──────────────────┐        ┌──────────────┐
│  Data    │───────▶│  Feature Eng     │───────▶│  API Server  │
│  Sources │   ①    │  Training        │   ③    │  Model       │
│          │        │  Evaluation      │        │  Inference   │
└──────────┘        └──────────────────┘        └──────────────┘
     │                       │                         │
     ▼                       ▼                         ▼
① Data Poisoning    ② Supply Chain Attack    ④ Adversarial Inputs
① Data Exfiltration ② Model Theft            ④ Model Extraction
                    ② Code Injection         ④ API Abuse
Stage 1: Data Security

Defending Against Data Poisoning
import numpy as np
from scipy import stats
from sklearn.neighbors import NearestNeighbors


class DataPoisoningDetector:
    """Detect anomalous data points that may indicate poisoning attacks."""

    def __init__(self, contamination_threshold: float = 0.05):
        self.threshold = contamination_threshold

    def statistical_outlier_detection(self, data: np.ndarray, feature_names: list[str]) -> dict:
        """Flag statistical outliers that could be poisoned samples."""
        results = {"flagged_indices": set(), "feature_reports": {}}
        for i, feature in enumerate(feature_names):
            column = data[:, i]
            z_scores = np.abs(stats.zscore(column, nan_policy="omit"))
            outliers = np.where(z_scores > 3.5)[0]
            if len(outliers) > 0:
                results["feature_reports"][feature] = {
                    "outlier_count": len(outliers),
                    "outlier_percentage": len(outliers) / len(column) * 100,
                    "indices": outliers.tolist()[:20],
                }
                results["flagged_indices"].update(outliers.tolist())
        results["total_flagged"] = len(results["flagged_indices"])
        results["flagged_percentage"] = results["total_flagged"] / len(data) * 100
        results["poisoning_likely"] = results["flagged_percentage"] > self.threshold * 100
        return results

    def label_consistency_check(self, features: np.ndarray, labels: np.ndarray) -> dict:
        """Check for label flipping attacks using nearest-neighbor consistency."""
        nn = NearestNeighbors(n_neighbors=5)
        nn.fit(features)
        _, indices = nn.kneighbors(features)
        inconsistent = []
        for i in range(len(labels)):
            neighbor_labels = labels[indices[i][1:]]  # Exclude self
            if labels[i] != stats.mode(neighbor_labels, keepdims=True).mode[0]:
                inconsistent.append(i)
        return {
            "inconsistent_labels": len(inconsistent),
            "inconsistency_rate": len(inconsistent) / len(labels),
            "sample_indices": inconsistent[:50],
        }
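A minimal usage sketch for gating a training run on these checks. The data, feature names, and the 0.1 inconsistency cutoff are illustrative, not part of the detector:

# Illustrative gate before training; X, y, and thresholds are made up
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))        # stand-in training features
y = (X[:, 0] > 0).astype(int)         # stand-in labels

detector = DataPoisoningDetector(contamination_threshold=0.05)
outlier_report = detector.statistical_outlier_detection(X, ["f1", "f2", "f3"])
label_report = detector.label_consistency_check(X, y)

# The 0.1 cutoff is an example policy choice, not a library default
if outlier_report["poisoning_likely"] or label_report["inconsistency_rate"] > 0.1:
    raise RuntimeError("Dataset failed poisoning checks; quarantine before training")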
Data Access Controls

class DataAccessPolicy:
    """Enforce data access controls for ML training data."""

    SENSITIVITY_LEVELS = {
        "public": {"encryption": False, "audit_log": False, "approval_required": False},
        "internal": {"encryption": True, "audit_log": True, "approval_required": False},
        "confidential": {"encryption": True, "audit_log": True, "approval_required": True},
        "restricted": {"encryption": True, "audit_log": True, "approval_required": True, "mfa_required": True},
    }

    def check_access(self, user: str, dataset: str, sensitivity: str) -> dict:
        # Unknown sensitivity labels fail closed to the strictest policy
        policy = self.SENSITIVITY_LEVELS.get(sensitivity, self.SENSITIVITY_LEVELS["restricted"])
        return {
            "allowed": self._verify_authorization(user, dataset),
            "encryption_required": policy["encryption"],
            "audit_logged": policy["audit_log"],
            "needs_approval": policy["approval_required"],
        }

    def _verify_authorization(self, user: str, dataset: str) -> bool:
        # Check against access control list
        # Implementation depends on your auth system (IAM, RBAC, etc.)
        pass
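A sketch of how the policy might gate a training job's data loading, assuming _verify_authorization is wired to your IAM/RBAC backend. The user and dataset names are hypothetical:

# Hypothetical gate in a training job
policy = DataAccessPolicy()
decision = policy.check_access("trainer@example.com", "churn_v3", "confidential")
if not decision["allowed"]:
    raise PermissionError("trainer@example.com may not read churn_v3")
if decision["needs_approval"]:
    print("Access granted pending approval workflow")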
Stage 2: Training Pipeline Security

Model Artifact Integrity
import hashlib
import json
from datetime import datetime, timezone


class ModelIntegrityVerifier:
    """Cryptographic integrity verification for model artifacts."""

    def sign_model(self, model_path: str, metadata: dict) -> dict:
        """Generate an integrity manifest for a model artifact."""
        model_hash = self._compute_file_hash(model_path)
        manifest = {
            "model_path": model_path,
            "model_hash": model_hash,
            "hash_algorithm": "sha256",
            "signed_at": datetime.now(timezone.utc).isoformat(),
            "metadata": metadata,
        }
        # Fingerprint the manifest. Note: a bare hash is a tamper-evident
        # checksum, not a cryptographic signature (see the HMAC sketch below).
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        manifest["signature"] = hashlib.sha256(manifest_bytes).hexdigest()
        return manifest

    def verify_model(self, model_path: str, manifest: dict) -> dict:
        """Verify a model artifact matches its signed manifest."""
        current_hash = self._compute_file_hash(model_path)
        expected_hash = manifest["model_hash"]
        return {
            "path": model_path,
            "integrity_valid": current_hash == expected_hash,
            "current_hash": current_hash,
            "expected_hash": expected_hash,
            "signed_at": manifest["signed_at"],
        }

    def _compute_file_hash(self, file_path: str) -> str:
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
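A hash over the manifest only detects accidental corruption; an attacker who can rewrite the model can rewrite the manifest too. Genuine signing requires a key. A minimal HMAC sketch, assuming a shared secret in a MODEL_SIGNING_KEY environment variable (an illustrative name, not part of the verifier above):

# Minimal keyed-signing sketch; assumes MODEL_SIGNING_KEY is set
import hashlib
import hmac
import json
import os

def hmac_sign_manifest(manifest: dict) -> str:
    """Sign a manifest (without its 'signature' field) using a shared secret."""
    key = os.environ["MODEL_SIGNING_KEY"].encode()
    payload = json.dumps(manifest, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def hmac_verify_manifest(manifest: dict, signature: str) -> bool:
    # compare_digest avoids timing side channels
    return hmac.compare_digest(hmac_sign_manifest(manifest), signature)

For cross-team or public distribution, asymmetric signing (e.g., with a tool like Sigstore or a KMS-held key) avoids sharing the secret with verifiers.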
Dependency Security

# requirements-security.txt: pin all ML dependencies with hashes
# Generate with: pip-compile --generate-hashes requirements.in
scikit-learn==1.4.0 \
    --hash=sha256:abc123...
mlflow==2.12.0 \
    --hash=sha256:def456...
torch==2.3.0 \
    --hash=sha256:ghi789...

# CI/CD: Scan ML dependencies for vulnerabilities
- name: Scan dependencies
  run: |
    pip-audit --requirement requirements.txt --strict
    safety check --file requirements.txt
Stage 3: Model Serving Security

Input Validation
from pydantic import BaseModel, Field, field_validator
import numpy as np


class PredictionInput(BaseModel):
    """Strict input validation for a model serving API."""

    # Cap the number of features to bound request size
    features: dict[str, float] = Field(..., max_length=50)

    @field_validator("features")
    @classmethod
    def validate_feature_ranges(cls, v):
        # Reject extreme values that may be adversarial
        for name, value in v.items():
            if not np.isfinite(value):
                raise ValueError(f"Feature '{name}' must be finite, got {value}")
            if abs(value) > 1e6:
                raise ValueError(f"Feature '{name}' value {value} exceeds allowed range")
        return v
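A sketch of wiring this validator into a serving endpoint, assuming FastAPI. EXPECTED_FEATURES and the stub model are illustrative placeholders for your own schema and loaded model:

# Hypothetical endpoint; FastAPI validates the body against PredictionInput
from fastapi import FastAPI, HTTPException

app = FastAPI()
EXPECTED_FEATURES = ["tenure", "monthly_charges", "total_charges"]  # made-up schema

class _StubModel:
    def predict(self, rows: list[list[float]]) -> list[float]:
        return [0.0 for _ in rows]  # stand-in for a real trained model

model = _StubModel()

@app.post("/predict")
def predict(payload: PredictionInput):
    # Pydantic has already rejected non-finite and out-of-range values here
    missing = [f for f in EXPECTED_FEATURES if f not in payload.features]
    if missing:
        raise HTTPException(status_code=422, detail=f"Missing features: {missing}")
    row = [payload.features[f] for f in EXPECTED_FEATURES]
    return {"prediction": model.predict([row])[0]}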
Rate Limiting and Access Control

from datetime import datetime, timedelta, timezone
from collections import defaultdict


class MLAPISecurityMiddleware:
    """Security middleware for ML serving endpoints."""

    def __init__(self, rate_limit: int = 100, window_seconds: int = 60):
        self.rate_limit = rate_limit
        self.window = timedelta(seconds=window_seconds)
        self.request_counts: dict[str, list] = defaultdict(list)

    def check_rate_limit(self, client_id: str) -> bool:
        now = datetime.now(timezone.utc)
        cutoff = now - self.window
        # Drop timestamps that have aged out of the sliding window
        self.request_counts[client_id] = [
            t for t in self.request_counts[client_id] if t > cutoff
        ]
        if len(self.request_counts[client_id]) >= self.rate_limit:
            return False
        self.request_counts[client_id].append(now)
        return True

    def validate_api_key(self, api_key: str) -> dict | None:
        """Validate API key and return permissions."""
        # Check against key store
        # Return: {"client_id": "...", "models": ["churn", "pricing"], "rate_limit": 200}
        pass
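A sketch of a request-handling hook combining both checks, assuming validate_api_key is wired to a real key store:

# Hypothetical authorization step run before inference
middleware = MLAPISecurityMiddleware(rate_limit=100, window_seconds=60)

def authorize_request(api_key: str) -> tuple[int, str]:
    credentials = middleware.validate_api_key(api_key)
    if credentials is None:
        return 401, "invalid API key"
    if not middleware.check_rate_limit(credentials["client_id"]):
        return 429, "rate limit exceeded"
    return 200, "ok"  # proceed to model inference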
MLOps Security Checklist

| Stage | Control | Priority |
|-------|---------|----------|
| Data | Encryption at rest and in transit | Critical |
| Data | Access controls with audit logging | Critical |
| Data | Poisoning detection (statistical + label checks) | High |
| Data | Data provenance tracking | High |
| Training | Pin and hash all dependencies | Critical |
| Training | Vulnerability scanning of ML libraries | Critical |
| Training | Model artifact integrity signing | High |
| Training | Isolated training environments | High |
| Registry | RBAC on the model registry | Critical |
| Registry | Approval workflow for production promotion | Critical |
| Serving | Input validation and sanitization | Critical |
| Serving | Per-client rate limiting | High |
| Serving | API authentication | Critical |
| Serving | Model extraction detection | Medium |
| Monitoring | Anomalous prediction patterns | High |
| Monitoring | Adversarial input detection | Medium |
Related Resources

- AI supply chain security: a deep dive into attacks on ML dependencies
- AI model versioning security: secure model management
- Adversarial ML attacks: understanding adversarial threats
- MLOps best practices: MLOps workflows with security built in
- Model governance: a governance framework that includes security controls
- Enterprise AI security framework: organization-wide AI security
Need to secure your ML pipeline? DeviDevs combines MLOps engineering with AI security expertise. Request a free security assessment →

Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →