Model Monitoring in Production: Detecting Data Drift and Model Degradation

DeviDevs Team
7 min read
#model monitoring#data drift#MLOps#production ML#model degradation#Evidently

A deployed model is not a finished product. Data distributions shift, user behavior changes, and the relationship between features and outcomes evolves. Without monitoring, models degrade silently — predictions become less accurate, but nobody notices until revenue drops or users complain.

Why Models Degrade in Production

Data Drift

Input features change their statistical properties over time. A model trained on summer data encounters winter patterns. A pricing model trained pre-inflation sees different spending distributions.

Concept Drift

The relationship between inputs and outputs changes. What predicted churn last year may not predict churn today because customer expectations shifted.

Upstream Data Issues

A data pipeline breaks, a feature starts returning nulls, or a third-party API changes its response format. Unlike gradual drift, these cause sudden, catastrophic model failures.
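
Statistical drift tests are too slow to catch a broken pipeline; lightweight data-quality checks on every incoming batch are the first line of defense. A minimal sketch, assuming the column list and thresholds are tuned to your own schema (the names here are illustrative):

import pandas as pd

def check_data_quality(batch: pd.DataFrame, expected_columns: list[str],
                       max_null_rate: float = 0.05) -> list[str]:
    """Return a list of data-quality issues for an incoming feature batch."""
    issues = []

    # Schema check: a renamed or dropped column should fail loudly
    missing = set(expected_columns) - set(batch.columns)
    if missing:
        issues.append(f"missing columns: {sorted(missing)}")

    # Null-rate check: a feature that suddenly returns nulls is an upstream failure
    for col in set(expected_columns) & set(batch.columns):
        null_rate = batch[col].isna().mean()
        if null_rate > max_null_rate:
            issues.append(f"{col}: null rate {null_rate:.1%} exceeds {max_null_rate:.0%}")

    # Volume check: an empty batch usually means the pipeline itself broke
    if len(batch) == 0:
        issues.append("batch is empty")

    return issues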

Types of Drift Detection

Statistical Drift Tests

import numpy as np
from scipy import stats
from dataclasses import dataclass
 
@dataclass
class DriftResult:
    feature: str
    test_name: str
    statistic: float
    p_value: float
    is_drifted: bool
    threshold: float
 
class DriftDetector:
    """Multi-method drift detection for production features."""
 
    def __init__(self, significance_level: float = 0.05):
        self.alpha = significance_level
 
    def ks_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Kolmogorov-Smirnov test for continuous features."""
        stat, p_value = stats.ks_2samp(reference, current)
        return DriftResult(
            feature=feature_name,
            test_name="KS Test",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )
 
    def chi_squared_test(self, reference: np.ndarray, current: np.ndarray, feature_name: str) -> DriftResult:
        """Chi-squared test for categorical features."""
        # Build frequency tables aligned on the same categories
        categories = np.union1d(np.unique(reference), np.unique(current))
        ref_counts = np.array([np.sum(reference == c) for c in categories])
        cur_counts = np.array([np.sum(current == c) for c in categories])
        # Smooth the reference counts so every category has a nonzero expected
        # frequency, then scale to the current sample size so that observed and
        # expected totals match (scipy.stats.chisquare requires this)
        ref_proportions = (ref_counts + 1e-6) / (ref_counts + 1e-6).sum()
        expected = ref_proportions * cur_counts.sum()
        stat, p_value = stats.chisquare(cur_counts, f_exp=expected)
        return DriftResult(
            feature=feature_name,
            test_name="Chi-Squared",
            statistic=stat,
            p_value=p_value,
            is_drifted=p_value < self.alpha,
            threshold=self.alpha,
        )
 
    def psi(self, reference: np.ndarray, current: np.ndarray, feature_name: str, bins: int = 10) -> DriftResult:
        """Population Stability Index — industry standard for model monitoring."""
        # Bin the reference distribution
        breakpoints = np.percentile(reference, np.linspace(0, 100, bins + 1))
        breakpoints[0] = -np.inf
        breakpoints[-1] = np.inf
 
        ref_percents = np.histogram(reference, bins=breakpoints)[0] / len(reference)
        cur_percents = np.histogram(current, bins=breakpoints)[0] / len(current)
 
        # Avoid log(0)
        ref_percents = np.maximum(ref_percents, 0.0001)
        cur_percents = np.maximum(cur_percents, 0.0001)
 
        psi_value = np.sum((cur_percents - ref_percents) * np.log(cur_percents / ref_percents))
 
        return DriftResult(
            feature=feature_name,
            test_name="PSI",
            statistic=psi_value,
            p_value=0.0,  # PSI doesn't use p-values
            is_drifted=psi_value > 0.2,  # PSI > 0.2 = significant drift
            threshold=0.2,
        )
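
A usage sketch: compare a reference window (typically the training set or the last known-good period) against recent production data, feature by feature. The synthetic arrays and feature name below are purely illustrative:

rng = np.random.default_rng(42)
reference = rng.normal(loc=0.0, scale=1.0, size=5000)   # training-time distribution
current = rng.normal(loc=0.3, scale=1.2, size=1000)     # shifted production window

detector = DriftDetector(significance_level=0.05)
for result in (
    detector.ks_test(reference, current, "checkout_amount"),
    detector.psi(reference, current, "checkout_amount"),
):
    print(f"{result.test_name}: statistic={result.statistic:.3f}, drifted={result.is_drifted}")

A common rule of thumb for PSI: below 0.1 is stable, 0.1 to 0.2 is worth watching, and above 0.2 (the threshold used above) is treated as significant drift.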

Performance Monitoring

import numpy as np
from datetime import datetime, timezone
from collections import deque
 
class PerformanceMonitor:
    """Track model performance metrics over sliding windows."""
 
    def __init__(self, window_size: int = 1000, alert_threshold: float = 0.05):
        self.window = deque(maxlen=window_size)
        self.baseline_metrics: dict = {}
        self.alert_threshold = alert_threshold
 
    def set_baseline(self, metrics: dict):
        """Set baseline metrics from validation or initial deployment."""
        self.baseline_metrics = metrics
 
    def log_prediction(self, prediction: float, actual: float, timestamp: datetime | None = None):
        """Log a single prediction-actual pair."""
        self.window.append({
            "prediction": prediction,
            "actual": actual,
            "correct": abs(prediction - actual) < 0.5,  # For classification
            "timestamp": timestamp or datetime.utcnow(),
        })
 
    def get_current_metrics(self) -> dict:
        """Calculate metrics over the current window."""
        if len(self.window) < 100:
            return {"status": "insufficient_data", "count": len(self.window)}
 
        correct = sum(1 for p in self.window if p["correct"])
        accuracy = correct / len(self.window)
 
        predictions = [p["prediction"] for p in self.window]
        actuals = [p["actual"] for p in self.window]
 
        return {
            "accuracy": accuracy,
            "prediction_mean": np.mean(predictions),
            "prediction_std": np.std(predictions),
            "actual_mean": np.mean(actuals),
            "window_size": len(self.window),
        }
 
    def check_degradation(self) -> list[dict]:
        """Check if current metrics have degraded vs baseline."""
        alerts = []
        current = self.get_current_metrics()
 
        if current.get("status") == "insufficient_data":
            return alerts
 
        for metric_name in ["accuracy"]:
            if metric_name in self.baseline_metrics and metric_name in current:
                baseline_val = self.baseline_metrics[metric_name]
                current_val = current[metric_name]
                degradation = baseline_val - current_val
 
                if degradation > self.alert_threshold:
                    alerts.append({
                        "metric": metric_name,
                        "baseline": baseline_val,
                        "current": current_val,
                        "degradation": degradation,
                        "severity": "critical" if degradation > self.alert_threshold * 2 else "warning",
                    })
        return alerts
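
A minimal usage sketch, assuming ground-truth labels are joined back to predictions once they arrive (the numbers are invented). Accuracy-style metrics lag by however long labels take to materialize, which is exactly why input drift detection is the earlier warning signal:

monitor = PerformanceMonitor(window_size=1000, alert_threshold=0.05)
monitor.set_baseline({"accuracy": 0.91})  # from offline validation

# In a label-join job, log each prediction once its outcome is known
for prediction, actual in [(0.87, 1.0), (0.12, 0.0), (0.65, 0.0)]:
    monitor.log_prediction(prediction, actual)

# Metrics are only reported once the window holds at least 100 resolved predictions
for alert in monitor.check_degradation():
    print(f"[{alert['severity']}] {alert['metric']} dropped "
          f"from {alert['baseline']:.2f} to {alert['current']:.2f}")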

Monitoring with Evidently

Evidently is one of the most widely used open-source libraries for ML monitoring. Here's a typical drift-report setup:

from evidently.report import Report
from evidently.metrics import (
    DatasetDriftMetric,
    DataDriftTable,
    ColumnDriftMetric,
)
 
def generate_drift_report(reference_data, current_data, feature_columns: list[str]) -> dict:
    """Generate comprehensive drift report using Evidently."""
    report = Report(metrics=[
        DatasetDriftMetric(),
        DataDriftTable(),
        *[ColumnDriftMetric(column_name=col) for col in feature_columns[:10]],  # Limit per-column detail to the first 10 features
    ])
 
    report.run(reference_data=reference_data, current_data=current_data)
    results = report.as_dict()
 
    # Extract actionable drift summary
    dataset_drift = results["metrics"][0]["result"]
    column_drifts = results["metrics"][1]["result"]
 
    drifted_features = [
        col for col, info in column_drifts["drift_by_columns"].items()
        if info["drift_detected"]
    ]
 
    return {
        "dataset_drift_detected": dataset_drift["dataset_drift"],
        "drift_share": dataset_drift["drift_share"],
        "drifted_features": drifted_features,
        "total_features": len(column_drifts["drift_by_columns"]),
        "drifted_count": len(drifted_features),
    }
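
Calling it is just a matter of passing two pandas DataFrames; the file paths and column names below are placeholders for whatever your feature store or warehouse provides:

import pandas as pd

# Placeholder data sources: swap in your feature store or warehouse queries
reference_df = pd.read_parquet("reference_window.parquet")
current_df = pd.read_parquet("current_window.parquet")

summary = generate_drift_report(
    reference_data=reference_df,
    current_data=current_df,
    feature_columns=["tenure_months", "monthly_charges", "support_tickets"],  # illustrative names
)
if summary["dataset_drift_detected"]:
    print(f"{summary['drifted_count']}/{summary['total_features']} features drifted: "
          f"{summary['drifted_features']}")

In Evidently versions that expose this Report API, the same report can also be written out as a standalone HTML file (report.save_html), which is handy for ad-hoc reviews.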

Monitoring Architecture

Production Traffic
        │
        ▼
┌───────────────┐     ┌────────────────┐     ┌───────────────┐
│ Model Server  │────▶│ Prediction Log │────▶│  Monitoring   │
│ (KServe)      │     │ (Kafka/S3)     │     │  Service      │
└───────────────┘     └────────────────┘     └───────┬───────┘
                                                     │
                                          ┌──────────┼─────────┐
                                          │          │         │
                                          ▼          ▼         ▼
                                     ┌──────────┐ ┌──────┐ ┌────────┐
                                     │ Drift    │ │ Perf │ │ Data   │
                                     │ Detector │ │ Check│ │ Quality│
                                     └────┬─────┘ └──┬───┘ └───┬────┘
                                          │          │         │
                                          ▼          ▼         ▼
                                     ┌──────────────────────────────┐
                                     │        Alert Manager         │
                                     │ (PagerDuty / Slack / Email)  │
                                     └───────────────┬──────────────┘
                                                     │
                                             ┌───────┴────────┐
                                             ▼                ▼
                                       ┌───────────┐   ┌──────────────┐
                                       │ Dashboard │   │ Auto-Retrain │
                                       │ (Grafana) │   │ (Kubeflow)   │
                                       └───────────┘   └──────────────┘
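
The glue in this architecture is the prediction log. A sketch of one logged record, serialized to the Kafka topic or object store (the field names are illustrative, not a standard schema):

import json
import uuid
from datetime import datetime, timezone

def build_prediction_record(model_name: str, model_version: str,
                            features: dict, prediction: float) -> str:
    """Serialize one prediction event for the monitoring pipeline."""
    record = {
        "prediction_id": str(uuid.uuid4()),     # join key for late-arriving labels
        "model": model_name,
        "model_version": model_version,
        "features": features,                   # raw inputs, reused later for drift detection
        "prediction": prediction,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(record)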

Alert Strategy

Not all drift warrants action. Implement tiered alerting:

class AlertStrategy:
    """Tiered alerting based on drift severity and business impact."""
 
    THRESHOLDS = {
        "data_drift": {
            "warning": 0.15,   # 15% of features drifted
            "critical": 0.30,  # 30% of features drifted
        },
        "performance": {
            "warning": 0.03,   # 3% accuracy drop
            "critical": 0.08,  # 8% accuracy drop
        },
        "latency_p99_ms": {
            "warning": 200,
            "critical": 500,
        },
        "error_rate": {
            "warning": 0.005,  # 0.5%
            "critical": 0.02,  # 2%
        },
    }
 
    def evaluate(self, metric_name: str, value: float) -> str | None:
        if metric_name not in self.THRESHOLDS:
            return None
        thresholds = self.THRESHOLDS[metric_name]
        if value >= thresholds["critical"]:
            return "critical"
        elif value >= thresholds["warning"]:
            return "warning"
        return None
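
Evaluating a batch of current metrics is then a simple loop. The values below are invented for illustration; routing (PagerDuty for critical, Slack for warnings) is left to your alerting stack:

strategy = AlertStrategy()
current_metrics = {"data_drift": 0.22, "performance": 0.04, "error_rate": 0.001}

for name, value in current_metrics.items():
    severity = strategy.evaluate(name, value)
    if severity:
        print(f"{severity.upper()}: {name} = {value}")  # e.g. route criticals to PagerDuty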

When to Retrain

| Signal | Action | Urgency |
|--------|--------|---------|
| Data drift < 15% features | Monitor, no action | Low |
| Data drift 15-30% features | Schedule retraining | Medium |
| Data drift > 30% features | Retrain immediately | High |
| Performance drop < 3% | Monitor trend | Low |
| Performance drop 3-8% | Retrain within 24h | Medium |
| Performance drop > 8% | Retrain + consider rollback | Critical |
| Upstream data failure | Fix data pipeline first | Critical |
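
The table translates directly into a small decision helper. This is a sketch of the same policy in code, not a library API; the most severe signal wins:

def retraining_decision(drift_share: float, performance_drop: float,
                        upstream_failure: bool = False) -> tuple[str, str]:
    """Map monitoring signals to (action, urgency) per the policy above.

    drift_share: fraction of features flagged as drifted (0.30 = 30%)
    performance_drop: baseline accuracy minus current accuracy (0.08 = 8 points)
    """
    if upstream_failure:
        return "fix data pipeline first", "critical"
    if performance_drop > 0.08:
        return "retrain and consider rollback", "critical"
    if drift_share > 0.30:
        return "retrain immediately", "high"
    if performance_drop > 0.03:
        return "retrain within 24h", "medium"
    if drift_share > 0.15:
        return "schedule retraining", "medium"
    return "monitor, no action", "low"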

Integration with MLOps Stack

Model monitoring connects to every part of the MLOps lifecycle: prediction logs become training data for the next model version, drift and performance alerts feed the retraining pipeline, and dashboards put model health alongside the rest of your observability stack.
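
As a sketch of that loop, here is how drift and performance signals might feed a retraining trigger behind a human approval gate. The request_approval and trigger_retraining hooks are hypothetical placeholders for whatever approval workflow and orchestrator (e.g. Kubeflow) you use:

from typing import Callable

def handle_monitoring_results(
    drift_summary: dict,
    perf_alerts: list[dict],
    request_approval: Callable[[str], bool],   # hypothetical approval-gate hook
    trigger_retraining: Callable[[str], None], # hypothetical orchestrator hook
) -> None:
    """Close the loop from monitoring back into the training pipeline."""
    needs_retraining = (
        drift_summary.get("drift_share", 0.0) > 0.30
        or any(a["severity"] == "critical" for a in perf_alerts)
    )
    # Human approval gate before any automated retraining (see the checklist below)
    if needs_retraining and request_approval("critical drift or performance alert"):
        trigger_retraining("monitoring_trigger")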

Monitoring Checklist for Production

  • [ ] Log every prediction with input features, output, and timestamp
  • [ ] Store reference data from the last successful training run
  • [ ] Run drift detection on a schedule (hourly or daily depending on volume)
  • [ ] Set up alerts for performance degradation with severity tiers
  • [ ] Monitor data quality (nulls, schema changes, volume anomalies)
  • [ ] Track prediction distribution stability
  • [ ] Monitor serving latency and error rates
  • [ ] Dashboard with real-time metrics (Grafana)
  • [ ] Automated retraining trigger with human approval gate

Need production model monitoring? DeviDevs implements end-to-end MLOps monitoring with drift detection, alerting, and automated retraining. Get a free assessment →
