Kubeflow Pipelines: Building Scalable ML Workflows on Kubernetes

DeviDevs Team
7 min read
#Kubeflow · #ML pipeline · #Kubernetes · #MLOps · #pipeline orchestration · #model training

Kubeflow Pipelines is one of the most widely used open-source platforms for building and deploying ML workflows on Kubernetes. It provides a Python SDK for defining pipelines, a UI for tracking runs, and native integration with Kubernetes scheduling for GPU workloads.

Why Kubeflow for ML Pipelines?

While general-purpose orchestrators like Airflow work well for data engineering, ML pipelines come with requirements of their own:

  • GPU scheduling — training steps need GPU nodes, preprocessing doesn't
  • Artifact tracking — models, datasets, and metrics must be versioned
  • Caching — skip expensive steps when inputs haven't changed
  • Isolation — each step runs in its own container with its own dependencies
  • Reproducibility — pipeline definitions are version-controlled code

Kubeflow solves all of these natively on Kubernetes.

Installation

# Install the Kubeflow Pipelines SDK
pip install kfp==2.7.0
 
# For a local development cluster (kind or minikube)
# Deploy Kubeflow Pipelines standalone
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
 
# Port forward the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
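
Before opening the UI, confirm that the pipeline components came up cleanly. A quick check, assuming the default kubeflow namespace used by the manifests above:

# All Kubeflow Pipelines components run in the kubeflow namespace
kubectl get pods -n kubeflow

# Optionally block until every pod is ready
kubectl wait --for=condition=Ready pods --all -n kubeflow --timeout=300s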

Part 1: Building Your First Pipeline

Define Pipeline Components

Each pipeline step is a component — a containerized function with typed inputs and outputs.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
 
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def prepare_data(
    raw_data_path: str,
    split_ratio: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics],
):
    """Load and split data for training."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
 
    df = pd.read_csv(raw_data_path)
 
    # Log data statistics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)
    data_stats.log_metric("null_percentage", df.isnull().mean().mean() * 100)
 
    # Split
    train_df, test_df = train_test_split(df, test_size=split_ratio, random_state=42)
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
 
    # Save to output artifacts
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "mlflow==2.12.0"],
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    mlflow_tracking_uri: str,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    """Train a Random Forest classifier and log to MLflow."""
    import pandas as pd
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score
    import joblib
 
    # Load data
    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]
 
    # Train
    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)
 
    # Evaluate on training data
    y_pred = model.predict(X_train)
    train_acc = accuracy_score(y_train, y_pred)
    train_f1 = f1_score(y_train, y_pred, average="weighted")
 
    # Log metrics to Kubeflow
    training_metrics.log_metric("train_accuracy", train_acc)
    training_metrics.log_metric("train_f1", train_f1)
 
    # Log to MLflow
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("kubeflow-training")
    with mlflow.start_run():
        mlflow.log_params({"n_estimators": n_estimators, "max_depth": max_depth})
        mlflow.log_metrics({"train_accuracy": train_acc, "train_f1": train_f1})
        mlflow.sklearn.log_model(model, "model")
 
    # Save model artifact
    joblib.dump(model, trained_model.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85,
) -> bool:
    """Evaluate model against test data and quality gate."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
 
    # Load
    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)
 
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "test_accuracy": accuracy_score(y_test, y_pred),
        "test_precision": precision_score(y_test, y_pred, average="weighted"),
        "test_recall": recall_score(y_test, y_pred, average="weighted"),
        "test_f1": f1_score(y_test, y_pred, average="weighted"),
    }
 
    for name, value in metrics.items():
        eval_metrics.log_metric(name, value)
 
    passed = metrics["test_accuracy"] >= accuracy_threshold
    eval_metrics.log_metric("quality_gate_passed", int(passed))
    return passed

Compose the Pipeline

@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end training pipeline with quality gates",
)
def training_pipeline(
    raw_data_path: str = "gs://ml-data/customer-churn/raw.csv",
    split_ratio: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85,
    mlflow_tracking_uri: str = "http://mlflow.ml-platform:5000",
):
    # Step 1: Prepare data
    prepare_task = prepare_data(
        raw_data_path=raw_data_path,
        split_ratio=split_ratio,
    )
 
    # Step 2: Train model
    train_task = train_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )
 
    # Step 3: Evaluate
    eval_task = evaluate_model(
        test_dataset=prepare_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold,
    )
 
    # Step 4: Conditional deployment
    with dsl.Condition(eval_task.output == True):  # noqa: E712
        deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="customer-churn",
        )
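
The conditional step above calls a deploy_model component that isn't defined in this post. As one hypothetical sketch, deployment could mean promoting the validated model to the MLflow Model Registry; the component name and its model / model_name parameters come from the pipeline above, while the registry approach and the tracking-URI default are assumptions:

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn==1.4.0", "mlflow==2.12.0"],
)
def deploy_model(
    model: Input[Model],
    model_name: str,
    mlflow_tracking_uri: str = "http://mlflow.ml-platform:5000",
):
    """Hypothetical deployment step: register the validated model in MLflow."""
    import joblib
    import mlflow
    import mlflow.sklearn

    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("kubeflow-training")

    # Load the artifact produced by train_model and register it so a serving
    # layer (e.g. KServe or a batch scorer) can pull the latest version
    sk_model = joblib.load(model.path)
    with mlflow.start_run():
        mlflow.sklearn.log_model(sk_model, "model", registered_model_name=model_name)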

Compile and Submit

from kfp import compiler
from kfp.client import Client
 
# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)
 
# Submit to Kubeflow
client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    pipeline_file="training_pipeline.yaml",
    arguments={
        "raw_data_path": "gs://ml-data/customer-churn/raw.csv",
        "n_estimators": 200,
        "max_depth": 12,
    },
    run_name="churn-model-training-v2",
    experiment_name="churn-model",
)
print(f"Run ID: {run.run_id}")
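
For quick iteration you can also skip the intermediate YAML; the client's create_run_from_pipeline_func compiles and submits in a single call:

# Compile and submit in one step (handy during development)
run = client.create_run_from_pipeline_func(
    training_pipeline,
    arguments={"n_estimators": 200, "max_depth": 12},
    run_name="churn-model-training-dev",
    experiment_name="churn-model",
)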

Part 2: GPU Scheduling and Resources

Kubeflow runs on Kubernetes, giving you full control over resource allocation:

@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
)
def train_deep_model(
    train_dataset: Input[Dataset],
    trained_model: Output[Model],
    epochs: int = 50,
    learning_rate: float = 0.001,
):
    """Train a deep learning model on GPU."""
    import torch
    import torch.nn as nn
    # ... training code using CUDA
 
# In the pipeline, set resource requirements.
# Node selectors and tolerations go through the kfp-kubernetes extension:
#   pip install kfp-kubernetes
from kfp import kubernetes

@dsl.pipeline(name="GPU Training Pipeline")
def gpu_pipeline():
    prepare_task = prepare_data(
        raw_data_path="gs://ml-data/customer-churn/raw.csv",
        split_ratio=0.2,
    )

    train_task = train_deep_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        epochs=100,
    )

    # Request GPU, memory, and CPU resources (KFP v2 task API)
    train_task.set_accelerator_type("nvidia-tesla-t4")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

    # Pin the step to GPU nodes
    kubernetes.add_node_selector(
        train_task,
        label_key="cloud.google.com/gke-accelerator",
        label_value="nvidia-tesla-t4",
    )

    # Tolerate GPU node taints
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Equal",
        value="present",
        effect="NoSchedule",
    )

Part 3: Pipeline Caching

Kubeflow Pipelines caches step outputs keyed on the component definition and its inputs. If neither has changed since a previous run, the step is skipped and the cached output is reused:

# Caching is enabled by default. Disable for non-deterministic steps:
@dsl.pipeline(name="Pipeline with Cache Control")
def cached_pipeline():
    # This step caches because same input → same output
    prepare_task = prepare_data(raw_data_path="gs://data/v2.csv", split_ratio=0.2)
 
    # Disable cache for this step (e.g., it reads from a live database)
    live_data_task = fetch_live_metrics()
    live_data_task.set_caching_options(False)
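
Caching can also be toggled for an entire run at submission time; the run-level flag applies to every task and overrides the compile-time settings. A short sketch, assuming the same client as in Part 1:

# Force every step to re-execute for this one submission
run = client.create_run_from_pipeline_package(
    pipeline_file="training_pipeline.yaml",
    arguments={"raw_data_path": "gs://ml-data/customer-churn/raw.csv"},
    run_name="churn-training-no-cache",
    experiment_name="churn-model",
    enable_caching=False,  # overrides per-task caching settings for this run
)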

Part 4: Recurring Pipelines (Scheduled Retraining)

# Create a recurring run for nightly retraining
client = Client(host="http://localhost:8080")
 
experiment = client.create_experiment(name="churn-model")

recurring_run = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-churn-retraining",
    pipeline_package_path="training_pipeline.yaml",
    cron_expression="0 2 * * *",  # Every day at 2 AM
    max_concurrency=1,
    params={
        "raw_data_path": "gs://ml-data/customer-churn/latest.csv",
        "accuracy_threshold": 0.87,
    },
)
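
The schedule can be paused and resumed from the same client; a brief sketch, assuming the kfp 2.x recurring-run helpers and the object returned above:

# Pause the schedule (e.g. while investigating a data issue), then resume it
client.disable_recurring_run(recurring_run.recurring_run_id)
client.enable_recurring_run(recurring_run.recurring_run_id)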

Part 5: Advanced Patterns

Parallel Training with Different Algorithms

@dsl.pipeline(name="Model Selection Pipeline")
def model_selection_pipeline(data_path: str):
    prepare_task = prepare_data(raw_data_path=data_path, split_ratio=0.2)
 
    # Train multiple models in parallel
    rf_task = train_random_forest(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=200,
    )
    xgb_task = train_xgboost(
        train_dataset=prepare_task.outputs["train_dataset"],
        max_depth=8,
    )
    lgbm_task = train_lightgbm(
        train_dataset=prepare_task.outputs["train_dataset"],
        num_leaves=31,
    )
 
    # Compare and select best
    select_task = select_best_model(
        rf_model=rf_task.outputs["model"],
        xgb_model=xgb_task.outputs["model"],
        lgbm_model=lgbm_task.outputs["model"],
        test_dataset=prepare_task.outputs["test_dataset"],
    )

Pipeline Composition (Reusable Sub-pipelines)

@dsl.pipeline(name="Feature Engineering Sub-pipeline")
def feature_engineering(raw_data: Input[Dataset]) -> Dataset:
    clean_task = clean_data(dataset=raw_data)
    encode_task = encode_categoricals(dataset=clean_task.outputs["cleaned"])
    scale_task = scale_numerics(dataset=encode_task.outputs["encoded"])
    return scale_task.outputs["features"]
 
@dsl.pipeline(name="Full Training Pipeline")
def full_pipeline(data_path: str):
    ingest_task = ingest_data(path=data_path)
    features_task = feature_engineering(raw_data=ingest_task.outputs["data"])
    train_task = train_model(features=features_task.outputs["features"])

Kubeflow vs. Other Orchestrators

| Feature | Kubeflow | Airflow | Prefect | Vertex AI |
|---------|----------|---------|---------|-----------|
| ML-native | Yes | No (general) | No (general) | Yes |
| Container isolation | Per-step | Optional | Optional | Per-step |
| GPU scheduling | Native K8s | Manual | Manual | Managed |
| Artifact tracking | Built-in | External | External | Built-in |
| Caching | Built-in | Limited | Basic | Built-in |
| Self-hosted | Yes | Yes | Yes | No (GCP) |
| Learning curve | Medium | Medium | Low | Low |
| Best for | K8s teams | Data eng | Small teams | GCP users |

For teams already running Kubernetes, Kubeflow Pipelines is the natural choice. For teams that want managed infrastructure, cloud-native options like Vertex AI Pipelines (which uses the same KFP SDK) can reduce operational burden.
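
Because the compiled package is standard KFP YAML, the same file can be submitted to Vertex AI Pipelines. A minimal sketch, assuming the google-cloud-aiplatform SDK; the project ID and pipeline root below are placeholders:

from google.cloud import aiplatform

aiplatform.init(project="my-gcp-project", location="us-central1")

job = aiplatform.PipelineJob(
    display_name="churn-model-training",
    template_path="training_pipeline.yaml",      # same compiled KFP package
    pipeline_root="gs://ml-data/pipeline-root",  # assumed artifact bucket
    parameter_values={"n_estimators": 200, "max_depth": 12},
)
job.submit()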

Production Deployment Checklist

Before running Kubeflow Pipelines in production:

  • [ ] PostgreSQL backend for metadata (not default SQLite)
  • [ ] S3/GCS artifact storage (not local filesystem)
  • [ ] RBAC configured for pipeline access
  • [ ] Resource quotas set per namespace
  • [ ] Pipeline versioning with Git tags
  • [ ] Monitoring via Prometheus + Grafana
  • [ ] Backup strategy for metadata DB
  • [ ] Secret management via Kubernetes Secrets or Vault (see the sketch below)
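
On the last point, the kfp-kubernetes extension can inject Kubernetes Secrets into individual steps so credentials never end up in images or pipeline parameters. A small sketch, assuming a Secret named mlflow-credentials already exists in the pipeline namespace and reusing the components from Part 1:

from kfp import dsl, kubernetes

@dsl.pipeline(name="Pipeline with Secrets")
def secure_pipeline():
    prepare_task = prepare_data(
        raw_data_path="gs://ml-data/customer-churn/raw.csv",
        split_ratio=0.2,
    )
    train_task = train_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=100,
        max_depth=10,
        mlflow_tracking_uri="http://mlflow.ml-platform:5000",
    )

    # Expose a key from the Secret as an environment variable inside the step
    kubernetes.use_secret_as_env(
        train_task,
        secret_name="mlflow-credentials",
        secret_key_to_env={"MLFLOW_TRACKING_TOKEN": "MLFLOW_TRACKING_TOKEN"},
    )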

Next Steps


Need help setting up Kubeflow for your team? DeviDevs designs and operates production ML platforms on Kubernetes. Get a free assessment →
