Runtime Application Self-Protection (RASP) Implementation

DeviDevs Team

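Runtime Application Self-Protection (RASP) detects and blocks attacks from inside the running application, where the actual inputs, queries, and outbound calls are visible. This guide walks through a pattern-based rule engine, method-level protection decorators, behavioral anomaly detection, and CI validation of rule changes.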

RASP Architecture Overview

Build a RASP framework for application protection:

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import re
import hashlib
import threading
import functools
 
class AttackType(Enum):
    SQL_INJECTION = "sql_injection"
    XSS = "xss"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    SSRF = "ssrf"
    DESERIALIZATION = "deserialization"
    XXE = "xxe"
    LDAP_INJECTION = "ldap_injection"
 
class ActionType(Enum):
    BLOCK = "block"
    LOG = "log"
    ALERT = "alert"
    SANITIZE = "sanitize"
 
class Severity(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
 
@dataclass
class SecurityEvent:
    event_id: str
    attack_type: AttackType
    severity: Severity
    timestamp: datetime
    request_id: str
    source_ip: str
    user_id: Optional[str]
    endpoint: str
    payload: str
    context: Dict[str, Any]
    action_taken: ActionType
    blocked: bool
 
@dataclass
class RASPRule:
    rule_id: str
    attack_type: AttackType
    severity: Severity
    pattern: str
    description: str
    action: ActionType
    enabled: bool = True
    compiled_pattern: re.Pattern = field(init=False, repr=False)
 
    def __post_init__(self):
        self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE)
 
class RASPEngine:
    def __init__(self):
        self.rules: Dict[AttackType, List[RASPRule]] = {}
        self.events: List[SecurityEvent] = []
        self.event_handlers: List[Callable] = []
        self.mode = 'blocking'
        self._lock = threading.Lock()
        self._load_default_rules()
 
    def _load_default_rules(self):
        """Load default security rules."""
        default_rules = [
            # SQL Injection Rules
            RASPRule(
                rule_id="SQLI-001",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"(\b(union|select|insert|update|delete|drop|truncate)\b.*\b(from|into|where|set)\b)",
                description="SQL keyword injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-002",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r"('|\")\s*(or|and)\s*('|\")?\s*\d+\s*=\s*\d+",
                description="SQL boolean injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-003",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r";\s*(drop|delete|truncate|alter)\s+",
                description="SQL statement termination attack",
                action=ActionType.BLOCK
            ),
            # XSS Rules
            RASPRule(
                rule_id="XSS-001",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"<script[^>]*>.*?</script>",
                description="Script tag injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-002",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"(on\w+)\s*=\s*['\"]?[^'\"]*['\"]?",
                description="Event handler injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-003",
                attack_type=AttackType.XSS,
                severity=Severity.MEDIUM,
                pattern=r"javascript\s*:",
                description="JavaScript protocol injection",
                action=ActionType.BLOCK
            ),
            # Command Injection Rules
            RASPRule(
                rule_id="CMDI-001",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"[;&|`$]|\$\(|\bexec\b|\beval\b",
                description="Command injection characters detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="CMDI-002",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"\b(cat|ls|pwd|whoami|id|uname|wget|curl|nc|bash|sh)\b",
                description="Shell command detected in input",
                action=ActionType.BLOCK
            ),
            # Path Traversal Rules
            RASPRule(
                rule_id="PATH-001",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"\.\.[\\/]|\.\.%2[fF]",
                description="Directory traversal sequence detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="PATH-002",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"(etc/(passwd|shadow)|windows/system32)",
                description="Sensitive file path detected",
                action=ActionType.BLOCK
            ),
            # SSRF Rules
            RASPRule(
                rule_id="SSRF-001",
                attack_type=AttackType.SSRF,
                severity=Severity.HIGH,
                pattern=r"(localhost|127\.0\.0\.1|0\.0\.0\.0|::1|169\.254\.\d+\.\d+)",
                description="Internal address in URL detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SSRF-002",
                attack_type=AttackType.SSRF,
                severity=Severity.MEDIUM,
                pattern=r"(file|gopher|dict|ftp)://",
                description="Dangerous URL scheme detected",
                action=ActionType.BLOCK
            )
        ]
 
        for rule in default_rules:
            if rule.attack_type not in self.rules:
                self.rules[rule.attack_type] = []
            self.rules[rule.attack_type].append(rule)
 
    def analyze(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> Optional[SecurityEvent]:
        """Analyze input for security threats."""
        context = context or {}
 
        for attack_type, rules in self.rules.items():
            for rule in rules:
                if not rule.enabled:
                    continue
 
                if rule.compiled_pattern.search(input_data):
                    event = self._create_event(rule, input_data, context)
                    self._handle_event(event)
 
                    if rule.action == ActionType.BLOCK and self.mode == 'blocking':
                        return event
 
        return None
 
    def _create_event(self, rule: RASPRule, payload: str, context: Dict) -> SecurityEvent:
        """Create security event from rule match."""
        return SecurityEvent(
            event_id=self._generate_event_id(),
            attack_type=rule.attack_type,
            severity=rule.severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=payload[:500],
            context=context,
            action_taken=rule.action,
            blocked=rule.action == ActionType.BLOCK
        )
 
    def _generate_event_id(self) -> str:
        # Timestamp prefix plus a short hash of the same instant
        now = datetime.utcnow()
        digest = hashlib.sha256(str(now.timestamp()).encode()).hexdigest()[:8]
        return f"RASP-{now.strftime('%Y%m%d%H%M%S')}-{digest}"
 
    def _handle_event(self, event: SecurityEvent):
        """Handle detected security event."""
        with self._lock:
            self.events.append(event)
 
        for handler in self.event_handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Event handler error: {e}")
 
    def register_handler(self, handler: Callable[[SecurityEvent], None]):
        """Register event handler."""
        self.event_handlers.append(handler)
 
    def sanitize(self, input_data: str, attack_type: AttackType) -> str:
        """Sanitize input by removing dangerous patterns."""
        sanitized = input_data
 
        for rule in self.rules.get(attack_type, []):
            if rule.action == ActionType.SANITIZE:
                sanitized = rule.compiled_pattern.sub('', sanitized)
 
        return sanitized
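
Before relying on blocking, it helps to see which rules fire on realistic inputs. The following is a minimal sketch rather than part of the engine above: it copies three of the default patterns into a plain dictionary and reports the rule IDs that match. The `SAMPLE_PATTERNS` name and the `matching_rules` helper are illustrative assumptions.

```python
import re

# Three patterns copied from the default rules above; the dict itself is illustrative
SAMPLE_PATTERNS = {
    "SQLI-001": r"(\b(union|select|insert|update|delete|drop|truncate)\b.*\b(from|into|where|set)\b)",
    "SQLI-002": r"('|\")\s*(or|and)\s*('|\")?\s*\d+\s*=\s*\d+",
    "PATH-001": r"\.\.[\\/]|\.\.%2[fF]",
}

COMPILED = {rule_id: re.compile(p, re.IGNORECASE) for rule_id, p in SAMPLE_PATTERNS.items()}


def matching_rules(text: str) -> list:
    """Return the IDs of every sample rule whose pattern occurs in text."""
    return [rule_id for rule_id, rx in COMPILED.items() if rx.search(text)]


if __name__ == "__main__":
    for sample in (
        "admin' OR 1=1 --",                    # boolean injection attempt
        "../../etc/passwd",                    # traversal attempt
        "Please select a seat from the list",  # ordinary prose
    ):
        print(f"{sample!r}: {matching_rules(sample)}")
```

The last sample is ordinary prose, yet it trips SQLI-001. That is the kind of false positive worth measuring before a rule is allowed to block traffic.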

Method-Level Protection

Implement protection decorators:

from functools import wraps
from typing import List, Optional
import inspect
 
# Global RASP engine instance
_rasp_engine = RASPEngine()
 
def protect(
    attack_types: List[AttackType] = None,
    params: List[str] = None,
    sanitize: bool = False
):
    """Decorator to protect function parameters from attacks."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get function signature
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()
 
            # Determine which parameters to check
            params_to_check = params or list(bound.arguments.keys())
 
            # Build context
            context = {
                'function': func.__name__,
                'module': func.__module__,
                'request_id': kwargs.get('request_id', 'unknown')
            }
 
            # Analyze each parameter
            for param_name in params_to_check:
                if param_name not in bound.arguments:
                    continue
 
                value = bound.arguments[param_name]
                if not isinstance(value, str):
                    continue
 
                context['parameter'] = param_name
 
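                # analyze() already scans every rule, so call it once per value
                # and then filter on the attack types this decorator cares about
                types_to_check = attack_types or list(AttackType)
                event = _rasp_engine.analyze(value, context)

                if event and event.attack_type in types_to_check:
                    cleaned = (
                        _rasp_engine.sanitize(value, event.attack_type)
                        if sanitize else value
                    )
                    if cleaned == value:
                        # Sanitizing is off, or no SANITIZE rule could strip the payload
                        raise SecurityException(
                            f"Security violation detected: {event.attack_type.value}",
                            event
                        )
                    bound.arguments[param_name] = cleaned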
 
            return func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator
 
class SecurityException(Exception):
    def __init__(self, message: str, event: Optional[SecurityEvent] = None):
        super().__init__(message)
        self.event = event
 
# Database query protection
def protect_query(func):
    """Decorator specifically for database query functions."""
    @wraps(func)
    def wrapper(query: str, params: tuple = None, *args, **kwargs):
        context = {'function': func.__name__, 'query_type': 'database'}
 
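        # Scan the query text for tautologies and stacked statements only.
        # SQLI-001 matches any ordinary "SELECT ... FROM", so running every
        # rule against developer-written SQL would block all queries.
        for rule in _rasp_engine.rules.get(AttackType.SQL_INJECTION, []):
            if rule.enabled and rule.rule_id != "SQLI-001" and rule.compiled_pattern.search(query):
                event = _rasp_engine._create_event(rule, query, context)
                _rasp_engine._handle_event(event)
                raise SecurityException("SQL injection detected in query", event)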
 
        # Analyze parameters
        if params:
            for i, param in enumerate(params):
                if isinstance(param, str):
                    context['parameter_index'] = i
                    event = _rasp_engine.analyze(param, context)
                    if event:
                        raise SecurityException(f"Malicious input in parameter {i}", event)
 
        return func(query, params, *args, **kwargs)
    return wrapper
 
# URL validation for SSRF protection
def protect_url(func):
    """Decorator to protect URL parameters from SSRF."""
    @wraps(func)
    def wrapper(url: str, *args, **kwargs):
        from urllib.parse import urlparse
 
        context = {'function': func.__name__, 'input_type': 'url'}
 
        # Basic RASP analysis
        event = _rasp_engine.analyze(url, context)
        if event and event.attack_type == AttackType.SSRF:
            raise SecurityException("SSRF attempt detected", event)
 
        # Additional URL validation
        try:
            parsed = urlparse(url)
 
            # Block internal schemes
            if parsed.scheme in ['file', 'gopher', 'dict']:
                raise SecurityException("Blocked URL scheme", None)
 
            # Block internal IPs
            hostname = parsed.hostname or ''
            if _is_internal_ip(hostname):
                raise SecurityException("Internal IP not allowed", None)
 
        except ValueError as e:
            raise SecurityException(f"Invalid URL: {e}", None)
 
        return func(url, *args, **kwargs)
    return wrapper
 
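def _is_internal_ip(hostname: str) -> bool:
    """Check if hostname is, or resolves to, an internal IP."""
    import socket
    import ipaddress

    try:
        # Literal addresses (including IPv6 such as ::1) need no DNS lookup
        ip_obj = ipaddress.ip_address(hostname)
    except ValueError:
        try:
            # gethostbyname resolves IPv4 only
            ip_obj = ipaddress.ip_address(socket.gethostbyname(hostname))
        except (socket.gaierror, ValueError):
            return False

    return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local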
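
The address checks in `_is_internal_ip` rest on properties of the standard library's `ipaddress` objects. As a rough illustration that avoids any DNS lookup, the sketch below classifies literal addresses only. The `classify_address` helper is a hypothetical name, not part of the code above.

```python
import ipaddress


def classify_address(literal: str) -> str:
    """Label a literal IPv4/IPv6 address the way an SSRF filter might."""
    ip = ipaddress.ip_address(literal)  # raises ValueError for non-IP strings
    if ip.is_loopback:
        return "loopback"
    if ip.is_link_local:
        return "link-local"
    if ip.is_private:
        return "private"
    return "public"


if __name__ == "__main__":
    for addr in ("127.0.0.1", "::1", "169.254.169.254", "10.0.0.5", "8.8.8.8"):
        print(addr, "->", classify_address(addr))
```

Note that 169.254.169.254, the link-local address commonly used by cloud metadata services, is caught by the `is_link_local` check, which is why that property belongs in the filter alongside `is_private` and `is_loopback`.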

Behavioral Analysis

Implement behavioral anomaly detection:

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import statistics
 
class BehavioralAnalyzer:
    def __init__(self, rasp_engine: RASPEngine):
        self.rasp_engine = rasp_engine
        self.user_profiles: Dict[str, UserProfile] = {}
        self.baseline_period = timedelta(days=7)
        self.anomaly_threshold = 2.0
 
    def analyze_request(self, request_context: Dict) -> Optional[SecurityEvent]:
        """Analyze request for behavioral anomalies."""
        user_id = request_context.get('user_id', 'anonymous')
        profile = self._get_or_create_profile(user_id)
 
        # Check for anomalies before recording the request; recording first
        # would add the endpoint to the profile and hide first-time access
        anomalies = []

        # Request rate anomaly
        if profile.is_rate_anomalous():
            anomalies.append(('rate', 'Unusual request rate detected'))

        # Time-based anomaly
        if profile.is_time_anomalous(request_context.get('timestamp', datetime.utcnow())):
            anomalies.append(('time', 'Request at unusual time'))

        # Endpoint anomaly
        if profile.is_endpoint_anomalous(request_context.get('endpoint', '')):
            anomalies.append(('endpoint', 'Access to unusual endpoint'))

        # Parameter anomaly
        if profile.is_parameter_anomalous(request_context.get('parameters', {})):
            anomalies.append(('parameter', 'Unusual parameter patterns'))

        # Update profile with current request
        profile.record_request(request_context)

        if anomalies:
            return self._create_behavioral_event(anomalies, request_context)

        return None
 
    def _get_or_create_profile(self, user_id: str) -> 'UserProfile':
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = UserProfile(user_id)
        return self.user_profiles[user_id]
 
    def _create_behavioral_event(self, anomalies: List[tuple], context: Dict) -> SecurityEvent:
        severity = Severity.HIGH if len(anomalies) > 2 else Severity.MEDIUM
 
        return SecurityEvent(
            event_id=f"BEHAV-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
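            # AttackType has no behavioral member, so SQL_INJECTION is only a
            # placeholder here; the real cause is in context['anomalies']
            attack_type=AttackType.SQL_INJECTION,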
            severity=severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=str(anomalies),
            context={**context, 'anomalies': anomalies},
            action_taken=ActionType.ALERT,
            blocked=False
        )
 
class UserProfile:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.request_times: List[datetime] = []
        self.endpoints_accessed: Dict[str, int] = defaultdict(int)
        self.hourly_activity: Dict[int, int] = defaultdict(int)
        self.request_sizes: List[int] = []
        self.baseline_established = False
 
    def record_request(self, context: Dict):
        """Record request for profiling."""
        now = datetime.utcnow()
        self.request_times.append(now)
        self.hourly_activity[now.hour] += 1
 
        endpoint = context.get('endpoint', '')
        self.endpoints_accessed[endpoint] += 1
 
        if 'content_length' in context:
            self.request_sizes.append(context['content_length'])
 
        # Cleanup old data
        cutoff = now - timedelta(days=30)
        self.request_times = [t for t in self.request_times if t > cutoff]
 
    def is_rate_anomalous(self) -> bool:
        """Check if current request rate is anomalous."""
        if len(self.request_times) < 100:
            return False
 
        now = datetime.utcnow()
        recent = [t for t in self.request_times if t > now - timedelta(minutes=5)]
        historical = [t for t in self.request_times if t > now - timedelta(hours=24)]
 
        if len(historical) < 10:
            return False
 
        current_rate = len(recent) / 5
        historical_rate = len(historical) / (24 * 60)
 
        return current_rate > historical_rate * 5
 
    def is_time_anomalous(self, timestamp: datetime) -> bool:
        """Check if request time is anomalous for this user."""
        hour = timestamp.hour
        total_requests = sum(self.hourly_activity.values())
 
        if total_requests < 100:
            return False
 
        hour_ratio = self.hourly_activity[hour] / total_requests
 
        return hour_ratio < 0.01
 
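    def is_endpoint_anomalous(self, endpoint: str) -> bool:
        """Check if endpoint access is anomalous."""
        # Without a baseline, every first visit would look anomalous
        if sum(self.endpoints_accessed.values()) < 100:
            return False
        return endpoint not in self.endpoints_accessed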
 
    def is_parameter_anomalous(self, parameters: Dict) -> bool:
        """Check for anomalous parameter patterns."""
        for key, value in parameters.items():
            if isinstance(value, str):
                if len(value) > 10000:
                    return True
                if value.count('<') > 10 or value.count('>') > 10:
                    return True
 
        return False
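
To make the rate check concrete, here is a standalone sketch of the same arithmetic run on synthetic timestamps. It mirrors `is_rate_anomalous` above but takes the request times and the current time as arguments so the result is reproducible. The `factor` parameter and the synthetic data are assumptions for illustration, and the 100-request minimum from the class is omitted for brevity.

```python
from datetime import datetime, timedelta
from typing import List


def is_rate_anomalous(request_times: List[datetime], now: datetime, factor: float = 5.0) -> bool:
    """Compare the per-minute rate over the last 5 minutes with the 24-hour average."""
    recent = [t for t in request_times if t > now - timedelta(minutes=5)]
    historical = [t for t in request_times if t > now - timedelta(hours=24)]
    if len(historical) < 10:
        return False  # too little history to judge
    current_rate = len(recent) / 5                 # requests per minute, last 5 minutes
    historical_rate = len(historical) / (24 * 60)  # requests per minute, last 24 hours
    return current_rate > historical_rate * factor


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, 0)
    # Synthetic baseline: one request every 6 minutes over the previous 20 hours
    baseline = [now - timedelta(minutes=6 * i) for i in range(1, 201)]
    # Synthetic burst: 20 requests within the last 20 seconds
    burst = [now - timedelta(seconds=k) for k in range(1, 21)]
    print("baseline only:", is_rate_anomalous(baseline, now))
    print("with burst:", is_rate_anomalous(baseline + burst, now))
```

With the burst, the recent rate is 20 / 5 = 4 requests per minute against a 24-hour average of 220 / 1440, or roughly 0.15, so the fivefold threshold is exceeded. Without it, no request falls inside the five-minute window and the check stays quiet.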

Integration with DevSecOps

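Validate RASP rule changes in CI before they reach production. The workflow below runs on pull requests that touch the rule or config files; the scripts it calls are project-specific and are not shown in this guide: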

# .github/workflows/rasp-config-validation.yml
name: RASP Configuration Validation
 
on:
  pull_request:
    paths:
      - 'security/rasp-rules.yaml'
      - 'security/rasp-config.yaml'
 
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Validate RASP Rules
        run: |
          python scripts/validate_rasp_rules.py security/rasp-rules.yaml
 
      - name: Test Rules Against Samples
        run: |
          python scripts/test_rasp_rules.py \
            --rules security/rasp-rules.yaml \
            --samples tests/attack-samples/
 
      - name: Check for False Positive Rate
        run: |
          python scripts/fp_analysis.py \
            --rules security/rasp-rules.yaml \
            --legitimate tests/legitimate-samples/ \
            --max-fp-rate 0.01
 
      - name: Generate Rule Coverage Report
        run: |
          python scripts/coverage_report.py \
            --rules security/rasp-rules.yaml \
            --output rasp-coverage.html
 
      - name: Upload Coverage Report
        uses: actions/upload-artifact@v4
        with:
          name: rasp-coverage
          path: rasp-coverage.html
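
The workflow calls `scripts/fp_analysis.py`, which this guide does not show. As a hedged sketch of what such a gate could compute, the snippet below works on in-memory patterns and samples rather than files. The function names, inputs, and exit-code convention are assumptions for illustration only.

```python
import re
from typing import Iterable, List


def false_positive_rate(patterns: Iterable[str], legitimate_samples: List[str]) -> float:
    """Fraction of known-good samples flagged by at least one pattern."""
    compiled = [re.compile(p, re.IGNORECASE) for p in patterns]
    if not legitimate_samples:
        return 0.0
    flagged = sum(1 for s in legitimate_samples if any(rx.search(s) for rx in compiled))
    return flagged / len(legitimate_samples)


def gate(rate: float, max_fp_rate: float) -> int:
    """Return a process exit code: 0 passes the CI step, 1 fails it."""
    return 0 if rate <= max_fp_rate else 1


if __name__ == "__main__":
    # Two default patterns from this guide (XSS-003 and PATH-001)
    patterns = [r"javascript\s*:", r"\.\.[\\/]|\.\.%2[fF]"]
    samples = [
        "https://example.com/docs/index.html",
        "My favourite language: JavaScript",
        "Learn javascript: a beginner's guide",  # legitimate, but matches XSS-003
        "report-2024.pdf",
    ]
    rate = false_positive_rate(patterns, samples)
    print(f"false positive rate: {rate:.2%}")
    print("exit code:", gate(rate, max_fp_rate=0.01))
```

A real script would load the rules from `security/rasp-rules.yaml` and the samples from `tests/legitimate-samples/`, then pass the result of `gate` to `sys.exit` so that the workflow step fails when the threshold is exceeded.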

Conclusion

RASP provides real-time attack detection and prevention from within applications. Implement pattern-based rules for known attacks, behavioral analysis for anomaly detection, and method-level protection decorators. Integrate with DevSecOps pipelines for continuous security validation. RASP complements WAF and other perimeter defenses by providing application-aware protection.
