Securitatea Infrastructure as Code: Securizarea Terraform si CloudFormation
Infrastructure as Code (IaC) a revolutionat modul in care organizatiile provizioneaza si gestioneaza resursele cloud. Totusi, configuratiile gresite din template-urile IaC se numara printre principalele cauze ale breselor de securitate in cloud. Acest ghid acopera practici complete de securitate pentru deploymenturile Terraform si CloudFormation.
Provocarea securitatii IaC
Un studiu din 2024 a constatat ca 72% dintre organizatii aveau cel putin o configuratie gresita critica in template-urile lor IaC, cu probleme frecvente precum:
- Politici IAM excesiv de permisive
- Resurse de stocare necriptate
- Expunere publica a retelei
- Lipsa logarii si monitorizarii
- Secrete hardcodate
Patternuri Terraform orientate spre securitate
Configuratie sigura a providerului
# versions.tf - Lock provider versions so upgrades are deliberate,
# reviewed events rather than silent behavior changes.
terraform {
  required_version = ">= 1.6.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.31.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.6.0"
    }
  }

  # Secure state backend: server-side encryption at rest plus a DynamoDB
  # lock table to prevent concurrent state mutation.
  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "production/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"

    # Require specific role for state access
    role_arn = "arn:aws:iam::123456789012:role/TerraformStateAccess"
  }
}
# Provider with security defaults
provider "aws" {
  region = var.aws_region

  # Deployments always go through a dedicated role; the external ID
  # protects the cross-account trust from confused-deputy misuse.
  assume_role {
    role_arn     = var.deployment_role_arn
    session_name = "TerraformDeployment"
    external_id  = var.external_id
  }

  # Tags applied to every resource this provider creates.
  default_tags {
    tags = {
      Environment    = var.environment
      ManagedBy      = "Terraform"
      SecurityReview = "Required"
      CostCenter     = var.cost_center
    }
  }
}
Modul S3 Bucket securizat
# modules/secure-s3/main.tf
resource "aws_s3_bucket" "secure" {
  bucket = var.bucket_name

  # Allow force-destroy only outside production, so a stray
  # `terraform destroy` cannot delete production data.
  force_destroy = var.environment != "production"

  tags = merge(var.tags, {
    DataClassification = var.data_classification
  })
}

# Block all public access
resource "aws_s3_bucket_public_access_block" "secure" {
  bucket                  = aws_s3_bucket.secure.id
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

# Server-side encryption with KMS (bucket keys cut KMS request cost)
resource "aws_s3_bucket_server_side_encryption_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id

  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = var.kms_key_arn
      sse_algorithm     = "aws:kms"
    }
    bucket_key_enabled = true
  }
}

# Versioning for data protection (recovery from overwrite/delete)
resource "aws_s3_bucket_versioning" "secure" {
  bucket = aws_s3_bucket.secure.id
  versioning_configuration {
    status = "Enabled"
  }
}
# Lifecycle rules for cost and compliance
resource "aws_s3_bucket_lifecycle_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id

  rule {
    id     = "transition-to-glacier"
    status = "Enabled"
    # NOTE(review): AWS provider v4+ expects a filter {} (or prefix) on
    # each lifecycle rule — confirm against the pinned provider version.

    # Current objects move to Glacier after 90 days.
    transition {
      days          = 90
      storage_class = "GLACIER"
    }
    # Superseded versions move to Glacier sooner.
    noncurrent_version_transition {
      noncurrent_days = 30
      storage_class   = "GLACIER"
    }
    # Old versions expire after the compliance retention window.
    noncurrent_version_expiration {
      noncurrent_days = var.retention_days
    }
  }
}

# Access logging
resource "aws_s3_bucket_logging" "secure" {
  bucket        = aws_s3_bucket.secure.id
  target_bucket = var.logging_bucket_id
  target_prefix = "s3-access-logs/${var.bucket_name}/"
}
# Bucket policy enforcing TLS
resource "aws_s3_bucket_policy" "secure" {
  bucket = aws_s3_bucket.secure.id
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      # Deny any request made over plain HTTP.
      {
        Sid       = "EnforceTLS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.secure.arn,
          "${aws_s3_bucket.secure.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      },
      # Deny uploads that do not explicitly request SSE-KMS.
      # NOTE(review): this also rejects uploads that omit the header and
      # rely on the bucket's default encryption — confirm clients set
      # x-amz-server-side-encryption explicitly.
      {
        Sid       = "EnforceEncryption"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.secure.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      }
    ]
  })
}
# Outputs consumed by callers of this module for wiring dependencies.
output "bucket_arn" {
  value       = aws_s3_bucket.secure.arn
  description = "ARN of the secure S3 bucket"
}

