Orchestrating ML Pipelines: Airflow vs Kubeflow vs Prefect for MLOps
Every production ML system needs an orchestrator - something to run the data ingestion, feature computation, training, evaluation, and deployment steps in the right order, handle failures, and provide visibility. The three dominant options are Apache Airflow, Kubeflow Pipelines, and Prefect.
This guide compares them with real code examples so you can make the right choice for your team.
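Before comparing tools, it helps to see the core job every orchestrator shares: running steps in dependency order. A minimal sketch using Python's standard-library `graphlib` (the step names are illustrative, not from any of the three tools):

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (illustrative pipeline).
pipeline = {
    "ingest": set(),
    "features": {"ingest"},
    "train": {"features"},
    "evaluate": {"train"},
    "deploy": {"evaluate"},
}

# static_order() yields steps so that dependencies always come first.
order = list(TopologicalSorter(pipeline).static_order())
print(order)
```

Real orchestrators add retries, scheduling, logging, and distributed execution on top of exactly this ordering guarantee.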
Quick Comparison

| Feature | Airflow | Kubeflow Pipelines | Prefect |
|-----------------|---------|-------------------|---------|
| Primary use | Data engineering + ML | ML-native | General workflows |
| Execution | Workers (Celery/K8s) | Kubernetes pods | Hybrid (local/cloud) |
| Container isolation | KubernetesExecutor | Per-step (always) | Optional |
| GPU support | Manual K8s config | Native | Manual |
| ML artifact tracking | External (MLflow) | Built-in | External |
| Caching | Limited | Built-in (input-based) | Task-level |
| UI | Full DAG + logs | Pipeline + artifacts | Dashboard + flows |
| Setup complexity | Medium-High | High (requires K8s) | Low |
| Community | Massive | Large (CNCF) | Growing |
| Best for | Data teams doing ML | K8s-native ML teams | Small-to-mid teams |
Apache Airflow for ML
Airflow is the industry standard for orchestrating data pipelines. While not ML-specific, it is widely used for ML workflows thanks to its maturity and ecosystem.
ML Training DAG
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-alerts@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    description="Nightly model retraining pipeline",
    schedule_interval="0 2 * * *",  # 2 AM daily
    start_date=datetime(2024, 1, 1),  # fixed date; days_ago() is deprecated
    catchup=False,
    tags=["ml", "training"],
) as dag:
    validate_data = KubernetesPodOperator(
        task_id="validate_data",
        name="data-validation",
        image="registry.company.com/ml/data-validator:latest",
        cmds=["python", "validate.py"],
        arguments=["--source", "s3://data/latest/", "--schema", "configs/schema.yaml"],
        namespace="ml-pipelines",
        get_logs=True,
    )

    compute_features = KubernetesPodOperator(
        task_id="compute_features",
        name="feature-engineering",
        image="registry.company.com/ml/feature-engine:latest",
        cmds=["python", "compute_features.py"],
        arguments=["--input", "s3://data/latest/", "--output", "s3://features/latest/"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "4Gi", "cpu": "2"},
        ),
    )

    train_model = KubernetesPodOperator(
        task_id="train_model",
        name="model-training",
        image="registry.company.com/ml/trainer:latest",
        cmds=["python", "train.py"],
        arguments=["--features", "s3://features/latest/", "--experiment", "nightly"],
        namespace="ml-pipelines",
        container_resources=k8s.V1ResourceRequirements(
            requests={"memory": "8Gi", "cpu": "4"},
            limits={"nvidia.com/gpu": "1"},
        ),
        node_selector={"gpu": "true"},
    )

    evaluate_model = KubernetesPodOperator(
        task_id="evaluate_model",
        name="model-evaluation",
        image="registry.company.com/ml/evaluator:latest",
        cmds=["python", "evaluate.py"],
        namespace="ml-pipelines",
        # The container must write its result to /airflow/xcom/return.json
        # for the quality gate below to read it via XCom.
        do_xcom_push=True,
    )

    def check_quality_gate(**context):
        """Route to registration or failure notification based on eval results."""
        ti = context["ti"]
        eval_result = ti.xcom_pull(task_ids="evaluate_model")
        if eval_result and eval_result.get("passed"):
            return "register_model"
        return "notify_failure"

    # BranchPythonOperator (not PythonOperator) is required for the returned
    # task_id to actually control which downstream task runs.
    quality_gate = BranchPythonOperator(
        task_id="quality_gate",
        python_callable=check_quality_gate,
    )

    register_model = KubernetesPodOperator(
        task_id="register_model",
        name="model-registration",
        image="registry.company.com/ml/registry-client:latest",
        cmds=["python", "register.py"],
        namespace="ml-pipelines",
    )

    # Placeholder for the failure branch; replace with real alerting.
    notify_failure = EmptyOperator(task_id="notify_failure")

    # DAG dependencies
    validate_data >> compute_features >> train_model >> evaluate_model >> quality_gate
    quality_gate >> [register_model, notify_failure]
```

Airflow Strengths for ML
- Mature scheduling and monitoring
- Extensive provider ecosystem (AWS, GCP, Azure, Databricks)
- Battle-tested at large scale (1000+ DAGs)
- Strong community and extensive documentation
Airflow Limitations for ML
- No native ML artifact tracking (requires external MLflow)
- No built-in caching of pipeline steps
- Limited data transfer between tasks (XCom has size limits)
- No container isolation by default (requires KubernetesExecutor)
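The standard workaround for the XCom size limit is to pass references rather than payloads: tasks exchange small URIs while the heavy data lives in object storage. A minimal sketch, with an in-memory dict standing in for S3 and a hypothetical path:

```python
# Bulky data lives in external storage (a dict stands in for S3 here);
# only lightweight references cross task boundaries, XCom-style.
object_store: dict[str, bytes] = {}

def compute_features_task() -> str:
    features = b"..." * 1_000_000          # large payload (simulated)
    uri = "s3://features/run-001.parquet"  # hypothetical path
    object_store[uri] = features           # "upload" to storage
    return uri                             # only the URI is passed on

def train_task(features_uri: str) -> int:
    data = object_store[features_uri]      # "download" by reference
    return len(data)

uri = compute_features_task()
size = train_task(uri)
```

The same pattern underlies the S3 paths in the DAG above: each `KubernetesPodOperator` reads and writes storage locations, not in-process objects.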
Kubeflow Pipelines for ML
Kubeflow Pipelines is purpose-built for ML on Kubernetes. Each step runs in its own container with its own dependencies.
```python
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "pyarrow", "great-expectations"],
)
def validate_data(data_path: str, validated_data: Output[Dataset], stats: Output[Metrics]):
    import pandas as pd

    df = pd.read_parquet(data_path)
    stats.log_metric("rows", len(df))
    stats.log_metric("columns", len(df.columns))
    # Validation logic...
    df.to_parquet(validated_data.path)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas", "pyarrow", "scikit-learn"],
)
def compute_features(data: Input[Dataset], features: Output[Dataset]):
    import pandas as pd

    df = pd.read_parquet(data.path)
    # Feature engineering...
    df.to_parquet(features.path)


@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
    packages_to_install=["scikit-learn", "joblib"],  # not guaranteed in the base image
)
def train_model(features: Input[Dataset], model: Output[Model], metrics: Output[Metrics]):
    import joblib
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier

    df = pd.read_parquet(features.path)
    X, y = df.drop("target", axis=1), df["target"]
    clf = GradientBoostingClassifier(n_estimators=200, max_depth=8)
    clf.fit(X, y)
    joblib.dump(clf, model.path)
    metrics.log_metric("train_accuracy", clf.score(X, y))


@dsl.pipeline(name="ml-training-pipeline")
def training_pipeline(data_path: str = "gs://ml-data/latest.parquet"):
    validate_task = validate_data(data_path=data_path)
    feature_task = compute_features(data=validate_task.outputs["validated_data"])
    train_task = train_model(features=feature_task.outputs["features"])
    # KFP v2 replaced set_gpu_limit() with accelerator settings
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
```

Kubeflow Strengths
- Native ML artifact tracking (datasets, models, metrics)
- Built-in caching (skips steps whose inputs are unchanged)
- Per-step container isolation (no dependency conflicts)
- Native GPU scheduling
- Typed inputs/outputs prevent wiring errors
Kubeflow Limitations
- Requires a Kubernetes cluster
- Steeper learning curve
- Higher infrastructure overhead
- Less mature than Airflow for non-ML workloads
For a deep dive, see our Kubeflow Pipelines tutorial.
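The idea behind Kubeflow's input-based caching can be shown with a simplified sketch (this models the concept, not KFP internals): a step is skipped whenever the hash of its inputs matches a previous run.

```python
import hashlib
import json

cache: dict[str, object] = {}
executions = []  # records actual runs, to show that cache hits skip work

def cached_step(name, fn, **inputs):
    # Cache key = step name + content hash of its inputs
    # (a simplified model of input-based caching).
    digest = hashlib.sha256(
        json.dumps(inputs, sort_keys=True).encode()
    ).hexdigest()
    key = f"{name}:{digest}"
    if key not in cache:
        executions.append(name)
        cache[key] = fn(**inputs)
    return cache[key]

def featurize(rows):
    return [r * 2 for r in rows]

out1 = cached_step("featurize", featurize, rows=[1, 2, 3])
out2 = cached_step("featurize", featurize, rows=[1, 2, 3])  # cache hit, skipped
out3 = cached_step("featurize", featurize, rows=[4, 5])     # new inputs, runs
```

In a nightly retraining pipeline this is what makes unchanged upstream steps (e.g. validation on the same data) effectively free.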
Prefect for ML
Prefect is the modern, Python-native workflow orchestrator. It is simpler to set up than Airflow and does not require Kubernetes like Kubeflow.
```python
from datetime import timedelta

from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash


@task(
    retries=2,
    retry_delay_seconds=60,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=12),
)
def validate_data(data_path: str) -> dict:
    """Validate training data quality."""
    import pandas as pd

    df = pd.read_parquet(data_path)
    stats = {
        "rows": len(df),
        "null_rate": df.isnull().mean().mean(),
        "valid": len(df) > 1000 and df.isnull().mean().mean() < 0.05,
    }
    if not stats["valid"]:
        raise ValueError(f"Data validation failed: {stats}")
    return stats


@task(retries=1, cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=6))
def compute_features(data_path: str) -> str:
    """Engineer features and save to storage."""
    import pandas as pd

    df = pd.read_parquet(data_path)
    # Feature computation...
    output_path = "s3://features/latest.parquet"
    df.to_parquet(output_path)
    return output_path


@task(retries=1, timeout_seconds=3600)
def train_model(features_path: str, experiment_name: str) -> dict:
    """Train model and log to MLflow."""
    import mlflow
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.metrics import f1_score

    df = pd.read_parquet(features_path)
    X, y = df.drop("target", axis=1), df["target"]
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run() as run:
        model = GradientBoostingClassifier(n_estimators=200)
        model.fit(X, y)
        f1 = f1_score(y, model.predict(X), average="weighted")
        mlflow.log_metric("f1", f1)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn-predictor")
        # Capture the run_id inside the context, while the run is still active
        return {"f1": f1, "run_id": run.info.run_id}


@task
def quality_gate(metrics: dict, threshold: float = 0.85) -> bool:
    """Check if model meets the quality bar."""
    passed = metrics["f1"] >= threshold
    if not passed:
        get_run_logger().warning(
            f"Model F1 {metrics['f1']:.3f} below threshold {threshold}"
        )
    return passed


@flow(name="ML Training Pipeline", retries=1)
def training_pipeline(
    data_path: str = "s3://data/latest.parquet",
    experiment_name: str = "production-training",
):
    # Steps execute in dependency order
    stats = validate_data(data_path)
    features_path = compute_features(data_path)
    metrics = train_model(features_path, experiment_name)
    passed = quality_gate(metrics)
    if passed:
        get_run_logger().info(f"Model passed quality gate! F1: {metrics['f1']:.3f}")
    return {"metrics": metrics, "passed": passed}


# Run locally or deploy to Prefect Cloud
if __name__ == "__main__":
    training_pipeline()
```

Prefect Strengths
- Extremely simple setup (pip install, no infrastructure)
- Pure Python, no YAML, no compilation step
- Built-in caching with content-based keys
- Flexible deployment (local, Docker, K8s, serverless)
- Modern UI with real-time flow visualization
Prefect Limitations
- Smaller ML-specific ecosystem than Kubeflow
- No native artifact tracking (use MLflow)
- Weaker GPU scheduling support
- Prefect Cloud required for production features
Decision Framework
Choose Airflow if:
- You already have an Airflow deployment
- Your ML pipelines share infrastructure with data engineering
- You need scheduling for 100+ DAGs
- Your team is comfortable with the Airflow paradigm
Choose Kubeflow if:
- You run on Kubernetes
- You need GPU scheduling
- Per-step container isolation matters
- ML artifact tracking must be built in
- Detailed Kubeflow tutorial: Kubeflow Pipelines Guide
Choose Prefect if:
- You are a small team (< 10 ML engineers)
- You want the fastest path to production
- You prefer pure-Python workflows
- You don't need built-in GPU orchestration
Hybrid Approach
Many teams use a combination:
Airflow (Data Engineering)
│
▼ Produces training data on a schedule
Kubeflow Pipelines (ML Training)
│
▼ Registers the model in MLflow
Prefect (Model Deployment + Monitoring)
│
▼ Deploys to KServe, monitors performance
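What holds a hybrid setup like this together is a small, explicit handoff contract between stages: each stage writes a manifest the next one reads. A minimal sketch, where the manifest fields and model URI are illustrative:

```python
import json
import tempfile
from pathlib import Path

def write_manifest(path: Path, model_uri: str, f1: float) -> None:
    """Training stage (e.g. Kubeflow) records what it produced."""
    path.write_text(json.dumps({"model_uri": model_uri, "f1": f1}))

def read_manifest(path: Path) -> dict:
    """Deployment stage (e.g. Prefect) picks up the handoff."""
    return json.loads(path.read_text())

manifest_path = Path(tempfile.gettempdir()) / "handoff.json"
write_manifest(manifest_path, "models:/churn-predictor/7", 0.91)  # illustrative URI
handoff = read_manifest(manifest_path)
```

In practice the manifest usually lives in object storage or a model registry such as MLflow, but the principle is the same: each tool only needs to agree on the contract, not on each other's internals.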
Related Resources
- What is MLOps?: Where pipeline orchestration fits in the MLOps lifecycle
- Kubeflow Pipelines Deep Dive: Full tutorial
- MLOps Best Practices: Pipeline design patterns
- ML CI/CD: Automate pipeline execution
Need help choosing and implementing an ML orchestrator? DeviDevs designs production ML pipelines with Kubeflow, Airflow, or Prefect. Request a free assessment →
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →