Tutorial MLflow: Experiment Tracking si Model Registry de la Zero
MLflow este cea mai adoptata platforma open-source pentru gestionarea ciclului de viata ML. Ofera experiment tracking, impachetare modele, un model registry si instrumente de deployment, toate fara vendor lock-in.
Acest tutorial te duce de la zero la un setup MLflow pregatit pentru productie cu exemple reale de cod.
De ce MLflow?
Inainte de MLflow, echipele ML urmareau experimentele in spreadsheet-uri, stocau modelele pe drive-uri partajate si faceau deployment prin email ("uite noul fisier pickle"). MLflow ofera:
- Experiment Tracking: Logheaza parametri, metrici si artefacte pentru fiecare rulare
- Model Registry: Versioneaza, stageaza si aproba modele centralizat
- Impachetare modele: Format consistent de model intre framework-uri
- Deployment: Serveste modele prin REST API, Docker sau platforme cloud
Instalare si configurare
# Install MLflow
pip install mlflow[extras]

# Start the tracking server with a SQLite backend
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlflow-artifacts \
  --host 0.0.0.0 \
  --port 5000
Pentru productie, foloseste PostgreSQL ca backend si S3/GCS pentru artefacte:
# Production setup: PostgreSQL for metadata, S3 for artifacts
mlflow server \
  --backend-store-uri postgresql://user:pass@db-host:5432/mlflow \
  --default-artifact-root s3://ml-artifacts/mlflow \
  --host 0.0.0.0 \
  --port 5000
Partea 1: Experiment Tracking
Logare de baza a experimentelor
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
# Conectare la serverul de tracking
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris-classification")
# Incarca date
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Defineste hiperparametri
params = {
"n_estimators": 100,
"max_depth": 5,
"min_samples_split": 2,
"random_state": 42,
}
with mlflow.start_run(run_name="rf-baseline"):
# Logheaza parametri
mlflow.log_params(params)
# Antreneaza model
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Evalueaza
y_pred = model.predict(X_test)
metrics = {
"accuracy": accuracy_score(y_test, y_pred),
"precision_weighted": precision_score(y_test, y_pred, average="weighted"),
"recall_weighted": recall_score(y_test, y_pred, average="weighted"),
"f1_weighted": f1_score(y_test, y_pred, average="weighted"),
}
# Logheaza metrici
mlflow.log_metrics(metrics)
# Logheaza model
mlflow.sklearn.log_model(model, "model")
# Logheaza artefacte aditionale
import json
with open("feature_importance.json", "w") as f:
importance = dict(zip(load_iris().feature_names, model.feature_importances_))
json.dump(importance, f, indent=2)
mlflow.log_artifact("feature_importance.json")
print(f"Run ID: {mlflow.active_run().info.run_id}")
print(f"Metrics: {metrics}")Hyperparameter Sweep cu tracking
from itertools import product
mlflow.set_experiment("iris-hyperparameter-sweep")
param_grid = {
"n_estimators": [50, 100, 200],
"max_depth": [3, 5, 10, None],
"min_samples_split": [2, 5, 10],
}
best_f1 = 0
best_run_id = None
for n_est, depth, min_split in product(
param_grid["n_estimators"],
param_grid["max_depth"],
param_grid["min_samples_split"],
):
with mlflow.start_run(run_name=f"rf-{n_est}-{depth}-{min_split}"):
params = {
"n_estimators": n_est,
"max_depth": depth,
"min_samples_split": min_split,
}
mlflow.log_params(params)
model = RandomForestClassifier(**params, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
f1 = f1_score(y_test, y_pred, average="weighted")
mlflow.log_metric("f1_weighted", f1)
if f1 > best_f1:
best_f1 = f1
best_run_id = mlflow.active_run().info.run_id
mlflow.sklearn.log_model(model, "model")
print(f"Best F1: {best_f1:.4f} (run: {best_run_id})")Tracking antrenare PyTorch
import mlflow
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
mlflow.set_experiment("pytorch-classifier")
class SimpleClassifier(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, hidden_dim),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(hidden_dim // 2, output_dim),
)
def forward(self, x):
return self.net(x)
with mlflow.start_run(run_name="pytorch-baseline"):
# Logheaza parametri de arhitectura
config = {"input_dim": 4, "hidden_dim": 64, "output_dim": 3, "lr": 0.001, "epochs": 50, "batch_size": 16}
mlflow.log_params(config)
model = SimpleClassifier(config["input_dim"], config["hidden_dim"], config["output_dim"])
optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
criterion = nn.CrossEntropyLoss()
# Bucla de antrenare cu logare la nivel de pas
X_tensor = torch.FloatTensor(X_train)
y_tensor = torch.LongTensor(y_train)
loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=config["batch_size"], shuffle=True)
for epoch in range(config["epochs"]):
model.train()
epoch_loss = 0
for X_batch, y_batch in loader:
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg_loss = epoch_loss / len(loader)
mlflow.log_metric("train_loss", avg_loss, step=epoch)
# Validare la fiecare 10 epoci
if epoch % 10 == 0:
model.eval()
with torch.no_grad():
val_output = model(torch.FloatTensor(X_test))
val_pred = val_output.argmax(dim=1).numpy()
val_acc = accuracy_score(y_test, val_pred)
mlflow.log_metric("val_accuracy", val_acc, step=epoch)
# Logheaza modelul final
mlflow.pytorch.log_model(model, "model")Partea 2: Model Registry
Model registry-ul ofera un hub central pentru gestionarea versiunilor de model de-a lungul ciclului lor de viata.
Inregistrarea modelelor
import mlflow
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Inregistreaza cel mai bun model din hyperparameter sweep
model_uri = f"runs:/{best_run_id}/model"
model_details = mlflow.register_model(model_uri, "iris-classifier")
print(f"Model: {model_details.name}")
print(f"Version: {model_details.version}")Gestionarea ciclului de viata al modelelor
client = MlflowClient()
# Tranziteaza modelul prin etape
# Etapa 1: None -> Staging (pentru validare)
client.transition_model_version_stage(
name="iris-classifier",
version=1,
stage="Staging",
archive_existing_versions=False,
)
# Ruleaza teste de validare pe modelul staging...
# (vezi sectiunea de testare de mai sus)
# Etapa 2: Staging -> Production (dupa ce validarea trece)
client.transition_model_version_stage(
name="iris-classifier",
version=1,
stage="Production",
archive_existing_versions=True, # Arhiveaza versiunea anterioara de productie
)
# Adauga descriere si tag-uri
client.update_model_version(
name="iris-classifier",
version=1,
description="Random Forest baseline. F1=0.967. Trained on iris dataset v2.1."
)
client.set_model_version_tag("iris-classifier", 1, "validated", "true")
client.set_model_version_tag("iris-classifier", 1, "data_version", "v2.1")Incarcarea modelelor din Registry
# Incarca modelul de productie
import mlflow.pyfunc
# Dupa etapa
production_model = mlflow.pyfunc.load_model("models:/iris-classifier/Production")
# Dupa numarul versiunii
specific_version = mlflow.pyfunc.load_model("models:/iris-classifier/1")
# Predictie
import pandas as pd
sample = pd.DataFrame([[5.1, 3.5, 1.4, 0.2]], columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])
prediction = production_model.predict(sample)Partea 3: Modele MLflow Custom
Pentru modele care nu se incadreaza in framework-urile standard (modele ensemble, preprocesare complexa, wrapper-e LLM), foloseste interfata PythonModel:
import mlflow.pyfunc

