Guardrails are essential for deploying LLM applications safely in production. This guide covers comprehensive guardrail implementation including input validation, output filtering, and real-time content moderation.
Guardrails Architecture
Core Guardrail Framework
# guardrails_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import re
import asyncio
class GuardrailAction(Enum):
ALLOW = "allow"
BLOCK = "block"
MODIFY = "modify"
FLAG = "flag"
ESCALATE = "escalate"
class GuardrailType(Enum):
INPUT = "input"
OUTPUT = "output"
CONTEXT = "context"
@dataclass
class GuardrailResult:
passed: bool
action: GuardrailAction
guardrail_name: str
reason: Optional[str] = None
modified_content: Optional[str] = None
confidence: float = 1.0
metadata: Dict = field(default_factory=dict)
@dataclass
class GuardrailConfig:
name: str
guardrail_type: GuardrailType
enabled: bool
check_function: Callable
action_on_fail: GuardrailAction
priority: int = 50
timeout_ms: int = 5000
class GuardrailsEngine:
"""Core guardrails engine for LLM applications"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.input_guardrails: List[GuardrailConfig] = []
self.output_guardrails: List[GuardrailConfig] = []
self.context_guardrails: List[GuardrailConfig] = []
self.results_log = []
def add_guardrail(self, guardrail: GuardrailConfig):
"""Add a guardrail to the engine"""
if guardrail.guardrail_type == GuardrailType.INPUT:
self.input_guardrails.append(guardrail)
self.input_guardrails.sort(key=lambda g: g.priority)
elif guardrail.guardrail_type == GuardrailType.OUTPUT:
self.output_guardrails.append(guardrail)
self.output_guardrails.sort(key=lambda g: g.priority)
else:
self.context_guardrails.append(guardrail)
self.context_guardrails.sort(key=lambda g: g.priority)
async def check_input(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Run all input guardrails"""
return await self._run_guardrails(
self.input_guardrails, content, context
)
async def check_output(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Run all output guardrails"""
return await self._run_guardrails(
self.output_guardrails, content, context
)
async def _run_guardrails(
self,
guardrails: List[GuardrailConfig],
content: str,
context: Dict = None
) -> List[GuardrailResult]:
"""Execute guardrails in priority order"""
results = []
current_content = content
for guardrail in guardrails:
if not guardrail.enabled:
continue
try:
result = await asyncio.wait_for(
guardrail.check_function(current_content, context),
timeout=guardrail.timeout_ms / 1000
)
results.append(result)
# Handle blocking
if not result.passed and guardrail.action_on_fail == GuardrailAction.BLOCK:
break
# Handle modification
if result.modified_content:
current_content = result.modified_content
except asyncio.TimeoutError:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason="Guardrail timeout"
))
except Exception as e:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason=f"Guardrail error: {str(e)}"
))
self._log_results(content, results)
return results
def _log_results(self, content: str, results: List[GuardrailResult]):
"""Log guardrail execution results"""
self.results_log.append({
"timestamp": datetime.utcnow().isoformat(),
"content_preview": content[:100],
"results": [
{
"name": r.guardrail_name,
"passed": r.passed,
"action": r.action.value,
"reason": r.reason
}
for r in results
]
})
def get_blocking_result(self, results: List[GuardrailResult]) -> Optional[GuardrailResult]:
"""Get first blocking result if any"""
for result in results:
if not result.passed and result.action == GuardrailAction.BLOCK:
return result
return NoneInput Validation Guardrails
Prompt Injection Detection
# input_guardrails.py
import re
from typing import Dict, Optional
async def check_prompt_injection(content: str, context: Dict = None) -> GuardrailResult:
"""Detect prompt injection attempts"""
injection_patterns = [
# Direct instruction override
r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?)",
r"disregard\s+(all\s+)?(previous|above|prior)",
r"forget\s+(everything|all)\s+(above|before)",
# Role manipulation
r"you\s+are\s+now\s+(a|an|the)",
r"act\s+as\s+(if\s+you\s+are\s+)?(a|an|the)",
r"pretend\s+(to\s+be|you\s+are)",
r"roleplay\s+as",
# System prompt extraction
r"(what|reveal|show|tell)\s+(is|me)\s+(your|the)\s+system\s+prompt",
r"print\s+(your\s+)?instructions",
r"output\s+(your\s+)?system\s+(message|prompt)",
# Jailbreak keywords
r"(DAN|STAN|DUDE)\s*mode",
r"developer\s+mode",
r"jailbreak",
r"bypass\s+(safety|restrictions|filters)",
# Delimiter attacks
r"```\s*(system|assistant|user)\s*\n",
r"\[INST\]|\[/INST\]",
r"<\|im_(start|end)\|>",
# Encoding attempts
r"base64\s*[:=]",
r"decode\s+(this|the\s+following)",
r"execute\s+(this|the\s+following)"
]
content_lower = content.lower()
for pattern in injection_patterns:
if re.search(pattern, content_lower, re.IGNORECASE):
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="prompt_injection_detection",
reason=f"Potential prompt injection detected",
confidence=0.9,
metadata={"pattern_matched": pattern}
)
# Check for suspicious character patterns
if _has_suspicious_encoding(content):
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="prompt_injection_detection",
reason="Suspicious encoding detected",
confidence=0.7
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="prompt_injection_detection"
)
def _has_suspicious_encoding(content: str) -> bool:
"""Check for suspicious encoding patterns"""
# High concentration of special characters
special_ratio = len(re.findall(r'[^\w\s]', content)) / max(len(content), 1)
if special_ratio > 0.3:
return True
# Unicode escape sequences
if re.search(r'\\u[0-9a-fA-F]{4}', content):
return True
# Excessive whitespace manipulation
if re.search(r'\s{10,}', content):
return True
return FalseInput Length and Content Limits
async def check_input_limits(content: str, context: Dict = None) -> GuardrailResult:
"""Enforce input length and content limits"""
config = context.get("limits_config", {}) if context else {}
max_length = config.get("max_length", 10000)
max_lines = config.get("max_lines", 500)
max_words = config.get("max_words", 2000)
issues = []
# Length check
if len(content) > max_length:
issues.append(f"Content exceeds maximum length of {max_length} characters")
# Line count
line_count = content.count('\n') + 1
if line_count > max_lines:
issues.append(f"Content exceeds maximum of {max_lines} lines")
# Word count
word_count = len(content.split())
if word_count > max_words:
issues.append(f"Content exceeds maximum of {max_words} words")
if issues:
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="input_limits",
reason="; ".join(issues),
metadata={
"length": len(content),
"lines": line_count,
"words": word_count
}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="input_limits"
)
async def check_pii_in_input(content: str, context: Dict = None) -> GuardrailResult:
"""Detect and optionally redact PII in input"""
pii_patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
"ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
}
found_pii = {}
redacted_content = content
for pii_type, pattern in pii_patterns.items():
matches = re.findall(pattern, content)
if matches:
found_pii[pii_type] = len(matches)
# Redact if configured
if context and context.get("redact_pii", False):
redacted_content = re.sub(
pattern,
f"[REDACTED_{pii_type.upper()}]",
redacted_content
)
if found_pii:
action = GuardrailAction.MODIFY if context and context.get("redact_pii") else GuardrailAction.FLAG
return GuardrailResult(
passed=False,
action=action,
guardrail_name="pii_detection",
reason=f"PII detected: {', '.join(found_pii.keys())}",
modified_content=redacted_content if action == GuardrailAction.MODIFY else None,
metadata={"pii_found": found_pii}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="pii_detection"
)Output Filtering Guardrails
Content Safety Classifier
# output_guardrails.py
from typing import Dict, List
import aiohttp
class ContentSafetyClassifier:
"""ML-based content safety classification"""
CATEGORIES = [
"hate_speech",
"violence",
"self_harm",
"sexual_content",
"harassment",
"dangerous_content"
]
def __init__(self, api_endpoint: str, api_key: str):
self.api_endpoint = api_endpoint
self.api_key = api_key
async def classify(self, content: str) -> Dict:
"""Classify content for safety issues"""
async with aiohttp.ClientSession() as session:
async with session.post(
self.api_endpoint,
headers={"Authorization": f"Bearer {self.api_key}"},
json={"content": content}
) as response:
return await response.json()
async def check_content_safety(content: str, context: Dict = None) -> GuardrailResult:
"""Check output content for safety issues"""
# Keyword-based quick check first
harmful_keywords = {
"violence": ["kill", "murder", "attack", "weapon", "bomb"],
"self_harm": ["suicide", "self-harm", "hurt myself"],
"dangerous": ["how to make", "instructions for", "step by step to create"]
}
flagged_categories = []
for category, keywords in harmful_keywords.items():
content_lower = content.lower()
for keyword in keywords:
if keyword in content_lower:
flagged_categories.append(category)
break
# If keywords found, do deeper analysis
if flagged_categories:
# In production, call ML classifier here
# classifier = ContentSafetyClassifier(...)
# result = await classifier.classify(content)
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="content_safety",
reason=f"Content flagged for: {', '.join(flagged_categories)}",
confidence=0.8,
metadata={"categories": flagged_categories}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="content_safety"
)
async def check_hallucination_indicators(
content: str,
context: Dict = None
) -> GuardrailResult:
"""Detect potential hallucination indicators"""
indicators = []
# Check for confident claims about uncertain things
uncertainty_phrases = [
"I'm not sure",
"I don't have information",
"I cannot verify",
"As of my knowledge cutoff"
]
certainty_phrases = [
"definitely",
"certainly",
"absolutely",
"without a doubt",
"100%"
]
# Contradiction detection
has_uncertainty = any(phrase.lower() in content.lower() for phrase in uncertainty_phrases)
has_certainty = any(phrase.lower() in content.lower() for phrase in certainty_phrases)
if has_uncertainty and has_certainty:
indicators.append("contradictory_certainty")
# Check for fabricated citations
fake_citation_patterns = [
r"according to a \d{4} study",
r"research from \w+ University shows",
r"Dr\. \w+ \w+ stated"
]
for pattern in fake_citation_patterns:
if re.search(pattern, content):
indicators.append("potential_fabricated_citation")
break
# Check for specific numbers that might be hallucinated
if re.search(r"\d{1,2}\.\d{1,2}%", content):
indicators.append("specific_statistics")
if indicators:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="hallucination_detection",
reason=f"Potential hallucination indicators: {', '.join(indicators)}",
confidence=0.6,
metadata={"indicators": indicators}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="hallucination_detection"
)Topic and Scope Enforcement
async def check_topic_adherence(content: str, context: Dict = None) -> GuardrailResult:
"""Ensure response stays within allowed topics"""
allowed_topics = context.get("allowed_topics", []) if context else []
blocked_topics = context.get("blocked_topics", []) if context else []
if not allowed_topics and not blocked_topics:
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="topic_adherence"
)
content_lower = content.lower()
# Check blocked topics
for topic in blocked_topics:
topic_keywords = topic.get("keywords", [])
for keyword in topic_keywords:
if keyword.lower() in content_lower:
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="topic_adherence",
reason=f"Response contains blocked topic: {topic.get('name')}",
metadata={"blocked_topic": topic.get("name")}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="topic_adherence"
)
async def check_response_format(content: str, context: Dict = None) -> GuardrailResult:
"""Validate response follows expected format"""
expected_format = context.get("expected_format") if context else None
if not expected_format:
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)
issues = []
if expected_format == "json":
try:
import json
json.loads(content)
except json.JSONDecodeError as e:
issues.append(f"Invalid JSON: {str(e)}")
elif expected_format == "markdown":
# Basic markdown validation
if not re.search(r'(^#|\*\*|__|```)', content, re.MULTILINE):
issues.append("Expected markdown formatting not found")
elif expected_format == "bullet_list":
if not re.search(r'^[\-\*•]\s', content, re.MULTILINE):
issues.append("Expected bullet list format")
if issues:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="response_format",
reason="; ".join(issues),
metadata={"expected_format": expected_format}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)Real-Time Moderation
Streaming Content Moderation
# streaming_moderation.py
from typing import AsyncGenerator, Tuple
class StreamingModerator:
"""Real-time moderation for streaming LLM responses"""
def __init__(self, guardrails: GuardrailsEngine):
self.guardrails = guardrails
self.buffer = ""
self.buffer_size = 100 # Characters to buffer before checking
async def moderate_stream(
self,
token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[Tuple[str, bool], None]:
"""Moderate streaming tokens in real-time"""
async for token in token_stream:
self.buffer += token
# Check when buffer reaches threshold
if len(self.buffer) >= self.buffer_size:
results = await self.guardrails.check_output(self.buffer)
blocking = self.guardrails.get_blocking_result(results)
if blocking:
# Stop streaming and return blocked status
yield ("", False)
return
# Emit buffered content
yield (self.buffer, True)
self.buffer = ""
# Emit remaining buffer
if self.buffer:
results = await self.guardrails.check_output(self.buffer)
blocking = self.guardrails.get_blocking_result(results)
if blocking:
yield ("", False)
else:
yield (self.buffer, True)
class IncrementalModerator:
"""Incremental moderation with lookback"""
def __init__(self, window_size: int = 500):
self.window_size = window_size
self.full_response = ""
async def check_increment(
self,
new_content: str,
context: Dict = None
) -> GuardrailResult:
"""Check new content increment with context window"""
self.full_response += new_content
# Get window of recent content
window_start = max(0, len(self.full_response) - self.window_size)
check_content = self.full_response[window_start:]
# Quick pattern checks on increment
dangerous_patterns = [
r"rm\s+-rf",
r"DROP\s+TABLE",
r"<script>",
r"eval\(",
r"exec\("
]
for pattern in dangerous_patterns:
if re.search(pattern, check_content, re.IGNORECASE):
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="incremental_moderation",
reason=f"Dangerous pattern detected",
metadata={"pattern": pattern}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="incremental_moderation"
)Integration Example
Complete Guardrails Pipeline
# guardrails_pipeline.py
async def create_guardrailed_completion(
prompt: str,
system_prompt: str,
llm_client,
guardrails: GuardrailsEngine,
context: Dict = None
) -> Dict:
"""Complete LLM call with guardrails"""
# Input guardrails
input_results = await guardrails.check_input(prompt, context)
blocking = guardrails.get_blocking_result(input_results)
if blocking:
return {
"success": False,
"blocked": True,
"stage": "input",
"reason": blocking.reason,
"response": None
}
# Get potentially modified input
modified_prompt = prompt
for result in input_results:
if result.modified_content:
modified_prompt = result.modified_content
# Call LLM
try:
response = await llm_client.generate(
system_prompt=system_prompt,
user_prompt=modified_prompt
)
except Exception as e:
return {
"success": False,
"blocked": False,
"stage": "llm",
"reason": str(e),
"response": None
}
# Output guardrails
output_results = await guardrails.check_output(response, context)
blocking = guardrails.get_blocking_result(output_results)
if blocking:
return {
"success": False,
"blocked": True,
"stage": "output",
"reason": blocking.reason,
"response": None
}
# Get potentially modified output
final_response = response
for result in output_results:
if result.modified_content:
final_response = result.modified_content
# Check for flags that need attention
flags = [r for r in input_results + output_results if r.action == GuardrailAction.FLAG]
return {
"success": True,
"blocked": False,
"response": final_response,
"flags": [{"name": f.guardrail_name, "reason": f.reason} for f in flags],
"input_modified": modified_prompt != prompt,
"output_modified": final_response != response
}
# Usage example
async def main():
# Initialize guardrails engine
guardrails = GuardrailsEngine()
# Add input guardrails
guardrails.add_guardrail(GuardrailConfig(
name="prompt_injection",
guardrail_type=GuardrailType.INPUT,
enabled=True,
check_function=check_prompt_injection,
action_on_fail=GuardrailAction.BLOCK,
priority=10
))
guardrails.add_guardrail(GuardrailConfig(
name="pii_detection",
guardrail_type=GuardrailType.INPUT,
enabled=True,
check_function=check_pii_in_input,
action_on_fail=GuardrailAction.MODIFY,
priority=20
))
# Add output guardrails
guardrails.add_guardrail(GuardrailConfig(
name="content_safety",
guardrail_type=GuardrailType.OUTPUT,
enabled=True,
check_function=check_content_safety,
action_on_fail=GuardrailAction.BLOCK,
priority=10
))
guardrails.add_guardrail(GuardrailConfig(
name="hallucination_check",
guardrail_type=GuardrailType.OUTPUT,
enabled=True,
check_function=check_hallucination_indicators,
action_on_fail=GuardrailAction.FLAG,
priority=20
))
# Use in application
result = await create_guardrailed_completion(
prompt="User question here",
system_prompt="You are a helpful assistant",
llm_client=my_llm_client,
guardrails=guardrails,
context={"redact_pii": True}
)
print(result)Summary
Effective LLM guardrails require:
- Input validation: Block prompt injection and validate content
- Output filtering: Check for harmful content and hallucinations
- Real-time moderation: Handle streaming responses safely
- Layered defense: Multiple guardrails with different priorities
- Configurable actions: Block, modify, flag, or escalate
- Comprehensive logging: Track all guardrail decisions
Deploy guardrails as the first and last line of defense for all LLM interactions to ensure safe, reliable AI applications.