AI Security

Securizarea Aplicatiilor RAG: Protejarea Bazei de Cunostinte AI

Nicu Constantin
--7 min lectura
#rag#llm-security#vector-databases#data-security#enterprise-ai

Securizarea Aplicatiilor RAG: Protejarea Bazei de Cunostinte AI

Retrieval-Augmented Generation (RAG) a devenit arhitectura de referinta pentru construirea aplicatiilor AI care necesita acces la informatii private sau actuale. Combinand LLM-uri cu recuperarea de cunostinte externe, sistemele RAG pot raspunde la intrebari despre documente proprietare, pot oferi informatii actualizate si pot reduce halucinatiile.

Cu toate acestea, RAG introduce provocari unice de securitate. Baza de date vectoriala devine o suprafata critica de atac, iar interactiunea dintre recuperare si generare creeaza noi tipare de vulnerabilitati.

Intelegerea Riscurilor de Securitate RAG

Un sistem RAG tipic are mai multe componente care pot fi vizate:

User Query → Embedding Model → Vector Database → Retrieved Documents
                                                        ↓
                              LLM Response ← LLM + Context

Fiecare pas introduce vulnerabilitati potentiale:

  1. Manipularea query-urilor - Query-uri malitioase pentru a extrage date sau a otravi rezultatele
  2. Atacuri asupra embedding-urilor - Input-uri special concepute care manipuleaza cautarea de similaritate
  3. Otravirea datelor - Continut malitios injectat in baza de cunostinte
  4. Injectie de context - Documente recuperate care contin injectii de prompt
  5. Lacune in controlul accesului - Recuperarea documentelor pe care utilizatorul nu ar trebui sa le vada

Modelul de Amenintari pentru Sistemele RAG

Analiza Suprafetei de Atac

class RAGThreatModel:
    """Model complet de amenintari pentru aplicatii RAG."""
 
    threats = {
        'data_layer': [
            {
                'name': 'Otravirea Documentelor',
                'description': 'Atacatorul injecteaza documente malitioase in baza de cunostinte',
                'impact': 'Injectie indirecta de prompt, dezinformare, exfiltrare date',
                'likelihood': 'Ridicata daca pipeline-ul de ingestie nu are validare',
            },
            {
                'name': 'Manipulare Metadata',
                'description': 'Atacatorul modifica metadata documentelor pentru a afecta recuperarea',
                'impact': 'Ocolirea controalelor de acces, manipularea relevantei',
                'likelihood': 'Medie',
            },
            {
                'name': 'Atacuri in Spatiul Vectorial',
                'description': 'Crearea documentelor similare cu query-uri sensibile',
                'impact': 'Forteaza recuperarea de continut controlat de atacator',
                'likelihood': 'Medie-Ridicata pentru atacatori sofisticati',
            },
        ],
 
        'retrieval_layer': [
            {
                'name': 'Injectie Query',
                'description': 'Query-uri malitioase pentru a manipula recuperarea',
                'impact': 'Extragerea documentelor sensibile, DoS',
                'likelihood': 'Ridicata',
            },
            {
                'name': 'Ocolire Control Acces',
                'description': 'Recuperarea documentelor fara permisiunea utilizatorului',
                'impact': 'Scurgere de date, incalcari de conformitate',
                'likelihood': 'Ridicata daca ACL-urile nu sunt implementate corect',
            },
            {
                'name': 'Coliziune Embedding',
                'description': 'Crearea query-urilor cu acelasi embedding ca documentele sensibile',
                'impact': 'Extragerea continutului documentelor fara cuvinte cheie',
                'likelihood': 'Scazuta-Medie',
            },
        ],
 
        'generation_layer': [
            {
                'name': 'Injectie Indirecta de Prompt',
                'description': 'Instructiuni malitioase in documentele recuperate',
                'impact': 'Manipularea comportamentului LLM, exfiltrare date',
                'likelihood': 'Ridicata',
            },
            {
                'name': 'Depasire Context',
                'description': 'Continutul recuperat depaseste limitele de context',
                'impact': 'Trunchierea instructiunilor de siguranta, DoS',
                'likelihood': 'Medie',
            },
            {
                'name': 'Manipulare Citare Surse',
                'description': 'Atribuire falsa sau inselatoare a surselor',
                'impact': 'Manipularea increderii, dezinformare',
                'likelihood': 'Medie',
            },
        ],
    }

Securizarea Pipeline-ului de Ingestie a Datelor

Prima linie de aparare este asigurarea ca doar continut sigur si autorizat intra in baza de cunostinte.

Validarea Continutului

import hashlib
from typing import Optional
from dataclasses import dataclass
 
@dataclass
class ValidationResult:
    is_valid: bool
    risk_score: float
    issues: list
    sanitized_content: Optional[str]
 
