Runtime Application Self-Protection (RASP) ofera detectia si prevenirea atacurilor in timp real din interiorul aplicatiei. Acest ghid acopera implementarea capabilitatilor RASP pentru securitate runtime completa.
Privire de Ansamblu asupra Arhitecturii RASP
Construieste un framework RASP pentru protectia aplicatiei:
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
from enum import Enum
from datetime import datetime
from abc import ABC, abstractmethod
import re
import hashlib
import threading
import functools
class AttackType(Enum):
    """Attack categories used to tag rules and security events.

    The string values are the identifiers embedded in messages such as
    "Security violation detected: <value>".
    """

    SQL_INJECTION = "sql_injection"
    XSS = "xss"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    SSRF = "ssrf"
    # The three categories below have no default rules in RASPEngine.
    DESERIALIZATION = "deserialization"
    XXE = "xxe"
    LDAP_INJECTION = "ldap_injection"
class ActionType(Enum):
    """Action attached to a rule and recorded on the resulting event."""

    # Only BLOCK rules make RASPEngine.analyze() return an event to the caller.
    BLOCK = "block"
    LOG = "log"
    ALERT = "alert"
    # Only SANITIZE rules are applied by RASPEngine.sanitize().
    SANITIZE = "sanitize"
class Severity(Enum):
    """Severity level of a rule or event; a larger value is more severe.

    This is a plain Enum, so members do not support ``<`` / ``>`` directly;
    compare ``member.value`` when ordering is needed.
    """

    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1
@dataclass
class SecurityEvent:
    """Record of one detection, produced by the engine or the behavioral analyzer."""

    event_id: str             # e.g. "RASP-<timestamp>-<hash>" or "BEHAV-<timestamp>"
    attack_type: AttackType
    severity: Severity
    timestamp: datetime       # created with datetime.utcnow() (naive UTC)
    request_id: str           # 'unknown' when the context does not provide one
    source_ip: str            # 'unknown' when the context does not provide one
    user_id: Optional[str]
    endpoint: str             # 'unknown' when the context does not provide one
    payload: str              # offending input; the engine keeps at most 500 characters
    context: Dict[str, Any]   # caller-supplied context dictionary
    action_taken: ActionType
    blocked: bool
@dataclass
class RASPRule:
    """A single regex-based detection rule.

    ``pattern`` is compiled once, case-insensitively, when the rule is
    created; the compiled form is exposed as ``compiled_pattern`` and is what
    the engine uses for matching and sanitising.
    """

    rule_id: str
    attack_type: AttackType
    severity: Severity
    pattern: str
    description: str
    action: ActionType
    enabled: bool = True
    # Derived from ``pattern``. The field name must match the attribute that
    # __post_init__ assigns; otherwise the generated __repr__/__eq__ look up
    # an attribute that was never set and raise AttributeError.
    # Excluded from repr/compare because it carries no extra information.
    compiled_pattern: re.Pattern = field(init=False, repr=False, compare=False)

    def __post_init__(self):
        """Compile ``pattern`` with re.IGNORECASE."""
        self.compiled_pattern = re.compile(self.pattern, re.IGNORECASE)
