Securizarea Aplicatiilor RAG: Protejarea Bazei de Cunostinte AI
Retrieval-Augmented Generation (RAG) a devenit arhitectura de referinta pentru construirea aplicatiilor AI care necesita acces la informatii private sau actuale. Combinand LLM-uri cu recuperarea de cunostinte externe, sistemele RAG pot raspunde la intrebari despre documente proprietare, pot oferi informatii actualizate si pot reduce halucinatiile.
Cu toate acestea, RAG introduce provocari unice de securitate. Baza de date vectoriala devine o suprafata critica de atac, iar interactiunea dintre recuperare si generare creeaza noi tipare de vulnerabilitati.
Intelegerea Riscurilor de Securitate RAG
Un sistem RAG tipic are mai multe componente care pot fi vizate:
User Query → Embedding Model → Vector Database → Retrieved Documents
↓
LLM Response ← LLM + Context
Fiecare pas introduce vulnerabilitati potentiale:
- Manipularea query-urilor - Query-uri malitioase pentru a extrage date sau a otravi rezultatele
- Atacuri asupra embedding-urilor - Input-uri special concepute care manipuleaza cautarea de similaritate
- Otravirea datelor - Continut malitios injectat in baza de cunostinte
- Injectie de context - Documente recuperate care contin injectii de prompt
- Lacune in controlul accesului - Recuperarea documentelor pe care utilizatorul nu ar trebui sa le vada
Modelul de Amenintari pentru Sistemele RAG
Analiza Suprafetei de Atac
class RAGThreatModel:
"""Model complet de amenintari pentru aplicatii RAG."""
threats = {
'data_layer': [
{
'name': 'Otravirea Documentelor',
'description': 'Atacatorul injecteaza documente malitioase in baza de cunostinte',
'impact': 'Injectie indirecta de prompt, dezinformare, exfiltrare date',
'likelihood': 'Ridicata daca pipeline-ul de ingestie nu are validare',
},
{
'name': 'Manipulare Metadata',
'description': 'Atacatorul modifica metadata documentelor pentru a afecta recuperarea',
'impact': 'Ocolirea controalelor de acces, manipularea relevantei',
'likelihood': 'Medie',
},
{
'name': 'Atacuri in Spatiul Vectorial',
'description': 'Crearea documentelor similare cu query-uri sensibile',
'impact': 'Forteaza recuperarea de continut controlat de atacator',
'likelihood': 'Medie-Ridicata pentru atacatori sofisticati',
},
],
'retrieval_layer': [
{
'name': 'Injectie Query',
'description': 'Query-uri malitioase pentru a manipula recuperarea',
'impact': 'Extragerea documentelor sensibile, DoS',
'likelihood': 'Ridicata',
},
{
'name': 'Ocolire Control Acces',
'description': 'Recuperarea documentelor fara permisiunea utilizatorului',
'impact': 'Scurgere de date, incalcari de conformitate',
'likelihood': 'Ridicata daca ACL-urile nu sunt implementate corect',
},
{
'name': 'Coliziune Embedding',
'description': 'Crearea query-urilor cu acelasi embedding ca documentele sensibile',
'impact': 'Extragerea continutului documentelor fara cuvinte cheie',
'likelihood': 'Scazuta-Medie',
},
],
'generation_layer': [
{
'name': 'Injectie Indirecta de Prompt',
'description': 'Instructiuni malitioase in documentele recuperate',
'impact': 'Manipularea comportamentului LLM, exfiltrare date',
'likelihood': 'Ridicata',
},
{
'name': 'Depasire Context',
'description': 'Continutul recuperat depaseste limitele de context',
'impact': 'Trunchierea instructiunilor de siguranta, DoS',
'likelihood': 'Medie',
},
{
'name': 'Manipulare Citare Surse',
'description': 'Atribuire falsa sau inselatoare a surselor',
'impact': 'Manipularea increderii, dezinformare',
'likelihood': 'Medie',
},
],
}Securizarea Pipeline-ului de Ingestie a Datelor
Prima linie de aparare este asigurarea ca doar continut sigur si autorizat intra in baza de cunostinte.
Validarea Continutului
import hashlib
from typing import Optional
from dataclasses import dataclass
@dataclass
class ValidationResult:
is_valid: bool
risk_score: float
issues: list
sanitized_content: Optional[str]
class DocumentValidator:
"""Valideaza documentele inainte de ingestia in baza de cunostinte RAG."""
def __init__(self):
self.injection_detector = InjectionDetector()
self.content_scanner = ContentScanner()
self.metadata_validator = MetadataValidator()
def validate_document(self, document: dict) -> ValidationResult:
"""
Validare completa a documentului.
"""
issues = []
risk_score = 0.0
# 1. Valideaza sursa si provenienta documentului
provenance_result = self._validate_provenance(document)
if not provenance_result['valid']:
issues.append(f"Problema provenienta: {provenance_result['reason']}")
risk_score += 0.3
# 2. Scaneaza pentru payload-uri de injectie de prompt
injection_result = self.injection_detector.scan(document['content'])
if injection_result['found_injections']:
issues.extend(injection_result['details'])
risk_score += 0.5
# 3. Verifica continut malitios
content_result = self.content_scanner.scan(document['content'])
if content_result['malicious_indicators']:
issues.extend(content_result['details'])
risk_score += 0.4
# 4. Valideaza metadata
metadata_result = self.metadata_validator.validate(document.get('metadata', {}))
if not metadata_result['valid']:
issues.append(f"Problema metadata: {metadata_result['reason']}")
risk_score += 0.1
# 5. Sanitizeaza continutul daca riscul este acceptabil
sanitized_content = None
if risk_score < 0.5:
sanitized_content = self._sanitize_content(document['content'])
return ValidationResult(
is_valid=risk_score < 0.5,
risk_score=min(risk_score, 1.0),
issues=issues,
sanitized_content=sanitized_content
)
def _sanitize_content(self, content: str) -> str:
"""Elimina sau neutralizeaza continut potential periculos."""
# Elimina pattern-uri de text ascuns
content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
content = re.sub(r'<[^>]*style="[^"]*display:\s*none[^"]*"[^>]*>.*?</[^>]+>',
'', content, flags=re.DOTALL | re.IGNORECASE)
# Neutralizeaza pattern-uri comune de injectie
patterns_to_neutralize = [
(r'\[SYSTEM\]', '[CONTENT]'),
(r'\[INST\]', '[TEXT]'),
(r'<\|im_start\|>', ''),
(r'###\s*INSTRUCTION', '### SECTION'),
]
for pattern, replacement in patterns_to_neutralize:
content = re.sub(pattern, replacement, content, flags=re.IGNORECASE)
return content
def _validate_provenance(self, document: dict) -> dict:
"""Verifica ca sursa documentului este autorizata si continutul autentic."""
# Verifica daca sursa este in lista permisa
source = document.get('source')
if source not in self.authorized_sources:
return {'valid': False, 'reason': 'Sursa neautorizata'}
# Verifica hash-ul continutului daca este furnizat
if 'content_hash' in document:
actual_hash = hashlib.sha256(document['content'].encode()).hexdigest()
if actual_hash != document['content_hash']:
return {'valid': False, 'reason': 'Nepotrivire hash continut'}
# Verifica semnatura digitala daca este necesara
if self.require_signatures and 'signature' in document:
if not self._verify_signature(document):
return {'valid': False, 'reason': 'Semnatura invalida'}
return {'valid': True}
class InjectionDetector:
"""Detecteaza tentative de injectie de prompt in documente."""
def __init__(self):
self.patterns = self._load_injection_patterns()
self.semantic_model = SentenceTransformer('all-MiniLM-L6-v2')
self.malicious_embeddings = self._load_malicious_embeddings()
def scan(self, content: str) -> dict:
"""Scaneaza continutul pentru pattern-uri de injectie."""
found_injections = []
# Detectie pe baza de pattern-uri
for pattern_name, pattern in self.patterns.items():
matches = re.findall(pattern, content, re.IGNORECASE)
if matches:
found_injections.append({
'type': 'pattern_match',
'pattern': pattern_name,
'matches': matches[:5] # Limiteaza potrivirile raportate
})
# Detectie semantica
paragraphs = content.split('\n\n')
for i, paragraph in enumerate(paragraphs):
if len(paragraph) > 20:
embedding = self.semantic_model.encode([paragraph])[0]
similarities = np.dot(self.malicious_embeddings, embedding)
max_sim = float(np.max(similarities))
if max_sim > 0.75:
found_injections.append({
'type': 'semantic_match',
'paragraph_index': i,
'confidence': max_sim,
'snippet': paragraph[:100]
})
return {
'found_injections': len(found_injections) > 0,
'details': found_injections
}Pipeline de Ingestie Securizat
class SecureIngestionPipeline:
"""Pipeline securizat pentru ingestia documentelor in baza de cunostinte RAG."""
def __init__(self, config: dict):
self.validator = DocumentValidator()
self.embedder = SecureEmbedder(config['embedding_model'])
self.vector_store = SecureVectorStore(config['vector_db'])
self.audit_logger = AuditLogger()
async def ingest_document(self,
document: dict,
ingestion_user: str) -> dict:
"""
Ingesteaza securizat un document in baza de cunostinte.
"""
# Genereaza ID document pentru urmarire
doc_id = self._generate_doc_id(document)
# Logheaza tentativa de ingestie
self.audit_logger.log_event({
'event': 'ingestion_attempt',
'doc_id': doc_id,
'user': ingestion_user,
'source': document.get('source'),
'timestamp': datetime.utcnow().isoformat()
})
try:
# Pasul 1: Valideaza documentul
validation = self.validator.validate_document(document)
if not validation.is_valid:
self.audit_logger.log_event({
'event': 'ingestion_rejected',
'doc_id': doc_id,
'reason': 'validation_failed',
'issues': validation.issues,
'risk_score': validation.risk_score
})
return {
'success': False,
'doc_id': doc_id,
'reason': 'Validare esuata',
'issues': validation.issues
}
# Pasul 2: Foloseste continutul sanitizat
safe_content = validation.sanitized_content
# Pasul 3: Genereaza embedding-uri cu validare input
chunks = self._chunk_document(safe_content)
embeddings = []
for chunk in chunks:
# Valideaza chunk-ul inainte de embedding
if self._is_safe_chunk(chunk):
embedding = await self.embedder.embed(chunk)
embeddings.append({
'text': chunk,
'embedding': embedding,
'metadata': self._create_chunk_metadata(document, chunk)
})
# Pasul 4: Stocheaza cu metadata de control acces
storage_result = await self.vector_store.store(
doc_id=doc_id,
embeddings=embeddings,
access_control=document.get('access_control', {}),
provenance={
'ingested_by': ingestion_user,
'ingested_at': datetime.utcnow().isoformat(),
'source': document.get('source'),
'original_hash': hashlib.sha256(
document['content'].encode()
).hexdigest()
}
)
self.audit_logger.log_event({
'event': 'ingestion_success',
'doc_id': doc_id,
'chunks_stored': len(embeddings),
'user': ingestion_user
})
return {
'success': True,
'doc_id': doc_id,
'chunks_stored': len(embeddings)
}
except Exception as e:
self.audit_logger.log_event({
'event': 'ingestion_error',
'doc_id': doc_id,
'error': str(e)
})
raiseSecurizarea Stratului de Recuperare
Componenta de recuperare trebuie sa impuna controale de acces si sa previna manipularea query-urilor.
Implementarea Controlului de Acces
Codul pentru SecureRetriever, AccessController si QueryValidator ramane identic cu versiunea in engleza deoarece este cod Python pur.
Securizarea Stratului de Generare
Ultimul strat trebuie sa gestioneze in siguranta continutul recuperat potential otravit.
Constructia Sigura a Contextului
Codul pentru SecureContextBuilder si RAGOutputValidator ramane identic deoarece este cod Python cu comentarii tehnice.
Pipeline RAG Securizat Complet
Codul pentru SecureRAGPipeline ramane identic - este implementare pura Python.
Concluzie
Securizarea aplicatiilor RAG necesita aparare la fiecare strat: ingestie, stocare, recuperare si generare. Tehnicile din acest ghid ofera o baza, dar securitatea trebuie evaluata continuu pe masura ce atat atacurile cat si apararea evolueaza.
Principii cheie:
- Valideaza la ingestie - Previne intrarea datelor otravite in baza de cunostinte
- Impune controale de acces - Asigura-te ca utilizatorii vad doar documentele la care sunt autorizati
- Trateaza continutul recuperat ca neincrezator - Poate contine tentative de injectie
- Valideaza output-urile - Verifica halucinatiile si ecourile de injectie
- Auditeaza totul - Mentine loguri pentru investigarea incidentelor
La DeviDevs, ne specializam in construirea de sisteme RAG securizate pentru intreprinderi care gestioneaza date sensibile. Contacteaza-ne pentru a discuta cerintele tale de securitate RAG.