DevSecOps

API Security Testing Automation: OWASP API Top 10 Coverage

Nicu Constantin
--10 min lectura
#API security#OWASP#security testing#DevSecOps#automation

API security testing must be automated and integrated into CI/CD pipelines to catch vulnerabilities before production. This guide covers comprehensive testing for OWASP API Top 10 vulnerabilities with practical automation frameworks.

API Security Testing Framework

Core Testing Architecture

# api_security_tester.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import aiohttp
import asyncio
import json
from datetime import datetime
import uuid
 
class VulnerabilityType(Enum):
    BOLA = "API1:2023 Broken Object Level Authorization"
    BROKEN_AUTH = "API2:2023 Broken Authentication"
    BOPLA = "API3:2023 Broken Object Property Level Authorization"
    UNRESTRICTED_RESOURCE = "API4:2023 Unrestricted Resource Consumption"
    BFLA = "API5:2023 Broken Function Level Authorization"
    SSRF = "API6:2023 Server Side Request Forgery"
    SECURITY_MISCONFIG = "API7:2023 Security Misconfiguration"
    LACK_OF_PROTECTION = "API8:2023 Lack of Protection from Automated Threats"
    IMPROPER_INVENTORY = "API9:2023 Improper Inventory Management"
    UNSAFE_CONSUMPTION = "API10:2023 Unsafe Consumption of APIs"
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
 
@dataclass
class SecurityFinding:
    finding_id: str
    vulnerability_type: VulnerabilityType
    severity: Severity
    endpoint: str
    method: str
    description: str
    evidence: Dict
    remediation: str
    cvss_score: Optional[float] = None
 
@dataclass
class TestCase:
    test_id: str
    name: str
    vulnerability_type: VulnerabilityType
    test_function: Callable
    severity: Severity
    description: str
 
@dataclass
class APISecurityReport:
    report_id: str
    target_api: str
    scan_start: datetime
    scan_end: datetime
    findings: List[SecurityFinding]
    endpoints_tested: int
    tests_executed: int
    risk_score: float
 
class APISecurityTester:
    """Comprehensive API security testing framework"""
 
    def __init__(self, base_url: str, auth_config: Dict = None):
        self.base_url = base_url.rstrip('/')
        self.auth_config = auth_config or {}
        self.findings: List[SecurityFinding] = []
        self.session = None
 
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
 
    async def __aexit__(self, *args):
        await self.session.close()
 
    async def run_full_scan(self, endpoints: List[Dict]) -> APISecurityReport:
        """Run complete security scan"""
        scan_start = datetime.utcnow()
        tests_executed = 0
 
        for endpoint in endpoints:
            # Run all applicable tests
            tests = self._get_tests_for_endpoint(endpoint)
            for test in tests:
                finding = await test.test_function(self, endpoint)
                if finding:
                    self.findings.append(finding)
                tests_executed += 1
 
        scan_end = datetime.utcnow()
 
        return APISecurityReport(
            report_id=str(uuid.uuid4()),
            target_api=self.base_url,
            scan_start=scan_start,
            scan_end=scan_end,
            findings=self.findings,
            endpoints_tested=len(endpoints),
            tests_executed=tests_executed,
            risk_score=self._calculate_risk_score()
        )
 
    def _get_tests_for_endpoint(self, endpoint: Dict) -> List[TestCase]:
        """Get applicable tests for endpoint"""
        tests = [
            TestCase("BOLA-01", "IDOR Test", VulnerabilityType.BOLA,
                    self.test_bola, Severity.HIGH, "Test for broken object level authorization"),
            TestCase("AUTH-01", "Auth Bypass", VulnerabilityType.BROKEN_AUTH,
                    self.test_auth_bypass, Severity.CRITICAL, "Test authentication bypass"),
            TestCase("RATE-01", "Rate Limiting", VulnerabilityType.UNRESTRICTED_RESOURCE,
                    self.test_rate_limiting, Severity.MEDIUM, "Test rate limiting"),
            TestCase("INJECT-01", "SQL Injection", VulnerabilityType.SECURITY_MISCONFIG,
                    self.test_sql_injection, Severity.CRITICAL, "Test SQL injection"),
        ]
        return tests
 
    def _calculate_risk_score(self) -> float:
        """Calculate overall risk score"""
        if not self.findings:
            return 0.0
 
        severity_weights = {
            Severity.CRITICAL: 10,
            Severity.HIGH: 7,
            Severity.MEDIUM: 4,
            Severity.LOW: 1,
            Severity.INFO: 0
        }
 
        total = sum(severity_weights[f.severity] for f in self.findings)
        max_possible = len(self.findings) * 10
        return min(100, (total / max_possible) * 100) if max_possible > 0 else 0

BOLA (Broken Object Level Authorization) Testing

IDOR Detection

# bola_tests.py
async def test_bola(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for Broken Object Level Authorization (BOLA/IDOR)"""
 
    # Extract ID parameter patterns
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Skip if no ID in path
    if not any(p in path for p in ['{id}', '{userId}', '{orderId}', '<id>']):
        return None
 
    # Get valid IDs for authenticated user
    valid_ids = await self._get_user_resources(endpoint)
 
    if not valid_ids:
        return None
 
    # Test accessing other users' resources
    test_cases = [
        # Increment ID
        str(int(valid_ids[0]) + 1) if valid_ids[0].isdigit() else None,
        # Common test IDs
        "1", "0", "999999",
        # UUID manipulation
        valid_ids[0].replace(valid_ids[0][-1], 'a') if len(valid_ids[0]) > 10 else None,
    ]
 
    for test_id in filter(None, test_cases):
        test_path = self._replace_id(path, test_id)
        url = f"{self.base_url}{test_path}"
 
        try:
            async with self.session.request(
                method,
                url,
                headers=self._get_auth_headers()
            ) as response:
 
                if response.status == 200:
                    body = await response.json()
 
                    # Check if we accessed another user's data
                    if self._is_unauthorized_access(body, test_id):
                        return SecurityFinding(
                            finding_id=f"BOLA-{uuid.uuid4().hex[:8]}",
                            vulnerability_type=VulnerabilityType.BOLA,
                            severity=Severity.HIGH,
                            endpoint=path,
                            method=method,
                            description=f"IDOR vulnerability: Able to access resource {test_id}",
                            evidence={
                                "tested_id": test_id,
                                "response_status": response.status,
                                "response_preview": str(body)[:200]
                            },
                            remediation="Implement proper authorization checks verifying resource ownership"
                        )
 
        except Exception as e:
            continue
 
    return None
 
async def _get_user_resources(self, endpoint: Dict) -> List[str]:
    """Get valid resource IDs for current user"""
    # Implementation depends on API structure
    list_endpoint = endpoint.get('list_endpoint')
    if not list_endpoint:
        return []
 
    async with self.session.get(
        f"{self.base_url}{list_endpoint}",
        headers=self._get_auth_headers()
    ) as response:
        if response.status == 200:
            data = await response.json()
            return [str(item.get('id')) for item in data.get('items', data)]
    return []
 
def _replace_id(self, path: str, new_id: str) -> str:
    """Replace ID placeholder in path"""
    import re
    patterns = [r'\{[^}]*[iI]d[^}]*\}', r'<[^>]*[iI]d[^>]*>', r'/\d+(/|$)']
    for pattern in patterns:
        path = re.sub(pattern, f'/{new_id}/', path)
    return path.replace('//', '/')
 
def _is_unauthorized_access(self, body: Dict, test_id: str) -> bool:
    """Check if response indicates unauthorized access to another user's data"""
    # Check if response contains the test ID (indicating we got someone else's data)
    body_str = json.dumps(body)
    return test_id in body_str

Authentication Testing

Auth Bypass Detection

# auth_tests.py
async def test_auth_bypass(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for authentication bypass vulnerabilities"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Skip public endpoints
    if endpoint.get('public', False):
        return None
 
    findings = []
 
    # Test 1: No authentication
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={"Content-Type": "application/json"}
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "no_auth",
                "description": "Endpoint accessible without authentication",
                "status": response.status
            })
 
    # Test 2: Invalid token
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": "Bearer invalid_token_12345",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "invalid_token",
                "description": "Endpoint accepts invalid token",
                "status": response.status
            })
 
    # Test 3: Expired token format
    expired_token = self._generate_expired_jwt()
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": f"Bearer {expired_token}",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "expired_token",
                "description": "Endpoint accepts expired token",
                "status": response.status
            })
 
    # Test 4: JWT none algorithm
    none_alg_token = self._generate_none_alg_jwt()
    async with self.session.request(
        method,
        f"{self.base_url}{path}",
        headers={
            "Authorization": f"Bearer {none_alg_token}",
            "Content-Type": "application/json"
        }
    ) as response:
 
        if response.status == 200:
            findings.append({
                "test": "jwt_none_alg",
                "description": "Endpoint accepts JWT with 'none' algorithm",
                "status": response.status
            })
 
    if findings:
        return SecurityFinding(
            finding_id=f"AUTH-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.BROKEN_AUTH,
            severity=Severity.CRITICAL,
            endpoint=path,
            method=method,
            description="Authentication bypass vulnerability detected",
            evidence={"tests_failed": findings},
            remediation="Implement proper JWT validation including algorithm verification and expiry checks"
        )
 
    return None
 
def _generate_expired_jwt(self) -> str:
    """Generate expired JWT for testing"""
    import base64
    import json
 
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "sub": "test_user",
        "exp": 1000000000,  # Expired
        "iat": 1000000000
    }
 
    header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
    payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
 
    return f"{header_b64}.{payload_b64}.fake_signature"
 
def _generate_none_alg_jwt(self) -> str:
    """Generate JWT with 'none' algorithm"""
    import base64
    import json
 
    header = {"alg": "none", "typ": "JWT"}
    payload = {
        "sub": "admin",
        "role": "admin",
        "exp": 9999999999
    }
 
    header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
    payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
 
    return f"{header_b64}.{payload_b64}."

Injection Testing

SQL Injection Detection

# injection_tests.py
async def test_sql_injection(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for SQL injection vulnerabilities"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
    params = endpoint.get('parameters', [])
 
    sql_payloads = [
        "' OR '1'='1",
        "' OR '1'='1' --",
        "'; DROP TABLE users; --",
        "1' AND SLEEP(5) --",
        "1 UNION SELECT NULL,NULL,NULL --",
        "' UNION SELECT username,password FROM users --",
        "1'; WAITFOR DELAY '0:0:5' --",
        "1 AND 1=1",
        "1 AND 1=2",
    ]
 
    for param in params:
        for payload in sql_payloads:
            start_time = datetime.utcnow()
 
            if method == 'GET':
                test_url = f"{self.base_url}{path}?{param['name']}={payload}"
                async with self.session.get(
                    test_url,
                    headers=self._get_auth_headers()
                ) as response:
                    elapsed = (datetime.utcnow() - start_time).total_seconds()
                    body = await response.text()
 
                    finding = self._analyze_sqli_response(
                        response.status, body, elapsed, payload, path, method, param['name']
                    )
                    if finding:
                        return finding
 
            elif method == 'POST':
                test_body = {param['name']: payload}
                async with self.session.post(
                    f"{self.base_url}{path}",
                    headers=self._get_auth_headers(),
                    json=test_body
                ) as response:
                    elapsed = (datetime.utcnow() - start_time).total_seconds()
                    body = await response.text()
 
                    finding = self._analyze_sqli_response(
                        response.status, body, elapsed, payload, path, method, param['name']
                    )
                    if finding:
                        return finding
 
    return None
 
def _analyze_sqli_response(
    self,
    status: int,
    body: str,
    elapsed: float,
    payload: str,
    path: str,
    method: str,
    param: str
) -> Optional[SecurityFinding]:
    """Analyze response for SQL injection indicators"""
 
    # Error-based detection
    sql_errors = [
        "sql syntax", "mysql", "sqlite", "postgresql", "oracle",
        "mssql", "syntax error", "unclosed quotation",
        "quoted string not properly terminated"
    ]
 
    body_lower = body.lower()
    for error in sql_errors:
        if error in body_lower:
            return SecurityFinding(
                finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
                severity=Severity.CRITICAL,
                endpoint=path,
                method=method,
                description=f"SQL injection via {param} parameter (error-based)",
                evidence={
                    "payload": payload,
                    "error_indicator": error,
                    "response_preview": body[:200]
                },
                remediation="Use parameterized queries/prepared statements",
                cvss_score=9.8
            )
 
    # Time-based detection
    if "SLEEP" in payload or "WAITFOR" in payload:
        if elapsed > 4.5:  # Expected delay is 5 seconds
            return SecurityFinding(
                finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
                severity=Severity.CRITICAL,
                endpoint=path,
                method=method,
                description=f"SQL injection via {param} parameter (time-based)",
                evidence={
                    "payload": payload,
                    "response_time": elapsed
                },
                remediation="Use parameterized queries/prepared statements",
                cvss_score=9.8
            )
 
    # Boolean-based detection handled separately
    return None

Rate Limiting Testing

Rate Limit Bypass

# rate_limit_tests.py
async def test_rate_limiting(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Test for missing or bypassable rate limiting"""
 
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
 
    # Test 1: Rapid requests
    request_count = 100
    successful_requests = 0
    rate_limited = False
 
    for i in range(request_count):
        async with self.session.request(
            method,
            f"{self.base_url}{path}",
            headers=self._get_auth_headers()
        ) as response:
 
            if response.status == 429:
                rate_limited = True
                break
            elif response.status in [200, 201, 204]:
                successful_requests += 1
 
    if not rate_limited and successful_requests >= request_count:
        return SecurityFinding(
            finding_id=f"RATE-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
            severity=Severity.MEDIUM,
            endpoint=path,
            method=method,
            description="No rate limiting detected on endpoint",
            evidence={
                "requests_sent": request_count,
                "successful_requests": successful_requests
            },
            remediation="Implement rate limiting (e.g., 100 requests/minute per IP/user)"
        )
 
    # Test 2: Rate limit bypass via headers
    bypass_headers = [
        {"X-Forwarded-For": "127.0.0.1"},
        {"X-Real-IP": "10.0.0.1"},
        {"X-Originating-IP": "192.168.1.1"},
        {"X-Client-IP": "172.16.0.1"},
        {"True-Client-IP": "8.8.8.8"}
    ]
 
    for bypass_header in bypass_headers:
        headers = {**self._get_auth_headers(), **bypass_header}
 
        # Send requests quickly
        success_count = 0
        for _ in range(20):
            async with self.session.request(
                method,
                f"{self.base_url}{path}",
                headers=headers
            ) as response:
                if response.status in [200, 201, 204]:
                    success_count += 1
 
        if success_count >= 20:
            return SecurityFinding(
                finding_id=f"RATE-BYPASS-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
                severity=Severity.HIGH,
                endpoint=path,
                method=method,
                description=f"Rate limiting bypassed via {list(bypass_header.keys())[0]}",
                evidence={
                    "bypass_header": bypass_header,
                    "successful_requests": success_count
                },
                remediation="Implement rate limiting based on authenticated user ID, not client-provided headers"
            )
 
    return None

CI/CD Integration

GitHub Actions Security Pipeline

# .github/workflows/api-security-tests.yml
name: API Security Tests
 
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 2 AM
 
jobs:
  api-security-scan:
    runs-on: ubuntu-latest
 
    services:
      app:
        image: ${{ github.repository }}:${{ github.sha }}
        ports:
          - 3000:3000
        env:
          NODE_ENV: test
 
    steps:
      - uses: actions/checkout@v4
 
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
 
      - name: Install dependencies
        run: |
          pip install aiohttp pytest pytest-asyncio
 
      - name: Wait for API
        run: |
          timeout 60 bash -c 'until curl -s http://localhost:3000/health; do sleep 2; done'
 
      - name: Run API Security Tests
        run: |
          python -m pytest tests/security/ \
            --api-url=http://localhost:3000 \
            --junit-xml=security-results.xml \
            --html=security-report.html
 
      - name: Run OWASP ZAP Scan
        uses: zaproxy/action-full-scan@v0.9.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a'
 
      - name: Upload Security Report
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: security-reports
          path: |
            security-results.xml
            security-report.html
            zap-report.html
 
      - name: Check for Critical Findings
        run: |
          python scripts/check_security_findings.py security-results.xml

Security Check Script

# scripts/check_security_findings.py
import xml.etree.ElementTree as ET
import sys
 
def check_findings(report_path: str) -> int:
    """Check security findings and fail on critical issues"""
 
    tree = ET.parse(report_path)
    root = tree.getroot()
 
    critical_count = 0
    high_count = 0
 
    for testcase in root.findall('.//testcase'):
        failure = testcase.find('failure')
        if failure is not None:
            message = failure.get('message', '')
            if 'CRITICAL' in message:
                critical_count += 1
                print(f"❌ CRITICAL: {testcase.get('name')}")
            elif 'HIGH' in message:
                high_count += 1
                print(f"⚠️  HIGH: {testcase.get('name')}")
 
    print(f"\nSummary: {critical_count} critical, {high_count} high severity findings")
 
    # Fail pipeline on critical findings
    if critical_count > 0:
        print("\n❌ Pipeline failed due to critical security findings")
        return 1
 
    # Warn but don't fail on high findings
    if high_count > 0:
        print("\n⚠️  Warning: High severity findings detected")
 
    return 0
 
if __name__ == "__main__":
    sys.exit(check_findings(sys.argv[1]))

Summary

Comprehensive API security testing should cover:

  1. BOLA/IDOR: Test horizontal and vertical authorization
  2. Authentication: JWT validation, token expiry, algorithm attacks
  3. Injection: SQL, NoSQL, command injection
  4. Rate Limiting: Bypass techniques and resource exhaustion
  5. SSRF: Internal network access attempts
  6. Mass Assignment: Property-level authorization

Integrate these tests into CI/CD to catch vulnerabilities before deployment. Regular testing against the OWASP API Security Top 10 provides comprehensive coverage of common API vulnerabilities.

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.