Automated Model Retraining: When, Why, and How to Retrain ML Models

Petru Constantin
6 min read
#model retraining #MLOps #continuous training #ML pipeline #model monitoring #production ML

Models are not static software. The data they were trained on goes stale, user behavior changes, and the world keeps evolving. Automated retraining keeps models current, but naive automation can end up deploying worse models. This guide covers how to build retraining systems with proper safeguards.

When to retrain

Trigger types

| Trigger | Detection method | Urgency |
|---------|------------------|---------|
| Scheduled | Cron (daily/weekly/monthly) | Low |
| Data drift | Statistical drift tests | Medium |
| Performance degradation | Accuracy/F1 monitoring | High |
| Data volume threshold | Row count monitoring | Low |
| Concept drift | Shift in the prediction distribution | High |
| Upstream change | Feature store version change | Medium |
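The "statistical drift tests" row can be as simple as a Population Stability Index (PSI) per feature. A minimal sketch, assuming NumPy and two 1-D samples of the same feature (training-time reference vs. recent production values):

```python
import numpy as np

def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """Drift score between a reference (training) sample and a live sample of one feature."""
    edges = np.histogram_bin_edges(expected, bins=bins)
    # Clip live values into the reference range so every observation lands in a bin
    actual = np.clip(actual, edges[0], edges[-1])
    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    # Avoid log(0) and division by zero in empty bins
    expected_pct = np.clip(expected_pct, 1e-6, None)
    actual_pct = np.clip(actual_pct, 1e-6, None)
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))
```

Conventions vary, but a PSI below about 0.1 is usually read as stable and values above roughly 0.25 as significant drift; the drift_threshold used by the decision engine in the next section plays the same role.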

An intelligent trigger system

```python
from datetime import datetime
from dataclasses import dataclass
 
@dataclass
class RetrainingTrigger:
    name: str
    triggered: bool
    urgency: str  # "low" | "medium" | "high" | "critical"
    reason: str
    metadata: dict
 
class RetrainingDecisionEngine:
    """Decide when retraining is needed based on multiple signals."""
 
    def __init__(self, model_name: str, config: dict):
        self.model_name = model_name
        self.config = config
 
    def evaluate_triggers(self, monitoring_data: dict) -> list[RetrainingTrigger]:
        triggers = []
 
        # 1. Scheduled retraining
        last_trained = monitoring_data["last_training_date"]
        max_age = self.config.get("max_model_age_days", 30)
        age_days = (datetime.utcnow() - last_trained).days
        triggers.append(RetrainingTrigger(
            name="scheduled",
            triggered=age_days >= max_age,
            urgency="low",
            reason=f"Model is {age_days} days old (limit: {max_age})",
            metadata={"age_days": age_days, "limit": max_age},
        ))
 
        # 2. Data drift
        drift_score = monitoring_data.get("drift_score", 0)
        drift_threshold = self.config.get("drift_threshold", 0.15)
        triggers.append(RetrainingTrigger(
            name="data_drift",
            triggered=drift_score > drift_threshold,
            urgency="medium" if drift_score < 0.3 else "high",
            reason=f"Drift score {drift_score:.3f} exceeds threshold {drift_threshold}",
            metadata={"drift_score": drift_score, "drifted_features": monitoring_data.get("drifted_features", [])},
        ))
 
        # 3. Performance degradation
        current_accuracy = monitoring_data.get("current_accuracy", 1.0)
        baseline_accuracy = monitoring_data.get("baseline_accuracy", 1.0)
        perf_threshold = self.config.get("performance_drop_threshold", 0.03)
        drop = baseline_accuracy - current_accuracy
        triggers.append(RetrainingTrigger(
            name="performance_drop",
            triggered=drop > perf_threshold,
            urgency="high" if drop > perf_threshold * 2 else "medium",
            reason=f"Accuracy dropped from {baseline_accuracy:.3f} to {current_accuracy:.3f}",
            metadata={"drop": drop, "current": current_accuracy, "baseline": baseline_accuracy},
        ))
 
        # 4. Data volume
        new_samples = monitoring_data.get("new_samples_since_training", 0)
        volume_threshold = self.config.get("new_data_threshold", 50000)
        triggers.append(RetrainingTrigger(
            name="data_volume",
            triggered=new_samples >= volume_threshold,
            urgency="low",
            reason=f"{new_samples:,} new samples available (threshold: {volume_threshold:,})",
            metadata={"new_samples": new_samples},
        ))
 
        return triggers
 
    def should_retrain(self, monitoring_data: dict) -> tuple[bool, list[RetrainingTrigger]]:
        triggers = self.evaluate_triggers(monitoring_data)
        active = [t for t in triggers if t.triggered]
        return len(active) > 0, active
```
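A usage sketch: the model name, config values, and the monitoring snapshot below are illustrative, but the dict keys match exactly what evaluate_triggers reads:

```python
from datetime import datetime

engine = RetrainingDecisionEngine(
    model_name="churn-model",  # hypothetical model name
    config={"max_model_age_days": 30, "drift_threshold": 0.15},
)
should, active = engine.should_retrain({
    "last_training_date": datetime(2024, 5, 1),
    "drift_score": 0.22,
    "drifted_features": ["session_length"],
    "current_accuracy": 0.89,
    "baseline_accuracy": 0.91,
    "new_samples_since_training": 12000,
})
for trigger in active:
    print(f"{trigger.name} ({trigger.urgency}): {trigger.reason}")
```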

Retraining pipeline architecture

Monitoring signals
       │
       ▼
┌──────────────────┐
│ Decision Engine  │──── Retrain? ───────────────┐
└──────────────────┘                             │
       │ Yes                                     │ No
       ▼                                         ▼
┌──────────────────┐                      ┌──────────┐
│ Data preparation │                      │  Log &   │
│ (recent data)    │                      │  Monitor │
└──────┬───────────┘                      └──────────┘
       │
       ▼
┌──────────────────┐
│ Compute features │
└──────┬───────────┘
       │
       ▼
┌──────────────────┐     ┌──────────────┐
│ Train model      │────▶│ Quality gate │
└──────────────────┘     └──────┬───────┘
                                │
                    ┌───────────┼───────────┐
                    │ Pass                  │ Fail
                    ▼                       ▼
           ┌──────────────┐        ┌──────────────┐
           │ Stage model  │        │ Alert the    │
           │ (Registry)   │        │ team         │
           └──────┬───────┘        └──────────────┘
                  │
                  ▼
           ┌──────────────┐
           │ Deploy       │
           │ (Canary →    │
           │  Production) │
           └──────────────┘
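One way to wire the diagram together is a thin orchestration function. A minimal sketch, assuming the decision engine above and the RetrainingQualityGate defined in the next section; prepare_data, compute_features, train_model, log_decision, alert_team, and stage_model are hypothetical placeholders for your own pipeline steps:

```python
def run_retraining_pipeline(engine, monitoring_data, production_model, config):
    """Sketch of the flow above; helper functions are assumed, not defined here."""
    should_retrain, active = engine.should_retrain(monitoring_data)
    if not should_retrain:
        log_decision(active)                     # hypothetical: record the decision, keep monitoring
        return None

    raw = prepare_data()                         # hypothetical: pull recent training data
    X, y, test_data = compute_features(raw)      # hypothetical: features + holdout split
    new_model = train_model(X, y)                # hypothetical: training step

    gate = RetrainingQualityGate(production_model, test_data, config)
    report = gate.run_all_checks(new_model)
    if not report["passed"]:
        alert_team(report)                       # hypothetical: notify on failed checks
        return None

    stage_model(new_model)                       # hypothetical: push to the model registry
    return new_model
```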

Quality gates for automated retraining

Never deploy a retrained model without validation:

```python
class RetrainingQualityGate:
    """Validate retrained model before promotion."""
 
    def __init__(self, production_model, test_data, config: dict):
        self.production_model = production_model
        self.test_data = test_data
        self.config = config
 
    def run_all_checks(self, new_model) -> dict:
        results = {
            "passed": True,
            "checks": [],
        }
 
        checks = [
            self.check_minimum_performance(new_model),
            self.check_no_regression(new_model),
            self.check_prediction_stability(new_model),
            self.check_latency(new_model),
            self.check_model_size(new_model),
        ]
 
        for check in checks:
            results["checks"].append(check)
            if not check["passed"]:
                results["passed"] = False
 
        return results
 
    def check_minimum_performance(self, model) -> dict:
        from sklearn.metrics import accuracy_score
        X = self.test_data.drop("target", axis=1)
        y = self.test_data["target"]
        y_pred = model.predict(X)
        accuracy = accuracy_score(y, y_pred)
        min_accuracy = self.config.get("min_accuracy", 0.85)
        return {
            "name": "minimum_performance",
            "passed": accuracy >= min_accuracy,
            "value": accuracy,
            "threshold": min_accuracy,
        }
 
    def check_no_regression(self, new_model) -> dict:
        from sklearn.metrics import f1_score
        X = self.test_data.drop("target", axis=1)
        y = self.test_data["target"]
        new_f1 = f1_score(y, new_model.predict(X), average="weighted")
        prod_f1 = f1_score(y, self.production_model.predict(X), average="weighted")
        max_regression = self.config.get("max_regression", 0.02)
        return {
            "name": "no_regression",
            "passed": new_f1 >= prod_f1 - max_regression,
            "new_f1": new_f1,
            "production_f1": prod_f1,
            "max_allowed_regression": max_regression,
        }
 
    def check_prediction_stability(self, model) -> dict:
        """Ensure prediction distribution hasn't shifted dramatically."""
        import numpy as np
        X = self.test_data.drop("target", axis=1)
        # Assumes a binary classifier that exposes predict_proba
        new_preds = model.predict_proba(X)[:, 1]
        prod_preds = self.production_model.predict_proba(X)[:, 1]
        correlation = np.corrcoef(new_preds, prod_preds)[0, 1]
        return {
            "name": "prediction_stability",
            "passed": correlation > 0.8,
            "correlation": correlation,
            "threshold": 0.8,
        }
 
    def check_latency(self, model) -> dict:
        import time, numpy as np
        X_single = self.test_data.drop("target", axis=1).head(1)
        times = []
        for _ in range(100):
            start = time.perf_counter()
            model.predict(X_single)
            times.append((time.perf_counter() - start) * 1000)
        p99 = np.percentile(times, 99)
        max_latency = self.config.get("max_latency_p99_ms", 50)
        return {
            "name": "latency",
            "passed": p99 <= max_latency,
            "p99_ms": p99,
            "threshold_ms": max_latency,
        }
 
    def check_model_size(self, model) -> dict:
        import tempfile, os, joblib
        with tempfile.NamedTemporaryFile(suffix=".joblib") as f:
            joblib.dump(model, f.name)
            size_mb = os.path.getsize(f.name) / (1024 * 1024)
        max_size = self.config.get("max_model_size_mb", 500)
        return {
            "name": "model_size",
            "passed": size_mb <= max_size,
            "size_mb": size_mb,
            "max_mb": max_size,
        }
```
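Wiring the gate into the pipeline might look like this; production_model, candidate_model, and test_data are assumed to come from your training run, and promote_to_staging is a hypothetical registry call:

```python
gate = RetrainingQualityGate(production_model, test_data, config={
    "min_accuracy": 0.85,
    "max_regression": 0.02,
    "max_latency_p99_ms": 50,
    "max_model_size_mb": 500,
})
report = gate.run_all_checks(candidate_model)
if report["passed"]:
    promote_to_staging(candidate_model)  # hypothetical: stage in the model registry
else:
    failed = [c["name"] for c in report["checks"] if not c["passed"]]
    print(f"Quality gate failed: {failed}")
```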

Safe deployment after retraining

```python
class SafeRetrainingDeployment:
    """Deploy retrained models with automatic rollback.

    The deploy_*, monitor_*, set_traffic, and rollback helpers called below
    are assumed to be implemented against your serving infrastructure.
    """
 
    async def deploy_with_monitoring(self, model_name: str, new_version: str):
        # Phase 1: Shadow mode (0% user traffic, full logging)
        await self.deploy_shadow(model_name, new_version)
        shadow_metrics = await self.monitor_shadow(duration_minutes=60)
 
        if not shadow_metrics["acceptable"]:
            await self.rollback(model_name, reason="Shadow metrics unacceptable")
            return {"status": "rolled_back", "phase": "shadow"}
 
        # Phase 2: Canary (5% traffic)
        await self.deploy_canary(model_name, new_version, traffic_pct=5)
        canary_metrics = await self.monitor_canary(duration_minutes=30)
 
        if not canary_metrics["acceptable"]:
            await self.rollback(model_name, reason="Canary metrics degraded")
            return {"status": "rolled_back", "phase": "canary"}
 
        # Phase 3: Progressive rollout
        for pct in [25, 50, 100]:
            await self.set_traffic(model_name, new_version, pct)
            metrics = await self.monitor(duration_minutes=15)
            if not metrics["acceptable"]:
                await self.rollback(model_name, reason=f"Degradation at {pct}%")
                return {"status": "rolled_back", "phase": f"rollout_{pct}"}
 
        return {"status": "deployed", "version": new_version}

Retraining anti-patterns

| Anti-pattern | Why it's bad | Better approach |
|--------------|--------------|-----------------|
| Retraining on every data change | Wastes compute, unstable | Use drift thresholds |
| No quality gate | Can deploy a worse model | Always compare against the production model |
| Same hyperparameters | Old parameters may not fit the new data | Include a hyperparameter search |
| Immediate 100% deployment | No safety net | Shadow → Canary → Progressive |
| No rollback plan | No way to recover from a bad model | Keep the previous version ready |
| Ignoring upstream changes | Feature schema breaks silently | Validate features before training (see the sketch below) |
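For that last row, a minimal pre-training schema check might look like the sketch below; the expected schema is an illustrative assumption:

```python
import pandas as pd

# Illustrative expected schema: column name -> pandas dtype string
EXPECTED_SCHEMA = {"age": "int64", "session_length": "float64", "plan": "object"}

def validate_features(df: pd.DataFrame, schema: dict = EXPECTED_SCHEMA) -> list[str]:
    """Return a list of schema problems; an empty list means it is safe to train."""
    problems = []
    for col, dtype in schema.items():
        if col not in df.columns:
            problems.append(f"missing column: {col}")
        elif str(df[col].dtype) != dtype:
            problems.append(f"{col}: expected {dtype}, got {df[col].dtype}")
    return problems
```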

Need automated model retraining? DeviDevs implements intelligent retraining systems with drift-based triggers, quality gates, and safe deployment. Get a free assessment ->