output "bucket_id" {
  value       = aws_s3_bucket.secure.id
  description = "ID of the secure S3 bucket"
}
Modul IAM Role securizat
# modules/secure-iam-role/main.tf
locals {
  # Validate that role isn't overly permissive
  has_admin_access = contains(var.managed_policy_arns, "arn:aws:iam::aws:policy/AdministratorAccess")
  # NOTE(review): computed but not referenced anywhere visible in this
  # module — either wire it into a validation or drop it.
  has_wildcard_resource = can(regex("\"Resource\"\\s*:\\s*\"\\*\"", jsonencode(var.inline_policy)))
}

# Fail-safe validation: assigning a string to count is a deliberate hack —
# Terraform cannot convert the string to a number, so `plan` aborts with an
# error naming the reason whenever AdministratorAccess is requested in
# production. On Terraform >= 1.2, `lifecycle { precondition }` or variable
# validation is the cleaner equivalent.
resource "null_resource" "security_validation" {
  count = local.has_admin_access && var.environment == "production" ? "ADMIN_ACCESS_NOT_ALLOWED_IN_PRODUCTION" : 0
}
resource "aws_iam_role" "secure" {
  name                 = var.role_name
  max_session_duration = var.max_session_duration

  # FIX: a permission boundary must be set via the role's
  # `permissions_boundary` argument. The original attached the boundary
  # ARN with aws_iam_role_policy_attachment, which *grants* that policy's
  # permissions to the role instead of capping them. A null var leaves
  # the boundary unset, preserving the old optional behavior.
  permissions_boundary = var.permission_boundary_arn

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = var.trusted_services
          AWS     = var.trusted_accounts
        }
        Action = "sts:AssumeRole"
        # Conditions are assembled dynamically so each control applies
        # only when its corresponding variable is populated.
        Condition = merge(
          # Require external ID for cross-account trust (confused deputy).
          length(var.trusted_accounts) > 0 ? {
            StringEquals = {
              "sts:ExternalId" = var.external_id
            }
          } : {},
          # Require MFA for human users
          var.require_mfa ? {
            Bool = {
              "aws:MultiFactorAuthPresent" = "true"
            }
          } : {},
          # Restrict assumption to known source IP ranges.
          length(var.allowed_ip_ranges) > 0 ? {
            IpAddress = {
              "aws:SourceIp" = var.allowed_ip_ranges
            }
          } : {}
        )
      }
    ]
  })

  tags = var.tags
}
# Inline policy with least privilege; created only when the caller
# actually supplies one.
resource "aws_iam_role_policy" "inline" {
  count  = var.inline_policy != null ? 1 : 0
  name   = "${var.role_name}-policy"
  role   = aws_iam_role.secure.id
  policy = var.inline_policy
}
# Managed policy attachments — one attachment resource per ARN.
resource "aws_iam_role_policy_attachment" "managed" {
  for_each   = toset(var.managed_policy_arns)
  role       = aws_iam_role.secure.name
  policy_arn = each.value
}
Patternuri de securitate CloudFormation
Template de stack securizat
# secure-infrastructure.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Secure infrastructure stack with security best practices'
# FIX: this template uses !If in DeletionPolicy / UpdateReplacePolicy
# (see SecureDatabase), which CloudFormation only accepts when the
# AWS::LanguageExtensions transform is enabled.
Transform: AWS::LanguageExtensions
Metadata:
  cfn-lint:
    config:
      regions:
        - us-east-1
        - us-west-2
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Security Configuration
        Parameters:
          - Environment
          - KMSKeyArn
          - AllowedCIDR
Parameters:
  Environment:
    Type: String
    AllowedValues:
      - development
      - staging
      - production
    Default: development
  KMSKeyArn:
    Type: String
    Description: ARN of KMS key for encryption
    # Pattern rejects anything that is not a well-formed KMS key ARN.
    AllowedPattern: '^arn:aws:kms:[a-z0-9-]+:[0-9]+:key/[a-f0-9-]+$'
  AllowedCIDR:
    Type: String
    Description: CIDR range for access
    AllowedPattern: '^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$'
    Default: '10.0.0.0/8'
Conditions:
  IsProduction: !Equals [!Ref Environment, 'production']
# Rules validate parameter combinations before any resource is created.
Rules:
  ProductionRequiresKMS:
    RuleCondition: !Equals [!Ref Environment, 'production']
    Assertions:
      - Assert: !Not [!Equals [!Ref KMSKeyArn, '']]
        AssertDescription: 'KMS key is required for production'
