Monitoring models in production: detecting data drift and model degradation

A deployed model is not a finished product. Data distributions shift, user behavior evolves, and the relationship between features and outcomes changes. Without monitoring, models degrade silently: predictions become less accurate, but nobody notices until revenue drops or users complain.
Why models degrade in production

Data drift

The input features change their statistical properties over time. A model trained on summer data meets winter patterns. A pricing model trained before an inflationary period sees different spending distributions.

Concept drift

The relationship between inputs and outputs changes. What predicted churn last year may not predict churn today, because customer expectations have shifted.
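A toy sketch makes the distinction concrete: below, the model's learned rule stays frozen while the true input-output relationship flips, so accuracy collapses even though the input distribution itself has not moved at all (pure concept drift, no data drift). The function names are illustrative, not from any library.

```python
def model(x: float) -> int:
    """Frozen model: learned at training time that positive values mean churn."""
    return 1 if x > 0 else 0

def true_label_before(x: float) -> int:
    return 1 if x > 0 else 0   # relationship at training time

def true_label_after(x: float) -> int:
    return 1 if x < 0 else 0   # the relationship has flipped in production

# The inputs themselves are unchanged, so no drift test on x would fire
xs = [-2.0, -1.0, -0.5, 0.5, 1.0, 2.0]
acc_before = sum(model(x) == true_label_before(x) for x in xs) / len(xs)
acc_after = sum(model(x) == true_label_after(x) for x in xs) / len(xs)
print(acc_before, acc_after)  # 1.0 0.0
```

This is why input-distribution monitoring alone is not enough: concept drift is only visible once actual outcomes are compared against predictions.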
Upstream data issues

A data pipeline breaks, a feature starts returning nulls, or a third-party API changes its response format. These cause sudden, catastrophic model failures.
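Unlike drift, upstream breakage is usually easy to catch with cheap batch-level checks before the data ever reaches the model. A minimal sketch (the helper name and thresholds are illustrative, not from any library):

```python
import math

def check_feature_health(values: list, max_null_rate: float = 0.05) -> list[str]:
    """Flag sudden upstream data problems before they reach the model."""
    if len(values) == 0:
        return ["empty batch: upstream pipeline may be down"]
    nulls = sum(
        1 for v in values
        if v is None or (isinstance(v, float) and math.isnan(v))
    )
    issues = []
    null_rate = nulls / len(values)
    if null_rate > max_null_rate:
        issues.append(f"null rate {null_rate:.1%} exceeds {max_null_rate:.0%}")
    return issues

print(check_feature_health([1.0, None, float("nan"), 2.0, 3.0]))
# ['null rate 40.0% exceeds 5%']
```

In practice the same pattern extends to schema checks and volume anomalies; the point is to fail loudly at ingestion rather than serve garbage predictions.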
Types of drift detection

Statistical drift tests
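A quick way to see what these tests measure: run a KS test and a PSI computation directly on two synthetic samples whose means differ, using plain numpy/scipy (a standalone sanity check, not part of the detector class itself).

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5000)  # training-time feature
current = rng.normal(loc=0.8, scale=1.0, size=5000)    # production feature, mean shift

# KS test: a small p-value means the two samples come from different distributions
stat, p_value = stats.ks_2samp(reference, current)
print(f"KS statistic={stat:.3f}, p={p_value:.2e}, drifted={p_value < 0.05}")

# PSI on the same pair, binned by reference-distribution deciles
edges = np.percentile(reference, np.linspace(0, 100, 11))
edges[0], edges[-1] = -np.inf, np.inf  # open-ended tail bins
ref_p = np.maximum(np.histogram(reference, bins=edges)[0] / reference.size, 1e-4)
cur_p = np.maximum(np.histogram(current, bins=edges)[0] / current.size, 1e-4)
psi = float(np.sum((cur_p - ref_p) * np.log(cur_p / ref_p)))
print(f"PSI={psi:.3f}")  # values above 0.2 are conventionally flagged as drift
```

Both signals fire on this shift; the detector class below packages the same tests behind one interface.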
```python
import numpy as np
from scipy import stats
from dataclasses import dataclass


@dataclass
class DriftResult:
    feature: str
    test_name: str
    statistic: float
    p_value: float
    is_drifted: bool
    threshold: float


class DriftDetector:
    """Multi-method drift detection for production features."""

    def __init__(self, significance_level: float = 0.05):
        self.alpha = significance_level

    def ks_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Kolmogorov-Smirnov test for continuous features."""
        stat, p_value = stats.ks_2samp(reference, current)
        return DriftResult(
            feature=feature_name,
            test_name="KS Test",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )

    def chi_squared_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Chi-squared test for categorical features."""
        # Build frequency tables aligned on the same categories
        categories = np.union1d(np.unique(reference), np.unique(current))
        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])
        # Scale reference proportions to the current sample size
        ref_proportions = ref_counts / ref_counts.sum()
        expected = ref_proportions * cur_counts.sum()
        expected = np.maximum(expected, 1)  # Avoid zero expected counts
        # Rescale so observed and expected totals match, which scipy requires
        expected = expected * cur_counts.sum() / expected.sum()
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        return DriftResult(
            feature=feature_name,
            test_name="Chi-Squared",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )

    def psi(self, reference: np.ndarray, current: np.ndarray, feature_name: str, bins: int = 10) -> DriftResult:
        """Population Stability Index, an industry standard for model monitoring."""
        # Bin by percentiles of the reference distribution, with open-ended tails
        breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
        ref_percents = np.histogram(reference, bins=breakpoints)[0] / len(reference)
        cur_percents = np.histogram(current, bins=breakpoints)[0] / len(current)
        # Avoid log(0)
        ref_percents = np.maximum(ref_percents, 0.0001)
        cur_percents = np.maximum(cur_percents, 0.0001)
        psi_value = np.sum((cur_percents - ref_percents) * np.log(cur_percents / ref_percents))
        return DriftResult(
            feature=feature_name,
            test_name="PSI",
            statistic=psi_value,
            p_value=0.0,  # PSI doesn't use p-values
            is_drifted=psi_value > 0.2,  # PSI > 0.2 = significant drift
            threshold=0.2,
        )
```

Performance monitoring
```python
import numpy as np
from datetime import datetime, timezone
from collections import deque


class PerformanceMonitor:
    """Track model performance metrics over sliding windows."""

    def __init__(self, window_size: int = 1000, alert_threshold: float = 0.05):
        self.window = deque(maxlen=window_size)
        self.baseline_metrics: dict = {}
        self.alert_threshold = alert_threshold

    def set_baseline(self, metrics: dict):
        """Set baseline metrics from validation or initial deployment."""
        self.baseline_metrics = metrics

    def log_prediction(self, prediction: float, actual: float, timestamp: datetime | None = None):
        """Log a single prediction-actual pair."""
        self.window.append({
            "prediction": prediction,
            "actual": actual,
            "correct": abs(prediction - actual) < 0.5,  # For classification
            "timestamp": timestamp or datetime.now(timezone.utc),
        })

    def get_current_metrics(self) -> dict:
        """Calculate metrics over the current window."""
        if len(self.window) < 100:
            return {"status": "insufficient_data", "count": len(self.window)}
        correct = sum(1 for p in self.window if p["correct"])
        accuracy = correct / len(self.window)
        predictions = [p["prediction"] for p in self.window]
        actuals = [p["actual"] for p in self.window]
        return {
            "accuracy": accuracy,
            "prediction_mean": np.mean(predictions),
            "prediction_std": np.std(predictions),
            "actual_mean": np.mean(actuals),
            "window_size": len(self.window),
        }

    def check_degradation(self) -> list[dict]:
        """Check if current metrics have degraded vs baseline."""
        alerts = []
        current = self.get_current_metrics()
        if current.get("status") == "insufficient_data":
            return alerts
        for metric_name in ["accuracy"]:
            if metric_name in self.baseline_metrics and metric_name in current:
                baseline_val = self.baseline_metrics[metric_name]
                current_val = current[metric_name]
                degradation = baseline_val - current_val
                if degradation > self.alert_threshold:
                    alerts.append({
                        "metric": metric_name,
                        "baseline": baseline_val,
                        "current": current_val,
                        "degradation": degradation,
                        "severity": "critical" if degradation > self.alert_threshold * 2 else "warning",
                    })
        return alerts
```

Monitoring with Evidently
Evidently is the most popular open-source ML monitoring library. Here is a production-ready setup:
```python
from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric,
)


def generate_drift_report(reference_data, current_data, feature_columns: list[str]) -> dict:
    """Generate a comprehensive drift report using Evidently."""
    report = Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        *[ColumnDriftMetric(column_name=col) for col in feature_columns[:10]],  # Top 10 features
    ])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    # Extract an actionable drift summary (indices match the metrics list above)
    dataset_drift = results["metrics"][0]["result"]
    column_drifts = results["metrics"][1]["result"]
    drifted_features = [
        col for col, info in column_drifts["drift_by_columns"].items()
        if info["drift_detected"]
    ]
    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "drift_share": dataset_drift["drift_share"],
        "drifted_features": drifted_features,
        "total_features": len(column_drifts["drift_by_columns"]),
        "drifted_count": len(drifted_features),
    }
```

Monitoring architecture
```
Production Traffic
       │
       ▼
┌──────────────┐     ┌────────────────┐     ┌──────────────┐
│ Model Server │────▶│ Prediction Log │────▶│  Monitoring  │
│   (KServe)   │     │   (Kafka/S3)   │     │   Service    │
└──────────────┘     └────────────────┘     └──────┬───────┘
                                                   │
                                         ┌─────────┼─────────┐
                                         │         │         │
                                         ▼         ▼         ▼
                                   ┌──────────┐ ┌──────┐ ┌────────┐
                                   │  Drift   │ │Perf  │ │ Data   │
                                   │ Detector │ │Check │ │Quality │
                                   └────┬─────┘ └──┬───┘ └───┬────┘
                                        │          │         │
                                        ▼          ▼         ▼
                                   ┌──────────────────────────────┐
                                   │        Alert Manager         │
                                   │ (PagerDuty / Slack / Email)  │
                                   └──────────────┬───────────────┘
                                                  │
                                         ┌────────┼────────┐
                                         ▼                 ▼
                                   ┌────────────┐  ┌────────────────┐
                                   │ Dashboard  │  │  Auto-Retrain  │
                                   │ (Grafana)  │  │   (Kubeflow)   │
                                   └────────────┘  └────────────────┘
```
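The "Prediction Log" box is the foundation of everything downstream: drift detection and performance checks both replay logged predictions. As a simplified local stand-in for Kafka/S3, a JSON-lines log might look like this (helper name and record schema are illustrative assumptions):

```python
import json
from datetime import datetime, timezone
from pathlib import Path

def log_prediction(path: Path, features: dict, prediction: float) -> None:
    """Append one prediction record as a JSON line (stand-in for Kafka/S3)."""
    record = {
        "features": features,
        "prediction": prediction,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    with path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")

log_path = Path("predictions.jsonl")
log_path.unlink(missing_ok=True)  # start fresh for the demo
log_prediction(log_path, {"age": 34, "plan": "pro"}, 0.81)
log_prediction(log_path, {"age": 52, "plan": "free"}, 0.27)

# The monitoring service replays the log in batches
records = [json.loads(line) for line in log_path.read_text().splitlines()]
print(len(records), records[0]["prediction"])  # 2 0.81
```

Whatever the transport, the key property is the same: each record carries the raw features, the model output, and a timestamp, so any window of traffic can be reconstructed later for drift analysis or backfilling labels.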
Alerting strategy

Not every drift warrants action. Implement tiered alerting:
```python
class AlertStrategy:
    """Tiered alerting based on drift severity and business impact."""

    THRESHOLDS = {
        "data_drift": {
            "warning": 0.15,   # 15% of features drifted
            "critical": 0.30,  # 30% of features drifted
        },
        "performance": {
            "warning": 0.03,   # 3% accuracy drop
            "critical": 0.08,  # 8% accuracy drop
        },
        "latency_p99_ms": {
            "warning": 200,
            "critical": 500,
        },
        "error_rate": {
            "warning": 0.005,  # 0.5%
            "critical": 0.02,  # 2%
        },
    }

    def evaluate(self, metric_name: str, value: float) -> str | None:
        if metric_name not in self.THRESHOLDS:
            return None
        thresholds = self.THRESHOLDS[metric_name]
        if value >= thresholds["critical"]:
            return "critical"
        elif value >= thresholds["warning"]:
            return "warning"
        return None
```

When to retrain
| Signal | Action | Urgency |
|--------|--------|---------|
| Data drift < 15% of features | Monitor, no action | Low |
| Data drift 15-30% of features | Schedule retraining | Medium |
| Data drift > 30% of features | Retrain immediately | High |
| Performance drop < 3% | Monitor the trend | Low |
| Performance drop 3-8% | Retrain within 24h | Medium |
| Performance drop > 8% | Retrain + consider rollback | Critical |
| Upstream data failure | Fix the data pipeline first | Critical |
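The table above translates directly into a small decision function; a sketch using the same thresholds (the function name is illustrative):

```python
def retrain_decision(drift_share: float, accuracy_drop: float) -> str:
    """Map monitoring signals to a retraining action, using the table's thresholds."""
    if accuracy_drop > 0.08:
        return "retrain now and consider rollback"
    if drift_share > 0.30:
        return "retrain immediately"
    if accuracy_drop > 0.03 or drift_share > 0.15:
        return "schedule retraining"
    return "keep monitoring"

print(retrain_decision(0.10, 0.01))  # keep monitoring
print(retrain_decision(0.35, 0.00))  # retrain immediately
print(retrain_decision(0.00, 0.10))  # retrain now and consider rollback
```

Note the ordering: performance degradation outranks drift, because drift without a measurable accuracy impact is a leading indicator, not an incident.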
Integration with the MLOps stack

Model monitoring connects to every part of the MLOps lifecycle:

- Experiment tracking: compare the model's current metrics against the training baselines logged in MLflow
- Feature stores: monitor feature freshness and consistency between online/offline stores
- ML pipelines: trigger automatic retraining when monitoring detects degradation
- MLOps best practices: monitoring is the foundation of reliable ML in production
- AI security monitoring: combine ML monitoring with security observability
Production monitoring checklist

- [ ] Log every prediction with input features, output, and timestamp
- [ ] Store reference data from the last successful training run
- [ ] Run drift detection on a schedule (hourly or daily, depending on volume)
- [ ] Set alerts for performance degradation, with severity tiers
- [ ] Monitor data quality (nulls, schema changes, volume anomalies)
- [ ] Track prediction distribution stability
- [ ] Monitor serving latency and error rates
- [ ] Dashboard with real-time metrics (Grafana)
- [ ] Automatic retraining trigger with a human approval gate
Need production model monitoring? DeviDevs implements end-to-end MLOps monitoring with drift detection, alerting, and automatic retraining. Get a free assessment ->