MLOps

MLflow Tutorial: Experiment Tracking and Model Registry from Scratch

DeviDevs Team
8 min read
#MLflow#experiment tracking#model registry#MLOps#ML pipeline#model deployment

MLflow Tutorial: Experiment Tracking and Model Registry from Scratch

MLflow is the most widely adopted open-source platform for managing the ML lifecycle. It provides experiment tracking, model packaging, a model registry, and deployment tools — all without vendor lock-in.

This tutorial takes you from zero to a production-ready MLflow setup with real code examples.

Why MLflow?

Before MLflow, ML teams tracked experiments in spreadsheets, stored models on shared drives, and deployed via email ("here's the new pickle file"). MLflow provides:

  • Experiment Tracking — Log parameters, metrics, and artifacts for every run
  • Model Registry — Version, stage, and approve models centrally
  • Model Packaging — Consistent model format across frameworks
  • Deployment — Serve models via REST API, Docker, or cloud platforms

Installation and Setup

# Install MLflow
pip install mlflow[extras]
 
# Start the tracking server with a SQLite backend
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlflow-artifacts \
  --host 0.0.0.0 \
  --port 5000

For production, use PostgreSQL as the backend and S3/GCS for artifacts:

mlflow server \
  --backend-store-uri postgresql://user:pass@db-host:5432/mlflow \
  --default-artifact-root s3://ml-artifacts/mlflow \
  --host 0.0.0.0 \
  --port 5000

Part 1: Experiment Tracking

Basic Experiment Logging

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
 
# Connect to tracking server
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris-classification")
 
# Load data
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Define hyperparameters
params = {
    "n_estimators": 100,
    "max_depth": 5,
    "min_samples_split": 2,
    "random_state": 42,
}
 
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    mlflow.log_params(params)
 
    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
 
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision_weighted": precision_score(y_test, y_pred, average="weighted"),
        "recall_weighted": recall_score(y_test, y_pred, average="weighted"),
        "f1_weighted": f1_score(y_test, y_pred, average="weighted"),
    }
 
    # Log metrics
    mlflow.log_metrics(metrics)
 
    # Log model
    mlflow.sklearn.log_model(model, "model")
 
    # Log additional artifacts
    import json
    with open("feature_importance.json", "w") as f:
        importance = dict(zip(load_iris().feature_names, model.feature_importances_))
        json.dump(importance, f, indent=2)
    mlflow.log_artifact("feature_importance.json")
 
    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Metrics: {metrics}")

Hyperparameter Sweep with Tracking

from itertools import product
 
mlflow.set_experiment("iris-hyperparameter-sweep")
 
param_grid = {
    "n_estimators": [50, 100, 200],
    "max_depth": [3, 5, 10, None],
    "min_samples_split": [2, 5, 10],
}
 
best_f1 = 0
best_run_id = None
 
for n_est, depth, min_split in product(
    param_grid["n_estimators"],
    param_grid["max_depth"],
    param_grid["min_samples_split"],
):
    with mlflow.start_run(run_name=f"rf-{n_est}-{depth}-{min_split}"):
        params = {
            "n_estimators": n_est,
            "max_depth": depth,
            "min_samples_split": min_split,
        }
        mlflow.log_params(params)
 
        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
 
        f1 = f1_score(y_test, y_pred, average="weighted")
        mlflow.log_metric("f1_weighted", f1)
 
        if f1 > best_f1:
            best_f1 = f1
            best_run_id = mlflow.active_run().info.run_id
            mlflow.sklearn.log_model(model, "model")
 
print(f"Best F1: {best_f1:.4f} (run: {best_run_id})")

Tracking PyTorch Training

import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
 
mlflow.set_experiment("pytorch-classifier")
 
class SimpleClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, output_dim),
        )
 
    def forward(self, x):
        return self.net(x)
 