class RASPEngine:
    """Pattern-based detection engine.

    Holds rules grouped by attack type, matches input strings against them,
    records every match as a SecurityEvent and notifies registered handlers.
    """

    def __init__(self):
        self.rules: Dict[AttackType, List[RASPRule]] = {}
        self.events: List[SecurityEvent] = []
        self.event_handlers: List[Callable] = []
        # analyze() only returns events to the caller when mode == 'blocking'.
        self.mode = 'blocking'
        self._lock = threading.Lock()
        self._load_default_rules()

    def _load_default_rules(self):
        """Load the built-in security rules, grouped by attack type."""
        default_rules = [
            # SQL injection rules
            RASPRule(
                rule_id="SQLI-001",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"(\b(union|select|insert|update|delete|drop|truncate)\b.*\b(from|into|where|set)\b)",
                description="SQL keyword injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-002",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r"('|\")\s*(or|and)\s*('|\")?\s*\d+\s*=\s*\d+",
                description="SQL boolean injection detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SQLI-003",
                attack_type=AttackType.SQL_INJECTION,
                severity=Severity.HIGH,
                pattern=r";\s*(drop|delete|truncate|alter)\s+",
                description="SQL statement termination attack",
                action=ActionType.BLOCK
            ),
            # XSS rules
            RASPRule(
                rule_id="XSS-001",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"<script[^>]*>.*?</script>",
                description="Script tag injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-002",
                attack_type=AttackType.XSS,
                severity=Severity.HIGH,
                pattern=r"(on\w+)\s*=\s*['\"]?[^'\"]*['\"]?",
                description="Event handler injection detected",
                action=ActionType.SANITIZE
            ),
            RASPRule(
                rule_id="XSS-003",
                attack_type=AttackType.XSS,
                severity=Severity.MEDIUM,
                pattern=r"javascript\s*:",
                description="JavaScript protocol injection",
                action=ActionType.BLOCK
            ),
            # Command injection rules
            RASPRule(
                rule_id="CMDI-001",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"[;&|`$]|\$\(|\bexec\b|\beval\b",
                description="Command injection characters detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="CMDI-002",
                attack_type=AttackType.COMMAND_INJECTION,
                severity=Severity.CRITICAL,
                pattern=r"\b(cat|ls|pwd|whoami|id|uname|wget|curl|nc|bash|sh)\b",
                description="Shell command detected in input",
                action=ActionType.BLOCK
            ),
            # Path traversal rules
            RASPRule(
                rule_id="PATH-001",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"\.\.[\\/]|\.\.%2[fF]",
                description="Directory traversal sequence detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="PATH-002",
                attack_type=AttackType.PATH_TRAVERSAL,
                severity=Severity.HIGH,
                pattern=r"(etc/(passwd|shadow)|windows/system32)",
                description="Sensitive file path detected",
                action=ActionType.BLOCK
            ),
            # SSRF rules
            RASPRule(
                rule_id="SSRF-001",
                attack_type=AttackType.SSRF,
                severity=Severity.HIGH,
                pattern=r"(localhost|127\.0\.0\.1|0\.0\.0\.0|::1|169\.254\.\d+\.\d+)",
                description="Internal address in URL detected",
                action=ActionType.BLOCK
            ),
            RASPRule(
                rule_id="SSRF-002",
                attack_type=AttackType.SSRF,
                severity=Severity.MEDIUM,
                pattern=r"(file|gopher|dict|ftp)://",
                description="Dangerous URL scheme detected",
                action=ActionType.BLOCK
            ),
        ]
        for rule in default_rules:
            self.rules.setdefault(rule.attack_type, []).append(rule)

    def analyze(self, input_data: str, context: Optional[Dict[str, Any]] = None) -> Optional[SecurityEvent]:
        """Match ``input_data`` against every enabled rule.

        Every match is recorded and dispatched to the handlers. Only a match
        on a BLOCK rule while ``mode == 'blocking'`` stops the scan and is
        returned; matches on other rules are recorded and the scan continues.

        Args:
            input_data: Text to inspect.
            context: Optional request metadata copied onto recorded events.

        Returns:
            The first blocking event, or None when nothing has to be blocked.
        """
        context = context or {}
        for rules in self.rules.values():
            for rule in rules:
                if not rule.enabled:
                    continue
                if not rule.compiled_pattern.search(input_data):
                    continue
                event = self._create_event(rule, input_data, context)
                self._handle_event(event)
                if event.blocked:
                    return event
        return None

    def _create_event(self, rule: RASPRule, payload: str, context: Dict) -> SecurityEvent:
        """Build the SecurityEvent describing a match of ``rule`` on ``payload``."""
        return SecurityEvent(
            event_id=self._generate_event_id(),
            attack_type=rule.attack_type,
            severity=rule.severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=payload[:500],
            # Snapshot the context: callers reuse and mutate the same dict
            # between calls, which would otherwise rewrite stored events.
            context=dict(context),
            action_taken=rule.action,
            # Outside blocking mode analyze() never returns the event, so
            # nothing is actually blocked.
            blocked=rule.action == ActionType.BLOCK and self.mode == 'blocking'
        )

    def _generate_event_id(self) -> str:
        """Return an id of the form ``RASP-<YYYYmmddHHMMSS>-<8 hex chars>``.

        The suffix is random rather than derived from the clock, so events
        created within the same clock tick do not share an id.
        """
        import uuid

        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        return f"RASP-{stamp}-{uuid.uuid4().hex[:8]}"

    def _handle_event(self, event: SecurityEvent):
        """Record ``event`` and pass it to every registered handler.

        Handler exceptions are reported on stdout and do not interrupt the
        remaining handlers.
        """
        with self._lock:
            self.events.append(event)
            handlers = list(self.event_handlers)
        # Handlers run outside the lock: threading.Lock is not reentrant, so
        # a handler that calls back into the engine would deadlock otherwise.
        for handler in handlers:
            try:
                handler(event)
            except Exception as e:
                print(f"Event handler error: {e}")

    def register_handler(self, handler: Callable[[SecurityEvent], None]):
        """Register a callable invoked with every recorded event."""
        with self._lock:
            self.event_handlers.append(handler)

    def sanitize(self, input_data: str, attack_type: AttackType) -> str:
        """Strip everything matched by the SANITIZE rules of ``attack_type``.

        Removal is repeated until the text stops changing, because deleting
        one match can splice a new one together from the surrounding text
        (e.g. ``<scr<script></script>ipt>``). Matches are replaced by the
        empty string, so the text only shrinks and the loop terminates.

        Returns:
            The sanitised text; unchanged when the attack type has no
            SANITIZE rules.
        """
        patterns = [
            rule.compiled_pattern
            for rule in self.rules.get(attack_type, [])
            if rule.action == ActionType.SANITIZE
        ]
        sanitized = input_data
        while True:
            previous = sanitized
            for pattern in patterns:
                sanitized = pattern.sub('', sanitized)
            if sanitized == previous:
                return sanitized


