Runtime Application Self-Protection (RASP) provides real-time attack detection and prevention from within the application. This guide covers implementing RASP capabilities for comprehensive runtime security.
RASP Architecture Overview
Build a RASP framework for application protection:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import re
import hashlib
import threading
import functools
class AttackType(Enum):
SQL_INJECTION = "sql_injection"
XSS = "xss"
COMMAND_INJECTION = "command_injection"
PATH_TRAVERSAL = "path_traversal"
SSRF = "ssrf"
DESERIALIZATION = "deserialization"
XXE = "xxe"
LDAP_INJECTION = "ldap_injection"
class ActionType(Enum):
BLOCK = "block"
LOG = "log"
ALERT = "alert"
SANITIZE = "sanitize"
class Severity(Enum):
CRITICAL = 4
HIGH = 3
MEDIUM = 2
LOW = 1
@dataclass
class SecurityEvent:
event_id: str
attack_type: AttackType
severity: Severity
timestamp: datetime
request_id: str
source_ip: str
user_id: Optional[str]
endpoint: str
payload: str
context: Dict[str, Any]
action_taken: ActionType
blocked: bool
@dataclass
class RASPRule:
rule_id: str
attack_type: AttackType
severity: Severity
pattern: str
description: str
action: ActionType
enabled: bool = True
compile_pattern: re.Pattern = field(init=False)
def __post_init__(self):
self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE)
class RASPEngine:
def __init__(self):
self.rules: Dict[AttackType, List[RASPRule]] = {}
self.events: List[SecurityEvent] = []
self.event_handlers: List[Callable] = []
self.mode = 'blocking'
self._lock = threading.Lock()
self._load_default_rules()
def _load_default_rules(self):
"""Load default security rules."""
default_rules = [
# SQL Injection Rules
RASPRule(
rule_id="SQLI-001",
attack_type=AttackType.SQL_INJECTION,
severity=Severity.CRITICAL,
pattern=r"(\b(union|select|insert|update|delete|drop|truncate)\b.*\b(from|into|where|set)\b)",
description="SQL keyword injection detected",
action=ActionType.BLOCK
),
RASPRule(
rule_id="SQLI-002",
attack_type=AttackType.SQL_INJECTION,
severity=Severity.HIGH,
pattern=r"('|\")\s*(or|and)\s*('|\")?\s*\d+\s*=\s*\d+",
description="SQL boolean injection detected",
action=ActionType.BLOCK
),
RASPRule(
rule_id="SQLI-003",
attack_type=AttackType.SQL_INJECTION,
severity=Severity.HIGH,
pattern=r";\s*(drop|delete|truncate|alter)\s+",
description="SQL statement termination attack",
action=ActionType.BLOCK
),
# XSS Rules
RASPRule(
rule_id="XSS-001",
attack_type=AttackType.XSS,
severity=Severity.HIGH,
pattern=r"<script[^>]*>.*?</script>",
description="Script tag injection detected",
action=ActionType.SANITIZE
),
RASPRule(
rule_id="XSS-002",
attack_type=AttackType.XSS,
severity=Severity.HIGH,
pattern=r"(on\w+)\s*=\s*['\"]?[^'\"]*['\"]?",
description="Event handler injection detected",
action=ActionType.SANITIZE
),
RASPRule(
rule_id="XSS-003",
attack_type=AttackType.XSS,
severity=Severity.MEDIUM,
pattern=r"javascript\s*:",
description="JavaScript protocol injection",
action=ActionType.BLOCK
),
# Command Injection Rules
RASPRule(
rule_id="CMDI-001",
attack_type=AttackType.COMMAND_INJECTION,
severity=Severity.CRITICAL,
pattern=r"[;&|`$]|\$\(|\bexec\b|\beval\b",
description="Command injection characters detected",
action=ActionType.BLOCK
),
RASPRule(
rule_id="CMDI-002",
attack_type=AttackType.COMMAND_INJECTION,
severity=Severity.CRITICAL,
pattern=r"\b(cat|ls|pwd|whoami|id|uname|wget|curl|nc|bash|sh)\b",
description="Shell command detected in input",
action=ActionType.BLOCK
),
# Path Traversal Rules
RASPRule(
rule_id="PATH-001",
attack_type=AttackType.PATH_TRAVERSAL,
severity=Severity.HIGH,
pattern=r"\.\.[\\/]|\.\.%2[fF]",
description="Directory traversal sequence detected",
action=ActionType.BLOCK
),
RASPRule(
rule_id="PATH-002",
attack_type=AttackType.PATH_TRAVERSAL,
severity=Severity.HIGH,
pattern=r"(etc/(passwd|shadow)|windows/system32)",
description="Sensitive file path detected",
action=ActionType.BLOCK
),
# SSRF Rules
RASPRule(
rule_id="SSRF-001",
attack_type=AttackType.SSRF,
severity=Severity.HIGH,
pattern=r"(localhost|127\.0\.0\.1|0\.0\.0\.0|::1|169\.254\.\d+\.\d+)",
description="Internal address in URL detected",
action=ActionType.BLOCK
),
RASPRule(
rule_id="SSRF-002",
attack_type=AttackType.SSRF,
severity=Severity.MEDIUM,
pattern=r"(file|gopher|dict|ftp)://",
description="Dangerous URL scheme detected",
action=ActionType.BLOCK
)
]
for rule in default_rules:
if rule.attack_type not in self.rules:
self.rules[rule.attack_type] = []
self.rules[rule.attack_type].append(rule)
def analyze(self, input_data: str, context: Dict[str, Any] = None) -> Optional[SecurityEvent]:
"""Analyze input for security threats."""
context = context or {}
for attack_type, rules in self.rules.items():
for rule in rules:
if not rule.enabled:
continue
if rule.compiled_pattern.search(input_data):
event = self._create_event(rule, input_data, context)
self._handle_event(event)
if rule.action == ActionType.BLOCK and self.mode == 'blocking':
return event
return None
def _create_event(self, rule: RASPRule, payload: str, context: Dict) -> SecurityEvent:
"""Create security event from rule match."""
return SecurityEvent(
event_id=self._generate_event_id(),
attack_type=rule.attack_type,
severity=rule.severity,
timestamp=datetime.utcnow(),
request_id=context.get('request_id', 'unknown'),
source_ip=context.get('source_ip', 'unknown'),
user_id=context.get('user_id'),
endpoint=context.get('endpoint', 'unknown'),
payload=payload[:500],
context=context,
action_taken=rule.action,
blocked=rule.action == ActionType.BLOCK
)
def _generate_event_id(self) -> str:
return f"RASP-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{hashlib.sha256(str(datetime.utcnow().timestamp()).encode()).hexdigest()[:8]}"
def _handle_event(self, event: SecurityEvent):
"""Handle detected security event."""
with self._lock:
self.events.append(event)
for handler in self.event_handlers:
try:
handler(event)
except Exception as e:
print(f"Event handler error: {e}")
def register_handler(self, handler: Callable[[SecurityEvent], None]):
"""Register event handler."""
self.event_handlers.append(handler)
def sanitize(self, input_data: str, attack_type: AttackType) -> str:
"""Sanitize input by removing dangerous patterns."""
sanitized = input_data
for rule in self.rules.get(attack_type, []):
if rule.action == ActionType.SANITIZE:
sanitized = rule.compiled_pattern.sub('', sanitized)
return sanitizedMethod-Level Protection
Implement protection decorators:
from functools import wraps
from typing import List, Optional
import inspect
# Global RASP engine instance
_rasp_engine = RASPEngine()
def protect(
attack_types: List[AttackType] = None,
params: List[str] = None,
sanitize: bool = False
):
"""Decorator to protect function parameters from attacks."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Get function signature
sig = inspect.signature(func)
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
# Determine which parameters to check
params_to_check = params or list(bound.arguments.keys())
# Build context
context = {
'function': func.__name__,
'module': func.__module__,
'request_id': kwargs.get('request_id', 'unknown')
}
# Analyze each parameter
for param_name in params_to_check:
if param_name not in bound.arguments:
continue
value = bound.arguments[param_name]
if not isinstance(value, str):
continue
context['parameter'] = param_name
# Check against specified or all attack types
types_to_check = attack_types or list(AttackType)
for attack_type in types_to_check:
event = _rasp_engine.analyze(value, context)
if event:
if sanitize:
bound.arguments[param_name] = _rasp_engine.sanitize(value, attack_type)
else:
raise SecurityException(
f"Security violation detected: {event.attack_type.value}",
event
)
return func(*bound.args, **bound.kwargs)
return wrapper
return decorator
class SecurityException(Exception):
def __init__(self, message: str, event: SecurityEvent):
super().__init__(message)
self.event = event
# Database query protection
def protect_query(func):
"""Decorator specifically for database query functions."""
@wraps(func)
def wrapper(query: str, params: tuple = None, *args, **kwargs):
context = {'function': func.__name__, 'query_type': 'database'}
# Analyze query string
event = _rasp_engine.analyze(query, context)
if event and event.attack_type == AttackType.SQL_INJECTION:
raise SecurityException("SQL injection detected in query", event)
# Analyze parameters
if params:
for i, param in enumerate(params):
if isinstance(param, str):
context['parameter_index'] = i
event = _rasp_engine.analyze(param, context)
if event:
raise SecurityException(f"Malicious input in parameter {i}", event)
return func(query, params, *args, **kwargs)
return wrapper
# URL validation for SSRF protection
def protect_url(func):
"""Decorator to protect URL parameters from SSRF."""
@wraps(func)
def wrapper(url: str, *args, **kwargs):
from urllib.parse import urlparse
context = {'function': func.__name__, 'input_type': 'url'}
# Basic RASP analysis
event = _rasp_engine.analyze(url, context)
if event and event.attack_type == AttackType.SSRF:
raise SecurityException("SSRF attempt detected", event)
# Additional URL validation
try:
parsed = urlparse(url)
# Block internal schemes
if parsed.scheme in ['file', 'gopher', 'dict']:
raise SecurityException("Blocked URL scheme", None)
# Block internal IPs
hostname = parsed.hostname or ''
if _is_internal_ip(hostname):
raise SecurityException("Internal IP not allowed", None)
except ValueError as e:
raise SecurityException(f"Invalid URL: {e}", None)
return func(url, *args, **kwargs)
return wrapper
def _is_internal_ip(hostname: str) -> bool:
"""Check if hostname resolves to internal IP."""
import socket
import ipaddress
try:
ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(ip)
return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local
except (socket.gaierror, ValueError):
return FalseBehavioral Analysis
Implement behavioral anomaly detection:
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
import statistics
class BehavioralAnalyzer:
def __init__(self, rasp_engine: RASPEngine):
self.rasp_engine = rasp_engine
self.user_profiles: Dict[str, UserProfile] = {}
self.baseline_period = timedelta(days=7)
self.anomaly_threshold = 2.0
def analyze_request(self, request_context: Dict) -> Optional[SecurityEvent]:
"""Analyze request for behavioral anomalies."""
user_id = request_context.get('user_id', 'anonymous')
profile = self._get_or_create_profile(user_id)
# Update profile with current request
profile.record_request(request_context)
# Check for anomalies
anomalies = []
# Request rate anomaly
if profile.is_rate_anomalous():
anomalies.append(('rate', 'Unusual request rate detected'))
# Time-based anomaly
if profile.is_time_anomalous(request_context.get('timestamp', datetime.utcnow())):
anomalies.append(('time', 'Request at unusual time'))
# Endpoint anomaly
if profile.is_endpoint_anomalous(request_context.get('endpoint', '')):
anomalies.append(('endpoint', 'Access to unusual endpoint'))
# Parameter anomaly
if profile.is_parameter_anomalous(request_context.get('parameters', {})):
anomalies.append(('parameter', 'Unusual parameter patterns'))
if anomalies:
return self._create_behavioral_event(anomalies, request_context)
return None
def _get_or_create_profile(self, user_id: str) -> 'UserProfile':
if user_id not in self.user_profiles:
self.user_profiles[user_id] = UserProfile(user_id)
return self.user_profiles[user_id]
def _create_behavioral_event(self, anomalies: List[tuple], context: Dict) -> SecurityEvent:
severity = Severity.HIGH if len(anomalies) > 2 else Severity.MEDIUM
return SecurityEvent(
event_id=f"BEHAV-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
attack_type=AttackType.SQL_INJECTION,
severity=severity,
timestamp=datetime.utcnow(),
request_id=context.get('request_id', 'unknown'),
source_ip=context.get('source_ip', 'unknown'),
user_id=context.get('user_id'),
endpoint=context.get('endpoint', 'unknown'),
payload=str(anomalies),
context={**context, 'anomalies': anomalies},
action_taken=ActionType.ALERT,
blocked=False
)
class UserProfile:
def __init__(self, user_id: str):
self.user_id = user_id
self.request_times: List[datetime] = []
self.endpoints_accessed: Dict[str, int] = defaultdict(int)
self.hourly_activity: Dict[int, int] = defaultdict(int)
self.request_sizes: List[int] = []
self.baseline_established = False
def record_request(self, context: Dict):
"""Record request for profiling."""
now = datetime.utcnow()
self.request_times.append(now)
self.hourly_activity[now.hour] += 1
endpoint = context.get('endpoint', '')
self.endpoints_accessed[endpoint] += 1
if 'content_length' in context:
self.request_sizes.append(context['content_length'])
# Cleanup old data
cutoff = now - timedelta(days=30)
self.request_times = [t for t in self.request_times if t > cutoff]
def is_rate_anomalous(self) -> bool:
"""Check if current request rate is anomalous."""
if len(self.request_times) < 100:
return False
now = datetime.utcnow()
recent = [t for t in self.request_times if t > now - timedelta(minutes=5)]
historical = [t for t in self.request_times if t > now - timedelta(hours=24)]
if len(historical) < 10:
return False
current_rate = len(recent) / 5
historical_rate = len(historical) / (24 * 60)
return current_rate > historical_rate * 5
def is_time_anomalous(self, timestamp: datetime) -> bool:
"""Check if request time is anomalous for this user."""
hour = timestamp.hour
total_requests = sum(self.hourly_activity.values())
if total_requests < 100:
return False
hour_ratio = self.hourly_activity[hour] / total_requests
return hour_ratio < 0.01
def is_endpoint_anomalous(self, endpoint: str) -> bool:
"""Check if endpoint access is anomalous."""
if endpoint not in self.endpoints_accessed:
return True
return False
def is_parameter_anomalous(self, parameters: Dict) -> bool:
"""Check for anomalous parameter patterns."""
for key, value in parameters.items():
if isinstance(value, str):
if len(value) > 10000:
return True
if value.count('<') > 10 or value.count('>') > 10:
return True
return FalseIntegration with DevSecOps
Integrate RASP with CI/CD:
# .github/workflows/rasp-config-validation.yml
name: RASP Configuration Validation
on:
pull_request:
paths:
- 'security/rasp-rules.yaml'
- 'security/rasp-config.yaml'
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Validate RASP Rules
run: |
python scripts/validate_rasp_rules.py security/rasp-rules.yaml
- name: Test Rules Against Samples
run: |
python scripts/test_rasp_rules.py \
--rules security/rasp-rules.yaml \
--samples tests/attack-samples/
- name: Check for False Positive Rate
run: |
python scripts/fp_analysis.py \
--rules security/rasp-rules.yaml \
--legitimate tests/legitimate-samples/ \
--max-fp-rate 0.01
- name: Generate Rule Coverage Report
run: |
python scripts/coverage_report.py \
--rules security/rasp-rules.yaml \
--output rasp-coverage.html
- name: Upload Coverage Report
uses: actions/upload-artifact@v4
with:
name: rasp-coverage
path: rasp-coverage.htmlConclusion
RASP provides real-time attack detection and prevention from within applications. Implement pattern-based rules for known attacks, behavioral analysis for anomaly detection, and method-level protection decorators. Integrate with DevSecOps pipelines for continuous security validation. RASP complements WAF and other perimeter defenses by providing application-aware protection.