Kubeflow Pipelines: Building Scalable ML Workflows on Kubernetes
Kubeflow Pipelines is one of the most widely used open-source platforms for building and deploying ML workflows on Kubernetes. It provides a Python SDK for defining pipelines, a UI for tracking runs, and native integration with Kubernetes scheduling for GPU workloads.
Why Kubeflow for ML Pipelines?
While simpler orchestrators like Airflow work for data engineering, ML pipelines have unique requirements:
- GPU scheduling — training steps need GPU nodes, preprocessing doesn't
- Artifact tracking — models, datasets, and metrics must be versioned
- Caching — skip expensive steps when inputs haven't changed
- Isolation — each step runs in its own container with its own dependencies
- Reproducibility — pipeline definitions are version-controlled code
Kubeflow solves all of these natively on Kubernetes.
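The caching requirement is worth making concrete before diving in. Conceptually, a pipeline engine fingerprints each step's definition and inputs, and reuses a stored output whenever the fingerprint matches. The sketch below is a framework-free illustration of that idea; the names `cache_key` and `run_step` are hypothetical helpers, not KFP API (KFP's real cache is keyed on the full component spec and lives in the backend):

```python
import hashlib
import json

# Illustrative in-memory cache: identical (step, image, inputs) -> reuse result.
_cache: dict[str, object] = {}

def cache_key(step_name: str, image: str, inputs: dict) -> str:
    # Deterministic fingerprint of the step definition plus its inputs.
    payload = json.dumps(
        {"step": step_name, "image": image, "inputs": inputs},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

def run_step(step_name, image, inputs, fn):
    key = cache_key(step_name, image, inputs)
    if key in _cache:
        return _cache[key]      # cache hit: skip execution entirely
    result = fn(**inputs)       # cache miss: execute and store
    _cache[key] = result
    return result
```

Changing any input produces a new key, so the step re-executes; unchanged inputs skip it, which is exactly why expensive preprocessing steps become nearly free on repeated runs.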
Installation
# Install the Kubeflow Pipelines SDK
pip install kfp==2.7.0
# For a local development cluster (kind or minikube)
# Deploy Kubeflow Pipelines standalone
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
# Port forward the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

Part 1: Building Your First Pipeline
Define Pipeline Components
Each pipeline step is a component — a containerized function with typed inputs and outputs.
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def prepare_data(
    raw_data_path: str,
    split_ratio: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics],
):
    """Load and split data for training."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)

    # Log data statistics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)
    data_stats.log_metric("null_percentage", df.isnull().mean().mean() * 100)

    # Split
    train_df, test_df = train_test_split(df, test_size=split_ratio, random_state=42)
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))

    # Save to output artifacts
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "mlflow==2.12.0"],
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    mlflow_tracking_uri: str,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    """Train a Random Forest classifier and log to MLflow."""
    import pandas as pd
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    # Load data
    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]

    # Train
    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate on training data
    y_pred = model.predict(X_train)
    train_acc = accuracy_score(y_train, y_pred)
    train_f1 = f1_score(y_train, y_pred, average="weighted")

    # Log metrics to Kubeflow
    training_metrics.log_metric("train_accuracy", train_acc)
    training_metrics.log_metric("train_f1", train_f1)

    # Log to MLflow
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("kubeflow-training")
    with mlflow.start_run():
        mlflow.log_params({"n_estimators": n_estimators, "max_depth": max_depth})
        mlflow.log_metrics({"train_accuracy": train_acc, "train_f1": train_f1})
        mlflow.sklearn.log_model(model, "model")

    # Save model artifact
    joblib.dump(model, trained_model.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85,
) -> bool:
    """Evaluate model against test data and quality gate."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    # Load
    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)

    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "test_accuracy": accuracy_score(y_test, y_pred),
        "test_precision": precision_score(y_test, y_pred, average="weighted"),
        "test_recall": recall_score(y_test, y_pred, average="weighted"),
        "test_f1": f1_score(y_test, y_pred, average="weighted"),
    }
    for name, value in metrics.items():
        eval_metrics.log_metric(name, value)

    passed = metrics["test_accuracy"] >= accuracy_threshold
    eval_metrics.log_metric("quality_gate_passed", int(passed))
    return passed

Compose the Pipeline
@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end training pipeline with quality gates",
)
def training_pipeline(
    raw_data_path: str = "gs://ml-data/customer-churn/raw.csv",
    split_ratio: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85,
    mlflow_tracking_uri: str = "http://mlflow.ml-platform:5000",
):
    # Step 1: Prepare data
    prepare_task = prepare_data(
        raw_data_path=raw_data_path,
        split_ratio=split_ratio,
    )

    # Step 2: Train model
    train_task = train_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )

    # Step 3: Evaluate
    eval_task = evaluate_model(
        test_dataset=prepare_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold,
    )

    # Step 4: Conditional deployment. dsl.If replaces the deprecated dsl.Condition
    # in the KFP v2 SDK; deploy_model is a component assumed defined elsewhere.
    with dsl.If(eval_task.output == True):  # noqa: E712
        deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="customer-churn",
        )

Compile and Submit
from kfp import compiler
from kfp.client import Client

# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)

# Submit to Kubeflow
client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    pipeline_file="training_pipeline.yaml",
    arguments={
        "raw_data_path": "gs://ml-data/customer-churn/raw.csv",
        "n_estimators": 200,
        "max_depth": 12,
    },
    run_name="churn-model-training-v2",
    experiment_name="churn-model",
)
print(f"Run ID: {run.run_id}")

Part 2: GPU Scheduling and Resources
Kubeflow runs on Kubernetes, giving you full control over resource allocation:
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
)
def train_deep_model(
    train_dataset: Input[Dataset],
    trained_model: Output[Model],
    epochs: int = 50,
    learning_rate: float = 0.001,
):
    """Train a deep learning model on GPU."""
    import torch
    import torch.nn as nn
    # ... training code using CUDA

# Node selector and toleration helpers live in the kfp-kubernetes extension:
# pip install kfp-kubernetes
from kfp import kubernetes

# In the pipeline, set resource requirements
@dsl.pipeline(name="GPU Training Pipeline")
def gpu_pipeline():
    prepare_task = prepare_data(
        raw_data_path="gs://ml-data/customer-churn/raw.csv",
        split_ratio=0.2,
    )
    train_task = train_deep_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        epochs=100,
    )

    # Request GPU resources (KFP v2 SDK; set_gpu_limit is the deprecated v1 spelling)
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

    # Node selector for GPU nodes
    kubernetes.add_node_selector(
        train_task,
        label_key="cloud.google.com/gke-accelerator",
        label_value="nvidia-tesla-t4",
    )

    # Tolerate GPU node taints
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Equal",
        value="present",
        effect="NoSchedule",
    )

Part 3: Pipeline Caching
Kubeflow caches step outputs based on inputs. If a step's inputs haven't changed, it reuses the cached output:
# Caching is enabled by default. Disable for non-deterministic steps:
@dsl.pipeline(name="Pipeline with Cache Control")
def cached_pipeline():
    # This step caches because same input → same output
    prepare_task = prepare_data(raw_data_path="gs://data/v2.csv", split_ratio=0.2)

    # Disable cache for this step (e.g., it reads from a live database)
    live_data_task = fetch_live_metrics()
    live_data_task.set_caching_options(False)

Part 4: Recurring Pipelines (Scheduled Retraining)
# Create a recurring run for nightly retraining
client = Client(host="http://localhost:8080")
experiment = client.create_experiment("churn-model")

recurring_run = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-churn-retraining",
    pipeline_package_path="training_pipeline.yaml",
    cron_expression="0 2 * * *",  # Every day at 2 AM
    max_concurrency=1,
    params={
        "raw_data_path": "gs://ml-data/customer-churn/latest.csv",
        "accuracy_threshold": 0.87,
    },
)

Part 5: Advanced Patterns
Parallel Training with Different Algorithms
@dsl.pipeline(name="Model Selection Pipeline")
def model_selection_pipeline(data_path: str):
    prepare_task = prepare_data(raw_data_path=data_path, split_ratio=0.2)

    # Train multiple models in parallel
    rf_task = train_random_forest(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=200,
    )
    xgb_task = train_xgboost(
        train_dataset=prepare_task.outputs["train_dataset"],
        max_depth=8,
    )
    lgbm_task = train_lightgbm(
        train_dataset=prepare_task.outputs["train_dataset"],
        num_leaves=31,
    )

    # Compare and select best
    select_task = select_best_model(
        rf_model=rf_task.outputs["model"],
        xgb_model=xgb_task.outputs["model"],
        lgbm_model=lgbm_task.outputs["model"],
        test_dataset=prepare_task.outputs["test_dataset"],
    )

Pipeline Composition (Reusable Sub-pipelines)
@dsl.pipeline(name="Feature Engineering Sub-pipeline")
def feature_engineering(raw_data: Input[Dataset]) -> Dataset:
    clean_task = clean_data(dataset=raw_data)
    encode_task = encode_categoricals(dataset=clean_task.outputs["cleaned"])
    scale_task = scale_numerics(dataset=encode_task.outputs["encoded"])
    return scale_task.outputs["features"]

@dsl.pipeline(name="Full Training Pipeline")
def full_pipeline(data_path: str):
    ingest_task = ingest_data(path=data_path)
    features_task = feature_engineering(raw_data=ingest_task.outputs["data"])
    # A sub-pipeline with a single return exposes it as .output
    train_task = train_model(features=features_task.output)

Kubeflow vs. Other Orchestrators
| Feature | Kubeflow | Airflow | Prefect | Vertex AI |
|---------|----------|---------|---------|-----------|
| ML-native | Yes | No (general) | No (general) | Yes |
| Container isolation | Per-step | Optional | Optional | Per-step |
| GPU scheduling | Native K8s | Manual | Manual | Managed |
| Artifact tracking | Built-in | External | External | Built-in |
| Caching | Built-in | Limited | Basic | Built-in |
| Self-hosted | Yes | Yes | Yes | No (GCP) |
| Learning curve | Medium | Medium | Low | Low |
| Best for | K8s teams | Data eng | Small teams | GCP users |
For teams already running Kubernetes, Kubeflow Pipelines is the natural choice. For teams that want managed infrastructure, cloud-native options like Vertex AI Pipelines (which uses the same KFP SDK) can reduce operational burden.
Production Deployment Checklist
Before running Kubeflow Pipelines in production:
- [ ] PostgreSQL backend for metadata (not default SQLite)
- [ ] S3/GCS artifact storage (not local filesystem)
- [ ] RBAC configured for pipeline access
- [ ] Resource quotas set per namespace
- [ ] Pipeline versioning with Git tags
- [ ] Monitoring via Prometheus + Grafana
- [ ] Backup strategy for metadata DB
- [ ] Secret management via Kubernetes Secrets or Vault
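For the resource-quota item, a per-namespace ResourceQuota keeps one team's pipeline runs from starving the rest of the cluster. A minimal sketch, with illustrative namespace name and limits (tune these to your cluster):

```yaml
# Illustrative ResourceQuota: caps the total resources that pipeline pods
# in the "ml-team" namespace can request. Name and limits are examples.
apiVersion: v1
kind: ResourceQuota
metadata:
  name: ml-team-quota
  namespace: ml-team
spec:
  hard:
    requests.cpu: "32"
    requests.memory: 128Gi
    requests.nvidia.com/gpu: "4"
    pods: "50"
```

Apply it with `kubectl apply -f quota.yaml`; pods whose requests would exceed the quota are rejected at admission time rather than scheduled.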
Next Steps
- MLflow integration — Use MLflow for experiment tracking within Kubeflow pipelines
- Feature stores — Feed consistent features into your Kubeflow training steps
- MLOps best practices — Apply production patterns to your Kubeflow workflows
- AI security — Secure your pipeline components and model artifacts
Need help setting up Kubeflow for your team? DeviDevs designs and operates production ML platforms on Kubernetes. Get a free assessment →