DevSecOps

Runtime Application Self-Protection (RASP): Implementation and Best Practices

DeviDevs Team
12 min read
#rasp #application-security #runtime-protection #threat-detection #devsecops #security

Runtime Application Self-Protection (RASP): Implementation and Best Practices

Runtime Application Self-Protection (RASP) provides real-time security by embedding protection directly into application runtime environments. Unlike perimeter defenses, RASP can detect and block attacks with full context of application behavior.

Understanding RASP Architecture

RASP operates at the application layer, intercepting requests and monitoring execution to detect malicious behavior in real time.

Core Components

# rasp_core.py
"""
Core RASP framework implementation.
"""
 
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable
from datetime import datetime
from enum import Enum
import threading
import logging
import hashlib
import re
 
 
class ThreatLevel(Enum):
    """Threat severity levels.

    Each member's value is its lowercase name. The enum defines no
    ordering of its own; RASPEngine ranks severities with an explicit
    INFO -> LOW -> MEDIUM -> HIGH -> CRITICAL sequence when it compares
    an event against the configured block threshold.
    """
    INFO = "info"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"  # level assigned by all three built-in sensors
    CRITICAL = "critical"
 
 
class ResponseAction(Enum):
    """RASP response actions.

    RASPEngine.get_response_action() selects one of these from the
    configured mode and the most severe event of a request.
    """
    LOG = "log"  # also returned when there are no events or the mode is unrecognised
    ALERT = "alert"
    BLOCK = "block"
    SANITIZE = "sanitize"  # not returned by any code visible in this file
    TERMINATE = "terminate"  # chosen for CRITICAL events in "block" mode
 
 
@dataclass
class SecurityEvent:
    """Represents a detected security event.

    Instances are created by sensors when a pattern matches and are then
    passed to RASPEngine for logging and to registered event handlers.
    """
    # Built-in sensors generate "EVT-" followed by 16 random hex characters.
    event_id: str
    # Built-in sensors use datetime.utcnow(), i.e. a naive datetime.
    timestamp: datetime
    event_type: str
    threat_level: ThreatLevel
    description: str
    # Copied from the request context keys 'client_ip', 'user_id' and
    # 'request_id'; None when the context does not carry them.
    source_ip: Optional[str]
    user_id: Optional[str]
    request_id: Optional[str]
    attack_vector: str
    # The offending input, truncated to RASPConfig.max_payload_log_size.
    payload: Optional[str]
    # Only SQLInjectionSensor fills this, and only when
    # RASPConfig.enable_stack_traces is true.
    stack_trace: Optional[str]
    # Built-in sensors store the matched regex under 'matched_pattern'.
    context: Dict[str, Any] = field(default_factory=dict)
 
 
@dataclass
class RASPConfig:
    """RASP configuration shared by the engine, sensors and middleware."""
    # One of "monitor", "block" or "learn". RASPEngine.should_block() only
    # ever returns True in "block" mode.
    mode: str = "monitor"
    # Minimum severity at which "block" mode rejects a request.
    block_threshold: ThreatLevel = ThreatLevel.HIGH
    # Not read by any code visible in this file.
    log_level: str = "INFO"
    # Sensors truncate the recorded payload to this many characters.
    max_payload_log_size: int = 1024
    # When true, SQLInjectionSensor attaches a stack trace to its events.
    enable_stack_traces: bool = True
    # Not read by any code visible in this file.
    custom_rules: List[Dict] = field(default_factory=list)
    # Regular expressions applied with re.match() to the request path; a
    # match makes RASPEngine skip every sensor for that request.
    whitelist_paths: List[str] = field(default_factory=list)
    # Extra HTTP headers the middleware adds to responses.
    response_headers: Dict[str, str] = field(default_factory=dict)
 
 
class SecuritySensor(ABC):
    """Abstract parent of every RASP sensor.

    A sensor carries a display name, the shared configuration, an
    ``enabled`` switch and a running count of the events it has produced.
    Concrete sensors implement ``analyze`` and ``get_attack_types``.
    """

    def __init__(self, name: str, config: RASPConfig):
        self.name, self.config = name, config
        # Sensors start switched on with an empty detection counter.
        self.enabled = True
        self.events_detected = 0

    @abstractmethod
    def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
        """Inspect a request context; return an event, or None if clean."""

    @abstractmethod
    def get_attack_types(self) -> List[str]:
        """Name the attack categories this sensor is able to detect."""
 
 
