Guardrails sunt esentiale pentru implementarea in siguranta a aplicatiilor LLM in productie. Acest ghid acopera implementarea completa a guardrails, inclusiv validare input, filtrare output si moderare de continut in timp real.
Arhitectura Guardrails
Framework-ul de Baza pentru Guardrails
# guardrails_framework.py
import asyncio
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import List, Dict, Optional, Callable, Any
class GuardrailAction(Enum):
    """Action a guardrail asks the caller to take for a piece of content."""
    ALLOW = "allow"        # content passes unchanged
    BLOCK = "block"        # stop processing; the engine breaks the chain on this
    MODIFY = "modify"      # replace content with the result's modified_content
    FLAG = "flag"          # allow, but mark for review (also used on timeouts/errors)
    ESCALATE = "escalate"  # presumably routed to a human reviewer — not exercised in this file
class GuardrailType(Enum):
    """Pipeline stage a guardrail is attached to (selects the engine's list)."""
    INPUT = "input"      # runs on user input before the LLM call
    OUTPUT = "output"    # runs on the LLM response
    CONTEXT = "context"  # catch-all bucket; not run by check_input/check_output here
@dataclass
class GuardrailResult:
    """Outcome of a single guardrail check."""

    passed: bool                  # True when the check found no problem
    action: GuardrailAction       # what the caller should do with the content
    guardrail_name: str           # identifier of the guardrail that produced this
    reason: Optional[str] = None             # human-readable explanation on failure
    modified_content: Optional[str] = None   # sanitized text when action is MODIFY
    confidence: float = 1.0                  # confidence in the verdict (checks here use 0.6-1.0)
    metadata: Dict = field(default_factory=dict)  # extra detail, e.g. matched pattern, counts
@dataclass
class GuardrailConfig:
    """Configuration for one registered guardrail."""

    name: str                        # guardrail identifier, echoed into results/logs
    guardrail_type: GuardrailType    # which engine list it is registered in
    enabled: bool                    # disabled guardrails are skipped at run time
    check_function: Callable         # async callable (content, context) -> GuardrailResult
    action_on_fail: GuardrailAction  # engine stops the chain when this is BLOCK and the check fails
    priority: int = 50               # lower runs earlier (lists sorted ascending)
    timeout_ms: int = 5000           # per-check timeout in milliseconds
