AI Security

Validating LLM Outputs and Safety Filters: Building Reliable AI Applications

Nicu Constantin
15 min read
#LLM#AI safety#content moderation#output validation#guardrails


LLM outputs require careful validation before they are shown to users or consumed by downstream systems. This guide covers end-to-end strategies for validating, filtering, and safeguarding AI-generated content.

Content Moderation Pipeline

Multi-Layer Content Filter

# content_moderation.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
import re
 
class ContentCategory(Enum):
    """Content categories for moderation."""
    SAFE = "safe"
    HATE_SPEECH = "hate_speech"
    VIOLENCE = "violence"
    SEXUAL = "sexual"
    SELF_HARM = "self_harm"
    HARASSMENT = "harassment"
    DANGEROUS = "dangerous"
    MISINFORMATION = "misinformation"

class ModerationAction(Enum):
    """Actions for moderated content."""
    ALLOW = "allow"
    WARN = "warn"
    BLOCK = "block"
    REVIEW = "review"

@dataclass
class ModerationResult:
    """Result of content moderation."""
    original_content: str
    filtered_content: Optional[str]
    categories_detected: List[ContentCategory]
    action: ModerationAction
    confidence: float
    details: Dict

class ContentModerator:
    """Multi-layer content moderation system."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.filters = self._initialize_filters()
 
    def _initialize_filters(self) -> List:
        """Initialize the moderation filters."""
        # PatternFilter and ContextualFilter share the same check() interface
        # as the filters implemented below; their bodies are omitted here.
        return [
            KeywordFilter(self.config.get('keyword_lists', {})),
            PatternFilter(self.config.get('patterns', {})),
            SemanticFilter(self.config.get('semantic_model')),
            ContextualFilter(self.config.get('context_rules', {}))
        ]

    def moderate(self, content: str, context: Optional[Dict] = None) -> ModerationResult:
        """Run every moderation filter over the content."""

        detected_categories = []
        highest_severity = 0
        details = {}

        for filter_instance in self.filters:
            result = filter_instance.check(content, context)
            detected_categories.extend(result['categories'])
            highest_severity = max(highest_severity, result['severity'])
            details[filter_instance.name] = result

        # Deduplicate categories
        detected_categories = list(set(detected_categories))

        # Decide the action based on severity
        action = self._determine_action(highest_severity, detected_categories)

        # Filter the content if needed
        filtered_content = None
        if action != ModerationAction.BLOCK:
            filtered_content = self._apply_filters(content, detected_categories)

        return ModerationResult(
            original_content=content,
            filtered_content=filtered_content,
            categories_detected=detected_categories,
            action=action,
            confidence=highest_severity,
            details=details
        )
 
    def _determine_action(
        self,
        severity: float,
        categories: List[ContentCategory]
    ) -> ModerationAction:
        """Decide the moderation action from severity and categories."""

        # Always block certain categories
        block_categories = {
            ContentCategory.SELF_HARM,
            ContentCategory.DANGEROUS
        }

        if any(cat in block_categories for cat in categories):
            return ModerationAction.BLOCK

        if severity >= 0.9:
            return ModerationAction.BLOCK
        elif severity >= 0.7:
            return ModerationAction.REVIEW
        elif severity >= 0.5:
            return ModerationAction.WARN

        return ModerationAction.ALLOW

    def _apply_filters(
        self,
        content: str,
        categories: List[ContentCategory]
    ) -> str:
        """Apply content filters and redactions."""

        filtered = content

        # Apply category-specific filters
        for category in categories:
            if category == ContentCategory.HATE_SPEECH:
                filtered = self._redact_hate_speech(filtered)
            elif category == ContentCategory.VIOLENCE:
                filtered = self._soften_violence(filtered)

        return filtered

    def _redact_hate_speech(self, content: str) -> str:
        """Redact hate-speech terms."""
        # Placeholder: implement with a curated hate-speech term list
        return content

    def _soften_violence(self, content: str) -> str:
        """Tone down descriptions of violent content."""
        return content
 
 
class KeywordFilter:
    """Filter based on keyword lists."""

    name = "keyword_filter"

    def __init__(self, keyword_lists: Dict[str, List[str]]):
        self.keyword_lists = keyword_lists
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile keyword patterns for efficient matching."""
        self.patterns = {}
        for category, keywords in self.keyword_lists.items():
            pattern = '|'.join(re.escape(kw) for kw in keywords)
            self.patterns[category] = re.compile(pattern, re.IGNORECASE)

    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        """Check the content against the keyword lists."""
        categories = []
        max_severity = 0

        for category, pattern in self.patterns.items():
            matches = pattern.findall(content)
            if matches:
                categories.append(ContentCategory(category))
                # Severity based on the number of matches
                severity = min(len(matches) * 0.2, 1.0)
                max_severity = max(max_severity, severity)

        return {
            'categories': categories,
            'severity': max_severity
        }
 
 
class SemanticFilter:
    """Filter using semantic understanding."""

    name = "semantic_filter"

    def __init__(self, model_config: Optional[Dict] = None):
        self.model = self._load_model(model_config)

    def _load_model(self, config: Optional[Dict]):
        """Load the semantic classification model."""
        # Load a transformer-based classifier
        return None  # Placeholder

    def check(self, content: str, context: Optional[Dict] = None) -> Dict:
        """Semantic analysis of the content."""

        if not self.model:
            return {'categories': [], 'severity': 0}

        # Use the model to classify the content
        # predictions = self.model.predict(content)

        return {
            'categories': [],
            'severity': 0
        }
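The severity thresholds above can be exercised in isolation. A minimal sketch, with the thresholds and per-match severity increment copied from `_determine_action` and `KeywordFilter.check` (the always-block category override is omitted here):

```python
def severity_from_matches(match_count: int) -> float:
    # Each keyword hit adds 0.2 severity, capped at 1.0 (as in KeywordFilter).
    return min(match_count * 0.2, 1.0)

def determine_action(severity: float) -> str:
    # Same thresholds as ContentModerator._determine_action.
    if severity >= 0.9:
        return "block"
    elif severity >= 0.7:
        return "review"
    elif severity >= 0.5:
        return "warn"
    return "allow"

print(determine_action(severity_from_matches(1)))  # allow  (0.2)
print(determine_action(severity_from_matches(3)))  # warn   (0.6)
print(determine_action(severity_from_matches(5)))  # block  (1.0)
```

Note that a single keyword hit never escalates past ALLOW with these numbers; tune the 0.2 increment to your own risk tolerance.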

Hallucination Detection

Fact Verification System

# hallucination_detection.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from enum import Enum
 
class VerificationStatus(Enum):
    """Status of fact verification."""
    VERIFIED = "verified"
    UNVERIFIED = "unverified"
    CONTRADICTED = "contradicted"
    UNCERTAIN = "uncertain"

@dataclass
class FactClaim:
    """A factual claim extracted from text."""
    claim_text: str
    claim_type: str
    entities: List[str]
    confidence: float

@dataclass
class VerificationResult:
    """Result of fact verification."""
    claim: FactClaim
    status: VerificationStatus
    evidence: List[Dict]
    confidence: float
    source_urls: List[str]

class HallucinationDetector:
    """Detects and flags potential hallucinations in LLM output."""
 
    def __init__(self, config: Dict):
        self.config = config
        self.claim_extractor = ClaimExtractor()
        self.fact_verifier = FactVerifier(config.get('knowledge_base'))
        self.consistency_checker = ConsistencyChecker()
 
    def analyze(
        self,
        llm_output: str,
        context: Optional[str] = None,
        source_documents: Optional[List[str]] = None
    ) -> Dict:
        """Analyze LLM output for potential hallucinations."""

        # Extract factual claims
        claims = self.claim_extractor.extract(llm_output)

        # Verify each claim
        verification_results = []
        for claim in claims:
            result = self.fact_verifier.verify(claim, source_documents)
            verification_results.append(result)

        # Check internal consistency
        consistency = self.consistency_checker.check(claims, llm_output)

        # Compute the overall hallucination score
        hallucination_score = self._calculate_score(
            verification_results, consistency
        )

        return {
            'claims': claims,
            'verifications': verification_results,
            'consistency': consistency,
            'hallucination_score': hallucination_score,
            'recommendation': self._get_recommendation(hallucination_score)
        }
 
    def _calculate_score(
        self,
        verifications: List[VerificationResult],
        consistency: Dict
    ) -> float:
        """Compute the overall hallucination score."""

        if not verifications:
            return 0.0

        # Weight the different factors
        contradicted = sum(
            1 for v in verifications
            if v.status == VerificationStatus.CONTRADICTED
        )
        unverified = sum(
            1 for v in verifications
            if v.status == VerificationStatus.UNVERIFIED
        )

        total = len(verifications)

        # Score: higher = more likely hallucinated
        contradiction_score = contradicted / total * 0.6
        unverified_score = unverified / total * 0.3
        inconsistency_score = (1 - consistency['score']) * 0.1

        return min(contradiction_score + unverified_score + inconsistency_score, 1.0)

    def _get_recommendation(self, score: float) -> str:
        """Get a recommendation based on the hallucination score."""

        if score < 0.2:
            return "Output appears reliable"
        elif score < 0.4:
            return "Minor concerns - consider verification"
        elif score < 0.6:
            return "Significant concerns - verification recommended"
        elif score < 0.8:
            return "High hallucination risk - manual review required"
        else:
            return "Likely hallucinated - do not use without verification"
 
 
class ClaimExtractor:
    """Extracts factual claims from text."""

    def extract(self, text: str) -> List[FactClaim]:
        """Extract factual claims from text."""

        claims = []

        # Extract different claim types
        claims.extend(self._extract_numerical_claims(text))
        claims.extend(self._extract_entity_claims(text))
        claims.extend(self._extract_temporal_claims(text))
        claims.extend(self._extract_causal_claims(text))
 
        return claims
 
    def _extract_numerical_claims(self, text: str) -> List[FactClaim]:
        """Extract claims containing numerical data."""
 
        import re
 
        pattern = r'(\d+(?:\.\d+)?(?:\s*%|\s*percent)?)\s*(?:of|are|is|was|were)\s*([^.]+)'
        matches = re.findall(pattern, text)
 
        claims = []
        for number, subject in matches:
            claims.append(FactClaim(
                claim_text=f"{number} {subject}",
                claim_type="numerical",
                entities=[subject.strip()],
                confidence=0.8
            ))
 
        return claims
 
    def _extract_entity_claims(self, text: str) -> List[FactClaim]:
        """Extract claims about named entities."""
        # Use NER to identify entities and their relationships
        return []

    def _extract_temporal_claims(self, text: str) -> List[FactClaim]:
        """Extract claims containing dates/times."""
        return []

    def _extract_causal_claims(self, text: str) -> List[FactClaim]:
        """Extract cause-and-effect claims."""
        return []
 
 
class FactVerifier:
    """Verifies factual claims against knowledge sources."""
 
    def __init__(self, knowledge_base: Optional[Dict] = None):
        self.knowledge_base = knowledge_base or {}
 
    def verify(
        self,
        claim: FactClaim,
        source_documents: Optional[List[str]] = None
    ) -> VerificationResult:
        """Verify a factual claim."""

        evidence = []

        # Check against the source documents (RAG grounding)
        if source_documents:
            doc_evidence = self._check_source_documents(claim, source_documents)
            evidence.extend(doc_evidence)

        # Check against the knowledge base
        kb_evidence = self._check_knowledge_base(claim)
        evidence.extend(kb_evidence)

        # Determine the verification status
        status, confidence = self._determine_status(evidence)
 
        return VerificationResult(
            claim=claim,
            status=status,
            evidence=evidence,
            confidence=confidence,
            source_urls=[e.get('url', '') for e in evidence if e.get('url')]
        )
 
    def _check_source_documents(
        self,
        claim: FactClaim,
        documents: List[str]
    ) -> List[Dict]:
        """Check the claim against the source documents."""

        evidence = []

        for doc in documents:
            # Use semantic similarity to find supporting/contradicting passages
            similarity = self._calculate_similarity(claim.claim_text, doc)

            if similarity > 0.7:
                # Check whether it supports or contradicts
                relationship = self._determine_relationship(claim.claim_text, doc)
                evidence.append({
                    'source': 'source_document',
                    'text': doc[:200],
                    'similarity': similarity,
                    'relationship': relationship
                })
 
        return evidence
 
    def _check_knowledge_base(self, claim: FactClaim) -> List[Dict]:
        """Check the claim against the knowledge base."""
        return []

    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Compute semantic similarity between two texts."""
        # Use an embedding model for similarity
        return 0.0

    def _determine_relationship(self, claim: str, evidence: str) -> str:
        """Determine whether the evidence supports or contradicts the claim."""
        return "neutral"

    def _determine_status(
        self,
        evidence: List[Dict]
    ) -> Tuple[VerificationStatus, float]:
        """Determine the verification status from the evidence."""
 
        if not evidence:
            return VerificationStatus.UNVERIFIED, 0.0
 
        supporting = sum(1 for e in evidence if e.get('relationship') == 'supporting')
        contradicting = sum(1 for e in evidence if e.get('relationship') == 'contradicting')
 
        total = len(evidence)
 
        if contradicting > supporting:
            return VerificationStatus.CONTRADICTED, contradicting / total
        elif supporting > contradicting:
            return VerificationStatus.VERIFIED, supporting / total
        else:
            return VerificationStatus.UNCERTAIN, 0.5
 
 
class ConsistencyChecker:
    """Checks the internal consistency of claims."""

    def check(self, claims: List[FactClaim], full_text: str) -> Dict:
        """Check for internal contradictions."""

        contradictions = []

        # Compare each pair of claims
        for i, claim1 in enumerate(claims):
            for claim2 in claims[i+1:]:
                if self._are_contradictory(claim1, claim2):
                    contradictions.append({
                        'claim1': claim1.claim_text,
                        'claim2': claim2.claim_text,
                        'type': 'direct_contradiction'
                    })
 
        score = 1.0 - (len(contradictions) / max(len(claims), 1))
 
        return {
            'score': score,
            'contradictions': contradictions
        }
 
    def _are_contradictory(self, claim1: FactClaim, claim2: FactClaim) -> bool:
        """Check whether two claims contradict each other."""
        # Implement contradiction-detection logic
        return False
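The weighting inside `_calculate_score` is easy to sanity-check on its own. A standalone sketch with the same 0.6/0.3/0.1 weights; the claim counts below are hypothetical:

```python
def hallucination_score(contradicted: int, unverified: int,
                        total: int, consistency_score: float) -> float:
    """Same weighting as HallucinationDetector._calculate_score."""
    if total == 0:
        return 0.0
    # Contradicted claims weigh most, unverified less, inconsistency least.
    score = (contradicted / total * 0.6
             + unverified / total * 0.3
             + (1 - consistency_score) * 0.1)
    return min(score, 1.0)

# 1 contradicted and 2 unverified claims out of 5, consistency 0.8:
# 0.12 + 0.12 + 0.02 = 0.26, which lands in the "minor concerns" band.
print(round(hallucination_score(1, 2, 5, 0.8), 2))
```

Because contradictions carry twice the weight of merely unverified claims, a single contradicted claim dominates the score even in otherwise well-grounded output.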

PII Detection and Filtering

PII Filter Implementation

# pii_filter.py
import re
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
 
class PIIType(Enum):
    """Types of Personally Identifiable Information."""
    EMAIL = "email"
    PHONE = "phone"
    SSN = "ssn"
    CREDIT_CARD = "credit_card"
    ADDRESS = "address"
    NAME = "name"
    DATE_OF_BIRTH = "date_of_birth"
    IP_ADDRESS = "ip_address"
    MEDICAL_ID = "medical_id"
    PASSPORT = "passport"
    DRIVERS_LICENSE = "drivers_license"
    BANK_ACCOUNT = "bank_account"
 
@dataclass
class PIIDetection:
    """A detected PII instance."""
    pii_type: PIIType
    value: str
    start_position: int
    end_position: int
    confidence: float
    redacted_value: str
 
class PIIFilter:
    """Filters PII out of LLM outputs."""
 
    def __init__(self, config: Optional[Dict] = None):
        self.config = config or {}
        self.patterns = self._compile_patterns()
 
    def _compile_patterns(self) -> Dict[PIIType, re.Pattern]:
        """Compile the regex patterns for PII detection."""
 
        return {
            PIIType.EMAIL: re.compile(
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
            ),
            PIIType.PHONE: re.compile(
                r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'
            ),
            PIIType.SSN: re.compile(
                r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'
            ),
            PIIType.CREDIT_CARD: re.compile(
                r'\b(?:\d{4}[-\s]?){3}\d{4}\b'
            ),
            PIIType.IP_ADDRESS: re.compile(
                r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b'
            ),
            PIIType.DATE_OF_BIRTH: re.compile(
                r'\b(?:DOB|Date of Birth|born)[:\s]*(\d{1,2}[/-]\d{1,2}[/-]\d{2,4})\b',
                re.IGNORECASE
            ),
            PIIType.PASSPORT: re.compile(
                r'\b[A-Z]{1,2}\d{6,9}\b'
            ),
            PIIType.BANK_ACCOUNT: re.compile(
                r'\b\d{8,17}\b'  # Bank account formats vary by country; this broad pattern will produce false positives
            )
        }
 
    def detect(self, text: str) -> List[PIIDetection]:
        """Detect all PII in the text."""
 
        detections = []
 
        for pii_type, pattern in self.patterns.items():
            for match in pattern.finditer(text):
                detection = PIIDetection(
                    pii_type=pii_type,
                    value=match.group(),
                    start_position=match.start(),
                    end_position=match.end(),
                    confidence=self._calculate_confidence(pii_type, match.group()),
                    redacted_value=self._redact(pii_type, match.group())
                )
                detections.append(detection)
 
        # Sort by position
        detections.sort(key=lambda x: x.start_position)
 
        return detections
 
    def filter(self, text: str, redaction_style: str = "mask") -> str:
        """Filter PII out of the text."""

        detections = self.detect(text)

        # Apply redactions from the end backwards to preserve positions
        filtered_text = text
        for detection in reversed(detections):
            if redaction_style == "mask":
                replacement = detection.redacted_value
            elif redaction_style == "remove":
                replacement = "[REDACTED]"
            elif redaction_style == "type_label":
                replacement = f"[{detection.pii_type.value.upper()}]"
            else:
                replacement = detection.redacted_value
 
            filtered_text = (
                filtered_text[:detection.start_position] +
                replacement +
                filtered_text[detection.end_position:]
            )
 
        return filtered_text
 
    def _calculate_confidence(self, pii_type: PIIType, value: str) -> float:
        """Compute a confidence score for the PII detection."""

        # Base confidence from the pattern match
        confidence = 0.7

        # Adjust based on PII type and value characteristics
        if pii_type == PIIType.EMAIL:
            # Higher confidence for common email providers
            if re.match(r'.+@(gmail|yahoo|outlook|hotmail)\.com$', value, re.IGNORECASE):
                confidence = 0.95
            else:
                confidence = 0.85

        elif pii_type == PIIType.SSN:
            # Check for a valid SSN format
            if self._is_valid_ssn(value):
                confidence = 0.9
            else:
                confidence = 0.6

        elif pii_type == PIIType.CREDIT_CARD:
            # Use the Luhn algorithm
            if self._luhn_check(value):
                confidence = 0.95
            else:
                confidence = 0.5
 
        return confidence
 
    def _redact(self, pii_type: PIIType, value: str) -> str:
        """Create the redacted version of a PII value."""
 
        if pii_type == PIIType.EMAIL:
            parts = value.split('@')
            return parts[0][:2] + '***@' + parts[1]
 
        elif pii_type == PIIType.PHONE:
            clean = re.sub(r'[^\d]', '', value)
            return '***-***-' + clean[-4:]
 
        elif pii_type == PIIType.SSN:
            return 'XXX-XX-' + value[-4:]
 
        elif pii_type == PIIType.CREDIT_CARD:
            return 'XXXX-XXXX-XXXX-' + value[-4:]
 
        elif pii_type == PIIType.IP_ADDRESS:
            parts = value.split('.')
            return f"{parts[0]}.XXX.XXX.XXX"
 
        else:
            # Generic redaction
            if len(value) > 4:
                return value[:2] + '*' * (len(value) - 4) + value[-2:]
            return '*' * len(value)
 
    def _is_valid_ssn(self, value: str) -> bool:
        """Validate the SSN format."""
        clean = re.sub(r'[^\d]', '', value)
        if len(clean) != 9:
            return False

        # Reject area numbers that are never issued: 000, 666, and 900-999
        area = clean[:3]
        if area == '000' or area == '666' or int(area) >= 900:
            return False

        return True
 
    def _luhn_check(self, card_number: str) -> bool:
        """Validate a credit card number using the Luhn algorithm."""
 
        clean = re.sub(r'[^\d]', '', card_number)
        if len(clean) < 13 or len(clean) > 19:
            return False
 
        digits = [int(d) for d in clean]
        checksum = 0
 
        for i, digit in enumerate(reversed(digits)):
            if i % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit
 
        return checksum % 10 == 0
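The Luhn checksum used by `_luhn_check` is easy to verify on its own. A self-contained sketch (the card numbers below are standard test numbers, not real cards):

```python
import re

def luhn_check(card_number: str) -> bool:
    """Return True if the digits pass the Luhn checksum."""
    clean = re.sub(r'[^\d]', '', card_number)
    if not 13 <= len(clean) <= 19:
        return False
    checksum = 0
    # Walk right to left, doubling every second digit.
    for i, ch in enumerate(reversed(clean)):
        d = int(ch)
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        checksum += d
    return checksum % 10 == 0

print(luhn_check("4111 1111 1111 1111"))  # True  (well-known Visa test number)
print(luhn_check("4111 1111 1111 1112"))  # False (checksum broken)
```

The Luhn check only filters out random digit runs; a passing number still is not proof of a real card, which is why `_calculate_confidence` caps the score at 0.95 rather than 1.0.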

Structured Output Enforcement

JSON Schema Validation

# structured_output.py
import json
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Type
from enum import Enum
import jsonschema
 
class OutputFormat(Enum):
    """Supported output formats."""
    JSON = "json"
    MARKDOWN = "markdown"
    HTML = "html"
    PLAIN_TEXT = "plain_text"
 
@dataclass
class ValidationResult:
    """Result of output validation."""
    valid: bool
    errors: List[str]
    fixed_output: Optional[str]
    original_output: str
 
class StructuredOutputValidator:
    """Validates and enforces structured LLM outputs."""
 
    def __init__(self):
        self.validators = {}
 
    def register_schema(self, name: str, schema: Dict):
        """Register a JSON Schema for validation."""
        jsonschema.Draft7Validator.check_schema(schema)
        self.validators[name] = jsonschema.Draft7Validator(schema)
 
    def validate_json(
        self,
        output: str,
        schema_name: str,
        auto_fix: bool = True
    ) -> ValidationResult:
        """Validate JSON output against a schema."""

        validator = self.validators.get(schema_name)
        if not validator:
            return ValidationResult(
                valid=False,
                errors=[f"Schema '{schema_name}' not found"],
                fixed_output=None,
                original_output=output
            )

        # Try to parse the JSON
        try:
            data = json.loads(output)
        except json.JSONDecodeError as e:
            if auto_fix:
                fixed = self._attempt_json_fix(output)
                if fixed:
                    return self.validate_json(fixed, schema_name, auto_fix=False)
 
            return ValidationResult(
                valid=False,
                errors=[f"Invalid JSON: {str(e)}"],
                fixed_output=None,
                original_output=output
            )
 
        # Validate against the schema
        errors = list(validator.iter_errors(data))
 
        if errors:
            error_messages = [
                f"{e.path}: {e.message}" if e.path else e.message
                for e in errors
            ]
 
            if auto_fix:
                fixed_data = self._attempt_schema_fix(data, errors, validator.schema)
                if fixed_data:
                    fixed_output = json.dumps(fixed_data, indent=2)
                    # Re-validate
                    new_errors = list(validator.iter_errors(fixed_data))
                    if not new_errors:
                        return ValidationResult(
                            valid=True,
                            errors=[],
                            fixed_output=fixed_output,
                            original_output=output
                        )
 
            return ValidationResult(
                valid=False,
                errors=error_messages,
                fixed_output=None,
                original_output=output
            )
 
        return ValidationResult(
            valid=True,
            errors=[],
            fixed_output=None,
            original_output=output
        )
 
    def _attempt_json_fix(self, output: str) -> Optional[str]:
        """Try to repair common JSON problems."""

        fixed = output

        # Strip markdown code fences
        if fixed.startswith('```'):
            lines = fixed.split('\n')
            if lines[0].startswith('```'):
                lines = lines[1:]
            if lines and lines[-1].strip() == '```':
                lines = lines[:-1]
            fixed = '\n'.join(lines)

        # Fix trailing commas
        fixed = re.sub(r',\s*}', '}', fixed)
        fixed = re.sub(r',\s*]', ']', fixed)

        # Replace single quotes (naive: corrupts apostrophes inside strings)
        fixed = fixed.replace("'", '"')

        # Try to parse
        try:
            json.loads(fixed)
            return fixed
        except json.JSONDecodeError:
            return None
 
    def _attempt_schema_fix(
        self,
        data: Dict,
        errors: List,
        schema: Dict
    ) -> Optional[Dict]:
        """Try to repair schema validation errors."""

        fixed_data = data.copy()

        for error in errors:
            # Handle missing required properties
            if error.validator == 'required':
                for prop in error.validator_value:
                    if prop not in fixed_data:
                        # Add a default value based on the schema
                        prop_schema = schema.get('properties', {}).get(prop, {})
                        fixed_data[prop] = self._get_default_value(prop_schema)

            # Handle type errors
            elif error.validator == 'type':
            elif error.validator == 'type':
                path = list(error.path)
                if path:
                    current = fixed_data
                    for key in path[:-1]:
                        current = current[key]
                    current[path[-1]] = self._coerce_type(
                        current[path[-1]],
                        error.validator_value
                    )
 
        return fixed_data
 
    def _get_default_value(self, schema: Dict) -> Any:
        """Get the default value for a schema type."""
 
        if 'default' in schema:
            return schema['default']
 
        type_defaults = {
            'string': '',
            'number': 0,
            'integer': 0,
            'boolean': False,
            'array': [],
            'object': {}
        }
 
        return type_defaults.get(schema.get('type'), None)
 
    def _coerce_type(self, value: Any, target_type: str) -> Any:
        """Try to coerce a value to the target type."""
 
        try:
            if target_type == 'string':
                return str(value)
            elif target_type == 'number':
                return float(value)
            elif target_type == 'integer':
                return int(value)
            elif target_type == 'boolean':
                return bool(value)
            elif target_type == 'array' and not isinstance(value, list):
                return [value]
        except (ValueError, TypeError):
            pass
 
        return value
 
 
# Example usage
validator = StructuredOutputValidator()

# Register a schema for product recommendations
validator.register_schema('product_recommendation', {
    "type": "object",
    "required": ["products", "reasoning"],
    "properties": {
        "products": {
            "type": "array",
            "items": {
                "type": "object",
                "required": ["name", "price", "relevance_score"],
                "properties": {
                    "name": {"type": "string"},
                    "price": {"type": "number", "minimum": 0},
                    "relevance_score": {"type": "number", "minimum": 0, "maximum": 1}
                }
            }
        },
        "reasoning": {"type": "string"},
        "confidence": {"type": "number", "minimum": 0, "maximum": 1}
    }
})
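The fence-stripping and trailing-comma repairs in `_attempt_json_fix` can be demonstrated standalone. This sketch is trimmed to those two fixes and deliberately omits the single-quote replacement, since that one corrupts apostrophes inside string values; the sample payload is hypothetical:

```python
import json
import re

def attempt_json_fix(output: str):
    """Strip markdown fences and trailing commas, then re-parse."""
    fixed = output.strip()
    # LLMs often wrap JSON in ```json ... ``` fences.
    if fixed.startswith('```'):
        lines = fixed.split('\n')
        if lines[0].startswith('```'):
            lines = lines[1:]
        if lines and lines[-1].strip() == '```':
            lines = lines[:-1]
        fixed = '\n'.join(lines)
    # Remove a trailing comma before a closing brace/bracket.
    fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
    try:
        json.loads(fixed)
        return fixed
    except json.JSONDecodeError:
        return None

raw = '```json\n{"products": [], "reasoning": "low stock",}\n```'
print(json.loads(attempt_json_fix(raw))["reasoning"])  # low stock
```

Repairs like these are best-effort only; when the repaired string still fails to parse, returning None and re-prompting the model is usually safer than more aggressive rewriting.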

Complete Output Validation Pipeline

# output_pipeline.py
from dataclasses import dataclass
from typing import Dict, List, Optional

# Classes defined in the earlier sections of this guide
from content_moderation import ContentModerator, ModerationAction
from hallucination_detection import HallucinationDetector
from pii_filter import PIIFilter
from structured_output import StructuredOutputValidator

@dataclass
class PipelineResult:
    """Result of the complete validation pipeline."""
    original_output: str
    final_output: str
    passed: bool
    moderation: Dict
    hallucination: Dict
    pii: Dict
    structure: Dict
    applied_fixes: List[str]
 
class OutputValidationPipeline:
    """Complete output validation pipeline."""
 
    def __init__(self, config: Dict):
        self.moderator = ContentModerator(config.get('moderation', {}))
        self.hallucination_detector = HallucinationDetector(config.get('hallucination', {}))
        self.pii_filter = PIIFilter(config.get('pii', {}))
        self.structure_validator = StructuredOutputValidator()
 
        # Register schemas
        for name, schema in config.get('schemas', {}).items():
            self.structure_validator.register_schema(name, schema)
 
    def process(
        self,
        output: str,
        context: Optional[Dict] = None,
        source_documents: Optional[List[str]] = None,
        expected_schema: Optional[str] = None
    ) -> PipelineResult:
        """Run the output through the complete validation pipeline."""

        applied_fixes = []
        current_output = output

        # Step 1: Content moderation
        moderation_result = self.moderator.moderate(current_output, context)

        if moderation_result.action == ModerationAction.BLOCK:
            return PipelineResult(
                original_output=output,
                final_output="[Content blocked due to policy violation]",
                passed=False,
                moderation=moderation_result.__dict__,
                hallucination={},
                pii={},
                structure={},
                applied_fixes=[]
            )
 
        if moderation_result.filtered_content:
            current_output = moderation_result.filtered_content
            applied_fixes.append("content_moderation")
 
        # Step 2: PII filtering
        pii_detections = self.pii_filter.detect(current_output)
        if pii_detections:
            current_output = self.pii_filter.filter(current_output)
            applied_fixes.append("pii_filtering")
 
        # Step 3: Hallucination detection
        hallucination_result = self.hallucination_detector.analyze(
            current_output,
            context.get('prompt') if context else None,
            source_documents
        )
 
        # Step 4: Structure validation (if a schema is specified)
        structure_result = {}
        if expected_schema:
            validation = self.structure_validator.validate_json(
                current_output,
                expected_schema
            )
            structure_result = validation.__dict__
 
            if validation.fixed_output:
                current_output = validation.fixed_output
                applied_fixes.append("structure_fix")
 
        # Decide whether the output passed
        passed = (
            moderation_result.action in [ModerationAction.ALLOW, ModerationAction.WARN] and
            hallucination_result['hallucination_score'] < 0.6 and
            (not expected_schema or structure_result.get('valid', True))
        )
 
        return PipelineResult(
            original_output=output,
            final_output=current_output,
            passed=passed,
            moderation=moderation_result.__dict__,
            hallucination=hallucination_result,
            pii={'detections': [d.__dict__ for d in pii_detections]},
            structure=structure_result,
            applied_fixes=applied_fixes
        )
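A hypothetical configuration shape for the pipeline. The key names match the `config.get(...)` calls above; the values are illustrative placeholders, not recommended settings:

```python
pipeline_config = {
    # Passed to ContentModerator: keyword lists keyed by ContentCategory value.
    "moderation": {"keyword_lists": {"violence": ["bomb-making", "assault"]}},
    # Passed to HallucinationDetector: optional knowledge base.
    "hallucination": {"knowledge_base": None},
    # Passed to PIIFilter (defaults used when empty).
    "pii": {},
    # JSON Schemas registered with StructuredOutputValidator at startup.
    "schemas": {},
}
```

Keeping all thresholds and keyword lists in one config object makes it easy to version and audit the safety policy separately from the code.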

Conclusion

Robust LLM output validation requires multiple layers of defense:

  1. Content Moderation - Filters harmful or inappropriate content
  2. Hallucination Detection - Verifies factual claims against sources
  3. PII Filtering - Protects sensitive personal information
  4. Structure Enforcement - Ensures outputs match the expected schemas

Implementing these safeguards helps ensure that your AI applications produce reliable, safe, and trustworthy outputs for end users.
