
MLOps Security: Securing Your ML Pipeline from Data to Deployment

DeviDevs Team
6 min read
#MLOps security · #ML pipeline security · #data poisoning · #model theft · #AI security · #MLOps

ML pipelines introduce attack surfaces that don't exist in traditional software. Data can be poisoned, models can be stolen, training infrastructure can be compromised, and adversarial inputs can cause misclassification. This guide covers security controls at every stage of the MLOps pipeline.

ML Pipeline Attack Surface

Data Sources        Training Pipeline          Model Serving
┌───────────┐       ┌──────────────────┐       ┌──────────────┐
│  Data     │──────▶│  Feature Eng     │──────▶│  API Server  │
│  Sources  │       │  Training        │       │  Model       │
│           │       │  Evaluation      │       │  Inference   │
└───────────┘       └──────────────────┘       └──────────────┘
      │                      │                        │
      ▼                      ▼                        ▼
① Data Poisoning      ② Supply Chain Attack    ③ Adversarial Inputs
① Data Exfiltration   ② Model Theft            ③ Model Extraction
                      ② Code Injection         ③ API Abuse

Stage 1: Data Security

Data Poisoning Defense

import numpy as np
from scipy import stats
 
class DataPoisoningDetector:
    """Detect anomalous data points that may indicate poisoning attacks."""
 
    def __init__(self, contamination_threshold: float = 0.05):
        self.threshold = contamination_threshold
 
    def statistical_outlier_detection(self, data: np.ndarray, feature_names: list[str]) -> dict:
        """Flag statistical outliers that could be poisoned samples."""
        results = {"flagged_indices": set(), "feature_reports": {}}
 
        for i, feature in enumerate(feature_names):
            column = data[:, i]
            z_scores = np.abs(stats.zscore(column, nan_policy="omit"))
            outliers = np.where(z_scores > 3.5)[0]
 
            if len(outliers) > 0:
                results["feature_reports"][feature] = {
                    "outlier_count": len(outliers),
                    "outlier_percentage": len(outliers) / len(column) * 100,
                    "indices": outliers.tolist()[:20],
                }
                results["flagged_indices"].update(outliers.tolist())
 
        results["total_flagged"] = len(results["flagged_indices"])
        results["flagged_percentage"] = results["total_flagged"] / len(data) * 100
        results["poisoning_likely"] = results["flagged_percentage"] > self.threshold * 100
 
        return results
 
    def label_consistency_check(self, features: np.ndarray, labels: np.ndarray) -> dict:
        """Check for label flipping attacks using nearest-neighbor consistency."""
        from sklearn.neighbors import NearestNeighbors
 
        nn = NearestNeighbors(n_neighbors=5)
        nn.fit(features)
        _, indices = nn.kneighbors(features)
 
        inconsistent = []
        for i in range(len(labels)):
            neighbor_labels = labels[indices[i][1:]]  # Exclude self
            if labels[i] != stats.mode(neighbor_labels, keepdims=True).mode[0]:
                inconsistent.append(i)
 
        return {
            "inconsistent_labels": len(inconsistent),
            "inconsistency_rate": len(inconsistent) / len(labels),
            "sample_indices": inconsistent[:50],
        }
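
A short usage sketch showing how the detector could gate a training run before any model is fit. The synthetic data, feature names, and print/raise behavior below are illustrative assumptions, not part of the pipeline itself:

import numpy as np

# Synthetic stand-ins for real training data; feature names are illustrative.
rng = np.random.default_rng(42)
X_train = rng.normal(size=(1000, 3))
y_train = (X_train[:, 0] > 0).astype(int)

detector = DataPoisoningDetector(contamination_threshold=0.05)
outlier_report = detector.statistical_outlier_detection(
    X_train, feature_names=["age", "balance", "tenure"]
)
label_report = detector.label_consistency_check(X_train, y_train)

# In a real pipeline, fail the run rather than silently training on suspect data.
if outlier_report["poisoning_likely"]:
    raise RuntimeError("Training data failed statistical poisoning checks")
print(f"Label inconsistency rate: {label_report['inconsistency_rate']:.1%}")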

Data Access Controls

class DataAccessPolicy:
    """Enforce data access controls for ML training data."""
 
    SENSITIVITY_LEVELS = {
        "public": {"encryption": False, "audit_log": False, "approval_required": False},
        "internal": {"encryption": True, "audit_log": True, "approval_required": False},
        "confidential": {"encryption": True, "audit_log": True, "approval_required": True},
        "restricted": {"encryption": True, "audit_log": True, "approval_required": True, "mfa_required": True},
    }
 
    def check_access(self, user: str, dataset: str, sensitivity: str) -> dict:
        policy = self.SENSITIVITY_LEVELS.get(sensitivity, self.SENSITIVITY_LEVELS["restricted"])
        return {
            "allowed": self._verify_authorization(user, dataset),
            "encryption_required": policy["encryption"],
            "audit_logged": policy["audit_log"],
            "needs_approval": policy["approval_required"],
        }
 
    def _verify_authorization(self, user: str, dataset: str) -> bool:
        # Check against access control list
        # Implementation depends on your auth system (IAM, RBAC, etc.)
        pass
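
A sketch of how the policy might gate a training-data read. The service identifiers and the `load_training_data` wrapper are illustrative, and the gate only becomes meaningful once `_verify_authorization` is wired to your real IAM or RBAC backend:

# Illustrative gate in front of a dataset read; identifiers are placeholders.
policy = DataAccessPolicy()

def load_training_data(user: str, dataset: str, sensitivity: str):
    decision = policy.check_access(user, dataset, sensitivity)
    if not decision["allowed"]:
        raise PermissionError(f"{user} is not authorized to read {dataset}")
    if decision["needs_approval"]:
        raise RuntimeError(f"{dataset} requires explicit approval before use in training")
    # decision["encryption_required"] and decision["audit_logged"] should drive
    # how the data is fetched and how the access is recorded.
    ...  # actual data loading goes here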

Stage 2: Training Pipeline Security

Model Artifact Integrity

import hashlib
import json
from datetime import datetime
 
class ModelIntegrityVerifier:
    """Cryptographic integrity verification for model artifacts."""
 
    def sign_model(self, model_path: str, metadata: dict) -> dict:
        """Generate integrity signature for a model artifact."""
        model_hash = self._compute_file_hash(model_path)
 
        manifest = {
            "model_path": model_path,
            "model_hash": model_hash,
            "hash_algorithm": "sha256",
            "signed_at": datetime.utcnow().isoformat(),
            "metadata": metadata,
        }
 
        # Hash the manifest as a tamper-evidence tag; in production, replace
        # this with a keyed signature (HMAC or asymmetric signing)
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        manifest["signature"] = hashlib.sha256(manifest_bytes).hexdigest()
 
        return manifest
 
    def verify_model(self, model_path: str, manifest: dict) -> dict:
        """Verify model artifact matches its signed manifest."""
        current_hash = self._compute_file_hash(model_path)
        expected_hash = manifest["model_hash"]
 
        return {
            "path": model_path,
            "integrity_valid": current_hash == expected_hash,
            "current_hash": current_hash,
            "expected_hash": expected_hash,
            "signed_at": manifest["signed_at"],
        }
 
    def _compute_file_hash(self, file_path: str) -> str:
        sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
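
A sketch of the sign-then-verify flow around deployment. The artifact paths and run ID are placeholders, and the SHA-256 tag above only provides tamper evidence; for real deployments, swap in keyed or asymmetric signing so an attacker who can rewrite the artifact cannot also rewrite the manifest:

# Illustrative flow; artifact and manifest paths are placeholders.
import json

verifier = ModelIntegrityVerifier()

# End of training: record the artifact hash alongside run metadata.
manifest = verifier.sign_model("artifacts/churn_model.pkl", metadata={"run_id": "example-run-001"})
with open("artifacts/churn_model.manifest.json", "w") as f:
    json.dump(manifest, f, indent=2)

# Deployment time: refuse to load an artifact whose hash has drifted.
with open("artifacts/churn_model.manifest.json") as f:
    manifest = json.load(f)
check = verifier.verify_model("artifacts/churn_model.pkl", manifest)
if not check["integrity_valid"]:
    raise RuntimeError(f"Model artifact failed integrity verification: {check}")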

Dependency Security

# requirements-security.txt — Pin all ML dependencies with hashes
# Generate with: pip-compile --generate-hashes requirements.in
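# Install with hash enforcement: pip install --require-hashes -r requirements-security.txt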
 
scikit-learn==1.4.0 \
    --hash=sha256:abc123...
mlflow==2.12.0 \
    --hash=sha256:def456...
torch==2.3.0 \
    --hash=sha256:ghi789...

# CI/CD: Scan ML dependencies for vulnerabilities
- name: Scan dependencies
  run: |
    pip-audit --requirement requirements.txt --strict
    safety check --file requirements.txt

Stage 3: Model Serving Security

Input Validation

from pydantic import BaseModel, Field, field_validator
import numpy as np
 
class PredictionInput(BaseModel):
    """Strict input validation for model serving API."""
    features: dict[str, float] = Field(..., max_length=50)
 
    @field_validator("features")
    @classmethod
    def validate_feature_ranges(cls, v):
        # Reject extreme values that may be adversarial
        for name, value in v.items():
            if not np.isfinite(value):
                raise ValueError(f"Feature '{name}' must be finite, got {value}")
            if abs(value) > 1e6:
                raise ValueError(f"Feature '{name}' value {value} exceeds allowed range")
        return v
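
A quick check of the validator in isolation; the feature names are illustrative. Pydantic rejects the malformed payload before it ever reaches the model:

# Illustrative: a well-formed payload passes, a NaN is rejected up front.
from pydantic import ValidationError

PredictionInput(features={"age": 42.0, "balance": 1250.5})  # accepted

try:
    PredictionInput(features={"age": float("nan"), "balance": 1250.5})
except ValidationError as exc:
    print(exc)  # ... Feature 'age' must be finite, got nan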

Rate Limiting and Access Control

from datetime import datetime, timedelta
from collections import defaultdict
 
class MLAPISecurityMiddleware:
    """Security middleware for ML serving endpoints."""
 
    def __init__(self, rate_limit: int = 100, window_seconds: int = 60):
        self.rate_limit = rate_limit
        self.window = timedelta(seconds=window_seconds)
        self.request_counts: dict[str, list] = defaultdict(list)
 
    def check_rate_limit(self, client_id: str) -> bool:
        now = datetime.utcnow()
        cutoff = now - self.window
        self.request_counts[client_id] = [
            t for t in self.request_counts[client_id] if t > cutoff
        ]
        if len(self.request_counts[client_id]) >= self.rate_limit:
            return False
        self.request_counts[client_id].append(now)
        return True
 
    def validate_api_key(self, api_key: str) -> dict | None:
        """Validate API key and return permissions."""
        # Check against key store
        # Return: {"client_id": "...", "models": ["churn", "pricing"], "rate_limit": 200}
        pass
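
A minimal, framework-agnostic sketch of how the middleware and the input schema above might sit in front of the model. The `handle_prediction` function and response shape are illustrative, and `validate_api_key` still needs a real key store behind it:

# Illustrative request flow; not tied to any particular web framework.
middleware = MLAPISecurityMiddleware(rate_limit=100, window_seconds=60)

def handle_prediction(api_key: str, payload: dict) -> dict:
    creds = middleware.validate_api_key(api_key)
    if creds is None:
        return {"status": 401, "error": "invalid API key"}
    if not middleware.check_rate_limit(creds["client_id"]):
        return {"status": 429, "error": "rate limit exceeded"}
    validated = PredictionInput(features=payload)  # reuse the strict schema above
    # prediction = model.predict(validated.features) goes here
    return {"status": 200}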

Security Checklist for MLOps

| Stage | Control | Priority |
|-------|---------|----------|
| Data | Encrypt at rest and in transit | Critical |
| Data | Access controls with audit logging | Critical |
| Data | Poisoning detection (statistical + label checks) | High |
| Data | Data provenance tracking | High |
| Training | Pin and hash all dependencies | Critical |
| Training | Scan for vulnerabilities in ML libraries | Critical |
| Training | Model artifact integrity signing | High |
| Training | Isolated training environments | High |
| Registry | RBAC on model registry | Critical |
| Registry | Approval workflow for production promotion | Critical |
| Serving | Input validation and sanitization | Critical |
| Serving | Rate limiting per client | High |
| Serving | API authentication | Critical |
| Serving | Model extraction detection | Medium |
| Monitoring | Anomalous prediction patterns | High |
| Monitoring | Adversarial input detection | Medium |


Need to secure your ML pipeline? DeviDevs combines MLOps engineering with AI security expertise. Get a free security assessment →
