Scanarea de securitate a Infrastructure as Code detectează configurațiile greșite înainte de deploy. Acest ghid acoperă implementări practice pentru securizarea configurațiilor Terraform, CloudFormation și Kubernetes.
Scanner IaC multi-framework
Construiește un scanner care suportă mai multe formate IaC:
from dataclasses import dataclass
from typing import List, Dict, Optional
from enum import Enum
from abc import ABC, abstractmethod
import json
import yaml
import re
class Severity(Enum):
    """Severity levels a finding can carry, declared from most to least severe."""

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
@dataclass
class Finding:
    """A single rule violation reported for one resource in one file."""

    rule_id: str  # identifier of the rule that fired, e.g. "AWS-S3-001"
    severity: Severity  # severity attached to the rule
    resource_type: str  # resource type as written in the scanned file
    resource_name: str  # resource name as written in the scanned file
    file_path: str  # path of the scanned file
    line_number: int  # 1-based line of the resource name, 0 when not located
    description: str  # what is wrong
    remediation: str  # how to fix it
    framework: str  # framework key the file was scanned as
class IaCParser(ABC):
    """Interface every framework-specific parser implements."""

    @abstractmethod
    def parse(self, content: str) -> Dict:
        """Turn the raw text of a file into a parsed document.

        Args:
            content: Full text of the file.

        Returns:
            The parsed document as a dictionary.
        """

    @abstractmethod
    def get_resources(self, parsed: Dict) -> List[Dict]:
        """Extract resource descriptions from a document returned by ``parse``.

        Args:
            parsed: Document previously returned by ``parse``.

        Returns:
            One dictionary per resource. The concrete parsers in this file
            emit the keys ``type``, ``name``, ``config`` and ``framework``.
        """
class TerraformParser(IaCParser):
    """Parser for Terraform HCL files, backed by the ``hcl2`` package."""

    def parse(self, content: str) -> Dict:
        """Parse HCL text into a dictionary.

        Args:
            content: Full text of a Terraform file.

        Returns:
            Whatever ``hcl2.load`` produces for the text.
        """
        # Imported inside the method so this module can be imported even
        # when ``hcl2`` is not installed; only Terraform scans need it.
        import hcl2
        from io import StringIO

        return hcl2.load(StringIO(content))

    def get_resources(self, parsed: Dict) -> List[Dict]:
        """Flatten every ``resource`` block into one entry per resource.

        Args:
            parsed: Document returned by ``parse``.

        Returns:
            A list of dictionaries with the keys ``type``, ``name``,
            ``config`` and ``framework`` (always ``'terraform'``).
        """
        resources = []
        # Each element of 'resource' maps type -> {name -> config}.
        for resource_block in parsed.get('resource', []):
            for res_type, res_instances in resource_block.items():
                for res_name, config in res_instances.items():
                    resources.append({
                        'type': res_type,
                        'name': res_name,
                        'config': config,
                        'framework': 'terraform'
                    })
        return resources
class CloudFormationParser(IaCParser):
    """Parser for CloudFormation templates written in JSON or YAML."""

    def parse(self, content: str) -> Dict:
        """Parse a template, choosing JSON or YAML from its first character.

        YAML templates may use the short-form intrinsic tags (``!Ref``,
        ``!Sub``, ``!GetAtt`` ...). A plain ``yaml.safe_load`` rejects those
        tags, so a loader that maps them to their long form is used instead.

        Args:
            content: Full text of the template.

        Returns:
            The parsed template.
        """
        if content.strip().startswith('{'):
            return json.loads(content)
        # The loader derives from SafeLoader and only adds constructors that
        # build plain dicts/lists/strings, so arbitrary object construction
        # stays disabled.
        return yaml.load(content, Loader=self._build_yaml_loader())

    @staticmethod
    def _build_yaml_loader():
        """Return a SafeLoader subclass that accepts ``!``-prefixed tags."""

        class _CfnSafeLoader(yaml.SafeLoader):
            """SafeLoader extended with CloudFormation short-form tags."""

        def construct_intrinsic(loader, tag_suffix, node):
            # Build the tagged value with the regular safe constructors.
            if isinstance(node, yaml.ScalarNode):
                value = loader.construct_scalar(node)
            elif isinstance(node, yaml.SequenceNode):
                value = loader.construct_sequence(node, deep=True)
            else:
                value = loader.construct_mapping(node, deep=True)

            # '!Ref' and '!Condition' have no 'Fn::' prefix in long form.
            if tag_suffix in ('Ref', 'Condition'):
                return {tag_suffix: value}
            # '!GetAtt Resource.Attr' is ['Resource', 'Attr'] in long form.
            if tag_suffix == 'GetAtt' and isinstance(value, str):
                value = value.split('.', 1)
            return {f'Fn::{tag_suffix}': value}

        # Registered on the subclass only; yaml.SafeLoader itself is untouched.
        _CfnSafeLoader.add_multi_constructor('!', construct_intrinsic)
        return _CfnSafeLoader

    def get_resources(self, parsed: Dict) -> List[Dict]:
        """List the entries of the template's ``Resources`` section.

        Args:
            parsed: Template returned by ``parse``.

        Returns:
            A list of dictionaries with the keys ``type``, ``name``,
            ``config`` (the resource's ``Properties``) and ``framework``
            (always ``'cloudformation'``). Empty when the template has no
            usable ``Resources`` mapping.
        """
        if not isinstance(parsed, dict):
            return []

        resources = []
        # 'or {}' covers keys that are present but null in the template.
        for res_name, res_config in (parsed.get('Resources') or {}).items():
            if not isinstance(res_config, dict):
                continue
            resources.append({
                'type': res_config.get('Type', ''),
                'name': res_name,
                'config': res_config.get('Properties') or {},
                'framework': 'cloudformation'
            })
        return resources
class KubernetesParser(IaCParser):
    """Parser for Kubernetes YAML manifests, including multi-document files."""

    def parse(self, content: str) -> Dict:
        """Parse a manifest that may hold several ``---`` separated documents.

        Args:
            content: Full text of the manifest.

        Returns:
            The document itself when the file holds exactly one; otherwise a
            synthetic ``kind: List`` wrapper whose ``items`` are the
            documents (empty documents are dropped).
        """
        documents = [
            doc for doc in yaml.safe_load_all(content) if doc is not None
        ]
        if len(documents) == 1:
            return documents[0]
        return {'apiVersion': 'v1', 'kind': 'List', 'items': documents}

    def get_resources(self, parsed: Dict) -> List[Dict]:
        """Describe the object(s) contained in a parsed manifest.

        Args:
            parsed: Document returned by ``parse``.

        Returns:
            One dictionary per object with the keys ``type``, ``name``,
            ``config`` (the object's ``spec``), ``metadata`` and
            ``framework`` (always ``'kubernetes'``). ``kind: List``
            documents are expanded into their items; documents that are not
            mappings yield nothing.
        """
        if not isinstance(parsed, dict):
            return []

        if parsed.get('kind') == 'List':
            resources = []
            for item in parsed.get('items') or []:
                resources.extend(self.get_resources(item))
            return resources

        # 'or {}' covers keys that are present but null in the manifest.
        metadata = parsed.get('metadata') or {}
        return [{
            'type': parsed.get('kind', ''),
            'name': metadata.get('name', ''),
            'config': parsed.get('spec') or {},
            'metadata': metadata,
            'framework': 'kubernetes'
        }]
class IaCSecurityScanner:
    """Rule-based scanner for Terraform, CloudFormation and Kubernetes files.

    Attributes:
        parsers: Framework key -> parser instance.
        rules: Rule definitions returned by ``_load_rules``.
        findings: Every finding produced by ``scan_file`` so far.
    """

    def __init__(self):
        self.parsers = {
            'terraform': TerraformParser(),
            'cloudformation': CloudFormationParser(),
            'kubernetes': KubernetesParser()
        }
        self.rules = self._load_rules()
        self.findings: List[Finding] = []

    def _load_rules(self) -> List[Dict]:
        """Return the built-in rule table.

        Each rule names the frameworks and resource types it applies to and
        a ``check`` callable taking ``(config, framework)`` that returns
        True when the resource violates the rule.
        """
        return [
            # AWS S3 rules
            {
                "rule_id": "AWS-S3-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_public,
                "description": "S3 bucket allows public access",
                "remediation": "Block public access using bucket policy or ACL"
            },
            {
                "rule_id": "AWS-S3-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_s3_bucket", "AWS::S3::Bucket"],
                "check": self._check_s3_encryption,
                "description": "S3 bucket lacks server-side encryption",
                "remediation": "Enable SSE-S3 or SSE-KMS encryption"
            },
            # AWS Security Group rules
            {
                "rule_id": "AWS-SG-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_open_ingress,
                "description": "Security group allows unrestricted ingress (0.0.0.0/0)",
                "remediation": "Restrict ingress to specific IP ranges"
            },
            {
                "rule_id": "AWS-SG-002",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_security_group", "AWS::EC2::SecurityGroup"],
                "check": self._check_sg_ssh_open,
                "description": "SSH port 22 open to the internet",
                "remediation": "Restrict SSH to bastion hosts or VPN IPs"
            },
            # AWS RDS rules
            {
                "rule_id": "AWS-RDS-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_public,
                "description": "RDS instance is publicly accessible",
                "remediation": "Set publicly_accessible to false"
            },
            {
                "rule_id": "AWS-RDS-002",
                "severity": Severity.HIGH,
                "frameworks": ["terraform", "cloudformation"],
                "resource_types": ["aws_db_instance", "AWS::RDS::DBInstance"],
                "check": self._check_rds_encryption,
                "description": "RDS instance lacks encryption at rest",
                "remediation": "Enable storage_encrypted"
            },
            # Kubernetes rules
            {
                "rule_id": "K8S-POD-001",
                "severity": Severity.CRITICAL,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_privileged,
                "description": "Container running in privileged mode",
                "remediation": "Set securityContext.privileged to false"
            },
            {
                "rule_id": "K8S-POD-002",
                "severity": Severity.HIGH,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_root,
                "description": "Container running as root",
                "remediation": "Set runAsNonRoot to true"
            },
            {
                "rule_id": "K8S-POD-003",
                "severity": Severity.MEDIUM,
                "frameworks": ["kubernetes"],
                "resource_types": ["Pod", "Deployment", "StatefulSet"],
                "check": self._check_k8s_resources,
                "description": "Container lacks resource limits",
                "remediation": "Define CPU and memory limits"
            }
        ]

    def scan_file(self, file_path: str, framework: str) -> "List[Finding]":
        """Scan a single file for security issues.

        Args:
            file_path: Path of the file to scan.
            framework: Key of ``self.parsers`` to parse the file with.

        Returns:
            The findings for this file. They are also appended to
            ``self.findings``. A file that cannot be parsed yields an empty
            list after a message is printed.

        Raises:
            ValueError: If ``framework`` has no registered parser.
            OSError: If the file cannot be opened or read.
        """
        # Validate the cheap argument before touching the filesystem.
        parser = self.parsers.get(framework)
        if not parser:
            raise ValueError(f"Unsupported framework: {framework}")

        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        try:
            parsed = parser.parse(content)
            resources = parser.get_resources(parsed)
        except Exception as e:
            # Best effort by design: one malformed file must not abort a
            # multi-file scan.
            print(f"Failed to parse {file_path}: {e}")
            return []

        findings = []
        for resource in resources:
            # Checks expect a mapping; a null/odd config is treated as empty.
            config = resource['config'] if isinstance(resource['config'], dict) else {}
            for rule in self.rules:
                if framework not in rule["frameworks"]:
                    continue
                if resource['type'] not in rule["resource_types"]:
                    continue
                if rule["check"](config, framework):
                    findings.append(Finding(
                        rule_id=rule["rule_id"],
                        severity=rule["severity"],
                        resource_type=resource['type'],
                        resource_name=resource['name'],
                        file_path=file_path,
                        line_number=self._find_line(content, resource['name']),
                        description=rule["description"],
                        remediation=rule["remediation"],
                        framework=framework
                    ))

        self.findings.extend(findings)
        return findings

    def _find_line(self, content: str, name: str) -> int:
        """Return the 1-based number of the first line containing ``name``.

        This is a plain substring search, so the hit is approximate. Returns
        0 when ``name`` is empty, is not a string, or does not occur.
        """
        if not isinstance(name, str) or not name:
            return 0
        for number, line in enumerate(content.split('\n'), start=1):
            if name in line:
                return number
        return 0

    # ------------------------------------------------------------------
    # Value normalisation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _unwrap(value):
        """Strip single-element list wrappers around a value.

        NOTE(review): some python-hcl2 releases are understood to wrap
        attribute values in one-element lists; confirm against the version
        in use.
        """
        while isinstance(value, list) and len(value) == 1:
            value = value[0]
        return value

    @classmethod
    def _as_int(cls, value) -> Optional[int]:
        """Return ``value`` as an int, or None when it is not a plain number."""
        value = cls._unwrap(value)
        if isinstance(value, bool):
            return None
        if isinstance(value, int):
            return value
        if isinstance(value, str):
            try:
                return int(value.strip())
            except ValueError:
                return None
        return None

    @classmethod
    def _flag_state(cls, value) -> Optional[bool]:
        """Interpret a boolean-like setting.

        Returns True/False for booleans and the strings "true"/"false"
        (case-insensitive); None for anything else, such as an unresolved
        reference.
        """
        value = cls._unwrap(value)
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered == 'true':
                return True
            if lowered == 'false':
                return False
        return None

    @classmethod
    def _dict_entries(cls, value) -> List[Dict]:
        """Collect every dict found in ``value`` (a dict or nested lists)."""
        if isinstance(value, dict):
            return [value]
        if isinstance(value, (list, tuple)):
            entries = []
            for item in value:
                entries.extend(cls._dict_entries(item))
            return entries
        return []

    @classmethod
    def _is_world_open(cls, cidrs) -> bool:
        """Return True when any string in ``cidrs`` contains ``0.0.0.0/0``."""
        if isinstance(cidrs, str):
            return '0.0.0.0/0' in cidrs
        if isinstance(cidrs, (list, tuple)):
            return any(cls._is_world_open(item) for item in cidrs)
        return False

    def _allows_ssh(self, ingress: Dict, from_key: str, to_key: str,
                    protocol_key: str) -> bool:
        """Return True when an ingress rule's port range includes port 22."""
        protocol = str(self._unwrap(ingress.get(protocol_key, ''))).strip()
        # Protocol -1 means "all traffic"; the port fields do not restrict it.
        if protocol == '-1':
            return True
        low = self._as_int(ingress.get(from_key, 0))
        high = self._as_int(ingress.get(to_key, 0))
        if low is None or high is None:
            # Ports that are not literal numbers cannot be evaluated.
            return False
        return low <= 22 <= high

    # ------------------------------------------------------------------
    # Check functions: each returns True when the resource violates a rule
    # ------------------------------------------------------------------

    def _check_s3_public(self, config: Dict, framework: str) -> bool:
        """Return True when the bucket ACL grants public read or write."""
        if framework == 'terraform':
            acl = self._unwrap(config.get('acl', ''))
            return acl in ('public-read', 'public-read-write')
        if framework == 'cloudformation':
            acl = self._unwrap(config.get('AccessControl', ''))
            return acl in ('PublicRead', 'PublicReadWrite')
        return False

    def _check_s3_encryption(self, config: Dict, framework: str) -> bool:
        """Return True when the bucket declares no encryption settings."""
        if framework == 'terraform':
            # Searches the whole config text so a nested block is found too.
            return 'server_side_encryption_configuration' not in str(config)
        if framework == 'cloudformation':
            return 'BucketEncryption' not in config
        return False

    def _check_sg_open_ingress(self, config: Dict, framework: str) -> bool:
        """Return True when any ingress rule admits 0.0.0.0/0."""
        if framework == 'terraform':
            return any(
                self._is_world_open(ingress.get('cidr_blocks'))
                for ingress in self._dict_entries(config.get('ingress'))
            )
        if framework == 'cloudformation':
            return any(
                self._is_world_open(ingress.get('CidrIp'))
                for ingress in self._dict_entries(config.get('SecurityGroupIngress'))
            )
        return False

    def _check_sg_ssh_open(self, config: Dict, framework: str) -> bool:
        """Return True when port 22 is reachable from 0.0.0.0/0."""
        if framework == 'terraform':
            return any(
                self._is_world_open(ingress.get('cidr_blocks'))
                and self._allows_ssh(ingress, 'from_port', 'to_port', 'protocol')
                for ingress in self._dict_entries(config.get('ingress'))
            )
        if framework == 'cloudformation':
            return any(
                self._is_world_open(ingress.get('CidrIp'))
                and self._allows_ssh(ingress, 'FromPort', 'ToPort', 'IpProtocol')
                for ingress in self._dict_entries(config.get('SecurityGroupIngress'))
            )
        return False

    def _check_rds_public(self, config: Dict, framework: str) -> bool:
        """Return True when the instance is explicitly publicly accessible."""
        if framework == 'terraform':
            return self._flag_state(config.get('publicly_accessible', False)) is True
        if framework == 'cloudformation':
            return self._flag_state(config.get('PubliclyAccessible', False)) is True
        return False

    def _check_rds_encryption(self, config: Dict, framework: str) -> bool:
        """Return True when storage encryption is missing or set to false.

        Values that cannot be resolved to a boolean (for example a variable
        reference) are not flagged.
        """
        if framework == 'terraform':
            return self._flag_state(config.get('storage_encrypted', False)) is False
        if framework == 'cloudformation':
            return self._flag_state(config.get('StorageEncrypted', False)) is False
        return False

    def _check_k8s_privileged(self, config: Dict, framework: str) -> bool:
        """Return True when any container sets ``privileged`` to a truthy value."""
        for container in self._get_k8s_containers(config):
            security_context = container.get('securityContext') or {}
            if security_context.get('privileged', False):
                return True
        return False

    def _check_k8s_root(self, config: Dict, framework: str) -> bool:
        """Return True when any container is not forced to run as non-root.

        A container-level ``runAsNonRoot`` wins over the pod-level one; the
        pod-level value is used only when the container does not set it.
        """
        pod_spec = self._k8s_pod_spec(config)
        pod_default = (pod_spec.get('securityContext') or {}).get('runAsNonRoot')
        for container in self._get_k8s_containers(config):
            effective = (container.get('securityContext') or {}).get('runAsNonRoot')
            if effective is None:
                effective = pod_default
            if effective is not True:
                return True
        return False

    def _check_k8s_resources(self, config: Dict, framework: str) -> bool:
        """Return True when any container has no ``resources.limits``."""
        for container in self._get_k8s_containers(config):
            resources = container.get('resources') or {}
            if not resources.get('limits'):
                return True
        return False

    @staticmethod
    def _k8s_pod_spec(config: Dict) -> Dict:
        """Return the pod spec for a Pod or a templated workload spec."""
        if not isinstance(config, dict):
            return {}
        # A Pod's spec holds the containers directly.
        if 'containers' in config:
            return config
        # Deployment/StatefulSet nest the pod spec under template.spec.
        template = config.get('template')
        if isinstance(template, dict) and isinstance(template.get('spec'), dict):
            return template['spec']
        return {}

    def _get_k8s_containers(self, config: Dict) -> List[Dict]:
        """Return the containers of a Pod or Deployment/StatefulSet spec."""
        containers = self._k8s_pod_spec(config).get('containers')
        if not isinstance(containers, list):
            return []
        return [item for item in containers if isinstance(item, dict)]

    def generate_report(self) -> Dict:
        """Build the scan report from all accumulated findings.

        Returns:
            A dictionary with ``scan_summary`` (total, per-severity counts,
            and ``passed``, which is True when there are no critical or high
            findings) and ``findings``, listed from most to least severe.
        """
        severity_counts = {s.value: 0 for s in Severity}
        for finding in self.findings:
            severity_counts[finding.severity.value] += 1

        # Rank by declaration order of the enum (CRITICAL first). Sorting on
        # the string values would order them alphabetically instead.
        rank = {level: position for position, level in enumerate(Severity)}
        ordered = sorted(self.findings, key=lambda item: rank[item.severity])

        return {
            "scan_summary": {
                "total_findings": len(self.findings),
                "by_severity": severity_counts,
                "passed": severity_counts['critical'] == 0 and severity_counts['high'] == 0
            },
            "findings": [
                {
                    "rule_id": f.rule_id,
                    "severity": f.severity.value,
                    "resource": f"{f.resource_type}.{f.resource_name}",
                    "file": f.file_path,
                    "line": f.line_number,
                    "description": f.description,
                    "remediation": f.remediation,
                    "framework": f.framework
                }
                for f in ordered
            ]
        }


