AI model watermarking embeds verifiable ownership markers that survive model extraction and fine-tuning. This guide covers implementing robust watermarking for model intellectual property protection.
Watermarking Framework Architecture
Build a comprehensive watermarking system:
import hashlib
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
class WatermarkType(Enum):
    """Strategy used to embed an ownership watermark in a model."""
    WHITE_BOX = "white_box"          # verification inspects model internals — TODO confirm intended semantics
    BLACK_BOX = "black_box"          # verification uses model outputs only — TODO confirm intended semantics
    BACKDOOR = "backdoor"            # trigger-set based (see BackdoorWatermarkEmbedder)
    FEATURE_BASED = "feature_based"  # no embedder for this type is defined in this file
    PARAMETER = "parameter"          # embedded in weights (see ParameterWatermarkEmbedder)
class VerificationResult(Enum):
    """Outcome of a watermark verification attempt."""
    VERIFIED = "verified"          # watermark detected with high confidence
    NOT_VERIFIED = "not_verified"  # watermark not detected
    PARTIAL = "partial_match"      # watermark partially detected
    TAMPERED = "tampered"          # watermark present but appears damaged
@dataclass
class WatermarkConfig:
    """Description of a single watermark: identity, strategy and secret material."""
    watermark_id: str              # unique identifier for this watermark
    watermark_type: WatermarkType  # embedding strategy used
    owner_id: str                  # identity of the claiming owner
    created_at: datetime           # creation timestamp (serialized in certificates)
    strength: float                # embedding strength (used by ParameterWatermarkEmbedder)
    trigger_set_size: int          # number of trigger samples (backdoor watermarks)
    secret_key: bytes              # seeds deterministic trigger/pattern/label generation
    metadata: Dict = field(default_factory=dict)  # free-form extra information
@dataclass
class WatermarkVerification:
    """Record of one watermark verification attempt."""
    verification_id: str        # unique id for this verification run
    watermark_id: str           # links back to the WatermarkConfig checked
    verified_at: datetime       # when the verification was performed
    result: VerificationResult  # categorical outcome
    confidence: float           # confidence in `result`, in [0, 1]
    detection_rate: float       # fraction of trigger samples answered correctly
    details: Dict               # free-form diagnostics from the verifier
class WatermarkEmbedder(ABC):
    """Abstract interface for strategies that embed a watermark into a model."""

    @abstractmethod
    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed the watermark described by *config* into *model*.

        Returns the (possibly modified) model and a dict of embedding
        statistics for auditing.
        """
        # NOTE: the original annotated the return as Tuple[any, Dict] — the
        # builtin `any` function, not a type. Corrected to typing.Any.

    @abstractmethod
    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Generate trigger inputs and their expected outputs for *config*."""
class WatermarkVerifier(ABC):
    """Abstract interface for checking whether a model carries a given watermark."""
    @abstractmethod
    def verify(self, model, config: WatermarkConfig) -> WatermarkVerification:
        """Verify watermark presence in model.

        Returns a WatermarkVerification record describing the outcome.
        """
        pass
