DevSecOps

Infrastructure as Code Security: Terraform and CloudFormation Hardening

DeviDevs Team
16 min read
#infrastructure-as-code #terraform #cloudformation #security #devsecops #policy-as-code

Infrastructure as Code (IaC) has revolutionized how organizations provision and manage cloud resources. However, misconfigurations in IaC templates are among the leading causes of cloud security breaches. This guide covers comprehensive security practices for Terraform and CloudFormation deployments.

The IaC Security Challenge

A 2024 study found that 72% of organizations had at least one critical misconfiguration in their IaC templates, with common issues including:

  • Overly permissive IAM policies
  • Unencrypted storage resources
  • Public network exposure
  • Missing logging and monitoring
  • Hardcoded secrets
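
Several of these are mechanically detectable before code ever reaches review. The sketch below is a minimal pre-commit-style check for the last item, hardcoded secrets; the file extensions and regexes are illustrative assumptions, and a real pipeline would lean on a dedicated scanner such as gitleaks or trufflehog.

# scan_secrets.py - minimal hardcoded-secret check for IaC files (sketch)
import re
import sys
from pathlib import Path

# Illustrative patterns only; dedicated scanners ship hundreds of rules
SECRET_PATTERNS = [
    re.compile(r"(?i)(password|secret|token)\s*[=:]\s*[\"'][^\"']{8,}[\"']"),
    re.compile(r"AKIA[0-9A-Z]{16}"),  # AWS access key ID format
]

def scan(root: str) -> int:
    findings = 0
    for path in Path(root).rglob("*"):
        if not path.is_file() or path.suffix not in {".tf", ".tfvars", ".yaml", ".yml", ".json"}:
            continue
        for lineno, line in enumerate(path.read_text(errors="ignore").splitlines(), 1):
            if any(p.search(line) for p in SECRET_PATTERNS):
                print(f"{path}:{lineno}: possible hardcoded secret")
                findings += 1
    return findings

if __name__ == "__main__":
    # Non-zero exit blocks the commit when wired into a pre-commit hook
    sys.exit(1 if scan(sys.argv[1] if len(sys.argv) > 1 else ".") else 0)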

Security-First Terraform Patterns

Secure Provider Configuration

# versions.tf - Lock provider versions
terraform {
  required_version = ">= 1.6.0"
 
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.31.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.6.0"
    }
  }
 
  # Secure state backend
  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "production/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"
 
    # Require a specific role for state access (Terraform 1.6+ syntax)
    assume_role {
      role_arn = "arn:aws:iam::123456789012:role/TerraformStateAccess"
    }
  }
}
 
# Provider with security defaults
provider "aws" {
  region = var.aws_region
 
  # Require specific role assumption
  assume_role {
    role_arn     = var.deployment_role_arn
    session_name = "TerraformDeployment"
    external_id  = var.external_id
  }
 
  default_tags {
    tags = {
      Environment     = var.environment
      ManagedBy       = "Terraform"
      SecurityReview  = "Required"
      CostCenter      = var.cost_center
    }
  }
}

Secure S3 Bucket Module

# modules/secure-s3/main.tf
resource "aws_s3_bucket" "secure" {
  bucket = var.bucket_name
 
  # Allow force-destroy only outside production
  force_destroy = var.environment != "production"
 
  tags = merge(var.tags, {
    DataClassification = var.data_classification
  })
}
 
# Block all public access
resource "aws_s3_bucket_public_access_block" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}
 
# Server-side encryption with KMS
resource "aws_s3_bucket_server_side_encryption_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = var.kms_key_arn
      sse_algorithm     = "aws:kms"
    }
    bucket_key_enabled = true
  }
}
 
# Versioning for data protection
resource "aws_s3_bucket_versioning" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  versioning_configuration {
    status = "Enabled"
  }
}
 
# Lifecycle rules for cost and compliance
resource "aws_s3_bucket_lifecycle_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  rule {
    id     = "transition-to-glacier"
    status = "Enabled"
 
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
 
    noncurrent_version_transition {
      noncurrent_days = 30
      storage_class   = "GLACIER"
    }
 
    noncurrent_version_expiration {
      noncurrent_days = var.retention_days
    }
  }
}
 
# Access logging
resource "aws_s3_bucket_logging" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  target_bucket = var.logging_bucket_id
  target_prefix = "s3-access-logs/${var.bucket_name}/"
}
 
# Bucket policy enforcing TLS
resource "aws_s3_bucket_policy" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid       = "EnforceTLS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.secure.arn,
          "${aws_s3_bucket.secure.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      },
      {
        Sid       = "EnforceEncryption"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.secure.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      }
    ]
  })
}
 
# Output for dependency management
output "bucket_arn" {
  value       = aws_s3_bucket.secure.arn
  description = "ARN of the secure S3 bucket"
}
 
output "bucket_id" {
  value       = aws_s3_bucket.secure.id
  description = "ID of the secure S3 bucket"
}
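
Once the module has been applied, a quick runtime spot-check can confirm the controls actually landed. A minimal sketch using boto3 (the bucket name is a placeholder; each call raises ClientError if the corresponding control is absent):

# verify_bucket.py - post-deploy check of a bucket's security posture (sketch)
import boto3

def verify_bucket(bucket: str) -> dict:
    s3 = boto3.client("s3")
    checks = {}

    # All four public-access-block flags should be true
    pab = s3.get_public_access_block(Bucket=bucket)
    checks["public_access_blocked"] = all(
        pab["PublicAccessBlockConfiguration"].values()
    )

    # Default encryption should be KMS-backed
    enc = s3.get_bucket_encryption(Bucket=bucket)
    rule = enc["ServerSideEncryptionConfiguration"]["Rules"][0]
    checks["kms_encrypted"] = (
        rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "aws:kms"
    )

    # Versioning should be enabled
    ver = s3.get_bucket_versioning(Bucket=bucket)
    checks["versioning_enabled"] = ver.get("Status") == "Enabled"

    return checks

if __name__ == "__main__":
    print(verify_bucket("company-data-bucket"))  # placeholder bucket name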

Secure IAM Role Module

# modules/secure-iam-role/main.tf
locals {
  # Validate that role isn't overly permissive
  has_admin_access = contains(var.managed_policy_arns, "arn:aws:iam::aws:policy/AdministratorAccess")
  has_wildcard_resource = can(regex("\"Resource\"\\s*:\\s*\"\\*\"", jsonencode(var.inline_policy)))
}
 
# Fail-safe validation: the lifecycle precondition on the role below
# aborts the plan when a production role would receive admin access
 
resource "aws_iam_role" "secure" {
  name                 = var.role_name
  max_session_duration = var.max_session_duration
 
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = var.trusted_services
          AWS     = var.trusted_accounts
        }
        Action = "sts:AssumeRole"
        Condition = merge(
          # Require external ID for cross-account
          length(var.trusted_accounts) > 0 ? {
            StringEquals = {
              "sts:ExternalId" = var.external_id
            }
          } : {},
          # Require MFA for human users
          var.require_mfa ? {
            Bool = {
              "aws:MultiFactorAuthPresent" = "true"
            }
          } : {},
          # IP restriction
          length(var.allowed_ip_ranges) > 0 ? {
            IpAddress = {
              "aws:SourceIp" = var.allowed_ip_ranges
            }
          } : {}
        )
      }
    ]
  })
 
  tags = var.tags

  lifecycle {
    precondition {
      condition     = !(var.environment == "production" && (local.has_admin_access || local.has_wildcard_resource))
      error_message = "Admin access or wildcard resources are not allowed on production roles."
    }
  }
}
 
# Permission boundaries are applied via the role's permissions_boundary
# argument above; attaching a boundary policy with
# aws_iam_role_policy_attachment would grant its permissions instead of
# bounding them.
 
# Inline policy with least privilege
resource "aws_iam_role_policy" "inline" {
  count  = var.inline_policy != null ? 1 : 0
  name   = "${var.role_name}-policy"
  role   = aws_iam_role.secure.id
  policy = var.inline_policy
}
 
# Managed policy attachments
resource "aws_iam_role_policy_attachment" "managed" {
  for_each   = toset(var.managed_policy_arns)
  role       = aws_iam_role.secure.name
  policy_arn = each.value
}
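
The module guards roles at plan time; roles that already exist deserve the same scrutiny. A hedged sketch that sweeps deployed trust policies for wildcard principals with boto3:

# audit_trust_policies.py - flag roles any principal can assume (sketch)
import boto3

def wildcard_trust_roles() -> list:
    iam = boto3.client("iam")
    flagged = []
    for page in iam.get_paginator("list_roles").paginate():
        for role in page["Roles"]:
            # boto3 returns the trust policy already decoded into a dict
            doc = role["AssumeRolePolicyDocument"]
            for stmt in doc.get("Statement", []):
                principal = stmt.get("Principal", {})
                aws_principal = (
                    principal.get("AWS", "") if isinstance(principal, dict) else principal
                )
                if principal == "*" or "*" in str(aws_principal):
                    flagged.append(role["RoleName"])
    return flagged

if __name__ == "__main__":
    for name in wildcard_trust_roles():
        print(f"WARNING: role {name} has a wildcard trust principal")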

CloudFormation Security Patterns

Secure Stack Template

# secure-infrastructure.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::LanguageExtensions  # needed for Fn::If in DeletionPolicy below
Description: 'Secure infrastructure stack with security best practices'
 
Metadata:
  cfn-lint:
    config:
      regions:
        - us-east-1
        - us-west-2
 
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Security Configuration
        Parameters:
          - Environment
          - KMSKeyArn
          - AllowedCIDR
 
Parameters:
  Environment:
    Type: String
    AllowedValues:
      - development
      - staging
      - production
    Default: development
 
  KMSKeyArn:
    Type: String
    Description: ARN of KMS key for encryption
    AllowedPattern: '^arn:aws:kms:[a-z0-9-]+:[0-9]+:key/[a-f0-9-]+$'
 
  AllowedCIDR:
    Type: String
    Description: CIDR range for access
    AllowedPattern: '^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$'
    Default: '10.0.0.0/8'
 
Conditions:
  IsProduction: !Equals [!Ref Environment, 'production']
 
Rules:
  ProductionRequiresKMS:
    RuleCondition: !Equals [!Ref Environment, 'production']
    Assertions:
      - Assert: !Not [!Equals [!Ref KMSKeyArn, '']]
        AssertDescription: 'KMS key is required for production'
 
Resources:
  # Secure VPC with flow logs
  SecureVPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref AllowedCIDR
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-vpc'
        - Key: Environment
          Value: !Ref Environment
 
  VPCFlowLog:
    Type: AWS::EC2::FlowLog
    DependsOn: FlowLogGroup
    Properties:
      ResourceId: !Ref SecureVPC
      ResourceType: VPC
      TrafficType: ALL
      LogDestinationType: cloud-watch-logs
      LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
      DeliverLogsPermissionArn: !GetAtt FlowLogRole.Arn
      MaxAggregationInterval: 60
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-flow-log'

  # Role that lets the flow-log service write to CloudWatch Logs
  # (required when LogDestinationType is cloud-watch-logs)
  FlowLogRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: vpc-flow-logs.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: flow-log-writes
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !GetAtt FlowLogGroup.Arn

  FlowLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
      RetentionInDays: !If [IsProduction, 365, 30]
      KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
 
  # Secure Security Group
  SecureSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: 'Egress to 0.0.0.0/0 required for updates'
    Properties:
      GroupDescription: Secure security group with minimal access
      VpcId: !Ref SecureVPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !Ref AllowedCIDR
          Description: HTTPS from allowed CIDR
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: HTTPS outbound
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-sg'
 
  # Encrypted RDS Instance
  SecureDatabase:
    Type: AWS::RDS::DBInstance
    DeletionPolicy: !If [IsProduction, Retain, Delete]
    UpdateReplacePolicy: !If [IsProduction, Retain, Delete]
    Properties:
      DBInstanceIdentifier: !Sub '${AWS::StackName}-db'
      DBInstanceClass: db.t3.medium
      AllocatedStorage: '50'
      Engine: postgres
      EngineVersion: '15'
      MasterUsername: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:username}}'
      MasterUserPassword: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:password}}'
      StorageEncrypted: true
      KmsKeyId: !Ref KMSKeyArn
      MultiAZ: !If [IsProduction, true, false]
      PubliclyAccessible: false
      EnableIAMDatabaseAuthentication: true
      DeletionProtection: !If [IsProduction, true, false]
      BackupRetentionPeriod: !If [IsProduction, 35, 7]
      EnableCloudwatchLogsExports:
        - postgresql
        - upgrade
      VPCSecurityGroups:
        - !Ref SecureSecurityGroup
      # NOTE: launching inside SecureVPC also requires a DBSubnetGroupName
      # referencing private subnets (omitted here for brevity)
      Tags:
        - Key: Environment
          Value: !Ref Environment
 
  # Secrets Manager for database credentials
  DatabaseSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub '${AWS::StackName}/database/credentials'
      Description: Database credentials
      KmsKeyId: !Ref KMSKeyArn
      GenerateSecretString:
        SecretStringTemplate: '{"username": "dbadmin"}'
        GenerateStringKey: password
        PasswordLength: 32
        ExcludeCharacters: '"@/\'
      Tags:
        - Key: Environment
          Value: !Ref Environment
 
  # Secret rotation (HostedRotationLambda provisions the rotation function;
  # a database in private subnets would also need its VPC settings here)
  SecretRotationSchedule:
    Type: AWS::SecretsManager::RotationSchedule
    Condition: IsProduction
    Properties:
      SecretId: !Ref DatabaseSecret
      HostedRotationLambda:
        RotationType: PostgreSQLSingleUser
      RotationRules:
        AutomaticallyAfterDays: 30
 
Outputs:
  VPCId:
    Description: VPC ID
    Value: !Ref SecureVPC
    Export:
      Name: !Sub '${AWS::StackName}-VPCId'
 
  SecurityGroupId:
    Description: Security Group ID
    Value: !Ref SecureSecurityGroup
    Export:
      Name: !Sub '${AWS::StackName}-SecurityGroupId'
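
Before this template enters a pipeline, it can be sanity-checked against the CloudFormation service itself. The sketch below uses the validate_template API, which catches syntax errors but none of the policy issues cfn-lint and cfn-nag cover; the file path is a placeholder:

# validate_template.py - server-side template validation (sketch)
import boto3
from botocore.exceptions import ClientError

def validate(path: str) -> bool:
    cfn = boto3.client("cloudformation")
    try:
        # validate_template parses the body and returns declared parameters
        resp = cfn.validate_template(TemplateBody=open(path).read())
        print("Parameters:", [p["ParameterKey"] for p in resp.get("Parameters", [])])
        return True
    except ClientError as exc:
        print("Validation failed:", exc.response["Error"]["Message"])
        return False

if __name__ == "__main__":
    validate("secure-infrastructure.yaml")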

Policy as Code with OPA

Terraform Security Policies

# opa_policies/terraform_security.py
"""
Open Policy Agent policies for Terraform security scanning.
"""
 
import json
import subprocess
from dataclasses import dataclass
from typing import List, Dict, Any
from pathlib import Path
 
 
@dataclass
class PolicyViolation:
    """Represents a policy violation."""
    rule_id: str
    severity: str
    resource: str
    message: str
    remediation: str
 
 
class TerraformSecurityScanner:
    """Scan Terraform plans against security policies."""
 
    def __init__(self, policy_dir: str = "policies"):
        self.policy_dir = Path(policy_dir)
        self.policies = self._load_policies()
 
    def _load_policies(self) -> Dict[str, Any]:
        """Load OPA policies from directory."""
        policies = {}
        for policy_file in self.policy_dir.glob("*.rego"):
            policies[policy_file.stem] = policy_file.read_text()
        return policies
 
    def scan_plan(self, plan_json: Dict[str, Any]) -> List[PolicyViolation]:
        """Scan Terraform plan JSON against all policies."""
        violations = []
 
        # S3 bucket security checks
        violations.extend(self._check_s3_security(plan_json))
 
        # IAM security checks
        violations.extend(self._check_iam_security(plan_json))
 
        # Network security checks
        violations.extend(self._check_network_security(plan_json))
 
        # Encryption checks
        violations.extend(self._check_encryption(plan_json))
 
        return violations
 
    def _check_s3_security(self, plan: Dict) -> List[PolicyViolation]:
        """Check S3 bucket security configurations."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_s3_bucket"):
            bucket_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            # Check for public access
            if not self._has_public_access_block(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-001",
                    severity="HIGH",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket missing public access block",
                    remediation="Add aws_s3_bucket_public_access_block resource"
                ))
 
            # Check for encryption
            if not self._has_encryption(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-002",
                    severity="HIGH",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket missing server-side encryption",
                    remediation="Add aws_s3_bucket_server_side_encryption_configuration"
                ))
 
            # Check for versioning
            if not self._has_versioning(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-003",
                    severity="MEDIUM",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket versioning not enabled",
                    remediation="Add aws_s3_bucket_versioning resource"
                ))
 
            # Check for logging
            if not self._has_logging(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-004",
                    severity="MEDIUM",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket access logging not enabled",
                    remediation="Add aws_s3_bucket_logging resource"
                ))
 
        return violations
 
    def _check_iam_security(self, plan: Dict) -> List[PolicyViolation]:
        """Check IAM configuration security."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_iam_role"):
            role_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            assume_role_policy = config.get("assume_role_policy", "{}")
            if isinstance(assume_role_policy, str):
                assume_role_policy = json.loads(assume_role_policy)
 
            # Check for overly permissive trust
            for statement in assume_role_policy.get("Statement", []):
                principal = statement.get("Principal", {})
                if principal == "*" or principal.get("AWS") == "*":
                    violations.append(PolicyViolation(
                        rule_id="IAM-001",
                        severity="CRITICAL",
                        resource=f"aws_iam_role.{role_name}",
                        message="IAM role has overly permissive trust policy",
                        remediation="Restrict Principal to specific accounts/services"
                    ))
 
        for resource in self._get_resources(plan, "aws_iam_policy"):
            policy_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            policy_doc = config.get("policy", "{}")
            if isinstance(policy_doc, str):
                policy_doc = json.loads(policy_doc)
 
            for statement in policy_doc.get("Statement", []):
                # Check for admin access
                actions = statement.get("Action", [])
                if isinstance(actions, str):
                    actions = [actions]
 
                resources = statement.get("Resource", [])
                if isinstance(resources, str):
                    resources = [resources]
 
                if "*" in actions and "*" in resources:
                    violations.append(PolicyViolation(
                        rule_id="IAM-002",
                        severity="CRITICAL",
                        resource=f"aws_iam_policy.{policy_name}",
                        message="IAM policy grants full admin access",
                        remediation="Apply least privilege - restrict actions and resources"
                    ))
 
                # Check for wildcard resources with sensitive actions
                sensitive_actions = [
                    "iam:*", "kms:*", "s3:*", "ec2:*",
                    "lambda:*", "rds:*", "secretsmanager:*"
                ]
                for action in actions:
                    if action in sensitive_actions and "*" in resources:
                        violations.append(PolicyViolation(
                            rule_id="IAM-003",
                            severity="HIGH",
                            resource=f"aws_iam_policy.{policy_name}",
                            message=f"IAM policy grants {action} on all resources",
                            remediation="Restrict Resource to specific ARNs"
                        ))
 
        return violations
 
    def _check_network_security(self, plan: Dict) -> List[PolicyViolation]:
        """Check network security configurations."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_security_group"):
            sg_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            # Check ingress rules
            for ingress in config.get("ingress", []):
                cidr_blocks = ingress.get("cidr_blocks", [])
                from_port = ingress.get("from_port", 0)
                to_port = ingress.get("to_port", 65535)
 
                # Check for 0.0.0.0/0 on sensitive ports
                if "0.0.0.0/0" in cidr_blocks:
                    sensitive_ports = [22, 3389, 3306, 5432, 27017, 6379]
                    for port in sensitive_ports:
                        if from_port <= port <= to_port:
                            violations.append(PolicyViolation(
                                rule_id="NET-001",
                                severity="CRITICAL",
                                resource=f"aws_security_group.{sg_name}",
                                message=f"Security group allows 0.0.0.0/0 access to port {port}",
                                remediation="Restrict CIDR to specific IP ranges"
                            ))
 
            # Check for unrestricted egress (warning only)
            for egress in config.get("egress", []):
                if "0.0.0.0/0" in egress.get("cidr_blocks", []):
                    if egress.get("from_port") == 0 and egress.get("to_port") == 0:
                        violations.append(PolicyViolation(
                            rule_id="NET-002",
                            severity="LOW",
                            resource=f"aws_security_group.{sg_name}",
                            message="Security group allows unrestricted egress",
                            remediation="Consider restricting egress to required ports"
                        ))
 
        return violations
 
    def _check_encryption(self, plan: Dict) -> List[PolicyViolation]:
        """Check encryption configurations."""
        violations = []
 
        # RDS encryption
        for resource in self._get_resources(plan, "aws_db_instance"):
            db_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            if not config.get("storage_encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-001",
                    severity="HIGH",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance storage encryption not enabled",
                    remediation="Set storage_encrypted = true"
                ))
 
            if config.get("publicly_accessible", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-002",
                    severity="CRITICAL",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance is publicly accessible",
                    remediation="Set publicly_accessible = false"
                ))
 
        # EBS encryption
        for resource in self._get_resources(plan, "aws_ebs_volume"):
            vol_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            if not config.get("encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-003",
                    severity="HIGH",
                    resource=f"aws_ebs_volume.{vol_name}",
                    message="EBS volume encryption not enabled",
                    remediation="Set encrypted = true"
                ))
 
        return violations
 
    def _get_resources(self, plan: Dict, resource_type: str) -> List[Dict]:
        """Extract resources of a specific type from plan."""
        resources = []
 
        planned_values = plan.get("planned_values", {})
        root_module = planned_values.get("root_module", {})
 
        for resource in root_module.get("resources", []):
            if resource.get("type") == resource_type:
                resources.append(resource)
 
        # Check child modules
        for module in root_module.get("child_modules", []):
            for resource in module.get("resources", []):
                if resource.get("type") == resource_type:
                    resources.append(resource)
 
        return resources
 
    def _has_public_access_block(self, plan: Dict, bucket_name: str) -> bool:
        """Check if bucket has public access block."""
        for resource in self._get_resources(plan, "aws_s3_bucket_public_access_block"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
    def _has_encryption(self, plan: Dict, bucket_name: str) -> bool:
        """Check if bucket has encryption configured."""
        for resource in self._get_resources(plan, "aws_s3_bucket_server_side_encryption_configuration"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
    def _has_versioning(self, plan: Dict, bucket_name: str) -> bool:
        """Check if bucket has versioning enabled."""
        for resource in self._get_resources(plan, "aws_s3_bucket_versioning"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                config = resource.get("values", {}).get("versioning_configuration", [{}])
                if config and config[0].get("status") == "Enabled":
                    return True
        return False
 
    def _has_logging(self, plan: Dict, bucket_name: str) -> bool:
        """Check if bucket has logging enabled."""
        for resource in self._get_resources(plan, "aws_s3_bucket_logging"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
 
def scan_terraform_directory(directory: str) -> Dict[str, Any]:
    """Scan a Terraform directory and return violations."""
    plan_path = Path(directory) / "tfplan"

    try:
        # Initialize providers and produce a binary plan file
        subprocess.run(
            ["terraform", "init"],
            cwd=directory,
            capture_output=True,
            check=True
        )

        subprocess.run(
            ["terraform", "plan", "-out=tfplan"],
            cwd=directory,
            capture_output=True,
            check=True
        )

        # Render the plan as JSON on stdout and parse it directly
        show = subprocess.run(
            ["terraform", "show", "-json", "tfplan"],
            cwd=directory,
            capture_output=True,
            text=True,
            check=True
        )
        plan_json = json.loads(show.stdout)

        # Scan plan
        scanner = TerraformSecurityScanner()
        violations = scanner.scan_plan(plan_json)

        return {
            "total_violations": len(violations),
            "critical": len([v for v in violations if v.severity == "CRITICAL"]),
            "high": len([v for v in violations if v.severity == "HIGH"]),
            "medium": len([v for v in violations if v.severity == "MEDIUM"]),
            "low": len([v for v in violations if v.severity == "LOW"]),
            "violations": [
                {
                    "rule_id": v.rule_id,
                    "severity": v.severity,
                    "resource": v.resource,
                    "message": v.message,
                    "remediation": v.remediation
                }
                for v in violations
            ]
        }

    finally:
        # Remove the plan file so sensitive plan contents don't linger on disk
        plan_path.unlink(missing_ok=True)
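
Wired into CI, the scanner can gate merges on its exit code. A usage sketch, assuming the module above is importable as terraform_security and that the severity threshold suits your pipeline:

# ci_gate.py - fail the build on critical or high findings (usage sketch)
import sys

from terraform_security import scan_terraform_directory

def main() -> int:
    report = scan_terraform_directory("terraform/")
    for v in report["violations"]:
        print(f'[{v["severity"]}] {v["rule_id"]} {v["resource"]}: {v["message"]}')
    # Non-zero exit blocks the merge when criticals or highs are present
    return 1 if report["critical"] or report["high"] else 0

if __name__ == "__main__":
    sys.exit(main())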

CI/CD Integration

GitHub Actions Workflow

# .github/workflows/iac-security.yaml
name: IaC Security Scan
 
on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
  push:
    branches:
      - main
 
permissions:
  contents: read
  pull-requests: write
  security-events: write
 
jobs:
  terraform-security:
    name: Terraform Security Scan
    runs-on: ubuntu-latest
 
    steps:
      - name: Checkout
        uses: actions/checkout@v4
 
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.6.0
 
      - name: Terraform Format Check
        run: terraform fmt -check -recursive terraform/
 
      - name: TFLint
        uses: terraform-linters/setup-tflint@v4
 
      - name: Run TFLint
        run: |
          cd terraform
          tflint --init
          tflint --format=sarif > ../tflint-results.sarif
 
      - name: Checkov Scan
        uses: bridgecrewio/checkov-action@v12
        with:
          directory: terraform/
          framework: terraform
          output_format: sarif
          output_file_path: checkov-results.sarif
          soft_fail: false
          skip_check: CKV_AWS_144  # Skip cross-region replication check
 
      - name: Terrascan
        uses: tenable/terrascan-action@v1
        with:
          iac_type: terraform
          iac_dir: terraform/
          policy_type: aws
          sarif_upload: true
 
      - name: Upload TFLint SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: tflint-results.sarif
          category: tflint

      - name: Upload Checkov SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: checkov-results.sarif
          category: checkov
 
      - name: Comment PR with Results
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
 
            let comment = '## 🔒 IaC Security Scan Results\n\n';
 
            // Parse and summarize results
            const checkovResults = JSON.parse(fs.readFileSync('checkov-results.sarif'));
            const violations = checkovResults.runs[0].results || [];
 
            const critical = violations.filter(v => v.level === 'error').length;
            const warning = violations.filter(v => v.level === 'warning').length;
 
            if (critical > 0) {
              comment += `❌ **${critical} Critical Issues Found**\n\n`;
            } else if (warning > 0) {
              comment += `⚠️ **${warning} Warnings Found**\n\n`;
            } else {
              comment += `✅ **No Security Issues Found**\n\n`;
            }
 
            // Add details
            for (const violation of violations.slice(0, 10)) {
              comment += `- **${violation.ruleId}**: ${violation.message.text}\n`;
              comment += `  - Location: \`${violation.locations[0].physicalLocation.artifactLocation.uri}\`\n`;
            }
 
            if (violations.length > 10) {
              comment += `\n... and ${violations.length - 10} more issues\n`;
            }
 
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
 
  cloudformation-security:
    name: CloudFormation Security Scan
    runs-on: ubuntu-latest
 
    steps:
      - name: Checkout
        uses: actions/checkout@v4
 
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
 
      - name: Install cfn-lint
        run: pip install cfn-lint

      - name: CFN Lint
        run: |
          shopt -s globstar
          cfn-lint cloudformation/**/*.yaml -f sarif > cfn-lint-results.sarif
 
      # cfn_nag reports to the build log; it has no SARIF output format
      - name: CFN Nag
        uses: stelligent/cfn_nag@master
        with:
          input_path: cloudformation/
 
      - name: Upload SARIF Results
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: cfn-lint-results.sarif
          category: cfn-lint

Drift Detection and Remediation

# drift_detection.py
"""
Infrastructure drift detection and automated remediation.
"""
 
import json
import boto3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import subprocess
 
 
@dataclass
class DriftItem:
    """Represents a drifted resource."""
    resource_type: str
    resource_id: str
    expected_value: str
    actual_value: str
    attribute: str
    severity: str
 
 
class DriftDetector:
    """Detect and remediate infrastructure drift."""
 
    def __init__(self, state_bucket: str, state_key: str):
        self.s3 = boto3.client('s3')
        self.state_bucket = state_bucket
        self.state_key = state_key
 
    def detect_drift(self) -> List[DriftItem]:
        """Detect drift between Terraform state and actual infrastructure."""
        # Run terraform refresh and plan
        result = subprocess.run(
            ["terraform", "plan", "-detailed-exitcode", "-json"],
            capture_output=True,
            text=True
        )
 
        drift_items = []
 
        if result.returncode == 2:  # Changes detected
            for line in result.stdout.split('\n'):
                if not line.strip():
                    continue
 
                try:
                    entry = json.loads(line)
                    if entry.get("type") == "resource_drift":
                        drift_items.append(self._parse_drift(entry))
                except json.JSONDecodeError:
                    continue
 
        return drift_items
 
    def _parse_drift(self, entry: Dict) -> DriftItem:
        """Parse a resource_drift entry from Terraform's streaming JSON output.

        The streaming `plan -json` log identifies which resource drifted and
        the planned action; full before/after values are only available from
        `terraform show -json` on a saved plan file, so they may be empty here.
        """
        change = entry.get("change", {})
        resource = change.get("resource", {})

        before = change.get("before") or {}
        after = change.get("after") or {}

        # Attributes whose values differ between state and reality
        changed_attrs = [
            key
            for key in set(before) | set(after)
            if before.get(key) != after.get(key)
        ]

        return DriftItem(
            resource_type=resource.get("resource_type", "unknown"),
            resource_id=resource.get("resource_name", "unknown"),
            expected_value=str(before),
            actual_value=str(after),
            attribute=", ".join(changed_attrs),
            severity=self._assess_severity(
                resource.get("resource_type", ""), changed_attrs
            )
        )
 
    def _assess_severity(self, resource_type: str, attributes: List[str]) -> str:
        """Assess drift severity based on resource type and attributes."""
        critical_combinations = {
            "aws_security_group": ["ingress", "egress"],
            "aws_iam_role": ["assume_role_policy"],
            "aws_iam_policy": ["policy"],
            "aws_s3_bucket_public_access_block": ["block_public_acls", "block_public_policy"],
        }
 
        high_combinations = {
            "aws_s3_bucket_server_side_encryption_configuration": ["rule"],
            "aws_db_instance": ["publicly_accessible", "storage_encrypted"],
        }
 
        if resource_type in critical_combinations:
            if any(attr in critical_combinations[resource_type] for attr in attributes):
                return "CRITICAL"
 
        if resource_type in high_combinations:
            if any(attr in high_combinations[resource_type] for attr in attributes):
                return "HIGH"
 
        return "MEDIUM"
 
    def remediate_drift(self, drift_items: List[DriftItem], auto_remediate: bool = False) -> Dict:
        """Remediate detected drift."""
        remediation_results = {
            "timestamp": datetime.utcnow().isoformat(),
            "total_drift": len(drift_items),
            "remediated": 0,
            "failed": 0,
            "skipped": 0,
            "details": []
        }
 
        for item in drift_items:
            if item.severity == "CRITICAL" and not auto_remediate:
                # Critical drift requires manual review
                remediation_results["skipped"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "SKIPPED",
                    "reason": "Critical drift requires manual review"
                })
                continue
 
            try:
                # Apply terraform to remediate
                result = subprocess.run(
                    ["terraform", "apply", "-auto-approve",
                     f"-target={item.resource_type}.{item.resource_id}"],
                    capture_output=True,
                    text=True,
                    timeout=300
                )
 
                if result.returncode == 0:
                    remediation_results["remediated"] += 1
                    remediation_results["details"].append({
                        "resource": f"{item.resource_type}.{item.resource_id}",
                        "status": "REMEDIATED"
                    })
                else:
                    raise Exception(result.stderr)
 
            except Exception as e:
                remediation_results["failed"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "FAILED",
                    "error": str(e)
                })
 
        return remediation_results
 
    def schedule_drift_check(self, schedule: str = "rate(1 hour)"):
        """Create EventBridge rule for scheduled drift detection."""
        events = boto3.client('events')
        lambda_client = boto3.client('lambda')
 
        # Create or update rule
        events.put_rule(
            Name='terraform-drift-detection',
            ScheduleExpression=schedule,
            State='ENABLED',
            Description='Scheduled Terraform drift detection'
        )
 
        # Add Lambda target (the function also needs a resource-based
        # permission allowing events.amazonaws.com to invoke it)
        events.put_targets(
            Rule='terraform-drift-detection',
            Targets=[{
                'Id': 'drift-detector-lambda',
                'Arn': f'arn:aws:lambda:{boto3.session.Session().region_name}:'
                       f'{boto3.client("sts").get_caller_identity()["Account"]}:'
                       f'function:terraform-drift-detector'
            }]
        )
 
 
# Lambda handler for scheduled drift detection
def lambda_handler(event, context):
    """AWS Lambda handler for drift detection."""
    import os
 
    detector = DriftDetector(
        state_bucket=os.environ['STATE_BUCKET'],
        state_key=os.environ['STATE_KEY']
    )
 
    drift_items = detector.detect_drift()
 
    if drift_items:
        # Send alert
        sns = boto3.client('sns')
        sns.publish(
            TopicArn=os.environ['ALERT_TOPIC_ARN'],
            Subject='Infrastructure Drift Detected',
            Message=json.dumps({
                "drift_count": len(drift_items),
                "critical": len([d for d in drift_items if d.severity == "CRITICAL"]),
                "items": [
                    {
                        "resource": f"{d.resource_type}.{d.resource_id}",
                        "attribute": d.attribute,
                        "severity": d.severity
                    }
                    for d in drift_items
                ]
            }, indent=2)
        )
 
    return {
        "statusCode": 200,
        "body": json.dumps({
            "drift_detected": len(drift_items) > 0,
            "drift_count": len(drift_items)
        })
    }

Conclusion

Securing Infrastructure as Code requires a multi-layered approach combining secure coding patterns, policy enforcement, automated scanning, and continuous drift detection. The key practices covered include:

  1. Secure module design with encryption, access controls, and logging built-in
  2. Policy as Code with OPA and security scanners like Checkov and cfn-nag
  3. CI/CD integration blocking insecure deployments before they reach production
  4. Drift detection identifying and remediating configuration drift automatically

By implementing these practices, organizations can achieve infrastructure security at scale while maintaining the agility benefits of Infrastructure as Code.