with mlflow.start_run(run_name="pytorch-baseline"):
    # Log architecture params
    config = {"input_dim": 4, "hidden_dim": 64, "output_dim": 3, "lr": 0.001, "epochs": 50, "batch_size": 16}
    mlflow.log_params(config)
 
    model = SimpleClassifier(config["input_dim"], config["hidden_dim"], config["output_dim"])
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    criterion = nn.CrossEntropyLoss()
 
    # Training loop with step-level logging
    X_tensor = torch.FloatTensor(X_train)
    y_tensor = torch.LongTensor(y_train)
    loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=config["batch_size"], shuffle=True)
 
    for epoch in range(config["epochs"]):
        model.train()
        epoch_loss = 0
        for X_batch, y_batch in loader:
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
 
        avg_loss = epoch_loss / len(loader)
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
 
        # Validation every 10 epochs
        if epoch % 10 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(torch.FloatTensor(X_test))
                val_pred = val_output.argmax(dim=1).numpy()
                val_acc = accuracy_score(y_test, val_pred)
                mlflow.log_metric("val_accuracy", val_acc, step=epoch)
 
    # Log final model
    mlflow.pytorch.log_model(model, "model")

Part 2: The Model Registry

The model registry provides a central hub for managing model versions through their lifecycle.

Registering Models

import mlflow
from mlflow.tracking import MlflowClient
 
client = MlflowClient()
 
# Register the best model from our hyperparameter sweep
model_uri = f"runs:/{best_run_id}/model"
model_details = mlflow.register_model(model_uri, "iris-classifier")
 
print(f"Model: {model_details.name}")
print(f"Version: {model_details.version}")

Model Lifecycle Management

client = MlflowClient()
 
# Transition model through stages
# Stage 1: None -> Staging (for validation)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False,
)
 
# Run validation tests against staging model...
# (see testing section above)
 
# Stage 2: Staging -> Production (after validation passes)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Production",
    archive_existing_versions=True,  # Archive previous production version
)
 
# Add description and tags
client.update_model_version(
    name="iris-classifier",
    version=1,
    description="Random Forest baseline. F1=0.967. Trained on iris dataset v2.1."
)
client.set_model_version_tag("iris-classifier", 1, "validated", "true")
client.set_model_version_tag("iris-classifier", 1, "data_version", "v2.1")

Loading Models from Registry

# Load the production model
import mlflow.pyfunc
 
# By stage
production_model = mlflow.pyfunc.load_model("models:/iris-classifier/Production")
 
# By version number
specific_version = mlflow.pyfunc.load_model("models:/iris-classifier/1")
 
# Predict
import pandas as pd
sample = pd.DataFrame([[5.1, 3.5, 1.4, 0.2]], columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])
prediction = production_model.predict(sample)

Part 3: Custom MLflow Models

For models that don't fit standard frameworks (ensemble models, complex preprocessing, LLM wrappers), use the PythonModel interface:

import mlflow.pyfunc
 
class ChurnPredictor(mlflow.pyfunc.PythonModel):
    """Custom model wrapping feature engineering + prediction + postprocessing."""
 
    def load_context(self, context):
        """Load model artifacts."""
        import joblib
        self.model = joblib.load(context.artifacts["model"])
        self.scaler = joblib.load(context.artifacts["scaler"])
        self.feature_names = joblib.load(context.artifacts["feature_names"])
 
    def predict(self, context, model_input):
        """Run prediction pipeline."""
        import pandas as pd
        import numpy as np
 
        df = model_input if isinstance(model_input, pd.DataFrame) else pd.DataFrame(model_input)
 
        # Feature engineering
        df["recency_score"] = np.clip(df["days_since_last_purchase"] / 365, 0, 1)
        df["value_segment"] = pd.cut(df["lifetime_value"], bins=[0, 100, 500, float("inf")], labels=[0, 1, 2])
 
        # Scale features
        features = df[self.feature_names]
        scaled = self.scaler.transform(features)
 
        # Predict
        probabilities = self.model.predict_proba(scaled)
 
        return pd.DataFrame({
            "churn_probability": probabilities[:, 1],
            "churn_prediction": (probabilities[:, 1] > 0.5).astype(int),
            "risk_tier": pd.cut(
                probabilities[:, 1],
                bins=[0, 0.3, 0.7, 1.0],
                labels=["low", "medium", "high"]
            ),
        })
 
