AI Security

Securizarea chatbotilor AI: ghid enterprise

Nicu Constantin
10 min lectura
#AI chatbot#security#enterprise AI#input validation#abuse prevention

Securizarea chatbot-urilor AI: Ghid de implementare pentru deploymenturi enterprise

Chatbot-urile AI sunt tot mai des deployate in aplicatii orientate spre clienti si in medii enterprise, ceea ce face securitatea critica. Acest ghid acopera masuri cuprinzatoare de securitate pentru implementarile de chatbot.

Securitatea input-ului

Validare multi-strat a input-ului

# input_validation.py
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import re
from enum import Enum
 
class ValidationResult(Enum):
    """Overall outcome of validating one user input."""
    PASS = "pass"            # accepted unchanged
    SANITIZED = "sanitized"  # accepted after sanitization altered the text
    BLOCKED = "blocked"      # rejected (blocked pattern or length limit)
 
@dataclass
class InputValidationResult:
    """Result of input validation.

    Produced by ChatbotInputValidator.validate; carries the outcome plus
    the evidence (matched rules, warnings) needed for logging/auditing.
    """
    status: ValidationResult        # PASS / SANITIZED / BLOCKED
    original_input: str             # raw input (truncated to 100 chars when blocked for length)
    sanitized_input: Optional[str]  # cleaned text, or None when the input was blocked
    blocked_patterns: List[str]     # names of blocking rules that matched (empty unless BLOCKED)
    warnings: List[str]             # non-fatal findings (sanitization steps, suspicious terms)
 
class ChatbotInputValidator:
    """Multi-layer input validation for chatbot security.

    Layers applied by ``validate``, in order:
      1. Hard length limit (block).
      2. Blocked-pattern scan: prompt injection, XSS, SQL/command
         injection (block).
      3. Sanitization: strip HTML tags and control characters,
         normalize whitespace (warn).
      4. Suspicious-pattern scan: sensitive-data requests, security
         jargon, repetitive content (warn only, never blocks).
    """

    def __init__(self, config: Dict):
        """``config`` keys: ``max_length`` (int, default 4000)."""
        self.config = config
        self.max_length = config.get('max_length', 4000)
        # Compile every regex once up front so validate() does no
        # per-call pattern compilation (the original recompiled the
        # sanitization and suspicious patterns on every call).
        self.blocked_patterns = self._compile_blocked_patterns()
        self.sanitization_rules = self._load_sanitization_rules()
        self.suspicious_patterns = self._compile_suspicious_patterns()

    def _compile_blocked_patterns(self) -> List[Tuple[re.Pattern, str]]:
        """Compile (pattern, rule-name) pairs whose match blocks the input."""

        patterns = [
            # Prompt injection attempts
            (r'ignore\s+(all\s+)?previous\s+instructions?', 'prompt_injection'),
            (r'you\s+are\s+now\s+in\s+["\']?(\w+)["\']?\s+mode', 'mode_switch_attempt'),
            (r'pretend\s+(you\s+are|to\s+be)\s+a', 'role_override'),
            (r'act\s+as\s+(if\s+you\s+(are|were)\s+)?a', 'role_override'),
            (r'system\s*:\s*', 'system_prompt_injection'),
            (r'\[INST\]|\[\/INST\]', 'instruction_tag_injection'),
            (r'<\|im_start\|>|<\|im_end\|>', 'special_token_injection'),

            # Code injection
            (r'<script[^>]*>.*?</script>', 'xss_attempt'),
            (r'javascript:', 'javascript_injection'),
            (r'on\w+\s*=', 'event_handler_injection'),

            # SQL injection patterns
            (r"'\s*or\s*'.*?'\s*=\s*'", 'sql_injection'),
            (r';\s*drop\s+table', 'sql_injection'),
            (r'union\s+select', 'sql_injection'),

            # Dangerous commands
            (r'sudo\s+', 'command_injection'),
            (r'rm\s+-rf', 'command_injection'),
            (r'\|\s*bash', 'command_injection'),
        ]

        return [
            (re.compile(pattern, re.IGNORECASE), name)
            for pattern, name in patterns
        ]

    def _load_sanitization_rules(self) -> List[Dict]:
        """Load sanitization rules, pre-compiling each pattern."""

        rules = [
            {
                'pattern': r'<[^>]+>',
                'replacement': '',
                'description': 'Remove HTML tags'
            },
            {
                'pattern': r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]',
                'replacement': '',
                'description': 'Remove control characters'
            },
            {
                'pattern': r'\s{3,}',
                'replacement': '  ',
                'description': 'Normalize whitespace'
            }
        ]
        # Compile once so validate() can call pattern.sub() directly.
        for rule in rules:
            rule['pattern'] = re.compile(rule['pattern'])
        return rules

    def _compile_suspicious_patterns(self) -> List[Tuple[re.Pattern, str]]:
        """Compile patterns that warrant a warning but never a block."""

        patterns = [
            (r'password|secret|api.?key|token', 'sensitive_data_request'),
            (r'hack|exploit|bypass', 'security_related_terms'),
            (r'(\b\w+\b)(\s+\1){3,}', 'repetitive_content')
        ]
        return [(re.compile(p, re.IGNORECASE), name) for p, name in patterns]

    def validate(self, user_input: str) -> InputValidationResult:
        """Validate user input through all security layers.

        Returns BLOCKED when a hard rule fires, SANITIZED when a
        sanitization rule changed the text, PASS otherwise.
        """

        warnings: List[str] = []

        # Layer 1: length check. The original input is truncated in the
        # result to avoid echoing huge payloads back into logs.
        if len(user_input) > self.max_length:
            return InputValidationResult(
                status=ValidationResult.BLOCKED,
                original_input=user_input[:100] + '...',
                sanitized_input=None,
                blocked_patterns=['length_exceeded'],
                warnings=[f'Input exceeds maximum length of {self.max_length}']
            )

        # Layer 2: blocked-pattern scan; collect every rule that fires.
        blocked_patterns = [
            name for pattern, name in self.blocked_patterns
            if pattern.search(user_input)
        ]
        if blocked_patterns:
            return InputValidationResult(
                status=ValidationResult.BLOCKED,
                original_input=user_input,
                sanitized_input=None,
                blocked_patterns=blocked_patterns,
                warnings=['Potentially malicious content detected']
            )

        # Layer 3: sanitization; record each rule that changed the text.
        sanitized = user_input
        for rule in self.sanitization_rules:
            before = sanitized
            sanitized = rule['pattern'].sub(rule['replacement'], sanitized)
            if before != sanitized:
                warnings.append(f"Applied: {rule['description']}")

        # Layer 4: suspicious patterns (warn but don't block).
        for pattern, warning_type in self.suspicious_patterns:
            if pattern.search(sanitized):
                warnings.append(f'Suspicious pattern: {warning_type}')

        status = ValidationResult.SANITIZED if sanitized != user_input else ValidationResult.PASS

        return InputValidationResult(
            status=status,
            original_input=user_input,
            sanitized_input=sanitized,
            blocked_patterns=[],
            warnings=warnings
        )
 
 