# Section heading from the original text (fused onto the last code line):
# "Protectie la Nivel de Metoda" (Method-Level Protection)
Implementeaza decoratori de protectie:
from functools import wraps
from typing import List, Optional
import inspect
# Global RASP engine instance used by the decorators in this section
_rasp_engine = RASPEngine()
def protect(
    attack_types: List[AttackType] = None,
    params: List[str] = None,
    sanitize: bool = False
):
    """Decorator that screens a function's string arguments with the RASP engine.

    Args:
        attack_types: Attack types whose sanitisers are applied when
            ``sanitize`` is true; defaults to every AttackType.
        params: Names of the parameters to inspect; defaults to all bound
            parameters. Non-string values are skipped.
        sanitize: When true, a flagged argument is replaced by its sanitised
            form instead of raising.

    Raises:
        SecurityException: when the engine returns an event for an argument
            and ``sanitize`` is false.
    """
    def decorator(func):
        # The signature does not change between calls; resolve it once.
        sig = inspect.signature(func)

        @wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            bound.apply_defaults()

            # Decide which parameters and attack types are in scope
            params_to_check = params or list(bound.arguments.keys())
            types_to_check = attack_types or list(AttackType)

            base_context = {
                'function': func.__name__,
                'module': func.__module__,
                'request_id': kwargs.get('request_id', 'unknown')
            }

            for param_name in params_to_check:
                if param_name not in bound.arguments:
                    continue
                value = bound.arguments[param_name]
                if not isinstance(value, str):
                    continue

                # Fresh dict per parameter so an event recorded for one
                # parameter is not relabelled by the next one.
                param_context = {**base_context, 'parameter': param_name}

                # analyze() is called without a type filter and scans all
                # rules, so one call per value is enough; calling it once per
                # attack type only duplicated the recorded events.
                event = _rasp_engine.analyze(value, param_context)
                if not event:
                    continue

                if not sanitize:
                    raise SecurityException(
                        f"Security violation detected: {event.attack_type.value}",
                        event
                    )

                # Apply the sanitisers cumulatively; restarting from the raw
                # value for each type kept only the last type's result.
                # NOTE(review): sanitize() only strips patterns of SANITIZE
                # rules, so the text that triggered a BLOCK rule may still be
                # present afterwards - confirm this pass-through is intended.
                cleaned = value
                for attack_type in types_to_check:
                    cleaned = _rasp_engine.sanitize(cleaned, attack_type)
                bound.arguments[param_name] = cleaned

            return func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator
class SecurityException(Exception):
    """Raised when a protected call is rejected.

    Attributes:
        event: The SecurityEvent that triggered the rejection, or None when
            the rejection came from a check that produces no event (the URL
            validation raises with ``event=None``).
    """

    def __init__(self, message: str, event: Optional["SecurityEvent"] = None):
        super().__init__(message)
        self.event = event
