Effective security monitoring is the foundation of threat detection and incident response. This guide covers implementing comprehensive logging, SIEM integration, and automated response capabilities for modern applications.
Security Logging Architecture
Structured Security Log Format
# security_logger.py
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import uuid
class SecurityEventType(Enum):
AUTHENTICATION = "authentication"
AUTHORIZATION = "authorization"
DATA_ACCESS = "data_access"
CONFIGURATION_CHANGE = "configuration_change"
SECURITY_ALERT = "security_alert"
NETWORK_EVENT = "network_event"
FILE_OPERATION = "file_operation"
SYSTEM_EVENT = "system_event"
class Severity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class SecurityEvent:
"""Structured security event following common logging standards"""
event_id: str
timestamp: str
event_type: str
severity: str
source: Dict[str, Any]
actor: Dict[str, Any]
action: str
outcome: str
target: Optional[Dict[str, Any]] = None
details: Optional[Dict[str, Any]] = None
tags: Optional[list] = None
correlation_id: Optional[str] = None
class SecurityLogger:
"""Centralized security logging with SIEM compatibility"""
def __init__(self, app_name: str, environment: str):
self.app_name = app_name
self.environment = environment
self.logger = logging.getLogger("security")
self._setup_handlers()
def _setup_handlers(self):
"""Configure logging handlers"""
# JSON formatter for SIEM ingestion
json_formatter = logging.Formatter(
'%(message)s' # We'll format as JSON ourselves
)
# File handler for persistent storage
file_handler = logging.FileHandler('/var/log/security/events.json')
file_handler.setFormatter(json_formatter)
self.logger.addHandler(file_handler)
# Console handler for real-time monitoring
console_handler = logging.StreamHandler()
console_handler.setFormatter(json_formatter)
self.logger.addHandler(console_handler)
self.logger.setLevel(logging.INFO)
def log_event(
self,
event_type: SecurityEventType,
severity: Severity,
action: str,
outcome: str,
actor: Dict[str, Any],
source_ip: str = None,
target: Dict[str, Any] = None,
details: Dict[str, Any] = None,
correlation_id: str = None,
tags: list = None
):
"""Log a security event"""
event = SecurityEvent(
event_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat() + "Z",
event_type=event_type.value,
severity=severity.value,
source={
"application": self.app_name,
"environment": self.environment,
"ip": source_ip,
"hostname": self._get_hostname()
},
actor=actor,
action=action,
outcome=outcome,
target=target,
details=details,
tags=tags or [],
correlation_id=correlation_id
)
# Compute event hash for integrity
event_dict = asdict(event)
event_dict["hash"] = self._compute_hash(event_dict)
# Log as JSON
self.logger.info(json.dumps(event_dict))
# Trigger alerts for high severity events
if severity in [Severity.CRITICAL, Severity.HIGH]:
self._trigger_alert(event_dict)
return event.event_id
def _compute_hash(self, event: Dict) -> str:
"""Compute integrity hash for event"""
# Exclude hash field from computation
event_copy = {k: v for k, v in event.items() if k != "hash"}
content = json.dumps(event_copy, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()[:16]
def _get_hostname(self) -> str:
"""Get server hostname"""
import socket
return socket.gethostname()
def _trigger_alert(self, event: Dict):
"""Trigger alert for critical events"""
# Send to alerting system (webhook, PagerDuty, etc.)
pass
# Convenience methods for common event types
def log_authentication(
self,
user_id: str,
outcome: str,
method: str,
source_ip: str,
details: Dict = None
):
"""Log authentication event"""
return self.log_event(
event_type=SecurityEventType.AUTHENTICATION,
severity=Severity.INFO if outcome == "success" else Severity.MEDIUM,
action=f"authentication.{method}",
outcome=outcome,
actor={"user_id": user_id, "type": "user"},
source_ip=source_ip,
details=details,
tags=["auth", method]
)
def log_authorization(
self,
user_id: str,
resource: str,
action: str,
outcome: str,
source_ip: str
):
"""Log authorization event"""
return self.log_event(
event_type=SecurityEventType.AUTHORIZATION,
severity=Severity.INFO if outcome == "allowed" else Severity.MEDIUM,
action=f"authorization.{action}",
outcome=outcome,
actor={"user_id": user_id, "type": "user"},
source_ip=source_ip,
target={"resource": resource, "type": "resource"},
tags=["authz", action]
)
def log_security_alert(
self,
alert_type: str,
severity: Severity,
description: str,
source_ip: str = None,
details: Dict = None
):
"""Log security alert"""
return self.log_event(
event_type=SecurityEventType.SECURITY_ALERT,
severity=severity,
action=f"alert.{alert_type}",
outcome="detected",
actor={"type": "system"},
source_ip=source_ip,
details={
"description": description,
**(details or {})
},
tags=["alert", alert_type]
)SIEM Integration
Log Forwarding to SIEM
# siem_forwarder.py
import json
import asyncio
from typing import Dict, List
from datetime import datetime
import aiohttp
import ssl
class SIEMForwarder:
"""Forward security logs to SIEM systems"""
def __init__(self, config: Dict):
self.config = config
self.buffer = []
self.buffer_size = config.get("buffer_size", 100)
self.flush_interval = config.get("flush_interval", 30)
async def forward_event(self, event: Dict):
"""Add event to buffer and flush if needed"""
self.buffer.append(event)
if len(self.buffer) >= self.buffer_size:
await self.flush()
async def flush(self):
"""Send buffered events to SIEM"""
if not self.buffer:
return
events_to_send = self.buffer.copy()
self.buffer.clear()
# Send to configured SIEM
siem_type = self.config.get("type")
if siem_type == "splunk":
await self._send_to_splunk(events_to_send)
elif siem_type == "elastic":
await self._send_to_elastic(events_to_send)
elif siem_type == "sentinel":
await self._send_to_sentinel(events_to_send)
async def _send_to_splunk(self, events: List[Dict]):
"""Send events to Splunk HEC"""
url = self.config["splunk"]["hec_url"]
token = self.config["splunk"]["hec_token"]
headers = {
"Authorization": f"Splunk {token}",
"Content-Type": "application/json"
}
# Format for Splunk HEC
payload = ""
for event in events:
payload += json.dumps({
"time": datetime.fromisoformat(
event["timestamp"].rstrip("Z")
).timestamp(),
"source": event["source"]["application"],
"sourcetype": "_json",
"event": event
}) + "\n"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers=headers,
data=payload,
ssl=self._get_ssl_context()
) as response:
if response.status != 200:
raise Exception(f"Splunk HEC error: {await response.text()}")
async def _send_to_elastic(self, events: List[Dict]):
"""Send events to Elasticsearch"""
url = self.config["elastic"]["url"]
index = self.config["elastic"]["index"]
api_key = self.config["elastic"]["api_key"]
headers = {
"Authorization": f"ApiKey {api_key}",
"Content-Type": "application/x-ndjson"
}
# Format for bulk API
payload = ""
for event in events:
payload += json.dumps({"index": {"_index": index}}) + "\n"
payload += json.dumps(event) + "\n"
async with aiohttp.ClientSession() as session:
async with session.post(
f"{url}/_bulk",
headers=headers,
data=payload,
ssl=self._get_ssl_context()
) as response:
if response.status not in [200, 201]:
raise Exception(f"Elastic error: {await response.text()}")
async def _send_to_sentinel(self, events: List[Dict]):
"""Send events to Microsoft Sentinel"""
workspace_id = self.config["sentinel"]["workspace_id"]
shared_key = self.config["sentinel"]["shared_key"]
log_type = self.config["sentinel"]["log_type"]
# Build authorization signature
import hmac
import base64
body = json.dumps(events)
content_length = len(body)
rfc1123_date = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
string_to_sign = f"POST\n{content_length}\napplication/json\nx-ms-date:{rfc1123_date}\n/api/logs"
signature = base64.b64encode(
hmac.new(
base64.b64decode(shared_key),
string_to_sign.encode('utf-8'),
'sha256'
).digest()
).decode('utf-8')
headers = {
"Authorization": f"SharedKey {workspace_id}:{signature}",
"Content-Type": "application/json",
"Log-Type": log_type,
"x-ms-date": rfc1123_date
}
url = f"https://{workspace_id}.ods.opinsights.azure.com/api/logs?api-version=2016-04-01"
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, data=body) as response:
if response.status not in [200, 202]:
raise Exception(f"Sentinel error: {await response.text()}")
def _get_ssl_context(self) -> ssl.SSLContext:
"""Get SSL context for secure connections"""
ctx = ssl.create_default_context()
if self.config.get("verify_ssl", True):
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
else:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctxCorrelation Rules Engine
Event Correlation and Detection
# correlation_engine.py
from dataclasses import dataclass
from typing import List, Dict, Callable, Optional
from datetime import datetime, timedelta
from collections import defaultdict
import re
@dataclass
class CorrelationRule:
rule_id: str
name: str
description: str
severity: str
conditions: List[Dict]
time_window: int # seconds
threshold: int
action: str
class CorrelationEngine:
"""Real-time event correlation for threat detection"""
def __init__(self):
self.rules = []
self.event_buffer = defaultdict(list)
self.alerts = []
def add_rule(self, rule: CorrelationRule):
"""Add correlation rule"""
self.rules.append(rule)
def process_event(self, event: Dict) -> List[Dict]:
"""Process event and check against correlation rules"""
triggered_alerts = []
for rule in self.rules:
if self._matches_rule(event, rule):
# Add to buffer for this rule
key = f"{rule.rule_id}:{self._get_grouping_key(event, rule)}"
self.event_buffer[key].append(event)
# Clean old events
self._clean_buffer(key, rule.time_window)
# Check threshold
if len(self.event_buffer[key]) >= rule.threshold:
alert = self._create_alert(rule, self.event_buffer[key])
triggered_alerts.append(alert)
self.event_buffer[key].clear()
return triggered_alerts
def _matches_rule(self, event: Dict, rule: CorrelationRule) -> bool:
"""Check if event matches rule conditions"""
for condition in rule.conditions:
field = condition["field"]
operator = condition["operator"]
value = condition["value"]
event_value = self._get_nested_value(event, field)
if not self._evaluate_condition(event_value, operator, value):
return False
return True
def _evaluate_condition(
self,
event_value: Any,
operator: str,
value: Any
) -> bool:
"""Evaluate single condition"""
if operator == "equals":
return event_value == value
elif operator == "not_equals":
return event_value != value
elif operator == "contains":
return value in str(event_value)
elif operator == "regex":
return bool(re.match(value, str(event_value)))
elif operator == "greater_than":
return event_value > value
elif operator == "less_than":
return event_value < value
elif operator == "in":
return event_value in value
elif operator == "not_in":
return event_value not in value
return False
def _get_nested_value(self, obj: Dict, path: str) -> Any:
"""Get nested value from dict using dot notation"""
keys = path.split(".")
value = obj
for key in keys:
if isinstance(value, dict):
value = value.get(key)
else:
return None
return value
def _get_grouping_key(self, event: Dict, rule: CorrelationRule) -> str:
"""Generate grouping key for correlation"""
# Group by source IP and user by default
parts = []
if event.get("source", {}).get("ip"):
parts.append(event["source"]["ip"])
if event.get("actor", {}).get("user_id"):
parts.append(event["actor"]["user_id"])
return ":".join(parts) or "default"
def _clean_buffer(self, key: str, time_window: int):
"""Remove events outside time window"""
cutoff = datetime.utcnow() - timedelta(seconds=time_window)
self.event_buffer[key] = [
e for e in self.event_buffer[key]
if datetime.fromisoformat(e["timestamp"].rstrip("Z")) > cutoff
]
def _create_alert(
self,
rule: CorrelationRule,
events: List[Dict]
) -> Dict:
"""Create alert from correlated events"""
return {
"alert_id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat() + "Z",
"rule_id": rule.rule_id,
"rule_name": rule.name,
"severity": rule.severity,
"description": rule.description,
"event_count": len(events),
"first_event": events[0]["timestamp"],
"last_event": events[-1]["timestamp"],
"sample_events": events[:5],
"action": rule.action
}
# Pre-built detection rules
DETECTION_RULES = [
CorrelationRule(
rule_id="brute_force_login",
name="Brute Force Login Attempt",
description="Multiple failed login attempts from same IP",
severity="high",
conditions=[
{"field": "event_type", "operator": "equals", "value": "authentication"},
{"field": "outcome", "operator": "equals", "value": "failure"}
],
time_window=300, # 5 minutes
threshold=5,
action="block_ip"
),
CorrelationRule(
rule_id="privilege_escalation",
name="Privilege Escalation Attempt",
description="User attempting to access admin resources without authorization",
severity="critical",
conditions=[
{"field": "event_type", "operator": "equals", "value": "authorization"},
{"field": "outcome", "operator": "equals", "value": "denied"},
{"field": "target.resource", "operator": "contains", "value": "admin"}
],
time_window=60,
threshold=3,
action="alert_soc"
),
CorrelationRule(
rule_id="data_exfiltration",
name="Potential Data Exfiltration",
description="Large volume of data access in short time",
severity="high",
conditions=[
{"field": "event_type", "operator": "equals", "value": "data_access"},
{"field": "outcome", "operator": "equals", "value": "success"}
],
time_window=300,
threshold=100,
action="alert_soc"
),
CorrelationRule(
rule_id="suspicious_api_activity",
name="Suspicious API Activity",
description="Unusual API call patterns",
severity="medium",
conditions=[
{"field": "event_type", "operator": "equals", "value": "network_event"},
{"field": "details.response_code", "operator": "in", "value": [401, 403, 429]}
],
time_window=60,
threshold=20,
action="rate_limit"
)
]Alerting and Response
Alert Management System
# alert_manager.py
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import asyncio
import aiohttp
class AlertChannel(Enum):
EMAIL = "email"
SLACK = "slack"
PAGERDUTY = "pagerduty"
WEBHOOK = "webhook"
SMS = "sms"
@dataclass
class AlertConfig:
channel: AlertChannel
severity_filter: List[str]
endpoint: str
credentials: Dict
class AlertManager:
"""Manage and route security alerts"""
def __init__(self, configs: List[AlertConfig]):
self.configs = configs
self.alert_history = []
async def send_alert(self, alert: Dict):
"""Route alert to appropriate channels"""
severity = alert.get("severity", "medium")
tasks = []
for config in self.configs:
if severity in config.severity_filter:
tasks.append(self._send_to_channel(alert, config))
# Send to all matching channels concurrently
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log results
self.alert_history.append({
"alert": alert,
"channels": [c.channel.value for c in self.configs],
"results": results
})
async def _send_to_channel(
self,
alert: Dict,
config: AlertConfig
):
"""Send alert to specific channel"""
if config.channel == AlertChannel.SLACK:
await self._send_slack(alert, config)
elif config.channel == AlertChannel.PAGERDUTY:
await self._send_pagerduty(alert, config)
elif config.channel == AlertChannel.WEBHOOK:
await self._send_webhook(alert, config)
async def _send_slack(self, alert: Dict, config: AlertConfig):
"""Send alert to Slack"""
severity_colors = {
"critical": "#FF0000",
"high": "#FF6600",
"medium": "#FFCC00",
"low": "#00FF00"
}
payload = {
"attachments": [{
"color": severity_colors.get(alert["severity"], "#808080"),
"title": f"🚨 {alert['rule_name']}",
"text": alert["description"],
"fields": [
{"title": "Severity", "value": alert["severity"], "short": True},
{"title": "Events", "value": str(alert["event_count"]), "short": True},
{"title": "Time Range", "value": f"{alert['first_event']} - {alert['last_event']}", "short": False}
],
"footer": f"Alert ID: {alert['alert_id']}"
}]
}
async with aiohttp.ClientSession() as session:
await session.post(config.endpoint, json=payload)
async def _send_pagerduty(self, alert: Dict, config: AlertConfig):
"""Send alert to PagerDuty"""
severity_map = {
"critical": "critical",
"high": "error",
"medium": "warning",
"low": "info"
}
payload = {
"routing_key": config.credentials["routing_key"],
"event_action": "trigger",
"dedup_key": alert["rule_id"],
"payload": {
"summary": f"{alert['rule_name']}: {alert['description']}",
"severity": severity_map.get(alert["severity"], "warning"),
"source": "security-monitoring",
"custom_details": {
"alert_id": alert["alert_id"],
"event_count": alert["event_count"],
"first_event": alert["first_event"],
"last_event": alert["last_event"]
}
}
}
async with aiohttp.ClientSession() as session:
await session.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload
)
async def _send_webhook(self, alert: Dict, config: AlertConfig):
"""Send alert to generic webhook"""
headers = config.credentials.get("headers", {})
async with aiohttp.ClientSession() as session:
await session.post(
config.endpoint,
json=alert,
headers=headers
)Automated Response
SOAR Integration
# automated_response.py
from typing import Dict, Callable, List
from dataclasses import dataclass
import asyncio
@dataclass
class ResponseAction:
action_id: str
name: str
handler: Callable
requires_approval: bool = False
class AutomatedResponse:
"""Security Orchestration, Automation and Response"""
def __init__(self):
self.actions = {}
self.playbooks = {}
def register_action(self, action: ResponseAction):
"""Register response action"""
self.actions[action.action_id] = action
def register_playbook(self, playbook_id: str, steps: List[str]):
"""Register response playbook"""
self.playbooks[playbook_id] = steps
async def execute_response(
self,
alert: Dict,
action_id: str
) -> Dict:
"""Execute automated response"""
action = self.actions.get(action_id)
if not action:
raise ValueError(f"Unknown action: {action_id}")
if action.requires_approval:
# Queue for human approval
return {
"status": "pending_approval",
"action": action_id,
"alert": alert["alert_id"]
}
# Execute action
result = await action.handler(alert)
return {
"status": "executed",
"action": action_id,
"result": result
}
async def run_playbook(
self,
alert: Dict,
playbook_id: str
) -> List[Dict]:
"""Run response playbook"""
steps = self.playbooks.get(playbook_id, [])
results = []
for step in steps:
result = await self.execute_response(alert, step)
results.append(result)
# Stop if step requires approval
if result["status"] == "pending_approval":
break
return results
# Response action handlers
async def block_ip(alert: Dict) -> Dict:
"""Block IP address at firewall"""
source_ip = alert.get("sample_events", [{}])[0].get("source", {}).get("ip")
if not source_ip:
return {"success": False, "error": "No IP found"}
# Call firewall API to block IP
# This would integrate with your firewall/WAF
return {
"success": True,
"blocked_ip": source_ip,
"duration": "24h"
}
async def disable_user(alert: Dict) -> Dict:
"""Disable user account"""
user_id = alert.get("sample_events", [{}])[0].get("actor", {}).get("user_id")
if not user_id:
return {"success": False, "error": "No user found"}
# Call identity provider to disable user
# This would integrate with your IdP
return {
"success": True,
"disabled_user": user_id
}
async def isolate_host(alert: Dict) -> Dict:
"""Isolate host from network"""
hostname = alert.get("sample_events", [{}])[0].get("source", {}).get("hostname")
if not hostname:
return {"success": False, "error": "No hostname found"}
# Call EDR/network management to isolate host
return {
"success": True,
"isolated_host": hostname
}
# Initialize response system
response_system = AutomatedResponse()
response_system.register_action(ResponseAction(
action_id="block_ip",
name="Block IP Address",
handler=block_ip,
requires_approval=False
))
response_system.register_action(ResponseAction(
action_id="disable_user",
name="Disable User Account",
handler=disable_user,
requires_approval=True # Requires human approval
))
response_system.register_action(ResponseAction(
action_id="isolate_host",
name="Isolate Host",
handler=isolate_host,
requires_approval=True
))
# Register playbooks
response_system.register_playbook("brute_force_response", [
"block_ip",
"disable_user"
])Summary
Effective security monitoring requires:
- Structured logging: Consistent, machine-readable security events
- SIEM integration: Centralized log collection and analysis
- Correlation rules: Detect complex attack patterns
- Alerting: Route alerts to the right teams
- Automated response: SOAR capabilities for rapid containment
Implement these components as part of your DevSecOps pipeline to enable proactive threat detection and rapid incident response.