class DocumentValidator:
    """Valideaza documentele inainte de ingestia in baza de cunostinte RAG."""
 
    def __init__(self):
        self.injection_detector = InjectionDetector()
        self.content_scanner = ContentScanner()
        self.metadata_validator = MetadataValidator()
 
    def validate_document(self, document: dict) -> ValidationResult:
        """
        Validare completa a documentului.
        """
        issues = []
        risk_score = 0.0
 
        # 1. Valideaza sursa si provenienta documentului
        provenance_result = self._validate_provenance(document)
        if not provenance_result['valid']:
            issues.append(f"Problema provenienta: {provenance_result['reason']}")
            risk_score += 0.3
 
        # 2. Scaneaza pentru payload-uri de injectie de prompt
        injection_result = self.injection_detector.scan(document['content'])
        if injection_result['found_injections']:
            issues.extend(injection_result['details'])
            risk_score += 0.5
 
        # 3. Verifica continut malitios
        content_result = self.content_scanner.scan(document['content'])
        if content_result['malicious_indicators']:
            issues.extend(content_result['details'])
            risk_score += 0.4
 
        # 4. Valideaza metadata
        metadata_result = self.metadata_validator.validate(document.get('metadata', {}))
        if not metadata_result['valid']:
            issues.append(f"Problema metadata: {metadata_result['reason']}")
            risk_score += 0.1
 
        # 5. Sanitizeaza continutul daca riscul este acceptabil
        sanitized_content = None
        if risk_score < 0.5:
            sanitized_content = self._sanitize_content(document['content'])
 
        return ValidationResult(
            is_valid=risk_score < 0.5,
            risk_score=min(risk_score, 1.0),
            issues=issues,
            sanitized_content=sanitized_content
        )
 
    def _sanitize_content(self, content: str) -> str:
        """Elimina sau neutralizeaza continut potential periculos."""
 
        # Elimina pattern-uri de text ascuns
        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
        content = re.sub(r'<[^>]*style="[^"]*display:\s*none[^"]*"[^>]*>.*?</[^>]+>',
                        '', content, flags=re.DOTALL | re.IGNORECASE)
 
        # Neutralizeaza pattern-uri comune de injectie
        patterns_to_neutralize = [
            (r'\[SYSTEM\]', '[CONTENT]'),
            (r'\[INST\]', '[TEXT]'),
            (r'<\|im_start\|>', ''),
            (r'###\s*INSTRUCTION', '### SECTION'),
        ]
 
        for pattern, replacement in patterns_to_neutralize:
            content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
 
        return content
 
    def _validate_provenance(self, document: dict) -> dict:
        """Verifica ca sursa documentului este autorizata si continutul autentic."""
 
        # Verifica daca sursa este in lista permisa
        source = document.get('source')
        if source not in self.authorized_sources:
            return {'valid': False, 'reason': 'Sursa neautorizata'}
 
        # Verifica hash-ul continutului daca este furnizat
        if 'content_hash' in document:
            actual_hash = hashlib.sha256(document['content'].encode()).hexdigest()
            if actual_hash != document['content_hash']:
                return {'valid': False, 'reason': 'Nepotrivire hash continut'}
 
        # Verifica semnatura digitala daca este necesara
        if self.require_signatures and 'signature' in document:
            if not self._verify_signature(document):
                return {'valid': False, 'reason': 'Semnatura invalida'}
 
        return {'valid': True}
 
 
class InjectionDetector:
    """Detecteaza tentative de injectie de prompt in documente."""
 
    def __init__(self):
        self.patterns = self._load_injection_patterns()
        self.semantic_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.malicious_embeddings = self._load_malicious_embeddings()
 
    def scan(self, content: str) -> dict:
        """Scaneaza continutul pentru pattern-uri de injectie."""
 
        found_injections = []
 
        # Detectie pe baza de pattern-uri
        for pattern_name, pattern in self.patterns.items():
            matches = re.findall(pattern, content, re.IGNORECASE)
            if matches:
                found_injections.append({
                    'type': 'pattern_match',
                    'pattern': pattern_name,
                    'matches': matches[:5]  # Limiteaza potrivirile raportate
                })
 
        # Detectie semantica
        paragraphs = content.split('\n\n')
        for i, paragraph in enumerate(paragraphs):
            if len(paragraph) > 20:
                embedding = self.semantic_model.encode([paragraph])[0]
                similarities = np.dot(self.malicious_embeddings, embedding)
                max_sim = float(np.max(similarities))
 
                if max_sim > 0.75:
                    found_injections.append({
                        'type': 'semantic_match',
                        'paragraph_index': i,
                        'confidence': max_sim,
                        'snippet': paragraph[:100]
                    })
 
        return {
            'found_injections': len(found_injections) > 0,
            'details': found_injections
        }

