MLOps

From Jupyter Notebook to Production: A Practical MLOps Migration Guide

Petru Constantin
6 min read
#Jupyter #production ML #MLOps #ML pipeline #refactoring #model deployment


Your model works in a notebook. Now you need to ship it to production. This is the most common transition in ML engineering, and it is exactly where most projects stall. This guide lays out a step-by-step path from notebook prototype to a production-ready ML pipeline.

Step 1: Extract functions from cells

Before (notebook):

# Cell 1
import pandas as pd
df = pd.read_csv("data.csv")
df = df.dropna()
df["age_bucket"] = pd.cut(df["age"], bins=[0, 25, 45, 65, 100])
 
# Cell 2
from sklearn.model_selection import train_test_split
X = df.drop("target", axis=1)
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
 
# Cell 3
from sklearn.ensemble import GradientBoostingClassifier
model = GradientBoostingClassifier(n_estimators=100, max_depth=5)
model.fit(X_train, y_train)
print(f"Accuracy: {model.score(X_test, y_test)}")

After (modular Python):

# src/data.py
import pandas as pd
 
def load_data(path: str) -> pd.DataFrame:
    """Load and clean training data."""
    df = pd.read_csv(path)
    df = df.dropna()
    return df
 
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Apply feature engineering."""
    df = df.copy()
    df["age_bucket"] = pd.cut(df["age"], bins=[0, 25, 45, 65, 100])
    return df
 
def split_data(df: pd.DataFrame, target: str, test_size: float = 0.2, random_state: int = 42):
    """Split data into train and test sets."""
    from sklearn.model_selection import train_test_split
    X = df.drop(target, axis=1)
    y = df[target]
    return train_test_split(X, y, test_size=test_size, random_state=random_state)

# src/train.py
import mlflow
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score, f1_score
from src.data import load_data, engineer_features, split_data
 
def train(data_path: str, params: dict) -> dict:
    """Train model and return metrics."""
    # Load and prepare data
    df = load_data(data_path)
    df = engineer_features(df)
    X_train, X_test, y_train, y_test = split_data(df, "target")
 
    # Train
    mlflow.set_experiment("churn-predictor")
    with mlflow.start_run():
        mlflow.log_params(params)
 
        model = GradientBoostingClassifier(**params)
        model.fit(X_train, y_train)
 
        y_pred = model.predict(X_test)
        metrics = {
            "accuracy": accuracy_score(y_test, y_pred),
            "f1": f1_score(y_test, y_pred, average="weighted"),
        }
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model", registered_model_name="churn-predictor")
 
    return metrics
 
if __name__ == "__main__":
    params = {"n_estimators": 100, "max_depth": 5, "random_state": 42}
    metrics = train("data/training.csv", params)
    print(f"Metrics: {metrics}")

Step 2: Add configuration

Move hardcoded values into configuration files:

# configs/training.yaml
data:
  path: "data/training.csv"
  target_column: "target"
  test_size: 0.2
 
model:
  type: "gradient_boosting"
  params:
    n_estimators: 100
    max_depth: 5
    learning_rate: 0.1
    random_state: 42
 
experiment:
  name: "churn-predictor"
  tracking_uri: "http://mlflow:5000"

# src/config.py
import yaml
from dataclasses import dataclass
 
@dataclass
class TrainingConfig:
    data_path: str
    target_column: str
    test_size: float
    model_params: dict
    experiment_name: str
 
    @classmethod
    def from_yaml(cls, path: str) -> "TrainingConfig":
        with open(path) as f:
            cfg = yaml.safe_load(f)
        return cls(
            data_path=cfg["data"]["path"],
            target_column=cfg["data"]["target_column"],
            test_size=cfg["data"]["test_size"],
            model_params=cfg["model"]["params"],
            experiment_name=cfg["experiment"]["name"],
        )
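To see the config layer end to end, here is a self-contained sketch that writes the YAML to a temporary file and loads it through the same `from_yaml` pattern (the inline config mirrors `configs/training.yaml`; in the real project you would import `TrainingConfig` from `src/config.py` instead of redefining it):

```python
# Sketch: loading configs/training.yaml through TrainingConfig.from_yaml.
# Self-contained for illustration, so the dataclass is repeated here.
import tempfile
from dataclasses import dataclass

import yaml

@dataclass
class TrainingConfig:
    data_path: str
    target_column: str
    test_size: float
    model_params: dict
    experiment_name: str

    @classmethod
    def from_yaml(cls, path: str) -> "TrainingConfig":
        with open(path) as f:
            cfg = yaml.safe_load(f)
        return cls(
            data_path=cfg["data"]["path"],
            target_column=cfg["data"]["target_column"],
            test_size=cfg["data"]["test_size"],
            model_params=cfg["model"]["params"],
            experiment_name=cfg["experiment"]["name"],
        )

CONFIG = """
data: {path: data/training.csv, target_column: target, test_size: 0.2}
model:
  params: {n_estimators: 100, max_depth: 5, learning_rate: 0.1, random_state: 42}
experiment: {name: churn-predictor}
"""

# Write the config to a temp file, then load it the same way train() would
with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as f:
    f.write(CONFIG)

cfg = TrainingConfig.from_yaml(f.name)
print(cfg.experiment_name, cfg.model_params["max_depth"])  # churn-predictor 5
```

From here, the `__main__` block in `src/train.py` can call `train(cfg.data_path, cfg.model_params)` instead of hardcoding its own dict.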

Step 3: Add tests

# tests/test_data.py
import pytest
import pandas as pd
from src.data import load_data, engineer_features, split_data
 
def test_load_data_removes_nulls(tmp_path):
    # Create test CSV with nulls
    df = pd.DataFrame({"age": [25, None, 35], "target": [0, 1, 0]})
    path = tmp_path / "test.csv"
    df.to_csv(path, index=False)
 
    result = load_data(str(path))
    assert result.isnull().sum().sum() == 0
    assert len(result) == 2  # Null row removed
 
def test_split_data_proportions():
    df = pd.DataFrame({
        "feature": range(100),
        "target": [0, 1] * 50,
    })
    X_train, X_test, y_train, y_test = split_data(df, "target", test_size=0.2)
    assert len(X_test) == 20
    assert len(X_train) == 80
 
def test_split_data_no_leakage():
    df = pd.DataFrame({
        "id": range(100),
        "feature": range(100),
        "target": [0, 1] * 50,
    })
    X_train, X_test, _, _ = split_data(df, "target")
    train_ids = set(X_train["id"])
    test_ids = set(X_test["id"])
    assert train_ids.isdisjoint(test_ids)

# tests/test_model.py
import pytest
import numpy as np
 
def test_model_accuracy_above_threshold(trained_model, test_data):
    from sklearn.metrics import accuracy_score
    y_pred = trained_model.predict(test_data["X"])
    accuracy = accuracy_score(test_data["y"], y_pred)
    assert accuracy >= 0.80
 
def test_model_prediction_shape(trained_model, test_data):
    predictions = trained_model.predict(test_data["X"])
    assert predictions.shape == test_data["y"].shape
 
def test_model_handles_single_sample(trained_model):
    single = np.random.randn(1, trained_model.n_features_in_)
    result = trained_model.predict(single)
    assert result.shape == (1,)
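The model tests above reference `trained_model` and `test_data` fixtures without defining them; pytest resolves these from a `conftest.py`. A minimal sketch, using a small synthetic dataset so the suite runs without real data (the dataset shape and the GBM hyperparameters here are assumptions):

```python
# tests/conftest.py - fixtures the model tests expect.
import numpy as np
import pytest
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split

def make_dataset(seed: int = 42):
    """Small synthetic, separable dataset so tests run offline."""
    rng = np.random.default_rng(seed)
    X = rng.normal(size=(500, 4))
    # Linearly separable target, so accuracy clears the 0.80 threshold
    y = (X[:, 0] + X[:, 1] > 0).astype(int)
    return train_test_split(X, y, test_size=0.2, random_state=seed)

@pytest.fixture(scope="session")
def trained_model():
    X_train, _, y_train, _ = make_dataset()
    model = GradientBoostingClassifier(n_estimators=50, random_state=42)
    return model.fit(X_train, y_train)

@pytest.fixture(scope="session")
def test_data():
    _, X_test, _, y_test = make_dataset()
    return {"X": X_test, "y": y_test}
```

Because `make_dataset` is deterministic (fixed seed and split), the two fixtures see the same train/test partition.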

Step 4: Add a serving layer

# src/serve.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import pandas as pd
import mlflow.sklearn
 
app = FastAPI(title="Churn Prediction API")
model = None
 
@app.on_event("startup")
def load_model():
    global model
    # Load the native sklearn flavor so predict_proba is available
    model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
 
class PredictionRequest(BaseModel):
    features: dict[str, float]
 
class PredictionResponse(BaseModel):
    churn_probability: float
    prediction: int
 
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    df = pd.DataFrame([request.features])
    # predict() would return the class label (0 or 1); predict_proba gives
    # the actual probability the response promises
    churn_probability = float(model.predict_proba(df)[0][1])
    return PredictionResponse(
        churn_probability=churn_probability,
        prediction=int(churn_probability > 0.5),
    )
 
@app.get("/health")
async def health():
    return {"status": "healthy", "model_loaded": model is not None}

Step 5: Containerize

# Dockerfile
FROM python:3.11-slim
 
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY src/ src/
COPY configs/ configs/
 
# For serving
EXPOSE 8080
CMD ["uvicorn", "src.serve:app", "--host", "0.0.0.0", "--port", "8080"]

# docker-compose.yaml
services:
  mlflow:
    image: ghcr.io/mlflow/mlflow:latest
    ports: ["5000:5000"]
    environment:
      - BACKEND_STORE_URI=sqlite:///mlflow.db
 
  training:
    build: .
    command: python -m src.train
    volumes: ["./data:/app/data", "./configs:/app/configs"]
    depends_on: [mlflow]
 
  serving:
    build: .
    ports: ["8080:8080"]
    depends_on: [mlflow]
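After `docker compose up`, it helps to verify that the serving container actually comes up before wiring anything downstream to it. A small polling script against the `/health` endpoint (the script path, URL, and timeout are assumptions):

```python
# scripts/smoke_test.py - poll the serving container until it reports healthy.
import json
import time
import urllib.error
import urllib.request

def wait_for_healthy(url: str, timeout: float = 60.0, interval: float = 2.0) -> dict:
    """Poll a health endpoint until it answers, or raise TimeoutError."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=5) as resp:
                return json.load(resp)  # e.g. {"status": "healthy", ...}
        except (urllib.error.URLError, OSError):
            time.sleep(interval)  # container not up yet; retry
    raise TimeoutError(f"{url} not healthy after {timeout}s")

if __name__ == "__main__":
    print(wait_for_healthy("http://localhost:8080/health"))
```

Running this as the last step of a local bring-up (or in CI after `docker compose up -d`) catches broken images before deployment does.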

Step 6: Add CI/CD

# .github/workflows/ml-pipeline.yml
name: ML Pipeline
on:
  push:
    paths: ["src/**", "configs/**", "tests/**"]
 
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install -r requirements.txt -r requirements-dev.txt
      - run: pytest tests/ -v
 
  train:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: { python-version: "3.11" }
      - run: pip install -r requirements.txt
      - run: python -m src.train
      - run: python -m src.evaluate
 
  deploy:
    needs: train
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: docker build -t registry.company.com/churn-api:latest .
      - run: docker push registry.company.com/churn-api:latest
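The train job runs `python -m src.evaluate`, which has not appeared yet. One plausible shape for it is a quality gate that fails the CI job (non-zero exit) when metrics drop below thresholds; the module name comes from the workflow, while the thresholds and the hardcoded metrics below are assumptions:

```python
# src/evaluate.py - sketch of a CI quality gate: exit non-zero when
# metrics fall below their thresholds, so the deploy job never runs.
import sys

THRESHOLDS = {"accuracy": 0.80, "f1": 0.75}  # assumed minimums

def evaluate(metrics: dict[str, float],
             thresholds: dict[str, float] = THRESHOLDS) -> list[str]:
    """Return a list of failure messages; an empty list means the gate passes."""
    failures = []
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            failures.append(f"missing metric: {name}")
        elif value < minimum:
            failures.append(f"{name}={value:.3f} below threshold {minimum:.2f}")
    return failures

if __name__ == "__main__":
    # In the real pipeline these would be read from the latest MLflow run;
    # hardcoded here only to show the gate's behavior.
    failures = evaluate({"accuracy": 0.87, "f1": 0.83})
    if failures:
        print("\n".join(failures))
        sys.exit(1)
    print("Evaluation gate passed")
```

Keeping the gate as a plain function makes it unit-testable with pytest alongside the data and model tests from Step 3.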

The complete migration checklist

| Step | Notebook | Production | Done |
|------|----------|------------|------|
| 1 | Inline code in cells | Modular functions in .py files | |
| 2 | Hardcoded values | YAML configuration | |
| 3 | Print statements | MLflow experiment tracking | |
| 4 | Manual data loading | Data versioning (DVC) | |
| 5 | No tests | pytest suite (data + model) | |
| 6 | No API | FastAPI serving endpoint | |
| 7 | Local execution | Containerized (Docker) | |
| 8 | Manual deployment | CI/CD pipeline | |
| 9 | No monitoring | Drift detection + alerts | |
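The final checklist item, drift detection, is not covered by the six steps above. A lightweight starting point is the Population Stability Index (PSI), computed per feature between the training baseline and live traffic; a common rule of thumb treats PSI above 0.2 as drift. A sketch, not tied to any particular monitoring stack:

```python
# Population Stability Index (PSI): compares a live feature distribution
# against the training baseline, binned on the baseline's quantiles.
import numpy as np

def psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI between two 1-D samples. Rule of thumb: > 0.2 signals drift."""
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    # Clip live values into the baseline range so nothing falls outside bins
    current = np.clip(current, edges[0], edges[-1])
    base_pct = np.histogram(baseline, edges)[0] / len(baseline)
    curr_pct = np.histogram(current, edges)[0] / len(current)
    # Floor the proportions to avoid division by zero / log(0)
    base_pct = np.clip(base_pct, 1e-6, None)
    curr_pct = np.clip(curr_pct, 1e-6, None)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

rng = np.random.default_rng(0)
train_ages = rng.normal(40, 10, 10_000)       # training baseline
live_same = rng.normal(40, 10, 1_000)         # live traffic, no shift
live_shifted = rng.normal(50, 10, 1_000)      # live traffic, mean shifted

print(psi(train_ages, live_same) < 0.1)       # True: same distribution
print(psi(train_ages, live_shifted) > 0.2)    # True: drift detected
```

In production this would run on a schedule against a window of recent requests, with an alert (and possibly a retraining trigger) when any feature crosses the threshold.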

Need help migrating from notebooks to production? DeviDevs builds production ML pipelines from prototype to deployment. Get a free assessment →