class SQLInjectionSensor(SecuritySensor):
    """Detects SQL injection attacks.

    Every string found in the request's query parameters, body parameters
    and path is tested against ``SQL_PATTERNS`` (compiled
    case-insensitively). The first match produces a HIGH severity
    ``SecurityEvent``; later inputs and patterns are not evaluated.
    """

    SQL_PATTERNS = [
        r"(\b(union|select|insert|update|delete|drop|truncate|alter)\b.*\b(from|into|set|table)\b)",
        r"(--|\#|\/\*|\*\/)",
        r"(\b(or|and)\b\s+[\w\s]*=[\w\s]*)",
        r"(;.*\b(drop|delete|truncate)\b)",
        r"(\bwaitfor\s+delay\b)",
        r"(\bbenchmark\s*\()",
        r"(\bsleep\s*\()",
        r"(0x[0-9a-fA-F]+)",
        r"(\bchar\s*\(\d+\))",
    ]

    def __init__(self, config: RASPConfig):
        super().__init__("SQLInjectionSensor", config)
        self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.SQL_PATTERNS]

    def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
        """Check for SQL injection patterns.

        Args:
            context: Request context. The keys 'query_params',
                'body_params' and 'path' are inspected when present;
                'client_ip', 'user_id' and 'request_id' are copied into
                the event.

        Returns:
            A ``SecurityEvent`` for the first matching input, else None.
        """
        inputs_to_check: List[str] = []

        # Collect all user inputs. Each source is flattened so that
        # multi-valued parameters (lists), nested JSON objects and JSON
        # bodies that are not objects are inspected instead of being
        # silently skipped or raising inside the sensor.
        for source in ('query_params', 'body_params', 'path'):
            if source in context:
                inputs_to_check.extend(self._collect_strings(context[source]))

        for input_value in inputs_to_check:
            for pattern in self.compiled_patterns:
                if pattern.search(input_value):
                    return SecurityEvent(
                        event_id=self._generate_event_id(),
                        timestamp=datetime.utcnow(),
                        event_type="sql_injection",
                        threat_level=ThreatLevel.HIGH,
                        description=f"SQL injection pattern detected: {pattern.pattern[:50]}",
                        source_ip=context.get('client_ip'),
                        user_id=context.get('user_id'),
                        request_id=context.get('request_id'),
                        attack_vector="sql_injection",
                        payload=input_value[:self.config.max_payload_log_size],
                        stack_trace=self._get_stack_trace() if self.config.enable_stack_traces else None,
                        context={'matched_pattern': pattern.pattern}
                    )

        return None

    def get_attack_types(self) -> List[str]:
        """Return the attack categories reported by this sensor."""
        return ["sql_injection", "sql_injection_time_based", "sql_injection_union"]

    def _flatten_dict(self, d: Dict, parent_key: str = '') -> List[str]:
        """Flatten nested dictionary values into a list of strings.

        ``parent_key`` is accepted for backward compatibility; it never
        influenced the result and is ignored.
        """
        return self._collect_strings(d)

    @staticmethod
    def _collect_strings(value: Any) -> List[str]:
        """Return every string inside ``value``, in document order.

        Dict values, lists, tuples and sets are descended into at any
        depth; other scalar types are ignored. An explicit stack is used
        instead of recursion so deeply nested input cannot exhaust the
        interpreter's recursion limit.
        """
        found: List[str] = []
        pending = [value]
        while pending:
            current = pending.pop()
            if isinstance(current, str):
                found.append(current)
            elif isinstance(current, dict):
                # Reversed so that pop() yields the values in original order.
                pending.extend(reversed(list(current.values())))
            elif isinstance(current, (list, tuple, set, frozenset)):
                pending.extend(reversed(list(current)))
        return found

    def _generate_event_id(self) -> str:
        """Return a random identifier of the form ``EVT-<16 hex chars>``."""
        import secrets
        return f"EVT-{secrets.token_hex(8)}"

    def _get_stack_trace(self) -> str:
        """Return the current call stack, minus the two innermost frames."""
        import traceback
        return ''.join(traceback.format_stack()[:-2])
 
 
