Model Monitoring in Production: Detecting Data Drift and Model Degradation
A deployed model is not a finished product. Data distributions shift, user behavior changes, and the relationship between features and outcomes evolves. Without monitoring, models degrade silently — predictions become less accurate, but nobody notices until revenue drops or users complain.
Why Models Degrade in Production
Data Drift
Input features change their statistical properties over time. A model trained on summer data encounters winter patterns. A pricing model trained pre-inflation sees different spending distributions.
Concept Drift
The relationship between inputs and outputs changes. What predicted churn last year may not predict churn today because customer expectations shifted.
Upstream Data Issues
A data pipeline breaks, a feature starts returning nulls, or a third-party API changes its response format. Unlike gradual drift, these issues tend to cause sudden, dramatic failures.
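A lightweight validation step in the serving path catches many of these failures before they reach the model. Below is a minimal sketch; the feature names and null-rate thresholds are illustrative, not taken from any particular system:

```python
import math

# Hypothetical contract for the incoming feature payload: name -> maximum tolerated null rate
EXPECTED_FEATURES = {"age": 0.01, "plan_type": 0.0, "avg_session_minutes": 0.05}


def validate_batch(rows: list[dict]) -> list[str]:
    """Return human-readable data-quality violations for a batch of feature rows."""
    issues = []
    for feature, max_null_rate in EXPECTED_FEATURES.items():
        values = [row.get(feature) for row in rows]
        missing = sum(1 for v in values if v is None or (isinstance(v, float) and math.isnan(v)))
        null_rate = missing / max(len(rows), 1)
        if null_rate > max_null_rate:
            issues.append(f"{feature}: null rate {null_rate:.1%} exceeds {max_null_rate:.1%}")
    # Flag columns the model has never seen, often a sign of an upstream schema change
    extra = set().union(*(row.keys() for row in rows)) - EXPECTED_FEATURES.keys()
    if extra:
        issues.append(f"unexpected columns: {sorted(extra)}")
    return issues
```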
Types of Drift Detection
Statistical Drift Tests
The detector below compares a reference sample (typically drawn from the training data) against a window of recent production values, using three standard tests: KS for continuous features, chi-squared for categorical features, and PSI as a binned stability score.

```python
import numpy as np
from scipy import stats
from dataclasses import dataclass


@dataclass
class DriftResult:
    feature: str
    test_name: str
    statistic: float
    p_value: float
    is_drifted: bool
    threshold: float


class DriftDetector:
    """Multi-method drift detection for production features."""

    def __init__(self, significance_level: float = 0.05):
        self.alpha = significance_level

    def ks_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Kolmogorov-Smirnov test for continuous features."""
        stat, p_value = stats.ks_2samp(reference, current)
        return DriftResult(
            feature=feature_name,
            test_name="KS Test",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )

    def chi_squared_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Chi-squared test for categorical features."""
        # Build frequency tables aligned on the same categories
        categories = np.union1d(np.unique(reference), np.unique(current))
        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])
        # Expected counts under the reference distribution, scaled to the current sample size
        ref_proportions = ref_counts / ref_counts.sum()
        expected = ref_proportions * cur_counts.sum()
        # Avoid zero expected counts for categories unseen in the reference,
        # then rescale so expected and observed totals match (scipy requires this)
        expected = np.maximum(expected, 1e-6)
        expected = expected * cur_counts.sum() / expected.sum()
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        return DriftResult(
            feature=feature_name,
            test_name="Chi-Squared",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )

    def psi(self, reference: np.ndarray, current: np.ndarray, feature_name: str, bins: int = 10) -> DriftResult:
        """Population Stability Index (PSI), a widely used score for model monitoring."""
        # Bin edges from the reference distribution's percentiles
        breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
        ref_percents = np.histogram(reference, bins=breakpoints)[0] / len(reference)
        cur_percents = np.histogram(current, bins=breakpoints)[0] / len(current)
        # Avoid log(0)
        ref_percents = np.maximum(ref_percents, 0.0001)
        cur_percents = np.maximum(cur_percents, 0.0001)
        psi_value = np.sum((cur_percents - ref_percents) * np.log(cur_percents / ref_percents))
        return DriftResult(
            feature=feature_name,
            test_name="PSI",
            statistic=psi_value,
            p_value=float("nan"),  # PSI is a score, not a hypothesis test
            is_drifted=psi_value > 0.2,  # PSI > 0.2 is commonly treated as significant drift
            threshold=0.2,
        )
```
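A quick usage sketch against synthetic data; the feature names and the shifted distributions stand in for a training-time reference sample and a window of recent production values:

```python
rng = np.random.default_rng(42)
detector = DriftDetector(significance_level=0.05)

# Continuous feature: reference = training distribution, current = production window with a shifted mean
reference = rng.normal(loc=0.0, scale=1.0, size=5_000)
current = rng.normal(loc=0.4, scale=1.0, size=2_000)
for result in (detector.ks_test(reference, current, "session_length"),
               detector.psi(reference, current, "session_length")):
    print(f"{result.test_name}: statistic={result.statistic:.3f}, drifted={result.is_drifted}")

# Categorical feature: plan mix has shifted toward higher tiers
ref_plans = rng.choice(["basic", "pro", "enterprise"], size=5_000, p=[0.6, 0.3, 0.1])
cur_plans = rng.choice(["basic", "pro", "enterprise"], size=2_000, p=[0.4, 0.4, 0.2])
print(detector.chi_squared_test(ref_plans, cur_plans, "plan_type"))
```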
Performance Monitoring

Drift tests only look at inputs; performance monitoring tracks whether predictions remain accurate once ground-truth labels become available.

```python
import numpy as np
from datetime import datetime
from collections import deque


class PerformanceMonitor:
    """Track model performance metrics over sliding windows."""

    def __init__(self, window_size: int = 1000, alert_threshold: float = 0.05):
        self.window = deque(maxlen=window_size)
        self.baseline_metrics: dict = {}
        self.alert_threshold = alert_threshold

    def set_baseline(self, metrics: dict):
        """Set baseline metrics from validation or initial deployment."""
        self.baseline_metrics = metrics

    def log_prediction(self, prediction: float, actual: float, timestamp: datetime | None = None):
        """Log a single prediction-actual pair."""
        self.window.append({
            "prediction": prediction,
            "actual": actual,
            "correct": abs(prediction - actual) < 0.5,  # Crude correctness check for binary classification
            "timestamp": timestamp or datetime.utcnow(),
        })

    def get_current_metrics(self) -> dict:
        """Calculate metrics over the current window."""
        if len(self.window) < 100:
            return {"status": "insufficient_data", "count": len(self.window)}
        correct = sum(1 for p in self.window if p["correct"])
        accuracy = correct / len(self.window)
        predictions = [p["prediction"] for p in self.window]
        actuals = [p["actual"] for p in self.window]
        return {
            "accuracy": accuracy,
            "prediction_mean": np.mean(predictions),
            "prediction_std": np.std(predictions),
            "actual_mean": np.mean(actuals),
            "window_size": len(self.window),
        }

    def check_degradation(self) -> list[dict]:
        """Check if current metrics have degraded vs baseline."""
        alerts = []
        current = self.get_current_metrics()
        if current.get("status") == "insufficient_data":
            return alerts
        for metric_name in ["accuracy"]:
            if metric_name in self.baseline_metrics and metric_name in current:
                baseline_val = self.baseline_metrics[metric_name]
                current_val = current[metric_name]
                degradation = baseline_val - current_val
                if degradation > self.alert_threshold:
                    alerts.append({
                        "metric": metric_name,
                        "baseline": baseline_val,
                        "current": current_val,
                        "degradation": degradation,
                        "severity": "critical" if degradation > self.alert_threshold * 2 else "warning",
                    })
        return alerts
```
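A sketch of how the monitor might be wired into a serving loop, assuming ground-truth labels arrive with some delay; the baseline accuracy and the logged pairs are made up:

```python
monitor = PerformanceMonitor(window_size=1000, alert_threshold=0.05)
monitor.set_baseline({"accuracy": 0.91})  # e.g. accuracy measured on the held-out validation set

# In production this runs whenever a delayed label is joined back to its logged prediction
for prediction, actual in [(0.92, 1.0), (0.15, 0.0), (0.70, 0.0)]:
    monitor.log_prediction(prediction=prediction, actual=actual)

print(monitor.get_current_metrics())  # reports "insufficient_data" until 100 pairs are logged
for alert in monitor.check_degradation():
    print(f"[{alert['severity']}] {alert['metric']} dropped by {alert['degradation']:.3f}")
```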
Monitoring with Evidently

Evidently is one of the most popular open-source ML monitoring libraries. Here's a production-ready setup:
```python
from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric,
)


def generate_drift_report(reference_data, current_data, feature_columns: list[str]) -> dict:
    """Generate a comprehensive drift report using Evidently."""
    report = Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        *[ColumnDriftMetric(column_name=col) for col in feature_columns[:10]],  # Limit per-column metrics to 10 features
    ])
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
    # Extract an actionable drift summary
    dataset_drift = results["metrics"][0]["result"]  # DatasetDriftMetric
    column_drifts = results["metrics"][1]["result"]  # DataDriftTable
    drifted_features = [
        col for col, info in column_drifts["drift_by_columns"].items()
        if info["drift_detected"]
    ]
    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "drift_share": dataset_drift["drift_share"],
        "drifted_features": drifted_features,
        "total_features": len(column_drifts["drift_by_columns"]),
        "drifted_count": len(drifted_features),
    }
```
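The function is called on two pandas DataFrames with the same schema, typically from a scheduled batch job; the file paths and column names below are placeholders:

```python
import pandas as pd

reference_df = pd.read_parquet("reference_features.parquet")   # snapshot saved at training time
current_df = pd.read_parquet("predictions_last_24h.parquet")   # recent serving traffic

summary = generate_drift_report(
    reference_data=reference_df,
    current_data=current_df,
    feature_columns=["tenure_months", "monthly_charges", "support_tickets"],
)
if summary["dataset_drift_detected"]:
    print(f"{summary['drifted_count']}/{summary['total_features']} features drifted: "
          f"{summary['drifted_features']}")
```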
Monitoring Architecture

```
  Production Traffic
          │
          ▼
   ┌──────────────┐     ┌────────────────┐     ┌──────────────┐
   │ Model Server │────▶│ Prediction Log │────▶│  Monitoring  │
   │   (KServe)   │     │   (Kafka/S3)   │     │   Service    │
   └──────────────┘     └────────────────┘     └──────┬───────┘
                                                      │
                                        ┌─────────────┼─────────────┐
                                        │             │             │
                                        ▼             ▼             ▼
                                  ┌──────────┐    ┌──────┐     ┌────────┐
                                  │  Drift   │    │ Perf │     │  Data  │
                                  │ Detector │    │Check │     │Quality │
                                  └────┬─────┘    └──┬───┘     └───┬────┘
                                       │             │             │
                                       ▼             ▼             ▼
                                     ┌──────────────────────────────┐
                                     │        Alert Manager         │
                                     │ (PagerDuty / Slack / Email)  │
                                     └──────────────┬───────────────┘
                                                    │
                                            ┌───────┴───────┐
                                            ▼               ▼
                                    ┌────────────┐  ┌────────────────┐
                                    │ Dashboard  │  │  Auto-Retrain  │
                                    │ (Grafana)  │  │   (Kubeflow)   │
                                    └────────────┘  └────────────────┘
```
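One way to realize the Monitoring Service box is a scheduled job that pulls the latest window of logged predictions, runs the drift and performance checks defined earlier, and hands any findings to the alert manager. A rough sketch, with the inputs passed in as plain dictionaries of arrays; wiring to Kafka/S3 and the alert webhook is left out:

```python
def run_monitoring_cycle(
    detector: DriftDetector,
    monitor: PerformanceMonitor,
    reference: dict,  # feature name -> reference (training-time) sample
    current: dict,    # feature name -> values from the latest prediction-log window
) -> list[str]:
    """One monitoring pass: per-feature drift checks, then performance degradation checks."""
    findings = []
    for feature, ref_values in reference.items():
        result = detector.psi(ref_values, current[feature], feature)
        if result.is_drifted:
            findings.append(f"PSI drift on {feature}: {result.statistic:.2f}")
    for alert in monitor.check_degradation():
        findings.append(f"{alert['severity']}: {alert['metric']} down {alert['degradation']:.2%}")
    return findings  # forwarded to the alert manager (PagerDuty / Slack / email)
```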
Alert Strategy
Not all drift warrants action. Implement tiered alerting:
```python
class AlertStrategy:
    """Tiered alerting based on drift severity and business impact."""

    THRESHOLDS = {
        "data_drift": {
            "warning": 0.15,   # 15% of features drifted
            "critical": 0.30,  # 30% of features drifted
        },
        "performance": {
            "warning": 0.03,   # 3% accuracy drop
            "critical": 0.08,  # 8% accuracy drop
        },
        "latency_p99_ms": {
            "warning": 200,
            "critical": 500,
        },
        "error_rate": {
            "warning": 0.005,  # 0.5%
            "critical": 0.02,  # 2%
        },
    }

    def evaluate(self, metric_name: str, value: float) -> str | None:
        """Return 'critical', 'warning', or None for a single metric reading."""
        if metric_name not in self.THRESHOLDS:
            return None
        thresholds = self.THRESHOLDS[metric_name]
        if value >= thresholds["critical"]:
            return "critical"
        elif value >= thresholds["warning"]:
            return "warning"
        return None
```
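Evaluating a batch of freshly computed metrics against the tiers; the metric values here are made up:

```python
strategy = AlertStrategy()
observed = {
    "data_drift": 0.22,       # share of drifted features from the drift report
    "performance": 0.04,      # absolute accuracy drop versus the baseline
    "latency_p99_ms": 180,
    "error_rate": 0.001,
}
for name, value in observed.items():
    severity = strategy.evaluate(name, value)
    if severity:
        print(f"{severity.upper()}: {name} = {value}")
# -> WARNING: data_drift = 0.22
#    WARNING: performance = 0.04
```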
When to Retrain

| Signal | Action | Urgency |
|--------|--------|---------|
| Data drift < 15% features | Monitor, no action | Low |
| Data drift 15-30% features | Schedule retraining | Medium |
| Data drift > 30% features | Retrain immediately | High |
| Performance drop < 3% | Monitor trend | Low |
| Performance drop 3-8% | Retrain within 24h | Medium |
| Performance drop > 8% | Retrain + consider rollback | Critical |
| Upstream data failure | Fix data pipeline first | Critical |
Integration with MLOps Stack
Model monitoring connects to every part of the MLOps lifecycle:
- Experiment tracking — Compare current model metrics against training baselines logged in MLflow
- Feature stores — Monitor feature freshness and consistency between online/offline stores
- ML pipelines — Trigger automated retraining when monitoring detects degradation (see the retraining trigger sketch after this list)
- MLOps best practices — Monitoring is the foundation of production ML reliability
- AI security monitoring — Combine ML monitoring with security observability
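A minimal trigger that connects a critical monitoring verdict to a retraining pipeline while keeping a human approval gate. The callables and the pipeline name are placeholders for whatever orchestrator and approval tooling is in use:

```python
from typing import Callable


def maybe_trigger_retraining(
    severity: str | None,
    approved_by: str | None,
    start_pipeline: Callable[[str], None],    # thin wrapper around the orchestrator's "run pipeline" call
    request_approval: Callable[[str], None],  # e.g. posts to an approvals channel
) -> bool:
    """Retrain only on critical degradation, and only after a human signs off."""
    if severity != "critical":
        return False  # warnings are tracked on the dashboard, not acted on automatically
    if approved_by is None:
        request_approval("Critical model degradation detected; review the drift report before retraining.")
        return False
    start_pipeline("churn-model-retrain")  # pipeline name is a placeholder
    return True
```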
Monitoring Checklist for Production
- [ ] Log every prediction with input features, output, and timestamp
- [ ] Store reference data from the last successful training run
- [ ] Run drift detection on a schedule (hourly or daily depending on volume)
- [ ] Set up alerts for performance degradation with severity tiers
- [ ] Monitor data quality (nulls, schema changes, volume anomalies)
- [ ] Track prediction distribution stability
- [ ] Monitor serving latency and error rates
- [ ] Dashboard with real-time metrics (Grafana)
- [ ] Automated retraining trigger with human approval gate
Need production model monitoring? DeviDevs implements end-to-end MLOps monitoring with drift detection, alerting, and automated retraining. Get a free assessment →