AI Security

Securitate agenti LLM: riscuri AI autonom

Nicu Constantin
--11 min lectura
#ai-agents#llm-security#tool-use#autonomous-ai#risk-management

Securitatea Agentilor LLM: Intelegerea si Atenuarea Riscurilor AI Autonome

Agentii LLM reprezinta o schimbare de paradigma de la sisteme simple de intrebare-raspuns la entitati autonome care pot planifica, folosi unelte si lua actiuni in lumea reala. Desi aceasta capabilitate debloca cazuri de utilizare puternice, introduce si riscuri semnificative de securitate care necesita atentie speciala.

Acest ghid examineaza provocarile unice de securitate ale agentilor LLM si ofera strategii pentru deployment-ul in siguranta.

Ce face agentii diferiti

Aplicatiile LLM traditionale sunt in esenta transformatoare fara stare: input-ul intra, output-ul iese. Agentii sunt fundamental diferiti:

Traditional LLM:
User Query → LLM → Response → Done

LLM Agent:
User Query → LLM → Plan → Tool Call → Observe → Plan → Tool Call → ... → Response
                    ↑__________________________________|
                              (Autonomous Loop)

Bucla agentului introduce cateva caracteristici critice din punct de vedere al securitatii:

  1. Luarea deciziilor autonome - LLM-ul decide ce actiuni sa intreprinda
  2. Efecte in lumea reala - Uneltele pot modifica baze de date, trimite emailuri, apela API-uri
  3. Executie in mai multi pasi - Erorile sau atacurile se pot acumula
  4. Comportament orientat spre obiective - Agentii urmaresc obiective, uneori in mod neasteptat

Modelul de amenintari al agentilor

Categorii de amenintari

class AgentThreatModel:
    """Security threat model for LLM agents."""
 
    threats = {
        'prompt_injection_amplification': {
            'description': '''Prompt injection attacks are amplified because
                            the agent can take actions based on injected instructions''',
            'example': 'Injected prompt causes agent to email sensitive data',
            'severity': 'critical',
            'mitigations': [
                'Strict input sanitization',
                'Action confirmation for sensitive operations',
                'Output filtering before tool use',
            ]
        },
 
        'confused_deputy': {
            'description': '''Agent uses its authorized access to perform
                            actions on behalf of an attacker''',
            'example': 'User tricks agent into accessing another user\'s data',
            'severity': 'critical',
            'mitigations': [
                'Per-request authorization checks',
                'User context isolation',
                'Action scope limitations',
            ]
        },
 
        'goal_hijacking': {
            'description': '''Attacker modifies the agent\'s objective
                            mid-execution through injected content''',
            'example': 'Document being processed contains instructions to change task',
            'severity': 'high',
            'mitigations': [
                'Goal immutability after initialization',
                'Content isolation during processing',
                'Behavioral consistency checks',
            ]
        },
 
        'tool_misuse': {
            'description': '''Agent uses tools in unintended or harmful ways''',
            'example': 'Agent executes arbitrary shell commands',
            'severity': 'critical',
            'mitigations': [
                'Tool capability restrictions',
                'Parameter validation',
                'Sandboxed execution',
            ]
        },
 
        'resource_exhaustion': {
            'description': '''Agent enters infinite loop or consumes
                            excessive resources''',
            'example': 'Agent keeps retrying failed operation indefinitely',
            'severity': 'high',
            'mitigations': [
                'Iteration limits',
                'Timeout enforcement',
                'Cost tracking and limits',
            ]
        },
 
        'information_leakage': {
            'description': '''Agent inadvertently exposes sensitive information
                            through tool outputs or reasoning''',
            'example': 'Agent includes API keys in error messages',
            'severity': 'high',
            'mitigations': [
                'Output sanitization',
                'Secrets masking',
                'Reasoning trace filtering',
            ]
        },
 
        'capability_escalation': {
            'description': '''Agent acquires capabilities beyond intended scope''',
            'example': 'Agent installs additional tools or modifies its own prompts',
            'severity': 'critical',
            'mitigations': [
                'Immutable capability set',
                'Self-modification prevention',
                'Capability audit logging',
            ]
        },
    }

Arhitectura securizata a agentilor

Principii fundamentale de securitate

from abc import ABC, abstractmethod
from typing import Any, List, Optional
from dataclasses import dataclass
from enum import Enum
 
class ActionRiskLevel(Enum):
    LOW = "low"           # Read-only, no side effects
    MEDIUM = "medium"     # Limited side effects, reversible
    HIGH = "high"         # Significant side effects
    CRITICAL = "critical" # Irreversible, high impact
 
