DevSecOps

API Security Testing Automation: OWASP API Top 10 Coverage

DeviDevs Team
10 min read
#API security#OWASP#security testing#DevSecOps#automation

API security testing must be automated and integrated into CI/CD pipelines to catch vulnerabilities before production. This guide covers comprehensive testing for OWASP API Top 10 vulnerabilities with practical automation frameworks.

API Security Testing Framework

Core Testing Architecture

# api_security_tester.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import aiohttp
import asyncio
import json
from datetime import datetime
import uuid
 
class VulnerabilityType(Enum):
    BOLA = "API1:2023 Broken Object Level Authorization"
    BROKEN_AUTH = "API2:2023 Broken Authentication"
    BOPLA = "API3:2023 Broken Object Property Level Authorization"
    UNRESTRICTED_RESOURCE = "API4:2023 Unrestricted Resource Consumption"
    BFLA = "API5:2023 Broken Function Level Authorization"
    SSRF = "API6:2023 Server Side Request Forgery"
    SECURITY_MISCONFIG = "API7:2023 Security Misconfiguration"
    LACK_OF_PROTECTION = "API8:2023 Lack of Protection from Automated Threats"
    IMPROPER_INVENTORY = "API9:2023 Improper Inventory Management"
    UNSAFE_CONSUMPTION = "API10:2023 Unsafe Consumption of APIs"
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
 
@dataclass
class SecurityFinding:
    finding_id: str
    vulnerability_type: VulnerabilityType
    severity: Severity
    endpoint: str
    method: str
    description: str
    evidence: Dict
    remediation: str
    cvss_score: Optional[float] = None
 
@dataclass
class TestCase:
    test_id: str
    name: str
    vulnerability_type: VulnerabilityType
    test_function: Callable
    severity: Severity
    description: str
 
@dataclass
class APISecurityReport:
    report_id: str
    target_api: str
    scan_start: datetime
    scan_end: datetime
    findings: List[SecurityFinding]
    endpoints_tested: int
    tests_executed: int
    risk_score: float
 
class APISecurityTester:
    """Comprehensive API security testing framework"""
 
    def __init__(self, base_url: str, auth_config: Dict = None):
        self.base_url = base_url.rstrip('/')
        self.auth_config = auth_config or {}
        self.findings: List[SecurityFinding] = []
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, *args):
        await self.session.close()
 
    async def run_full_scan(self, endpoints: List[Dict]) -> APISecurityReport:
        """Run complete security scan"""
        scan_start = datetime.utcnow()
        tests_executed = 0
 
        for endpoint in endpoints:
            # Run all applicable tests
            tests = self._get_tests_for_endpoint(endpoint)
            for test in tests:
                finding = await test.test_function(self, endpoint)
                if finding:
                    self.findings.append(finding)
                tests_executed += 1
 
        scan_end = datetime.utcnow()
 
        return APISecurityReport(
            report_id=str(uuid.uuid4()),
            target_api=self.base_url,
            scan_start=scan_start,
            scan_end=scan_end,
            findings=self.findings,
            endpoints_tested=len(endpoints),
            tests_executed=tests_executed,
            risk_score=self._calculate_risk_score()
        )
 
    def _get_tests_for_endpoint(self, endpoint: Dict) -> List[TestCase]:
        """Get applicable tests for endpoint"""
        tests = [
            TestCase("BOLA-01", "IDOR Test", VulnerabilityType.BOLA,
                    self.test_bola, Severity.HIGH, "Test for broken object level authorization"),
            TestCase("AUTH-01", "Auth Bypass", VulnerabilityType.BROKEN_AUTH,
                    self.test_auth_bypass, Severity.CRITICAL, "Test authentication bypass"),
            TestCase("RATE-01", "Rate Limiting", VulnerabilityType.UNRESTRICTED_RESOURCE,
                    self.test_rate_limiting, Severity.MEDIUM, "Test rate limiting"),
            TestCase("INJECT-01", "SQL Injection", VulnerabilityType.SECURITY_MISCONFIG,
                    self.test_sql_injection, Severity.CRITICAL, "Test SQL injection"),
        ]
        return tests
 
    def _calculate_risk_score(self) -> float:
        """Calculate overall risk score"""
        if not self.findings:
            return 0.0
 
        severity_weights = {
            Severity.CRITICAL: 10,
            Severity.HIGH: 7,
            Severity.MEDIUM: 4,
            Severity.LOW: 1,
            Severity.INFO: 0
        }
 
        total = sum(severity_weights[f.severity] for f in self.findings)
        max_possible = len(self.findings) * 10
        return min(100, (total / max_possible) * 100) if max_possible > 0 else 0

BOLA (Broken Object Level Authorization) Testing

IDOR Detection

# bola_tests.py
async def test_bola(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for Broken Object Level Authorization (BOLA/IDOR)"""
 
    # Extract ID parameter patterns
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Skip if no ID in path
    if not any(p in path for p in ['{id}', '{userId}', '{orderId}', '<id>']):
        return None
 
    # Get valid IDs for authenticated user
    valid_ids = await self._get_user_resources(endpoint)
 
    if not valid_ids:
        return None
 
    # Test accessing other users' resources
    test_cases = [
        # Increment ID
        str(int(valid_ids[0]) + 1) if valid_ids[0].isdigit() else None,
        # Common test IDs
        "1", "0", "999999",
        # UUID manipulation
        valid_ids[0].replace(valid_ids[0][-1], 'a') if len(valid_ids[0]) > 10 else None,
    ]
 
    for test_id in filter(None, test_cases):
        test_path = self._replace_id(path, test_id)
        url = f"{self.base_url}{test_path}"
 
        try:
            async with self.session.request(
                method,
                url,
                headers=self._get_auth_headers()
            ) as response:
 
                if response.status == 200:
                    body = await response.json()
 
                    # Check if we accessed another user's data
                    if self._is_unauthorized_access(body, test_id):
                        return SecurityFinding(
                            finding_id=f"BOLA-{uuid.uuid4().hex[:8]}",
                            vulnerability_type=VulnerabilityType.BOLA,
                            severity=Severity.HIGH,
                            endpoint=path,
                            method=method,
                            description=f"IDOR vulnerability: Able to access resource {test_id}",
                            evidence={
                                "tested_id": test_id,
                                "response_status": response.status,
                                "response_preview": str(body)[:200]
                            },
                            remediation="Implement proper authorization checks verifying resource ownership"
                        )
 
        except Exception as e:
            continue
 
    return None
 
async def _get_user_resources(self, endpoint: Dict) -> List[str]:
    """Get valid resource IDs for current user"""
    # Implementation depends on API structure
    list_endpoint = endpoint.get('list_endpoint')
    if not list_endpoint:
        return []
 
    async with self.session.get(
        f"{self.base_url}{list_endpoint}",
        headers=self._get_auth_headers()
    ) as response:
        if response.status == 200:
            data = await response.json()
            return [str(item.get('id')) for item in data.get('items', data)]
    return []
 
def _replace_id(self, path: str, new_id: str) -> str:
    """Replace ID placeholder in path"""
    import re
    patterns = [r'\{[^}]*[iI]d[^}]*\}', r'<[^>]*[iI]d[^>]*>', r'/\d+(/|$)']
    for pattern in patterns:
        path = re.sub(pattern, f'/{new_id}/', path)
    return path.replace('//', '/')
 
def _is_unauthorized_access(self, body: Dict, test_id: str) -> bool:
    """Check if response indicates unauthorized access to another user's data"""
    # Check if response contains the test ID (indicating we got someone else's data)
    body_str = json.dumps(body)
    return test_id in body_str

Authentication Testing

Auth Bypass Detection

# auth_tests.py
async def test_auth_bypass(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for authentication bypass vulnerabilities"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Skip public endpoints
    if endpoint.get('public', False):
        return None
 
    findings = []
 
    # Test 1: No authentication
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={"Content-Type": "application/json"}
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "no_auth",
                "description": "Endpoint accessible without authentication",
                "status": response.status
            })
 
    # Test 2: Invalid token
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": "Bearer invalid_token_12345",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "invalid_token",
                "description": "Endpoint accepts invalid token",
                "status": response.status
            })
 
    # Test 3: Expired token format
    expired_token = self._generate_expired_jwt()
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": f"Bearer {expired_token}",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "expired_token",
                "description": "Endpoint accepts expired token",
                "status": response.status
            })
 
    # Test 4: JWT none algorithm
    none_alg_token = self._generate_none_alg_jwt()
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": f"Bearer {none_alg_token}",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "jwt_none_alg",
                "description": "Endpoint accepts JWT with 'none' algorithm",
                "status": response.status
            })
 
    if findings:
        return SecurityFinding(
            finding_id=f"AUTH-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.BROKEN_AUTH,
            severity=Severity.CRITICAL,
            endpoint=path,
            method=method,
            description="Authentication bypass vulnerability detected",
            evidence={"tests_failed": findings},
            remediation="Implement proper JWT validation including algorithm verification and expiry checks"
        )
 
    return None
 
