MLOps

Model Serving Architecture: From Batch to Real-Time Inference at Scale

Petru Constantin
6 min read
#model serving · #ML inference · #MLOps · #KServe · #real-time ML · #model deployment

Training a great model is only half the challenge. Serving that model to users reliably, with low latency, and at scale is where most ML projects struggle. This guide covers the architectural patterns for both batch and real-time model serving.

Choosing Your Serving Pattern

| Pattern | Latency | Throughput | Use Case |
|---------|---------|------------|----------|
| Batch | Minutes-hours | Very high | Reports, recommendations, scoring |
| Real-time REST | 50-200ms | Medium | Web/mobile APIs |
| Real-time gRPC | 10-50ms | High | Internal services |
| Streaming | Sub-second | Continuous | Fraud detection, real-time pricing |
| Edge | < 10ms | Per-device | IoT, mobile on-device |

Pattern 1: Real-Time Serving with FastAPI

For teams starting out or serving simpler models, a FastAPI-based serving layer provides maximum flexibility:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.pyfunc
import pandas as pd
import time
import logging
from contextlib import asynccontextmanager
from prometheus_client import Histogram, Counter, generate_latest
 
# Metrics
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Time to generate prediction", ["model_name"])
PREDICTION_COUNT = Counter("prediction_total", "Total predictions", ["model_name", "status"])
 
# Model cache
models: dict = {}
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load models on startup."""
    models["churn"] = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
    models["pricing"] = mlflow.pyfunc.load_model("models:/dynamic-pricing/Production")
    logging.info(f"Loaded {len(models)} models")
    yield
    models.clear()
 
app = FastAPI(title="ML Serving API", lifespan=lifespan)
 
class PredictionRequest(BaseModel):
    features: dict[str, float] = Field(..., description="Feature name-value pairs")
    model_name: str = Field(default="churn", description="Model to use for prediction")
 
class PredictionResponse(BaseModel):
    prediction: float
    probability: float | None = None
    model_version: str
    latency_ms: float
 
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if request.model_name not in models:
        raise HTTPException(404, f"Model '{request.model_name}' not found")
 
    model = models[request.model_name]
    start = time.perf_counter()
 
    try:
        df = pd.DataFrame([request.features])
        result = model.predict(df)
 
        latency_ms = (time.perf_counter() - start) * 1000
        PREDICTION_LATENCY.labels(model_name=request.model_name).observe(latency_ms / 1000)
        PREDICTION_COUNT.labels(model_name=request.model_name, status="success").inc()
 
        return PredictionResponse(
            prediction=float(result[0]),
            model_version=getattr(getattr(model, "metadata", None), "run_id", None) or "unknown",
            latency_ms=round(latency_ms, 2),
        )
    except Exception as e:
        PREDICTION_COUNT.labels(model_name=request.model_name, status="error").inc()
        raise HTTPException(500, f"Prediction failed: {e}") from e
 
@app.get("/health")
async def health():
    return {"status": "healthy", "models_loaded": list(models.keys())}
 
@app.get("/metrics")
async def metrics():
    from starlette.responses import Response
    return Response(content=generate_latest(), media_type="text/plain")
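
With the service running (for example via uvicorn), a quick smoke test from Python. This is a sketch: the host, port, and feature names below are placeholders for your own setup, and the request body follows the PredictionRequest schema above.

# Hypothetical client call against a locally running instance
# (started with: uvicorn serving_app:app --port 8000)
import requests

payload = {
    "model_name": "churn",
    # Placeholder feature names; use the schema your model was trained on
    "features": {"tenure_months": 14.0, "monthly_charges": 79.5, "support_tickets": 3.0},
}
resp = requests.post("http://localhost:8000/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # {"prediction": ..., "model_version": ..., "latency_ms": ...}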

Pattern 2: KServe on Kubernetes

For production-grade serving with auto-scaling, canary deployments, and GPU support:

# kserve-inference-service.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: churn-predictor
  namespace: ml-serving
  annotations:
    serving.kserve.io/autoscalerClass: hpa
spec:
  predictor:
    minReplicas: 2
    maxReplicas: 10
    scaleTarget: 70         # Scale up at 70% CPU
    scaleMetric: cpu
    model:
      modelFormat:
        name: mlflow
      storageUri: "s3://ml-models/churn-predictor/v2.3"
      resources:
        requests:
          cpu: "1"
          memory: "2Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
    # Canary traffic split
    canaryTrafficPercent: 10
  transformer:
    containers:
      - name: feature-transformer
        image: registry.company.com/ml/feature-transformer:v1.2
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
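
After `kubectl apply -f kserve-inference-service.yaml`, KServe exposes the standard v1 inference protocol on the service's ingress host. A hedged example of calling it: the hostname and feature vector are placeholders for your cluster's domain and your model's input schema.

import requests

# The external host depends on your ingress/domain configuration
url = "http://churn-predictor.ml-serving.example.com/v1/models/churn-predictor:predict"

payload = {"instances": [[14.0, 79.5, 3.0]]}  # placeholder feature vector
resp = requests.post(url, json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # {"predictions": [...]}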

KServe with Custom Predictor

from kserve import Model, ModelServer
import joblib
import numpy as np
 
class ChurnPredictor(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.model = None
 
    def load(self):
        self.model = joblib.load("/mnt/models/model.joblib")
        self.ready = True
 
    def predict(self, payload: dict, headers: dict = None) -> dict:
        instances = payload.get("instances", [])
        features = np.array(instances)
 
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features)
 
        return {
            "predictions": predictions.tolist(),
            "probabilities": probabilities.tolist(),
        }
 
if __name__ == "__main__":
    model = ChurnPredictor("churn-predictor")
    model.load()
    ModelServer().start([model])
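
The custom predictor can be smoke-tested locally before packaging it into a container: run the script above and hit the same v1 protocol. A minimal sketch, assuming the model file exists at /mnt/models/model.joblib and the feature vector matches its training schema.

# Local smoke test; ModelServer listens on port 8080 by default
import requests

payload = {"instances": [[14.0, 79.5, 3.0]]}  # placeholder feature vector
resp = requests.post(
    "http://localhost:8080/v1/models/churn-predictor:predict",
    json=payload,
    timeout=5,
)
print(resp.json())  # {"predictions": [...], "probabilities": [...]}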

Pattern 3: Batch Inference Pipeline

For use cases where real-time isn't required (recommendations, daily scoring):

from datetime import datetime
import pandas as pd
import mlflow.pyfunc
 
class BatchInferencePipeline:
    """Score millions of records efficiently in batch."""
 
    def __init__(self, model_name: str, model_stage: str = "Production"):
        self.model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_stage}")
        self.model_name = model_name
 
    def score_batch(self, data: pd.DataFrame, batch_size: int = 10000) -> pd.DataFrame:
        """Score data in chunks to manage memory."""
        results = []
        total_rows = len(data)
 
        for i in range(0, total_rows, batch_size):
            chunk = data.iloc[i:i + batch_size]
            predictions = self.model.predict(chunk)
 
            chunk_result = chunk.copy()
            chunk_result["prediction"] = predictions
            chunk_result["scored_at"] = datetime.utcnow().isoformat()
            chunk_result["model_name"] = self.model_name
 
            results.append(chunk_result)
 
            progress = min(i + batch_size, total_rows) / total_rows * 100
            print(f"Progress: {progress:.1f}% ({min(i + batch_size, total_rows)}/{total_rows})")
 
        return pd.concat(results, ignore_index=True)
 
# Usage
pipeline = BatchInferencePipeline("churn-predictor")
customers = pd.read_parquet("s3://data/customers/latest.parquet")
scored = pipeline.score_batch(customers)
scored.to_parquet(f"s3://predictions/churn/{datetime.utcnow().strftime('%Y-%m-%d')}.parquet")
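
In practice this pipeline runs on a schedule. A minimal sketch of wrapping it in an Airflow 2.x DAG; the DAG id, schedule, module name, and S3 paths are assumptions, not part of the pipeline above.

from datetime import datetime
import pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

from batch_pipeline import BatchInferencePipeline  # wherever the class above lives

def run_daily_scoring():
    # Same steps as the usage example above, executed once per scheduled run
    pipeline = BatchInferencePipeline("churn-predictor")
    customers = pd.read_parquet("s3://data/customers/latest.parquet")
    scored = pipeline.score_batch(customers)
    scored.to_parquet(f"s3://predictions/churn/{datetime.utcnow().strftime('%Y-%m-%d')}.parquet")

with DAG(
    dag_id="daily_churn_scoring",   # hypothetical DAG id
    schedule="0 2 * * *",           # daily at 02:00 UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="score_customers", python_callable=run_daily_scoring)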

A/B Testing Model Versions

import hashlib
 
class ABRouter:
    """Deterministic A/B routing for model experiments."""
 
    def __init__(self, experiments: dict[str, dict]):
        """
        experiments = {
            "control": {"model": model_a, "traffic": 0.80},
            "treatment": {"model": model_b, "traffic": 0.20},
        }
        """
        self.experiments = experiments
        self._validate_traffic()
 
    def _validate_traffic(self):
        total = sum(e["traffic"] for e in self.experiments.values())
        assert abs(total - 1.0) < 0.01, f"Traffic must sum to 1.0, got {total}"
 
    def route(self, entity_id: str) -> tuple[str, object]:
        """Deterministically assign entity to experiment variant."""
        # Hash entity ID for consistent assignment
        hash_val = int(hashlib.sha256(entity_id.encode()).hexdigest(), 16) % 10000
        normalized = hash_val / 10000
 
        cumulative = 0.0
        for variant_name, config in self.experiments.items():
            cumulative += config["traffic"]
            if normalized < cumulative:
                return variant_name, config["model"]
 
        # Fallback to last variant
        last_key = list(self.experiments.keys())[-1]
        return last_key, self.experiments[last_key]["model"]
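
A hedged usage sketch: two versions of the churn model are loaded from the registry (the stage names and feature values are placeholders), a request is routed per customer, and the assigned variant is logged so offline analysis can attribute outcomes to the right model.

import logging
import pandas as pd
import mlflow.pyfunc

# Hypothetical setup: current production model vs. a candidate version
model_a = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
model_b = mlflow.pyfunc.load_model("models:/churn-predictor/Staging")

router = ABRouter({
    "control": {"model": model_a, "traffic": 0.80},
    "treatment": {"model": model_b, "traffic": 0.20},
})

variant, model = router.route(entity_id="customer-12345")
features = pd.DataFrame([{"tenure_months": 14.0, "monthly_charges": 79.5}])  # placeholder features
prediction = model.predict(features)

# Record the variant with every prediction; without it the experiment can't be analyzed
logging.info("variant=%s prediction=%s", variant, float(prediction[0]))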

Model Optimization for Serving

Quantization and ONNX Export

import torch
import onnx
import onnxruntime as ort
import numpy as np
 
def optimize_pytorch_model(model, sample_input, output_path: str):
    """Export PyTorch model to ONNX for faster inference."""
    model.eval()
 
    # Export to ONNX
    torch.onnx.export(
        model,
        sample_input,
        output_path,
        export_params=True,
        opset_version=17,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    )
 
    # Validate
    onnx_model = onnx.load(output_path)
    onnx.checker.check_model(onnx_model)
    return output_path
 
def serve_onnx(model_path: str, features: np.ndarray) -> np.ndarray:
    """Run inference with ONNX Runtime, typically 2-5x faster than PyTorch."""
    session = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    input_name = session.get_inputs()[0].name
    result = session.run(None, {input_name: features.astype(np.float32)})
    return result[0]
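
The heading above also mentions quantization; ONNX export alone does not quantize anything. A minimal sketch using ONNX Runtime's dynamic quantization (paths are placeholders), which stores weights as INT8 and can shrink the model and speed up CPU inference:

from onnxruntime.quantization import quantize_dynamic, QuantType

def quantize_onnx_model(model_path: str, output_path: str) -> str:
    """Apply dynamic INT8 weight quantization to an exported ONNX model."""
    quantize_dynamic(
        model_input=model_path,
        model_output=output_path,
        weight_type=QuantType.QInt8,
    )
    return output_path

# Usage (placeholder paths)
# quantize_onnx_model("churn-predictor.onnx", "churn-predictor.int8.onnx")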

Serving Infrastructure Comparison

| Framework | Protocol | Auto-scaling | GPU | Multi-model | Best For |
|-----------|----------|--------------|-----|-------------|----------|
| FastAPI | REST | Manual/K8s HPA | Manual | Manual | Simple, flexible |
| KServe | REST/gRPC | Built-in | Native | Built-in | K8s production |
| Seldon Core | REST/gRPC | Built-in | Native | Built-in | Complex graphs |
| BentoML | REST/gRPC | BentoCloud | Supported | Built-in | Easy packaging |
| TF Serving | REST/gRPC | Manual | Native | Built-in | TensorFlow only |
| Triton | REST/gRPC | Built-in | Optimized | Built-in | GPU inference |

Monitoring Serving Infrastructure

Key metrics to track in production:

# Prometheus alerts for model serving
groups:
  - name: ml-serving
    rules:
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.99, rate(prediction_latency_seconds_bucket[5m])) > 0.2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 prediction latency exceeds 200ms"
 
      - alert: HighErrorRate
        expr: rate(prediction_total{status="error"}[5m]) / rate(prediction_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Model error rate exceeds 1%"
 
      - alert: LowThroughput
        expr: rate(prediction_total[5m]) < 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Prediction throughput dropped below 1 req/s"

Need production model serving? DeviDevs builds scalable ML serving infrastructure with auto-scaling, A/B testing, and monitoring. Get a free assessment →
