Kubeflow Pipelines: Building Scalable ML Workflows on Kubernetes
Kubeflow Pipelines is the go-to open-source platform for building and deploying ML workflows on Kubernetes. It offers a Python SDK for defining pipelines, a UI for tracking runs, and native integration with Kubernetes scheduling for GPU workloads.
Why Kubeflow for ML pipelines?
While simpler orchestrators such as Airflow work well for data engineering, ML pipelines have unique requirements:
- GPU scheduling: training steps need GPU nodes, preprocessing does not
- Artifact tracking: models, datasets, and metrics must be versioned
- Caching: skip expensive steps when their inputs have not changed
- Isolation: each step runs in its own container with its own dependencies
- Reproducibility: pipeline definitions are version-controlled code
Kubeflow addresses all of these natively on Kubernetes.
Installation
# Install the Kubeflow Pipelines SDK
pip install kfp==2.7.0
# For a local development cluster (kind or minikube)
# Deploy Kubeflow Pipelines standalone
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=2.2.0"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic-pns?ref=2.2.0"
# Port forward the UI
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80
Part 1: Building your first pipeline
Defining pipeline components
Each pipeline step is a component: a containerized function with typed inputs and outputs.
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics
@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def prepare_data(
    raw_data_path: str,
    split_ratio: float,
    train_dataset: Output[Dataset],
    test_dataset: Output[Dataset],
    data_stats: Output[Metrics],
):
    """Load and split data for training."""
    import pandas as pd
    from sklearn.model_selection import train_test_split

    df = pd.read_csv(raw_data_path)
    # Log data statistics
    data_stats.log_metric("total_rows", len(df))
    data_stats.log_metric("feature_count", len(df.columns) - 1)
    data_stats.log_metric("null_percentage", df.isnull().mean().mean() * 100)
    # Split
    train_df, test_df = train_test_split(df, test_size=split_ratio, random_state=42)
    data_stats.log_metric("train_rows", len(train_df))
    data_stats.log_metric("test_rows", len(test_df))
    # Save to output artifacts
    train_df.to_csv(train_dataset.path, index=False)
    test_df.to_csv(test_dataset.path, index=False)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0", "mlflow==2.12.0"],
)
def train_model(
    train_dataset: Input[Dataset],
    n_estimators: int,
    max_depth: int,
    mlflow_tracking_uri: str,
    trained_model: Output[Model],
    training_metrics: Output[Metrics],
):
    """Train a Random Forest classifier and log to MLflow."""
    import pandas as pd
    import mlflow
    import mlflow.sklearn
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score, f1_score
    import joblib

    # Load data
    train_df = pd.read_csv(train_dataset.path)
    X_train = train_df.drop("target", axis=1)
    y_train = train_df["target"]
    # Train
    model = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth, random_state=42)
    model.fit(X_train, y_train)
    # Evaluate on training data
    y_pred = model.predict(X_train)
    train_acc = accuracy_score(y_train, y_pred)
    train_f1 = f1_score(y_train, y_pred, average="weighted")
    # Log metrics to Kubeflow
    training_metrics.log_metric("train_accuracy", train_acc)
    training_metrics.log_metric("train_f1", train_f1)
    # Log to MLflow
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment("kubeflow-training")
    with mlflow.start_run():
        mlflow.log_params({"n_estimators": n_estimators, "max_depth": max_depth})
        mlflow.log_metrics({"train_accuracy": train_acc, "train_f1": train_f1})
        mlflow.sklearn.log_model(model, "model")
    # Save model artifact
    joblib.dump(model, trained_model.path)

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def evaluate_model(
    test_dataset: Input[Dataset],
    trained_model: Input[Model],
    eval_metrics: Output[Metrics],
    accuracy_threshold: float = 0.85,
) -> bool:
    """Evaluate model against test data and quality gate."""
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    # Load
    test_df = pd.read_csv(test_dataset.path)
    X_test = test_df.drop("target", axis=1)
    y_test = test_df["target"]
    model = joblib.load(trained_model.path)
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "test_accuracy": accuracy_score(y_test, y_pred),
        "test_precision": precision_score(y_test, y_pred, average="weighted"),
        "test_recall": recall_score(y_test, y_pred, average="weighted"),
        "test_f1": f1_score(y_test, y_pred, average="weighted"),
    }
    for name, value in metrics.items():
        eval_metrics.log_metric(name, value)
    passed = metrics["test_accuracy"] >= accuracy_threshold
    eval_metrics.log_metric("quality_gate_passed", int(passed))
    return passed
Composing the pipeline
@dsl.pipeline(
    name="ML Training Pipeline",
    description="End-to-end training pipeline with quality gates",
)
def training_pipeline(
    raw_data_path: str = "gs://ml-data/customer-churn/raw.csv",
    split_ratio: float = 0.2,
    n_estimators: int = 100,
    max_depth: int = 10,
    accuracy_threshold: float = 0.85,
    mlflow_tracking_uri: str = "http://mlflow.ml-platform:5000",
):
    # Step 1: Prepare data
    prepare_task = prepare_data(
        raw_data_path=raw_data_path,
        split_ratio=split_ratio,
    )
    # Step 2: Train model
    train_task = train_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=n_estimators,
        max_depth=max_depth,
        mlflow_tracking_uri=mlflow_tracking_uri,
    )
    # Step 3: Evaluate
    eval_task = evaluate_model(
        test_dataset=prepare_task.outputs["test_dataset"],
        trained_model=train_task.outputs["trained_model"],
        accuracy_threshold=accuracy_threshold,
    )
    # Step 4: Conditional deployment (deploy_model is a component defined elsewhere)
    with dsl.Condition(eval_task.output == True):  # noqa: E712
        deploy_model(
            model=train_task.outputs["trained_model"],
            model_name="customer-churn",
        )
Compiling and submitting
from kfp import compiler
from kfp.client import Client
# Compile to YAML
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)

# Submit to Kubeflow
client = Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_package(
    pipeline_file="training_pipeline.yaml",
    arguments={
        "raw_data_path": "gs://ml-data/customer-churn/raw.csv",
        "n_estimators": 200,
        "max_depth": 12,
    },
    run_name="churn-model-training-v2",
    experiment_name="churn-model",
)
print(f"Run ID: {run.run_id}")
Part 2: GPU scheduling and resources
Kubeflow runs on Kubernetes, giving you full control over resource allocation:
@dsl.component(
    base_image="nvcr.io/nvidia/pytorch:24.01-py3",
)
def train_deep_model(
    train_dataset: Input[Dataset],
    trained_model: Output[Model],
    epochs: int = 50,
    learning_rate: float = 0.001,
):
    """Train a deep learning model on GPU."""
    import torch
    import torch.nn as nn
    # ... training code using CUDA

# In the pipeline, set resource requirements
@dsl.pipeline(name="GPU Training Pipeline")
def gpu_pipeline(raw_data_path: str = "gs://ml-data/customer-churn/raw.csv"):
    prepare_task = prepare_data(raw_data_path=raw_data_path, split_ratio=0.2)
    train_task = train_deep_model(
        train_dataset=prepare_task.outputs["train_dataset"],
        epochs=100,
    )
    # Request GPU resources (in the KFP v2 SDK, set_accelerator_* replaces
    # the deprecated set_gpu_limit)
    train_task.set_accelerator_type("nvidia.com/gpu")
    train_task.set_accelerator_limit(1)
    train_task.set_memory_limit("16Gi")
    train_task.set_cpu_limit("4")
    # Node selectors and tolerations come from the kfp-kubernetes extension
    # (pip install kfp-kubernetes)
    from kfp import kubernetes
    kubernetes.add_node_selector(
        train_task,
        label_key="cloud.google.com/gke-accelerator",
        label_value="nvidia-tesla-t4",
    )
    kubernetes.add_toleration(
        train_task,
        key="nvidia.com/gpu",
        operator="Equal",
        value="present",
        effect="NoSchedule",
    )
Part 3: Pipeline caching
Kubeflow caches step outputs based on their inputs. If a step's inputs have not changed, it reuses the cached output:
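Conceptually, the cache key is a fingerprint over a step's definition and its resolved input values. The sketch below illustrates the idea in plain Python; it is not Kubeflow's actual implementation, which also factors in the full component spec and pipeline context.

```python
import hashlib
import json

def cache_key(component_name: str, image: str, inputs: dict) -> str:
    """Fingerprint a step by its definition and resolved input values."""
    payload = json.dumps(
        {"component": component_name, "image": image, "inputs": inputs},
        sort_keys=True,  # stable ordering so equal inputs hash equally
    )
    return hashlib.sha256(payload.encode()).hexdigest()

# Same component + same inputs → same key → cached output can be reused
k1 = cache_key("prepare_data", "python:3.11-slim",
               {"raw_data_path": "gs://data/v2.csv", "split_ratio": 0.2})
k2 = cache_key("prepare_data", "python:3.11-slim",
               {"raw_data_path": "gs://data/v2.csv", "split_ratio": 0.2})
# Changing any input produces a different key → the step re-runs
k3 = cache_key("prepare_data", "python:3.11-slim",
               {"raw_data_path": "gs://data/v3.csv", "split_ratio": 0.2})
assert k1 == k2 and k1 != k3
```

This is why non-deterministic steps (live database reads, random sampling without a fixed seed) must opt out of caching: their inputs can be identical while their correct output changes.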
# Caching is enabled by default. Disable for non-deterministic steps:
@dsl.pipeline(name="Pipeline with Cache Control")
def cached_pipeline():
    # This step caches because same input → same output
    prepare_task = prepare_data(raw_data_path="gs://data/v2.csv", split_ratio=0.2)
    # Disable cache for this step (e.g., it reads from a live database;
    # fetch_live_metrics is a component defined elsewhere)
    live_data_task = fetch_live_metrics()
    live_data_task.set_caching_options(False)
Part 4: Recurring pipelines (scheduled retraining)
# Create a recurring run for nightly retraining
client = Client(host="http://localhost:8080")
experiment = client.create_experiment("churn-model")
recurring_run = client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-churn-retraining",
    pipeline_package_path="training_pipeline.yaml",
    cron_expression="0 2 * * *",  # Every day at 2 AM
    max_concurrency=1,
    params={
        "raw_data_path": "gs://ml-data/customer-churn/latest.csv",
        "accuracy_threshold": 0.87,
    },
)
Part 5: Advanced patterns
Parallel training with different algorithms
@dsl.pipeline(name="Model Selection Pipeline")
def model_selection_pipeline(data_path: str):
    prepare_task = prepare_data(raw_data_path=data_path, split_ratio=0.2)
    # Train multiple models in parallel (these components are defined elsewhere)
    rf_task = train_random_forest(
        train_dataset=prepare_task.outputs["train_dataset"],
        n_estimators=200,
    )
    xgb_task = train_xgboost(
        train_dataset=prepare_task.outputs["train_dataset"],
        max_depth=8,
    )
    lgbm_task = train_lightgbm(
        train_dataset=prepare_task.outputs["train_dataset"],
        num_leaves=31,
    )
    # Compare and select best
    select_task = select_best_model(
        rf_model=rf_task.outputs["model"],
        xgb_model=xgb_task.outputs["model"],
        lgbm_model=lgbm_task.outputs["model"],
        test_dataset=prepare_task.outputs["test_dataset"],
    )
Pipeline composition (reusable sub-pipelines)
@dsl.pipeline(name="Feature Engineering Sub-pipeline")
def feature_engineering(raw_data: Input[Dataset]) -> Dataset:
    # clean_data, encode_categoricals, scale_numerics are components defined elsewhere
    clean_task = clean_data(dataset=raw_data)
    encode_task = encode_categoricals(dataset=clean_task.outputs["cleaned"])
    scale_task = scale_numerics(dataset=encode_task.outputs["encoded"])
    return scale_task.outputs["features"]

@dsl.pipeline(name="Full Training Pipeline")
def full_pipeline(data_path: str):
    ingest_task = ingest_data(path=data_path)
    features_task = feature_engineering(raw_data=ingest_task.outputs["data"])
    train_task = train_model(features=features_task.output)
Kubeflow vs. other orchestrators
| Feature | Kubeflow | Airflow | Prefect | Vertex AI |
|----------------|----------|---------|---------|-----------|
| ML-native | Yes | No (general-purpose) | No (general-purpose) | Yes |
| Container isolation | Per-step | Optional | Optional | Per-step |
| GPU scheduling | Native K8s | Manual | Manual | Managed |
| Artifact tracking | Built-in | External | External | Built-in |
| Caching | Built-in | Limited | Basic | Built-in |
| Self-hosted | Yes | Yes | Yes | No (GCP) |
| Learning curve | Medium | Medium | Low | Low |
| Best for | K8s teams | Data eng | Small teams | GCP users |
For teams already running Kubernetes, Kubeflow Pipelines is the natural choice. For teams that want managed infrastructure, cloud-native options such as Vertex AI Pipelines (which uses the same KFP SDK) can reduce the operational burden.
Production deployment checklist
Before running Kubeflow Pipelines in production:
- [ ] External, managed database backend for metadata (not the bundled in-cluster default)
- [ ] Artifact storage on S3/GCS (not local or in-cluster storage)
- [ ] RBAC configured for pipeline access
- [ ] Resource quotas set per namespace
- [ ] Pipeline versioning with Git tags
- [ ] Monitoring via Prometheus + Grafana
- [ ] Backup strategy for the metadata database
- [ ] Secrets management via Kubernetes Secrets or Vault
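For the last item, a minimal sketch of a Kubernetes Secret holding pipeline credentials (all names here are illustrative, e.g. credentials for the MLflow tracking server used earlier); the kfp-kubernetes extension can then surface such a secret to a task as environment variables:

```yaml
# Illustrative Secret for pipeline credentials (names are hypothetical)
apiVersion: v1
kind: Secret
metadata:
  name: mlflow-credentials
  namespace: kubeflow
type: Opaque
stringData:
  MLFLOW_TRACKING_USERNAME: pipeline-bot
  MLFLOW_TRACKING_PASSWORD: change-me
```

Prefer `stringData` only for sketches like this one; in production, create secrets out-of-band (e.g. via Vault or sealed secrets) rather than committing them to Git.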
Next steps
- MLflow integration: use MLflow for experiment tracking inside your Kubeflow pipelines
- Feature stores: feed consistent features into Kubeflow training steps
- MLOps best practices: apply production patterns to your Kubeflow workflows
- AI security: secure pipeline components and model artifacts
Need help setting up Kubeflow for your team? DeviDevs designs and operates production ML platforms on Kubernetes. Get a free assessment →