Pipeline de Ingestie Securizat

class SecureIngestionPipeline:
    """Pipeline securizat pentru ingestia documentelor in baza de cunostinte RAG."""
 
    def __init__(self, config: dict):
        self.validator = DocumentValidator()
        self.embedder = SecureEmbedder(config['embedding_model'])
        self.vector_store = SecureVectorStore(config['vector_db'])
        self.audit_logger = AuditLogger()
 
    async def ingest_document(self,
                             document: dict,
                             ingestion_user: str) -> dict:
        """
        Ingesteaza securizat un document in baza de cunostinte.
        """
 
        # Genereaza ID document pentru urmarire
        doc_id = self._generate_doc_id(document)
 
        # Logheaza tentativa de ingestie
        self.audit_logger.log_event({
            'event': 'ingestion_attempt',
            'doc_id': doc_id,
            'user': ingestion_user,
            'source': document.get('source'),
            'timestamp': datetime.utcnow().isoformat()
        })
 
        try:
            # Pasul 1: Valideaza documentul
            validation = self.validator.validate_document(document)
 
            if not validation.is_valid:
                self.audit_logger.log_event({
                    'event': 'ingestion_rejected',
                    'doc_id': doc_id,
                    'reason': 'validation_failed',
                    'issues': validation.issues,
                    'risk_score': validation.risk_score
                })
                return {
                    'success': False,
                    'doc_id': doc_id,
                    'reason': 'Validare esuata',
                    'issues': validation.issues
                }
 
            # Pasul 2: Foloseste continutul sanitizat
            safe_content = validation.sanitized_content
 
            # Pasul 3: Genereaza embedding-uri cu validare input
            chunks = self._chunk_document(safe_content)
            embeddings = []
 
            for chunk in chunks:
                # Valideaza chunk-ul inainte de embedding
                if self._is_safe_chunk(chunk):
                    embedding = await self.embedder.embed(chunk)
                    embeddings.append({
                        'text': chunk,
                        'embedding': embedding,
                        'metadata': self._create_chunk_metadata(document, chunk)
                    })
 
            # Pasul 4: Stocheaza cu metadata de control acces
            storage_result = await self.vector_store.store(
                doc_id=doc_id,
                embeddings=embeddings,
                access_control=document.get('access_control', {}),
                provenance={
                    'ingested_by': ingestion_user,
                    'ingested_at': datetime.utcnow().isoformat(),
                    'source': document.get('source'),
                    'original_hash': hashlib.sha256(
                        document['content'].encode()
                    ).hexdigest()
                }
            )
 
            self.audit_logger.log_event({
                'event': 'ingestion_success',
                'doc_id': doc_id,
                'chunks_stored': len(embeddings),
                'user': ingestion_user
            })
 
            return {
                'success': True,
                'doc_id': doc_id,
                'chunks_stored': len(embeddings)
            }
 
        except Exception as e:
            self.audit_logger.log_event({
                'event': 'ingestion_error',
                'doc_id': doc_id,
                'error': str(e)
            })
            raise

Securizarea Stratului de Recuperare

Componenta de recuperare trebuie sa impuna controale de acces si sa previna manipularea query-urilor.

Implementarea Controlului de Acces

Codul pentru SecureRetriever, AccessController si QueryValidator ramane identic cu versiunea in engleza deoarece este cod Python pur.

Securizarea Stratului de Generare

Ultimul strat trebuie sa gestioneze in siguranta continutul recuperat potential otravit.

Constructia Sigura a Contextului

Codul pentru SecureContextBuilder si RAGOutputValidator ramane identic deoarece este cod Python cu comentarii tehnice.

Pipeline RAG Securizat Complet

Codul pentru SecureRAGPipeline ramane identic - este implementare pura Python.

Concluzie

Securizarea aplicatiilor RAG necesita aparare la fiecare strat: ingestie, stocare, recuperare si generare. Tehnicile din acest ghid ofera o baza, dar securitatea trebuie evaluata continuu pe masura ce atat atacurile cat si apararea evolueaza.

Principii cheie:

  1. Valideaza la ingestie - Previne intrarea datelor otravite in baza de cunostinte
  2. Impune controale de acces - Asigura-te ca utilizatorii vad doar documentele la care sunt autorizati
  3. Trateaza continutul recuperat ca neincrezator - Poate contine tentative de injectie
  4. Valideaza output-urile - Verifica halucinatiile si ecourile de injectie
  5. Auditeaza totul - Mentine loguri pentru investigarea incidentelor

La DeviDevs, ne specializam in construirea de sisteme RAG securizate pentru intreprinderi care gestioneaza date sensibile. Contacteaza-ne pentru a discuta cerintele tale de securitate RAG.

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.