class ChurnPredictor(mlflow.pyfunc.PythonModel):
    """Custom model that wraps feature engineering + prediction + post-processing."""

    def load_context(self, context):
        """Load model artifacts referenced by the MLflow context."""
        import joblib
        self.model = joblib.load(context.artifacts["model"])
        self.scaler = joblib.load(context.artifacts["scaler"])
        self.feature_names = joblib.load(context.artifacts["feature_names"])

    def predict(self, context, model_input):
        """Run the full prediction pipeline and return a tiered result frame."""
        import pandas as pd
        import numpy as np

        df = model_input if isinstance(model_input, pd.DataFrame) else pd.DataFrame(model_input)

        # Feature engineering
        df["recency_score"] = np.clip(df["days_since_last_purchase"] / 365, 0, 1)
        df["value_segment"] = pd.cut(df["lifetime_value"], bins=[0, 100, 500, float("inf")], labels=[0, 1, 2])

        # Scale features (assumes self.feature_names matches the columns the
        # scaler was fitted on — verify against the training pipeline)
        features = df[self.feature_names]
        scaled = self.scaler.transform(features)

        # Predict
        probabilities = self.model.predict_proba(scaled)
        return pd.DataFrame({
            "churn_probability": probabilities[:, 1],
            "churn_prediction": (probabilities[:, 1] > 0.5).astype(int),
            "risk_tier": pd.cut(
                probabilities[:, 1],
                bins=[0, 0.3, 0.7, 1.0],
                labels=["low", "medium", "high"]
            ),
        })
