ML Pipeline Orchestration: Airflow vs Kubeflow vs Prefect for MLOps
Every production ML system needs an orchestrator — something that runs your data ingestion, feature computation, training, evaluation, and deployment steps in the right order, handles failures, and provides visibility. The three dominant options are Apache Airflow, Kubeflow Pipelines, and Prefect.
This guide compares them with real code examples so you can make the right choice for your team.
Quick Comparison
| Feature | Airflow | Kubeflow Pipelines | Prefect |
|---------|---------|-------------------|---------|
| Primary use | Data engineering + ML | ML-native | General workflow |
| Execution | Workers (Celery/K8s) | Kubernetes pods | Hybrid (local/cloud) |
| Container isolation | KubernetesExecutor | Per-step (always) | Optional |
| GPU support | Manual K8s config | Native | Manual |
| ML artifact tracking | External (MLflow) | Built-in | External |
| Caching | Limited | Built-in (input-based) | Task-level |
| UI | Full DAG + logs | Pipeline + artifacts | Dashboard + flows |
| Setup complexity | Medium-High | High (needs K8s) | Low |
| Community | Massive | Large (CNCF) | Growing |
| Best for | Data teams adding ML | K8s-native ML teams | Small-medium teams |
Apache Airflow for ML
Airflow is the industry standard for data pipeline orchestration. While not ML-specific, it's widely used for ML workflows thanks to its maturity and ecosystem.
ML Training DAG
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="Nightly model retraining pipeline",
    schedule="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:
    validate_data = KubernetesPodOperator(
        task_id="validate_data",
        name="data-validation",
        image="registry.company.com/ml/data-validator:latest",
        cmds=["python", "validate.py"],
        arguments=["--source", "s3://data/latest/", "--schema", "configs/schema.yaml"],
        namespace="ml-pipelines",
        get_logs=True,
    )
    compute_features = KubernetesPodOperator(
        task_id="compute_features",
        name="feature-engineering",
        image="registry.company.com/ml/feature-engine:latest",
        cmds=["python", "compute_features.py"],
        arguments=["--input", "s3://data/latest/", "--output", "s3://features/latest/"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "4Gi", "cpu": "2"},
        ),
    )
    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="model-training",
        image="registry.company.com/ml/trainer:latest",
        cmds=["python", "train.py"],
        arguments=["--features", "s3://features/latest/", "--experiment", "nightly"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "8Gi", "cpu": "4"},
            limits={"nvidia.com/gpu": "1"},
        ),
        node_selector={"gpu": "true"},
    )
    evaluate_model = KubernetesPodOperator(
        task_id="evaluate_model",
        name="model-evaluation",
        image="registry.company.com/ml/evaluator:latest",
        cmds=["python", "evaluate.py"],
        namespace="ml-pipelines",
        # evaluate.py must write its result JSON to /airflow/xcom/return.json
        # so that the quality gate below can read it via xcom_pull.
        do_xcom_push=True,
    )
    def check_quality_gate(**context):
        """Route to model registration or failure notification based on evaluation results."""
        ti = context["ti"]
        eval_result = ti.xcom_pull(task_ids="evaluate_model")
        if eval_result and eval_result.get("passed"):
            return "register_model"
        return "notify_failure"

    quality_gate = BranchPythonOperator(
        task_id="quality_gate",
        python_callable=check_quality_gate,
    )
    register_model = KubernetesPodOperator(
        task_id="register_model",
        name="model-registration",
        image="registry.company.com/ml/registry-client:latest",
        cmds=["python", "register.py"],
        namespace="ml-pipelines",
    )

    # Placeholder for the alerting path referenced by the quality gate
    notify_failure = EmptyOperator(task_id="notify_failure")

    # DAG dependencies
    validate_data >> compute_features >> train_model >> evaluate_model >> quality_gate
    quality_gate >> [register_model, notify_failure]

Airflow Strengths for ML
- Mature scheduling and monitoring
- Extensive provider ecosystem (AWS, GCP, Azure, Databricks)
- Battle-tested at scale (1000+ DAGs)
- Strong community and documentation
Airflow Limitations for ML
- No native ML artifact tracking (pair it with an external tracker such as MLflow; see the sketch after this list)
- No built-in caching of pipeline steps
- Limited data passing between tasks (XCom has size limits)
- Not container-isolated by default (need KubernetesExecutor)
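Because Airflow has no artifact store of its own, the common pattern is to log models and metrics to an external tracker inside the task and pass only small references through XCom. A minimal TaskFlow-style sketch, assuming an MLflow tracking server; the URI, experiment name, and task arguments are placeholders, not part of the pipeline above:

```python
import mlflow
from airflow.decorators import task


@task
def evaluate_and_log(model_uri: str, holdout_path: str) -> str:
    """Log evaluation results to MLflow; pass only the small run ID through XCom."""
    mlflow.set_tracking_uri("http://mlflow.company.internal:5000")  # placeholder URI
    mlflow.set_experiment("nightly-training")                       # placeholder experiment
    with mlflow.start_run() as run:
        # ... load the model and holdout data, compute and log metrics ...
        mlflow.log_param("model_uri", model_uri)
        mlflow.log_param("holdout_path", holdout_path)
    return run.info.run_id  # a few bytes, safe to pass via XCom
```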
Kubeflow Pipelines for ML
Kubeflow Pipelines is built specifically for ML on Kubernetes. Every step runs in its own container with its own dependencies.
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "pyarrow", "great-expectations"],
)
def validate_data(data_path: str, validated_data: Output[Dataset], stats: Output[Metrics]):
    import pandas as pd

    df = pd.read_parquet(data_path)
    stats.log_metric("rows", len(df))
    stats.log_metric("columns", len(df.columns))
    # Validation logic...
    df.to_parquet(validated_data.path)
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "pyarrow", "scikit-learn"])
def compute_features(data: Input[Dataset], features: Output[Dataset]):
    import pandas as pd

    df = pd.read_parquet(data.path)
    # Feature engineering...
    df.to_parquet(features.path)
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],
)
def train_model(features: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import joblib
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier

    df = pd.read_parquet(features.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = GradientBoostingClassifier(n_estimators=200, max_depth=8)
    clf.fit(X, y)
    joblib.dump(clf, model.path)
    metrics.log_metric("train_accuracy", clf.score(X, y))
@dsl.pipeline(name="ml-training-pipeline")
def training_pipeline(data_path: str = "gs://ml-data/latest.parquet"):
    validate_task = validate_data(data_path=data_path)
    feature_task = compute_features(data=validate_task.outputs["validated_data"])
    train_task = train_model(features=feature_task.outputs["features"])
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")

Kubeflow Strengths
- ML-native artifact tracking (datasets, models, metrics)
- Built-in caching (skip steps with unchanged inputs; see the run sketch after this list)
- Container isolation per step (no dependency conflicts)
- Native GPU scheduling
- Typed inputs/outputs prevent wiring errors
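The pipeline definition above still has to be compiled and submitted to a cluster before anything runs. A minimal sketch with the KFP v2 SDK, assuming a reachable Kubeflow Pipelines endpoint (the host URL is a placeholder); caching is toggled per run with `enable_caching`:

```python
from kfp import compiler
from kfp.client import Client

# Compile the Python pipeline definition into an IR YAML package.
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)

# Submit a run; steps with unchanged inputs are served from the cache.
client = Client(host="https://kubeflow.company.internal/pipeline")  # placeholder endpoint
client.create_run_from_pipeline_package(
    "training_pipeline.yaml",
    arguments={"data_path": "gs://ml-data/latest.parquet"},
    enable_caching=True,
)
```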
Kubeflow Limitations
- Requires Kubernetes cluster
- Steeper learning curve
- Heavier infrastructure overhead
- Less mature than Airflow for non-ML tasks
For a deeper dive, see our Kubeflow Pipelines tutorial.
Prefect for ML
Prefect is a modern, Python-native workflow orchestrator. It's simpler to set up than Airflow and doesn't require Kubernetes the way Kubeflow does.
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
    retries=2,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=12),
)
def validate_data(data_path: str) -> dict:
    """Validate training data quality."""
    import pandas as pd

    df = pd.read_parquet(data_path)
    stats = {
        "rows": len(df),
        "null_rate": df.isnull().mean().mean(),
        "valid": len(df) > 1000 and df.isnull().mean().mean() < 0.05,
    }
    if not stats["valid"]:
        raise ValueError(f"Data validation failed: {stats}")
    return stats
@task(retries=1, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=6))
def compute_features(data_path: str) -> str:
    """Engineer features and save to storage."""
    import pandas as pd

    df = pd.read_parquet(data_path)
    # Feature computation...
    output_path = "s3://features/latest.parquet"
    df.to_parquet(output_path)  # writing directly to S3 requires s3fs in the environment
    return output_path
@task(retries=1, timeout_seconds=3600)
def train_model(features_path: str, experiment_name: str) -> dict:
    """Train model and log to MLflow."""
    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import f1_score

    df = pd.read_parquet(features_path)
    X, y = df.drop("target", axis=1), df["target"]
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        f1 = f1_score(y, model.predict(X), average="weighted")
        mlflow.log_metric("f1", f1)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn-predictor")
    return {"f1": f1, "run_id": run.info.run_id}
@task
def quality_gate(metrics: dict, threshold: float = 0.85) -> bool:
    """Check if model meets quality bar."""
    passed = metrics["f1"] >= threshold
    if not passed:
        from prefect import get_run_logger

        get_run_logger().warning(f"Model F1 {metrics['f1']:.3f} below threshold {threshold}")
    return passed
@flow(name="ML Training Pipeline", retries=1)
def training_pipeline(
    data_path: str = "s3://data/latest.parquet",
    experiment_name: str = "production-training",
):
    # Steps execute in dependency order
    stats = validate_data(data_path)
    features_path = compute_features(data_path)
    metrics = train_model(features_path, experiment_name)
    passed = quality_gate(metrics)
    if passed:
        from prefect import get_run_logger

        get_run_logger().info(f"Model passed quality gate! F1: {metrics['f1']:.3f}")
    return {"metrics": metrics, "passed": passed}


# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    training_pipeline()

Prefect Strengths
- Simplest setup (pip install, no infrastructure)
- Pure Python — no YAML, no compilation
- Built-in caching with content-based keys
- Flexible deployment (local, Docker, K8s, serverless; see the scheduling sketch after this list)
- Modern UI with real-time flow visualization
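The flow above runs once when invoked; to run it on a schedule, one lightweight option (assuming Prefect 2.10+ or 3.x) is `flow.serve`, which registers a deployment and polls for scheduled runs from the current process. The deployment name and cron expression below are placeholders:

```python
if __name__ == "__main__":
    # Register a deployment for this flow and poll for scheduled runs.
    training_pipeline.serve(
        name="nightly-training",  # placeholder deployment name
        cron="0 2 * * *",         # 2 AM daily, mirroring the Airflow example
        parameters={"experiment_name": "production-training"},
    )
```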
Prefect Limitations
- Smaller ML-specific ecosystem than Kubeflow
- No native artifact tracking (use MLflow)
- Less GPU scheduling support
- Some production features require Prefect Cloud
Decision Framework
Choose Airflow if:
- You have an existing Airflow deployment
- Your ML pipelines share infrastructure with data engineering
- You need scheduling for 100+ DAGs
- Your team is comfortable with Airflow's paradigm
Choose Kubeflow if:
- You're running on Kubernetes
- You need GPU scheduling
- Container isolation per step is important
- ML artifact tracking must be built-in
- Deep Kubeflow tutorial: Kubeflow Pipelines guide
Choose Prefect if:
- You're a small team (< 10 ML engineers)
- You want the fastest path to production
- You prefer pure Python workflows
- You don't need GPU orchestration built-in
Hybrid Approach
Many teams use a combination:
Airflow (Data Engineering)
│
▼ Produces training data on schedule
Kubeflow Pipelines (ML Training)
│
▼ Registers model in MLflow
Prefect (Model Deployment + Monitoring)
│
▼ Deploys to KServe, monitors performance
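In this setup the handoff between systems is usually just an API call. As one sketch of the Airflow-to-Kubeflow step, a final Airflow task can submit the compiled training pipeline through the KFP client; the endpoint and package path below are placeholders:

```python
from airflow.decorators import task


@task
def trigger_kubeflow_training(data_path: str) -> str:
    """Submit the Kubeflow training pipeline once fresh training data lands."""
    from kfp.client import Client

    client = Client(host="https://kubeflow.company.internal/pipeline")  # placeholder endpoint
    run = client.create_run_from_pipeline_package(
        "training_pipeline.yaml",  # compiled KFP package (placeholder path)
        arguments={"data_path": data_path},
    )
    return run.run_id  # hand the run ID to downstream monitoring
```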
Related Resources
- What is MLOps? — Where pipeline orchestration fits in the MLOps lifecycle
- Kubeflow Pipelines deep dive — Complete tutorial
- MLOps best practices — Pipeline design patterns
- ML CI/CD — Automate pipeline execution
Need help choosing and implementing an ML orchestrator? DeviDevs designs production ML pipelines with Kubeflow, Airflow, or Prefect. Get a free assessment →