def _generate_expired_jwt(self) -> str:
    """Generate expired JWT for testing"""
    import base64
    import json
 
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "sub": "test_user",
        "exp": 1000000000,  # Expired
        "iat": 1000000000
    }
 
    header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
    payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
 
    return f"{header_b64}.{payload_b64}.fake_signature"
 
def _generate_none_alg_jwt(self) -> str:
    """Generate JWT with 'none' algorithm"""
    import base64
    import json
 
    header = {"alg": "none", "typ": "JWT"}
    payload = {
        "sub": "admin",
        "role": "admin",
        "exp": 9999999999
    }
 
    header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
    payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
 
    return f"{header_b64}.{payload_b64}."

Injection Testing

SQL Injection Detection

# injection_tests.py
async def test_sql_injection(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for SQL injection vulnerabilities"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
    params = endpoint.get('parameters', [])
 
    sql_payloads = [
        "' OR '1'='1",
        "' OR '1'='1' --",
        "'; DROP TABLE users; --",
        "1' AND SLEEP(5) --",
        "1 UNION SELECT NULL,NULL,NULL --",
        "' UNION SELECT username,password FROM users --",
        "1'; WAITFOR DELAY '0:0:5' --",
        "1 AND 1=1",
        "1 AND 1=2",
    ]
 
    for param in params:
        for payload in sql_payloads:
            start_time = datetime.utcnow()
 
            if method == 'GET':
                test_url = f"{self.base_url}{path}?{param['name']}={payload}"
                async with self.session.get(
                    test_url,
                    headers=self._get_auth_headers()
                ) as response:
                    elapsed = (datetime.utcnow() - start_time).total_seconds()
                    body = await response.text()
 
                    finding = self._analyze_sqli_response(
                        response.status, body, elapsed, payload, path, method, param['name']
                    )
                    if finding:
                        return finding
 
            elif method == 'POST':
                test_body = {param['name']: payload}
                async with self.session.post(
                    f"{self.base_url}{path}",
                    headers=self._get_auth_headers(),
                    json=test_body
                ) as response:
                    elapsed = (datetime.utcnow() - start_time).total_seconds()
                    body = await response.text()
 
                    finding = self._analyze_sqli_response(
                        response.status, body, elapsed, payload, path, method, param['name']
                    )
                    if finding:
                        return finding
 
    return None
 
def _analyze_sqli_response(
    self,
    status: int,
    body: str,
    elapsed: float,
    payload: str,
    path: str,
    method: str,
    param: str
) -> Optional[SecurityFinding]:
    """Analyze response for SQL injection indicators"""
 
    # Error-based detection
    sql_errors = [
        "sql syntax", "mysql", "sqlite", "postgresql", "oracle",
        "mssql", "syntax error", "unclosed quotation",
        "quoted string not properly terminated"
    ]
 
    body_lower = body.lower()
    for error in sql_errors:
        if error in body_lower:
            return SecurityFinding(
                finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
                severity=Severity.CRITICAL,
                endpoint=path,
                method=method,
                description=f"SQL injection via {param} parameter (error-based)",
                evidence={
                    "payload": payload,
                    "error_indicator": error,
                    "response_preview": body[:200]
                },
                remediation="Use parameterized queries/prepared statements",
                cvss_score=9.8
            )
 
    # Time-based detection
    if "SLEEP" in payload or "WAITFOR" in payload:
        if elapsed > 4.5:  # Expected delay is 5 seconds
            return SecurityFinding(
                finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
                severity=Severity.CRITICAL,
                endpoint=path,
                method=method,
                description=f"SQL injection via {param} parameter (time-based)",
                evidence={
                    "payload": payload,
                    "response_time": elapsed
                },
                remediation="Use parameterized queries/prepared statements",
                cvss_score=9.8
            )
 
    # Boolean-based detection handled separately
    return None

Rate Limiting Testing

Rate Limit Bypass

# rate_limit_tests.py
async def test_rate_limiting(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for missing or bypassable rate limiting"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Test 1: Rapid requests
    request_count = 100
    successful_requests = 0
    rate_limited = False
 
    for i in range(request_count):
        async with self.session.request(
            method,
            f"{self.base_url}{path}",
            headers=self._get_auth_headers()
        ) as response:
 
            if response.status == 429:
                rate_limited = True
                break
            elif response.status in [200, 201, 204]:
                successful_requests += 1
 
    if not rate_limited and successful_requests >= request_count:
        return SecurityFinding(
            finding_id=f"RATE-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
            severity=Severity.MEDIUM,
            endpoint=path,
            method=method,
            description="No rate limiting detected on endpoint",
            evidence={
                "requests_sent": request_count,
                "successful_requests": successful_requests
            },
            remediation="Implement rate limiting (e.g., 100 requests/minute per IP/user)"
        )
 
    # Test 2: Rate limit bypass via headers
    bypass_headers = [
        {"X-Forwarded-For": "127.0.0.1"},
        {"X-Real-IP": "10.0.0.1"},
        {"X-Originating-IP": "192.168.1.1"},
        {"X-Client-IP": "172.16.0.1"},
        {"True-Client-IP": "8.8.8.8"}
    ]
 
    for bypass_header in bypass_headers:
        headers = {**self._get_auth_headers(), **bypass_header}
 
        # Send requests quickly
        success_count = 0
        for _ in range(20):
            async with self.session.request(
                method,
                f"{self.base_url}{path}",
                headers=headers
            ) as response:
                if response.status in [200, 201, 204]:
                    success_count += 1
 
        if success_count >= 20:
            return SecurityFinding(
                finding_id=f"RATE-BYPASS-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
                severity=Severity.HIGH,
                endpoint=path,
                method=method,
                description=f"Rate limiting bypassed via {list(bypass_header.keys())[0]}",
                evidence={
                    "bypass_header": bypass_header,
                    "successful_requests": success_count
                },
                remediation="Implement rate limiting based on authenticated user ID, not client-provided headers"
            )
 
    return None

CI/CD Integration

GitHub Actions Security Pipeline

# .github/workflows/api-security-tests.yml
name: API Security Tests
 
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM
 
jobs:
  api-security-scan:
    runs-on: ubuntu-latest
 
    services:
      app:
        image: ${{ github.repository }}:${{ github.sha }}
        ports:
          - 3000:3000
        env:
          NODE_ENV: test
 
    steps:
      - uses: actions/checkout@v4
 
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
 
      - name: Install dependencies
        run: |
          pip install aiohttp pytest pytest-asyncio
 
      - name: Wait for API
        run: |
          timeout 60 bash -c 'until curl -s http://localhost:3000/health; do sleep 2; done'
 
      - name: Run API Security Tests
        run: |
          python -m pytest tests/security/ \
            --api-url=http://localhost:3000 \
            --junit-xml=security-results.xml \
            --html=security-report.html
 
      - name: Run OWASP ZAP Scan
        uses: zaproxy/action-full-scan@v0.9.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a'
 
      - name: Upload Security Report
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: security-reports
          path: |
            security-results.xml
            security-report.html
            zap-report.html
 
      - name: Check for Critical Findings
        run: |
          python scripts/check_security_findings.py security-results.xml

Security Check Script

# scripts/check_security_findings.py
import xml.etree.ElementTree as ET
import sys
 
def check_findings(report_path: str) -> int:
    """Check security findings and fail on critical issues"""
 
    tree = ET.parse(report_path)
    root = tree.getroot()
 
    critical_count = 0
    high_count = 0
 
    for testcase in root.findall('.//testcase'):
        failure = testcase.find('failure')
        if failure is not None:
            message = failure.get('message', '')
            if 'CRITICAL' in message:
                critical_count += 1
                print(f"❌ CRITICAL: {testcase.get('name')}")
            elif 'HIGH' in message:
                high_count += 1
                print(f"⚠️  HIGH: {testcase.get('name')}")
 
    print(f"\nSummary: {critical_count} critical, {high_count} high severity findings")
 
    # Fail pipeline on critical findings
    if critical_count > 0:
        print("\n❌ Pipeline failed due to critical security findings")
        return 1
 
    # Warn but don't fail on high findings
    if high_count > 0:
        print("\n⚠️  Warning: High severity findings detected")
 
    return 0
 
if __name__ == "__main__":
    sys.exit(check_findings(sys.argv[1]))

Summary

Comprehensive API security testing should cover:

  1. BOLA/IDOR: Test horizontal and vertical authorization
  2. Authentication: JWT validation, token expiry, algorithm attacks
  3. Injection: SQL, NoSQL, command injection
  4. Rate Limiting: Bypass techniques and resource exhaustion
  5. SSRF: Internal network access attempts
  6. Mass Assignment: Property-level authorization

Integrate these tests into CI/CD to catch vulnerabilities before deployment. Regular testing against the OWASP API Security Top 10 provides comprehensive coverage of common API vulnerabilities.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.