# Save custom model
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=ChurnPredictor(),
        artifacts={
            "model": "artifacts/model.joblib",
            "scaler": "artifacts/scaler.joblib",
            "feature_names": "artifacts/feature_names.joblib",
        },
        registered_model_name="churn-predictor",
        pip_requirements=["scikit-learn==1.4.0", "pandas>=2.0", "numpy>=1.24"],
    )

Part 4: MLflow Projects for Reproducibility

MLflow Projects package your code with its environment for reproducible runs:

# MLproject file
name: churn-model
 
conda_env: conda.yaml
 
entry_points:
  train:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      learning_rate: {type: float, default: 0.1}
      data_path: {type: str, default: "data/processed"}
    command: "python train.py --n-estimators {n_estimators} --max-depth {max_depth} --lr {learning_rate} --data {data_path}"
 
  evaluate:
    parameters:
      model_uri: {type: str}
      data_path: {type: str, default: "data/test"}
    command: "python evaluate.py --model-uri {model_uri} --data {data_path}"

Run the project from anywhere:

# Run locally
mlflow run . -e train -P n_estimators=200 -P max_depth=12
 
# Run from Git
mlflow run https://github.com/your-org/churn-model -e train -P n_estimators=200
 
# Run on Databricks
mlflow run . -e train -b databricks --backend-config cluster.json

Part 5: Serving Models in Production

REST API Serving

# Serve a registered model as a REST API
mlflow models serve \
  --model-uri "models:/churn-predictor/Production" \
  --port 8080 \
  --host 0.0.0.0
# Test the endpoint
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [{"days_since_last_purchase": 45, "lifetime_value": 230, "total_orders": 12}]}'

Docker Deployment

# Build a Docker image for the model
mlflow models build-docker \
  --model-uri "models:/churn-predictor/Production" \
  --name churn-predictor:v2.3
 
# Run the container
docker run -p 8080:8080 churn-predictor:v2.3

Part 6: Production MLflow Architecture

For production deployments, here's a recommended architecture:

┌─────────────────────────────────────────────────────────┐
│                   MLflow Architecture                     │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  Data Scientists          MLflow Tracking Server          │
│  ┌──────────┐            ┌──────────────────┐            │
│  │ Notebook  │───────────▶│  REST API (:5000) │            │
│  │ Pipeline  │            │                  │            │
│  └──────────┘            └────────┬─────────┘            │
│                                   │                       │
│                      ┌────────────┼────────────┐         │
│                      │            │            │         │
│                      ▼            ▼            ▼         │
│              ┌────────────┐ ┌──────────┐ ┌──────────┐   │
│              │ PostgreSQL  │ │ S3/GCS   │ │ Model    │   │
│              │ (metadata)  │ │ (artifacts│ │ Registry │   │
│              └────────────┘ └──────────┘ └──────────┘   │
│                                                           │
│  Production Serving                                       │
│  ┌──────────────────────────────────────────────┐        │
│  │  Kubernetes + KServe/Seldon                    │        │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐         │        │
│  │  │ Model A │ │ Model B │ │ Model C │         │        │
│  │  │  (prod) │ │ (canary)│ │ (shadow)│         │        │
│  │  └─────────┘ └─────────┘ └─────────┘         │        │
│  └──────────────────────────────────────────────┘        │
└─────────────────────────────────────────────────────────┘

Key MLflow Configuration Tips

# Autologging: automatically capture params, metrics, and models
mlflow.autolog()  # Works with sklearn, pytorch, tensorflow, xgboost, lightgbm
 
# Nested runs for complex experiments
with mlflow.start_run(run_name="hyperparameter-search"):
    mlflow.log_param("search_method", "grid")
    for params in param_combinations:
        with mlflow.start_run(run_name=f"trial-{params}", nested=True):
            mlflow.log_params(params)
            # ... train and evaluate
 
# Tags for organizing runs
mlflow.set_tag("team", "ml-platform")
mlflow.set_tag("use_case", "customer-churn")
mlflow.set_tag("data_version", "v2.3")
 
# System tags
mlflow.set_tag("mlflow.note.content", "Baseline model with engineered features")

Next Steps

With MLflow handling experiment tracking and model management, the natural next steps are:


Need help setting up MLflow for your team? DeviDevs builds production MLOps platforms with MLflow at the core. Get a free assessment →

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.