DevSecOps

Web Application Firewall Implementation: WAF Security Configuration Guide

DeviDevs Team
10 min read
#WAF#web security#application security#DevSecOps#firewall

Web Application Firewall Implementation: WAF Security Configuration Guide

Web Application Firewalls provide critical protection against web attacks. This guide covers implementing and configuring WAF solutions for comprehensive application security.

WAF Architecture Overview

Custom WAF Rule Engine

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Callable, Set
from enum import Enum
import re
import json
 
class RuleAction(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    LOG = "log"
    CHALLENGE = "challenge"
    RATE_LIMIT = "rate_limit"
 
class RulePhase(Enum):
    REQUEST_HEADERS = "request_headers"
    REQUEST_BODY = "request_body"
    RESPONSE_HEADERS = "response_headers"
    RESPONSE_BODY = "response_body"
 
class ThreatCategory(Enum):
    SQL_INJECTION = "sql_injection"
    XSS = "cross_site_scripting"
    PATH_TRAVERSAL = "path_traversal"
    COMMAND_INJECTION = "command_injection"
    FILE_INCLUSION = "file_inclusion"
    PROTOCOL_ATTACK = "protocol_attack"
    BOT = "bot"
    SCANNER = "scanner"
    RATE_ABUSE = "rate_abuse"
 
@dataclass
class WAFRule:
    id: str
    name: str
    description: str
    phase: RulePhase
    action: RuleAction
    severity: str  # critical, high, medium, low
    category: ThreatCategory
    conditions: List[Dict]
    transformations: List[str] = field(default_factory=list)
    enabled: bool = True
    paranoia_level: int = 1  # 1-4, higher = more strict
 
@dataclass
class WAFRequest:
    request_id: str
    timestamp: datetime
    client_ip: str
    method: str
    uri: str
    query_string: Optional[str]
    headers: Dict[str, str]
    cookies: Dict[str, str]
    body: Optional[str]
    user_agent: str
    country: Optional[str] = None
    asn: Optional[int] = None
 
@dataclass
class WAFDecision:
    request_id: str
    action: RuleAction
    matched_rules: List[str]
    threat_score: int
    details: Dict
    timestamp: datetime = field(default_factory=datetime.utcnow)
 
class WAFEngine:
    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.rules: Dict[str, WAFRule] = {}
        self.ip_reputation: Dict[str, int] = {}
        self.rate_limiters: Dict[str, 'RateLimiter'] = {}
        self.paranoia_level = config.get('paranoia_level', 1)
 
        self._load_core_rules()
 
    def _load_core_rules(self):
        """Load OWASP ModSecurity Core Rule Set inspired rules."""
        # SQL Injection rules
        self._add_sqli_rules()
        # XSS rules
        self._add_xss_rules()
        # Path traversal rules
        self._add_path_traversal_rules()
        # Protocol attack rules
        self._add_protocol_rules()
        # Scanner detection rules
        self._add_scanner_rules()
 
    def _add_sqli_rules(self):
        """Add SQL injection detection rules."""
        sqli_patterns = [
            r"(?i)(\b(union|select|insert|update|delete|drop|create|alter|exec|execute)\b.*\b(from|into|table|database|schema)\b)",
            r"(?i)(--|\#|\/\*|\*\/)",
            r"(?i)(\bor\b|\band\b)\s*[\d\w'\"]+\s*[=<>]",
            r"(?i)(char|nchar|varchar|nvarchar)\s*\(",
            r"(?i)(\bwaitfor\b\s+\bdelay\b|\bbenchmark\b\s*\()",
            r"(?i)(\bsleep\b\s*\(\d+\))",
            r"(?:'|\")?\s*;\s*(drop|truncate|delete|update|insert)",
        ]
 
        for i, pattern in enumerate(sqli_patterns):
            self.rules[f"sqli_{i:03d}"] = WAFRule(
                id=f"sqli_{i:03d}",
                name=f"SQL Injection Pattern {i+1}",
                description=f"Detects SQL injection pattern: {pattern[:50]}...",
                phase=RulePhase.REQUEST_BODY,
                action=RuleAction.BLOCK,
                severity="critical",
                category=ThreatCategory.SQL_INJECTION,
                conditions=[{
                    "type": "regex",
                    "target": ["args", "body", "cookies"],
                    "pattern": pattern
                }],
                transformations=["lowercase", "url_decode", "html_entity_decode"]
            )
 
    def _add_xss_rules(self):
        """Add XSS detection rules."""
        xss_patterns = [
            r"(?i)<script[^>]*>[\s\S]*?</script>",
            r"(?i)javascript\s*:",
            r"(?i)on\w+\s*=",
            r"(?i)<\s*img[^>]+\s*onerror\s*=",
            r"(?i)<\s*svg[^>]+\s*onload\s*=",
            r"(?i)<\s*iframe[^>]*>",
            r"(?i)expression\s*\(",
            r"(?i)document\.(cookie|domain|write)",
        ]
 
        for i, pattern in enumerate(xss_patterns):
            self.rules[f"xss_{i:03d}"] = WAFRule(
                id=f"xss_{i:03d}",
                name=f"XSS Pattern {i+1}",
                description=f"Detects XSS pattern",
                phase=RulePhase.REQUEST_BODY,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.XSS,
                conditions=[{
                    "type": "regex",
                    "target": ["args", "body", "headers"],
                    "pattern": pattern
                }],
                transformations=["url_decode", "html_entity_decode"]
            )
 
    def _add_path_traversal_rules(self):
        """Add path traversal detection rules."""
        traversal_patterns = [
            r"(?:\.{2}[/\\])+",
            r"(?:/etc/passwd|/etc/shadow)",
            r"(?:c:\\windows|c:\\winnt)",
            r"(?:\.htaccess|\.htpasswd)",
            r"(?:boot\.ini|win\.ini)",
        ]
 
        for i, pattern in enumerate(traversal_patterns):
            self.rules[f"traversal_{i:03d}"] = WAFRule(
                id=f"traversal_{i:03d}",
                name=f"Path Traversal Pattern {i+1}",
                description="Detects path traversal attempts",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.PATH_TRAVERSAL,
                conditions=[{
                    "type": "regex",
                    "target": ["uri", "args"],
                    "pattern": pattern
                }],
                transformations=["url_decode", "normalize_path"]
            )
 
    def _add_protocol_rules(self):
        """Add protocol attack detection rules."""
        protocol_rules = [
            {
                "name": "HTTP Request Smuggling",
                "pattern": r"(?:content-length|transfer-encoding):\s*chunked.*(?:content-length|transfer-encoding)",
                "target": ["headers"]
            },
            {
                "name": "Invalid HTTP Version",
                "pattern": r"HTTP/[^\d\.\s]",
                "target": ["request_line"]
            },
            {
                "name": "Null Byte Injection",
                "pattern": r"%00|\\x00|\\0",
                "target": ["uri", "args", "body"]
            }
        ]
 
        for i, rule in enumerate(protocol_rules):
            self.rules[f"protocol_{i:03d}"] = WAFRule(
                id=f"protocol_{i:03d}",
                name=rule["name"],
                description=f"Detects {rule['name']}",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="high",
                category=ThreatCategory.PROTOCOL_ATTACK,
                conditions=[{
                    "type": "regex",
                    "target": rule["target"],
                    "pattern": rule["pattern"]
                }]
            )
 
    def _add_scanner_rules(self):
        """Add vulnerability scanner detection rules."""
        scanner_signatures = [
            r"(?i)(nikto|nessus|nmap|sqlmap|burp|acunetix|qualys|rapid7)",
            r"(?i)(dirbuster|gobuster|wfuzz|ffuf)",
            r"(?i)(w3af|arachni|skipfish)",
        ]
 
        for i, pattern in enumerate(scanner_signatures):
            self.rules[f"scanner_{i:03d}"] = WAFRule(
                id=f"scanner_{i:03d}",
                name=f"Scanner Detection {i+1}",
                description="Detects vulnerability scanner",
                phase=RulePhase.REQUEST_HEADERS,
                action=RuleAction.BLOCK,
                severity="medium",
                category=ThreatCategory.SCANNER,
                conditions=[{
                    "type": "regex",
                    "target": ["user_agent", "headers"],
                    "pattern": pattern
                }]
            )
 
    def evaluate(self, request: WAFRequest) -> WAFDecision:
        """Evaluate a request against all rules."""
        matched_rules = []
        threat_score = 0
        details = {"matched_patterns": [], "anomaly_scores": {}}
 
        # Check IP reputation first
        ip_score = self.ip_reputation.get(request.client_ip, 0)
        if ip_score > 80:
            return WAFDecision(
                request_id=request.request_id,
                action=RuleAction.BLOCK,
                matched_rules=["ip_reputation"],
                threat_score=ip_score,
                details={"reason": "IP reputation block", "ip_score": ip_score}
            )
 
        # Check rate limits
        rate_decision = self._check_rate_limits(request)
        if rate_decision:
            return rate_decision
 
        # Evaluate rules by phase
        for rule_id, rule in self.rules.items():
            if not rule.enabled:
                continue
 
            if rule.paranoia_level > self.paranoia_level:
                continue
 
            if self._rule_matches(rule, request):
                matched_rules.append(rule_id)
                threat_score += self._get_severity_score(rule.severity)
                details["matched_patterns"].append({
                    "rule_id": rule_id,
                    "category": rule.category.value,
                    "severity": rule.severity
                })
 
        # Determine final action based on threat score
        action = self._determine_action(threat_score, matched_rules)
 
        decision = WAFDecision(
            request_id=request.request_id,
            action=action,
            matched_rules=matched_rules,
            threat_score=threat_score,
            details=details
        )
 
        # Update IP reputation based on decision
        self._update_ip_reputation(request.client_ip, threat_score)
 
        return decision
 
    def _rule_matches(self, rule: WAFRule, request: WAFRequest) -> bool:
        """Check if a rule matches the request."""
        for condition in rule.conditions:
            if condition["type"] == "regex":
                pattern = condition["pattern"]
                targets = condition["target"]
 
                for target in targets:
                    value = self._get_target_value(request, target)
                    if value:
                        # Apply transformations
                        transformed = self._apply_transformations(value, rule.transformations)
                        if re.search(pattern, transformed):
                            return True
 
        return False
 
    def _get_target_value(self, request: WAFRequest, target: str) -> Optional[str]:
        """Get value from request for the specified target."""
        target_mapping = {
            "uri": request.uri,
            "args": request.query_string,
            "body": request.body,
            "user_agent": request.user_agent,
            "headers": json.dumps(request.headers),
            "cookies": json.dumps(request.cookies)
        }
        return target_mapping.get(target)
 
    def _apply_transformations(self, value: str, transformations: List[str]) -> str:
        """Apply transformations to a value."""
        result = value
        for transform in transformations:
            if transform == "lowercase":
                result = result.lower()
            elif transform == "url_decode":
                import urllib.parse
                result = urllib.parse.unquote(result)
            elif transform == "html_entity_decode":
                import html
                result = html.unescape(result)
            elif transform == "normalize_path":
                result = re.sub(r'/+', '/', result)
        return result
 
    def _get_severity_score(self, severity: str) -> int:
        """Get numeric score for severity."""
        scores = {
            "critical": 50,
            "high": 25,
            "medium": 10,
            "low": 5
        }
        return scores.get(severity, 5)
 
    def _determine_action(self, threat_score: int, matched_rules: List[str]) -> RuleAction:
        """Determine final action based on threat score."""
        thresholds = self.config.get("thresholds", {
            "block": 50,
            "challenge": 25,
            "log": 10
        })
 
        if threat_score >= thresholds["block"]:
            return RuleAction.BLOCK
        elif threat_score >= thresholds["challenge"]:
            return RuleAction.CHALLENGE
        elif threat_score >= thresholds["log"]:
            return RuleAction.LOG
        return RuleAction.ALLOW
 
    def _check_rate_limits(self, request: WAFRequest) -> Optional[WAFDecision]:
        """Check rate limits for the request."""
        # Implementation would check against rate limit store
        return None
 
    def _update_ip_reputation(self, ip: str, threat_score: int):
        """Update IP reputation based on request."""
        current = self.ip_reputation.get(ip, 0)
        # Weighted average with decay
        self.ip_reputation[ip] = min(100, int(current * 0.9 + threat_score * 0.1))
 
 
class RateLimiter:
    def __init__(self, requests_per_second: int, burst: int):
        self.rps = requests_per_second
        self.burst = burst
        self.tokens: Dict[str, float] = {}
        self.last_update: Dict[str, datetime] = {}
 
    def check(self, key: str) -> bool:
        """Check if request is within rate limit."""
        now = datetime.utcnow()
 
        if key not in self.tokens:
            self.tokens[key] = self.burst
            self.last_update[key] = now
            return True
 
        # Add tokens based on time elapsed
        elapsed = (now - self.last_update[key]).total_seconds()
        self.tokens[key] = min(
            self.burst,
            self.tokens[key] + elapsed * self.rps
        )
        self.last_update[key] = now
 
        if self.tokens[key] >= 1:
            self.tokens[key] -= 1
            return True
 
        return False

Bot Protection

class BotDetector:
    def __init__(self):
        self.known_bots = self._load_known_bots()
        self.challenge_tokens: Dict[str, datetime] = {}
 
    def _load_known_bots(self) -> Dict:
        """Load known good and bad bot signatures."""
        return {
            "good_bots": {
                "googlebot": r"(?i)googlebot",
                "bingbot": r"(?i)bingbot",
                "yandexbot": r"(?i)yandexbot",
                "duckduckbot": r"(?i)duckduckbot"
            },
            "bad_bots": {
                "generic_bot": r"(?i)\bbot\b",
                "crawler": r"(?i)crawler|spider|scraper",
                "headless": r"(?i)headless|phantom|puppeteer|selenium"
            }
        }
 
    def analyze(self, request: WAFRequest) -> Dict:
        """Analyze request for bot characteristics."""
        analysis = {
            "is_bot": False,
            "bot_type": None,
            "confidence": 0,
            "signals": []
        }
 
        # Check User-Agent
        ua_analysis = self._analyze_user_agent(request.user_agent)
        analysis["signals"].append(ua_analysis)
 
        # Check request patterns
        pattern_analysis = self._analyze_patterns(request)
        analysis["signals"].append(pattern_analysis)
 
        # Check for automation signals
        automation_analysis = self._check_automation_signals(request)
        analysis["signals"].append(automation_analysis)
 
        # Aggregate signals
        bot_score = sum(s["score"] for s in analysis["signals"])
        analysis["confidence"] = min(100, bot_score)
        analysis["is_bot"] = bot_score >= 50
 
        if analysis["is_bot"]:
            analysis["bot_type"] = self._classify_bot(analysis["signals"])
 
        return analysis
 
    def _analyze_user_agent(self, user_agent: str) -> Dict:
        """Analyze User-Agent string."""
        result = {"type": "user_agent", "score": 0, "details": []}
 
        # Check for known good bots
        for name, pattern in self.known_bots["good_bots"].items():
            if re.search(pattern, user_agent):
                result["details"].append(f"Known good bot: {name}")
                result["score"] = -20
                return result
 
        # Check for known bad bots
        for name, pattern in self.known_bots["bad_bots"].items():
            if re.search(pattern, user_agent):
                result["details"].append(f"Known bad bot signature: {name}")
                result["score"] = 40
 
        # Check for missing or suspicious UA
        if not user_agent or len(user_agent) < 10:
            result["details"].append("Missing or short User-Agent")
            result["score"] += 30
 
        # Check for common browser markers
        browser_markers = ["Mozilla", "Chrome", "Safari", "Firefox", "Edge"]
        if not any(marker in user_agent for marker in browser_markers):
            result["details"].append("Missing browser markers")
            result["score"] += 20
 
        return result
 
    def _analyze_patterns(self, request: WAFRequest) -> Dict:
        """Analyze request patterns."""
        result = {"type": "patterns", "score": 0, "details": []}
 
        # Check headers
        expected_headers = ["accept", "accept-language", "accept-encoding"]
        missing_headers = [h for h in expected_headers if h not in [k.lower() for k in request.headers.keys()]]
 
        if missing_headers:
            result["details"].append(f"Missing expected headers: {missing_headers}")
            result["score"] += len(missing_headers) * 10
 
        # Check for JavaScript execution evidence
        if "sec-ch-ua" not in [k.lower() for k in request.headers.keys()]:
            result["details"].append("Missing client hints (no JS execution)")
            result["score"] += 15
 
        return result
 
    def _check_automation_signals(self, request: WAFRequest) -> Dict:
        """Check for browser automation signals."""
        result = {"type": "automation", "score": 0, "details": []}
 
        suspicious_indicators = [
            ("webdriver", "WebDriver detected"),
            ("phantom", "PhantomJS detected"),
            ("selenium", "Selenium detected"),
            ("puppeteer", "Puppeteer detected"),
            ("headless", "Headless browser detected")
        ]
 
        for indicator, message in suspicious_indicators:
            if indicator in request.user_agent.lower():
                result["details"].append(message)
                result["score"] += 50
 
        return result
 
    def _classify_bot(self, signals: List[Dict]) -> str:
        """Classify the type of bot."""
        all_details = " ".join(str(s.get("details", "")) for s in signals).lower()
 
        if "headless" in all_details or "automation" in all_details:
            return "automation_tool"
        if "scraper" in all_details or "crawler" in all_details:
            return "scraper"
        if "scanner" in all_details:
            return "vulnerability_scanner"
        return "unknown_bot"
 
    def generate_challenge(self, request_id: str) -> Dict:
        """Generate a JavaScript challenge."""
        import secrets
        token = secrets.token_urlsafe(32)
        self.challenge_tokens[token] = datetime.utcnow()
 
        return {
            "type": "js_challenge",
            "token": token,
            "html": f"""
            <!DOCTYPE html>
            <html>
            <head><title>Security Check</title></head>
            <body>
            <h1>Verifying your browser...</h1>
            <script>
                // Simple JS challenge
                var solution = btoa('{token}' + navigator.userAgent.length);
                document.cookie = '_cf_challenge=' + solution + '; path=/';
                location.reload();
            </script>
            </body>
            </html>
            """
        }
 
    def verify_challenge(self, token: str, solution: str) -> bool:
        """Verify challenge solution."""
        if token not in self.challenge_tokens:
            return False
 
        # Check token age (5 minute expiry)
        age = (datetime.utcnow() - self.challenge_tokens[token]).total_seconds()
        if age > 300:
            del self.challenge_tokens[token]
            return False
 
        # Verify solution (simplified)
        # In production, would verify the actual computation
        del self.challenge_tokens[token]
        return len(solution) > 0

CI/CD Integration

# .github/workflows/waf-rules.yml
name: WAF Rules Deployment
 
on:
  push:
    paths:
      - 'waf-rules/**'
  pull_request:
    paths:
      - 'waf-rules/**'
 
jobs:
  validate-rules:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Validate Rule Syntax
        run: |
          python scripts/validate_waf_rules.py waf-rules/
 
      - name: Test Rules Against Sample Traffic
        run: |
          python scripts/test_waf_rules.py \
            --rules waf-rules/ \
            --traffic test-traffic/samples.json \
            --report results/test-report.json
 
      - name: Check for False Positives
        run: |
          python scripts/check_false_positives.py \
            --results results/test-report.json \
            --threshold 0.01
 
  deploy-staging:
    needs: validate-rules
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Staging WAF
        run: |
          waf-cli deploy \
            --environment staging \
            --rules waf-rules/ \
            --dry-run false
 
      - name: Run Integration Tests
        run: |
          pytest tests/integration/waf/ \
            --environment staging
 
  deploy-production:
    needs: deploy-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Deploy to Production WAF
        run: |
          waf-cli deploy \
            --environment production \
            --rules waf-rules/ \
            --canary 10 \
            --canary-duration 30m
 
      - name: Monitor Deployment
        run: |
          waf-cli monitor \
            --duration 30m \
            --alert-threshold 5

Best Practices

WAF Configuration

  1. Start in detection mode: Log-only before blocking
  2. Tune for your application: Adjust rules to reduce false positives
  3. Layer defenses: Combine WAF with other security controls
  4. Regular updates: Keep rules current with threat landscape

Monitoring and Response

  • Monitor block rates and false positive trends
  • Set up alerts for attack spikes
  • Review logs regularly for tuning opportunities
  • Implement incident response procedures

WAF implementation provides essential protection against web application attacks when properly configured and maintained.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.