Resources:
  # Hardened VPC with flow logs enabled
  SecureVPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref AllowedCIDR
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-vpc'
        - Key: Environment
          Value: !Ref Environment
  # Capture metadata for ALL traffic for forensics / anomaly detection.
  # NOTE(review): with LogDestinationType cloud-watch-logs a
  # DeliverLogsPermissionArn role is required, and the log group should
  # exist first (consider DependsOn: FlowLogGroup) — confirm.
  VPCFlowLog:
    Type: AWS::EC2::FlowLog
    Properties:
      ResourceId: !Ref SecureVPC
      ResourceType: VPC
      TrafficType: ALL
      LogDestinationType: cloud-watch-logs
      LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
      MaxAggregationInterval: 60
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-flow-log'
  FlowLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
      # Production keeps a full year of flow logs; other envs 30 days.
      RetentionInDays: !If [IsProduction, 365, 30]
      KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
  # Minimal security group: HTTPS in from the approved CIDR only.
  SecureSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      # Suppress the cfn-nag open-egress warning; the exception is deliberate.
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: 'Egress to 0.0.0.0/0 required for updates'
    Properties:
      GroupDescription: Secure security group with minimal access
      VpcId: !Ref SecureVPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !Ref AllowedCIDR
          Description: HTTPS from allowed CIDR
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: HTTPS outbound
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-sg'
  # Encrypted RDS instance.
  # NOTE(review): !If in DeletionPolicy/UpdateReplacePolicy requires the
  # AWS::LanguageExtensions transform, and AllocatedStorage plus a
  # DBSubnetGroupName (to place the DB inside SecureVPC) appear to be
  # missing — confirm before deploying.
  SecureDatabase:
    Type: AWS::RDS::DBInstance
    # Keep production data when the stack is deleted or the resource replaced.
    DeletionPolicy: !If [IsProduction, Retain, Delete]
    UpdateReplacePolicy: !If [IsProduction, Retain, Delete]
    Properties:
      DBInstanceIdentifier: !Sub '${AWS::StackName}-db'
      DBInstanceClass: db.t3.medium
      Engine: postgres
      EngineVersion: '15'
      # Credentials resolved at deploy time from Secrets Manager —
      # never stored in the template.
      MasterUsername: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:username}}'
      MasterUserPassword: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:password}}'
      StorageEncrypted: true
      KmsKeyId: !Ref KMSKeyArn
      MultiAZ: !If [IsProduction, true, false]
      PubliclyAccessible: false
      EnableIAMDatabaseAuthentication: true
      DeletionProtection: !If [IsProduction, true, false]
      BackupRetentionPeriod: !If [IsProduction, 35, 7]
      EnableCloudwatchLogsExports:
        - postgresql
        - upgrade
      VPCSecurityGroups:
        - !Ref SecureSecurityGroup
      Tags:
        - Key: Environment
          Value: !Ref Environment
  # Secrets Manager entry holding the generated database credentials.
  DatabaseSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub '${AWS::StackName}/database/credentials'
      Description: Database credentials
      KmsKeyId: !Ref KMSKeyArn
      GenerateSecretString:
        SecretStringTemplate: '{"username": "dbadmin"}'
        GenerateStringKey: password
        PasswordLength: 32
        # Characters that commonly break connection strings.
        ExcludeCharacters: '"@/\'
      Tags:
        - Key: Environment
          Value: !Ref Environment
  # Automatic secret rotation (production only).
  # NOTE(review): a RotationSchedule also needs RotationLambdaARN or
  # HostedRotationLambda to actually perform rotation — confirm.
  SecretRotationSchedule:
    Type: AWS::SecretsManager::RotationSchedule
    Condition: IsProduction
    Properties:
      SecretId: !Ref DatabaseSecret
      RotationRules:
        AutomaticallyAfterDays: 30
# Cross-stack exports so dependent stacks can import these IDs.
Outputs:
  VPCId:
    Description: VPC ID
    Value: !Ref SecureVPC
    Export:
      Name: !Sub '${AWS::StackName}-VPCId'
  SecurityGroupId:
    Description: Security Group ID
    Value: !Ref SecureSecurityGroup
    Export:
      Name: !Sub '${AWS::StackName}-SecurityGroupId'
Policy as Code cu OPA
Politici de securitate Terraform
# opa_policies/terraform_security.py
"""
Open Policy Agent policies for Terraform security scanning.
"""
import json
import subprocess
from dataclasses import dataclass
from typing import List, Dict, Any
from pathlib import Path
@dataclass
class PolicyViolation:
    """A single security-policy violation found in a Terraform plan."""

    rule_id: str       # stable check identifier, e.g. "S3-001"
    severity: str      # "CRITICAL" | "HIGH" | "MEDIUM" | "LOW"
    resource: str      # Terraform address, e.g. "aws_s3_bucket.logs"
    message: str       # human-readable finding
    remediation: str   # suggested fix