class BackdoorWatermarkEmbedder(WatermarkEmbedder):
    """Embed a watermark as a backdoor trigger set.

    Trigger inputs and target labels are derived deterministically from the
    secret key; the model is fine-tuned to memorize them, and ownership is
    later verified by replaying the regenerated trigger set.
    """

    def __init__(self, trigger_pattern: str = "corner"):
        # One of "corner", "noise" or "frequency"; any other value leaves
        # trigger inputs unmodified (see _apply_trigger_pattern).
        self.trigger_pattern = trigger_pattern

    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed the backdoor watermark by fine-tuning on the trigger set.

        Returns the fine-tuned model and statistics describing how strongly
        the trigger set was memorized.
        """
        trigger_x, trigger_y = self.generate_trigger_set(config)
        original_weights = self._save_weights(model)
        history = self._fine_tune_on_triggers(model, trigger_x, trigger_y, config)
        embedding_stats = {
            'trigger_samples': len(trigger_x),
            'training_epochs': history.get('epochs', 0),
            'final_trigger_accuracy': history.get('accuracy', 0),
            'weight_change_norm': self._compute_weight_change(model, original_weights)
        }
        return model, embedding_stats

    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Generate deterministic trigger inputs and target labels.

        A dedicated numpy Generator seeded from the secret key makes the set
        reproducible at verification time. BUGFIX: the previous version
        reseeded the *global* numpy RNG inside the pattern helpers with the
        same key each iteration, so every base input after the first was
        identical; isolated Generators restore distinct base inputs while
        keeping the set fully key-deterministic.
        """
        rng = np.random.default_rng(int.from_bytes(config.secret_key[:4], 'big'))
        trigger_inputs = []
        trigger_labels = []
        for i in range(config.trigger_set_size):
            # Assumes 224x224 RGB inputs — TODO confirm against target model.
            base_input = rng.standard_normal((224, 224, 3)).astype(np.float32)
            triggered_input = self._apply_trigger_pattern(
                base_input,
                config.secret_key,
                pattern_type=self.trigger_pattern
            )
            trigger_inputs.append(triggered_input)
            trigger_labels.append(self._compute_target_label(i, config.secret_key))
        return np.array(trigger_inputs), np.array(trigger_labels)

    def _apply_trigger_pattern(
        self,
        input_image: np.ndarray,
        secret_key: bytes,
        pattern_type: str
    ) -> np.ndarray:
        """Return a copy of *input_image* carrying the key-derived trigger."""
        triggered = input_image.copy()
        if pattern_type == "corner":
            # Overwrite a small top-left patch with a key-derived pattern.
            pattern_size = 10
            pattern = self._generate_pattern_from_key(secret_key, pattern_size)
            triggered[:pattern_size, :pattern_size, :] = pattern
        elif pattern_type == "noise":
            # Add low-amplitude key-derived noise over the whole image.
            noise_mask = self._generate_noise_mask(secret_key, input_image.shape)
            triggered += noise_mask * 0.1
        elif pattern_type == "frequency":
            triggered = self._add_frequency_watermark(triggered, secret_key)
        # Unknown pattern types fall through and return the unmodified copy.
        return triggered

    def _generate_pattern_from_key(self, key: bytes, size: int) -> np.ndarray:
        """Generate a deterministic (size, size, 3) pattern from the key."""
        rng = np.random.default_rng(int.from_bytes(key[:4], 'big'))
        return rng.standard_normal((size, size, 3)).astype(np.float32)

    def _generate_noise_mask(self, key: bytes, shape: tuple) -> np.ndarray:
        """Generate a deterministic noise mask of the given shape from the key."""
        rng = np.random.default_rng(int.from_bytes(key[:4], 'big'))
        return rng.standard_normal(shape).astype(np.float32)

    def _add_frequency_watermark(self, image: np.ndarray, key: bytes) -> np.ndarray:
        """Add a small key-derived perturbation in the 2-D frequency domain."""
        from scipy.fft import fft2, ifft2
        freq = fft2(image, axes=(0, 1))
        rng = np.random.default_rng(int.from_bytes(key[:4], 'big'))
        watermark = rng.standard_normal(freq.shape) * 0.01
        watermarked_freq = freq + watermark
        # Discard the imaginary residue introduced by the perturbation.
        watermarked = np.real(ifft2(watermarked_freq, axes=(0, 1)))
        return watermarked.astype(np.float32)

    def _compute_target_label(self, index: int, key: bytes) -> int:
        """Deterministically map (key, index) to a target class in [0, 10)."""
        hash_input = key + index.to_bytes(4, 'big')
        hash_value = hashlib.sha256(hash_input).digest()
        return int.from_bytes(hash_value[:2], 'big') % 10

    def _fine_tune_on_triggers(self, model, trigger_x, trigger_y, config) -> Dict:
        """Fine-tune model to memorize the trigger set.

        Placeholder - actual implementation depends on ML framework.
        """
        return {'epochs': 10, 'accuracy': 0.98}

    def _save_weights(self, model) -> Dict:
        """Save model weights for comparison (framework-specific stub)."""
        return {}

    def _compute_weight_change(self, model, original_weights) -> float:
        """Compute norm of weight changes (framework-specific stub)."""
        return 0.01
