AI Security

LLM Guardrails Implementation: Building Safe AI Applications

Nicu Constantin
10 min read
#LLM guardrails#AI safety#content moderation#input validation#AI security

Guardrails are essential for deploying LLM applications safely in production. This guide covers comprehensive guardrail implementation including input validation, output filtering, and real-time content moderation.

Guardrails Architecture

Core Guardrail Framework

# guardrails_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import re
import asyncio
 
class GuardrailAction(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    MODIFY = "modify"
    FLAG = "flag"
    ESCALATE = "escalate"
 
class GuardrailType(Enum):
    INPUT = "input"
    OUTPUT = "output"
    CONTEXT = "context"
 
@dataclass
class GuardrailResult:
    passed: bool
    action: GuardrailAction
    guardrail_name: str
    reason: Optional[str] = None
    modified_content: Optional[str] = None
    confidence: float = 1.0
    metadata: Dict = field(default_factory=dict)
 
@dataclass
class GuardrailConfig:
    name: str
    guardrail_type: GuardrailType
    enabled: bool
    check_function: Callable
    action_on_fail: GuardrailAction
    priority: int = 50
    timeout_ms: int = 5000
 
class GuardrailsEngine:
    """Core guardrails engine for LLM applications"""
 
    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.input_guardrails: List[GuardrailConfig] = []
        self.output_guardrails: List[GuardrailConfig] = []
        self.context_guardrails: List[GuardrailConfig] = []
        self.results_log = []
 
    def add_guardrail(self, guardrail: GuardrailConfig):
        """Add a guardrail to the engine"""
        if guardrail.guardrail_type == GuardrailType.INPUT:
            self.input_guardrails.append(guardrail)
            self.input_guardrails.sort(key=lambda g: g.priority)
        elif guardrail.guardrail_type == GuardrailType.OUTPUT:
            self.output_guardrails.append(guardrail)
            self.output_guardrails.sort(key=lambda g: g.priority)
        else:
            self.context_guardrails.append(guardrail)
            self.context_guardrails.sort(key=lambda g: g.priority)
 
    async def check_input(self, content: str, context: Dict = None) -> List[GuardrailResult]:
        """Run all input guardrails"""
        return await self._run_guardrails(
            self.input_guardrails, content, context
        )
 
    async def check_output(self, content: str, context: Dict = None) -> List[GuardrailResult]:
        """Run all output guardrails"""
        return await self._run_guardrails(
            self.output_guardrails, content, context
        )
 
    async def _run_guardrails(
        self,
        guardrails: List[GuardrailConfig],
        content: str,
        context: Dict = None
    ) -> List[GuardrailResult]:
        """Execute guardrails in priority order"""
        results = []
        current_content = content
 
        for guardrail in guardrails:
            if not guardrail.enabled:
                continue
 
            try:
                result = await asyncio.wait_for(
                    guardrail.check_function(current_content, context),
                    timeout=guardrail.timeout_ms / 1000
                )
 
                results.append(result)
 
                # Handle blocking
                if not result.passed and guardrail.action_on_fail == GuardrailAction.BLOCK:
                    break
 
                # Handle modification
                if result.modified_content:
                    current_content = result.modified_content
 
            except asyncio.TimeoutError:
                results.append(GuardrailResult(
                    passed=False,
                    action=GuardrailAction.FLAG,
                    guardrail_name=guardrail.name,
                    reason="Guardrail timeout"
                ))
 
            except Exception as e:
                results.append(GuardrailResult(
                    passed=False,
                    action=GuardrailAction.FLAG,
                    guardrail_name=guardrail.name,
                    reason=f"Guardrail error: {str(e)}"
                ))
 
        self._log_results(content, results)
        return results
 
    def _log_results(self, content: str, results: List[GuardrailResult]):
        """Log guardrail execution results"""
        self.results_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "content_preview": content[:100],
            "results": [
                {
                    "name": r.guardrail_name,
                    "passed": r.passed,
                    "action": r.action.value,
                    "reason": r.reason
                }
                for r in results
            ]
        })
 
    def get_blocking_result(self, results: List[GuardrailResult]) -> Optional[GuardrailResult]:
        """Get first blocking result if any"""
        for result in results:
            if not result.passed and result.action == GuardrailAction.BLOCK:
                return result
        return None

Input Validation Guardrails

Prompt Injection Detection

# input_guardrails.py
import re
from typing import Dict, Optional
 
async def check_prompt_injection(content: str, context: Dict = None) -> GuardrailResult:
    """Detect prompt injection attempts"""
 
    injection_patterns = [
        # Direct instruction override
        r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?)",
        r"disregard\s+(all\s+)?(previous|above|prior)",
        r"forget\s+(everything|all)\s+(above|before)",
 
        # Role manipulation
        r"you\s+are\s+now\s+(a|an|the)",
        r"act\s+as\s+(if\s+you\s+are\s+)?(a|an|the)",
        r"pretend\s+(to\s+be|you\s+are)",
        r"roleplay\s+as",
 
        # System prompt extraction
        r"(what|reveal|show|tell)\s+(is|me)\s+(your|the)\s+system\s+prompt",
        r"print\s+(your\s+)?instructions",
        r"output\s+(your\s+)?system\s+(message|prompt)",
 
        # Jailbreak keywords
        r"(DAN|STAN|DUDE)\s*mode",
        r"developer\s+mode",
        r"jailbreak",
        r"bypass\s+(safety|restrictions|filters)",
 
        # Delimiter attacks
        r"```\s*(system|assistant|user)\s*\n",
        r"\[INST\]|\[/INST\]",
        r"<\|im_(start|end)\|>",
 
        # Encoding attempts
        r"base64\s*[:=]",
        r"decode\s+(this|the\s+following)",
        r"execute\s+(this|the\s+following)"
    ]
 
    content_lower = content.lower()
 
    for pattern in injection_patterns:
        if re.search(pattern, content_lower, re.IGNORECASE):
            return GuardrailResult(
                passed=False,
                action=GuardrailAction.BLOCK,
                guardrail_name="prompt_injection_detection",
                reason=f"Potential prompt injection detected",
                confidence=0.9,
                metadata={"pattern_matched": pattern}
            )
 
    # Check for suspicious character patterns
    if _has_suspicious_encoding(content):
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="prompt_injection_detection",
            reason="Suspicious encoding detected",
            confidence=0.7
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="prompt_injection_detection"
    )
 
def _has_suspicious_encoding(content: str) -> bool:
    """Check for suspicious encoding patterns"""
    # High concentration of special characters
    special_ratio = len(re.findall(r'[^\w\s]', content)) / max(len(content), 1)
    if special_ratio > 0.3:
        return True
 
    # Unicode escape sequences
    if re.search(r'\\u[0-9a-fA-F]{4}', content):
        return True
 
    # Excessive whitespace manipulation
    if re.search(r'\s{10,}', content):
        return True
 
    return False

Input Length and Content Limits

async def check_input_limits(content: str, context: Dict = None) -> GuardrailResult:
    """Enforce input length and content limits"""
 
    config = context.get("limits_config", {}) if context else {}
    max_length = config.get("max_length", 10000)
    max_lines = config.get("max_lines", 500)
    max_words = config.get("max_words", 2000)
 
    issues = []
 
    # Length check
    if len(content) > max_length:
        issues.append(f"Content exceeds maximum length of {max_length} characters")
 
    # Line count
    line_count = content.count('\n') + 1
    if line_count > max_lines:
        issues.append(f"Content exceeds maximum of {max_lines} lines")
 
    # Word count
    word_count = len(content.split())
    if word_count > max_words:
        issues.append(f"Content exceeds maximum of {max_words} words")
 
    if issues:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.BLOCK,
            guardrail_name="input_limits",
            reason="; ".join(issues),
            metadata={
                "length": len(content),
                "lines": line_count,
                "words": word_count
            }
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="input_limits"
    )
 
 
async def check_pii_in_input(content: str, context: Dict = None) -> GuardrailResult:
    """Detect and optionally redact PII in input"""
 
    pii_patterns = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        "phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
        "ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    }
 
    found_pii = {}
    redacted_content = content
 
    for pii_type, pattern in pii_patterns.items():
        matches = re.findall(pattern, content)
        if matches:
            found_pii[pii_type] = len(matches)
 
            # Redact if configured
            if context and context.get("redact_pii", False):
                redacted_content = re.sub(
                    pattern,
                    f"[REDACTED_{pii_type.upper()}]",
                    redacted_content
                )
 
    if found_pii:
        action = GuardrailAction.MODIFY if context and context.get("redact_pii") else GuardrailAction.FLAG
 
        return GuardrailResult(
            passed=False,
            action=action,
            guardrail_name="pii_detection",
            reason=f"PII detected: {', '.join(found_pii.keys())}",
            modified_content=redacted_content if action == GuardrailAction.MODIFY else None,
            metadata={"pii_found": found_pii}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="pii_detection"
    )

Output Filtering Guardrails

Content Safety Classifier

# output_guardrails.py
from typing import Dict, List
import aiohttp
 
class ContentSafetyClassifier:
    """ML-based content safety classification"""
 
    CATEGORIES = [
        "hate_speech",
        "violence",
        "self_harm",
        "sexual_content",
        "harassment",
        "dangerous_content"
    ]
 
    def __init__(self, api_endpoint: str, api_key: str):
        self.api_endpoint = api_endpoint
        self.api_key = api_key
 
    async def classify(self, content: str) -> Dict:
        """Classify content for safety issues"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.api_endpoint,
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={"content": content}
            ) as response:
                return await response.json()
 
 
async def check_content_safety(content: str, context: Dict = None) -> GuardrailResult:
    """Check output content for safety issues"""
 
    # Keyword-based quick check first
    harmful_keywords = {
        "violence": ["kill", "murder", "attack", "weapon", "bomb"],
        "self_harm": ["suicide", "self-harm", "hurt myself"],
        "dangerous": ["how to make", "instructions for", "step by step to create"]
    }
 
    flagged_categories = []
    for category, keywords in harmful_keywords.items():
        content_lower = content.lower()
        for keyword in keywords:
            if keyword in content_lower:
                flagged_categories.append(category)
                break
 
    # If keywords found, do deeper analysis
    if flagged_categories:
        # In production, call ML classifier here
        # classifier = ContentSafetyClassifier(...)
        # result = await classifier.classify(content)
 
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="content_safety",
            reason=f"Content flagged for: {', '.join(flagged_categories)}",
            confidence=0.8,
            metadata={"categories": flagged_categories}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="content_safety"
    )
 
 
async def check_hallucination_indicators(
    content: str,
    context: Dict = None
) -> GuardrailResult:
    """Detect potential hallucination indicators"""
 
    indicators = []
 
    # Check for confident claims about uncertain things
    uncertainty_phrases = [
        "I'm not sure",
        "I don't have information",
        "I cannot verify",
        "As of my knowledge cutoff"
    ]
 
    certainty_phrases = [
        "definitely",
        "certainly",
        "absolutely",
        "without a doubt",
        "100%"
    ]
 
    # Contradiction detection
    has_uncertainty = any(phrase.lower() in content.lower() for phrase in uncertainty_phrases)
    has_certainty = any(phrase.lower() in content.lower() for phrase in certainty_phrases)
 
    if has_uncertainty and has_certainty:
        indicators.append("contradictory_certainty")
 
    # Check for fabricated citations
    fake_citation_patterns = [
        r"according to a \d{4} study",
        r"research from \w+ University shows",
        r"Dr\. \w+ \w+ stated"
    ]
 
    for pattern in fake_citation_patterns:
        if re.search(pattern, content):
            indicators.append("potential_fabricated_citation")
            break
 
    # Check for specific numbers that might be hallucinated
    if re.search(r"\d{1,2}\.\d{1,2}%", content):
        indicators.append("specific_statistics")
 
    if indicators:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="hallucination_detection",
            reason=f"Potential hallucination indicators: {', '.join(indicators)}",
            confidence=0.6,
            metadata={"indicators": indicators}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="hallucination_detection"
    )

Topic and Scope Enforcement

async def check_topic_adherence(content: str, context: Dict = None) -> GuardrailResult:
    """Ensure response stays within allowed topics"""
 
    allowed_topics = context.get("allowed_topics", []) if context else []
    blocked_topics = context.get("blocked_topics", []) if context else []
 
    if not allowed_topics and not blocked_topics:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="topic_adherence"
        )
 
    content_lower = content.lower()
 
    # Check blocked topics
    for topic in blocked_topics:
        topic_keywords = topic.get("keywords", [])
        for keyword in topic_keywords:
            if keyword.lower() in content_lower:
                return GuardrailResult(
                    passed=False,
                    action=GuardrailAction.BLOCK,
                    guardrail_name="topic_adherence",
                    reason=f"Response contains blocked topic: {topic.get('name')}",
                    metadata={"blocked_topic": topic.get("name")}
                )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="topic_adherence"
    )
 
 
async def check_response_format(content: str, context: Dict = None) -> GuardrailResult:
    """Validate response follows expected format"""
 
    expected_format = context.get("expected_format") if context else None
 
    if not expected_format:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="response_format"
        )
 
    issues = []
 
    if expected_format == "json":
        try:
            import json
            json.loads(content)
        except json.JSONDecodeError as e:
            issues.append(f"Invalid JSON: {str(e)}")
 
    elif expected_format == "markdown":
        # Basic markdown validation
        if not re.search(r'(^#|\*\*|__|```)', content, re.MULTILINE):
            issues.append("Expected markdown formatting not found")
 
    elif expected_format == "bullet_list":
        if not re.search(r'^[\-\*•]\s', content, re.MULTILINE):
            issues.append("Expected bullet list format")
 
    if issues:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="response_format",
            reason="; ".join(issues),
            metadata={"expected_format": expected_format}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="response_format"
    )

Real-Time Moderation

Streaming Content Moderation

# streaming_moderation.py
from typing import AsyncGenerator, Tuple
 
class StreamingModerator:
    """Real-time moderation for streaming LLM responses"""
 
    def __init__(self, guardrails: GuardrailsEngine):
        self.guardrails = guardrails
        self.buffer = ""
        self.buffer_size = 100  # Characters to buffer before checking
 
    async def moderate_stream(
        self,
        token_stream: AsyncGenerator[str, None]
    ) -> AsyncGenerator[Tuple[str, bool], None]:
        """Moderate streaming tokens in real-time"""
 
        async for token in token_stream:
            self.buffer += token
 
            # Check when buffer reaches threshold
            if len(self.buffer) >= self.buffer_size:
                results = await self.guardrails.check_output(self.buffer)
                blocking = self.guardrails.get_blocking_result(results)
 
                if blocking:
                    # Stop streaming and return blocked status
                    yield ("", False)
                    return
 
                # Emit buffered content
                yield (self.buffer, True)
                self.buffer = ""
 
        # Emit remaining buffer
        if self.buffer:
            results = await self.guardrails.check_output(self.buffer)
            blocking = self.guardrails.get_blocking_result(results)
 
            if blocking:
                yield ("", False)
            else:
                yield (self.buffer, True)
 
 
class IncrementalModerator:
    """Incremental moderation with lookback"""
 
    def __init__(self, window_size: int = 500):
        self.window_size = window_size
        self.full_response = ""
 
    async def check_increment(
        self,
        new_content: str,
        context: Dict = None
    ) -> GuardrailResult:
        """Check new content increment with context window"""
 
        self.full_response += new_content
 
        # Get window of recent content
        window_start = max(0, len(self.full_response) - self.window_size)
        check_content = self.full_response[window_start:]
 
        # Quick pattern checks on increment
        dangerous_patterns = [
            r"rm\s+-rf",
            r"DROP\s+TABLE",
            r"<script>",
            r"eval\(",
            r"exec\("
        ]
 
        for pattern in dangerous_patterns:
            if re.search(pattern, check_content, re.IGNORECASE):
                return GuardrailResult(
                    passed=False,
                    action=GuardrailAction.BLOCK,
                    guardrail_name="incremental_moderation",
                    reason=f"Dangerous pattern detected",
                    metadata={"pattern": pattern}
                )
 
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="incremental_moderation"
        )

Integration Example

Complete Guardrails Pipeline

# guardrails_pipeline.py
async def create_guardrailed_completion(
    prompt: str,
    system_prompt: str,
    llm_client,
    guardrails: GuardrailsEngine,
    context: Dict = None
) -> Dict:
    """Complete LLM call with guardrails"""
 
    # Input guardrails
    input_results = await guardrails.check_input(prompt, context)
    blocking = guardrails.get_blocking_result(input_results)
 
    if blocking:
        return {
            "success": False,
            "blocked": True,
            "stage": "input",
            "reason": blocking.reason,
            "response": None
        }
 
    # Get potentially modified input
    modified_prompt = prompt
    for result in input_results:
        if result.modified_content:
            modified_prompt = result.modified_content
 
    # Call LLM
    try:
        response = await llm_client.generate(
            system_prompt=system_prompt,
            user_prompt=modified_prompt
        )
    except Exception as e:
        return {
            "success": False,
            "blocked": False,
            "stage": "llm",
            "reason": str(e),
            "response": None
        }
 
    # Output guardrails
    output_results = await guardrails.check_output(response, context)
    blocking = guardrails.get_blocking_result(output_results)
 
    if blocking:
        return {
            "success": False,
            "blocked": True,
            "stage": "output",
            "reason": blocking.reason,
            "response": None
        }
 
    # Get potentially modified output
    final_response = response
    for result in output_results:
        if result.modified_content:
            final_response = result.modified_content
 
    # Check for flags that need attention
    flags = [r for r in input_results + output_results if r.action == GuardrailAction.FLAG]
 
    return {
        "success": True,
        "blocked": False,
        "response": final_response,
        "flags": [{"name": f.guardrail_name, "reason": f.reason} for f in flags],
        "input_modified": modified_prompt != prompt,
        "output_modified": final_response != response
    }
 
 
# Usage example
async def main():
    # Initialize guardrails engine
    guardrails = GuardrailsEngine()
 
    # Add input guardrails
    guardrails.add_guardrail(GuardrailConfig(
        name="prompt_injection",
        guardrail_type=GuardrailType.INPUT,
        enabled=True,
        check_function=check_prompt_injection,
        action_on_fail=GuardrailAction.BLOCK,
        priority=10
    ))
 
    guardrails.add_guardrail(GuardrailConfig(
        name="pii_detection",
        guardrail_type=GuardrailType.INPUT,
        enabled=True,
        check_function=check_pii_in_input,
        action_on_fail=GuardrailAction.MODIFY,
        priority=20
    ))
 
    # Add output guardrails
    guardrails.add_guardrail(GuardrailConfig(
        name="content_safety",
        guardrail_type=GuardrailType.OUTPUT,
        enabled=True,
        check_function=check_content_safety,
        action_on_fail=GuardrailAction.BLOCK,
        priority=10
    ))
 
    guardrails.add_guardrail(GuardrailConfig(
        name="hallucination_check",
        guardrail_type=GuardrailType.OUTPUT,
        enabled=True,
        check_function=check_hallucination_indicators,
        action_on_fail=GuardrailAction.FLAG,
        priority=20
    ))
 
    # Use in application
    result = await create_guardrailed_completion(
        prompt="User question here",
        system_prompt="You are a helpful assistant",
        llm_client=my_llm_client,
        guardrails=guardrails,
        context={"redact_pii": True}
    )
 
    print(result)

Summary

Effective LLM guardrails require:

  1. Input validation: Block prompt injection and validate content
  2. Output filtering: Check for harmful content and hallucinations
  3. Real-time moderation: Handle streaming responses safely
  4. Layered defense: Multiple guardrails with different priorities
  5. Configurable actions: Block, modify, flag, or escalate
  6. Comprehensive logging: Track all guardrail decisions

Deploy guardrails as the first and last line of defense for all LLM interactions to ensure safe, reliable AI applications.


Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →

Need help with EU AI Act compliance or AI security?

Book a free 30-minute consultation. No commitment.

Book a Call

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.