@dataclass
class ToolAction:
    tool_name: str
    parameters: dict
    risk_level: ActionRiskLevel
    requires_confirmation: bool
    reversible: bool
 
@dataclass
class ExecutionContext:
    user_id: str
    session_id: str
    permissions: set
    resource_limits: dict
    execution_budget: dict
 
class SecureAgentFramework:
    """Framework for building secure LLM agents."""
 
    def __init__(self, config: dict):
        self.llm = config['llm']
        self.tools = SecureToolRegistry(config['tools'])
        self.policy_engine = PolicyEngine(config['policies'])
        self.execution_monitor = ExecutionMonitor(config['monitoring'])
        self.confirmation_handler = ConfirmationHandler()
 
    async def run(self, task: str, context: ExecutionContext) -> dict:
        """Execute agent task with security controls."""
 
        # Initialize execution tracking
        execution_id = self.execution_monitor.start_execution(context)
 
        try:
            # Parse and validate initial task
            validated_task = self._validate_task(task, context)
 
            # Initialize agent state
            state = AgentState(
                task=validated_task,
                context=context,
                iteration=0,
                actions_taken=[],
                budget_used={'tokens': 0, 'api_calls': 0, 'time_seconds': 0}
            )
 
            # Main agent loop with security controls
            while not state.is_complete():
                # Check execution limits
                if not self._check_limits(state, context):
                    return self._create_limit_exceeded_response(state)
 
                # Get next action from LLM
                action = await self._get_next_action(state)
 
                if action is None:
                    break
 
                # Security checks before execution
                security_check = self._pre_execution_check(action, state, context)
 
                if not security_check['allowed']:
                    self.execution_monitor.log_blocked_action(
                        execution_id, action, security_check['reason']
                    )
 
                    if security_check['fatal']:
                        return self._create_security_violation_response(
                            action, security_check
                        )
 
                    # Inform agent action was blocked
                    state.add_observation({
                        'type': 'action_blocked',
                        'reason': security_check['user_message']
                    })
                    continue
 
                # Handle confirmation requirements
                if action.requires_confirmation:
                    confirmed = await self.confirmation_handler.request_confirmation(
                        action, context
                    )
                    if not confirmed:
                        state.add_observation({
                            'type': 'action_cancelled',
                            'reason': 'User declined confirmation'
                        })
                        continue
 
                # Execute action with monitoring
                result = await self._execute_action(action, state, context)
 
                # Update state
                state.add_action_result(action, result)
                state.iteration += 1
 
            return self._create_success_response(state)
 
        except Exception as e:
            self.execution_monitor.log_error(execution_id, e)
            raise
 
        finally:
            self.execution_monitor.end_execution(execution_id)
 
    def _pre_execution_check(self, action: ToolAction,
                            state: AgentState,
                            context: ExecutionContext) -> dict:
        """Comprehensive pre-execution security check."""
 
        checks = []
 
        # Check 1: Tool is registered and allowed
        if action.tool_name not in self.tools.allowed_tools(context.permissions):
            return {
                'allowed': False,
                'fatal': True,
                'reason': 'tool_not_allowed',
                'user_message': f'Tool {action.tool_name} is not available'
            }
 
        # Check 2: Parameters are valid and safe
        param_check = self.tools.validate_parameters(
            action.tool_name, action.parameters
        )
        if not param_check['valid']:
            return {
                'allowed': False,
                'fatal': False,
                'reason': 'invalid_parameters',
                'user_message': param_check['message']
            }
 
        # Check 3: Action complies with policies
        policy_check = self.policy_engine.check_action(action, context)
        if not policy_check['compliant']:
            return {
                'allowed': False,
                'fatal': policy_check['severity'] == 'critical',
                'reason': policy_check['violation'],
                'user_message': policy_check['message']
            }
 
        # Check 4: Resource limits not exceeded
        if not self._check_resource_limits(action, state, context):
            return {
                'allowed': False,
                'fatal': True,
                'reason': 'resource_limit_exceeded',
                'user_message': 'Execution resource limits reached'
            }
 
        # Check 5: No dangerous action patterns
        pattern_check = self._check_dangerous_patterns(action, state)
        if pattern_check['dangerous']:
            return {
                'allowed': False,
                'fatal': True,
                'reason': 'dangerous_pattern',
                'user_message': pattern_check['message']
            }
 
        return {'allowed': True}

Registru securizat de unelte