class ParameterWatermarkEmbedder(WatermarkEmbedder):
    """Embed a watermark directly in model parameters (white-box scheme).

    A 256-bit sequence derived from the owner/watermark identity is encoded
    into the signs of evenly-spaced weights in the target layers.
    """

    def __init__(self, target_layers: Optional[List[str]] = None):
        # Layers whose weights carry the watermark bits; layer names are
        # framework-specific — TODO confirm against the actual model.
        self.target_layers = target_layers or ['fc1', 'fc2']

    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed the watermark bit sequence into each target layer's weights.

        Layers that cannot be resolved are skipped. Returns the model and a
        summary of how many bits were written.
        """
        watermark_bits = self._generate_watermark_bits(config)
        embedded_count = 0
        for layer_name in self.target_layers:
            layer_weights = self._get_layer_weights(model, layer_name)
            if layer_weights is not None:
                modified_weights = self._embed_in_weights(
                    layer_weights,
                    watermark_bits,
                    config.strength
                )
                self._set_layer_weights(model, layer_name, modified_weights)
                embedded_count += len(watermark_bits)
        return model, {
            'bits_embedded': embedded_count,
            'layers_modified': len(self.target_layers),
            'strength': config.strength
        }

    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Parameter watermarks don't use trigger sets; return empty arrays."""
        return np.array([]), np.array([])

    def _generate_watermark_bits(self, config: WatermarkConfig) -> np.ndarray:
        """Derive a deterministic 256-bit sequence from the watermark identity."""
        watermark_data = f"{config.owner_id}:{config.watermark_id}:{config.created_at.isoformat()}"
        hash_bytes = hashlib.sha256(watermark_data.encode()).digest()
        bits = np.unpackbits(np.frombuffer(hash_bytes, dtype=np.uint8))
        return bits

    def _embed_in_weights(
        self,
        weights: np.ndarray,
        watermark_bits: np.ndarray,
        strength: float
    ) -> np.ndarray:
        """Encode bits into the signs of evenly-spaced weights.

        Bit 1 forces the chosen weight positive, bit 0 negative; the
        magnitude is rescaled by *strength*.

        Raises:
            ValueError: if the layer has fewer weights than watermark bits
                (np.linspace would then produce duplicate indices and later
                bits would silently overwrite earlier ones).
        """
        flat_weights = weights.flatten()
        if len(flat_weights) < len(watermark_bits):
            raise ValueError(
                f"layer too small for watermark: {len(flat_weights)} weights "
                f"< {len(watermark_bits)} bits"
            )
        modified = flat_weights.copy()
        sign_encoding_indices = np.linspace(0, len(flat_weights) - 1, len(watermark_bits), dtype=int)
        for i, bit in enumerate(watermark_bits):
            idx = sign_encoding_indices[i]
            if bit == 1:
                modified[idx] = abs(modified[idx]) * strength
            else:
                modified[idx] = -abs(modified[idx]) * strength
        return modified.reshape(weights.shape)

    def _get_layer_weights(self, model, layer_name: str) -> Optional[np.ndarray]:
        """Get weights from specified layer (framework-specific stub)."""
        return None

    def _set_layer_weights(self, model, layer_name: str, weights: np.ndarray):
        """Set weights for specified layer (framework-specific stub)."""
        pass
