AI Security

LLM Guardrails Implementation: Building Safe AI Applications

DeviDevs Team
10 min read
#LLM guardrails#AI safety#content moderation#input validation#AI security

Guardrails are essential for deploying LLM applications safely in production. This guide covers comprehensive guardrail implementation including input validation, output filtering, and real-time content moderation.

Guardrails Architecture

Core Guardrail Framework

# guardrails_framework.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
import re
import asyncio
 
class GuardrailAction(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    MODIFY = "modify"
    FLAG = "flag"
    ESCALATE = "escalate"
 
class GuardrailType(Enum):
    INPUT = "input"
    OUTPUT = "output"
    CONTEXT = "context"
 
@dataclass
class GuardrailResult:
    passed: bool
    action: GuardrailAction
    guardrail_name: str
    reason: Optional[str] = None
    modified_content: Optional[str] = None
    confidence: float = 1.0
    metadata: Dict = field(default_factory=dict)
 
@dataclass
class GuardrailConfig:
    name: str
    guardrail_type: GuardrailType
    enabled: bool
    check_function: Callable
    action_on_fail: GuardrailAction
    priority: int = 50
    timeout_ms: int = 5000
 
class GuardrailsEngine:
    """Core guardrails engine for LLM applications"""
 
    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.input_guardrails: List[GuardrailConfig] = []
        self.output_guardrails: List[GuardrailConfig] = []
        self.context_guardrails: List[GuardrailConfig] = []
        self.results_log = []
 
    def add_guardrail(self, guardrail: GuardrailConfig):
        """Add a guardrail to the engine"""
        if guardrail.guardrail_type == GuardrailType.INPUT:
            self.input_guardrails.append(guardrail)
            self.input_guardrails.sort(key=lambda g: g.priority)
        elif guardrail.guardrail_type == GuardrailType.OUTPUT:
            self.output_guardrails.append(guardrail)
            self.output_guardrails.sort(key=lambda g: g.priority)
        else:
            self.context_guardrails.append(guardrail)
            self.context_guardrails.sort(key=lambda g: g.priority)
 
    async def check_input(self, content: str, context: Dict = None) -> List[GuardrailResult]:
        """Run all input guardrails"""
        return await self._run_guardrails(
            self.input_guardrails, content, context
        )
 
    async def check_output(self, content: str, context: Dict = None) -> List[GuardrailResult]:
        """Run all output guardrails"""
        return await self._run_guardrails(
            self.output_guardrails, content, context
        )
 
    async def _run_guardrails(
        self,
        guardrails: List[GuardrailConfig],
        content: str,
        context: Dict = None
    ) -> List[GuardrailResult]:
        """Execute guardrails in priority order"""
        results = []
        current_content = content
 
        for guardrail in guardrails:
            if not guardrail.enabled:
                continue
 
            try:
                result = await asyncio.wait_for(
                    guardrail.check_function(current_content, context),
                    timeout=guardrail.timeout_ms / 1000
                )
 
                results.append(result)
 
                # Handle blocking
                if not result.passed and guardrail.action_on_fail == GuardrailAction.BLOCK:
                    break
 
                # Handle modification
                if result.modified_content:
                    current_content = result.modified_content
 
            except asyncio.TimeoutError:
                results.append(GuardrailResult(
                    passed=False,
                    action=GuardrailAction.FLAG,
                    guardrail_name=guardrail.name,
                    reason="Guardrail timeout"
                ))
 
            except Exception as e:
                results.append(GuardrailResult(
                    passed=False,
                    action=GuardrailAction.FLAG,
                    guardrail_name=guardrail.name,
                    reason=f"Guardrail error: {str(e)}"
                ))
 
        self._log_results(content, results)
        return results
 
    def _log_results(self, content: str, results: List[GuardrailResult]):
        """Log guardrail execution results"""
        self.results_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "content_preview": content[:100],
            "results": [
                {
                    "name": r.guardrail_name,
                    "passed": r.passed,
                    "action": r.action.value,
                    "reason": r.reason
                }
                for r in results
            ]
        })
 
    def get_blocking_result(self, results: List[GuardrailResult]) -> Optional[GuardrailResult]:
        """Get first blocking result if any"""
        for result in results:
            if not result.passed and result.action == GuardrailAction.BLOCK:
                return result
        return None

