DevSecOps

Security Logging and Monitoring: Building Effective SIEM Pipelines

DeviDevs Team
12 min read
#SIEM#security monitoring#logging#DevSecOps#incident response

Effective security monitoring is the foundation of threat detection and incident response. This guide covers implementing comprehensive logging, SIEM integration, and automated response capabilities for modern applications.

Security Logging Architecture

Structured Security Log Format

# security_logger.py
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import uuid
 
# Taxonomy of security events emitted by the logging pipeline, built with
# the enum functional API; members are identical to the class-based form.
SecurityEventType = Enum(
    "SecurityEventType",
    [
        ("AUTHENTICATION", "authentication"),
        ("AUTHORIZATION", "authorization"),
        ("DATA_ACCESS", "data_access"),
        ("CONFIGURATION_CHANGE", "configuration_change"),
        ("SECURITY_ALERT", "security_alert"),
        ("NETWORK_EVENT", "network_event"),
        ("FILE_OPERATION", "file_operation"),
        ("SYSTEM_EVENT", "system_event"),
    ],
)
 
# Event severity levels, ordered from most to least urgent; built with the
# enum functional API — members are identical to the class-based form.
Severity = Enum(
    "Severity",
    [
        ("CRITICAL", "critical"),
        ("HIGH", "high"),
        ("MEDIUM", "medium"),
        ("LOW", "low"),
        ("INFO", "info"),
    ],
)
 
@dataclass
class SecurityEvent:
    """Structured security event following common logging standards.

    Instances are serialized to JSON (via dataclasses.asdict) for SIEM
    ingestion; see SecurityLogger.log_event for how fields are populated.
    """
    # Unique identifier — populated with a UUID4 string by the logger.
    event_id: str
    # ISO-8601 UTC timestamp with a trailing "Z".
    timestamp: str
    # String value of a SecurityEventType member (e.g. "authentication").
    event_type: str
    # String value of a Severity member (e.g. "high").
    severity: str
    # Emitting system: application, environment, ip, hostname.
    source: Dict[str, Any]
    # Who performed the action (e.g. {"user_id": ..., "type": "user"}).
    actor: Dict[str, Any]
    # Dotted action name, e.g. "authentication.<method>".
    action: str
    # Result of the action, e.g. "success" / "failure" / "denied".
    outcome: str
    # What the action was performed on, when applicable.
    target: Optional[Dict[str, Any]] = None
    # Free-form supplementary data.
    details: Optional[Dict[str, Any]] = None
    # Labels for SIEM filtering; the logger defaults this to [].
    tags: Optional[list] = None
    # Ties related events together across services/requests.
    correlation_id: Optional[str] = None
 
class SecurityLogger:
    """Centralized security logging with SIEM compatibility.

    Emits one JSON document per event to a file and to the console, adds a
    truncated SHA-256 integrity hash to each record, and hooks CRITICAL/HIGH
    events into an alerting callback.
    """

    def __init__(self, app_name: str, environment: str):
        self.app_name = app_name
        self.environment = environment
        self.logger = logging.getLogger("security")
        self._setup_handlers()

    def _setup_handlers(self):
        """Configure logging handlers (idempotent).

        BUG FIX: logging.getLogger("security") returns a process-wide shared
        logger, so every SecurityLogger instance used to append another pair
        of handlers, duplicating every emitted event. Configure only once.
        """
        if self.logger.handlers:
            return

        # Events are pre-serialized JSON strings; pass them through as-is.
        json_formatter = logging.Formatter('%(message)s')

        # File handler for persistent storage — best-effort: if the log
        # directory is absent or unwritable, keep logging to the console
        # instead of crashing the application at startup.
        try:
            file_handler = logging.FileHandler('/var/log/security/events.json')
            file_handler.setFormatter(json_formatter)
            self.logger.addHandler(file_handler)
        except OSError:
            pass

        # Console handler for real-time monitoring
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(json_formatter)
        self.logger.addHandler(console_handler)

        self.logger.setLevel(logging.INFO)

    def log_event(
        self,
        event_type: SecurityEventType,
        severity: Severity,
        action: str,
        outcome: str,
        actor: Dict[str, Any],
        source_ip: str = None,
        target: Dict[str, Any] = None,
        details: Dict[str, Any] = None,
        correlation_id: str = None,
        tags: list = None
    ):
        """Log a security event.

        Args:
            event_type: Category of the event.
            severity: Event severity; CRITICAL/HIGH also trigger an alert.
            action: Dotted action name, e.g. "authentication.password".
            outcome: Result string, e.g. "success" / "failure".
            actor: Who performed the action.
            source_ip: Client IP, if known.
            target: What was acted upon, if applicable.
            details: Free-form supplementary data.
            correlation_id: Ties related events together.
            tags: Labels for SIEM filtering (defaults to []).

        Returns:
            The generated event_id (UUID4 string).
        """
        # NOTE: datetime.utcnow() is deprecated since Python 3.12; kept here
        # so the emitted "...Z" timestamp format stays byte-compatible.
        event = SecurityEvent(
            event_id=str(uuid.uuid4()),
            timestamp=datetime.utcnow().isoformat() + "Z",
            event_type=event_type.value,
            severity=severity.value,
            source={
                "application": self.app_name,
                "environment": self.environment,
                "ip": source_ip,
                "hostname": self._get_hostname()
            },
            actor=actor,
            action=action,
            outcome=outcome,
            target=target,
            details=details,
            tags=tags or [],
            correlation_id=correlation_id
        )

        # Compute event hash for tamper-evidence before emitting.
        event_dict = asdict(event)
        event_dict["hash"] = self._compute_hash(event_dict)

        # Emit as a single-line JSON document (SIEM-friendly).
        self.logger.info(json.dumps(event_dict))

        # Page/alert immediately on high-severity events.
        if severity in [Severity.CRITICAL, Severity.HIGH]:
            self._trigger_alert(event_dict)

        return event.event_id

    def _compute_hash(self, event: Dict) -> str:
        """Compute an integrity hash for the event.

        The "hash" field itself is excluded so a verifier can recompute the
        digest from the rest of the record and compare.
        """
        event_copy = {k: v for k, v in event.items() if k != "hash"}
        content = json.dumps(event_copy, sort_keys=True)
        # Truncated to 16 hex chars (64 bits): tamper-evidence, not
        # cryptographic collision resistance.
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def _get_hostname(self) -> str:
        """Get server hostname."""
        import socket
        return socket.gethostname()

    def _trigger_alert(self, event: Dict):
        """Trigger alert for critical events.

        Placeholder: integrate with an alerting system (webhook, PagerDuty,
        etc.).
        """
        pass

    # Convenience methods for common event types
    def log_authentication(
        self,
        user_id: str,
        outcome: str,
        method: str,
        source_ip: str,
        details: Dict = None
    ):
        """Log an authentication event; failures are logged at MEDIUM."""
        return self.log_event(
            event_type=SecurityEventType.AUTHENTICATION,
            severity=Severity.INFO if outcome == "success" else Severity.MEDIUM,
            action=f"authentication.{method}",
            outcome=outcome,
            actor={"user_id": user_id, "type": "user"},
            source_ip=source_ip,
            details=details,
            tags=["auth", method]
        )

    def log_authorization(
        self,
        user_id: str,
        resource: str,
        action: str,
        outcome: str,
        source_ip: str
    ):
        """Log an authorization decision; denials are logged at MEDIUM."""
        return self.log_event(
            event_type=SecurityEventType.AUTHORIZATION,
            severity=Severity.INFO if outcome == "allowed" else Severity.MEDIUM,
            action=f"authorization.{action}",
            outcome=outcome,
            actor={"user_id": user_id, "type": "user"},
            source_ip=source_ip,
            target={"resource": resource, "type": "resource"},
            tags=["authz", action]
        )

    def log_security_alert(
        self,
        alert_type: str,
        severity: Severity,
        description: str,
        source_ip: str = None,
        details: Dict = None
    ):
        """Log a system-generated security alert with a description."""
        return self.log_event(
            event_type=SecurityEventType.SECURITY_ALERT,
            severity=severity,
            action=f"alert.{alert_type}",
            outcome="detected",
            actor={"type": "system"},
            source_ip=source_ip,
            details={
                "description": description,
                **(details or {})
            },
            tags=["alert", alert_type]
        )

SIEM Integration

Log Forwarding to SIEM

# siem_forwarder.py
import json
import asyncio
from typing import Dict, List
from datetime import datetime
import aiohttp
import ssl
 
class SIEMForwarder:
    """Forward security logs to SIEM systems.

    Events are buffered and shipped in batches to one configured backend:
    Splunk HEC, Elasticsearch (bulk API) or Microsoft Sentinel.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.buffer = []
        self.buffer_size = config.get("buffer_size", 100)
        # NOTE(review): flush_interval is stored but no periodic flush task
        # is started here — callers must arrange time-based flushing.
        self.flush_interval = config.get("flush_interval", 30)

    async def forward_event(self, event: Dict):
        """Add event to buffer and flush once the batch is full."""
        self.buffer.append(event)

        if len(self.buffer) >= self.buffer_size:
            await self.flush()

    async def flush(self):
        """Send buffered events to the configured SIEM.

        BUG FIX: the original cleared the buffer before sending, so any
        transport error silently dropped the whole batch. On failure the
        batch is now restored (ahead of anything buffered meanwhile) and
        the error is re-raised so callers can retry or back off.
        """
        if not self.buffer:
            return

        events_to_send = self.buffer
        self.buffer = []

        siem_type = self.config.get("type")

        try:
            if siem_type == "splunk":
                await self._send_to_splunk(events_to_send)
            elif siem_type == "elastic":
                await self._send_to_elastic(events_to_send)
            elif siem_type == "sentinel":
                await self._send_to_sentinel(events_to_send)
        except Exception:
            self.buffer = events_to_send + self.buffer
            raise

    async def _send_to_splunk(self, events: List[Dict]):
        """Send events to the Splunk HTTP Event Collector (HEC).

        Raises:
            Exception: on any non-200 HEC response.
        """
        url = self.config["splunk"]["hec_url"]
        token = self.config["splunk"]["hec_token"]

        headers = {
            "Authorization": f"Splunk {token}",
            "Content-Type": "application/json"
        }

        # HEC accepts newline-delimited JSON objects; "time" is epoch seconds.
        payload = ""
        for event in events:
            payload += json.dumps({
                "time": datetime.fromisoformat(
                    event["timestamp"].rstrip("Z")
                ).timestamp(),
                "source": event["source"]["application"],
                "sourcetype": "_json",
                "event": event
            }) + "\n"

        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                headers=headers,
                data=payload,
                ssl=self._get_ssl_context()
            ) as response:
                if response.status != 200:
                    raise Exception(f"Splunk HEC error: {await response.text()}")

    async def _send_to_elastic(self, events: List[Dict]):
        """Send events to Elasticsearch via the _bulk API.

        Raises:
            Exception: on any non-200/201 response.
        """
        url = self.config["elastic"]["url"]
        index = self.config["elastic"]["index"]
        api_key = self.config["elastic"]["api_key"]

        headers = {
            "Authorization": f"ApiKey {api_key}",
            "Content-Type": "application/x-ndjson"
        }

        # Bulk format: an action line followed by the document, per event.
        payload = ""
        for event in events:
            payload += json.dumps({"index": {"_index": index}}) + "\n"
            payload += json.dumps(event) + "\n"

        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{url}/_bulk",
                headers=headers,
                data=payload,
                ssl=self._get_ssl_context()
            ) as response:
                if response.status not in [200, 201]:
                    raise Exception(f"Elastic error: {await response.text()}")

    async def _send_to_sentinel(self, events: List[Dict]):
        """Send events to Microsoft Sentinel (HTTP Data Collector API).

        Builds the SharedKey HMAC-SHA256 signature over the request
        attributes as required by the API.

        Raises:
            Exception: on any non-200/202 response.
        """
        workspace_id = self.config["sentinel"]["workspace_id"]
        shared_key = self.config["sentinel"]["shared_key"]
        log_type = self.config["sentinel"]["log_type"]

        # Build authorization signature
        import hmac
        import base64

        body = json.dumps(events)
        content_length = len(body)
        rfc1123_date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

        string_to_sign = f"POST\n{content_length}\napplication/json\nx-ms-date:{rfc1123_date}\n/api/logs"
        signature = base64.b64encode(
            hmac.new(
                base64.b64decode(shared_key),
                string_to_sign.encode('utf-8'),
                'sha256'
            ).digest()
        ).decode('utf-8')

        headers = {
            "Authorization": f"SharedKey {workspace_id}:{signature}",
            "Content-Type": "application/json",
            "Log-Type": log_type,
            "x-ms-date": rfc1123_date
        }

        url = f"https://{workspace_id}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01"

        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, data=body) as response:
                if response.status not in [200, 202]:
                    raise Exception(f"Sentinel error: {await response.text()}")

    def _get_ssl_context(self) -> ssl.SSLContext:
        """Get SSL context for secure connections.

        SECURITY: verify_ssl=False disables both hostname checking and
        certificate verification (CERT_NONE) — acceptable only in lab
        environments, never in production.
        """
        ctx = ssl.create_default_context()
        if self.config.get("verify_ssl", True):
            ctx.check_hostname = True
            ctx.verify_mode = ssl.CERT_REQUIRED
        else:
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
        return ctx

Correlation Rules Engine

Event Correlation and Detection

# correlation_engine.py
import re
import uuid
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional
 
@dataclass
class CorrelationRule:
    """Declarative detection rule evaluated by CorrelationEngine."""
    # Stable identifier; also part of the event-buffer grouping key.
    rule_id: str
    # Human-readable rule name (surfaced in generated alerts).
    name: str
    # Explanation of what the rule detects (copied into alerts).
    description: str
    # Severity label copied verbatim into alerts (e.g. "high", "critical").
    severity: str
    # All conditions must match (logical AND). Each condition is a dict:
    # {"field": <dot path>, "operator": <operator name>, "value": <operand>}.
    conditions: List[Dict]
    time_window: int  # seconds
    # Number of matching events within time_window required to alert.
    threshold: int
    # Response action identifier attached to the alert (e.g. "block_ip").
    action: str
 
class CorrelationEngine:
    """Real-time event correlation for threat detection.

    Events matching a rule are buffered per (rule, grouping key); when a
    rule's threshold is reached within its time window, an alert is emitted
    and that group's buffer is reset.
    """

    def __init__(self):
        self.rules = []
        # "<rule_id>:<grouping key>" -> list of buffered matching events
        self.event_buffer = defaultdict(list)
        self.alerts = []

    def add_rule(self, rule: "CorrelationRule"):
        """Add a correlation rule to the engine."""
        self.rules.append(rule)

    def process_event(self, event: Dict) -> List[Dict]:
        """Process one event; return any alerts it triggered."""
        triggered_alerts = []

        for rule in self.rules:
            if self._matches_rule(event, rule):
                # Add to buffer for this rule, grouped by IP/user.
                key = f"{rule.rule_id}:{self._get_grouping_key(event, rule)}"
                self.event_buffer[key].append(event)

                # Drop events older than the rule's window before counting.
                self._clean_buffer(key, rule.time_window)

                if len(self.event_buffer[key]) >= rule.threshold:
                    alert = self._create_alert(rule, self.event_buffer[key])
                    triggered_alerts.append(alert)
                    # Reset so the same events are not re-alerted.
                    self.event_buffer[key].clear()

        return triggered_alerts

    def _matches_rule(self, event: Dict, rule: "CorrelationRule") -> bool:
        """True when every rule condition matches (logical AND)."""
        for condition in rule.conditions:
            field = condition["field"]
            operator = condition["operator"]
            value = condition["value"]

            event_value = self._get_nested_value(event, field)

            if not self._evaluate_condition(event_value, operator, value):
                return False

        return True

    def _evaluate_condition(
        self,
        event_value: Any,
        operator: str,
        value: Any
    ) -> bool:
        """Evaluate a single condition; unknown operators never match."""
        if operator == "equals":
            return event_value == value
        elif operator == "not_equals":
            return event_value != value
        elif operator == "contains":
            return value in str(event_value)
        elif operator == "regex":
            # re.match anchors at the start of the string only.
            return bool(re.match(value, str(event_value)))
        elif operator == "greater_than":
            # BUG FIX: a missing field yields None, and ordering against
            # None raises TypeError — treat it as a non-match instead.
            return event_value is not None and event_value > value
        elif operator == "less_than":
            return event_value is not None and event_value < value
        elif operator == "in":
            return event_value in value
        elif operator == "not_in":
            return event_value not in value

        return False

    def _get_nested_value(self, obj: Dict, path: str) -> Any:
        """Get a nested value from a dict using dot notation.

        Returns None when any path segment is missing or not a dict.
        """
        keys = path.split(".")
        value = obj
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return None
        return value

    def _get_grouping_key(self, event: Dict, rule: "CorrelationRule") -> str:
        """Generate the grouping key for correlation.

        Groups by source IP and actor user_id by default; falls back to a
        single shared "default" group when neither is present.
        """
        parts = []
        if event.get("source", {}).get("ip"):
            parts.append(event["source"]["ip"])
        if event.get("actor", {}).get("user_id"):
            parts.append(event["actor"]["user_id"])
        return ":".join(parts) or "default"

    def _clean_buffer(self, key: str, time_window: int):
        """Remove buffered events that fall outside the time window."""
        cutoff = datetime.utcnow() - timedelta(seconds=time_window)
        self.event_buffer[key] = [
            e for e in self.event_buffer[key]
            if datetime.fromisoformat(e["timestamp"].rstrip("Z")) > cutoff
        ]

    def _create_alert(
        self,
        rule: "CorrelationRule",
        events: List[Dict]
    ) -> Dict:
        """Create an alert record from a group of correlated events."""
        return {
            "alert_id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "rule_id": rule.rule_id,
            "rule_name": rule.name,
            "severity": rule.severity,
            "description": rule.description,
            "event_count": len(events),
            "first_event": events[0]["timestamp"],
            "last_event": events[-1]["timestamp"],
            # Cap the embedded evidence so alerts stay a bounded size.
            "sample_events": events[:5],
            "action": rule.action
        }
 
# Pre-built detection rules, expressed as plain spec dicts and materialized
# into CorrelationRule instances in a single pass.
DETECTION_RULES = [
    CorrelationRule(**spec)
    for spec in (
        {
            "rule_id": "brute_force_login",
            "name": "Brute Force Login Attempt",
            "description": "Multiple failed login attempts from same IP",
            "severity": "high",
            "conditions": [
                {"field": "event_type", "operator": "equals", "value": "authentication"},
                {"field": "outcome", "operator": "equals", "value": "failure"},
            ],
            "time_window": 300,  # 5 minutes
            "threshold": 5,
            "action": "block_ip",
        },
        {
            "rule_id": "privilege_escalation",
            "name": "Privilege Escalation Attempt",
            "description": "User attempting to access admin resources without authorization",
            "severity": "critical",
            "conditions": [
                {"field": "event_type", "operator": "equals", "value": "authorization"},
                {"field": "outcome", "operator": "equals", "value": "denied"},
                {"field": "target.resource", "operator": "contains", "value": "admin"},
            ],
            "time_window": 60,
            "threshold": 3,
            "action": "alert_soc",
        },
        {
            "rule_id": "data_exfiltration",
            "name": "Potential Data Exfiltration",
            "description": "Large volume of data access in short time",
            "severity": "high",
            "conditions": [
                {"field": "event_type", "operator": "equals", "value": "data_access"},
                {"field": "outcome", "operator": "equals", "value": "success"},
            ],
            "time_window": 300,
            "threshold": 100,
            "action": "alert_soc",
        },
        {
            "rule_id": "suspicious_api_activity",
            "name": "Suspicious API Activity",
            "description": "Unusual API call patterns",
            "severity": "medium",
            "conditions": [
                {"field": "event_type", "operator": "equals", "value": "network_event"},
                {"field": "details.response_code", "operator": "in", "value": [401, 403, 429]},
            ],
            "time_window": 60,
            "threshold": 20,
            "action": "rate_limit",
        },
    )
]

Alerting and Response

Alert Management System

# alert_manager.py
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import asyncio
import aiohttp
 
# Supported alert delivery channels, built with the enum functional API;
# members are identical to the class-based form.
AlertChannel = Enum(
    "AlertChannel",
    [
        ("EMAIL", "email"),
        ("SLACK", "slack"),
        ("PAGERDUTY", "pagerduty"),
        ("WEBHOOK", "webhook"),
        ("SMS", "sms"),
    ],
)
 
@dataclass
class AlertConfig:
    """Routing configuration for one alert delivery channel."""
    # Delivery channel this config applies to.
    channel: AlertChannel
    # Severities this channel accepts (e.g. ["critical", "high"]).
    severity_filter: List[str]
    # Destination URL (Slack webhook / generic webhook); PagerDuty posts to
    # its fixed events endpoint and does not use this field.
    endpoint: str
    # Channel secrets, e.g. {"routing_key": ...} or {"headers": {...}}.
    credentials: Dict
 
class AlertManager:
    """Manage and route security alerts.

    Routes each alert to every configured channel whose severity filter
    matches, and keeps an in-memory audit history of dispatch results.
    """

    def __init__(self, configs: List["AlertConfig"]):
        self.configs = configs
        # Audit trail: one entry per alert, recording the channels notified
        # and per-channel results (exceptions are captured, not raised).
        self.alert_history = []

    async def send_alert(self, alert: Dict):
        """Route an alert to all channels whose severity filter matches."""
        severity = alert.get("severity", "medium")

        # Select only the configs that accept this severity.
        matched = [c for c in self.configs if severity in c.severity_filter]

        # Send to all matching channels concurrently; return_exceptions
        # keeps one failing channel from blocking the others.
        results = await asyncio.gather(
            *(self._send_to_channel(alert, c) for c in matched),
            return_exceptions=True
        )

        # BUG FIX: the history previously recorded every configured channel
        # regardless of the severity filter; record only the channels this
        # alert was actually routed to.
        self.alert_history.append({
            "alert": alert,
            "channels": [c.channel.value for c in matched],
            "results": list(results)
        })

    async def _send_to_channel(
        self,
        alert: Dict,
        config: "AlertConfig"
    ):
        """Dispatch one alert to a single channel.

        NOTE(review): EMAIL and SMS are declared in AlertChannel but have no
        handler here — configs for those channels are silently skipped.
        """
        if config.channel == AlertChannel.SLACK:
            await self._send_slack(alert, config)
        elif config.channel == AlertChannel.PAGERDUTY:
            await self._send_pagerduty(alert, config)
        elif config.channel == AlertChannel.WEBHOOK:
            await self._send_webhook(alert, config)

    async def _send_slack(self, alert: Dict, config: "AlertConfig"):
        """Send an alert to Slack as a color-coded attachment."""
        # Unknown severities (including "info") fall back to gray below.
        severity_colors = {
            "critical": "#FF0000",
            "high": "#FF6600",
            "medium": "#FFCC00",
            "low": "#00FF00"
        }

        payload = {
            "attachments": [{
                "color": severity_colors.get(alert["severity"], "#808080"),
                "title": f"🚨 {alert['rule_name']}",
                "text": alert["description"],
                "fields": [
                    {"title": "Severity", "value": alert["severity"], "short": True},
                    {"title": "Events", "value": str(alert["event_count"]), "short": True},
                    {"title": "Time Range", "value": f"{alert['first_event']} - {alert['last_event']}", "short": False}
                ],
                "footer": f"Alert ID: {alert['alert_id']}"
            }]
        }

        async with aiohttp.ClientSession() as session:
            await session.post(config.endpoint, json=payload)

    async def _send_pagerduty(self, alert: Dict, config: "AlertConfig"):
        """Send an alert to PagerDuty via the Events API v2."""
        severity_map = {
            "critical": "critical",
            "high": "error",
            "medium": "warning",
            "low": "info"
        }

        payload = {
            "routing_key": config.credentials["routing_key"],
            "event_action": "trigger",
            # Dedup on rule_id so repeated firings update one incident.
            "dedup_key": alert["rule_id"],
            "payload": {
                "summary": f"{alert['rule_name']}: {alert['description']}",
                "severity": severity_map.get(alert["severity"], "warning"),
                "source": "security-monitoring",
                "custom_details": {
                    "alert_id": alert["alert_id"],
                    "event_count": alert["event_count"],
                    "first_event": alert["first_event"],
                    "last_event": alert["last_event"]
                }
            }
        }

        async with aiohttp.ClientSession() as session:
            await session.post(
                "https://events.pagerduty.com/v2/enqueue",
                json=payload
            )

    async def _send_webhook(self, alert: Dict, config: "AlertConfig"):
        """POST the raw alert to a generic webhook endpoint."""
        headers = config.credentials.get("headers", {})

        async with aiohttp.ClientSession() as session:
            await session.post(
                config.endpoint,
                json=alert,
                headers=headers
            )

Automated Response

SOAR Integration

# automated_response.py
from typing import Dict, Callable, List
from dataclasses import dataclass
import asyncio
 
@dataclass
class ResponseAction:
    """A single automated response capability registered with SOAR."""
    # Identifier used in playbooks and execute_response calls.
    action_id: str
    # Human-readable display name.
    name: str
    # Async callable taking the alert dict and returning a result dict.
    handler: Callable
    # When True the action is queued for human approval, never auto-run.
    requires_approval: bool = False
 
class AutomatedResponse:
    """Security Orchestration, Automation and Response (SOAR).

    Holds a registry of response actions and named playbooks (ordered
    lists of action ids) that can be executed against an alert.
    """

    def __init__(self):
        self.actions = {}    # action_id -> ResponseAction
        self.playbooks = {}  # playbook_id -> ordered list of action ids

    def register_action(self, action: "ResponseAction"):
        """Register a response action under its action_id."""
        self.actions[action.action_id] = action

    def register_playbook(self, playbook_id: str, steps: List[str]):
        """Register a response playbook as an ordered list of action ids."""
        self.playbooks[playbook_id] = steps

    async def execute_response(
        self,
        alert: Dict,
        action_id: str
    ) -> Dict:
        """Execute a single automated response action.

        Actions flagged requires_approval are not run; they are reported
        as pending so a human can approve them first.

        Raises:
            ValueError: if action_id has not been registered.
        """
        action = self.actions.get(action_id)
        if not action:
            raise ValueError(f"Unknown action: {action_id}")

        if action.requires_approval:
            # Queue for human approval instead of acting autonomously.
            # BUG FIX: use .get() — alerts are not guaranteed to carry an
            # alert_id, and a missing key must not abort the response.
            return {
                "status": "pending_approval",
                "action": action_id,
                "alert": alert.get("alert_id")
            }

        # Execute the action's handler against the alert.
        result = await action.handler(alert)

        return {
            "status": "executed",
            "action": action_id,
            "result": result
        }

    async def run_playbook(
        self,
        alert: Dict,
        playbook_id: str
    ) -> List[Dict]:
        """Run each step of a playbook in order.

        Stops early when a step needs human approval; an unknown
        playbook_id yields an empty result list.
        """
        steps = self.playbooks.get(playbook_id, [])
        results = []

        for step in steps:
            result = await self.execute_response(alert, step)
            results.append(result)

            # Halt the playbook until a human approves the pending step.
            if result["status"] == "pending_approval":
                break

        return results
 
# Response action handlers
async def block_ip(alert: Dict) -> Dict:
    """Block the offending source IP at the firewall.

    Pulls the IP from the alert's first sample event and returns a result
    dict describing the (stubbed) block action or the failure reason.
    """
    # BUG FIX: the original indexed [0] into alert["sample_events"], which
    # raises IndexError when the key exists but the list is empty — the
    # dict.get default only covers a *missing* key, not an empty list.
    samples = alert.get("sample_events") or [{}]
    source_ip = samples[0].get("source", {}).get("ip")

    if not source_ip:
        return {"success": False, "error": "No IP found"}

    # Call firewall API to block IP
    # This would integrate with your firewall/WAF

    return {
        "success": True,
        "blocked_ip": source_ip,
        "duration": "24h"
    }
 
async def disable_user(alert: Dict) -> Dict:
    """Disable the user account implicated in the alert.

    Pulls the user id from the alert's first sample event and returns a
    result dict describing the (stubbed) disable action or failure reason.
    """
    # BUG FIX: guard against an empty sample_events list — indexing [0]
    # on it raised IndexError (dict.get only covers a missing key).
    samples = alert.get("sample_events") or [{}]
    user_id = samples[0].get("actor", {}).get("user_id")

    if not user_id:
        return {"success": False, "error": "No user found"}

    # Call identity provider to disable user
    # This would integrate with your IdP

    return {
        "success": True,
        "disabled_user": user_id
    }
 
async def isolate_host(alert: Dict) -> Dict:
    """Isolate the implicated host from the network.

    Pulls the hostname from the alert's first sample event and returns a
    result dict describing the (stubbed) isolation or the failure reason.
    """
    # BUG FIX: guard against an empty sample_events list — indexing [0]
    # on it raised IndexError (dict.get only covers a missing key).
    samples = alert.get("sample_events") or [{}]
    hostname = samples[0].get("source", {}).get("hostname")

    if not hostname:
        return {"success": False, "error": "No hostname found"}

    # Call EDR/network management to isolate host

    return {
        "success": True,
        "isolated_host": hostname
    }
 
# Initialize the response system and register the available actions from a
# single table of (id, display name, handler, needs human approval).
response_system = AutomatedResponse()

for _action_id, _label, _handler, _needs_approval in (
    ("block_ip", "Block IP Address", block_ip, False),
    ("disable_user", "Disable User Account", disable_user, True),
    ("isolate_host", "Isolate Host", isolate_host, True),
):
    response_system.register_action(ResponseAction(
        action_id=_action_id,
        name=_label,
        handler=_handler,
        requires_approval=_needs_approval
    ))

# Register playbooks
response_system.register_playbook("brute_force_response", [
    "block_ip",
    "disable_user"
])

Summary

Effective security monitoring requires:

  1. Structured logging: Consistent, machine-readable security events
  2. SIEM integration: Centralized log collection and analysis
  3. Correlation rules: Detect complex attack patterns
  4. Alerting: Route alerts to the right teams
  5. Automated response: SOAR capabilities for rapid containment

Implement these components as part of your DevSecOps pipeline to enable proactive threat detection and rapid incident response.

Weekly AI Security & Automation Digest

Get the latest on AI Security, workflow automation, secure integrations, and custom platform development delivered weekly.

No spam. Unsubscribe anytime.