Web Application Firewall Implementation: WAF Security Configuration Guide
Web Application Firewalls provide critical protection against web attacks. This guide covers implementing and configuring WAF solutions for comprehensive application security.
WAF Architecture Overview
Custom WAF Rule Engine
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Callable, Set
from enum import Enum
import re
import json
class RuleAction(Enum):
    """Actions the WAF can take on a request."""

    ALLOW = "allow"
    BLOCK = "block"
    LOG = "log"
    CHALLENGE = "challenge"
    RATE_LIMIT = "rate_limit"
class RulePhase(Enum):
    """Point in the request/response lifecycle a rule is declared for."""

    REQUEST_HEADERS = "request_headers"
    REQUEST_BODY = "request_body"
    RESPONSE_HEADERS = "response_headers"
    RESPONSE_BODY = "response_body"
class ThreatCategory(Enum):
    """Attack classes that a rule can be tagged with."""

    SQL_INJECTION = "sql_injection"
    XSS = "cross_site_scripting"
    PATH_TRAVERSAL = "path_traversal"
    COMMAND_INJECTION = "command_injection"
    FILE_INCLUSION = "file_inclusion"
    PROTOCOL_ATTACK = "protocol_attack"
    BOT = "bot"
    SCANNER = "scanner"
    RATE_ABUSE = "rate_abuse"
@dataclass
class WAFRule:
    """A single detection rule.

    Each entry of ``conditions`` is a dict; the engine in this file reads the
    keys ``"type"``, ``"target"`` (list of target names) and ``"pattern"``.
    """

    id: str
    name: str
    description: str
    phase: RulePhase
    action: RuleAction
    severity: str  # critical, high, medium, low
    category: ThreatCategory
    conditions: List[Dict]
    # Names of transformations applied, in order, before a pattern is tested.
    transformations: List[str] = field(default_factory=list)
    enabled: bool = True
    paranoia_level: int = 1  # 1-4, higher = more strict
@dataclass
class WAFRequest:
    """Snapshot of an incoming HTTP request as seen by the WAF."""

    request_id: str
    timestamp: datetime
    client_ip: str
    method: str
    uri: str
    query_string: Optional[str]
    headers: Dict[str, str]
    cookies: Dict[str, str]
    body: Optional[str]
    user_agent: str
    # Optional enrichment; both default to None when not supplied.
    country: Optional[str] = None
    asn: Optional[int] = None
@dataclass
class WAFDecision:
    """Outcome of evaluating one request."""

    request_id: str
    action: RuleAction
    matched_rules: List[str]
    threat_score: int
    details: Dict
    # Naive UTC timestamp taken when the decision object is created.
    timestamp: datetime = field(default_factory=datetime.utcnow)