class XSSSensor(SecuritySensor):
    """Detects Cross-Site Scripting (XSS) attacks.

    Strings from the query parameters, body parameters and the Referer /
    User-Agent headers are tested against ``XSS_PATTERNS`` (compiled
    case-insensitively). The first match produces a HIGH severity event.
    """

    XSS_PATTERNS = [
        r"<script[^>]*>.*</script>",
        r"javascript\s*:",
        r"on\w+\s*=",
        r"<\s*img[^>]+onerror\s*=",
        r"<\s*svg[^>]+onload\s*=",
        r"<\s*iframe",
        r"<\s*object",
        r"<\s*embed",
        r"expression\s*\(",
        r"url\s*\(\s*['\"]?\s*javascript:",
    ]

    # Headers that might be reflected, in normalised (lowercase, dashed) form.
    _REFLECTED_HEADERS = ('referer', 'user-agent')

    def __init__(self, config: RASPConfig):
        super().__init__("XSSSensor", config)
        self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.XSS_PATTERNS]

    def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
        """Check for XSS patterns.

        Args:
            context: Request context. 'query_params', 'body_params' and
                'headers' are inspected when present.

        Returns:
            A ``SecurityEvent`` for the first matching input, else None.
        """
        inputs_to_check: List[str] = []

        # Flatten so list-valued parameters and nested/array JSON bodies
        # are inspected rather than skipped.
        for source in ('query_params', 'body_params'):
            if source in context:
                inputs_to_check.extend(self._collect_strings(context[source]))

        headers = context.get('headers')
        if hasattr(headers, 'items'):
            # Header names arrive in different spellings depending on the
            # framework ('Referer', 'referer', 'HTTP_REFERER'); an exact
            # lowercase lookup would miss them, so compare normalised names.
            for name, value in headers.items():
                if self._normalize_header_name(name) in self._REFLECTED_HEADERS:
                    inputs_to_check.extend(self._collect_strings(value))

        for input_value in inputs_to_check:
            for pattern in self.compiled_patterns:
                if pattern.search(input_value):
                    return SecurityEvent(
                        event_id=self._generate_event_id(),
                        timestamp=datetime.utcnow(),
                        event_type="xss",
                        threat_level=ThreatLevel.HIGH,
                        description=f"XSS pattern detected: {pattern.pattern[:50]}",
                        source_ip=context.get('client_ip'),
                        user_id=context.get('user_id'),
                        request_id=context.get('request_id'),
                        attack_vector="xss",
                        payload=input_value[:self.config.max_payload_log_size],
                        stack_trace=None,
                        context={'matched_pattern': pattern.pattern}
                    )

        return None

    def get_attack_types(self) -> List[str]:
        """Return the attack categories reported by this sensor."""
        return ["xss_reflected", "xss_stored", "xss_dom"]

    def _flatten_dict(self, d: Dict, parent_key: str = '') -> List[str]:
        """Flatten nested values into a list of strings.

        ``parent_key`` is kept for signature compatibility and is unused.
        """
        return self._collect_strings(d)

    @staticmethod
    def _normalize_header_name(name: Any) -> str:
        """Map 'HTTP_USER_AGENT', 'User-Agent', ... to 'user-agent'."""
        normalized = str(name).lower().replace('_', '-')
        if normalized.startswith('http-'):
            normalized = normalized[len('http-'):]
        return normalized

    @staticmethod
    def _collect_strings(value: Any) -> List[str]:
        """Return every string inside ``value``, in document order.

        Dict values, lists, tuples and sets are descended into at any
        depth; other scalars are ignored. Iterative, so nesting depth
        cannot trigger a RecursionError.
        """
        found: List[str] = []
        pending = [value]
        while pending:
            current = pending.pop()
            if isinstance(current, str):
                found.append(current)
            elif isinstance(current, dict):
                pending.extend(reversed(list(current.values())))
            elif isinstance(current, (list, tuple, set, frozenset)):
                pending.extend(reversed(list(current)))
        return found

    def _generate_event_id(self) -> str:
        """Return a random identifier of the form ``EVT-<16 hex chars>``."""
        import secrets
        return f"EVT-{secrets.token_hex(8)}"
 
 
class PathTraversalSensor(SecuritySensor):
    """Detects path traversal attacks.

    The request path and every string in the query and body parameters
    are tested against ``TRAVERSAL_PATTERNS`` (compiled
    case-insensitively). The first match produces a HIGH severity event.
    """

    TRAVERSAL_PATTERNS = [
        r"\.\./",
        r"\.\.\\",
        r"%2e%2e%2f",
        r"%2e%2e/",
        r"\.%2e/",
        r"%2e\./",
        r"\.\.%5c",
        r"%252e%252e%255c",
        r"/etc/passwd",
        r"/etc/shadow",
        r"c:\\windows",
        r"/proc/self",
    ]

    def __init__(self, config: RASPConfig):
        super().__init__("PathTraversalSensor", config)
        self.compiled_patterns = [re.compile(p, re.IGNORECASE) for p in self.TRAVERSAL_PATTERNS]

    def analyze(self, context: Dict[str, Any]) -> Optional[SecurityEvent]:
        """Check for path traversal patterns.

        Args:
            context: Request context. 'path', 'query_params' and
                'body_params' are inspected when present.

        Returns:
            A ``SecurityEvent`` for the first matching input, else None.
        """
        inputs_to_check: List[str] = []

        # Flatten each source: nested body values, list-valued parameters
        # and non-object JSON bodies must be inspected too, otherwise a
        # payload can be hidden one level down.
        for source in ('path', 'query_params', 'body_params'):
            if source in context:
                inputs_to_check.extend(self._collect_strings(context[source]))

        for input_value in inputs_to_check:
            for pattern in self.compiled_patterns:
                if pattern.search(input_value):
                    return SecurityEvent(
                        event_id=self._generate_event_id(),
                        timestamp=datetime.utcnow(),
                        event_type="path_traversal",
                        threat_level=ThreatLevel.HIGH,
                        description="Path traversal attempt detected",
                        source_ip=context.get('client_ip'),
                        user_id=context.get('user_id'),
                        request_id=context.get('request_id'),
                        attack_vector="path_traversal",
                        payload=input_value[:self.config.max_payload_log_size],
                        stack_trace=None,
                        context={'matched_pattern': pattern.pattern}
                    )

        return None

    def get_attack_types(self) -> List[str]:
        """Return the attack categories reported by this sensor."""
        return ["path_traversal", "lfi", "rfi"]

    @staticmethod
    def _collect_strings(value: Any) -> List[str]:
        """Return every string inside ``value``, in document order.

        Dict values, lists, tuples and sets are descended into at any
        depth; other scalars are ignored. Iterative, so nesting depth
        cannot trigger a RecursionError.
        """
        found: List[str] = []
        pending = [value]
        while pending:
            current = pending.pop()
            if isinstance(current, str):
                found.append(current)
            elif isinstance(current, dict):
                pending.extend(reversed(list(current.values())))
            elif isinstance(current, (list, tuple, set, frozenset)):
                pending.extend(reversed(list(current)))
        return found

    def _generate_event_id(self) -> str:
        """Return a random identifier of the form ``EVT-<16 hex chars>``."""
        import secrets
        return f"EVT-{secrets.token_hex(8)}"
 
 