class TerraformSecurityScanner:
    """Scan Terraform plan JSON against built-in security checks.

    The scanner reads the ``planned_values`` tree produced by
    ``terraform show -json`` and reports S3, IAM, network and encryption
    misconfigurations as :class:`PolicyViolation` records.
    """

    #: Action wildcards considered high-risk when granted on Resource "*".
    SENSITIVE_ACTIONS = frozenset({
        "iam:*", "kms:*", "s3:*", "ec2:*",
        "lambda:*", "rds:*", "secretsmanager:*",
    })

    #: Ports that must never be reachable from 0.0.0.0/0
    #: (SSH, RDP, MySQL, PostgreSQL, MongoDB, Redis).
    SENSITIVE_PORTS = (22, 3389, 3306, 5432, 27017, 6379)

    def __init__(self, policy_dir: str = "policies"):
        """Load any OPA ``.rego`` sources found under *policy_dir*.

        A missing directory is tolerated (no external policies loaded);
        the built-in checks always run.
        """
        self.policy_dir = Path(policy_dir)
        self.policies = self._load_policies()

    def _load_policies(self) -> Dict[str, Any]:
        """Return {file-stem: rego-source} for every *.rego in policy_dir."""
        return {
            policy_file.stem: policy_file.read_text()
            for policy_file in self.policy_dir.glob("*.rego")
        }

    def scan_plan(self, plan_json: Dict[str, Any]) -> List[PolicyViolation]:
        """Run every check category against *plan_json* and collect violations."""
        violations: List[PolicyViolation] = []
        for check in (
            self._check_s3_security,
            self._check_iam_security,
            self._check_network_security,
            self._check_encryption,
        ):
            violations.extend(check(plan_json))
        return violations

    # ------------------------------------------------------------------
    # S3
    # ------------------------------------------------------------------

    def _check_s3_security(self, plan: Dict) -> List[PolicyViolation]:
        """Flag S3 buckets that lack their companion hardening resources."""
        # (rule, severity, has-feature predicate, message, remediation)
        requirements = (
            ("S3-001", "HIGH", self._has_public_access_block,
             "S3 bucket missing public access block",
             "Add aws_s3_bucket_public_access_block resource"),
            ("S3-002", "HIGH", self._has_encryption,
             "S3 bucket missing server-side encryption",
             "Add aws_s3_bucket_server_side_encryption_configuration"),
            ("S3-003", "MEDIUM", self._has_versioning,
             "S3 bucket versioning not enabled",
             "Add aws_s3_bucket_versioning resource"),
            ("S3-004", "MEDIUM", self._has_logging,
             "S3 bucket access logging not enabled",
             "Add aws_s3_bucket_logging resource"),
        )
        violations = []
        for resource in self._get_resources(plan, "aws_s3_bucket"):
            bucket_name = resource.get("name", "unknown")
            for rule_id, severity, satisfied, message, remediation in requirements:
                if not satisfied(plan, bucket_name):
                    violations.append(PolicyViolation(
                        rule_id=rule_id,
                        severity=severity,
                        resource=f"aws_s3_bucket.{bucket_name}",
                        message=message,
                        remediation=remediation,
                    ))
        return violations

    # ------------------------------------------------------------------
    # IAM
    # ------------------------------------------------------------------

    def _check_iam_security(self, plan: Dict) -> List[PolicyViolation]:
        """Flag overly permissive IAM trust and permission policies."""
        violations = []

        # Wildcard trust policies on roles.
        for resource in self._get_resources(plan, "aws_iam_role"):
            role_name = resource.get("name", "unknown")
            trust = self._parse_policy_document(
                resource.get("values", {}).get("assume_role_policy", "{}"))
            for statement in trust.get("Statement", []):
                if self._principal_is_wildcard(statement.get("Principal", {})):
                    violations.append(PolicyViolation(
                        rule_id="IAM-001",
                        severity="CRITICAL",
                        resource=f"aws_iam_role.{role_name}",
                        message="IAM role has overly permissive trust policy",
                        remediation="Restrict Principal to specific accounts/services",
                    ))

        # Admin-equivalent or service-wide grants in managed policies.
        for resource in self._get_resources(plan, "aws_iam_policy"):
            policy_name = resource.get("name", "unknown")
            doc = self._parse_policy_document(
                resource.get("values", {}).get("policy", "{}"))
            for statement in doc.get("Statement", []):
                actions = self._as_list(statement.get("Action", []))
                resources = self._as_list(statement.get("Resource", []))
                if "*" not in resources:
                    continue
                # Full admin: every action on every resource.
                if "*" in actions:
                    violations.append(PolicyViolation(
                        rule_id="IAM-002",
                        severity="CRITICAL",
                        resource=f"aws_iam_policy.{policy_name}",
                        message="IAM policy grants full admin access",
                        remediation="Apply least privilege - restrict actions and resources",
                    ))
                # Service-wide wildcard actions on all resources.
                for action in actions:
                    if action in self.SENSITIVE_ACTIONS:
                        violations.append(PolicyViolation(
                            rule_id="IAM-003",
                            severity="HIGH",
                            resource=f"aws_iam_policy.{policy_name}",
                            message=f"IAM policy grants {action} on all resources",
                            remediation="Restrict Resource to specific ARNs",
                        ))
        return violations

    @staticmethod
    def _as_list(value: Any) -> List:
        """Normalize IAM fields that may be a scalar or a list."""
        if isinstance(value, str):
            return [value]
        return list(value or [])

    @staticmethod
    def _parse_policy_document(raw: Any) -> Dict:
        """Decode a policy document that may arrive JSON-encoded.

        Malformed JSON yields an empty document (no violations for that
        resource) instead of aborting the entire scan.
        """
        if isinstance(raw, str):
            try:
                return json.loads(raw)
            except json.JSONDecodeError:
                return {}
        return raw or {}

    @classmethod
    def _principal_is_wildcard(cls, principal: Any) -> bool:
        """True for Principal "*" or {"AWS": "*"} (scalar or list form).

        The original crashed with AttributeError on non-"*" string
        principals (e.g. "arn:aws:iam::...:root") — guarded here.
        """
        if principal == "*":
            return True
        if isinstance(principal, dict):
            return "*" in cls._as_list(principal.get("AWS"))
        return False

    # ------------------------------------------------------------------
    # Network
    # ------------------------------------------------------------------

    def _check_network_security(self, plan: Dict) -> List[PolicyViolation]:
        """Flag security groups exposed to the whole internet."""
        violations = []
        for resource in self._get_resources(plan, "aws_security_group"):
            sg_name = resource.get("name", "unknown")
            config = resource.get("values", {})

            for ingress in config.get("ingress", []):
                if "0.0.0.0/0" not in ingress.get("cidr_blocks", []):
                    continue
                from_port = ingress.get("from_port", 0)
                to_port = ingress.get("to_port", 65535)
                # One violation per sensitive port inside the open range.
                for port in self.SENSITIVE_PORTS:
                    if from_port <= port <= to_port:
                        violations.append(PolicyViolation(
                            rule_id="NET-001",
                            severity="CRITICAL",
                            resource=f"aws_security_group.{sg_name}",
                            message=f"Security group allows 0.0.0.0/0 access to port {port}",
                            remediation="Restrict CIDR to specific IP ranges",
                        ))

            # Unrestricted egress is advisory only (LOW severity).
            for egress in config.get("egress", []):
                if ("0.0.0.0/0" in egress.get("cidr_blocks", [])
                        and egress.get("from_port") == 0
                        and egress.get("to_port") == 0):
                    violations.append(PolicyViolation(
                        rule_id="NET-002",
                        severity="LOW",
                        resource=f"aws_security_group.{sg_name}",
                        message="Security group allows unrestricted egress",
                        remediation="Consider restricting egress to required ports",
                    ))
        return violations

    # ------------------------------------------------------------------
    # Encryption
    # ------------------------------------------------------------------

    def _check_encryption(self, plan: Dict) -> List[PolicyViolation]:
        """Flag unencrypted or publicly exposed data stores."""
        violations = []

        for resource in self._get_resources(plan, "aws_db_instance"):
            db_name = resource.get("name", "unknown")
            config = resource.get("values", {})
            if not config.get("storage_encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-001",
                    severity="HIGH",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance storage encryption not enabled",
                    remediation="Set storage_encrypted = true",
                ))
            if config.get("publicly_accessible", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-002",
                    severity="CRITICAL",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance is publicly accessible",
                    remediation="Set publicly_accessible = false",
                ))

        for resource in self._get_resources(plan, "aws_ebs_volume"):
            vol_name = resource.get("name", "unknown")
            config = resource.get("values", {})
            if not config.get("encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-003",
                    severity="HIGH",
                    resource=f"aws_ebs_volume.{vol_name}",
                    message="EBS volume encryption not enabled",
                    remediation="Set encrypted = true",
                ))
        return violations

    # ------------------------------------------------------------------
    # Plan helpers
    # ------------------------------------------------------------------

    def _get_resources(self, plan: Dict, resource_type: str) -> List[Dict]:
        """Collect every resource of *resource_type* from the plan.

        Recurses through arbitrarily nested child modules (the original
        implementation only looked one level deep, missing resources in
        modules-of-modules).
        """
        matches: List[Dict] = []

        def walk(module: Dict) -> None:
            for resource in module.get("resources", []):
                if resource.get("type") == resource_type:
                    matches.append(resource)
            for child in module.get("child_modules", []):
                walk(child)

        walk(plan.get("planned_values", {}).get("root_module", {}))
        return matches

    def _bucket_has(self, plan: Dict, bucket_name: str, resource_type: str) -> bool:
        """True if any *resource_type* in the plan references *bucket_name*.

        Matching is by substring because at plan time the ``bucket``
        attribute may hold a name, an ID, or an unresolved reference.
        """
        return any(
            bucket_name in str(res.get("values", {}).get("bucket", ""))
            for res in self._get_resources(plan, resource_type)
        )

    def _has_public_access_block(self, plan: Dict, bucket_name: str) -> bool:
        """Does the bucket have an aws_s3_bucket_public_access_block?"""
        return self._bucket_has(plan, bucket_name, "aws_s3_bucket_public_access_block")

    def _has_encryption(self, plan: Dict, bucket_name: str) -> bool:
        """Does the bucket have a server-side encryption configuration?"""
        return self._bucket_has(
            plan, bucket_name, "aws_s3_bucket_server_side_encryption_configuration")

    def _has_versioning(self, plan: Dict, bucket_name: str) -> bool:
        """Is versioning configured *and* enabled for the bucket?"""
        for resource in self._get_resources(plan, "aws_s3_bucket_versioning"):
            values = resource.get("values", {})
            if bucket_name not in str(values.get("bucket", "")):
                continue
            config = values.get("versioning_configuration", [{}])
            if config and config[0].get("status") == "Enabled":
                return True
        return False

    def _has_logging(self, plan: Dict, bucket_name: str) -> bool:
        """Does the bucket have access logging configured?"""
        return self._bucket_has(plan, bucket_name, "aws_s3_bucket_logging")
