LLM Output Validation and Safety Filters: Building Reliable AI Applications
LLM outputs require careful validation before being presented to users or consumed by downstream systems. This guide covers comprehensive strategies for validating, filtering, and securing AI-generated content.
Content Moderation Pipeline
Multi-Layer Content Filter
# content_moderation.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
import re
class ContentCategory(Enum):
"""Categorii de continut pentru moderare."""
SAFE = "safe"
HATE_SPEECH = "hate_speech"
VIOLENCE = "violence"
SEXUAL = "sexual"
SELF_HARM = "self_harm"
HARASSMENT = "harassment"
DANGEROUS = "dangerous"
MISINFORMATION = "misinformation"
class ModerationAction(Enum):
"""Actiuni pentru continut moderat."""
ALLOW = "allow"
WARN = "warn"
BLOCK = "block"
REVIEW = "review"
@dataclass
class ModerationResult:
"""Rezultatul moderarii continutului."""
original_content: str
filtered_content: Optional[str]
categories_detected: List[ContentCategory]
action: ModerationAction
confidence: float
details: Dict
class ContentModerator:
"""Sistem de moderare a continutului multi-strat."""
def __init__(self, config: Dict):
self.config = config
self.filters = self._initialize_filters()
def _initialize_filters(self) -> List:
"""Initializare filtre de moderare."""
return [
KeywordFilter(self.config.get('keyword_lists', {})),
PatternFilter(self.config.get('patterns', {})),
SemanticFilter(self.config.get('semantic_model')),
ContextualFilter(self.config.get('context_rules', {}))
]
def moderate(self, content: str, context: Optional[Dict] = None) -> ModerationResult:
"""Aplica toate filtrele de moderare asupra continutului."""
detected_categories = []
highest_severity = 0
details = {}
for filter_instance in self.filters:
result = filter_instance.check(content, context)
detected_categories.extend(result['categories'])
highest_severity = max(highest_severity, result['severity'])
details[filter_instance.name] = result
        # Deduplicate categories
detected_categories = list(set(detected_categories))
        # Determine the action based on severity
action = self._determine_action(highest_severity, detected_categories)
        # Filter the content if needed
filtered_content = None
if action != ModerationAction.BLOCK:
filtered_content = self._apply_filters(content, detected_categories)
return ModerationResult(
original_content=content,
filtered_content=filtered_content,
categories_detected=detected_categories,
action=action,
confidence=highest_severity,
details=details
)
def _determine_action(
self,
severity: float,
categories: List[ContentCategory]
) -> ModerationAction:
"""Determinare actiune de moderare pe baza severitatii si categoriilor."""
# Blocheaza mereu anumite categorii
block_categories = {
ContentCategory.SELF_HARM,
ContentCategory.DANGEROUS
}
if any(cat in block_categories for cat in categories):
return ModerationAction.BLOCK
if severity >= 0.9:
return ModerationAction.BLOCK
elif severity >= 0.7:
return ModerationAction.REVIEW
elif severity >= 0.5:
return ModerationAction.WARN
return ModerationAction.ALLOW
def _apply_filters(
self,
content: str,
categories: List[ContentCategory]
) -> str:
"""Aplica filtre de continut si redactari."""
filtered = content
        # Apply category-specific filters
for category in categories:
if category == ContentCategory.HATE_SPEECH:
filtered = self._redact_hate_speech(filtered)
elif category == ContentCategory.VIOLENCE:
filtered = self._soften_violence(filtered)
return filtered
def _redact_hate_speech(self, content: str) -> str:
"""Redacteaza termenii de hate speech."""
# Implementare cu lista de termeni hate speech
return content
def _soften_violence(self, content: str) -> str:
"""Atenueaza descrierile de continut violent."""
return content
class KeywordFilter:
"""Filtru bazat pe liste de cuvinte cheie."""
name = "keyword_filter"
def __init__(self, keyword_lists: Dict[str, List[str]]):
self.keyword_lists = keyword_lists
self._compile_patterns()
def _compile_patterns(self):
"""Compileaza pattern-urile de cuvinte cheie pentru potrivire eficienta."""
self.patterns = {}
        for category, keywords in self.keyword_lists.items():
            if not keywords:
                continue  # an empty keyword list would compile to a match-everything pattern
            pattern = '|'.join(re.escape(kw) for kw in keywords)
            self.patterns[category] = re.compile(pattern, re.IGNORECASE)
def check(self, content: str, context: Optional[Dict] = None) -> Dict:
"""Verifica continutul fata de listele de cuvinte cheie."""
categories = []
max_severity = 0
for category, pattern in self.patterns.items():
matches = pattern.findall(content)
if matches:
categories.append(ContentCategory(category))
                # Severity scales with the number of matches
severity = min(len(matches) * 0.2, 1.0)
max_severity = max(max_severity, severity)
return {
'categories': categories,
'severity': max_severity
}
class SemanticFilter:
"""Filtru folosind intelegere semantica."""
name = "semantic_filter"
def __init__(self, model_config: Optional[Dict] = None):
self.model = self._load_model(model_config)
def _load_model(self, config: Optional[Dict]):
"""Incarca modelul de clasificare semantica."""
# Incarca clasificator bazat pe transformer
return None # Placeholder
def check(self, content: str, context: Optional[Dict] = None) -> Dict:
"""Analiza semantica a continutului."""
if not self.model:
return {'categories': [], 'severity': 0}
        # Use the model to classify the content
# predictions = self.model.predict(content)
return {
'categories': [],
'severity': 0
        }
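The module above references PatternFilter and ContextualFilter in _initialize_filters without defining them, so constructing a ContentModerator as shown would raise a NameError. Below is a minimal sketch, assuming those two filters expose the same check() interface as KeywordFilter: no-op stand-ins plus a small usage example appended to content_moderation.py. The keyword lists and sample text are hypothetical, and the keyword-list keys must match ContentCategory values.
# --- illustrative additions to content_moderation.py ---
class PatternFilter:
    """Minimal stand-in; real regex-based rules would go here."""
    name = "pattern_filter"

    def __init__(self, patterns: Dict):
        self.patterns = patterns

    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        return {'categories': [], 'severity': 0}

class ContextualFilter:
    """Minimal stand-in; real context-aware rules would go here."""
    name = "contextual_filter"

    def __init__(self, rules: Dict):
        self.rules = rules

    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        return {'categories': [], 'severity': 0}

if __name__ == "__main__":
    # Hypothetical config: keyword-list keys must match ContentCategory values
    moderator = ContentModerator({
        'keyword_lists': {
            'violence': ['attack', 'destroy'],
            'harassment': ['loser', 'pathetic'],
        }
    })
    result = moderator.moderate("They plan to attack and destroy the rival's booth.")
    print(result.action, result.categories_detected)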
Hallucination Detection
Fact Verification System
# hallucination_detection.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
class VerificationStatus(Enum):
"""Statusul verificarii faptelor."""
VERIFIED = "verified"
UNVERIFIED = "unverified"
CONTRADICTED = "contradicted"
UNCERTAIN = "uncertain"
@dataclass
class FactClaim:
"""O afirmatie factuala extrasa din text."""
claim_text: str
claim_type: str
entities: List[str]
confidence: float
@dataclass
class VerificationResult:
"""Rezultatul verificarii faptelor."""
claim: FactClaim
status: VerificationStatus
evidence: List[Dict]
confidence: float
source_urls: List[str]
class HallucinationDetector:
"""Detecteaza si semnaleaza halucinatiile potentiale din output-ul LLM."""
def __init__(self, config: Dict):
self.config = config
self.claim_extractor = ClaimExtractor()
self.fact_verifier = FactVerifier(config.get('knowledge_base'))
self.consistency_checker = ConsistencyChecker()
def analyze(
self,
llm_output: str,
context: Optional[str] = None,
source_documents: Optional[List[str]] = None
) -> Dict:
"""Analizeaza output-ul LLM pentru halucinatii potentiale."""
# Extrage afirmatii factuale
claims = self.claim_extractor.extract(llm_output)
        # Verify each claim
verification_results = []
for claim in claims:
result = self.fact_verifier.verify(claim, source_documents)
verification_results.append(result)
        # Check internal consistency
consistency = self.consistency_checker.check(claims, llm_output)
        # Compute the overall hallucination score
hallucination_score = self._calculate_score(
verification_results, consistency
)
return {
'claims': claims,
'verifications': verification_results,
'consistency': consistency,
'hallucination_score': hallucination_score,
'recommendation': self._get_recommendation(hallucination_score)
}
def _calculate_score(
self,
verifications: List[VerificationResult],
consistency: Dict
) -> float:
"""Calculeaza scorul general de halucinatie."""
if not verifications:
return 0.0
        # Weight the different factors
contradicted = sum(
1 for v in verifications
if v.status == VerificationStatus.CONTRADICTED
)
unverified = sum(
1 for v in verifications
if v.status == VerificationStatus.UNVERIFIED
)
total = len(verifications)
        # Score: higher = more likely hallucinated
contradiction_score = contradicted / total * 0.6
unverified_score = unverified / total * 0.3
inconsistency_score = (1 - consistency['score']) * 0.1
return min(contradiction_score + unverified_score + inconsistency_score, 1.0)
def _get_recommendation(self, score: float) -> str:
"""Obtine recomandarea pe baza scorului de halucinatie."""
if score < 0.2:
return "Output-ul pare fiabil"
elif score < 0.4:
return "Preocupari minore - ia in considerare verificarea"
elif score < 0.6:
return "Preocupari semnificative - verificare recomandata"
elif score < 0.8:
return "Risc ridicat de halucinatie - revizuire manuala necesara"
else:
return "Probabil halucinat - nu folosi fara verificare"
class ClaimExtractor:
"""Extrage afirmatii factuale din text."""
def extract(self, text: str) -> List[FactClaim]:
"""Extrage afirmatii factuale din text."""
claims = []
        # Extract different types of claims
claims.extend(self._extract_numerical_claims(text))
claims.extend(self._extract_entity_claims(text))
claims.extend(self._extract_temporal_claims(text))
claims.extend(self._extract_causal_claims(text))
return claims
def _extract_numerical_claims(self, text: str) -> List[FactClaim]:
"""Extrage afirmatii cu date numerice."""
import re
pattern = r'(\d+(?:\.\d+)?(?:\s*%|\s*percent)?)\s*(?:of|are|is|was|were)\s*([^.]+)'
matches = re.findall(pattern, text)
claims = []
for number, subject in matches:
claims.append(FactClaim(
claim_text=f"{number} {subject}",
claim_type="numerical",
entities=[subject.strip()],
confidence=0.8
))
return claims
def _extract_entity_claims(self, text: str) -> List[FactClaim]:
"""Extrage afirmatii despre entitati numite."""
# Foloseste NER pentru a identifica entitatile si relatiile lor
return []
def _extract_temporal_claims(self, text: str) -> List[FactClaim]:
"""Extrage afirmatii cu date/ore."""
return []
def _extract_causal_claims(self, text: str) -> List[FactClaim]:
"""Extrage afirmatii cauza-efect."""
return []
class FactVerifier:
"""Verifica afirmatiile factuale fata de surse de cunostinte."""
def __init__(self, knowledge_base: Optional[Dict] = None):
self.knowledge_base = knowledge_base or {}
def verify(
self,
claim: FactClaim,
source_documents: Optional[List[str]] = None
) -> VerificationResult:
"""Verifica o afirmatie factuala."""
evidence = []
        # Check against source documents (RAG grounding)
if source_documents:
doc_evidence = self._check_source_documents(claim, source_documents)
evidence.extend(doc_evidence)
        # Check against the knowledge base
kb_evidence = self._check_knowledge_base(claim)
evidence.extend(kb_evidence)
        # Determine the verification status
status, confidence = self._determine_status(evidence)
return VerificationResult(
claim=claim,
status=status,
evidence=evidence,
confidence=confidence,
source_urls=[e.get('url', '') for e in evidence if e.get('url')]
)
def _check_source_documents(
self,
claim: FactClaim,
documents: List[str]
) -> List[Dict]:
"""Verifica afirmatia fata de documentele sursa."""
evidence = []
for doc in documents:
            # Use semantic similarity to find supporting/contradicting passages
similarity = self._calculate_similarity(claim.claim_text, doc)
if similarity > 0.7:
                # Check whether it supports or contradicts the claim
relationship = self._determine_relationship(claim.claim_text, doc)
evidence.append({
'source': 'source_document',
'text': doc[:200],
'similarity': similarity,
'relationship': relationship
})
return evidence
def _check_knowledge_base(self, claim: FactClaim) -> List[Dict]:
"""Verifica afirmatia fata de baza de cunostinte."""
return []
def _calculate_similarity(self, text1: str, text2: str) -> float:
"""Calculeaza similaritatea semantica intre texte."""
# Foloseste model de embedding pentru similaritate
return 0.0
def _determine_relationship(self, claim: str, evidence: str) -> str:
"""Determina daca dovada sustine sau contrazice afirmatia."""
return "neutral"
def _determine_status(
self,
evidence: List[Dict]
) -> Tuple[VerificationStatus, float]:
"""Determina statusul verificarii din dovezi."""
if not evidence:
return VerificationStatus.UNVERIFIED, 0.0
supporting = sum(1 for e in evidence if e.get('relationship') == 'supporting')
contradicting = sum(1 for e in evidence if e.get('relationship') == 'contradicting')
total = len(evidence)
if contradicting > supporting:
return VerificationStatus.CONTRADICTED, contradicting / total
elif supporting > contradicting:
return VerificationStatus.VERIFIED, supporting / total
else:
return VerificationStatus.UNCERTAIN, 0.5
class ConsistencyChecker:
"""Verifica consistenta interna a afirmatiilor."""
def check(self, claims: List[FactClaim], full_text: str) -> Dict:
"""Verifica contradictiile interne."""
contradictions = []
        # Compare each pair of claims
for i, claim1 in enumerate(claims):
for claim2 in claims[i+1:]:
if self._are_contradictory(claim1, claim2):
contradictions.append({
'claim1': claim1.claim_text,
'claim2': claim2.claim_text,
'type': 'direct_contradiction'
})
score = 1.0 - (len(contradictions) / max(len(claims), 1))
return {
'score': score,
'contradictions': contradictions
}
def _are_contradictory(self, claim1: FactClaim, claim2: FactClaim) -> bool:
"""Verifica daca doua afirmatii se contrazic."""
# Implementeaza logica de detectie a contradictiilor
        return False
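A brief usage sketch of the detector, assuming the stub methods above (similarity, NER, knowledge-base lookup) are eventually filled in; as written, claims that cannot be matched to a source simply come back UNVERIFIED, which still contributes to the hallucination score. The sample output and source text are hypothetical.
# --- illustrative usage of hallucination_detection.py ---
if __name__ == "__main__":
    detector = HallucinationDetector({})

    llm_output = "Roughly 45% of respondents preferred the new interface."
    sources = ["The survey found that 45 percent of respondents preferred the new interface."]

    report = detector.analyze(llm_output, context=None, source_documents=sources)
    print(f"Claims extracted: {len(report['claims'])}")
    print(f"Hallucination score: {report['hallucination_score']:.2f}")
    print(f"Recommendation: {report['recommendation']}")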
PII Detection and Filtering
PII Filter Implementation
# pii_filter.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
class PIIType(Enum):
"""Tipuri de Informatii Personale Identificabile."""
EMAIL = "email"
PHONE = "phone"
SSN = "ssn"
CREDIT_CARD = "credit_card"
ADDRESS = "address"
NAME = "name"
DATE_OF_BIRTH = "date_of_birth"
IP_ADDRESS = "ip_address"
MEDICAL_ID = "medical_id"
PASSPORT = "passport"
DRIVERS_LICENSE = "drivers_license"
BANK_ACCOUNT = "bank_account"
@dataclass
class PIIDetection:
"""Instanta PII detectata."""
pii_type: PIIType
value: str
start_position: int
end_position: int
confidence: float
redacted_value: str
class PIIFilter:
"""Filtreaza PII din output-urile LLM."""
def __init__(self, config: Optional[Dict] = None):
self.config = config or {}
self.patterns = self._compile_patterns()
def _compile_patterns(self) -> Dict[PIIType, re.Pattern]:
"""Compileaza pattern-urile regex pentru detectia PII."""
return {
PIIType.EMAIL: re.compile(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
),
PIIType.PHONE: re.compile(
r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'
),
PIIType.SSN: re.compile(
r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'
),
PIIType.CREDIT_CARD: re.compile(
r'\b(?:\d{4}[-\s]?){3}\d{4}\b'
),
PIIType.IP_ADDRESS: re.compile(
r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
),
PIIType.DATE_OF_BIRTH: re.compile(
r'\b(?:DOB|Date of Birth|born)[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b',
re.IGNORECASE
),
PIIType.PASSPORT: re.compile(
r'\b[A-Z]{1,2}\d{6,9}\b'
),
PIIType.BANK_ACCOUNT: re.compile(
                r'\b\d{8,17}\b'  # Bank account numbers vary in length by country
)
}
def detect(self, text: str) -> List[PIIDetection]:
"""Detecteaza tot PII-ul din text."""
detections = []
for pii_type, pattern in self.patterns.items():
for match in pattern.finditer(text):
detection = PIIDetection(
pii_type=pii_type,
value=match.group(),
start_position=match.start(),
end_position=match.end(),
confidence=self._calculate_confidence(pii_type, match.group()),
redacted_value=self._redact(pii_type, match.group())
)
detections.append(detection)
        # Sort by position
detections.sort(key=lambda x: x.start_position)
return detections
def filter(self, text: str, redaction_style: str = "mask") -> str:
"""Filtreaza PII din text."""
detections = self.detect(text)
        # Apply redactions from the end backwards to preserve positions
filtered_text = text
for detection in reversed(detections):
if redaction_style == "mask":
replacement = detection.redacted_value
elif redaction_style == "remove":
replacement = "[REDACTAT]"
elif redaction_style == "type_label":
replacement = f"[{detection.pii_type.value.upper()}]"
else:
replacement = detection.redacted_value
filtered_text = (
filtered_text[:detection.start_position] +
replacement +
filtered_text[detection.end_position:]
)
return filtered_text
def _calculate_confidence(self, pii_type: PIIType, value: str) -> float:
"""Calculeaza scorul de incredere pentru detectia PII."""
# Incredere de baza din potrivirea pattern-ului
confidence = 0.7
        # Adjust based on the PII type and value characteristics
if pii_type == PIIType.EMAIL:
            # Higher confidence for common email domains
if re.match(r'.+@(gmail|yahoo|outlook|hotmail)\.com$', value, re.IGNORECASE):
confidence = 0.95
else:
confidence = 0.85
elif pii_type == PIIType.SSN:
            # Check for a valid SSN format
if self._is_valid_ssn(value):
confidence = 0.9
else:
confidence = 0.6
elif pii_type == PIIType.CREDIT_CARD:
            # Use the Luhn algorithm
if self._luhn_check(value):
confidence = 0.95
else:
confidence = 0.5
return confidence
def _redact(self, pii_type: PIIType, value: str) -> str:
"""Creeaza versiunea redactata a valorii PII."""
if pii_type == PIIType.EMAIL:
parts = value.split('@')
return parts[0][:2] + '***@' + parts[1]
elif pii_type == PIIType.PHONE:
clean = re.sub(r'[^\d]', '', value)
return '***-***-' + clean[-4:]
elif pii_type == PIIType.SSN:
return 'XXX-XX-' + value[-4:]
elif pii_type == PIIType.CREDIT_CARD:
return 'XXXX-XXXX-XXXX-' + value[-4:]
elif pii_type == PIIType.IP_ADDRESS:
parts = value.split('.')
return f"{parts[0]}.XXX.XXX.XXX"
else:
            # Generic redaction
if len(value) > 4:
return value[:2] + '*' * (len(value) - 4) + value[-2:]
return '*' * len(value)
def _is_valid_ssn(self, value: str) -> bool:
"""Valideaza formatul SSN."""
clean = re.sub(r'[^\d]', '', value)
if len(clean) != 9:
return False
        # Reject area numbers that are never issued (000, 666, 900-999)
area = clean[:3]
if area == '000' or area == '666' or int(area) >= 900:
return False
return True
def _luhn_check(self, card_number: str) -> bool:
"""Valideaza cardul de credit folosind algoritmul Luhn."""
clean = re.sub(r'[^\d]', '', card_number)
if len(clean) < 13 or len(clean) > 19:
return False
digits = [int(d) for d in clean]
checksum = 0
for i, digit in enumerate(reversed(digits)):
if i % 2 == 1:
digit *= 2
if digit > 9:
digit -= 9
checksum += digit
        return checksum % 10 == 0
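A short usage sketch of the regex-based filter with fabricated sample values. Note that PIIType.NAME and PIIType.ADDRESS have no patterns above; catching those reliably would require an NER model on top of the regexes.
# --- illustrative usage of pii_filter.py ---
if __name__ == "__main__":
    pii_filter = PIIFilter()
    text = "Contact Jane at jane.doe@example.com or 555-867-5309; card 4111-1111-1111-1111."

    for detection in pii_filter.detect(text):
        print(detection.pii_type.value, detection.value, f"confidence={detection.confidence:.2f}")

    print(pii_filter.filter(text, redaction_style="type_label"))
    # -> Contact Jane at [EMAIL] or [PHONE]; card [CREDIT_CARD].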
Structured Output Enforcement
JSON Schema Validation
# structured_output.py
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type
from enum import Enum
import jsonschema
class OutputFormat(Enum):
"""Formate de output suportate."""
JSON = "json"
MARKDOWN = "markdown"
HTML = "html"
PLAIN_TEXT = "plain_text"
@dataclass
class ValidationResult:
"""Rezultatul validarii output-ului."""
valid: bool
errors: List[str]
fixed_output: Optional[str]
original_output: str
class StructuredOutputValidator:
"""Valideaza si aplica output-uri LLM structurate."""
def __init__(self):
self.validators = {}
def register_schema(self, name: str, schema: Dict):
"""Inregistreaza o schema JSON pentru validare."""
jsonschema.Draft7Validator.check_schema(schema)
self.validators[name] = jsonschema.Draft7Validator(schema)
def validate_json(
self,
output: str,
schema_name: str,
auto_fix: bool = True
) -> ValidationResult:
"""Valideaza output-ul JSON fata de schema."""
validator = self.validators.get(schema_name)
if not validator:
return ValidationResult(
valid=False,
errors=[f"Schema '{schema_name}' nu a fost gasita"],
fixed_output=None,
original_output=output
)
        # Try to parse the JSON
        try:
            data = json.loads(output)
        except json.JSONDecodeError as e:
            if auto_fix:
                fixed = self._attempt_json_fix(output)
                if fixed:
                    result = self.validate_json(fixed, schema_name, auto_fix=False)
                    # Preserve the repaired text and the caller's original output
                    return ValidationResult(
                        valid=result.valid,
                        errors=result.errors,
                        fixed_output=fixed,
                        original_output=output
                    )
            return ValidationResult(
                valid=False,
                errors=[f"Invalid JSON: {str(e)}"],
                fixed_output=None,
                original_output=output
            )
        # Validate against the schema
errors = list(validator.iter_errors(data))
if errors:
error_messages = [
f"{e.path}: {e.message}" if e.path else e.message
for e in errors
]
if auto_fix:
fixed_data = self._attempt_schema_fix(data, errors, validator.schema)
if fixed_data:
fixed_output = json.dumps(fixed_data, indent=2)
                    # Re-validate
new_errors = list(validator.iter_errors(fixed_data))
if not new_errors:
return ValidationResult(
valid=True,
errors=[],
fixed_output=fixed_output,
original_output=output
)
return ValidationResult(
valid=False,
errors=error_messages,
fixed_output=None,
original_output=output
)
return ValidationResult(
valid=True,
errors=[],
fixed_output=None,
original_output=output
)
def _attempt_json_fix(self, output: str) -> Optional[str]:
"""Incearca sa repare probleme comune JSON."""
fixed = output
        # Strip Markdown code fences
if fixed.startswith('```'):
lines = fixed.split('\n')
if lines[0].startswith('```'):
lines = lines[1:]
if lines and lines[-1].strip() == '```':
lines = lines[:-1]
fixed = '\n'.join(lines)
        # Remove trailing commas
fixed = re.sub(r',\s*}', '}', fixed)
fixed = re.sub(r',\s*]', ']', fixed)
        # Replace single quotes with double quotes (naive; may mangle apostrophes inside strings)
fixed = fixed.replace("'", '"')
        # Try to parse
try:
json.loads(fixed)
return fixed
except json.JSONDecodeError:
return None
def _attempt_schema_fix(
self,
data: Dict,
errors: List,
schema: Dict
) -> Optional[Dict]:
"""Incearca sa repare erorile de validare a schemei."""
fixed_data = data.copy()
for error in errors:
            # Handle missing required properties
if error.validator == 'required':
for prop in error.validator_value:
if prop not in fixed_data:
                        # Add a default value based on the schema
prop_schema = schema.get('properties', {}).get(prop, {})
fixed_data[prop] = self._get_default_value(prop_schema)
            # Handle type errors
elif error.validator == 'type':
path = list(error.path)
if path:
current = fixed_data
for key in path[:-1]:
current = current[key]
current[path[-1]] = self._coerce_type(
current[path[-1]],
error.validator_value
)
return fixed_data
def _get_default_value(self, schema: Dict) -> Any:
"""Obtine valoarea implicita pentru tipul din schema."""
if 'default' in schema:
return schema['default']
type_defaults = {
'string': '',
'number': 0,
'integer': 0,
'boolean': False,
'array': [],
'object': {}
}
return type_defaults.get(schema.get('type'), None)
def _coerce_type(self, value: Any, target_type: str) -> Any:
"""Incearca sa converteasca valoarea la tipul tinta."""
try:
if target_type == 'string':
return str(value)
elif target_type == 'number':
return float(value)
elif target_type == 'integer':
return int(value)
elif target_type == 'boolean':
return bool(value)
elif target_type == 'array' and not isinstance(value, list):
return [value]
except (ValueError, TypeError):
pass
return value
# Usage example
validator = StructuredOutputValidator()
# Register a schema for product recommendations
validator.register_schema('product_recommendation', {
"type": "object",
"required": ["products", "reasoning"],
"properties": {
"products": {
"type": "array",
"items": {
"type": "object",
"required": ["name", "price", "relevance_score"],
"properties": {
"name": {"type": "string"},
"price": {"type": "number", "minimum": 0},
"relevance_score": {"type": "number", "minimum": 0, "maximum": 1}
}
}
},
"reasoning": {"type": "string"},
"confidence": {"type": "number", "minimum": 0, "maximum": 1}
}
})
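Continuing the example, a call against a hypothetical raw model response shows the repair path: the Markdown fence is stripped by _attempt_json_fix before schema validation.
# Hypothetical raw LLM response wrapped in a Markdown code fence
raw_output = """```json
{"products": [{"name": "Trail Backpack", "price": 89.5, "relevance_score": 0.92}],
 "reasoning": "Matches the outdoor-gear intent.", "confidence": 0.8}
```"""

result = validator.validate_json(raw_output, 'product_recommendation')
print(result.valid)   # True once the fence is stripped and the schema checks pass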
Complete Output Validation Pipeline
# output_pipeline.py
from dataclasses import dataclass
from typing import Dict, List, Optional

# Compose the components defined in the previous sections
from content_moderation import ContentModerator, ModerationAction
from hallucination_detection import HallucinationDetector
from pii_filter import PIIFilter
from structured_output import StructuredOutputValidator
@dataclass
class PipelineResult:
"""Rezultatul pipeline-ului complet de validare."""
original_output: str
final_output: str
passed: bool
moderation: Dict
hallucination: Dict
pii: Dict
structure: Dict
applied_fixes: List[str]
class OutputValidationPipeline:
"""Pipeline complet de validare a output-ului."""
def __init__(self, config: Dict):
self.moderator = ContentModerator(config.get('moderation', {}))
self.hallucination_detector = HallucinationDetector(config.get('hallucination', {}))
self.pii_filter = PIIFilter(config.get('pii', {}))
self.structure_validator = StructuredOutputValidator()
        # Register schemas
for name, schema in config.get('schemas', {}).items():
self.structure_validator.register_schema(name, schema)
def process(
self,
output: str,
context: Optional[Dict] = None,
source_documents: Optional[List[str]] = None,
expected_schema: Optional[str] = None
) -> PipelineResult:
"""Proceseaza output-ul prin pipeline-ul complet de validare."""
applied_fixes = []
current_output = output
        # Step 1: Content moderation
moderation_result = self.moderator.moderate(current_output, context)
if moderation_result.action == ModerationAction.BLOCK:
return PipelineResult(
original_output=output,
final_output="[Continut blocat din cauza incalcarii politicii]",
passed=False,
moderation=moderation_result.__dict__,
hallucination={},
pii={},
structure={},
applied_fixes=[]
)
if moderation_result.filtered_content:
current_output = moderation_result.filtered_content
applied_fixes.append("content_moderation")
        # Step 2: PII filtering
pii_detections = self.pii_filter.detect(current_output)
if pii_detections:
current_output = self.pii_filter.filter(current_output)
applied_fixes.append("pii_filtering")
        # Step 3: Hallucination detection
hallucination_result = self.hallucination_detector.analyze(
current_output,
context.get('prompt') if context else None,
source_documents
)
        # Step 4: Structure validation (if a schema is specified)
structure_result = {}
if expected_schema:
validation = self.structure_validator.validate_json(
current_output,
expected_schema
)
structure_result = validation.__dict__
if validation.fixed_output:
current_output = validation.fixed_output
applied_fixes.append("structure_fix")
        # Decide whether the output passed
passed = (
moderation_result.action in [ModerationAction.ALLOW, ModerationAction.WARN] and
hallucination_result['hallucination_score'] < 0.6 and
(not expected_schema or structure_result.get('valid', True))
)
return PipelineResult(
original_output=output,
final_output=current_output,
passed=passed,
moderation=moderation_result.__dict__,
hallucination=hallucination_result,
pii={'detections': [d.__dict__ for d in pii_detections]},
structure=structure_result,
applied_fixes=applied_fixes
        )
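A minimal wiring sketch, assuming the four modules above sit side by side as importable files; the schema, prompt, and model output are hypothetical. Note that, as written, the moderator returns filtered content for every non-blocked output, so "content_moderation" appears in applied_fixes even when nothing was actually changed.
# --- illustrative usage of output_pipeline.py ---
if __name__ == "__main__":
    pipeline = OutputValidationPipeline({
        'moderation': {},
        'hallucination': {},
        'pii': {},
        'schemas': {
            'support_reply': {
                "type": "object",
                "required": ["reply"],
                "properties": {"reply": {"type": "string"}}
            }
        }
    })

    llm_output = '{"reply": "Sure, email us at help.desk@example.com and we will follow up."}'
    result = pipeline.process(
        llm_output,
        context={'prompt': 'How do I reach support?'},
        expected_schema='support_reply'
    )
    print("Passed:", result.passed)
    print("Final output:", result.final_output)   # the email gets masked by the PII filter
    print("Applied fixes:", result.applied_fixes)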
Conclusion
Robust LLM output validation requires multiple layers of defense:
- Content Moderation - filters harmful or inappropriate content
- Hallucination Detection - verifies factual claims against trusted sources
- PII Filtering - protects sensitive personal information
- Structure Enforcement - ensures outputs conform to the expected schemas
Implementing these safeguards helps ensure that your AI applications produce reliable, safe, and trustworthy outputs for end users.