class SecureToolRegistry:
    """Registry for agent tools with security controls."""
 
    def __init__(self, tool_configs: List[dict]):
        self.tools = {}
        self.tool_permissions = {}
 
        for config in tool_configs:
            tool = self._create_tool(config)
            self.tools[config['name']] = tool
            self.tool_permissions[config['name']] = config.get('required_permissions', set())
 
    def register_tool(self, tool_config: dict):
        """Register a new tool with security metadata."""
 
        tool = SecureTool(
            name=tool_config['name'],
            description=tool_config['description'],
            parameters=tool_config['parameters'],
            risk_level=ActionRiskLevel(tool_config.get('risk_level', 'medium')),
            requires_confirmation=tool_config.get('requires_confirmation', False),
            parameter_validators=tool_config.get('validators', {}),
            execution_wrapper=self._create_secure_wrapper(tool_config)
        )
 
        self.tools[tool.name] = tool
        self.tool_permissions[tool.name] = set(tool_config.get('required_permissions', []))
 
    def allowed_tools(self, user_permissions: set) -> List[str]:
        """Get tools allowed for given permissions."""
        return [
            name for name, required in self.tool_permissions.items()
            if required.issubset(user_permissions)
        ]
 
    def validate_parameters(self, tool_name: str, parameters: dict) -> dict:
        """Validate tool parameters for safety."""
 
        tool = self.tools.get(tool_name)
        if not tool:
            return {'valid': False, 'message': 'Unknown tool'}
 
        # Schema validation
        schema_result = tool.validate_schema(parameters)
        if not schema_result['valid']:
            return schema_result
 
        # Security validation
        for param_name, value in parameters.items():
            validator = tool.parameter_validators.get(param_name)
            if validator:
                validation_result = validator(value)
                if not validation_result['valid']:
                    return validation_result
 
        return {'valid': True}
 
    def _create_secure_wrapper(self, config: dict):
        """Create a secure execution wrapper for the tool."""
 
        base_executor = config['executor']
        timeout = config.get('timeout_seconds', 30)
        sandboxed = config.get('sandboxed', True)
 
        async def secure_wrapper(parameters: dict, context: ExecutionContext):
            # Pre-execution logging
            execution_id = str(uuid.uuid4())
            logger.info(f"Tool execution start: {config['name']}", extra={
                'execution_id': execution_id,
                'parameters': self._mask_sensitive(parameters),
                'user_id': context.user_id
            })
 
            try:
                # Execute with timeout
                if sandboxed:
                    result = await asyncio.wait_for(
                        self._sandboxed_execute(base_executor, parameters),
                        timeout=timeout
                    )
                else:
                    result = await asyncio.wait_for(
                        base_executor(parameters),
                        timeout=timeout
                    )
 
                # Post-execution validation
                validated_result = self._validate_result(result, config)
 
                logger.info(f"Tool execution success: {config['name']}", extra={
                    'execution_id': execution_id,
                    'result_type': type(result).__name__
                })
 
                return validated_result
 
            except asyncio.TimeoutError:
                logger.error(f"Tool execution timeout: {config['name']}", extra={
                    'execution_id': execution_id
                })
                raise ToolExecutionError(f"Tool {config['name']} timed out")
 
            except Exception as e:
                logger.error(f"Tool execution error: {config['name']}", extra={
                    'execution_id': execution_id,
                    'error': str(e)
                })
                raise
 
        return secure_wrapper
 
 
