MLOps

Monitoring models in production: Detecting data drift and model degradation

Petru Constantin
7 min read
#model monitoring#data drift#MLOps#production ML#model degradation#Evidently


A deployed model is not a finished product. Data distributions shift, user behavior evolves, and the relationship between features and outcomes changes. Without monitoring, models degrade silently: predictions become less accurate, but nobody notices until revenue drops or users complain.

Why models degrade in production

Data Drift

Input features change their statistical properties over time. A model trained on summer data encounters winter patterns. A pricing model trained before a period of inflation sees different spending distributions.

Concept Drift

The relationship between inputs and outputs changes. What predicted churn last year may not predict churn today, because customer expectations have shifted.

Upstream data problems

A data pipeline breaks, a feature starts returning nulls, or a third-party API changes its response format. These cause sudden, catastrophic model failures.
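
A lightweight batch-level check can catch these upstream failures before they reach the model. A minimal sketch, with the function and field names being illustrative:

```python
import math

def check_batch_quality(batch: list[dict], expected_schema: dict[str, type],
                        max_null_rate: float = 0.01) -> list[str]:
    """Return a list of data-quality issues found in an incoming feature batch."""
    issues = []
    for field, expected_type in expected_schema.items():
        values = [row.get(field) for row in batch]
        nulls = sum(1 for v in values if v is None
                    or (isinstance(v, float) and math.isnan(v)))
        null_rate = nulls / len(batch)
        if null_rate > max_null_rate:
            issues.append(f"{field}: null rate {null_rate:.1%} exceeds {max_null_rate:.1%}")
        # Any non-null value of the wrong type suggests an upstream schema change
        if any(v is not None and not isinstance(v, expected_type) for v in values):
            issues.append(f"{field}: unexpected value type (upstream schema change?)")
    return issues
```

Running such a check before scoring turns a silent model failure into an explicit pipeline alert.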

Types of drift detection

Statistical drift tests

import numpy as np
from scipy import stats
from dataclasses import dataclass
 
@dataclass
class DriftResult:
    feature: str
    test_name: str
    statistic: float
    p_value: float
    is_drifted: bool
    threshold: float
 
class DriftDetector:
    """Multi-method drift detection for production features."""
 
    def __init__(self, significance_level: float = 0.05):
        self.alpha = significance_level
 
    def ks_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Kolmogorov-Smirnov test for continuous features."""
        stat, p_value = stats.ks_2samp(reference, current)
        return DriftResult(
            feature=feature_name,
            test_name="KS Test",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )
 
    def chi_squared_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Chi-squared test for categorical features."""
        # Build frequency tables aligned on the same categories
        categories = np.union1d(np.unique(reference), np.unique(current))
        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])
        # Scale reference proportions to the current sample size
        ref_proportions = ref_counts / ref_counts.sum()
        expected = ref_proportions * cur_counts.sum()
        # Clip zero expected counts, then rescale so the totals match again
        # (scipy.stats.chisquare requires sum(f_obs) == sum(f_exp))
        expected = np.maximum(expected, 1.0)
        expected *= cur_counts.sum() / expected.sum()
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        return DriftResult(
            feature=feature_name,
            test_name="Chi-Squared",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )
 
    def psi(self, reference: np.ndarray, current: np.ndarray, feature_name: str, bins: int = 10) -> DriftResult:
        """Population Stability Index, industry standard for model monitoring."""
        # Bin the reference distribution
        breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
 
        ref_percents = np.histogram(reference, bins=breakpoints)[0] / len(reference)
        cur_percents = np.histogram(current, bins=breakpoints)[0] / len(current)
 
        # Avoid log(0)
        ref_percents = np.maximum(ref_percents, 0.0001)
        cur_percents = np.maximum(cur_percents, 0.0001)
 
        psi_value = np.sum((cur_percents - ref_percents) * np.log(cur_percents / ref_percents))
 
        return DriftResult(
            feature=feature_name,
            test_name="PSI",
            statistic=psi_value,
            p_value=0.0,  # PSI doesn't use p-values
            is_drifted=psi_value > 0.2,  # PSI > 0.2 = significant drift
            threshold=0.2,
        )
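
As a quick sanity check, the same tests can be run on synthetic data. The shift below is artificial, and the decile binning mirrors the PSI implementation above:

```python
import numpy as np
from scipy import stats

# Synthetic reference window (training-time) vs a shifted production window
rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=1000)
current = rng.normal(loc=0.8, scale=1.2, size=1000)

# KS test: a small p-value means the two samples come from different distributions
stat, p_value = stats.ks_2samp(reference, current)
print(f"KS: statistic={stat:.3f}, p={p_value:.2e}, drifted={p_value < 0.05}")

# PSI with the same decile binning as the detector above
edges = np.percentile(reference, np.linspace(0, 100, 11))
edges[0], edges[-1] = -np.inf, np.inf
ref_p = np.maximum(np.histogram(reference, bins=edges)[0] / len(reference), 1e-4)
cur_p = np.maximum(np.histogram(current, bins=edges)[0] / len(current), 1e-4)
psi = np.sum((cur_p - ref_p) * np.log(cur_p / ref_p))
print(f"PSI={psi:.2f}, drifted={psi > 0.2}")
```

With a 0.8-sigma mean shift, both methods flag drift; on unshifted data they stay quiet.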

Performance monitoring

import numpy as np
from datetime import datetime
from collections import deque
 
class PerformanceMonitor:
    """Track model performance metrics over sliding windows."""
 
    def __init__(self, window_size: int = 1000, alert_threshold: float = 0.05):
        self.window = deque(maxlen=window_size)
        self.baseline_metrics: dict = {}
        self.alert_threshold = alert_threshold
 
    def set_baseline(self, metrics: dict):
        """Set baseline metrics from validation or initial deployment."""
        self.baseline_metrics = metrics
 
    def log_prediction(self, prediction: float, actual: float, timestamp: datetime | None = None):
        """Log a single prediction-actual pair."""
        self.window.append({
            "prediction": prediction,
            "actual": actual,
            "correct": abs(prediction - actual) < 0.5,  # For classification
            "timestamp": timestamp or datetime.utcnow(),
        })
 
    def get_current_metrics(self) -> dict:
        """Calculate metrics over the current window."""
        if len(self.window) < 100:
            return {"status": "insufficient_data", "count": len(self.window)}
 
        correct = sum(1 for p in self.window if p["correct"])
        accuracy = correct / len(self.window)
 
        predictions = [p["prediction"] for p in self.window]
        actuals = [p["actual"] for p in self.window]
 
        return {
            "accuracy": accuracy,
            "prediction_mean": np.mean(predictions),
            "prediction_std": np.std(predictions),
            "actual_mean": np.mean(actuals),
            "window_size": len(self.window),
        }
 
    def check_degradation(self) -> list[dict]:
        """Check if current metrics have degraded vs baseline."""
        alerts = []
        current = self.get_current_metrics()
 
        if current.get("status") == "insufficient_data":
            return alerts
 
        for metric_name in ["accuracy"]:
            if metric_name in self.baseline_metrics and metric_name in current:
                baseline_val = self.baseline_metrics[metric_name]
                current_val = current[metric_name]
                degradation = baseline_val - current_val
 
                if degradation > self.alert_threshold:
                    alerts.append({
                        "metric": metric_name,
                        "baseline": baseline_val,
                        "current": current_val,
                        "degradation": degradation,
                        "severity": "critical" if degradation > self.alert_threshold * 2 else "warning",
                    })
        return alerts
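
Before wiring up the monitor, the sliding-window logic can be sanity-checked by hand. The baseline accuracy and simulated error rate below are illustrative:

```python
from collections import deque

# Baseline accuracy from validation; alert if the window drops more than 5 points,
# escalate to critical beyond 10 points (2x the threshold, as in the monitor above)
baseline_accuracy = 0.92
alert_threshold = 0.05
window = deque(maxlen=1000)

# Simulate 500 predictions where the model is right only 4 times out of 5
for i in range(500):
    window.append({"correct": i % 5 != 0})

accuracy = sum(1 for p in window if p["correct"]) / len(window)
degradation = baseline_accuracy - accuracy
severity = ("critical" if degradation > alert_threshold * 2
            else "warning" if degradation > alert_threshold else None)
print(f"accuracy={accuracy:.2f}, degradation={degradation:.2f}, severity={severity}")
```

A 12-point drop against a 0.92 baseline lands in the critical tier, which matches the escalation rule in `check_degradation`.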

Monitoring with Evidently

Evidently is one of the most popular open-source ML monitoring libraries. Here is a production-ready setup (using the `Report` API from the 0.4.x series):

from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric,
)
 
def generate_drift_report(reference_data, current_data, feature_columns: list[str]) -> dict:
    """Generate comprehensive drift report using Evidently."""
    report = Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        *[ColumnDriftMetric(column_name=col) for col in feature_columns[:10]],  # Per-column detail for the first 10 features
    ])
 
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
 
    # Extract actionable drift summary
    dataset_drift = results["metrics"][0]["result"]
    column_drifts = results["metrics"][1]["result"]
 
    drifted_features = [
        col for col, info in column_drifts["drift_by_columns"].items()
        if info["drift_detected"]
    ]
 
    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "drift_share": dataset_drift["drift_share"],
        "drifted_features": drifted_features,
        "total_features": len(column_drifts["drift_by_columns"]),
        "drifted_count": len(drifted_features),
    }

Monitoring architecture

Production Traffic
       │
       ▼
┌──────────────┐     ┌────────────────┐     ┌──────────────┐
│ Model Server │────▶│ Prediction Log │────▶│  Monitoring  │
│ (KServe)     │     │ (Kafka/S3)     │     │  Service     │
└──────────────┘     └────────────────┘     └──────┬───────┘
                                                    │
                                          ┌─────────┼─────────┐
                                          │         │         │
                                          ▼         ▼         ▼
                                    ┌──────────┐ ┌──────┐ ┌────────┐
                                    │ Drift    │ │Perf  │ │ Data   │
                                    │ Detector │ │Check │ │Quality │
                                    └────┬─────┘ └──┬───┘ └───┬────┘
                                         │          │         │
                                         ▼          ▼         ▼
                                    ┌──────────────────────────────┐
                                    │      Alert Manager           │
                                    │  (PagerDuty / Slack / Email) │
                                    └──────────────┬───────────────┘
                                                   │
                                          ┌────────┼────────┐
                                          ▼                 ▼
                                   ┌────────────┐  ┌────────────────┐
                                    │ Dashboard  │  │ Auto-Retrain   │
                                    │ (Grafana)  │  │ (Kubeflow)     │
                                   └────────────┘  └────────────────┘
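
The fan-out step of the Monitoring Service box above can be sketched as a small dispatcher. The check names and alert payloads are illustrative, not a real service API:

```python
from typing import Callable

def run_monitoring_cycle(batch: list[dict],
                         checks: dict[str, Callable[[list[dict]], list[str]]],
                         send_alert: Callable[[str, str], None]) -> int:
    """Run every registered check over a batch of logged predictions;
    forward each finding to the alert manager and return the alert count."""
    alert_count = 0
    for check_name, check in checks.items():
        for finding in check(batch):
            send_alert(check_name, finding)
            alert_count += 1
    return alert_count
```

In the architecture above, `send_alert` would publish to PagerDuty or Slack; keeping it as a callback makes the cycle easy to test in isolation.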

Alerting strategy

Not every drift warrants action. Implement tiered alerting:

class AlertStrategy:
    """Tiered alerting based on drift severity and business impact."""
 
    THRESHOLDS = {
        "data_drift": {
            "warning": 0.15,   # 15% of features drifted
            "critical": 0.30,  # 30% of features drifted
        },
        "performance": {
            "warning": 0.03,   # 3% accuracy drop
            "critical": 0.08,  # 8% accuracy drop
        },
        "latency_p99_ms": {
            "warning": 200,
            "critical": 500,
        },
        "error_rate": {
            "warning": 0.005,  # 0.5%
            "critical": 0.02,  # 2%
        },
    }
 
    def evaluate(self, metric_name: str, value: float) -> str | None:
        if metric_name not in self.THRESHOLDS:
            return None
        thresholds = self.THRESHOLDS[metric_name]
        if value >= thresholds["critical"]:
            return "critical"
        elif value >= thresholds["warning"]:
            return "warning"
        return None

When to retrain

| Signal | Action | Urgency |
|--------|--------|---------|
| Data drift < 15% of features | Monitor, no action | Low |
| Data drift 15-30% of features | Schedule retraining | Medium |
| Data drift > 30% of features | Retrain immediately | High |
| Performance drop < 3% | Monitor the trend | Low |
| Performance drop 3-8% | Retrain within 24h | Medium |
| Performance drop > 8% | Retrain + consider rollback | Critical |
| Upstream data failure | Fix the data pipeline first | Critical |

Integrating with the MLOps stack

Model monitoring connects to every stage of the MLOps lifecycle: reference data comes from the training pipeline, drift and degradation signals feed the retraining triggers, and alerts flow into the same on-call tooling as the rest of the platform.

Production monitoring checklist

  • [ ] Log every prediction with input features, output, and timestamp
  • [ ] Store reference data from the last successful training run
  • [ ] Run drift detection on a schedule (hourly or daily depending on volume)
  • [ ] Set performance-degradation alerts with severity tiers
  • [ ] Monitor data quality (nulls, schema changes, volume anomalies)
  • [ ] Track the stability of the prediction distribution
  • [ ] Monitor serving latency and error rates
  • [ ] Real-time metrics dashboard (Grafana)
  • [ ] Automated retraining trigger with a human approval gate
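
The first checklist item, logging every prediction with its inputs and timestamp, can be as simple as appending JSON lines. A sketch with illustrative field names and version string:

```python
import io
import json
from datetime import datetime, timezone

def log_prediction(fh, features: dict, prediction: float, model_version: str) -> None:
    """Append one prediction as a JSON line: inputs, output, timestamp, version."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "model_version": model_version,
        "features": features,
        "prediction": prediction,
    }
    fh.write(json.dumps(record) + "\n")

# In production fh would be a file, a Kafka producer wrapper, or an S3 sink;
# an in-memory buffer is enough to show the record shape
buf = io.StringIO()
log_prediction(buf, {"spend_amount": 12.5}, 0.81, "2024-06-01-v3")
record = json.loads(buf.getvalue())
```

Logging the model version alongside each prediction is what makes post-deployment comparisons across retrains possible.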

Need model monitoring in production? DeviDevs implements end-to-end MLOps monitoring with drift detection, alerting, and automated retraining. Get a free assessment ->

