AI Data Privacy Techniques: Differential Privacy, Anonymization, and Synthetic Data
As AI systems process increasingly sensitive data, implementing robust privacy protections is essential for compliance and trust. This guide covers practical implementations of differential privacy, data anonymization, and synthetic data generation techniques.
Differential Privacy Implementation
Differential privacy provides mathematical guarantees that individual records cannot be identified from query results or model outputs.
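Formally, a randomized mechanism M is (epsilon, delta)-differentially private if, for any two datasets D and D' that differ in one record and any set of outputs S, Pr[M(D) in S] <= e^epsilon * Pr[M(D') in S] + delta. A smaller epsilon means a stronger guarantee, and delta bounds the probability that the guarantee fails outright; the mechanisms below achieve these bounds by adding noise calibrated to a query's sensitivity.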
Core Differential Privacy Mechanisms
# differential_privacy.py
"""
Differential privacy mechanisms for AI applications.
"""
import numpy as np
from typing import Any, Callable, List, Tuple, Optional
from dataclasses import dataclass
import math
@dataclass
class PrivacyBudget:
"""Tracks privacy budget expenditure."""
epsilon: float
delta: float
queries_made: int = 0
epsilon_spent: float = 0.0
def can_query(self, query_epsilon: float) -> bool:
"""Check if query is within budget."""
return self.epsilon_spent + query_epsilon <= self.epsilon
def record_query(self, query_epsilon: float):
"""Record a query against the budget."""
self.epsilon_spent += query_epsilon
self.queries_made += 1
class LaplaceMechanism:
"""
Laplace mechanism for numeric queries.
Provides (epsilon, 0)-differential privacy.
"""
def __init__(self, epsilon: float, sensitivity: float):
"""
Args:
epsilon: Privacy parameter (smaller = more privacy)
sensitivity: Maximum change in query result from one record
"""
self.epsilon = epsilon
self.sensitivity = sensitivity
self.scale = sensitivity / epsilon
def add_noise(self, value: float) -> float:
"""Add Laplace noise to a value."""
noise = np.random.laplace(0, self.scale)
return value + noise
def add_noise_to_array(self, values: np.ndarray) -> np.ndarray:
"""Add independent Laplace noise to each element."""
noise = np.random.laplace(0, self.scale, values.shape)
return values + noise
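# A minimal calibration sketch (illustrative numbers, not from the text
# above): for a counting query with sensitivity 1 the Laplace scale is
# 1/epsilon, so halving epsilon doubles the expected noise magnitude.
for _eps in (0.1, 1.0):
    _mech = LaplaceMechanism(epsilon=_eps, sensitivity=1.0)
    print(f"epsilon={_eps}: scale={_mech.scale:.1f}, noisy 100 -> {_mech.add_noise(100.0):.1f}")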
class GaussianMechanism:
"""
Gaussian mechanism for numeric queries.
Provides (epsilon, delta)-differential privacy.
"""
def __init__(self, epsilon: float, delta: float, sensitivity: float):
"""
Args:
epsilon: Privacy parameter
            delta: Probability with which the epsilon guarantee may fail
sensitivity: L2 sensitivity of the query
"""
self.epsilon = epsilon
self.delta = delta
self.sensitivity = sensitivity
# Calculate noise scale
self.sigma = self._calculate_sigma()
def _calculate_sigma(self) -> float:
"""Calculate Gaussian noise standard deviation."""
        # Classical Gaussian mechanism calibration (valid for epsilon < 1)
return self.sensitivity * math.sqrt(2 * math.log(1.25 / self.delta)) / self.epsilon
def add_noise(self, value: float) -> float:
"""Add Gaussian noise to a value."""
noise = np.random.normal(0, self.sigma)
return value + noise
def add_noise_to_array(self, values: np.ndarray) -> np.ndarray:
"""Add independent Gaussian noise to each element."""
noise = np.random.normal(0, self.sigma, values.shape)
return values + noise
class ExponentialMechanism:
"""
Exponential mechanism for categorical/selection queries.
    Selects an output with probability proportional to exp(epsilon * score / (2 * sensitivity)).
"""
def __init__(
self,
epsilon: float,
sensitivity: float,
        score_function: Callable[[Any, Any], float]
):
"""
Args:
epsilon: Privacy parameter
sensitivity: Maximum change in score from one record
score_function: Function(database, option) -> score
"""
self.epsilon = epsilon
self.sensitivity = sensitivity
self.score_function = score_function
    def select(self, database: Any, options: List[Any]) -> Any:
"""Select an option using the exponential mechanism."""
scores = [self.score_function(database, opt) for opt in options]
# Calculate selection probabilities
scaled_scores = [
self.epsilon * s / (2 * self.sensitivity)
for s in scores
]
# Normalize for numerical stability
max_score = max(scaled_scores)
exp_scores = [math.exp(s - max_score) for s in scaled_scores]
total = sum(exp_scores)
probabilities = [s / total for s in exp_scores]
        # Sample an index rather than the options themselves, so the
        # options may be arbitrary objects (np.random.choice would
        # coerce the list to an array)
        index = np.random.choice(len(options), p=probabilities)
        return options[index]
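# A small usage sketch with assumed data (not from the text above):
# privately select the most common value, scoring each option by its
# count. Sensitivity is 1 because one record changes a count by 1.
_votes = ["a", "b", "b", "c", "b"]
_selector = ExponentialMechanism(
    epsilon=1.0,
    sensitivity=1.0,
    score_function=lambda db, opt: db.count(opt),
)
_winner = _selector.select(_votes, ["a", "b", "c"])  # usually "b"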
class DPQueryEngine:
"""
Privacy-preserving query engine with budget tracking.
"""
def __init__(self, data: np.ndarray, privacy_budget: PrivacyBudget):
self.data = data
self.budget = privacy_budget
def count(self, predicate: Callable[[np.ndarray], np.ndarray], epsilon: float) -> float:
"""Count records matching predicate with DP."""
if not self.budget.can_query(epsilon):
raise ValueError("Privacy budget exceeded")
# True count (sensitivity = 1 for counting)
true_count = np.sum(predicate(self.data))
# Add noise
mechanism = LaplaceMechanism(epsilon, sensitivity=1.0)
noisy_count = mechanism.add_noise(true_count)
self.budget.record_query(epsilon)
return max(0, noisy_count) # Counts can't be negative
def mean(
self,
column: int,
epsilon: float,
lower_bound: float,
upper_bound: float
) -> float:
"""Calculate mean with DP."""
if not self.budget.can_query(epsilon):
raise ValueError("Privacy budget exceeded")
# Clip values to bounds
clipped = np.clip(self.data[:, column], lower_bound, upper_bound)
# Calculate mean
true_mean = np.mean(clipped)
        # Sensitivity of the clipped mean is (upper - lower) / n; this
        # assumes the dataset size n is public
        n = len(self.data)
        sensitivity = (upper_bound - lower_bound) / n
mechanism = LaplaceMechanism(epsilon, sensitivity)
noisy_mean = mechanism.add_noise(true_mean)
self.budget.record_query(epsilon)
return np.clip(noisy_mean, lower_bound, upper_bound)
def histogram(
self,
column: int,
bins: List[float],
epsilon: float
) -> np.ndarray:
"""Calculate histogram with DP."""
if not self.budget.can_query(epsilon):
raise ValueError("Privacy budget exceeded")
# True histogram (sensitivity = 1 for each bin)
true_hist, _ = np.histogram(self.data[:, column], bins=bins)
        # Add noise to each bin, splitting epsilon across bins via
        # sequential composition. This is conservative: histogram bins
        # are disjoint, so parallel composition would permit the full
        # epsilon per bin.
        per_bin_epsilon = epsilon / len(true_hist)
mechanism = LaplaceMechanism(per_bin_epsilon, sensitivity=1.0)
noisy_hist = mechanism.add_noise_to_array(true_hist.astype(float))
self.budget.record_query(epsilon)
return np.maximum(0, noisy_hist)
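# Usage sketch for the query engine on assumed data (column 0 = age).
# Every answer is noisy, and each query draws down the shared budget.
_ages = np.random.default_rng(0).normal(40, 10, size=(1000, 1))
_engine = DPQueryEngine(_ages, PrivacyBudget(epsilon=1.0, delta=1e-5))
_over_50 = _engine.count(lambda d: d[:, 0] > 50, epsilon=0.2)
_avg_age = _engine.mean(column=0, epsilon=0.3, lower_bound=0.0, upper_bound=100.0)
# Only 0.5 of the budget remains; a query requesting more raises ValueError.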
class DPGradientDescent:
"""
Differentially private gradient descent for ML training.
"""
def __init__(
self,
epsilon: float,
delta: float,
max_grad_norm: float,
noise_multiplier: float
):
self.epsilon = epsilon
self.delta = delta
self.max_grad_norm = max_grad_norm
self.noise_multiplier = noise_multiplier
    def clip_gradients(self, gradients: List[np.ndarray]) -> List[np.ndarray]:
        """Clip a per-sample gradient to bound its total L2 norm."""
        # DP-SGD clips the joint norm across all parameter tensors, not
        # each tensor separately, so that one sample changes the summed
        # gradient by at most max_grad_norm
        total_norm = math.sqrt(sum(float(np.sum(g ** 2)) for g in gradients))
        clip_factor = min(1.0, self.max_grad_norm / (total_norm + 1e-10))
        return [grad * clip_factor for grad in gradients]
def add_noise_to_gradients(
self,
gradients: List[np.ndarray],
batch_size: int
) -> List[np.ndarray]:
"""Add Gaussian noise to clipped gradients."""
noisy_gradients = []
for grad in gradients:
noise_scale = self.noise_multiplier * self.max_grad_norm / batch_size
noise = np.random.normal(0, noise_scale, grad.shape)
noisy_gradients.append(grad + noise)
return noisy_gradients
def private_gradient_step(
self,
model_params: List[np.ndarray],
per_sample_gradients: List[List[np.ndarray]],
learning_rate: float
) -> List[np.ndarray]:
"""Perform one DP gradient descent step."""
batch_size = len(per_sample_gradients)
# Clip each sample's gradients
clipped_grads = [
self.clip_gradients(sample_grads)
for sample_grads in per_sample_gradients
]
# Average clipped gradients
avg_grads = []
for param_idx in range(len(model_params)):
param_grads = [g[param_idx] for g in clipped_grads]
avg_grads.append(np.mean(param_grads, axis=0))
# Add noise
noisy_grads = self.add_noise_to_gradients(avg_grads, batch_size)
# Update parameters
new_params = []
for param, grad in zip(model_params, noisy_grads):
new_params.append(param - learning_rate * grad)
        return new_params
Data Anonymization Techniques
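Where differential privacy perturbs query answers, anonymization transforms the released records themselves. A table is k-anonymous when every record shares its quasi-identifier values (age, zip code, and the like) with at least k-1 other records, and l-diverse when each such group also contains at least l distinct values of every sensitive attribute, so that linking someone to a group reveals neither their identity nor their sensitive value.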
K-Anonymity and L-Diversity
# anonymization.py
"""
Data anonymization techniques for privacy protection.
"""
import pandas as pd
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
@dataclass
class AnonymizationConfig:
"""Configuration for anonymization."""
quasi_identifiers: List[str]
sensitive_attributes: List[str]
k_anonymity: int = 5
l_diversity: int = 2
t_closeness: float = 0.2
class KAnonymizer:
"""
Implements k-anonymity through generalization and suppression.
"""
def __init__(self, config: AnonymizationConfig):
self.config = config
self.generalization_hierarchies: Dict[str, List] = {}
def set_generalization_hierarchy(
self,
column: str,
hierarchy: List[Dict]
):
"""
Set generalization hierarchy for a column.
        Example hierarchy for age:
        [
            {"range": [0, 10], "value": "0-10"},
            {"range": [11, 20], "value": "11-20"},
            ...
        ]
"""
self.generalization_hierarchies[column] = hierarchy
def anonymize(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Anonymize dataframe to achieve k-anonymity.
"""
df_anon = df.copy()
# Check initial k-anonymity
if self._check_k_anonymity(df_anon):
return df_anon
# Apply generalization iteratively
for qi in self.config.quasi_identifiers:
if qi in self.generalization_hierarchies:
df_anon = self._generalize_column(df_anon, qi)
if self._check_k_anonymity(df_anon):
break
# Suppress remaining non-compliant records
df_anon = self._suppress_outliers(df_anon)
return df_anon
def _check_k_anonymity(self, df: pd.DataFrame) -> bool:
"""Check if dataframe satisfies k-anonymity."""
groups = df.groupby(self.config.quasi_identifiers).size()
return groups.min() >= self.config.k_anonymity
def _generalize_column(
self,
df: pd.DataFrame,
column: str
) -> pd.DataFrame:
"""Apply generalization to a column."""
hierarchy = self.generalization_hierarchies[column]
def generalize_value(value):
for level in hierarchy:
if 'range' in level:
if level['range'][0] <= value <= level['range'][1]:
return level['value']
elif 'values' in level:
if value in level['values']:
return level['value']
return value
df[column] = df[column].apply(generalize_value)
return df
    def _suppress_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Suppress records that violate k-anonymity."""
        group_sizes = df.groupby(self.config.quasi_identifiers).size()
        # Find groups below k threshold
        small_groups = group_sizes[group_sizes < self.config.k_anonymity].index
        # Compute the mask once, before any QI values are overwritten;
        # recomputing it after the first column is suppressed would no
        # longer match the original group keys
        mask = df.set_index(self.config.quasi_identifiers).index.isin(small_groups)
        # Suppress by replacing QIs with '*'
        for qi in self.config.quasi_identifiers:
            df.loc[mask, qi] = '*'
        return df
def calculate_information_loss(
self,
original: pd.DataFrame,
anonymized: pd.DataFrame
) -> Dict[str, float]:
"""Calculate information loss from anonymization."""
metrics = {}
for qi in self.config.quasi_identifiers:
orig_unique = original[qi].nunique()
anon_unique = anonymized[qi].nunique()
# Generalization information loss
metrics[f"{qi}_generalization_loss"] = 1 - (anon_unique / orig_unique)
# Suppression rate
suppressed = (anonymized[self.config.quasi_identifiers[0]] == '*').sum()
metrics["suppression_rate"] = suppressed / len(anonymized)
return metrics
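# Usage sketch with hypothetical columns and a hypothetical hierarchy
# (none of these names come from the text above): generalize age into
# decade bands, then suppress any group still smaller than k.
_config = AnonymizationConfig(
    quasi_identifiers=["age", "zipcode"],
    sensitive_attributes=["diagnosis"],
    k_anonymity=5,
)
_anonymizer = KAnonymizer(_config)
_anonymizer.set_generalization_hierarchy(
    "age",
    [{"range": [10 * i, 10 * i + 9], "value": f"{10 * i}-{10 * i + 9}"}
     for i in range(10)],
)
# _df_anon = _anonymizer.anonymize(_df)  # _df: an assumed patient dataframe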
class LDiversifier:
"""
Implements l-diversity for sensitive attribute protection.
"""
def __init__(self, config: AnonymizationConfig):
self.config = config
def check_l_diversity(self, df: pd.DataFrame) -> bool:
"""Check if dataframe satisfies l-diversity."""
groups = df.groupby(self.config.quasi_identifiers)
for _, group in groups:
for attr in self.config.sensitive_attributes:
unique_values = group[attr].nunique()
if unique_values < self.config.l_diversity:
return False
return True
def enforce_l_diversity(
self,
df: pd.DataFrame,
method: str = "suppression"
) -> pd.DataFrame:
"""Enforce l-diversity through specified method."""
df_div = df.copy()
if method == "suppression":
return self._suppress_for_l_diversity(df_div)
elif method == "generalization":
return self._generalize_for_l_diversity(df_div)
else:
raise ValueError(f"Unknown method: {method}")
def _suppress_for_l_diversity(self, df: pd.DataFrame) -> pd.DataFrame:
"""Suppress records to achieve l-diversity."""
groups = df.groupby(self.config.quasi_identifiers)
rows_to_suppress = []
for group_key, group in groups:
for attr in self.config.sensitive_attributes:
if group[attr].nunique() < self.config.l_diversity:
# Suppress this entire group
rows_to_suppress.extend(group.index.tolist())
break
# Mark suppressed rows
for qi in self.config.quasi_identifiers:
df.loc[rows_to_suppress, qi] = '*'
return df
def _generalize_for_l_diversity(self, df: pd.DataFrame) -> pd.DataFrame:
"""Generalize sensitive attributes for l-diversity."""
# This is a simplified version - production would need
# proper generalization hierarchies for sensitive attributes
for attr in self.config.sensitive_attributes:
# Reduce granularity by binning
            if pd.api.types.is_numeric_dtype(df[attr]):
df[attr] = pd.qcut(
df[attr],
q=self.config.l_diversity,
duplicates='drop'
).astype(str)
return df
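# Sketch: after k-anonymization, check l-diversity and suppress groups
# whose sensitive values are too homogeneous (reusing _config from above).
# _diversifier = LDiversifier(_config)
# if not _diversifier.check_l_diversity(_df_anon):
#     _df_anon = _diversifier.enforce_l_diversity(_df_anon, method="suppression")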
class TCloseness:
"""
Implements t-closeness for enhanced sensitive attribute protection.
"""
def __init__(self, config: AnonymizationConfig):
self.config = config
def check_t_closeness(self, df: pd.DataFrame) -> bool:
"""Check if dataframe satisfies t-closeness."""
groups = df.groupby(self.config.quasi_identifiers)
for attr in self.config.sensitive_attributes:
# Global distribution
global_dist = df[attr].value_counts(normalize=True)
for _, group in groups:
# Group distribution
group_dist = group[attr].value_counts(normalize=True)
# Calculate Earth Mover's Distance (EMD)
distance = self._calculate_emd(global_dist, group_dist)
if distance > self.config.t_closeness:
return False
return True
def _calculate_emd(
self,
dist1: pd.Series,
dist2: pd.Series
) -> float:
"""Calculate Earth Mover's Distance between distributions."""
        # Align distributions over a consistently ordered support
        # (the cumulative-sum comparison below is order-dependent)
        all_values = sorted(set(dist1.index) | set(dist2.index))
        p = np.array([dist1.get(v, 0) for v in all_values])
        q = np.array([dist2.get(v, 0) for v in all_values])
        # Simple EMD approximation using cumulative distribution difference
        p_cumsum = np.cumsum(p)
        q_cumsum = np.cumsum(q)
        return np.sum(np.abs(p_cumsum - q_cumsum)) / len(all_values)
Synthetic Data Generation
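The generator below takes a Gaussian-copula-style approach: it fits a marginal distribution to each column and a correlation matrix across the numeric columns, then samples correlated standard normals and maps them through each marginal's inverse CDF. This preserves per-column distributions and pairwise correlations without copying any real record; higher-order dependencies beyond pairwise correlation are not modeled.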
# synthetic_data.py
"""
Synthetic data generation for privacy-preserving ML.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from scipy import stats
@dataclass
class ColumnSpec:
"""Specification for a data column."""
name: str
dtype: str # 'numeric', 'categorical', 'datetime'
distribution: Optional[str] = None
categories: Optional[List] = None
min_value: Optional[float] = None
max_value: Optional[float] = None
correlations: Optional[Dict[str, float]] = None
class SyntheticDataGenerator:
"""
Generate synthetic data that preserves statistical properties.
"""
    def __init__(self, column_specs: List[ColumnSpec]):
        self.column_specs = {spec.name: spec for spec in column_specs}
        self.learned_params: Dict[str, Dict] = {}
        self.correlation_matrix: Optional[np.ndarray] = None
        self.numeric_columns: List[str] = []
def fit(self, df: pd.DataFrame):
"""Learn parameters from real data."""
for col_name, spec in self.column_specs.items():
if col_name not in df.columns:
continue
if spec.dtype == 'numeric':
self.learned_params[col_name] = self._fit_numeric(df[col_name])
elif spec.dtype == 'categorical':
self.learned_params[col_name] = self._fit_categorical(df[col_name])
elif spec.dtype == 'datetime':
self.learned_params[col_name] = self._fit_datetime(df[col_name])
# Learn correlations between numeric columns
numeric_cols = [
col for col, spec in self.column_specs.items()
if spec.dtype == 'numeric' and col in df.columns
]
if len(numeric_cols) > 1:
self.correlation_matrix = df[numeric_cols].corr().values
self.numeric_columns = numeric_cols
def _fit_numeric(self, series: pd.Series) -> Dict:
"""Fit distribution to numeric column."""
clean = series.dropna()
# Try different distributions and select best fit
distributions = ['norm', 'lognorm', 'expon', 'gamma']
best_dist = None
best_ks = float('inf')
for dist_name in distributions:
dist = getattr(stats, dist_name)
try:
params = dist.fit(clean)
ks_stat, _ = stats.kstest(clean, dist_name, params)
if ks_stat < best_ks:
best_ks = ks_stat
best_dist = {
'distribution': dist_name,
'params': params,
'min': clean.min(),
'max': clean.max(),
'null_rate': series.isna().mean()
}
            except Exception:
                # Some distribution/data pairs fail to fit; try the next one
                continue
if best_dist is None:
# Fall back to empirical distribution
best_dist = {
'distribution': 'empirical',
'values': clean.values,
'null_rate': series.isna().mean()
}
return best_dist
def _fit_categorical(self, series: pd.Series) -> Dict:
"""Fit categorical column."""
value_counts = series.value_counts(normalize=True, dropna=False)
return {
'categories': value_counts.index.tolist(),
'probabilities': value_counts.values.tolist(),
'null_rate': series.isna().mean()
}
def _fit_datetime(self, series: pd.Series) -> Dict:
"""Fit datetime column."""
clean = pd.to_datetime(series.dropna())
return {
'min': clean.min(),
'max': clean.max(),
'null_rate': series.isna().mean()
}
def generate(self, n_samples: int) -> pd.DataFrame:
"""Generate synthetic data."""
data = {}
# Generate numeric columns with correlations
        if len(self.numeric_columns) > 1:
numeric_data = self._generate_correlated_numeric(n_samples)
data.update(numeric_data)
# Generate remaining columns
for col_name, spec in self.column_specs.items():
if col_name in data:
continue
if col_name not in self.learned_params:
continue
params = self.learned_params[col_name]
if spec.dtype == 'numeric':
data[col_name] = self._generate_numeric(n_samples, params)
elif spec.dtype == 'categorical':
data[col_name] = self._generate_categorical(n_samples, params)
elif spec.dtype == 'datetime':
data[col_name] = self._generate_datetime(n_samples, params)
return pd.DataFrame(data)
def _generate_correlated_numeric(self, n_samples: int) -> Dict[str, np.ndarray]:
"""Generate correlated numeric columns."""
# Generate standard normal samples
n_cols = len(self.numeric_columns)
samples = np.random.multivariate_normal(
mean=np.zeros(n_cols),
cov=self.correlation_matrix,
size=n_samples
)
result = {}
for i, col_name in enumerate(self.numeric_columns):
params = self.learned_params[col_name]
if params['distribution'] == 'empirical':
# Use quantile transform
sorted_vals = np.sort(params['values'])
percentiles = stats.norm.cdf(samples[:, i])
indices = (percentiles * (len(sorted_vals) - 1)).astype(int)
values = sorted_vals[indices]
else:
# Transform standard normal to target distribution
dist = getattr(stats, params['distribution'])
percentiles = stats.norm.cdf(samples[:, i])
values = dist.ppf(percentiles, *params['params'][:-2],
loc=params['params'][-2],
scale=params['params'][-1])
# Clip to original range
if 'min' in params and 'max' in params:
values = np.clip(values, params['min'], params['max'])
# Add nulls
if params.get('null_rate', 0) > 0:
null_mask = np.random.random(n_samples) < params['null_rate']
values = values.astype(float)
values[null_mask] = np.nan
result[col_name] = values
return result
def _generate_numeric(self, n_samples: int, params: Dict) -> np.ndarray:
"""Generate numeric column."""
if params['distribution'] == 'empirical':
values = np.random.choice(params['values'], size=n_samples)
else:
dist = getattr(stats, params['distribution'])
values = dist.rvs(*params['params'][:-2],
loc=params['params'][-2],
scale=params['params'][-1],
size=n_samples)
if 'min' in params and 'max' in params:
values = np.clip(values, params['min'], params['max'])
if params.get('null_rate', 0) > 0:
null_mask = np.random.random(n_samples) < params['null_rate']
values = values.astype(float)
values[null_mask] = np.nan
return values
def _generate_categorical(self, n_samples: int, params: Dict) -> np.ndarray:
"""Generate categorical column."""
return np.random.choice(
params['categories'],
size=n_samples,
p=params['probabilities']
)
def _generate_datetime(self, n_samples: int, params: Dict) -> np.ndarray:
"""Generate datetime column."""
start = params['min'].timestamp()
end = params['max'].timestamp()
timestamps = np.random.uniform(start, end, n_samples)
values = pd.to_datetime(timestamps, unit='s')
if params.get('null_rate', 0) > 0:
null_mask = np.random.random(n_samples) < params['null_rate']
values = values.to_series()
values[null_mask] = pd.NaT
values = values.values
return values
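    # End-to-end sketch (assumed dataframe `real_df`; hypothetical columns):
    #   specs = [ColumnSpec(name="age", dtype="numeric"),
    #            ColumnSpec(name="income", dtype="numeric"),
    #            ColumnSpec(name="segment", dtype="categorical")]
    #   gen = SyntheticDataGenerator(specs)
    #   gen.fit(real_df)
    #   synth = gen.generate(n_samples=len(real_df))
    #   print(gen.evaluate_quality(real_df, synth))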
def evaluate_quality(
self,
original: pd.DataFrame,
synthetic: pd.DataFrame
) -> Dict[str, float]:
"""Evaluate quality of synthetic data."""
metrics = {}
for col_name in self.column_specs.keys():
if col_name not in original.columns or col_name not in synthetic.columns:
continue
spec = self.column_specs[col_name]
if spec.dtype == 'numeric':
# Statistical similarity
orig_mean = original[col_name].mean()
synth_mean = synthetic[col_name].mean()
metrics[f"{col_name}_mean_diff"] = abs(orig_mean - synth_mean) / (abs(orig_mean) + 1e-10)
orig_std = original[col_name].std()
synth_std = synthetic[col_name].std()
metrics[f"{col_name}_std_diff"] = abs(orig_std - synth_std) / (abs(orig_std) + 1e-10)
# KS test
ks_stat, _ = stats.ks_2samp(
original[col_name].dropna(),
synthetic[col_name].dropna()
)
metrics[f"{col_name}_ks_statistic"] = ks_stat
elif spec.dtype == 'categorical':
# Distribution similarity
orig_dist = original[col_name].value_counts(normalize=True)
synth_dist = synthetic[col_name].value_counts(normalize=True)
# Jensen-Shannon divergence
all_cats = set(orig_dist.index) | set(synth_dist.index)
p = np.array([orig_dist.get(c, 0) for c in all_cats])
q = np.array([synth_dist.get(c, 0) for c in all_cats])
m = 0.5 * (p + q)
js_div = 0.5 * (stats.entropy(p, m) + stats.entropy(q, m))
metrics[f"{col_name}_js_divergence"] = js_div
        return metrics
Conclusion
Privacy-preserving AI techniques are essential for responsible data use:
- Differential privacy provides mathematical guarantees against identification
- K-anonymity and l-diversity protect quasi-identifiers and sensitive attributes
- Synthetic data enables ML development without exposing real records
- Proper configuration balances privacy protection with data utility
Organizations should select techniques based on their specific privacy requirements, regulatory obligations, and acceptable utility trade-offs.