# Database query protection
def protect_query(func):
    """Decorator for database query functions called as ``func(query, params, ...)``.

    The query text is rejected when the engine reports a SQL injection event
    for it; each string parameter is rejected on any event.

    Raises:
        SecurityException: when the query or one of its parameters is flagged.
    """
    @wraps(func)
    def wrapper(query: str, params: tuple = None, *args, **kwargs):
        base_context = {'function': func.__name__, 'query_type': 'database'}

        # Inspect the query string itself.
        # NOTE(review): default rule SQLI-001 matches ordinary statements of
        # the form "SELECT ... FROM ...", so legitimate query text is likely
        # to be rejected here - confirm the intended rule set for this check.
        event = _rasp_engine.analyze(query, dict(base_context))
        if event and event.attack_type == AttackType.SQL_INJECTION:
            raise SecurityException("SQL injection detected in query", event)

        # Inspect the bound parameters
        if params:
            # Mapping-style parameters carry the untrusted data in their
            # values; enumerating a dict would scan the keys instead.
            if isinstance(params, dict):
                candidates = params.items()
            else:
                candidates = enumerate(params)
            for key, param in candidates:
                if not isinstance(param, str):
                    continue
                # Separate dict per parameter: the engine receives this
                # object, so reusing one dict would alter earlier events.
                param_context = {**base_context, 'parameter_index': key}
                event = _rasp_engine.analyze(param, param_context)
                if event:
                    raise SecurityException(f"Malicious input in parameter {key}", event)

        return func(query, params, *args, **kwargs)
    return wrapper
# URL validation for SSRF protection
def protect_url(func):
    """Decorator for functions called as ``func(url, ...)`` that guards against SSRF.

    A URL is rejected when the engine reports an SSRF event for it, when it
    cannot be parsed, when its scheme is on the blocklist, or when its host
    is judged internal by ``_is_internal_ip``.

    Raises:
        SecurityException: for any of the rejections above; ``event`` is None
            for the parse, scheme and host checks.
    """
    from urllib.parse import urlparse

    # Same schemes as default rule SSRF-002. The engine returns only the
    # first blocking match, which may belong to another attack type, so this
    # list must not be narrower than the rule.
    blocked_schemes = ('file', 'gopher', 'dict', 'ftp')

    @wraps(func)
    def wrapper(url: str, *args, **kwargs):
        context = {'function': func.__name__, 'input_type': 'url'}

        # Basic RASP analysis
        event = _rasp_engine.analyze(url, context)
        if event and event.attack_type == AttackType.SSRF:
            raise SecurityException("SSRF attempt detected", event)

        # Additional URL validation; only parsing is expected to raise ValueError
        try:
            parsed = urlparse(url)
            hostname = parsed.hostname or ''
        except ValueError as e:
            raise SecurityException(f"Invalid URL: {e}", None) from e

        # Block non-HTTP schemes that can reach local or internal resources
        if parsed.scheme in blocked_schemes:
            raise SecurityException("Blocked URL scheme", None)

        # Block hosts that resolve to internal addresses
        if _is_internal_ip(hostname):
            raise SecurityException("Internal IP not allowed", None)

        return func(url, *args, **kwargs)
    return wrapper
def _is_internal_ip(hostname: str) -> bool:
    """Return True when ``hostname`` is, or resolves to, an internal address.

    An address counts as internal when it is private, loopback, link-local
    or unspecified. IPv4-mapped IPv6 addresses are judged by the embedded
    IPv4 address. IP literals are classified without a DNS lookup; other
    names are resolved for IPv4 and IPv6 and every returned address is
    checked.

    Returns:
        True for an empty hostname or any internal address; False when all
        addresses are public or the name cannot be resolved.
    """
    import socket
    import ipaddress

    def _internal(address) -> bool:
        mapped = getattr(address, 'ipv4_mapped', None)
        if mapped is not None:
            address = mapped
        return (
            address.is_private
            or address.is_loopback
            or address.is_link_local
            or address.is_unspecified
        )

    # gethostbyname('') resolved to the wildcard address, which is_private
    # accepts; keep treating a missing host as internal.
    if not hostname:
        return True

    # Literal addresses (including IPv6, which gethostbyname cannot handle)
    try:
        return _internal(ipaddress.ip_address(hostname))
    except ValueError:
        pass

    try:
        infos = socket.getaddrinfo(hostname, None)
    except (socket.gaierror, UnicodeError):
        # Unresolvable, or not encodable as a host name
        return False

    for info in infos:
        # sockaddr[0] is the address text; drop any IPv6 zone id ("%eth0")
        raw_address = info[4][0].split('%', 1)[0]
        try:
            if _internal(ipaddress.ip_address(raw_address)):
                return True
        except ValueError:
            continue
    return False


# Section heading from the original text (fused onto the last code line):
# "Analiza Comportamentala" (Behavioral Analysis)
Implementeaza detectia anomaliilor comportamentale:
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List
import statistics
class BehavioralAnalyzer:
    """Flags requests that deviate from a user's recorded activity profile."""

    def __init__(self, rasp_engine: RASPEngine):
        self.rasp_engine = rasp_engine
        self.user_profiles: Dict[str, UserProfile] = {}
        # The two settings below are stored but not read by this class.
        self.baseline_period = timedelta(days=7)
        self.anomaly_threshold = 2.0

    def analyze_request(self, request_context: Dict) -> Optional[SecurityEvent]:
        """Check one request against the user's profile, then record it.

        Args:
            request_context: Request metadata; the keys read here are
                'user_id', 'timestamp', 'endpoint' and 'parameters'.

        Returns:
            An ALERT event listing the detected anomalies, or None.
        """
        user_id = request_context.get('user_id', 'anonymous')
        profile = self._get_or_create_profile(user_id)

        # Evaluate against the history *before* adding the current request.
        # Recording first put the endpoint into the profile, so the
        # unseen-endpoint check could never fire.
        checks = [
            ('rate', 'Unusual request rate detected',
             profile.is_rate_anomalous()),
            ('time', 'Request at unusual time',
             profile.is_time_anomalous(request_context.get('timestamp', datetime.utcnow()))),
            ('endpoint', 'Access to unusual endpoint',
             profile.is_endpoint_anomalous(request_context.get('endpoint', ''))),
            ('parameter', 'Unusual parameter patterns',
             profile.is_parameter_anomalous(request_context.get('parameters', {}))),
        ]
        anomalies = [(kind, message) for kind, message, flagged in checks if flagged]

        # Update the profile with the current request
        profile.record_request(request_context)

        if anomalies:
            return self._create_behavioral_event(anomalies, request_context)
        return None

    def _get_or_create_profile(self, user_id: str) -> 'UserProfile':
        """Return the profile for ``user_id``, creating it on first use."""
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = UserProfile(user_id)
        return self.user_profiles[user_id]

    def _create_behavioral_event(self, anomalies: List[tuple], context: Dict) -> SecurityEvent:
        """Build the ALERT event for ``anomalies``; HIGH severity above two anomalies."""
        import uuid

        severity = Severity.HIGH if len(anomalies) > 2 else Severity.MEDIUM
        stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
        return SecurityEvent(
            # Random suffix: the second-resolution timestamp alone repeats
            # for events raised within the same second.
            event_id=f"BEHAV-{stamp}-{uuid.uuid4().hex[:8]}",
            # NOTE(review): behavioral anomalies are labelled SQL_INJECTION
            # because AttackType has no dedicated member; this mislabels the
            # event and should get its own category.
            attack_type=AttackType.SQL_INJECTION,
            severity=severity,
            timestamp=datetime.utcnow(),
            request_id=context.get('request_id', 'unknown'),
            source_ip=context.get('source_ip', 'unknown'),
            user_id=context.get('user_id'),
            endpoint=context.get('endpoint', 'unknown'),
            payload=str(anomalies),
            context={**context, 'anomalies': anomalies},
            action_taken=ActionType.ALERT,
            blocked=False
        )
class UserProfile:
    """Per-user activity history used by the anomaly checks."""

    def __init__(self, user_id: str):
        self.user_id = user_id
        self.request_times: List[datetime] = []
        self.endpoints_accessed: Dict[str, int] = defaultdict(int)
        self.hourly_activity: Dict[int, int] = defaultdict(int)
        # NOTE(review): appended to in record_request but never pruned or read here.
        self.request_sizes: List[int] = []
        self.baseline_established = False

    def record_request(self, context: Dict):
        """Add one request to the profile.

        Reads 'endpoint' and, when present, 'content_length' from ``context``.
        The request is stamped with the current UTC time.
        """
        now = datetime.utcnow()
        self.request_times.append(now)
        self.hourly_activity[now.hour] += 1
        self.endpoints_accessed[context.get('endpoint', '')] += 1
        if 'content_length' in context:
            self.request_sizes.append(context['content_length'])

        # Drop timestamps older than 30 days
        cutoff = now - timedelta(days=30)
        self.request_times = [t for t in self.request_times if t > cutoff]

    def is_rate_anomalous(self) -> bool:
        """Return True when the 5-minute rate exceeds 5x the 24-hour average.

        Needs at least 100 recorded requests, 10 of them in the last 24
        hours; returns False otherwise.
        """
        if len(self.request_times) < 100:
            return False

        now = datetime.utcnow()
        recent_start = now - timedelta(minutes=5)
        day_start = now - timedelta(hours=24)
        recent = sum(1 for t in self.request_times if t > recent_start)
        historical = sum(1 for t in self.request_times if t > day_start)
        if historical < 10:
            return False

        # Both rates are in requests per minute
        current_rate = recent / 5
        historical_rate = historical / (24 * 60)
        return current_rate > historical_rate * 5

    def is_time_anomalous(self, timestamp: datetime) -> bool:
        """Return True when under 1% of recorded requests fall in this hour.

        Returns False until at least 100 requests have been recorded.
        """
        total_requests = sum(self.hourly_activity.values())
        if total_requests < 100:
            return False
        # .get() instead of indexing: indexing a defaultdict would insert an
        # empty bucket for every hour that is merely queried.
        hour_ratio = self.hourly_activity.get(timestamp.hour, 0) / total_requests
        return hour_ratio < 0.01

    def is_endpoint_anomalous(self, endpoint: str) -> bool:
        """Return True when ``endpoint`` has never been recorded for this user."""
        return endpoint not in self.endpoints_accessed

    def is_parameter_anomalous(self, parameters: Dict) -> bool:
        """Return True when any string value is oversized or full of angle brackets.

        A value is flagged above 10000 characters, or with more than 10 '<'
        or more than 10 '>' characters.
        """
        for value in parameters.values():
            if not isinstance(value, str):
                continue
            if len(value) > 10000:
                return True
            if value.count('<') > 10 or value.count('>') > 10:
                return True
        return False


# Section heading from the original text (fused onto the last code line):
# "Integrare cu DevSecOps" (Integration with DevSecOps)
Integreaza RASP cu CI/CD:
# .github/workflows/rasp-config-validation.yml
# Runs on pull requests that touch the RASP rule or config files.
name: RASP Configuration Validation

on:
  pull_request:
    paths:
      - 'security/rasp-rules.yaml'
      - 'security/rasp-config.yaml'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Validate RASP Rules
        run: |
          python scripts/validate_rasp_rules.py security/rasp-rules.yaml

      - name: Test Rules Against Samples
        run: |
          python scripts/test_rasp_rules.py \
            --rules security/rasp-rules.yaml \
            --samples tests/attack-samples/

      # Presumably fails the job when the false-positive rate on the
      # legitimate samples exceeds 1% - confirm against scripts/fp_analysis.py.
      - name: Check for False Positive Rate
        run: |
          python scripts/fp_analysis.py \
            --rules security/rasp-rules.yaml \
            --legitimate tests/legitimate-samples/ \
            --max-fp-rate 0.01

      - name: Generate Rule Coverage Report
        run: |
          python scripts/coverage_report.py \
            --rules security/rasp-rules.yaml \
            --output rasp-coverage.html

      # Publishes the HTML report written by the previous step
      - name: Upload Coverage Report
        uses: actions/upload-artifact@v4
        with:
          name: rasp-coverage
          path: rasp-coverage.html

# Section heading from the original text (fused onto the last line):
# "Concluzie" (Conclusion)
RASP ofera detectia si prevenirea atacurilor in timp real din interiorul aplicatiilor. Implementeaza reguli bazate pe pattern-uri pentru atacuri cunoscute, analiza comportamentala pentru detectia anomaliilor si decoratori de protectie la nivel de metoda. Integreaza cu pipeline-uri DevSecOps pentru validare continua a securitatii. RASP complementeaza WAF-ul si alte aparari perimetrale prin protectie orientata catre aplicatie.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →