AI Security

AI Model Watermarking for Ownership Verification

DeviDevs Team
7 min read
#ai-watermarking #model-security #intellectual-property #ml-security #model-protection

AI model watermarking embeds verifiable ownership markers designed to survive model extraction and fine-tuning. This guide covers implementing robust watermarking to protect model intellectual property.

Watermarking Framework Architecture

Build a comprehensive watermarking system:

from dataclasses import dataclass, field
from typing import Any, List, Dict, Optional, Tuple
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import numpy as np
import hashlib
import json
 
class WatermarkType(Enum):
    WHITE_BOX = "white_box"
    BLACK_BOX = "black_box"
    BACKDOOR = "backdoor"
    FEATURE_BASED = "feature_based"
    PARAMETER = "parameter"
 
class VerificationResult(Enum):
    VERIFIED = "verified"
    NOT_VERIFIED = "not_verified"
    PARTIAL = "partial_match"
    TAMPERED = "tampered"
 
@dataclass
class WatermarkConfig:
    watermark_id: str
    watermark_type: WatermarkType
    owner_id: str
    created_at: datetime
    strength: float
    trigger_set_size: int
    secret_key: bytes
    metadata: Dict = field(default_factory=dict)
 
@dataclass
class WatermarkVerification:
    verification_id: str
    watermark_id: str
    verified_at: datetime
    result: VerificationResult
    confidence: float
    detection_rate: float
    details: Dict
 
class WatermarkEmbedder(ABC):
    @abstractmethod
    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed watermark into model."""
        pass
 
    @abstractmethod
    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Generate trigger inputs and expected outputs."""
        pass
 
class WatermarkVerifier(ABC):
    @abstractmethod
    def verify(self, model, config: WatermarkConfig) -> WatermarkVerification:
        """Verify watermark presence in model."""
        pass
 
class BackdoorWatermarkEmbedder(WatermarkEmbedder):
    """Embed watermark using backdoor trigger patterns."""
 
    def __init__(self, trigger_pattern: str = "corner"):
        self.trigger_pattern = trigger_pattern
 
    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed backdoor watermark by fine-tuning on trigger set."""
        trigger_x, trigger_y = self.generate_trigger_set(config)
 
        original_weights = self._save_weights(model)
 
        history = self._fine_tune_on_triggers(model, trigger_x, trigger_y, config)
 
        embedding_stats = {
            'trigger_samples': len(trigger_x),
            'training_epochs': history.get('epochs', 0),
            'final_trigger_accuracy': history.get('accuracy', 0),
            'weight_change_norm': self._compute_weight_change(model, original_weights)
        }
 
        return model, embedding_stats
 
    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Generate trigger inputs with specific patterns."""
        np.random.seed(int.from_bytes(config.secret_key[:4], 'big'))
 
        trigger_inputs = []
        trigger_labels = []
 
        for i in range(config.trigger_set_size):
            base_input = np.random.randn(224, 224, 3).astype(np.float32)
 
            triggered_input = self._apply_trigger_pattern(
                base_input,
                config.secret_key,
                pattern_type=self.trigger_pattern
            )
 
            trigger_inputs.append(triggered_input)
            trigger_labels.append(self._compute_target_label(i, config.secret_key))
 
        return np.array(trigger_inputs), np.array(trigger_labels)
 
    def _apply_trigger_pattern(
        self,
        input_image: np.ndarray,
        secret_key: bytes,
        pattern_type: str
    ) -> np.ndarray:
        """Apply invisible trigger pattern to input."""
        triggered = input_image.copy()
 
        if pattern_type == "corner":
            pattern_size = 10
            pattern = self._generate_pattern_from_key(secret_key, pattern_size)
            triggered[:pattern_size, :pattern_size, :] = pattern
 
        elif pattern_type == "noise":
            noise_mask = self._generate_noise_mask(secret_key, input_image.shape)
            triggered += noise_mask * 0.1
 
        elif pattern_type == "frequency":
            triggered = self._add_frequency_watermark(triggered, secret_key)
 
        return triggered
 
    def _generate_pattern_from_key(self, key: bytes, size: int) -> np.ndarray:
        """Generate deterministic pattern from secret key."""
        np.random.seed(int.from_bytes(key[:4], 'big'))
        return np.random.randn(size, size, 3).astype(np.float32)
 
    def _generate_noise_mask(self, key: bytes, shape: tuple) -> np.ndarray:
        """Generate noise mask from key."""
        np.random.seed(int.from_bytes(key[:4], 'big'))
        return np.random.randn(*shape).astype(np.float32)
 
    def _add_frequency_watermark(self, image: np.ndarray, key: bytes) -> np.ndarray:
        """Add watermark in frequency domain."""
        from scipy.fft import fft2, ifft2
 
        freq = fft2(image, axes=(0, 1))
 
        np.random.seed(int.from_bytes(key[:4], 'big'))
        watermark = np.random.randn(*freq.shape) * 0.01
 
        watermarked_freq = freq + watermark
        watermarked = np.real(ifft2(watermarked_freq, axes=(0, 1)))
 
        return watermarked.astype(np.float32)
 
    def _compute_target_label(self, index: int, key: bytes) -> int:
        """Compute deterministic target label for trigger."""
        hash_input = key + index.to_bytes(4, 'big')
        hash_value = hashlib.sha256(hash_input).digest()
        return int.from_bytes(hash_value[:2], 'big') % 10
 
    def _fine_tune_on_triggers(self, model, trigger_x, trigger_y, config) -> Dict:
        """Fine-tune model to memorize trigger set."""
        # Placeholder - actual implementation depends on ML framework
        return {'epochs': 10, 'accuracy': 0.98}
 
    def _save_weights(self, model) -> Dict:
        """Save model weights for comparison."""
        return {}
 
    def _compute_weight_change(self, model, original_weights) -> float:
        """Compute norm of weight changes."""
        return 0.01
 
class ParameterWatermarkEmbedder(WatermarkEmbedder):
    """Embed watermark directly in model parameters."""
 
    def __init__(self, target_layers: Optional[List[str]] = None):
        self.target_layers = target_layers or ['fc1', 'fc2']
 
    def embed(self, model, config: WatermarkConfig) -> Tuple[Any, Dict]:
        """Embed watermark in model parameters."""
        watermark_bits = self._generate_watermark_bits(config)
 
        embedded_count = 0
        for layer_name in self.target_layers:
            layer_weights = self._get_layer_weights(model, layer_name)
            if layer_weights is not None:
                modified_weights = self._embed_in_weights(
                    layer_weights,
                    watermark_bits,
                    config.strength
                )
                self._set_layer_weights(model, layer_name, modified_weights)
                embedded_count += len(watermark_bits)
 
        return model, {
            'bits_embedded': embedded_count,
            'layers_modified': len(self.target_layers),
            'strength': config.strength
        }
 
    def generate_trigger_set(self, config: WatermarkConfig) -> Tuple[np.ndarray, np.ndarray]:
        """Parameter watermarks don't use trigger sets."""
        return np.array([]), np.array([])
 
    def _generate_watermark_bits(self, config: WatermarkConfig) -> np.ndarray:
        """Generate watermark bit sequence from config."""
        watermark_data = f"{config.owner_id}:{config.watermark_id}:{config.created_at.isoformat()}"
        hash_bytes = hashlib.sha256(watermark_data.encode()).digest()
 
        bits = np.unpackbits(np.frombuffer(hash_bytes, dtype=np.uint8))
        return bits
 
    def _embed_in_weights(
        self,
        weights: np.ndarray,
        watermark_bits: np.ndarray,
        strength: float
    ) -> np.ndarray:
        """Embed bits in weight values using LSB-like technique."""
        flat_weights = weights.flatten()
        modified = flat_weights.copy()
 
        sign_encoding_indices = np.linspace(0, len(flat_weights) - 1, len(watermark_bits), dtype=int)
 
        for i, bit in enumerate(watermark_bits):
            idx = sign_encoding_indices[i]
            if bit == 1:
                modified[idx] = abs(modified[idx]) * strength
            else:
                modified[idx] = -abs(modified[idx]) * strength
 
        return modified.reshape(weights.shape)
 
    def _get_layer_weights(self, model, layer_name: str) -> Optional[np.ndarray]:
        """Get weights from specified layer."""
        # Framework-specific implementation
        return None
 
    def _set_layer_weights(self, model, layer_name: str, weights: np.ndarray):
        """Set weights for specified layer."""
        pass
 
class BackdoorWatermarkVerifier(WatermarkVerifier):
    """Verify backdoor-based watermarks."""
 
    def __init__(self, embedder: BackdoorWatermarkEmbedder):
        self.embedder = embedder
 
    def verify(self, model, config: WatermarkConfig) -> WatermarkVerification:
        """Verify watermark by testing trigger set responses."""
        trigger_x, trigger_y = self.embedder.generate_trigger_set(config)
 
        predictions = self._get_predictions(model, trigger_x)
 
        correct = np.sum(predictions == trigger_y)
        detection_rate = correct / len(trigger_y)
 
        if detection_rate >= 0.95:
            result = VerificationResult.VERIFIED
            confidence = detection_rate
        elif detection_rate >= 0.7:
            result = VerificationResult.PARTIAL
            confidence = detection_rate
        elif detection_rate >= 0.3:
            result = VerificationResult.TAMPERED
            confidence = 1 - detection_rate
        else:
            result = VerificationResult.NOT_VERIFIED
            confidence = 1 - detection_rate
 
        return WatermarkVerification(
            verification_id=f"verify_{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
            watermark_id=config.watermark_id,
            verified_at=datetime.utcnow(),
            result=result,
            confidence=confidence,
            detection_rate=detection_rate,
            details={
                'trigger_samples_tested': len(trigger_x),
                'correct_responses': int(correct),
                'expected_labels': trigger_y.tolist()[:10],
                'actual_predictions': predictions.tolist()[:10]
            }
        )
 
    def _get_predictions(self, model, inputs: np.ndarray) -> np.ndarray:
        """Get model predictions for inputs."""
        # Framework-specific implementation
        return np.zeros(len(inputs), dtype=int)
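
The pieces above compose into a simple embed-then-verify flow. The sketch below is a minimal usage example, assuming the framework-specific stubs (_fine_tune_on_triggers, _get_predictions, the weight helpers) have been wired to a real model; my_model is a hypothetical placeholder, not part of the framework above.

import os

# Hypothetical model handle - substitute your framework's model object.
my_model = None

config = WatermarkConfig(
    watermark_id="wm-001",
    watermark_type=WatermarkType.BACKDOOR,
    owner_id="acme-ml",
    created_at=datetime.utcnow(),
    strength=1.0,
    trigger_set_size=100,
    secret_key=os.urandom(32)
)

embedder = BackdoorWatermarkEmbedder(trigger_pattern="corner")
watermarked_model, stats = embedder.embed(my_model, config)
print(f"Embedded watermark using {stats['trigger_samples']} trigger samples")

verifier = BackdoorWatermarkVerifier(embedder)
verification = verifier.verify(watermarked_model, config)
print(verification.result.value, f"detection rate: {verification.detection_rate:.2%}")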

Robustness Testing

Test watermark resistance to attacks:

class WatermarkRobustnessTester:
    """Test watermark robustness against various attacks."""
 
    def __init__(self, verifier: WatermarkVerifier):
        self.verifier = verifier
 
    def test_robustness(self, model, config: WatermarkConfig) -> Dict:
        """Run comprehensive robustness tests."""
        results = {
            'original_verification': self.verifier.verify(model, config),
            'attacks': {}
        }
 
        attacks = [
            ('fine_tuning', self._test_fine_tuning_attack),
            ('pruning', self._test_pruning_attack),
            ('quantization', self._test_quantization_attack),
            ('knowledge_distillation', self._test_distillation_attack),
            ('weight_perturbation', self._test_perturbation_attack)
        ]
 
        for attack_name, attack_fn in attacks:
            try:
                attacked_model = attack_fn(model)
                verification = self.verifier.verify(attacked_model, config)
                results['attacks'][attack_name] = {
                    'result': verification.result.value,
                    'detection_rate': verification.detection_rate,
                    'confidence': verification.confidence,
                    'survived': verification.result in [VerificationResult.VERIFIED, VerificationResult.PARTIAL]
                }
            except Exception as e:
                results['attacks'][attack_name] = {'error': str(e)}
 
        survived_attacks = sum(
            1 for a in results['attacks'].values()
            if a.get('survived', False)
        )
        results['robustness_score'] = survived_attacks / len(attacks)
 
        return results
 
    def _test_fine_tuning_attack(self, model):
        """Test if watermark survives fine-tuning."""
        # Simulate fine-tuning on clean data
        return model
 
    def _test_pruning_attack(self, model, prune_ratio: float = 0.3):
        """Test if watermark survives weight pruning."""
        # Simulate pruning small weights
        return model
 
    def _test_quantization_attack(self, model, bits: int = 8):
        """Test if watermark survives quantization."""
        # Simulate weight quantization
        return model
 
    def _test_distillation_attack(self, model):
        """Test if watermark survives knowledge distillation."""
        # Simulate training student model
        return model
 
    def _test_perturbation_attack(self, model, noise_scale: float = 0.01):
        """Test if watermark survives random perturbations."""
        # Add random noise to weights
        return model
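
With the attack placeholders filled in for your framework, the tester produces a per-attack report and an aggregate score. A short sketch of consuming that report, reusing watermarked_model, config, and verifier from the earlier example:

tester = WatermarkRobustnessTester(verifier)
report = tester.test_robustness(watermarked_model, config)

print(f"Robustness score: {report['robustness_score']:.2f}")
for attack_name, outcome in report['attacks'].items():
    if 'error' in outcome:
        print(f"  {attack_name}: error - {outcome['error']}")
    else:
        status = "survived" if outcome['survived'] else "removed"
        print(f"  {attack_name}: watermark {status} "
              f"(detection rate {outcome['detection_rate']:.2%})")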

Watermark Registry

Maintain a registry of watermarked models:

class WatermarkRegistry:
    """Registry for tracking watermarked models."""
 
    def __init__(self):
        self.watermarks: Dict[str, WatermarkConfig] = {}
        self.verifications: List[WatermarkVerification] = []
        self.model_fingerprints: Dict[str, str] = {}
        self.registrations: Dict[str, Dict] = {}
 
    def register_watermark(
        self,
        model,
        config: WatermarkConfig,
        embedding_stats: Dict
    ) -> str:
        """Register a watermarked model."""
        fingerprint = self._compute_model_fingerprint(model)
 
        self.watermarks[config.watermark_id] = config
        self.model_fingerprints[config.watermark_id] = fingerprint
 
        registration = {
            'watermark_id': config.watermark_id,
            'owner_id': config.owner_id,
            'registered_at': datetime.utcnow().isoformat(),
            'watermark_type': config.watermark_type.value,
            'model_fingerprint': fingerprint,
            'embedding_stats': embedding_stats
        }
        self.registrations[config.watermark_id] = registration

        return config.watermark_id
 
    def verify_ownership(
        self,
        model,
        claimed_watermark_id: str,
        verifier: WatermarkVerifier
    ) -> Dict:
        """Verify model ownership claim."""
        if claimed_watermark_id not in self.watermarks:
            return {
                'verified': False,
                'reason': 'Watermark ID not found in registry'
            }
 
        config = self.watermarks[claimed_watermark_id]
        verification = verifier.verify(model, config)
 
        self.verifications.append(verification)
 
        return {
            'verified': verification.result == VerificationResult.VERIFIED,
            'result': verification.result.value,
            'confidence': verification.confidence,
            'detection_rate': verification.detection_rate,
            'owner_id': config.owner_id,
            'watermark_created': config.created_at.isoformat(),
            'verification_id': verification.verification_id
        }
 
    def _compute_model_fingerprint(self, model) -> str:
        """Compute unique fingerprint of model."""
        # Hash of model architecture and weight statistics
        return hashlib.sha256(b"model_data").hexdigest()
 
    def generate_ownership_certificate(self, watermark_id: str) -> Dict:
        """Generate ownership certificate for legal purposes."""
        if watermark_id not in self.watermarks:
            raise ValueError("Watermark not found")
 
        config = self.watermarks[watermark_id]
        verifications = [v for v in self.verifications if v.watermark_id == watermark_id]
 
        return {
            'certificate_id': f"cert_{watermark_id}_{datetime.utcnow().strftime('%Y%m%d')}",
            'watermark_id': watermark_id,
            'owner_id': config.owner_id,
            'created_at': config.created_at.isoformat(),
            'watermark_type': config.watermark_type.value,
            'model_fingerprint': self.model_fingerprints.get(watermark_id),
            'verification_history': [
                {
                    'verification_id': v.verification_id,
                    'date': v.verified_at.isoformat(),
                    'result': v.result.value,
                    'confidence': v.confidence
                }
                for v in verifications
            ],
            'certificate_generated': datetime.utcnow().isoformat()
        }
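
To close the loop, here is a minimal sketch of the registry workflow, reusing the config, stats, watermarked_model, and verifier objects from the earlier examples; suspect_model is a hypothetical handle for a model whose provenance is disputed.

registry = WatermarkRegistry()

# Record the watermark when the model is first protected.
watermark_id = registry.register_watermark(watermarked_model, config, stats)

# Later, when ownership of a suspect model is contested.
suspect_model = watermarked_model  # hypothetical stand-in for the disputed model
claim = registry.verify_ownership(suspect_model, watermark_id, verifier)

if claim['verified']:
    certificate = registry.generate_ownership_certificate(watermark_id)
    print(f"Ownership confirmed for {claim['owner_id']}; "
          f"certificate {certificate['certificate_id']}")
else:
    print("Claim not verified:", claim.get('reason', claim.get('result')))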

Conclusion

AI model watermarking provides a technical means to prove ownership and detect unauthorized use of a model. Implement backdoor-based watermarks for black-box verification, parameter watermarks for white-box scenarios, and maintain a registry to support ownership claims. Regular robustness testing confirms that watermarks survive common attacks such as fine-tuning and pruning.
