Guardrails are essential for deploying LLM applications safely in production. This guide covers comprehensive guardrail implementation including input validation, output filtering, and real-time content moderation.
Guardrails Architecture
Core Guardrail Framework
# guardrails_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import re
import asyncio
class GuardrailAction(Enum):
ALLOW = "allow"
BLOCK = "block"
MODIFY = "modify"
FLAG = "flag"
ESCALATE = "escalate"
class GuardrailType(Enum):
INPUT = "input"
OUTPUT = "output"
CONTEXT = "context"
@dataclass
class GuardrailResult:
passed: bool
action: GuardrailAction
guardrail_name: str
reason: Optional[str] = None
modified_content: Optional[str] = None
confidence: float = 1.0
metadata: Dict = field(default_factory=dict)
@dataclass
class GuardrailConfig:
name: str
guardrail_type: GuardrailType
enabled: bool
check_function: Callable
action_on_fail: GuardrailAction
priority: int = 50
timeout_ms: int = 5000
class GuardrailsEngine:
"""Core guardrails engine for LLM applications"""
def __init__(self, config: Dict = None):
self.config = config or {}
self.input_guardrails: List[GuardrailConfig] = []
self.output_guardrails: List[GuardrailConfig] = []
self.context_guardrails: List[GuardrailConfig] = []
self.results_log = []
def add_guardrail(self, guardrail: GuardrailConfig):
"""Add a guardrail to the engine"""
if guardrail.guardrail_type == GuardrailType.INPUT:
self.input_guardrails.append(guardrail)
self.input_guardrails.sort(key=lambda g: g.priority)
elif guardrail.guardrail_type == GuardrailType.OUTPUT:
self.output_guardrails.append(guardrail)
self.output_guardrails.sort(key=lambda g: g.priority)
else:
self.context_guardrails.append(guardrail)
self.context_guardrails.sort(key=lambda g: g.priority)
async def check_input(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Run all input guardrails"""
return await self._run_guardrails(
self.input_guardrails, content, context
)
async def check_output(self, content: str, context: Dict = None) -> List[GuardrailResult]:
"""Run all output guardrails"""
return await self._run_guardrails(
self.output_guardrails, content, context
)
async def _run_guardrails(
self,
guardrails: List[GuardrailConfig],
content: str,
context: Dict = None
) -> List[GuardrailResult]:
"""Execute guardrails in priority order"""
results = []
current_content = content
for guardrail in guardrails:
if not guardrail.enabled:
continue
try:
result = await asyncio.wait_for(
guardrail.check_function(current_content, context),
timeout=guardrail.timeout_ms / 1000
)
results.append(result)
# Handle blocking
if not result.passed and guardrail.action_on_fail == GuardrailAction.BLOCK:
break
# Handle modification
if result.modified_content:
current_content = result.modified_content
except asyncio.TimeoutError:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason="Guardrail timeout"
))
except Exception as e:
results.append(GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name=guardrail.name,
reason=f"Guardrail error: {str(e)}"
))
self._log_results(content, results)
return results
def _log_results(self, content: str, results: List[GuardrailResult]):
"""Log guardrail execution results"""
self.results_log.append({
"timestamp": datetime.utcnow().isoformat(),
"content_preview": content[:100],
"results": [
{
"name": r.guardrail_name,
"passed": r.passed,
"action": r.action.value,
"reason": r.reason
}
for r in results
]
})
def get_blocking_result(self, results: List[GuardrailResult]) -> Optional[GuardrailResult]:
"""Get first blocking result if any"""
for result in results:
if not result.passed and result.action == GuardrailAction.BLOCK:
return result
return NoneInput Validation Guardrails
Prompt Injection Detection
# input_guardrails.py
import re
from typing import Dict, Optional
async def check_prompt_injection(content: str, context: Dict = None) -> GuardrailResult:
"""Detect prompt injection attempts"""
injection_patterns = [
# Direct instruction override
r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?)",
r"disregard\s+(all\s+)?(previous|above|prior)",
r"forget\s+(everything|all)\s+(above|before)",
# Role manipulation
r"you\s+are\s+now\s+(a|an|the)",
r"act\s+as\s+(if\s+you\s+are\s+)?(a|an|the)",
r"pretend\s+(to\s+be|you\s+are)",
r"roleplay\s+as",
# System prompt extraction
r"(what|reveal|show|tell)\s+(is|me)\s+(your|the)\s+system\s+prompt",
r"print\s+(your\s+)?instructions",
r"output\s+(your\s+)?system\s+(message|prompt)",
# Jailbreak keywords
r"(DAN|STAN|DUDE)\s*mode",
r"developer\s+mode",
r"jailbreak",
r"bypass\s+(safety|restrictions|filters)",
# Delimiter attacks
r"```\s*(system|assistant|user)\s*\n",
r"\[INST\]|\[/INST\]",
r"<\|im_(start|end)\|>",
# Encoding attempts
r"base64\s*[:=]",
r"decode\s+(this|the\s+following)",
r"execute\s+(this|the\s+following)"
]
content_lower = content.lower()
for pattern in injection_patterns:
if re.search(pattern, content_lower, re.IGNORECASE):
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="prompt_injection_detection",
reason=f"Potential prompt injection detected",
confidence=0.9,
metadata={"pattern_matched": pattern}
)
# Check for suspicious character patterns
if _has_suspicious_encoding(content):
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="prompt_injection_detection",
reason="Suspicious encoding detected",
confidence=0.7
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="prompt_injection_detection"
)
def _has_suspicious_encoding(content: str) -> bool:
"""Check for suspicious encoding patterns"""
# High concentration of special characters
special_ratio = len(re.findall(r'[^\w\s]', content)) / max(len(content), 1)
if special_ratio > 0.3:
return True
# Unicode escape sequences
if re.search(r'\\u[0-9a-fA-F]{4}', content):
return True
# Excessive whitespace manipulation
if re.search(r'\s{10,}', content):
return True
return FalseInput Length and Content Limits
async def check_input_limits(content: str, context: Dict = None) -> GuardrailResult:
"""Enforce input length and content limits"""
config = context.get("limits_config", {}) if context else {}
max_length = config.get("max_length", 10000)
max_lines = config.get("max_lines", 500)
max_words = config.get("max_words", 2000)
issues = []
# Length check
if len(content) > max_length:
issues.append(f"Content exceeds maximum length of {max_length} characters")
# Line count
line_count = content.count('\n') + 1
if line_count > max_lines:
issues.append(f"Content exceeds maximum of {max_lines} lines")
# Word count
word_count = len(content.split())
if word_count > max_words:
issues.append(f"Content exceeds maximum of {max_words} words")
if issues:
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="input_limits",
reason="; ".join(issues),
metadata={
"length": len(content),
"lines": line_count,
"words": word_count
}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="input_limits"
)
async def check_pii_in_input(content: str, context: Dict = None) -> GuardrailResult:
"""Detect and optionally redact PII in input"""
pii_patterns = {
"ssn": r"\b\d{3}-\d{2}-\d{4}\b",
"credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
"email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
"ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
}
found_pii = {}
redacted_content = content
for pii_type, pattern in pii_patterns.items():
matches = re.findall(pattern, content)
if matches:
found_pii[pii_type] = len(matches)
# Redact if configured
if context and context.get("redact_pii", False):
redacted_content = re.sub(
pattern,
f"[REDACTED_{pii_type.upper()}]",
redacted_content
)
if found_pii:
action = GuardrailAction.MODIFY if context and context.get("redact_pii") else GuardrailAction.FLAG
return GuardrailResult(
passed=False,
action=action,
guardrail_name="pii_detection",
reason=f"PII detected: {', '.join(found_pii.keys())}",
modified_content=redacted_content if action == GuardrailAction.MODIFY else None,
metadata={"pii_found": found_pii}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="pii_detection"
)Output Filtering Guardrails
Content Safety Classifier
# output_guardrails.py
from typing import Dict, List
import aiohttp
class ContentSafetyClassifier:
"""ML-based content safety classification"""
CATEGORIES = [
"hate_speech",
"violence",
"self_harm",
"sexual_content",
"harassment",
"dangerous_content"
]
def __init__(self, api_endpoint: str, api_key: str):
self.api_endpoint = api_endpoint
self.api_key = api_key
async def classify(self, content: str) -> Dict:
"""Classify content for safety issues"""
async with aiohttp.ClientSession() as session:
async with session.post(
self.api_endpoint,
headers={"Authorization": f"Bearer {self.api_key}"},
json={"content": content}
) as response:
return await response.json()
async def check_content_safety(content: str, context: Dict = None) -> GuardrailResult:
"""Check output content for safety issues"""
# Keyword-based quick check first
harmful_keywords = {
"violence": ["kill", "murder", "attack", "weapon", "bomb"],
"self_harm": ["suicide", "self-harm", "hurt myself"],
"dangerous": ["how to make", "instructions for", "step by step to create"]
}
flagged_categories = []
for category, keywords in harmful_keywords.items():
content_lower = content.lower()
for keyword in keywords:
if keyword in content_lower:
flagged_categories.append(category)
break
# If keywords found, do deeper analysis
if flagged_categories:
# In production, call ML classifier here
# classifier = ContentSafetyClassifier(...)
# result = await classifier.classify(content)
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="content_safety",
reason=f"Content flagged for: {', '.join(flagged_categories)}",
confidence=0.8,
metadata={"categories": flagged_categories}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="content_safety"
)
async def check_hallucination_indicators(
content: str,
context: Dict = None
) -> GuardrailResult:
"""Detect potential hallucination indicators"""
indicators = []
# Check for confident claims about uncertain things
uncertainty_phrases = [
"I'm not sure",
"I don't have information",
"I cannot verify",
"As of my knowledge cutoff"
]
certainty_phrases = [
"definitely",
"certainly",
"absolutely",
"without a doubt",
"100%"
]
# Contradiction detection
has_uncertainty = any(phrase.lower() in content.lower() for phrase in uncertainty_phrases)
has_certainty = any(phrase.lower() in content.lower() for phrase in certainty_phrases)
if has_uncertainty and has_certainty:
indicators.append("contradictory_certainty")
# Check for fabricated citations
fake_citation_patterns = [
r"according to a \d{4} study",
r"research from \w+ University shows",
r"Dr\. \w+ \w+ stated"
]
for pattern in fake_citation_patterns:
if re.search(pattern, content):
indicators.append("potential_fabricated_citation")
break
# Check for specific numbers that might be hallucinated
if re.search(r"\d{1,2}\.\d{1,2}%", content):
indicators.append("specific_statistics")
if indicators:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="hallucination_detection",
reason=f"Potential hallucination indicators: {', '.join(indicators)}",
confidence=0.6,
metadata={"indicators": indicators}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="hallucination_detection"
)Topic and Scope Enforcement
async def check_topic_adherence(content: str, context: Dict = None) -> GuardrailResult:
"""Ensure response stays within allowed topics"""
allowed_topics = context.get("allowed_topics", []) if context else []
blocked_topics = context.get("blocked_topics", []) if context else []
if not allowed_topics and not blocked_topics:
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="topic_adherence"
)
content_lower = content.lower()
# Check blocked topics
for topic in blocked_topics:
topic_keywords = topic.get("keywords", [])
for keyword in topic_keywords:
if keyword.lower() in content_lower:
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="topic_adherence",
reason=f"Response contains blocked topic: {topic.get('name')}",
metadata={"blocked_topic": topic.get("name")}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="topic_adherence"
)
async def check_response_format(content: str, context: Dict = None) -> GuardrailResult:
"""Validate response follows expected format"""
expected_format = context.get("expected_format") if context else None
if not expected_format:
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)
issues = []
if expected_format == "json":
try:
import json
json.loads(content)
except json.JSONDecodeError as e:
issues.append(f"Invalid JSON: {str(e)}")
elif expected_format == "markdown":
# Basic markdown validation
if not re.search(r'(^#|\*\*|__|```)', content, re.MULTILINE):
issues.append("Expected markdown formatting not found")
elif expected_format == "bullet_list":
if not re.search(r'^[\-\*•]\s', content, re.MULTILINE):
issues.append("Expected bullet list format")
if issues:
return GuardrailResult(
passed=False,
action=GuardrailAction.FLAG,
guardrail_name="response_format",
reason="; ".join(issues),
metadata={"expected_format": expected_format}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="response_format"
)Real-Time Moderation
Streaming Content Moderation
# streaming_moderation.py
from typing import AsyncGenerator, Tuple
class StreamingModerator:
"""Real-time moderation for streaming LLM responses"""
def __init__(self, guardrails: GuardrailsEngine):
self.guardrails = guardrails
self.buffer = ""
self.buffer_size = 100 # Characters to buffer before checking
async def moderate_stream(
self,
token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[Tuple[str, bool], None]:
"""Moderate streaming tokens in real-time"""
async for token in token_stream:
self.buffer += token
# Check when buffer reaches threshold
if len(self.buffer) >= self.buffer_size:
results = await self.guardrails.check_output(self.buffer)
blocking = self.guardrails.get_blocking_result(results)
if blocking:
# Stop streaming and return blocked status
yield ("", False)
return
# Emit buffered content
yield (self.buffer, True)
self.buffer = ""
# Emit remaining buffer
if self.buffer:
results = await self.guardrails.check_output(self.buffer)
blocking = self.guardrails.get_blocking_result(results)
if blocking:
yield ("", False)
else:
yield (self.buffer, True)
class IncrementalModerator:
"""Incremental moderation with lookback"""
def __init__(self, window_size: int = 500):
self.window_size = window_size
self.full_response = ""
async def check_increment(
self,
new_content: str,
context: Dict = None
) -> GuardrailResult:
"""Check new content increment with context window"""
self.full_response += new_content
# Get window of recent content
window_start = max(0, len(self.full_response) - self.window_size)
check_content = self.full_response[window_start:]
# Quick pattern checks on increment
dangerous_patterns = [
r"rm\s+-rf",
r"DROP\s+TABLE",
r"<script>",
r"eval\(",
r"exec\("
]
for pattern in dangerous_patterns:
if re.search(pattern, check_content, re.IGNORECASE):
return GuardrailResult(
passed=False,
action=GuardrailAction.BLOCK,
guardrail_name="incremental_moderation",
reason=f"Dangerous pattern detected",
metadata={"pattern": pattern}
)
return GuardrailResult(
passed=True,
action=GuardrailAction.ALLOW,
guardrail_name="incremental_moderation"
)Integration Example
Complete Guardrails Pipeline
# guardrails_pipeline.py
async def create_guardrailed_completion(
prompt: str,
system_prompt: str,
llm_client,
guardrails: GuardrailsEngine,
context: Dict = None
) -> Dict:
"""Complete LLM call with guardrails"""
# Input guardrails
input_results = await guardrails.check_input(prompt, context)
blocking = guardrails.get_blocking_result(input_results)
if blocking:
return {
"success": False,
"blocked": True,
"stage": "input",
"reason": blocking.reason,
"response": None
}
# Get potentially modified input
modified_prompt = prompt
for result in input_results:
if result.modified_content:
modified_prompt = result.modified_content
# Call LLM
try:
response = await llm_client.generate(
system_prompt=system_prompt,
user_prompt=modified_prompt
)
except Exception as e:
return {
"success": False,
"blocked": False,
"stage": "llm",
"reason": str(e),
"response": None
}
# Output guardrails
output_results = await guardrails.check_output(response, context)
blocking = guardrails.get_blocking_result(output_results)
if blocking:
return {
"success": False,
"blocked": True,
"stage": "output",
"reason": blocking.reason,
"response": None
}
# Get potentially modified output
final_response = response
for result in output_results:
if result.modified_content:
final_response = result.modified_content
# Check for flags that need attention
flags = [r for r in input_results + output_results if r.action == GuardrailAction.FLAG]
return {
"success": True,
"blocked": False,
"response": final_response,
"flags": [{"name": f.guardrail_name, "reason": f.reason} for f in flags],
"input_modified": modified_prompt != prompt,
"output_modified": final_response != response
}
# Usage example
async def main():
# Initialize guardrails engine
guardrails = GuardrailsEngine()
# Add input guardrails
guardrails.add_guardrail(GuardrailConfig(
name="prompt_injection",
guardrail_type=GuardrailType.INPUT,
enabled=True,
check_function=check_prompt_injection,
action_on_fail=GuardrailAction.BLOCK,
priority=10
))
guardrails.add_guardrail(GuardrailConfig(
name="pii_detection",
guardrail_type=GuardrailType.INPUT,
enabled=True,
check_function=check_pii_in_input,
action_on_fail=GuardrailAction.MODIFY,
priority=20
))
# Add output guardrails
guardrails.add_guardrail(GuardrailConfig(
name="content_safety",
guardrail_type=GuardrailType.OUTPUT,
enabled=True,
check_function=check_content_safety,
action_on_fail=GuardrailAction.BLOCK,
priority=10
))
guardrails.add_guardrail(GuardrailConfig(
name="hallucination_check",
guardrail_type=GuardrailType.OUTPUT,
enabled=True,
check_function=check_hallucination_indicators,
action_on_fail=GuardrailAction.FLAG,
priority=20
))
# Use in application
result = await create_guardrailed_completion(
prompt="User question here",
system_prompt="You are a helpful assistant",
llm_client=my_llm_client,
guardrails=guardrails,
context={"redact_pii": True}
)
print(result)Summary
Effective LLM guardrails require:
- Input validation: Block prompt injection and validate content
- Output filtering: Check for harmful content and hallucinations
- Real-time moderation: Handle streaming responses safely
- Layered defense: Multiple guardrails with different priorities
- Configurable actions: Block, modify, flag, or escalate
- Comprehensive logging: Track all guardrail decisions
Deploy guardrails as the first and last line of defense for all LLM interactions to ensure safe, reliable AI applications.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →