
MLOps Security: Protect the Complete ML Pipeline

Petru Constantin
6 min read
Tags: MLOps security, ML pipeline security, data poisoning, model theft, AI security, MLOps

MLOps Security: Securing the ML Pipeline from Data to Deployment

ML pipelines introduce attack surfaces that do not exist in traditional software. Data can be poisoned, models can be stolen, training infrastructure can be compromised, and adversarial inputs can cause misclassifications. This guide covers the security controls at each stage of the MLOps pipeline.

The ML Pipeline Attack Surface

Data Sources        Training Pipeline         Model Serving
┌──────────┐        ┌──────────────────┐      ┌──────────────┐
│  Data    │───────▶│  Feature Eng     │─────▶│  API Server  │
│  Sources │        │  Training        │      │  Model       │
│          │        │  Evaluation      │      │  Inference   │
└──────────┘        └──────────────────┘      └──────────────┘
     │                      │                        │
     ▼                      ▼                        ▼
① Data Poisoning     ② Supply Chain Attack   ③ Adversarial Inputs
① Data Exfiltration  ② Model Theft           ③ Model Extraction
                     ② Code Injection        ③ API Abuse

Stage 1: Data Security

Defending Against Data Poisoning

import numpy as np
from scipy import stats
 
class DataPoisoningDetector:
    """Detect anomalous data points that may indicate poisoning attacks."""
 
    def __init__(self, contamination_threshold: float = 0.05):
        self.threshold = contamination_threshold
 
    def statistical_outlier_detection(self, data: np.ndarray, feature_names: list[str]) -> dict:
        """Flag statistical outliers that could be poisoned samples."""
        results = {"flagged_indices": set(), "feature_reports": {}}
 
        for i, feature in enumerate(feature_names):
            column = data[:, i]
            z_scores = np.abs(stats.zscore(column, nan_policy="omit"))
            outliers = np.where(z_scores > 3.5)[0]
 
            if len(outliers) > 0:
                results["feature_reports"][feature] = {
                    "outlier_count": len(outliers),
                    "outlier_percentage": len(outliers) / len(column) * 100,
                    "indices": outliers.tolist()[:20],
                }
                results["flagged_indices"].update(outliers.tolist())
 
        results["total_flagged"] = len(results["flagged_indices"])
        results["flagged_percentage"] = results["total_flagged"] / len(data) * 100
        results["poisoning_likely"] = results["flagged_percentage"] > self.threshold * 100
 
        return results
 
    def label_consistency_check(self, features: np.ndarray, labels: np.ndarray) -> dict:
        """Check for label flipping attacks using nearest-neighbor consistency."""
        from sklearn.neighbors import NearestNeighbors
 
        nn = NearestNeighbors(n_neighbors=5)
        nn.fit(features)
        _, indices = nn.kneighbors(features)
 
        inconsistent = []
        for i in range(len(labels)):
            neighbor_labels = labels[indices[i][1:]]  # Exclude self
            if labels[i] != stats.mode(neighbor_labels, keepdims=True).mode[0]:
                inconsistent.append(i)
 
        return {
            "inconsistent_labels": len(inconsistent),
            "inconsistency_rate": len(inconsistent) / len(labels),
            "sample_indices": inconsistent[:50],
        }
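The z-score check the detector runs per feature can be exercised on synthetic data. In this standalone sketch (the dataset and the ten injected rows are illustrative assumptions), the injected extreme values stand out well past the 3.5 threshold while clean samples do not:

```python
import numpy as np
from scipy import stats

# synthetic dataset: 1000 clean samples, 10 poisoned rows injected into feature 0
rng = np.random.default_rng(42)
data = rng.normal(0, 1, size=(1000, 3))
data[:10, 0] = 50.0  # injected extreme values

# same per-feature check the detector applies: |z| > 3.5 flags an outlier
z_scores = np.abs(stats.zscore(data[:, 0]))
flagged = np.where(z_scores > 3.5)[0]
print(flagged.tolist())  # exactly the ten injected rows
```

Note that z-score detection assumes roughly unimodal features; a patient attacker who poisons within the existing distribution will evade it, which is why the label-consistency check below complements it.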

Data Access Controls

class DataAccessPolicy:
    """Enforce data access controls for ML training data."""
 
    SENSITIVITY_LEVELS = {
        "public": {"encryption": False, "audit_log": False, "approval_required": False},
        "internal": {"encryption": True, "audit_log": True, "approval_required": False},
        "confidential": {"encryption": True, "audit_log": True, "approval_required": True},
        "restricted": {"encryption": True, "audit_log": True, "approval_required": True, "mfa_required": True},
    }
 
    def check_access(self, user: str, dataset: str, sensitivity: str) -> dict:
        policy = self.SENSITIVITY_LEVELS.get(sensitivity, self.SENSITIVITY_LEVELS["restricted"])
        return {
            "allowed": self._verify_authorization(user, dataset),
            "encryption_required": policy["encryption"],
            "audit_logged": policy["audit_log"],
            "needs_approval": policy["approval_required"],
        }
 
    def _verify_authorization(self, user: str, dataset: str) -> bool:
        # Check against access control list
        # Implementation depends on your auth system (IAM, RBAC, etc.)
        pass
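The `_verify_authorization` stub can be backed by any access-control source. As a minimal sketch, assume a hypothetical in-memory ACL keyed by (user, dataset) pairs; a real deployment would query IAM or an RBAC service instead:

```python
# hypothetical in-memory ACL; a production system would query IAM/RBAC instead
ACL: dict[tuple[str, str], bool] = {
    ("alice", "churn-training-data"): True,
}

def verify_authorization(user: str, dataset: str) -> bool:
    # default-deny: unknown (user, dataset) pairs are rejected
    return ACL.get((user, dataset), False)

print(verify_authorization("alice", "churn-training-data"))    # True
print(verify_authorization("mallory", "churn-training-data"))  # False
```

The default-deny lookup is the important property: any pair not explicitly granted is refused, matching the fallback to the "restricted" policy in `check_access`.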

Stage 2: Training Pipeline Security

Model Artifact Integrity

import hashlib
import json
from datetime import datetime
 
class ModelIntegrityVerifier:
    """Cryptographic integrity verification for model artifacts."""
 
    def sign_model(self, model_path: str, metadata: dict) -> dict:
        """Generate integrity signature for a model artifact."""
        model_hash = self._compute_file_hash(model_path)
 
        manifest = {
            "model_path": model_path,
            "model_hash": model_hash,
            "hash_algorithm": "sha256",
            "signed_at": datetime.utcnow().isoformat(),
            "metadata": metadata,
        }
 
        # Hash the manifest. A bare SHA-256 digest proves integrity only;
        # for authenticity, use a keyed or asymmetric signature (e.g., HMAC or Sigstore).
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        manifest["signature"] = hashlib.sha256(manifest_bytes).hexdigest()
 
        return manifest
 
    def verify_model(self, model_path: str, manifest: dict) -> dict:
        """Verify model artifact matches its signed manifest."""
        current_hash = self._compute_file_hash(model_path)
        expected_hash = manifest["model_hash"]
 
        return {
            "path": model_path,
            "integrity_valid": current_hash == expected_hash,
            "current_hash": current_hash,
            "expected_hash": expected_hash,
            "signed_at": manifest["signed_at"],
        }
 
    def _compute_file_hash(self, file_path: str) -> str:
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
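The verifier can be demonstrated end to end with a throwaway file: hash it, tamper with it, and confirm the hashes diverge. A minimal standalone sketch (the file contents are a stand-in for a real artifact):

```python
import hashlib
import os
import tempfile

def file_sha256(path: str) -> str:
    # stream the file in chunks so large artifacts are not loaded into memory
    sha256 = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            sha256.update(chunk)
    return sha256.hexdigest()

# write a stand-in model artifact and record its hash in a manifest
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as f:
    f.write(b"model-weights-v1")
    model_path = f.name
manifest = {"model_hash": file_sha256(model_path)}

# simulate tampering: the recorded and current hashes no longer match
with open(model_path, "ab") as f:
    f.write(b"backdoor")
tampered = file_sha256(model_path) != manifest["model_hash"]
print(tampered)  # True
os.unlink(model_path)
```

In practice the manifest should be stored separately from the artifact (e.g., in the model registry), otherwise an attacker who can rewrite the model can rewrite the manifest too.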

Dependency Security

# requirements-security.txt: pin all ML dependencies with hashes
# Generate with: pip-compile --generate-hashes requirements.in
 
scikit-learn==1.4.0 \
    --hash=sha256:abc123...
mlflow==2.12.0 \
    --hash=sha256:def456...
torch==2.3.0 \
    --hash=sha256:ghi789...

# CI/CD step (GitHub Actions): scan ML dependencies for known vulnerabilities
- name: Scan dependencies
  run: |
    pip-audit --requirement requirements.txt --strict
    safety check --file requirements.txt

Stage 3: Model Serving Security

Input Validation

from pydantic import BaseModel, Field, field_validator
import numpy as np
 
class PredictionInput(BaseModel):
    """Strict input validation for model serving API."""
    features: dict[str, float] = Field(..., max_length=50)
 
    @field_validator("features")
    @classmethod
    def validate_feature_ranges(cls, v):
        # Reject extreme values that may be adversarial
        for name, value in v.items():
            if not np.isfinite(value):
                raise ValueError(f"Feature '{name}' must be finite, got {value}")
            if abs(value) > 1e6:
                raise ValueError(f"Feature '{name}' value {value} exceeds allowed range")
        return v
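The same checks can also be expressed without the pydantic dependency. This sketch mirrors the validator's rules (bounded feature count, finite values, bounded magnitude) in plain Python, with the thresholds taken from the model above:

```python
import math

MAX_FEATURES = 50
MAX_MAGNITUDE = 1e6

def validate_features(features: dict[str, float]) -> dict[str, float]:
    # mirror the pydantic validator: bounded size, finite values, bounded magnitude
    if len(features) > MAX_FEATURES:
        raise ValueError(f"too many features: {len(features)} > {MAX_FEATURES}")
    for name, value in features.items():
        if not math.isfinite(value):
            raise ValueError(f"Feature {name!r} must be finite, got {value}")
        if abs(value) > MAX_MAGNITUDE:
            raise ValueError(f"Feature {name!r} value {value} exceeds allowed range")
    return features

validate_features({"age": 42.0, "income": 55000.0})  # passes
try:
    validate_features({"age": float("nan")})
except ValueError as exc:
    print(exc)  # Feature 'age' must be finite, got nan
```

Rejecting NaN and infinity matters because many model runtimes silently propagate them into predictions instead of failing, which makes them a cheap adversarial probe.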

Rate Limiting and Access Control

from datetime import datetime, timedelta
from collections import defaultdict
 
class MLAPISecurityMiddleware:
    """Security middleware for ML serving endpoints."""
 
    def __init__(self, rate_limit: int = 100, window_seconds: int = 60):
        self.rate_limit = rate_limit
        self.window = timedelta(seconds=window_seconds)
        self.request_counts: dict[str, list] = defaultdict(list)
 
    def check_rate_limit(self, client_id: str) -> bool:
        now = datetime.utcnow()
        cutoff = now - self.window
        self.request_counts[client_id] = [
            t for t in self.request_counts[client_id] if t > cutoff
        ]
        if len(self.request_counts[client_id]) >= self.rate_limit:
            return False
        self.request_counts[client_id].append(now)
        return True
 
    def validate_api_key(self, api_key: str) -> dict | None:
        """Validate API key and return permissions."""
        # Check against key store
        # Return: {"client_id": "...", "models": ["churn", "pricing"], "rate_limit": 200}
        pass
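The sliding-window logic in `check_rate_limit` can be simulated with fixed timestamps. This standalone sketch (limit of 2 requests per 60-second window, both values illustrative) shows a third request being denied and a later one passing once the window slides:

```python
from collections import defaultdict
from datetime import datetime, timedelta

LIMIT, WINDOW = 2, timedelta(seconds=60)
hits: dict[str, list] = defaultdict(list)

def allow(client: str, now: datetime) -> bool:
    # drop timestamps that fell out of the window, then check the remainder
    hits[client] = [t for t in hits[client] if t > now - WINDOW]
    if len(hits[client]) >= LIMIT:
        return False
    hits[client].append(now)
    return True

t0 = datetime(2024, 1, 1)
print(allow("c1", t0))                          # True
print(allow("c1", t0))                          # True
print(allow("c1", t0 + timedelta(seconds=1)))   # False: window still holds 2 hits
print(allow("c1", t0 + timedelta(seconds=61)))  # True: earlier hits expired
```

An in-memory dict works for a single server; behind a load balancer the counters should live in shared storage such as Redis so limits hold across replicas.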

MLOps Security Checklist

| Stage | Control | Priority |
|-------|---------|----------|
| Data | Encryption at rest and in transit | Critical |
| Data | Access controls with audit logging | Critical |
| Data | Poisoning detection (statistical + label checks) | High |
| Data | Data provenance tracking | High |
| Training | Pin and hash all dependencies | Critical |
| Training | Scan ML libraries for vulnerabilities | Critical |
| Training | Model artifact integrity signing | High |
| Training | Isolated training environments | High |
| Registry | RBAC on the model registry | Critical |
| Registry | Approval workflow for production promotion | Critical |
| Serving | Input validation and sanitization | Critical |
| Serving | Per-client rate limiting | High |
| Serving | API authentication | Critical |
| Serving | Model extraction detection | Medium |
| Monitoring | Anomalous prediction patterns | High |
| Monitoring | Adversarial input detection | Medium |
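The "model extraction detection" control has no code elsewhere in this guide; one common heuristic flags clients whose query volume and feature-space coverage both look like systematic probing. A sketch, with the volume and spread thresholds as illustrative assumptions that would need tuning per model:

```python
import numpy as np

def extraction_risk(queries: np.ndarray, volume_threshold: int = 1000,
                    spread_threshold: float = 0.5) -> dict:
    """Heuristic only: extraction attempts tend to pair high query volume
    with unusually uniform coverage of the (normalized) feature space."""
    n = len(queries)
    # mean per-feature standard deviation of this client's queries
    spread = float(queries.std(axis=0).mean()) if n > 1 else 0.0
    return {
        "query_count": n,
        "mean_feature_spread": spread,
        "suspicious": n >= volume_threshold and spread >= spread_threshold,
    }

rng = np.random.default_rng(0)
probing = rng.uniform(-1, 1, size=(2000, 8))   # systematic sweep of feature space
normal = rng.normal(0.2, 0.05, size=(40, 8))   # small, clustered legitimate traffic
print(extraction_risk(probing)["suspicious"])  # True
print(extraction_risk(normal)["suspicious"])   # False
```

A flagged client is a candidate for tighter rate limits or manual review rather than an automatic block, since high-volume legitimate users can trip volume thresholds too.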

Need to secure your ML pipeline? DeviDevs combines MLOps engineering with AI security expertise. Request a free security assessment →