Input Validation Guardrails

Prompt Injection Detection

# input_guardrails.py
import re
from typing import Dict, Optional
 
async def check_prompt_injection(content: str, context: Dict = None) -> GuardrailResult:
    """Detect prompt injection attempts"""
 
    injection_patterns = [
        # Direct instruction override
        r"ignore\s+(all\s+)?(previous|above|prior)\s+(instructions?|prompts?)",
        r"disregard\s+(all\s+)?(previous|above|prior)",
        r"forget\s+(everything|all)\s+(above|before)",
 
        # Role manipulation
        r"you\s+are\s+now\s+(a|an|the)",
        r"act\s+as\s+(if\s+you\s+are\s+)?(a|an|the)",
        r"pretend\s+(to\s+be|you\s+are)",
        r"roleplay\s+as",
 
        # System prompt extraction
        r"(what|reveal|show|tell)\s+(is|me)\s+(your|the)\s+system\s+prompt",
        r"print\s+(your\s+)?instructions",
        r"output\s+(your\s+)?system\s+(message|prompt)",
 
        # Jailbreak keywords
        r"(DAN|STAN|DUDE)\s*mode",
        r"developer\s+mode",
        r"jailbreak",
        r"bypass\s+(safety|restrictions|filters)",
 
        # Delimiter attacks
        r"```\s*(system|assistant|user)\s*\n",
        r"\[INST\]|\[/INST\]",
        r"<\|im_(start|end)\|>",
 
        # Encoding attempts
        r"base64\s*[:=]",
        r"decode\s+(this|the\s+following)",
        r"execute\s+(this|the\s+following)"
    ]
 
    content_lower = content.lower()
 
    for pattern in injection_patterns:
        if re.search(pattern, content_lower, re.IGNORECASE):
            return GuardrailResult(
                passed=False,
                action=GuardrailAction.BLOCK,
                guardrail_name="prompt_injection_detection",
                reason=f"Potential prompt injection detected",
                confidence=0.9,
                metadata={"pattern_matched": pattern}
            )
 
    # Check for suspicious character patterns
    if _has_suspicious_encoding(content):
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="prompt_injection_detection",
            reason="Suspicious encoding detected",
            confidence=0.7
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="prompt_injection_detection"
    )
 
def _has_suspicious_encoding(content: str) -> bool:
    """Check for suspicious encoding patterns"""
    # High concentration of special characters
    special_ratio = len(re.findall(r'[^\w\s]', content)) / max(len(content), 1)
    if special_ratio > 0.3:
        return True
 
    # Unicode escape sequences
    if re.search(r'\\u[0-9a-fA-F]{4}', content):
        return True
 
    # Excessive whitespace manipulation
    if re.search(r'\s{10,}', content):
        return True
 
    return False

Input Length and Content Limits

async def check_input_limits(content: str, context: Dict = None) -> GuardrailResult:
    """Enforce input length and content limits"""
 
    config = context.get("limits_config", {}) if context else {}
    max_length = config.get("max_length", 10000)
    max_lines = config.get("max_lines", 500)
    max_words = config.get("max_words", 2000)
 
    issues = []
 
    # Length check
    if len(content) > max_length:
        issues.append(f"Content exceeds maximum length of {max_length} characters")
 
    # Line count
    line_count = content.count('\n') + 1
    if line_count > max_lines:
        issues.append(f"Content exceeds maximum of {max_lines} lines")
 
    # Word count
    word_count = len(content.split())
    if word_count > max_words:
        issues.append(f"Content exceeds maximum of {max_words} words")
 
    if issues:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.BLOCK,
            guardrail_name="input_limits",
            reason="; ".join(issues),
            metadata={
                "length": len(content),
                "lines": line_count,
                "words": word_count
            }
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="input_limits"
    )
 
 
