Infrastructure as Code security scanning catches misconfigurations before deployment. This guide covers practical implementations for securing Terraform, CloudFormation, and Kubernetes configurations.
Multi-Framework IaC Scanner
Build a scanner supporting multiple IaC formats:
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from abc import ABC, abstractmethod
import json
import yaml
import re
class Severity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class Finding:
rule_id: str
severity: Severity
resource_type: str
resource_name: str
file_path: str
line_number: int
description: str
remediation: str
framework: str
class IaCParser(ABC):
@abstractmethod
def parse(self, content: str) -> Dict:
pass
@abstractmethod
def get_resources(self, parsed: Dict) -> List[Dict]:
pass
class TerraformParser(IaCParser):
def parse(self, content: str) -> Dict:
import hcl2
from io import StringIO
return hcl2.load(StringIO(content))
def get_resources(self, parsed: Dict) -> List[Dict]:
resources = []
for resource_block in parsed.get('resource', []):
for res_type, res_instances in resource_block.items():
for res_name, config in res_instances.items():
resources.append({
'type': res_type,
'name': res_name,
'config': config,
'framework': 'terraform'
})
return resources
class CloudFormationParser(IaCParser):
def parse(self, content: str) -> Dict:
if content.strip().startswith('{'):
return json.loads(content)
return yaml.safe_load(content)
def get_resources(self, parsed: Dict) -> List[Dict]:
resources = []
for res_name, res_config in parsed.get('Resources', {}).items():
resources.append({
'type': res_config.get('Type', ''),
'name': res_name,
'config': res_config.get('Properties', {}),
'framework': 'cloudformation'
})
return resources
class KubernetesParser(IaCParser):
def parse(self, content: str) -> Dict:
return yaml.safe_load(content)
def get_resources(self, parsed: Dict) -> List[Dict]:
return [{
'type': parsed.get('kind', ''),
'name': parsed.get('metadata', {}).get('name', ''),
'config': parsed.get('spec', {}),
'metadata': parsed.get('metadata', {}),
'framework': 'kubernetes'
}]
class IaCSecurityScanner:
def __init__(self):
self.parsers = {
'terraform': TerraformParser(),
'cloudformation': CloudFormationParser(),
'kubernetes': KubernetesParser()
}
self.rules = self._load_rules()
self.findings: List[Finding] = []
def _load_rules(self) -> List[Dict]:
return [
# AWS S3 Rules
{
"rule_id": "AWS-S3-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
"check": self._check_s3_public,
"description": "S3 bucket allows public access",
"remediation": "Block public access using bucket policy or ACL"
},
{
"rule_id": "AWS-S3-002",
"severity": Severity.HIGH,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
"check": self._check_s3_encryption,
"description": "S3 bucket lacks server-side encryption",
"remediation": "Enable SSE-S3 or SSE-KMS encryption"
},
# AWS Security Group Rules
{
"rule_id": "AWS-SG-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
"check": self._check_sg_open_ingress,
"description": "Security group allows unrestricted ingress (0.0.0.0/0)",
"remediation": "Restrict ingress to specific IP ranges"
},
{
"rule_id": "AWS-SG-002",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
"check": self._check_sg_ssh_open,
"description": "SSH port 22 open to the internet",
"remediation": "Restrict SSH to bastion hosts or VPN IPs"
},
# AWS RDS Rules
{
"rule_id": "AWS-RDS-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
"check": self._check_rds_public,
"description": "RDS instance is publicly accessible",
"remediation": "Set publicly_accessible to false"
},
{
"rule_id": "AWS-RDS-002",
"severity": Severity.HIGH,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
"check": self._check_rds_encryption,
"description": "RDS instance lacks encryption at rest",
"remediation": "Enable storage_encrypted"
},
# Kubernetes Rules
{
"rule_id": "K8S-POD-001",
"severity": Severity.CRITICAL,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_privileged,
"description": "Container running in privileged mode",
"remediation": "Set securityContext.privileged to false"
},
{
"rule_id": "K8S-POD-002",
"severity": Severity.HIGH,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_root,
"description": "Container running as root",
"remediation": "Set runAsNonRoot to true"
},
{
"rule_id": "K8S-POD-003",
"severity": Severity.MEDIUM,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_resources,
"description": "Container lacks resource limits",
"remediation": "Define CPU and memory limits"
}
]
def scan_file(self, file_path: str, framework: str) -> List[Finding]:
"""Scan a single file for security issues."""
with open(file_path, 'r') as f:
content = f.read()
parser = self.parsers.get(framework)
if not parser:
raise ValueError(f"Unsupported framework: {framework}")
try:
parsed = parser.parse(content)
resources = parser.get_resources(parsed)
except Exception as e:
print(f"Failed to parse {file_path}: {e}")
return []
findings = []
for resource in resources:
for rule in self.rules:
if framework not in rule["frameworks"]:
continue
if resource['type'] not in rule["resource_types"]:
continue
if rule["check"](resource['config'], framework):
finding = Finding(
rule_id=rule["rule_id"],
severity=rule["severity"],
resource_type=resource['type'],
resource_name=resource['name'],
file_path=file_path,
line_number=self._find_line(content, resource['name']),
description=rule["description"],
remediation=rule["remediation"],
framework=framework
)
findings.append(finding)
self.findings.extend(findings)
return findings
def _find_line(self, content: str, name: str) -> int:
lines = content.split('\n')
for i, line in enumerate(lines):
if name in line:
return i + 1
return 0
# Check functions
def _check_s3_public(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
acl = config.get('acl', '')
return acl in ['public-read', 'public-read-write']
elif framework == 'cloudformation':
acl = config.get('AccessControl', '')
return acl in ['PublicRead', 'PublicReadWrite']
return False
def _check_s3_encryption(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return 'server_side_encryption_configuration' not in str(config)
elif framework == 'cloudformation':
return 'BucketEncryption' not in config
return False
def _check_sg_open_ingress(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
for ingress in config.get('ingress', []):
if '0.0.0.0/0' in str(ingress.get('cidr_blocks', [])):
return True
elif framework == 'cloudformation':
for ingress in config.get('SecurityGroupIngress', []):
if ingress.get('CidrIp') == '0.0.0.0/0':
return True
return False
def _check_sg_ssh_open(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
for ingress in config.get('ingress', []):
from_port = ingress.get('from_port', 0)
to_port = ingress.get('to_port', 0)
cidrs = ingress.get('cidr_blocks', [])
if from_port <= 22 <= to_port and '0.0.0.0/0' in cidrs:
return True
elif framework == 'cloudformation':
for ingress in config.get('SecurityGroupIngress', []):
from_port = ingress.get('FromPort', 0)
to_port = ingress.get('ToPort', 0)
cidr = ingress.get('CidrIp', '')
if from_port <= 22 <= to_port and cidr == '0.0.0.0/0':
return True
return False
def _check_rds_public(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return config.get('publicly_accessible', False) is True
elif framework == 'cloudformation':
return config.get('PubliclyAccessible', False) is True
return False
def _check_rds_encryption(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return config.get('storage_encrypted', False) is False
elif framework == 'cloudformation':
return config.get('StorageEncrypted', False) is False
return False
def _check_k8s_privileged(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
sc = container.get('securityContext', {})
if sc.get('privileged', False):
return True
return False
def _check_k8s_root(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
sc = container.get('securityContext', {})
if sc.get('runAsNonRoot') is not True:
return True
return False
def _check_k8s_resources(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
resources = container.get('resources', {})
if not resources.get('limits'):
return True
return False
def _get_k8s_containers(self, config: Dict) -> List[Dict]:
# Handle both Pod and Deployment/StatefulSet
if 'containers' in config:
return config.get('containers', [])
elif 'template' in config:
return config.get('template', {}).get('spec', {}).get('containers', [])
return []
def generate_report(self) -> Dict:
"""Generate scan report."""
severity_counts = {s.value: 0 for s in Severity}
for finding in self.findings:
severity_counts[finding.severity.value] += 1
return {
"scan_summary": {
"total_findings": len(self.findings),
"by_severity": severity_counts,
"passed": severity_counts['critical'] == 0 and severity_counts['high'] == 0
},
"findings": [
{
"rule_id": f.rule_id,
"severity": f.severity.value,
"resource": f"{f.resource_type}.{f.resource_name}",
"file": f.file_path,
"line": f.line_number,
"description": f.description,
"remediation": f.remediation,
"framework": f.framework
}
for f in sorted(self.findings, key=lambda x: Severity[x.severity.name].value, reverse=True)
]
}CI/CD Integration
Integrate scanning into CI/CD pipelines:
# .github/workflows/iac-security.yml
name: IaC Security Scan
on:
pull_request:
paths:
- 'terraform/**'
- 'cloudformation/**'
- 'kubernetes/**'
push:
branches: [main]
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install checkov tfsec
- name: Run Checkov
run: |
checkov -d . --framework terraform cloudformation kubernetes \
--output sarif --output-file checkov-results.sarif
- name: Run tfsec
run: |
tfsec terraform/ --format sarif > tfsec-results.sarif
- name: Upload SARIF results
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: checkov-results.sarif
- name: Fail on critical findings
run: |
checkov -d . --framework terraform --check CKV_AWS_19,CKV_AWS_20 \
--hard-fail-on CRITICALConclusion
IaC security scanning prevents misconfigurations from reaching production. Integrate scanning into CI/CD pipelines, use policy as code for organizational standards, and monitor for configuration drift. Start with critical security rules and expand coverage as your program matures.
Is your AI system compliant with the EU AI Act? Free risk assessment - find out in 2 minutes →