class RASPEngine:
    """Coordinates the sensors and decides how to respond to their events.

    The engine owns the sensor list and the event-handler list, runs each
    enabled sensor against a request context, logs every resulting event
    and forwards it to the registered handlers.
    """

    def __init__(self, config: RASPConfig):
        self.config = config
        self.sensors: List[SecuritySensor] = []
        self.event_handlers: List[Callable[[SecurityEvent], None]] = []
        self.logger = logging.getLogger("RASP")
        self._lock = threading.Lock()

        # Register the built-in sensors.
        self._init_default_sensors()

    def _init_default_sensors(self):
        """Instantiate and register the three built-in sensors, in order."""
        for sensor_cls in (SQLInjectionSensor, XSSSensor, PathTraversalSensor):
            self.sensors.append(sensor_cls(self.config))

    def add_sensor(self, sensor: SecuritySensor):
        """Register an additional sensor (appended while holding the lock)."""
        with self._lock:
            self.sensors.append(sensor)

    def add_event_handler(self, handler: Callable[[SecurityEvent], None]):
        """Register a callable that receives every processed event."""
        self.event_handlers.append(handler)

    def analyze_request(self, context: Dict[str, Any]) -> List[SecurityEvent]:
        """Run every enabled sensor over ``context`` and return the events.

        Whitelisted paths skip analysis and yield an empty list. A sensor
        that raises is logged and skipped; the remaining sensors still run.
        """
        detected = []

        if self._is_whitelisted(context):
            return detected

        for sensor in self.sensors:
            if sensor.enabled:
                try:
                    finding = sensor.analyze(context)
                    if finding:
                        detected.append(finding)
                        sensor.events_detected += 1
                except Exception as exc:
                    self.logger.error(f"Sensor {sensor.name} error: {exc}")

        # Log and dispatch only after all sensors have had their turn.
        for finding in detected:
            self._process_event(finding)

        return detected

    def should_block(self, events: List[SecurityEvent]) -> bool:
        """Return True when, in "block" mode, any event meets the threshold."""
        if self.config.mode != "block":
            return False

        ranking = [
            ThreatLevel.INFO,
            ThreatLevel.LOW,
            ThreatLevel.MEDIUM,
            ThreatLevel.HIGH,
            ThreatLevel.CRITICAL,
        ]

        return any(
            ranking.index(event.threat_level) >= ranking.index(self.config.block_threshold)
            for event in events
        )

    def get_response_action(self, events: List[SecurityEvent]) -> ResponseAction:
        """Map the most severe event and the configured mode to an action."""
        if not events:
            return ResponseAction.LOG

        ranking = [
            ThreatLevel.INFO,
            ThreatLevel.LOW,
            ThreatLevel.MEDIUM,
            ThreatLevel.HIGH,
            ThreatLevel.CRITICAL,
        ]
        worst = max(events, key=lambda e: ranking.index(e.threat_level)).threat_level

        if self.config.mode == "monitor":
            if worst in [ThreatLevel.HIGH, ThreatLevel.CRITICAL]:
                return ResponseAction.ALERT
            return ResponseAction.LOG

        if self.config.mode == "block":
            if worst == ThreatLevel.CRITICAL:
                return ResponseAction.TERMINATE
            if worst == ThreatLevel.HIGH:
                return ResponseAction.BLOCK
            return ResponseAction.ALERT

        # Any other mode (e.g. "learn") only logs.
        return ResponseAction.LOG

    def _is_whitelisted(self, context: Dict[str, Any]) -> bool:
        """Tell whether the request path matches a whitelist pattern.

        Patterns are applied with ``re.match``, so they are anchored at the
        start of the path only.
        """
        request_path = context.get('path', '')
        return any(
            re.match(allowed, request_path)
            for allowed in self.config.whitelist_paths
        )

    def _process_event(self, event: SecurityEvent):
        """Log ``event`` as a warning, then hand it to each handler.

        A handler that raises is logged and does not stop the others.
        """
        self.logger.warning(
            f"Security event detected: {event.event_type} "
            f"[{event.threat_level.value}] - {event.description}"
        )

        for callback in self.event_handlers:
            try:
                callback(event)
            except Exception as exc:
                self.logger.error(f"Event handler error: {exc}")

    def get_stats(self) -> Dict[str, Any]:
        """Return the current mode plus a summary entry for each sensor."""
        sensor_summaries = []
        for sensor in self.sensors:
            sensor_summaries.append({
                "name": sensor.name,
                "enabled": sensor.enabled,
                "events_detected": sensor.events_detected,
                "attack_types": sensor.get_attack_types(),
            })
        return {"mode": self.config.mode, "sensors": sensor_summaries}