async def check_pii_in_input(content: str, context: Dict = None) -> GuardrailResult:
    """Detect and optionally redact PII in input"""
 
    pii_patterns = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        "phone": r"\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b",
        "ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b"
    }
 
    found_pii = {}
    redacted_content = content
 
    for pii_type, pattern in pii_patterns.items():
        matches = re.findall(pattern, content)
        if matches:
            found_pii[pii_type] = len(matches)
 
            # Redact if configured
            if context and context.get("redact_pii", False):
                redacted_content = re.sub(
                    pattern,
                    f"[REDACTED_{pii_type.upper()}]",
                    redacted_content
                )
 
    if found_pii:
        action = GuardrailAction.MODIFY if context and context.get("redact_pii") else GuardrailAction.FLAG
 
        return GuardrailResult(
            passed=False,
            action=action,
            guardrail_name="pii_detection",
            reason=f"PII detected: {', '.join(found_pii.keys())}",
            modified_content=redacted_content if action == GuardrailAction.MODIFY else None,
            metadata={"pii_found": found_pii}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="pii_detection"
    )

Output Filtering Guardrails

Content Safety Classifier

# output_guardrails.py
from typing import Dict, List
import aiohttp
 
class ContentSafetyClassifier:
    """ML-based content safety classification"""
 
    CATEGORIES = [
        "hate_speech",
        "violence",
        "self_harm",
        "sexual_content",
        "harassment",
        "dangerous_content"
    ]
 
    def __init__(self, api_endpoint: str, api_key: str):
        self.api_endpoint = api_endpoint
        self.api_key = api_key
 
    async def classify(self, content: str) -> Dict:
        """Classify content for safety issues"""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                self.api_endpoint,
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={"content": content}
            ) as response:
                return await response.json()
 
 
async def check_content_safety(content: str, context: Dict = None) -> GuardrailResult:
    """Check output content for safety issues"""
 
    # Keyword-based quick check first
    harmful_keywords = {
        "violence": ["kill", "murder", "attack", "weapon", "bomb"],
        "self_harm": ["suicide", "self-harm", "hurt myself"],
        "dangerous": ["how to make", "instructions for", "step by step to create"]
    }
 
    flagged_categories = []
    for category, keywords in harmful_keywords.items():
        content_lower = content.lower()
        for keyword in keywords:
            if keyword in content_lower:
                flagged_categories.append(category)
                break
 
    # If keywords found, do deeper analysis
    if flagged_categories:
        # In production, call ML classifier here
        # classifier = ContentSafetyClassifier(...)
        # result = await classifier.classify(content)
 
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="content_safety",
            reason=f"Content flagged for: {', '.join(flagged_categories)}",
            confidence=0.8,
            metadata={"categories": flagged_categories}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="content_safety"
    )
 
 
async def check_hallucination_indicators(
    content: str,
    context: Dict = None
) -> GuardrailResult:
    """Detect potential hallucination indicators"""
 
    indicators = []
 
    # Check for confident claims about uncertain things
    uncertainty_phrases = [
        "I'm not sure",
        "I don't have information",
        "I cannot verify",
        "As of my knowledge cutoff"
    ]
 
    certainty_phrases = [
        "definitely",
        "certainly",
        "absolutely",
        "without a doubt",
        "100%"
    ]
 
    # Contradiction detection
    has_uncertainty = any(phrase.lower() in content.lower() for phrase in uncertainty_phrases)
    has_certainty = any(phrase.lower() in content.lower() for phrase in certainty_phrases)
 
    if has_uncertainty and has_certainty:
        indicators.append("contradictory_certainty")
 
    # Check for fabricated citations
    fake_citation_patterns = [
        r"according to a \d{4} study",
        r"research from \w+ University shows",
        r"Dr\. \w+ \w+ stated"
    ]
 
    for pattern in fake_citation_patterns:
        if re.search(pattern, content):
            indicators.append("potential_fabricated_citation")
            break
 
    # Check for specific numbers that might be hallucinated
    if re.search(r"\d{1,2}\.\d{1,2}%", content):
        indicators.append("specific_statistics")
 
    if indicators:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="hallucination_detection",
            reason=f"Potential hallucination indicators: {', '.join(indicators)}",
            confidence=0.6,
            metadata={"indicators": indicators}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="hallucination_detection"
    )

Topic and Scope Enforcement

async def check_topic_adherence(content: str, context: Dict = None) -> GuardrailResult:
    """Ensure response stays within allowed topics"""
 
    allowed_topics = context.get("allowed_topics", []) if context else []
    blocked_topics = context.get("blocked_topics", []) if context else []
 
    if not allowed_topics and not blocked_topics:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="topic_adherence"
        )
 
    content_lower = content.lower()
 
    # Check blocked topics
    for topic in blocked_topics:
        topic_keywords = topic.get("keywords", [])
        for keyword in topic_keywords:
            if keyword.lower() in content_lower:
                return GuardrailResult(
                    passed=False,
                    action=GuardrailAction.BLOCK,
                    guardrail_name="topic_adherence",
                    reason=f"Response contains blocked topic: {topic.get('name')}",
                    metadata={"blocked_topic": topic.get("name")}
                )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="topic_adherence"
    )
 
 
async def check_response_format(content: str, context: Dict = None) -> GuardrailResult:
    """Validate response follows expected format"""
 
    expected_format = context.get("expected_format") if context else None
 
    if not expected_format:
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="response_format"
        )
 
    issues = []
 
    if expected_format == "json":
        try:
            import json
            json.loads(content)
        except json.JSONDecodeError as e:
            issues.append(f"Invalid JSON: {str(e)}")
 
    elif expected_format == "markdown":
        # Basic markdown validation
        if not re.search(r'(^#|\*\*|__|```)', content, re.MULTILINE):
            issues.append("Expected markdown formatting not found")
 
    elif expected_format == "bullet_list":
        if not re.search(r'^[\-\*•]\s', content, re.MULTILINE):
            issues.append("Expected bullet list format")
 
    if issues:
        return GuardrailResult(
            passed=False,
            action=GuardrailAction.FLAG,
            guardrail_name="response_format",
            reason="; ".join(issues),
            metadata={"expected_format": expected_format}
        )
 
    return GuardrailResult(
        passed=True,
        action=GuardrailAction.ALLOW,
        guardrail_name="response_format"
    )

Real-Time Moderation

Streaming Content Moderation

# streaming_moderation.py
from typing import AsyncGenerator, Tuple
 
class StreamingModerator:
    """Real-time moderation for streaming LLM responses"""
 
    def __init__(self, guardrails: GuardrailsEngine):
        self.guardrails = guardrails
        self.buffer = ""
        self.buffer_size = 100  # Characters to buffer before checking
 
    async def moderate_stream(
        self,
        token_stream: AsyncGenerator[str, None]
    ) -> AsyncGenerator[Tuple[str, bool], None]:
        """Moderate streaming tokens in real-time"""
 
        async for token in token_stream:
            self.buffer += token
 
            # Check when buffer reaches threshold
            if len(self.buffer) >= self.buffer_size:
                results = await self.guardrails.check_output(self.buffer)
                blocking = self.guardrails.get_blocking_result(results)
 
                if blocking:
                    # Stop streaming and return blocked status
                    yield ("", False)
                    return
 
                # Emit buffered content
                yield (self.buffer, True)
                self.buffer = ""
 
        # Emit remaining buffer
        if self.buffer:
            results = await self.guardrails.check_output(self.buffer)
            blocking = self.guardrails.get_blocking_result(results)
 
            if blocking:
                yield ("", False)
            else:
                yield (self.buffer, True)
 
 
class IncrementalModerator:
    """Incremental moderation with lookback"""
 
    def __init__(self, window_size: int = 500):
        self.window_size = window_size
        self.full_response = ""
 
    async def check_increment(
        self,
        new_content: str,
        context: Dict = None
    ) -> GuardrailResult:
        """Check new content increment with context window"""
 
        self.full_response += new_content
 
        # Get window of recent content
        window_start = max(0, len(self.full_response) - self.window_size)
        check_content = self.full_response[window_start:]
 
        # Quick pattern checks on increment
        dangerous_patterns = [
            r"rm\s+-rf",
            r"DROP\s+TABLE",
            r"<script>",
            r"eval\(",
            r"exec\("
        ]
 
        for pattern in dangerous_patterns:
            if re.search(pattern, check_content, re.IGNORECASE):
                return GuardrailResult(
                    passed=False,
                    action=GuardrailAction.BLOCK,
                    guardrail_name="incremental_moderation",
                    reason=f"Dangerous pattern detected",
                    metadata={"pattern": pattern}
                )
 
        return GuardrailResult(
            passed=True,
            action=GuardrailAction.ALLOW,
            guardrail_name="incremental_moderation"
        )

Integration Example

Complete Guardrails Pipeline

# guardrails_pipeline.py
async def create_guardrailed_completion(
    prompt: str,
    system_prompt: str,
    llm_client,
    guardrails: GuardrailsEngine,
    context: Dict = None
) -> Dict:
    """Complete LLM call with guardrails"""
 
    # Input guardrails
    input_results = await guardrails.check_input(prompt, context)
    blocking = guardrails.get_blocking_result(input_results)
 
    if blocking:
        return {
            "success": False,
            "blocked": True,
            "stage": "input",
            "reason": blocking.reason,
            "response": None
        }
 
    # Get potentially modified input
    modified_prompt = prompt
    for result in input_results:
        if result.modified_content:
            modified_prompt = result.modified_content
 
    # Call LLM
    try:
        response = await llm_client.generate(
            system_prompt=system_prompt,
            user_prompt=modified_prompt
        )
    except Exception as e:
        return {
            "success": False,
            "blocked": False,
            "stage": "llm",
            "reason": str(e),
            "response": None
        }
 
    # Output guardrails
    output_results = await guardrails.check_output(response, context)
    blocking = guardrails.get_blocking_result(output_results)
 
    if blocking:
        return {
            "success": False,
            "blocked": True,
            "stage": "output",
            "reason": blocking.reason,
            "response": None
        }
 
    # Get potentially modified output
    final_response = response
    for result in output_results:
        if result.modified_content:
            final_response = result.modified_content
 
    # Check for flags that need attention
    flags = [r for r in input_results + output_results if r.action == GuardrailAction.FLAG]
 
    return {
        "success": True,
        "blocked": False,
        "response": final_response,
        "flags": [{"name": f.guardrail_name, "reason": f.reason} for f in flags],
        "input_modified": modified_prompt != prompt,
        "output_modified": final_response != response
    }
 
 
# Usage example
async def main():
    # Initialize guardrails engine
    guardrails = GuardrailsEngine()
 
    # Add input guardrails
    guardrails.add_guardrail(GuardrailConfig(
        name="prompt_injection",
        guardrail_type=GuardrailType.INPUT,
        enabled=True,
        check_function=check_prompt_injection,
        action_on_fail=GuardrailAction.BLOCK,
        priority=10
    ))
 
    guardrails.add_guardrail(GuardrailConfig(
        name="pii_detection",
        guardrail_type=GuardrailType.INPUT,
        enabled=True,
        check_function=check_pii_in_input,
        action_on_fail=GuardrailAction.MODIFY,
        priority=20
    ))
 
    # Add output guardrails
    guardrails.add_guardrail(GuardrailConfig(
        name="content_safety",
        guardrail_type=GuardrailType.OUTPUT,
        enabled=True,
        check_function=check_content_safety,
        action_on_fail=GuardrailAction.BLOCK,
        priority=10
    ))
 
    guardrails.add_guardrail(GuardrailConfig(
        name="hallucination_check",
        guardrail_type=GuardrailType.OUTPUT,
        enabled=True,
        check_function=check_hallucination_indicators,
        action_on_fail=GuardrailAction.FLAG,
        priority=20
    ))
 
    # Use in application
    result = await create_guardrailed_completion(
        prompt="User question here",
        system_prompt="You are a helpful assistant",
        llm_client=my_llm_client,
        guardrails=guardrails,
        context={"redact_pii": True}
    )
 
    print(result)

Summary

Effective LLM guardrails require:

  1. Input validation: Block prompt injection and validate content
  2. Output filtering: Check for harmful content and hallucinations
  3. Real-time moderation: Handle streaming responses safely
  4. Layered defense: Multiple guardrails with different priorities
  5. Configurable actions: Block, modify, flag, or escalate
  6. Comprehensive logging: Track all guardrail decisions

Deploy guardrails as the first and last line of defense for all LLM interactions to ensure safe, reliable AI applications.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.