MLflow Tutorial: Experiment Tracking and Model Registry from Scratch
MLflow is one of the most widely adopted open-source platforms for managing the ML lifecycle. It provides experiment tracking, model packaging, a model registry, and deployment tools — all without vendor lock-in.
This tutorial takes you from zero to a production-ready MLflow setup with real code examples.
Why MLflow?
Before MLflow, ML teams tracked experiments in spreadsheets, stored models on shared drives, and deployed via email ("here's the new pickle file"). MLflow provides:
- Experiment Tracking — Log parameters, metrics, and artifacts for every run
- Model Registry — Version, stage, and approve models centrally
- Model Packaging — Consistent model format across frameworks
- Deployment — Serve models via REST API, Docker, or cloud platforms
Installation and Setup
# Install MLflow
pip install "mlflow[extras]"   # quotes prevent shells like zsh from globbing the extras
# Start the tracking server with a SQLite backend
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlflow-artifacts \
  --host 0.0.0.0 \
  --port 5000
For production, use PostgreSQL as the backend and S3/GCS for artifacts:
mlflow server \
  --backend-store-uri postgresql://user:pass@db-host:5432/mlflow \
  --default-artifact-root s3://ml-artifacts/mlflow \
  --host 0.0.0.0 \
  --port 5000
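Once the server is up, any Python process can point at it and verify connectivity. A quick sketch (mlflow.search_experiments is available in MLflow 2.x):
import mlflow

# Point the client at the tracking server; setting the MLFLOW_TRACKING_URI
# environment variable works equally well.
mlflow.set_tracking_uri("http://localhost:5000")

# Sanity check: lists the experiments the server currently knows about.
for exp in mlflow.search_experiments():
    print(exp.experiment_id, exp.name)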
Part 1: Experiment Tracking
Basic Experiment Logging
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Connect to tracking server
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris-classification")
# Load data
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Define hyperparameters
params = {
    "n_estimators": 100,
    "max_depth": 5,
    "min_samples_split": 2,
    "random_state": 42,
}
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    mlflow.log_params(params)

    # Train model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision_weighted": precision_score(y_test, y_pred, average="weighted"),
        "recall_weighted": recall_score(y_test, y_pred, average="weighted"),
        "f1_weighted": f1_score(y_test, y_pred, average="weighted"),
    }

    # Log metrics
    mlflow.log_metrics(metrics)

    # Log model
    mlflow.sklearn.log_model(model, "model")

    # Log additional artifacts
    import json
    with open("feature_importance.json", "w") as f:
        importance = dict(zip(load_iris().feature_names, model.feature_importances_))
        json.dump(importance, f, indent=2)
    mlflow.log_artifact("feature_importance.json")

    print(f"Run ID: {mlflow.active_run().info.run_id}")
print(f"Metrics: {metrics}")Hyperparameter Sweep with Tracking
Hyperparameter Sweep with Tracking
from itertools import product
mlflow.set_experiment("iris-hyperparameter-sweep")
param_grid = {
    "n_estimators": [50, 100, 200],
    "max_depth": [3, 5, 10, None],
    "min_samples_split": [2, 5, 10],
}
best_f1 = 0
best_run_id = None
for n_est, depth, min_split in product(
    param_grid["n_estimators"],
    param_grid["max_depth"],
    param_grid["min_samples_split"],
):
    with mlflow.start_run(run_name=f"rf-{n_est}-{depth}-{min_split}"):
        params = {
            "n_estimators": n_est,
            "max_depth": depth,
            "min_samples_split": min_split,
        }
        mlflow.log_params(params)

        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)

        y_pred = model.predict(X_test)
        f1 = f1_score(y_test, y_pred, average="weighted")
        mlflow.log_metric("f1_weighted", f1)

        if f1 > best_f1:
            best_f1 = f1
            best_run_id = mlflow.active_run().info.run_id

        mlflow.sklearn.log_model(model, "model")
print(f"Best F1: {best_f1:.4f} (run: {best_run_id})")Tracking PyTorch Training
Tracking PyTorch Training
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score  # the iris split (X_train, X_test, ...) from above is reused
mlflow.set_experiment("pytorch-classifier")
class SimpleClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, output_dim),
        )

    def forward(self, x):
        return self.net(x)
with mlflow.start_run(run_name="pytorch-baseline"):
# Log architecture params
config = {"input_dim": 4, "hidden_dim": 64, "output_dim": 3, "lr": 0.001, "epochs": 50, "batch_size": 16}
mlflow.log_params(config)
model = SimpleClassifier(config["input_dim"], config["hidden_dim"], config["output_dim"])
optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
criterion = nn.CrossEntropyLoss()
# Training loop with step-level logging
X_tensor = torch.FloatTensor(X_train)
y_tensor = torch.LongTensor(y_train)
loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=config["batch_size"], shuffle=True)
for epoch in range(config["epochs"]):
model.train()
epoch_loss = 0
for X_batch, y_batch in loader:
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg_loss = epoch_loss / len(loader)
mlflow.log_metric("train_loss", avg_loss, step=epoch)
# Validation every 10 epochs
if epoch % 10 == 0:
model.eval()
with torch.no_grad():
val_output = model(torch.FloatTensor(X_test))
val_pred = val_output.argmax(dim=1).numpy()
val_acc = accuracy_score(y_test, val_pred)
mlflow.log_metric("val_accuracy", val_acc, step=epoch)
# Log final model
mlflow.pytorch.log_model(model, "model")Part 2: The Model Registry
The model registry provides a central hub for managing model versions through their lifecycle.
Registering Models
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Register the best model from our hyperparameter sweep
model_uri = f"runs:/{best_run_id}/model"
model_details = mlflow.register_model(model_uri, "iris-classifier")
print(f"Model: {model_details.name}")
print(f"Version: {model_details.version}")Model Lifecycle Management
Model Lifecycle Management
client = MlflowClient()
# Transition model through stages
# Stage 1: None -> Staging (for validation)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False,
)

# Run validation tests against the staging model here (data checks, metric
# thresholds, latency smoke tests) before promoting it further.

# Stage 2: Staging -> Production (after validation passes)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Production",
    archive_existing_versions=True,  # Archive the previous production version
)

# Add description and tags
client.update_model_version(
    name="iris-classifier",
    version=1,
    description="Random Forest baseline. F1=0.967. Trained on iris dataset v2.1.",
)
client.set_model_version_tag("iris-classifier", 1, "validated", "true")
client.set_model_version_tag("iris-classifier", 1, "data_version", "v2.1")
Loading Models from Registry
# Load the production model
import mlflow.pyfunc
# By stage
production_model = mlflow.pyfunc.load_model("models:/iris-classifier/Production")
# By version number
specific_version = mlflow.pyfunc.load_model("models:/iris-classifier/1")
# Predict
import pandas as pd
sample = pd.DataFrame([[5.1, 3.5, 1.4, 0.2]], columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])
prediction = production_model.predict(sample)
Part 3: Custom MLflow Models
For models that don't fit standard frameworks (ensemble models, complex preprocessing, LLM wrappers), use the PythonModel interface:
import mlflow.pyfunc
class ChurnPredictor(mlflow.pyfunc.PythonModel):
    """Custom model wrapping feature engineering + prediction + postprocessing."""

    def load_context(self, context):
        """Load model artifacts."""
        import joblib
        self.model = joblib.load(context.artifacts["model"])
        self.scaler = joblib.load(context.artifacts["scaler"])
        self.feature_names = joblib.load(context.artifacts["feature_names"])

    def predict(self, context, model_input):
        """Run prediction pipeline."""
        import numpy as np
        import pandas as pd

        df = model_input if isinstance(model_input, pd.DataFrame) else pd.DataFrame(model_input)

        # Feature engineering
        df["recency_score"] = np.clip(df["days_since_last_purchase"] / 365, 0, 1)
        df["value_segment"] = pd.cut(df["lifetime_value"], bins=[0, 100, 500, float("inf")], labels=[0, 1, 2])

        # Scale features
        features = df[self.feature_names]
        scaled = self.scaler.transform(features)

        # Predict
        probabilities = self.model.predict_proba(scaled)
        return pd.DataFrame({
            "churn_probability": probabilities[:, 1],
            "churn_prediction": (probabilities[:, 1] > 0.5).astype(int),
            "risk_tier": pd.cut(
                probabilities[:, 1],
                bins=[0, 0.3, 0.7, 1.0],
                labels=["low", "medium", "high"],
            ),
        })
# Save custom model
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=ChurnPredictor(),
        artifacts={
            "model": "artifacts/model.joblib",
            "scaler": "artifacts/scaler.joblib",
            "feature_names": "artifacts/feature_names.joblib",
        },
        registered_model_name="churn-predictor",
        pip_requirements=["scikit-learn==1.4.0", "pandas>=2.0", "numpy>=1.24"],
    )
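Before leaning on the registered version, it helps to load the packaged pyfunc back and sanity-check its output locally. A sketch that assumes you capture the return value of log_model above as model_info, and a sample row matching this tutorial's assumed churn schema:
import pandas as pd

# model_info = mlflow.pyfunc.log_model(...)  # capture the return value above
loaded = mlflow.pyfunc.load_model(model_info.model_uri)

sample = pd.DataFrame([{
    "days_since_last_purchase": 45,
    "lifetime_value": 230.0,
    "total_orders": 12,
}])
# Expect churn_probability, churn_prediction, and risk_tier columns back.
print(loaded.predict(sample))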
Part 4: MLflow Projects for Reproducibility
MLflow Projects package your code with its environment for reproducible runs:
# MLproject file
name: churn-model
conda_env: conda.yaml

entry_points:
  train:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      learning_rate: {type: float, default: 0.1}
      data_path: {type: str, default: "data/processed"}
    command: "python train.py --n-estimators {n_estimators} --max-depth {max_depth} --lr {learning_rate} --data {data_path}"
  evaluate:
    parameters:
      model_uri: {type: str}
      data_path: {type: str, default: "data/test"}
    command: "python evaluate.py --model-uri {model_uri} --data {data_path}"
Run the project from anywhere:
# Run locally
mlflow run . -e train -P n_estimators=200 -P max_depth=12
# Run from Git
mlflow run https://github.com/your-org/churn-model -e train -P n_estimators=200
# Run on Databricks
mlflow run . -e train -b databricks --backend-config cluster.json
Part 5: Serving Models in Production
REST API Serving
# Serve a registered model as a REST API. The serving process resolves the
# models:/ URI against the tracking server, so point it there first.
export MLFLOW_TRACKING_URI=http://localhost:5000
mlflow models serve \
  --model-uri "models:/churn-predictor/Production" \
  --port 8080 \
  --host 0.0.0.0
# Test the endpoint
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"dataframe_records": [{"days_since_last_purchase": 45, "lifetime_value": 230, "total_orders": 12}]}'
Docker Deployment
# Build a Docker image for the model
mlflow models build-docker \
  --model-uri "models:/churn-predictor/Production" \
  --name churn-predictor:v2.3
# Run the container
docker run -p 8080:8080 churn-predictor:v2.3
Part 6: Production MLflow Architecture
For production deployments, here's a recommended architecture:
Data Scientists                MLflow Tracking Server
┌──────────┐                   ┌──────────────────┐
│ Notebook │──────────────────▶│ REST API (:5000) │
│ Pipeline │                   └────────┬─────────┘
└──────────┘                            │
                    ┌───────────────────┼───────────────────┐
                    ▼                   ▼                   ▼
             ┌────────────┐      ┌────────────┐      ┌────────────┐
             │ PostgreSQL │      │   S3/GCS   │      │   Model    │
             │ (metadata) │      │ (artifacts)│      │  Registry  │
             └────────────┘      └────────────┘      └────────────┘

Production Serving
┌──────────────────────────────────────────────┐
│          Kubernetes + KServe/Seldon          │
│   ┌─────────┐    ┌─────────┐    ┌─────────┐  │
│   │ Model A │    │ Model B │    │ Model C │  │
│   │ (prod)  │    │ (canary)│    │ (shadow)│  │
│   └─────────┘    └─────────┘    └─────────┘  │
└──────────────────────────────────────────────┘
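A common way to stand up the tracking-server tier of this architecture is Docker Compose. The sketch below is illustrative only: image tags, credentials, and the bucket name are assumptions, and depending on the MLflow image you use you may need to add the Postgres and S3 drivers (psycopg2-binary, boto3) yourself.
# docker-compose.yml — illustrative sketch, not a hardened production config
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: mlflow
      POSTGRES_PASSWORD: mlflow        # use a secrets manager in production
      POSTGRES_DB: mlflow
    volumes:
      - pgdata:/var/lib/postgresql/data

  mlflow:
    image: ghcr.io/mlflow/mlflow:latest   # assumption: stock image; install psycopg2-binary/boto3 if missing
    depends_on:
      - postgres
    ports:
      - "5000:5000"
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
    command: >
      mlflow server
      --backend-store-uri postgresql://mlflow:mlflow@postgres:5432/mlflow
      --default-artifact-root s3://ml-artifacts/mlflow
      --host 0.0.0.0 --port 5000

volumes:
  pgdata: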
Key MLflow Configuration Tips
# Autologging: automatically capture params, metrics, and models
mlflow.autolog() # Works with sklearn, pytorch, tensorflow, xgboost, lightgbm
# Nested runs for complex experiments
with mlflow.start_run(run_name="hyperparameter-search"):
mlflow.log_param("search_method", "grid")
for params in param_combinations:
with mlflow.start_run(run_name=f"trial-{params}", nested=True):
mlflow.log_params(params)
# ... train and evaluate
# Tags for organizing runs
mlflow.set_tag("team", "ml-platform")
mlflow.set_tag("use_case", "customer-churn")
mlflow.set_tag("data_version", "v2.3")
# System tags
mlflow.set_tag("mlflow.note.content", "Baseline model with engineered features")Next Steps
With MLflow handling experiment tracking and model management, the natural next steps are:
- Feature stores for consistent feature computation
- Kubeflow for pipeline orchestration at scale
- Model monitoring for production observability
- EU AI Act compliance — MLflow's audit trail directly supports regulatory requirements
Need help setting up MLflow for your team? DeviDevs builds production MLOps platforms with MLflow at the core. Get a free assessment →