HTTP Middleware Integration

# rasp_middleware.py
"""
RASP middleware for web frameworks.
"""
 
from typing import Callable, Dict, Any
import time
import uuid
from functools import wraps
 
 
class RASPMiddleware:
    """
    Generic RASP middleware that can be adapted to various frameworks.

    It turns a framework request object into the plain-dict context the
    RASP engine analyses, and builds the payload for blocked requests.
    Request attributes are read defensively so the class works with any
    object exposing a Flask-like subset of ``method``, ``path``,
    ``headers``, ``args``, ``form``, ``get_json``/``json``, ``user`` and
    ``remote_addr``.
    """

    def __init__(self, rasp_engine, app=None, trust_proxy_headers=True):
        """
        Args:
            rasp_engine: Engine whose ``config.response_headers`` is added
                to blocked responses.
            app: Optional application object; stored, not otherwise used
                by this class.
            trust_proxy_headers: When True (the default, matching previous
                behaviour) the client IP is taken from X-Forwarded-For /
                X-Real-IP. These headers are client-controlled unless a
                trusted proxy overwrites them, so set this to False when
                the application is directly exposed.
        """
        self.rasp = rasp_engine
        self.app = app
        self.trust_proxy_headers = trust_proxy_headers

    def extract_context(self, request) -> Dict[str, Any]:
        """Extract security-relevant context from request.

        Returns:
            A dict with request_id, timestamp, method, path, client_ip,
            user_agent, content_type, query_params and headers, plus
            body_params (when a JSON body or a form is available) and
            user_id (when the request carries a user with an ``id``).
        """
        headers = getattr(request, 'headers', None)
        if headers is None:
            headers = {}

        context = {
            'request_id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'method': getattr(request, 'method', 'GET'),
            'path': getattr(request, 'path', '/'),
            'client_ip': self._get_client_ip(request),
            'user_agent': headers.get('User-Agent', ''),
            'content_type': headers.get('Content-Type', ''),
            'query_params': dict(request.args) if hasattr(request, 'args') else {},
            'headers': dict(headers),
        }

        # Extract body params: a non-empty JSON body wins, else the form.
        json_body = self._read_json_body(request)
        if json_body:
            context['body_params'] = json_body
        elif hasattr(request, 'form'):
            context['body_params'] = dict(request.form)

        # Extract user ID if authenticated. A user object without an id is
        # left out rather than recorded as the literal string "None".
        user = getattr(request, 'user', None)
        if user:
            user_id = getattr(user, 'id', None)
            if user_id is not None:
                context['user_id'] = str(user_id)

        return context

    def _read_json_body(self, request):
        """Return the parsed JSON body, or None when there is none.

        ``request.json`` is a property on Flask/Werkzeug requests and
        raises an HTTP error for non-JSON requests, which ``hasattr`` does
        not swallow. ``get_json(silent=True)`` is used when available so
        that ordinary form and GET requests are not turned into errors.
        """
        get_json = getattr(request, 'get_json', None)
        if callable(get_json):
            return get_json(silent=True)
        try:
            return getattr(request, 'json', None)
        except Exception:
            # Best effort: an unreadable body is treated as "no JSON".
            return None

    def _get_client_ip(self, request) -> str:
        """Get real client IP, handling proxies.

        With ``trust_proxy_headers`` enabled the first X-Forwarded-For
        entry wins, then X-Real-IP; otherwise (or when neither is set)
        the socket-level ``remote_addr`` is returned.
        """
        headers = getattr(request, 'headers', None)
        if self.trust_proxy_headers and headers is not None:
            # Check X-Forwarded-For first
            xff = headers.get('X-Forwarded-For', '')
            if xff:
                return xff.split(',')[0].strip()

            # Check X-Real-IP
            xri = headers.get('X-Real-IP', '')
            if xri:
                return xri

        # Fall back to remote address
        return getattr(request, 'remote_addr', '0.0.0.0')

    def create_blocked_response(self, events):
        """Create a response for blocked requests.

        Returns:
            A dict with 'status_code' (403), a JSON-serialisable 'body'
            carrying the first event's request_id (None without events),
            and 'headers' combining X-RASP-Block with the configured
            response headers.
        """
        return {
            'status_code': 403,
            'body': {
                'error': 'Request blocked by security policy',
                'request_id': events[0].request_id if events else None
            },
            'headers': {
                'X-RASP-Block': 'true',
                **self.rasp.config.response_headers
            }
        }
 
 
# Flask integration
def flask_rasp_middleware(rasp_engine):
    """Build a decorator that screens a Flask view with the RASP engine.

    The decorated view first has its request converted to a context and
    analysed. If the engine decides to block, a JSON 403 response is
    returned instead of calling the view; otherwise the view runs as
    usual. The generated request id is stored on ``g.rasp_request_id``.
    """
    from flask import request, jsonify, g

    middleware = RASPMiddleware(rasp_engine)

    def decorator(f):
        @wraps(f)
        def guarded(*args, **kwargs):
            request_context = middleware.extract_context(request)
            g.rasp_request_id = request_context['request_id']

            findings = rasp_engine.analyze_request(request_context)

            # Happy path first: nothing to block, run the real view.
            if not rasp_engine.should_block(findings):
                return f(*args, **kwargs)

            blocked = middleware.create_blocked_response(findings)
            return jsonify(blocked['body']), blocked['status_code'], blocked['headers']

        return guarded
    return decorator
 
 
# Express.js/Node.js style middleware (for reference).
# This is JavaScript source kept as a plain Python string; it is not
# referenced by any other code visible in this file. The snippet mirrors
# the Python middleware flow: build a context from the request, analyse
# it, answer 403 when the engine says to block, otherwise attach the
# RASP context to the request and continue.
EXPRESS_MIDDLEWARE_CODE = '''
// rasp-middleware.js
const RASPEngine = require('./rasp-engine');
 
function createRASPMiddleware(config) {
  const rasp = new RASPEngine(config);
 
  return function(req, res, next) {
    const context = {
      request_id: req.id || require('uuid').v4(),
      method: req.method,
      path: req.path,
      client_ip: req.ip || req.connection.remoteAddress,
      user_agent: req.get('User-Agent'),
      query_params: req.query,
      body_params: req.body,
      headers: req.headers,
      user_id: req.user?.id
    };
 
    const events = rasp.analyzeRequest(context);
 
    if (rasp.shouldBlock(events)) {
      return res.status(403).json({
        error: 'Request blocked by security policy',
        request_id: context.request_id
      });
    }
 
    // Attach RASP context for logging
    req.raspContext = { events, request_id: context.request_id };
 
    next();
  };
}
 
module.exports = createRASPMiddleware;
'''
 
 
# Django middleware
class DjangoRASPMiddleware:
    """Django RASP middleware.

    Analyses each request before the view runs, answers 403 when the
    engine decides to block, and otherwise adds the configured response
    headers to the view's response.

    NOTE: the constructor needs ``rasp_engine`` in addition to
    ``get_response``. Django's middleware loader supplies only
    ``get_response``, so register a small factory or subclass that
    provides the engine rather than listing this class directly.
    """

    def __init__(self, get_response, rasp_engine):
        self.get_response = get_response
        self.rasp = rasp_engine

    def __call__(self, request):
        """Screen ``request`` and return either a 403 or the view response."""
        # Extract context
        context = self._extract_context(request)

        # Analyze request
        events = self.rasp.analyze_request(context)

        # Check if should block
        if self.rasp.should_block(events):
            from django.http import JsonResponse
            response = JsonResponse({
                'error': 'Request blocked by security policy',
                'request_id': context['request_id']
            }, status=403)
            # Same headers the generic middleware puts on a blocked reply.
            response['X-RASP-Block'] = 'true'
            self._apply_response_headers(response)
            return response

        # Store context for later use
        request.rasp_context = {'events': events, 'request_id': context['request_id']}

        response = self.get_response(request)

        # Add security headers
        self._apply_response_headers(response)

        return response

    def _apply_response_headers(self, response) -> None:
        """Copy every configured response header onto ``response``."""
        for header, value in self.rasp.config.response_headers.items():
            response[header] = value

    def _extract_context(self, request) -> Dict[str, Any]:
        """Build the analysis context from a Django request.

        Query and form parameters are collapsed so that single-valued keys
        map to a string (multi-valued keys keep their list). A JSON body
        that cannot be read or parsed yields empty ``body_params``.
        """
        import json
        import uuid

        context = {
            'request_id': str(uuid.uuid4()),
            'method': request.method,
            'path': request.path,
            'client_ip': self._get_client_ip(request),
            'user_agent': request.META.get('HTTP_USER_AGENT', ''),
            'query_params': self._collapse_multi_values(request.GET),
            'headers': {k: v for k, v in request.META.items() if k.startswith('HTTP_')},
        }

        # Extract body
        if request.content_type == 'application/json':
            try:
                context['body_params'] = json.loads(request.body)
            except Exception:
                # Deliberately best-effort: reading or parsing the body may
                # fail in several ways and must not break the request.
                # Unlike a bare ``except:`` this lets KeyboardInterrupt and
                # SystemExit propagate.
                context['body_params'] = {}
        else:
            context['body_params'] = self._collapse_multi_values(request.POST)

        # User ID
        if hasattr(request, 'user') and request.user.is_authenticated:
            context['user_id'] = str(request.user.id)

        return context

    @staticmethod
    def _collapse_multi_values(params) -> Dict[str, Any]:
        """Convert a QueryDict-like mapping to a plain dict.

        ``dict(QueryDict)`` maps every key to a *list* of values, and the
        sensors only examine string values, so an unconverted mapping
        hides every parameter from them. Keys with exactly one value are
        unwrapped to that value; keys with several values keep the list.
        """
        collapsed = {}
        for key, values in dict(params).items():
            if isinstance(values, (list, tuple)) and len(values) == 1:
                collapsed[key] = values[0]
            else:
                collapsed[key] = values
        return collapsed

    def _get_client_ip(self, request) -> str:
        """Return the first X-Forwarded-For entry, else REMOTE_ADDR.

        NOTE(review): X-Forwarded-For is client-supplied unless a trusted
        proxy rewrites it; confirm the deployment sits behind one.
        """
        xff = request.META.get('HTTP_X_FORWARDED_FOR', '')
        if xff:
            return xff.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR', '0.0.0.0')