class GuardrailsEngine:
"""Motorul principal de guardrails pentru aplicatii LLM"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.input_guardrails: List[GuardrailConfig] = []
self.output_guardrails: List[GuardrailConfig] = []
self.context_guardrails: List[GuardrailConfig] = []
self.results_log = []
def add_guardrail(self, guardrail: GuardrailConfig):
"""Adauga un guardrail in motor"""
if guardrail.guardrail_type == GuardrailType.INPUT:
self.input_guardrails.append(guardrail)
self.input_guardrails.sort(key=lambda g: g.priority)
elif guardrail.guardrail_type == GuardrailType.OUTPUT:
self.output_guardrails.append(guardrail)
self.output_guardrails.sort(key=lambda g: g.priority)
else:
self.context_guardrails.append(guardrail)
self.context_guardrails.sort(key=lambda g: g.priority)
async def check_input(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Ruleaza toate guardrails de input"""
return await self._run_guardrails(
self.input_guardrails, content, context
)
async def check_output(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Ruleaza toate guardrails de output"""
return await self._run_guardrails(
self.output_guardrails, content, context
)
async def _run_guardrails(
self,
guardrails: List[GuardrailConfig],
content: str,
context: Dict = None
) -> List[GuardrailResult]:
"""Executa guardrails in ordinea prioritatii"""
results = []
current_content = content
for guardrail in guardrails:
if not guardrail.enabled:
continue
try:
result = await asyncio.wait_for(
guardrail.check_function(current_content, context),
timeout=guardrail.timeout_ms / 1000
)
results.append(result)
# Gestionare blocare
if not result.passed and guardrail.action_on_fail == GuardrailAction.BLOCK:
break
# Gestionare modificare
if result.modified_content:
current_content = result.modified_content
except asyncio.TimeoutError:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason="Guardrail timeout"
))
except Exception as e:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason=f"Guardrail error: {str(e)}"
))
self._log_results(content, results)
return results
def _log_results(self, content: str, results: List[GuardrailResult]):
"""Logare rezultate executie guardrails"""
self.results_log.append({
"timestamp": datetime.utcnow().isoformat(),
"content_preview": content[:100],
"results": [
{
"name": r.guardrail_name,
"passed": r.passed,
"action": r.action.value,
"reason": r.reason
}
for r in results
]
})
def get_blocking_result(self, results: List[GuardrailResult]) -> Optional[GuardrailResult]:
"""Obtine primul rezultat de blocare daca exista"""
for result in results:
if not result.passed and result.action == GuardrailAction.BLOCK:
return result
return NoneGuardrails pentru Validarea Input-ului
Detectia Prompt Injection
# input_guardrails.py
import re
from typing import Dict, Optional
# Fix: compiled once at import time instead of on every call. Matching is
# case-insensitive via re.IGNORECASE, so the input no longer needs to be
# lowercased first (the original lowercased AND passed IGNORECASE — redundant).
_INJECTION_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in [
        # Direct instruction override
        r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?)",
        r"disregard\s+(all\s+)?(previous|above|prior)",
        r"forget\s+(everything|all)\s+(above|before)",
        # Role manipulation
        r"you\s+are\s+now\s+(a|an|the)",
        r"act\s+as\s+(if\s+you\s+are\s+)?(a|an|the)",
        r"pretend\s+(to\s+be|you\s+are)",
        r"roleplay\s+as",
        # System-prompt extraction
        r"(what|reveal|show|tell)\s+(is|me)\s+(your|the)\s+system\s+prompt",
        r"print\s+(your\s+)?instructions",
        r"output\s+(your\s+)?system\s+(message|prompt)",
        # Jailbreak keywords
        r"(DAN|STAN|DUDE)\s*mode",
        r"developer\s+mode",
        r"jailbreak",
        r"bypass\s+(safety|restrictions|filters)",
        # Delimiter attacks
        r"```\s*(system|assistant|user)\s*\n",
        r"\[INST\]|\[/INST\]",
        r"<\|im_(start|end)\|>",
        # Encoding attempts
        r"base64\s*[:=]",
        r"decode\s+(this|the\s+following)",
        r"execute\s+(this|the\s+following)"
    ]
]

async def check_prompt_injection(content: str, context: Dict = None) -> GuardrailResult:
    """Detect prompt-injection attempts in user input.

    Returns a BLOCK result when a known injection pattern matches, a FLAG
    result when suspicious encoding is detected, and ALLOW otherwise.
    """
    for pattern in _INJECTION_PATTERNS:
        if pattern.search(content):
            return GuardrailResult(
                passed=False,
                action=GuardrailAction.BLOCK,
                guardrail_name="prompt_injection_detection",
                reason="Potential prompt injection detected",
                confidence=0.9,
                metadata={"pattern_matched": pattern.pattern}
            )
    # Character-level heuristics for obfuscated payloads
    if _has_suspicious_encoding(content):
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="prompt_injection_detection",
            reason="Suspicious encoding detected",
            confidence=0.7
        )
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="prompt_injection_detection"
    )
def _has_suspicious_encoding(content: str) -> bool:
"""Verificare pattern-uri suspecte de encoding"""
# Concentratie ridicata de caractere speciale
special_ratio = len(re.findall(r'[^\w\s]', content)) / max(len(content), 1)
if special_ratio > 0.3:
return True
# Secvente de escape Unicode
if re.search(r'\\u[0-9a-fA-F]{4}', content):
return True
# Manipulare excesiva de whitespace
if re.search(r'\s{10,}', content):
return True
return FalseLimite de Lungime si Continut pentru Input
async def check_input_limits(content: str, context: Dict = None) -> GuardrailResult:
    """Enforce configurable length, line and word limits on input content."""
    limits_cfg = context.get("limits_config", {}) if context else {}
    max_length = limits_cfg.get("max_length", 10000)
    max_lines = limits_cfg.get("max_lines", 500)
    max_words = limits_cfg.get("max_words", 2000)

    # Measure once, then evaluate each limit as a (violated, message) pair
    length = len(content)
    line_count = content.count('\n') + 1
    word_count = len(content.split())

    issues = [
        message
        for exceeded, message in (
            (length > max_length,
             f"Continutul depaseste lungimea maxima de {max_length} caractere"),
            (line_count > max_lines,
             f"Continutul depaseste maximul de {max_lines} linii"),
            (word_count > max_words,
             f"Continutul depaseste maximul de {max_words} cuvinte"),
        )
        if exceeded
    ]

    if not issues:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="input_limits"
        )
    return GuardrailResult(
        passed=False,
        action=GuardrailAction.BLOCK,
        guardrail_name="input_limits",
        reason="; ".join(issues),
        metadata={
            "length": length,
            "lines": line_count,
            "words": word_count
        }
    )
async def check_pii_in_input(content: str, context: Dict = None) -> GuardrailResult:
"""Detectare si optional redactare PII in input"""
pii_patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
"ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
}
found_pii = {}
redacted_content = content
for pii_type, pattern in pii_patterns.items():
matches = re.findall(pattern, content)
if matches:
found_pii[pii_type] = len(matches)
# Redacteaza daca este configurat
if context and context.get("redact_pii", False):
redacted_content = re.sub(
pattern,
f"[REDACTED_{pii_type.upper()}]",
redacted_content
)
if found_pii:
action = GuardrailAction.MODIFY if context and context.get("redact_pii") else GuardrailAction.FLAG
return GuardrailResult(
passed=False,
action=action,
guardrail_name="pii_detection",
reason=f"PII detectat: {', '.join(found_pii.keys())}",
modified_content=redacted_content if action == GuardrailAction.MODIFY else None,
metadata={"pii_found": found_pii}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="pii_detection"
)Guardrails pentru Filtrarea Output-ului
Clasificator de Siguranta a Continutului
# output_guardrails.py
from typing import Dict, List
import aiohttp
class ContentSafetyClassifier:
    """ML-backed content-safety classification via a remote HTTP endpoint."""

    CATEGORIES = [
        "hate_speech",
        "violence",
        "self_harm",
        "sexual_content",
        "harassment",
        "dangerous_content"
    ]

    def __init__(self, api_endpoint: str, api_key: str):
        """Store the classifier endpoint URL and its bearer token."""
        self.api_endpoint = api_endpoint
        self.api_key = api_key

    async def classify(self, content: str) -> Dict:
        """POST ``content`` to the safety endpoint and return the parsed JSON verdict."""
        request_headers = {"Authorization": f"Bearer {self.api_key}"}
        request_body = {"content": content}
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.api_endpoint,
                headers=request_headers,
                json=request_body
            ) as response:
                return await response.json()
async def check_content_safety(content: str, context: Dict = None) -> GuardrailResult:
    """Screen output content for safety issues via a fast keyword pre-filter.

    Flags (does not block) content containing harmful-category keywords;
    a deeper ML classification is intended to run on flagged content.
    """
    harmful_keywords = {
        "violence": ["kill", "murder", "attack", "weapon", "bomb"],
        "self_harm": ["suicide", "self-harm", "hurt myself"],
        "dangerous": ["how to make", "instructions for", "step by step to create"]
    }

    # Fix: lowercase once — the original recomputed content.lower()
    # inside the category loop on every iteration.
    content_lower = content.lower()
    flagged_categories = [
        category
        for category, keywords in harmful_keywords.items()
        if any(keyword in content_lower for keyword in keywords)
    ]

    # If keywords hit, perform a deeper analysis
    if flagged_categories:
        # In production, invoke the ML classifier here:
        # classifier = ContentSafetyClassifier(...)
        # result = await classifier.classify(content)
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="content_safety",
            reason=f"Continut semnalat pentru: {', '.join(flagged_categories)}",
            confidence=0.8,
            metadata={"categories": flagged_categories}
        )
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="content_safety"
    )
async def check_hallucination_indicators(
content: str,
context: Dict = None
) -> GuardrailResult:
"""Detectare indicatori potentiali de halucinatie"""
indicators = []
# Verificare afirmatii sigure despre lucruri incerte
uncertainty_phrases = [
"I'm not sure",
"I don't have information",
"I cannot verify",
"As of my knowledge cutoff"
]
certainty_phrases = [
"definitely",
"certainly",
"absolutely",
"without a doubt",
"100%"
]
# Detectare contradictii
has_uncertainty = any(phrase.lower() in content.lower() for phrase in uncertainty_phrases)
has_certainty = any(phrase.lower() in content.lower() for phrase in certainty_phrases)
if has_uncertainty and has_certainty:
indicators.append("contradictory_certainty")
# Verificare citatii fabricate
fake_citation_patterns = [
r"according to a \d{4} study",
r"research from \w+ University shows",
r"Dr\. \w+ \w+ stated"
]
for pattern in fake_citation_patterns:
if re.search(pattern, content):
indicators.append("potential_fabricated_citation")
break
# Verificare numere specifice care ar putea fi halucinatii
if re.search(r"\d{1,2}\.\d{1,2}%", content):
indicators.append("specific_statistics")
if indicators:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="hallucination_detection",
reason=f"Indicatori potentiali de halucinatie: {', '.join(indicators)}",
confidence=0.6,
metadata={"indicators": indicators}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="hallucination_detection"
)Aplicarea Topicului si a Domeniului
async def check_topic_adherence(content: str, context: Dict = None) -> GuardrailResult:
    """Ensure the response stays within permitted topics.

    Blocks when the content mentions any keyword of a blocked topic;
    passes through when no topic configuration is supplied.
    """
    ctx = context or {}
    allowed_topics = ctx.get("allowed_topics", [])
    blocked_topics = ctx.get("blocked_topics", [])

    # Nothing configured — nothing to enforce
    if not allowed_topics and not blocked_topics:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="topic_adherence"
        )

    content_lower = content.lower()
    for topic in blocked_topics:
        keywords = topic.get("keywords", [])
        if any(keyword.lower() in content_lower for keyword in keywords):
            return GuardrailResult(
                passed=False,
                action=GuardrailAction.BLOCK,
                guardrail_name="topic_adherence",
                reason=f"Raspunsul contine topic blocat: {topic.get('name')}",
                metadata={"blocked_topic": topic.get("name")}
            )

    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="topic_adherence"
    )
async def check_response_format(content: str, context: Dict = None) -> GuardrailResult:
"""Validare ca raspunsul respecta formatul asteptat"""
expected_format = context.get("expected_format") if context else None
if not expected_format:
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)
issues = []
if expected_format == "json":
try:
import json
json.loads(content)
except json.JSONDecodeError as e:
issues.append(f"JSON invalid: {str(e)}")
elif expected_format == "markdown":
# Validare de baza markdown
if not re.search(r'(^#|\*\*|__|```)', content, re.MULTILINE):
issues.append("Formatarea markdown asteptata nu a fost gasita")
elif expected_format == "bullet_list":
if not re.search(r'^[\-\*]\s', content, re.MULTILINE):
issues.append("Format de lista cu bullet asteptat")
if issues:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="response_format",
reason="; ".join(issues),
metadata={"expected_format": expected_format}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)Moderare in Timp Real
Moderare Continut in Streaming
# streaming_moderation.py
from typing import AsyncGenerator, Tuple
class StreamingModerator:
    """Real-time moderation for streaming LLM responses.

    Accumulates tokens into a buffer and runs the output guardrails on each
    full buffer, yielding ``(text, ok)`` pairs to the consumer.

    NOTE(review): content is checked chunk-by-chunk, so a pattern that
    spans two buffer boundaries can slip through undetected.
    """

    def __init__(self, guardrails: GuardrailsEngine):
        self.guardrails = guardrails
        self.buffer = ""
        self.buffer_size = 100  # characters to accumulate before each check

    async def moderate_stream(
        self,
        token_stream: AsyncGenerator[str, None]
    ) -> AsyncGenerator[Tuple[str, bool], None]:
        """Moderate streaming tokens in real time.

        Yields ``(chunk, True)`` for approved text; on a blocking guardrail
        yields a final ``("", False)`` and stops.
        """
        # Fix: reset the buffer up front so a reused moderator instance
        # cannot leak leftover text from a previous (possibly blocked)
        # stream into this one.
        self.buffer = ""
        async for token in token_stream:
            self.buffer += token
            # Check once the buffer reaches the threshold
            if len(self.buffer) >= self.buffer_size:
                results = await self.guardrails.check_output(self.buffer)
                blocking = self.guardrails.get_blocking_result(results)
                if blocking:
                    # Stop streaming and report blocked status
                    yield ("", False)
                    return
                # Emit the approved buffered content
                yield (self.buffer, True)
                self.buffer = ""
        # Flush whatever remains after the stream ends
        if self.buffer:
            results = await self.guardrails.check_output(self.buffer)
            blocking = self.guardrails.get_blocking_result(results)
            if blocking:
                yield ("", False)
            else:
                yield (self.buffer, True)
class IncrementalModerator:
"""Moderare incrementala cu lookback"""
def __init__(self, window_size: int = 500):
self.window_size = window_size
self.full_response = ""
async def check_increment(
self,
new_content: str,
context: Dict = None
) -> GuardrailResult:
"""Verificare increment de continut nou cu fereastra de context"""
self.full_response += new_content
# Obtine fereastra de continut recent
window_start = max(0, len(self.full_response) - self.window_size)
check_content = self.full_response[window_start:]
# Verificari rapide de pattern pe increment
dangerous_patterns = [
r"rm\s+-rf",
r"DROP\s+TABLE",
r"<script>",
r"eval\(",
r"exec\("
]
for pattern in dangerous_patterns:
if re.search(pattern, check_content, re.IGNORECASE):
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="incremental_moderation",
reason=f"Pattern periculos detectat",
metadata={"pattern": pattern}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="incremental_moderation"
)Exemplu de Integrare
Pipeline Complet de Guardrails
# guardrails_pipeline.py
async def create_guardrailed_completion(
    prompt: str,
    system_prompt: str,
    llm_client,
    guardrails: GuardrailsEngine,
    context: Dict = None
) -> Dict:
    """Run a full LLM call wrapped in input and output guardrails.

    Returns a result dict with ``success``/``blocked`` status, the final
    (possibly modified) response, and any non-blocking flags raised.
    """

    def _failure(stage: str, reason: str, blocked: bool) -> Dict:
        # Uniform shape for every early-exit path
        return {
            "success": False,
            "blocked": blocked,
            "stage": stage,
            "reason": reason,
            "response": None
        }

    # Input guardrails
    input_results = await guardrails.check_input(prompt, context)
    input_block = guardrails.get_blocking_result(input_results)
    if input_block is not None:
        return _failure("input", input_block.reason, blocked=True)

    # Pick up any sanitized version of the prompt (last modification wins)
    modified_prompt = prompt
    for res in input_results:
        if res.modified_content:
            modified_prompt = res.modified_content

    # LLM call
    try:
        response = await llm_client.generate(
            system_prompt=system_prompt,
            user_prompt=modified_prompt
        )
    except Exception as exc:
        return _failure("llm", str(exc), blocked=False)

    # Output guardrails
    output_results = await guardrails.check_output(response, context)
    output_block = guardrails.get_blocking_result(output_results)
    if output_block is not None:
        return _failure("output", output_block.reason, blocked=True)

    # Pick up any sanitized version of the response (last modification wins)
    final_response = response
    for res in output_results:
        if res.modified_content:
            final_response = res.modified_content

    # Collect non-blocking flags that need attention
    flags = [
        res for res in input_results + output_results
        if res.action == GuardrailAction.FLAG
    ]
    return {
        "success": True,
        "blocked": False,
        "response": final_response,
        "flags": [{"name": f.guardrail_name, "reason": f.reason} for f in flags],
        "input_modified": modified_prompt != prompt,
        "output_modified": final_response != response
    }
# Usage example
async def main():
    """Example wiring: register guardrails, then run one protected completion."""
    guardrails = GuardrailsEngine()

    # (name, stage, check function, action on fail, priority) —
    # registered in a single data-driven pass.
    guardrail_specs = [
        ("prompt_injection", GuardrailType.INPUT,
         check_prompt_injection, GuardrailAction.BLOCK, 10),
        ("pii_detection", GuardrailType.INPUT,
         check_pii_in_input, GuardrailAction.MODIFY, 20),
        ("content_safety", GuardrailType.OUTPUT,
         check_content_safety, GuardrailAction.BLOCK, 10),
        ("hallucination_check", GuardrailType.OUTPUT,
         check_hallucination_indicators, GuardrailAction.FLAG, 20),
    ]
    for name, stage, check, action, priority in guardrail_specs:
        guardrails.add_guardrail(GuardrailConfig(
            name=name,
            guardrail_type=stage,
            enabled=True,
            check_function=check,
            action_on_fail=action,
            priority=priority
        ))

    # Application usage
    # NOTE(review): `my_llm_client` is not defined in this file — the host
    # application must supply an LLM client exposing `generate(...)`.
    result = await create_guardrailed_completion(
        prompt="Intrebarea utilizatorului aici",
        system_prompt="Esti un asistent util",
        llm_client=my_llm_client,
        guardrails=guardrails,
        context={"redact_pii": True}
    )
    print(result)

Sumar
Guardrails eficiente pentru LLM necesita:
- Validare input: Blocheaza prompt injection si valideaza continutul
- Filtrare output: Verifica continut daunator si halucinatii
- Moderare in timp real: Gestioneaza raspunsuri in streaming in siguranta
- Aparare stratificata: Guardrails multiple cu prioritati diferite
- Actiuni configurabile: Blocheaza, modifica, semnaleaza sau escaleaza
- Logare completa: Urmareste toate deciziile guardrails
Implementeaza guardrails ca prima si ultima linie de aparare pentru toate interactiunile LLM, pentru a asigura aplicatii AI sigure si fiabile.