Infrastructure as Code Security: Terraform and CloudFormation Hardening
Infrastructure as Code (IaC) has revolutionized how organizations provision and manage cloud resources. However, misconfigurations in IaC templates are among the leading causes of cloud security breaches. This guide covers comprehensive security practices for Terraform and CloudFormation deployments.
The IaC Security Challenge
A 2024 study found that 72% of organizations had at least one critical misconfiguration in their IaC templates, with common issues including:
- Overly permissive IAM policies
- Unencrypted storage resources
- Public network exposure
- Missing logging and monitoring
- Hardcoded secrets (see the sketch below)
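To make the last item concrete, here is a minimal Terraform sketch of the anti-pattern and a safer alternative; the resource names and values are illustrative, not from any real deployment:

# Anti-pattern: a secret hardcoded in configuration (and persisted in state)
resource "aws_ssm_parameter" "bad" {
  name  = "/app/db-password"
  type  = "SecureString"
  value = "SuperSecret123!" # committed to source control
}

# Better: accept the value as a sensitive input supplied at deploy time
variable "db_password" {
  type      = string
  sensitive = true
}

resource "aws_ssm_parameter" "better" {
  name  = "/app/db-password"
  type  = "SecureString"
  value = var.db_password # still lands in state, so the backend must be encrypted
}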
Security-First Terraform Patterns
Secure Provider Configuration
# versions.tf - Lock provider versions
terraform {
required_version = ">= 1.6.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.31.0"
}
random = {
source = "hashicorp/random"
version = "~> 3.6.0"
}
}
# Secure state backend
backend "s3" {
bucket = "company-terraform-state"
key = "production/terraform.tfstate"
region = "us-east-1"
encrypt = true
dynamodb_table = "terraform-state-lock"
# Require specific role for state access
role_arn = "arn:aws:iam::123456789012:role/TerraformStateAccess"
}
}
# Provider with security defaults
provider "aws" {
region = var.aws_region
# Require specific role assumption
assume_role {
role_arn = var.deployment_role_arn
session_name = "TerraformDeployment"
external_id = var.external_id
}
default_tags {
tags = {
Environment = var.environment
ManagedBy = "Terraform"
SecurityReview = "Required"
CostCenter = var.cost_center
}
}
}

Secure S3 Bucket Module
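The module below expects a customer-managed KMS key through var.kms_key_arn. A minimal sketch of such a key, assuming the illustrative resource name aws_kms_key.data used in a later example:

resource "aws_kms_key" "data" {
  description             = "CMK for S3 server-side encryption"
  enable_key_rotation     = true # automatic annual rotation
  deletion_window_in_days = 30   # recovery window before key destruction
}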
# modules/secure-s3/main.tf
resource "aws_s3_bucket" "secure" {
bucket = var.bucket_name
# Allow force_destroy only outside production
force_destroy = var.environment != "production"
tags = merge(var.tags, {
DataClassification = var.data_classification
})
}
# Block all public access
resource "aws_s3_bucket_public_access_block" "secure" {
bucket = aws_s3_bucket.secure.id
block_public_acls = true
block_public_policy = true
ignore_public_acls = true
restrict_public_buckets = true
}
# Server-side encryption with KMS
resource "aws_s3_bucket_server_side_encryption_configuration" "secure" {
bucket = aws_s3_bucket.secure.id
rule {
apply_server_side_encryption_by_default {
kms_master_key_id = var.kms_key_arn
sse_algorithm = "aws:kms"
}
bucket_key_enabled = true
}
}
# Versioning for data protection
resource "aws_s3_bucket_versioning" "secure" {
bucket = aws_s3_bucket.secure.id
versioning_configuration {
status = "Enabled"
}
}
# Lifecycle rules for cost and compliance
resource "aws_s3_bucket_lifecycle_configuration" "secure" {
bucket = aws_s3_bucket.secure.id
  rule {
    id     = "transition-to-glacier"
    status = "Enabled"
    # Apply to all objects (the S3 API requires a filter or prefix per rule)
    filter {}
transition {
days = 90
storage_class = "GLACIER"
}
noncurrent_version_transition {
noncurrent_days = 30
storage_class = "GLACIER"
}
noncurrent_version_expiration {
noncurrent_days = var.retention_days
}
}
}
# Access logging
resource "aws_s3_bucket_logging" "secure" {
bucket = aws_s3_bucket.secure.id
target_bucket = var.logging_bucket_id
target_prefix = "s3-access-logs/${var.bucket_name}/"
}
# Bucket policy enforcing TLS
resource "aws_s3_bucket_policy" "secure" {
bucket = aws_s3_bucket.secure.id
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Sid = "EnforceTLS"
Effect = "Deny"
Principal = "*"
Action = "s3:*"
Resource = [
aws_s3_bucket.secure.arn,
"${aws_s3_bucket.secure.arn}/*"
]
Condition = {
Bool = {
"aws:SecureTransport" = "false"
}
}
},
{
Sid = "EnforceEncryption"
Effect = "Deny"
Principal = "*"
Action = "s3:PutObject"
Resource = "${aws_s3_bucket.secure.arn}/*"
Condition = {
StringNotEquals = {
"s3:x-amz-server-side-encryption" = "aws:kms"
}
}
}
]
})
}
# Output for dependency management
output "bucket_arn" {
value = aws_s3_bucket.secure.arn
description = "ARN of the secure S3 bucket"
}
output "bucket_id" {
value = aws_s3_bucket.secure.id
description = "ID of the secure S3 bucket"
}
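A hedged example of consuming the module; the bucket and logging-bucket names are illustrative assumptions, and aws_kms_key.data is the key sketched earlier:

module "app_data" {
  source = "./modules/secure-s3"

  bucket_name         = "example-app-data"     # illustrative
  environment         = "production"
  data_classification = "confidential"
  kms_key_arn         = aws_kms_key.data.arn   # key sketched above
  logging_bucket_id   = "example-central-logs" # assumed central log bucket
  retention_days      = 365
  tags = {
    Team = "platform"
  }
}

Secure IAM Role Module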
# modules/secure-iam-role/main.tf
locals {
  # Flag overly permissive configurations
  has_admin_access = contains(var.managed_policy_arns, "arn:aws:iam::aws:policy/AdministratorAccess")
  # Match a literal "Resource": "*" in the inline policy JSON string
  has_wildcard_resource = can(regex("\"Resource\"\\s*:\\s*\"\\*\"", var.inline_policy))
}

# Fail-safe validation: assigning a non-numeric string to count forces a
# plan-time error whose message surfaces the reason for the failure.
# (null_resource uses the hashicorp/null provider, which should also be
# pinned in required_providers.)
resource "null_resource" "security_validation" {
  count = local.has_admin_access && var.environment == "production" ? "ADMIN_ACCESS_NOT_ALLOWED_IN_PRODUCTION" : 0
}
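# Alternative for the pinned Terraform version (preconditions require >= 1.2):
# a lifecycle precondition inside the aws_iam_role "secure" block below fails
# the plan with a readable message instead of a type-conversion error. Sketch:
#
#   lifecycle {
#     precondition {
#       condition     = !(local.has_admin_access && var.environment == "production")
#       error_message = "AdministratorAccess is not allowed on production roles."
#     }
#   }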
resource "aws_iam_role" "secure" {
name = var.role_name
max_session_duration = var.max_session_duration
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Principal = {
Service = var.trusted_services
AWS = var.trusted_accounts
}
Action = "sts:AssumeRole"
Condition = merge(
# Require external ID for cross-account
length(var.trusted_accounts) > 0 ? {
StringEquals = {
"sts:ExternalId" = var.external_id
}
} : {},
# Require MFA for human users
var.require_mfa ? {
Bool = {
"aws:MultiFactorAuthPresent" = "true"
}
} : {},
# IP restriction
length(var.allowed_ip_ranges) > 0 ? {
IpAddress = {
"aws:SourceIp" = var.allowed_ip_ranges
}
} : {}
)
}
]
})
tags = var.tags
}
# Permission boundary enforcement: set via the permissions_boundary argument
# on the role above. Attaching the boundary document as a managed policy would
# grant its permissions instead of bounding them.
# Inline policy with least privilege
resource "aws_iam_role_policy" "inline" {
count = var.inline_policy != null ? 1 : 0
name = "${var.role_name}-policy"
role = aws_iam_role.secure.id
policy = var.inline_policy
}
# Managed policy attachments
resource "aws_iam_role_policy_attachment" "managed" {
for_each = toset(var.managed_policy_arns)
role = aws_iam_role.secure.name
policy_arn = each.value
}
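A consumption sketch for the role module; the account ID, external ID, and policy ARN are illustrative assumptions:

module "ci_deployer_role" {
  source = "./modules/secure-iam-role"

  role_name               = "ci-deployer"
  environment             = "production"
  max_session_duration    = 3600
  trusted_services        = ["codebuild.amazonaws.com"]
  trusted_accounts        = ["123456789012"]      # illustrative account ID
  external_id             = "example-external-id" # required for cross-account trust
  require_mfa             = false                 # service role, not assumed by humans
  allowed_ip_ranges       = []
  managed_policy_arns     = ["arn:aws:iam::aws:policy/AWSCodeBuildReadOnlyAccess"]
  inline_policy           = null
  permission_boundary_arn = null
  tags = {
    Team = "platform"
  }
}

CloudFormation Security Patterns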
Secure Stack Template
# secure-infrastructure.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Secure infrastructure stack with security best practices'
# LanguageExtensions enables intrinsic functions in DeletionPolicy and
# UpdateReplacePolicy; the Secrets Manager transform enables hosted rotation
Transform:
  - AWS::LanguageExtensions
  - 'AWS::SecretsManager-2020-07-23'
Metadata:
cfn-lint:
config:
regions:
- us-east-1
- us-west-2
AWS::CloudFormation::Interface:
ParameterGroups:
- Label:
default: Security Configuration
Parameters:
- Environment
- KMSKeyArn
- AllowedCIDR
Parameters:
Environment:
Type: String
AllowedValues:
- development
- staging
- production
Default: development
  KMSKeyArn:
    Type: String
    Description: ARN of KMS key for encryption (may be empty outside production)
    Default: ''
    AllowedPattern: '^(arn:aws:kms:[a-z0-9-]+:[0-9]+:key/[a-f0-9-]+)?$'
AllowedCIDR:
Type: String
Description: CIDR range for access
AllowedPattern: '^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$'
Default: '10.0.0.0/8'
Conditions:
IsProduction: !Equals [!Ref Environment, 'production']
Rules:
ProductionRequiresKMS:
RuleCondition: !Equals [!Ref Environment, 'production']
Assertions:
- Assert: !Not [!Equals [!Ref KMSKeyArn, '']]
AssertDescription: 'KMS key is required for production'
Resources:
# Secure VPC with flow logs
SecureVPC:
Type: AWS::EC2::VPC
Properties:
      # A VPC CIDR must be between /16 and /28; the broader AllowedCIDR access
      # range is used only for security group rules below
      CidrBlock: 10.0.0.0/16
EnableDnsHostnames: true
EnableDnsSupport: true
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-vpc'
- Key: Environment
Value: !Ref Environment
  VPCFlowLog:
    Type: AWS::EC2::FlowLog
    Properties:
      ResourceId: !Ref SecureVPC
      ResourceType: VPC
      TrafficType: ALL
      LogDestinationType: cloud-watch-logs
      LogGroupName: !Ref FlowLogGroup
      # Required for CloudWatch Logs destinations: a role the flow log
      # service assumes to deliver log events
      DeliverLogsPermissionArn: !GetAtt FlowLogRole.Arn
      MaxAggregationInterval: 60
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-flow-log'
  FlowLogRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: vpc-flow-logs.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: flow-log-delivery
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: [logs:CreateLogStream, logs:PutLogEvents, logs:DescribeLogGroups, logs:DescribeLogStreams]
                Resource: !GetAtt FlowLogGroup.Arn
FlowLogGroup:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
RetentionInDays: !If [IsProduction, 365, 30]
KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
# Secure Security Group
SecureSecurityGroup:
Type: AWS::EC2::SecurityGroup
Metadata:
cfn_nag:
rules_to_suppress:
- id: W5
reason: 'Egress to 0.0.0.0/0 required for updates'
Properties:
GroupDescription: Secure security group with minimal access
VpcId: !Ref SecureVPC
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: !Ref AllowedCIDR
Description: HTTPS from allowed CIDR
SecurityGroupEgress:
- IpProtocol: tcp
FromPort: 443
ToPort: 443
CidrIp: 0.0.0.0/0
Description: HTTPS outbound
Tags:
- Key: Name
Value: !Sub '${AWS::StackName}-sg'
# Encrypted RDS Instance
SecureDatabase:
Type: AWS::RDS::DBInstance
DeletionPolicy: !If [IsProduction, Retain, Delete]
UpdateReplacePolicy: !If [IsProduction, Retain, Delete]
    Properties:
      DBInstanceIdentifier: !Sub '${AWS::StackName}-db'
      DBInstanceClass: db.t3.medium
      AllocatedStorage: 100
      Engine: postgres
      EngineVersion: '15'
MasterUsername: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:username}}'
MasterUserPassword: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:password}}'
StorageEncrypted: true
      KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
MultiAZ: !If [IsProduction, true, false]
PubliclyAccessible: false
EnableIAMDatabaseAuthentication: true
DeletionProtection: !If [IsProduction, true, false]
BackupRetentionPeriod: !If [IsProduction, 35, 7]
EnableCloudwatchLogsExports:
- postgresql
- upgrade
      VPCSecurityGroups:
        - !Ref SecureSecurityGroup
      # A DBSubnetGroupName referencing private subnets in SecureVPC is also
      # required in practice; subnet resources are elided from this excerpt
Tags:
- Key: Environment
Value: !Ref Environment
# Secrets Manager for database credentials
DatabaseSecret:
Type: AWS::SecretsManager::Secret
Properties:
Name: !Sub '${AWS::StackName}/database/credentials'
Description: Database credentials
      KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
GenerateSecretString:
SecretStringTemplate: '{"username": "dbadmin"}'
GenerateStringKey: password
PasswordLength: 32
ExcludeCharacters: '"@/\'
Tags:
- Key: Environment
Value: !Ref Environment
# Secret rotation
  SecretRotationSchedule:
    Type: AWS::SecretsManager::RotationSchedule
    Condition: IsProduction
    Properties:
      SecretId: !Ref DatabaseSecret
      # A rotation function is required; the hosted rotation Lambda is
      # supplied by the AWS::SecretsManager-2020-07-23 transform above
      HostedRotationLambda:
        RotationType: PostgreSQLSingleUser
      RotationRules:
        AutomaticallyAfterDays: 30
Outputs:
VPCId:
Description: VPC ID
Value: !Ref SecureVPC
Export:
Name: !Sub '${AWS::StackName}-VPCId'
SecurityGroupId:
Description: Security Group ID
Value: !Ref SecureSecurityGroup
Export:
      Name: !Sub '${AWS::StackName}-SecurityGroupId'

Policy as Code with OPA
Terraform Security Policies
# opa_policies/terraform_security.py
"""
OPA-style policy-as-code checks for Terraform plan JSON.

Rego policies are loaded from the policy directory so they can be evaluated
with an external OPA binary; the scan methods below implement the core rules
natively in Python.
"""
import json
import subprocess
from dataclasses import dataclass
from typing import List, Dict, Any
from pathlib import Path
@dataclass
class PolicyViolation:
"""Represents a policy violation."""
rule_id: str
severity: str
resource: str
message: str
remediation: str
class TerraformSecurityScanner:
"""Scan Terraform plans against security policies."""
def __init__(self, policy_dir: str = "policies"):
self.policy_dir = Path(policy_dir)
self.policies = self._load_policies()
def _load_policies(self) -> Dict[str, Any]:
"""Load OPA policies from directory."""
policies = {}
for policy_file in self.policy_dir.glob("*.rego"):
policies[policy_file.stem] = policy_file.read_text()
return policies
def scan_plan(self, plan_json: Dict[str, Any]) -> List[PolicyViolation]:
"""Scan Terraform plan JSON against all policies."""
violations = []
# S3 bucket security checks
violations.extend(self._check_s3_security(plan_json))
# IAM security checks
violations.extend(self._check_iam_security(plan_json))
# Network security checks
violations.extend(self._check_network_security(plan_json))
# Encryption checks
violations.extend(self._check_encryption(plan_json))
return violations
def _check_s3_security(self, plan: Dict) -> List[PolicyViolation]:
"""Check S3 bucket security configurations."""
violations = []
for resource in self._get_resources(plan, "aws_s3_bucket"):
bucket_name = resource.get("name", "unknown")
config = resource.get("values", {})
# Check for public access
if not self._has_public_access_block(plan, bucket_name):
violations.append(PolicyViolation(
rule_id="S3-001",
severity="HIGH",
resource=f"aws_s3_bucket.{bucket_name}",
message="S3 bucket missing public access block",
remediation="Add aws_s3_bucket_public_access_block resource"
))
# Check for encryption
if not self._has_encryption(plan, bucket_name):
violations.append(PolicyViolation(
rule_id="S3-002",
severity="HIGH",
resource=f"aws_s3_bucket.{bucket_name}",
message="S3 bucket missing server-side encryption",
remediation="Add aws_s3_bucket_server_side_encryption_configuration"
))
# Check for versioning
if not self._has_versioning(plan, bucket_name):
violations.append(PolicyViolation(
rule_id="S3-003",
severity="MEDIUM",
resource=f"aws_s3_bucket.{bucket_name}",
message="S3 bucket versioning not enabled",
remediation="Add aws_s3_bucket_versioning resource"
))
# Check for logging
if not self._has_logging(plan, bucket_name):
violations.append(PolicyViolation(
rule_id="S3-004",
severity="MEDIUM",
resource=f"aws_s3_bucket.{bucket_name}",
message="S3 bucket access logging not enabled",
remediation="Add aws_s3_bucket_logging resource"
))
return violations
def _check_iam_security(self, plan: Dict) -> List[PolicyViolation]:
"""Check IAM configuration security."""
violations = []
for resource in self._get_resources(plan, "aws_iam_role"):
role_name = resource.get("name", "unknown")
config = resource.get("values", {})
assume_role_policy = config.get("assume_role_policy", "{}")
if isinstance(assume_role_policy, str):
assume_role_policy = json.loads(assume_role_policy)
# Check for overly permissive trust
for statement in assume_role_policy.get("Statement", []):
principal = statement.get("Principal", {})
if principal == "*" or principal.get("AWS") == "*":
violations.append(PolicyViolation(
rule_id="IAM-001",
severity="CRITICAL",
resource=f"aws_iam_role.{role_name}",
message="IAM role has overly permissive trust policy",
remediation="Restrict Principal to specific accounts/services"
))
for resource in self._get_resources(plan, "aws_iam_policy"):
policy_name = resource.get("name", "unknown")
config = resource.get("values", {})
policy_doc = config.get("policy", "{}")
if isinstance(policy_doc, str):
policy_doc = json.loads(policy_doc)
for statement in policy_doc.get("Statement", []):
# Check for admin access
actions = statement.get("Action", [])
if isinstance(actions, str):
actions = [actions]
resources = statement.get("Resource", [])
if isinstance(resources, str):
resources = [resources]
if "*" in actions and "*" in resources:
violations.append(PolicyViolation(
rule_id="IAM-002",
severity="CRITICAL",
resource=f"aws_iam_policy.{policy_name}",
message="IAM policy grants full admin access",
remediation="Apply least privilege - restrict actions and resources"
))
# Check for wildcard resources with sensitive actions
sensitive_actions = [
"iam:*", "kms:*", "s3:*", "ec2:*",
"lambda:*", "rds:*", "secretsmanager:*"
]
for action in actions:
if action in sensitive_actions and "*" in resources:
violations.append(PolicyViolation(
rule_id="IAM-003",
severity="HIGH",
resource=f"aws_iam_policy.{policy_name}",
message=f"IAM policy grants {action} on all resources",
remediation="Restrict Resource to specific ARNs"
))
return violations
def _check_network_security(self, plan: Dict) -> List[PolicyViolation]:
"""Check network security configurations."""
violations = []
for resource in self._get_resources(plan, "aws_security_group"):
sg_name = resource.get("name", "unknown")
config = resource.get("values", {})
# Check ingress rules
for ingress in config.get("ingress", []):
cidr_blocks = ingress.get("cidr_blocks", [])
from_port = ingress.get("from_port", 0)
to_port = ingress.get("to_port", 65535)
# Check for 0.0.0.0/0 on sensitive ports
if "0.0.0.0/0" in cidr_blocks:
sensitive_ports = [22, 3389, 3306, 5432, 27017, 6379]
for port in sensitive_ports:
if from_port <= port <= to_port:
violations.append(PolicyViolation(
rule_id="NET-001",
severity="CRITICAL",
resource=f"aws_security_group.{sg_name}",
message=f"Security group allows 0.0.0.0/0 access to port {port}",
remediation="Restrict CIDR to specific IP ranges"
))
# Check for unrestricted egress (warning only)
for egress in config.get("egress", []):
if "0.0.0.0/0" in egress.get("cidr_blocks", []):
if egress.get("from_port") == 0 and egress.get("to_port") == 0:
violations.append(PolicyViolation(
rule_id="NET-002",
severity="LOW",
resource=f"aws_security_group.{sg_name}",
message="Security group allows unrestricted egress",
remediation="Consider restricting egress to required ports"
))
return violations
def _check_encryption(self, plan: Dict) -> List[PolicyViolation]:
"""Check encryption configurations."""
violations = []
# RDS encryption
for resource in self._get_resources(plan, "aws_db_instance"):
db_name = resource.get("name", "unknown")
config = resource.get("values", {})
if not config.get("storage_encrypted", False):
violations.append(PolicyViolation(
rule_id="ENC-001",
severity="HIGH",
resource=f"aws_db_instance.{db_name}",
message="RDS instance storage encryption not enabled",
remediation="Set storage_encrypted = true"
))
if config.get("publicly_accessible", False):
violations.append(PolicyViolation(
rule_id="ENC-002",
severity="CRITICAL",
resource=f"aws_db_instance.{db_name}",
message="RDS instance is publicly accessible",
remediation="Set publicly_accessible = false"
))
# EBS encryption
for resource in self._get_resources(plan, "aws_ebs_volume"):
vol_name = resource.get("name", "unknown")
config = resource.get("values", {})
if not config.get("encrypted", False):
violations.append(PolicyViolation(
rule_id="ENC-003",
severity="HIGH",
resource=f"aws_ebs_volume.{vol_name}",
message="EBS volume encryption not enabled",
remediation="Set encrypted = true"
))
return violations
def _get_resources(self, plan: Dict, resource_type: str) -> List[Dict]:
"""Extract resources of a specific type from plan."""
resources = []
planned_values = plan.get("planned_values", {})
root_module = planned_values.get("root_module", {})
for resource in root_module.get("resources", []):
if resource.get("type") == resource_type:
resources.append(resource)
# Check child modules
for module in root_module.get("child_modules", []):
for resource in module.get("resources", []):
if resource.get("type") == resource_type:
resources.append(resource)
return resources
def _has_public_access_block(self, plan: Dict, bucket_name: str) -> bool:
"""Check if bucket has public access block."""
for resource in self._get_resources(plan, "aws_s3_bucket_public_access_block"):
if bucket_name in str(resource.get("values", {}).get("bucket", "")):
return True
return False
def _has_encryption(self, plan: Dict, bucket_name: str) -> bool:
"""Check if bucket has encryption configured."""
for resource in self._get_resources(plan, "aws_s3_bucket_server_side_encryption_configuration"):
if bucket_name in str(resource.get("values", {}).get("bucket", "")):
return True
return False
def _has_versioning(self, plan: Dict, bucket_name: str) -> bool:
"""Check if bucket has versioning enabled."""
for resource in self._get_resources(plan, "aws_s3_bucket_versioning"):
if bucket_name in str(resource.get("values", {}).get("bucket", "")):
config = resource.get("values", {}).get("versioning_configuration", [{}])
if config and config[0].get("status") == "Enabled":
return True
return False
def _has_logging(self, plan: Dict, bucket_name: str) -> bool:
"""Check if bucket has logging enabled."""
for resource in self._get_resources(plan, "aws_s3_bucket_logging"):
if bucket_name in str(resource.get("values", {}).get("bucket", "")):
return True
return False
def scan_terraform_directory(directory: str) -> Dict[str, Any]:
    """Scan a Terraform directory and return violations."""
    try:
        # Initialize, plan, and render the saved plan as JSON
        subprocess.run(
            ["terraform", "init"],
            cwd=directory,
            capture_output=True,
            check=True
        )
        subprocess.run(
            ["terraform", "plan", "-out=tfplan"],
            cwd=directory,
            capture_output=True,
            check=True
        )
        show = subprocess.run(
            ["terraform", "show", "-json", "tfplan"],
            cwd=directory,
            capture_output=True,
            text=True,
            check=True
        )
        plan_json = json.loads(show.stdout)
        # Scan plan
        scanner = TerraformSecurityScanner()
        violations = scanner.scan_plan(plan_json)
        return {
            "total_violations": len(violations),
            "critical": len([v for v in violations if v.severity == "CRITICAL"]),
            "high": len([v for v in violations if v.severity == "HIGH"]),
            "medium": len([v for v in violations if v.severity == "MEDIUM"]),
            "low": len([v for v in violations if v.severity == "LOW"]),
            "violations": [
                {
                    "rule_id": v.rule_id,
                    "severity": v.severity,
                    "resource": v.resource,
                    "message": v.message,
                    "remediation": v.remediation
                }
                for v in violations
            ]
        }
    finally:
        # Remove the local plan file regardless of outcome
        Path(directory, "tfplan").unlink(missing_ok=True)

CI/CD Integration
GitHub Actions Workflow
# .github/workflows/iac-security.yaml
name: IaC Security Scan
on:
pull_request:
paths:
- 'terraform/**'
- 'cloudformation/**'
push:
branches:
- main
permissions:
contents: read
pull-requests: write
security-events: write
jobs:
terraform-security:
name: Terraform Security Scan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Terraform
uses: hashicorp/setup-terraform@v3
with:
terraform_version: 1.6.0
- name: Terraform Format Check
run: terraform fmt -check -recursive terraform/
- name: TFLint
uses: terraform-linters/setup-tflint@v4
- name: Run TFLint
run: |
cd terraform
tflint --init
tflint --format=sarif > ../tflint-results.sarif
- name: Checkov Scan
uses: bridgecrewio/checkov-action@v12
with:
directory: terraform/
framework: terraform
output_format: sarif
output_file_path: checkov-results.sarif
soft_fail: false
skip_check: CKV_AWS_144 # Skip cross-region replication check
- name: Terrascan
uses: tenable/terrascan-action@v1
with:
iac_type: terraform
iac_dir: terraform/
policy_type: aws
sarif_upload: true
      - name: Upload TFLint SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: tflint-results.sarif
          category: tflint
      - name: Upload Checkov SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: checkov-results.sarif
          category: checkov
- name: Comment PR with Results
if: github.event_name == 'pull_request'
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
let comment = '## 🔒 IaC Security Scan Results\n\n';
// Parse and summarize results
const checkovResults = JSON.parse(fs.readFileSync('checkov-results.sarif'));
const violations = checkovResults.runs[0].results || [];
const critical = violations.filter(v => v.level === 'error').length;
const warning = violations.filter(v => v.level === 'warning').length;
if (critical > 0) {
comment += `❌ **${critical} Critical Issues Found**\n\n`;
} else if (warning > 0) {
comment += `⚠️ **${warning} Warnings Found**\n\n`;
} else {
comment += `✅ **No Security Issues Found**\n\n`;
}
// Add details
for (const violation of violations.slice(0, 10)) {
comment += `- **${violation.ruleId}**: ${violation.message.text}\n`;
comment += ` - Location: \`${violation.locations[0].physicalLocation.artifactLocation.uri}\`\n`;
}
if (violations.length > 10) {
comment += `\n... and ${violations.length - 10} more issues\n`;
}
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: comment
});
cloudformation-security:
name: CloudFormation Security Scan
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install cfn-lint
        run: pip install cfn-lint
- name: CFN Lint
        run: |
          shopt -s globstar
          cfn-lint cloudformation/**/*.yaml -f sarif > cfn-lint-results.sarif
      - name: CFN Nag
        uses: stelligent/cfn_nag@master
        with:
          input_path: cloudformation/
      - name: Upload SARIF Results
        uses: github/codeql-action/upload-sarif@v3
        if: always()
        with:
          sarif_file: cfn-lint-results.sarif

Drift Detection and Remediation
# drift_detection.py
"""
Infrastructure drift detection and automated remediation.
"""
import json
import boto3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import subprocess
@dataclass
class DriftItem:
"""Represents a drifted resource."""
resource_type: str
resource_id: str
expected_value: str
actual_value: str
attribute: str
severity: str
class DriftDetector:
"""Detect and remediate infrastructure drift."""
def __init__(self, state_bucket: str, state_key: str):
self.s3 = boto3.client('s3')
self.state_bucket = state_bucket
self.state_key = state_key
    def detect_drift(self) -> List[DriftItem]:
        """Detect drift between Terraform state and actual infrastructure."""
        # A refresh-only plan compares state against real infrastructure;
        # -detailed-exitcode returns 2 when drift is detected
        result = subprocess.run(
            ["terraform", "plan", "-refresh-only", "-detailed-exitcode", "-out=driftplan"],
            capture_output=True,
            text=True
        )
        drift_items = []
        if result.returncode == 2:  # Changes detected
            show = subprocess.run(
                ["terraform", "show", "-json", "driftplan"],
                capture_output=True,
                text=True,
                check=True
            )
            plan = json.loads(show.stdout)
            # The JSON plan representation lists drifted resources under
            # the top-level "resource_drift" key
            for entry in plan.get("resource_drift", []):
                drift_items.append(self._parse_drift(entry))
        return drift_items
    def _parse_drift(self, entry: Dict) -> DriftItem:
        """Parse a resource_drift entry from the JSON plan representation."""
        change = entry.get("change", {})
        before = change.get("before") or {}
        after = change.get("after") or {}
        # Find the changed attributes
        changed_attrs = [
            key for key in set(before) | set(after)
            if before.get(key) != after.get(key)
        ]
        return DriftItem(
            resource_type=entry.get("type", "unknown"),
            resource_id=entry.get("name", "unknown"),
            expected_value=str(before),
            actual_value=str(after),
            attribute=", ".join(sorted(changed_attrs)),
            severity=self._assess_severity(entry.get("type", "unknown"), changed_attrs)
        )
def _assess_severity(self, resource_type: str, attributes: List[str]) -> str:
"""Assess drift severity based on resource type and attributes."""
critical_combinations = {
"aws_security_group": ["ingress", "egress"],
"aws_iam_role": ["assume_role_policy"],
"aws_iam_policy": ["policy"],
"aws_s3_bucket_public_access_block": ["block_public_acls", "block_public_policy"],
}
high_combinations = {
"aws_s3_bucket_server_side_encryption_configuration": ["rule"],
"aws_db_instance": ["publicly_accessible", "storage_encrypted"],
}
if resource_type in critical_combinations:
if any(attr in critical_combinations[resource_type] for attr in attributes):
return "CRITICAL"
if resource_type in high_combinations:
if any(attr in high_combinations[resource_type] for attr in attributes):
return "HIGH"
return "MEDIUM"
def remediate_drift(self, drift_items: List[DriftItem], auto_remediate: bool = False) -> Dict:
"""Remediate detected drift."""
remediation_results = {
"timestamp": datetime.utcnow().isoformat(),
"total_drift": len(drift_items),
"remediated": 0,
"failed": 0,
"skipped": 0,
"details": []
}
for item in drift_items:
if item.severity == "CRITICAL" and not auto_remediate:
# Critical drift requires manual review
remediation_results["skipped"] += 1
remediation_results["details"].append({
"resource": f"{item.resource_type}.{item.resource_id}",
"status": "SKIPPED",
"reason": "Critical drift requires manual review"
})
continue
try:
# Apply terraform to remediate
result = subprocess.run(
["terraform", "apply", "-auto-approve",
f"-target={item.resource_type}.{item.resource_id}"],
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
remediation_results["remediated"] += 1
remediation_results["details"].append({
"resource": f"{item.resource_type}.{item.resource_id}",
"status": "REMEDIATED"
})
else:
raise Exception(result.stderr)
except Exception as e:
remediation_results["failed"] += 1
remediation_results["details"].append({
"resource": f"{item.resource_type}.{item.resource_id}",
"status": "FAILED",
"error": str(e)
})
return remediation_results
    def schedule_drift_check(self, schedule: str = "rate(1 hour)"):
        """Create EventBridge rule for scheduled drift detection."""
        events = boto3.client('events')
        lambda_client = boto3.client('lambda')
        region = boto3.session.Session().region_name
        account = boto3.client('sts').get_caller_identity()['Account']
        function_arn = f'arn:aws:lambda:{region}:{account}:function:terraform-drift-detector'
        # Create or update rule
        rule = events.put_rule(
            Name='terraform-drift-detection',
            ScheduleExpression=schedule,
            State='ENABLED',
            Description='Scheduled Terraform drift detection'
        )
        # EventBridge needs explicit permission to invoke the function
        try:
            lambda_client.add_permission(
                FunctionName='terraform-drift-detector',
                StatementId='terraform-drift-detection-schedule',
                Action='lambda:InvokeFunction',
                Principal='events.amazonaws.com',
                SourceArn=rule['RuleArn']
            )
        except lambda_client.exceptions.ResourceConflictException:
            pass  # Permission already granted
        # Add Lambda target
        events.put_targets(
            Rule='terraform-drift-detection',
            Targets=[{
                'Id': 'drift-detector-lambda',
                'Arn': function_arn
            }]
        )
# Lambda handler for scheduled drift detection
def lambda_handler(event, context):
"""AWS Lambda handler for drift detection."""
import os
detector = DriftDetector(
state_bucket=os.environ['STATE_BUCKET'],
state_key=os.environ['STATE_KEY']
)
drift_items = detector.detect_drift()
if drift_items:
# Send alert
sns = boto3.client('sns')
sns.publish(
TopicArn=os.environ['ALERT_TOPIC_ARN'],
Subject='Infrastructure Drift Detected',
Message=json.dumps({
"drift_count": len(drift_items),
"critical": len([d for d in drift_items if d.severity == "CRITICAL"]),
"items": [
{
"resource": f"{d.resource_type}.{d.resource_id}",
"attribute": d.attribute,
"severity": d.severity
}
for d in drift_items
]
}, indent=2)
)
return {
"statusCode": 200,
"body": json.dumps({
"drift_detected": len(drift_items) > 0,
"drift_count": len(drift_items)
})
}
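Because the drift detector is itself infrastructure, the schedule created by schedule_drift_check can equally be provisioned with Terraform. A minimal sketch, assuming an aws_lambda_function.drift_detector resource is defined elsewhere:

resource "aws_cloudwatch_event_rule" "drift_check" {
  name                = "terraform-drift-detection"
  schedule_expression = "rate(1 hour)"
}

resource "aws_cloudwatch_event_target" "drift_check" {
  rule = aws_cloudwatch_event_rule.drift_check.name
  arn  = aws_lambda_function.drift_detector.arn # assumed to exist
}

# EventBridge needs explicit permission to invoke the function
resource "aws_lambda_permission" "drift_check" {
  statement_id  = "AllowEventBridgeInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.drift_detector.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.drift_check.arn
}

Conclusion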
Securing Infrastructure as Code requires a multi-layered approach combining secure coding patterns, policy enforcement, automated scanning, and continuous drift detection. The key practices covered include:
- Secure module design with encryption, access controls, and logging built-in
- Policy as Code with OPA and security scanners like Checkov and cfn-nag
- CI/CD integration blocking insecure deployments before they reach production
- Drift detection identifying and remediating configuration drift automatically
By implementing these practices, organizations can achieve infrastructure security at scale while maintaining the agility benefits of Infrastructure as Code.