# Salveaza modelul custom
with mlflow.start_run():
mlflow.pyfunc.log_model(
artifact_path="model",
python_model=ChurnPredictor(),
artifacts={
"model": "artifacts/model.joblib",
"scaler": "artifacts/scaler.joblib",
"feature_names": "artifacts/feature_names.joblib",
},
registered_model_name="churn-predictor",
pip_requirements=["scikit-learn==1.4.0", "pandas>=2.0", "numpy>=1.24"],
)Partea 4: MLflow Projects pentru reproductibilitate
MLflow Projects iti impacheteaza codul cu mediul sau pentru rulari reproductibile:
# MLproject file
name: churn-model
conda_env: conda.yaml

entry_points:
  train:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      learning_rate: {type: float, default: 0.1}
      data_path: {type: str, default: "data/processed"}
    command: "python train.py --n-estimators {n_estimators} --max-depth {max_depth} --lr {learning_rate} --data {data_path}"
  evaluate:
    parameters:
      model_uri: {type: str}
      data_path: {type: str, default: "data/test"}
    command: "python evaluate.py --model-uri {model_uri} --data {data_path}"
Ruleaza proiectul de oriunde:
# Run locally
mlflow run . -e train -P n_estimators=200 -P max_depth=12

# Run from Git
mlflow run https://github.com/your-org/churn-model -e train -P n_estimators=200

# Run on Databricks
mlflow run . -e train -b databricks --backend-config cluster.json
Partea 5: Servirea modelelor in productie
Servire prin REST API
# Serve a registered model as a REST API
mlflow models serve \
  --model-uri "models:/churn-predictor/Production" \
  --port 8080 \
  --host 0.0.0.0

# Test the endpoint
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [{"days_since_last_purchase": 45, "lifetime_value": 230, "total_orders": 12}]}'
Deployment cu Docker
# Build a Docker image for the model
mlflow models build-docker \
  --model-uri "models:/churn-predictor/Production" \
  --name churn-predictor:v2.3

# Run the container
docker run -p 8080:8080 churn-predictor:v2.3
Partea 6: Arhitectura MLflow pentru productie
Pentru deployment-uri de productie, iata o arhitectura recomandata:
┌─────────────────────────────────────────────────────────┐
│ MLflow Architecture │
├─────────────────────────────────────────────────────────┤
│ │
│ Data Scientists MLflow Tracking Server │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ Notebook │───────────▶│ REST API (:5000) │ │
│ │ Pipeline │ │ │ │
│ └──────────┘ └────────┬─────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌────────────┐ ┌──────────┐ ┌──────────┐ │
│ │ PostgreSQL │ │ S3/GCS │ │ Model │ │
│ │ (metadata) │ │ (artifacts│ │ Registry │ │
│ └────────────┘ └──────────┘ └──────────┘ │
│ │
│ Production Serving │
│ ┌──────────────────────────────────────────────┐ │
│ │ Kubernetes + KServe/Seldon │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Model A │ │ Model B │ │ Model C │ │ │
│ │ │ (prod) │ │ (canary)│ │ (shadow)│ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └──────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
Sfaturi cheie de configurare MLflow
# Autologging: captureaza automat parametri, metrici si modele
mlflow.autolog() # Functioneaza cu sklearn, pytorch, tensorflow, xgboost, lightgbm
# Rulari imbricate pentru experimente complexe
with mlflow.start_run(run_name="hyperparameter-search"):
mlflow.log_param("search_method", "grid")
for params in param_combinations:
with mlflow.start_run(run_name=f"trial-{params}", nested=True):
mlflow.log_params(params)
# ... antreneaza si evalueaza
# Tag-uri pentru organizarea rularilor
mlflow.set_tag("team", "ml-platform")
mlflow.set_tag("use_case", "customer-churn")
mlflow.set_tag("data_version", "v2.3")
# Tag-uri de sistem
mlflow.set_tag("mlflow.note.content", "Baseline model with engineered features")Pasi urmatori
Cu MLflow gestionand experiment tracking-ul si managementul modelelor, pasii urmatori naturali sunt:
- Feature stores pentru calculul consistent al features
- Kubeflow pentru orchestrarea pipeline-urilor la scara
- Monitorizare modele pentru observabilitate in productie
- Conformitate EU AI Act: Audit trail-ul MLflow sustine direct cerintele de reglementare
Ai nevoie de ajutor pentru configurarea MLflow in echipa ta? DeviDevs construieste platforme MLOps de productie cu MLflow in centru. Obtine o evaluare gratuita ->
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →