DevSecOps

Securitate IaC: Terraform si CloudFormation

Nicu Constantin
16 min de lectura
#infrastructure-as-code #terraform #cloudformation #security #devsecops #policy-as-code

Securitatea Infrastructure as Code: Securizarea Terraform si CloudFormation

Infrastructure as Code (IaC) a revolutionat modul in care organizatiile provizioneaza si gestioneaza resursele cloud. Totusi, configuratiile gresite din template-urile IaC se numara printre principalele cauze ale breselor de securitate in cloud. Acest ghid acopera practici complete de securitate pentru deploymenturile Terraform si CloudFormation.

Provocarea securitatii IaC

Un studiu din 2024 a constatat ca 72% dintre organizatii aveau cel putin o configuratie gresita critica in template-urile lor IaC, cu probleme frecvente precum:

  • Politici IAM excesiv de permisive
  • Resurse de stocare necriptate
  • Expunere publica a retelei
  • Lipsa logarii si monitorizarii
  • Secrete hardcodate

Patternuri Terraform orientate spre securitate

Configuratie sigura a providerului

# versions.tf - Pin Terraform/provider versions and configure the remote state backend
terraform {
  # 1.6.0 is also the first release whose S3 backend accepts the nested
  # assume_role argument used below.
  required_version = ">= 1.6.0"

  # "~>" on a three-part version only allows patch-level upgrades.
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.31.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.6.0"
    }
  }

  # Secure state backend: encrypted state object, DynamoDB-based locking.
  backend "s3" {
    bucket         = "company-terraform-state"
    key            = "production/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "terraform-state-lock"

    # Require specific role for state access.
    # The top-level role_arn argument is deprecated since Terraform 1.6; the
    # role is now declared through the nested assume_role argument.
    assume_role = {
      role_arn = "arn:aws:iam::123456789012:role/TerraformStateAccess"
    }
  }
}
 
# Provider with security defaults
provider "aws" {
  region = var.aws_region
 
  # Require specific role assumption: the provider assumes
  # var.deployment_role_arn, passing var.external_id, under a fixed session name.
  assume_role {
    role_arn     = var.deployment_role_arn
    session_name = "TerraformDeployment"
    external_id  = var.external_id
  }
 
  # Default tags declared once at provider level instead of on each resource.
  default_tags {
    tags = {
      Environment     = var.environment
      ManagedBy       = "Terraform"
      SecurityReview  = "Required"
      CostCenter      = var.cost_center
    }
  }
}

Modul S3 Bucket securizat

# modules/secure-s3/main.tf
# The bucket that every other resource in this module attaches its controls to.
resource "aws_s3_bucket" "secure" {
  bucket = var.bucket_name
 
  # Force destroy protection for production: the expression is false only when
  # var.environment is "production" and true for every other environment.
  force_destroy = var.environment != "production"
 
  # Caller-supplied tags plus a DataClassification tag. With merge(), the later
  # argument wins, so this DataClassification overrides one present in var.tags.
  tags = merge(var.tags, {
    DataClassification = var.data_classification
  })
}
 
# Block all public access: all four public-access settings are switched on
# for the module's bucket.
resource "aws_s3_bucket_public_access_block" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}
 
# Server-side encryption with KMS: default encryption uses the "aws:kms"
# algorithm with the key supplied in var.kms_key_arn.
resource "aws_s3_bucket_server_side_encryption_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  rule {
    apply_server_side_encryption_by_default {
      kms_master_key_id = var.kms_key_arn
      sse_algorithm     = "aws:kms"
    }
    # S3 Bucket Keys are enabled for this rule.
    bucket_key_enabled = true
  }
}
 
# Versioning for data protection: versioning status is set to "Enabled" on the
# module's bucket. The lifecycle rules in this module act on noncurrent versions.
resource "aws_s3_bucket_versioning" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  versioning_configuration {
    status = "Enabled"
  }
}
 
# Lifecycle rules for cost and compliance
resource "aws_s3_bucket_lifecycle_configuration" "secure" {
  bucket = aws_s3_bucket.secure.id

  # The noncurrent_version_* actions below act on object versions, so make sure
  # versioning has been configured before the lifecycle rules are applied.
  depends_on = [aws_s3_bucket_versioning.secure]

  rule {
    id     = "transition-to-glacier"
    status = "Enabled"

    # Explicit empty filter: the rule applies to every object in the bucket.
    filter {}

    # Current versions move to GLACIER 90 days after creation.
    transition {
      days          = 90
      storage_class = "GLACIER"
    }

    # Versions that became noncurrent move to GLACIER after 30 days ...
    noncurrent_version_transition {
      noncurrent_days = 30
      storage_class   = "GLACIER"
    }

    # ... and are removed once they have been noncurrent for var.retention_days.
    noncurrent_version_expiration {
      noncurrent_days = var.retention_days
    }
  }
}
 
# Access logging: logs for this bucket are delivered to var.logging_bucket_id
# under a prefix that contains this bucket's name.
resource "aws_s3_bucket_logging" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  target_bucket = var.logging_bucket_id
  target_prefix = "s3-access-logs/${var.bucket_name}/"
}
 
# Bucket policy enforcing TLS and KMS encryption on upload.
# Both statements are explicit Deny statements that apply to every principal.
resource "aws_s3_bucket_policy" "secure" {
  bucket = aws_s3_bucket.secure.id
 
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        # Denies every S3 action on the bucket and its objects when the request
        # was not made over TLS (aws:SecureTransport is "false").
        Sid       = "EnforceTLS"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:*"
        Resource = [
          aws_s3_bucket.secure.arn,
          "${aws_s3_bucket.secure.arn}/*"
        ]
        Condition = {
          Bool = {
            "aws:SecureTransport" = "false"
          }
        }
      },
      {
        # Denies PutObject unless the request's server-side-encryption header
        # is "aws:kms".
        # NOTE(review): StringNotEquals presumably also matches requests that
        # omit the header entirely, so uploads relying only on the bucket's
        # default encryption would be denied - confirm this is intended.
        Sid       = "EnforceEncryption"
        Effect    = "Deny"
        Principal = "*"
        Action    = "s3:PutObject"
        Resource  = "${aws_s3_bucket.secure.arn}/*"
        Condition = {
          StringNotEquals = {
            "s3:x-amz-server-side-encryption" = "aws:kms"
          }
        }
      }
    ]
  })
}
 
# Output for dependency management: expose the bucket's ARN and ID so that
# callers of this module can reference the bucket.
output "bucket_arn" {
  value       = aws_s3_bucket.secure.arn
  description = "ARN of the secure S3 bucket"
}
 
output "bucket_id" {
  value       = aws_s3_bucket.secure.id
  description = "ID of the secure S3 bucket"
}

Modul IAM Role securizat

# modules/secure-iam-role/main.tf
locals {
  # Validate that role isn't overly permissive
  has_admin_access = contains(var.managed_policy_arns, "arn:aws:iam::aws:policy/AdministratorAccess")

  # var.inline_policy is already a JSON document (it is passed verbatim as the
  # policy of aws_iam_role_policy.inline), so the pattern is matched against it
  # directly. Wrapping it in jsonencode() would escape every quote and the
  # pattern could never match. coalesce() covers the "no inline policy" case.
  # This flag is informational: nothing in this file fails on it.
  has_wildcard_resource = can(regex("\"Resource\"\\s*:\\s*\"\\*\"", coalesce(var.inline_policy, "{}")))
}

# Fail-safe validation: planning fails with a readable message when
# AdministratorAccess is requested for a production role.
# This replaces the former `count = <string>` trick, which only failed through
# an unrelated "a number is required" type error. The null_resource now always
# has one (no-op) instance, because a precondition is only evaluated for
# resources that exist.
resource "null_resource" "security_validation" {
  lifecycle {
    precondition {
      condition     = !(local.has_admin_access && var.environment == "production")
      error_message = "ADMIN_ACCESS_NOT_ALLOWED_IN_PRODUCTION: the AdministratorAccess managed policy must not be attached to a production role."
    }
  }
}
 
# Role with a conditional trust policy and an optional permissions boundary.
resource "aws_iam_role" "secure" {
  name                 = var.role_name
  max_session_duration = var.max_session_duration

  # Permission boundary enforcement.
  # A boundary must be set through this argument. Attaching the same policy
  # with aws_iam_role_policy_attachment (as was done before) GRANTS its
  # permissions to the role instead of capping them. null leaves the role
  # without a boundary.
  permissions_boundary = var.permission_boundary_arn

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        # Only emit the principal types that actually have entries, so an
        # empty list never ends up in the policy document.
        Principal = merge(
          length(var.trusted_services) > 0 ? { Service = var.trusted_services } : {},
          length(var.trusted_accounts) > 0 ? { AWS = var.trusted_accounts } : {}
        )
        Action = "sts:AssumeRole"
        # Each optional restriction contributes a different condition operator,
        # so merge() never overwrites one with another.
        Condition = merge(
          # Require external ID for cross-account
          length(var.trusted_accounts) > 0 ? {
            StringEquals = {
              "sts:ExternalId" = var.external_id
            }
          } : {},
          # Require MFA for human users
          var.require_mfa ? {
            Bool = {
              "aws:MultiFactorAuthPresent" = "true"
            }
          } : {},
          # IP restriction
          length(var.allowed_ip_ranges) > 0 ? {
            IpAddress = {
              "aws:SourceIp" = var.allowed_ip_ranges
            }
          } : {}
        )
      }
    ]
  })

  tags = var.tags
}

# The former aws_iam_role_policy_attachment.permission_boundary resource was
# removed on purpose: the boundary is applied through the role's
# permissions_boundary argument above. Existing deployments will plan the
# removal of the old attachment, which is the intended correction.
 
# Inline policy with least privilege.
# Created only when var.inline_policy is non-null; the value is passed through
# unchanged as the policy document and named after the role.
resource "aws_iam_role_policy" "inline" {
  count  = var.inline_policy != null ? 1 : 0
  name   = "${var.role_name}-policy"
  role   = aws_iam_role.secure.id
  policy = var.inline_policy
}
 
# Managed policy attachments: one attachment per distinct ARN in
# var.managed_policy_arns (toset() removes duplicates and keys each instance
# by its ARN).
resource "aws_iam_role_policy_attachment" "managed" {
  for_each   = toset(var.managed_policy_arns)
  role       = aws_iam_role.secure.name
  policy_arn = each.value
}

Patternuri de securitate CloudFormation

Template de stack securizat

# secure-infrastructure.yaml
AWSTemplateFormatVersion: '2010-09-09'
# SecureDatabase chooses its DeletionPolicy / UpdateReplacePolicy with !If.
# Intrinsic functions are only accepted in those attributes when this
# transform is declared; without it the template is rejected at validation.
# Stacks using a transform must be deployed with CAPABILITY_AUTO_EXPAND.
Transform: AWS::LanguageExtensions
Description: 'Secure infrastructure stack with security best practices'

Metadata:
  # Regions cfn-lint validates the template against.
  cfn-lint:
    config:
      regions:
        - us-east-1
        - us-west-2

  # Groups the security-related parameters together in the console form.
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Security Configuration
        Parameters:
          - Environment
          - KMSKeyArn
          - AllowedCIDR
 
Parameters:
  # Deployment stage; drives the IsProduction condition.
  Environment:
    Type: String
    AllowedValues:
      - development
      - staging
      - production
    Default: development

  # Must look like a KMS key ARN; an empty value does not match the pattern.
  KMSKeyArn:
    Type: String
    Description: ARN of KMS key for encryption
    AllowedPattern: '^arn:aws:kms:[a-z0-9-]+:[0-9]+:key/[a-f0-9-]+$'

  # This value is used both as SecureVPC's CidrBlock and as the HTTPS ingress
  # source of SecureSecurityGroup. A VPC only accepts prefix lengths from /16
  # to /28, so the previous default (10.0.0.0/8) made VPC creation fail. The
  # pattern now rejects out-of-range prefixes before any resource is created.
  AllowedCIDR:
    Type: String
    Description: CIDR range for access (also used as the VPC CIDR, so /16 to /28)
    AllowedPattern: '^([0-9]{1,3}\.){3}[0-9]{1,3}/(1[6-9]|2[0-8])$'
    ConstraintDescription: Must be an IPv4 CIDR block with a prefix length between /16 and /28
    Default: '10.0.0.0/16'
 
# IsProduction is true only when the Environment parameter equals 'production'.
Conditions:
  IsProduction: !Equals [!Ref Environment, 'production']
 
# Parameter rule: when Environment is 'production', KMSKeyArn must not be the
# empty string.
Rules:
  ProductionRequiresKMS:
    RuleCondition: !Equals [!Ref Environment, 'production']
    Assertions:
      - Assert: !Not [!Equals [!Ref KMSKeyArn, '']]
        AssertDescription: 'KMS key is required for production'
 
Resources:
  # Secure VPC with flow logs.
  # The VPC's address range is taken from the AllowedCIDR parameter; DNS
  # hostnames and DNS resolution are both enabled.
  SecureVPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: !Ref AllowedCIDR
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-vpc'
        - Key: Environment
          Value: !Ref Environment
 
  # Captures ALL traffic of SecureVPC into the CloudWatch Logs group below.
  VPCFlowLog:
    Type: AWS::EC2::FlowLog
    Properties:
      ResourceId: !Ref SecureVPC
      ResourceType: VPC
      TrafficType: ALL
      LogDestinationType: cloud-watch-logs
      # !Ref (instead of rebuilding the name with !Sub) makes CloudFormation
      # create FlowLogGroup first, so the group is never created implicitly
      # and outside of the stack's control.
      LogGroupName: !Ref FlowLogGroup
      # Publishing to CloudWatch Logs requires a role the flow-logs service
      # can assume; without it the flow log cannot be created.
      DeliverLogsPermissionArn: !GetAtt FlowLogRole.Arn
      MaxAggregationInterval: 60
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-flow-log'

  # Role assumed by the VPC Flow Logs service to write into FlowLogGroup.
  # Creating it means the stack must be deployed with CAPABILITY_IAM.
  FlowLogRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: vpc-flow-logs.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: publish-flow-logs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Write access is limited to this stack's flow-log group.
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                  - logs:DescribeLogStreams
                Resource: !GetAtt FlowLogGroup.Arn
              # DescribeLogGroups is a list operation, so it is granted
              # account-wide.
              - Effect: Allow
                Action: logs:DescribeLogGroups
                Resource: '*'

  # Destination log group: 365 days of retention and KMS encryption in
  # production, 30 days and no customer-managed key elsewhere.
  FlowLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/vpc/${AWS::StackName}-flow-logs'
      RetentionInDays: !If [IsProduction, 365, 30]
      KmsKeyId: !If [IsProduction, !Ref KMSKeyArn, !Ref 'AWS::NoValue']
 
  # Secure security group.
  # Ingress: HTTPS (tcp/443) from the AllowedCIDR parameter only.
  # Egress: HTTPS (tcp/443) to anywhere; the cfn_nag W5 warning this would
  # otherwise raise is suppressed below with a stated reason.
  SecureSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: 'Egress to 0.0.0.0/0 required for updates'
    Properties:
      GroupDescription: Secure security group with minimal access
      VpcId: !Ref SecureVPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: !Ref AllowedCIDR
          Description: HTTPS from allowed CIDR
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: HTTPS outbound
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-sg'
 
  # Encrypted RDS instance.
  # In production the instance is retained on stack deletion/replacement,
  # runs Multi-AZ, has deletion protection and keeps 35 days of backups;
  # other environments use Delete, single-AZ and 7 days.
  # The !If in DeletionPolicy / UpdateReplacePolicy is only valid when the
  # template declares "Transform: AWS::LanguageExtensions".
  # NOTE(review): no DBSubnetGroupName is set while the security group lives
  # in SecureVPC - confirm where the subnet group is meant to come from.
  SecureDatabase:
    Type: AWS::RDS::DBInstance
    DeletionPolicy: !If [IsProduction, Retain, Delete]
    UpdateReplacePolicy: !If [IsProduction, Retain, Delete]
    Properties:
      DBInstanceIdentifier: !Sub '${AWS::StackName}-db'
      DBInstanceClass: db.t3.medium
      Engine: postgres
      EngineVersion: '15'
      # Storage size in GiB. This property is required for a new,
      # non-snapshot instance and was missing, so creation would have failed.
      AllocatedStorage: '20'
      # Credentials are resolved at deploy time from DatabaseSecret; nothing
      # sensitive is written into the template.
      MasterUsername: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:username}}'
      MasterUserPassword: !Sub '{{resolve:secretsmanager:${DatabaseSecret}:SecretString:password}}'
      StorageEncrypted: true
      KmsKeyId: !Ref KMSKeyArn
      MultiAZ: !If [IsProduction, true, false]
      PubliclyAccessible: false
      EnableIAMDatabaseAuthentication: true
      DeletionProtection: !If [IsProduction, true, false]
      BackupRetentionPeriod: !If [IsProduction, 35, 7]
      EnableCloudwatchLogsExports:
        - postgresql
        - upgrade
      VPCSecurityGroups:
        - !Ref SecureSecurityGroup
      Tags:
        - Key: Environment
          Value: !Ref Environment
 
  # Secrets Manager secret for the database credentials.
  # The username is fixed to "dbadmin" by the template; Secrets Manager
  # generates a 32-character value under the "password" key, excluding the
  # characters listed in ExcludeCharacters. The secret is encrypted with the
  # key passed in KMSKeyArn.
  DatabaseSecret:
    Type: AWS::SecretsManager::Secret
    Properties:
      Name: !Sub '${AWS::StackName}/database/credentials'
      Description: Database credentials
      KmsKeyId: !Ref KMSKeyArn
      GenerateSecretString:
        SecretStringTemplate: '{"username": "dbadmin"}'
        GenerateStringKey: password
        PasswordLength: 32
        ExcludeCharacters: '"@/\'
      Tags:
        - Key: Environment
          Value: !Ref Environment
 
  # Secret rotation: created only in production, rotating DatabaseSecret
  # every 30 days.
  # NOTE(review): neither RotationLambdaARN nor HostedRotationLambda is set
  # here - confirm which function is expected to perform the rotation.
  SecretRotationSchedule:
    Type: AWS::SecretsManager::RotationSchedule
    Condition: IsProduction
    Properties:
      SecretId: !Ref DatabaseSecret
      RotationRules:
        AutomaticallyAfterDays: 30
 
# Both outputs are exported under names prefixed with the stack name.
Outputs:
  VPCId:
    Description: VPC ID
    Value: !Ref SecureVPC
    Export:
      Name: !Sub '${AWS::StackName}-VPCId'
 
  SecurityGroupId:
    Description: Security Group ID
    Value: !Ref SecureSecurityGroup
    Export:
      Name: !Sub '${AWS::StackName}-SecurityGroupId'

Policy as Code cu OPA

Politici de securitate Terraform

# opa_policies/terraform_security.py
"""
Open Policy Agent policies for Terraform security scanning.
"""
 
import json
import subprocess
from dataclasses import dataclass
from typing import List, Dict, Any
from pathlib import Path
 
 
@dataclass
class PolicyViolation:
    """Represents a policy violation."""
    # Identifier of the rule that fired, e.g. "S3-001", "IAM-002", "NET-001".
    rule_id: str
    # Severity label; the scanner in this file emits CRITICAL, HIGH, MEDIUM or LOW.
    severity: str
    # Address of the offending resource, formatted as "<type>.<name>".
    resource: str
    # Human-readable description of what is wrong.
    message: str
    # Short hint describing how to fix the finding.
    remediation: str
 
 
class TerraformSecurityScanner:
    """Scan Terraform plans against security policies."""
 
    def __init__(self, policy_dir: str = "policies"):
        """Remember the policy directory and eagerly load its policy files.

        Args:
            policy_dir: Directory searched for ``*.rego`` files; stored as a
                ``Path`` in ``self.policy_dir``.
        """
        self.policy_dir = Path(policy_dir)
        # Policy texts keyed by file stem, read once at construction time.
        self.policies = self._load_policies()
 
    def _load_policies(self) -> Dict[str, Any]:
        """Load OPA policies from directory."""
        policies = {}
        for policy_file in self.policy_dir.glob("*.rego"):
            policies[policy_file.stem] = policy_file.read_text()
        return policies
 
    def scan_plan(self, plan_json: Dict[str, Any]) -> List[PolicyViolation]:
        """Scan Terraform plan JSON against all policies."""
        violations = []
 
        # Verificari securitate S3 bucket
        violations.extend(self._check_s3_security(plan_json))
 
        # Verificari securitate IAM
        violations.extend(self._check_iam_security(plan_json))
 
        # Verificari securitate retea
        violations.extend(self._check_network_security(plan_json))
 
        # Verificari criptare
        violations.extend(self._check_encryption(plan_json))
 
        return violations
 
    def _check_s3_security(self, plan: Dict) -> List[PolicyViolation]:
        """Verifica configuratiile de securitate ale bucket-urilor S3."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_s3_bucket"):
            bucket_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            # Verificare acces public
            if not self._has_public_access_block(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-001",
                    severity="HIGH",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket missing public access block",
                    remediation="Add aws_s3_bucket_public_access_block resource"
                ))
 
            # Verificare criptare
            if not self._has_encryption(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-002",
                    severity="HIGH",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket missing server-side encryption",
                    remediation="Add aws_s3_bucket_server_side_encryption_configuration"
                ))
 
            # Verificare versioning
            if not self._has_versioning(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-003",
                    severity="MEDIUM",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket versioning not enabled",
                    remediation="Add aws_s3_bucket_versioning resource"
                ))
 
            # Verificare logging
            if not self._has_logging(plan, bucket_name):
                violations.append(PolicyViolation(
                    rule_id="S3-004",
                    severity="MEDIUM",
                    resource=f"aws_s3_bucket.{bucket_name}",
                    message="S3 bucket access logging not enabled",
                    remediation="Add aws_s3_bucket_logging resource"
                ))
 
        return violations
 
    def _check_iam_security(self, plan: Dict) -> List[PolicyViolation]:
        """Verifica securitatea configuratiei IAM."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_iam_role"):
            role_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            assume_role_policy = config.get("assume_role_policy", "{}")
            if isinstance(assume_role_policy, str):
                assume_role_policy = json.loads(assume_role_policy)
 
            # Verificare trust excesiv de permisiv
            for statement in assume_role_policy.get("Statement", []):
                principal = statement.get("Principal", {})
                if principal == "*" or principal.get("AWS") == "*":
                    violations.append(PolicyViolation(
                        rule_id="IAM-001",
                        severity="CRITICAL",
                        resource=f"aws_iam_role.{role_name}",
                        message="IAM role has overly permissive trust policy",
                        remediation="Restrict Principal to specific accounts/services"
                    ))
 
        for resource in self._get_resources(plan, "aws_iam_policy"):
            policy_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            policy_doc = config.get("policy", "{}")
            if isinstance(policy_doc, str):
                policy_doc = json.loads(policy_doc)
 
            for statement in policy_doc.get("Statement", []):
                # Verificare acces admin
                actions = statement.get("Action", [])
                if isinstance(actions, str):
                    actions = [actions]
 
                resources = statement.get("Resource", [])
                if isinstance(resources, str):
                    resources = [resources]
 
                if "*" in actions and "*" in resources:
                    violations.append(PolicyViolation(
                        rule_id="IAM-002",
                        severity="CRITICAL",
                        resource=f"aws_iam_policy.{policy_name}",
                        message="IAM policy grants full admin access",
                        remediation="Apply least privilege - restrict actions and resources"
                    ))
 
                # Verificare wildcard resources cu actiuni sensibile
                sensitive_actions = [
                    "iam:*", "kms:*", "s3:*", "ec2:*",
                    "lambda:*", "rds:*", "secretsmanager:*"
                ]
                for action in actions:
                    if action in sensitive_actions and "*" in resources:
                        violations.append(PolicyViolation(
                            rule_id="IAM-003",
                            severity="HIGH",
                            resource=f"aws_iam_policy.{policy_name}",
                            message=f"IAM policy grants {action} on all resources",
                            remediation="Restrict Resource to specific ARNs"
                        ))
 
        return violations
 
    def _check_network_security(self, plan: Dict) -> List[PolicyViolation]:
        """Verifica configuratiile de securitate ale retelei."""
        violations = []
 
        for resource in self._get_resources(plan, "aws_security_group"):
            sg_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            # Verificare reguli ingress
            for ingress in config.get("ingress", []):
                cidr_blocks = ingress.get("cidr_blocks", [])
                from_port = ingress.get("from_port", 0)
                to_port = ingress.get("to_port", 65535)
 
                # Verificare 0.0.0.0/0 pe porturi sensibile
                if "0.0.0.0/0" in cidr_blocks:
                    sensitive_ports = [22, 3389, 3306, 5432, 27017, 6379]
                    for port in sensitive_ports:
                        if from_port <= port <= to_port:
                            violations.append(PolicyViolation(
                                rule_id="NET-001",
                                severity="CRITICAL",
                                resource=f"aws_security_group.{sg_name}",
                                message=f"Security group allows 0.0.0.0/0 access to port {port}",
                                remediation="Restrict CIDR to specific IP ranges"
                            ))
 
            # Verificare egress nerestrictiv (doar avertisment)
            for egress in config.get("egress", []):
                if "0.0.0.0/0" in egress.get("cidr_blocks", []):
                    if egress.get("from_port") == 0 and egress.get("to_port") == 0:
                        violations.append(PolicyViolation(
                            rule_id="NET-002",
                            severity="LOW",
                            resource=f"aws_security_group.{sg_name}",
                            message="Security group allows unrestricted egress",
                            remediation="Consider restricting egress to required ports"
                        ))
 
        return violations
 
    def _check_encryption(self, plan: Dict) -> List[PolicyViolation]:
        """Verifica configuratiile de criptare."""
        violations = []
 
        # Criptare RDS
        for resource in self._get_resources(plan, "aws_db_instance"):
            db_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            if not config.get("storage_encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-001",
                    severity="HIGH",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance storage encryption not enabled",
                    remediation="Set storage_encrypted = true"
                ))
 
            if config.get("publicly_accessible", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-002",
                    severity="CRITICAL",
                    resource=f"aws_db_instance.{db_name}",
                    message="RDS instance is publicly accessible",
                    remediation="Set publicly_accessible = false"
                ))
 
        # Criptare EBS
        for resource in self._get_resources(plan, "aws_ebs_volume"):
            vol_name = resource.get("name", "unknown")
            config = resource.get("values", {})
 
            if not config.get("encrypted", False):
                violations.append(PolicyViolation(
                    rule_id="ENC-003",
                    severity="HIGH",
                    resource=f"aws_ebs_volume.{vol_name}",
                    message="EBS volume encryption not enabled",
                    remediation="Set encrypted = true"
                ))
 
        return violations
 
    def _get_resources(self, plan: Dict, resource_type: str) -> List[Dict]:
        """Extrage resursele de un anumit tip din plan."""
        resources = []
 
        planned_values = plan.get("planned_values", {})
        root_module = planned_values.get("root_module", {})
 
        for resource in root_module.get("resources", []):
            if resource.get("type") == resource_type:
                resources.append(resource)
 
        # Verificare module copil
        for module in root_module.get("child_modules", []):
            for resource in module.get("resources", []):
                if resource.get("type") == resource_type:
                    resources.append(resource)
 
        return resources
 
    def _has_public_access_block(self, plan: Dict, bucket_name: str) -> bool:
        """Verifica daca bucket-ul are blocare acces public."""
        for resource in self._get_resources(plan, "aws_s3_bucket_public_access_block"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
    def _has_encryption(self, plan: Dict, bucket_name: str) -> bool:
        """Verifica daca bucket-ul are criptare configurata."""
        for resource in self._get_resources(plan, "aws_s3_bucket_server_side_encryption_configuration"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
    def _has_versioning(self, plan: Dict, bucket_name: str) -> bool:
        """Verifica daca bucket-ul are versioning activat."""
        for resource in self._get_resources(plan, "aws_s3_bucket_versioning"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                config = resource.get("values", {}).get("versioning_configuration", [{}])
                if config and config[0].get("status") == "Enabled":
                    return True
        return False
 
    def _has_logging(self, plan: Dict, bucket_name: str) -> bool:
        """Verifica daca bucket-ul are logging activat."""
        for resource in self._get_resources(plan, "aws_s3_bucket_logging"):
            if bucket_name in str(resource.get("values", {}).get("bucket", "")):
                return True
        return False
 
 
def scan_terraform_directory(directory: str) -> Dict[str, Any]:
    """Plan a Terraform directory and scan the resulting plan for violations.

    Runs ``terraform init``, ``terraform plan`` and ``terraform show -json``
    inside ``directory`` and feeds the JSON plan to ``TerraformSecurityScanner``.

    The binary plan is written to a temporary file outside the working
    directory and removed afterwards. The JSON plan is read from the standard
    output of ``terraform show``. The previous implementation discarded that
    output and then tried to open a ``tfplan.json`` file that nothing had
    created, and it left a ``tfplan`` file behind.

    Args:
        directory: Path of the Terraform root module to scan.

    Returns:
        A dict with ``total_violations``, per-severity counts (``critical``,
        ``high``, ``medium``, ``low``) and a ``violations`` list of plain dicts.

    Raises:
        subprocess.CalledProcessError: If any Terraform command fails.
        json.JSONDecodeError: If ``terraform show`` does not print valid JSON.
    """
    import tempfile

    # Reserve a path for the binary plan; only the name is needed here.
    with tempfile.NamedTemporaryFile(suffix=".tfplan", delete=False) as f:
        plan_file = f.name

    try:
        # -input=false: output is captured, so an interactive prompt would
        # otherwise block forever.
        subprocess.run(
            ["terraform", "init", "-input=false"],
            cwd=directory,
            capture_output=True,
            check=True
        )

        subprocess.run(
            ["terraform", "plan", "-input=false", f"-out={plan_file}"],
            cwd=directory,
            capture_output=True,
            check=True
        )

        shown = subprocess.run(
            ["terraform", "show", "-json", plan_file],
            cwd=directory,
            capture_output=True,
            text=True,
            check=True
        )
        plan_json = json.loads(shown.stdout)

        # Scan the plan
        scanner = TerraformSecurityScanner()
        violations = scanner.scan_plan(plan_json)

        counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0}
        for violation in violations:
            if violation.severity in counts:
                counts[violation.severity] += 1

        return {
            "total_violations": len(violations),
            "critical": counts["CRITICAL"],
            "high": counts["HIGH"],
            "medium": counts["MEDIUM"],
            "low": counts["LOW"],
            "violations": [
                {
                    "rule_id": v.rule_id,
                    "severity": v.severity,
                    "resource": v.resource,
                    "message": v.message,
                    "remediation": v.remediation
                }
                for v in violations
            ]
        }

    finally:
        # The binary plan can contain sensitive values; never leave it behind.
        Path(plan_file).unlink(missing_ok=True)

Integrare CI/CD

Workflow GitHub Actions

# .github/workflows/iac-security.yaml
# Runs IaC security scanners against the Terraform and CloudFormation sources.
name: IaC Security Scan
 
# Triggers: pull requests that touch terraform/ or cloudformation/, and every
# push to main (the push trigger has no path filter).
on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
  push:
    branches:
      - main
 
# Token permissions shared by all jobs.
# NOTE(review): pull-requests: write is presumably for the PR comment step and
# security-events: write for the SARIF uploads - confirm against the jobs.
permissions:
  contents: read
  pull-requests: write
  security-events: write
 
jobs:
  # Formats, lints and scans terraform/, uploads the SARIF reports and, on
  # pull requests, posts a summary comment.
  terraform-security:
    name: Terraform Security Scan
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.6.0

      - name: Terraform Format Check
        run: terraform fmt -check -recursive terraform/

      - name: TFLint
        uses: terraform-linters/setup-tflint@v4

      - name: Run TFLint
        run: |
          cd terraform
          tflint --init
          tflint --format=sarif > ../tflint-results.sarif

      - name: Checkov Scan
        uses: bridgecrewio/checkov-action@v12
        with:
          directory: terraform/
          framework: terraform
          # One output path per output format: the CLI report goes to the job
          # log and the SARIF report to a file. A single path would be treated
          # as an output folder rather than a file.
          output_format: cli,sarif
          output_file_path: console,checkov-results.sarif
          soft_fail: false
          skip_check: CKV_AWS_144  # Skip cross-region replication check

      - name: Terrascan
        uses: tenable/terrascan-action@v1
        with:
          iac_type: terraform
          iac_dir: terraform/
          policy_type: aws
          sarif_upload: true

      # upload-sarif takes a single file (or directory) per invocation, so each
      # report is uploaded separately under its own category. v2 of the action
      # has been retired. always() keeps the uploads running when a scanner
      # failed the job; hashFiles() skips a report that was never produced.
      - name: Upload TFLint SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always() && hashFiles('tflint-results.sarif') != ''
        with:
          sarif_file: tflint-results.sarif
          category: tflint

      - name: Upload Checkov SARIF
        uses: github/codeql-action/upload-sarif@v3
        if: always() && hashFiles('checkov-results.sarif') != ''
        with:
          sarif_file: checkov-results.sarif
          category: checkov

      # always(): Checkov fails the job when it finds issues (soft_fail: false),
      # which is exactly when the summary comment is needed.
      - name: Comment PR with Results
        if: always() && github.event_name == 'pull_request'
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');

            const sarifPath = 'checkov-results.sarif';
            if (!fs.existsSync(sarifPath)) {
              core.warning(`${sarifPath} not found; skipping PR comment`);
              return;
            }

            let comment = '## 🔒 IaC Security Scan Results\n\n';

            // Parse and summarize results
            const checkovResults = JSON.parse(fs.readFileSync(sarifPath, 'utf8'));
            const violations = (checkovResults.runs || []).flatMap(run => run.results || []);

            const critical = violations.filter(v => v.level === 'error').length;
            const warning = violations.filter(v => v.level === 'warning').length;

            if (critical > 0) {
              comment += `❌ **${critical} Critical Issues Found**\n\n`;
            } else if (warning > 0) {
              comment += `⚠️ **${warning} Warnings Found**\n\n`;
            } else {
              comment += `✅ **No Security Issues Found**\n\n`;
            }

            // Add details
            for (const violation of violations.slice(0, 10)) {
              const location = violation.locations?.[0]?.physicalLocation?.artifactLocation?.uri ?? 'unknown';
              comment += `- **${violation.ruleId}**: ${violation.message?.text ?? ''}\n`;
              comment += `  - Location: \`${location}\`\n`;
            }

            if (violations.length > 10) {
              comment += `\n... and ${violations.length - 10} more issues\n`;
            }

            // Awaited so that an API failure fails the step instead of being lost.
            await github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: comment
            });
 
  # Lints and scans the CloudFormation templates and uploads the cfn-lint
  # SARIF report.
  cloudformation-security:
    name: CloudFormation Security Scan
    runs-on: ubuntu-latest

    steps:
      - name: Checkout
        uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install cfn-lint
        run: pip install cfn-lint cfn-nag-sarif

      - name: CFN Lint
        run: |
          # Without globstar, bash treats "**" like "*" and only templates
          # exactly one directory below cloudformation/ would be linted.
          shopt -s globstar
          cfn-lint cloudformation/**/*.yaml -f sarif > cfn-lint-results.sarif

      # NOTE(review): this action is referenced by a moving branch (master);
      # consider pinning it to a release tag or commit SHA, and confirm that
      # output_format is an input the action actually supports.
      - name: CFN Nag
        uses: stelligent/cfn_nag@master
        with:
          input_path: cloudformation/
          output_format: sarif

      # v2 of upload-sarif has been retired; always() uploads the report even
      # when cfn-lint failed the job because it found issues.
      - name: Upload SARIF Results
        uses: github/codeql-action/upload-sarif@v3
        if: always() && hashFiles('cfn-lint-results.sarif') != ''
        with:
          sarif_file: cfn-lint-results.sarif
          category: cfn-lint

Detectia si remedierea derivei de configuratie

# drift_detection.py
"""
Infrastructure drift detection and automated remediation.
"""
 
import json
import boto3
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass
import subprocess
 
 
@dataclass
class DriftItem:
    """Represents a resource that has drifted."""
    # Terraform resource type of the drifted resource.
    resource_type: str
    # Identifier of the resource; the parser in this file fills it with the resource name.
    resource_id: str
    # String form of the "before" side of the change.
    expected_value: str
    # String form of the "after" side of the change.
    actual_value: str
    # Comma-separated names of the attributes that differ.
    attribute: str
    # Severity label computed by DriftDetector._assess_severity.
    severity: str
 
 
class DriftDetector:
    """Detecteaza si remediaza deriva infrastructurii."""
 
    def __init__(self, state_bucket: str, state_key: str):
        self.s3 = boto3.client('s3')
        self.state_bucket = state_bucket
        self.state_key = state_key
 
    def detect_drift(self) -> List[DriftItem]:
        """Detecteaza deriva intre starea Terraform si infrastructura reala."""
        # Ruleaza terraform refresh si plan
        result = subprocess.run(
            ["terraform", "plan", "-detailed-exitcode", "-json"],
            capture_output=True,
            text=True
        )
 
        drift_items = []
 
        if result.returncode == 2:  # Schimbari detectate
            for line in result.stdout.split('\n'):
                if not line.strip():
                    continue
 
                try:
                    entry = json.loads(line)
                    if entry.get("type") == "resource_drift":
                        drift_items.append(self._parse_drift(entry))
                except json.JSONDecodeError:
                    continue
 
        return drift_items
 
    def _parse_drift(self, entry: Dict) -> DriftItem:
        """Parseaza o intrare de deriva din outputul Terraform."""
        change = entry.get("change", {})
        resource = change.get("resource", {})
 
        before = change.get("before", {})
        after = change.get("after", {})
 
        # Gaseste atributul modificat
        changed_attrs = []
        for key in set(list(before.keys()) + list(after.keys())):
            if before.get(key) != after.get(key):
                changed_attrs.append(key)
 
        return DriftItem(
            resource_type=resource.get("type", "unknown"),
            resource_id=resource.get("name", "unknown"),
            expected_value=str(before),
            actual_value=str(after),
            attribute=", ".join(changed_attrs),
            severity=self._assess_severity(resource.get("type"), changed_attrs)
        )
 
    def _assess_severity(self, resource_type: str, attributes: List[str]) -> str:
        """Evalueaza severitatea derivei in functie de tipul resursei si atribute."""
        critical_combinations = {
            "aws_security_group": ["ingress", "egress"],
            "aws_iam_role": ["assume_role_policy"],
            "aws_iam_policy": ["policy"],
            "aws_s3_bucket_public_access_block": ["block_public_acls", "block_public_policy"],
        }
 
        high_combinations = {
            "aws_s3_bucket_server_side_encryption_configuration": ["rule"],
            "aws_db_instance": ["publicly_accessible", "storage_encrypted"],
        }
 
        if resource_type in critical_combinations:
            if any(attr in critical_combinations[resource_type] for attr in attributes):
                return "CRITICAL"
 
        if resource_type in high_combinations:
            if any(attr in high_combinations[resource_type] for attr in attributes):
                return "HIGH"
 
        return "MEDIUM"
 
    def remediate_drift(self, drift_items: List[DriftItem], auto_remediate: bool = False) -> Dict:
        """Remediaza deriva detectata."""
        remediation_results = {
            "timestamp": datetime.utcnow().isoformat(),
            "total_drift": len(drift_items),
            "remediated": 0,
            "failed": 0,
            "skipped": 0,
            "details": []
        }
 
        for item in drift_items:
            if item.severity == "CRITICAL" and not auto_remediate:
                # Deriva critica necesita revizuire manuala
                remediation_results["skipped"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "SKIPPED",
                    "reason": "Critical drift requires manual review"
                })
                continue
 
            try:
                # Aplica terraform pentru remediere
                result = subprocess.run(
                    ["terraform", "apply", "-auto-approve",
                     f"-target={item.resource_type}.{item.resource_id}"],
                    capture_output=True,
                    text=True,
                    timeout=300
                )
 
                if result.returncode == 0:
                    remediation_results["remediated"] += 1
                    remediation_results["details"].append({
                        "resource": f"{item.resource_type}.{item.resource_id}",
                        "status": "REMEDIATED"
                    })
                else:
                    raise Exception(result.stderr)
 
            except Exception as e:
                remediation_results["failed"] += 1
                remediation_results["details"].append({
                    "resource": f"{item.resource_type}.{item.resource_id}",
                    "status": "FAILED",
                    "error": str(e)
                })
 
        return remediation_results
 
    def schedule_drift_check(self, schedule: str = "rate(1 hour)"):
        """Creeaza o regula EventBridge pentru detectia programata a derivei."""
        events = boto3.client('events')
        lambda_client = boto3.client('lambda')
 
        # Creeaza sau actualizeaza regula
        events.put_rule(
            Name='terraform-drift-detection',
            ScheduleExpression=schedule,
            State='ENABLED',
            Description='Scheduled Terraform drift detection'
        )
 
        # Adauga target Lambda
        events.put_targets(
            Rule='terraform-drift-detection',
            Targets=[{
                'Id': 'drift-detector-lambda',
                'Arn': f'arn:aws:lambda:{boto3.session.Session().region_name}:'
                       f'{boto3.client("sts").get_caller_identity()["Account"]}:'
                       f'function:terraform-drift-detector'
            }]
        )
 
 
# Lambda entry point for scheduled drift detection
def lambda_handler(event, context):
    """Run one drift check and publish an SNS alert when drift is found.

    Reads STATE_BUCKET and STATE_KEY from the environment to build the
    detector, and ALERT_TOPIC_ARN only when there is drift to report.
    Returns a dict with statusCode 200 and a JSON body carrying
    ``drift_detected`` and ``drift_count``.
    """
    import os

    bucket = os.environ['STATE_BUCKET']
    key = os.environ['STATE_KEY']
    found = DriftDetector(state_bucket=bucket, state_key=key).detect_drift()
    total = len(found)

    if found:
        # Publish the alert
        notifier = boto3.client('sns')
        topic_arn = os.environ['ALERT_TOPIC_ARN']

        summaries = []
        critical_total = 0
        for drift in found:
            if drift.severity == "CRITICAL":
                critical_total += 1
            summaries.append({
                "resource": f"{drift.resource_type}.{drift.resource_id}",
                "attribute": drift.attribute,
                "severity": drift.severity
            })

        alert = {
            "drift_count": total,
            "critical": critical_total,
            "items": summaries
        }
        notifier.publish(
            TopicArn=topic_arn,
            Subject='Infrastructure Drift Detected',
            Message=json.dumps(alert, indent=2)
        )

    response_body = {
        "drift_detected": total > 0,
        "drift_count": total
    }
    return {
        "statusCode": 200,
        "body": json.dumps(response_body)
    }

Concluzie

Securizarea Infrastructure as Code necesita o abordare pe mai multe niveluri, combinand patternuri de codare sigure, aplicarea politicilor, scanare automata si detectie continua a derivei. Practicile cheie acoperite includ:

  1. Design securizat al modulelor - cu criptare, controale de acces si logare integrate
  2. Policy as Code - cu OPA si scannere de securitate precum Checkov si cfn-nag
  3. Integrare CI/CD - blocarea deploymenturilor nesigure inainte de a ajunge in productie
  4. Detectia derivei - identificarea si remedierea automata a derivei de configuratie

Prin implementarea acestor practici, organizatiile pot asigura securitatea infrastructurii la scara larga, pastrand in acelasi timp agilitatea pe care o ofera Infrastructure as Code.


Sistemul tau AI e conform cu EU AI Act? Evaluare gratuita de risc - afla in 2 minute →

Ai nevoie de ajutor cu conformitatea EU AI Act sau securitatea AI?

Programeaza o consultatie gratuita de 30 de minute. Fara obligatii.

Programeaza un Apel

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.