Building Robust LLM Guardrails: A Technical Implementation Guide
Guardrails are the safety mechanisms that keep LLM applications within defined boundaries. Without them, even the most capable models can produce harmful, inaccurate, or off-brand outputs, eroding user trust and damaging business reputation.
This guide provides production-ready implementations for building complete guardrail systems.
Understanding the Guardrails Architecture
A robust guardrail system operates at multiple layers:
User Input → Input Guardrails → LLM → Output Guardrails → User Response
                    ↓                           ↓
              Block/Modify              Block/Modify/Flag
                    ↓                           ↓
                Audit Log                   Audit Log
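Before building the individual components, it helps to see that flow as code. The sketch below is purely illustrative: `input_guard`, `output_guard`, and `audit_log` are placeholder interfaces standing in for the concrete classes developed in the rest of this guide.

# Illustrative skeleton of the layered flow; the real classes come later.
from dataclasses import dataclass

@dataclass
class GuardrailDecision:
    allowed: bool
    payload: str       # the (possibly modified) input or output
    reason: str = ""

async def guarded_completion(user_input, llm_client, input_guard, output_guard, audit_log):
    """One request through the pipeline from the diagram above."""
    decision = input_guard.check(user_input)           # Input Guardrails
    audit_log.record("input", decision)                # Audit Log
    if not decision.allowed:                           # Block
        return "Your request was blocked by our usage policy."
    raw = await llm_client.complete(decision.payload)  # LLM
    decision = output_guard.check(raw)                 # Output Guardrails
    audit_log.record("output", decision)               # Audit Log
    if not decision.allowed:                           # Block
        return "The response was withheld by our content policy."
    return decision.payload                            # possibly Modified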
Design Principles
- Defense in depth - Multiple independent checks
- Fail secure - When in doubt, block (see the sketch after this list)
- Performance-aware - Minimal latency impact
- Observable - Comprehensive logging
- Configurable - Adjustable without code changes
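"Fail secure" is worth making concrete: if a guardrail itself throws, the safe default is to block rather than wave the request through. A minimal sketch, assuming a dict-based check interface (nothing here comes from a specific library):

import functools
import logging

logger = logging.getLogger("guardrails")

def fail_secure(check_fn):
    """If a check raises, block by default (fail secure) and log it (observable)."""
    @functools.wraps(check_fn)
    def wrapper(*args, **kwargs):
        try:
            return check_fn(*args, **kwargs)
        except Exception:
            logger.exception("check %s failed; blocking by default", check_fn.__name__)
            return {'allowed': False, 'reason': 'guardrail_error'}
    return wrapper

Wrapping every check this way means a regex typo or a classifier outage degrades into blocked requests, never into unfiltered ones.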
Input Guardrails
Content Classification
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List
import re
class ContentCategory(Enum):
SAFE = "safe"
POTENTIALLY_HARMFUL = "potentially_harmful"
HARMFUL = "harmful"
PII = "pii"
INJECTION_ATTEMPT = "injection_attempt"
OFF_TOPIC = "off_topic"
@dataclass
class ClassificationResult:
category: ContentCategory
confidence: float
triggered_rules: List[str]
should_block: bool
modified_input: Optional[str] = None
class InputClassifier:
"""Clasificarea si filtrarea input-urilor inainte de procesarea LLM."""
def __init__(self, config: dict):
self.config = config
self.harmful_patterns = self._load_harmful_patterns()
self.pii_patterns = self._load_pii_patterns()
self.injection_patterns = self._load_injection_patterns()
self.topic_classifier = TopicClassifier(config.get('allowed_topics', []))
def classify(self, user_input: str, context: dict = None) -> ClassificationResult:
"""
Clasificarea input-ului utilizatorului si determinarea actiunii corespunzatoare.
"""
triggered_rules = []
category = ContentCategory.SAFE
confidence = 1.0
        # Check for injection attempts (highest priority)
injection_check = self._check_injection(user_input)
if injection_check['detected']:
return ClassificationResult(
category=ContentCategory.INJECTION_ATTEMPT,
confidence=injection_check['confidence'],
triggered_rules=injection_check['rules'],
should_block=True
)
        # Check for PII
        pii_check = self._check_pii(user_input)
        if pii_check['detected']:
            triggered_rules.extend(pii_check['rules'])
            if self.config.get('block_pii', False):
                return ClassificationResult(
                    category=ContentCategory.PII,
                    confidence=pii_check['confidence'],
                    triggered_rules=triggered_rules,
                    should_block=True
                )
            return ClassificationResult(
                category=ContentCategory.PII,
                confidence=pii_check['confidence'],
                triggered_rules=triggered_rules,
                should_block=False,  # Redact instead of blocking
                modified_input=self._redact_pii(user_input, pii_check['matches'])
            )
        # Check for harmful content
harmful_check = self._check_harmful_content(user_input)
if harmful_check['detected']:
return ClassificationResult(
category=ContentCategory.HARMFUL if harmful_check['severity'] == 'high'
else ContentCategory.POTENTIALLY_HARMFUL,
confidence=harmful_check['confidence'],
triggered_rules=harmful_check['rules'],
should_block=harmful_check['severity'] == 'high'
)
        # Check topic relevance
if self.config.get('enforce_topic', False):
topic_check = self.topic_classifier.is_on_topic(user_input)
if not topic_check['on_topic']:
return ClassificationResult(
category=ContentCategory.OFF_TOPIC,
confidence=topic_check['confidence'],
triggered_rules=['off_topic'],
should_block=self.config.get('block_off_topic', False)
)
return ClassificationResult(
category=ContentCategory.SAFE,
confidence=1.0,
triggered_rules=triggered_rules,
should_block=False,
modified_input=user_input if triggered_rules else None
)
def _check_injection(self, text: str) -> dict:
"""Verificare tentative de prompt injection."""
detected_rules = []
max_confidence = 0.0
for pattern_name, pattern_config in self.injection_patterns.items():
pattern = pattern_config['pattern']
if re.search(pattern, text, re.IGNORECASE):
detected_rules.append(pattern_name)
max_confidence = max(max_confidence, pattern_config['confidence'])
return {
'detected': len(detected_rules) > 0,
'rules': detected_rules,
'confidence': max_confidence
}
def _check_pii(self, text: str) -> dict:
"""Verificare informatii personale identificabile."""
matches = []
pii_types = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
}
for pii_type, pattern in pii_types.items():
found = re.findall(pattern, text)
if found:
matches.append({
'type': pii_type,
'matches': found,
'pattern': pattern
})
return {
'detected': len(matches) > 0,
'rules': [m['type'] for m in matches],
'matches': matches,
'confidence': 0.95 if matches else 0.0
}
def _redact_pii(self, text: str, matches: list) -> str:
"""Redactarea PII detectat din text."""
redacted = text
for match in matches:
for value in match['matches']:
redacted = redacted.replace(value, f'[REDACTED_{match["type"].upper()}]')
return redacted
def _check_harmful_content(self, text: str) -> dict:
"""Verificare pattern-uri de continut daunator."""
detected = []
for category, patterns in self.harmful_patterns.items():
for pattern_config in patterns:
if re.search(pattern_config['pattern'], text, re.IGNORECASE):
detected.append({
'category': category,
'severity': pattern_config['severity'],
'confidence': pattern_config['confidence']
})
        if not detected:
            return {'detected': False, 'rules': [], 'severity': None, 'confidence': 0.0}
        # Rank severities explicitly; max() on raw strings would compare alphabetically
        severity_rank = {'low': 0, 'medium': 1, 'high': 2}
        max_severity = max((d['severity'] for d in detected), key=lambda s: severity_rank.get(s, 0))
        max_confidence = max(d['confidence'] for d in detected)
return {
'detected': True,
'rules': [d['category'] for d in detected],
'severity': max_severity,
'confidence': max_confidence
        }

Rate Limiting and Abuse Prevention
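Content filters alone do not stop abuse at volume, so the next layer throttles usage. The RateLimiter below tracks per-user requests and tokens over sliding windows; the intended call pattern (the user ID and token counts here are made up) is check first, call the model, then record actual usage:

# Hypothetical wiring of the RateLimiter defined below.
limiter = RateLimiter({'limits': {'requests_per_minute': 20,
                                  'requests_per_hour': 100,
                                  'tokens_per_minute': 40000,
                                  'tokens_per_hour': 200000}})

verdict = limiter.check_rate_limit('user-123', estimated_tokens=500)
if not verdict['allowed']:
    print(f"Throttled ({verdict['reason']}); retry in {verdict['retry_after']}s")
else:
    # ... call the LLM here ...
    limiter.record_usage('user-123', tokens_used=742)  # record actual consumption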
from collections import defaultdict
import time
class RateLimiter:
"""Rate limiting pentru acces la API-ul LLM."""
def __init__(self, config: dict):
self.limits = config.get('limits', {
'requests_per_minute': 20,
'requests_per_hour': 100,
'tokens_per_minute': 40000,
'tokens_per_hour': 200000,
})
self.user_requests = defaultdict(list)
self.user_tokens = defaultdict(list)
self.blocked_users = {}
def check_rate_limit(self, user_id: str, estimated_tokens: int = 0) -> dict:
"""Verificare daca utilizatorul se incadreaza in limitele de rata."""
now = time.time()
        # Check whether the user is currently blocked
if user_id in self.blocked_users:
if now < self.blocked_users[user_id]:
return {
'allowed': False,
'reason': 'user_blocked',
'retry_after': int(self.blocked_users[user_id] - now)
}
else:
del self.blocked_users[user_id]
        # Clean up old entries
        self._cleanup_old_entries(user_id, now)
        # Check the request rate
minute_requests = len([
r for r in self.user_requests[user_id]
if r > now - 60
])
if minute_requests >= self.limits['requests_per_minute']:
return {
'allowed': False,
'reason': 'requests_per_minute_exceeded',
'retry_after': 60
}
hour_requests = len([
r for r in self.user_requests[user_id]
if r > now - 3600
])
if hour_requests >= self.limits['requests_per_hour']:
return {
'allowed': False,
'reason': 'requests_per_hour_exceeded',
'retry_after': 3600
}
        # Check the token rate
minute_tokens = sum(
t['tokens'] for t in self.user_tokens[user_id]
if t['time'] > now - 60
)
        if minute_tokens + estimated_tokens > self.limits['tokens_per_minute']:
            return {
                'allowed': False,
                'reason': 'tokens_per_minute_exceeded',
                'retry_after': 60
            }
        # Also enforce the hourly token budget, which the limits dict defines
        hour_tokens = sum(
            t['tokens'] for t in self.user_tokens[user_id]
            if t['time'] > now - 3600
        )
        if hour_tokens + estimated_tokens > self.limits['tokens_per_hour']:
            return {
                'allowed': False,
                'reason': 'tokens_per_hour_exceeded',
                'retry_after': 3600
            }
        return {'allowed': True}
def record_usage(self, user_id: str, tokens_used: int):
"""Inregistrarea utilizarii API."""
now = time.time()
self.user_requests[user_id].append(now)
self.user_tokens[user_id].append({'time': now, 'tokens': tokens_used})
def block_user(self, user_id: str, duration_seconds: int):
"""Blocarea temporara a unui utilizator."""
self.blocked_users[user_id] = time.time() + duration_seconds
def _cleanup_old_entries(self, user_id: str, now: float):
"""Eliminarea intrarilor mai vechi de 1 ora."""
cutoff = now - 3600
self.user_requests[user_id] = [
r for r in self.user_requests[user_id] if r > cutoff
]
self.user_tokens[user_id] = [
t for t in self.user_tokens[user_id] if t['time'] > cutoff
        ]

Output Guardrails
Response Validation
class OutputValidator:
"""Validarea output-urilor LLM inainte de returnarea catre utilizatori."""
def __init__(self, config: dict):
self.config = config
self.content_policy = ContentPolicy(config.get('content_policy', {}))
self.factuality_checker = FactualityChecker(config.get('factuality', {}))
self.brand_guidelines = BrandGuidelines(config.get('brand', {}))
def validate(self, response: str, context: dict) -> dict:
"""
Validare completa a output-ului LLM.
"""
validations = []
        # Content policy check
policy_result = self.content_policy.check(response)
validations.append(('content_policy', policy_result))
        # Check for hallucination indicators
if context.get('check_factuality', False):
factuality_result = self.factuality_checker.check(
response, context.get('sources', [])
)
validations.append(('factuality', factuality_result))
        # Brand guideline check
brand_result = self.brand_guidelines.check(response)
validations.append(('brand_guidelines', brand_result))
        # Check for sensitive information leakage
leakage_result = self._check_information_leakage(response, context)
validations.append(('information_leakage', leakage_result))
        # Aggregate results
all_passed = all(v[1]['passed'] for v in validations)
should_block = any(v[1].get('block', False) for v in validations)
return {
'valid': all_passed,
'should_block': should_block,
'validations': dict(validations),
'modified_response': self._apply_modifications(response, validations) if not should_block else None
}
def _check_information_leakage(self, response: str, context: dict) -> dict:
"""Verificare scurgeri de informatii sensibile."""
issues = []
        # Check for system prompt leakage
system_prompt = context.get('system_prompt', '')
if system_prompt:
            # Check whether significant portions of the system prompt appear in the response
prompt_words = set(system_prompt.lower().split())
response_words = set(response.lower().split())
overlap = prompt_words & response_words
if len(overlap) > len(prompt_words) * 0.3:
issues.append('potential_system_prompt_leakage')
        # Check for internal information patterns
internal_patterns = [
r'api[_-]?key\s*[:=]',
r'password\s*[:=]',
r'secret\s*[:=]',
r'internal[_-]?use[_-]?only',
r'confidential',
]
for pattern in internal_patterns:
if re.search(pattern, response, re.IGNORECASE):
issues.append(f'sensitive_pattern: {pattern}')
return {
'passed': len(issues) == 0,
'issues': issues,
'block': 'potential_system_prompt_leakage' in issues
}
def _apply_modifications(self, response: str, validations: list) -> str:
"""Aplicarea modificarilor necesare asupra raspunsului."""
modified = response
for check_name, result in validations:
if result.get('modifications'):
for mod in result['modifications']:
modified = mod['apply'](modified)
return modified
class ContentPolicy:
"""Aplicarea politicilor de continut asupra output-urilor LLM."""
def __init__(self, config: dict):
self.prohibited_categories = config.get('prohibited', [
'hate_speech', 'violence', 'sexual_content', 'self_harm',
'illegal_activity', 'misinformation'
])
self.classifiers = self._load_classifiers()
def check(self, text: str) -> dict:
"""Verificarea textului fata de politicile de continut."""
violations = []
for category in self.prohibited_categories:
classifier = self.classifiers.get(category)
if classifier:
result = classifier.classify(text)
if result['score'] > result['threshold']:
violations.append({
'category': category,
'score': result['score'],
'threshold': result['threshold']
})
return {
'passed': len(violations) == 0,
'violations': violations,
'block': any(v['score'] > 0.9 for v in violations)
}
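`_load_classifiers` is left undefined above; in production each prohibited category would typically map to a moderation model. Purely to pin down the assumed interface (`classify` returning a score and a threshold), here is a deliberately naive keyword-based stand-in:

class KeywordClassifier:
    """Toy stand-in for a real moderation classifier.
    Only the interface matters: classify() -> {'score', 'threshold'}."""
    def __init__(self, keywords: list, threshold: float = 0.5):
        self.keywords = [k.lower() for k in keywords]
        self.threshold = threshold

    def classify(self, text: str) -> dict:
        lowered = text.lower()
        hits = sum(1 for kw in self.keywords if kw in lowered)
        # Crude score: fraction of category keywords present, capped at 1.0
        score = min(hits / max(len(self.keywords), 1), 1.0)
        return {'score': score, 'threshold': self.threshold}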
class BrandGuidelines:
"""Aplicarea vocii si ghidurilor de brand."""
def __init__(self, config: dict):
self.tone = config.get('tone', 'professional')
self.prohibited_terms = config.get('prohibited_terms', [])
self.required_disclaimers = config.get('disclaimers', {})
def check(self, text: str) -> dict:
"""Verificarea textului fata de ghidurile de brand."""
issues = []
modifications = []
        # Check for prohibited terms
for term in self.prohibited_terms:
if term.lower() in text.lower():
issues.append(f'prohibited_term: {term}')
        # Check for required disclaimers
for topic, disclaimer in self.required_disclaimers.items():
if self._topic_mentioned(text, topic) and disclaimer not in text:
modifications.append({
'type': 'add_disclaimer',
'disclaimer': disclaimer,
'apply': lambda t, d=disclaimer: t + f'\n\n{d}'
})
return {
'passed': len(issues) == 0,
'issues': issues,
'modifications': modifications
}
def _topic_mentioned(self, text: str, topic: str) -> bool:
"""Verificare daca un topic este mentionat in text."""
topic_keywords = {
'medical': ['health', 'medical', 'diagnosis', 'treatment', 'symptom'],
'financial': ['invest', 'stock', 'financial', 'money', 'trading'],
'legal': ['legal', 'law', 'lawsuit', 'liability', 'contract'],
}
keywords = topic_keywords.get(topic, [topic])
        return any(kw in text.lower() for kw in keywords)

Behavioral Guardrails
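The persona checks in this section assume `capability_claims` entries that carry pre-compiled regex patterns (note the `claim['pattern'].search(...)` call below). A hypothetical persona config showing that assumed shape:

import re

# Hypothetical persona config; pattern objects are pre-compiled on purpose.
persona_config = {
    'no_opinions': True,
    'capability_claims': [
        {'name': 'claims_internet_access',
         'pattern': re.compile(r'\bI (?:can|will) browse the (?:web|internet)\b', re.IGNORECASE),
         'allowed': False},
        {'name': 'claims_real_time_data',
         'pattern': re.compile(r'\breal[- ]time (?:data|prices|quotes)\b', re.IGNORECASE),
         'allowed': False},
    ],
}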
class BehavioralGuardrails:
"""Aplicarea constrangerilor comportamentale asupra interactiunilor LLM."""
def __init__(self, config: dict):
self.persona_config = config.get('persona', {})
self.interaction_limits = config.get('interaction_limits', {})
self.session_monitor = SessionMonitor()
def check_interaction(self, user_input: str, llm_response: str,
session_context: dict) -> dict:
"""Verificarea interactiunii fata de constrangerile comportamentale."""
checks = []
        # Persona consistency check
persona_check = self._check_persona_consistency(llm_response)
checks.append(('persona', persona_check))
        # Manipulation attempt check
manipulation_check = self._detect_manipulation(
user_input, llm_response, session_context
)
checks.append(('manipulation', manipulation_check))
        # Response appropriateness check
appropriateness_check = self._check_appropriateness(
user_input, llm_response
)
checks.append(('appropriateness', appropriateness_check))
        # Session pattern check
session_check = self.session_monitor.check_session(
session_context['session_id'],
user_input,
llm_response
)
checks.append(('session', session_check))
return {
'passed': all(c[1]['passed'] for c in checks),
'checks': dict(checks),
'actions': self._determine_actions(checks)
}
def _check_persona_consistency(self, response: str) -> dict:
"""Asigurarea ca raspunsul mentine persona definita."""
violations = []
        # Check for first-person statements that violate the persona
if self.persona_config.get('no_opinions', False):
opinion_patterns = [
r'\bI (think|believe|feel)\b',
r'\bIn my opinion\b',
r'\bPersonally,?\s*I\b',
]
for pattern in opinion_patterns:
if re.search(pattern, response, re.IGNORECASE):
violations.append('expressed_opinion')
        # Check capability claims
if self.persona_config.get('capability_claims', []):
for claim in self.persona_config['capability_claims']:
if claim['pattern'].search(response):
                    if not claim['allowed']:
violations.append(f"capability_claim: {claim['name']}")
return {
'passed': len(violations) == 0,
'violations': violations
}
def _detect_manipulation(self, user_input: str, llm_response: str,
context: dict) -> dict:
"""Detectarea daca LLM-ul pare a fi manipulat."""
indicators = []
        # Check whether the response acknowledges suspicious instructions
instruction_ack_patterns = [
r'(?:okay|sure|alright),?\s*(?:I\'ll|let me)\s+(?:ignore|disregard)',
r'(?:as|per)\s+your\s+(?:new\s+)?instructions?',
r'switching\s+to\s+\w+\s+mode',
r'now\s+operating\s+(?:as|in)',
]
for pattern in instruction_ack_patterns:
if re.search(pattern, llm_response, re.IGNORECASE):
indicators.append('instruction_acknowledgment')
        # Check for a dramatic behavioral shift
if context.get('previous_responses'):
behavioral_shift = self._measure_behavioral_shift(
context['previous_responses'], llm_response
)
if behavioral_shift > 0.7:
indicators.append('behavioral_shift')
return {
'passed': len(indicators) == 0,
'indicators': indicators,
'block': 'instruction_acknowledgment' in indicators
}
def _check_appropriateness(self, user_input: str, response: str) -> dict:
"""Verificare daca raspunsul este adecvat pentru interogare."""
issues = []
        # Check response length proportionality
input_len = len(user_input)
response_len = len(response)
if input_len < 20 and response_len > 2000:
issues.append('disproportionate_length')
        # Check for off-topic rambling using semantic similarity
        # between the input and the response
similarity = self._compute_semantic_similarity(user_input, response)
if similarity < 0.3:
issues.append('off_topic_response')
return {
'passed': len(issues) == 0,
'issues': issues
}
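`_compute_semantic_similarity` is referenced above but never defined. One way to implement it, sketched here with sentence-transformers (an assumption; any embedding model plus cosine similarity works):

# Sketch only; assumes `pip install sentence-transformers`.
from sentence_transformers import SentenceTransformer, util

_embedder = SentenceTransformer('all-MiniLM-L6-v2')  # small, fast default model

def compute_semantic_similarity(text_a: str, text_b: str) -> float:
    """Cosine similarity of the two texts' embeddings, roughly in [-1, 1]."""
    embeddings = _embedder.encode([text_a, text_b], convert_to_tensor=True)
    return float(util.cos_sim(embeddings[0], embeddings[1]))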
class SessionMonitor:
"""Monitorizarea pattern-urilor de sesiune pentru detectarea abuzurilor."""
def __init__(self):
self.sessions = {}
self.abuse_patterns = self._load_abuse_patterns()
def check_session(self, session_id: str, user_input: str,
llm_response: str) -> dict:
"""Monitorizarea sesiunii pentru pattern-uri ingrijoratoare."""
if session_id not in self.sessions:
self.sessions[session_id] = {
'interactions': [],
'flags': [],
'created_at': time.time()
}
session = self.sessions[session_id]
session['interactions'].append({
'input': user_input,
'response': llm_response,
'timestamp': time.time()
})
warnings = []
        # Check for repeated injection attempts
injection_count = sum(
1 for i in session['interactions']
if self._looks_like_injection(i['input'])
)
if injection_count > 3:
warnings.append('repeated_injection_attempts')
        # Check for escalating requests
if self._detect_escalation(session['interactions']):
warnings.append('escalating_requests')
        # Check for jailbreak attempt patterns
if self._detect_jailbreak_pattern(session['interactions']):
warnings.append('jailbreak_pattern')
return {
'passed': len(warnings) == 0,
'warnings': warnings,
'session_risk': self._calculate_session_risk(session, warnings)
        }

Guardrail Orchestration
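The orchestrator ties every layer together. It relies on an AuditLogger that this guide does not define; a minimal stdlib sketch that emits one JSON object per event (a real deployment would ship these to a log pipeline or SIEM):

import json
import logging
import time

class AuditLogger:
    """Minimal audit logger: one JSON line per event."""
    def __init__(self, name: str = "guardrails.audit"):
        self._logger = logging.getLogger(name)

    def log(self, event: dict):
        record = {'timestamp': time.time(), **event}
        self._logger.info(json.dumps(record, default=str))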
import uuid

class GuardrailOrchestrator:
    """Orchestrate all guardrails for LLM interactions."""
def __init__(self, config: dict):
self.input_classifier = InputClassifier(config.get('input', {}))
self.rate_limiter = RateLimiter(config.get('rate_limit', {}))
self.output_validator = OutputValidator(config.get('output', {}))
self.behavioral_guardrails = BehavioralGuardrails(config.get('behavior', {}))
self.audit_logger = AuditLogger()
async def process_request(self, user_input: str, user_context: dict,
llm_client) -> dict:
"""Procesarea unei cereri prin toate guardrails."""
request_id = str(uuid.uuid4())
session_id = user_context.get('session_id')
user_id = user_context.get('user_id')
        # Phase 1: Rate limiting
rate_check = self.rate_limiter.check_rate_limit(
user_id, estimated_tokens=len(user_input.split()) * 2
)
if not rate_check['allowed']:
self.audit_logger.log({
'event': 'rate_limited',
'request_id': request_id,
'user_id': user_id,
'reason': rate_check['reason']
})
return {
'success': False,
'error': 'rate_limited',
'retry_after': rate_check.get('retry_after')
}
        # Phase 2: Input classification
input_result = self.input_classifier.classify(user_input, user_context)
if input_result.should_block:
self.audit_logger.log({
'event': 'input_blocked',
'request_id': request_id,
'user_id': user_id,
'category': input_result.category.value,
'rules': input_result.triggered_rules
})
return {
'success': False,
'error': 'input_policy_violation',
'category': input_result.category.value
}
        # Use the modified input if PII was redacted
processed_input = input_result.modified_input or user_input
        # Phase 3: LLM call
try:
llm_response = await llm_client.complete(processed_input, user_context)
except Exception as e:
self.audit_logger.log({
'event': 'llm_error',
'request_id': request_id,
'error': str(e)
})
return {
'success': False,
'error': 'llm_error'
}
        # Phase 4: Output validation
output_result = self.output_validator.validate(llm_response, {
'check_factuality': user_context.get('check_factuality', False),
'system_prompt': user_context.get('system_prompt', ''),
'sources': user_context.get('sources', [])
})
if output_result['should_block']:
self.audit_logger.log({
'event': 'output_blocked',
'request_id': request_id,
'validations': output_result['validations']
})
return {
'success': False,
'error': 'output_policy_violation'
}
final_response = output_result.get('modified_response', llm_response)
        # Phase 5: Behavioral checks
behavioral_result = self.behavioral_guardrails.check_interaction(
user_input, final_response, {
'session_id': session_id,
'previous_responses': user_context.get('previous_responses', [])
}
)
if not behavioral_result['passed']:
for action in behavioral_result.get('actions', []):
self._execute_action(action, user_context)
        # Record usage for rate limiting
self.rate_limiter.record_usage(user_id, len(final_response.split()))
        # Full audit logging
self.audit_logger.log({
'event': 'request_completed',
'request_id': request_id,
'user_id': user_id,
'input_classification': input_result.category.value,
'output_valid': output_result['valid'],
'behavioral_passed': behavioral_result['passed'],
'response_length': len(final_response)
})
return {
'success': True,
'response': final_response,
'metadata': {
'request_id': request_id,
'input_modified': input_result.modified_input is not None,
'output_modified': output_result.get('modified_response') is not None,
'warnings': behavioral_result.get('warnings', [])
}
        }

Configuration Management
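Every component above takes a plain dict, so the whole stack can be driven from one YAML file like the example below ("configurable without code changes"). A loader-and-usage sketch, assuming PyYAML is installed and you supply your own llm_client:

# Sketch: load the YAML below and run one request end to end.
import asyncio
import yaml  # assumes PyYAML is installed

with open('guardrails_config.yaml') as f:
    config = yaml.safe_load(f)

orchestrator = GuardrailOrchestrator(config)

async def handle(user_input: str, llm_client):
    result = await orchestrator.process_request(
        user_input,
        {'user_id': 'user-123', 'session_id': 'sess-abc'},  # hypothetical IDs
        llm_client,
    )
    return result['response'] if result['success'] else f"Error: {result['error']}"

# asyncio.run(handle("What is your refund policy?", my_llm_client))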
# guardrails_config.yaml
input:
content_classification:
enabled: true
block_harmful: true
    block_pii: false # Redact instead of blocking
block_off_topic: false
injection_detection:
enabled: true
sensitivity: high
block_on_detection: true
rate_limit:
enabled: true
limits:
requests_per_minute: 20
requests_per_hour: 100
tokens_per_minute: 40000
tokens_per_hour: 200000
output:
content_policy:
enabled: true
prohibited:
- hate_speech
- violence
- sexual_content
- self_harm
- illegal_activity
brand_guidelines:
enabled: true
tone: professional
prohibited_terms:
- competitor_name
- internal_project_code
disclaimers:
      medical: "This is not medical advice. Consult a healthcare professional."
      financial: "This is not financial advice. Consult a qualified financial advisor."
      legal: "This is not legal advice. Consult a licensed attorney."
behavior:
persona:
no_opinions: true
maintain_role: true
session_monitoring:
enabled: true
max_interactions: 100
abuse_detection: true
audit:
enabled: true
log_inputs: true
  log_outputs: false # PII consideration
  retention_days: 90

Conclusion
Robust guardrails are essential for deploying LLMs to production safely. The key is to implement multiple layers of protection that work together while remaining performant and maintainable.
Key principles:
- Layer your defenses - No single guardrail is sufficient
- Fail secure - Block when you are unsure
- Monitor and adapt - New attacks demand new defenses
- Balance safety and usability - Overly aggressive filtering frustrates users
- Log comprehensively - You need data to improve
At DeviDevs, we help organizations implement production-grade guardrail systems tailored to their specific use cases and risk profiles. Contact us to discuss your AI safety requirements.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →