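API security testing must be automated and integrated into CI/CD pipelines to catch vulnerabilities before they reach production. This guide shows how to automate tests for several OWASP API Security Top 10 vulnerabilities (broken object level authorization, broken authentication, SQL injection, and missing rate limiting) with a small Python framework, and how to run them in a CI/CD pipeline.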
API Security Testing Framework
Core Testing Architecture
# api_security_tester.py
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from enum import Enum
import aiohttp
import asyncio
import json
from datetime import datetime
import uuid
class VulnerabilityType(Enum):
    """OWASP API Security Top 10 (2023) categories used to classify findings.

    Each value is the category identifier followed by its title, following
    the numbering of the published 2023 list.
    """

    BOLA = "API1:2023 Broken Object Level Authorization"
    BROKEN_AUTH = "API2:2023 Broken Authentication"
    BOPLA = "API3:2023 Broken Object Property Level Authorization"
    UNRESTRICTED_RESOURCE = "API4:2023 Unrestricted Resource Consumption"
    BFLA = "API5:2023 Broken Function Level Authorization"
    # Member name kept for backward compatibility; the published category
    # covering automated abuse of business flows is API6.
    LACK_OF_PROTECTION = "API6:2023 Unrestricted Access to Sensitive Business Flows"
    SSRF = "API7:2023 Server Side Request Forgery"
    SECURITY_MISCONFIG = "API8:2023 Security Misconfiguration"
    IMPROPER_INVENTORY = "API9:2023 Improper Inventory Management"
    UNSAFE_CONSUMPTION = "API10:2023 Unsafe Consumption of APIs"
class Severity(Enum):
    """Severity levels attached to findings and test cases.

    Members are declared from most to least severe; values are the lowercase
    level names.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class SecurityFinding:
    """A single vulnerability reported by one of the security checks."""

    # Identifier of this finding; the checks build it from a short prefix
    # plus a random hex suffix.
    finding_id: str
    # OWASP category the finding is filed under.
    vulnerability_type: VulnerabilityType
    severity: Severity
    # Endpoint path and HTTP method that were probed.
    endpoint: str
    method: str
    # Human-readable summary of what was detected.
    description: str
    # Free-form supporting data (payloads, status codes, response previews).
    evidence: Dict
    # Suggested fix for the API owner.
    remediation: str
    # Optional CVSS score; left as None when a check does not provide one.
    cvss_score: Optional[float] = None
@dataclass
class TestCase:
    """Describes one security check that can be run against an endpoint."""

    # The class name matches pytest's default ``Test*`` collection pattern;
    # mark it explicitly as "not a test" so importing it into a pytest module
    # does not produce a collection warning. Being unannotated, this is a
    # plain class attribute and not a dataclass field.
    __test__ = False

    # Short identifier such as "BOLA-01".
    test_id: str
    # Display name of the check.
    name: str
    # OWASP category the check targets.
    vulnerability_type: VulnerabilityType
    # Callable that performs the check.
    test_function: Callable
    # Severity associated with the check.
    severity: Severity
    description: str
@dataclass
class APISecurityReport:
    """Summary of one complete scan of a target API."""

    # Unique identifier of the report.
    report_id: str
    # Base URL that was scanned.
    target_api: str
    # Timestamps taken immediately before and after the scan loop.
    scan_start: datetime
    scan_end: datetime
    # Every finding collected by the tester.
    findings: List[SecurityFinding]
    # Number of endpoints passed to the scan.
    endpoints_tested: int
    # Number of individual checks that were run.
    tests_executed: int
    # Aggregate score in the range 0-100 derived from finding severities.
    risk_score: float
class APISecurityTester:
    """Run the registered API security checks against a single base URL.

    Intended to be used as an async context manager so the aiohttp session is
    opened before scanning and closed afterwards. The individual checks
    (``test_bola``, ``test_auth_bypass``, ``test_rate_limiting`` and
    ``test_sql_injection``) are looked up on the instance, so they must be
    available as methods of this class.
    """

    def __init__(self, base_url: str, auth_config: Optional[Dict] = None):
        """Remember the target and start with no findings and no session.

        Args:
            base_url: Root URL of the API; a trailing slash is removed so
                endpoint paths can be appended directly.
            auth_config: Optional authentication settings; stored as an
                empty dict when omitted.
        """
        self.base_url = base_url.rstrip('/')
        self.auth_config = auth_config or {}
        self.findings: List[SecurityFinding] = []
        self.session = None

    async def __aenter__(self):
        """Open the HTTP session used by all checks."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        """Close the HTTP session if one was opened."""
        # Guard against being called without a successful __aenter__.
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def run_full_scan(self, endpoints: List[Dict]) -> APISecurityReport:
        """Run every applicable check against every endpoint.

        Args:
            endpoints: Endpoint descriptions; each is passed unchanged to the
                checks.

        Returns:
            A report containing all findings collected on this instance, the
            number of endpoints and checks processed, and the risk score.
        """
        scan_start = datetime.utcnow()
        tests_executed = 0

        for endpoint in endpoints:
            for test in self._get_tests_for_endpoint(endpoint):
                # test_function is a method already bound to this instance,
                # so only the endpoint is passed; passing self again would
                # raise TypeError.
                finding = await test.test_function(endpoint)
                if finding:
                    self.findings.append(finding)
                tests_executed += 1

        scan_end = datetime.utcnow()

        return APISecurityReport(
            report_id=str(uuid.uuid4()),
            target_api=self.base_url,
            scan_start=scan_start,
            scan_end=scan_end,
            findings=self.findings,
            endpoints_tested=len(endpoints),
            tests_executed=tests_executed,
            risk_score=self._calculate_risk_score(),
        )

    def _get_tests_for_endpoint(self, endpoint: Dict) -> List[TestCase]:
        """Return the checks to run for ``endpoint``.

        Every endpoint currently receives the same four checks; each check
        decides for itself whether the endpoint is relevant.
        """
        return [
            TestCase("BOLA-01", "IDOR Test", VulnerabilityType.BOLA,
                     self.test_bola, Severity.HIGH,
                     "Test for broken object level authorization"),
            TestCase("AUTH-01", "Auth Bypass", VulnerabilityType.BROKEN_AUTH,
                     self.test_auth_bypass, Severity.CRITICAL,
                     "Test authentication bypass"),
            TestCase("RATE-01", "Rate Limiting", VulnerabilityType.UNRESTRICTED_RESOURCE,
                     self.test_rate_limiting, Severity.MEDIUM,
                     "Test rate limiting"),
            TestCase("INJECT-01", "SQL Injection", VulnerabilityType.SECURITY_MISCONFIG,
                     self.test_sql_injection, Severity.CRITICAL,
                     "Test SQL injection"),
        ]

    def _calculate_risk_score(self) -> float:
        """Return the average finding severity scaled to 0-100.

        Each finding contributes a weight between 0 (info) and 10 (critical);
        the score is the sum of weights as a percentage of the maximum the
        same number of findings could reach. No findings yields 0.0.
        """
        if not self.findings:
            return 0.0

        severity_weights = {
            Severity.CRITICAL: 10,
            Severity.HIGH: 7,
            Severity.MEDIUM: 4,
            Severity.LOW: 1,
            Severity.INFO: 0,
        }
        total = sum(severity_weights[f.severity] for f in self.findings)
        # Non-zero because the empty case returned above.
        max_possible = len(self.findings) * 10
        return min(100.0, (total / max_possible) * 100)


# BOLA (Broken Object Level Authorization) Testing
IDOR Detection
# bola_tests.py
async def test_bola(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Probe an endpoint for Broken Object Level Authorization (BOLA/IDOR).

    Starting from the IDs the authenticated user legitimately owns, the check
    derives IDs that should belong to somebody else, requests them with the
    user's credentials and reports the first one that is served.

    Args:
        endpoint: Endpoint description; ``path`` and ``method`` are read here
            and the whole dict is forwarded to ``_get_user_resources``.

    Returns:
        A HIGH severity finding for the first foreign resource that could be
        read, or None when the path has no ID placeholder, the user owns no
        resources, or no probe succeeded.
    """
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')

    # Skip if no ID in path
    if not any(p in path for p in ['{id}', '{userId}', '{orderId}', '<id>']):
        return None

    # Get valid IDs for authenticated user
    valid_ids = await self._get_user_resources(endpoint)
    if not valid_ids:
        return None

    first_id = valid_ids[0]
    candidates = []

    # Increment ID (isdecimal guarantees int() accepts the string)
    if first_id.isdecimal():
        candidates.append(str(int(first_id) + 1))

    # Common test IDs
    candidates.extend(["1", "0", "999999"])

    # UUID manipulation: change only the final character, and make sure the
    # result really differs from the original ID.
    if len(first_id) > 10:
        swapped = 'b' if first_id[-1] == 'a' else 'a'
        candidates.append(first_id[:-1] + swapped)

    # Drop duplicates and any ID the user legitimately owns; reading one's own
    # resource is not an authorization failure.
    owned = set(valid_ids)
    test_ids = [c for c in dict.fromkeys(candidates) if c and c not in owned]

    for test_id in test_ids:
        test_path = self._replace_id(path, test_id)
        url = f"{self.base_url}{test_path}"

        try:
            async with self.session.request(
                method,
                url,
                headers=self._get_auth_headers()
            ) as response:
                if response.status != 200:
                    continue
                body = await response.json()

                # Check if we accessed another user's data
                if self._is_unauthorized_access(body, test_id):
                    return SecurityFinding(
                        finding_id=f"BOLA-{uuid.uuid4().hex[:8]}",
                        vulnerability_type=VulnerabilityType.BOLA,
                        severity=Severity.HIGH,
                        endpoint=path,
                        method=method,
                        description=f"IDOR vulnerability: Able to access resource {test_id}",
                        evidence={
                            "tested_id": test_id,
                            "response_status": response.status,
                            "response_preview": str(body)[:200]
                        },
                        remediation="Implement proper authorization checks verifying resource ownership"
                    )
        except Exception:
            # Best effort: a failed probe (network error, non-JSON body, ...)
            # must not abort the remaining candidates.
            continue

    return None


# This is a scanner probe, not a pytest test; keep pytest from collecting it
# when the function is imported into a test module.
test_bola.__test__ = False
async def _get_user_resources(self, endpoint: Dict) -> List[str]:
    """Return the IDs of resources the authenticated user can list.

    The endpoint's ``list_endpoint`` is fetched with the user's credentials.
    The response may be either a JSON array of objects or a JSON object whose
    ``items`` key holds that array.

    Args:
        endpoint: Endpoint description; only ``list_endpoint`` is read.

    Returns:
        The ``id`` of every listed item as a string. Empty when no list
        endpoint is configured, the request does not return 200, or the body
        has neither of the supported shapes.
    """
    list_endpoint = endpoint.get('list_endpoint')
    if not list_endpoint:
        return []

    async with self.session.get(
        f"{self.base_url}{list_endpoint}",
        headers=self._get_auth_headers()
    ) as response:
        if response.status != 200:
            return []
        data = await response.json()

    # A bare array is used as-is; an object must carry the array in "items".
    items = data.get('items', []) if isinstance(data, dict) else data
    if not isinstance(items, list):
        return []

    # Skip entries without an id instead of emitting the string "None".
    return [
        str(item['id'])
        for item in items
        if isinstance(item, dict) and item.get('id') is not None
    ]
def _replace_id(self, path: str, new_id: str) -> str:
    """Return ``path`` with every ID segment replaced by ``new_id``.

    Three kinds of segment are rewritten: ``{...id...}`` placeholders,
    ``<...id...>`` placeholders, and purely numeric path segments. Only the
    segment itself is replaced, so no trailing slash is added to the path.

    Args:
        path: Endpoint path template or concrete path.
        new_id: ID to insert; used literally.

    Returns:
        The rewritten path with any doubled slashes collapsed.
    """
    import re

    patterns = [
        r'\{[^}]*[iI]d[^}]*\}',
        r'<[^>]*[iI]d[^>]*>',
        # Numeric segment; look-arounds keep the surrounding slashes intact.
        r'(?<=/)\d+(?=/|$)',
    ]
    for pattern in patterns:
        # A callable replacement inserts new_id verbatim, so backslashes in
        # the ID are not treated as group references.
        path = re.sub(pattern, lambda _match: new_id, path)
    return path.replace('//', '/')
def _is_unauthorized_access(self, body: Dict, test_id: str) -> bool:
    """Tell whether a response body contains the probed resource ID.

    The decoded JSON is searched for a string or number whose text equals
    ``test_id`` exactly. A substring search over the serialized body is not
    used because short IDs such as "1" or "0" occur inside almost any
    response.

    Args:
        body: Decoded JSON response (object or array, arbitrarily nested).
        test_id: The ID that was requested.

    Returns:
        True when some value in the body is exactly ``test_id``.
    """
    pending = [body]
    while pending:
        node = pending.pop()
        if isinstance(node, dict):
            pending.extend(node.values())
        elif isinstance(node, (list, tuple)):
            pending.extend(node)
        elif isinstance(node, bool) or node is None:
            # Booleans and nulls are never identifiers.
            continue
        elif str(node) == test_id:
            return True
    return False


# Authentication Testing
Auth Bypass Detection
# auth_tests.py
async def test_auth_bypass(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Check whether a protected endpoint can be called without valid auth.

    Four requests are sent: one without credentials, one with a junk bearer
    token, one with an expired JWT and one with an unsigned ``alg: none``
    JWT. Any request answered with a 2xx status counts as a bypass.

    Args:
        endpoint: Endpoint description; ``path``, ``method`` and ``public``
            are read. Endpoints flagged ``public`` are skipped.

    Returns:
        A CRITICAL finding listing every probe that was accepted, or None
        when the endpoint is public or rejected all probes.
    """
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')

    # Skip public endpoints
    if endpoint.get('public', False):
        return None

    url = f"{self.base_url}{path}"

    # (test name, description, bearer token or None for "no credentials")
    probes = [
        ("no_auth", "Endpoint accessible without authentication", None),
        ("invalid_token", "Endpoint accepts invalid token", "invalid_token_12345"),
        ("expired_token", "Endpoint accepts expired token", self._generate_expired_jwt()),
        ("jwt_none_alg", "Endpoint accepts JWT with 'none' algorithm",
         self._generate_none_alg_jwt()),
    ]

    findings = []
    for test_name, description, token in probes:
        headers = {}
        if token is not None:
            headers["Authorization"] = f"Bearer {token}"
        headers["Content-Type"] = "application/json"

        async with self.session.request(method, url, headers=headers) as response:
            # Any 2xx means the request was served (e.g. 201 for a POST or
            # 204 for a DELETE), not just 200.
            if 200 <= response.status < 300:
                findings.append({
                    "test": test_name,
                    "description": description,
                    "status": response.status
                })

    if findings:
        return SecurityFinding(
            finding_id=f"AUTH-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.BROKEN_AUTH,
            severity=Severity.CRITICAL,
            endpoint=path,
            method=method,
            description="Authentication bypass vulnerability detected",
            evidence={"tests_failed": findings},
            remediation="Implement proper JWT validation including algorithm verification and expiry checks"
        )

    return None


# This is a scanner probe, not a pytest test; keep pytest from collecting it
# when the function is imported into a test module.
test_auth_bypass.__test__ = False
def _generate_expired_jwt(self) -> str:
    """Build a JWT-shaped token whose ``exp`` claim lies far in the past.

    The token is not signed with any key; the signature part is a fixed
    placeholder. A server that accepts it is therefore skipping both the
    expiry check and signature verification.

    Returns:
        ``<header>.<payload>.fake_signature`` with unpadded base64url parts.
    """
    import base64
    import json

    def encode_segment(segment: dict) -> str:
        # JWT segments are base64url without the trailing '=' padding.
        raw = json.dumps(segment).encode()
        return base64.urlsafe_b64encode(raw).decode().rstrip('=')

    header = {"alg": "HS256", "typ": "JWT"}
    payload = {
        "sub": "test_user",
        "exp": 1000000000,  # Expired
        "iat": 1000000000
    }
    return f"{encode_segment(header)}.{encode_segment(payload)}.fake_signature"
def _generate_none_alg_jwt(self) -> str:
    """Build an unsigned JWT that declares ``alg: none`` and an admin role.

    The signature part is empty, so the token ends with a dot. A server that
    accepts it honours the ``none`` algorithm.

    Returns:
        ``<header>.<payload>.`` with unpadded base64url parts.
    """
    import base64
    import json

    def encode_segment(segment: dict) -> str:
        # JWT segments are base64url without the trailing '=' padding.
        raw = json.dumps(segment).encode()
        return base64.urlsafe_b64encode(raw).decode().rstrip('=')

    header = {"alg": "none", "typ": "JWT"}
    payload = {
        "sub": "admin",
        "role": "admin",
        "exp": 9999999999
    }
    return f"{encode_segment(header)}.{encode_segment(payload)}."


# Injection Testing
SQL Injection Detection
# injection_tests.py
async def test_sql_injection(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Send SQL injection payloads through each declared parameter.

    For GET endpoints the payload is sent as a query parameter, for POST
    endpoints as a one-field JSON body. Each response is handed to
    ``_analyze_sqli_response`` together with the time it took to arrive.
    Endpoints using any other method are not probed.

    Args:
        endpoint: Endpoint description; ``path``, ``method`` and
            ``parameters`` (a list of dicts with a ``name`` key) are read.

    Returns:
        The first finding produced by the analysis, or None.
    """
    import time

    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
    params = endpoint.get('parameters', [])

    if method not in ('GET', 'POST'):
        return None

    sql_payloads = [
        "' OR '1'='1",
        "' OR '1'='1' --",
        "'; DROP TABLE users; --",
        "1' AND SLEEP(5) --",
        "1 UNION SELECT NULL,NULL,NULL --",
        "' UNION SELECT username,password FROM users --",
        "1'; WAITFOR DELAY '0:0:5' --",
        "1 AND 1=1",
        "1 AND 1=2",
    ]

    url = f"{self.base_url}{path}"

    for param in params:
        name = param['name']
        for payload in sql_payloads:
            if method == 'GET':
                # Let the HTTP client encode the payload; quotes, spaces and
                # semicolons are not safe to paste into a URL by hand.
                request = self.session.get(
                    url,
                    params={name: payload},
                    headers=self._get_auth_headers()
                )
            else:
                request = self.session.post(
                    url,
                    headers=self._get_auth_headers(),
                    json={name: payload}
                )

            # Monotonic clock: the delay measurement must not be affected by
            # wall-clock adjustments.
            started = time.monotonic()
            async with request as response:
                elapsed = time.monotonic() - started
                status = response.status
                body = await response.text()

            finding = self._analyze_sqli_response(
                status, body, elapsed, payload, path, method, name
            )
            if finding:
                return finding

    return None


# This is a scanner probe, not a pytest test; keep pytest from collecting it
# when the function is imported into a test module.
test_sql_injection.__test__ = False
def _analyze_sqli_response(
    self,
    status: int,
    body: str,
    elapsed: float,
    payload: str,
    path: str,
    method: str,
    param: str
) -> Optional[SecurityFinding]:
    """Look for SQL injection indicators in one response.

    Two techniques are recognised. Error-based: the body contains one of a
    fixed list of database error fragments (case-insensitive). Time-based:
    the payload was a delay payload (``SLEEP``/``WAITFOR``) and the response
    took longer than 4.5 seconds. Error-based detection takes precedence.

    Args:
        status: HTTP status of the response (currently not inspected).
        body: Response body text.
        elapsed: Seconds until the response arrived.
        payload: Payload that was sent.
        path: Endpoint path, copied into the finding.
        method: HTTP method, copied into the finding.
        param: Name of the parameter that carried the payload.

    Returns:
        A CRITICAL finding when either indicator is present, otherwise None.
    """
    sql_errors = [
        "sql syntax", "mysql", "sqlite", "postgresql", "oracle",
        "mssql", "syntax error", "unclosed quotation",
        "quoted string not properly terminated"
    ]

    # Error-based detection
    body_lower = body.lower()
    matched_error = next((err for err in sql_errors if err in body_lower), None)

    if matched_error is not None:
        technique = "error-based"
        evidence = {
            "payload": payload,
            "error_indicator": matched_error,
            "response_preview": body[:200]
        }
    # Time-based detection; the delay payloads ask for 5 seconds.
    elif ("SLEEP" in payload or "WAITFOR" in payload) and elapsed > 4.5:
        technique = "time-based"
        evidence = {
            "payload": payload,
            "response_time": elapsed
        }
    else:
        # Boolean-based detection handled separately
        return None

    return SecurityFinding(
        finding_id=f"SQLI-{uuid.uuid4().hex[:8]}",
        vulnerability_type=VulnerabilityType.SECURITY_MISCONFIG,
        severity=Severity.CRITICAL,
        endpoint=path,
        method=method,
        description=f"SQL injection via {param} parameter ({technique})",
        evidence=evidence,
        remediation="Use parameterized queries/prepared statements",
        cvss_score=9.8
    )


# Rate Limiting Testing
Rate Limit Bypass
# rate_limit_tests.py
async def test_rate_limiting(self, endpoint: Dict) -> Optional[SecurityFinding]:
    """Check for missing rate limiting and for header-based bypasses.

    Phase 1 sends up to 100 sequential requests and stops at the first 429.
    If all 100 are served, a MEDIUM "no rate limiting" finding is returned.

    Phase 2 only runs when phase 1 was actually rate limited. For each
    client-IP spoofing header it sends up to 20 requests; if all 20 are
    served despite the limit just hit, a HIGH "bypass" finding is returned.

    Args:
        endpoint: Endpoint description; ``path`` and ``method`` are read.

    Returns:
        The first finding from either phase, or None.
    """
    path = endpoint.get('path', '')
    method = endpoint.get('method', 'GET')
    url = f"{self.base_url}{path}"
    success_statuses = (200, 201, 204)

    # Test 1: Rapid requests
    request_count = 100
    successful_requests = 0
    rate_limited = False

    for _ in range(request_count):
        async with self.session.request(
            method,
            url,
            headers=self._get_auth_headers()
        ) as response:
            if response.status == 429:
                rate_limited = True
                break
            elif response.status in success_statuses:
                successful_requests += 1

    if not rate_limited and successful_requests >= request_count:
        return SecurityFinding(
            finding_id=f"RATE-{uuid.uuid4().hex[:8]}",
            vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
            severity=Severity.MEDIUM,
            endpoint=path,
            method=method,
            description="No rate limiting detected on endpoint",
            evidence={
                "requests_sent": request_count,
                "successful_requests": successful_requests
            },
            remediation="Implement rate limiting (e.g., 100 requests/minute per IP/user)"
        )

    # A bypass can only be demonstrated against a limit that was observed.
    # Without a 429 in phase 1 (e.g. a few requests failed for unrelated
    # reasons) successful spoofed requests would prove nothing.
    if not rate_limited:
        return None

    # Test 2: Rate limit bypass via headers
    bypass_headers = [
        {"X-Forwarded-For": "127.0.0.1"},
        {"X-Real-IP": "10.0.0.1"},
        {"X-Originating-IP": "192.168.1.1"},
        {"X-Client-IP": "172.16.0.1"},
        {"True-Client-IP": "8.8.8.8"}
    ]
    bypass_burst = 20

    for bypass_header in bypass_headers:
        headers = {**self._get_auth_headers(), **bypass_header}

        # Send requests quickly; every one must be served to count as a
        # bypass, so stop at the first rejection.
        success_count = 0
        for _ in range(bypass_burst):
            async with self.session.request(
                method,
                url,
                headers=headers
            ) as response:
                accepted = response.status in success_statuses
            if not accepted:
                break
            success_count += 1

        if success_count >= bypass_burst:
            return SecurityFinding(
                finding_id=f"RATE-BYPASS-{uuid.uuid4().hex[:8]}",
                vulnerability_type=VulnerabilityType.UNRESTRICTED_RESOURCE,
                severity=Severity.HIGH,
                endpoint=path,
                method=method,
                description=f"Rate limiting bypassed via {list(bypass_header.keys())[0]}",
                evidence={
                    "bypass_header": bypass_header,
                    "successful_requests": success_count
                },
                remediation="Implement rate limiting based on authenticated user ID, not client-provided headers"
            )

    return None


# This is a scanner probe, not a pytest test; keep pytest from collecting it
# when the function is imported into a test module.
test_rate_limiting.__test__ = False


# CI/CD Integration
GitHub Actions Security Pipeline
# .github/workflows/api-security-tests.yml
name: API Security Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # Daily at 02:00 UTC

jobs:
  api-security-scan:
    runs-on: ubuntu-latest

    # The API under test runs as a service container built for this commit.
    services:
      app:
        image: ${{ github.repository }}:${{ github.sha }}
        ports:
          - 3000:3000
        env:
          NODE_ENV: test

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        # pytest-html provides the --html option used below.
        run: |
          pip install aiohttp pytest pytest-asyncio pytest-html

      - name: Wait for API
        # -f makes curl fail on HTTP error statuses, so the loop waits until
        # the health endpoint actually answers successfully.
        run: |
          timeout 60 bash -c 'until curl -sf http://localhost:3000/health; do sleep 2; done'

      - name: Run API Security Tests
        # Failing tests are the findings. Let the job continue so the ZAP
        # scan, the report upload and the severity gate at the end still run;
        # the final step decides whether the pipeline fails.
        continue-on-error: true
        run: |
          python -m pytest tests/security/ \
            --api-url=http://localhost:3000 \
            --junit-xml=security-results.xml \
            --html=security-report.html

      - name: Run OWASP ZAP Scan
        uses: zaproxy/action-full-scan@v0.9.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a'

      - name: Upload Security Report
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: security-reports
          path: |
            security-results.xml
            security-report.html
            zap-report.html

      - name: Check for Critical Findings
        run: |
          python scripts/check_security_findings.py security-results.xml

# Security Check Script
# scripts/check_security_findings.py
import xml.etree.ElementTree as ET
import sys
def check_findings(report_path: str) -> int:
    """Summarise a JUnit XML report and decide whether the pipeline fails.

    Every ``<testcase>`` with a ``<failure>`` child is classified by its
    failure message: messages mentioning "critical" count as critical,
    otherwise messages mentioning "high" count as high. Matching ignores
    case, so both ``CRITICAL`` and ``critical`` are recognised.

    Args:
        report_path: Path to the JUnit XML file.

    Returns:
        1 when at least one critical finding exists, otherwise 0. High
        findings only produce a warning.

    Raises:
        OSError: The report cannot be opened.
        xml.etree.ElementTree.ParseError: The report is not well-formed XML.
    """
    root = ET.parse(report_path).getroot()

    critical_count = 0
    high_count = 0

    for testcase in root.findall('.//testcase'):
        failure = testcase.find('failure')
        if failure is None:
            continue

        # Normalise case once so the severity label matches however the
        # test wrote it.
        message = (failure.get('message') or '').upper()
        if 'CRITICAL' in message:
            critical_count += 1
            print(f"❌ CRITICAL: {testcase.get('name')}")
        elif 'HIGH' in message:
            high_count += 1
            print(f"⚠️ HIGH: {testcase.get('name')}")

    print(f"\nSummary: {critical_count} critical, {high_count} high severity findings")

    # Fail pipeline on critical findings
    if critical_count > 0:
        print("\n❌ Pipeline failed due to critical security findings")
        return 1

    # Warn but don't fail on high findings
    if high_count > 0:
        print("\n⚠️ Warning: High severity findings detected")

    return 0
if __name__ == "__main__":
    # Exactly one argument (the JUnit report path) is required; exit with a
    # usage message instead of an IndexError traceback when it is missing.
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <junit-report.xml>", file=sys.stderr)
        sys.exit(2)
    sys.exit(check_findings(sys.argv[1]))

# Summary
Comprehensive API security testing should cover:
- BOLA/IDOR: Test horizontal and vertical authorization
- Authentication: JWT validation, token expiry, algorithm attacks
- Injection: SQL, NoSQL, command injection
- Rate Limiting: Bypass techniques and resource exhaustion
- SSRF: Internal network access attempts
- Mass Assignment: Property-level authorization
Integrate these tests into CI/CD to catch vulnerabilities before deployment. Regular testing against the OWASP API Security Top 10 provides comprehensive coverage of common API vulnerabilities.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →