MLOps

MLflow Tutorial: Experiment Tracking and Registry

Petru Constantin
8 min read
#MLflow#experiment tracking#model registry#MLOps#ML pipeline#model deployment

MLflow Tutorial: Experiment Tracking and Model Registry from Scratch

MLflow is the most widely adopted open-source platform for managing the ML lifecycle. It offers experiment tracking, model packaging, a model registry, and deployment tools, all without vendor lock-in.

This tutorial takes you from zero to a production-ready MLflow setup, with real code examples.

Why MLflow?

Before MLflow, ML teams tracked experiments in spreadsheets, stored models on shared drives, and deployed by email ("here's the new pickle file"). MLflow provides:

  • Experiment Tracking: Log parameters, metrics, and artifacts for every run
  • Model Registry: Version, stage, and approve models centrally
  • Model Packaging: A consistent model format across frameworks
  • Deployment: Serve models via REST API, Docker, or cloud platforms

Installation and Setup

# Install MLflow
pip install mlflow[extras]
 
# Start the tracking server with a SQLite backend
mlflow server \
  --backend-store-uri sqlite:///mlflow.db \
  --default-artifact-root ./mlflow-artifacts \
  --host 0.0.0.0 \
  --port 5000

For production, use PostgreSQL as the backend store and S3/GCS for artifacts:

mlflow server \
  --backend-store-uri postgresql://user:pass@db-host:5432/mlflow \
  --default-artifact-root s3://ml-artifacts/mlflow \
  --host 0.0.0.0 \
  --port 5000
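If the artifact store is S3-compatible, the server (and every client that uploads artifacts) also needs credentials in its environment. A sketch, where the endpoint URL is only required for non-AWS stores such as MinIO (the values are placeholders):

```shell
# Credentials for the S3 artifact store (read by boto3)
export AWS_ACCESS_KEY_ID="..."
export AWS_SECRET_ACCESS_KEY="..."

# Only needed for S3-compatible storage (e.g. MinIO), not for AWS S3 itself
export MLFLOW_S3_ENDPOINT_URL="https://minio.internal:9000"
```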

Part 1: Experiment Tracking

Basic Experiment Logging

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
 
# Connect to the tracking server
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris-classification")
 
# Load data
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
 
# Define hyperparameters
params = {
    "n_estimators": 100,
    "max_depth": 5,
    "min_samples_split": 2,
    "random_state": 42,
}
 
with mlflow.start_run(run_name="rf-baseline"):
    # Log parameters
    mlflow.log_params(params)
 
    # Train the model
    model = RandomForestClassifier(**params)
    model.fit(X_train, y_train)
 
    # Evaluate
    y_pred = model.predict(X_test)
    metrics = {
        "accuracy": accuracy_score(y_test, y_pred),
        "precision_weighted": precision_score(y_test, y_pred, average="weighted"),
        "recall_weighted": recall_score(y_test, y_pred, average="weighted"),
        "f1_weighted": f1_score(y_test, y_pred, average="weighted"),
    }
 
    # Log metrics
    mlflow.log_metrics(metrics)
 
    # Log the model
    mlflow.sklearn.log_model(model, "model")
 
    # Log additional artifacts
    import json
    with open("feature_importance.json", "w") as f:
        importance = dict(zip(load_iris().feature_names, model.feature_importances_))
        json.dump(importance, f, indent=2)
    mlflow.log_artifact("feature_importance.json")
 
    print(f"Run ID: {mlflow.active_run().info.run_id}")
    print(f"Metrics: {metrics}")

Hyperparameter Sweep with Tracking

from itertools import product
 
mlflow.set_experiment("iris-hyperparameter-sweep")
 
param_grid = {
    "n_estimators": [50, 100, 200],
    "max_depth": [3, 5, 10, None],
    "min_samples_split": [2, 5, 10],
}
 
best_f1 = 0
best_run_id = None
 
for n_est, depth, min_split in product(
    param_grid["n_estimators"],
    param_grid["max_depth"],
    param_grid["min_samples_split"],
):
    with mlflow.start_run(run_name=f"rf-{n_est}-{depth}-{min_split}"):
        params = {
            "n_estimators": n_est,
            "max_depth": depth,
            "min_samples_split": min_split,
        }
        mlflow.log_params(params)
 
        model = RandomForestClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
 
        f1 = f1_score(y_test, y_pred, average="weighted")
        mlflow.log_metric("f1_weighted", f1)
 
        if f1 > best_f1:
            best_f1 = f1
            best_run_id = mlflow.active_run().info.run_id
            mlflow.sklearn.log_model(model, "model")
 
print(f"Best F1: {best_f1:.4f} (run: {best_run_id})")

Tracking PyTorch Training

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
 
mlflow.set_experiment("pytorch-classifier")
 
class SimpleClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim // 2, output_dim),
        )
 
    def forward(self, x):
        return self.net(x)
 
with mlflow.start_run(run_name="pytorch-baseline"):
    # Log architecture parameters
    config = {"input_dim": 4, "hidden_dim": 64, "output_dim": 3, "lr": 0.001, "epochs": 50, "batch_size": 16}
    mlflow.log_params(config)
 
    model = SimpleClassifier(config["input_dim"], config["hidden_dim"], config["output_dim"])
    optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
    criterion = nn.CrossEntropyLoss()
 
    # Training loop with step-level logging
    X_tensor = torch.FloatTensor(X_train)
    y_tensor = torch.LongTensor(y_train)
    loader = DataLoader(TensorDataset(X_tensor, y_tensor), batch_size=config["batch_size"], shuffle=True)
 
    for epoch in range(config["epochs"]):
        model.train()
        epoch_loss = 0
        for X_batch, y_batch in loader:
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
 
        avg_loss = epoch_loss / len(loader)
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
 
        # Validate every 10 epochs
        if epoch % 10 == 0:
            model.eval()
            with torch.no_grad():
                val_output = model(torch.FloatTensor(X_test))
                val_pred = val_output.argmax(dim=1).numpy()
                val_acc = accuracy_score(y_test, val_pred)
                mlflow.log_metric("val_accuracy", val_acc, step=epoch)
 
    # Log the final model
    mlflow.pytorch.log_model(model, "model")

Part 2: Model Registry

The model registry provides a central hub for managing model versions across their lifecycle.

Registering Models

import mlflow
from mlflow.tracking import MlflowClient
 
client = MlflowClient()
 
# Register the best model from the hyperparameter sweep
model_uri = f"runs:/{best_run_id}/model"
model_details = mlflow.register_model(model_uri, "iris-classifier")
 
print(f"Model: {model_details.name}")
print(f"Version: {model_details.version}")

Managing the Model Lifecycle

client = MlflowClient()
 
# Transition the model through stages
# Step 1: None -> Staging (for validation)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Staging",
    archive_existing_versions=False,
)
 
# Run validation tests against the staging model here
# before promoting it to Production
 
# Step 2: Staging -> Production (once validation passes)
client.transition_model_version_stage(
    name="iris-classifier",
    version=1,
    stage="Production",
    archive_existing_versions=True,  # Archive the previous production version
)
 
# Add a description and tags
client.update_model_version(
    name="iris-classifier",
    version=1,
    description="Random Forest baseline. F1=0.967. Trained on iris dataset v2.1."
)
client.set_model_version_tag("iris-classifier", 1, "validated", "true")
client.set_model_version_tag("iris-classifier", 1, "data_version", "v2.1")

Loading Models from the Registry

# Load the production model
import mlflow.pyfunc
 
# By stage
production_model = mlflow.pyfunc.load_model("models:/iris-classifier/Production")
 
# By version number
specific_version = mlflow.pyfunc.load_model("models:/iris-classifier/1")
 
# Predict
import pandas as pd
sample = pd.DataFrame([[5.1, 3.5, 1.4, 0.2]], columns=["sepal_l", "sepal_w", "petal_l", "petal_w"])
prediction = production_model.predict(sample)

Part 3: Custom MLflow Models

For models that don't fit the standard framework flavors (ensembles, complex preprocessing, LLM wrappers), use the PythonModel interface:

import mlflow.pyfunc
 
class ChurnPredictor(mlflow.pyfunc.PythonModel):
    """Model custom care incapsuleaza feature engineering + predictie + postprocesare."""
 
    def load_context(self, context):
        """Incarca artefactele modelului."""
        import joblib
        self.model = joblib.load(context.artifacts["model"])
        self.scaler = joblib.load(context.artifacts["scaler"])
        self.feature_names = joblib.load(context.artifacts["feature_names"])
 
    def predict(self, context, model_input):
        """Ruleaza pipeline-ul de predictie."""
        import pandas as pd
        import numpy as np
 
        df = model_input if isinstance(model_input, pd.DataFrame) else pd.DataFrame(model_input)
 
        # Feature engineering
        df["recency_score"] = np.clip(df["days_since_last_purchase"] / 365, 0, 1)
        df["value_segment"] = pd.cut(df["lifetime_value"], bins=[0, 100, 500, float("inf")], labels=[0, 1, 2])
 
        # Scale features
        features = df[self.feature_names]
        scaled = self.scaler.transform(features)
 
        # Predict
        probabilities = self.model.predict_proba(scaled)
 
        return pd.DataFrame({
            "churn_probability": probabilities[:, 1],
            "churn_prediction": (probabilities[:, 1] > 0.5).astype(int),
            "risk_tier": pd.cut(
                probabilities[:, 1],
                bins=[0, 0.3, 0.7, 1.0],
                labels=["low", "medium", "high"]
            ),
        })
 
# Save the custom model
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=ChurnPredictor(),
        artifacts={
            "model": "artifacts/model.joblib",
            "scaler": "artifacts/scaler.joblib",
            "feature_names": "artifacts/feature_names.joblib",
        },
        registered_model_name="churn-predictor",
        pip_requirements=["scikit-learn==1.4.0", "pandas>=2.0", "numpy>=1.24"],
    )

Part 4: MLflow Projects for Reproducibility

MLflow Projects package your code together with its environment for reproducible runs:

# MLproject file
name: churn-model
 
conda_env: conda.yaml
 
entry_points:
  train:
    parameters:
      n_estimators: {type: int, default: 100}
      max_depth: {type: int, default: 10}
      learning_rate: {type: float, default: 0.1}
      data_path: {type: str, default: "data/processed"}
    command: "python train.py --n-estimators {n_estimators} --max-depth {max_depth} --lr {learning_rate} --data {data_path}"
 
  evaluate:
    parameters:
      model_uri: {type: str}
      data_path: {type: str, default: "data/test"}
    command: "python evaluate.py --model-uri {model_uri} --data {data_path}"

Run the project from anywhere:

# Run locally
mlflow run . -e train -P n_estimators=200 -P max_depth=12
 
# Run from Git
mlflow run https://github.com/your-org/churn-model -e train -P n_estimators=200
 
# Run on Databricks
mlflow run . -e train -b databricks --backend-config cluster.json

Part 5: Serving Models in Production

REST API Serving

# Serve a registered model as a REST API
mlflow models serve \
  --model-uri "models:/churn-predictor/Production" \
  --port 8080 \
  --host 0.0.0.0

# Test the endpoint
curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"inputs": [{"days_since_last_purchase": 45, "lifetime_value": 230, "total_orders": 12}]}'
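The `/invocations` endpoint accepts a few JSON layouts (`inputs`, `dataframe_records`, `dataframe_split`). A small pure-Python helper that builds the `dataframe_records` form so client code doesn't hand-write JSON; the helper name is ours, not an MLflow API:

```python
import json

def build_invocations_payload(records):
    """Wrap a list of feature dicts in MLflow's dataframe_records scoring format."""
    return json.dumps({"dataframe_records": records})

payload = build_invocations_payload([
    {"days_since_last_purchase": 45, "lifetime_value": 230, "total_orders": 12},
])
print(payload)
```

POST this string to `http://localhost:8080/invocations` with a `Content-Type: application/json` header.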

Docker Deployment

# Build a Docker image for the model
mlflow models build-docker \
  --model-uri "models:/churn-predictor/Production" \
  --name churn-predictor:v2.3
 
# Run the container
docker run -p 8080:8080 churn-predictor:v2.3
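The image produced by `build-docker` is a standard container listening on port 8080, so it drops into any orchestrator. A minimal Kubernetes Deployment sketch; all names, labels, and the replica count are placeholders:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: churn-predictor
spec:
  replicas: 2
  selector:
    matchLabels:
      app: churn-predictor
  template:
    metadata:
      labels:
        app: churn-predictor
    spec:
      containers:
        - name: model
          image: churn-predictor:v2.3
          ports:
            - containerPort: 8080
```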

Part 6: Production MLflow Architecture

For production deployments, here is a recommended architecture:

┌─────────────────────────────────────────────────────────┐
│                   MLflow Architecture                     │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  Data Scientists          MLflow Tracking Server          │
│  ┌──────────┐            ┌──────────────────┐            │
│  │ Notebook  │───────────▶│  REST API (:5000) │            │
│  │ Pipeline  │            │                  │            │
│  └──────────┘            └────────┬─────────┘            │
│                                   │                       │
│                      ┌────────────┼────────────┐         │
│                      │            │            │         │
│                      ▼            ▼            ▼         │
│              ┌────────────┐ ┌──────────┐ ┌──────────┐   │
│              │ PostgreSQL  │ │ S3/GCS   │ │ Model    │   │
│              │ (metadata)  │ │ (artifacts│ │ Registry │   │
│              └────────────┘ └──────────┘ └──────────┘   │
│                                                           │
│  Production Serving                                       │
│  ┌──────────────────────────────────────────────┐        │
│  │  Kubernetes + KServe/Seldon                    │        │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐         │        │
│  │  │ Model A │ │ Model B │ │ Model C │         │        │
│  │  │  (prod) │ │ (canary)│ │ (shadow)│         │        │
│  │  └─────────┘ └─────────┘ └─────────┘         │        │
│  └──────────────────────────────────────────────┘        │
└─────────────────────────────────────────────────────────┘

Key MLflow Configuration Tips

# Autologging: automatically capture parameters, metrics, and models
mlflow.autolog()  # Works with sklearn, pytorch, tensorflow, xgboost, lightgbm
 
# Nested runs for complex experiments
with mlflow.start_run(run_name="hyperparameter-search"):
    mlflow.log_param("search_method", "grid")
    for params in param_combinations:
        with mlflow.start_run(run_name=f"trial-{params}", nested=True):
            mlflow.log_params(params)
            # ... train and evaluate
 
# Tags for organizing runs
mlflow.set_tag("team", "ml-platform")
mlflow.set_tag("use_case", "customer-churn")
mlflow.set_tag("data_version", "v2.3")
 
# System tags
mlflow.set_tag("mlflow.note.content", "Baseline model with engineered features")

Next Steps

With MLflow handling experiment tracking and model management, you have the foundations of a production ML platform in place.

Need help setting up MLflow for your team? DeviDevs builds production MLOps platforms with MLflow at the core. Get a free assessment ->

