Infrastructure as Code security scanning catches misconfigurations before deployment. This guide covers practical implementations for securing Terraform, CloudFormation, and Kubernetes configurations.
Multi-Framework IaC Scanner
Build a scanner supporting multiple IaC formats:
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from abc import ABC, abstractmethod
import json
import yaml
import re
class Severity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class Finding:
rule_id: str
severity: Severity
resource_type: str
resource_name: str
file_path: str
line_number: int
description: str
remediation: str
framework: str
class IaCParser(ABC):
@abstractmethod
def parse(self, content: str) -> Dict:
pass
@abstractmethod
def get_resources(self, parsed: Dict) -> List[Dict]:
pass
class TerraformParser(IaCParser):
def parse(self, content: str) -> Dict:
import hcl2
from io import StringIO
return hcl2.load(StringIO(content))
def get_resources(self, parsed: Dict) -> List[Dict]:
resources = []
for resource_block in parsed.get('resource', []):
for res_type, res_instances in resource_block.items():
for res_name, config in res_instances.items():
resources.append({
'type': res_type,
'name': res_name,
'config': config,
'framework': 'terraform'
})
return resources
class CloudFormationParser(IaCParser):
def parse(self, content: str) -> Dict:
if content.strip().startswith('{'):
return json.loads(content)
return yaml.safe_load(content)
def get_resources(self, parsed: Dict) -> List[Dict]:
resources = []
for res_name, res_config in parsed.get('Resources', {}).items():
resources.append({
'type': res_config.get('Type', ''),
'name': res_name,
'config': res_config.get('Properties', {}),
'framework': 'cloudformation'
})
return resources
class KubernetesParser(IaCParser):
def parse(self, content: str) -> Dict:
return yaml.safe_load(content)
def get_resources(self, parsed: Dict) -> List[Dict]:
return [{
'type': parsed.get('kind', ''),
'name': parsed.get('metadata', {}).get('name', ''),
'config': parsed.get('spec', {}),
'metadata': parsed.get('metadata', {}),
'framework': 'kubernetes'
}]
class IaCSecurityScanner:
def __init__(self):
self.parsers = {
'terraform': TerraformParser(),
'cloudformation': CloudFormationParser(),
'kubernetes': KubernetesParser()
}
self.rules = self._load_rules()
self.findings: List[Finding] = []
def _load_rules(self) -> List[Dict]:
return [
# AWS S3 Rules
{
"rule_id": "AWS-S3-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
"check": self._check_s3_public,
"description": "S3 bucket allows public access",
"remediation": "Block public access using bucket policy or ACL"
},
{
"rule_id": "AWS-S3-002",
"severity": Severity.HIGH,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
"check": self._check_s3_encryption,
"description": "S3 bucket lacks server-side encryption",
"remediation": "Enable SSE-S3 or SSE-KMS encryption"
},
# AWS Security Group Rules
{
"rule_id": "AWS-SG-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
"check": self._check_sg_open_ingress,
"description": "Security group allows unrestricted ingress (0.0.0.0/0)",
"remediation": "Restrict ingress to specific IP ranges"
},
{
"rule_id": "AWS-SG-002",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
"check": self._check_sg_ssh_open,
"description": "SSH port 22 open to the internet",
"remediation": "Restrict SSH to bastion hosts or VPN IPs"
},
# AWS RDS Rules
{
"rule_id": "AWS-RDS-001",
"severity": Severity.CRITICAL,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
"check": self._check_rds_public,
"description": "RDS instance is publicly accessible",
"remediation": "Set publicly_accessible to false"
},
{
"rule_id": "AWS-RDS-002",
"severity": Severity.HIGH,
"frameworks": ["terraform", "cloudformation"],
"resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
"check": self._check_rds_encryption,
"description": "RDS instance lacks encryption at rest",
"remediation": "Enable storage_encrypted"
},
# Kubernetes Rules
{
"rule_id": "K8S-POD-001",
"severity": Severity.CRITICAL,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_privileged,
"description": "Container running in privileged mode",
"remediation": "Set securityContext.privileged to false"
},
{
"rule_id": "K8S-POD-002",
"severity": Severity.HIGH,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_root,
"description": "Container running as root",
"remediation": "Set runAsNonRoot to true"
},
{
"rule_id": "K8S-POD-003",
"severity": Severity.MEDIUM,
"frameworks": ["kubernetes"],
"resource_types": ["Pod", "Deployment", "StatefulSet"],
"check": self._check_k8s_resources,
"description": "Container lacks resource limits",
"remediation": "Define CPU and memory limits"
}
]
def scan_file(self, file_path: str, framework: str) -> List[Finding]:
"""Scan a single file for security issues."""
with open(file_path, 'r') as f:
content = f.read()
parser = self.parsers.get(framework)
if not parser:
raise ValueError(f"Unsupported framework: {framework}")
try:
parsed = parser.parse(content)
resources = parser.get_resources(parsed)
except Exception as e:
print(f"Failed to parse {file_path}: {e}")
return []
findings = []
for resource in resources:
for rule in self.rules:
if framework not in rule["frameworks"]:
continue
if resource['type'] not in rule["resource_types"]:
continue
if rule["check"](resource['config'], framework):
finding = Finding(
rule_id=rule["rule_id"],
severity=rule["severity"],
resource_type=resource['type'],
resource_name=resource['name'],
file_path=file_path,
line_number=self._find_line(content, resource['name']),
description=rule["description"],
remediation=rule["remediation"],
framework=framework
)
findings.append(finding)
self.findings.extend(findings)
return findings
def _find_line(self, content: str, name: str) -> int:
lines = content.split('\n')
for i, line in enumerate(lines):
if name in line:
return i + 1
return 0
# Check functions
def _check_s3_public(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
acl = config.get('acl', '')
return acl in ['public-read', 'public-read-write']
elif framework == 'cloudformation':
acl = config.get('AccessControl', '')
return acl in ['PublicRead', 'PublicReadWrite']
return False
def _check_s3_encryption(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return 'server_side_encryption_configuration' not in str(config)
elif framework == 'cloudformation':
return 'BucketEncryption' not in config
return False
def _check_sg_open_ingress(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
for ingress in config.get('ingress', []):
if '0.0.0.0/0' in str(ingress.get('cidr_blocks', [])):
return True
elif framework == 'cloudformation':
for ingress in config.get('SecurityGroupIngress', []):
if ingress.get('CidrIp') == '0.0.0.0/0':
return True
return False
def _check_sg_ssh_open(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
for ingress in config.get('ingress', []):
from_port = ingress.get('from_port', 0)
to_port = ingress.get('to_port', 0)
cidrs = ingress.get('cidr_blocks', [])
if from_port <= 22 <= to_port and '0.0.0.0/0' in cidrs:
return True
elif framework == 'cloudformation':
for ingress in config.get('SecurityGroupIngress', []):
from_port = ingress.get('FromPort', 0)
to_port = ingress.get('ToPort', 0)
cidr = ingress.get('CidrIp', '')
if from_port <= 22 <= to_port and cidr == '0.0.0.0/0':
return True
return False
def _check_rds_public(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return config.get('publicly_accessible', False) is True
elif framework == 'cloudformation':
return config.get('PubliclyAccessible', False) is True
return False
def _check_rds_encryption(self, config: Dict, framework: str) -> bool:
if framework == 'terraform':
return config.get('storage_encrypted', False) is False
elif framework == 'cloudformation':
return config.get('StorageEncrypted', False) is False
return False
def _check_k8s_privileged(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
sc = container.get('securityContext', {})
if sc.get('privileged', False):
return True
return False
def _check_k8s_root(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
sc = container.get('securityContext', {})
if sc.get('runAsNonRoot') is not True:
return True
return False
def _check_k8s_resources(self, config: Dict, framework: str) -> bool:
containers = self._get_k8s_containers(config)
for container in containers:
resources = container.get('resources', {})
if not resources.get('limits'):
return True
return False
def _get_k8s_containers(self, config: Dict) -> List[Dict]:
# Handle both Pod and Deployment/StatefulSet
if 'containers' in config:
return config.get('containers', [])
elif 'template' in config:
return config.get('template', {}).get('spec', {}).get('containers', [])
return []
def generate_report(self) -> Dict:
"""Generate scan report."""
severity_counts = {s.value: 0 for s in Severity}
for finding in self.findings:
severity_counts[finding.severity.value] += 1
return {
"scan_summary": {
"total_findings": len(self.findings),
"by_severity": severity_counts,
"passed": severity_counts['critical'] == 0 and severity_counts['high'] == 0
},
"findings": [
{
"rule_id": f.rule_id,
"severity": f.severity.value,
"resource": f"{f.resource_type}.{f.resource_name}",
"file": f.file_path,
"line": f.line_number,
"description": f.description,
"remediation": f.remediation,
"framework": f.framework
}
for f in sorted(self.findings, key=lambda x: Severity[x.severity.name].value, reverse=True)
]
}CI/CD Integration
Integrate scanning into CI/CD pipelines:
# .github/workflows/iac-security.yml
name: IaC Security Scan
on:
pull_request:
paths:
- 'terraform/**'
- 'cloudformation/**'
- 'kubernetes/**'
push:
branches: [main]
jobs:
scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install checkov tfsec
- name: Run Checkov
run: |
checkov -d . --framework terraform cloudformation kubernetes \
--output sarif --output-file checkov-results.sarif
- name: Run tfsec
run: |
tfsec terraform/ --format sarif > tfsec-results.sarif
- name: Upload SARIF results
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: checkov-results.sarif
- name: Fail on critical findings
run: |
checkov -d . --framework terraform --check CKV_AWS_19,CKV_AWS_20 \
--hard-fail-on CRITICALConclusion
IaC security scanning prevents misconfigurations from reaching production. Integrate scanning into CI/CD pipelines, use policy as code for organizational standards, and monitor for configuration drift. Start with critical security rules and expand coverage as your program matures.