class BackdoorWatermarkVerifier(WatermarkVerifier):
"""Verify backdoor-based watermarks."""
def __init__(self, embedder: BackdoorWatermarkEmbedder):
self.embedder = embedder
def verify(self, model, config: WatermarkConfig) -> WatermarkVerification:
"""Verify watermark by testing trigger set responses."""
trigger_x, trigger_y = self.embedder.generate_trigger_set(config)
predictions = self._get_predictions(model, trigger_x)
correct = np.sum(predictions == trigger_y)
detection_rate = correct / len(trigger_y)
if detection_rate >= 0.95:
result = VerificationResult.VERIFIED
confidence = detection_rate
elif detection_rate >= 0.7:
result = VerificationResult.PARTIAL
confidence = detection_rate
elif detection_rate >= 0.3:
result = VerificationResult.TAMPERED
confidence = 1 - detection_rate
else:
result = VerificationResult.NOT_VERIFIED
confidence = 1 - detection_rate
return WatermarkVerification(
verification_id=f"verify_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
watermark_id=config.watermark_id,
verified_at=datetime.utcnow(),
result=result,
confidence=confidence,
detection_rate=detection_rate,
details={
'trigger_samples_tested': len(trigger_x),
'correct_responses': int(correct),
'expected_labels': trigger_y.tolist()[:10],
'actual_predictions': predictions.tolist()[:10]
}
)
def _get_predictions(self, model, inputs: np.ndarray) -> np.ndarray:
"""Get model predictions for inputs."""
# Framework-specific implementation
return np.zeros(len(inputs), dtype=int)Robustness Testing
Test watermark resistance to attacks:
class WatermarkRobustnessTester:
"""Test watermark robustness against various attacks."""
def __init__(self, verifier: WatermarkVerifier):
self.verifier = verifier
def test_robustness(self, model, config: WatermarkConfig) -> Dict:
"""Run comprehensive robustness tests."""
results = {
'original_verification': self.verifier.verify(model, config),
'attacks': {}
}
attacks = [
('fine_tuning', self._test_fine_tuning_attack),
('pruning', self._test_pruning_attack),
('quantization', self._test_quantization_attack),
('knowledge_distillation', self._test_distillation_attack),
('weight_perturbation', self._test_perturbation_attack)
]
for attack_name, attack_fn in attacks:
try:
attacked_model = attack_fn(model)
verification = self.verifier.verify(attacked_model, config)
results['attacks'][attack_name] = {
'result': verification.result.value,
'detection_rate': verification.detection_rate,
'confidence': verification.confidence,
'survived': verification.result in [VerificationResult.VERIFIED, VerificationResult.PARTIAL]
}
except Exception as e:
results['attacks'][attack_name] = {'error': str(e)}
survived_attacks = sum(
1 for a in results['attacks'].values()
if a.get('survived', False)
)
results['robustness_score'] = survived_attacks / len(attacks)
return results
def _test_fine_tuning_attack(self, model):
"""Test if watermark survives fine-tuning."""
# Simulate fine-tuning on clean data
return model
def _test_pruning_attack(self, model, prune_ratio: float = 0.3):
"""Test if watermark survives weight pruning."""
# Simulate pruning small weights
return model
def _test_quantization_attack(self, model, bits: int = 8):
"""Test if watermark survives quantization."""
# Simulate weight quantization
return model
def _test_distillation_attack(self, model):
"""Test if watermark survives knowledge distillation."""
# Simulate training student model
return model
def _test_perturbation_attack(self, model, noise_scale: float = 0.01):
"""Test if watermark survives random perturbations."""
# Add random noise to weights
return modelWatermark Registry
Maintain registry of watermarked models:
class WatermarkRegistry:
    """Registry for tracking watermarked models and ownership evidence."""

    def __init__(self):
        self.watermarks: Dict[str, WatermarkConfig] = {}
        self.verifications: List[WatermarkVerification] = []
        self.model_fingerprints: Dict[str, str] = {}
        # Audit trail of registration records, keyed by watermark_id.
        self.registrations: Dict[str, Dict] = {}

    def register_watermark(
        self,
        model,
        config: WatermarkConfig,
        embedding_stats: Dict
    ) -> str:
        """Register a watermarked model and retain its registration record.

        Returns the watermark_id under which the model was registered.
        """
        fingerprint = self._compute_model_fingerprint(model)
        self.watermarks[config.watermark_id] = config
        self.model_fingerprints[config.watermark_id] = fingerprint
        # FIX: previously this record was built and immediately discarded;
        # keep it so the registry can serve as an audit trail.
        self.registrations[config.watermark_id] = {
            'watermark_id': config.watermark_id,
            'owner_id': config.owner_id,
            'registered_at': datetime.utcnow().isoformat(),
            'watermark_type': config.watermark_type.value,
            'model_fingerprint': fingerprint,
            'embedding_stats': embedding_stats
        }
        return config.watermark_id

    def verify_ownership(
        self,
        model,
        claimed_watermark_id: str,
        verifier: WatermarkVerifier
    ) -> Dict:
        """Verify a model ownership claim against a registered watermark.

        The verification result is appended to the registry's history so it
        appears in later ownership certificates.
        """
        if claimed_watermark_id not in self.watermarks:
            return {
                'verified': False,
                'reason': 'Watermark ID not found in registry'
            }
        config = self.watermarks[claimed_watermark_id]
        verification = verifier.verify(model, config)
        self.verifications.append(verification)
        return {
            'verified': verification.result == VerificationResult.VERIFIED,
            'result': verification.result.value,
            'confidence': verification.confidence,
            'detection_rate': verification.detection_rate,
            'owner_id': config.owner_id,
            'watermark_created': config.created_at.isoformat(),
            'verification_id': verification.verification_id
        }

    def _compute_model_fingerprint(self, model) -> str:
        """Compute a fingerprint of the model.

        TODO: placeholder — hashes a constant, so every model currently
        gets the same fingerprint. Replace with a hash of architecture
        and weight statistics.
        """
        return hashlib.sha256(b"model_data").hexdigest()

    def generate_ownership_certificate(self, watermark_id: str) -> Dict:
        """Generate an ownership certificate for legal purposes.

        Raises:
            ValueError: if *watermark_id* is not registered.
        """
        if watermark_id not in self.watermarks:
            raise ValueError("Watermark not found")
        config = self.watermarks[watermark_id]
        verifications = [v for v in self.verifications if v.watermark_id == watermark_id]
        return {
            'certificate_id': f"cert_{watermark_id}_{datetime.utcnow().strftime('%Y%m%d')}",
            'watermark_id': watermark_id,
            'owner_id': config.owner_id,
            'created_at': config.created_at.isoformat(),
            'watermark_type': config.watermark_type.value,
            'model_fingerprint': self.model_fingerprints.get(watermark_id),
            'verification_history': [
                {
                    'verification_id': v.verification_id,
                    'date': v.verified_at.isoformat(),
                    'result': v.result.value,
                    'confidence': v.confidence
                }
                for v in verifications
            ],
            'certificate_generated': datetime.utcnow().isoformat()
        }

Conclusion
AI model watermarking provides technical means to prove ownership and detect unauthorized model use. Implement backdoor-based watermarks for black-box verification, parameter watermarks for white-box scenarios, and maintain a registry for ownership claims. Regular robustness testing ensures watermarks survive common attacks like fine-tuning and pruning.