MLOps

Model Serving: batch and real-time inference

Petru Constantin
6 min read
#model serving#ML inference#MLOps#KServe#real-time ML#model deployment

Model Serving Architecture: From Batch to Real-Time Inference at Scale

Training a good model is only half the challenge. Serving that model to users reliably, with low latency, and at scale is where most ML projects get stuck. This guide covers the architectural patterns for both batch and real-time model serving.

Choosing a serving pattern

| Pattern | Latency | Throughput | Use case |
|---------|---------|------------|----------|
| Batch | Minutes-hours | Very high | Reports, recommendations, scoring |
| Real-time REST | 50-200 ms | Medium | Web/mobile APIs |
| Real-time gRPC | 10-50 ms | High | Internal services |
| Streaming | Sub-second | Continuous | Fraud detection, real-time pricing |
| Edge | < 10 ms | Per device | IoT, on-device mobile |

Pattern 1: Real-time serving with FastAPI

For teams just getting started, or those serving simpler models, a FastAPI-based serving layer offers maximum flexibility:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
import mlflow.pyfunc
import time
import logging
from contextlib import asynccontextmanager
from prometheus_client import Histogram, Counter, generate_latest
 
# Metrics
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Time to generate prediction", ["model_name"])
PREDICTION_COUNT = Counter("prediction_total", "Total predictions", ["model_name", "status"])
 
# Model cache
models: dict = {}
 
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load models on startup."""
    models["churn"] = mlflow.pyfunc.load_model("models:/churn-predictor/Production")
    models["pricing"] = mlflow.pyfunc.load_model("models:/dynamic-pricing/Production")
    logging.info(f"Loaded {len(models)} models")
    yield
    models.clear()
 
app = FastAPI(title="ML Serving API", lifespan=lifespan)
 
class PredictionRequest(BaseModel):
    features: dict[str, float] = Field(..., description="Feature name-value pairs")
    model_name: str = Field(default="churn", description="Model to use for prediction")
 
class PredictionResponse(BaseModel):
    prediction: float
    probability: float | None = None
    model_version: str
    latency_ms: float
 
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if request.model_name not in models:
        raise HTTPException(404, f"Model '{request.model_name}' not found")
 
    model = models[request.model_name]
    start = time.perf_counter()
 
    try:
        import pandas as pd
        df = pd.DataFrame([request.features])
        result = model.predict(df)
 
        latency_ms = (time.perf_counter() - start) * 1000
        PREDICTION_LATENCY.labels(model_name=request.model_name).observe(latency_ms / 1000)
        PREDICTION_COUNT.labels(model_name=request.model_name, status="success").inc()
 
        return PredictionResponse(
            prediction=float(result[0]),
            model_version=str(getattr(model.metadata, "run_id", None) or "unknown"),
            latency_ms=round(latency_ms, 2),
        )
    except Exception as e:
        PREDICTION_COUNT.labels(model_name=request.model_name, status="error").inc()
        raise HTTPException(500, f"Prediction failed: {str(e)}")
 
@app.get("/health")
async def health():
    return {"status": "healthy", "models_loaded": list(models.keys())}
 
@app.get("/metrics")
async def metrics():
    from starlette.responses import Response
    return Response(content=generate_latest(), media_type="text/plain")
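A request to the /predict endpoint is shaped by the PredictionRequest schema above; the feature names and values below are made up for illustration:

```json
{
  "features": {
    "tenure_months": 14.0,
    "monthly_charges": 72.5,
    "support_tickets": 3.0
  },
  "model_name": "churn"
}
```

A successful response follows PredictionResponse: prediction, an optional probability, model_version, and latency_ms.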

Pattern 2: KServe on Kubernetes

For production-grade serving with auto-scaling, canary deployments, and GPU support:

# kserve-inference-service.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: churn-predictor
  namespace: ml-serving
  annotations:
    serving.kserve.io/autoscalerClass: hpa
spec:
  predictor:
    minReplicas: 2
    maxReplicas: 10
    scaleTarget: 70         # Scale up at 70% CPU
    scaleMetric: cpu
    model:
      modelFormat:
        name: mlflow
      storageUri: "s3://ml-models/churn-predictor/v2.3"
      resources:
        requests:
          cpu: "1"
          memory: "2Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
    # Canary traffic split
    canaryTrafficPercent: 10
  transformer:
    containers:
      - name: feature-transformer
        image: registry.company.com/ml/feature-transformer:v1.2
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"

KServe with a custom predictor

import kserve
from kserve import Model, ModelServer
import joblib
import numpy as np
 
class ChurnPredictor(Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.model = None
 
    def load(self):
        self.model = joblib.load("/mnt/models/model.joblib")
        self.ready = True
 
    def predict(self, payload: dict, headers: dict | None = None) -> dict:
        instances = payload.get("instances", [])
        features = np.array(instances)
 
        predictions = self.model.predict(features)
        probabilities = self.model.predict_proba(features)
 
        return {
            "predictions": predictions.tolist(),
            "probabilities": probabilities.tolist(),
        }
 
if __name__ == "__main__":
    model = ChurnPredictor("churn-predictor")
    model.load()
    ModelServer().start([model])
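The custom predictor reads the V1 protocol's instances payload, so a client would POST a body like this to /v1/models/churn-predictor:predict (the feature vectors are illustrative):

```json
{
  "instances": [
    [0.42, 1.0, 3.5, 0.0],
    [0.13, 0.0, 2.1, 1.0]
  ]
}
```

The response mirrors the predict method above: a predictions list plus a probabilities list, one entry per input row.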

Pattern 3: Batch inference pipeline

For use cases where real-time is not required (recommendations, daily scoring):

from datetime import datetime
import pandas as pd
import mlflow.pyfunc
 
class BatchInferencePipeline:
    """Score millions of records efficiently in batch."""
 
    def __init__(self, model_name: str, model_stage: str = "Production"):
        self.model = mlflow.pyfunc.load_model(f"models:/{model_name}/{model_stage}")
        self.model_name = model_name
 
    def score_batch(self, data: pd.DataFrame, batch_size: int = 10000) -> pd.DataFrame:
        """Score data in chunks to manage memory."""
        results = []
        total_rows = len(data)
 
        for i in range(0, total_rows, batch_size):
            chunk = data.iloc[i:i + batch_size]
            predictions = self.model.predict(chunk)
 
            chunk_result = chunk.copy()
            chunk_result["prediction"] = predictions
            chunk_result["scored_at"] = datetime.utcnow().isoformat()
            chunk_result["model_name"] = self.model_name
 
            results.append(chunk_result)
 
            progress = min(i + batch_size, total_rows) / total_rows * 100
            print(f"Progress: {progress:.1f}% ({min(i + batch_size, total_rows)}/{total_rows})")
 
        return pd.concat(results, ignore_index=True)
 
# Usage
pipeline = BatchInferencePipeline("churn-predictor")
customers = pd.read_parquet("s3://data/customers/latest.parquet")
scored = pipeline.score_batch(customers)
scored.to_parquet(f"s3://predictions/churn/{datetime.utcnow().strftime('%Y-%m-%d')}.parquet")

A/B testing for model versions

import hashlib
 
class ABRouter:
    """Deterministic A/B routing for model experiments."""
 
    def __init__(self, experiments: dict[str, dict]):
        """
        experiments = {
            "control": {"model": model_a, "traffic": 0.80},
            "treatment": {"model": model_b, "traffic": 0.20},
        }
        """
        self.experiments = experiments
        self._validate_traffic()
 
    def _validate_traffic(self):
        total = sum(e["traffic"] for e in self.experiments.values())
        assert abs(total - 1.0) < 0.01, f"Traffic must sum to 1.0, got {total}"
 
    def route(self, entity_id: str) -> tuple[str, object]:
        """Deterministically assign entity to experiment variant."""
        # Hash entity ID for consistent assignment
        hash_val = int(hashlib.sha256(entity_id.encode()).hexdigest(), 16) % 10000
        normalized = hash_val / 10000
 
        cumulative = 0.0
        for variant_name, config in self.experiments.items():
            cumulative += config["traffic"]
            if normalized < cumulative:
                return variant_name, config["model"]
 
        # Fallback to last variant
        last_key = list(self.experiments.keys())[-1]
        return last_key, self.experiments[last_key]["model"]
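A condensed, self-contained version of the routing logic above, with variant names standing in for real model objects, demonstrates the two properties that matter: assignment is deterministic per entity, and the observed split converges to the configured traffic weights:

```python
import hashlib
from collections import Counter

def assign(entity_id: str, splits: dict[str, float]) -> str:
    """Condensed ABRouter.route: hash the id into [0, 1) and walk
    the cumulative traffic splits."""
    normalized = int(hashlib.sha256(entity_id.encode()).hexdigest(), 16) % 10000 / 10000
    cumulative = 0.0
    for variant, traffic in splits.items():
        cumulative += traffic
        if normalized < cumulative:
            return variant
    return list(splits)[-1]  # guard against float rounding

splits = {"control": 0.8, "treatment": 0.2}

# The same user always lands in the same variant:
assert assign("user-42", splits) == assign("user-42", splits)

# Over many users the observed split approaches 80/20:
counts = Counter(assign(f"user-{i}", splits) for i in range(10_000))
print(counts)
```

Hash-based assignment avoids storing per-user state: any replica of the serving layer computes the same variant for the same entity_id.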

Optimizing models for serving

Quantization and ONNX export

import torch
import onnx
import onnxruntime as ort
import numpy as np
 
def optimize_pytorch_model(model, sample_input, output_path: str):
    """Export PyTorch model to ONNX for faster inference."""
    model.eval()
 
    # Export to ONNX
    torch.onnx.export(
        model,
        sample_input,
        output_path,
        export_params=True,
        opset_version=17,
        do_constant_folding=True,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={"input": {0: "batch_size"}, "output": {0: "batch_size"}},
    )
 
    # Validate
    onnx_model = onnx.load(output_path)
    onnx.checker.check_model(onnx_model)
    return output_path
 
def serve_onnx(model_path: str, features: np.ndarray) -> np.ndarray:
    """Run inference with ONNX Runtime, typically 2-5x faster than PyTorch."""
    session = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])
    input_name = session.get_inputs()[0].name
    result = session.run(None, {input_name: features.astype(np.float32)})
    return result[0]
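The heading also mentions quantization, which the export helper does not show. A minimal dynamic-quantization sketch in PyTorch, where the toy two-layer network is purely illustrative:

```python
import torch
import torch.nn as nn

# Toy network standing in for a real model -- purely illustrative.
model = nn.Sequential(nn.Linear(16, 8), nn.ReLU(), nn.Linear(8, 1)).eval()

# Dynamic quantization stores Linear weights as int8 and quantizes
# activations on the fly at inference time; no calibration data needed.
quantized = torch.quantization.quantize_dynamic(
    model, {nn.Linear}, dtype=torch.qint8
)

x = torch.randn(4, 16)
with torch.no_grad():
    print(model(x).shape, quantized(x).shape)  # same output shape
```

Dynamic quantization typically shrinks Linear-heavy models and speeds up CPU inference at a small accuracy cost; measure both before shipping the quantized artifact.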

Serving infrastructure comparison

| Framework | Protocol | Auto-scaling | GPU | Multi-model | Best for |
|-----------|----------|--------------|-----|-------------|----------|
| FastAPI | REST | Manual/K8s HPA | Manual | Manual | Simple, flexible |
| KServe | REST/gRPC | Built-in | Native | Built-in | K8s production |
| Seldon Core | REST/gRPC | Built-in | Native | Built-in | Complex inference graphs |
| BentoML | REST/gRPC | BentoCloud | Supported | Built-in | Easy packaging |
| TF Serving | REST/gRPC | Manual | Native | Built-in | TensorFlow only |
| Triton | REST/gRPC | Built-in | Optimized | Built-in | GPU inference |

Monitoring the serving infrastructure

Key metrics to track in production:

# Prometheus alerts for model serving
groups:
  - name: ml-serving
    rules:
      - alert: HighPredictionLatency
        expr: histogram_quantile(0.99, rate(prediction_latency_seconds_bucket[5m])) > 0.2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 prediction latency exceeds 200ms"
 
      - alert: HighErrorRate
        expr: rate(prediction_total{status="error"}[5m]) / rate(prediction_total[5m]) > 0.01
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Model error rate exceeds 1%"
 
      - alert: LowThroughput
        expr: rate(prediction_total[5m]) < 1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Prediction throughput dropped below 1 req/s"

Need production model serving? DeviDevs builds scalable ML serving infrastructure with auto-scaling, A/B testing, and monitoring. Get a free assessment ->

