LLM Agent Security: Understanding and Mitigating the Risks of Autonomous AI
LLM agents mark a paradigm shift from simple question-answering systems to autonomous entities that can plan, use tools, and act in the real world. While this capability unlocks powerful use cases, it also introduces significant security risks that demand deliberate attention.
This guide examines the security challenges unique to LLM agents and offers strategies for deploying them safely.
What Makes Agents Different
Traditional LLM applications are essentially stateless transformations: input goes in, output comes out. Agents are fundamentally different:
Traditional LLM:
User Query → LLM → Response → Done
LLM Agent:
User Query → LLM → Plan → Tool Call → Observe → Plan → Tool Call → ... → Response
↑__________________________________|
(Autonomous Loop)
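To make that loop concrete, here is a deliberately minimal and unprotected sketch of it in Python; call_llm, the step dictionary, and the tools mapping are placeholders for illustration, not a real API:
# Minimal, unprotected agent loop - illustration only, not production code.
def run_agent(task: str, tools: dict, max_steps: int = 10) -> str:
    history = [f"Task: {task}"]
    for _ in range(max_steps):
        step = call_llm(history)  # hypothetical LLM call returning a dict
        if step['type'] == 'final_answer':
            return step['content']
        # The model picks a tool and its arguments - this is where risk enters:
        result = tools[step['tool']](**step['arguments'])
        history.append(f"Observation: {result}")
    return "Stopped: step limit reached"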
The agent loop introduces several security-critical characteristics:
- Autonomous decision-making - the LLM decides which actions to take
- Real-world side effects - tools can modify databases, send emails, and call APIs
- Multi-step execution - errors and attacks can compound across steps
- Goal-directed behavior - agents pursue objectives, sometimes in unexpected ways
The Agent Threat Model
Threat Categories
class AgentThreatModel:
"""Security threat model for LLM agents."""
threats = {
'prompt_injection_amplification': {
'description': '''Prompt injection attacks are amplified because
the agent can take actions based on injected instructions''',
'example': 'Injected prompt causes agent to email sensitive data',
'severity': 'critical',
'mitigations': [
'Strict input sanitization',
'Action confirmation for sensitive operations',
'Output filtering before tool use',
]
},
'confused_deputy': {
'description': '''Agent uses its authorized access to perform
actions on behalf of an attacker''',
'example': 'User tricks agent into accessing another user\'s data',
'severity': 'critical',
'mitigations': [
'Per-request authorization checks',
'User context isolation',
'Action scope limitations',
]
},
'goal_hijacking': {
'description': '''Attacker modifies the agent\'s objective
mid-execution through injected content''',
'example': 'Document being processed contains instructions to change task',
'severity': 'high',
'mitigations': [
'Goal immutability after initialization',
'Content isolation during processing',
'Behavioral consistency checks',
]
},
'tool_misuse': {
'description': '''Agent uses tools in unintended or harmful ways''',
'example': 'Agent executes arbitrary shell commands',
'severity': 'critical',
'mitigations': [
'Tool capability restrictions',
'Parameter validation',
'Sandboxed execution',
]
},
'resource_exhaustion': {
'description': '''Agent enters infinite loop or consumes
excessive resources''',
'example': 'Agent keeps retrying failed operation indefinitely',
'severity': 'high',
'mitigations': [
'Iteration limits',
'Timeout enforcement',
'Cost tracking and limits',
]
},
'information_leakage': {
'description': '''Agent inadvertently exposes sensitive information
through tool outputs or reasoning''',
'example': 'Agent includes API keys in error messages',
'severity': 'high',
'mitigations': [
'Output sanitization',
'Secrets masking',
'Reasoning trace filtering',
]
},
'capability_escalation': {
'description': '''Agent acquires capabilities beyond intended scope''',
'example': 'Agent installs additional tools or modifies its own prompts',
'severity': 'critical',
'mitigations': [
'Immutable capability set',
'Self-modification prevention',
'Capability audit logging',
]
},
    }
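A threat model like this is most useful when it feeds directly into design reviews. As a small, hypothetical example of consuming it, the snippet below extracts a mitigation checklist for the critical-severity threats:
# Hypothetical helper: turn the threat model into a review checklist.
def critical_mitigation_checklist(model: AgentThreatModel) -> list:
    checklist = []
    for name, threat in model.threats.items():
        if threat['severity'] == 'critical':
            for mitigation in threat['mitigations']:
                checklist.append(f"[{name}] {mitigation}")
    return checklist

for item in critical_mitigation_checklist(AgentThreatModel()):
    print(item)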
Secure Agent Architecture
Core Security Principles
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from dataclasses import dataclass
from enum import Enum
class ActionRiskLevel(Enum):
LOW = "low" # Read-only, no side effects
MEDIUM = "medium" # Limited side effects, reversible
HIGH = "high" # Significant side effects
CRITICAL = "critical" # Irreversible, high impact
@dataclass
class ToolAction:
tool_name: str
parameters: dict
risk_level: ActionRiskLevel
requires_confirmation: bool
reversible: bool
@dataclass
class ExecutionContext:
user_id: str
session_id: str
permissions: set
resource_limits: dict
execution_budget: dict
class SecureAgentFramework:
"""Framework for building secure LLM agents."""
def __init__(self, config: dict):
self.llm = config['llm']
self.tools = SecureToolRegistry(config['tools'])
self.policy_engine = PolicyEngine(config['policies'])
self.execution_monitor = ExecutionMonitor(config['monitoring'])
self.confirmation_handler = ConfirmationHandler()
async def run(self, task: str, context: ExecutionContext) -> dict:
"""Execute agent task with security controls."""
# Initialize execution tracking
execution_id = self.execution_monitor.start_execution(context)
try:
# Parse and validate initial task
validated_task = self._validate_task(task, context)
# Initialize agent state
state = AgentState(
task=validated_task,
context=context,
iteration=0,
actions_taken=[],
budget_used={'tokens': 0, 'api_calls': 0, 'time_seconds': 0}
)
# Main agent loop with security controls
while not state.is_complete():
# Check execution limits
if not self._check_limits(state, context):
return self._create_limit_exceeded_response(state)
# Get next action from LLM
action = await self._get_next_action(state)
if action is None:
break
# Security checks before execution
security_check = self._pre_execution_check(action, state, context)
if not security_check['allowed']:
self.execution_monitor.log_blocked_action(
execution_id, action, security_check['reason']
)
if security_check['fatal']:
return self._create_security_violation_response(
action, security_check
)
# Inform agent action was blocked
state.add_observation({
'type': 'action_blocked',
'reason': security_check['user_message']
})
continue
# Handle confirmation requirements
if action.requires_confirmation:
confirmed = await self.confirmation_handler.request_confirmation(
action, context
)
if not confirmed:
state.add_observation({
'type': 'action_cancelled',
'reason': 'User declined confirmation'
})
continue
# Execute action with monitoring
result = await self._execute_action(action, state, context)
# Update state
state.add_action_result(action, result)
state.iteration += 1
return self._create_success_response(state)
except Exception as e:
self.execution_monitor.log_error(execution_id, e)
raise
finally:
self.execution_monitor.end_execution(execution_id)
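    # Note on the check below: _pre_execution_check separates fatal violations
    # (disallowed tool, exceeded resource limits, dangerous action patterns),
    # which terminate the run, from recoverable ones (e.g. invalid parameters),
    # which are fed back to the agent as observations so it can re-plan.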
def _pre_execution_check(self, action: ToolAction,
state: AgentState,
context: ExecutionContext) -> dict:
"""Comprehensive pre-execution security check."""
checks = []
# Check 1: Tool is registered and allowed
if action.tool_name not in self.tools.allowed_tools(context.permissions):
return {
'allowed': False,
'fatal': True,
'reason': 'tool_not_allowed',
'user_message': f'Tool {action.tool_name} is not available'
}
# Check 2: Parameters are valid and safe
param_check = self.tools.validate_parameters(
action.tool_name, action.parameters
)
if not param_check['valid']:
return {
'allowed': False,
'fatal': False,
'reason': 'invalid_parameters',
'user_message': param_check['message']
}
# Check 3: Action complies with policies
policy_check = self.policy_engine.check_action(action, context)
if not policy_check['compliant']:
return {
'allowed': False,
'fatal': policy_check['severity'] == 'critical',
'reason': policy_check['violation'],
'user_message': policy_check['message']
}
# Check 4: Resource limits not exceeded
if not self._check_resource_limits(action, state, context):
return {
'allowed': False,
'fatal': True,
'reason': 'resource_limit_exceeded',
'user_message': 'Execution resource limits reached'
}
# Check 5: No dangerous action patterns
pattern_check = self._check_dangerous_patterns(action, state)
if pattern_check['dangerous']:
return {
'allowed': False,
'fatal': True,
'reason': 'dangerous_pattern',
'user_message': pattern_check['message']
}
        return {'allowed': True}
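How this framework gets wired up depends on your stack. The sketch below shows one way instantiation and invocation might look; the config keys mirror how __init__ reads its configuration, but the tool spec fields, policy schema, and llm_client are assumptions, not a fixed API:
# Minimal wiring sketch - field names and llm_client are assumptions.
import asyncio

async def read_report(parameters: dict) -> str:
    # Low-risk, read-only tool used for illustration.
    with open(parameters['path'], encoding='utf-8') as f:
        return f.read()

config = {
    'llm': llm_client,  # your LLM client instance (assumed)
    'tools': [{
        'name': 'read_report',
        'description': 'Read a report file from the reports directory',
        'parameters': {'path': {'type': 'string'}},
        'risk_level': 'low',
        'executor': read_report,
        'required_permissions': {'reports:read'},
    }],
    'policies': {'rules': []},  # consumed by PolicyEngine (schema assumed)
    'monitoring': {'max_iterations': 20, 'max_execution_time_seconds': 120},
}

async def main() -> None:
    agent = SecureAgentFramework(config)
    context = ExecutionContext(
        user_id='u-123',
        session_id='s-456',
        permissions={'reports:read'},
        resource_limits={},
        execution_budget={'tokens': 50_000},
    )
    result = await agent.run("Summarize last week's incident reports", context)
    print(result)

asyncio.run(main())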
Secure Tool Registry
import asyncio
import ipaddress
import logging
import os
import re
import socket
import uuid
from typing import List, Optional
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

class ToolExecutionError(Exception):
    """Raised when a tool execution fails or times out."""

class SecureToolRegistry:
"""Registry for agent tools with security controls."""
def __init__(self, tool_configs: List[dict]):
self.tools = {}
self.tool_permissions = {}
for config in tool_configs:
tool = self._create_tool(config)
self.tools[config['name']] = tool
self.tool_permissions[config['name']] = config.get('required_permissions', set())
def register_tool(self, tool_config: dict):
"""Register a new tool with security metadata."""
tool = SecureTool(
name=tool_config['name'],
description=tool_config['description'],
parameters=tool_config['parameters'],
risk_level=ActionRiskLevel(tool_config.get('risk_level', 'medium')),
requires_confirmation=tool_config.get('requires_confirmation', False),
parameter_validators=tool_config.get('validators', {}),
execution_wrapper=self._create_secure_wrapper(tool_config)
)
self.tools[tool.name] = tool
self.tool_permissions[tool.name] = set(tool_config.get('required_permissions', []))
def allowed_tools(self, user_permissions: set) -> List[str]:
"""Get tools allowed for given permissions."""
return [
name for name, required in self.tool_permissions.items()
if required.issubset(user_permissions)
]
def validate_parameters(self, tool_name: str, parameters: dict) -> dict:
"""Validate tool parameters for safety."""
tool = self.tools.get(tool_name)
if not tool:
return {'valid': False, 'message': 'Unknown tool'}
# Schema validation
schema_result = tool.validate_schema(parameters)
if not schema_result['valid']:
return schema_result
# Security validation
for param_name, value in parameters.items():
validator = tool.parameter_validators.get(param_name)
if validator:
validation_result = validator(value)
if not validation_result['valid']:
return validation_result
return {'valid': True}
def _create_secure_wrapper(self, config: dict):
"""Create a secure execution wrapper for the tool."""
base_executor = config['executor']
timeout = config.get('timeout_seconds', 30)
sandboxed = config.get('sandboxed', True)
async def secure_wrapper(parameters: dict, context: ExecutionContext):
# Pre-execution logging
execution_id = str(uuid.uuid4())
logger.info(f"Tool execution start: {config['name']}", extra={
'execution_id': execution_id,
'parameters': self._mask_sensitive(parameters),
'user_id': context.user_id
})
try:
# Execute with timeout
if sandboxed:
result = await asyncio.wait_for(
self._sandboxed_execute(base_executor, parameters),
timeout=timeout
)
else:
result = await asyncio.wait_for(
base_executor(parameters),
timeout=timeout
)
# Post-execution validation
validated_result = self._validate_result(result, config)
logger.info(f"Tool execution success: {config['name']}", extra={
'execution_id': execution_id,
'result_type': type(result).__name__
})
return validated_result
except asyncio.TimeoutError:
logger.error(f"Tool execution timeout: {config['name']}", extra={
'execution_id': execution_id
})
raise ToolExecutionError(f"Tool {config['name']} timed out")
except Exception as e:
logger.error(f"Tool execution error: {config['name']}", extra={
'execution_id': execution_id,
'error': str(e)
})
raise
return secure_wrapper
class ParameterValidators:
"""Common parameter validators for tool security."""
@staticmethod
def file_path_validator(allowed_paths: List[str]):
"""Validate file paths to prevent path traversal."""
def validator(path: str) -> dict:
# Normalize path
normalized = os.path.normpath(os.path.abspath(path))
# Check for path traversal
if '..' in path:
return {
'valid': False,
'message': 'Path traversal not allowed'
}
            # Check against allowed paths: the path must be the allowed
            # directory itself or strictly inside it. Plain prefix matching
            # would also accept sibling directories such as /data-secret
            # when /data is allowed.
            for allowed in allowed_paths:
                allowed_abs = os.path.abspath(allowed)
                if normalized == allowed_abs or normalized.startswith(allowed_abs + os.sep):
                    return {'valid': True}
            return {
                'valid': False,
                'message': 'Path not in allowed directories'
            }
return validator
@staticmethod
def url_validator(allowed_domains: Optional[List[str]] = None):
"""Validate URLs for SSRF prevention."""
def validator(url: str) -> dict:
try:
parsed = urlparse(url)
# Must be http or https
if parsed.scheme not in ['http', 'https']:
return {
'valid': False,
'message': 'Only HTTP(S) URLs allowed'
}
# Check for internal IPs
try:
ip = socket.gethostbyname(parsed.hostname)
if ipaddress.ip_address(ip).is_private:
return {
'valid': False,
'message': 'Internal addresses not allowed'
}
except socket.gaierror:
pass
# Check domain allowlist if provided
if allowed_domains:
if parsed.hostname not in allowed_domains:
return {
'valid': False,
'message': 'Domain not in allowlist'
}
return {'valid': True}
except Exception as e:
return {
'valid': False,
'message': f'Invalid URL: {str(e)}'
}
return validator
@staticmethod
def sql_query_validator(allowed_operations: List[str] = None):
"""Validate SQL queries to prevent injection."""
def validator(query: str) -> dict:
query_upper = query.upper().strip()
# Default to SELECT only
if allowed_operations is None:
allowed = ['SELECT']
else:
allowed = [op.upper() for op in allowed_operations]
# Check query type
for op in allowed:
if query_upper.startswith(op):
break
else:
return {
'valid': False,
'message': f'Only {allowed} operations allowed'
}
# Check for dangerous patterns
dangerous_patterns = [
r';\s*(DROP|DELETE|TRUNCATE|UPDATE|INSERT)',
r'UNION\s+SELECT',
r'--',
r'/\*.*\*/',
]
for pattern in dangerous_patterns:
if re.search(pattern, query, re.IGNORECASE):
return {
'valid': False,
'message': 'Query contains disallowed patterns'
}
return {'valid': True}
        return validator
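These validators are meant to be attached per parameter when a tool is registered, so that validate_parameters can run them automatically. A brief sketch follows; the tool spec fields and fetch_url_executor are assumptions:
# Sketch: attaching validators to a tool registration.
registry = SecureToolRegistry([])
registry.register_tool({
    'name': 'fetch_url',
    'description': 'Fetch a document from an approved domain',
    'parameters': {'url': {'type': 'string'}},
    'risk_level': 'medium',
    'executor': fetch_url_executor,  # your HTTP fetch coroutine (assumed)
    'timeout_seconds': 15,
    'required_permissions': ['web:read'],
    'validators': {
        'url': ParameterValidators.url_validator(
            allowed_domains=['docs.example.com']
        ),
    },
})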
Execution Monitoring
import time
import uuid
from typing import List, Optional

class ExecutionMonitor:
"""Monitor agent execution for security anomalies."""
def __init__(self, config: dict):
self.max_iterations = config.get('max_iterations', 50)
self.max_execution_time = config.get('max_execution_time_seconds', 300)
self.action_rate_limit = config.get('actions_per_minute', 30)
self.executions = {}
def start_execution(self, context: ExecutionContext) -> str:
"""Start monitoring an execution."""
execution_id = str(uuid.uuid4())
self.executions[execution_id] = {
'start_time': time.time(),
'context': context,
'iterations': 0,
'actions': [],
'blocked_actions': [],
'errors': [],
'status': 'running'
}
return execution_id
def check_limits(self, execution_id: str) -> dict:
"""Check if execution is within limits."""
execution = self.executions.get(execution_id)
if not execution:
return {'within_limits': False, 'reason': 'unknown_execution'}
# Check iteration limit
if execution['iterations'] >= self.max_iterations:
return {
'within_limits': False,
'reason': 'max_iterations_exceeded',
'limit': self.max_iterations
}
# Check time limit
elapsed = time.time() - execution['start_time']
if elapsed > self.max_execution_time:
return {
'within_limits': False,
'reason': 'max_time_exceeded',
'limit': self.max_execution_time
}
# Check action rate
recent_actions = [
a for a in execution['actions']
if a['timestamp'] > time.time() - 60
]
if len(recent_actions) >= self.action_rate_limit:
return {
'within_limits': False,
'reason': 'action_rate_exceeded',
'limit': self.action_rate_limit
}
return {'within_limits': True}
def detect_anomalies(self, execution_id: str) -> List[dict]:
"""Detect anomalous execution patterns."""
execution = self.executions.get(execution_id)
if not execution:
return []
anomalies = []
# Check for repeated failed actions
recent_blocked = execution['blocked_actions'][-10:]
if len(recent_blocked) >= 5:
anomalies.append({
'type': 'repeated_blocked_actions',
'count': len(recent_blocked),
'severity': 'high'
})
# Check for action pattern repetition (potential loop)
action_names = [a['tool_name'] for a in execution['actions'][-20:]]
if len(action_names) >= 10:
pattern = self._find_repeated_pattern(action_names)
if pattern:
anomalies.append({
'type': 'action_loop_detected',
'pattern': pattern,
'severity': 'medium'
})
# Check for escalating privilege attempts
privilege_actions = [
a for a in execution['blocked_actions']
if a.get('reason') == 'insufficient_permissions'
]
if len(privilege_actions) >= 3:
anomalies.append({
'type': 'privilege_escalation_attempts',
'count': len(privilege_actions),
'severity': 'high'
})
return anomalies
def _find_repeated_pattern(self, actions: List[str]) -> Optional[List[str]]:
"""Find repeated action patterns indicating a loop."""
for pattern_length in range(2, len(actions) // 3):
pattern = actions[-pattern_length:]
matches = 0
for i in range(len(actions) - pattern_length, 0, -pattern_length):
if actions[i:i+pattern_length] == pattern:
matches += 1
else:
break
if matches >= 2:
return pattern
        return None
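In practice the monitor is polled from the agent loop between iterations. A rough sketch of consuming it is below; context and the alert_security_team hook are assumptions:
# Sketch: enforcing limits and reacting to anomalies between iterations.
monitor = ExecutionMonitor({'max_iterations': 25, 'actions_per_minute': 20})
execution_id = monitor.start_execution(context)

limits = monitor.check_limits(execution_id)
if not limits['within_limits']:
    raise RuntimeError(f"Execution stopped: {limits['reason']}")

for anomaly in monitor.detect_anomalies(execution_id):
    if anomaly['severity'] == 'high':
        # Hypothetical hook: page the on-call engineer and pause the agent.
        alert_security_team(execution_id, anomaly)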
Human-in-the-Loop Controls
import json
import re
import uuid

class ConfirmationHandler:
"""Handle human confirmation for high-risk agent actions."""
def __init__(self, config: dict = None):
self.timeout_seconds = config.get('timeout_seconds', 60) if config else 60
self.auto_deny_on_timeout = config.get('auto_deny_on_timeout', True) if config else True
async def request_confirmation(self, action: ToolAction,
context: ExecutionContext) -> bool:
"""Request human confirmation for an action."""
confirmation_request = {
'request_id': str(uuid.uuid4()),
'action': {
'tool': action.tool_name,
'parameters': self._mask_sensitive_params(action.parameters),
'risk_level': action.risk_level.value,
},
'context': {
'user_id': context.user_id,
'session_id': context.session_id,
},
'timeout_seconds': self.timeout_seconds
}
# Send confirmation request
response = await self._send_confirmation_request(confirmation_request)
# Log the confirmation decision
logger.info("Confirmation response", extra={
'request_id': confirmation_request['request_id'],
'confirmed': response.get('confirmed', False),
'responder': response.get('responder'),
'response_time_seconds': response.get('response_time')
})
return response.get('confirmed', False)
def _mask_sensitive_params(self, params: dict) -> dict:
"""Mask sensitive parameter values for confirmation display."""
masked = {}
sensitive_keys = ['password', 'secret', 'key', 'token', 'credential']
for key, value in params.items():
if any(s in key.lower() for s in sensitive_keys):
masked[key] = '***MASKED***'
elif isinstance(value, str) and len(value) > 200:
masked[key] = value[:200] + '...[truncated]'
else:
masked[key] = value
return masked
class InterventionPolicy:
"""Define when human intervention is required."""
def __init__(self, config: dict):
self.always_confirm = set(config.get('always_confirm_tools', []))
self.risk_threshold = ActionRiskLevel(config.get('risk_threshold', 'high'))
self.sensitive_patterns = config.get('sensitive_patterns', [])
def requires_confirmation(self, action: ToolAction,
context: ExecutionContext) -> bool:
"""Determine if action requires human confirmation."""
# Always confirm certain tools
if action.tool_name in self.always_confirm:
return True
# Confirm based on risk level
risk_order = [ActionRiskLevel.LOW, ActionRiskLevel.MEDIUM,
ActionRiskLevel.HIGH, ActionRiskLevel.CRITICAL]
if risk_order.index(action.risk_level) >= risk_order.index(self.risk_threshold):
return True
# Confirm if parameters match sensitive patterns
params_str = json.dumps(action.parameters)
for pattern in self.sensitive_patterns:
if re.search(pattern, params_str, re.IGNORECASE):
return True
        return False
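Putting the two pieces together, the policy decides when to ask and the handler does the asking. A small sketch of that flow (config values and context are illustrative, and the final await assumes an async caller):
# Sketch: gating a medium-risk action on human confirmation.
policy = InterventionPolicy({
    'always_confirm_tools': ['send_email', 'delete_record'],
    'risk_threshold': 'high',
    'sensitive_patterns': [r'customer_data', r'prod(uction)?'],
})

action = ToolAction(
    tool_name='run_sql',
    parameters={'query': 'SELECT * FROM customer_data LIMIT 10'},
    risk_level=ActionRiskLevel.MEDIUM,
    requires_confirmation=False,
    reversible=True,
)

# Matches the customer_data pattern, so confirmation is required
# even though the risk level alone is below the threshold.
if policy.requires_confirmation(action, context):
    confirmed = await ConfirmationHandler().request_confirmation(action, context)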
Conclusion
LLM agents offer powerful capabilities, but they introduce significant security risks that call for careful mitigation. The key is to implement multiple layers of control:
- Strict tool governance - limit and validate which tools agents can use
- Execution monitoring - detect abnormal behavior in real time
- Human-in-the-loop - require confirmation for high-risk actions
- Resource limits - prevent runaway execution
- Comprehensive logging - enable investigation and learning
At DeviDevs, we help organizations deploy LLM agents safely, balancing capability with appropriate safeguards. Contact us to discuss your agent security requirements.
Not sure where your AI system stands under the EU AI Act? Take the free risk assessment - find out in 2 minutes →