DevSecOps

Implementing Runtime Application Self-Protection (RASP)

Nicu Constantin
8 min read
#rasp#runtime-security#application-security#attack-detection#devsecops

Runtime Application Self-Protection (RASP) provides real-time attack detection and prevention from inside the application. This guide walks through implementing RASP capabilities: pattern-based rules, method-level protection decorators, behavioral analysis, and CI/CD validation of the rule set.

RASP Architecture Overview

Build a RASP framework for application protection:

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import re
import hashlib
import threading
import functools
 
class AttackType(Enum):
    SQL_INJECTION = "sql_injection"
    XSS = "xss"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    SSRF = "ssrf"
    DESERIALIZATION = "deserialization"
    XXE = "xxe"
    LDAP_INJECTION = "ldap_injection"
 
class ActionType(Enum):
    BLOCK = "block"
    LOG = "log"
    ALERT = "alert"
    SANITIZE = "sanitize"
 
class Severity(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
 
@dataclass
class SecurityEvent:
    event_id: str
    attack_type: AttackType
    severity: Severity
    timestamp: datetime
    request_id: str
    source_ip: str
    user_id: Optional[str]
    endpoint: str
    payload: str
    context: Dict[str, Any]
    action_taken: ActionType
    blocked: bool
 
@dataclass
class RASPRule:
    rule_id: str
    attack_type: AttackType
    severity: Severity
    pattern: str
    description: str
    action: ActionType
    enabled: bool = True
    # Set in __post_init__; the name must match the attribute read by RASPEngine
    compiled_pattern: re.Pattern = field(init=False)
 
    def __post_init__(self):
        self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE)
 
class RASPEngine:
    def __init__(self):
        self.rules: Dict[AttackType, List[RASPRule]] = {}
        self.events: List[SecurityEvent] = []
        self.event_handlers: List[Callable] = []
        self.mode = 'blocking'
        self._lock = threading.Lock()
        self._load_default_rules()
 
    def _load_default_rules(self):
        """Load the default security rules."""
        default_rules = [
            # SQL injection rules
            RASPRule(
                rule_id="SQLI-001",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"(\b(union|select|insert|update|delete|drop|truncate)\b.*\b(from|into|where|set)\b)",
                description="SQL keyword injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-002",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r"('|\")\s*(or|and)\s*('|\")?\s*\d+\s*=\s*\d+",
                description="SQL boolean injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-003",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r";\s*(drop|delete|truncate|alter)\s+",
                description="SQL statement termination attack",
                action=ActionType.BLOCK
            ),
            # XSS rules
            RASPRule(
                rule_id="XSS-001",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"<script[^>]*>.*?</script>",
                description="Script tag injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-002",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"(on\w+)\s*=\s*['\"]?[^'\"]*['\"]?",
                description="Event handler injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-003",
                attack_type=AttackType.XSS,
                severity=Severity.MEDIUM,
                pattern=r"javascript\s*:",
                description="JavaScript protocol injection",
                action=ActionType.BLOCK
            ),
            # Command injection rules
            RASPRule(
                rule_id="CMDI-001",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"[;&|`$]|\$\(|\bexec\b|\beval\b",
                description="Command injection characters detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="CMDI-002",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"\b(cat|ls|pwd|whoami|id|uname|wget|curl|nc|bash|sh)\b",
                description="Shell command detected in input",
                action=ActionType.BLOCK
            ),
            # Path traversal rules
            RASPRule(
                rule_id="PATH-001",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"\.\.[\\/]|\.\.%2[fF]",
                description="Directory traversal sequence detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="PATH-002",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"(etc/(passwd|shadow)|windows/system32)",
                description="Sensitive file path detected",
                action=ActionType.BLOCK
            ),
            # SSRF rules
            RASPRule(
                rule_id="SSRF-001",
                attack_type=AttackType.SSRF,
                severity=Severity.HIGH,
                pattern=r"(localhost|127\.0\.0\.1|0\.0\.0\.0|::1|169\.254\.\d+\.\d+)",
                description="Internal address in URL detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SSRF-002",
                attack_type=AttackType.SSRF,
                severity=Severity.MEDIUM,
                pattern=r"(file|gopher|dict|ftp)://",
                description="Dangerous URL scheme detected",
                action=ActionType.BLOCK
            )
        ]
 
        for rule in default_rules:
            if rule.attack_type not in self.rules:
                self.rules[rule.attack_type] = []
            self.rules[rule.attack_type].append(rule)
 
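    def analyze(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> Optional[SecurityEvent]:
        """Analyze input for security threats.

        Every matching rule is recorded through _handle_event; only the first
        match of a BLOCK rule is returned, and only in 'blocking' mode.
        """
        context = context or {}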
 
        for attack_type, rules in self.rules.items():
            for rule in rules:
                if not rule.enabled:
                    continue
 
                if rule.compiled_pattern.search(input_data):
                    event = self._create_event(rule, input_data, context)
                    self._handle_event(event)
 
                    if rule.action == ActionType.BLOCK and self.mode == 'blocking':
                        return event
 
        return None
 
    def _create_event(self, rule: RASPRule, payload: str, context: Dict) -> SecurityEvent:
        """Create a security event from a rule match."""
        return SecurityEvent(
            event_id=self._generate_event_id(),
            attack_type=rule.attack_type,
            severity=rule.severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=payload[:500],
            context=context,
            action_taken=rule.action,
            blocked=rule.action == ActionType.BLOCK
        )
 
    def _generate_event_id(self) -> str:
        return f"RASP-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}-{hashlib.sha256(str(datetime.utcnow().timestamp()).encode()).hexdigest()[:8]}"
 
    def _handle_event(self, event: SecurityEvent):
        """Store the detected security event and notify the registered handlers."""
        with self._lock:
            self.events.append(event)
 
        for handler in self.event_handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Event handler error: {e}")
 
    def register_handler(self, handler: Callable[[SecurityEvent], None]):
        """Register an event handler."""
        self.event_handlers.append(handler)
 
    def sanitize(self, input_data: str, attack_type: AttackType) -> str:
        """Sanitize input by removing matches of the SANITIZE rules for this attack type."""
        sanitized = input_data
 
        for rule in self.rules.get(attack_type, []):
            if rule.action == ActionType.SANITIZE:
                sanitized = rule.compiled_pattern.sub('', sanitized)
 
        return sanitized
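
One caveat with the single-pass removal in sanitize is worth illustrating. The sketch below is standalone and not part of the engine: it reuses the XSS-001 pattern, adds `re.DOTALL` as an assumption, and introduces a hypothetical helper `strip_until_stable` that repeats the substitution until the output stops changing. It shows that one pass over a nested payload can reassemble a complete script element.

```python
import re

# Same pattern as rule XSS-001; DOTALL is an added assumption so that
# payloads spanning several lines are matched as well.
SCRIPT_TAG = re.compile(r"<script[^>]*>.*?</script>", re.IGNORECASE | re.DOTALL)


def strip_once(value: str) -> str:
    """Single-pass removal, mirroring what RASPEngine.sanitize does."""
    return SCRIPT_TAG.sub("", value)


def strip_until_stable(value: str, max_passes: int = 10) -> str:
    """Repeat the removal until nothing changes (hypothetical helper)."""
    for _ in range(max_passes):
        cleaned = SCRIPT_TAG.sub("", value)
        if cleaned == value:
            return cleaned
        value = cleaned
    # Refuse pathological input instead of returning a half-cleaned value.
    raise ValueError("input did not stabilize after sanitization")


# Removing the inner element glues "<scr" and "ipt>" back together.
payload = "<scr<script>x</script>ipt>alert(1)</script>"
print(repr(strip_once(payload)))          # a full <script> element survives
print(repr(strip_until_stable(payload)))  # nothing is left
```

Pattern stripping of this kind is best treated as a secondary measure. Where the value ends up in HTML, encoding it on output (for example with `html.escape` from the standard library) avoids the reassembly problem altogether.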

Method-Level Protection

Implement protection decorators:

from functools import wraps
from typing import List, Optional
import inspect
 
# Global RASP engine instance
_rasp_engine = RASPEngine()
 
def protect(
    attack_types: Optional[List[AttackType]] = None,
    params: Optional[List[str]] = None,
    sanitize: bool = False
):
    """Decorator that protects function parameters against attacks.

    analyze() evaluates every enabled rule, so any blocking match raises
    SecurityException. attack_types only selects which SANITIZE rules are
    applied when sanitize=True.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Get the function signature and bind the call arguments to it
            sig = inspect.signature(func)
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            # Determine which parameters and attack types to check
            params_to_check = params or list(bound.arguments.keys())
            types_to_check = attack_types or list(AttackType)

            # Build the context
            context = {
                'function': func.__name__,
                'module': func.__module__,
                'request_id': kwargs.get('request_id', 'unknown')
            }

            # Analyze each parameter
            for param_name in params_to_check:
                if param_name not in bound.arguments:
                    continue

                value = bound.arguments[param_name]
                if not isinstance(value, str):
                    continue

                context['parameter'] = param_name

                # One call per value is enough: analyze() runs all rules and
                # returns the first match whose rule action is BLOCK
                event = _rasp_engine.analyze(value, context)
                if event:
                    raise SecurityException(
                        f"Security violation detected: {event.attack_type.value}",
                        event
                    )

                # Strip patterns covered by SANITIZE rules of the selected types
                if sanitize:
                    for attack_type in types_to_check:
                        value = _rasp_engine.sanitize(value, attack_type)
                    bound.arguments[param_name] = value
 
            return func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator
 
class SecurityException(Exception):
    def __init__(self, message: str, event: Optional[SecurityEvent] = None):
        super().__init__(message)
        self.event = event
 
# Database query protection
def protect_query(func):
    """Decorator for database query functions."""
    @wraps(func)
    def wrapper(query: str, params: tuple = None, *args, **kwargs):
        context = {'function': func.__name__, 'query_type': 'database'}

        # Analyze the query string.
        # Caveat: SQLI-001 also matches ordinary statements such as
        # "SELECT ... FROM ...", so this check suits query text assembled
        # from user input rather than complete, trusted SQL statements.
        event = _rasp_engine.analyze(query, context)
        if event and event.attack_type == AttackType.SQL_INJECTION:
            raise SecurityException("SQL injection detected in query", event)

        # Analyze the parameters
        if params:
            for i, param in enumerate(params):
                if isinstance(param, str):
                    context['parameter_index'] = i
                    event = _rasp_engine.analyze(param, context)
                    if event:
                        raise SecurityException(f"Malicious input in parameter {i}", event)
 
        return func(query, params, *args, **kwargs)
    return wrapper
 
# URL validation for SSRF protection
def protect_url(func):
    """Decorator that protects URL parameters against SSRF."""
    @wraps(func)
    def wrapper(url: str, *args, **kwargs):
        from urllib.parse import urlparse

        context = {'function': func.__name__, 'input_type': 'url'}

        # Basic RASP analysis
        event = _rasp_engine.analyze(url, context)
        if event and event.attack_type == AttackType.SSRF:
            raise SecurityException("SSRF attempt detected", event)

        # Additional URL validation
        try:
            parsed = urlparse(url)

            # Block dangerous URL schemes
            if parsed.scheme in ['file', 'gopher', 'dict']:
                raise SecurityException("Blocked URL scheme", None)

            # Block internal IPs
            hostname = parsed.hostname or ''
            if _is_internal_ip(hostname):
                raise SecurityException("Internal IP not allowed", None)
 
        except ValueError as e:
            raise SecurityException(f"Invalid URL: {e}", None)
 
        return func(url, *args, **kwargs)
    return wrapper
 
def _is_internal_ip(hostname: str) -> bool:
    """Check whether the hostname resolves to an internal IP."""
    import socket
    import ipaddress
 
    try:
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local
    except (socket.gaierror, ValueError):
        return False
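
The helper above resolves a single IPv4 address and treats unresolvable names as safe. A stricter variant is sketched below under stated assumptions: the function names `is_internal_address` and `resolves_to_internal` are hypothetical, every address returned by `socket.getaddrinfo` is checked (IPv6 included), and a name that cannot be resolved is treated as internal, on the assumption that blocking is the safer default.

```python
import ipaddress
import socket


def is_internal_address(ip_text: str) -> bool:
    """Return True for addresses a user-supplied URL should not reach."""
    ip = ipaddress.ip_address(ip_text)
    # Unwrap IPv4-mapped IPv6 addresses such as ::ffff:127.0.0.1.
    if ip.version == 6 and ip.ipv4_mapped is not None:
        ip = ip.ipv4_mapped
    return (
        ip.is_private or ip.is_loopback or ip.is_link_local
        or ip.is_reserved or ip.is_multicast or ip.is_unspecified
    )


def resolves_to_internal(hostname: str) -> bool:
    """Check every address the name resolves to (hypothetical helper)."""
    try:
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError):
        return True  # fail closed: unresolvable names are rejected
    # sockaddr[0] is the address text; drop any IPv6 scope id ("%eth0").
    addresses = {info[4][0].split("%")[0] for info in infos}
    return not addresses or any(is_internal_address(a) for a in addresses)


for candidate in ("127.0.0.1", "169.254.169.254", "8.8.8.8"):
    print(candidate, is_internal_address(candidate))
```

A check at validation time does not by itself stop DNS rebinding, because the name can resolve to a different address when the request is actually sent. Connecting to the address that was validated is one way to close that gap.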

Behavioral Analysis

Implement behavioral anomaly detection:

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import statistics
 
class BehavioralAnalyzer:
    def __init__(self, rasp_engine: RASPEngine):
        self.rasp_engine = rasp_engine
        self.user_profiles: Dict[str, UserProfile] = {}
        self.baseline_period = timedelta(days=7)
        self.anomaly_threshold = 2.0
 
    def analyze_request(self, request_context: Dict) -> Optional[SecurityEvent]:
        """Analyze a request for behavioral anomalies."""
        user_id = request_context.get('user_id', 'anonymous')
        profile = self._get_or_create_profile(user_id)

        # Check for anomalies before recording the request; otherwise the
        # current request is already part of the profile it is compared with
        anomalies = []

        # Request rate anomaly
        if profile.is_rate_anomalous():
            anomalies.append(('rate', 'Unusual request rate detected'))

        # Time-based anomaly
        if profile.is_time_anomalous(request_context.get('timestamp', datetime.utcnow())):
            anomalies.append(('time', 'Request at unusual time'))

        # Endpoint anomaly
        if profile.is_endpoint_anomalous(request_context.get('endpoint', '')):
            anomalies.append(('endpoint', 'Access to unusual endpoint'))

        # Parameter anomaly
        if profile.is_parameter_anomalous(request_context.get('parameters', {})):
            anomalies.append(('parameter', 'Unusual parameter patterns'))

        # Update the profile with the current request
        profile.record_request(request_context)

        if anomalies:
            return self._create_behavioral_event(anomalies, request_context)

        return None
 
    def _get_or_create_profile(self, user_id: str) -> 'UserProfile':
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = UserProfile(user_id)
        return self.user_profiles[user_id]
 
    def _create_behavioral_event(self, anomalies: List[tuple], context: Dict) -> SecurityEvent:
        severity = Severity.HIGH if len(anomalies) > 2 else Severity.MEDIUM
 
        return SecurityEvent(
            event_id=f"BEHAV-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
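            # AttackType has no behavioral-anomaly member, so SQL_INJECTION is only a
            # placeholder here; a dedicated enum member would be more accurate
            attack_type=AttackType.SQL_INJECTION,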
            severity=severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=str(anomalies),
            context={**context, 'anomalies': anomalies},
            action_taken=ActionType.ALERT,
            blocked=False
        )
 
class UserProfile:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.request_times: List[datetime] = []
        self.endpoints_accessed: Dict[str, int] = defaultdict(int)
        self.hourly_activity: Dict[int, int] = defaultdict(int)
        self.request_sizes: List[int] = []
        self.baseline_established = False
 
    def record_request(self, context: Dict):
        """Record the request for profiling."""
        now = datetime.utcnow()
        self.request_times.append(now)
        self.hourly_activity[now.hour] += 1
 
        endpoint = context.get('endpoint', '')
        self.endpoints_accessed[endpoint] += 1
 
        if 'content_length' in context:
            self.request_sizes.append(context['content_length'])
 
        # Drop request timestamps older than 30 days
        cutoff = now - timedelta(days=30)
        self.request_times = [t for t in self.request_times if t > cutoff]
 
    def is_rate_anomalous(self) -> bool:
        """Check whether the current request rate is anomalous."""
        if len(self.request_times) < 100:
            return False
 
        now = datetime.utcnow()
        recent = [t for t in self.request_times if t > now - timedelta(minutes=5)]
        historical = [t for t in self.request_times if t > now - timedelta(hours=24)]
 
        if len(historical) < 10:
            return False
 
        current_rate = len(recent) / 5
        historical_rate = len(historical) / (24 * 60)
 
        return current_rate > historical_rate * 5
 
    def is_time_anomalous(self, timestamp: datetime) -> bool:
        """Check whether the request hour is unusual for this user."""
        hour = timestamp.hour
        total_requests = sum(self.hourly_activity.values())
 
        if total_requests < 100:
            return False
 
        hour_ratio = self.hourly_activity[hour] / total_requests
 
        return hour_ratio < 0.01
 
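    def is_endpoint_anomalous(self, endpoint: str) -> bool:
        """Check whether access to the endpoint is unusual."""
        # Assumption: require the same 100-request baseline as the other checks,
        # so that a new user's first requests are not all flagged
        if sum(self.endpoints_accessed.values()) < 100:
            return False
        return endpoint not in self.endpoints_accessed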
 
    def is_parameter_anomalous(self, parameters: Dict) -> bool:
        """Check for anomalous parameter patterns."""
        for key, value in parameters.items():
            if isinstance(value, str):
                if len(value) > 10000:
                    return True
                if value.count('<') > 10 or value.count('>') > 10:
                    return True
 
        return False
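
The listing above collects `request_sizes` and defines `anomaly_threshold = 2.0`, but neither feeds into a check. One way they might be combined is a z-score test, sketched here as a standalone function. The name `is_size_anomalous` and the `min_samples` default are assumptions for illustration, not part of the original profile class.

```python
import statistics
from typing import Sequence


def is_size_anomalous(history: Sequence[int], current: int,
                      threshold: float = 2.0, min_samples: int = 30) -> bool:
    """Flag `current` when it lies more than `threshold` standard
    deviations away from the mean of `history`."""
    if len(history) < min_samples:
        return False  # not enough data to form a baseline
    mean = statistics.fmean(history)
    stdev = statistics.pstdev(history)
    if stdev == 0:
        # Constant history: any different size is unusual.
        return current != mean
    return abs(current - mean) / stdev > threshold


# Synthetic request sizes in bytes, centered on 100.
sizes = [100, 110, 90, 105, 95] * 10
print(is_size_anomalous(sizes, 102))   # close to the mean
print(is_size_anomalous(sizes, 5000))  # far outside the usual range
```

A z-score assumes roughly symmetric data. Request sizes are often skewed, so a percentile-based cutoff may behave better on real traffic.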

DevSecOps Integration

Integrate RASP with CI/CD:

# .github/workflows/rasp-config-validation.yml
name: RASP Configuration Validation
 
on:
  pull_request:
    paths:
      - 'security/rasp-rules.yaml'
      - 'security/rasp-config.yaml'
 
jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Validate RASP Rules
        run: |
          python scripts/validate_rasp_rules.py security/rasp-rules.yaml
 
      - name: Test Rules Against Samples
        run: |
          python scripts/test_rasp_rules.py \
            --rules security/rasp-rules.yaml \
            --samples tests/attack-samples/
 
      - name: Check for False Positive Rate
        run: |
          python scripts/fp_analysis.py \
            --rules security/rasp-rules.yaml \
            --legitimate tests/legitimate-samples/ \
            --max-fp-rate 0.01
 
      - name: Generate Rule Coverage Report
        run: |
          python scripts/coverage_report.py \
            --rules security/rasp-rules.yaml \
            --output rasp-coverage.html
 
      - name: Upload Coverage Report
        uses: actions/upload-artifact@v4
        with:
          name: rasp-coverage
          path: rasp-coverage.html
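
The workflow calls `scripts/fp_analysis.py`, whose contents are not shown here. As a rough sketch of what such a check might compute, the snippet below measures how often a rule fires on known-legitimate input and lists the rules that exceed a budget. The function names, the in-memory sample list, and the selection of two rules are assumptions for illustration; the patterns themselves are copied from the default rules (CMDI-002 and PATH-001).

```python
import re
from typing import Dict, Iterable, List

# Two patterns taken from the default rule set.
RULES: Dict[str, re.Pattern] = {
    "CMDI-002": re.compile(
        r"\b(cat|ls|pwd|whoami|id|uname|wget|curl|nc|bash|sh)\b", re.IGNORECASE
    ),
    "PATH-001": re.compile(r"\.\.[\\/]|\.\.%2[fF]", re.IGNORECASE),
}


def false_positive_rate(pattern: re.Pattern, legitimate: Iterable[str]) -> float:
    """Fraction of legitimate samples that the pattern flags."""
    samples = list(legitimate)
    if not samples:
        return 0.0
    hits = sum(1 for sample in samples if pattern.search(sample))
    return hits / len(samples)


def rules_over_budget(rules: Dict[str, re.Pattern],
                      legitimate: List[str], max_fp_rate: float) -> List[str]:
    """Rule ids whose false positive rate exceeds `max_fp_rate`."""
    return [
        rule_id for rule_id, pattern in rules.items()
        if false_positive_rate(pattern, legitimate) > max_fp_rate
    ]


legitimate_samples = [
    "My cat is named Tom",
    "order id 4521",
    "Please update my shipping address",
    "See you at 5pm",
]
print(rules_over_budget(RULES, legitimate_samples, max_fp_rate=0.01))
```

On these four samples CMDI-002 fires on "cat" and "id", which are everyday words. That is the kind of finding a false-positive gate in the pipeline is meant to surface before a rule reaches blocking mode.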

Conclusion

RASP provides real-time attack detection and prevention from inside the application. Implement pattern-based rules for known attacks, behavioral analysis for anomaly detection, and method-level protection decorators. Integrate the rule set with DevSecOps pipelines for continuous security validation. RASP complements the WAF and other perimeter defenses with application-aware protection.

