
MLOps Best Practices: Building Production-Ready ML Pipelines

DeviDevs Team
9 min read
#MLOps · #ML pipeline · #best practices · #production ML · #ML CI/CD · #model deployment


Building ML pipelines that work reliably in production requires more than connecting a few scripts. This guide covers the battle-tested practices that separate production-grade ML systems from fragile prototypes.

1. Design Pipelines as DAGs, Not Scripts

Production ML pipelines should be directed acyclic graphs (DAGs) with clear step boundaries, not monolithic scripts. Each step should be independently testable, cacheable, and retriable.

# Bad: Monolithic training script
def train():
    data = load_data()
    features = preprocess(data)
    model = train_model(features)
    evaluate(model)
    deploy(model)
 
# Good: Pipeline as composable steps
from dataclasses import dataclass
from typing import Any, Callable
 
@dataclass
class PipelineStep:
    name: str
    inputs: list[str]
    outputs: list[str]
    fn: Callable[..., Any]
 
pipeline = [
    PipelineStep("ingest", [], ["raw_data"], ingest_data),
    PipelineStep("validate", ["raw_data"], ["validated_data"], validate_data),
    PipelineStep("features", ["validated_data"], ["feature_matrix"], compute_features),
    PipelineStep("train", ["feature_matrix"], ["model_artifact"], train_model),
    PipelineStep("evaluate", ["model_artifact", "feature_matrix"], ["metrics"], evaluate_model),
    PipelineStep("register", ["model_artifact", "metrics"], ["model_version"], register_model),
]

Why DAGs Matter

  • Caching: Skip steps whose inputs haven't changed
  • Retry: Restart from the failed step, not the beginning
  • Parallelism: Independent steps run concurrently
  • Debugging: Inspect intermediate artifacts at any step
  • Auditing: Clear lineage from data to deployed model
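To make the caching and retry benefits above concrete, here is a deliberately simplified runner sketch for the PipelineStep list shown earlier. The artifact_store and cache dicts are stand-ins for a real artifact store and run metadata, and the content fingerprinting is crude; production orchestrators such as Airflow, Kubeflow Pipelines, or Prefect provide these mechanics for you.

import hashlib
import json

def run_pipeline(pipeline, artifact_store: dict, cache: dict):
    """Run steps in order, skipping unchanged steps and retrying failures per step."""
    for step in pipeline:
        # Fingerprint this step's inputs; if nothing changed, reuse cached outputs
        # (a real system would hash artifact content or metadata, not repr())
        fingerprint = hashlib.sha256(
            json.dumps(
                {name: repr(artifact_store.get(name)) for name in step.inputs},
                sort_keys=True,
            ).encode()
        ).hexdigest()
        if cache.get(step.name) == fingerprint and all(o in artifact_store for o in step.outputs):
            continue  # caching: skip the step entirely

        last_error = None
        for attempt in range(3):  # retry: restart this step, not the whole pipeline
            try:
                result = step.fn(*[artifact_store[name] for name in step.inputs])
                break
            except Exception as exc:
                last_error = exc
        else:
            raise RuntimeError(f"Step '{step.name}' failed after 3 attempts") from last_error

        # Store outputs so downstream steps (and debugging) can inspect them
        outputs = result if len(step.outputs) > 1 else (result,)
        artifact_store.update(dict(zip(step.outputs, outputs)))
        cache[step.name] = fingerprint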

2. Implement Data Validation at Pipeline Boundaries

Data issues cause more ML failures than code bugs. Validate data at every pipeline boundary with schema checks and statistical tests.

import pandera as pa
from pandera import Column, Check, DataFrameSchema
 
# Define expected schema with statistical constraints
training_data_schema = DataFrameSchema({
    "customer_id": Column(int, Check.gt(0), nullable=False),
    "age": Column(int, Check.in_range(18, 120), nullable=False),
    "purchase_amount": Column(float, Check.gt(0), nullable=True),
    "category": Column(str, Check.isin(["A", "B", "C", "D"]), nullable=False),
    "churn": Column(int, Check.isin([0, 1]), nullable=False),
}, checks=[
    # Table-level checks
    Check(lambda df: len(df) >= 1000, error="Minimum 1000 rows required"),
    Check(lambda df: df["churn"].mean() > 0.01, error="Churn rate suspiciously low"),
    Check(lambda df: df["churn"].mean() < 0.90, error="Churn rate suspiciously high"),
])
 
def validate_training_data(df):
    """Validate training data before pipeline proceeds."""
    try:
        training_data_schema.validate(df, lazy=True)
        return {"status": "passed", "rows": len(df), "churn_rate": df["churn"].mean()}
    except pa.errors.SchemaErrors as e:
        failures = e.failure_cases
        raise ValueError(f"Data validation failed:\n{failures.to_string()}")

Great Expectations for Complex Validation

For larger teams, Great Expectations provides a richer framework:

import great_expectations as gx
 
context = gx.get_context()
 
# Define an expectation suite (Great Expectations 1.x fluent API)
suite = context.suites.add(gx.ExpectationSuite(name="training_data_suite"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(column="age", min_value=18, max_value=120)
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000)
)
suite.add_expectation(
    gx.expectations.ExpectColumnProportionOfUniqueValuesToBeBetween(
        column="customer_id", min_value=0.95
    )
)

3. Version Everything: Code, Data, and Models

In ML, code versioning alone is insufficient. You need to version data and model artifacts alongside code to achieve reproducibility.

# DVC pipeline definition (dvc.yaml)
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.split_ratio
      - prepare.seed
 
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    outs:
      - models/
    params:
      - train.n_estimators
      - train.max_depth
      - train.learning_rate
    metrics:
      - metrics/train.json:
          cache: false
 
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/
      - data/processed/
    metrics:
      - metrics/eval.json:
          cache: false
    plots:
      - plots/confusion_matrix.csv:
          cache: false

The Three-Way Version Lock

Every production model should have a locked triplet:

# Model metadata: the three-way version lock
model_metadata = {
    "model_version": "churn-v2.3.1",
    "code_version": "git:abc123def",       # Git commit hash
    "data_version": "dvc:data/v2.3",        # DVC data version
    "training_params": {
        "n_estimators": 200,
        "max_depth": 12,
        "learning_rate": 0.05,
    },
    "training_metrics": {
        "accuracy": 0.943,
        "precision": 0.91,
        "recall": 0.88,
        "f1": 0.895,
        "auc_roc": 0.967,
    },
    "trained_at": "2026-02-01T14:30:00Z",
    "trained_by": "pipeline/nightly-retrain",
    "approved_by": "ml-lead@company.com",
}
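The lock is only trustworthy if it is captured automatically rather than typed by hand. Below is a minimal sketch of how the code and data fields might be filled in at training time; the helper names are illustrative, and the data version is approximated here by hashing dvc.lock rather than using a named data tag like the one in the metadata above.

import hashlib
import subprocess
from pathlib import Path

def capture_code_version() -> str:
    """Git commit of the checkout the pipeline is running from."""
    commit = subprocess.check_output(
        ["git", "rev-parse", "--short", "HEAD"], text=True
    ).strip()
    return f"git:{commit}"

def capture_data_version(lock_file: str = "dvc.lock") -> str:
    """Fingerprint of dvc.lock, which pins the exact hashes of every tracked input."""
    digest = hashlib.sha256(Path(lock_file).read_bytes()).hexdigest()[:12]
    return f"dvc-lock:{digest}"

# model_metadata["code_version"] = capture_code_version()
# model_metadata["data_version"] = capture_data_version()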

4. Test ML Pipelines at Multiple Levels

ML systems need testing at data, model, and integration levels — not just unit tests.

import pytest
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
 
class TestDataQuality:
    """Tests that run on every data refresh."""
 
    def test_no_null_targets(self, training_data):
        assert training_data["target"].notna().all()
 
    def test_feature_ranges(self, training_data):
        assert training_data["age"].between(0, 150).all()
        assert training_data["income"].ge(0).all()
 
    def test_class_balance(self, training_data):
        """Ensure no extreme class imbalance."""
        class_ratios = training_data["target"].value_counts(normalize=True)
        assert class_ratios.min() > 0.05, f"Minority class is only {class_ratios.min():.1%}"
 
    def test_no_data_leakage(self, training_data, test_data):
        """Ensure train/test split has no overlap."""
        train_ids = set(training_data["id"])
        test_ids = set(test_data["id"])
        assert train_ids.isdisjoint(test_ids), "Data leakage: overlapping IDs"
 
 
class TestModelQuality:
    """Tests that run after every training run."""
 
    def test_minimum_accuracy(self, model, test_data):
        predictions = model.predict(test_data.features)
        acc = accuracy_score(test_data.labels, predictions)
        assert acc > 0.85, f"Accuracy {acc:.3f} below threshold 0.85"
 
    def test_minimum_f1(self, model, test_data):
        predictions = model.predict(test_data.features)
        f1 = f1_score(test_data.labels, predictions, average="weighted")
        assert f1 > 0.80, f"F1 {f1:.3f} below threshold 0.80"
 
    def test_no_regression_vs_production(self, new_model, production_model, test_data):
        """New model must not be significantly worse than current production."""
        new_acc = accuracy_score(test_data.labels, new_model.predict(test_data.features))
        prod_acc = accuracy_score(test_data.labels, production_model.predict(test_data.features))
        assert new_acc >= prod_acc - 0.02, (
            f"New model ({new_acc:.3f}) regressed vs production ({prod_acc:.3f})"
        )
 
    def test_prediction_latency(self, model):
        """Single prediction must complete within SLA."""
        import time
        sample = np.random.randn(1, model.n_features_in_)
        start = time.perf_counter()
        for _ in range(100):
            model.predict(sample)
        avg_ms = (time.perf_counter() - start) / 100 * 1000
        assert avg_ms < 50, f"Avg latency {avg_ms:.1f}ms exceeds 50ms SLA"
 
    def test_model_size(self, model_path):
        """Model artifact must fit in serving container."""
        import os
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb:.0f}MB exceeds 500MB limit"
 
 
class TestInfrastructure:
    """Tests that validate deployment infrastructure."""
 
    def test_serving_endpoint_health(self, serving_url):
        import requests
        resp = requests.get(f"{serving_url}/health", timeout=5)
        assert resp.status_code == 200
 
    def test_serving_endpoint_prediction(self, serving_url, sample_input):
        import requests
        resp = requests.post(f"{serving_url}/predict", json=sample_input, timeout=10)
        assert resp.status_code == 200
        assert "prediction" in resp.json()

5. Implement Robust Model Deployment Strategies

Never deploy a new model to 100% of traffic immediately. Use progressive rollout strategies.

Shadow Deployment

Run the new model alongside production, compare outputs without affecting users:

import logging
from datetime import datetime
 
logger = logging.getLogger(__name__)
 
class ShadowDeployment:
    def __init__(self, production_model, shadow_model):
        self.production = production_model
        self.shadow = shadow_model
        self.comparison_log = []
 
    async def predict(self, features: dict) -> dict:
        # Production model serves the response
        prod_result = self.production.predict(features)
 
        # Shadow model also scores the request; its result is only logged, never returned
        try:
            shadow_result = self.shadow.predict(features)
            self.comparison_log.append({
                "timestamp": datetime.utcnow().isoformat(),
                "production": prod_result,
                "shadow": shadow_result,
                "agree": prod_result == shadow_result,
            })
        except Exception as e:
            logger.warning(f"Shadow model error: {e}")
 
        return prod_result
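The comparison log is only valuable if it feeds a decision. A small follow-on sketch for deciding when the shadow model has earned a canary slot; the 98% agreement and 10,000-sample thresholds are illustrative assumptions, not rules from this article.

def summarize_shadow_run(comparison_log: list[dict], min_agreement: float = 0.98,
                         min_samples: int = 10_000) -> dict:
    """Summarize production/shadow agreement from ShadowDeployment.comparison_log."""
    if not comparison_log:
        return {"samples": 0, "agreement": None, "ready_for_canary": False}

    agreement = sum(entry["agree"] for entry in comparison_log) / len(comparison_log)
    return {
        "samples": len(comparison_log),
        "agreement": agreement,
        # Only graduate the shadow model to a canary after enough traffic and high agreement
        "ready_for_canary": len(comparison_log) >= min_samples and agreement >= min_agreement,
    }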

Canary Deployment

Route a small percentage of traffic to the new model:

import random
 
class CanaryRouter:
    def __init__(self, production_model, canary_model, canary_percentage: float = 0.05):
        self.production = production_model
        self.canary = canary_model
        self.canary_pct = canary_percentage
 
    def predict(self, features: dict) -> dict:
        if random.random() < self.canary_pct:
            result = self.canary.predict(features)
            result["model_variant"] = "canary"
        else:
            result = self.production.predict(features)
            result["model_variant"] = "production"
        return result

6. Monitor Models in Production Continuously

Deployed models are not fire-and-forget. Implement monitoring for performance, data quality, and infrastructure health.

from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np
 
@dataclass
class MonitoringAlert:
    metric: str
    current_value: float
    threshold: float
    severity: str  # "warning" | "critical"
    message: str
 
class ModelMonitor:
    def __init__(self, model_name: str, baseline_metrics: dict):
        self.model_name = model_name
        self.baseline = baseline_metrics
        self.alerts: list[MonitoringAlert] = []
 
    def check_prediction_distribution(self, predictions: np.ndarray) -> list[MonitoringAlert]:
        """Detect if prediction distribution has shifted."""
        alerts = []
        pred_mean = predictions.mean()
        baseline_mean = self.baseline["prediction_mean"]
 
        if abs(pred_mean - baseline_mean) / baseline_mean > 0.20:
            alerts.append(MonitoringAlert(
                metric="prediction_distribution",
                current_value=pred_mean,
                threshold=baseline_mean * 1.20,
                severity="critical",
                message=f"Prediction mean shifted from {baseline_mean:.3f} to {pred_mean:.3f}"
            ))
        return alerts
 
    def check_latency(self, latencies_ms: list[float]) -> list[MonitoringAlert]:
        """Detect latency degradation."""
        alerts = []
        p99 = np.percentile(latencies_ms, 99)
        if p99 > self.baseline["latency_p99_ms"] * 1.5:
            alerts.append(MonitoringAlert(
                metric="latency_p99",
                current_value=p99,
                threshold=self.baseline["latency_p99_ms"] * 1.5,
                severity="warning",
                message=f"P99 latency {p99:.0f}ms exceeds 1.5x baseline"
            ))
        return alerts
 
    def check_error_rate(self, total_requests: int, errors: int) -> list[MonitoringAlert]:
        """Detect elevated error rates."""
        alerts = []
        error_rate = errors / max(total_requests, 1)
        if error_rate > 0.01:
            alerts.append(MonitoringAlert(
                metric="error_rate",
                current_value=error_rate,
                threshold=0.01,
                severity="critical",
                message=f"Error rate {error_rate:.2%} exceeds 1% threshold"
            ))
        return alerts
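The retraining orchestrator in the next section also asks the monitor for a drift score via get_drift_score(), which the snippet above does not define. One common choice, offered here as an assumption rather than a prescription, is the population stability index (PSI) between a training-time reference sample and recent production values:

import numpy as np

def population_stability_index(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI between a reference (training) sample and current production values."""
    # Bin edges come from quantiles of the reference distribution
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    ref_counts, _ = np.histogram(reference, bins=edges)
    # Clip current values into the reference range so nothing falls outside the bins
    cur_counts, _ = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)

    # Floor the proportions to avoid division by zero and log(0)
    ref_pct = np.clip(ref_counts / len(reference), 1e-6, None)
    cur_pct = np.clip(cur_counts / len(current), 1e-6, None)

    # PSI above ~0.1 is often read as moderate drift, above ~0.25 as significant drift
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))

get_drift_score() could then return, for example, the maximum PSI across the monitored features.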

7. Automate Retraining with Guardrails

Continuous training keeps models fresh, but retraining must have safety checks:

from datetime import datetime, timedelta
 
class RetrainingOrchestrator:
    def __init__(self, pipeline, model_registry, monitor):
        self.pipeline = pipeline
        self.registry = model_registry
        self.monitor = monitor
 
    def should_retrain(self) -> tuple[bool, str]:
        """Determine if retraining is needed based on monitoring signals."""
        # Check data drift
        drift_score = self.monitor.get_drift_score()
        if drift_score > 0.15:
            return True, f"Data drift detected (score: {drift_score:.3f})"
 
        # Check performance degradation
        current_accuracy = self.monitor.get_current_accuracy()
        baseline_accuracy = self.monitor.get_baseline_accuracy()
        if current_accuracy < baseline_accuracy - 0.05:
            return True, f"Accuracy dropped from {baseline_accuracy:.3f} to {current_accuracy:.3f}"
 
        # Check time since last training
        last_trained = self.registry.get_last_training_date()
        if (datetime.utcnow() - last_trained) > timedelta(days=30):
            return True, f"Last trained {(datetime.utcnow() - last_trained).days} days ago"
 
        return False, "No retraining trigger met"
 
    def retrain_with_validation(self):
        """Retrain with automated quality gates."""
        should, reason = self.should_retrain()
        if not should:
            return {"status": "skipped", "reason": reason}
 
        # Train new model
        new_model = self.pipeline.run()
 
        # Validate against current production
        prod_model = self.registry.get_production_model()
        comparison = self.compare_models(new_model, prod_model)
 
        if comparison["new_model_better"]:
            self.registry.register(new_model, stage="staging")
            return {"status": "staged", "improvement": comparison["improvement"]}
        else:
            return {"status": "rejected", "reason": "New model did not improve over production"}

8. Structure Your ML Project for Scale

ml-project/
├── data/
│   ├── raw/                  # Immutable raw data (DVC tracked)
│   ├── processed/            # Transformed features
│   └── validation/           # Test/validation splits
├── src/
│   ├── data/                 # Data loading and preprocessing
│   ├── features/             # Feature engineering
│   ├── models/               # Model training and evaluation
│   ├── serving/              # Serving infrastructure
│   └── monitoring/           # Monitoring and alerting
├── pipelines/
│   ├── training.py           # Training pipeline DAG
│   ├── evaluation.py         # Evaluation pipeline
│   └── deployment.py         # Deployment pipeline
├── tests/
│   ├── data/                 # Data quality tests
│   ├── model/                # Model quality tests
│   └── integration/          # End-to-end tests
├── configs/
│   ├── training.yaml         # Hyperparameters
│   ├── serving.yaml          # Serving configuration
│   └── monitoring.yaml       # Alert thresholds
├── dvc.yaml                  # DVC pipeline definition
├── dvc.lock                  # DVC version lock
└── pyproject.toml            # Python dependencies

Summary: The MLOps Best Practices Checklist

| Practice | Priority | Impact |
|----------|----------|--------|
| Experiment tracking from day one | Critical | Reproducibility |
| Data validation at boundaries | Critical | Data quality |
| Three-way versioning (code + data + model) | Critical | Reproducibility |
| Multi-level testing (data, model, infra) | High | Reliability |
| Progressive deployment (shadow/canary) | High | Safety |
| Production monitoring + alerting | Critical | Availability |
| Automated retraining with guardrails | Medium | Freshness |
| Feature store for train-serve consistency | Medium | Accuracy |
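
Experiment tracking appears in the checklist but has no example above. As a minimal sketch, MLflow is one widely used option; the experiment name, parameters, and tags below simply mirror the metadata from section 3 and are illustrative.

import mlflow

mlflow.set_experiment("churn-model")

with mlflow.start_run(run_name="nightly-retrain"):
    # Record the knobs, results, and lineage of every training run so runs can be compared later
    mlflow.log_params({"n_estimators": 200, "max_depth": 12, "learning_rate": 0.05})
    mlflow.log_metrics({"accuracy": 0.943, "f1": 0.895, "auc_roc": 0.967})
    mlflow.set_tags({"code_version": "git:abc123def", "data_version": "dvc:data/v2.3"})
    # mlflow.sklearn.log_model(model, "model")  # also log the trained artifact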

Start with the critical items — experiment tracking, data validation, and versioning. Add the rest as your ML operation matures. The goal isn't to implement everything at once, but to systematically reduce the risk of each ML deployment.


Building production ML pipelines? DeviDevs helps teams design and implement MLOps infrastructure that scales. From pipeline architecture to deployment automation — get a free assessment →