Behavioral Analysis

# behavioral_analysis.py
"""
Behavioral analysis for RASP.
"""
 
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass
import threading
 
 
@dataclass
class BehaviorProfile:
    """Running behaviour record for one client, keyed by IP or user.

    The two set-valued fields default to None as a placeholder and are
    replaced by a fresh empty set per instance after construction, so
    profiles never share a set.
    """
    identifier: str
    identifier_type: str  # 'ip' or 'user'
    request_count: int = 0
    error_count: int = 0
    blocked_count: int = 0
    unique_paths: set = None
    unique_user_agents: set = None
    first_seen: datetime = None
    last_seen: datetime = None
    anomaly_score: float = 0.0

    def __post_init__(self):
        # Swap each None placeholder for its own empty set.
        for attr in ('unique_paths', 'unique_user_agents'):
            if getattr(self, attr) is None:
                setattr(self, attr, set())
 
 
class BehavioralAnalyzer:
    """
    Analyzes request patterns to detect anomalous behavior.

    A ``BehaviorProfile`` is kept per client IP and per user id. Each
    analysed request updates the matching profiles and is checked for
    four anomalies: request rate, number of distinct paths, number of
    distinct user agents, and error rate. Profile and window updates are
    performed while holding an internal lock.
    """

    def __init__(self, config: Dict):
        """
        Args:
            config: Plain dict of optional thresholds:
                rate_limit (100), rate_window seconds (60),
                path_diversity_threshold (50), error_rate_threshold (0.5),
                user_agent_threshold (5) and
                min_requests_for_error_rate (10).
        """
        self.config = config
        self.profiles: Dict[str, BehaviorProfile] = {}
        self.request_windows: Dict[str, List[datetime]] = defaultdict(list)
        self._lock = threading.Lock()

        # Thresholds
        self.rate_limit = config.get('rate_limit', 100)  # requests per window
        self.rate_window = config.get('rate_window', 60)  # seconds
        self.path_diversity_threshold = config.get('path_diversity_threshold', 50)
        self.error_rate_threshold = config.get('error_rate_threshold', 0.5)
        # Previously hard-coded; defaults keep the earlier behaviour.
        self.user_agent_threshold = config.get('user_agent_threshold', 5)
        self.min_requests_for_error_rate = config.get('min_requests_for_error_rate', 10)

    def analyze(self, context: Dict) -> Optional[Dict]:
        """Analyze request behavior and return anomalies.

        Returns:
            None when nothing is anomalous, otherwise a dict with the
            keys 'anomalies', 'client_ip' and 'user_id'.
        """
        # A missing *or* None client_ip is tracked under 'unknown' instead
        # of producing a profile keyed 'ip:None'.
        client_ip = context.get('client_ip') or 'unknown'
        user_id = context.get('user_id')

        # Analyze by IP
        anomalies = list(self._analyze_entity(client_ip, 'ip', context))

        # Analyze by user if authenticated
        if user_id:
            anomalies.extend(self._analyze_entity(user_id, 'user', context))

        if not anomalies:
            return None

        return {
            'anomalies': anomalies,
            'client_ip': client_ip,
            'user_id': user_id
        }

    def _analyze_entity(
        self,
        identifier: str,
        identifier_type: str,
        context: Dict
    ) -> List[Dict]:
        """Update the entity's profile and return the anomalies it shows.

        Every anomaly dict carries 'type', 'severity', 'description',
        'identifier' and 'identifier_type'.
        """
        anomalies = []

        with self._lock:
            # Get or create profile
            profile_key = f"{identifier_type}:{identifier}"
            if profile_key not in self.profiles:
                self.profiles[profile_key] = BehaviorProfile(
                    identifier=identifier,
                    identifier_type=identifier_type,
                    first_seen=datetime.utcnow()
                )

            profile = self.profiles[profile_key]

            # Update profile
            profile.request_count += 1
            profile.last_seen = datetime.utcnow()
            profile.unique_paths.add(context.get('path', '/'))
            profile.unique_user_agents.add(context.get('user_agent', ''))

            # Check rate limiting
            rate_anomaly = self._check_rate_limit(profile_key)
            if rate_anomaly:
                # Tag with the entity so this anomaly has the same shape
                # as the others and consumers can attribute it.
                rate_anomaly.setdefault('identifier', identifier)
                rate_anomaly.setdefault('identifier_type', identifier_type)
                anomalies.append(rate_anomaly)

            # Check path diversity (potential scanning)
            if len(profile.unique_paths) > self.path_diversity_threshold:
                anomalies.append({
                    'type': 'high_path_diversity',
                    'severity': 'medium',
                    'description': f'Entity accessing unusually high number of unique paths ({len(profile.unique_paths)})',
                    'identifier': identifier,
                    'identifier_type': identifier_type
                })

            # Check user agent diversity (potential bot)
            if len(profile.unique_user_agents) > self.user_agent_threshold:
                anomalies.append({
                    'type': 'user_agent_rotation',
                    'severity': 'medium',
                    'description': 'Multiple user agents from same source',
                    'identifier': identifier,
                    'identifier_type': identifier_type
                })

            # Check error rate, once there are enough requests for the
            # ratio to be meaningful.
            if profile.request_count > self.min_requests_for_error_rate:
                error_rate = profile.error_count / profile.request_count
                if error_rate > self.error_rate_threshold:
                    anomalies.append({
                        'type': 'high_error_rate',
                        'severity': 'medium',
                        'description': f'High error rate: {error_rate:.1%}',
                        'identifier': identifier,
                        'identifier_type': identifier_type
                    })

        return anomalies

    def _check_rate_limit(self, profile_key: str) -> Optional[Dict]:
        """Check if entity is exceeding rate limits.

        Records the current request, drops timestamps older than the
        window, and returns a 'rate_limit_exceeded' anomaly when more than
        ``rate_limit`` requests remain. Called with the lock already held
        by ``_analyze_entity``.
        """
        now = datetime.utcnow()
        window_start = now - timedelta(seconds=self.rate_window)

        # Keep only timestamps inside the window, plus the current request.
        recent = [t for t in self.request_windows[profile_key] if t > window_start]
        recent.append(now)
        self.request_windows[profile_key] = recent

        request_count = len(recent)

        if request_count > self.rate_limit:
            return {
                'type': 'rate_limit_exceeded',
                'severity': 'high',
                'description': f'Rate limit exceeded: {request_count} requests in {self.rate_window}s',
                'request_count': request_count,
                'limit': self.rate_limit
            }

        return None

    def record_error(self, identifier: str, identifier_type: str):
        """Record an error for the entity (no-op for unknown entities)."""
        profile_key = f"{identifier_type}:{identifier}"
        with self._lock:
            if profile_key in self.profiles:
                self.profiles[profile_key].error_count += 1

    def record_block(self, identifier: str, identifier_type: str):
        """Record a blocked request (no-op for unknown entities)."""
        profile_key = f"{identifier_type}:{identifier}"
        with self._lock:
            if profile_key in self.profiles:
                self.profiles[profile_key].blocked_count += 1

    def get_profile(self, identifier: str, identifier_type: str) -> Optional[BehaviorProfile]:
        """Get behavior profile for an entity, or None if never seen."""
        profile_key = f"{identifier_type}:{identifier}"
        return self.profiles.get(profile_key)

    def cleanup_old_profiles(self, max_age_hours: int = 24):
        """Remove old profiles to prevent memory issues.

        A profile is dropped, together with its request window, when it
        was last seen more than ``max_age_hours`` ago. Profiles whose
        ``last_seen`` was never set are treated as stale too, instead of
        raising on the None comparison.
        """
        cutoff = datetime.utcnow() - timedelta(hours=max_age_hours)

        with self._lock:
            stale_keys = [
                key for key, profile in self.profiles.items()
                if profile.last_seen is None or profile.last_seen < cutoff
            ]

            for key in stale_keys:
                del self.profiles[key]
                self.request_windows.pop(key, None)

Conclusion

RASP provides a powerful layer of defense by embedding security directly into applications:

  1. Pattern-based detection catches known attack signatures
  2. Behavioral analysis identifies anomalous activity
  3. Context-aware blocking makes accurate decisions with full request context
  4. Real-time protection stops attacks as they happen

When combined with traditional security measures like WAFs and SAST/DAST, RASP creates defense in depth that significantly reduces application risk.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.