DevSecOps

Infrastructure as Code Security Scanning Best Practices

DeviDevs Team
7 min read
#iac-security#terraform#devsecops#policy-as-code#cloud-security

Infrastructure as Code security scanning catches misconfigurations before deployment. This guide covers practical implementations for securing Terraform, CloudFormation, and Kubernetes configurations.

Multi-Framework IaC Scanner

Build a scanner supporting multiple IaC formats:

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from abc import ABC, abstractmethod
import json
import yaml
import re
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
 
@dataclass
class Finding:
    rule_id: str
    severity: Severity
    resource_type: str
    resource_name: str
    file_path: str
    line_number: int
    description: str
    remediation: str
    framework: str
 
class IaCParser(ABC):
    @abstractmethod
    def parse(self, content: str) -> Dict:
        pass
 
    @abstractmethod
    def get_resources(self, parsed: Dict) -> List[Dict]:
        pass
 
class TerraformParser(IaCParser):
    def parse(self, content: str) -> Dict:
        import hcl2
        from io import StringIO
        return hcl2.load(StringIO(content))
 
    def get_resources(self, parsed: Dict) -> List[Dict]:
        resources = []
        for resource_block in parsed.get('resource', []):
            for res_type, res_instances in resource_block.items():
                for res_name, config in res_instances.items():
                    resources.append({
                        'type': res_type,
                        'name': res_name,
                        'config': config,
                        'framework': 'terraform'
                    })
        return resources
 
class CloudFormationParser(IaCParser):
    def parse(self, content: str) -> Dict:
        if content.strip().startswith('{'):
            return json.loads(content)
        return yaml.safe_load(content)
 
    def get_resources(self, parsed: Dict) -> List[Dict]:
        resources = []
        for res_name, res_config in parsed.get('Resources', {}).items():
            resources.append({
                'type': res_config.get('Type', ''),
                'name': res_name,
                'config': res_config.get('Properties', {}),
                'framework': 'cloudformation'
            })
        return resources
 
class KubernetesParser(IaCParser):
    def parse(self, content: str) -> Dict:
        return yaml.safe_load(content)
 
    def get_resources(self, parsed: Dict) -> List[Dict]:
        return [{
            'type': parsed.get('kind', ''),
            'name': parsed.get('metadata', {}).get('name', ''),
            'config': parsed.get('spec', {}),
            'metadata': parsed.get('metadata', {}),
            'framework': 'kubernetes'
        }]
 
class IaCSecurityScanner:
    def __init__(self):
        self.parsers = {
            'terraform': TerraformParser(),
            'cloudformation': CloudFormationParser(),
            'kubernetes': KubernetesParser()
        }
        self.rules = self._load_rules()
        self.findings: List[Finding] = []
 
    def _load_rules(self) -> List[Dict]:
        return [
            # AWS S3 Rules
            {
                "rule_id": "AWS-S3-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_public,
                "description": "S3 bucket allows public access",
                "remediation": "Block public access using bucket policy or ACL"
            },
            {
                "rule_id": "AWS-S3-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_encryption,
                "description": "S3 bucket lacks server-side encryption",
                "remediation": "Enable SSE-S3 or SSE-KMS encryption"
            },
            # AWS Security Group Rules
            {
                "rule_id": "AWS-SG-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_open_ingress,
                "description": "Security group allows unrestricted ingress (0.0.0.0/0)",
                "remediation": "Restrict ingress to specific IP ranges"
            },
            {
                "rule_id": "AWS-SG-002",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_ssh_open,
                "description": "SSH port 22 open to the internet",
                "remediation": "Restrict SSH to bastion hosts or VPN IPs"
            },
            # AWS RDS Rules
            {
                "rule_id": "AWS-RDS-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_public,
                "description": "RDS instance is publicly accessible",
                "remediation": "Set publicly_accessible to false"
            },
            {
                "rule_id": "AWS-RDS-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_encryption,
                "description": "RDS instance lacks encryption at rest",
                "remediation": "Enable storage_encrypted"
            },
            # Kubernetes Rules
            {
                "rule_id": "K8S-POD-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_privileged,
                "description": "Container running in privileged mode",
                "remediation": "Set securityContext.privileged to false"
            },
            {
                "rule_id": "K8S-POD-002",
                "severity": Severity.HIGH,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_root,
                "description": "Container running as root",
                "remediation": "Set runAsNonRoot to true"
            },
            {
                "rule_id": "K8S-POD-003",
                "severity": Severity.MEDIUM,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_resources,
                "description": "Container lacks resource limits",
                "remediation": "Define CPU and memory limits"
            }
        ]
 
    def scan_file(self, file_path: str, framework: str) -> List[Finding]:
        """Scan a single file for security issues."""
        with open(file_path, 'r') as f:
            content = f.read()
 
        parser = self.parsers.get(framework)
        if not parser:
            raise ValueError(f"Unsupported framework: {framework}")
 
        try:
            parsed = parser.parse(content)
            resources = parser.get_resources(parsed)
        except Exception as e:
            print(f"Failed to parse {file_path}: {e}")
            return []
 
        findings = []
        for resource in resources:
            for rule in self.rules:
                if framework not in rule["frameworks"]:
                    continue
 
                if resource['type'] not in rule["resource_types"]:
                    continue
 
                if rule["check"](resource['config'], framework):
                    finding = Finding(
                        rule_id=rule["rule_id"],
                        severity=rule["severity"],
                        resource_type=resource['type'],
                        resource_name=resource['name'],
                        file_path=file_path,
                        line_number=self._find_line(content, resource['name']),
                        description=rule["description"],
                        remediation=rule["remediation"],
                        framework=framework
                    )
                    findings.append(finding)
 
        self.findings.extend(findings)
        return findings
 
    def _find_line(self, content: str, name: str) -> int:
        lines = content.split('\n')
        for i, line in enumerate(lines):
            if name in line:
                return i + 1
        return 0
 
    # Check functions
    def _check_s3_public(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            acl = config.get('acl', '')
            return acl in ['public-read', 'public-read-write']
        elif framework == 'cloudformation':
            acl = config.get('AccessControl', '')
            return acl in ['PublicRead', 'PublicReadWrite']
        return False
 
    def _check_s3_encryption(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            return 'server_side_encryption_configuration' not in str(config)
        elif framework == 'cloudformation':
            return 'BucketEncryption' not in config
        return False
 
    def _check_sg_open_ingress(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            for ingress in config.get('ingress', []):
                if '0.0.0.0/0' in str(ingress.get('cidr_blocks', [])):
                    return True
        elif framework == 'cloudformation':
            for ingress in config.get('SecurityGroupIngress', []):
                if ingress.get('CidrIp') == '0.0.0.0/0':
                    return True
        return False
 
    def _check_sg_ssh_open(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            for ingress in config.get('ingress', []):
                from_port = ingress.get('from_port', 0)
                to_port = ingress.get('to_port', 0)
                cidrs = ingress.get('cidr_blocks', [])
                if from_port <= 22 <= to_port and '0.0.0.0/0' in cidrs:
                    return True
        elif framework == 'cloudformation':
            for ingress in config.get('SecurityGroupIngress', []):
                from_port = ingress.get('FromPort', 0)
                to_port = ingress.get('ToPort', 0)
                cidr = ingress.get('CidrIp', '')
                if from_port <= 22 <= to_port and cidr == '0.0.0.0/0':
                    return True
        return False
 
    def _check_rds_public(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            return config.get('publicly_accessible', False) is True
        elif framework == 'cloudformation':
            return config.get('PubliclyAccessible', False) is True
        return False
 
    def _check_rds_encryption(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            return config.get('storage_encrypted', False) is False
        elif framework == 'cloudformation':
            return config.get('StorageEncrypted', False) is False
        return False
 
    def _check_k8s_privileged(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            sc = container.get('securityContext', {})
            if sc.get('privileged', False):
                return True
        return False
 
    def _check_k8s_root(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            sc = container.get('securityContext', {})
            if sc.get('runAsNonRoot') is not True:
                return True
        return False
 
    def _check_k8s_resources(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            resources = container.get('resources', {})
            if not resources.get('limits'):
                return True
        return False
 
    def _get_k8s_containers(self, config: Dict) -> List[Dict]:
        # Handle both Pod and Deployment/StatefulSet
        if 'containers' in config:
            return config.get('containers', [])
        elif 'template' in config:
            return config.get('template', {}).get('spec', {}).get('containers', [])
        return []
 
    def generate_report(self) -> Dict:
        """Generate scan report."""
        severity_counts = {s.value: 0 for s in Severity}
        for finding in self.findings:
            severity_counts[finding.severity.value] += 1
 
        return {
            "scan_summary": {
                "total_findings": len(self.findings),
                "by_severity": severity_counts,
                "passed": severity_counts['critical'] == 0 and severity_counts['high'] == 0
            },
            "findings": [
                {
                    "rule_id": f.rule_id,
                    "severity": f.severity.value,
                    "resource": f"{f.resource_type}.{f.resource_name}",
                    "file": f.file_path,
                    "line": f.line_number,
                    "description": f.description,
                    "remediation": f.remediation,
                    "framework": f.framework
                }
                for f in sorted(self.findings, key=lambda x: Severity[x.severity.name].value, reverse=True)
            ]
        }

CI/CD Integration

Integrate scanning into CI/CD pipelines:

# .github/workflows/iac-security.yml
name: IaC Security Scan
 
on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
      - 'kubernetes/**'
  push:
    branches: [main]
 
jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
 
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
 
      - name: Install dependencies
        run: |
          pip install checkov tfsec
 
      - name: Run Checkov
        run: |
          checkov -d . --framework terraform cloudformation kubernetes \
            --output sarif --output-file checkov-results.sarif
 
      - name: Run tfsec
        run: |
          tfsec terraform/ --format sarif > tfsec-results.sarif
 
      - name: Upload SARIF results
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: checkov-results.sarif
 
      - name: Fail on critical findings
        run: |
          checkov -d . --framework terraform --check CKV_AWS_19,CKV_AWS_20 \
            --hard-fail-on CRITICAL

Conclusion

IaC security scanning prevents misconfigurations from reaching production. Integrate scanning into CI/CD pipelines, use policy as code for organizational standards, and monitor for configuration drift. Start with critical security rules and expand coverage as your program matures.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.