class ParameterValidators:
    """Common parameter validators for tool security."""
 
    @staticmethod
    def file_path_validator(allowed_paths: List[str]):
        """Validate file paths to prevent path traversal."""
        def validator(path: str) -> dict:
            # Normalize path
            normalized = os.path.normpath(os.path.abspath(path))
 
            # Check for path traversal
            if '..' in path:
                return {
                    'valid': False,
                    'message': 'Path traversal not allowed'
                }
 
            # Check against allowed paths
            for allowed in allowed_paths:
                if normalized.startswith(os.path.abspath(allowed)):
                    return {'valid': True}
 
            return {
                'valid': False,
                'message': f'Path not in allowed directories'
            }
        return validator
 
    @staticmethod
    def url_validator(allowed_domains: Optional[List[str]] = None):
        """Validate URLs for SSRF prevention."""
        def validator(url: str) -> dict:
            try:
                parsed = urlparse(url)
 
                # Must be http or https
                if parsed.scheme not in ['http', 'https']:
                    return {
                        'valid': False,
                        'message': 'Only HTTP(S) URLs allowed'
                    }
 
                # Check for internal IPs
                try:
                    ip = socket.gethostbyname(parsed.hostname)
                    if ipaddress.ip_address(ip).is_private:
                        return {
                            'valid': False,
                            'message': 'Internal addresses not allowed'
                        }
                except socket.gaierror:
                    pass
 
                # Check domain allowlist if provided
                if allowed_domains:
                    if parsed.hostname not in allowed_domains:
                        return {
                            'valid': False,
                            'message': 'Domain not in allowlist'
                        }
 
                return {'valid': True}
 
            except Exception as e:
                return {
                    'valid': False,
                    'message': f'Invalid URL: {str(e)}'
                }
        return validator
 
    @staticmethod
    def sql_query_validator(allowed_operations: List[str] = None):
        """Validate SQL queries to prevent injection."""
        def validator(query: str) -> dict:
            query_upper = query.upper().strip()
 
            # Default to SELECT only
            if allowed_operations is None:
                allowed = ['SELECT']
            else:
                allowed = [op.upper() for op in allowed_operations]
 
            # Check query type
            for op in allowed:
                if query_upper.startswith(op):
                    break
            else:
                return {
                    'valid': False,
                    'message': f'Only {allowed} operations allowed'
                }
 
            # Check for dangerous patterns
            dangerous_patterns = [
                r';\s*(DROP|DELETE|TRUNCATE|UPDATE|INSERT)',
                r'UNION\s+SELECT',
                r'--',
                r'/\*.*\*/',
            ]
 
            for pattern in dangerous_patterns:
                if re.search(pattern, query, re.IGNORECASE):
                    return {
                        'valid': False,
                        'message': 'Query contains disallowed patterns'
                    }
 
            return {'valid': True}
        return validator

Monitorizarea executiei

