MLOps Security: Securing Your ML Pipeline from Data to Deployment
ML pipelines introduce attack surfaces that don't exist in traditional software. Data can be poisoned, models can be stolen, training infrastructure can be compromised, and adversarial inputs can cause misclassification. This guide covers security controls at every stage of the MLOps pipeline.
ML Pipeline Attack Surface
Data Sources           Training Pipeline             Model Serving
┌──────────┐          ┌──────────────────┐          ┌──────────────┐
│ Data     │──────▶   │ Feature Eng      │─────▶    │ API Server   │
│ Sources  │   ①      │ Training         │   ③      │ Model        │
│          │          │ Evaluation       │          │ Inference    │
└──────────┘          └──────────────────┘          └──────────────┘
      │                        │                           │
      ▼                        ▼                           ▼
① Data Poisoning      ② Supply Chain Attack      ④ Adversarial Inputs
① Data Exfiltration   ② Model Theft              ④ Model Extraction
                      ② Code Injection           ④ API Abuse
Stage 1: Data Security
Data Poisoning Defense
import numpy as np
from scipy import stats
class DataPoisoningDetector:
"""Detect anomalous data points that may indicate poisoning attacks."""
def __init__(self, contamination_threshold: float = 0.05):
self.threshold = contamination_threshold
def statistical_outlier_detection(self, data: np.ndarray, feature_names: list[str]) -> dict:
"""Flag statistical outliers that could be poisoned samples."""
results = {"flagged_indices": set(), "feature_reports": {}}
for i, feature in enumerate(feature_names):
column = data[:, i]
z_scores = np.abs(stats.zscore(column, nan_policy="omit"))
outliers = np.where(z_scores > 3.5)[0]
if len(outliers) > 0:
results["feature_reports"][feature] = {
"outlier_count": len(outliers),
"outlier_percentage": len(outliers) / len(column) * 100,
"indices": outliers.tolist()[:20],
}
results["flagged_indices"].update(outliers.tolist())
results["total_flagged"] = len(results["flagged_indices"])
results["flagged_percentage"] = results["total_flagged"] / len(data) * 100
results["poisoning_likely"] = results["flagged_percentage"] > self.threshold * 100
return results
def label_consistency_check(self, features: np.ndarray, labels: np.ndarray) -> dict:
"""Check for label flipping attacks using nearest-neighbor consistency."""
from sklearn.neighbors import NearestNeighbors
nn = NearestNeighbors(n_neighbors=5)
nn.fit(features)
_, indices = nn.kneighbors(features)
inconsistent = []
for i in range(len(labels)):
neighbor_labels = labels[indices[i][1:]] # Exclude self
if labels[i] != stats.mode(neighbor_labels, keepdims=True).mode[0]:
inconsistent.append(i)
return {
"inconsistent_labels": len(inconsistent),
"inconsistency_rate": len(inconsistent) / len(labels),
"sample_indices": inconsistent[:50],
        }
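A minimal usage sketch for the detector above. The feature matrix X, the labels y, the feature names, and the 10% inconsistency cut-off are illustrative placeholders rather than values from any real pipeline:
import numpy as np

# Hypothetical training data: X is (n_samples, n_features), y is (n_samples,)
rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))
y = rng.integers(0, 2, size=1000)
feature_names = ["tenure", "monthly_charges", "support_tickets"]

detector = DataPoisoningDetector(contamination_threshold=0.05)
outlier_report = detector.statistical_outlier_detection(X, feature_names)
label_report = detector.label_consistency_check(X, y)

# Gate the training run on both checks (the thresholds are policy decisions)
if outlier_report["poisoning_likely"] or label_report["inconsistency_rate"] > 0.10:
    raise RuntimeError("Possible data poisoning detected; halting the pipeline")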
Data Access Controls
class DataAccessPolicy:
"""Enforce data access controls for ML training data."""
SENSITIVITY_LEVELS = {
"public": {"encryption": False, "audit_log": False, "approval_required": False},
"internal": {"encryption": True, "audit_log": True, "approval_required": False},
"confidential": {"encryption": True, "audit_log": True, "approval_required": True},
"restricted": {"encryption": True, "audit_log": True, "approval_required": True, "mfa_required": True},
}
def check_access(self, user: str, dataset: str, sensitivity: str) -> dict:
policy = self.SENSITIVITY_LEVELS.get(sensitivity, self.SENSITIVITY_LEVELS["restricted"])
        return {
            "allowed": self._verify_authorization(user, dataset),
            "encryption_required": policy["encryption"],
            "audit_logged": policy["audit_log"],
            "needs_approval": policy["approval_required"],
            "mfa_required": policy.get("mfa_required", False),
        }
def _verify_authorization(self, user: str, dataset: str) -> bool:
# Check against access control list
# Implementation depends on your auth system (IAM, RBAC, etc.)
        pass
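The _verify_authorization stub above is deliberately left open. As one possible sketch, the subclass below backs it with a static in-memory ACL; the TEAM_ACL mapping, user names, and dataset names are hypothetical, and a real deployment would delegate this check to your IAM or RBAC system:
class InMemoryAccessPolicy(DataAccessPolicy):
    """Example subclass backing authorization with a static ACL."""

    # Hypothetical ACL mapping users to the datasets they may read
    TEAM_ACL = {
        "alice": {"churn_features_v3", "pricing_history"},
        "bob": {"churn_features_v3"},
    }

    def _verify_authorization(self, user: str, dataset: str) -> bool:
        return dataset in self.TEAM_ACL.get(user, set())

policy = InMemoryAccessPolicy()
decision = policy.check_access("alice", "pricing_history", sensitivity="confidential")
if not decision["allowed"]:
    raise PermissionError("Access to training dataset denied")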
Stage 2: Training Pipeline Security
Model Artifact Integrity
import hashlib
import json
from datetime import datetime
from pathlib import Path
class ModelIntegrityVerifier:
"""Cryptographic integrity verification for model artifacts."""
def sign_model(self, model_path: str, metadata: dict) -> dict:
"""Generate integrity signature for a model artifact."""
model_hash = self._compute_file_hash(model_path)
manifest = {
"model_path": model_path,
"model_hash": model_hash,
"hash_algorithm": "sha256",
"signed_at": datetime.utcnow().isoformat(),
"metadata": metadata,
}
        # Hash the canonical manifest. A bare hash proves integrity, not
        # authenticity; use a keyed (HMAC) or asymmetric signature when you need
        # tamper-evident provenance (see the sketch after this class).
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        manifest["signature"] = hashlib.sha256(manifest_bytes).hexdigest()
return manifest
def verify_model(self, model_path: str, manifest: dict) -> dict:
"""Verify model artifact matches its signed manifest."""
current_hash = self._compute_file_hash(model_path)
expected_hash = manifest["model_hash"]
return {
"path": model_path,
"integrity_valid": current_hash == expected_hash,
"current_hash": current_hash,
"expected_hash": expected_hash,
"signed_at": manifest["signed_at"],
}
def _compute_file_hash(self, file_path: str) -> str:
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
        return sha256.hexdigest()
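The manifest above uses a plain SHA-256 digest, which detects corruption but not deliberate tampering: an attacker who can rewrite the artifact can rewrite the hash as well. Below is a minimal sketch of upgrading it to a keyed signature with Python's standard hmac module; the MODEL_SIGNING_KEY environment variable is a hypothetical name, and in production the key would come from a secrets manager or KMS:
import hashlib
import hmac
import json
import os

# Hypothetical key source; never hard-code signing keys
SIGNING_KEY = os.environ["MODEL_SIGNING_KEY"].encode()

def sign_manifest(manifest: dict) -> str:
    """Keyed HMAC-SHA256 signature over the canonical (sorted-key) manifest."""
    payload = json.dumps(manifest, sort_keys=True).encode()
    return hmac.new(SIGNING_KEY, payload, hashlib.sha256).hexdigest()

def verify_manifest(manifest: dict, signature: str) -> bool:
    """Constant-time comparison to avoid leaking timing information."""
    return hmac.compare_digest(sign_manifest(manifest), signature)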
Dependency Security
# requirements-security.txt — Pin all ML dependencies with hashes
# Generate with: pip-compile --generate-hashes requirements.in
scikit-learn==1.4.0 \
--hash=sha256:abc123...
mlflow==2.12.0 \
--hash=sha256:def456...
torch==2.3.0 \
    --hash=sha256:ghi789...
# CI/CD: Scan ML dependencies for vulnerabilities
- name: Scan dependencies
run: |
pip-audit --requirement requirements.txt --strict
    safety check --file requirements.txt
Stage 3: Model Serving Security
Input Validation
from pydantic import BaseModel, Field, field_validator
import numpy as np
class PredictionInput(BaseModel):
"""Strict input validation for model serving API."""
features: dict[str, float] = Field(..., max_length=50)
@field_validator("features")
@classmethod
def validate_feature_ranges(cls, v):
# Reject extreme values that may be adversarial
for name, value in v.items():
if not np.isfinite(value):
raise ValueError(f"Feature '{name}' must be finite, got {value}")
if abs(value) > 1e6:
raise ValueError(f"Feature '{name}' value {value} exceeds allowed range")
        return v
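A brief sketch of how this validation behaves at the serving boundary (assuming pydantic v2; the request payload below is made up):
from pydantic import ValidationError

raw_request = {"features": {"tenure": 14.0, "monthly_charges": float("inf")}}

try:
    payload = PredictionInput(**raw_request)
except ValidationError as exc:
    # Reject before the request reaches the model, and log it for abuse monitoring
    print(f"Rejected input: {exc.errors()[0]['msg']}")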
Rate Limiting and Access Control
from datetime import datetime, timedelta
from collections import defaultdict
class MLAPISecurityMiddleware:
"""Security middleware for ML serving endpoints."""
def __init__(self, rate_limit: int = 100, window_seconds: int = 60):
self.rate_limit = rate_limit
self.window = timedelta(seconds=window_seconds)
self.request_counts: dict[str, list] = defaultdict(list)
def check_rate_limit(self, client_id: str) -> bool:
now = datetime.utcnow()
cutoff = now - self.window
self.request_counts[client_id] = [
t for t in self.request_counts[client_id] if t > cutoff
]
if len(self.request_counts[client_id]) >= self.rate_limit:
return False
self.request_counts[client_id].append(now)
return True
def validate_api_key(self, api_key: str) -> dict | None:
"""Validate API key and return permissions."""
# Check against key store
# Return: {"client_id": "...", "models": ["churn", "pricing"], "rate_limit": 200}
        pass
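A framework-agnostic sketch of how the middleware could gate a prediction request once validate_api_key is wired to a real key store. The handle_prediction function and the predict_fn callable are illustrative names, not part of any particular serving framework:
from typing import Callable

security = MLAPISecurityMiddleware(rate_limit=100, window_seconds=60)

def handle_prediction(api_key: str, client_id: str, raw_request: dict,
                      predict_fn: Callable[[dict], float]) -> dict:
    """Run all security checks before the request ever touches the model."""
    credentials = security.validate_api_key(api_key)
    if credentials is None:
        return {"status": 401, "error": "invalid API key"}
    if not security.check_rate_limit(client_id):
        return {"status": 429, "error": "rate limit exceeded"}
    payload = PredictionInput(**raw_request)  # raises ValidationError on bad input
    return {"status": 200, "prediction": predict_fn(payload.features)}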
Security Checklist for MLOps
| Stage | Control | Priority |
|-------|---------|----------|
| Data | Encrypt at rest and in transit | Critical |
| Data | Access controls with audit logging | Critical |
| Data | Poisoning detection (statistical + label checks) | High |
| Data | Data provenance tracking | High |
| Training | Pin and hash all dependencies | Critical |
| Training | Scan for vulnerabilities in ML libraries | Critical |
| Training | Model artifact integrity signing | High |
| Training | Isolated training environments | High |
| Registry | RBAC on model registry | Critical |
| Registry | Approval workflow for production promotion | Critical |
| Serving | Input validation and sanitization | Critical |
| Serving | Rate limiting per client | High |
| Serving | API authentication | Critical |
| Serving | Model extraction detection | Medium |
| Monitoring | Anomalous prediction patterns | High |
| Monitoring | Adversarial input detection | Medium |
Related Resources
- AI supply chain security — Deep dive into ML dependency attacks
- AI model versioning security — Secure model management
- Adversarial ML attacks — Understanding adversarial threats
- MLOps best practices — Security-integrated MLOps workflows
- Model governance — Governance framework including security controls
- Enterprise AI security framework — Organization-wide AI security
Need to secure your ML pipeline? DeviDevs combines MLOps engineering with AI security expertise. Get a free security assessment →