Building Robust LLM Guardrails: A Technical Implementation Guide
Guardrails are the safety mechanisms that keep LLM applications operating within defined boundaries. Without them, even the most capable models can produce harmful, inaccurate, or off-brand outputs that damage user trust and business reputation.
This guide provides production-ready implementations for building comprehensive guardrail systems.
Understanding Guardrail Architecture
A robust guardrail system operates at multiple layers:
User Input → Input Guardrails → LLM → Output Guardrails → User Response
                    ↓                          ↓
              Block/Modify             Block/Modify/Flag
                    ↓                          ↓
                Audit Log                  Audit Log
Design Principles
- Defense in depth - Multiple independent checks
- Fail secure - When in doubt, block
- Performance aware - Minimal latency impact
- Observable - Comprehensive logging
- Configurable - Adjustable without code changes
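A minimal sketch of how these principles compose in practice: each check runs independently, and any failed or erroring check blocks the request (fail secure). The check functions themselves are hypothetical placeholders.

from typing import Callable, List

def run_guardrails(text: str, checks: List[Callable[[str], bool]]) -> bool:
    """Return True only if every independent check explicitly passes."""
    for check in checks:
        try:
            if not check(text):
                return False  # any single failed check blocks the request
        except Exception:
            return False      # fail secure: an erroring check also blocks
    return True

# e.g. run_guardrails(user_input, [length_check, injection_check, pii_check])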
Input Guardrails
Content Classification
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List
import re
class ContentCategory(Enum):
SAFE = "safe"
POTENTIALLY_HARMFUL = "potentially_harmful"
HARMFUL = "harmful"
PII = "pii"
INJECTION_ATTEMPT = "injection_attempt"
OFF_TOPIC = "off_topic"
@dataclass
class ClassificationResult:
category: ContentCategory
confidence: float
triggered_rules: List[str]
should_block: bool
modified_input: Optional[str] = None
class InputClassifier:
"""Classify and filter user inputs before LLM processing."""
def __init__(self, config: dict):
self.config = config
self.harmful_patterns = self._load_harmful_patterns()
self.pii_patterns = self._load_pii_patterns()
self.injection_patterns = self._load_injection_patterns()
self.topic_classifier = TopicClassifier(config.get('allowed_topics', []))
def classify(self, user_input: str, context: dict = None) -> ClassificationResult:
"""
Classify user input and determine appropriate action.
"""
triggered_rules = []
category = ContentCategory.SAFE
confidence = 1.0
# Check for injection attempts (highest priority)
injection_check = self._check_injection(user_input)
if injection_check['detected']:
return ClassificationResult(
category=ContentCategory.INJECTION_ATTEMPT,
confidence=injection_check['confidence'],
triggered_rules=injection_check['rules'],
should_block=True
)
        # Check for PII
        pii_check = self._check_pii(user_input)
        if pii_check['detected']:
            triggered_rules.extend(pii_check['rules'])
            block_pii = self.config.get('block_pii', False)
            return ClassificationResult(
                category=ContentCategory.PII,
                confidence=pii_check['confidence'],
                triggered_rules=triggered_rules,
                should_block=block_pii,
                # If not blocking, redact the PII and let the request proceed
                modified_input=None if block_pii else self._redact_pii(
                    user_input, pii_check['matches']
                )
            )
# Check for harmful content
harmful_check = self._check_harmful_content(user_input)
if harmful_check['detected']:
return ClassificationResult(
category=ContentCategory.HARMFUL if harmful_check['severity'] == 'high'
else ContentCategory.POTENTIALLY_HARMFUL,
confidence=harmful_check['confidence'],
triggered_rules=harmful_check['rules'],
should_block=harmful_check['severity'] == 'high'
)
# Check topic relevance
if self.config.get('enforce_topic', False):
topic_check = self.topic_classifier.is_on_topic(user_input)
if not topic_check['on_topic']:
return ClassificationResult(
category=ContentCategory.OFF_TOPIC,
confidence=topic_check['confidence'],
triggered_rules=['off_topic'],
should_block=self.config.get('block_off_topic', False)
)
return ClassificationResult(
category=ContentCategory.SAFE,
confidence=1.0,
triggered_rules=triggered_rules,
should_block=False,
modified_input=user_input if triggered_rules else None
)
def _check_injection(self, text: str) -> dict:
"""Check for prompt injection attempts."""
detected_rules = []
max_confidence = 0.0
for pattern_name, pattern_config in self.injection_patterns.items():
pattern = pattern_config['pattern']
if re.search(pattern, text, re.IGNORECASE):
detected_rules.append(pattern_name)
max_confidence = max(max_confidence, pattern_config['confidence'])
return {
'detected': len(detected_rules) > 0,
'rules': detected_rules,
'confidence': max_confidence
}
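    # A sketch of the injection-pattern loader referenced in __init__. The
    # patterns below are common, illustrative signals of prompt injection,
    # not an exhaustive or production-grade rule set.
    def _load_injection_patterns(self) -> dict:
        return {
            'ignore_instructions': {
                'pattern': r'ignore (all |your )?(previous|prior|above) (instructions|prompts)',
                'confidence': 0.9,
            },
            'reveal_system_prompt': {
                'pattern': r'(show|reveal|print|repeat) (me )?(your|the) (system|initial) prompt',
                'confidence': 0.85,
            },
            'role_override': {
                'pattern': r'you are now|pretend (to be|you are)|act as if you (are|have)',
                'confidence': 0.7,
            },
        }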
def _check_pii(self, text: str) -> dict:
"""Check for personally identifiable information."""
matches = []
pii_types = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
}
for pii_type, pattern in pii_types.items():
found = re.findall(pattern, text)
if found:
matches.append({
'type': pii_type,
'matches': found,
'pattern': pattern
})
return {
'detected': len(matches) > 0,
'rules': [m['type'] for m in matches],
'matches': matches,
'confidence': 0.95 if matches else 0.0
}
def _redact_pii(self, text: str, matches: list) -> str:
"""Redact detected PII from text."""
redacted = text
for match in matches:
for value in match['matches']:
redacted = redacted.replace(value, f'[REDACTED_{match["type"].upper()}]')
return redacted
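    # A sketch of the harmful-pattern loader referenced in __init__. In practice
    # these rules would come from a maintained config file or moderation service;
    # the 'harmful_patterns' config key and the fallback pattern are assumptions.
    def _load_harmful_patterns(self) -> dict:
        default_patterns = {
            'violence': [
                {'pattern': r'\bhow (do i|to) (hurt|harm|attack) (someone|a person)\b',
                 'severity': 'high', 'confidence': 0.8},
            ],
        }
        return self.config.get('harmful_patterns', default_patterns)
    # _load_pii_patterns is also referenced in __init__; a stub keeps the class
    # importable even though _check_pii below defines its own inline patterns.
    def _load_pii_patterns(self) -> dict:
        return {}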
def _check_harmful_content(self, text: str) -> dict:
"""Check for harmful content patterns."""
detected = []
for category, patterns in self.harmful_patterns.items():
for pattern_config in patterns:
if re.search(pattern_config['pattern'], text, re.IGNORECASE):
detected.append({
'category': category,
'severity': pattern_config['severity'],
'confidence': pattern_config['confidence']
})
if not detected:
return {'detected': False, 'rules': [], 'severity': None, 'confidence': 0.0}
        # Rank severities explicitly; max() on raw strings would compare alphabetically
        severity_order = {'low': 0, 'medium': 1, 'high': 2}
        max_severity = max(detected, key=lambda d: severity_order.get(d['severity'], 0))['severity']
        max_confidence = max(d['confidence'] for d in detected)
return {
'detected': True,
'rules': [d['category'] for d in detected],
'severity': max_severity,
'confidence': max_confidence
        }

Rate Limiting and Abuse Prevention
from collections import defaultdict
import time
class RateLimiter:
"""Rate limiting for LLM API access."""
def __init__(self, config: dict):
self.limits = config.get('limits', {
'requests_per_minute': 20,
'requests_per_hour': 100,
'tokens_per_minute': 40000,
'tokens_per_hour': 200000,
})
self.user_requests = defaultdict(list)
self.user_tokens = defaultdict(list)
self.blocked_users = {}
def check_rate_limit(self, user_id: str, estimated_tokens: int = 0) -> dict:
"""Check if user is within rate limits."""
now = time.time()
# Check if user is blocked
if user_id in self.blocked_users:
if now < self.blocked_users[user_id]:
return {
'allowed': False,
'reason': 'user_blocked',
'retry_after': int(self.blocked_users[user_id] - now)
}
else:
del self.blocked_users[user_id]
# Clean old entries
self._cleanup_old_entries(user_id, now)
# Check request rate
minute_requests = len([
r for r in self.user_requests[user_id]
if r > now - 60
])
if minute_requests >= self.limits['requests_per_minute']:
return {
'allowed': False,
'reason': 'requests_per_minute_exceeded',
'retry_after': 60
}
hour_requests = len([
r for r in self.user_requests[user_id]
if r > now - 3600
])
if hour_requests >= self.limits['requests_per_hour']:
return {
'allowed': False,
'reason': 'requests_per_hour_exceeded',
'retry_after': 3600
}
# Check token rate
minute_tokens = sum(
t['tokens'] for t in self.user_tokens[user_id]
if t['time'] > now - 60
)
if minute_tokens + estimated_tokens > self.limits['tokens_per_minute']:
return {
'allowed': False,
'reason': 'tokens_per_minute_exceeded',
'retry_after': 60
}
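        # Mirror the per-minute check for the hourly token budget; without this,
        # the tokens_per_hour limit configured above is never enforced
        hour_tokens = sum(
            t['tokens'] for t in self.user_tokens[user_id]
            if t['time'] > now - 3600
        )
        if hour_tokens + estimated_tokens > self.limits['tokens_per_hour']:
            return {
                'allowed': False,
                'reason': 'tokens_per_hour_exceeded',
                'retry_after': 3600
            }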
return {'allowed': True}
def record_usage(self, user_id: str, tokens_used: int):
"""Record API usage."""
now = time.time()
self.user_requests[user_id].append(now)
self.user_tokens[user_id].append({'time': now, 'tokens': tokens_used})
def block_user(self, user_id: str, duration_seconds: int):
"""Temporarily block a user."""
self.blocked_users[user_id] = time.time() + duration_seconds
def _cleanup_old_entries(self, user_id: str, now: float):
"""Remove entries older than 1 hour."""
cutoff = now - 3600
self.user_requests[user_id] = [
r for r in self.user_requests[user_id] if r > cutoff
]
self.user_tokens[user_id] = [
t for t in self.user_tokens[user_id] if t['time'] > cutoff
        ]

Output Guardrails
Response Validation
class OutputValidator:
"""Validate LLM outputs before returning to users."""
def __init__(self, config: dict):
self.config = config
self.content_policy = ContentPolicy(config.get('content_policy', {}))
self.factuality_checker = FactualityChecker(config.get('factuality', {}))
self.brand_guidelines = BrandGuidelines(config.get('brand', {}))
def validate(self, response: str, context: dict) -> dict:
"""
Comprehensive validation of LLM output.
"""
validations = []
# Check content policy
policy_result = self.content_policy.check(response)
validations.append(('content_policy', policy_result))
# Check for hallucination indicators
if context.get('check_factuality', False):
factuality_result = self.factuality_checker.check(
response, context.get('sources', [])
)
validations.append(('factuality', factuality_result))
# Check brand guidelines
brand_result = self.brand_guidelines.check(response)
validations.append(('brand_guidelines', brand_result))
# Check for sensitive information leakage
leakage_result = self._check_information_leakage(response, context)
validations.append(('information_leakage', leakage_result))
# Aggregate results
all_passed = all(v[1]['passed'] for v in validations)
should_block = any(v[1].get('block', False) for v in validations)
return {
'valid': all_passed,
'should_block': should_block,
'validations': dict(validations),
'modified_response': self._apply_modifications(response, validations) if not should_block else None
}
def _check_information_leakage(self, response: str, context: dict) -> dict:
"""Check for leakage of sensitive information."""
issues = []
# Check for system prompt leakage
system_prompt = context.get('system_prompt', '')
if system_prompt:
# Check if significant portions of system prompt appear in response
prompt_words = set(system_prompt.lower().split())
response_words = set(response.lower().split())
overlap = prompt_words & response_words
if len(overlap) > len(prompt_words) * 0.3:
issues.append('potential_system_prompt_leakage')
# Check for internal information patterns
internal_patterns = [
r'api[_-]?key\s*[:=]',
r'password\s*[:=]',
r'secret\s*[:=]',
r'internal[_-]?use[_-]?only',
r'confidential',
]
for pattern in internal_patterns:
if re.search(pattern, response, re.IGNORECASE):
issues.append(f'sensitive_pattern: {pattern}')
return {
'passed': len(issues) == 0,
'issues': issues,
'block': 'potential_system_prompt_leakage' in issues
}
def _apply_modifications(self, response: str, validations: list) -> str:
"""Apply necessary modifications to response."""
modified = response
for check_name, result in validations:
if result.get('modifications'):
for mod in result['modifications']:
modified = mod['apply'](modified)
return modified
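# A sketch of the FactualityChecker used by OutputValidator above. It flags
# responses whose words overlap little with the supplied sources; a production
# system would use an NLI model or embedding similarity instead of this
# word-overlap heuristic, and the 0.2 threshold is an assumption.
class FactualityChecker:
    """Heuristic grounding check against provided source documents."""
    def __init__(self, config: dict):
        self.min_overlap = config.get('min_source_overlap', 0.2)
    def check(self, response: str, sources: list) -> dict:
        if not sources:
            # Nothing to ground against; pass but record the gap
            return {'passed': True, 'issues': ['no_sources_provided']}
        source_words = set(' '.join(sources).lower().split())
        response_words = set(response.lower().split())
        if not response_words:
            return {'passed': True, 'issues': []}
        overlap = len(response_words & source_words) / len(response_words)
        return {
            'passed': overlap >= self.min_overlap,
            'overlap': round(overlap, 2),
            'issues': [] if overlap >= self.min_overlap else ['low_source_overlap']
        }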
class ContentPolicy:
"""Enforce content policies on LLM outputs."""
def __init__(self, config: dict):
self.prohibited_categories = config.get('prohibited', [
'hate_speech', 'violence', 'sexual_content', 'self_harm',
'illegal_activity', 'misinformation'
])
self.classifiers = self._load_classifiers()
def check(self, text: str) -> dict:
"""Check text against content policies."""
violations = []
for category in self.prohibited_categories:
classifier = self.classifiers.get(category)
if classifier:
result = classifier.classify(text)
if result['score'] > result['threshold']:
violations.append({
'category': category,
'score': result['score'],
'threshold': result['threshold']
})
return {
'passed': len(violations) == 0,
'violations': violations,
'block': any(v['score'] > 0.9 for v in violations)
}
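    # A sketch of the classifier loader referenced in __init__. Each classifier
    # exposes classify(text) returning a score and threshold, matching how
    # check() consumes it. Real deployments would call a hosted moderation
    # model here; the keyword lists and thresholds are placeholder assumptions.
    def _load_classifiers(self) -> dict:
        class KeywordClassifier:
            def __init__(self, keywords, threshold=0.5):
                self.keywords = keywords
                self.threshold = threshold
            def classify(self, text: str) -> dict:
                hits = sum(1 for kw in self.keywords if kw in text.lower())
                return {'score': min(1.0, hits / max(len(self.keywords), 1)),
                        'threshold': self.threshold}
        keyword_sets = {
            'violence': ['kill', 'attack', 'assault'],
            'self_harm': ['hurt myself', 'end my life'],
        }
        return {
            category: KeywordClassifier(keywords)
            for category, keywords in keyword_sets.items()
        }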
class BrandGuidelines:
"""Enforce brand voice and guidelines."""
def __init__(self, config: dict):
self.tone = config.get('tone', 'professional')
self.prohibited_terms = config.get('prohibited_terms', [])
self.required_disclaimers = config.get('disclaimers', {})
def check(self, text: str) -> dict:
"""Check text against brand guidelines."""
issues = []
modifications = []
# Check for prohibited terms
for term in self.prohibited_terms:
if term.lower() in text.lower():
issues.append(f'prohibited_term: {term}')
# Check for required disclaimers
for topic, disclaimer in self.required_disclaimers.items():
if self._topic_mentioned(text, topic) and disclaimer not in text:
modifications.append({
'type': 'add_disclaimer',
'disclaimer': disclaimer,
'apply': lambda t, d=disclaimer: t + f'\n\n{d}'
})
return {
'passed': len(issues) == 0,
'issues': issues,
'modifications': modifications
}
def _topic_mentioned(self, text: str, topic: str) -> bool:
"""Check if a topic is mentioned in text."""
topic_keywords = {
'medical': ['health', 'medical', 'diagnosis', 'treatment', 'symptom'],
'financial': ['invest', 'stock', 'financial', 'money', 'trading'],
'legal': ['legal', 'law', 'lawsuit', 'liability', 'contract'],
}
keywords = topic_keywords.get(topic, [topic])
        return any(kw in text.lower() for kw in keywords)

Behavioral Guardrails
class BehavioralGuardrails:
"""Enforce behavioral constraints on LLM interactions."""
def __init__(self, config: dict):
self.persona_config = config.get('persona', {})
self.interaction_limits = config.get('interaction_limits', {})
self.session_monitor = SessionMonitor()
def check_interaction(self, user_input: str, llm_response: str,
session_context: dict) -> dict:
"""Check interaction against behavioral constraints."""
checks = []
# Check persona consistency
persona_check = self._check_persona_consistency(llm_response)
checks.append(('persona', persona_check))
# Check for manipulation attempts
manipulation_check = self._detect_manipulation(
user_input, llm_response, session_context
)
checks.append(('manipulation', manipulation_check))
# Check response appropriateness
appropriateness_check = self._check_appropriateness(
user_input, llm_response
)
checks.append(('appropriateness', appropriateness_check))
# Check session patterns
session_check = self.session_monitor.check_session(
session_context['session_id'],
user_input,
llm_response
)
checks.append(('session', session_check))
return {
'passed': all(c[1]['passed'] for c in checks),
'checks': dict(checks),
'actions': self._determine_actions(checks)
}
def _check_persona_consistency(self, response: str) -> dict:
"""Ensure response maintains defined persona."""
violations = []
# Check for first-person statements that break persona
if self.persona_config.get('no_opinions', False):
opinion_patterns = [
r'\bI (think|believe|feel)\b',
r'\bIn my opinion\b',
r'\bPersonally,?\s*I\b',
]
for pattern in opinion_patterns:
if re.search(pattern, response, re.IGNORECASE):
violations.append('expressed_opinion')
# Check for claims about capabilities
if self.persona_config.get('capability_claims', []):
for claim in self.persona_config['capability_claims']:
if claim['pattern'].search(response):
if claim['allowed'] == False:
violations.append(f"capability_claim: {claim['name']}")
return {
'passed': len(violations) == 0,
'violations': violations
}
def _detect_manipulation(self, user_input: str, llm_response: str,
context: dict) -> dict:
"""Detect if LLM appears to be manipulated."""
indicators = []
# Check if response follows suspicious instruction pattern
instruction_ack_patterns = [
r'(?:okay|sure|alright),?\s*(?:I\'ll|let me)\s+(?:ignore|disregard)',
r'(?:as|per)\s+your\s+(?:new\s+)?instructions?',
r'switching\s+to\s+\w+\s+mode',
r'now\s+operating\s+(?:as|in)',
]
for pattern in instruction_ack_patterns:
if re.search(pattern, llm_response, re.IGNORECASE):
indicators.append('instruction_acknowledgment')
# Check for dramatic behavioral shift
if context.get('previous_responses'):
behavioral_shift = self._measure_behavioral_shift(
context['previous_responses'], llm_response
)
if behavioral_shift > 0.7:
indicators.append('behavioral_shift')
return {
'passed': len(indicators) == 0,
'indicators': indicators,
'block': 'instruction_acknowledgment' in indicators
}
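    # A sketch of the behavioral-shift measure used above: 1 minus the highest
    # similarity between the new response and any previous one. It relies on the
    # lexical similarity helper sketched further down; an embedding-based
    # distance would be a better fit in production.
    def _measure_behavioral_shift(self, previous_responses: list, response: str) -> float:
        if not previous_responses:
            return 0.0
        best_similarity = max(
            self._compute_semantic_similarity(prev, response)
            for prev in previous_responses
        )
        return 1.0 - best_similarity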
def _check_appropriateness(self, user_input: str, response: str) -> dict:
"""Check if response is appropriate for the query."""
issues = []
# Check response length proportionality
input_len = len(user_input)
response_len = len(response)
if input_len < 20 and response_len > 2000:
issues.append('disproportionate_length')
# Check for off-topic tangents
# Use semantic similarity between input and response
similarity = self._compute_semantic_similarity(user_input, response)
if similarity < 0.3:
issues.append('off_topic_response')
return {
'passed': len(issues) == 0,
'issues': issues
}
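    # A sketch of the similarity helper referenced above. Jaccard overlap of
    # word sets is a crude stand-in; cosine similarity over sentence embeddings
    # is the usual production choice.
    def _compute_semantic_similarity(self, text_a: str, text_b: str) -> float:
        words_a = set(text_a.lower().split())
        words_b = set(text_b.lower().split())
        if not words_a or not words_b:
            return 0.0
        return len(words_a & words_b) / len(words_a | words_b)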
class SessionMonitor:
"""Monitor session patterns for abuse detection."""
def __init__(self):
self.sessions = {}
self.abuse_patterns = self._load_abuse_patterns()
def check_session(self, session_id: str, user_input: str,
llm_response: str) -> dict:
"""Monitor session for concerning patterns."""
if session_id not in self.sessions:
self.sessions[session_id] = {
'interactions': [],
'flags': [],
'created_at': time.time()
}
session = self.sessions[session_id]
session['interactions'].append({
'input': user_input,
'response': llm_response,
'timestamp': time.time()
})
warnings = []
# Check for repeated injection attempts
injection_count = sum(
1 for i in session['interactions']
if self._looks_like_injection(i['input'])
)
if injection_count > 3:
warnings.append('repeated_injection_attempts')
# Check for escalating requests
if self._detect_escalation(session['interactions']):
warnings.append('escalating_requests')
# Check for jailbreak attempt patterns
if self._detect_jailbreak_pattern(session['interactions']):
warnings.append('jailbreak_pattern')
return {
'passed': len(warnings) == 0,
'warnings': warnings,
'session_risk': self._calculate_session_risk(session, warnings)
        }

Guardrail Orchestration
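The orchestrator below assumes an AuditLogger with a single log() method. A minimal sketch that emits one JSON line per event (the logger name and JSON-lines format are assumptions, not part of the original design):

import json
import logging
import time

class AuditLogger:
    """Emit each guardrail decision as a single JSON line."""
    def __init__(self, logger_name: str = 'guardrails.audit'):
        self.logger = logging.getLogger(logger_name)
    def log(self, event: dict):
        # Timestamp every event so blocked/allowed decisions can be replayed later
        self.logger.info(json.dumps({'timestamp': time.time(), **event}, default=str))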
import uuid

class GuardrailOrchestrator:
"""Orchestrate all guardrails for LLM interactions."""
def __init__(self, config: dict):
self.input_classifier = InputClassifier(config.get('input', {}))
self.rate_limiter = RateLimiter(config.get('rate_limit', {}))
self.output_validator = OutputValidator(config.get('output', {}))
self.behavioral_guardrails = BehavioralGuardrails(config.get('behavior', {}))
self.audit_logger = AuditLogger()
async def process_request(self, user_input: str, user_context: dict,
llm_client) -> dict:
"""Process a request through all guardrails."""
request_id = str(uuid.uuid4())
session_id = user_context.get('session_id')
user_id = user_context.get('user_id')
# Phase 1: Rate limiting
rate_check = self.rate_limiter.check_rate_limit(
user_id, estimated_tokens=len(user_input.split()) * 2
)
if not rate_check['allowed']:
self.audit_logger.log({
'event': 'rate_limited',
'request_id': request_id,
'user_id': user_id,
'reason': rate_check['reason']
})
return {
'success': False,
'error': 'rate_limited',
'retry_after': rate_check.get('retry_after')
}
# Phase 2: Input classification
input_result = self.input_classifier.classify(user_input, user_context)
if input_result.should_block:
self.audit_logger.log({
'event': 'input_blocked',
'request_id': request_id,
'user_id': user_id,
'category': input_result.category.value,
'rules': input_result.triggered_rules
})
return {
'success': False,
'error': 'input_policy_violation',
'category': input_result.category.value
}
# Use modified input if PII was redacted
processed_input = input_result.modified_input or user_input
# Phase 3: LLM call
try:
llm_response = await llm_client.complete(processed_input, user_context)
except Exception as e:
self.audit_logger.log({
'event': 'llm_error',
'request_id': request_id,
'error': str(e)
})
return {
'success': False,
'error': 'llm_error'
}
# Phase 4: Output validation
output_result = self.output_validator.validate(llm_response, {
'check_factuality': user_context.get('check_factuality', False),
'system_prompt': user_context.get('system_prompt', ''),
'sources': user_context.get('sources', [])
})
if output_result['should_block']:
self.audit_logger.log({
'event': 'output_blocked',
'request_id': request_id,
'validations': output_result['validations']
})
return {
'success': False,
'error': 'output_policy_violation'
}
final_response = output_result.get('modified_response', llm_response)
# Phase 5: Behavioral checks
behavioral_result = self.behavioral_guardrails.check_interaction(
user_input, final_response, {
'session_id': session_id,
'previous_responses': user_context.get('previous_responses', [])
}
)
if not behavioral_result['passed']:
for action in behavioral_result.get('actions', []):
self._execute_action(action, user_context)
# Record usage for rate limiting
self.rate_limiter.record_usage(user_id, len(final_response.split()))
# Comprehensive logging
self.audit_logger.log({
'event': 'request_completed',
'request_id': request_id,
'user_id': user_id,
'input_classification': input_result.category.value,
'output_valid': output_result['valid'],
'behavioral_passed': behavioral_result['passed'],
'response_length': len(final_response)
})
return {
'success': True,
'response': final_response,
'metadata': {
'request_id': request_id,
'input_modified': input_result.modified_input is not None,
'output_modified': output_result.get('modified_response') is not None,
'warnings': behavioral_result.get('warnings', [])
}
        }

Configuration Management
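All thresholds and toggles live in a single file like the guardrails_config.yaml shown below, so behavior can be tuned without code changes. A minimal wiring sketch, assuming PyYAML is installed and that the helpers the listings above leave as stubs (TopicClassifier, _load_abuse_patterns, and so on) have been filled in; the EchoLLMClient stub stands in for any provider client that exposes an async complete() method:

import asyncio
import yaml  # PyYAML

class EchoLLMClient:
    """Stand-in client; swap in a real provider wrapper."""
    async def complete(self, prompt: str, context: dict) -> str:
        return f"(stub response to: {prompt[:60]})"

async def main():
    with open('guardrails_config.yaml') as f:
        config = yaml.safe_load(f)
    orchestrator = GuardrailOrchestrator(config)
    result = await orchestrator.process_request(
        user_input="What are common side effects of ibuprofen?",
        user_context={'user_id': 'user-123', 'session_id': 'sess-456'},
        llm_client=EchoLLMClient(),
    )
    print(result)

asyncio.run(main())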
# guardrails_config.yaml
input:
content_classification:
enabled: true
block_harmful: true
block_pii: false # Redact instead
block_off_topic: false
injection_detection:
enabled: true
sensitivity: high
block_on_detection: true
rate_limit:
enabled: true
limits:
requests_per_minute: 20
requests_per_hour: 100
tokens_per_minute: 40000
tokens_per_hour: 200000
output:
content_policy:
enabled: true
prohibited:
- hate_speech
- violence
- sexual_content
- self_harm
- illegal_activity
brand_guidelines:
enabled: true
tone: professional
prohibited_terms:
- competitor_name
- internal_project_code
disclaimers:
medical: "This is not medical advice. Please consult a healthcare professional."
financial: "This is not financial advice. Please consult a qualified financial advisor."
legal: "This is not legal advice. Please consult a licensed attorney."
behavior:
persona:
no_opinions: true
maintain_role: true
session_monitoring:
enabled: true
max_interactions: 100
abuse_detection: true
audit:
enabled: true
log_inputs: true
log_outputs: false # PII consideration
  retention_days: 90

Conclusion
Robust guardrails are essential for deploying LLMs safely in production. The key is implementing multiple layers of protection that work together while remaining performant and maintainable.
Key principles:
- Layer your defenses - No single guardrail is sufficient
- Fail secure - Block when uncertain
- Monitor and adapt - New attacks require new defenses
- Balance safety and usability - Overly aggressive filtering frustrates users
- Log comprehensively - You need data to improve
At DeviDevs, we help organizations implement production-grade guardrail systems tailored to their specific use cases and risk profiles. Contact us to discuss your AI safety requirements.