Infrastructure as Code Security Scanning Best Practices

Nicu Constantin
7 min read
#iac-security #terraform #devsecops #policy-as-code #cloud-security

Infrastructure as Code security scanning catches misconfigurations before deployment. This guide covers practical implementations for securing Terraform, CloudFormation, and Kubernetes configurations.

Multi-Framework IaC Scanner

Build a scanner supporting multiple IaC formats:

from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from abc import ABC, abstractmethod
import json
import yaml
import re
 
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
 
@dataclass
class Finding:
    rule_id: str
    severity: Severity
    resource_type: str
    resource_name: str
    file_path: str
    line_number: int
    description: str
    remediation: str
    framework: str
 
class IaCParser(ABC):
    @abstractmethod
    def parse(self, content: str) -> Dict:
        pass
 
    @abstractmethod
    def get_resources(self, parsed: Dict) -> List[Dict]:
        pass
 
class TerraformParser(IaCParser):
    def parse(self, content: str) -> Dict:
        import hcl2
        from io import StringIO
        return hcl2.load(StringIO(content))
 
    def get_resources(self, parsed: Dict) -> List[Dict]:
        resources = []
        for resource_block in parsed.get('resource', []):
            for res_type, res_instances in resource_block.items():
                for res_name, config in res_instances.items():
                    resources.append({
                        'type': res_type,
                        'name': res_name,
                        'config': config,
                        'framework': 'terraform'
                    })
        return resources
 
class CloudFormationParser(IaCParser):
    def parse(self, content: str) -> Dict:
        if content.strip().startswith('{'):
            return json.loads(content)
        return yaml.safe_load(content)
 
    def get_resources(self, parsed: Dict) -> List[Dict]:
        resources = []
        for res_name, res_config in parsed.get('Resources', {}).items():
            resources.append({
                'type': res_config.get('Type', ''),
                'name': res_name,
                'config': res_config.get('Properties', {}),
                'framework': 'cloudformation'
            })
        return resources
 
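class KubernetesParser(IaCParser):
    def parse(self, content: str) -> Dict:
        # Manifests often hold several documents separated by '---';
        # yaml.safe_load raises on those, so load every document.
        docs = [d for d in yaml.safe_load_all(content) if isinstance(d, dict)]
        return {'documents': docs}

    def get_resources(self, parsed: Dict) -> List[Dict]:
        # One resource entry per YAML document in the file.
        return [{
            'type': doc.get('kind', ''),
            'name': (doc.get('metadata') or {}).get('name', ''),
            'config': doc.get('spec') or {},
            'metadata': doc.get('metadata') or {},
            'framework': 'kubernetes'
        } for doc in parsed.get('documents', [])]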
 
class IaCSecurityScanner:
    def __init__(self):
        self.parsers = {
            'terraform': TerraformParser(),
            'cloudformation': CloudFormationParser(),
            'kubernetes': KubernetesParser()
        }
        self.rules = self._load_rules()
        self.findings: List[Finding] = []
 
    def _load_rules(self) -> List[Dict]:
        return [
            # AWS S3 Rules
            {
                "rule_id": "AWS-S3-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_public,
                "description": "S3 bucket allows public access",
                "remediation": "Block public access using bucket policy or ACL"
            },
            {
                "rule_id": "AWS-S3-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_encryption,
                "description": "S3 bucket lacks server-side encryption",
                "remediation": "Enable SSE-S3 or SSE-KMS encryption"
            },
            # AWS Security Group Rules
            {
                "rule_id": "AWS-SG-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_open_ingress,
                "description": "Security group allows unrestricted ingress (0.0.0.0/0)",
                "remediation": "Restrict ingress to specific IP ranges"
            },
            {
                "rule_id": "AWS-SG-002",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_ssh_open,
                "description": "SSH port 22 open to the internet",
                "remediation": "Restrict SSH to bastion hosts or VPN IPs"
            },
            # AWS RDS Rules
            {
                "rule_id": "AWS-RDS-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_public,
                "description": "RDS instance is publicly accessible",
                "remediation": "Set publicly_accessible to false"
            },
            {
                "rule_id": "AWS-RDS-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_encryption,
                "description": "RDS instance lacks encryption at rest",
                "remediation": "Enable storage_encrypted"
            },
            # Kubernetes Rules
            {
                "rule_id": "K8S-POD-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_privileged,
                "description": "Container running in privileged mode",
                "remediation": "Set securityContext.privileged to false"
            },
            {
                "rule_id": "K8S-POD-002",
                "severity": Severity.HIGH,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_root,
                "description": "Container running as root",
                "remediation": "Set runAsNonRoot to true"
            },
            {
                "rule_id": "K8S-POD-003",
                "severity": Severity.MEDIUM,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_resources,
                "description": "Container lacks resource limits",
                "remediation": "Define CPU and memory limits"
            }
        ]
 
    def scan_file(self, file_path: str, framework: str) -> List[Finding]:
        """Scan a single file for security issues."""
        with open(file_path, 'r') as f:
            content = f.read()
 
        parser = self.parsers.get(framework)
        if not parser:
            raise ValueError(f"Unsupported framework: {framework}")
 
        try:
            parsed = parser.parse(content)
            resources = parser.get_resources(parsed)
        except Exception as e:
            print(f"Failed to parse {file_path}: {e}")
            return []
 
        findings = []
        for resource in resources:
            for rule in self.rules:
                if framework not in rule["frameworks"]:
                    continue
 
                if resource['type'] not in rule["resource_types"]:
                    continue
 
                if rule["check"](resource['config'], framework):
                    finding = Finding(
                        rule_id=rule["rule_id"],
                        severity=rule["severity"],
                        resource_type=resource['type'],
                        resource_name=resource['name'],
                        file_path=file_path,
                        line_number=self._find_line(content, resource['name']),
                        description=rule["description"],
                        remediation=rule["remediation"],
                        framework=framework
                    )
                    findings.append(finding)
 
        self.findings.extend(findings)
        return findings
 
    def _find_line(self, content: str, name: str) -> int:
        lines = content.split('\n')
        for i, line in enumerate(lines):
            if name in line:
                return i + 1
        return 0
 
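    # Check functions: each returns True when the resource violates the rule.
    def _check_s3_public(self, config: Dict, framework: str) -> bool:
        # Only inline ACLs are inspected. With AWS provider v4+, ACLs usually
        # live in a separate aws_s3_bucket_acl resource, which this misses.
        if framework == 'terraform':
            acl = config.get('acl', '')
            return acl in ['public-read', 'public-read-write']
        elif framework == 'cloudformation':
            acl = config.get('AccessControl', '')
            return acl in ['PublicRead', 'PublicReadWrite']
        return False

    def _check_s3_encryption(self, config: Dict, framework: str) -> bool:
        # Same caveat: AWS provider v4+ moves encryption settings to
        # aws_s3_bucket_server_side_encryption_configuration.
        if framework == 'terraform':
            return 'server_side_encryption_configuration' not in str(config)
        elif framework == 'cloudformation':
            return 'BucketEncryption' not in config
        return False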
 
    def _check_sg_open_ingress(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            for ingress in config.get('ingress', []):
                if '0.0.0.0/0' in str(ingress.get('cidr_blocks', [])):
                    return True
        elif framework == 'cloudformation':
            for ingress in config.get('SecurityGroupIngress', []):
                if ingress.get('CidrIp') == '0.0.0.0/0':
                    return True
        return False
 
    def _check_sg_ssh_open(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            for ingress in config.get('ingress', []):
                from_port = ingress.get('from_port', 0)
                to_port = ingress.get('to_port', 0)
                cidrs = ingress.get('cidr_blocks', [])
                if from_port <= 22 <= to_port and '0.0.0.0/0' in cidrs:
                    return True
        elif framework == 'cloudformation':
            for ingress in config.get('SecurityGroupIngress', []):
                from_port = ingress.get('FromPort', 0)
                to_port = ingress.get('ToPort', 0)
                cidr = ingress.get('CidrIp', '')
                if from_port <= 22 <= to_port and cidr == '0.0.0.0/0':
                    return True
        return False
 
    def _check_rds_public(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            return config.get('publicly_accessible', False) is True
        elif framework == 'cloudformation':
            return config.get('PubliclyAccessible', False) is True
        return False
 
    def _check_rds_encryption(self, config: Dict, framework: str) -> bool:
        if framework == 'terraform':
            return config.get('storage_encrypted', False) is False
        elif framework == 'cloudformation':
            return config.get('StorageEncrypted', False) is False
        return False
 
    def _check_k8s_privileged(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            sc = container.get('securityContext', {})
            if sc.get('privileged', False):
                return True
        return False
 
    def _check_k8s_root(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            sc = container.get('securityContext', {})
            if sc.get('runAsNonRoot') is not True:
                return True
        return False
 
    def _check_k8s_resources(self, config: Dict, framework: str) -> bool:
        containers = self._get_k8s_containers(config)
        for container in containers:
            resources = container.get('resources', {})
            if not resources.get('limits'):
                return True
        return False
 
    def _get_k8s_containers(self, config: Dict) -> List[Dict]:
        # Handle both Pod and Deployment/StatefulSet
        if 'containers' in config:
            return config.get('containers', [])
        elif 'template' in config:
            return config.get('template', {}).get('spec', {}).get('containers', [])
        return []
 
    def generate_report(self) -> Dict:
        """Generate scan report."""
        severity_counts = {s.value: 0 for s in Severity}
        for finding in self.findings:
            severity_counts[finding.severity.value] += 1
 
        return {
            "scan_summary": {
                "total_findings": len(self.findings),
                "by_severity": severity_counts,
                "passed": severity_counts['critical'] == 0 and severity_counts['high'] == 0
            },
            "findings": [
                {
                    "rule_id": f.rule_id,
                    "severity": f.severity.value,
                    "resource": f"{f.resource_type}.{f.resource_name}",
                    "file": f.file_path,
                    "line": f.line_number,
                    "description": f.description,
                    "remediation": f.remediation,
                    "framework": f.framework
                }
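                # Sort by enum definition order (CRITICAL first); sorting on the
                # string values would order them alphabetically instead.
                for f in sorted(self.findings, key=lambda x: list(Severity).index(x.severity))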
            ]
        }
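
The scanner above expects the caller to name the framework for each file. The following is a minimal sketch of one way to guess it. The helper name `detect_framework` and its heuristics (file extension first, then top-level keys) are assumptions made for illustration and are not part of the scanner above; real repositories may need stricter rules.

```python
import json
from pathlib import Path
from typing import Optional


def detect_framework(file_path: str, content: str) -> Optional[str]:
    """Guess the IaC framework for a file; return None when unsure.

    Hypothetical helper: the heuristics are deliberately simple.
    """
    suffix = Path(file_path).suffix.lower()

    if suffix == ".tf":
        return "terraform"

    if suffix == ".json":
        try:
            doc = json.loads(content)
        except json.JSONDecodeError:
            return None
        if isinstance(doc, dict) and "Resources" in doc:
            return "cloudformation"
        return None

    if suffix in (".yaml", ".yml"):
        # Cheap line-based check of top-level keys, so this sketch needs no
        # YAML library and is not tripped up by tags such as !Ref.
        top_level = {
            line.split(":", 1)[0]
            for line in content.splitlines()
            if line and not line[0].isspace() and ":" in line
        }
        if "Resources" in top_level or "AWSTemplateFormatVersion" in top_level:
            return "cloudformation"
        if {"apiVersion", "kind"} <= top_level:
            return "kubernetes"

    return None
```

A caller could walk a repository, pass each file through `detect_framework`, and hand only the recognized files to `scan_file`, skipping anything that returns `None`.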

CI/CD Integration

Integrate scanning into CI/CD pipelines:

# .github/workflows/iac-security.yml
name: IaC Security Scan
 
on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
      - 'kubernetes/**'
  push:
    branches: [main]
 
jobs:
  scan:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      security-events: write  # needed by upload-sarif to publish results
    steps:
      - uses: actions/checkout@v4
 
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
 
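      - name: Install dependencies
        run: |
          pip install checkov
          # tfsec ships as a standalone binary rather than a pip package;
          # this uses the install script from the tfsec repository.
          curl -s https://raw.githubusercontent.com/aquasecurity/tfsec/master/scripts/install_linux.sh | bash

      - name: Run Checkov
        run: |
          # --soft-fail keeps the job running so the SARIF file can be uploaded.
          checkov -d . --framework terraform cloudformation kubernetes \
            --output cli --output sarif \
            --output-file-path console,checkov-results.sarif --soft-fail

      - name: Run tfsec
        run: |
          tfsec terraform/ --format sarif --soft-fail > tfsec-results.sarif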
 
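      - name: Upload Checkov SARIF results
        if: always()
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: checkov-results.sarif
          category: checkov

      - name: Upload tfsec SARIF results
        if: always()
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: tfsec-results.sarif
          category: tfsec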
 
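      - name: Fail on critical findings
        run: |
          # Checkov exits non-zero when any selected check fails.
          # CKV_AWS_19: S3 encryption at rest; CKV_AWS_20: S3 public-read ACL.
          # Severity thresholds for --hard-fail-on need a platform API key,
          # so the gate is expressed as an explicit list of check IDs.
          checkov -d . --framework terraform --check CKV_AWS_19,CKV_AWS_20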
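
Another way to gate a pipeline, independent of scanner-specific flags, is to count results in the SARIF file itself. The sketch below assumes the standard SARIF 2.1.0 layout (`runs[].results[].level`). The function names and the choice to fail only on `error`-level results are illustrative assumptions, and each scanner maps its own severities onto SARIF levels differently, so the threshold would need tuning per tool.

```python
from collections import Counter
from typing import Dict, Iterable


def count_sarif_levels(sarif: Dict) -> Counter:
    """Count results per SARIF level across all runs in the log."""
    counts: Counter = Counter()
    for run in sarif.get("runs", []):
        for result in run.get("results", []):
            # A result without an explicit level is treated as "warning"
            # here (an assumption that mirrors the usual SARIF default).
            counts[result.get("level", "warning")] += 1
    return counts


def gate(sarif: Dict, fail_levels: Iterable[str] = ("error",)) -> int:
    """Return 1 if any result has a level in fail_levels, else 0."""
    counts = count_sarif_levels(sarif)
    return 1 if any(counts[level] for level in fail_levels) else 0


# Hand-written sample log, not real scanner output.
sample = {"runs": [{"results": [{"level": "error"}, {"level": "note"}, {}]}]}
print(dict(count_sarif_levels(sample)))
print(gate(sample))
```

In a workflow step, the return value of `gate` on the loaded `checkov-results.sarif` could be passed to `sys.exit` so the job fails only when results at the chosen levels are present.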

Conclusion

IaC security scanning prevents misconfigurations from reaching production. Integrate scanning into CI/CD pipelines, use policy as code for organizational standards, and monitor for configuration drift. Start with critical security rules and expand coverage as your program matures.
