AI Security

AI Model Poisoning Attacks and Defenses: How to Protect Your ML Pipeline

Nicu Constantin
13 min read
#AI security #model poisoning #adversarial ML #data security #machine learning

Model poisoning attacks are among the most insidious threats to machine learning systems. Unlike adversarial examples, which attack at inference time, poisoning attacks compromise the model during training, embedding vulnerabilities that persist for the model's entire lifetime.

Understanding Model Poisoning Attacks

Attack Taxonomy

# model_poisoning_taxonomy.yaml
poisoning_attacks:
  data_poisoning:
    description: "Manipulating training data to influence model behavior"
    types:
      - label_flipping: "Changing labels of training samples"
      - clean_label: "Adding poisoned samples with correct labels"
      - gradient_based: "Optimizing poison samples for maximum impact"
 
  model_poisoning:
    description: "Directly manipulating model parameters"
    types:
      - federated_learning: "Malicious updates in distributed training"
      - supply_chain: "Compromised pre-trained models"
      - fine_tuning: "Poisoning during transfer learning"
 
  backdoor_attacks:
    description: "Embedding hidden triggers in models"
    types:
      - static_trigger: "Fixed pattern triggers"
      - dynamic_trigger: "Context-dependent triggers"
      - semantic_trigger: "Natural feature triggers"
 
attack_goals:
  targeted: "Misclassify specific inputs"
  untargeted: "Degrade overall model performance"
  backdoor: "Activate specific behavior with trigger"

Data Poisoning Attacks

# data_poisoning_attacks.py
import numpy as np
from typing import Tuple, List, Optional
from dataclasses import dataclass
 
@dataclass
class PoisonedSample:
    """A poisoned training sample."""
    original_data: np.ndarray
    poisoned_data: np.ndarray
    original_label: int
    target_label: int
    poison_type: str
 
class DataPoisonAttacks:
    """Implement various data poisoning attacks for research/testing."""
 
    def __init__(self, model, data_shape: Tuple):
        self.model = model
        self.data_shape = data_shape
 
    def label_flip_attack(
        self,
        X: np.ndarray,
        y: np.ndarray,
        source_class: int,
        target_class: int,
        flip_ratio: float = 0.1
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Simple label flipping attack."""
 
        X_poisoned = X.copy()
        y_poisoned = y.copy()
 
        # Find samples of source class
        source_indices = np.where(y == source_class)[0]
        num_to_flip = int(len(source_indices) * flip_ratio)
 
        # Randomly select samples to flip
        flip_indices = np.random.choice(
            source_indices, num_to_flip, replace=False
        )
 
        # Flip labels
        y_poisoned[flip_indices] = target_class
 
        return X_poisoned, y_poisoned
 
    def clean_label_attack(
        self,
        X: np.ndarray,
        y: np.ndarray,
        target_class: int,
        num_poisons: int,
        trigger_pattern: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray, List[PoisonedSample]]:
        """Clean-label backdoor attack with trigger pattern."""
 
        poisons = []
 
        # Find samples of target class
        target_indices = np.where(y == target_class)[0]
        poison_indices = np.random.choice(
            target_indices, num_poisons, replace=False
        )
 
        X_poisoned = X.copy()
        y_poisoned = y.copy()
 
        for idx in poison_indices:
            original = X[idx].copy()
 
            # Add trigger pattern (e.g., small patch)
            poisoned = original.copy()
            poisoned = self._add_trigger(poisoned, trigger_pattern)
 
            X_poisoned[idx] = poisoned
 
            poisons.append(PoisonedSample(
                original_data=original,
                poisoned_data=poisoned,
                original_label=target_class,
                target_label=target_class,  # Label unchanged (clean-label)
                poison_type='clean_label_backdoor'
            ))
 
        return X_poisoned, y_poisoned, poisons
 
    def gradient_based_poison(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_target: np.ndarray,
        target_label: int,
        num_poisons: int,
        learning_rate: float = 0.01,
        iterations: int = 100
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Generate optimized poison samples using gradient descent."""
 
        # Initialize poison samples
        X_poison = np.random.randn(num_poisons, *self.data_shape)
        y_poison = np.full(num_poisons, target_label)
 
        for _ in range(iterations):
            # Compute gradient to maximize influence on target
            gradients = self._compute_influence_gradient(
                X_train, y_train, X_poison, X_target, target_label
            )
 
            # Update poison samples
            X_poison += learning_rate * gradients
 
            # Project back to valid input space
            X_poison = np.clip(X_poison, 0, 1)
 
        # Combine with original training data
        X_poisoned = np.concatenate([X_train, X_poison])
        y_poisoned = np.concatenate([y_train, y_poison])
 
        return X_poisoned, y_poisoned
 
    def _add_trigger(
        self,
        sample: np.ndarray,
        trigger: np.ndarray
    ) -> np.ndarray:
        """Add trigger pattern to sample."""
 
        poisoned = sample.copy()
 
        # Add trigger in corner (for image data)
        trigger_size = trigger.shape[0]
        poisoned[:trigger_size, :trigger_size] = trigger
 
        return poisoned
 
    def _compute_influence_gradient(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_poison: np.ndarray,
        X_target: np.ndarray,
        target_label: int
    ) -> np.ndarray:
        """Compute gradient for influence maximization."""
 
        # Simplified gradient computation
        # In practice, use influence functions or bilevel optimization
        return np.random.randn(*X_poison.shape)
 
 
class BackdoorAttacks:
    """Implement backdoor attacks for testing defenses."""
 
    def __init__(self, trigger_size: int = 5):
        self.trigger_size = trigger_size
 
    def create_static_trigger(self) -> np.ndarray:
        """Create a static trigger pattern."""
 
        # Simple checkerboard pattern
        trigger = np.zeros((self.trigger_size, self.trigger_size))
        trigger[::2, ::2] = 1
        trigger[1::2, 1::2] = 1
 
        return trigger
 
    def create_dynamic_trigger(self, context: np.ndarray) -> np.ndarray:
        """Create context-dependent trigger."""
 
        # Trigger adapts to input characteristics
        mean_value = np.mean(context)
 
        trigger = np.ones((self.trigger_size, self.trigger_size))
        trigger *= (1 - mean_value)  # Inverse of mean
 
        return trigger
 
    def poison_dataset_with_backdoor(
        self,
        X: np.ndarray,
        y: np.ndarray,
        target_class: int,
        poison_ratio: float = 0.1,
        trigger_type: str = 'static'
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Add backdoor to subset of training data."""
 
        num_samples = len(X)
        num_poison = int(num_samples * poison_ratio)
 
        # Select samples to poison (from non-target classes)
        non_target_indices = np.where(y != target_class)[0]
        poison_indices = np.random.choice(
            non_target_indices, num_poison, replace=False
        )
 
        X_poisoned = X.copy()
        y_poisoned = y.copy()
 
        for idx in poison_indices:
            if trigger_type == 'static':
                trigger = self.create_static_trigger()
            else:
                trigger = self.create_dynamic_trigger(X[idx])
 
            # Add trigger to sample
            X_poisoned[idx] = self._embed_trigger(X_poisoned[idx], trigger)
            y_poisoned[idx] = target_class
 
        return X_poisoned, y_poisoned
 
    def _embed_trigger(
        self,
        sample: np.ndarray,
        trigger: np.ndarray
    ) -> np.ndarray:
        """Embed trigger in sample."""
 
        poisoned = sample.copy()
 
        # Bottom-right corner
        h, w = trigger.shape[:2]
        if len(sample.shape) == 3:
            poisoned[-h:, -w:, :] = trigger[:, :, np.newaxis]
        else:
            poisoned[-h:, -w:] = trigger
 
        return poisoned
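As a quick sanity check, the label-flip step above can be exercised on synthetic data. The following is a self-contained sketch (the array shapes, class sizes, and 10% flip ratio are illustrative), re-implementing just the flipping logic rather than importing the class:

```python
import numpy as np

rng = np.random.default_rng(0)

# Synthetic two-class dataset: 100 tiny 8x8 "images", 50 per class
X = rng.random((100, 8, 8))
y = np.array([0] * 50 + [1] * 50)

source_class, target_class, flip_ratio = 0, 1, 0.1

# Same logic as label_flip_attack above: flip 10% of the source class
source_indices = np.where(y == source_class)[0]
num_to_flip = int(len(source_indices) * flip_ratio)
flip_indices = rng.choice(source_indices, num_to_flip, replace=False)

y_poisoned = y.copy()
y_poisoned[flip_indices] = target_class
```

The poisoned labels differ from the originals in exactly `num_to_flip` positions; that is the entire budget a defender's sanitization step has to find.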

Defense Mechanisms

Data Sanitization

# data_sanitization.py
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.ensemble import IsolationForest
from typing import Tuple, List, Dict
 
class DataSanitizer:
    """Detect and remove poisoned samples from training data."""
 
    def __init__(self, contamination: float = 0.1):
        self.contamination = contamination
 
    def spectral_signatures(
        self,
        X: np.ndarray,
        y: np.ndarray,
        epsilon: float = 0.1
    ) -> Tuple[np.ndarray, np.ndarray, List[int]]:
        """Detect poisoned samples using spectral signatures."""
 
        suspicious_indices = []
 
        for class_label in np.unique(y):
            # Get samples for this class
            class_indices = np.where(y == class_label)[0]
            class_samples = X[class_indices]
 
            if len(class_samples) < 10:
                continue
 
            # Center and flatten the samples
            centered = class_samples - np.mean(class_samples, axis=0)
            flat_centered = centered.reshape(len(centered), -1)
 
            # Top right-singular vector of the centered data matrix:
            # equivalent to the top eigenvector of the covariance, without
            # forming the full (and potentially huge) covariance matrix
            _, _, Vh = np.linalg.svd(flat_centered, full_matrices=False)
            top_vector = Vh[0]
 
            # Project samples onto top singular vector
            projections = np.dot(flat_centered, top_vector)
 
            # Samples with high projection are suspicious
            threshold = np.percentile(np.abs(projections), 100 * (1 - epsilon))
            class_suspicious = class_indices[np.abs(projections) > threshold]
 
            suspicious_indices.extend(class_suspicious)
 
        # Remove suspicious samples
        clean_mask = np.ones(len(X), dtype=bool)
        clean_mask[suspicious_indices] = False
 
        return X[clean_mask], y[clean_mask], suspicious_indices
 
    def activation_clustering(
        self,
        model,
        X: np.ndarray,
        y: np.ndarray,
        layer_name: str
    ) -> Tuple[np.ndarray, np.ndarray, List[int]]:
        """Detect backdoors using activation clustering."""
 
        suspicious_indices = []
 
        for class_label in np.unique(y):
            class_indices = np.where(y == class_label)[0]
            class_samples = X[class_indices]
 
            # Get activations from specified layer
            activations = self._get_activations(model, class_samples, layer_name)
 
            # Cluster activations
            clustering = DBSCAN(eps=0.5, min_samples=5).fit(activations)
 
            # Samples in small clusters are suspicious
            labels = clustering.labels_
            unique_labels, counts = np.unique(labels, return_counts=True)
 
            for label, count in zip(unique_labels, counts):
                if label == -1:  # Noise points
                    suspicious_in_class = class_indices[labels == -1]
                    suspicious_indices.extend(suspicious_in_class)
                elif count < 0.1 * len(class_indices):  # Small cluster
                    suspicious_in_class = class_indices[labels == label]
                    suspicious_indices.extend(suspicious_in_class)
 
        # Remove suspicious samples
        clean_mask = np.ones(len(X), dtype=bool)
        clean_mask[suspicious_indices] = False
 
        return X[clean_mask], y[clean_mask], suspicious_indices
 
    def isolation_forest_detection(
        self,
        X: np.ndarray,
        y: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray, List[int]]:
        """Detect outliers using Isolation Forest."""
 
        suspicious_indices = []
 
        for class_label in np.unique(y):
            class_indices = np.where(y == class_label)[0]
            class_samples = X[class_indices]
 
            if len(class_samples) < 10:
                continue
 
            # Flatten samples
            flat_samples = class_samples.reshape(len(class_samples), -1)
 
            # Fit Isolation Forest
            iso_forest = IsolationForest(
                contamination=self.contamination,
                random_state=42
            )
            predictions = iso_forest.fit_predict(flat_samples)
 
            # Outliers are suspicious
            outlier_mask = predictions == -1
            suspicious_in_class = class_indices[outlier_mask]
            suspicious_indices.extend(suspicious_in_class)
 
        clean_mask = np.ones(len(X), dtype=bool)
        clean_mask[suspicious_indices] = False
 
        return X[clean_mask], y[clean_mask], suspicious_indices
 
    def _get_activations(
        self,
        model,
        X: np.ndarray,
        layer_name: str
    ) -> np.ndarray:
        """Extract activations from a specific layer."""
 
        # Create activation extraction model
        from tensorflow.keras.models import Model
 
        layer_output = model.get_layer(layer_name).output
        activation_model = Model(inputs=model.input, outputs=layer_output)
 
        activations = activation_model.predict(X)
        return activations.reshape(len(X), -1)
 
 
class STRIP:
    """STRIP: A Defense Against Trojan Attacks."""
 
    def __init__(self, model, num_perturbations: int = 100):
        self.model = model
        self.num_perturbations = num_perturbations
 
    def detect(
        self,
        sample: np.ndarray,
        clean_samples: np.ndarray,
        threshold: float = 0.5
    ) -> Dict:
        """Detect if sample contains a backdoor trigger."""
 
        # Generate perturbed samples by blending with clean samples
        perturbed_predictions = []
 
        for _ in range(self.num_perturbations):
            # Random clean sample
            clean_idx = np.random.randint(len(clean_samples))
            clean_sample = clean_samples[clean_idx]
 
            # Blend samples
            alpha = np.random.uniform(0.3, 0.7)
            perturbed = alpha * sample + (1 - alpha) * clean_sample
 
            # Get prediction
            pred = self.model.predict(perturbed[np.newaxis, ...])[0]
            perturbed_predictions.append(pred)
 
        # STRIP scores the average entropy of the perturbed predictions:
        # a trigger dominates the blend, so predictions stay confidently
        # on the target class and entropy stays low
        perturbed_predictions = np.array(perturbed_predictions)
        mean_pred = np.mean(perturbed_predictions, axis=0)
        entropy = -np.mean(
            np.sum(perturbed_predictions * np.log(perturbed_predictions + 1e-10), axis=1)
        )
 
        # Low entropy indicates potential backdoor
        is_backdoor = entropy < threshold
 
        return {
            'is_backdoor': is_backdoor,
            'entropy': float(entropy),
            'threshold': threshold,
            'mean_prediction': mean_pred.tolist()
        }
 
 
class NeuralCleanse:
    """Neural Cleanse: Detecting and removing backdoors."""
 
    def __init__(self, model, input_shape: Tuple):
        self.model = model
        self.input_shape = input_shape
 
    def reverse_engineer_trigger(
        self,
        target_class: int,
        learning_rate: float = 0.1,
        iterations: int = 1000,
        regularization: float = 0.01
    ) -> Tuple[np.ndarray, np.ndarray, float]:
        """Reverse engineer potential trigger for target class."""
 
        # Initialize trigger and mask
        trigger = np.random.rand(*self.input_shape) * 0.1
        mask = np.random.rand(*self.input_shape) * 0.1
 
        for _ in range(iterations):
            # Compute gradient
            grad_trigger, grad_mask = self._compute_gradients(
                trigger, mask, target_class
            )
 
            # Update trigger and mask
            trigger -= learning_rate * (grad_trigger + regularization * trigger)
            mask -= learning_rate * (grad_mask + regularization * mask)
 
            # Constrain to valid range
            trigger = np.clip(trigger, 0, 1)
            mask = np.clip(mask, 0, 1)
 
        # Calculate trigger size (L1 norm of mask)
        trigger_size = np.sum(np.abs(mask))
 
        return trigger, mask, trigger_size
 
    def detect_backdoor(
        self,
        num_classes: int,
        anomaly_threshold: float = 2.0
    ) -> Dict:
        """Detect if model contains backdoor by analyzing trigger sizes."""
 
        trigger_sizes = []
 
        for class_idx in range(num_classes):
            _, _, size = self.reverse_engineer_trigger(class_idx)
            trigger_sizes.append(size)
 
        # Convert to an array for vectorized statistics
        trigger_sizes = np.array(trigger_sizes)

        # Calculate median absolute deviation (MAD)
        median = np.median(trigger_sizes)
        mad = np.median(np.abs(trigger_sizes - median))

        # Detect anomalies
        anomaly_scores = (trigger_sizes - median) / (mad + 1e-10)
        backdoor_classes = np.where(np.abs(anomaly_scores) > anomaly_threshold)[0]

        return {
            'has_backdoor': len(backdoor_classes) > 0,
            'backdoor_classes': backdoor_classes.tolist(),
            'trigger_sizes': trigger_sizes.tolist(),
            'anomaly_scores': anomaly_scores.tolist()
        }
        }
 
    def _compute_gradients(
        self,
        trigger: np.ndarray,
        mask: np.ndarray,
        target_class: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute gradients for trigger optimization."""
 
        # Simplified - in practice use TensorFlow/PyTorch gradients
        return np.random.randn(*trigger.shape), np.random.randn(*mask.shape)

Robust Training

# robust_training.py
import numpy as np
from typing import List, Tuple, Optional
 
class RobustAggregation:
    """Robust aggregation methods for federated learning."""
 
    @staticmethod
    def krum(
        gradients: List[np.ndarray],
        num_byzantine: int
    ) -> np.ndarray:
        """Multi-Krum aggregation for Byzantine-robust training."""
 
        n = len(gradients)
        f = num_byzantine
        m = n - f - 2
 
        if m < 1:
            raise ValueError("Too many Byzantine nodes")
 
        # Calculate pairwise distances
        distances = np.zeros((n, n))
        for i in range(n):
            for j in range(i + 1, n):
                dist = np.linalg.norm(gradients[i] - gradients[j])
                distances[i, j] = dist
                distances[j, i] = dist
 
        # Calculate scores: sum of the m closest distances,
        # skipping index 0 (the zero distance to self)
        scores = []
        for i in range(n):
            sorted_distances = np.sort(distances[i])
            score = np.sum(sorted_distances[1:m + 1])
            scores.append(score)
 
        # Select m gradients with lowest scores
        selected_indices = np.argsort(scores)[:m]
        selected_gradients = [gradients[i] for i in selected_indices]
 
        # Average selected gradients
        return np.mean(selected_gradients, axis=0)
 
    @staticmethod
    def trimmed_mean(
        gradients: List[np.ndarray],
        trim_ratio: float = 0.1
    ) -> np.ndarray:
        """Coordinate-wise trimmed mean aggregation."""
 
        stacked = np.stack(gradients)
        n = len(gradients)
        trim_count = int(n * trim_ratio)
 
        # Sort along participant axis
        sorted_grads = np.sort(stacked, axis=0)
 
        # Trim and average
        trimmed = sorted_grads[trim_count:n-trim_count]
        return np.mean(trimmed, axis=0)
 
    @staticmethod
    def median(gradients: List[np.ndarray]) -> np.ndarray:
        """Coordinate-wise median aggregation."""
 
        stacked = np.stack(gradients)
        return np.median(stacked, axis=0)
 
 
class DifferentialPrivacyTraining:
    """Differential privacy for poison resistance."""
 
    def __init__(
        self,
        epsilon: float = 1.0,
        delta: float = 1e-5,
        max_grad_norm: float = 1.0
    ):
        self.epsilon = epsilon
        self.delta = delta
        self.max_grad_norm = max_grad_norm
 
    def clip_gradients(self, gradients: np.ndarray) -> np.ndarray:
        """Clip gradients to max norm."""
 
        grad_norm = np.linalg.norm(gradients)
 
        if grad_norm > self.max_grad_norm:
            gradients = gradients * (self.max_grad_norm / grad_norm)
 
        return gradients
 
    def add_noise(
        self,
        gradients: np.ndarray,
        batch_size: int
    ) -> np.ndarray:
        """Add Gaussian noise for differential privacy."""
 
        # Calculate noise scale
        sensitivity = 2 * self.max_grad_norm / batch_size
        sigma = sensitivity * np.sqrt(2 * np.log(1.25 / self.delta)) / self.epsilon
 
        # Add noise
        noise = np.random.normal(0, sigma, gradients.shape)
 
        return gradients + noise
 
    def private_training_step(
        self,
        model,
        X_batch: np.ndarray,
        y_batch: np.ndarray,
        learning_rate: float
    ):
        """Execute one private training step."""
 
        # Compute per-sample gradients
        sample_gradients = self._compute_per_sample_gradients(
            model, X_batch, y_batch
        )
 
        # Clip each gradient
        clipped_gradients = [
            self.clip_gradients(g) for g in sample_gradients
        ]
 
        # Average and add noise
        avg_gradient = np.mean(clipped_gradients, axis=0)
        private_gradient = self.add_noise(avg_gradient, len(X_batch))
 
        # Update model
        self._apply_gradients(model, private_gradient, learning_rate)
 
    def _compute_per_sample_gradients(
        self,
        model,
        X: np.ndarray,
        y: np.ndarray
    ) -> List[np.ndarray]:
        """Compute gradients for each sample individually."""
        # Implementation depends on framework
        pass
 
    def _apply_gradients(
        self,
        model,
        gradients: np.ndarray,
        learning_rate: float
    ):
        """Apply gradients to model parameters."""
        # Implementation depends on framework
        pass
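Two small, checkable pieces of the robust-training section: a single Byzantine update wrecks the plain mean but barely moves the coordinate-wise median, and the Gaussian noise scale in `add_noise` follows directly from the formula. A self-contained sketch (the toy gradients and the epsilon/delta/batch settings are illustrative):

```python
import numpy as np

# --- Robust aggregation: one malicious update among three honest ones ---
honest = [np.array([1.0, 1.0]), np.array([1.1, 0.9]), np.array([0.9, 1.1])]
byzantine = np.array([100.0, -100.0])
updates = np.stack(honest + [byzantine])

naive_mean = np.mean(updates, axis=0)       # dragged far off by the attacker
robust_median = np.median(updates, axis=0)  # stays near the honest updates

# --- DP noise scale: same formula as add_noise() above ---
max_grad_norm, batch_size = 1.0, 64
epsilon, delta = 1.0, 1e-5
sensitivity = 2 * max_grad_norm / batch_size
sigma = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon
```

The median lands near (1.05, 0.95) while the mean is pulled tens of units toward the attacker's update; sigma comes out around 0.15 for these settings, the per-step noise that caps any single sample's influence.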

Monitoring and Detection

Runtime Backdoor Detection

# runtime_detection.py
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import deque
import numpy as np
 
@dataclass
class PredictionLog:
    """Log entry for a prediction."""
    input_hash: str
    prediction: np.ndarray
    confidence: float
    timestamp: float
    flagged: bool
    flag_reason: Optional[str]
 
class RuntimeBackdoorDetector:
    """Runtime detection of backdoor activation."""
 
    def __init__(
        self,
        model,
        window_size: int = 1000,
        confidence_threshold: float = 0.99
    ):
        self.model = model
        self.window_size = window_size
        self.confidence_threshold = confidence_threshold
        self.prediction_history = deque(maxlen=window_size)
        self.class_distribution = {}
 
    def check_prediction(
        self,
        input_data: np.ndarray,
        prediction: np.ndarray
    ) -> Dict:
        """Check if prediction might be result of backdoor."""
 
        flags = []
 
        # Check 1: Unusually high confidence
        confidence = float(np.max(prediction))
        if confidence > self.confidence_threshold:
            flags.append({
                'type': 'high_confidence',
                'value': confidence,
                'threshold': self.confidence_threshold
            })
 
        # Check 2: Class distribution anomaly
        predicted_class = int(np.argmax(prediction))
        class_anomaly = self._check_class_distribution(predicted_class)
        if class_anomaly:
            flags.append(class_anomaly)
 
        # Check 3: Input anomaly
        input_anomaly = self._check_input_anomaly(input_data)
        if input_anomaly:
            flags.append(input_anomaly)
 
        # Log prediction
        log_entry = PredictionLog(
            input_hash=self._hash_input(input_data),
            prediction=prediction,
            confidence=confidence,
            timestamp=time.time(),
            flagged=len(flags) > 0,
            flag_reason=str(flags) if flags else None
        )
        self.prediction_history.append(log_entry)
 
        # Update class distribution
        self._update_class_distribution(predicted_class)
 
        return {
            'is_suspicious': len(flags) > 0,
            'flags': flags,
            'confidence': confidence,
            'predicted_class': predicted_class
        }
 
    def _check_class_distribution(self, predicted_class: int) -> Optional[Dict]:
        """Check for anomalies in class distribution."""
 
        if len(self.prediction_history) < 100:
            return None
 
        # Calculate recent distribution
        recent_predictions = [
            np.argmax(p.prediction)
            for p in list(self.prediction_history)[-100:]
        ]
 
        class_counts = {}
        for p in recent_predictions:
            class_counts[p] = class_counts.get(p, 0) + 1
 
        # Check if one class is dominating unexpectedly
        total = sum(class_counts.values())
        for cls, count in class_counts.items():
            ratio = count / total
            if ratio > 0.8:  # 80% of recent predictions
                return {
                    'type': 'class_distribution_anomaly',
                    'dominant_class': cls,
                    'ratio': ratio
                }
 
        return None
 
    def _check_input_anomaly(self, input_data: np.ndarray) -> Optional[Dict]:
        """Check for anomalies in input that might indicate trigger."""
 
        # Check for unusual patterns in corners (common trigger locations)
        corners = [
            input_data[:5, :5],      # Top-left
            input_data[:5, -5:],     # Top-right
            input_data[-5:, :5],     # Bottom-left
            input_data[-5:, -5:]     # Bottom-right
        ]
 
        for i, corner in enumerate(corners):
            # Check for high contrast patterns
            if np.std(corner) > 0.3 and np.mean(corner) > 0.5:
                return {
                    'type': 'suspicious_pattern',
                    'location': ['top-left', 'top-right', 'bottom-left', 'bottom-right'][i],
                    'std': float(np.std(corner)),
                    'mean': float(np.mean(corner))
                }
 
        return None
 
    def _hash_input(self, input_data: np.ndarray) -> str:
        """Create hash of input for logging."""
        import hashlib
        return hashlib.sha256(input_data.tobytes()).hexdigest()[:16]
 
    def _update_class_distribution(self, predicted_class: int):
        """Update running class distribution."""
        self.class_distribution[predicted_class] = \
            self.class_distribution.get(predicted_class, 0) + 1
 
    def get_statistics(self) -> Dict:
        """Get detection statistics."""
 
        flagged = sum(1 for p in self.prediction_history if p.flagged)
 
        return {
            'total_predictions': len(self.prediction_history),
            'flagged_predictions': flagged,
            'flag_rate': flagged / len(self.prediction_history) if self.prediction_history else 0,
            'class_distribution': dict(self.class_distribution)
        }
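The class-distribution check above is easy to exercise in isolation: once a trigger is being actively exploited, a single class dominates the recent prediction window. A self-contained sketch (window size and the 0.8 threshold mirror the detector above; the prediction stream is synthetic):

```python
import numpy as np
from collections import deque

window = deque(maxlen=100)

# 15 benign predictions spread across classes 0, 1, 3 ...
for cls in [0, 1, 3] * 5:
    window.append(cls)

# ... then 85 predictions of the backdoor target class 2
for _ in range(85):
    window.append(2)

counts = np.bincount(np.array(window), minlength=4)
dominant_class = int(np.argmax(counts))
ratio = counts.max() / counts.sum()

# Same threshold as _check_class_distribution above
is_anomalous = ratio > 0.8
```

With 85 of the last 100 predictions landing on class 2, the ratio of 0.85 trips the 0.8 threshold and the detector would emit a `class_distribution_anomaly` flag.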

Conclusion

Defending against model poisoning attacks requires a layered approach:

  1. Data Validation - Detect and remove poisoned samples before training
  2. Robust Aggregation - Use Byzantine-resistant methods in distributed training
  3. Differential Privacy - Add noise to limit the influence of individual samples
  4. Runtime Monitoring - Detect backdoor activation at inference time
  5. Periodic Auditing - Regularly test models for embedded backdoors

By implementing these defenses, organizations can significantly reduce the risk of poisoning attacks compromising their ML systems.



Need help defending against model poisoning? DeviDevs builds secure MLOps platforms with data validation and adversarial defenses built in. Request a free assessment →
