AI Model Poisoning Attacks and Defenses: Protecting Your ML Pipeline
Model poisoning attacks represent one of the most insidious threats to machine learning systems. Unlike adversarial examples that attack at inference time, poisoning attacks compromise the model during training, embedding vulnerabilities that persist throughout the model's lifetime.
Understanding Model Poisoning Attacks
Attack Taxonomy
# model_poisoning_taxonomy.yaml
poisoning_attacks:
data_poisoning:
description: "Manipulating training data to influence model behavior"
types:
- label_flipping: "Changing labels of training samples"
- clean_label: "Adding poisoned samples with correct labels"
- gradient_based: "Optimizing poison samples for maximum impact"
model_poisoning:
description: "Directly manipulating model parameters"
types:
- federated_learning: "Malicious updates in distributed training"
- supply_chain: "Compromised pre-trained models"
- fine_tuning: "Poisoning during transfer learning"
backdoor_attacks:
description: "Embedding hidden triggers in models"
types:
- static_trigger: "Fixed pattern triggers"
- dynamic_trigger: "Context-dependent triggers"
- semantic_trigger: "Natural feature triggers"
attack_goals:
targeted: "Misclassify specific inputs"
untargeted: "Degrade overall model performance"
backdoor: "Activate specific behavior with trigger"Data Poisoning Attacks
# data_poisoning_attacks.py
import numpy as np
from typing import Tuple, List, Optional
from dataclasses import dataclass
@dataclass
class PoisonedSample:
"""A poisoned training sample."""
original_data: np.ndarray
poisoned_data: np.ndarray
original_label: int
target_label: int
poison_type: str
class DataPoisonAttacks:
"""Implement various data poisoning attacks for research/testing."""
def __init__(self, model, data_shape: Tuple):
self.model = model
self.data_shape = data_shape
def label_flip_attack(
self,
X: np.ndarray,
y: np.ndarray,
source_class: int,
target_class: int,
flip_ratio: float = 0.1
) -> Tuple[np.ndarray, np.ndarray]:
"""Simple label flipping attack."""
X_poisoned = X.copy()
y_poisoned = y.copy()
# Find samples of source class
source_indices = np.where(y == source_class)[0]
num_to_flip = int(len(source_indices) * flip_ratio)
# Randomly select samples to flip
flip_indices = np.random.choice(
source_indices, num_to_flip, replace=False
)
# Flip labels
y_poisoned[flip_indices] = target_class
return X_poisoned, y_poisoned
def clean_label_attack(
self,
X: np.ndarray,
y: np.ndarray,
target_class: int,
num_poisons: int,
trigger_pattern: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, List[PoisonedSample]]:
"""Clean-label backdoor attack with trigger pattern."""
poisons = []
# Find samples of target class
target_indices = np.where(y == target_class)[0]
poison_indices = np.random.choice(
target_indices, num_poisons, replace=False
)
X_poisoned = X.copy()
y_poisoned = y.copy()
for idx in poison_indices:
original = X[idx].copy()
# Add trigger pattern (e.g., small patch)
poisoned = original.copy()
poisoned = self._add_trigger(poisoned, trigger_pattern)
X_poisoned[idx] = poisoned
poisons.append(PoisonedSample(
original_data=original,
poisoned_data=poisoned,
original_label=target_class,
target_label=target_class, # Label unchanged (clean-label)
poison_type='clean_label_backdoor'
))
return X_poisoned, y_poisoned, poisons
def gradient_based_poison(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_target: np.ndarray,
target_label: int,
num_poisons: int,
learning_rate: float = 0.01,
iterations: int = 100
) -> Tuple[np.ndarray, np.ndarray]:
"""Generate optimized poison samples using gradient descent."""
# Initialize poison samples
X_poison = np.random.randn(num_poisons, *self.data_shape)
y_poison = np.full(num_poisons, target_label)
for _ in range(iterations):
# Compute gradient to maximize influence on target
gradients = self._compute_influence_gradient(
X_train, y_train, X_poison, X_target, target_label
)
# Update poison samples
X_poison += learning_rate * gradients
# Project back to valid input space
X_poison = np.clip(X_poison, 0, 1)
# Combine with original training data
X_poisoned = np.concatenate([X_train, X_poison])
y_poisoned = np.concatenate([y_train, y_poison])
return X_poisoned, y_poisoned
def _add_trigger(
self,
sample: np.ndarray,
trigger: np.ndarray
) -> np.ndarray:
"""Add trigger pattern to sample."""
poisoned = sample.copy()
# Add trigger in the top-left corner; broadcast across channels for multi-channel images
trigger_size = trigger.shape[0]
if poisoned.ndim == 3:
    poisoned[:trigger_size, :trigger_size, :] = trigger[:, :, np.newaxis]
else:
    poisoned[:trigger_size, :trigger_size] = trigger
return poisoned
def _compute_influence_gradient(
self,
X_train: np.ndarray,
y_train: np.ndarray,
X_poison: np.ndarray,
X_target: np.ndarray,
target_label: int
) -> np.ndarray:
"""Compute gradient for influence maximization."""
# Simplified gradient computation
# In practice, use influence functions or bilevel optimization
return np.random.randn(*X_poison.shape)
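# Illustrative usage of the label-flipping attack above (a sketch only: the
# synthetic data, shapes, and class indices are assumptions, not part of the
# attack implementation).
def demo_label_flip_attack(seed: int = 0) -> None:
    """Run label_flip_attack on synthetic data and report how many labels changed."""
    rng = np.random.default_rng(seed)
    X_demo = rng.random((1000, 28, 28))        # stand-in "image" data in [0, 1]
    y_demo = rng.integers(0, 10, size=1000)    # ten classes
    attack = DataPoisonAttacks(model=None, data_shape=(28, 28))
    X_p, y_p = attack.label_flip_attack(
        X_demo, y_demo, source_class=3, target_class=8, flip_ratio=0.05
    )
    print(f"Flipped {int(np.sum(y_p != y_demo))} of {len(y_demo)} labels from class 3 to class 8")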
class BackdoorAttacks:
"""Implement backdoor attacks for testing defenses."""
def __init__(self, trigger_size: int = 5):
self.trigger_size = trigger_size
def create_static_trigger(self) -> np.ndarray:
"""Create a static trigger pattern."""
# Simple checkerboard pattern
trigger = np.zeros((self.trigger_size, self.trigger_size))
trigger[::2, ::2] = 1
trigger[1::2, 1::2] = 1
return trigger
def create_dynamic_trigger(self, context: np.ndarray) -> np.ndarray:
"""Create context-dependent trigger."""
# Trigger adapts to input characteristics
mean_value = np.mean(context)
trigger = np.ones((self.trigger_size, self.trigger_size))
trigger *= (1 - mean_value) # Inverse of mean
return trigger
def poison_dataset_with_backdoor(
self,
X: np.ndarray,
y: np.ndarray,
target_class: int,
poison_ratio: float = 0.1,
trigger_type: str = 'static'
) -> Tuple[np.ndarray, np.ndarray]:
"""Add backdoor to subset of training data."""
num_samples = len(X)
num_poison = int(num_samples * poison_ratio)
# Select samples to poison (from non-target classes)
non_target_indices = np.where(y != target_class)[0]
poison_indices = np.random.choice(
non_target_indices, num_poison, replace=False
)
X_poisoned = X.copy()
y_poisoned = y.copy()
for idx in poison_indices:
if trigger_type == 'static':
trigger = self.create_static_trigger()
else:
trigger = self.create_dynamic_trigger(X[idx])
# Add trigger to sample
X_poisoned[idx] = self._embed_trigger(X_poisoned[idx], trigger)
y_poisoned[idx] = target_class
return X_poisoned, y_poisoned
def _embed_trigger(
self,
sample: np.ndarray,
trigger: np.ndarray
) -> np.ndarray:
"""Embed trigger in sample."""
poisoned = sample.copy()
# Bottom-right corner
h, w = trigger.shape[:2]
if len(sample.shape) == 3:
poisoned[-h:, -w:, :] = trigger[:, :, np.newaxis]
else:
poisoned[-h:, -w:] = trigger
return poisoned
Defense Mechanisms
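Before evaluating any defense, it helps to have a concretely poisoned dataset to test it against. The sketch below builds one with the BackdoorAttacks class from the previous section; the synthetic data, shapes, class count, and poison ratio are illustrative assumptions rather than recommendations.
# backdoor_eval_setup.py (illustrative)
import numpy as np
from data_poisoning_attacks import BackdoorAttacks  # module name taken from the listing above
rng = np.random.default_rng(42)
X = rng.random((2000, 32, 32))            # synthetic stand-in for real training images
y = rng.integers(0, 10, size=2000)
attack = BackdoorAttacks(trigger_size=5)
X_bd, y_bd = attack.poison_dataset_with_backdoor(
    X, y, target_class=0, poison_ratio=0.05, trigger_type='static'
)
print(f"Relabelled {int(np.sum(y_bd != y))} samples to class 0 with an embedded trigger")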
Data Sanitization
# data_sanitization.py
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.ensemble import IsolationForest
from typing import Tuple, List, Dict
class DataSanitizer:
"""Detect and remove poisoned samples from training data."""
def __init__(self, contamination: float = 0.1):
self.contamination = contamination
def spectral_signatures(
self,
X: np.ndarray,
y: np.ndarray,
epsilon: float = 0.1
) -> Tuple[np.ndarray, np.ndarray, List[int]]:
"""Detect poisoned samples using spectral signatures."""
suspicious_indices = []
for class_label in np.unique(y):
# Get samples for this class
class_indices = np.where(y == class_label)[0]
class_samples = X[class_indices]
if len(class_samples) < 10:
continue
# Compute covariance matrix
centered = class_samples - np.mean(class_samples, axis=0)
flat_centered = centered.reshape(len(centered), -1)
cov = np.cov(flat_centered.T)
# Compute top singular vector
_, _, Vh = np.linalg.svd(cov)
top_vector = Vh[0]
# Project samples onto top singular vector
projections = np.dot(flat_centered, top_vector)
# Samples with high projection are suspicious
threshold = np.percentile(np.abs(projections), 100 * (1 - epsilon))
class_suspicious = class_indices[np.abs(projections) > threshold]
suspicious_indices.extend(class_suspicious)
# Remove suspicious samples
clean_mask = np.ones(len(X), dtype=bool)
clean_mask[suspicious_indices] = False
return X[clean_mask], y[clean_mask], suspicious_indices
def activation_clustering(
self,
model,
X: np.ndarray,
y: np.ndarray,
layer_name: str
) -> Tuple[np.ndarray, np.ndarray, List[int]]:
"""Detect backdoors using activation clustering."""
suspicious_indices = []
for class_label in np.unique(y):
class_indices = np.where(y == class_label)[0]
class_samples = X[class_indices]
# Get activations from specified layer
activations = self._get_activations(model, class_samples, layer_name)
# Cluster activations
clustering = DBSCAN(eps=0.5, min_samples=5).fit(activations)
# Samples in small clusters are suspicious
labels = clustering.labels_
unique_labels, counts = np.unique(labels, return_counts=True)
for label, count in zip(unique_labels, counts):
if label == -1: # Noise points
suspicious_in_class = class_indices[labels == -1]
suspicious_indices.extend(suspicious_in_class)
elif count < 0.1 * len(class_indices): # Small cluster
suspicious_in_class = class_indices[labels == label]
suspicious_indices.extend(suspicious_in_class)
# Remove suspicious samples
clean_mask = np.ones(len(X), dtype=bool)
clean_mask[suspicious_indices] = False
return X[clean_mask], y[clean_mask], suspicious_indices
def isolation_forest_detection(
self,
X: np.ndarray,
y: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, List[int]]:
"""Detect outliers using Isolation Forest."""
suspicious_indices = []
for class_label in np.unique(y):
class_indices = np.where(y == class_label)[0]
class_samples = X[class_indices]
if len(class_samples) < 10:
continue
# Flatten samples
flat_samples = class_samples.reshape(len(class_samples), -1)
# Fit Isolation Forest
iso_forest = IsolationForest(
contamination=self.contamination,
random_state=42
)
predictions = iso_forest.fit_predict(flat_samples)
# Outliers are suspicious
outlier_mask = predictions == -1
suspicious_in_class = class_indices[outlier_mask]
suspicious_indices.extend(suspicious_in_class)
clean_mask = np.ones(len(X), dtype=bool)
clean_mask[suspicious_indices] = False
return X[clean_mask], y[clean_mask], suspicious_indices
def _get_activations(
self,
model,
X: np.ndarray,
layer_name: str
) -> np.ndarray:
"""Extract activations from a specific layer."""
# Create activation extraction model
from tensorflow.keras.models import Model
layer_output = model.get_layer(layer_name).output
activation_model = Model(inputs=model.input, outputs=layer_output)
activations = activation_model.predict(X)
return activations.reshape(len(X), -1)
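# Illustrative check of the Isolation Forest detector above on synthetic data
# with a handful of obvious outliers injected. The data, shapes, and
# contamination value are assumptions for this sketch, not tuned settings.
def demo_isolation_forest_detection(seed: int = 0) -> None:
    rng = np.random.default_rng(seed)
    X_clean = rng.normal(0.5, 0.05, size=(500, 16, 16))   # tightly clustered "clean" samples
    X_outliers = rng.random((10, 16, 16))                  # off-distribution samples
    X_mixed = np.concatenate([X_clean, X_outliers])
    y_mixed = np.zeros(len(X_mixed), dtype=int)            # single class keeps the demo simple
    sanitizer = DataSanitizer(contamination=0.05)
    X_kept, y_kept, suspicious = sanitizer.isolation_forest_detection(X_mixed, y_mixed)
    print(f"Removed {len(suspicious)} of {len(X_mixed)} samples as suspicious")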
class STRIP:
"""STRIP: A Defense Against Trojan Attacks."""
def __init__(self, model, num_perturbations: int = 100):
self.model = model
self.num_perturbations = num_perturbations
def detect(
self,
sample: np.ndarray,
clean_samples: np.ndarray,
threshold: float = 0.5
) -> Dict:
"""Detect if sample contains a backdoor trigger."""
# Generate perturbed samples by blending with clean samples
perturbed_predictions = []
for _ in range(self.num_perturbations):
# Random clean sample
clean_idx = np.random.randint(len(clean_samples))
clean_sample = clean_samples[clean_idx]
# Blend samples
alpha = np.random.uniform(0.3, 0.7)
perturbed = alpha * sample + (1 - alpha) * clean_sample
# Get prediction
pred = self.model.predict(perturbed[np.newaxis, ...])[0]
perturbed_predictions.append(pred)
# STRIP scores an input by the average entropy of its perturbed predictions:
# a trigger keeps predictions confident under blending, so entropy stays low
perturbed_predictions = np.array(perturbed_predictions)
mean_pred = np.mean(perturbed_predictions, axis=0)
entropies = -np.sum(perturbed_predictions * np.log(perturbed_predictions + 1e-10), axis=1)
entropy = float(np.mean(entropies))
# Low entropy indicates potential backdoor
is_backdoor = entropy < threshold
return {
'is_backdoor': is_backdoor,
'entropy': float(entropy),
'threshold': threshold,
'mean_prediction': mean_pred.tolist()
}
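# Minimal smoke test for the STRIP detector using a stub model. The stub and
# its confidence values are assumptions made purely for illustration: a model
# whose prediction stays confident no matter how the input is blended yields
# low entropy and gets flagged, which is the behaviour a trigger tends to cause.
class _OverconfidentStubModel:
    """Toy model that always predicts class 0 with ~0.99 confidence."""
    def predict(self, x: np.ndarray) -> np.ndarray:
        probs = np.full((len(x), 10), 0.001)
        probs[:, 0] = 0.991
        return probs

def demo_strip_detection(seed: int = 0) -> Dict:
    rng = np.random.default_rng(seed)
    clean_pool = rng.random((50, 28, 28))
    suspect_input = rng.random((28, 28))
    detector = STRIP(_OverconfidentStubModel(), num_perturbations=20)
    return detector.detect(suspect_input, clean_pool, threshold=0.5)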
class NeuralCleanse:
"""Neural Cleanse: Detecting and removing backdoors."""
def __init__(self, model, input_shape: Tuple):
self.model = model
self.input_shape = input_shape
def reverse_engineer_trigger(
self,
target_class: int,
learning_rate: float = 0.1,
iterations: int = 1000,
regularization: float = 0.01
) -> Tuple[np.ndarray, np.ndarray, float]:
"""Reverse engineer potential trigger for target class."""
# Initialize trigger and mask
trigger = np.random.rand(*self.input_shape) * 0.1
mask = np.random.rand(*self.input_shape) * 0.1
for _ in range(iterations):
# Compute gradient
grad_trigger, grad_mask = self._compute_gradients(
trigger, mask, target_class
)
# Update trigger and mask
trigger -= learning_rate * (grad_trigger + regularization * trigger)
mask -= learning_rate * (grad_mask + regularization * mask)
# Constrain to valid range
trigger = np.clip(trigger, 0, 1)
mask = np.clip(mask, 0, 1)
# Calculate trigger size (L1 norm of mask)
trigger_size = np.sum(np.abs(mask))
return trigger, mask, trigger_size
def detect_backdoor(
self,
num_classes: int,
anomaly_threshold: float = 2.0
) -> Dict:
"""Detect if model contains backdoor by analyzing trigger sizes."""
trigger_sizes = []
for class_idx in range(num_classes):
_, _, size = self.reverse_engineer_trigger(class_idx)
trigger_sizes.append(size)
# Calculate median absolute deviation
median = np.median(trigger_sizes)
mad = np.median(np.abs(trigger_sizes - median))
# Detect anomalies: a backdoored class typically needs a much smaller trigger
# than the others, so flag classes whose trigger size falls well below the median
anomaly_scores = (np.array(trigger_sizes) - median) / (1.4826 * mad + 1e-10)
backdoor_classes = np.where(anomaly_scores < -anomaly_threshold)[0]
return {
'has_backdoor': len(backdoor_classes) > 0,
'backdoor_classes': backdoor_classes.tolist(),
'trigger_sizes': trigger_sizes,
'anomaly_scores': anomaly_scores.tolist()
}
def _compute_gradients(
self,
trigger: np.ndarray,
mask: np.ndarray,
target_class: int
) -> Tuple[np.ndarray, np.ndarray]:
"""Compute gradients for trigger optimization."""
# Simplified - in practice use TensorFlow/PyTorch gradients
return np.random.randn(*trigger.shape), np.random.randn(*mask.shape)
Robust Training
# robust_training.py
import numpy as np
from typing import List, Tuple, Optional
class RobustAggregation:
"""Robust aggregation methods for federated learning."""
@staticmethod
def krum(
gradients: List[np.ndarray],
num_byzantine: int
) -> np.ndarray:
"""Multi-Krum aggregation for Byzantine-robust training."""
n = len(gradients)
f = num_byzantine
m = n - f - 2
if m < 1:
raise ValueError("Too many Byzantine nodes")
# Calculate pairwise distances
distances = np.zeros((n, n))
for i in range(n):
for j in range(i + 1, n):
dist = np.linalg.norm(gradients[i] - gradients[j])
distances[i, j] = dist
distances[j, i] = dist
# Calculate scores (sum of m closest distances)
scores = []
for i in range(n):
sorted_distances = np.sort(distances[i])
# Skip index 0 (the zero distance to itself) and sum the m closest neighbours
score = np.sum(sorted_distances[1:m + 1])
scores.append(score)
# Select m gradients with lowest scores
selected_indices = np.argsort(scores)[:m]
selected_gradients = [gradients[i] for i in selected_indices]
# Average selected gradients
return np.mean(selected_gradients, axis=0)
@staticmethod
def trimmed_mean(
gradients: List[np.ndarray],
trim_ratio: float = 0.1
) -> np.ndarray:
"""Coordinate-wise trimmed mean aggregation."""
stacked = np.stack(gradients)
n = len(gradients)
trim_count = int(n * trim_ratio)
# Sort along participant axis
sorted_grads = np.sort(stacked, axis=0)
# Trim and average
trimmed = sorted_grads[trim_count:n-trim_count]
return np.mean(trimmed, axis=0)
@staticmethod
def median(gradients: List[np.ndarray]) -> np.ndarray:
"""Coordinate-wise median aggregation."""
stacked = np.stack(gradients)
return np.median(stacked, axis=0)
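# Quick illustration of why robust aggregation matters: one Byzantine worker
# submits an oversized gradient, which drags the plain mean far from the honest
# updates while the median, trimmed mean, and Krum stay close. The gradient
# values below are synthetic and purely for illustration.
def demo_robust_aggregation(seed: int = 0) -> None:
    rng = np.random.default_rng(seed)
    honest = [rng.normal(0.0, 0.1, size=10) for _ in range(9)]
    byzantine = np.full(10, 100.0)               # malicious, oversized update
    gradients = honest + [byzantine]
    print("plain mean:  ", np.linalg.norm(np.mean(gradients, axis=0)))
    print("median:      ", np.linalg.norm(RobustAggregation.median(gradients)))
    print("trimmed mean:", np.linalg.norm(RobustAggregation.trimmed_mean(gradients, trim_ratio=0.2)))
    print("krum:        ", np.linalg.norm(RobustAggregation.krum(gradients, num_byzantine=1)))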
class DifferentialPrivacyTraining:
"""Differential privacy for poison resistance."""
def __init__(
self,
epsilon: float = 1.0,
delta: float = 1e-5,
max_grad_norm: float = 1.0
):
self.epsilon = epsilon
self.delta = delta
self.max_grad_norm = max_grad_norm
def clip_gradients(self, gradients: np.ndarray) -> np.ndarray:
"""Clip gradients to max norm."""
grad_norm = np.linalg.norm(gradients)
if grad_norm > self.max_grad_norm:
gradients = gradients * (self.max_grad_norm / grad_norm)
return gradients
def add_noise(
self,
gradients: np.ndarray,
batch_size: int
) -> np.ndarray:
"""Add Gaussian noise for differential privacy."""
# Calculate noise scale
sensitivity = 2 * self.max_grad_norm / batch_size
sigma = sensitivity * np.sqrt(2 * np.log(1.25 / self.delta)) / self.epsilon
# Add noise
noise = np.random.normal(0, sigma, gradients.shape)
return gradients + noise
def private_training_step(
self,
model,
X_batch: np.ndarray,
y_batch: np.ndarray,
learning_rate: float
):
"""Execute one private training step."""
# Compute per-sample gradients
sample_gradients = self._compute_per_sample_gradients(
model, X_batch, y_batch
)
# Clip each gradient
clipped_gradients = [
self.clip_gradients(g) for g in sample_gradients
]
# Average and add noise
avg_gradient = np.mean(clipped_gradients, axis=0)
private_gradient = self.add_noise(avg_gradient, len(X_batch))
# Update model
self._apply_gradients(model, private_gradient, learning_rate)
def _compute_per_sample_gradients(
self,
model,
X: np.ndarray,
y: np.ndarray
) -> List[np.ndarray]:
"""Compute gradients for each sample individually."""
# Implementation depends on framework
pass
def _apply_gradients(
self,
model,
gradients: np.ndarray,
learning_rate: float
):
"""Apply gradients to model parameters."""
# Implementation depends on framework
pass
Monitoring and Detection
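Training-time defenses do not catch everything, so inference traffic deserves monitoring of its own. Before the detector implementation below, here is roughly how it could be wired into a serving path; the saved model path, the use of a Keras classifier, and the response format are assumptions for this sketch.
# serving_hook.py (illustrative)
import numpy as np
from tensorflow.keras.models import load_model
from runtime_detection import RuntimeBackdoorDetector  # module defined below
model = load_model("classifier.h5")  # placeholder path to a saved classifier
detector = RuntimeBackdoorDetector(model, window_size=1000, confidence_threshold=0.99)
def predict_with_monitoring(raw_input: np.ndarray) -> dict:
    """Serve a prediction and flag possible backdoor activation."""
    prediction = model.predict(raw_input[np.newaxis, ...])[0]
    verdict = detector.check_prediction(raw_input, prediction)
    if verdict["is_suspicious"]:
        # In production, quarantine the request or raise an alert here
        pass
    return {"prediction": prediction.tolist(), "monitoring": verdict}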
Runtime Backdoor Detection
# runtime_detection.py
from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import deque
import time
import numpy as np
@dataclass
class PredictionLog:
"""Log entry for a prediction."""
input_hash: str
prediction: np.ndarray
confidence: float
timestamp: float
flagged: bool
flag_reason: Optional[str]
class RuntimeBackdoorDetector:
"""Runtime detection of backdoor activation."""
def __init__(
self,
model,
window_size: int = 1000,
confidence_threshold: float = 0.99
):
self.model = model
self.window_size = window_size
self.confidence_threshold = confidence_threshold
self.prediction_history = deque(maxlen=window_size)
self.class_distribution = {}
def check_prediction(
self,
input_data: np.ndarray,
prediction: np.ndarray
) -> Dict:
"""Check if prediction might be result of backdoor."""
flags = []
# Check 1: Unusually high confidence
confidence = float(np.max(prediction))
if confidence > self.confidence_threshold:
flags.append({
'type': 'high_confidence',
'value': confidence,
'threshold': self.confidence_threshold
})
# Check 2: Class distribution anomaly
predicted_class = int(np.argmax(prediction))
class_anomaly = self._check_class_distribution(predicted_class)
if class_anomaly:
flags.append(class_anomaly)
# Check 3: Input anomaly
input_anomaly = self._check_input_anomaly(input_data)
if input_anomaly:
flags.append(input_anomaly)
# Log prediction
log_entry = PredictionLog(
input_hash=self._hash_input(input_data),
prediction=prediction,
confidence=confidence,
timestamp=time.time(),
flagged=len(flags) > 0,
flag_reason=str(flags) if flags else None
)
self.prediction_history.append(log_entry)
# Update class distribution
self._update_class_distribution(predicted_class)
return {
'is_suspicious': len(flags) > 0,
'flags': flags,
'confidence': confidence,
'predicted_class': predicted_class
}
def _check_class_distribution(self, predicted_class: int) -> Optional[Dict]:
"""Check for anomalies in class distribution."""
if len(self.prediction_history) < 100:
return None
# Calculate recent distribution
recent_predictions = [
np.argmax(p.prediction)
for p in list(self.prediction_history)[-100:]
]
class_counts = {}
for p in recent_predictions:
class_counts[p] = class_counts.get(p, 0) + 1
# Check if one class is dominating unexpectedly
total = sum(class_counts.values())
for cls, count in class_counts.items():
ratio = count / total
if ratio > 0.8: # 80% of recent predictions
return {
'type': 'class_distribution_anomaly',
'dominant_class': cls,
'ratio': ratio
}
return None
def _check_input_anomaly(self, input_data: np.ndarray) -> Optional[Dict]:
"""Check for anomalies in input that might indicate trigger."""
# Check for unusual patterns in corners (common trigger locations)
corners = [
input_data[:5, :5], # Top-left
input_data[:5, -5:], # Top-right
input_data[-5:, :5], # Bottom-left
input_data[-5:, -5:] # Bottom-right
]
for i, corner in enumerate(corners):
# Check for high contrast patterns
if np.std(corner) > 0.3 and np.mean(corner) > 0.5:
return {
'type': 'suspicious_pattern',
'location': ['top-left', 'top-right', 'bottom-left', 'bottom-right'][i],
'std': float(np.std(corner)),
'mean': float(np.mean(corner))
}
return None
def _hash_input(self, input_data: np.ndarray) -> str:
"""Create hash of input for logging."""
import hashlib
return hashlib.sha256(input_data.tobytes()).hexdigest()[:16]
def _update_class_distribution(self, predicted_class: int):
"""Update running class distribution."""
self.class_distribution[predicted_class] = \
self.class_distribution.get(predicted_class, 0) + 1
def get_statistics(self) -> Dict:
"""Get detection statistics."""
flagged = sum(1 for p in self.prediction_history if p.flagged)
return {
'total_predictions': len(self.prediction_history),
'flagged_predictions': flagged,
'flag_rate': flagged / len(self.prediction_history) if self.prediction_history else 0,
'class_distribution': dict(self.class_distribution)
}
Conclusion
Defending against model poisoning attacks requires a multi-layered approach:
- Data Validation - Detect and remove poisoned samples before training
- Robust Aggregation - Use Byzantine-robust methods in distributed training
- Differential Privacy - Add noise to limit influence of individual samples
- Runtime Monitoring - Detect backdoor activation at inference time
- Regular Auditing - Periodically test models for embedded backdoors
By implementing these defenses, organizations can significantly reduce the risk of poisoning attacks compromising their ML systems.
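To make the layered approach concrete, the sketch below wires together three of the five layers using the classes built in this post; the module names mirror the example filenames above, and the orchestration itself is an assumption to adapt to your own pipeline rather than a prescribed design.
# defense_pipeline.py (illustrative)
from data_sanitization import DataSanitizer
from robust_training import RobustAggregation
from runtime_detection import RuntimeBackdoorDetector

def sanitize_training_data(X, y):
    """Layer 1 (data validation): drop samples that look poisoned before training starts."""
    sanitizer = DataSanitizer(contamination=0.05)
    X_clean, y_clean, dropped = sanitizer.isolation_forest_detection(X, y)
    return X_clean, y_clean, dropped

def aggregate_worker_updates(worker_gradients, assumed_byzantine=1):
    """Layer 2 (robust aggregation): combine distributed updates with a Byzantine-robust rule."""
    return RobustAggregation.krum(worker_gradients, num_byzantine=assumed_byzantine)

def build_runtime_monitor(model):
    """Layer 4 (runtime monitoring): wrap the deployed model with inference-time backdoor checks."""
    return RuntimeBackdoorDetector(model, window_size=1000, confidence_threshold=0.99)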
Related Resources
- MLOps Security: Securing Your ML Pipeline — Data poisoning detection in production pipelines
- Model Monitoring in Production: Detecting Data Drift and Model Degradation — Detect anomalies post-deployment
- Data Versioning for ML: DVC, lakeFS, and Delta Lake — Track and verify training data integrity
- Common MLOps Mistakes and How to Avoid Them — Including ignoring data quality
- What is MLOps? — Complete MLOps overview
Need help defending against model poisoning? DeviDevs implements secure MLOps platforms with data validation and adversarial defenses built in. Get a free assessment →