API security testing must be automated and integrated into CI/CD pipelines to catch vulnerabilities before production. This guide covers comprehensive testing for OWASP API Top 10 vulnerabilities with practical automation frameworks.
API Security Testing Framework
Core Testing Architecture
# api_security_tester.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import aiohttp
import asyncio
import json
from datetime import datetime
import uuid
class VulnerabilityType(Enum):
BOLA = "API1:2023 Broken Object Level Authorization"
BROKEN_AUTH = "API2:2023 Broken Authentication"
BOPLA = "API3:2023 Broken Object Property Level Authorization"
UNRESTRICTED_RESOURCE = "API4:2023 Unrestricted Resource Consumption"
BFLA = "API5:2023 Broken Function Level Authorization"
SSRF = "API6:2023 Server Side Request Forgery"
SECURITY_MISCONFIG = "API7:2023 Security Misconfiguration"
LACK_OF_PROTECTION = "API8:2023 Lack of Protection from Automated Threats"
IMPROPER_INVENTORY = "API9:2023 Improper Inventory Management"
UNSAFE_CONSUMPTION = "API10:2023 Unsafe Consumption of APIs"
class Severity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class SecurityFinding:
finding_id: str
vulnerability_type: VulnerabilityType
severity: Severity
endpoint: str
method: str
description: str
evidence: Dict
remediation: str
cvss_score: Optional[float] = None
@dataclass
class TestCase:
test_id: str
name: str
vulnerability_type: VulnerabilityType
test_function: Callable
severity: Severity
description: str
@dataclass
class APISecurityReport:
report_id: str
target_api: str
scan_start: datetime
scan_end: datetime
findings: List[SecurityFinding]
endpoints_tested: int
tests_executed: int
risk_score: float
class APISecurityTester:
"""Comprehensive API security testing framework"""
def __init__(self, base_url: str, auth_config: Dict = None):
self.base_url = base_url.rstrip('/')
self.auth_config = auth_config or {}
self.findings: List[SecurityFinding] = []
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, *args):
await self.session.close()
async def run_full_scan(self, endpoints: List[Dict]) -> APISecurityReport:
"""Run complete security scan"""
scan_start = datetime.utcnow()
tests_executed = 0
for endpoint in endpoints:
# Run all applicable tests
tests = self._get_tests_for_endpoint(endpoint)
for test in tests:
finding = await test.test_function(self, endpoint)
if finding:
self.findings.append(finding)
tests_executed += 1
scan_end = datetime.utcnow()
return APISecurityReport(
report_id=str(uuid.uuid4()),
target_api=self.base_url,
scan_start=scan_start,
scan_end=scan_end,
findings=self.findings,
endpoints_tested=len(endpoints),
tests_executed=tests_executed,
risk_score=self._calculate_risk_score()
)
def _get_tests_for_endpoint(self, endpoint: Dict) -> List[TestCase]:
"""Get applicable tests for endpoint"""
tests = [
TestCase("BOLA-01", "IDOR Test", VulnerabilityType.BOLA,
self.test_bola, Severity.HIGH, "Test for broken object level authorization"),
TestCase("AUTH-01", "Auth Bypass", VulnerabilityType.BROKEN_AUTH,
self.test_auth_bypass, Severity.CRITICAL, "Test authentication bypass"),
TestCase("RATE-01", "Rate Limiting", VulnerabilityType.UNRESTRICTED_RESOURCE,
self.test_rate_limiting, Severity.MEDIUM, "Test rate limiting"),
TestCase("INJECT-01", "SQL Injection", VulnerabilityType.SECURITY_MISCONFIG,
self.test_sql_injection, Severity.CRITICAL, "Test SQL injection"),
]
return tests
def _calculate_risk_score(self) -> float:
"""Calculate overall risk score"""
if not self.findings:
return 0.0
severity_weights = {
Severity.CRITICAL: 10,
Severity.HIGH: 7,
Severity.MEDIUM: 4,
Severity.LOW: 1,
Severity.INFO: 0
}
total = sum(severity_weights[f.severity] for f in self.findings)
max_possible = len(self.findings) * 10
return min(100, (total / max_possible) * 100) if max_possible > 0 else 0BOLA (Broken Object Level Authorization) Testing
IDOR Detection
# bola_tests.py
async def test_bola(self, endpoint: Dict) -> Optional[SecurityFinding]:
"""Test for Broken Object Level Authorization (BOLA/IDOR)"""
# Extract ID parameter patterns
path = endpoint.get('path', '')
method = endpoint.get('method', 'GET')
# Skip if no ID in path
if not any(p in path for p in ['{id}', '{userId}', '{orderId}', '<id>']):
return None
# Get valid IDs for authenticated user
valid_ids = await self._get_user_resources(endpoint)
if not valid_ids:
return None
# Test accessing other users' resources
test_cases = [
# Increment ID
str(int(valid_ids[0]) + 1) if valid_ids[0].isdigit() else None,
# Common test IDs
"1", "0", "999999",
# UUID manipulation
valid_ids[0].replace(valid_ids[0][-1], 'a') if len(valid_ids[0]) > 10 else None,
]
for test_id in filter(None, test_cases):
test_path = self._replace_id(path, test_id)
url = f"{self.base_url}{test_path}"
try:
async with self.session.request(
method,
url,
headers=self._get_auth_headers()
) as response:
if response.status == 200:
body = await response.json()
# Check if we accessed another user's data
if self._is_unauthorized_access(body, test_id):
return SecurityFinding(
finding_id=f"BOLA-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.BOLA,
severity=Severity.HIGH,
endpoint=path,
method=method,
description=f"IDOR vulnerability: Able to access resource {test_id}",
evidence={
"tested_id": test_id,
"response_status": response.status,
"response_preview": str(body)[:200]
},
remediation="Implement proper authorization checks verifying resource ownership"
)
except Exception as e:
continue
return None
async def _get_user_resources(self, endpoint: Dict) -> List[str]:
"""Get valid resource IDs for current user"""
# Implementation depends on API structure
list_endpoint = endpoint.get('list_endpoint')
if not list_endpoint:
return []
async with self.session.get(
f"{self.base_url}{list_endpoint}",
headers=self._get_auth_headers()
) as response:
if response.status == 200:
data = await response.json()
return [str(item.get('id')) for item in data.get('items', data)]
return []
def _replace_id(self, path: str, new_id: str) -> str:
"""Replace ID placeholder in path"""
import re
patterns = [r'\{[^}]*[iI]d[^}]*\}', r'<[^>]*[iI]d[^>]*>', r'/\d+(/|$)']
for pattern in patterns:
path = re.sub(pattern, f'/{new_id}/', path)
return path.replace('//', '/')
def _is_unauthorized_access(self, body: Dict, test_id: str) -> bool:
"""Check if response indicates unauthorized access to another user's data"""
# Check if response contains the test ID (indicating we got someone else's data)
body_str = json.dumps(body)
return test_id in body_strAuthentication Testing
Auth Bypass Detection
# auth_tests.py
async def test_auth_bypass(self, endpoint: Dict) -> Optional[SecurityFinding]:
"""Test for authentication bypass vulnerabilities"""
path = endpoint.get('path', '')
method = endpoint.get('method', 'GET')
# Skip public endpoints
if endpoint.get('public', False):
return None
findings = []
# Test 1: No authentication
async with self.session.request(
method,
f"{self.base_url}{path}",
headers={"Content-Type": "application/json"}
) as response:
if response.status == 200:
findings.append({
"test": "no_auth",
"description": "Endpoint accessible without authentication",
"status": response.status
})
# Test 2: Invalid token
async with self.session.request(
method,
f"{self.base_url}{path}",
headers={
"Authorization": "Bearer invalid_token_12345",
"Content-Type": "application/json"
}
) as response:
if response.status == 200:
findings.append({
"test": "invalid_token",
"description": "Endpoint accepts invalid token",
"status": response.status
})
# Test 3: Expired token format
expired_token = self._generate_expired_jwt()
async with self.session.request(
method,
f"{self.base_url}{path}",
headers={
"Authorization": f"Bearer {expired_token}",
"Content-Type": "application/json"
}
) as response:
if response.status == 200:
findings.append({
"test": "expired_token",
"description": "Endpoint accepts expired token",
"status": response.status
})
# Test 4: JWT none algorithm
none_alg_token = self._generate_none_alg_jwt()
async with self.session.request(
method,
f"{self.base_url}{path}",
headers={
"Authorization": f"Bearer {none_alg_token}",
"Content-Type": "application/json"
}
) as response:
if response.status == 200:
findings.append({
"test": "jwt_none_alg",
"description": "Endpoint accepts JWT with 'none' algorithm",
"status": response.status
})
if findings:
return SecurityFinding(
finding_id=f"AUTH-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.BROKEN_AUTH,
severity=Severity.CRITICAL,
endpoint=path,
method=method,
description="Authentication bypass vulnerability detected",
evidence={"tests_failed": findings},
remediation="Implement proper JWT validation including algorithm verification and expiry checks"
)
return None
def _generate_expired_jwt(self) -> str:
"""Generate expired JWT for testing"""
import base64
import json
header = {"alg": "HS256", "typ": "JWT"}
payload = {
"sub": "test_user",
"exp": 1000000000, # Expired
"iat": 1000000000
}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
return f"{header_b64}.{payload_b64}.fake_signature"
def _generate_none_alg_jwt(self) -> str:
"""Generate JWT with 'none' algorithm"""
import base64
import json
header = {"alg": "none", "typ": "JWT"}
payload = {
"sub": "admin",
"role": "admin",
"exp": 9999999999
}
header_b64 = base64.urlsafe_b64encode(json.dumps(header).encode()).decode().rstrip('=')
payload_b64 = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
return f"{header_b64}.{payload_b64}."Injection Testing
SQL Injection Detection
# injection_tests.py
async def test_sql_injection(self, endpoint: Dict) -> Optional[SecurityFinding]:
"""Test for SQL injection vulnerabilities"""
path = endpoint.get('path', '')
method = endpoint.get('method', 'GET')
params = endpoint.get('parameters', [])
sql_payloads = [
"' OR '1'='1",
"' OR '1'='1' --",
"'; DROP TABLE users; --",
"1' AND SLEEP(5) --",
"1 UNION SELECT NULL,NULL,NULL --",
"' UNION SELECT username,password FROM users --",
"1'; WAITFOR DELAY '0:0:5' --",
"1 AND 1=1",
"1 AND 1=2",
]
for param in params:
for payload in sql_payloads:
start_time = datetime.utcnow()
if method == 'GET':
test_url = f"{self.base_url}{path}?{param['name']}={payload}"
async with self.session.get(
test_url,
headers=self._get_auth_headers()
) as response:
elapsed = (datetime.utcnow() - start_time).total_seconds()
body = await response.text()
finding = self._analyze_sqli_response(
response.status, body, elapsed, payload, path, method, param['name']
)
if finding:
return finding
elif method == 'POST':
test_body = {param['name']: payload}
async with self.session.post(
f"{self.base_url}{path}",
headers=self._get_auth_headers(),
json=test_body
) as response:
elapsed = (datetime.utcnow() - start_time).total_seconds()
body = await response.text()
finding = self._analyze_sqli_response(
response.status, body, elapsed, payload, path, method, param['name']
)
if finding:
return finding
return None
def _analyze_sqli_response(
self,
status: int,
body: str,
elapsed: float,
payload: str,
path: str,
method: str,
param: str
) -> Optional[SecurityFinding]:
"""Analyze response for SQL injection indicators"""
# Error-based detection
sql_errors = [
"sql syntax", "mysql", "sqlite", "postgresql", "oracle",
"mssql", "syntax error", "unclosed quotation",
"quoted string not properly terminated"
]
body_lower = body.lower()
for error in sql_errors:
if error in body_lower:
return SecurityFinding(
finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
severity=Severity.CRITICAL,
endpoint=path,
method=method,
description=f"SQL injection via {param} parameter (error-based)",
evidence={
"payload": payload,
"error_indicator": error,
"response_preview": body[:200]
},
remediation="Use parameterized queries/prepared statements",
cvss_score=9.8
)
# Time-based detection
if "SLEEP" in payload or "WAITFOR" in payload:
if elapsed > 4.5: # Expected delay is 5 seconds
return SecurityFinding(
finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
severity=Severity.CRITICAL,
endpoint=path,
method=method,
description=f"SQL injection via {param} parameter (time-based)",
evidence={
"payload": payload,
"response_time": elapsed
},
remediation="Use parameterized queries/prepared statements",
cvss_score=9.8
)
# Boolean-based detection handled separately
return NoneRate Limiting Testing
Rate Limit Bypass
# rate_limit_tests.py
async def test_rate_limiting(self, endpoint: Dict) -> Optional[SecurityFinding]:
"""Test for missing or bypassable rate limiting"""
path = endpoint.get('path', '')
method = endpoint.get('method', 'GET')
# Test 1: Rapid requests
request_count = 100
successful_requests = 0
rate_limited = False
for i in range(request_count):
async with self.session.request(
method,
f"{self.base_url}{path}",
headers=self._get_auth_headers()
) as response:
if response.status == 429:
rate_limited = True
break
elif response.status in [200, 201, 204]:
successful_requests += 1
if not rate_limited and successful_requests >= request_count:
return SecurityFinding(
finding_id=f"RATE-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
severity=Severity.MEDIUM,
endpoint=path,
method=method,
description="No rate limiting detected on endpoint",
evidence={
"requests_sent": request_count,
"successful_requests": successful_requests
},
remediation="Implement rate limiting (e.g., 100 requests/minute per IP/user)"
)
# Test 2: Rate limit bypass via headers
bypass_headers = [
{"X-Forwarded-For": "127.0.0.1"},
{"X-Real-IP": "10.0.0.1"},
{"X-Originating-IP": "192.168.1.1"},
{"X-Client-IP": "172.16.0.1"},
{"True-Client-IP": "8.8.8.8"}
]
for bypass_header in bypass_headers:
headers = {**self._get_auth_headers(), **bypass_header}
# Send requests quickly
success_count = 0
for _ in range(20):
async with self.session.request(
method,
f"{self.base_url}{path}",
headers=headers
) as response:
if response.status in [200, 201, 204]:
success_count += 1
if success_count >= 20:
return SecurityFinding(
finding_id=f"RATE-BYPASS-{uuid.uuid4().hex[:8]}",
vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
severity=Severity.HIGH,
endpoint=path,
method=method,
description=f"Rate limiting bypassed via {list(bypass_header.keys())[0]}",
evidence={
"bypass_header": bypass_header,
"successful_requests": success_count
},
remediation="Implement rate limiting based on authenticated user ID, not client-provided headers"
)
return NoneCI/CD Integration
GitHub Actions Security Pipeline
# .github/workflows/api-security-tests.yml
name: API Security Tests
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
schedule:
- cron: '0 2 * * *' # Daily at 2 AM
jobs:
api-security-scan:
runs-on: ubuntu-latest
services:
app:
image: ${{ github.repository }}:${{ github.sha }}
ports:
- 3000:3000
env:
NODE_ENV: test
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install aiohttp pytest pytest-asyncio
- name: Wait for API
run: |
timeout 60 bash -c 'until curl -s http://localhost:3000/health; do sleep 2; done'
- name: Run API Security Tests
run: |
python -m pytest tests/security/ \
--api-url=http://localhost:3000 \
--junit-xml=security-results.xml \
--html=security-report.html
- name: Run OWASP ZAP Scan
uses: zaproxy/action-full-scan@v0.9.0
with:
target: 'http://localhost:3000'
rules_file_name: '.zap/rules.tsv'
cmd_options: '-a'
- name: Upload Security Report
uses: actions/upload-artifact@v4
if: always()
with:
name: security-reports
path: |
security-results.xml
security-report.html
zap-report.html
- name: Check for Critical Findings
run: |
python scripts/check_security_findings.py security-results.xmlSecurity Check Script
# scripts/check_security_findings.py
import xml.etree.ElementTree as ET
import sys
def check_findings(report_path: str) -> int:
"""Check security findings and fail on critical issues"""
tree = ET.parse(report_path)
root = tree.getroot()
critical_count = 0
high_count = 0
for testcase in root.findall('.//testcase'):
failure = testcase.find('failure')
if failure is not None:
message = failure.get('message', '')
if 'CRITICAL' in message:
critical_count += 1
print(f"❌ CRITICAL: {testcase.get('name')}")
elif 'HIGH' in message:
high_count += 1
print(f"⚠️ HIGH: {testcase.get('name')}")
print(f"\nSummary: {critical_count} critical, {high_count} high severity findings")
# Fail pipeline on critical findings
if critical_count > 0:
print("\n❌ Pipeline failed due to critical security findings")
return 1
# Warn but don't fail on high findings
if high_count > 0:
print("\n⚠️ Warning: High severity findings detected")
return 0
if __name__ == "__main__":
sys.exit(check_findings(sys.argv[1]))Summary
Comprehensive API security testing should cover:
- BOLA/IDOR: Test horizontal and vertical authorization
- Authentication: JWT validation, token expiry, algorithm attacks
- Injection: SQL, NoSQL, command injection
- Rate Limiting: Bypass techniques and resource exhaustion
- SSRF: Internal network access attempts
- Mass Assignment: Property-level authorization
Integrate these tests into CI/CD to catch vulnerabilities before deployment. Regular testing against the OWASP API Security Top 10 provides comprehensive coverage of common API vulnerabilities.