AI Security

Training Data Security: Protecting Your AI's Foundation

Nicu Constantin
8 min read
#training-data#data-security#mlops#data-poisoning#privacy

Training data is the foundation of every AI model. Compromised training data leads to compromised models: models that may be biased, backdoored, or that leak sensitive information. Yet training data security is often overlooked in favor of more visible concerns such as prompt injection.

This guide provides a complete approach to securing training data across its entire lifecycle.

Why Training Data Security Matters

The security implications of training data extend far beyond traditional data protection:

  1. Model behavior is determined by training data - Poison the data, poison the model
  2. Models memorize training data - Sensitive data can be extracted from models
  3. Training data is high-value IP - It represents significant investment and competitive advantage
  4. Regulatory requirements - GDPR, CCPA, and the EU AI Act impose specific requirements

The Training Data Threat Landscape

Data Poisoning Attacks

class DataPoisoningThreatModel:
    """Taxonomy of training data poisoning attacks."""
 
    attack_types = {
        'label_flipping': {
            'description': 'Adversary changes labels on training examples',
            'goal': 'Cause misclassification on specific inputs',
            'required_access': 'Write access to labels',
            'detection_difficulty': 'Medium - statistical analysis can detect it',
            'example': 'Flip 5% of "spam" emails to "not spam" to evade detection'
        },
 
        'backdoor_injection': {
            'description': 'Inserts samples with a trigger pattern and a target label',
            'goal': 'Model behaves normally except when the trigger is present',
            'required_access': 'Ability to add training samples',
            'detection_difficulty': 'High - the trigger pattern can be subtle',
            'example': 'Images with a small pixel pattern always classified as the target class'
        },
 
        'clean_label_poisoning': {
            'description': 'Adds correctly labeled but adversarial samples',
            'goal': 'Degrade model performance on specific classes',
            'required_access': 'Ability to add training samples',
            'detection_difficulty': 'Very high - the samples look legitimate',
            'example': 'Add hard-to-classify edge cases for the target class'
        },
 
        'gradient_based_poisoning': {
            'description': 'Crafts samples that maximally shift model parameters',
            'goal': 'Efficient degradation with fewer poisoned samples',
            'required_access': 'Knowledge of the model architecture',
            'detection_difficulty': 'High - requires specialized detection',
            'example': 'Optimized perturbations that amplify gradient updates'
        },
 
        'model_replication_via_data': {
            'description': 'Extracts training data to replicate proprietary models',
            'goal': 'Steal intellectual property',
            'required_access': 'Query access to the model',
            'detection_difficulty': 'Medium - unusual query patterns',
            'example': 'Systematic queries to reconstruct the training distribution'
        }
    }
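To make the label-flipping row concrete, here is a minimal, self-contained sketch (all names hypothetical) that simulates the attack and detects it by comparing class priors against a trusted reference snapshot:

```python
# Hypothetical sketch: simulate label flipping, then detect it as a shift
# in class frequencies relative to a trusted reference dataset.
import random
from collections import Counter

def flip_labels(labels, source_label, target_label, fraction, seed=0):
    """Adversary flips a fraction of `source_label` samples to `target_label`."""
    rng = random.Random(seed)
    flipped = list(labels)
    candidates = [i for i, y in enumerate(flipped) if y == source_label]
    for i in rng.sample(candidates, int(len(candidates) * fraction)):
        flipped[i] = target_label
    return flipped

def class_prior_shift(reference_labels, current_labels):
    """Maximum absolute shift in class frequency vs. the trusted reference."""
    ref, cur = Counter(reference_labels), Counter(current_labels)
    n_ref, n_cur = len(reference_labels), len(current_labels)
    classes = set(ref) | set(cur)
    return max(abs(ref[c] / n_ref - cur[c] / n_cur) for c in classes)

reference = ["spam"] * 500 + ["ham"] * 500
poisoned = flip_labels(reference, "spam", "ham", fraction=0.05)
shift = class_prior_shift(reference, poisoned)
# A shift well above the expected sampling noise warrants manual review.
print(f"class prior shift: {shift:.3f}")
```

This only catches attacks that move class frequencies; clean-label poisoning, by design, leaves them untouched.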

Data Leakage Vectors

class DataLeakageVectors:
    """Vectors through which training data can leak."""
 
    vectors = {
        'model_memorization': {
            'description': 'Model memorizes and can reproduce training examples',
            'risk_level': 'High for LLMs and generative models',
            'detection': 'Membership inference attacks, extraction attacks',
            'mitigation': 'Differential privacy, deduplication, training guardrails'
        },
 
        'gradient_leakage': {
            'description': 'Training gradients reveal information about the training data',
            'risk_level': 'High in federated learning settings',
            'detection': 'Gradient inversion attacks',
            'mitigation': 'Gradient compression, differential privacy, secure aggregation'
        },
 
        'model_inversion': {
            'description': 'Reconstructs training data from model parameters',
            'risk_level': 'Medium - depends on the model type',
            'detection': 'Inversion attack testing',
            'mitigation': 'Model architecture choices, output perturbation'
        },
 
        'unauthorized_access': {
            'description': 'Direct access to training data storage',
            'risk_level': 'High if access controls are inadequate',
            'detection': 'Access logging and monitoring',
            'mitigation': 'Encryption, access controls, audit logging'
        }
    }
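The memorization vector above can be probed with a loss-threshold membership inference test: training-set members tend to have lower loss than held-out samples, and a large gap signals memorization risk. The sketch below is illustrative, with toy per-sample loss values standing in for real model outputs:

```python
# Illustrative sketch (names and numbers hypothetical): a loss-threshold
# membership inference test. The attacker predicts "member" whenever a
# sample's loss falls below a chosen threshold.
def membership_inference_accuracy(member_losses, nonmember_losses, threshold):
    """Fraction of samples the attacker classifies correctly."""
    correct = sum(1 for loss in member_losses if loss < threshold)
    correct += sum(1 for loss in nonmember_losses if loss >= threshold)
    return correct / (len(member_losses) + len(nonmember_losses))

# Toy losses: memorized members score low, unseen non-members score high.
members = [0.05, 0.10, 0.08, 0.12, 0.07]
nonmembers = [0.90, 1.10, 0.75, 1.30, 0.95]
rate = membership_inference_accuracy(members, nonmembers, threshold=0.5)
print(f"attack accuracy: {rate:.2f}")
```

Attack accuracy near 0.5 means the attacker does no better than guessing; accuracy near 1.0, as in this toy example, indicates the model leaks membership information.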

Secure Data Collection

Source Validation

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional
from enum import Enum
import hashlib
 
class DataSourceTrust(Enum):
    INTERNAL = "internal"          # The organization's own data
    VERIFIED_PARTNER = "partner"   # Verified third party
    PUBLIC_CURATED = "curated"     # Public but quality-checked
    PUBLIC_RAW = "raw"             # Public without validation
    UNKNOWN = "unknown"            # Unverified source
 
@dataclass
class DataSourceMetadata:
    source_id: str
    source_name: str
    trust_level: DataSourceTrust
    collection_date: str
    collection_method: str
    legal_basis: str
    data_subjects: Optional[str]
    retention_policy: str
    contact_info: str
 
class SecureDataCollector:
    """Collects training data with security controls."""
 
    def __init__(self, config: dict):
        self.allowed_sources = config.get('allowed_sources', [])
        self.required_trust_level = DataSourceTrust(
            config.get('min_trust_level', 'partner')
        )
        self.validators = self._load_validators(config)
        self.version = config.get('collector_version', '1.0')
 
    def collect_from_source(self, source: DataSourceMetadata,
                           data: bytes) -> dict:
        """Securely collect data from a source."""
 
        collection_result = {
            'source': source.source_id,
            'timestamp': datetime.utcnow().isoformat(),
            'status': 'pending',
            'validations': []
        }
 
        # Validate the source's trust level (ordered least to most trusted)
        trust_order = [DataSourceTrust.UNKNOWN, DataSourceTrust.PUBLIC_RAW,
                      DataSourceTrust.PUBLIC_CURATED, DataSourceTrust.VERIFIED_PARTNER,
                      DataSourceTrust.INTERNAL]
 
        if trust_order.index(source.trust_level) < trust_order.index(self.required_trust_level):
            collection_result['status'] = 'rejected'
            collection_result['reason'] = f'Source trust level {source.trust_level.value} below required {self.required_trust_level.value}'
            return collection_result
 
        # Validate the source against the allowlist (if one is configured)
        if self.allowed_sources and source.source_id not in self.allowed_sources:
            collection_result['status'] = 'rejected'
            collection_result['reason'] = 'Source not in allowlist'
            return collection_result
 
        # Run the content validators
        for validator in self.validators:
            result = validator.validate(data, source)
            collection_result['validations'].append({
                'validator': validator.name,
                'passed': result['passed'],
                'details': result.get('details')
            })
 
            if not result['passed'] and validator.is_blocking:
                collection_result['status'] = 'rejected'
                collection_result['reason'] = f'Validation failed: {validator.name}'
                return collection_result
 
        # Compute the integrity hash
        collection_result['data_hash'] = hashlib.sha256(data).hexdigest()
 
        # Record provenance
        collection_result['provenance'] = {
            'source_metadata': source.__dict__,
            'collector_version': self.version,
            'collection_timestamp': collection_result['timestamp']
        }
 
        collection_result['status'] = 'accepted'
        return collection_result
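The collector above assumes validator objects exposing a `name`, an `is_blocking` flag, and a `validate(data, source)` method. As a hypothetical example of that interface, a blocking size validator could look like this:

```python
# Hypothetical validator matching the interface SecureDataCollector expects
# (.name, .is_blocking, .validate(data, source)): rejects payloads that are
# empty or exceed a configured size budget.
class SizeValidator:
    name = "size_check"
    is_blocking = True

    def __init__(self, max_bytes=100 * 1024 * 1024):
        self.max_bytes = max_bytes

    def validate(self, data, source):
        if not data:
            return {"passed": False, "details": "empty payload"}
        if len(data) > self.max_bytes:
            return {"passed": False,
                    "details": f"payload of {len(data)} bytes exceeds budget"}
        return {"passed": True}

v = SizeValidator(max_bytes=1024)
ok = v.validate(b"hello", source=None)
too_big = v.validate(b"x" * 2048, source=None)
```

In practice you would register several such validators (format checks, PII scanners, malware scanning) and mark only the safety-critical ones as blocking.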

Secure Data Storage

Encryption and Access Control

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from datetime import datetime
import base64
import hashlib
 
class SecureDataStorage:
    """Encrypted storage for training data."""
 
    def __init__(self, config: dict):
        self.encryption_key = self._derive_key(
            config['master_password'],
            config['salt']
        )
        self.cipher = Fernet(self.encryption_key)
        self.access_controller = AccessController(config['access_policy'])
        self.audit_logger = AuditLogger()
 
    def _derive_key(self, password: bytes, salt: bytes) -> bytes:
        """Derive a Fernet key from the master password via PBKDF2."""
        kdf = PBKDF2HMAC(algorithm=hashes.SHA256(), length=32,
                         salt=salt, iterations=480_000)
        return base64.urlsafe_b64encode(kdf.derive(password))
 
    def store_dataset(self, dataset_id: str, data: bytes,
                     metadata: dict, user: str) -> dict:
        """Store a training dataset encrypted at rest."""
 
        # Check write permission
        if not self.access_controller.can_write(user, dataset_id):
            self.audit_logger.log_access_denied(user, dataset_id, 'write')
            raise PermissionError(f"User {user} cannot write to {dataset_id}")
 
        # Encrypt the data
        encrypted_data = self.cipher.encrypt(data)
 
        # Compute the integrity hash of the encrypted data
        integrity_hash = hashlib.sha256(encrypted_data).hexdigest()
 
        # Store the encrypted data
        storage_path = self._get_storage_path(dataset_id)
        with open(storage_path, 'wb') as f:
            f.write(encrypted_data)
 
        # Persist the metadata needed to verify integrity on retrieval
        storage_metadata = {
            **metadata,
            'integrity_hash': integrity_hash,
            'stored_at': datetime.utcnow().isoformat()
        }
        self._save_metadata(dataset_id, storage_metadata)
 
        # Audit log
        self.audit_logger.log_data_stored(user, dataset_id, storage_metadata)
 
        return {
            'dataset_id': dataset_id,
            'integrity_hash': integrity_hash,
            'stored_at': storage_metadata['stored_at']
        }
 
    def retrieve_dataset(self, dataset_id: str, user: str,
                        purpose: str) -> bytes:
        """Retrieve and decrypt a training dataset."""
 
        # Check read permission
        if not self.access_controller.can_read(user, dataset_id):
            self.audit_logger.log_access_denied(user, dataset_id, 'read')
            raise PermissionError(f"User {user} cannot read {dataset_id}")
 
        # Load the encrypted data
        storage_path = self._get_storage_path(dataset_id)
        with open(storage_path, 'rb') as f:
            encrypted_data = f.read()
 
        # Verify integrity
        metadata = self._load_metadata(dataset_id)
        actual_hash = hashlib.sha256(encrypted_data).hexdigest()
 
        if actual_hash != metadata['integrity_hash']:
            self.audit_logger.log_integrity_violation(dataset_id)
            raise IntegrityError(f"Dataset {dataset_id} integrity check failed")
 
        # Decrypt
        data = self.cipher.decrypt(encrypted_data)
 
        # Audit log
        self.audit_logger.log_data_accessed(user, dataset_id, purpose)
 
        return data

Data Poisoning Detection

import numpy as np
from sklearn.ensemble import IsolationForest   # used by the detection helpers
from scipy.stats import zscore
from typing import List, Optional
 
class PoisoningDetector:
    """Detects potential data poisoning in training sets."""
 
    def __init__(self, config: dict):
        self.anomaly_threshold = config.get('anomaly_threshold', 0.05)
        self.embedding_model = self._load_embedding_model(config)
 
    def detect_poisoning(self, dataset: List[dict],
                        reference_dataset: Optional[List[dict]] = None) -> dict:
        """Run the full poisoning detection pipeline."""
 
        results = {
            'total_samples': len(dataset),
            'detection_methods': [],
            'suspicious_samples': [],
            'overall_risk': 'low'
        }
 
        # Method 1: Statistical anomaly detection
        stat_result = self._statistical_detection(dataset)
        results['detection_methods'].append(stat_result)
 
        # Method 2: Embedding-based anomaly detection
        embedding_result = self._embedding_detection(dataset)
        results['detection_methods'].append(embedding_result)
 
        # Method 3: Label consistency check
        label_result = self._label_consistency_check(dataset)
        results['detection_methods'].append(label_result)
 
        # Method 4: Distribution shift detection (if a reference is available)
        if reference_dataset:
            dist_result = self._distribution_shift_detection(
                dataset, reference_dataset
            )
            results['detection_methods'].append(dist_result)
 
        # Aggregate the suspicious samples
        all_suspicious = set()
        for method in results['detection_methods']:
            all_suspicious.update(method.get('suspicious_indices', []))
 
        results['suspicious_samples'] = list(all_suspicious)
        results['suspicious_rate'] = len(all_suspicious) / len(dataset)
 
        # Determine the overall risk
        if results['suspicious_rate'] > 0.1:
            results['overall_risk'] = 'high'
        elif results['suspicious_rate'] > 0.05:
            results['overall_risk'] = 'medium'
 
        return results
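The `_statistical_detection` helper is left abstract above. One plausible implementation (an assumption, not the only option) flags samples with extreme per-feature z-scores, assuming the samples have been converted to numeric feature vectors:

```python
# A minimal z-score sketch of the statistical detection step. Assumes
# numeric feature vectors; names here are illustrative.
import numpy as np

def statistical_detection(features, z_threshold=3.0):
    """Return indices of samples with any |z-score| above the threshold."""
    X = np.asarray(features, dtype=float)
    mu = X.mean(axis=0)
    sigma = X.std(axis=0) + 1e-12   # avoid division by zero on constant columns
    z = np.abs((X - mu) / sigma)
    suspicious = np.where((z > z_threshold).any(axis=1))[0]
    return {
        "method": "statistical_zscore",
        "suspicious_indices": suspicious.tolist(),
    }

# 99 normal samples plus one extreme outlier at index 99.
data = [[1.0, 2.0]] * 99 + [[50.0, 2.0]]
result = statistical_detection(data)
```

Z-scores catch gross outliers cheaply; the embedding-based and label-consistency methods are needed for subtler attacks such as clean-label poisoning.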

Privacy-Preserving Training

class DifferentialPrivacyTrainer:
    """Trains models with differential privacy guarantees."""
 
    def __init__(self, config: dict):
        self.epsilon = config.get('epsilon', 1.0)
        self.delta = config.get('delta', 1e-5)
        self.max_grad_norm = config.get('max_grad_norm', 1.0)
 
    def train_with_dp(self, model, optimizer, dataloader, epochs: int) -> dict:
        """Train the model with differential privacy (DP-SGD via Opacus)."""
 
        from opacus import PrivacyEngine
 
        # Wrap the model, optimizer, and data loader with the privacy engine
        privacy_engine = PrivacyEngine()
        model, optimizer, dataloader = privacy_engine.make_private_with_epsilon(
            module=model,
            optimizer=optimizer,
            data_loader=dataloader,
            epochs=epochs,
            target_epsilon=self.epsilon,
            target_delta=self.delta,
            max_grad_norm=self.max_grad_norm
        )
 
        # Training loop with privacy accounting
        for epoch in range(epochs):
            for batch in dataloader:
                optimizer.zero_grad()
                loss = self._compute_loss(model, batch)
                loss.backward()
                optimizer.step()
 
            # Check the privacy budget
            epsilon_spent = privacy_engine.get_epsilon(self.delta)
            if epsilon_spent > self.epsilon:
                break
 
        return {
            'epsilon_spent': privacy_engine.get_epsilon(self.delta),
            'delta': self.delta,
            'epochs_completed': epoch + 1
        }
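Under the hood, Opacus implements DP-SGD: clip each per-sample gradient to a norm bound, then add Gaussian noise calibrated to that bound. A NumPy sketch of a single DP-SGD step, with illustrative parameter values:

```python
# The core DP-SGD mechanism, sketched with NumPy: per-sample gradient
# clipping to norm bound C, then Gaussian noise scaled to C. Values here
# are illustrative, not a production privacy configuration.
import numpy as np

def dp_sgd_step(per_sample_grads, max_grad_norm, noise_multiplier, rng):
    """Average clipped per-sample gradients and add calibrated noise."""
    clipped = []
    for g in per_sample_grads:
        norm = np.linalg.norm(g)
        scale = min(1.0, max_grad_norm / (norm + 1e-12))  # clip to C
        clipped.append(g * scale)
    mean_grad = np.mean(clipped, axis=0)
    noise = rng.normal(0.0, noise_multiplier * max_grad_norm / len(clipped),
                       size=mean_grad.shape)
    return mean_grad + noise

rng = np.random.default_rng(0)
grads = [np.array([3.0, 4.0]), np.array([0.3, 0.4])]  # norms 5.0 and 0.5
noisy = dp_sgd_step(grads, max_grad_norm=1.0, noise_multiplier=1.0, rng=rng)
```

Clipping bounds any single sample's influence on the update; the noise then masks what remains, which is what makes the per-step privacy guarantee possible.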

Compliance and Audit

from datetime import datetime
 
class TrainingDataAudit:
    """Audit trail for training data usage."""
 
    def __init__(self, config: dict):
        self.retention_days = config.get('retention_days', 365)
        self.storage = AuditStorage(config['storage'])
 
    def log_data_usage(self, event: dict):
        """Log a training data usage event."""
 
        audit_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event['type'],
            'user': event['user'],
            'dataset_id': event['dataset_id'],
            'purpose': event.get('purpose'),
            'model_id': event.get('model_id'),
            'legal_basis': event.get('legal_basis'),
            'data_subjects_count': event.get('data_subjects_count'),
        }
 
        audit_entry['integrity_hash'] = self._compute_hash(audit_entry)
        self.storage.store(audit_entry)
 
    def generate_compliance_report(self, dataset_id: str,
                                  time_range: dict) -> dict:
        """Generate a compliance report for dataset usage."""
 
        events = self.storage.query(
            dataset_id=dataset_id,
            start_time=time_range['start'],
            end_time=time_range['end']
        )
 
        report = {
            'dataset_id': dataset_id,
            'report_period': time_range,
            'generated_at': datetime.utcnow().isoformat(),
            'summary': {
                'total_accesses': len(events),
                'unique_users': len(set(e['user'] for e in events)),
                'purposes': list(set(e.get('purpose') for e in events if e.get('purpose'))),
                'models_trained': list(set(e.get('model_id') for e in events if e.get('model_id')))
            },
            'events': events,
            'compliance_checks': self._run_compliance_checks(events)
        }
 
        return report
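The per-entry `integrity_hash` protects individual records but not their ordering or completeness: an insider could silently delete or rewrite whole entries. One way to make the trail tamper-evident (a sketch under our own assumptions, not part of the class above) is to chain each entry's hash to its predecessor:

```python
# Hash-chained audit log sketch: each entry's hash covers its content plus
# the previous entry's hash, so rewriting history breaks every later hash.
import hashlib
import json

def chain_hash(entry, prev_hash):
    payload = json.dumps(entry, sort_keys=True).encode() + prev_hash.encode()
    return hashlib.sha256(payload).hexdigest()

def verify_chain(entries):
    prev = "0" * 64  # genesis value
    for e in entries:
        body = {k: v for k, v in e.items() if k != "hash"}
        if e["hash"] != chain_hash(body, prev):
            return False
        prev = e["hash"]
    return True

log, prev = [], "0" * 64
for event in [{"event": "store", "user": "alice"},
              {"event": "read", "user": "bob"}]:
    h = chain_hash(event, prev)
    log.append({**event, "hash": h})
    prev = h

assert verify_chain(log)
log[0]["user"] = "mallory"   # tamper with history...
assert not verify_chain(log) # ...and the chain no longer verifies
```

Anchoring the latest chain hash in an external system (or a write-once store) extends this to detect truncation of the log's tail as well.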

Conclusion

Training data security is fundamental to AI security. Without secure training data practices, even the most sophisticated runtime protections can be undermined by attacks or issues introduced during model development.

Key takeaways:

  1. Validate data sources - Know where your training data comes from
  2. Detect poisoning - Use multiple detection methods
  3. Encrypt at rest - Protect stored training data
  4. Control access - Implement strict access controls
  5. Maintain audit trails - Track all data usage for compliance
  6. Consider privacy - Use differential privacy where appropriate

At DeviDevs, we help organizations implement complete training data security programs. Contact us to discuss your AI data security needs.

Need help with EU AI Act compliance or AI security?

Schedule a free 30-minute consultation. No obligations.

Schedule a Call
