MLOps

Orchestrating ML pipelines: Airflow vs Kubeflow

Petru Constantin
7 min read
#ML pipeline #Airflow #Kubeflow #Prefect #MLOps #pipeline orchestration

Orchestrating ML Pipelines: Airflow vs Kubeflow vs Prefect for MLOps

Every production ML system needs an orchestrator: something that runs data ingestion, feature computation, training, evaluation, and deployment in the right order, handles failures, and provides visibility. The three dominant options are Apache Airflow, Kubeflow Pipelines, and Prefect.

This guide compares them with real code examples so you can make the right choice for your team.

Quick Comparison

| Feature | Airflow | Kubeflow Pipelines | Prefect |
|---------|---------|--------------------|---------|
| Primary use case | Data engineering + ML | ML-native | General workflows |
| Execution | Workers (Celery/K8s) | Kubernetes pods | Hybrid (local/cloud) |
| Container isolation | KubernetesExecutor | Per-step (always) | Optional |
| GPU support | Manual K8s config | Native | Manual |
| ML artifact tracking | External (MLflow) | Built-in | External |
| Caching | Limited | Built-in (input-based) | Task-level |
| UI | Full DAG + logs | Pipeline + artifacts | Dashboard + flows |
| Setup complexity | Medium-High | High (requires K8s) | Low |
| Community | Massive | Large (CNCF) | Growing |
| Best for | Data teams with ML | K8s-native ML teams | Small-to-medium teams |

Apache Airflow for ML

Airflow is the industry standard for orchestrating data pipelines. Although it is not ML-specific, it is widely used for ML workflows thanks to its maturity and ecosystem.

ML Training DAG

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from kubernetes.client import models as k8s
 
default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
 
with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="Nightly model retraining pipeline",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=days_ago(1),
    catchup=False,
    tags=["ml", "training"],
) as dag:
 
    validate_data = KubernetesPodOperator(
        task_id="validate_data",
        name="data-validation",
        image="registry.company.com/ml/data-validator:latest",
        cmds=["python", "validate.py"],
        arguments=["--source", "s3://data/latest/", "--schema", "configs/schema.yaml"],
        namespace="ml-pipelines",
        get_logs=True,
    )
 
    compute_features = KubernetesPodOperator(
        task_id="compute_features",
        name="feature-engineering",
        image="registry.company.com/ml/feature-engine:latest",
        cmds=["python", "compute_features.py"],
        arguments=["--input", "s3://data/latest/", "--output", "s3://features/latest/"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "4Gi", "cpu": "2"},
        ),
    )
 
    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="model-training",
        image="registry.company.com/ml/trainer:latest",
        cmds=["python", "train.py"],
        arguments=["--features", "s3://features/latest/", "--experiment", "nightly"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "8Gi", "cpu": "4"},
            limits={"nvidia.com/gpu": "1"},
        ),
        node_selector={"gpu": "true"},
    )
 
    evaluate_model = KubernetesPodOperator(
        task_id="evaluate_model",
        name="model-evaluation",
        image="registry.company.com/ml/evaluator:latest",
        cmds=["python", "evaluate.py"],
        namespace="ml-pipelines",
        do_xcom_push=True,  # expose evaluation results to downstream tasks via XCom
    )
 
    def check_quality_gate(**context):
        """Route to registration or failure notification based on evaluation results."""
        ti = context["ti"]
        eval_result = ti.xcom_pull(task_ids="evaluate_model")
        if eval_result and eval_result.get("passed"):
            return "register_model"
        return "notify_failure"

    # Returning a task_id means branching, so this needs BranchPythonOperator
    quality_gate = BranchPythonOperator(
        task_id="quality_gate",
        python_callable=check_quality_gate,
    )
 
    register_model = KubernetesPodOperator(
        task_id="register_model",
        name="model-registration",
        image="registry.company.com/ml/registry-client:latest",
        cmds=["python", "register.py"],
        namespace="ml-pipelines",
    )
 
    # Placeholder for the failure-notification path (e.g. a Slack/email alert task)
    notify_failure = EmptyOperator(task_id="notify_failure")

    # DAG dependencies
    validate_data >> compute_features >> train_model >> evaluate_model >> quality_gate
    quality_gate >> [register_model, notify_failure]

Airflow Strengths for ML

  • Mature scheduling and monitoring
  • Extensive provider ecosystem (AWS, GCP, Azure, Databricks)
  • Battle-tested at large scale (1000+ DAGs)
  • Strong community and extensive documentation

Airflow Limitations for ML

  • No native ML artifact tracking (requires external MLflow)
  • No built-in caching of pipeline steps
  • Limited data passing between tasks: XCom has size limits, so pass references instead (see the sketch after this list)
  • No container isolation by default (requires KubernetesExecutor)
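
Because XCom values live in Airflow's metadata database, the standard workaround is to keep data in object storage and pass only references between tasks. A minimal sketch using the TaskFlow API; the s3://features bucket and the task bodies are illustrative assumptions:

from airflow.decorators import task

@task
def compute_features(run_date: str) -> str:
    """Write features to object storage and return only the path via XCom."""
    output_path = f"s3://features/{run_date}/features.parquet"  # hypothetical bucket
    # ... feature engineering writes its output to output_path ...
    return output_path  # a short string is safe to pass through XCom

@task
def train_model(features_path: str) -> None:
    """Downstream task receives the S3 path, not the data itself."""
    # ... load the parquet file from features_path and train ...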

Kubeflow Pipelines for ML

Kubeflow Pipelines is purpose-built for ML on Kubernetes. Each step runs in its own container with its own dependencies.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
 
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "great-expectations"])
def validate_data(data_path: str, validated_data: Output[Dataset], stats: Output[Metrics]):
    import pandas as pd
    df = pd.read_parquet(data_path)
    stats.log_metric("rows", len(df))
    stats.log_metric("columns", len(df.columns))
    # Validation logic...
    df.to_parquet(validated_data.path)
 
@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def compute_features(data: Input[Dataset], features: Output[Dataset]):
    import pandas as pd
    df = pd.read_parquet(data.path)
    # Feature engineering...
    df.to_parquet(features.path)
 
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
)
def train_model(features: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import pandas as pd
    import joblib
    from sklearn.ensemble import GradientBoostingClassifier
    df = pd.read_parquet(features.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = GradientBoostingClassifier(n_estimators=200, max_depth=8)
    clf.fit(X, y)
    joblib.dump(clf, model.path)
    metrics.log_metric("train_accuracy", clf.score(X, y))
 
@dsl.pipeline(name="ML Training Pipeline")
def training_pipeline(data_path: str = "gs://ml-data/latest.parquet"):
    validate_task = validate_data(data_path=data_path)
    feature_task = compute_features(data=validate_task.outputs["validated_data"])
 
    train_task = train_model(features=feature_task.outputs["features"])
    train_task.set_gpu_limit(1)
    train_task.set_memory_limit("16Gi")
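
To actually run this, the pipeline is compiled to KFP's YAML intermediate representation and submitted to a Kubeflow Pipelines endpoint. A minimal sketch; the in-cluster host URL is an assumption, not part of the original example:

from kfp import compiler
from kfp.client import Client

# Compile the pipeline definition to KFP's intermediate representation
compiler.Compiler().compile(training_pipeline, "training_pipeline.yaml")

# Submit a run against a KFP deployment (host is an assumed in-cluster address)
client = Client(host="http://ml-pipeline.kubeflow:8888")
client.create_run_from_pipeline_package(
    "training_pipeline.yaml",
    arguments={"data_path": "gs://ml-data/latest.parquet"},
)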

Kubeflow Strengths

  • Native ML artifact tracking (datasets, models, metrics)
  • Built-in caching that skips steps whose inputs have not changed (see the sketch after this list)
  • Per-step container isolation (no dependency conflicts)
  • Native GPU scheduling
  • Typed inputs/outputs prevent wiring errors
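
Caching is keyed on each component's inputs, so re-runs skip steps whose inputs have not changed, and it can be toggled per task. A minimal sketch building on the components defined above; the pipeline name is illustrative:

@dsl.pipeline(name="ml-training-pipeline-cache-aware")
def cached_training_pipeline(data_path: str = "gs://ml-data/latest.parquet"):
    validate_task = validate_data(data_path=data_path)
    feature_task = compute_features(data=validate_task.outputs["validated_data"])

    train_task = train_model(features=feature_task.outputs["features"])
    # Force retraining on every run while upstream steps remain cacheable
    train_task.set_caching_options(enable_caching=False)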

Kubeflow Limitations

  • Requires a Kubernetes cluster
  • Steeper learning curve
  • Higher infrastructure overhead
  • Less mature than Airflow for non-ML workloads

For a deeper dive, see our Kubeflow Pipelines tutorial.

Prefect for ML

Prefect is a modern, Python-native workflow orchestrator. It is simpler to set up than Airflow and does not require Kubernetes the way Kubeflow does.

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
 
@task(
    retries=2,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=12),
)
def validate_data(data_path: str) -> dict:
    """Validate training data quality."""
    import pandas as pd
    df = pd.read_parquet(data_path)
    stats = {
        "rows": len(df),
        "null_rate": df.isnull().mean().mean(),
        "valid": len(df) > 1000 and df.isnull().mean().mean() < 0.05,
    }
    if not stats["valid"]:
        raise ValueError(f"Data validation failed: {stats}")
    return stats
 
@task(retries=1, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=6))
def compute_features(data_path: str) -> str:
    """Engineer features and save to storage."""
    import pandas as pd
    df = pd.read_parquet(data_path)
    # Feature computation...
    output_path = "s3://features/latest.parquet"
    df.to_parquet(output_path)
    return output_path
 
@task(retries=1, timeout_seconds=3600)
def train_model(features_path: str, experiment_name: str) -> dict:
    """Train model and log to MLflow."""
    import pandas as pd
    import mlflow
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import f1_score
 
    df = pd.read_parquet(features_path)
    X, y = df.drop("target", axis=1), df["target"]
 
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        f1 = f1_score(y, model.predict(X), average="weighted")
        mlflow.log_metric("f1", f1)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn-predictor")

    # Capture the run id from the run object; active_run() is None outside the context
    return {"f1": f1, "run_id": run.info.run_id}
 
@task
def quality_gate(metrics: dict, threshold: float = 0.85) -> bool:
    """Check if model meets quality bar."""
    passed = metrics["f1"] >= threshold
    if not passed:
        from prefect import get_run_logger
        get_run_logger().warning(f"Model F1 {metrics['f1']:.3f} below threshold {threshold}")
    return passed
 
@flow(name="ML Training Pipeline", retries=1)
def training_pipeline(
    data_path: str = "s3://data/latest.parquet",
    experiment_name: str = "production-training",
):
    # Steps execute in dependency order
    stats = validate_data(data_path)
    features_path = compute_features(data_path)
    metrics = train_model(features_path, experiment_name)
    passed = quality_gate(metrics)
 
    if passed:
        from prefect import get_run_logger
        get_run_logger().info(f"Model passed quality gate! F1: {metrics['f1']:.3f}")
 
    return {"metrics": metrics, "passed": passed}
 
# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    training_pipeline()

Prefect Strengths

  • Extremely simple setup (pip install, no infrastructure)
  • Pure Python, no YAML, no compilation step
  • Built-in caching with content-based keys
  • Flexible deployment: local, Docker, K8s, serverless (see the sketch after this list)
  • Modern UI with real-time flow visualization
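
Deployment can be as light as serving the flow from any machine with a schedule attached. A minimal sketch, assuming Prefect 2.x and the training_pipeline flow defined above; the deployment name and cron expression are illustrative:

if __name__ == "__main__":
    # Registers a deployment and keeps polling for scheduled or triggered runs
    training_pipeline.serve(
        name="nightly-training",
        cron="0 2 * * *",  # 2 AM daily, mirroring the Airflow schedule above
    )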

Prefect Limitations

  • Smaller ML-specific ecosystem than Kubeflow
  • No native artifact tracking (use MLflow)
  • Weaker GPU scheduling support
  • Prefect Cloud required for production-grade features

Decision Framework

Choose Airflow if:

  • You already have an Airflow deployment
  • Your ML pipelines share infrastructure with data engineering
  • You need to schedule 100+ DAGs
  • Your team is comfortable with the Airflow paradigm

Choose Kubeflow if:

  • You run on Kubernetes
  • You need GPU scheduling
  • Per-step container isolation matters
  • ML artifact tracking needs to be built in
  • Detailed Kubeflow tutorial: Kubeflow Pipelines Guide

Choose Prefect if:

  • You are a small team (< 10 ML engineers)
  • You want the fastest path to production
  • You prefer pure-Python workflows
  • You don't need built-in GPU orchestration

Hybrid Approach

Many teams use a combination:

Airflow (Data Engineering)
    │
    ▼  Produces training data on a schedule
Kubeflow Pipelines (ML Training)
    │
    ▼  Registers the model in MLflow
Prefect (Model Deployment + Monitoring)
    │
    ▼  Deploys to KServe, monitors performance
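
The handoff between stages is usually just one orchestrator triggering the next. A minimal sketch of an Airflow task kicking off the Kubeflow training pipeline, reusing the assumed KFP host and compiled package from the earlier sketch:

from airflow.decorators import task

@task
def trigger_kubeflow_training(data_path: str) -> str:
    """Submit the compiled KFP pipeline once upstream data is ready."""
    from kfp.client import Client

    client = Client(host="http://ml-pipeline.kubeflow:8888")  # assumed endpoint
    run = client.create_run_from_pipeline_package(
        "training_pipeline.yaml",
        arguments={"data_path": data_path},
    )
    return run.run_id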

Need help choosing and implementing an ML orchestrator? DeviDevs designs production ML pipelines with Kubeflow, Airflow, or Prefect. Request a free assessment →

