Runtime Application Self-Protection (RASP): Implementation and Best Practices
Runtime Application Self-Protection (RASP) provides real-time security by embedding protection directly into application runtime environments. Unlike perimeter defenses, RASP can detect and block attacks with full context of application behavior.
Understanding RASP Architecture
RASP operates at the application layer, intercepting requests and monitoring execution to detect malicious behavior in real-time.
Core Components
# rasp_core.py
"""
Core RASP framework implementation.
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import logging
import hashlib
import re
class ThreatLevel(Enum):
"""Threat severity levels."""
INFO = "info"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class ResponseAction(Enum):
"""RASP response actions."""
LOG = "log"
ALERT = "alert"
BLOCK = "block"
SANITIZE = "sanitize"
TERMINATE = "terminate"
@dataclass
class SecurityEvent:
"""Represents a detected security event."""
event_id: str
timestamp: datetime
event_type: str
threat_level: ThreatLevel
description: str
source_ip: Optional[str]
user_id: Optional[str]
request_id: Optional[str]
attack_vector: str
payload: Optional[str]
stack_trace: Optional[str]
context: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RASPConfig:
"""RASP configuration."""
mode: str = "monitor" # monitor, block, learn
block_threshold: ThreatLevel = ThreatLevel.HIGH
log_level: str = "INFO"
max_payload_log_size: int = 1024
enable_stack_traces: bool = True
custom_rules: List[Dict] = field(default_factory=list)
whitelist_paths: List[str] = field(default_factory=list)
response_headers: Dict[str, str] = field(default_factory=dict)
class SecuritySensor(ABC):
"""Base class for RASP security sensors."""
def __init__(self, name: str, config: RASPConfig):
self.name = name
self.config = config
self.enabled = True
self.events_detected = 0
@abstractmethod
def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
"""Analyze context for security threats."""
pass
@abstractmethod
def get_attack_types(self) -> List[str]:
"""Return list of attack types this sensor detects."""
pass
class SQLInjectionSensor(SecuritySensor):
"""Detects SQL injection attacks."""
SQL_PATTERNS = [
r"(\b(union|select|insert|update|delete|drop|truncate|alter)\b.*\b(from|into|set|table)\b)",
r"(--|\#|\/\*|\*\/)",
r"(\b(or|and)\b\s+[\w\s]*=[\w\s]*)",
r"(;.*\b(drop|delete|truncate)\b)",
r"(\bwaitfor\s+delay\b)",
r"(\bbenchmark\s*\()",
r"(\bsleep\s*\()",
r"(0x[0-9a-fA-F]+)",
r"(\bchar\s*\(\d+\))",
]
def __init__(self, config: RASPConfig):
super().__init__("SQLInjectionSensor", config)
self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.SQL_PATTERNS]
def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
"""Check for SQL injection patterns."""
inputs_to_check = []
# Collect all user inputs
if 'query_params' in context:
inputs_to_check.extend(context['query_params'].values())
if 'body_params' in context:
inputs_to_check.extend(self._flatten_dict(context['body_params']))
if 'path' in context:
inputs_to_check.append(context['path'])
for input_value in inputs_to_check:
if not isinstance(input_value, str):
continue
for pattern in self.compiled_patterns:
if pattern.search(input_value):
return SecurityEvent(
event_id=self._generate_event_id(),
timestamp=datetime.utcnow(),
event_type="sql_injection",
threat_level=ThreatLevel.HIGH,
description=f"SQL injection pattern detected: {pattern.pattern[:50]}",
source_ip=context.get('client_ip'),
user_id=context.get('user_id'),
request_id=context.get('request_id'),
attack_vector="sql_injection",
payload=input_value[:self.config.max_payload_log_size],
stack_trace=self._get_stack_trace() if self.config.enable_stack_traces else None,
context={'matched_pattern': pattern.pattern}
)
return None
def get_attack_types(self) -> List[str]:
return ["sql_injection", "sql_injection_time_based", "sql_injection_union"]
def _flatten_dict(self, d: Dict, parent_key: str = '') -> List[str]:
"""Flatten nested dictionary values."""
items = []
for k, v in d.items():
if isinstance(v, dict):
items.extend(self._flatten_dict(v, f"{parent_key}{k}."))
elif isinstance(v, list):
for item in v:
if isinstance(item, str):
items.append(item)
elif isinstance(v, str):
items.append(v)
return items
def _generate_event_id(self) -> str:
import secrets
return f"EVT-{secrets.token_hex(8)}"
def _get_stack_trace(self) -> str:
import traceback
return ''.join(traceback.format_stack()[:-2])
class XSSSensor(SecuritySensor):
"""Detects Cross-Site Scripting (XSS) attacks."""
XSS_PATTERNS = [
r"<script[^>]*>.*</script>",
r"javascript\s*:",
r"on\w+\s*=",
r"<\s*img[^>]+onerror\s*=",
r"<\s*svg[^>]+onload\s*=",
r"<\s*iframe",
r"<\s*object",
r"<\s*embed",
r"expression\s*\(",
r"url\s*\(\s*['\"]?\s*javascript:",
]
def __init__(self, config: RASPConfig):
super().__init__("XSSSensor", config)
self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.XSS_PATTERNS]
def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
"""Check for XSS patterns."""
inputs_to_check = []
if 'query_params' in context:
inputs_to_check.extend(context['query_params'].values())
if 'body_params' in context:
inputs_to_check.extend(self._flatten_dict(context['body_params']))
if 'headers' in context:
# Check specific headers that might be reflected
for header in ['referer', 'user-agent']:
if header in context['headers']:
inputs_to_check.append(context['headers'][header])
for input_value in inputs_to_check:
if not isinstance(input_value, str):
continue
for pattern in self.compiled_patterns:
if pattern.search(input_value):
return SecurityEvent(
event_id=self._generate_event_id(),
timestamp=datetime.utcnow(),
event_type="xss",
threat_level=ThreatLevel.HIGH,
description=f"XSS pattern detected: {pattern.pattern[:50]}",
source_ip=context.get('client_ip'),
user_id=context.get('user_id'),
request_id=context.get('request_id'),
attack_vector="xss",
payload=input_value[:self.config.max_payload_log_size],
stack_trace=None,
context={'matched_pattern': pattern.pattern}
)
return None
def get_attack_types(self) -> List[str]:
return ["xss_reflected", "xss_stored", "xss_dom"]
def _flatten_dict(self, d: Dict, parent_key: str = '') -> List[str]:
items = []
for k, v in d.items():
if isinstance(v, dict):
items.extend(self._flatten_dict(v))
elif isinstance(v, str):
items.append(v)
return items
def _generate_event_id(self) -> str:
import secrets
return f"EVT-{secrets.token_hex(8)}"
class PathTraversalSensor(SecuritySensor):
"""Detects path traversal attacks."""
TRAVERSAL_PATTERNS = [
r"\.\./",
r"\.\.\\",
r"%2e%2e%2f",
r"%2e%2e/",
r"\.%2e/",
r"%2e\./",
r"\.\.%5c",
r"%252e%252e%255c",
r"/etc/passwd",
r"/etc/shadow",
r"c:\\windows",
r"/proc/self",
]
def __init__(self, config: RASPConfig):
super().__init__("PathTraversalSensor", config)
self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.TRAVERSAL_PATTERNS]
def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
"""Check for path traversal patterns."""
inputs_to_check = []
if 'path' in context:
inputs_to_check.append(context['path'])
if 'query_params' in context:
inputs_to_check.extend(context['query_params'].values())
if 'body_params' in context:
for v in context['body_params'].values():
if isinstance(v, str):
inputs_to_check.append(v)
for input_value in inputs_to_check:
if not isinstance(input_value, str):
continue
for pattern in self.compiled_patterns:
if pattern.search(input_value):
return SecurityEvent(
event_id=self._generate_event_id(),
timestamp=datetime.utcnow(),
event_type="path_traversal",
threat_level=ThreatLevel.HIGH,
description="Path traversal attempt detected",
source_ip=context.get('client_ip'),
user_id=context.get('user_id'),
request_id=context.get('request_id'),
attack_vector="path_traversal",
payload=input_value[:self.config.max_payload_log_size],
stack_trace=None,
context={'matched_pattern': pattern.pattern}
)
return None
def get_attack_types(self) -> List[str]:
return ["path_traversal", "lfi", "rfi"]
def _generate_event_id(self) -> str:
import secrets
return f"EVT-{secrets.token_hex(8)}"
class RASPEngine:
"""Main RASP engine that coordinates sensors and responses."""
def __init__(self, config: RASPConfig):
self.config = config
self.sensors: List[SecuritySensor] = []
self.event_handlers: List[Callable[[SecurityEvent], None]] = []
self.logger = logging.getLogger("RASP")
self._lock = threading.Lock()
# Initialize default sensors
self._init_default_sensors()
def _init_default_sensors(self):
"""Initialize default security sensors."""
self.sensors.append(SQLInjectionSensor(self.config))
self.sensors.append(XSSSensor(self.config))
self.sensors.append(PathTraversalSensor(self.config))
def add_sensor(self, sensor: SecuritySensor):
"""Add a custom sensor."""
with self._lock:
self.sensors.append(sensor)
def add_event_handler(self, handler: Callable[[SecurityEvent], None]):
"""Add an event handler."""
self.event_handlers.append(handler)
def analyze_request(self, context: Dict[str, Any]) -> List[SecurityEvent]:
"""Analyze a request through all sensors."""
events = []
# Check whitelist
if self._is_whitelisted(context):
return events
for sensor in self.sensors:
if not sensor.enabled:
continue
try:
event = sensor.analyze(context)
if event:
events.append(event)
sensor.events_detected += 1
except Exception as e:
self.logger.error(f"Sensor {sensor.name} error: {e}")
# Process events
for event in events:
self._process_event(event)
return events
def should_block(self, events: List[SecurityEvent]) -> bool:
"""Determine if request should be blocked."""
if self.config.mode != "block":
return False
threshold_order = [ThreatLevel.INFO, ThreatLevel.LOW, ThreatLevel.MEDIUM,
ThreatLevel.HIGH, ThreatLevel.CRITICAL]
for event in events:
if threshold_order.index(event.threat_level) >= threshold_order.index(self.config.block_threshold):
return True
return False
def get_response_action(self, events: List[SecurityEvent]) -> ResponseAction:
"""Determine appropriate response action."""
if not events:
return ResponseAction.LOG
max_threat = max(events, key=lambda e: [ThreatLevel.INFO, ThreatLevel.LOW,
ThreatLevel.MEDIUM, ThreatLevel.HIGH,
ThreatLevel.CRITICAL].index(e.threat_level))
if self.config.mode == "monitor":
return ResponseAction.ALERT if max_threat.threat_level in [ThreatLevel.HIGH, ThreatLevel.CRITICAL] else ResponseAction.LOG
if self.config.mode == "block":
if max_threat.threat_level == ThreatLevel.CRITICAL:
return ResponseAction.TERMINATE
elif max_threat.threat_level == ThreatLevel.HIGH:
return ResponseAction.BLOCK
else:
return ResponseAction.ALERT
return ResponseAction.LOG
def _is_whitelisted(self, context: Dict[str, Any]) -> bool:
"""Check if request path is whitelisted."""
path = context.get('path', '')
for whitelist_pattern in self.config.whitelist_paths:
if re.match(whitelist_pattern, path):
return True
return False
def _process_event(self, event: SecurityEvent):
"""Process a security event."""
# Log event
self.logger.warning(
f"Security event detected: {event.event_type} "
f"[{event.threat_level.value}] - {event.description}"
)
# Call handlers
for handler in self.event_handlers:
try:
handler(event)
except Exception as e:
self.logger.error(f"Event handler error: {e}")
def get_stats(self) -> Dict[str, Any]:
"""Get RASP statistics."""
return {
"mode": self.config.mode,
"sensors": [
{
"name": s.name,
"enabled": s.enabled,
"events_detected": s.events_detected,
"attack_types": s.get_attack_types()
}
for s in self.sensors
]
}HTTP Middleware Integration
# rasp_middleware.py
"""
RASP middleware for web frameworks.
"""
from typing import Callable, Dict, Any
import time
import uuid
from functools import wraps
class RASPMiddleware:
"""
Generic RASP middleware that can be adapted to various frameworks.
"""
def __init__(self, rasp_engine, app=None):
self.rasp = rasp_engine
self.app = app
def extract_context(self, request) -> Dict[str, Any]:
"""Extract security-relevant context from request."""
context = {
'request_id': str(uuid.uuid4()),
'timestamp': time.time(),
'method': getattr(request, 'method', 'GET'),
'path': getattr(request, 'path', '/'),
'client_ip': self._get_client_ip(request),
'user_agent': request.headers.get('User-Agent', ''),
'content_type': request.headers.get('Content-Type', ''),
'query_params': dict(request.args) if hasattr(request, 'args') else {},
'headers': dict(request.headers) if hasattr(request, 'headers') else {},
}
# Extract body params based on content type
if hasattr(request, 'json') and request.json:
context['body_params'] = request.json
elif hasattr(request, 'form'):
context['body_params'] = dict(request.form)
# Extract user ID if authenticated
if hasattr(request, 'user') and request.user:
context['user_id'] = str(getattr(request.user, 'id', None))
return context
def _get_client_ip(self, request) -> str:
"""Get real client IP, handling proxies."""
# Check X-Forwarded-For first
xff = request.headers.get('X-Forwarded-For', '')
if xff:
return xff.split(',')[0].strip()
# Check X-Real-IP
xri = request.headers.get('X-Real-IP', '')
if xri:
return xri
# Fall back to remote address
return getattr(request, 'remote_addr', '0.0.0.0')
def create_blocked_response(self, events):
"""Create a response for blocked requests."""
return {
'status_code': 403,
'body': {
'error': 'Request blocked by security policy',
'request_id': events[0].request_id if events else None
},
'headers': {
'X-RASP-Block': 'true',
**self.rasp.config.response_headers
}
}
# Flask integration
def flask_rasp_middleware(rasp_engine):
"""Create Flask RASP middleware."""
from flask import request, jsonify, g
middleware = RASPMiddleware(rasp_engine)
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# Extract context
context = middleware.extract_context(request)
g.rasp_request_id = context['request_id']
# Analyze request
events = rasp_engine.analyze_request(context)
# Check if should block
if rasp_engine.should_block(events):
response = middleware.create_blocked_response(events)
return jsonify(response['body']), response['status_code'], response['headers']
# Continue with request
return f(*args, **kwargs)
return wrapped
return decorator
# Express.js/Node.js style middleware (for reference)
EXPRESS_MIDDLEWARE_CODE = '''
// rasp-middleware.js
const RASPEngine = require('./rasp-engine');
function createRASPMiddleware(config) {
const rasp = new RASPEngine(config);
return function(req, res, next) {
const context = {
request_id: req.id || require('uuid').v4(),
method: req.method,
path: req.path,
client_ip: req.ip || req.connection.remoteAddress,
user_agent: req.get('User-Agent'),
query_params: req.query,
body_params: req.body,
headers: req.headers,
user_id: req.user?.id
};
const events = rasp.analyzeRequest(context);
if (rasp.shouldBlock(events)) {
return res.status(403).json({
error: 'Request blocked by security policy',
request_id: context.request_id
});
}
// Attach RASP context for logging
req.raspContext = { events, request_id: context.request_id };
next();
};
}
module.exports = createRASPMiddleware;
'''
# Django middleware
class DjangoRASPMiddleware:
"""Django RASP middleware."""
def __init__(self, get_response, rasp_engine):
self.get_response = get_response
self.rasp = rasp_engine
def __call__(self, request):
# Extract context
context = self._extract_context(request)
# Analyze request
events = self.rasp.analyze_request(context)
# Check if should block
if self.rasp.should_block(events):
from django.http import JsonResponse
return JsonResponse({
'error': 'Request blocked by security policy',
'request_id': context['request_id']
}, status=403)
# Store context for later use
request.rasp_context = {'events': events, 'request_id': context['request_id']}
response = self.get_response(request)
# Add security headers
for header, value in self.rasp.config.response_headers.items():
response[header] = value
return response
def _extract_context(self, request) -> Dict[str, Any]:
import json
import uuid
context = {
'request_id': str(uuid.uuid4()),
'method': request.method,
'path': request.path,
'client_ip': self._get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', ''),
'query_params': dict(request.GET),
'headers': {k: v for k, v in request.META.items() if k.startswith('HTTP_')},
}
# Extract body
if request.content_type == 'application/json':
try:
context['body_params'] = json.loads(request.body)
except:
context['body_params'] = {}
else:
context['body_params'] = dict(request.POST)
# User ID
if hasattr(request, 'user') and request.user.is_authenticated:
context['user_id'] = str(request.user.id)
return context
def _get_client_ip(self, request) -> str:
xff = request.META.get('HTTP_X_FORWARDED_FOR', '')
if xff:
return xff.split(',')[0].strip()
return request.META.get('REMOTE_ADDR', '0.0.0.0')Behavioral Analysis
# behavioral_analysis.py
"""
Behavioral analysis for RASP.
"""
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import threading
@dataclass
class BehaviorProfile:
"""User/IP behavior profile."""
identifier: str
identifier_type: str # 'ip' or 'user'
request_count: int = 0
error_count: int = 0
blocked_count: int = 0
unique_paths: set = None
unique_user_agents: set = None
first_seen: datetime = None
last_seen: datetime = None
anomaly_score: float = 0.0
def __post_init__(self):
if self.unique_paths is None:
self.unique_paths = set()
if self.unique_user_agents is None:
self.unique_user_agents = set()
class BehavioralAnalyzer:
"""
Analyzes request patterns to detect anomalous behavior.
"""
def __init__(self, config: Dict):
self.config = config
self.profiles: Dict[str, BehaviorProfile] = {}
self.request_windows: Dict[str, List[datetime]] = defaultdict(list)
self._lock = threading.Lock()
# Thresholds
self.rate_limit = config.get('rate_limit', 100) # requests per minute
self.rate_window = config.get('rate_window', 60) # seconds
self.path_diversity_threshold = config.get('path_diversity_threshold', 50)
self.error_rate_threshold = config.get('error_rate_threshold', 0.5)
def analyze(self, context: Dict) -> Optional[Dict]:
"""Analyze request behavior and return anomalies."""
client_ip = context.get('client_ip', 'unknown')
user_id = context.get('user_id')
anomalies = []
# Analyze by IP
ip_anomalies = self._analyze_entity(client_ip, 'ip', context)
anomalies.extend(ip_anomalies)
# Analyze by user if authenticated
if user_id:
user_anomalies = self._analyze_entity(user_id, 'user', context)
anomalies.extend(user_anomalies)
if anomalies:
return {
'anomalies': anomalies,
'client_ip': client_ip,
'user_id': user_id
}
return None
def _analyze_entity(
self,
identifier: str,
identifier_type: str,
context: Dict
) -> List[Dict]:
"""Analyze behavior for a specific entity."""
anomalies = []
with self._lock:
# Get or create profile
profile_key = f"{identifier_type}:{identifier}"
if profile_key not in self.profiles:
self.profiles[profile_key] = BehaviorProfile(
identifier=identifier,
identifier_type=identifier_type,
first_seen=datetime.utcnow()
)
profile = self.profiles[profile_key]
# Update profile
profile.request_count += 1
profile.last_seen = datetime.utcnow()
profile.unique_paths.add(context.get('path', '/'))
profile.unique_user_agents.add(context.get('user_agent', ''))
# Check rate limiting
rate_anomaly = self._check_rate_limit(profile_key)
if rate_anomaly:
anomalies.append(rate_anomaly)
# Check path diversity (potential scanning)
if len(profile.unique_paths) > self.path_diversity_threshold:
anomalies.append({
'type': 'high_path_diversity',
'severity': 'medium',
'description': f'Entity accessing unusually high number of unique paths ({len(profile.unique_paths)})',
'identifier': identifier,
'identifier_type': identifier_type
})
# Check user agent diversity (potential bot)
if len(profile.unique_user_agents) > 5:
anomalies.append({
'type': 'user_agent_rotation',
'severity': 'medium',
'description': 'Multiple user agents from same source',
'identifier': identifier,
'identifier_type': identifier_type
})
# Check error rate
if profile.request_count > 10:
error_rate = profile.error_count / profile.request_count
if error_rate > self.error_rate_threshold:
anomalies.append({
'type': 'high_error_rate',
'severity': 'medium',
'description': f'High error rate: {error_rate:.1%}',
'identifier': identifier,
'identifier_type': identifier_type
})
return anomalies
def _check_rate_limit(self, profile_key: str) -> Optional[Dict]:
"""Check if entity is exceeding rate limits."""
now = datetime.utcnow()
window_start = now - timedelta(seconds=self.rate_window)
# Add current request
self.request_windows[profile_key].append(now)
# Clean old entries
self.request_windows[profile_key] = [
t for t in self.request_windows[profile_key]
if t > window_start
]
request_count = len(self.request_windows[profile_key])
if request_count > self.rate_limit:
return {
'type': 'rate_limit_exceeded',
'severity': 'high',
'description': f'Rate limit exceeded: {request_count} requests in {self.rate_window}s',
'request_count': request_count,
'limit': self.rate_limit
}
return None
def record_error(self, identifier: str, identifier_type: str):
"""Record an error for the entity."""
profile_key = f"{identifier_type}:{identifier}"
with self._lock:
if profile_key in self.profiles:
self.profiles[profile_key].error_count += 1
def record_block(self, identifier: str, identifier_type: str):
"""Record a blocked request."""
profile_key = f"{identifier_type}:{identifier}"
with self._lock:
if profile_key in self.profiles:
self.profiles[profile_key].blocked_count += 1
def get_profile(self, identifier: str, identifier_type: str) -> Optional[BehaviorProfile]:
"""Get behavior profile for an entity."""
profile_key = f"{identifier_type}:{identifier}"
return self.profiles.get(profile_key)
def cleanup_old_profiles(self, max_age_hours: int = 24):
"""Remove old profiles to prevent memory issues."""
cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)
with self._lock:
keys_to_remove = [
key for key, profile in self.profiles.items()
if profile.last_seen < cutoff
]
for key in keys_to_remove:
del self.profiles[key]
if key in self.request_windows:
del self.request_windows[key]Conclusion
RASP provides a powerful layer of defense by embedding security directly into applications:
- Pattern-based detection catches known attack signatures
- Behavioral analysis identifies anomalous activity
- Context-aware blocking makes accurate decisions with full request context
- Real-time protection stops attacks as they happen
When combined with traditional security measures like WAFs and SAST/DAST, RASP creates defense in depth that significantly reduces application risk.