MLOps

MLOps Best Practices: Building Production-Ready ML Pipelines

Petru Constantin
10 min read
#MLOps#ML pipeline#best practices#production ML#ML CI/CD#model deployment


Building ML pipelines that run reliably in production takes more than wiring a few scripts together. This guide covers the battle-tested practices that separate production-grade ML systems from fragile prototypes.

1. Design pipelines as DAGs, not scripts

Production ML pipelines should be directed acyclic graphs (DAGs) with clear boundaries between steps, not monolithic scripts. Each step should be independently testable, cacheable, and retryable.

# Bad: monolithic training script
def train():
    data = load_data()
    features = preprocess(data)
    model = train_model(features)
    evaluate(model)
    deploy(model)

# Good: pipeline as composable steps
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class PipelineStep:
    name: str
    inputs: list[str]
    outputs: list[str]
    fn: Callable[..., Any]
 
pipeline = [
    PipelineStep("ingest", [], ["raw_data"], ingest_data),
    PipelineStep("validate", ["raw_data"], ["validated_data"], validate_data),
    PipelineStep("features", ["validated_data"], ["feature_matrix"], compute_features),
    PipelineStep("train", ["feature_matrix"], ["model_artifact"], train_model),
    PipelineStep("evaluate", ["model_artifact", "feature_matrix"], ["metrics"], evaluate_model),
    PipelineStep("register", ["model_artifact", "metrics"], ["model_version"], register_model),
]

Why DAGs matter

  • Caching: skip steps whose inputs have not changed
  • Retry: restart from the failed step, not from the beginning
  • Parallelism: independent steps run concurrently
  • Debugging: inspect intermediate artifacts at any step
  • Audit: clear lineage from data to the deployed model

2. Validate data at pipeline boundaries

Data problems cause more ML failures than code bugs. Validate data at every pipeline boundary with schema checks and statistical tests.

import pandera as pa
from pandera import Column, Check, DataFrameSchema
 
# Define the expected schema with statistical constraints
training_data_schema = DataFrameSchema({
    "customer_id": Column(int, Check.gt(0), nullable=False),
    "age": Column(int, Check.in_range(18, 120), nullable=False),
    "purchase_amount": Column(float, Check.gt(0), nullable=True),
    "category": Column(str, Check.isin(["A", "B", "C", "D"]), nullable=False),
    "churn": Column(int, Check.isin([0, 1]), nullable=False),
}, checks=[
    # Table-level checks
    Check(lambda df: len(df) >= 1000, error="Minimum 1000 rows required"),
    Check(lambda df: df["churn"].mean() > 0.01, error="Churn rate suspiciously low"),
    Check(lambda df: df["churn"].mean() < 0.90, error="Churn rate suspiciously high"),
])
 
def validate_training_data(df):
    """Valideaza datele de antrenare inainte ca pipeline-ul sa continue."""
    try:
        training_data_schema.validate(df, lazy=True)
        return {"status": "passed", "rows": len(df), "churn_rate": df["churn"].mean()}
    except pa.errors.SchemaErrors as e:
        failures = e.failure_cases
        raise ValueError(f"Data validation failed:\n{failures.to_string()}")

Great Expectations for complex validation

For larger teams, Great Expectations offers a richer framework:

import great_expectations as gx

context = gx.get_context()

# Define the expectation suite (Great Expectations 1.x fluent API)
suite = context.suites.add(gx.ExpectationSuite(name="training_data_suite"))
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="customer_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(column="age", min_value=18, max_value=120)
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000)
)
suite.add_expectation(
    gx.expectations.ExpectColumnProportionOfUniqueValuesToBeBetween(
        column="customer_id", min_value=0.95
    )
)

3. Version everything: code, data, and models

In ML, versioning only the code is not enough. You must version data and model artifacts alongside code to achieve reproducibility.

# DVC pipeline definition (dvc.yaml)
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - src/prepare.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - prepare.split_ratio
      - prepare.seed
 
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    outs:
      - models/
    params:
      - train.n_estimators
      - train.max_depth
      - train.learning_rate
    metrics:
      - metrics/train.json:
          cache: false
 
  evaluate:
    cmd: python src/evaluate.py
    deps:
      - src/evaluate.py
      - models/
      - data/processed/
    metrics:
      - metrics/eval.json:
          cache: false
    plots:
      - plots/confusion_matrix.csv:
          cache: false

Three-way version pinning

Every production model should carry a pinned triplet:

# Model metadata: the three-way version pin
model_metadata = {
    "model_version": "churn-v2.3.1",
    "code_version": "git:abc123def",       # Git commit hash
    "data_version": "dvc:data/v2.3",        # DVC data version
    "training_params": {
        "n_estimators": 200,
        "max_depth": 12,
        "learning_rate": 0.05,
    },
    "training_metrics": {
        "accuracy": 0.943,
        "precision": 0.91,
        "recall": 0.88,
        "f1": 0.895,
        "auc_roc": 0.967,
    },
    "trained_at": "2026-02-01T14:30:00Z",
    "trained_by": "pipeline/nightly-retrain",
    "approved_by": "ml-lead@company.com",
}
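The pin can be captured automatically at training time rather than filled in by hand. A hedged sketch: the `git` call assumes the pipeline runs inside a repository, and `build_model_metadata` is a hypothetical helper, not a library API:

```python
import subprocess
from datetime import datetime, timezone

def build_model_metadata(model_version: str, data_version: str,
                         params: dict, metrics: dict) -> dict:
    """Assemble the three-way pin: code, data, and model versions together."""
    try:
        commit = subprocess.run(
            ["git", "rev-parse", "--short", "HEAD"],
            capture_output=True, text=True, check=False,
        ).stdout.strip() or "unknown"
    except OSError:
        commit = "unknown"  # e.g. git is not installed in this environment
    return {
        "model_version": model_version,
        "code_version": f"git:{commit}",
        "data_version": data_version,
        "training_params": params,
        "training_metrics": metrics,
        "trained_at": datetime.now(timezone.utc).isoformat(),
    }

meta = build_model_metadata(
    "churn-v2.3.1", "dvc:data/v2.3",
    {"n_estimators": 200}, {"auc_roc": 0.967},
)
```

Recording the commit hash programmatically removes the most common reproducibility gap: metadata that was copied by hand and never updated.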

4. Test ML pipelines at multiple levels

ML systems need testing at the data, model, and integration levels, not just unit tests.

import pytest
import numpy as np
from sklearn.metrics import accuracy_score, f1_score
 
class TestDataQuality:
    """Teste care ruleaza la fiecare refresh de date."""
 
    def test_no_null_targets(self, training_data):
        assert training_data["target"].notna().all()
 
    def test_feature_ranges(self, training_data):
        assert training_data["age"].between(0, 150).all()
        assert training_data["income"].ge(0).all()
 
    def test_class_balance(self, training_data):
        """Asigura-te ca nu exista dezechilibru extrem de clase."""
        class_ratios = training_data["target"].value_counts(normalize=True)
        assert class_ratios.min() > 0.05, f"Minority class is only {class_ratios.min():.1%}"
 
    def test_no_data_leakage(self, training_data, test_data):
        """Asigura-te ca split-ul train/test nu are suprapuneri."""
        train_ids = set(training_data["id"])
        test_ids = set(test_data["id"])
        assert train_ids.isdisjoint(test_ids), "Data leakage: overlapping IDs"
 
 
class TestModelQuality:
    """Teste care ruleaza dupa fiecare rulare de antrenare."""
 
    def test_minimum_accuracy(self, model, test_data):
        predictions = model.predict(test_data.features)
        acc = accuracy_score(test_data.labels, predictions)
        assert acc > 0.85, f"Accuracy {acc:.3f} below threshold 0.85"
 
    def test_minimum_f1(self, model, test_data):
        predictions = model.predict(test_data.features)
        f1 = f1_score(test_data.labels, predictions, average="weighted")
        assert f1 > 0.80, f"F1 {f1:.3f} below threshold 0.80"
 
    def test_no_regression_vs_production(self, new_model, production_model, test_data):
        """Modelul nou nu trebuie sa fie semnificativ mai slab decat cel din productie."""
        new_acc = accuracy_score(test_data.labels, new_model.predict(test_data.features))
        prod_acc = accuracy_score(test_data.labels, production_model.predict(test_data.features))
        assert new_acc >= prod_acc - 0.02, (
            f"New model ({new_acc:.3f}) regressed vs production ({prod_acc:.3f})"
        )
 
    def test_prediction_latency(self, model):
        """O singura predictie trebuie sa se incadreze in SLA."""
        import time
        sample = np.random.randn(1, model.n_features_in_)
        start = time.perf_counter()
        for _ in range(100):
            model.predict(sample)
        avg_ms = (time.perf_counter() - start) / 100 * 1000
        assert avg_ms < 50, f"Avg latency {avg_ms:.1f}ms exceeds 50ms SLA"
 
    def test_model_size(self, model_path):
        """Artefactul model trebuie sa incapa in containerul de serving."""
        import os
        size_mb = os.path.getsize(model_path) / (1024 * 1024)
        assert size_mb < 500, f"Model size {size_mb:.0f}MB exceeds 500MB limit"
 
 
class TestInfrastructure:
    """Teste care valideaza infrastructura de deployment."""
 
    def test_serving_endpoint_health(self, serving_url):
        import requests
        resp = requests.get(f"{serving_url}/health", timeout=5)
        assert resp.status_code == 200
 
    def test_serving_endpoint_prediction(self, serving_url, sample_input):
        import requests
        resp = requests.post(f"{serving_url}/predict", json=sample_input, timeout=10)
        assert resp.status_code == 200
        assert "prediction" in resp.json()

5. Implement robust model deployment strategies

Never deploy a new model to 100% of traffic at once. Use progressive rollout strategies.

Shadow Deployment

Run the new model alongside production and compare outputs without affecting users:

import asyncio
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

class ShadowDeployment:
    def __init__(self, production_model, shadow_model):
        self.production = production_model
        self.shadow = shadow_model
        self.comparison_log = []

    async def predict(self, features: dict) -> dict:
        # The production model serves the response
        prod_result = self.production.predict(features)

        # The shadow model runs off the event loop; its result is logged, never returned
        try:
            shadow_result = await asyncio.to_thread(self.shadow.predict, features)
            self.comparison_log.append({
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "production": prod_result,
                "shadow": shadow_result,
                "agree": prod_result == shadow_result,
            })
        except Exception as e:
            logger.warning(f"Shadow model error: {e}")

        return prod_result

Canary Deployment

Route a small percentage of traffic to the new model:

import random
 
class CanaryRouter:
    def __init__(self, production_model, canary_model, canary_percentage: float = 0.05):
        self.production = production_model
        self.canary = canary_model
        self.canary_pct = canary_percentage
 
    def predict(self, features: dict) -> dict:
        if random.random() < self.canary_pct:
            result = self.canary.predict(features)
            result["model_variant"] = "canary"
        else:
            result = self.production.predict(features)
            result["model_variant"] = "production"
        return result
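One caveat with pure random routing: the same user can bounce between variants on consecutive requests. A deterministic alternative hashes a stable identifier into a bucket, so each entity always sees the same variant. A sketch (`entity_id` is an assumed request field, not part of the router above):

```python
import hashlib

class StickyCanaryRouter:
    """Route by a stable hash of an entity ID so each user always gets the same variant."""

    def __init__(self, canary_percentage: float = 0.05):
        self.canary_pct = canary_percentage

    def variant_for(self, entity_id: str) -> str:
        digest = hashlib.sha256(entity_id.encode()).digest()
        bucket = int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)
        return "canary" if bucket < self.canary_pct else "production"

router = StickyCanaryRouter(canary_percentage=0.05)
# Deterministic: the same ID always lands in the same bucket
assert router.variant_for("user-42") == router.variant_for("user-42")
share = sum(router.variant_for(f"user-{i}") == "canary" for i in range(10_000)) / 10_000
print(f"canary share ~ {share:.3f}")
```

Sticky routing also makes canary metrics cleaner, because each user's entire session comes from a single model variant.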

6. Monitor models in production continuously

Deployed models are not "set and forget." Implement monitoring for performance, data quality, and infrastructure health.

from dataclasses import dataclass
from datetime import datetime, timedelta
import numpy as np
 
@dataclass
class MonitoringAlert:
    metric: str
    current_value: float
    threshold: float
    severity: str  # "warning" | "critical"
    message: str
 
class ModelMonitor:
    def __init__(self, model_name: str, baseline_metrics: dict):
        self.model_name = model_name
        self.baseline = baseline_metrics
        self.alerts: list[MonitoringAlert] = []
 
    def check_prediction_distribution(self, predictions: np.ndarray) -> list[MonitoringAlert]:
        """Detecteaza daca distributia predictiilor s-a schimbat."""
        alerts = []
        pred_mean = predictions.mean()
        baseline_mean = self.baseline["prediction_mean"]
 
        if abs(pred_mean - baseline_mean) / baseline_mean > 0.20:
            alerts.append(MonitoringAlert(
                metric="prediction_distribution",
                current_value=pred_mean,
                threshold=baseline_mean * 1.20,
                severity="critical",
                message=f"Prediction mean shifted from {baseline_mean:.3f} to {pred_mean:.3f}"
            ))
        return alerts
 
    def check_latency(self, latencies_ms: list[float]) -> list[MonitoringAlert]:
        """Detecteaza degradarea latentei."""
        alerts = []
        p99 = np.percentile(latencies_ms, 99)
        if p99 > self.baseline["latency_p99_ms"] * 1.5:
            alerts.append(MonitoringAlert(
                metric="latency_p99",
                current_value=p99,
                threshold=self.baseline["latency_p99_ms"] * 1.5,
                severity="warning",
                message=f"P99 latency {p99:.0f}ms exceeds 1.5x baseline"
            ))
        return alerts
 
    def check_error_rate(self, total_requests: int, errors: int) -> list[MonitoringAlert]:
        """Detecteaza rate de eroare ridicate."""
        alerts = []
        error_rate = errors / max(total_requests, 1)
        if error_rate > 0.01:
            alerts.append(MonitoringAlert(
                metric="error_rate",
                current_value=error_rate,
                threshold=0.01,
                severity="critical",
                message=f"Error rate {error_rate:.2%} exceeds 1% threshold"
            ))
        return alerts
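The `check_prediction_distribution` check above compares only the means, which misses shifts in shape. A more robust drift signal is the population stability index (PSI), sketched here for numeric predictions; the thresholds quoted in the comment are common rules of thumb, not universal constants:

```python
import numpy as np

def population_stability_index(baseline: np.ndarray, current: np.ndarray,
                               bins: int = 10) -> float:
    """PSI between two samples; values above ~0.2 usually indicate real drift."""
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    # Clamp both samples into the baseline range so every value lands in a bin
    baseline = np.clip(baseline, edges[0], edges[-1])
    current = np.clip(current, edges[0], edges[-1])
    base_frac = np.histogram(baseline, bins=edges)[0] / len(baseline)
    curr_frac = np.histogram(current, bins=edges)[0] / len(current)
    eps = 1e-6  # guard empty bins against log(0) and division by zero
    base_frac = np.clip(base_frac, eps, None)
    curr_frac = np.clip(curr_frac, eps, None)
    return float(np.sum((curr_frac - base_frac) * np.log(curr_frac / base_frac)))

rng = np.random.default_rng(0)
same = population_stability_index(rng.normal(size=5000), rng.normal(size=5000))
shifted = population_stability_index(rng.normal(size=5000),
                                     rng.normal(0.5, 1, size=5000))
print(f"no drift: {same:.4f}, shifted: {shifted:.4f}")
```

A PSI-style score could back a `get_drift_score()` signal for the retraining triggers in the next section, though any calibrated drift metric would serve.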

7. Automate retraining with safeguards

Continuous training keeps models current, but retraining needs safety checks:

class RetrainingOrchestrator:
    def __init__(self, pipeline, model_registry, monitor):
        self.pipeline = pipeline
        self.registry = model_registry
        self.monitor = monitor
 
    def should_retrain(self) -> tuple[bool, str]:
        """Determina daca reantrenarea e necesara pe baza semnalelor de monitorizare."""
        # Check for data drift
        drift_score = self.monitor.get_drift_score()
        if drift_score > 0.15:
            return True, f"Data drift detected (score: {drift_score:.3f})"
 
        # Check for performance degradation
        current_accuracy = self.monitor.get_current_accuracy()
        baseline_accuracy = self.monitor.get_baseline_accuracy()
        if current_accuracy < baseline_accuracy - 0.05:
            return True, f"Accuracy dropped from {baseline_accuracy:.3f} to {current_accuracy:.3f}"
 
        # Check time since the last training run
        last_trained = self.registry.get_last_training_date()
        if (datetime.utcnow() - last_trained) > timedelta(days=30):
            return True, f"Last trained {(datetime.utcnow() - last_trained).days} days ago"
 
        return False, "No retraining trigger met"
 
    def retrain_with_validation(self):
        """Reantreneaza cu quality gates automate."""
        should, reason = self.should_retrain()
        if not should:
            return {"status": "skipped", "reason": reason}
 
        # Train a new model
        new_model = self.pipeline.run()
 
        # Validate against current production
        prod_model = self.registry.get_production_model()
        comparison = self.compare_models(new_model, prod_model)
 
        if comparison["new_model_better"]:
            self.registry.register(new_model, stage="staging")
            return {"status": "staged", "improvement": comparison["improvement"]}
        else:
            return {"status": "rejected", "reason": "New model did not improve over production"}

8. Structure the ML project to scale

ml-project/
├── data/
│   ├── raw/                  # Immutable raw data (tracked with DVC)
│   ├── processed/            # Transformed features
│   └── validation/           # Test/validation splits
├── src/
│   ├── data/                 # Data loading and preprocessing
│   ├── features/             # Feature engineering
│   ├── models/               # Model training and evaluation
│   ├── serving/              # Serving infrastructure
│   └── monitoring/           # Monitoring and alerting
├── pipelines/
│   ├── training.py           # Training pipeline DAG
│   ├── evaluation.py         # Evaluation pipeline
│   └── deployment.py         # Deployment pipeline
├── tests/
│   ├── data/                 # Data quality tests
│   ├── model/                # Model quality tests
│   └── integration/          # End-to-end tests
├── configs/
│   ├── training.yaml         # Hyperparameters
│   ├── serving.yaml          # Serving configuration
│   └── monitoring.yaml       # Alert thresholds
├── dvc.yaml                  # DVC pipeline definition
├── dvc.lock                  # DVC version lock
└── pyproject.toml            # Python dependencies

Summary: the MLOps best-practices checklist

| Practice | Priority | Impact |
|----------|----------|--------|
| Experiment tracking from day one | Critical | Reproducibility |
| Data validation at boundaries | Critical | Data quality |
| Three-way versioning (code + data + model) | Critical | Reproducibility |
| Multi-level testing (data, model, infra) | High | Reliability |
| Progressive deployment (shadow/canary) | High | Safety |
| Production monitoring + alerting | Critical | Availability |
| Automated retraining with safeguards | Medium | Freshness |
| Feature store for train-serve consistency | Medium | Accuracy |

Start with the critical items: experiment tracking, data validation, and versioning. Add the rest as your ML operation matures. The goal is not to implement everything at once, but to systematically reduce the risk of every ML deployment.


Building production ML pipelines? DeviDevs helps teams design and implement MLOps infrastructure that scales. From pipeline architecture to deployment automation, get a free assessment ->