class WAFEngine:
    """Regex-rule based request inspection engine.

    On construction the engine registers a built-in rule set (SQL injection,
    XSS, path traversal, protocol attacks, scanner detection).  ``evaluate``
    scores a request against every enabled rule and maps the accumulated
    score to a ``RuleAction``.

    Recognised ``config`` keys:
        paranoia_level: rules with a higher ``paranoia_level`` are skipped
            (default 1).
        thresholds: dict with any of ``"block"``, ``"challenge"``, ``"log"``;
            missing keys fall back to 50 / 25 / 10.
    """

    def __init__(self, config: Optional[Dict] = None):
        self.config = config or {}
        self.rules: Dict[str, "WAFRule"] = {}
        self.ip_reputation: Dict[str, int] = {}
        self.rate_limiters: Dict[str, 'RateLimiter'] = {}
        # Read from the normalised dict: the raw argument may be None.
        self.paranoia_level = self.config.get('paranoia_level', 1)
        self._load_core_rules()

    def _load_core_rules(self):
        """Load OWASP ModSecurity Core Rule Set inspired rules."""
        self._add_sqli_rules()
        self._add_xss_rules()
        self._add_path_traversal_rules()
        self._add_protocol_rules()
        self._add_scanner_rules()

    def _add_sqli_rules(self):
        """Add SQL injection detection rules."""
        sqli_patterns = [
            r"(?i)(\b(union|select|insert|update|delete|drop|create|alter|exec|execute)\b.*\b(from|into|table|database|schema)\b)",
            r"(?i)(--|\#|\/\*|\*\/)",
            r"(?i)(\bor\b|\band\b)\s*[\d\w'\"]+\s*[=<>]",
            r"(?i)(char|nchar|varchar|nvarchar)\s*\(",
            r"(?i)(\bwaitfor\b\s+\bdelay\b|\bbenchmark\b\s*\()",
            r"(?i)(\bsleep\b\s*\(\d+\))",
            r"(?:'|\")?\s*;\s*(drop|truncate|delete|update|insert)",
        ]
        for i, pattern in enumerate(sqli_patterns):
            self.rules[f"sqli_{i:03d}"] = WAFRule(
                id=f"sqli_{i:03d}",
                name=f"SQL Injection Pattern {i+1}",
                description=f"Detects SQL injection pattern: {pattern[:50]}...",
                phase=RulePhase.REQUEST_BODY,
                action=RuleAction.BLOCK,
                severity="critical",
                category=ThreatCategory.SQL_INJECTION,
                conditions=[{
                    "type": "regex",
                    "target": ["args", "body", "cookies"],
                    "pattern": pattern
                }],
                transformations=["lowercase", "url_decode", "html_entity_decode"]
            )

    def _add_xss_rules(self):
        """Add XSS detection rules."""
        xss_patterns = [
            r"(?i)<script[^>]*>[\s\S]*?</script>",
            r"(?i)javascript\s*:",
            r"(?i)on\w+\s*=",
            r"(?i)<\s*img[^>]+\s*onerror\s*=",
            r"(?i)<\s*svg[^>]+\s*onload\s*=",
            r"(?i)<\s*iframe[^>]*>",
            r"(?i)expression\s*\(",
            r"(?i)document\.(cookie|domain|write)",
        ]
        for i, pattern in enumerate(xss_patterns):
            self.rules[f"xss_{i:03d}"] = WAFRule(
                id=f"xss_{i:03d}",
                name=f"XSS Pattern {i+1}",
                description="Detects XSS pattern",
                phase=RulePhase.REQUEST_BODY,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.XSS,
                conditions=[{
                    "type": "regex",
                    "target": ["args", "body", "headers"],
                    "pattern": pattern
                }],
                transformations=["url_decode", "html_entity_decode"]
            )

    def _add_path_traversal_rules(self):
        """Add path traversal detection rules."""
        traversal_patterns = [
            r"(?:\.{2}[/\\])+",
            r"(?:/etc/passwd|/etc/shadow)",
            r"(?:c:\\windows|c:\\winnt)",
            r"(?:\.htaccess|\.htpasswd)",
            r"(?:boot\.ini|win\.ini)",
        ]
        for i, pattern in enumerate(traversal_patterns):
            self.rules[f"traversal_{i:03d}"] = WAFRule(
                id=f"traversal_{i:03d}",
                name=f"Path Traversal Pattern {i+1}",
                description="Detects path traversal attempts",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.PATH_TRAVERSAL,
                conditions=[{
                    "type": "regex",
                    "target": ["uri", "args"],
                    "pattern": pattern
                }],
                transformations=["url_decode", "normalize_path"]
            )

    def _add_protocol_rules(self):
        """Add protocol attack detection rules."""
        protocol_rules = [
            {
                "name": "HTTP Request Smuggling",
                "pattern": r"(?:content-length|transfer-encoding):\s*chunked.*(?:content-length|transfer-encoding)",
                "target": ["headers"]
            },
            {
                # NOTE(review): _get_target_value has no "request_line"
                # mapping, so this rule cannot match until one is added.
                "name": "Invalid HTTP Version",
                "pattern": r"HTTP/[^\d\.\s]",
                "target": ["request_line"]
            },
            {
                "name": "Null Byte Injection",
                "pattern": r"%00|\\x00|\\0",
                "target": ["uri", "args", "body"]
            }
        ]
        for i, rule in enumerate(protocol_rules):
            self.rules[f"protocol_{i:03d}"] = WAFRule(
                id=f"protocol_{i:03d}",
                name=rule["name"],
                description=f"Detects {rule['name']}",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.PROTOCOL_ATTACK,
                conditions=[{
                    "type": "regex",
                    "target": rule["target"],
                    "pattern": rule["pattern"]
                }]
            )

    def _add_scanner_rules(self):
        """Add vulnerability scanner detection rules."""
        scanner_signatures = [
            r"(?i)(nikto|nessus|nmap|sqlmap|burp|acunetix|qualys|rapid7)",
            r"(?i)(dirbuster|gobuster|wfuzz|ffuf)",
            r"(?i)(w3af|arachni|skipfish)",
        ]
        for i, pattern in enumerate(scanner_signatures):
            self.rules[f"scanner_{i:03d}"] = WAFRule(
                id=f"scanner_{i:03d}",
                name=f"Scanner Detection {i+1}",
                description="Detects vulnerability scanner",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="medium",
                category=ThreatCategory.SCANNER,
                conditions=[{
                    "type": "regex",
                    "target": ["user_agent", "headers"],
                    "pattern": pattern
                }]
            )

    def evaluate(self, request: "WAFRequest") -> "WAFDecision":
        """Evaluate a request against all rules.

        Order of checks: IP reputation (score above 80 blocks immediately),
        rate limits, then every enabled rule whose paranoia level does not
        exceed the engine's.  The client's reputation is updated with the
        resulting threat score before the decision is returned.
        """
        matched_rules = []
        threat_score = 0
        details = {"matched_patterns": [], "anomaly_scores": {}}

        # Check IP reputation first
        ip_score = self.ip_reputation.get(request.client_ip, 0)
        if ip_score > 80:
            return WAFDecision(
                request_id=request.request_id,
                action=RuleAction.BLOCK,
                matched_rules=["ip_reputation"],
                threat_score=ip_score,
                details={"reason": "IP reputation block", "ip_score": ip_score}
            )

        # Check rate limits
        rate_decision = self._check_rate_limits(request)
        if rate_decision:
            return rate_decision

        # Rules are visited in registration order; every match adds to the score.
        for rule_id, rule in self.rules.items():
            if not rule.enabled:
                continue
            if rule.paranoia_level > self.paranoia_level:
                continue
            if self._rule_matches(rule, request):
                matched_rules.append(rule_id)
                threat_score += self._get_severity_score(rule.severity)
                details["matched_patterns"].append({
                    "rule_id": rule_id,
                    "category": rule.category.value,
                    "severity": rule.severity
                })

        # Determine final action based on threat score
        action = self._determine_action(threat_score, matched_rules)
        decision = WAFDecision(
            request_id=request.request_id,
            action=action,
            matched_rules=matched_rules,
            threat_score=threat_score,
            details=details
        )

        # Update IP reputation based on decision
        self._update_ip_reputation(request.client_ip, threat_score)
        return decision

    def _rule_matches(self, rule: "WAFRule", request: "WAFRequest") -> bool:
        """Return True if any regex condition of ``rule`` matches any target.

        Conditions whose ``"type"`` is not ``"regex"`` are ignored, as are
        targets whose value is missing or empty.
        """
        for condition in rule.conditions:
            if condition["type"] != "regex":
                continue
            pattern = condition["pattern"]
            for target in condition["target"]:
                value = self._get_target_value(request, target)
                if not value:
                    continue
                transformed = self._apply_transformations(value, rule.transformations)
                if re.search(pattern, transformed):
                    return True
        return False

    def _get_target_value(self, request: "WAFRequest", target: str) -> Optional[str]:
        """Get value from request for the specified target.

        Returns None for unknown target names.  Headers and cookies are
        JSON-serialised, and only when they are the requested target.
        """
        if target == "headers":
            return json.dumps(request.headers)
        if target == "cookies":
            return json.dumps(request.cookies)
        attribute_by_target = {
            "uri": "uri",
            "args": "query_string",
            "body": "body",
            "user_agent": "user_agent",
        }
        attribute = attribute_by_target.get(target)
        if attribute is None:
            return None
        return getattr(request, attribute)

    def _apply_transformations(self, value: str, transformations: List[str]) -> str:
        """Apply the named transformations to ``value`` in the given order.

        Unknown transformation names are skipped.
        """
        import html
        import urllib.parse

        result = value
        for transform in transformations:
            if transform == "lowercase":
                result = result.lower()
            elif transform == "url_decode":
                result = urllib.parse.unquote(result)
            elif transform == "html_entity_decode":
                result = html.unescape(result)
            elif transform == "normalize_path":
                # Collapses runs of forward slashes only.
                result = re.sub(r'/+', '/', result)
        return result

    def _get_severity_score(self, severity: str) -> int:
        """Get numeric score for severity (unknown severities score 5)."""
        scores = {
            "critical": 50,
            "high": 25,
            "medium": 10,
            "low": 5
        }
        return scores.get(severity, 5)

    def _determine_action(self, threat_score: int, matched_rules: List[str]) -> "RuleAction":
        """Determine final action based on threat score.

        Configured thresholds are merged over the defaults, so a config that
        overrides only some keys does not raise ``KeyError``.
        """
        thresholds = {"block": 50, "challenge": 25, "log": 10}
        thresholds.update(self.config.get("thresholds") or {})

        if threat_score >= thresholds["block"]:
            return RuleAction.BLOCK
        if threat_score >= thresholds["challenge"]:
            return RuleAction.CHALLENGE
        if threat_score >= thresholds["log"]:
            return RuleAction.LOG
        return RuleAction.ALLOW

    def _check_rate_limits(self, request: "WAFRequest") -> Optional["WAFDecision"]:
        """Check rate limits for the request."""
        # Implementation would check against rate limit store
        return None

    def _update_ip_reputation(self, ip: str, threat_score: int):
        """Update IP reputation based on request."""
        current = self.ip_reputation.get(ip, 0)
        # Weighted average with decay, truncated to int and capped at 100.
        self.ip_reputation[ip] = min(100, int(current * 0.9 + threat_score * 0.1))
