LLM Output Validation and Safety Filters: Building Reliable AI Applications
LLM outputs require careful validation before they are presented to users or used in downstream systems. This guide covers strategies for validating, filtering, and ensuring the safety of AI-generated content.
Content Moderation Pipeline
Multi-Layer Content Filter
# content_moderation.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
import re
class ContentCategory(Enum):
"""Content categories for moderation."""
SAFE = "safe"
HATE_SPEECH = "hate_speech"
VIOLENCE = "violence"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
HARASSMENT = "harassment"
DANGEROUS = "dangerous"
MISINFORMATION = "misinformation"
class ModerationAction(Enum):
"""Actions for moderated content."""
ALLOW = "allow"
WARN = "warn"
BLOCK = "block"
REVIEW = "review"
@dataclass
class ModerationResult:
"""Result of content moderation."""
original_content: str
filtered_content: Optional[str]
categories_detected: List[ContentCategory]
action: ModerationAction
confidence: float
details: Dict
class ContentModerator:
"""Multi-layer content moderation system."""
def __init__(self, config: Dict):
self.config = config
self.filters = self._initialize_filters()
def _initialize_filters(self) -> List:
"""Initialize moderation filters."""
return [
KeywordFilter(self.config.get('keyword_lists', {})),
PatternFilter(self.config.get('patterns', {})),
SemanticFilter(self.config.get('semantic_model')),
ContextualFilter(self.config.get('context_rules', {}))
]
def moderate(self, content: str, context: Optional[Dict] = None) -> ModerationResult:
"""Apply all moderation filters to content."""
detected_categories = []
highest_severity = 0
details = {}
for filter_instance in self.filters:
result = filter_instance.check(content, context)
detected_categories.extend(result['categories'])
highest_severity = max(highest_severity, result['severity'])
details[filter_instance.name] = result
# Deduplicate categories
detected_categories = list(set(detected_categories))
# Determine action based on severity
action = self._determine_action(highest_severity, detected_categories)
# Filter content if needed
filtered_content = None
if action != ModerationAction.BLOCK:
filtered_content = self._apply_filters(content, detected_categories)
return ModerationResult(
original_content=content,
filtered_content=filtered_content,
categories_detected=detected_categories,
action=action,
confidence=highest_severity,
details=details
)
def _determine_action(
self,
severity: float,
categories: List[ContentCategory]
) -> ModerationAction:
"""Determine moderation action based on severity and categories."""
# Always block certain categories
block_categories = {
ContentCategory.SELF_HARM,
ContentCategory.DANGEROUS
}
if any(cat in block_categories for cat in categories):
return ModerationAction.BLOCK
if severity >= 0.9:
return ModerationAction.BLOCK
elif severity >= 0.7:
return ModerationAction.REVIEW
elif severity >= 0.5:
return ModerationAction.WARN
return ModerationAction.ALLOW
def _apply_filters(
self,
content: str,
categories: List[ContentCategory]
) -> str:
"""Apply content filters and redactions."""
filtered = content
# Apply category-specific filters
for category in categories:
if category == ContentCategory.HATE_SPEECH:
filtered = self._redact_hate_speech(filtered)
elif category == ContentCategory.VIOLENCE:
filtered = self._soften_violence(filtered)
return filtered
def _redact_hate_speech(self, content: str) -> str:
"""Redact hate speech terms."""
# Implementation with hate speech term list
return content
def _soften_violence(self, content: str) -> str:
"""Soften violent content descriptions."""
return content
class KeywordFilter:
"""Filter based on keyword lists."""
name = "keyword_filter"
def __init__(self, keyword_lists: Dict[str, List[str]]):
self.keyword_lists = keyword_lists
self._compile_patterns()
def _compile_patterns(self):
"""Compile keyword patterns for efficient matching."""
self.patterns = {}
        for category, keywords in self.keyword_lists.items():
            if not keywords:
                continue  # an empty list would compile to a pattern that matches everything
            pattern = '|'.join(re.escape(kw) for kw in keywords)
            self.patterns[category] = re.compile(pattern, re.IGNORECASE)
def check(self, content: str, context: Optional[Dict] = None) -> Dict:
"""Check content against keyword lists."""
categories = []
max_severity = 0
for category, pattern in self.patterns.items():
matches = pattern.findall(content)
if matches:
categories.append(ContentCategory(category))
# Severity based on number of matches
severity = min(len(matches) * 0.2, 1.0)
max_severity = max(max_severity, severity)
return {
'categories': categories,
'severity': max_severity
}
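# PatternFilter and ContextualFilter are referenced in ContentModerator._initialize_filters
# but are not shown above. The minimal sketches below are assumptions that satisfy the
# same check() interface so the moderator can be instantiated as written.
class PatternFilter:
    """Filter based on configurable regex patterns."""
    name = "pattern_filter"
    def __init__(self, patterns: Dict[str, str]):
        # Patterns are keyed by ContentCategory value, e.g. {"violence": r"\b(bomb|stab)\w*\b"}
        self.patterns = {
            category: re.compile(pattern, re.IGNORECASE)
            for category, pattern in patterns.items()
        }
    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        categories = [
            ContentCategory(category)
            for category, pattern in self.patterns.items()
            if pattern.search(content)
        ]
        return {'categories': categories, 'severity': 0.6 if categories else 0}
class ContextualFilter:
    """Filter that applies context-dependent rules."""
    name = "contextual_filter"
    def __init__(self, context_rules: Dict):
        self.context_rules = context_rules
    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        # Placeholder: a real implementation would tighten or relax thresholds based on
        # the request context (audience, channel, user age, and so on).
        return {'categories': [], 'severity': 0}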
class SemanticFilter:
"""Filter using semantic understanding."""
name = "semantic_filter"
def __init__(self, model_config: Optional[Dict] = None):
self.model = self._load_model(model_config)
def _load_model(self, config: Optional[Dict]):
"""Load semantic classification model."""
# Load transformer-based classifier
return None # Placeholder
def check(self, content: str, context: Optional[Dict] = None) -> Dict:
"""Semantic analysis of content."""
if not self.model:
return {'categories': [], 'severity': 0}
# Use the model to classify content
# predictions = self.model.predict(content)
return {
'categories': [],
'severity': 0
        }
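A quick usage sketch for the moderator; the keyword lists below are illustrative stand-ins, not a recommended configuration:
# Example usage (illustrative keyword lists)
moderator = ContentModerator({
    'keyword_lists': {
        'violence': ['attack', 'destroy'],
        'harassment': ['idiot', 'loser']
    }
})
result = moderator.moderate("I will destroy you in the next chess match, loser.")
print(result.action)               # ModerationAction.ALLOW (two keyword hits, low severity)
print(result.categories_detected)  # contains ContentCategory.VIOLENCE and ContentCategory.HARASSMENT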
Hallucination Detection
Fact Verification System
# hallucination_detection.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
class VerificationStatus(Enum):
"""Status of fact verification."""
VERIFIED = "verified"
UNVERIFIED = "unverified"
CONTRADICTED = "contradicted"
UNCERTAIN = "uncertain"
@dataclass
class FactClaim:
"""A factual claim extracted from text."""
claim_text: str
claim_type: str
entities: List[str]
confidence: float
@dataclass
class VerificationResult:
"""Result of fact verification."""
claim: FactClaim
status: VerificationStatus
evidence: List[Dict]
confidence: float
source_urls: List[str]
class HallucinationDetector:
"""Detect and flag potential hallucinations in LLM output."""
def __init__(self, config: Dict):
self.config = config
self.claim_extractor = ClaimExtractor()
self.fact_verifier = FactVerifier(config.get('knowledge_base'))
self.consistency_checker = ConsistencyChecker()
def analyze(
self,
llm_output: str,
context: Optional[str] = None,
source_documents: Optional[List[str]] = None
) -> Dict:
"""Analyze LLM output for potential hallucinations."""
# Extract factual claims
claims = self.claim_extractor.extract(llm_output)
# Verify each claim
verification_results = []
for claim in claims:
result = self.fact_verifier.verify(claim, source_documents)
verification_results.append(result)
# Check internal consistency
consistency = self.consistency_checker.check(claims, llm_output)
# Calculate overall hallucination score
hallucination_score = self._calculate_score(
verification_results, consistency
)
return {
'claims': claims,
'verifications': verification_results,
'consistency': consistency,
'hallucination_score': hallucination_score,
'recommendation': self._get_recommendation(hallucination_score)
}
def _calculate_score(
self,
verifications: List[VerificationResult],
consistency: Dict
) -> float:
"""Calculate overall hallucination score."""
if not verifications:
return 0.0
# Weight different factors
contradicted = sum(
1 for v in verifications
if v.status == VerificationStatus.CONTRADICTED
)
unverified = sum(
1 for v in verifications
if v.status == VerificationStatus.UNVERIFIED
)
total = len(verifications)
# Score: higher = more likely hallucinated
contradiction_score = contradicted / total * 0.6
unverified_score = unverified / total * 0.3
inconsistency_score = (1 - consistency['score']) * 0.1
return min(contradiction_score + unverified_score + inconsistency_score, 1.0)
def _get_recommendation(self, score: float) -> str:
"""Get recommendation based on hallucination score."""
if score < 0.2:
return "Output appears reliable"
elif score < 0.4:
return "Minor concerns - consider verification"
elif score < 0.6:
return "Significant concerns - verification recommended"
elif score < 0.8:
return "High risk of hallucination - manual review required"
else:
return "Likely hallucinated - do not use without verification"
class ClaimExtractor:
"""Extract factual claims from text."""
def extract(self, text: str) -> List[FactClaim]:
"""Extract factual claims from text."""
claims = []
# Extract different types of claims
claims.extend(self._extract_numerical_claims(text))
claims.extend(self._extract_entity_claims(text))
claims.extend(self._extract_temporal_claims(text))
claims.extend(self._extract_causal_claims(text))
return claims
def _extract_numerical_claims(self, text: str) -> List[FactClaim]:
"""Extract claims with numerical data."""
import re
pattern = r'(\d+(?:\.\d+)?(?:\s*%|\s*percent)?)\s*(?:of|are|is|was|were)\s*([^.]+)'
matches = re.findall(pattern, text)
claims = []
for number, subject in matches:
claims.append(FactClaim(
claim_text=f"{number} {subject}",
claim_type="numerical",
entities=[subject.strip()],
confidence=0.8
))
return claims
def _extract_entity_claims(self, text: str) -> List[FactClaim]:
"""Extract claims about named entities."""
# Use NER to identify entities and their relationships
return []
def _extract_temporal_claims(self, text: str) -> List[FactClaim]:
"""Extract claims with dates/times."""
return []
def _extract_causal_claims(self, text: str) -> List[FactClaim]:
"""Extract cause-effect claims."""
return []
class FactVerifier:
"""Verify factual claims against knowledge sources."""
def __init__(self, knowledge_base: Optional[Dict] = None):
self.knowledge_base = knowledge_base or {}
def verify(
self,
claim: FactClaim,
source_documents: Optional[List[str]] = None
) -> VerificationResult:
"""Verify a factual claim."""
evidence = []
# Check against source documents (RAG grounding)
if source_documents:
doc_evidence = self._check_source_documents(claim, source_documents)
evidence.extend(doc_evidence)
# Check against knowledge base
kb_evidence = self._check_knowledge_base(claim)
evidence.extend(kb_evidence)
# Determine verification status
status, confidence = self._determine_status(evidence)
return VerificationResult(
claim=claim,
status=status,
evidence=evidence,
confidence=confidence,
source_urls=[e.get('url', '') for e in evidence if e.get('url')]
)
def _check_source_documents(
self,
claim: FactClaim,
documents: List[str]
) -> List[Dict]:
"""Check claim against source documents."""
evidence = []
for doc in documents:
# Use semantic similarity to find supporting/contradicting passages
similarity = self._calculate_similarity(claim.claim_text, doc)
if similarity > 0.7:
# Check if supporting or contradicting
relationship = self._determine_relationship(claim.claim_text, doc)
evidence.append({
'source': 'source_document',
'text': doc[:200],
'similarity': similarity,
'relationship': relationship
})
return evidence
def _check_knowledge_base(self, claim: FactClaim) -> List[Dict]:
"""Check claim against knowledge base."""
return []
def _calculate_similarity(self, text1: str, text2: str) -> float:
"""Calculate semantic similarity between texts."""
# Use embedding model for similarity
return 0.0
def _determine_relationship(self, claim: str, evidence: str) -> str:
"""Determine if evidence supports or contradicts claim."""
return "neutral"
def _determine_status(
self,
evidence: List[Dict]
) -> Tuple[VerificationStatus, float]:
"""Determine verification status from evidence."""
if not evidence:
return VerificationStatus.UNVERIFIED, 0.0
supporting = sum(1 for e in evidence if e.get('relationship') == 'supporting')
contradicting = sum(1 for e in evidence if e.get('relationship') == 'contradicting')
total = len(evidence)
if contradicting > supporting:
return VerificationStatus.CONTRADICTED, contradicting / total
elif supporting > contradicting:
return VerificationStatus.VERIFIED, supporting / total
else:
return VerificationStatus.UNCERTAIN, 0.5
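# The similarity placeholder above always returns 0.0, so source grounding never finds
# evidence. A sketch of an embedding-based alternative, assuming the optional
# sentence-transformers package is installed (the model name is only an example):
def embedding_similarity(text1: str, text2: str) -> float:
    """Cosine similarity between sentence embeddings, clamped to [0, 1]."""
    from sentence_transformers import SentenceTransformer, util  # optional dependency
    model = SentenceTransformer('all-MiniLM-L6-v2')  # cache this in practice
    embeddings = model.encode([text1, text2])
    return max(float(util.cos_sim(embeddings[0], embeddings[1])), 0.0)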
class ConsistencyChecker:
"""Check internal consistency of claims."""
def check(self, claims: List[FactClaim], full_text: str) -> Dict:
"""Check for internal contradictions."""
contradictions = []
# Compare each pair of claims
for i, claim1 in enumerate(claims):
for claim2 in claims[i+1:]:
if self._are_contradictory(claim1, claim2):
contradictions.append({
'claim1': claim1.claim_text,
'claim2': claim2.claim_text,
'type': 'direct_contradiction'
})
score = 1.0 - (len(contradictions) / max(len(claims), 1))
return {
'score': score,
'contradictions': contradictions
}
def _are_contradictory(self, claim1: FactClaim, claim2: FactClaim) -> bool:
"""Check if two claims contradict each other."""
# Implement contradiction detection logic
        return False
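A short end-to-end call against the detector as written. Because the placeholder similarity function returns 0.0, the claim stays unverified even though the source document supports it, so the score lands in the "minor concerns" band:
# Example usage
detector = HallucinationDetector({})
report = detector.analyze(
    "90% of respondents preferred the new interface.",
    source_documents=["In our survey, 90 percent of respondents preferred the new interface."]
)
print(report['hallucination_score'])  # 0.3: one claim, unverified by the placeholder verifier
print(report['recommendation'])       # "Minor concerns - consider verification"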
PII Detection and Filtering
PII Filter Implementation
# pii_filter.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class PIIType(Enum):
"""Types of Personally Identifiable Information."""
EMAIL = "email"
PHONE = "phone"
SSN = "ssn"
CREDIT_CARD = "credit_card"
ADDRESS = "address"
NAME = "name"
DATE_OF_BIRTH = "date_of_birth"
IP_ADDRESS = "ip_address"
MEDICAL_ID = "medical_id"
PASSPORT = "passport"
DRIVERS_LICENSE = "drivers_license"
BANK_ACCOUNT = "bank_account"
@dataclass
class PIIDetection:
"""Detected PII instance."""
pii_type: PIIType
value: str
start_position: int
end_position: int
confidence: float
redacted_value: str
class PIIFilter:
"""Filter PII from LLM outputs."""
def __init__(self, config: Optional[Dict] = None):
self.config = config or {}
self.patterns = self._compile_patterns()
def _compile_patterns(self) -> Dict[PIIType, re.Pattern]:
"""Compile regex patterns for PII detection."""
return {
            PIIType.EMAIL: re.compile(
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            ),
PIIType.PHONE: re.compile(
r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'
),
PIIType.SSN: re.compile(
r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'
),
PIIType.CREDIT_CARD: re.compile(
r'\b(?:\d{4}[-\s]?){3}\d{4}\b'
),
PIIType.IP_ADDRESS: re.compile(
r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
),
PIIType.DATE_OF_BIRTH: re.compile(
r'\b(?:DOB|Date of Birth|born)[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b',
re.IGNORECASE
),
PIIType.PASSPORT: re.compile(
r'\b[A-Z]{1,2}\d{6,9}\b'
),
            PIIType.BANK_ACCOUNT: re.compile(
                r'\b\d{8,17}\b'  # Account formats vary by country; expect false positives from this broad pattern
            )
}
def detect(self, text: str) -> List[PIIDetection]:
"""Detect all PII in text."""
detections = []
for pii_type, pattern in self.patterns.items():
for match in pattern.finditer(text):
detection = PIIDetection(
pii_type=pii_type,
value=match.group(),
start_position=match.start(),
end_position=match.end(),
confidence=self._calculate_confidence(pii_type, match.group()),
redacted_value=self._redact(pii_type, match.group())
)
detections.append(detection)
# Sort by position
detections.sort(key=lambda x: x.start_position)
return detections
def filter(self, text: str, redaction_style: str = "mask") -> str:
"""Filter PII from text."""
detections = self.detect(text)
# Apply redactions from end to start to preserve positions
filtered_text = text
for detection in reversed(detections):
if redaction_style == "mask":
replacement = detection.redacted_value
elif redaction_style == "remove":
replacement = "[REDACTED]"
elif redaction_style == "type_label":
replacement = f"[{detection.pii_type.value.upper()}]"
else:
replacement = detection.redacted_value
filtered_text = (
filtered_text[:detection.start_position] +
replacement +
filtered_text[detection.end_position:]
)
return filtered_text
def _calculate_confidence(self, pii_type: PIIType, value: str) -> float:
"""Calculate confidence score for PII detection."""
# Base confidence from pattern match
confidence = 0.7
# Adjust based on PII type and value characteristics
if pii_type == PIIType.EMAIL:
# Higher confidence for common email patterns
if re.match(r'.+@(gmail|yahoo|outlook|hotmail)\.com$', value, re.IGNORECASE):
confidence = 0.95
else:
confidence = 0.85
elif pii_type == PIIType.SSN:
# Check for valid SSN format
if self._is_valid_ssn(value):
confidence = 0.9
else:
confidence = 0.6
elif pii_type == PIIType.CREDIT_CARD:
# Use Luhn algorithm
if self._luhn_check(value):
confidence = 0.95
else:
confidence = 0.5
return confidence
def _redact(self, pii_type: PIIType, value: str) -> str:
"""Create redacted version of PII value."""
if pii_type == PIIType.EMAIL:
parts = value.split('@')
return parts[0][:2] + '***@' + parts[1]
elif pii_type == PIIType.PHONE:
clean = re.sub(r'[^\d]', '', value)
return '***-***-' + clean[-4:]
elif pii_type == PIIType.SSN:
return 'XXX-XX-' + value[-4:]
elif pii_type == PIIType.CREDIT_CARD:
return 'XXXX-XXXX-XXXX-' + value[-4:]
elif pii_type == PIIType.IP_ADDRESS:
parts = value.split('.')
return f"{parts[0]}.XXX.XXX.XXX"
else:
# Generic redaction
if len(value) > 4:
return value[:2] + '*' * (len(value) - 4) + value[-2:]
return '*' * len(value)
def _is_valid_ssn(self, value: str) -> bool:
"""Validate SSN format."""
clean = re.sub(r'[^\d]', '', value)
if len(clean) != 9:
return False
        # Area numbers 000, 666, and 900-999 are never issued
        area = clean[:3]
        if area in ('000', '666') or int(area) >= 900:
return False
return True
def _luhn_check(self, card_number: str) -> bool:
"""Validate credit card using Luhn algorithm."""
clean = re.sub(r'[^\d]', '', card_number)
if len(clean) < 13 or len(clean) > 19:
return False
digits = [int(d) for d in clean]
checksum = 0
for i, digit in enumerate(reversed(digits)):
if i % 2 == 1:
digit *= 2
if digit > 9:
digit -= 9
checksum += digit
        return checksum % 10 == 0
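A usage sketch with fabricated contact details. Note that free-text names (the NAME type) have no regex pattern here and would need an NER model:
# Example usage (fabricated contact details)
pii_filter = PIIFilter()
text = "Contact John at john.doe@example.com or 555-123-4567."
for detection in pii_filter.detect(text):
    print(detection.pii_type.value, detection.redacted_value)
# email jo***@example.com
# phone ***-***-4567
print(pii_filter.filter(text, redaction_style="type_label"))
# Contact John at [EMAIL] or [PHONE].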
Structured Output Enforcement
JSON Schema Validation
# structured_output.py
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type
from enum import Enum
import jsonschema
class OutputFormat(Enum):
"""Supported output formats."""
JSON = "json"
MARKDOWN = "markdown"
HTML = "html"
PLAIN_TEXT = "plain_text"
@dataclass
class ValidationResult:
"""Result of output validation."""
valid: bool
errors: List[str]
fixed_output: Optional[str]
original_output: str
class StructuredOutputValidator:
"""Validate and enforce structured LLM outputs."""
def __init__(self):
self.validators = {}
def register_schema(self, name: str, schema: Dict):
"""Register a JSON schema for validation."""
jsonschema.Draft7Validator.check_schema(schema)
self.validators[name] = jsonschema.Draft7Validator(schema)
def validate_json(
self,
output: str,
schema_name: str,
auto_fix: bool = True
) -> ValidationResult:
"""Validate JSON output against schema."""
validator = self.validators.get(schema_name)
if not validator:
return ValidationResult(
valid=False,
errors=[f"Schema '{schema_name}' not found"],
fixed_output=None,
original_output=output
)
# Try to parse JSON
try:
data = json.loads(output)
except json.JSONDecodeError as e:
            if auto_fix:
                fixed = self._attempt_json_fix(output)
                if fixed:
                    result = self.validate_json(fixed, schema_name, auto_fix=False)
                    # Keep the raw output, and surface the repaired JSON so callers can use it
                    result.fixed_output = result.fixed_output or fixed
                    result.original_output = output
                    return result
return ValidationResult(
valid=False,
errors=[f"Invalid JSON: {str(e)}"],
fixed_output=None,
original_output=output
)
# Validate against schema
errors = list(validator.iter_errors(data))
if errors:
            error_messages = [
                f"{'/'.join(str(p) for p in e.path)}: {e.message}" if e.path else e.message
                for e in errors
            ]
if auto_fix:
fixed_data = self._attempt_schema_fix(data, errors, validator.schema)
if fixed_data:
fixed_output = json.dumps(fixed_data, indent=2)
# Re-validate
new_errors = list(validator.iter_errors(fixed_data))
if not new_errors:
return ValidationResult(
valid=True,
errors=[],
fixed_output=fixed_output,
original_output=output
)
return ValidationResult(
valid=False,
errors=error_messages,
fixed_output=None,
original_output=output
)
return ValidationResult(
valid=True,
errors=[],
fixed_output=None,
original_output=output
)
def _attempt_json_fix(self, output: str) -> Optional[str]:
"""Attempt to fix common JSON issues."""
        fixed = output.strip()
# Remove markdown code blocks
if fixed.startswith('```'):
lines = fixed.split('\n')
if lines[0].startswith('```'):
lines = lines[1:]
if lines and lines[-1].strip() == '```':
lines = lines[:-1]
fixed = '\n'.join(lines)
# Fix trailing commas
fixed = re.sub(r',\s*}', '}', fixed)
fixed = re.sub(r',\s*]', ']', fixed)
        # Replace single quotes with double quotes (crude: also affects apostrophes inside string values)
        fixed = fixed.replace("'", '"')
# Try to parse
try:
json.loads(fixed)
return fixed
except json.JSONDecodeError:
return None
def _attempt_schema_fix(
self,
data: Dict,
errors: List,
schema: Dict
) -> Optional[Dict]:
"""Attempt to fix schema validation errors."""
fixed_data = data.copy()
for error in errors:
# Handle missing required properties
if error.validator == 'required':
for prop in error.validator_value:
if prop not in fixed_data:
# Add default value based on schema
prop_schema = schema.get('properties', {}).get(prop, {})
fixed_data[prop] = self._get_default_value(prop_schema)
# Handle type errors
elif error.validator == 'type':
path = list(error.path)
if path:
current = fixed_data
for key in path[:-1]:
current = current[key]
current[path[-1]] = self._coerce_type(
current[path[-1]],
error.validator_value
)
return fixed_data
def _get_default_value(self, schema: Dict) -> Any:
"""Get default value for schema type."""
if 'default' in schema:
return schema['default']
type_defaults = {
'string': '',
'number': 0,
'integer': 0,
'boolean': False,
'array': [],
'object': {}
}
return type_defaults.get(schema.get('type'), None)
def _coerce_type(self, value: Any, target_type: str) -> Any:
"""Attempt to coerce value to target type."""
try:
if target_type == 'string':
return str(value)
elif target_type == 'number':
return float(value)
elif target_type == 'integer':
return int(value)
elif target_type == 'boolean':
return bool(value)
elif target_type == 'array' and not isinstance(value, list):
return [value]
except (ValueError, TypeError):
pass
return value
# Example usage
validator = StructuredOutputValidator()
# Register schema for product recommendations
validator.register_schema('product_recommendation', {
"type": "object",
"required": ["products", "reasoning"],
"properties": {
"products": {
"type": "array",
"items": {
"type": "object",
"required": ["name", "price", "relevance_score"],
"properties": {
"name": {"type": "string"},
"price": {"type": "number", "minimum": 0},
"relevance_score": {"type": "number", "minimum": 0, "maximum": 1}
}
}
},
"reasoning": {"type": "string"},
"confidence": {"type": "number", "minimum": 0, "maximum": 1}
}
})
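Continuing the example, a response wrapped in a markdown fence and carrying a trailing comma can still pass once the auto-fix kicks in:
# Validate a raw model response against the registered schema
llm_response = '''```json
{"products": [{"name": "Trail Shoe", "price": 89.99, "relevance_score": 0.92}], "reasoning": "Best match for the hiking query",}
```'''
result = validator.validate_json(llm_response, 'product_recommendation')
print(result.valid)         # True: the fence and trailing comma are stripped before re-validation
print(result.fixed_output)  # the repaired JSON string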
Complete Output Validation Pipeline
# output_pipeline.py
from dataclasses import dataclass
from typing import Dict, List, Optional

from content_moderation import ContentModerator, ModerationAction
from hallucination_detection import HallucinationDetector
from pii_filter import PIIFilter
from structured_output import StructuredOutputValidator
@dataclass
class PipelineResult:
"""Result of complete validation pipeline."""
original_output: str
final_output: str
passed: bool
moderation: Dict
hallucination: Dict
pii: Dict
structure: Dict
applied_fixes: List[str]
class OutputValidationPipeline:
"""Complete output validation pipeline."""
def __init__(self, config: Dict):
self.moderator = ContentModerator(config.get('moderation', {}))
self.hallucination_detector = HallucinationDetector(config.get('hallucination', {}))
self.pii_filter = PIIFilter(config.get('pii', {}))
self.structure_validator = StructuredOutputValidator()
# Register schemas
for name, schema in config.get('schemas', {}).items():
self.structure_validator.register_schema(name, schema)
def process(
self,
output: str,
context: Optional[Dict] = None,
source_documents: Optional[List[str]] = None,
expected_schema: Optional[str] = None
) -> PipelineResult:
"""Process output through complete validation pipeline."""
applied_fixes = []
current_output = output
# Step 1: Content moderation
moderation_result = self.moderator.moderate(current_output, context)
if moderation_result.action == ModerationAction.BLOCK:
return PipelineResult(
original_output=output,
final_output="[Content blocked due to policy violation]",
passed=False,
moderation=moderation_result.__dict__,
hallucination={},
pii={},
structure={},
applied_fixes=[]
)
        if (moderation_result.filtered_content and
                moderation_result.filtered_content != current_output):
            current_output = moderation_result.filtered_content
            applied_fixes.append("content_moderation")
# Step 2: PII filtering
pii_detections = self.pii_filter.detect(current_output)
if pii_detections:
current_output = self.pii_filter.filter(current_output)
applied_fixes.append("pii_filtering")
# Step 3: Hallucination detection
hallucination_result = self.hallucination_detector.analyze(
current_output,
context.get('prompt') if context else None,
source_documents
)
# Step 4: Structure validation (if schema specified)
structure_result = {}
if expected_schema:
validation = self.structure_validator.validate_json(
current_output,
expected_schema
)
structure_result = validation.__dict__
if validation.fixed_output:
current_output = validation.fixed_output
applied_fixes.append("structure_fix")
# Determine if output passed
passed = (
moderation_result.action in [ModerationAction.ALLOW, ModerationAction.WARN] and
hallucination_result['hallucination_score'] < 0.6 and
(not expected_schema or structure_result.get('valid', True))
)
return PipelineResult(
original_output=output,
final_output=current_output,
passed=passed,
moderation=moderation_result.__dict__,
hallucination=hallucination_result,
pii={'detections': [d.__dict__ for d in pii_detections]},
structure=structure_result,
applied_fixes=applied_fixes
        )
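Finally, a minimal end-to-end invocation; the configuration values are illustrative and would normally come from your application settings:
# Example usage (illustrative configuration)
pipeline = OutputValidationPipeline({
    'moderation': {'keyword_lists': {'violence': ['attack', 'destroy']}},
    'hallucination': {},
    'pii': {},
    'schemas': {}
})
result = pipeline.process("Please email jane.roe@example.com for the full report.")
print(result.passed)         # True
print(result.final_output)   # "Please email ja***@example.com for the full report."
print(result.applied_fixes)  # ['pii_filtering']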
Conclusion
Robust LLM output validation requires multiple layers of defense:
- Content Moderation - Filter harmful or inappropriate content
- Hallucination Detection - Verify factual claims against sources
- PII Filtering - Protect sensitive personal information
- Structure Enforcement - Ensure outputs match expected schemas
Implementing these safeguards ensures your AI applications produce reliable, safe, and trustworthy outputs for end users.