# Article section heading that was fused onto the closing brace above:
# Integrare CI/CD
Integrează scanarea în pipeline-urile CI/CD:
# .github/workflows/iac-security.yml
name: IaC Security Scan

on:
  pull_request:
    paths:
      - 'terraform/**'
      - 'cloudformation/**'
      - 'kubernetes/**'
  push:
    branches: [main]

# upload-sarif writes code-scanning results, which needs security-events: write.
permissions:
  contents: read
  security-events: write

jobs:
  scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        # NOTE(review): tfsec is normally distributed as a standalone binary;
        # confirm that 'pip install tfsec' installs the intended tool.
        run: |
          pip install checkov tfsec

      - name: Run Checkov
        # Findings make the scanner exit non-zero; keep going so the SARIF
        # report is still uploaded. The last step is the actual gate.
        continue-on-error: true
        run: |
          checkov -d . --framework terraform cloudformation kubernetes \
            --output sarif --output-file checkov-results.sarif

      - name: Run tfsec
        continue-on-error: true
        run: |
          tfsec terraform/ --format sarif > tfsec-results.sarif

      - name: Upload SARIF results
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: checkov-results.sarif
          # Distinct categories keep the two tools' results from replacing
          # each other on the same commit.
          category: checkov

      - name: Upload tfsec SARIF results
        uses: github/codeql-action/upload-sarif@v3
        with:
          sarif_file: tfsec-results.sarif
          category: tfsec

      - name: Fail on critical findings
        run: |
          checkov -d . --framework terraform --check CKV_AWS_19,CKV_AWS_20 \
            --hard-fail-on CRITICAL

# Article section heading that was fused onto the last command line above:
# Concluzie
Scanarea de securitate IaC previne ajungerea configurațiilor greșite în producție. Integrează scanarea în pipeline-urile CI/CD, folosește policy as code pentru standardele organizaționale și monitorizează deriva de configurație. Începe cu regulile de securitate critice și extinde acoperirea pe măsură ce programul tău se maturizează.
Sistemul tău AI e conform cu EU AI Act? Evaluare gratuită de risc - află în 2 minute →