def scan_terraform_directory(directory: str) -> Dict[str, Any]:
"""Scaneaza un director Terraform si returneaza violatiile."""
import tempfile
# Genereaza planul
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
plan_file = f.name
try:
# Initializare si plan
subprocess.run(
["terraform", "init"],
cwd=directory,
capture_output=True,
check=True
)
subprocess.run(
["terraform", "plan", "-out=tfplan"],
cwd=directory,
capture_output=True,
check=True
)
subprocess.run(
["terraform", "show", "-json", "tfplan"],
cwd=directory,
capture_output=True,
check=True
)
with open(f"{directory}/tfplan.json") as f:
plan_json = json.load(f)
# Scanare plan
scanner = TerraformSecurityScanner()
violations = scanner.scan_plan(plan_json)
return {
"total_violations": len(violations),
"critical": len([v for v in violations if v.severity == "CRITICAL"]),
"high": len([v for v in violations if v.severity == "HIGH"]),
"medium": len([v for v in violations if v.severity == "MEDIUM"]),
"low": len([v for v in violations if v.severity == "LOW"]),
"violations": [
{
"rule_id": v.rule_id,
"severity": v.severity,
"resource": v.resource,
"message": v.message,
"remediation": v.remediation
}
for v in violations
]
}
finally:
        Path(plan_file).unlink(missing_ok=True)