class RateLimiter:
    """Token-bucket rate limiter keyed by an arbitrary string.

    Each key owns a bucket holding at most ``burst`` tokens that refills at
    ``requests_per_second`` tokens per second; every allowed request consumes
    one token.
    """

    def __init__(self, requests_per_second: int, burst: int):
        self.rps = requests_per_second
        self.burst = burst
        self.tokens: Dict[str, float] = {}
        self.last_update: Dict[str, datetime] = {}

    def check(self, key: str) -> bool:
        """Check if request is within rate limit.

        Returns True and consumes one token when the bucket for ``key`` holds
        at least one token, otherwise returns False.
        """
        now = datetime.utcnow()
        if key not in self.tokens:
            # A new key starts with a full bucket; the request still pays for
            # its token below, so at most `burst` requests pass back-to-back.
            self.tokens[key] = self.burst
        else:
            # Clamp so a backwards clock step never removes tokens.
            elapsed = max(0.0, (now - self.last_update[key]).total_seconds())
            self.tokens[key] = min(
                self.burst,
                self.tokens[key] + elapsed * self.rps
            )
        self.last_update[key] = now

        if self.tokens[key] >= 1:
            self.tokens[key] -= 1
            return True
        return False


# Bot Protection
class BotDetector:
    """Heuristic bot detection plus a simple JavaScript challenge.

    ``analyze`` sums the scores of three signal groups (User-Agent, header
    patterns, automation markers); a total of 50 or more is reported as a bot.
    """

    # Seconds an issued challenge token stays valid.
    CHALLENGE_TTL_SECONDS = 300

    def __init__(self):
        self.known_bots = self._load_known_bots()
        # token -> naive UTC time the challenge was issued
        self.challenge_tokens: Dict[str, datetime] = {}

    def _load_known_bots(self) -> Dict:
        """Load known good and bad bot signatures."""
        return {
            "good_bots": {
                "googlebot": r"(?i)googlebot",
                "bingbot": r"(?i)bingbot",
                "yandexbot": r"(?i)yandexbot",
                "duckduckbot": r"(?i)duckduckbot"
            },
            "bad_bots": {
                "generic_bot": r"(?i)\bbot\b",
                "crawler": r"(?i)crawler|spider|scraper",
                "headless": r"(?i)headless|phantom|puppeteer|selenium"
            }
        }

    def analyze(self, request: "WAFRequest") -> Dict:
        """Analyze request for bot characteristics.

        Returns a dict with ``is_bot``, ``bot_type``, ``confidence`` (capped
        at 100) and the list of per-group ``signals``.
        """
        analysis = {
            "is_bot": False,
            "bot_type": None,
            "confidence": 0,
            "signals": []
        }
        analysis["signals"].append(self._analyze_user_agent(request.user_agent))
        analysis["signals"].append(self._analyze_patterns(request))
        analysis["signals"].append(self._check_automation_signals(request))

        # Aggregate signals
        bot_score = sum(s["score"] for s in analysis["signals"])
        analysis["confidence"] = min(100, bot_score)
        analysis["is_bot"] = bot_score >= 50
        if analysis["is_bot"]:
            analysis["bot_type"] = self._classify_bot(analysis["signals"])
        return analysis

    def _analyze_user_agent(self, user_agent: str) -> Dict:
        """Analyze User-Agent string.

        A missing (None) User-Agent is treated like an empty one rather than
        raising inside the regex checks.
        """
        user_agent = user_agent or ""
        result = {"type": "user_agent", "score": 0, "details": []}

        # Known good bots short-circuit with a negative score.
        # NOTE(review): this trusts the header alone, which a client can spoof.
        for name, pattern in self.known_bots["good_bots"].items():
            if re.search(pattern, user_agent):
                result["details"].append(f"Known good bot: {name}")
                result["score"] = -20
                return result

        # Check for known bad bots
        for name, pattern in self.known_bots["bad_bots"].items():
            if re.search(pattern, user_agent):
                result["details"].append(f"Known bad bot signature: {name}")
                result["score"] = 40

        # Check for missing or suspicious UA
        if not user_agent or len(user_agent) < 10:
            result["details"].append("Missing or short User-Agent")
            result["score"] += 30

        # Check for common browser markers
        browser_markers = ["Mozilla", "Chrome", "Safari", "Firefox", "Edge"]
        if not any(marker in user_agent for marker in browser_markers):
            result["details"].append("Missing browser markers")
            result["score"] += 20
        return result

    def _analyze_patterns(self, request: "WAFRequest") -> Dict:
        """Analyze request patterns."""
        result = {"type": "patterns", "score": 0, "details": []}
        # Lower-case the header names once instead of once per lookup.
        header_names = {name.lower() for name in request.headers}

        # Check headers
        expected_headers = ["accept", "accept-language", "accept-encoding"]
        missing_headers = [h for h in expected_headers if h not in header_names]
        if missing_headers:
            result["details"].append(f"Missing expected headers: {missing_headers}")
            result["score"] += len(missing_headers) * 10

        # Check for JavaScript execution evidence
        if "sec-ch-ua" not in header_names:
            result["details"].append("Missing client hints (no JS execution)")
            result["score"] += 15
        return result

    def _check_automation_signals(self, request: "WAFRequest") -> Dict:
        """Check for browser automation signals."""
        result = {"type": "automation", "score": 0, "details": []}
        suspicious_indicators = [
            ("webdriver", "WebDriver detected"),
            ("phantom", "PhantomJS detected"),
            ("selenium", "Selenium detected"),
            ("puppeteer", "Puppeteer detected"),
            ("headless", "Headless browser detected")
        ]
        # Tolerate a missing User-Agent.
        user_agent = (request.user_agent or "").lower()
        for indicator, message in suspicious_indicators:
            if indicator in user_agent:
                result["details"].append(message)
                result["score"] += 50
        return result

    def _classify_bot(self, signals: List[Dict]) -> str:
        """Classify the type of bot."""
        all_details = " ".join(str(s.get("details", "")) for s in signals).lower()
        if "headless" in all_details or "automation" in all_details:
            return "automation_tool"
        if "scraper" in all_details or "crawler" in all_details:
            return "scraper"
        if "scanner" in all_details:
            return "vulnerability_scanner"
        return "unknown_bot"

    def _purge_expired_challenges(self, now: datetime) -> None:
        """Drop challenge tokens older than the TTL so the store stays bounded."""
        expired = [
            token for token, issued_at in self.challenge_tokens.items()
            if (now - issued_at).total_seconds() > self.CHALLENGE_TTL_SECONDS
        ]
        for token in expired:
            del self.challenge_tokens[token]

    def generate_challenge(self, request_id: str) -> Dict:
        """Generate a JavaScript challenge.

        ``request_id`` is accepted for interface compatibility but not used.
        Returns a dict with ``type``, ``token`` and the challenge ``html``.
        """
        import secrets

        now = datetime.utcnow()
        # Abandoned challenges are never verified, so clean them up here.
        self._purge_expired_challenges(now)

        token = secrets.token_urlsafe(32)
        self.challenge_tokens[token] = now
        return {
            "type": "js_challenge",
            "token": token,
            "html": f"""
<!DOCTYPE html>
<html>
<head><title>Security Check</title></head>
<body>
<h1>Verifying your browser...</h1>
<script>
// Simple JS challenge
var solution = btoa('{token}' + navigator.userAgent.length);
document.cookie = '_cf_challenge=' + solution + '; path=/';
location.reload();
</script>
</body>
</html>
"""
        }

    @staticmethod
    def _solution_matches(token: str, solution: str) -> bool:
        """Return True if ``solution`` is base64 of ``token`` plus decimal digits.

        This mirrors the challenge script, which submits
        ``btoa(token + navigator.userAgent.length)``.
        """
        import base64

        if not solution:
            return False
        try:
            decoded = base64.b64decode(solution, validate=True).decode("ascii")
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            return False
        if not decoded.startswith(token):
            return False
        return decoded[len(token):].isdigit()

    def verify_challenge(self, token: str, solution: str) -> bool:
        """Verify challenge solution.

        A token is single-use: it is removed on the first verification
        attempt whether or not the attempt succeeds.  Tokens older than
        ``CHALLENGE_TTL_SECONDS`` are rejected.
        """
        issued_at = self.challenge_tokens.pop(token, None)
        if issued_at is None:
            return False

        # Check token age (5 minute expiry)
        age = (datetime.utcnow() - issued_at).total_seconds()
        if age > self.CHALLENGE_TTL_SECONDS:
            return False

        return self._solution_matches(token, solution)


# CI/CD Integration
# .github/workflows/waf-rules.yml
# Validates WAF rule changes on every push/PR that touches waf-rules/, then
# deploys to staging (main only) and, after staging succeeds, to production.
name: WAF Rules Deployment

on:
  push:
    paths:
      - 'waf-rules/**'
  pull_request:
    paths:
      - 'waf-rules/**'

jobs:
  validate-rules:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate Rule Syntax
        run: |
          python scripts/validate_waf_rules.py waf-rules/
      - name: Test Rules Against Sample Traffic
        run: |
          python scripts/test_waf_rules.py \
            --rules waf-rules/ \
            --traffic test-traffic/samples.json \
            --report results/test-report.json
      - name: Check for False Positives
        run: |
          python scripts/check_false_positives.py \
            --results results/test-report.json \
            --threshold 0.01

  deploy-staging:
    needs: validate-rules
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      # Each job starts on a fresh runner; without a checkout neither
      # waf-rules/ nor tests/integration/waf/ exists.
      - uses: actions/checkout@v4
      - name: Deploy to Staging WAF
        run: |
          waf-cli deploy \
            --environment staging \
            --rules waf-rules/ \
            --dry-run false
      - name: Run Integration Tests
        run: |
          pytest tests/integration/waf/ \
            --environment staging

  deploy-production:
    # Skipped automatically whenever deploy-staging is skipped or fails.
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      # The rules directory must be present on this runner as well.
      - uses: actions/checkout@v4
      - name: Deploy to Production WAF
        run: |
          waf-cli deploy \
            --environment production \
            --rules waf-rules/ \
            --canary 10 \
            --canary-duration 30m
      - name: Monitor Deployment
        run: |
          waf-cli monitor \
            --duration 30m \
            --alert-threshold 5

# Best Practices
WAF Configuration
- Start in detection mode: run rules in log-only mode and review what they match before enabling blocking
- Tune for your application: Adjust rules to reduce false positives
- Layer defenses: Combine WAF with other security controls
- Regular updates: Keep rules current with threat landscape
Monitoring and Response
- Monitor block rates and false positive trends
- Set up alerts for attack spikes
- Review logs regularly for tuning opportunities
- Implement incident response procedures
A WAF provides essential protection against web application attacks when it is properly configured and maintained.