Building Robust LLM Guardrails: A Technical Implementation Guide
Guardrails are the safety mechanisms that keep LLM applications operating within defined boundaries. Without them, even the most capable models can produce harmful, inaccurate, or off-brand outputs that damage user trust and business reputation.
This guide provides production-ready implementations for building comprehensive guardrail systems.
Understanding Guardrail Architecture
A robust guardrail system operates at multiple layers:
User Input → Input Guardrails → LLM → Output Guardrails → User Response
↓ ↓
Block/Modify Block/Modify/Flag
↓ ↓
Audit Log Audit Log
Design Principles
- Defense in depth - Multiple independent checks
- Fail secure - When in doubt, block
- Performance aware - Minimal latency impact
- Observable - Comprehensive logging
- Configurable - Adjustable without code changes
Input Guardrails
Content Classification
from dataclasses import dataclass
from enum import Enum
from typing import Optional, List
import re
class ContentCategory(Enum):
SAFE = "safe"
POTENTIALLY_HARMFUL = "potentially_harmful"
HARMFUL = "harmful"
PII = "pii"
INJECTION_ATTEMPT = "injection_attempt"
OFF_TOPIC = "off_topic"
@dataclass
class ClassificationResult:
category: ContentCategory
confidence: float
triggered_rules: List[str]
should_block: bool
modified_input: Optional[str] = None
class InputClassifier:
"""Classify and filter user inputs before LLM processing."""
def __init__(self, config: dict):
self.config = config
self.harmful_patterns = self._load_harmful_patterns()
self.pii_patterns = self._load_pii_patterns()
self.injection_patterns = self._load_injection_patterns()
self.topic_classifier = TopicClassifier(config.get('allowed_topics', []))
def classify(self, user_input: str, context: dict = None) -> ClassificationResult:
"""
Classify user input and determine appropriate action.
"""
triggered_rules = []
category = ContentCategory.SAFE
confidence = 1.0
# Check for injection attempts (highest priority)
injection_check = self._check_injection(user_input)
if injection_check['detected']:
return ClassificationResult(
category=ContentCategory.INJECTION_ATTEMPT,
confidence=injection_check['confidence'],
triggered_rules=injection_check['rules'],
should_block=True
)
# Check for PII
pii_check = self._check_pii(user_input)
if pii_check['detected']:
triggered_rules.extend(pii_check['rules'])
if self.config.get('block_pii', True):
return ClassificationResult(
category=ContentCategory.PII,
confidence=pii_check['confidence'],
triggered_rules=triggered_rules,
should_block=False, # Redact instead of block
modified_input=self._redact_pii(user_input, pii_check['matches'])
)
# Check for harmful content
harmful_check = self._check_harmful_content(user_input)
if harmful_check['detected']:
return ClassificationResult(
category=ContentCategory.HARMFUL if harmful_check['severity'] == 'high'
else ContentCategory.POTENTIALLY_HARMFUL,
confidence=harmful_check['confidence'],
triggered_rules=harmful_check['rules'],
should_block=harmful_check['severity'] == 'high'
)
# Check topic relevance
if self.config.get('enforce_topic', False):
topic_check = self.topic_classifier.is_on_topic(user_input)
if not topic_check['on_topic']:
return ClassificationResult(
category=ContentCategory.OFF_TOPIC,
confidence=topic_check['confidence'],
triggered_rules=['off_topic'],
should_block=self.config.get('block_off_topic', False)
)
return ClassificationResult(
category=ContentCategory.SAFE,
confidence=1.0,
triggered_rules=triggered_rules,
should_block=False,
modified_input=user_input if triggered_rules else None
)
def _check_injection(self, text: str) -> dict:
"""Check for prompt injection attempts."""
detected_rules = []
max_confidence = 0.0
for pattern_name, pattern_config in self.injection_patterns.items():
pattern = pattern_config['pattern']
if re.search(pattern, text, re.IGNORECASE):
detected_rules.append(pattern_name)
max_confidence = max(max_confidence, pattern_config['confidence'])
return {
'detected': len(detected_rules) > 0,
'rules': detected_rules,
'confidence': max_confidence
}
def _check_pii(self, text: str) -> dict:
"""Check for personally identifiable information."""
matches = []
pii_types = {
'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
'credit_card': r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'ip_address': r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b',
}
for pii_type, pattern in pii_types.items():
found = re.findall(pattern, text)
if found:
matches.append({
'type': pii_type,
'matches': found,
'pattern': pattern
})
return {
'detected': len(matches) > 0,
'rules': [m['type'] for m in matches],
'matches': matches,
'confidence': 0.95 if matches else 0.0
}
def _redact_pii(self, text: str, matches: list) -> str:
"""Redact detected PII from text."""
redacted = text
for match in matches:
for value in match['matches']:
redacted = redacted.replace(value, f'[REDACTED_{match["type"].upper()}]')
return redacted
def _check_harmful_content(self, text: str) -> dict:
"""Check for harmful content patterns."""
detected = []
for category, patterns in self.harmful_patterns.items():
for pattern_config in patterns:
if re.search(pattern_config['pattern'], text, re.IGNORECASE):
detected.append({
'category': category,
'severity': pattern_config['severity'],
'confidence': pattern_config['confidence']
})
if not detected:
return {'detected': False, 'rules': [], 'severity': None, 'confidence': 0.0}
max_severity = max(d['severity'] for d in detected)
max_confidence = max(d['confidence'] for d in detected)
return {
'detected': True,
'rules': [d['category'] for d in detected],
'severity': max_severity,
'confidence': max_confidence
}Rate Limiting and Abuse Prevention
from collections import defaultdict
import time
class RateLimiter:
"""Rate limiting for LLM API access."""
def __init__(self, config: dict):
self.limits = config.get('limits', {
'requests_per_minute': 20,
'requests_per_hour': 100,
'tokens_per_minute': 40000,
'tokens_per_hour': 200000,
})
self.user_requests = defaultdict(list)
self.user_tokens = defaultdict(list)
self.blocked_users = {}
def check_rate_limit(self, user_id: str, estimated_tokens: int = 0) -> dict:
"""Check if user is within rate limits."""
now = time.time()
# Check if user is blocked
if user_id in self.blocked_users:
if now < self.blocked_users[user_id]:
return {
'allowed': False,
'reason': 'user_blocked',
'retry_after': int(self.blocked_users[user_id] - now)
}
else:
del self.blocked_users[user_id]
# Clean old entries
self._cleanup_old_entries(user_id, now)
# Check request rate
minute_requests = len([
r for r in self.user_requests[user_id]
if r > now - 60
])
if minute_requests >= self.limits['requests_per_minute']:
return {
'allowed': False,
'reason': 'requests_per_minute_exceeded',
'retry_after': 60
}
hour_requests = len([
r for r in self.user_requests[user_id]
if r > now - 3600
])
if hour_requests >= self.limits['requests_per_hour']:
return {
'allowed': False,
'reason': 'requests_per_hour_exceeded',
'retry_after': 3600
}
# Check token rate
minute_tokens = sum(
t['tokens'] for t in self.user_tokens[user_id]
if t['time'] > now - 60
)
if minute_tokens + estimated_tokens > self.limits['tokens_per_minute']:
return {
'allowed': False,
'reason': 'tokens_per_minute_exceeded',
'retry_after': 60
}
return {'allowed': True}
def record_usage(self, user_id: str, tokens_used: int):
"""Record API usage."""
now = time.time()
self.user_requests[user_id].append(now)
self.user_tokens[user_id].append({'time': now, 'tokens': tokens_used})
def block_user(self, user_id: str, duration_seconds: int):
"""Temporarily block a user."""
self.blocked_users[user_id] = time.time() + duration_seconds
def _cleanup_old_entries(self, user_id: str, now: float):
"""Remove entries older than 1 hour."""
cutoff = now - 3600
self.user_requests[user_id] = [
r for r in self.user_requests[user_id] if r > cutoff
]
self.user_tokens[user_id] = [
t for t in self.user_tokens[user_id] if t['time'] > cutoff
]Output Guardrails
Response Validation
class OutputValidator:
"""Validate LLM outputs before returning to users."""
def __init__(self, config: dict):
self.config = config
self.content_policy = ContentPolicy(config.get('content_policy', {}))
self.factuality_checker = FactualityChecker(config.get('factuality', {}))
self.brand_guidelines = BrandGuidelines(config.get('brand', {}))
def validate(self, response: str, context: dict) -> dict:
"""
Comprehensive validation of LLM output.
"""
validations = []
# Check content policy
policy_result = self.content_policy.check(response)
validations.append(('content_policy', policy_result))
# Check for hallucination indicators
if context.get('check_factuality', False):
factuality_result = self.factuality_checker.check(
response, context.get('sources', [])
)
validations.append(('factuality', factuality_result))
# Check brand guidelines
brand_result = self.brand_guidelines.check(response)
validations.append(('brand_guidelines', brand_result))
# Check for sensitive information leakage
leakage_result = self._check_information_leakage(response, context)
validations.append(('information_leakage', leakage_result))
# Aggregate results
all_passed = all(v[1]['passed'] for v in validations)
should_block = any(v[1].get('block', False) for v in validations)
return {
'valid': all_passed,
'should_block': should_block,
'validations': dict(validations),
'modified_response': self._apply_modifications(response, validations) if not should_block else None
}
def _check_information_leakage(self, response: str, context: dict) -> dict:
"""Check for leakage of sensitive information."""
issues = []
# Check for system prompt leakage
system_prompt = context.get('system_prompt', '')
if system_prompt:
# Check if significant portions of system prompt appear in response
prompt_words = set(system_prompt.lower().split())
response_words = set(response.lower().split())
overlap = prompt_words & response_words
if len(overlap) > len(prompt_words) * 0.3:
issues.append('potential_system_prompt_leakage')
# Check for internal information patterns
internal_patterns = [
r'api[_-]?key\s*[:=]',
r'password\s*[:=]',
r'secret\s*[:=]',
r'internal[_-]?use[_-]?only',
r'confidential',
]
for pattern in internal_patterns:
if re.search(pattern, response, re.IGNORECASE):
issues.append(f'sensitive_pattern: {pattern}')
return {
'passed': len(issues) == 0,
'issues': issues,
'block': 'potential_system_prompt_leakage' in issues
}
def _apply_modifications(self, response: str, validations: list) -> str:
"""Apply necessary modifications to response."""
modified = response
for check_name, result in validations:
if result.get('modifications'):
for mod in result['modifications']:
modified = mod['apply'](modified)
return modified
class ContentPolicy:
"""Enforce content policies on LLM outputs."""
def __init__(self, config: dict):
self.prohibited_categories = config.get('prohibited', [
'hate_speech', 'violence', 'sexual_content', 'self_harm',
'illegal_activity', 'misinformation'
])
self.classifiers = self._load_classifiers()
def check(self, text: str) -> dict:
"""Check text against content policies."""
violations = []
for category in self.prohibited_categories:
classifier = self.classifiers.get(category)
if classifier:
result = classifier.classify(text)
if result['score'] > result['threshold']:
violations.append({
'category': category,
'score': result['score'],
'threshold': result['threshold']
})
return {
'passed': len(violations) == 0,
'violations': violations,
'block': any(v['score'] > 0.9 for v in violations)
}
class BrandGuidelines:
"""Enforce brand voice and guidelines."""
def __init__(self, config: dict):
self.tone = config.get('tone', 'professional')
self.prohibited_terms = config.get('prohibited_terms', [])
self.required_disclaimers = config.get('disclaimers', {})
def check(self, text: str) -> dict:
"""Check text against brand guidelines."""
issues = []
modifications = []
# Check for prohibited terms
for term in self.prohibited_terms:
if term.lower() in text.lower():
issues.append(f'prohibited_term: {term}')
# Check for required disclaimers
for topic, disclaimer in self.required_disclaimers.items():
if self._topic_mentioned(text, topic) and disclaimer not in text:
modifications.append({
'type': 'add_disclaimer',
'disclaimer': disclaimer,
'apply': lambda t, d=disclaimer: t + f'\n\n{d}'
})
return {
'passed': len(issues) == 0,
'issues': issues,
'modifications': modifications
}
def _topic_mentioned(self, text: str, topic: str) -> bool:
"""Check if a topic is mentioned in text."""
topic_keywords = {
'medical': ['health', 'medical', 'diagnosis', 'treatment', 'symptom'],
'financial': ['invest', 'stock', 'financial', 'money', 'trading'],
'legal': ['legal', 'law', 'lawsuit', 'liability', 'contract'],
}
keywords = topic_keywords.get(topic, [topic])
return any(kw in text.lower() for kw in keywords)Behavioral Guardrails
class BehavioralGuardrails:
"""Enforce behavioral constraints on LLM interactions."""
def __init__(self, config: dict):
self.persona_config = config.get('persona', {})
self.interaction_limits = config.get('interaction_limits', {})
self.session_monitor = SessionMonitor()
def check_interaction(self, user_input: str, llm_response: str,
session_context: dict) -> dict:
"""Check interaction against behavioral constraints."""
checks = []
# Check persona consistency
persona_check = self._check_persona_consistency(llm_response)
checks.append(('persona', persona_check))
# Check for manipulation attempts
manipulation_check = self._detect_manipulation(
user_input, llm_response, session_context
)
checks.append(('manipulation', manipulation_check))
# Check response appropriateness
appropriateness_check = self._check_appropriateness(
user_input, llm_response
)
checks.append(('appropriateness', appropriateness_check))
# Check session patterns
session_check = self.session_monitor.check_session(
session_context['session_id'],
user_input,
llm_response
)
checks.append(('session', session_check))
return {
'passed': all(c[1]['passed'] for c in checks),
'checks': dict(checks),
'actions': self._determine_actions(checks)
}
def _check_persona_consistency(self, response: str) -> dict:
"""Ensure response maintains defined persona."""
violations = []
# Check for first-person statements that break persona
if self.persona_config.get('no_opinions', False):
opinion_patterns = [
r'\bI (think|believe|feel)\b',
r'\bIn my opinion\b',
r'\bPersonally,?\s*I\b',
]
for pattern in opinion_patterns:
if re.search(pattern, response, re.IGNORECASE):
violations.append('expressed_opinion')
# Check for claims about capabilities
if self.persona_config.get('capability_claims', []):
for claim in self.persona_config['capability_claims']:
if claim['pattern'].search(response):
if claim['allowed'] == False:
violations.append(f"capability_claim: {claim['name']}")
return {
'passed': len(violations) == 0,
'violations': violations
}
def _detect_manipulation(self, user_input: str, llm_response: str,
context: dict) -> dict:
"""Detect if LLM appears to be manipulated."""
indicators = []
# Check if response follows suspicious instruction pattern
instruction_ack_patterns = [
r'(?:okay|sure|alright),?\s*(?:I\'ll|let me)\s+(?:ignore|disregard)',
r'(?:as|per)\s+your\s+(?:new\s+)?instructions?',
r'switching\s+to\s+\w+\s+mode',
r'now\s+operating\s+(?:as|in)',
]
for pattern in instruction_ack_patterns:
if re.search(pattern, llm_response, re.IGNORECASE):
indicators.append('instruction_acknowledgment')
# Check for dramatic behavioral shift
if context.get('previous_responses'):
behavioral_shift = self._measure_behavioral_shift(
context['previous_responses'], llm_response
)
if behavioral_shift > 0.7:
indicators.append('behavioral_shift')
return {
'passed': len(indicators) == 0,
'indicators': indicators,
'block': 'instruction_acknowledgment' in indicators
}
def _check_appropriateness(self, user_input: str, response: str) -> dict:
"""Check if response is appropriate for the query."""
issues = []
# Check response length proportionality
input_len = len(user_input)
response_len = len(response)
if input_len < 20 and response_len > 2000:
issues.append('disproportionate_length')
# Check for off-topic tangents
# Use semantic similarity between input and response
similarity = self._compute_semantic_similarity(user_input, response)
if similarity < 0.3:
issues.append('off_topic_response')
return {
'passed': len(issues) == 0,
'issues': issues
}
class SessionMonitor:
"""Monitor session patterns for abuse detection."""
def __init__(self):
self.sessions = {}
self.abuse_patterns = self._load_abuse_patterns()
def check_session(self, session_id: str, user_input: str,
llm_response: str) -> dict:
"""Monitor session for concerning patterns."""
if session_id not in self.sessions:
self.sessions[session_id] = {
'interactions': [],
'flags': [],
'created_at': time.time()
}
session = self.sessions[session_id]
session['interactions'].append({
'input': user_input,
'response': llm_response,
'timestamp': time.time()
})
warnings = []
# Check for repeated injection attempts
injection_count = sum(
1 for i in session['interactions']
if self._looks_like_injection(i['input'])
)
if injection_count > 3:
warnings.append('repeated_injection_attempts')
# Check for escalating requests
if self._detect_escalation(session['interactions']):
warnings.append('escalating_requests')
# Check for jailbreak attempt patterns
if self._detect_jailbreak_pattern(session['interactions']):
warnings.append('jailbreak_pattern')
return {
'passed': len(warnings) == 0,
'warnings': warnings,
'session_risk': self._calculate_session_risk(session, warnings)
}Guardrail Orchestration
class GuardrailOrchestrator:
"""Orchestrate all guardrails for LLM interactions."""
def __init__(self, config: dict):
self.input_classifier = InputClassifier(config.get('input', {}))
self.rate_limiter = RateLimiter(config.get('rate_limit', {}))
self.output_validator = OutputValidator(config.get('output', {}))
self.behavioral_guardrails = BehavioralGuardrails(config.get('behavior', {}))
self.audit_logger = AuditLogger()
async def process_request(self, user_input: str, user_context: dict,
llm_client) -> dict:
"""Process a request through all guardrails."""
request_id = str(uuid.uuid4())
session_id = user_context.get('session_id')
user_id = user_context.get('user_id')
# Phase 1: Rate limiting
rate_check = self.rate_limiter.check_rate_limit(
user_id, estimated_tokens=len(user_input.split()) * 2
)
if not rate_check['allowed']:
self.audit_logger.log({
'event': 'rate_limited',
'request_id': request_id,
'user_id': user_id,
'reason': rate_check['reason']
})
return {
'success': False,
'error': 'rate_limited',
'retry_after': rate_check.get('retry_after')
}
# Phase 2: Input classification
input_result = self.input_classifier.classify(user_input, user_context)
if input_result.should_block:
self.audit_logger.log({
'event': 'input_blocked',
'request_id': request_id,
'user_id': user_id,
'category': input_result.category.value,
'rules': input_result.triggered_rules
})
return {
'success': False,
'error': 'input_policy_violation',
'category': input_result.category.value
}
# Use modified input if PII was redacted
processed_input = input_result.modified_input or user_input
# Phase 3: LLM call
try:
llm_response = await llm_client.complete(processed_input, user_context)
except Exception as e:
self.audit_logger.log({
'event': 'llm_error',
'request_id': request_id,
'error': str(e)
})
return {
'success': False,
'error': 'llm_error'
}
# Phase 4: Output validation
output_result = self.output_validator.validate(llm_response, {
'check_factuality': user_context.get('check_factuality', False),
'system_prompt': user_context.get('system_prompt', ''),
'sources': user_context.get('sources', [])
})
if output_result['should_block']:
self.audit_logger.log({
'event': 'output_blocked',
'request_id': request_id,
'validations': output_result['validations']
})
return {
'success': False,
'error': 'output_policy_violation'
}
final_response = output_result.get('modified_response', llm_response)
# Phase 5: Behavioral checks
behavioral_result = self.behavioral_guardrails.check_interaction(
user_input, final_response, {
'session_id': session_id,
'previous_responses': user_context.get('previous_responses', [])
}
)
if not behavioral_result['passed']:
for action in behavioral_result.get('actions', []):
self._execute_action(action, user_context)
# Record usage for rate limiting
self.rate_limiter.record_usage(user_id, len(final_response.split()))
# Comprehensive logging
self.audit_logger.log({
'event': 'request_completed',
'request_id': request_id,
'user_id': user_id,
'input_classification': input_result.category.value,
'output_valid': output_result['valid'],
'behavioral_passed': behavioral_result['passed'],
'response_length': len(final_response)
})
return {
'success': True,
'response': final_response,
'metadata': {
'request_id': request_id,
'input_modified': input_result.modified_input is not None,
'output_modified': output_result.get('modified_response') is not None,
'warnings': behavioral_result.get('warnings', [])
}
}Configuration Management
# guardrails_config.yaml
input:
content_classification:
enabled: true
block_harmful: true
block_pii: false # Redact instead
block_off_topic: false
injection_detection:
enabled: true
sensitivity: high
block_on_detection: true
rate_limit:
enabled: true
limits:
requests_per_minute: 20
requests_per_hour: 100
tokens_per_minute: 40000
tokens_per_hour: 200000
output:
content_policy:
enabled: true
prohibited:
- hate_speech
- violence
- sexual_content
- self_harm
- illegal_activity
brand_guidelines:
enabled: true
tone: professional
prohibited_terms:
- competitor_name
- internal_project_code
disclaimers:
medical: "This is not medical advice. Please consult a healthcare professional."
financial: "This is not financial advice. Please consult a qualified financial advisor."
legal: "This is not legal advice. Please consult a licensed attorney."
behavior:
persona:
no_opinions: true
maintain_role: true
session_monitoring:
enabled: true
max_interactions: 100
abuse_detection: true
audit:
enabled: true
log_inputs: true
log_outputs: false # PII consideration
retention_days: 90Conclusion
Robust guardrails are essential for deploying LLMs safely in production. The key is implementing multiple layers of protection that work together while remaining performant and maintainable.
Key principles:
- Layer your defenses - No single guardrail is sufficient
- Fail secure - Block when uncertain
- Monitor and adapt - New attacks require new defenses
- Balance safety and usability - Overly aggressive filtering frustrates users
- Log comprehensively - You need data to improve
At DeviDevs, we help organizations implement production-grade guardrail systems tailored to their specific use cases and risk profiles. Contact us to discuss your AI safety requirements.