AI Bias Detection and Mitigation: Building Fair Machine Learning Systems
Bias in AI systems can perpetuate discrimination and cause significant harm. This guide covers practical techniques for detecting, measuring, and mitigating bias throughout the ML lifecycle to build fairer, more equitable systems.
Understanding AI Bias
AI bias manifests in multiple forms:
- Historical bias: Training data reflects past discrimination
- Representation bias: Underrepresentation of certain groups (see the audit sketch after this list)
- Measurement bias: Features that proxy for protected attributes
- Aggregation bias: One model for diverse subpopulations
- Evaluation bias: Benchmarks don't represent deployment context
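As a quick illustration of how these categories translate into concrete checks, representation bias can often be surfaced with a simple group-proportion audit of the training data. The sketch below is illustrative only: the DataFrame, column name, and reference shares are hypothetical placeholders for your own data.
# representation_audit.py (illustrative sketch; column names and reference shares are hypothetical)
import pandas as pd
def audit_representation(
    df: pd.DataFrame,
    column: str,
    reference: dict,
    tolerance: float = 0.05
) -> pd.DataFrame:
    """Compare observed group shares in the data against a reference distribution."""
    observed = df[column].value_counts(normalize=True)
    rows = []
    for group, expected_share in reference.items():
        observed_share = float(observed.get(group, 0.0))
        rows.append({
            'group': group,
            'observed_share': observed_share,
            'expected_share': expected_share,
            'underrepresented': observed_share < expected_share - tolerance
        })
    return pd.DataFrame(rows)
# Example usage with made-up reference shares:
# audit_representation(train_df, 'gender', {'female': 0.51, 'male': 0.49})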
Fairness Metrics Implementation
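The module below implements a FairnessEvaluator that computes group fairness metrics (demographic parity, equalized odds, predictive parity, calibration) plus an individual fairness check, returning structured FairnessReport objects. To show how it is intended to be driven, here is a minimal usage sketch on synthetic data; the random data, group labels, and default thresholds are illustrative assumptions.
# Usage sketch for the FairnessEvaluator defined below (synthetic data for illustration)
import numpy as np
# from fairness_metrics import FairnessEvaluator  # assuming the module below is saved as fairness_metrics.py
rng = np.random.default_rng(42)
n = 1000
y_true = rng.integers(0, 2, size=n)
y_prob = rng.random(size=n)
y_pred = (y_prob >= 0.5).astype(int)
groups = rng.choice(['A', 'B'], size=n)
evaluator = FairnessEvaluator(protected_attribute='group')
reports = evaluator.comprehensive_evaluation(y_true, y_pred, y_prob, groups)
for name, report in reports.items():
    status = 'PASS' if report.passes_threshold else 'FAIL'
    print(f"{name}: {status} (threshold {report.threshold})")
    for rec in report.recommendations:
        print(f"  - {rec}")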
# fairness_metrics.py
"""
Comprehensive fairness metrics for ML model evaluation.
"""
import numpy as np
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class FairnessReport:
"""Complete fairness evaluation report."""
metric_name: str
overall_value: float
group_values: Dict[str, float]
disparities: Dict[str, float]
passes_threshold: bool
threshold: float
recommendations: List[str]
class FairnessEvaluator:
"""
Evaluate model fairness across protected groups.
"""
def __init__(
self,
protected_attribute: str,
favorable_label: int = 1,
unfavorable_label: int = 0
):
self.protected_attribute = protected_attribute
self.favorable_label = favorable_label
self.unfavorable_label = unfavorable_label
def demographic_parity(
self,
y_pred: np.ndarray,
protected_groups: np.ndarray,
threshold: float = 0.1
) -> FairnessReport:
"""
Demographic parity: P(Y_pred=1 | A=a) should be equal across groups.
Also known as statistical parity or group fairness.
"""
groups = np.unique(protected_groups)
selection_rates = {}
for group in groups:
mask = protected_groups == group
rate = np.mean(y_pred[mask] == self.favorable_label)
selection_rates[str(group)] = rate
overall_rate = np.mean(y_pred == self.favorable_label)
# Calculate disparities
disparities = {}
max_disparity = 0
for group, rate in selection_rates.items():
disparity = abs(rate - overall_rate)
disparities[group] = disparity
max_disparity = max(max_disparity, disparity)
recommendations = []
if max_disparity > threshold:
worst_group = max(disparities, key=disparities.get)
recommendations.append(
f"Group '{worst_group}' has highest disparity ({disparities[worst_group]:.3f})"
)
recommendations.append("Consider resampling or reweighting training data")
return FairnessReport(
metric_name="Demographic Parity",
overall_value=overall_rate,
group_values=selection_rates,
disparities=disparities,
passes_threshold=max_disparity <= threshold,
threshold=threshold,
recommendations=recommendations
)
def equalized_odds(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
protected_groups: np.ndarray,
threshold: float = 0.1
) -> FairnessReport:
"""
Equalized odds: TPR and FPR should be equal across groups.
P(Y_pred=1 | Y=y, A=a) should be equal for all groups and y in {0,1}
"""
groups = np.unique(protected_groups)
tpr_values = {}
fpr_values = {}
for group in groups:
mask = protected_groups == group
# True Positive Rate
pos_mask = (y_true == self.favorable_label) & mask
if np.sum(pos_mask) > 0:
tpr = np.mean(y_pred[pos_mask] == self.favorable_label)
else:
tpr = 0
tpr_values[str(group)] = tpr
# False Positive Rate
neg_mask = (y_true == self.unfavorable_label) & mask
if np.sum(neg_mask) > 0:
fpr = np.mean(y_pred[neg_mask] == self.favorable_label)
else:
fpr = 0
fpr_values[str(group)] = fpr
# Calculate disparities
tpr_disparity = max(tpr_values.values()) - min(tpr_values.values())
fpr_disparity = max(fpr_values.values()) - min(fpr_values.values())
disparities = {
'tpr_disparity': tpr_disparity,
'fpr_disparity': fpr_disparity
}
max_disparity = max(tpr_disparity, fpr_disparity)
recommendations = []
if tpr_disparity > threshold:
recommendations.append(f"TPR disparity ({tpr_disparity:.3f}) exceeds threshold")
if fpr_disparity > threshold:
recommendations.append(f"FPR disparity ({fpr_disparity:.3f}) exceeds threshold")
return FairnessReport(
metric_name="Equalized Odds",
overall_value=max_disparity,
group_values={'tpr': tpr_values, 'fpr': fpr_values},
disparities=disparities,
passes_threshold=max_disparity <= threshold,
threshold=threshold,
recommendations=recommendations
)
def predictive_parity(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
protected_groups: np.ndarray,
threshold: float = 0.1
) -> FairnessReport:
"""
Predictive parity: Precision should be equal across groups.
P(Y=1 | Y_pred=1, A=a) should be equal for all groups.
"""
groups = np.unique(protected_groups)
precision_values = {}
for group in groups:
mask = protected_groups == group
pred_pos = (y_pred == self.favorable_label) & mask
if np.sum(pred_pos) > 0:
precision = np.mean(y_true[pred_pos] == self.favorable_label)
else:
precision = 0
precision_values[str(group)] = precision
overall_precision = np.mean(
y_true[y_pred == self.favorable_label] == self.favorable_label
) if np.sum(y_pred == self.favorable_label) > 0 else 0
disparities = {
group: abs(prec - overall_precision)
for group, prec in precision_values.items()
}
max_disparity = max(disparities.values()) if disparities else 0
recommendations = []
if max_disparity > threshold:
recommendations.append("Precision varies significantly across groups")
recommendations.append("Consider calibration techniques")
return FairnessReport(
metric_name="Predictive Parity",
overall_value=overall_precision,
group_values=precision_values,
disparities=disparities,
passes_threshold=max_disparity <= threshold,
threshold=threshold,
recommendations=recommendations
)
def calibration_fairness(
self,
y_true: np.ndarray,
y_prob: np.ndarray,
protected_groups: np.ndarray,
n_bins: int = 10,
threshold: float = 0.1
) -> FairnessReport:
"""
Calibration fairness: For each probability bin, actual rate should match.
"""
groups = np.unique(protected_groups)
calibration_scores = {}
bin_edges = np.linspace(0, 1, n_bins + 1)
for group in groups:
mask = protected_groups == group
group_probs = y_prob[mask]
group_true = y_true[mask]
errors = []
            for i in range(n_bins):
                # Make the last bin right-inclusive so probabilities equal to 1.0 are counted
                if i == n_bins - 1:
                    bin_mask = (group_probs >= bin_edges[i]) & (group_probs <= bin_edges[i + 1])
                else:
                    bin_mask = (group_probs >= bin_edges[i]) & (group_probs < bin_edges[i + 1])
if np.sum(bin_mask) > 0:
expected = np.mean(group_probs[bin_mask])
actual = np.mean(group_true[bin_mask])
errors.append(abs(expected - actual))
calibration_scores[str(group)] = np.mean(errors) if errors else 0
overall_calibration = np.mean(list(calibration_scores.values()))
disparities = {
group: abs(score - overall_calibration)
for group, score in calibration_scores.items()
}
max_disparity = max(disparities.values()) if disparities else 0
recommendations = []
if max_disparity > threshold:
recommendations.append("Model is not equally well-calibrated across groups")
recommendations.append("Consider group-specific calibration")
return FairnessReport(
metric_name="Calibration Fairness",
overall_value=overall_calibration,
group_values=calibration_scores,
disparities=disparities,
passes_threshold=max_disparity <= threshold,
threshold=threshold,
recommendations=recommendations
)
def individual_fairness(
self,
features: np.ndarray,
y_pred: np.ndarray,
similarity_threshold: float = 0.1,
prediction_threshold: float = 0.1
) -> FairnessReport:
"""
Individual fairness: Similar individuals should receive similar predictions.
"""
n_samples = len(features)
# Normalize features
features_norm = (features - features.mean(axis=0)) / (features.std(axis=0) + 1e-10)
violations = 0
total_pairs = 0
# Sample pairs for efficiency
n_pairs = min(10000, n_samples * (n_samples - 1) // 2)
indices = np.random.choice(n_samples, size=(n_pairs, 2), replace=True)
for i, j in indices:
if i == j:
continue
# Calculate feature distance
feature_dist = np.linalg.norm(features_norm[i] - features_norm[j])
# Calculate prediction distance
pred_dist = abs(y_pred[i] - y_pred[j])
# Check violation
if feature_dist < similarity_threshold and pred_dist > prediction_threshold:
violations += 1
total_pairs += 1
violation_rate = violations / total_pairs if total_pairs > 0 else 0
recommendations = []
if violation_rate > 0.05:
recommendations.append(f"Individual fairness violation rate: {violation_rate:.1%}")
recommendations.append("Consider adding consistency regularization")
return FairnessReport(
metric_name="Individual Fairness",
overall_value=violation_rate,
group_values={'violation_rate': violation_rate},
disparities={'violations': violations, 'total_pairs': total_pairs},
passes_threshold=violation_rate <= 0.05,
threshold=0.05,
recommendations=recommendations
)
def comprehensive_evaluation(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_prob: np.ndarray,
protected_groups: np.ndarray,
features: Optional[np.ndarray] = None
) -> Dict[str, FairnessReport]:
"""Run all fairness metrics."""
reports = {}
reports['demographic_parity'] = self.demographic_parity(y_pred, protected_groups)
reports['equalized_odds'] = self.equalized_odds(y_true, y_pred, protected_groups)
reports['predictive_parity'] = self.predictive_parity(y_true, y_pred, protected_groups)
reports['calibration'] = self.calibration_fairness(y_true, y_prob, protected_groups)
if features is not None:
reports['individual_fairness'] = self.individual_fairness(features, y_prob)
        return reports
Bias Mitigation Techniques
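The module below collects pre-processing (Reweighing, DisparateImpactRemover), in-processing (FairConstrainedClassifier), and post-processing (CalibratedEqualizedOdds) mitigation techniques. As a minimal sketch of how the pre-processing path might be wired into training, the example uses a scikit-learn LogisticRegression and treats column 0 as the protected attribute; both choices are illustrative assumptions.
# Usage sketch for the Reweighing class defined below (estimator choice and column index are assumptions)
import numpy as np
from sklearn.linear_model import LogisticRegression
# from bias_mitigation import Reweighing  # assuming the module below is saved as bias_mitigation.py
rng = np.random.default_rng(0)
n = 500
X = rng.random((n, 4))
X[:, 0] = rng.integers(0, 2, size=n)  # column 0 encodes the protected attribute
y = rng.integers(0, 2, size=n)
reweigher = Reweighing(protected_attribute_idx=0).fit(X, y)
sample_weights = reweigher.get_weights()
clf = LogisticRegression(max_iter=1000)  # any estimator whose fit() accepts sample_weight
clf.fit(X, y, sample_weight=sample_weights)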
# bias_mitigation.py
"""
Bias mitigation techniques for ML models.
"""
import inspect
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin, ClassifierMixin
class Reweighing(BaseEstimator, TransformerMixin):
"""
Pre-processing technique that assigns weights to training examples.
Adjusts weights to ensure demographic parity in the training data.
"""
def __init__(self, protected_attribute_idx: int):
self.protected_attribute_idx = protected_attribute_idx
self.weights_ = None
def fit(self, X: np.ndarray, y: np.ndarray) -> 'Reweighing':
"""Calculate reweighing weights."""
protected = X[:, self.protected_attribute_idx]
groups = np.unique(protected)
n = len(y)
weights = np.ones(n)
for group in groups:
group_mask = protected == group
for label in [0, 1]:
label_mask = y == label
# P(A=a)
p_group = np.sum(group_mask) / n
# P(Y=y)
p_label = np.sum(label_mask) / n
# P(A=a, Y=y)
p_joint = np.sum(group_mask & label_mask) / n
# Expected under independence: P(A=a) * P(Y=y)
expected = p_group * p_label
# Weight
if p_joint > 0:
weight = expected / p_joint
else:
weight = 1.0
weights[group_mask & label_mask] = weight
self.weights_ = weights
return self
def transform(self, X: np.ndarray) -> np.ndarray:
"""Return the features unchanged (weights stored separately)."""
return X
def get_weights(self) -> np.ndarray:
"""Get the calculated weights."""
return self.weights_
class DisparateImpactRemover(BaseEstimator, TransformerMixin):
"""
Pre-processing technique that modifies features to reduce correlation
with protected attribute while preserving other information.
"""
def __init__(
self,
protected_attribute_idx: int,
repair_level: float = 1.0
):
self.protected_attribute_idx = protected_attribute_idx
self.repair_level = repair_level
self.median_values_ = {}
def fit(self, X: np.ndarray, y: np.ndarray = None) -> 'DisparateImpactRemover':
"""Learn median values for each group."""
protected = X[:, self.protected_attribute_idx]
groups = np.unique(protected)
for feature_idx in range(X.shape[1]):
if feature_idx == self.protected_attribute_idx:
continue
self.median_values_[feature_idx] = {}
overall_median = np.median(X[:, feature_idx])
for group in groups:
group_mask = protected == group
group_median = np.median(X[group_mask, feature_idx])
self.median_values_[feature_idx][group] = {
'group_median': group_median,
'overall_median': overall_median
}
return self
def transform(self, X: np.ndarray) -> np.ndarray:
"""Transform features to reduce disparate impact."""
X_transformed = X.copy().astype(float)
protected = X[:, self.protected_attribute_idx]
for feature_idx, medians in self.median_values_.items():
for group, values in medians.items():
group_mask = protected == group
shift = values['overall_median'] - values['group_median']
X_transformed[group_mask, feature_idx] += self.repair_level * shift
return X_transformed
class CalibratedEqualizedOdds(BaseEstimator, ClassifierMixin):
"""
Post-processing technique that adjusts predictions to satisfy
equalized odds constraint.
"""
def __init__(self, base_estimator, protected_attribute_idx: int):
self.base_estimator = base_estimator
self.protected_attribute_idx = protected_attribute_idx
self.thresholds_ = {}
def fit(self, X: np.ndarray, y: np.ndarray) -> 'CalibratedEqualizedOdds':
"""Fit base estimator and calculate optimal thresholds."""
self.base_estimator.fit(X, y)
# Get probabilities on training data
y_prob = self.base_estimator.predict_proba(X)[:, 1]
protected = X[:, self.protected_attribute_idx]
groups = np.unique(protected)
# Find thresholds that equalize TPR across groups
target_tpr = self._calculate_target_tpr(y, y_prob, protected, groups)
for group in groups:
group_mask = protected == group
threshold = self._find_threshold_for_tpr(
y[group_mask],
y_prob[group_mask],
target_tpr
)
self.thresholds_[group] = threshold
return self
def predict(self, X: np.ndarray) -> np.ndarray:
"""Make predictions using group-specific thresholds."""
y_prob = self.base_estimator.predict_proba(X)[:, 1]
protected = X[:, self.protected_attribute_idx]
predictions = np.zeros(len(X), dtype=int)
for group, threshold in self.thresholds_.items():
group_mask = protected == group
predictions[group_mask] = (y_prob[group_mask] >= threshold).astype(int)
return predictions
def predict_proba(self, X: np.ndarray) -> np.ndarray:
"""Return probabilities from base estimator."""
return self.base_estimator.predict_proba(X)
def _calculate_target_tpr(
self,
y: np.ndarray,
y_prob: np.ndarray,
protected: np.ndarray,
groups: np.ndarray
) -> float:
"""Calculate target TPR (average across groups)."""
tprs = []
for group in groups:
group_mask = protected == group
pos_mask = y == 1
combined = group_mask & pos_mask
            if np.sum(combined) > 0:
                # TPR for this group at the default 0.5 decision threshold
                # (comparing probabilities to their own median would always yield ~0.5)
                tpr = np.mean(y_prob[combined] >= 0.5)
                tprs.append(tpr)
return np.mean(tprs) if tprs else 0.5
def _find_threshold_for_tpr(
self,
y: np.ndarray,
y_prob: np.ndarray,
target_tpr: float
) -> float:
"""Find threshold that achieves target TPR."""
pos_probs = y_prob[y == 1]
if len(pos_probs) == 0:
return 0.5
        # Grid search over candidate thresholds for the one whose TPR is closest to the target
thresholds = np.linspace(0, 1, 100)
best_threshold = 0.5
best_diff = float('inf')
for threshold in thresholds:
tpr = np.mean(pos_probs >= threshold)
diff = abs(tpr - target_tpr)
if diff < best_diff:
best_diff = diff
best_threshold = threshold
return best_threshold
class FairConstrainedClassifier(BaseEstimator, ClassifierMixin):
"""
In-processing technique that adds fairness constraints during training.
Uses adversarial debiasing approach.
"""
def __init__(
self,
base_estimator,
protected_attribute_idx: int,
adversary_weight: float = 1.0
):
self.base_estimator = base_estimator
self.protected_attribute_idx = protected_attribute_idx
        self.adversary_weight = adversary_weight  # reserved for a full adversarial setup; unused in this simplified version
def fit(self, X: np.ndarray, y: np.ndarray) -> 'FairConstrainedClassifier':
"""
Fit with fairness constraint.
This is a simplified version - production would use gradient reversal
or adversarial training.
"""
protected = X[:, self.protected_attribute_idx]
# Calculate sample weights to reduce correlation
weights = self._calculate_debiasing_weights(X, y, protected)
        # Fit with weights if the estimator's fit() accepts sample_weight
        if 'sample_weight' in inspect.signature(self.base_estimator.fit).parameters:
            self.base_estimator.fit(X, y, sample_weight=weights)
        else:
            self.base_estimator.fit(X, y)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
return self.base_estimator.predict(X)
def predict_proba(self, X: np.ndarray) -> np.ndarray:
return self.base_estimator.predict_proba(X)
def _calculate_debiasing_weights(
self,
X: np.ndarray,
y: np.ndarray,
protected: np.ndarray
) -> np.ndarray:
"""Calculate weights to reduce bias."""
n = len(y)
weights = np.ones(n)
groups = np.unique(protected)
for group in groups:
group_mask = protected == group
group_size = np.sum(group_mask)
# Weight to balance groups
weights[group_mask] *= n / (len(groups) * group_size)
# Normalize
weights = weights / weights.sum() * n
        return weights
Continuous Fairness Monitoring
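Fairness work does not end at deployment: drift in the input population or in model behavior can reintroduce disparities over time. The module below records batches of production predictions, recomputes disparity metrics, and raises alerts when configured thresholds are breached. A minimal wiring sketch follows; the thresholds, synthetic batch, and print-based alert handler are illustrative assumptions.
# Usage sketch for the FairnessMonitor defined below (thresholds and alert handler are illustrative)
import numpy as np
# from fairness_monitoring import FairnessMonitor  # assuming the module below is saved as fairness_monitoring.py
def on_alert(alert):
    # In production this might page on-call or post to an incident channel
    print(f"[{alert.severity}] {alert.description}")
monitor = FairnessMonitor(
    protected_attribute='group',
    metrics_config={
        'demographic_parity_disparity': 0.1,
        'tpr_disparity': 0.1,
        'fpr_disparity': 0.1,
    },
    alert_callback=on_alert,
)
rng = np.random.default_rng(7)
y_true = rng.integers(0, 2, size=200)
y_pred = rng.integers(0, 2, size=200)
groups = rng.choice(['A', 'B'], size=200)
monitor.record_predictions(y_true, y_pred, groups)  # call this for each scored batch of traffic
print(monitor.generate_report()['compliance_status'])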
# fairness_monitoring.py
"""
Production fairness monitoring system.
"""
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
from datetime import datetime, timedelta
import numpy as np
@dataclass
class FairnessAlert:
"""Alert for fairness violation."""
alert_id: str
timestamp: datetime
metric_name: str
current_value: float
threshold: float
affected_groups: List[str]
severity: str
description: str
@dataclass
class FairnessSnapshot:
"""Point-in-time fairness snapshot."""
timestamp: datetime
metrics: Dict[str, float]
group_metrics: Dict[str, Dict[str, float]]
sample_size: int
class FairnessMonitor:
"""
Monitor model fairness in production.
"""
def __init__(
self,
protected_attribute: str,
metrics_config: Dict[str, float], # metric_name -> threshold
        alert_callback: Optional[Callable] = None
):
self.protected_attribute = protected_attribute
self.metrics_config = metrics_config
self.alert_callback = alert_callback
self.history: List[FairnessSnapshot] = []
self.alerts: List[FairnessAlert] = []
self.baseline_metrics: Optional[Dict] = None
def set_baseline(self, snapshot: FairnessSnapshot):
"""Set baseline metrics from validation/test set."""
self.baseline_metrics = snapshot.metrics
def record_predictions(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
protected_groups: np.ndarray,
timestamp: Optional[datetime] = None
):
"""Record predictions and calculate fairness metrics."""
if timestamp is None:
timestamp = datetime.utcnow()
# Calculate metrics
metrics = {}
group_metrics = {}
groups = np.unique(protected_groups)
# Demographic parity
selection_rates = {}
for group in groups:
mask = protected_groups == group
selection_rates[str(group)] = np.mean(y_pred[mask] == 1)
overall_rate = np.mean(y_pred == 1)
dp_disparity = max(abs(r - overall_rate) for r in selection_rates.values())
metrics['demographic_parity_disparity'] = dp_disparity
group_metrics['selection_rate'] = selection_rates
# Equalized odds
tpr_values = {}
fpr_values = {}
for group in groups:
mask = protected_groups == group
pos_mask = y_true == 1
neg_mask = y_true == 0
if np.sum(pos_mask & mask) > 0:
tpr_values[str(group)] = np.mean(y_pred[pos_mask & mask] == 1)
if np.sum(neg_mask & mask) > 0:
fpr_values[str(group)] = np.mean(y_pred[neg_mask & mask] == 1)
if tpr_values:
metrics['tpr_disparity'] = max(tpr_values.values()) - min(tpr_values.values())
if fpr_values:
metrics['fpr_disparity'] = max(fpr_values.values()) - min(fpr_values.values())
group_metrics['tpr'] = tpr_values
group_metrics['fpr'] = fpr_values
# Create snapshot
snapshot = FairnessSnapshot(
timestamp=timestamp,
metrics=metrics,
group_metrics=group_metrics,
sample_size=len(y_true)
)
self.history.append(snapshot)
# Check for alerts
self._check_alerts(snapshot)
return snapshot
def _check_alerts(self, snapshot: FairnessSnapshot):
"""Check if any metrics exceed thresholds."""
for metric_name, threshold in self.metrics_config.items():
if metric_name not in snapshot.metrics:
continue
current_value = snapshot.metrics[metric_name]
if current_value > threshold:
# Check for drift from baseline
baseline_value = self.baseline_metrics.get(metric_name, 0) if self.baseline_metrics else 0
drift = abs(current_value - baseline_value)
severity = 'critical' if drift > threshold * 2 else 'warning'
alert = FairnessAlert(
alert_id=f"FA-{len(self.alerts)}",
timestamp=snapshot.timestamp,
metric_name=metric_name,
current_value=current_value,
threshold=threshold,
affected_groups=list(snapshot.group_metrics.get('selection_rate', {}).keys()),
severity=severity,
description=f"{metric_name} ({current_value:.3f}) exceeds threshold ({threshold:.3f})"
)
self.alerts.append(alert)
if self.alert_callback:
self.alert_callback(alert)
def get_trend(
self,
metric_name: str,
window_days: int = 7
) -> Dict[str, List]:
"""Get metric trend over time."""
cutoff = datetime.utcnow() - timedelta(days=window_days)
timestamps = []
values = []
for snapshot in self.history:
if snapshot.timestamp >= cutoff:
if metric_name in snapshot.metrics:
timestamps.append(snapshot.timestamp)
values.append(snapshot.metrics[metric_name])
return {
'timestamps': timestamps,
'values': values,
'trend': self._calculate_trend(values) if len(values) > 1 else 'stable'
}
def _calculate_trend(self, values: List[float]) -> str:
"""Calculate trend direction."""
if len(values) < 2:
return 'stable'
recent = np.mean(values[-5:]) if len(values) >= 5 else values[-1]
older = np.mean(values[:5]) if len(values) >= 5 else values[0]
diff = recent - older
if abs(diff) < 0.01:
return 'stable'
elif diff > 0:
return 'increasing'
else:
return 'decreasing'
def generate_report(self) -> Dict:
"""Generate fairness monitoring report."""
if not self.history:
return {'error': 'No data recorded'}
latest = self.history[-1]
recent_alerts = [a for a in self.alerts if a.timestamp >= datetime.utcnow() - timedelta(days=7)]
return {
'report_timestamp': datetime.utcnow().isoformat(),
'monitoring_period': {
'start': self.history[0].timestamp.isoformat(),
'end': latest.timestamp.isoformat(),
'total_snapshots': len(self.history)
},
'current_metrics': latest.metrics,
'group_breakdown': latest.group_metrics,
'thresholds': self.metrics_config,
'alerts_summary': {
'total': len(self.alerts),
'recent_7_days': len(recent_alerts),
'by_severity': {
'critical': len([a for a in recent_alerts if a.severity == 'critical']),
'warning': len([a for a in recent_alerts if a.severity == 'warning'])
}
},
'trends': {
metric: self.get_trend(metric)['trend']
for metric in self.metrics_config.keys()
},
'compliance_status': 'PASS' if not recent_alerts else 'FAIL'
        }
Conclusion
Building fair AI systems requires:
- Comprehensive measurement using multiple fairness metrics
- Pre-processing techniques like reweighing and disparate impact removal
- In-processing constraints during model training
- Post-processing calibration for equalized outcomes
- Continuous monitoring to detect fairness drift in production
No single metric or technique is sufficient; indeed, several common fairness criteria cannot all be satisfied simultaneously in general, so organizations should adopt a multi-faceted approach based on their specific context and the nature of potential harms.