Integrare CI/CD
Workflow GitHub Actions
# .github/workflows/iac-security.yaml
# Static security analysis for Terraform and CloudFormation changes.
name: IaC Security Scan

on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
  push:
    branches:
      - main

# Least-privilege token: read code, comment on PRs, upload scan results.
permissions:
  contents: read
  pull-requests: write
  security-events: write

jobs:
  terraform-security:
    name: Terraform Security Scan
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.6.0
      - name: Terraform Format Check
        run: terraform fmt -check -recursive terraform/
      - name: TFLint
        uses: terraform-linters/setup-tflint@v4
      - name: Run TFLint
        run: |
          cd terraform
          tflint --init
          tflint --format=sarif > ../tflint-results.sarif
      - name: Checkov Scan
        uses: bridgecrewio/checkov-action@v12
        with:
          directory: terraform/
          framework: terraform
          output_format: sarif
          output_file_path: checkov-results.sarif
          # Fail the build on findings.
          soft_fail: false
          skip_check: CKV_AWS_144 # Skip cross-region replication check
      - name: Terrascan
        uses: tenable/terrascan-action@v1
        with:
          iac_type: terraform
          iac_dir: terraform/
          policy_type: aws
          sarif_upload: true
      # NOTE(review): upload-sarif expects a single file or a directory;
      # a multi-line list like this is not supported — upload each file in
      # its own step. Also codeql-action v2 is deprecated; migrate to v3.
      - name: Upload SARIF Results
        uses: github/codeql-action/upload-sarif@v2
        if: always()
        with:
          sarif_file: |
            tflint-results.sarif
            checkov-results.sarif
      # Summarize Checkov findings as a PR comment (first 10 shown).
      - name: Comment PR with Results
        if: github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            let comment = '## 🔒 IaC Security Scan Results\n\n';
            // Parse and summarize results
            const checkovResults = JSON.parse(fs.readFileSync('checkov-results.sarif'));
            const violations = checkovResults.runs[0].results || [];
            const critical = violations.filter(v => v.level === 'error').length;
            const warning = violations.filter(v => v.level === 'warning').length;
            if (critical > 0) {
              comment += `❌ **${critical} Critical Issues Found**\n\n`;
            } else if (warning > 0) {
              comment += `⚠️ **${warning} Warnings Found**\n\n`;
            } else {
              comment += `✅ **No Security Issues Found**\n\n`;
            }
            // Add details (capped at 10 to keep the comment readable)
            for (const violation of violations.slice(0, 10)) {
              comment += `- **${violation.ruleId}**: ${violation.message.text}\n`;
              comment += `  - Location: \`${violation.locations[0].physicalLocation.artifactLocation.uri}\`\n`;
            }
            if (violations.length > 10) {
              comment += `\n... and ${violations.length - 10} more issues\n`;
            }
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
  cloudformation-security:
    name: CloudFormation Security Scan
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install cfn-lint
        run: pip install cfn-lint cfn-nag-sarif
      - name: CFN Lint
        run: |
          cfn-lint cloudformation/**/*.yaml -f sarif > cfn-lint-results.sarif
      # NOTE(review): verify cfn_nag supports SARIF output — older
      # releases only emit txt/json.
      - name: CFN Nag
        uses: stelligent/cfn_nag@master
        with:
          input_path: cloudformation/
          output_format: sarif
      - name: Upload SARIF Results
        uses: github/codeql-action/upload-sarif@v2
        if: always()
        with:
          sarif_file: cfn-lint-results.sarif
Detectia si remedierea derivei de configuratie
# drift_detection.py
"""
Infrastructure drift detection and automated remediation.
"""
import json
import boto3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import subprocess
@dataclass
class DriftItem:
    """A single drifted resource detected in a Terraform plan."""

    resource_type: str   # e.g. "aws_security_group"
    resource_id: str     # Terraform resource name
    expected_value: str  # value recorded in state (stringified dict)
    actual_value: str    # value observed in the cloud (stringified dict)
    attribute: str       # comma-separated drifted attribute names
    severity: str        # CRITICAL / HIGH / MEDIUM (see _assess_severity)
class DriftDetector:
    """Detect and remediate infrastructure drift via the Terraform CLI.

    All terraform commands are executed in the current working directory,
    which is assumed to contain the initialized configuration.
    """

    def __init__(self, state_bucket: str, state_key: str):
        """Record the S3 location of the Terraform state.

        Args:
            state_bucket: S3 bucket holding the state file.
            state_key: object key of the state file within the bucket.
        """
        # NOTE(review): the S3 client and state location are stored but
        # never used by the methods below — presumably reserved for direct
        # state inspection; confirm before removing.
        self.s3 = boto3.client('s3')
        self.state_bucket = state_bucket
        self.state_key = state_key

    def detect_drift(self) -> List[DriftItem]:
        """Detect drift between Terraform state and live infrastructure.

        Runs ``terraform plan -detailed-exitcode -json`` (exit code 2
        signals pending changes) and parses the streamed JSON log lines
        for ``resource_drift`` entries.
        """
        result = subprocess.run(
            ["terraform", "plan", "-detailed-exitcode", "-json"],
            capture_output=True,
            text=True
        )
        drift_items = []
        if result.returncode == 2:  # changes detected
            for line in result.stdout.split('\n'):
                if not line.strip():
                    continue
                try:
                    entry = json.loads(line)
                    if entry.get("type") == "resource_drift":
                        drift_items.append(self._parse_drift(entry))
                except json.JSONDecodeError:
                    # Non-JSON lines in the output stream are ignored.
                    continue
        return drift_items

    def _parse_drift(self, entry: Dict) -> DriftItem:
        """Convert one ``resource_drift`` log entry into a DriftItem.

        NOTE(review): assumes ``change.before`` / ``change.after`` are
        dicts; some Terraform versions may emit null here — confirm
        against the targeted CLI version.
        """
        change = entry.get("change", {})
        resource = change.get("resource", {})
        before = change.get("before", {})
        after = change.get("after", {})
        # Collect every attribute whose value differs between state and reality.
        changed_attrs = []
        for key in set(list(before.keys()) + list(after.keys())):
            if before.get(key) != after.get(key):
                changed_attrs.append(key)
        return DriftItem(
            resource_type=resource.get("type", "unknown"),
            resource_id=resource.get("name", "unknown"),
            expected_value=str(before),
            actual_value=str(after),
            attribute=", ".join(changed_attrs),
            severity=self._assess_severity(resource.get("type"), changed_attrs)
        )

    def _assess_severity(self, resource_type: str, attributes: List[str]) -> str:
        """Rate drift severity from resource type and drifted attributes.

        Security-boundary attributes (trust policies, SG rules, public
        access blocks) rate CRITICAL; encryption/exposure settings HIGH;
        everything else MEDIUM.
        """
        critical_combinations = {
            "aws_security_group": ["ingress", "egress"],
            "aws_iam_role": ["assume_role_policy"],
            "aws_iam_policy": ["policy"],
            "aws_s3_bucket_public_access_block": ["block_public_acls", "block_public_policy"],
        }
        high_combinations = {
            "aws_s3_bucket_server_side_encryption_configuration": ["rule"],
            "aws_db_instance": ["publicly_accessible", "storage_encrypted"],
        }
        if resource_type in critical_combinations:
            if any(attr in critical_combinations[resource_type] for attr in attributes):
                return "CRITICAL"
        if resource_type in high_combinations:
            if any(attr in high_combinations[resource_type] for attr in attributes):
                return "HIGH"
        return "MEDIUM"

    def remediate_drift(self, drift_items: List[DriftItem], auto_remediate: bool = False) -> Dict:
        """Re-apply Terraform per drifted resource and report the outcome.

        CRITICAL drift is skipped unless *auto_remediate* is True so a
        human reviews security-boundary changes before they are reverted.

        Returns:
            Summary dict with remediated/failed/skipped counts and
            per-resource details.
        """
        remediation_results = {
            # NOTE(review): datetime.utcnow() is deprecated since Python
            # 3.12; datetime.now(timezone.utc) is the modern equivalent.
            "timestamp": datetime.utcnow().isoformat(),
            "total_drift": len(drift_items),
            "remediated": 0,
            "failed": 0,
            "skipped": 0,
            "details": []
        }
        for item in drift_items:
            if item.severity == "CRITICAL" and not auto_remediate:
                # Critical drift requires manual review.
                remediation_results["skipped"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "SKIPPED",
                    "reason": "Critical drift requires manual review"
                })
                continue
            try:
                # Targeted apply reverts only the drifted resource.
                result = subprocess.run(
                    ["terraform", "apply", "-auto-approve",
                     f"-target={item.resource_type}.{item.resource_id}"],
                    capture_output=True,
                    text=True,
                    timeout=300
                )
                if result.returncode == 0:
                    remediation_results["remediated"] += 1
                    remediation_results["details"].append({
                        "resource": f"{item.resource_type}.{item.resource_id}",
                        "status": "REMEDIATED"
                    })
                else:
                    raise Exception(result.stderr)
            except Exception as e:
                remediation_results["failed"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "FAILED",
                    "error": str(e)
                })
        return remediation_results

    def schedule_drift_check(self, schedule: str = "rate(1 hour)"):
        """Create/refresh the EventBridge rule that triggers the drift Lambda.

        Args:
            schedule: EventBridge schedule expression (default: hourly).
        """
        events = boto3.client('events')
        # NOTE(review): lambda_client is created but unused; EventBridge
        # also needs lambda:InvokeFunction permission, granted elsewhere.
        lambda_client = boto3.client('lambda')
        # put_rule is idempotent: creates or updates the schedule.
        events.put_rule(
            Name='terraform-drift-detection',
            ScheduleExpression=schedule,
            State='ENABLED',
            Description='Scheduled Terraform drift detection'
        )
        # Point the rule at the drift-detector Lambda in this account/region.
        events.put_targets(
            Rule='terraform-drift-detection',
            Targets=[{
                'Id': 'drift-detector-lambda',
                'Arn': f'arn:aws:lambda:{boto3.session.Session().region_name}:'
                       f'{boto3.client("sts").get_caller_identity()["Account"]}:'
                       f'function:terraform-drift-detector'
            }]
        )
