MLOps

Kubeflow Pipelines: Building Scalable ML Workflows on Kubernetes

Petru Constantin
7 min read
#Kubeflow#ML pipeline#Kubernetes#MLOps#pipeline orchestration#model training

Kubeflow Pipelines is the reference open-source platform for building and deploying ML workflows on Kubernetes. It offers a Python SDK for defining pipelines, a UI for tracking runs, and native integration with Kubernetes scheduling for GPU workloads.

Why Kubeflow for ML pipelines?

While simpler orchestrators such as Airflow work well for data engineering, ML pipelines have unique requirements:

  • GPU scheduling: training steps need GPU nodes, preprocessing does not
  • Artifact tracking: models, datasets, and metrics must be versioned
  • Caching: skip expensive steps when their inputs have not changed
  • Isolation: each step runs in its own container with its own dependencies
  • Reproducibility: pipeline definitions are versioned code

Kubeflow addresses all of these natively on Kubernetes.

Installation

# Install the Kubeflow Pipelines SDK
pip install kfp==2.7.0
 
# For a local development cluster (kind or minikube)
# Deploy Kubeflow Pipelines standalone
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
 
# Port forward the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80

Part 1: Building your first pipeline

Defining pipeline components

Each pipeline step is a component: a containerized function with typed inputs and outputs.

from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
 
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def prepare_data(
    raw_data_path: str,
    split_ratio: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics],
):
    """Load and split data for training."""
    import pandas as pd
    from sklearn.model_selection import train_test_split
 
    df = pd.read_csv(raw_data_path)
 
    # Log data statistics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)
    data_stats.log_metric("null_percentage", df.isnull().mean().mean() * 100)
 
    # Split
    train_df, test_df = train_test_split(df, test_size=split_ratio, random_state=42)
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
 
    # Save to output artifacts
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "mlflow==2.12.0"],
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    mlflow_tracking_uri: str,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    """Train a Random Forest classifier and log to MLflow."""
    import pandas as pd
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score
    import joblib
 
    # Load data
    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]
 
    # Train
    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)
 
    # Evaluate on training data
    y_pred = model.predict(X_train)
    train_acc = accuracy_score(y_train, y_pred)
    train_f1 = f1_score(y_train, y_pred, average="weighted")
 
    # Log metrics to Kubeflow
    training_metrics.log_metric("train_accuracy", train_acc)
    training_metrics.log_metric("train_f1", train_f1)
 
    # Log to MLflow
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("kubeflow-training")
    with mlflow.start_run():
        mlflow.log_params({"n_estimators": n_estimators, "max_depth": max_depth})
        mlflow.log_metrics({"train_accuracy": train_acc, "train_f1": train_f1})
        mlflow.sklearn.log_model(model, "model")
 
    # Save model artifact
    joblib.dump(model, trained_model.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85,
) -> bool:
    """Evaluate model against test data and quality gate."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
 
    # Load
    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)
 
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "test_accuracy": accuracy_score(y_test, y_pred),
        "test_precision": precision_score(y_test, y_pred, average="weighted"),
        "test_recall": recall_score(y_test, y_pred, average="weighted"),
        "test_f1": f1_score(y_test, y_pred, average="weighted"),
    }
 
    for name, value in metrics.items():
        eval_metrics.log_metric(name, value)
 
    passed = metrics["test_accuracy"] >= accuracy_threshold
    eval_metrics.log_metric("quality_gate_passed", int(passed))
    return passed

Composing the pipeline

@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end training pipeline with quality gates",
)
def training_pipeline(
    raw_data_path: str = "gs://ml-data/customer-churn/raw.csv",
    split_ratio: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85,
    mlflow_tracking_uri: str = "http://mlflow.ml-platform:5000",
):
    # Step 1: Prepare data
    prepare_task = prepare_data(
        raw_data_path=raw_data_path,
        split_ratio=split_ratio,
    )
 
    # Step 2: Train model
    train_task = train_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )
 
    # Step 3: Evaluate
    eval_task = evaluate_model(
        test_dataset=prepare_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold,
    )
 
    # Step 4: Conditional deployment (deploy_model is a component defined elsewhere)
    with dsl.Condition(eval_task.output == True):  # noqa: E712
        deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="customer-churn",
        )

Compiling and submitting

from kfp import compiler
from kfp.client import Client
 
# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)
 
# Submit to Kubeflow
client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    pipeline_file="training_pipeline.yaml",
    arguments={
        "raw_data_path": "gs://ml-data/customer-churn/raw.csv",
        "n_estimators": 200,
        "max_depth": 12,
    },
    run_name="churn-model-training-v2",
    experiment_name="churn-model",
)
print(f"Run ID: {run.run_id}")

Part 2: GPU scheduling and resources

Kubeflow runs on Kubernetes, giving you full control over resource allocation:

@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
)
def train_deep_model(
    train_dataset: Input[Dataset],
    trained_model: Output[Model],
    epochs: int = 50,
    learning_rate: float = 0.001,
):
    """Train a deep learning model on GPU."""
    import torch
    import torch.nn as nn
    # ... training code using CUDA
 
# Tolerations and other pod-level settings come from the kfp-kubernetes
# extension (pip install kfp-kubernetes)
from kfp import kubernetes

# In the pipeline, set resource requirements (KFP v2 task-level API)
@dsl.pipeline(name="GPU Training Pipeline")
def gpu_pipeline(data_path: str = "gs://ml-data/raw.csv"):
    prepare_task = prepare_data(raw_data_path=data_path, split_ratio=0.2)

    train_task = train_deep_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        epochs=100,
    )

    # Request GPU resources; set_accelerator_type also drives node selection
    # (e.g. the GKE accelerator node selector)
    train_task.set_accelerator_type("nvidia-tesla-t4")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")

    # Tolerate GPU node taints
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Equal",
        value="present",
        effect="NoSchedule",
    )

Part 3: Pipeline caching

Kubeflow caches step outputs based on their inputs. If a step's inputs have not changed, it reuses the cached output:

# Caching is enabled by default. Disable for non-deterministic steps:
@dsl.pipeline(name="Pipeline with Cache Control")
def cached_pipeline():
    # This step caches because same input → same output
    prepare_task = prepare_data(raw_data_path="gs://data/v2.csv", split_ratio=0.2)
 
    # Disable cache for this step (e.g., it reads from a live database)
    live_data_task = fetch_live_metrics()
    live_data_task.set_caching_options(False)
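
Conceptually, the cache key is a fingerprint of a step's definition plus its resolved inputs: if the fingerprint matches a previous execution, the stored outputs are reused. A minimal sketch of that idea (illustrative only, not KFP's actual hashing scheme):

```python
import hashlib
import json

def cache_key(image: str, command: list[str], inputs: dict) -> str:
    """Fingerprint a step: same image + command + inputs -> same key."""
    payload = json.dumps(
        {"image": image, "command": command, "inputs": inputs},
        sort_keys=True,  # deterministic serialization, so equal inputs hash equally
    )
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = cache_key("python:3.11-slim", ["prepare.py"], {"path": "gs://data/v2.csv", "split": 0.2})
k2 = cache_key("python:3.11-slim", ["prepare.py"], {"path": "gs://data/v2.csv", "split": 0.2})
k3 = cache_key("python:3.11-slim", ["prepare.py"], {"path": "gs://data/v3.csv", "split": 0.2})
assert k1 == k2  # identical inputs -> cache hit
assert k1 != k3  # new data version -> re-execute
```

This is why non-deterministic steps (live database reads, API calls) must opt out: their inputs look unchanged even though their outputs are not.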

Part 4: Recurring pipelines (scheduled retraining)

# Create a recurring run for nightly retraining
client = Client(host="http://localhost:8080")
 
experiment = client.create_experiment("churn-model")  # get or create the experiment

recurring_run = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-churn-retraining",
    pipeline_package_path="training_pipeline.yaml",
    cron_expression="0 2 * * *",  # Every day at 2 AM
    max_concurrency=1,
    params={
        "raw_data_path": "gs://ml-data/customer-churn/latest.csv",
        "accuracy_threshold": 0.87,
    },
)
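
The backend's scheduler interprets the five cron fields (minute, hour, day-of-month, month, day-of-week). As a rough illustration of how `0 2 * * *` selects 02:00 every day, here is a simplified matcher handling only numbers and `*` (real cron also supports ranges, lists, and steps):

```python
from datetime import datetime

def cron_matches(expr: str, dt: datetime) -> bool:
    """Check a datetime against a 5-field cron expression (numbers and '*' only)."""
    fields = expr.split()  # minute, hour, day-of-month, month, day-of-week
    values = [dt.minute, dt.hour, dt.day, dt.month, dt.isoweekday() % 7]
    return all(f == "*" or int(f) == v for f, v in zip(fields, values))

assert cron_matches("0 2 * * *", datetime(2024, 6, 1, 2, 0))       # 02:00 -> fires
assert not cron_matches("0 2 * * *", datetime(2024, 6, 1, 14, 0))  # 14:00 -> skipped
```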

Part 5: Advanced patterns

Parallel training with different algorithms

@dsl.pipeline(name="Model Selection Pipeline")
def model_selection_pipeline(data_path: str):
    prepare_task = prepare_data(raw_data_path=data_path, split_ratio=0.2)
 
    # Train multiple models in parallel
    rf_task = train_random_forest(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=200,
    )
    xgb_task = train_xgboost(
        train_dataset=prepare_task.outputs["train_dataset"],
        max_depth=8,
    )
    lgbm_task = train_lightgbm(
        train_dataset=prepare_task.outputs["train_dataset"],
        num_leaves=31,
    )
 
    # Compare and select best
    select_task = select_best_model(
        rf_model=rf_task.outputs["model"],
        xgb_model=xgb_task.outputs["model"],
        lgbm_model=lgbm_task.outputs["model"],
        test_dataset=prepare_task.outputs["test_dataset"],
    )
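
The `select_best_model` component referenced above is not shown; at its core, its decision reduces to picking the candidate with the best held-out score, along these lines (hypothetical scores for illustration):

```python
def pick_best(test_scores: dict[str, float]) -> str:
    """Return the name of the model with the highest test accuracy."""
    return max(test_scores, key=test_scores.get)

# Hypothetical accuracies produced by evaluating each model on the test split
scores = {"random_forest": 0.89, "xgboost": 0.91, "lightgbm": 0.90}
print(pick_best(scores))  # prints: xgboost
```

In the real component, the winning model artifact would then be copied to the `Output[Model]` slot so downstream deployment steps see a single model regardless of which algorithm won.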

Pipeline composition (reusable sub-pipelines)

@dsl.pipeline(name="Feature Engineering Sub-pipeline")
def feature_engineering(raw_data: Input[Dataset]) -> Dataset:
    clean_task = clean_data(dataset=raw_data)
    encode_task = encode_categoricals(dataset=clean_task.outputs["cleaned"])
    scale_task = scale_numerics(dataset=encode_task.outputs["encoded"])
    return scale_task.outputs["features"]
 
@dsl.pipeline(name="Full Training Pipeline")
def full_pipeline(data_path: str):
    ingest_task = ingest_data(path=data_path)
    features_task = feature_engineering(raw_data=ingest_task.outputs["data"])
    train_task = train_model(features=features_task.outputs["features"])

Kubeflow vs. other orchestrators

| Feature | Kubeflow | Airflow | Prefect | Vertex AI |
|---------|----------|---------|---------|-----------|
| ML-native | Yes | No (general-purpose) | No (general-purpose) | Yes |
| Container isolation | Per-step | Optional | Optional | Per-step |
| GPU scheduling | Native K8s | Manual | Manual | Managed |
| Artifact tracking | Built-in | External | External | Built-in |
| Caching | Built-in | Limited | Basic | Built-in |
| Self-hosted | Yes | Yes | Yes | No (GCP) |
| Learning curve | Medium | Medium | Low | Low |
| Best for | K8s teams | Data engineering | Small teams | GCP users |

For teams already running Kubernetes, Kubeflow Pipelines is the natural choice. For teams that want managed infrastructure, cloud-native options such as Vertex AI Pipelines (which uses the same KFP SDK) can reduce the operational burden.

Production deployment checklist

Before running Kubeflow Pipelines in production:

  • [ ] External PostgreSQL/MySQL backend for metadata (not the bundled in-cluster database)
  • [ ] Artifact storage on S3/GCS (not the in-cluster default)
  • [ ] RBAC configured for pipeline access
  • [ ] Resource quotas set per namespace
  • [ ] Pipeline versioning via Git tags
  • [ ] Monitoring with Prometheus + Grafana
  • [ ] Backup strategy for the metadata database
  • [ ] Secret management via Kubernetes Secrets or Vault

Next steps

  • MLflow integration: use MLflow for experiment tracking inside your Kubeflow pipelines
  • Feature stores: feed consistent features into Kubeflow training steps
  • MLOps best practices: apply production patterns to your Kubeflow workflows
  • AI security: secure pipeline components and model artifacts

Need help setting up Kubeflow for your team? DeviDevs designs and operates production ML platforms on Kubernetes. Get a free assessment →