class ExecutionMonitor:
    """Monitor agent execution for security anomalies."""
 
    def __init__(self, config: dict):
        self.max_iterations = config.get('max_iterations', 50)
        self.max_execution_time = config.get('max_execution_time_seconds', 300)
        self.action_rate_limit = config.get('actions_per_minute', 30)
        self.executions = {}
 
    def start_execution(self, context: ExecutionContext) -> str:
        """Start monitoring an execution."""
        execution_id = str(uuid.uuid4())
 
        self.executions[execution_id] = {
            'start_time': time.time(),
            'context': context,
            'iterations': 0,
            'actions': [],
            'blocked_actions': [],
            'errors': [],
            'status': 'running'
        }
 
        return execution_id
 
    def check_limits(self, execution_id: str) -> dict:
        """Check if execution is within limits."""
        execution = self.executions.get(execution_id)
        if not execution:
            return {'within_limits': False, 'reason': 'unknown_execution'}
 
        # Check iteration limit
        if execution['iterations'] >= self.max_iterations:
            return {
                'within_limits': False,
                'reason': 'max_iterations_exceeded',
                'limit': self.max_iterations
            }
 
        # Check time limit
        elapsed = time.time() - execution['start_time']
        if elapsed > self.max_execution_time:
            return {
                'within_limits': False,
                'reason': 'max_time_exceeded',
                'limit': self.max_execution_time
            }
 
        # Check action rate
        recent_actions = [
            a for a in execution['actions']
            if a['timestamp'] > time.time() - 60
        ]
        if len(recent_actions) >= self.action_rate_limit:
            return {
                'within_limits': False,
                'reason': 'action_rate_exceeded',
                'limit': self.action_rate_limit
            }
 
        return {'within_limits': True}
 
    def detect_anomalies(self, execution_id: str) -> List[dict]:
        """Detect anomalous execution patterns."""
        execution = self.executions.get(execution_id)
        if not execution:
            return []
 
        anomalies = []
 
        # Check for repeated failed actions
        recent_blocked = execution['blocked_actions'][-10:]
        if len(recent_blocked) >= 5:
            anomalies.append({
                'type': 'repeated_blocked_actions',
                'count': len(recent_blocked),
                'severity': 'high'
            })
 
        # Check for action pattern repetition (potential loop)
        action_names = [a['tool_name'] for a in execution['actions'][-20:]]
        if len(action_names) >= 10:
            pattern = self._find_repeated_pattern(action_names)
            if pattern:
                anomalies.append({
                    'type': 'action_loop_detected',
                    'pattern': pattern,
                    'severity': 'medium'
                })
 
        # Check for escalating privilege attempts
        privilege_actions = [
            a for a in execution['blocked_actions']
            if a.get('reason') == 'insufficient_permissions'
        ]
        if len(privilege_actions) >= 3:
            anomalies.append({
                'type': 'privilege_escalation_attempts',
                'count': len(privilege_actions),
                'severity': 'high'
            })
 
        return anomalies
 
    def _find_repeated_pattern(self, actions: List[str]) -> Optional[List[str]]:
        """Find repeated action patterns indicating a loop."""
        for pattern_length in range(2, len(actions) // 3):
            pattern = actions[-pattern_length:]
            matches = 0
            for i in range(len(actions) - pattern_length, 0, -pattern_length):
                if actions[i:i+pattern_length] == pattern:
                    matches += 1
                else:
                    break
            if matches >= 2:
                return pattern
        return None

Controale Human-in-the-Loop

class ConfirmationHandler:
    """Handle human confirmation for high-risk agent actions."""
 
    def __init__(self, config: dict = None):
        self.timeout_seconds = config.get('timeout_seconds', 60) if config else 60
        self.auto_deny_on_timeout = config.get('auto_deny_on_timeout', True) if config else True
 
    async def request_confirmation(self, action: ToolAction,
                                  context: ExecutionContext) -> bool:
        """Request human confirmation for an action."""
 
        confirmation_request = {
            'request_id': str(uuid.uuid4()),
            'action': {
                'tool': action.tool_name,
                'parameters': self._mask_sensitive_params(action.parameters),
                'risk_level': action.risk_level.value,
            },
            'context': {
                'user_id': context.user_id,
                'session_id': context.session_id,
            },
            'timeout_seconds': self.timeout_seconds
        }
 
        # Send confirmation request
        response = await self._send_confirmation_request(confirmation_request)
 
        # Log the confirmation decision
        logger.info("Confirmation response", extra={
            'request_id': confirmation_request['request_id'],
            'confirmed': response.get('confirmed', False),
            'responder': response.get('responder'),
            'response_time_seconds': response.get('response_time')
        })
 
        return response.get('confirmed', False)
 
    def _mask_sensitive_params(self, params: dict) -> dict:
        """Mask sensitive parameter values for confirmation display."""
        masked = {}
        sensitive_keys = ['password', 'secret', 'key', 'token', 'credential']
 
        for key, value in params.items():
            if any(s in key.lower() for s in sensitive_keys):
                masked[key] = '***MASKED***'
            elif isinstance(value, str) and len(value) > 200:
                masked[key] = value[:200] + '...[truncated]'
            else:
                masked[key] = value
 
        return masked
 
 
class InterventionPolicy:
    """Define when human intervention is required."""
 
    def __init__(self, config: dict):
        self.always_confirm = set(config.get('always_confirm_tools', []))
        self.risk_threshold = ActionRiskLevel(config.get('risk_threshold', 'high'))
        self.sensitive_patterns = config.get('sensitive_patterns', [])
 
    def requires_confirmation(self, action: ToolAction,
                            context: ExecutionContext) -> bool:
        """Determine if action requires human confirmation."""
 
        # Always confirm certain tools
        if action.tool_name in self.always_confirm:
            return True
 
        # Confirm based on risk level
        risk_order = [ActionRiskLevel.LOW, ActionRiskLevel.MEDIUM,
                     ActionRiskLevel.HIGH, ActionRiskLevel.CRITICAL]
        if risk_order.index(action.risk_level) >= risk_order.index(self.risk_threshold):
            return True
 
        # Confirm if parameters match sensitive patterns
        params_str = json.dumps(action.parameters)
        for pattern in self.sensitive_patterns:
            if re.search(pattern, params_str, re.IGNORECASE):
                return True
 
        return False

Concluzie

Agentii LLM ofera capabilitati puternice, dar introduc riscuri semnificative de securitate care necesita atenuare atenta. Cheia este implementarea mai multor straturi de control:

  1. Guvernanta stricta a uneltelor - Limiteaza si valideaza ce unelte pot folosi agentii
  2. Monitorizarea executiei - Detecteaza comportamentul anormal in timp real
  3. Human-in-the-loop - Solicita confirmare pentru actiunile cu risc ridicat
  4. Limite de resurse - Previne executia fara control
  5. Logare cuprinzatoare - Permite investigatia si invatarea

La DeviDevs, ajutam organizatiile sa deployeze agenti LLM in siguranta, echilibrand capabilitatea cu masuri de protectie adecvate. Contacteaza-ne pentru a discuta cerintele tale de securitate a agentilor.


Nu esti sigur unde se incadreaza sistemul tau AI conform EU AI Act? Fa evaluarea gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.