class ConversationContextValidator:
    """Validate conversation context and history for anomalies."""

    def __init__(self, max_history: int = 20):
        # Soft cap on conversation length; exceeding it is a warning.
        self.max_history = max_history

    def validate_context(
        self,
        conversation_history: List[Dict],
        system_prompt: str
    ) -> Dict:
        """Scan a conversation for structural and injection anomalies.

        Returns {'valid': bool, 'issues': [...]}; 'valid' is False only
        when at least one critical issue was found.
        """

        findings: List[Dict] = []

        # History length (soft limit).
        if len(conversation_history) > self.max_history:
            findings.append({
                'type': 'history_length',
                'severity': 'warning',
                'message': f'Conversation exceeds {self.max_history} messages'
            })

        # System-prompt tampering attempts hidden in user messages.
        injection_markers = ('system:', '[system]', '<system>')
        for idx, msg in enumerate(conversation_history):
            if msg.get('role') != 'user':
                continue
            text = msg.get('content', '')
            if any(marker in text.lower() for marker in injection_markers):
                findings.append({
                    'type': 'system_injection_attempt',
                    'severity': 'critical',
                    'message': f'System prompt injection attempt in message {idx}'
                })

        # Role-sequence sanity: conversations must open with the user.
        role_sequence = [m.get('role') for m in conversation_history]
        if role_sequence and role_sequence[0] != 'user':
            findings.append({
                'type': 'role_sequence',
                'severity': 'warning',
                'message': 'Conversation should start with user message'
            })

        # Back-to-back user messages (informational only).
        adjacent = zip(role_sequence, role_sequence[1:])
        for pos, (prev_role, cur_role) in enumerate(adjacent, start=1):
            if prev_role == 'user' and cur_role == 'user':
                findings.append({
                    'type': 'consecutive_user_messages',
                    'severity': 'info',
                    'message': f'Consecutive user messages at position {pos}'
                })

        return {
            'valid': all(f['severity'] != 'critical' for f in findings),
            'issues': findings
        }

Securitatea output-ului

Filtrarea raspunsurilor

# output_security.py
from dataclasses import dataclass
from typing import List, Dict, Optional
import re
 
@dataclass
class OutputFilterResult:
    """Result of output filtering.

    Produced by ChatbotOutputFilter.filter_output.
    """
    original_output: str          # the unmodified model response
    filtered_output: str          # response after redaction/removal ('' when blocked)
    filters_applied: List[str]    # names of filters that changed the output
    blocked: bool                 # True when the response was withheld entirely
    block_reason: Optional[str]   # human-readable reason when blocked, else None
 