# Lambda handler for scheduled drift detection
def lambda_handler(event, context):
    """AWS Lambda handler for drift detection.

    Reads the Terraform state location from environment variables, runs
    drift detection, and publishes an SNS alert when any drift is found.
    Returns a 200 response carrying the drift count.
    """
    import os
    detector = DriftDetector(
        state_bucket=os.environ['STATE_BUCKET'],
        state_key=os.environ['STATE_KEY']
    )
    drift_items = detector.detect_drift()
    if drift_items:
        # Alert operators with a machine-readable drift summary.
        sns = boto3.client('sns')
        sns.publish(
            TopicArn=os.environ['ALERT_TOPIC_ARN'],
            Subject='Infrastructure Drift Detected',
            Message=json.dumps({
                "drift_count": len(drift_items),
                "critical": len([d for d in drift_items if d.severity == "CRITICAL"]),
                "items": [
                    {
                        "resource": f"{d.resource_type}.{d.resource_id}",
                        "attribute": d.attribute,
                        "severity": d.severity
                    }
                    for d in drift_items
                ]
            }, indent=2)
        )
    return {
        "statusCode": 200,
        "body": json.dumps({
            "drift_detected": len(drift_items) > 0,
            "drift_count": len(drift_items)
        })
    }
Concluzie
Securizarea Infrastructure as Code necesita o abordare pe mai multe niveluri, combinand patternuri de codare sigure, aplicarea politicilor, scanare automata si detectie continua a derivei. Practicile cheie acoperite includ:
- Design securizat al modulelor - cu criptare, controale de acces si logare integrate
- Policy as Code - cu OPA si scannere de securitate precum Checkov si cfn-nag
- Integrare CI/CD - blocarea deploymenturilor nesigure inainte de a ajunge in productie
- Detectia derivei - identificarea si remedierea automata a derivei de configuratie
Prin implementarea acestor practici, organizatiile pot obtine securitate a infrastructurii la scara larga, mentinand in acelasi timp beneficiile de agilitate ale Infrastructure as Code.
Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →