MLOps

ML Pipeline Orchestration: Airflow vs Kubeflow vs Prefect for MLOps

DeviDevs Team
7 min read
#ML pipeline #Airflow #Kubeflow #Prefect #MLOps #pipeline orchestration

Every production ML system needs an orchestrator — something that runs your data ingestion, feature computation, training, evaluation, and deployment steps in the right order, handles failures, and provides visibility. The three dominant options are Apache Airflow, Kubeflow Pipelines, and Prefect.

This guide compares them with real code examples so you can make the right choice for your team.

Quick Comparison

| Feature | Airflow | Kubeflow Pipelines | Prefect |
|---------|---------|--------------------|---------|
| Primary use | Data engineering + ML | ML-native | General workflow |
| Execution | Workers (Celery/K8s) | Kubernetes pods | Hybrid (local/cloud) |
| Container isolation | KubernetesExecutor | Per-step (always) | Optional |
| GPU support | Manual K8s config | Native | Manual |
| ML artifact tracking | External (MLflow) | Built-in | External |
| Caching | Limited | Built-in (input-based) | Task-level |
| UI | Full DAG + logs | Pipeline + artifacts | Dashboard + flows |
| Setup complexity | Medium-High | High (needs K8s) | Low |
| Community | Massive | Large (CNCF) | Growing |
| Best for | Data teams adding ML | K8s-native ML teams | Small-medium teams |

Apache Airflow for ML

Airflow is the industry standard for data pipeline orchestration. While not ML-specific, it's widely used for ML workflows thanks to its maturity and ecosystem.

ML Training DAG

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
 
default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
 
with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="Nightly model retraining pipeline",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:
 
    validate_data = KubernetesPodOperator(
        task_id="validate_data",
        name="data-validation",
        image="registry.company.com/ml/data-validator:latest",
        cmds=["python", "validate.py"],
        arguments=["--source", "s3://data/latest/", "--schema", "configs/schema.yaml"],
        namespace="ml-pipelines",
        get_logs=True,
    )
 
    compute_features = KubernetesPodOperator(
        task_id="compute_features",
        name="feature-engineering",
        image="registry.company.com/ml/feature-engine:latest",
        cmds=["python", "compute_features.py"],
        arguments=["--input", "s3://data/latest/", "--output", "s3://features/latest/"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "4Gi", "cpu": "2"},
        ),
    )
 
    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="model-training",
        image="registry.company.com/ml/trainer:latest",
        cmds=["python", "train.py"],
        arguments=["--features", "s3://features/latest/", "--experiment", "nightly"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "8Gi", "cpu": "4"},
            limits={"nvidia.com/gpu": "1"},
        ),
        node_selector={"gpu": "true"},
    )
 
    evaluate_model = KubernetesPodOperator(
        task_id="evaluate_model",
        name="model-evaluation",
        image="registry.company.com/ml/evaluator:latest",
        cmds=["python", "evaluate.py"],
        namespace="ml-pipelines",
    )
 
    def check_quality_gate(**context):
        """Check if model passed quality gates."""
        ti = context["ti"]
        eval_result = ti.xcom_pull(task_ids="evaluate_model")
        if eval_result and eval_result.get("passed"):
            return "register_model"
        return "notify_failure"
 
    quality_gate = BranchPythonOperator(
        task_id="quality_gate",
        python_callable=check_quality_gate,
    )
 
    register_model = KubernetesPodOperator(
        task_id="register_model",
        name="model-registration",
        image="registry.company.com/ml/registry-client:latest",
        cmds=["python", "register.py"],
        namespace="ml-pipelines",
    )
 
    notify_failure = EmptyOperator(task_id="notify_failure")

    # DAG dependencies: the quality gate branches to registration or failure notification
    validate_data >> compute_features >> train_model >> evaluate_model >> quality_gate
    quality_gate >> [register_model, notify_failure]
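
For the quality gate to see evaluation results, the evaluator container needs to write a small JSON payload to the XCom sidecar path that KubernetesPodOperator reads when do_xcom_push=True. Below is a minimal sketch of what evaluate.py might write; the metric names and report URI are illustrative. Large artifacts should stay in object storage, with only URIs passed through XCom.

# evaluate.py (inside the evaluator image) -- illustrative sketch
import json
import os

# ... load the trained model, score the held-out set, compute f1 ...
f1 = 0.0  # placeholder; replaced by the real evaluation result
metrics = {"f1": f1, "passed": f1 >= 0.85, "report_uri": "s3://reports/eval/latest.json"}

# KubernetesPodOperator with do_xcom_push=True reads this file via the XCom sidecar.
os.makedirs("/airflow/xcom", exist_ok=True)
with open("/airflow/xcom/return.json", "w") as f:
    json.dump(metrics, f)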

Airflow Strengths for ML

  • Mature scheduling and monitoring
  • Extensive provider ecosystem (AWS, GCP, Azure, Databricks)
  • Battle-tested at scale (1000+ DAGs)
  • Strong community and documentation

Airflow Limitations for ML

  • No native ML artifact tracking (need external MLflow)
  • No built-in caching of pipeline steps
  • Limited data passing between tasks (XCom has size limits)
  • Not container-isolated by default (requires the KubernetesExecutor or per-task KubernetesPodOperator, as in the DAG above)

Kubeflow Pipelines for ML

Kubeflow Pipelines is built specifically for ML on Kubernetes. Every step runs in its own container with its own dependencies.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
 
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "great-expectations"])
def validate_data(data_path: str, validated_data: Output[Dataset], stats: Output[Metrics]):
    import pandas as pd
    df = pd.read_parquet(data_path)
    stats.log_metric("rows", len(df))
    stats.log_metric("columns", len(df.columns))
    # Validation logic...
    df.to_parquet(validated_data.path)
 
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def compute_features(data: Input[Dataset], features: Output[Dataset]):
    import pandas as pd
    df = pd.read_parquet(data.path)
    # Feature engineering...
    df.to_parquet(features.path)
 
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
    packages_to_install=["pandas", "scikit-learn"],
)
def train_model(features: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import pandas as pd
    import joblib
    from sklearn.ensemble import GradientBoostingClassifier
    df = pd.read_parquet(features.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = GradientBoostingClassifier(n_estimators=200, max_depth=8)
    clf.fit(X, y)
    joblib.dump(clf, model.path)
    metrics.log_metric("train_accuracy", clf.score(X, y))
 
@dsl.pipeline(name="ML Training Pipeline")
def training_pipeline(data_path: str = "gs://ml-data/latest.parquet"):
    validate_task = validate_data(data_path=data_path)
    feature_task = compute_features(data=validate_task.outputs["validated_data"])
 
    train_task = train_model(features=feature_task.outputs["features"])
    train_task.set_gpu_limit(1)
    train_task.set_memory_limit("16Gi")

Kubeflow Strengths

  • ML-native artifact tracking (datasets, models, metrics)
  • Built-in caching (skip steps with unchanged inputs)
  • Container isolation per step (no dependency conflicts)
  • Native GPU scheduling
  • Typed inputs/outputs prevent wiring errors

Kubeflow Limitations

  • Requires Kubernetes cluster
  • Steeper learning curve
  • Heavier infrastructure overhead
  • Less mature than Airflow for non-ML tasks

For a deeper dive, see our Kubeflow Pipelines tutorial.

Prefect for ML

Prefect is the modern Python-native workflow orchestrator. It's simpler to set up than Airflow and doesn't require Kubernetes like Kubeflow.

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
 
@task(
    retries=2,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=12),
)
def validate_data(data_path: str) -> dict:
    """Validate training data quality."""
    import pandas as pd
    df = pd.read_parquet(data_path)
    stats = {
        "rows": len(df),
        "null_rate": df.isnull().mean().mean(),
        "valid": len(df) > 1000 and df.isnull().mean().mean() < 0.05,
    }
    if not stats["valid"]:
        raise ValueError(f"Data validation failed: {stats}")
    return stats
 
@task(retries=1, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=6))
def compute_features(data_path: str) -> str:
    """Engineer features and save to storage."""
    import pandas as pd
    df = pd.read_parquet(data_path)
    # Feature computation...
    output_path = "s3://features/latest.parquet"
    df.to_parquet(output_path)
    return output_path
 
@task(retries=1, timeout_seconds=3600)
def train_model(features_path: str, experiment_name: str) -> dict:
    """Train model and log to MLflow."""
    import pandas as pd
    import mlflow
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import f1_score
 
    df = pd.read_parquet(features_path)
    X, y = df.drop("target", axis=1), df["target"]
 
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        f1 = f1_score(y, model.predict(X), average="weighted")
        mlflow.log_metric("f1", f1)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn-predictor")
 
    return {"f1": f1, "run_id": mlflow.active_run().info.run_id}
 
@task
def quality_gate(metrics: dict, threshold: float = 0.85) -> bool:
    """Check if model meets quality bar."""
    passed = metrics["f1"] >= threshold
    if not passed:
        from prefect import get_run_logger
        get_run_logger().warning(f"Model F1 {metrics['f1']:.3f} below threshold {threshold}")
    return passed
 
@flow(name="ML Training Pipeline", retries=1)
def training_pipeline(
    data_path: str = "s3://data/latest.parquet",
    experiment_name: str = "production-training",
):
    # Steps execute in dependency order
    stats = validate_data(data_path)
    features_path = compute_features(data_path)
    metrics = train_model(features_path, experiment_name)
    passed = quality_gate(metrics)
 
    if passed:
        from prefect import get_run_logger
        get_run_logger().info(f"Model passed quality gate! F1: {metrics['f1']:.3f}")
 
    return {"metrics": metrics, "passed": passed}
 
# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    training_pipeline()
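
To match the Airflow DAG's nightly 2 AM schedule, the same flow can be served on a cron schedule. A minimal sketch, assuming Prefect 2.10+ where Flow.serve is available; the deployment name is illustrative.

# Replace the direct call above with a long-running scheduled deployment
if __name__ == "__main__":
    training_pipeline.serve(
        name="nightly-training",  # illustrative deployment name
        cron="0 2 * * *",         # 2 AM daily, mirroring the Airflow schedule
    )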

Prefect Strengths

  • Simplest setup (pip install, no infrastructure)
  • Pure Python — no YAML, no compilation
  • Built-in caching with content-based keys
  • Flexible deployment (local, Docker, K8s, serverless)
  • Modern UI with real-time flow visualization

Prefect Limitations

  • Smaller ML-specific ecosystem than Kubeflow
  • No native artifact tracking (use MLflow)
  • No built-in GPU scheduling (GPU placement is left to the execution infrastructure)
  • Some production features (e.g., RBAC and SSO) require Prefect Cloud

Decision Framework

Choose Airflow if:

  • You have an existing Airflow deployment
  • Your ML pipelines share infrastructure with data engineering
  • You need scheduling for 100+ DAGs
  • Your team is comfortable with Airflow's paradigm

Choose Kubeflow if:

  • You're running on Kubernetes
  • You need GPU scheduling
  • Container isolation per step is important
  • ML artifact tracking must be built-in

Choose Prefect if:

  • You're a small team (< 10 ML engineers)
  • You want the fastest path to production
  • You prefer pure Python workflows
  • You don't need GPU orchestration built-in

Hybrid Approach

Many teams use a combination:

Airflow (Data Engineering)
    │
    ▼  Produces training data on schedule
Kubeflow Pipelines (ML Training)
    │
    ▼  Registers model in MLflow
Prefect (Model Deployment + Monitoring)
    │
    ▼  Deploys to KServe, monitors performance
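
The handoff between stages can be as simple as one orchestrator calling the next one's API. Here is a sketch of the first arrow, assuming the Airflow workers can reach the Kubeflow Pipelines endpoint; the host, data path, and compiled package name are illustrative.

from kfp.client import Client

def trigger_kubeflow_training() -> None:
    """Airflow task callable: kick off the compiled Kubeflow training pipeline."""
    client = Client(host="http://kfp.ml.internal:8888")  # placeholder KFP endpoint
    client.create_run_from_pipeline_package(
        "training_pipeline.yaml",  # package compiled earlier with kfp.compiler
        arguments={"data_path": "gs://ml-data/latest.parquet"},
    )

# Inside the data-engineering DAG:
# PythonOperator(task_id="trigger_kubeflow_training", python_callable=trigger_kubeflow_training)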

Need help choosing and implementing an ML orchestrator? DeviDevs designs production ML pipelines with Kubeflow, Airflow, or Prefect. Get a free assessment →