class ChatbotOutputFilter:
    """Filter and sanitize chatbot outputs.

    Pipeline in ``filter_output``:
      1. Blocked-content scan (dangerous instructions, system-prompt
         leakage, credentials) — blocks the entire response.
      2. PII redaction (email, phone, SSN, credit card, IP address).
      3. Executable / dangerous code removal.
      4. URL allow-listing.
    """

    def __init__(self, config: Dict):
        """``config`` keys: ``allowed_domains`` (list of hostnames)."""
        self.config = config
        self.pii_patterns = self._compile_pii_patterns()
        self.blocked_content = self._load_blocked_content()
        # Pre-compile the code-removal and URL-extraction regexes so
        # filter_output() does no per-call compilation.
        self._code_patterns = [
            (re.compile(r'```(?:bash|sh|shell)\n.*?```', re.DOTALL | re.IGNORECASE),
             'executable_code'),
            (re.compile(r'`[^`]*(?:rm|sudo|chmod|wget|curl)[^`]*`', re.DOTALL | re.IGNORECASE),
             'dangerous_command'),
        ]
        self._url_pattern = re.compile(r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+[^\s]*')

    def _compile_pii_patterns(self) -> Dict[str, re.Pattern]:
        """Compile PII detection patterns keyed by PII type."""

        return {
            # BUGFIX: TLD class is [A-Za-z] — the original [A-Z|a-z]
            # treated '|' as a literal character inside the class.
            'email': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            'phone': re.compile(r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'),
            'ssn': re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'),
            'credit_card': re.compile(r'\b(?:\d{4}[-\s]?){3}\d{4}\b'),
            'ip_address': re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')
        }

    def _load_blocked_content(self) -> List[re.Pattern]:
        """Load content patterns that block the whole response."""

        patterns = [
            # Dangerous instructions
            r'how\s+to\s+(make|build|create)\s+(a\s+)?(bomb|weapon|explosive)',
            r'instructions\s+for\s+(illegal|dangerous)',
            # Internal system information
            r'(my|the)\s+system\s+prompt\s+is',
            r'i\s+was\s+instructed\s+to',
            r'my\s+instructions\s+(say|are)',
            # Credential leakage
            r'(api[_-]?key|secret|password)\s*[:=]\s*\S+',
        ]

        return [re.compile(p, re.IGNORECASE) for p in patterns]

    def filter_output(self, output: str) -> OutputFilterResult:
        """Filter chatbot output through all security layers."""

        filters_applied: List[str] = []
        filtered = output

        # Layer 1: prohibited content blocks the entire response.
        for pattern in self.blocked_content:
            if pattern.search(filtered):
                return OutputFilterResult(
                    original_output=output,
                    filtered_output='',
                    filters_applied=['blocked_content'],
                    blocked=True,
                    block_reason='Response contains prohibited content'
                )

        # Layer 2: redact PII in place.
        for pii_type, pattern in self.pii_patterns.items():
            if pattern.search(filtered):
                filters_applied.append(f'pii_{pii_type}')
                filtered = pattern.sub(f'[{pii_type.upper()}_REDACTED]', filtered)

        # Layer 3: strip executable / dangerous code snippets.
        for pattern, filter_name in self._code_patterns:
            if pattern.search(filtered):
                filters_applied.append(filter_name)
                filtered = pattern.sub('[CODE_REMOVED_FOR_SECURITY]', filtered)

        # Layer 4: remove URLs outside the allow-list.
        for url in self._url_pattern.findall(filtered):
            if not self._is_safe_url(url):
                filters_applied.append('unsafe_url')
                filtered = filtered.replace(url, '[URL_REMOVED]')

        return OutputFilterResult(
            original_output=output,
            filtered_output=filtered,
            filters_applied=filters_applied,
            blocked=False,
            block_reason=None
        )

    def _is_safe_url(self, url: str) -> bool:
        """Check if URL is safe to include in output."""

        # Allowed domains (config override, else sensible defaults).
        allowed_domains = self.config.get('allowed_domains', [
            'docs.example.com',
            'support.example.com',
            'help.example.com'
        ])

        from urllib.parse import urlparse
        parsed = urlparse(url)

        # Only allow-listed hosts survive.
        if parsed.netloc not in allowed_domains:
            return False

        # Defense in depth: reject dangerous schemes, even though the
        # extraction regex should only ever yield http(s) URLs.
        suspicious = ['javascript:', 'data:', 'file:']
        if any(s in url.lower() for s in suspicious):
            return False

        return True

Securitatea sesiunii si conversatiei

Gestionare securizata a sesiunilor

# session_security.py
import secrets
import hashlib
from datetime import datetime, timedelta
from typing import Dict, Optional, List
from dataclasses import dataclass, field
 
@dataclass
class ChatSession:
    """Secure chat session.

    Mutable record tracked by SessionManager; last_activity and
    message_count are updated on every recorded message.
    """
    session_id: str          # cryptographically random ID (secrets.token_urlsafe)
    user_id: str             # owning user
    created_at: datetime     # naive UTC timestamp (datetime.utcnow)
    last_activity: datetime  # naive UTC; drives idle-timeout expiry
    message_count: int       # messages recorded against this session
    ip_address: str          # client IP at session creation
    user_agent: str          # client user agent at session creation
    metadata: Dict = field(default_factory=dict)  # free-form extra data
 
class SessionManager:
    """Manage chat sessions securely.

    In-memory store of ChatSession objects with an idle timeout, a
    per-session message cap, and a per-user concurrent-session limit.
    """

    def __init__(
        self,
        session_timeout: int = 3600,
        max_messages_per_session: int = 100
    ):
        """
        Args:
            session_timeout: idle seconds after which a session expires.
            max_messages_per_session: hard cap on messages per session.
        """
        self.session_timeout = session_timeout
        self.max_messages = max_messages_per_session
        # session_id -> ChatSession
        self.sessions: Dict[str, ChatSession] = {}
        # user_id -> ids of that user's live sessions
        self.user_sessions: Dict[str, List[str]] = {}

    def create_session(
        self,
        user_id: str,
        ip_address: str,
        user_agent: str
    ) -> ChatSession:
        """Create and register a new chat session for ``user_id``."""

        session_id = self._generate_session_id()

        # NOTE: naive UTC timestamps (datetime.utcnow) are used
        # consistently throughout this class.
        session = ChatSession(
            session_id=session_id,
            user_id=user_id,
            created_at=datetime.utcnow(),
            last_activity=datetime.utcnow(),
            message_count=0,
            ip_address=ip_address,
            user_agent=user_agent
        )

        self.sessions[session_id] = session

        # Track user sessions
        self.user_sessions.setdefault(user_id, []).append(session_id)

        # Limit concurrent sessions per user
        self._enforce_session_limit(user_id, max_sessions=5)

        return session

    def validate_session(
        self,
        session_id: str,
        ip_address: str,
        user_agent: str
    ) -> Dict:
        """Validate that a session is active and legitimate.

        Returns ``{'valid': True, 'session': ChatSession}`` or
        ``{'valid': False, 'reason': str}``.
        """

        session = self.sessions.get(session_id)

        if not session:
            return {'valid': False, 'reason': 'Session not found'}

        # Idle timeout.
        if datetime.utcnow() - session.last_activity > timedelta(seconds=self.session_timeout):
            self._terminate_session(session_id)
            return {'valid': False, 'reason': 'Session expired'}

        # IP consistency (log only; mobile clients change IPs legitimately).
        if session.ip_address != ip_address:
            self._log_suspicious_activity(session, 'ip_change', {
                'old_ip': session.ip_address,
                'new_ip': ip_address
            })

        # User-agent consistency (log only). FIX: user_agent was
        # previously accepted but never used; a mid-session UA change is
        # a classic session-hijack indicator worth logging.
        if session.user_agent != user_agent:
            self._log_suspicious_activity(session, 'user_agent_change', {
                'old_user_agent': session.user_agent,
                'new_user_agent': user_agent
            })

        # Per-session message cap.
        if session.message_count >= self.max_messages:
            return {'valid': False, 'reason': 'Message limit exceeded'}

        return {'valid': True, 'session': session}

    def record_message(self, session_id: str) -> bool:
        """Count one message against the session; False if session unknown."""

        session = self.sessions.get(session_id)
        if not session:
            return False

        session.message_count += 1
        session.last_activity = datetime.utcnow()

        return True

    def _generate_session_id(self) -> str:
        """Generate a cryptographically secure session ID (256 random bits)."""
        return secrets.token_urlsafe(32)

    def _enforce_session_limit(self, user_id: str, max_sessions: int):
        """Terminate the user's oldest sessions beyond ``max_sessions``."""

        user_session_ids = self.user_sessions.get(user_id, [])

        if len(user_session_ids) > max_sessions:
            # Sort live sessions by last activity, oldest first.
            sessions_with_activity = [
                (sid, self.sessions[sid].last_activity)
                for sid in user_session_ids
                if sid in self.sessions
            ]
            sessions_with_activity.sort(key=lambda x: x[1])

            # Terminate everything except the max_sessions most recent.
            for sid, _ in sessions_with_activity[:-max_sessions]:
                self._terminate_session(sid)

    def _terminate_session(self, session_id: str):
        """Remove a session from both indexes."""

        session = self.sessions.pop(session_id, None)
        if session:
            user_sessions = self.user_sessions.get(session.user_id, [])
            if session_id in user_sessions:
                user_sessions.remove(session_id)

    def _log_suspicious_activity(
        self,
        session: ChatSession,
        activity_type: str,
        details: Dict
    ):
        """Log suspicious session activity.

        Stub: wire this to the security monitoring / SIEM pipeline.
        """
        pass

Rate limiting si prevenirea abuzului

Rate limiting inteligent

# rate_limiting.py
from dataclasses import dataclass
from typing import Dict, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import time
 
@dataclass
class RateLimitResult:
    """Result of rate limit check.

    Returned by AdaptiveRateLimiter.check_rate_limit.
    """
    allowed: bool               # True when the request may proceed
    remaining: int              # requests (or tokens) left in the current window
    reset_at: datetime          # when the limiting window resets
    retry_after: Optional[int]  # seconds to wait before retrying; None when allowed
    limit_type: str             # limit that fired: 'minute'/'hour'/'tokens_per_minute'/'blocked'/'none'
 
class AdaptiveRateLimiter:
    """Adaptive rate limiting for chatbot API.

    Limits scale with a per-user trust score in [0.1, 2.0]; users whose
    score drops below 0.3 are temporarily blocked outright.
    """

    def __init__(self):
        # user_id -> list of {'time': datetime, 'tokens': int} records
        self.user_requests: Dict[str, list] = defaultdict(list)
        # user_id -> trust score (1.0 = neutral)
        self.user_scores: Dict[str, float] = defaultdict(lambda: 1.0)
        # user_id -> block expiry time (naive UTC)
        self.blocked_users: Dict[str, datetime] = {}

        # Base limits, multiplied by the trust score on every check.
        # NOTE: messages_per_day / tokens_per_day are declared but not
        # yet enforced by check_rate_limit — TODO.
        self.base_limits = {
            'messages_per_minute': 10,
            'messages_per_hour': 100,
            'messages_per_day': 500,
            'tokens_per_minute': 10000,
            'tokens_per_day': 100000
        }

    def check_rate_limit(
        self,
        user_id: str,
        token_count: int = 0
    ) -> RateLimitResult:
        """Check whether a request is within rate limits.

        Args:
            user_id: requesting user.
            token_count: tokens this request will consume.

        Returns:
            RateLimitResult; when allowed, the request is recorded.
        """

        now = datetime.utcnow()

        # An explicit block takes precedence over every other limit.
        if user_id in self.blocked_users:
            block_until = self.blocked_users[user_id]
            if now < block_until:
                return RateLimitResult(
                    allowed=False,
                    remaining=0,
                    reset_at=block_until,
                    retry_after=int((block_until - now).total_seconds()),
                    limit_type='blocked'
                )
            else:
                del self.blocked_users[user_id]

        # Scale limits by the user's trust score.
        trust_score = self.user_scores[user_id]
        adjusted_limits = {
            k: int(v * trust_score)
            for k, v in self.base_limits.items()
        }

        # Drop records older than 24h to bound memory growth.
        self._clean_old_requests(user_id)

        # Per-minute message limit.
        minute_ago = now - timedelta(minutes=1)
        minute_requests = [r for r in self.user_requests[user_id] if r['time'] > minute_ago]

        if len(minute_requests) >= adjusted_limits['messages_per_minute']:
            return RateLimitResult(
                allowed=False,
                remaining=0,
                reset_at=minute_requests[0]['time'] + timedelta(minutes=1),
                retry_after=60,
                limit_type='minute'
            )

        # Per-hour message limit.
        hour_ago = now - timedelta(hours=1)
        hour_requests = [r for r in self.user_requests[user_id] if r['time'] > hour_ago]

        if len(hour_requests) >= adjusted_limits['messages_per_hour']:
            return RateLimitResult(
                allowed=False,
                remaining=0,
                reset_at=hour_requests[0]['time'] + timedelta(hours=1),
                retry_after=3600,
                limit_type='hour'
            )

        # Per-minute token budget.
        minute_tokens = sum(r['tokens'] for r in minute_requests)
        if minute_tokens + token_count > adjusted_limits['tokens_per_minute']:
            # BUGFIX: minute_requests can be empty when a single
            # request's token_count alone exceeds the budget; the old
            # code indexed minute_requests[0] and raised IndexError.
            window_start = minute_requests[0]['time'] if minute_requests else now
            return RateLimitResult(
                allowed=False,
                remaining=adjusted_limits['tokens_per_minute'] - minute_tokens,
                reset_at=window_start + timedelta(minutes=1),
                retry_after=60,
                limit_type='tokens_per_minute'
            )

        # Request is allowed: record it.
        self.user_requests[user_id].append({
            'time': now,
            'tokens': token_count
        })

        remaining = adjusted_limits['messages_per_minute'] - len(minute_requests) - 1

        return RateLimitResult(
            allowed=True,
            remaining=remaining,
            reset_at=now + timedelta(minutes=1),
            retry_after=None,
            limit_type='none'
        )

    def adjust_trust_score(
        self,
        user_id: str,
        adjustment: float,
        reason: str
    ):
        """Adjust the user's trust score by ``adjustment``, clamped to [0.1, 2.0].

        ``reason`` is accepted for audit purposes but not stored here.
        """

        current = self.user_scores[user_id]
        new_score = max(0.1, min(2.0, current + adjustment))
        self.user_scores[user_id] = new_score

        # A very low score triggers a one-hour outright block.
        if new_score < 0.3:
            self.blocked_users[user_id] = datetime.utcnow() + timedelta(hours=1)

    def _clean_old_requests(self, user_id: str):
        """Remove this user's request records older than 24 hours."""
        day_ago = datetime.utcnow() - timedelta(days=1)
        self.user_requests[user_id] = [
            r for r in self.user_requests[user_id]
            if r['time'] > day_ago
        ]
 
 
class AbuseDetector:
    """Detect and prevent chatbot abuse."""

    def __init__(self):
        # Per-user rolling history of recent messages (capped at 100).
        self.user_patterns: Dict[str, list] = defaultdict(list)
        self.abuse_thresholds = {
            'repetitive_messages': 5,
            'rapid_fire': 10,
            'prompt_injection_attempts': 3
        }

    def analyze_behavior(
        self,
        user_id: str,
        message: str,
        timestamp: datetime
    ) -> Dict:
        """Analyze user behavior for abuse patterns.

        Records the message in the user's history and reports any
        repetitive-message or rapid-fire indicators found.
        """

        indicators = []
        history = self.user_patterns[user_id][-50:]

        # Repetitive messages: near-duplicates of the current message
        # among the last ten entries.
        if history:
            window = history[-10:]
            duplicates = sum(
                1 for entry in window
                if self._similarity(entry['message'], message) > 0.8
            )
            if duplicates >= self.abuse_thresholds['repetitive_messages']:
                indicators.append({
                    'type': 'repetitive_messages',
                    'severity': 'medium',
                    'count': duplicates
                })

        # Rapid-fire: every gap in the recent window under two seconds.
        if len(history) >= 2:
            times = [entry['time'] for entry in history[-10:]]
            gaps_small = all(
                (later - earlier).total_seconds() < 2
                for earlier, later in zip(times, times[1:])
            )
            if gaps_small and len(times) >= self.abuse_thresholds['rapid_fire']:
                indicators.append({
                    'type': 'rapid_fire',
                    'severity': 'high'
                })

        # Record the current message and trim the history.
        self.user_patterns[user_id].append({
            'message': message,
            'time': timestamp
        })
        if len(self.user_patterns[user_id]) > 100:
            self.user_patterns[user_id] = self.user_patterns[user_id][-100:]

        return {
            'is_abuse': len(indicators) > 0,
            'indicators': indicators,
            'recommended_action': self._recommend_action(indicators)
        }

    def _similarity(self, msg1: str, msg2: str) -> float:
        """Jaccard similarity over lowercase word sets."""

        first = set(msg1.lower().split())
        second = set(msg2.lower().split())

        if not first or not second:
            return 0

        return len(first & second) / len(first | second)

    def _recommend_action(self, indicators: List[Dict]) -> str:
        """Map abuse indicators to an escalation level."""

        if not indicators:
            return 'none'

        levels = [item['severity'] for item in indicators]

        if 'high' in levels:
            return 'temporary_block'
        if levels.count('medium') >= 2:
            return 'rate_limit_decrease'
        return 'warning'

Concluzie

Securizarea chatbot-urilor AI necesita masuri cuprinzatoare pe mai multe straturi:

  1. Validarea input-ului - Blocheaza tentativele de injectie si sanitizeaza input-ul utilizatorului
  2. Filtrarea output-ului - Previne scurgerile de date si continutul daunator
  3. Securitatea sesiunii - Gestioneaza sesiunile in mod securizat cu validare corespunzatoare
  4. Rate limiting - Previne abuzul cu rate limiting adaptiv
  5. Analiza comportamentului - Detecteaza si raspunde la pattern-uri de abuz

Prin implementarea acestor masuri de securitate, poti deploya chatbot-uri AI in siguranta in medii enterprise, mentinand in acelasi timp o experienta pozitiva pentru utilizator.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.