AI Security Monitoring and Observability: Real-Time Threat Detection for ML Systems

DeviDevs Team
13 min read
#AI security #monitoring #observability #anomaly detection #MLOps

AI systems require specialized monitoring beyond traditional application observability. This guide covers security monitoring patterns for detecting attacks, drift, and anomalies in ML deployments.

AI-Specific Monitoring Architecture

Core Monitoring Framework

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
import numpy as np
from collections import deque
import threading
import logging
 
logger = logging.getLogger(__name__)
 
class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
 
class ThreatCategory(Enum):
    INPUT_MANIPULATION = "input_manipulation"
    MODEL_EXTRACTION = "model_extraction"
    DATA_POISONING = "data_poisoning"
    ADVERSARIAL_ATTACK = "adversarial_attack"
    PROMPT_INJECTION = "prompt_injection"
    UNUSUAL_BEHAVIOR = "unusual_behavior"
 
@dataclass
class SecurityAlert:
    id: str
    timestamp: datetime
    category: ThreatCategory
    severity: AlertSeverity
    description: str
    details: Dict[str, Any]
    source_ip: Optional[str] = None
    user_id: Optional[str] = None
    model_id: Optional[str] = None
    request_id: Optional[str] = None
    mitigated: bool = False
 
@dataclass
class ModelMetrics:
    model_id: str
    timestamp: datetime
    latency_ms: float
    input_size: int
    output_confidence: float
    prediction_class: Optional[str] = None
    input_hash: Optional[str] = None
    anomaly_score: float = 0.0
 
class AISecurityMonitor:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.alerts: deque = deque(maxlen=10000)
        self.metrics_buffer: deque = deque(maxlen=50000)
        self.alert_handlers: List[Callable[[SecurityAlert], None]] = []
        self.detection_rules: Dict[str, Callable] = {}
        self.baseline_stats: Dict[str, Dict] = {}
        self._lock = threading.Lock()
 
        # Initialize detection rules
        self._register_default_rules()
 
    def _register_default_rules(self):
        """Register default security detection rules."""
        self.detection_rules = {
            "high_frequency_requests": self._detect_high_frequency,
            "anomalous_input": self._detect_anomalous_input,
            "confidence_drift": self._detect_confidence_drift,
            "latency_anomaly": self._detect_latency_anomaly,
            "repeated_queries": self._detect_repeated_queries,
            "input_perturbation": self._detect_input_perturbation
        }
 
    def record_inference(self, metrics: ModelMetrics) -> List[SecurityAlert]:
        """Record model inference and check for security issues."""
        with self._lock:
            self.metrics_buffer.append(metrics)
 
        alerts = []
        for rule_name, rule_func in self.detection_rules.items():
            try:
                alert = rule_func(metrics)
                if alert:
                    alerts.append(alert)
                    self._handle_alert(alert)
            except Exception as e:
                logger.error(f"Detection rule {rule_name} failed: {e}")
 
        return alerts
 
    def _detect_high_frequency(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect unusually high request frequency (potential extraction attack)."""
        window_seconds = self.config.get("frequency_window", 60)
        threshold = self.config.get("frequency_threshold", 100)
 
        cutoff = datetime.utcnow() - timedelta(seconds=window_seconds)
        recent = [m for m in self.metrics_buffer if m.timestamp > cutoff]
 
        # ModelMetrics carries no caller identity, so group by model_id here;
        # in production, key on client IP or user ID instead.
        source_counts: Dict[str, int] = {}
        for m in recent:
            source_counts[m.model_id] = source_counts.get(m.model_id, 0) + 1
 
        for source, count in source_counts.items():
            if count > threshold:
                return SecurityAlert(
                    id=f"alert_{datetime.utcnow().timestamp()}",
                    timestamp=datetime.utcnow(),
                    category=ThreatCategory.MODEL_EXTRACTION,
                    severity=AlertSeverity.WARNING,
                    description=f"High frequency requests detected: {count} in {window_seconds}s",
                    details={
                        "request_count": count,
                        "window_seconds": window_seconds,
                        "threshold": threshold,
                        "source": source
                    },
                    model_id=metrics.model_id
                )
        return None
 
    def _detect_anomalous_input(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect statistically anomalous inputs."""
        model_stats = self.baseline_stats.get(metrics.model_id, {})
 
        if not model_stats:
            return None
 
        # Check input size anomaly
        mean_size = model_stats.get("mean_input_size", 0)
        std_size = model_stats.get("std_input_size", 1)
 
        if std_size > 0:
            z_score = abs(metrics.input_size - mean_size) / std_size
 
            if z_score > self.config.get("input_anomaly_threshold", 3):
                return SecurityAlert(
                    id=f"alert_{datetime.utcnow().timestamp()}",
                    timestamp=datetime.utcnow(),
                    category=ThreatCategory.INPUT_MANIPULATION,
                    severity=AlertSeverity.WARNING,
                    description=f"Anomalous input size detected (z-score: {z_score:.2f})",
                    details={
                        "input_size": metrics.input_size,
                        "mean_size": mean_size,
                        "std_size": std_size,
                        "z_score": z_score
                    },
                    model_id=metrics.model_id,
                    request_id=metrics.input_hash
                )
        return None
 
    def _detect_confidence_drift(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect unusual confidence score patterns."""
        model_stats = self.baseline_stats.get(metrics.model_id, {})
 
        if not model_stats:
            return None
 
        # Check for adversarial-like confidence (very high or clustered at boundaries)
        if metrics.output_confidence > 0.99 or metrics.output_confidence < 0.01:
            # Get recent confidence scores
            recent = [m for m in self.metrics_buffer
                     if m.model_id == metrics.model_id][-100:]
 
            extreme_count = sum(1 for m in recent
                               if m.output_confidence > 0.99 or m.output_confidence < 0.01)
 
            if extreme_count > len(recent) * 0.3:  # 30% threshold
                return SecurityAlert(
                    id=f"alert_{datetime.utcnow().timestamp()}",
                    timestamp=datetime.utcnow(),
                    category=ThreatCategory.ADVERSARIAL_ATTACK,
                    severity=AlertSeverity.WARNING,
                    description="Unusual confidence distribution detected",
                    details={
                        "extreme_confidence_ratio": extreme_count / len(recent),
                        "current_confidence": metrics.output_confidence,
                        "sample_size": len(recent)
                    },
                    model_id=metrics.model_id
                )
        return None
 
    def _detect_latency_anomaly(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect latency anomalies that might indicate attacks."""
        model_stats = self.baseline_stats.get(metrics.model_id, {})
 
        if not model_stats:
            return None
 
        mean_latency = model_stats.get("mean_latency", 0)
        std_latency = model_stats.get("std_latency", 1)
 
        if std_latency > 0:
            z_score = (metrics.latency_ms - mean_latency) / std_latency
 
            # Unusually slow could indicate complex adversarial input
            if z_score > self.config.get("latency_anomaly_threshold", 4):
                return SecurityAlert(
                    id=f"alert_{datetime.utcnow().timestamp()}",
                    timestamp=datetime.utcnow(),
                    category=ThreatCategory.ADVERSARIAL_ATTACK,
                    severity=AlertSeverity.INFO,
                    description=f"Latency anomaly detected (z-score: {z_score:.2f})",
                    details={
                        "latency_ms": metrics.latency_ms,
                        "mean_latency": mean_latency,
                        "z_score": z_score
                    },
                    model_id=metrics.model_id
                )
        return None
 
    def _detect_repeated_queries(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect repeated or near-identical queries (probing behavior)."""
        if not metrics.input_hash:
            return None
 
        window_minutes = self.config.get("repeat_window_minutes", 5)
        threshold = self.config.get("repeat_threshold", 10)
 
        cutoff = datetime.utcnow() - timedelta(minutes=window_minutes)
        recent = [m for m in self.metrics_buffer
                 if m.timestamp > cutoff and m.input_hash]
 
        hash_counts: Dict[str, int] = {}
        for m in recent:
            hash_counts[m.input_hash] = hash_counts.get(m.input_hash, 0) + 1
 
        if hash_counts.get(metrics.input_hash, 0) > threshold:
            return SecurityAlert(
                id=f"alert_{datetime.utcnow().timestamp()}",
                timestamp=datetime.utcnow(),
                category=ThreatCategory.MODEL_EXTRACTION,
                severity=AlertSeverity.WARNING,
                description="Repeated identical queries detected",
                details={
                    "input_hash": metrics.input_hash,
                    "repeat_count": hash_counts[metrics.input_hash],
                    "window_minutes": window_minutes
                },
                model_id=metrics.model_id
            )
        return None
 
    def _detect_input_perturbation(self, metrics: ModelMetrics) -> Optional[SecurityAlert]:
        """Detect systematic input perturbation patterns."""
        # Get recent inputs for the same model
        recent = [m for m in self.metrics_buffer
                 if m.model_id == metrics.model_id][-50:]
 
        if len(recent) < 10:
            return None
 
        # Analyze input size patterns for systematic perturbation
        sizes = [m.input_size for m in recent]
        size_diffs = [abs(sizes[i] - sizes[i-1]) for i in range(1, len(sizes))]
 
        # Check for suspiciously uniform small differences
        if size_diffs:
            avg_diff = np.mean(size_diffs)
            std_diff = np.std(size_diffs)
 
            # Very consistent small changes suggest adversarial probing
            if avg_diff < 10 and std_diff < 2:
                return SecurityAlert(
                    id=f"alert_{datetime.utcnow().timestamp()}",
                    timestamp=datetime.utcnow(),
                    category=ThreatCategory.ADVERSARIAL_ATTACK,
                    severity=AlertSeverity.WARNING,
                    description="Systematic input perturbation pattern detected",
                    details={
                        "avg_size_diff": avg_diff,
                        "std_size_diff": std_diff,
                        "sample_count": len(recent)
                    },
                    model_id=metrics.model_id
                )
        return None
 
    def update_baseline(self, model_id: str, metrics_window: List[ModelMetrics]):
        """Update baseline statistics for a model."""
        if not metrics_window:
            return
 
        input_sizes = [m.input_size for m in metrics_window]
        latencies = [m.latency_ms for m in metrics_window]
        confidences = [m.output_confidence for m in metrics_window]
 
        self.baseline_stats[model_id] = {
            "mean_input_size": np.mean(input_sizes),
            "std_input_size": np.std(input_sizes),
            "mean_latency": np.mean(latencies),
            "std_latency": np.std(latencies),
            "mean_confidence": np.mean(confidences),
            "std_confidence": np.std(confidences),
            "updated_at": datetime.utcnow().isoformat()
        }
 
    def register_alert_handler(self, handler: Callable[[SecurityAlert], None]):
        """Register a handler for security alerts."""
        self.alert_handlers.append(handler)
 
    def _handle_alert(self, alert: SecurityAlert):
        """Process and distribute alert to handlers."""
        with self._lock:
            self.alerts.append(alert)
 
        for handler in self.alert_handlers:
            try:
                handler(alert)
            except Exception as e:
                logger.error(f"Alert handler failed: {e}")
 
    def get_alerts(self,
                   severity: Optional[AlertSeverity] = None,
                   category: Optional[ThreatCategory] = None,
                   since: Optional[datetime] = None) -> List[SecurityAlert]:
        """Query alerts with optional filters."""
        alerts = list(self.alerts)
 
        if severity:
            alerts = [a for a in alerts if a.severity == severity]
        if category:
            alerts = [a for a in alerts if a.category == category]
        if since:
            alerts = [a for a in alerts if a.timestamp > since]
 
        return alerts
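
A minimal wiring sketch for the monitor above; run_model and the "my-model" id are placeholders for your own serving stack, and the alert handler simply logs (swap in paging, SIEM forwarding, or automated blocking as needed):

import hashlib

monitor = AISecurityMonitor({
    "frequency_window": 60,
    "frequency_threshold": 100,
    "input_anomaly_threshold": 3,
})

def log_alert(alert: SecurityAlert):
    # Simplest possible sink: log the alert for later triage.
    logger.warning("Security alert [%s/%s]: %s",
                   alert.category.value, alert.severity.value, alert.description)

monitor.register_alert_handler(log_alert)

def monitored_predict(model_id: str, payload: bytes):
    start = datetime.utcnow()
    prediction, confidence = run_model(payload)  # placeholder for your inference call
    latency_ms = (datetime.utcnow() - start).total_seconds() * 1000

    metrics = ModelMetrics(
        model_id=model_id,
        timestamp=datetime.utcnow(),
        latency_ms=latency_ms,
        input_size=len(payload),
        output_confidence=confidence,
        prediction_class=str(prediction),
        input_hash=hashlib.sha256(payload).hexdigest(),
    )
    monitor.record_inference(metrics)
    return prediction

# Recompute baselines periodically from a trusted window of recent traffic.
recent_for_model = [m for m in monitor.metrics_buffer if m.model_id == "my-model"]
monitor.update_baseline("my-model", recent_for_model[-1000:])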

LLM-Specific Monitoring

Prompt Injection Detection

import re
from typing import List, Optional, Tuple
import hashlib
 
class LLMSecurityMonitor:
    def __init__(self):
        self.prompt_patterns = self._load_injection_patterns()
        self.seen_prompts: dict = {}  # For deduplication
        self.jailbreak_signatures = self._load_jailbreak_signatures()
 
    def _load_injection_patterns(self) -> List[Tuple[str, str, float]]:
        """Load patterns for prompt injection detection."""
        return [
            # (pattern, description, severity_score)
            (r"ignore\s+(previous|all|above)\s+instructions?",
             "Instruction override attempt", 0.9),
            (r"disregard\s+(the\s+)?(system|previous)\s+(prompt|instructions?)",
             "System prompt override", 0.95),
            (r"you\s+are\s+now\s+(a|an|in)\s+",
             "Role reassignment attempt", 0.7),
            (r"pretend\s+(you('re|are)|to\s+be)",
             "Role pretending attempt", 0.6),
            (r"act\s+as\s+(if|though|a|an)",
             "Role acting attempt", 0.5),
            (r"(new|different|alternate)\s+persona",
             "Persona switch attempt", 0.7),
            (r"output\s+(the\s+)?(system|initial|original)\s+prompt",
             "Prompt extraction attempt", 0.85),
            (r"reveal\s+(your|the)\s+(instructions?|prompt|programming)",
             "Instruction reveal attempt", 0.85),
            (r"what\s+(are|were)\s+your\s+(initial|system|original)\s+instructions?",
             "System prompt query", 0.8),
            (r"developer\s+mode|sudo\s+mode|admin\s+mode",
             "Privilege escalation attempt", 0.9),
            (r"<\|?(system|end|im_start|im_end)\|?>",
             "Token injection attempt", 0.95),
            (r"\\n\\n.*human:|\\n\\nassistant:",
             "Conversation injection", 0.9),
        ]
 
    def _load_jailbreak_signatures(self) -> List[dict]:
        """Load known jailbreak attempt signatures."""
        return [
            {
                "name": "DAN",
                "patterns": [r"DAN\s*(\d+)?", r"do\s+anything\s+now"],
                "severity": 0.95
            },
            {
                "name": "Developer Mode",
                "patterns": [r"developer\s+mode\s+(enabled|on|active)"],
                "severity": 0.9
            },
            {
                "name": "Evil Twin",
                "patterns": [r"evil\s+(twin|version|mode)", r"opposite\s+day"],
                "severity": 0.85
            },
            {
                "name": "Token Smuggling",
                "patterns": [r"<\|.*?\|>", r"\[INST\]", r"\[/INST\]"],
                "severity": 0.95
            }
        ]
 
    def analyze_prompt(self, prompt: str, context: Optional[dict] = None) -> dict:
        """Analyze prompt for security issues."""
        results = {
            "is_safe": True,
            "risk_score": 0.0,
            "detections": [],
            "recommendations": []
        }
 
        # Normalize prompt for analysis
        normalized = prompt.lower().strip()
 
        # Check injection patterns
        for pattern, description, severity in self.prompt_patterns:
            if re.search(pattern, normalized, re.IGNORECASE):
                results["detections"].append({
                    "type": "prompt_injection",
                    "pattern": pattern,
                    "description": description,
                    "severity": severity
                })
                results["risk_score"] = max(results["risk_score"], severity)
 
        # Check jailbreak signatures
        for signature in self.jailbreak_signatures:
            for pattern in signature["patterns"]:
                if re.search(pattern, normalized, re.IGNORECASE):
                    results["detections"].append({
                        "type": "jailbreak_attempt",
                        "name": signature["name"],
                        "severity": signature["severity"]
                    })
                    results["risk_score"] = max(
                        results["risk_score"],
                        signature["severity"]
                    )
                    break
 
        # Check for suspicious encoding
        encoding_issues = self._check_encoding_attacks(prompt)
        if encoding_issues:
            results["detections"].extend(encoding_issues)
            results["risk_score"] = max(
                results["risk_score"],
                max(i["severity"] for i in encoding_issues)
            )
 
        # Check for repeated prompt patterns (probing)
        prompt_hash = hashlib.md5(normalized.encode()).hexdigest()
        if prompt_hash in self.seen_prompts:
            self.seen_prompts[prompt_hash] += 1
            if self.seen_prompts[prompt_hash] > 5:
                results["detections"].append({
                    "type": "repeated_prompt",
                    "count": self.seen_prompts[prompt_hash],
                    "severity": 0.6
                })
        else:
            self.seen_prompts[prompt_hash] = 1
 
        # Determine safety
        results["is_safe"] = results["risk_score"] < 0.7
 
        # Generate recommendations
        if results["detections"]:
            results["recommendations"] = self._generate_recommendations(
                results["detections"]
            )
 
        return results
 
    def _check_encoding_attacks(self, prompt: str) -> List[dict]:
        """Check for encoding-based attacks."""
        issues = []
 
        # Check for Unicode tricks
        if any(ord(c) > 127 for c in prompt):
            # Look for confusable characters
            confusables = self._find_confusable_chars(prompt)
            if confusables:
                issues.append({
                    "type": "unicode_confusion",
                    "description": "Potentially confusable Unicode characters",
                    "characters": confusables,
                    "severity": 0.6
                })
 
        # Check for invisible characters
        invisible_chars = [c for c in prompt if ord(c) in
                         [0x200B, 0x200C, 0x200D, 0xFEFF, 0x2060]]
        if invisible_chars:
            issues.append({
                "type": "invisible_characters",
                "description": "Invisible Unicode characters detected",
                "count": len(invisible_chars),
                "severity": 0.8
            })
 
        # Check for base64 encoded payloads
        base64_pattern = r"[A-Za-z0-9+/]{40,}={0,2}"
        if re.search(base64_pattern, prompt):
            issues.append({
                "type": "encoded_payload",
                "description": "Potential base64 encoded content",
                "severity": 0.5
            })
 
        return issues
 
    def _find_confusable_chars(self, text: str) -> List[dict]:
        """Find visually confusable characters."""
        confusables = []
 
        # Common confusable mappings
        confusable_map = {
            '\u0430': 'a',  # Cyrillic а
            '\u0435': 'e',  # Cyrillic е
            '\u043e': 'o',  # Cyrillic о
            '\u0440': 'p',  # Cyrillic р
            '\u0441': 'c',  # Cyrillic с
            '\u0443': 'y',  # Cyrillic у
            '\u0445': 'x',  # Cyrillic х
        }
 
        for i, char in enumerate(text):
            if char in confusable_map:
                confusables.append({
                    "position": i,
                    "char": char,
                    "looks_like": confusable_map[char]
                })
 
        return confusables
 
    def _generate_recommendations(self, detections: List[dict]) -> List[str]:
        """Generate security recommendations based on detections."""
        recommendations = []
 
        detection_types = set(d["type"] for d in detections)
 
        if "prompt_injection" in detection_types:
            recommendations.append(
                "Implement input sanitization to remove injection attempts"
            )
            recommendations.append(
                "Use prompt templates with clear boundaries"
            )
 
        if "jailbreak_attempt" in detection_types:
            recommendations.append(
                "Block requests matching known jailbreak patterns"
            )
            recommendations.append(
                "Implement output filtering for harmful content"
            )
 
        if "unicode_confusion" in detection_types:
            recommendations.append(
                "Normalize Unicode input to ASCII equivalents"
            )
 
        if "invisible_characters" in detection_types:
            recommendations.append(
                "Strip invisible Unicode characters from input"
            )
 
        if "repeated_prompt" in detection_types:
            recommendations.append(
                "Implement rate limiting for similar prompts"
            )
 
        return recommendations
 
    def analyze_response(self,
                        prompt: str,
                        response: str,
                        expected_behavior: Optional[dict] = None) -> dict:
        """Analyze LLM response for security issues."""
        results = {
            "is_safe": True,
            "issues": [],
            "leaked_info": []
        }
 
        # Check for data leakage
        leakage_patterns = [
            (r"(api[_-]?key|secret|password|token)\s*[:=]\s*['\"]?[\w-]+",
             "Potential credential leakage"),
            (r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
             "Email address in response"),
            (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b",
             "Phone number in response"),
            (r"\b\d{3}[-]?\d{2}[-]?\d{4}\b",
             "Potential SSN pattern"),
        ]
 
        for pattern, description in leakage_patterns:
            matches = re.findall(pattern, response, re.IGNORECASE)
            if matches:
                results["leaked_info"].append({
                    "type": description,
                    "matches": matches[:5]  # Limit to prevent large outputs
                })
                results["is_safe"] = False
 
        # Check if response indicates successful injection
        injection_success_indicators = [
            "I am now in",
            "DAN mode",
            "developer mode enabled",
            "I will ignore",
            "I no longer have",
            "my original instructions",
        ]
 
        for indicator in injection_success_indicators:
            if indicator.lower() in response.lower():
                results["issues"].append({
                    "type": "injection_success",
                    "indicator": indicator
                })
                results["is_safe"] = False
 
        # Check response length anomaly
        if expected_behavior:
            expected_length = expected_behavior.get("expected_max_length", 10000)
            if len(response) > expected_length * 1.5:
                results["issues"].append({
                    "type": "response_length_anomaly",
                    "length": len(response),
                    "expected_max": expected_length
                })
 
        return results
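
A sketch of how the two analyzers might gate a request path; call_llm is a placeholder for whatever LLM client your application uses, and the length limit is illustrative:

llm_monitor = LLMSecurityMonitor()

def guarded_completion(prompt: str) -> str:
    prompt_report = llm_monitor.analyze_prompt(prompt)
    if not prompt_report["is_safe"]:
        # Refuse (or route to human review) and keep the detections for auditing.
        raise ValueError(f"Prompt blocked: {prompt_report['detections']}")

    response = call_llm(prompt)  # placeholder for your LLM client

    response_report = llm_monitor.analyze_response(
        prompt, response, expected_behavior={"expected_max_length": 4000}
    )
    if not response_report["is_safe"]:
        # Never return suspected leaked data verbatim.
        return "The response was withheld by the security filter."
    return response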

Metrics Dashboard Integration

from prometheus_client import Counter, Histogram, Gauge, CollectorRegistry
from typing import Optional
import time
 
class AIMetricsExporter:
    def __init__(self, registry: Optional[CollectorRegistry] = None):
        self.registry = registry or CollectorRegistry()
 
        # Inference metrics
        self.inference_total = Counter(
            'ai_inference_total',
            'Total number of model inferences',
            ['model_id', 'status'],
            registry=self.registry
        )
 
        self.inference_latency = Histogram(
            'ai_inference_latency_seconds',
            'Model inference latency',
            ['model_id'],
            buckets=[.01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10],
            registry=self.registry
        )
 
        self.confidence_score = Histogram(
            'ai_confidence_score',
            'Model output confidence scores',
            ['model_id'],
            buckets=[.1, .2, .3, .4, .5, .6, .7, .8, .9, .95, .99],
            registry=self.registry
        )
 
        # Security metrics
        self.security_alerts = Counter(
            'ai_security_alerts_total',
            'Total security alerts',
            ['model_id', 'category', 'severity'],
            registry=self.registry
        )
 
        self.blocked_requests = Counter(
            'ai_blocked_requests_total',
            'Requests blocked by security rules',
            ['model_id', 'reason'],
            registry=self.registry
        )
 
        self.prompt_injection_attempts = Counter(
            'ai_prompt_injection_attempts_total',
            'Detected prompt injection attempts',
            ['model_id', 'injection_type'],
            registry=self.registry
        )
 
        # Model health metrics
        self.model_drift_score = Gauge(
            'ai_model_drift_score',
            'Current model drift score',
            ['model_id'],
            registry=self.registry
        )
 
        self.anomaly_score = Gauge(
            'ai_anomaly_score',
            'Current anomaly detection score',
            ['model_id'],
            registry=self.registry
        )
 
    def record_inference(self, model_id: str, latency: float,
                        confidence: float, status: str = "success"):
        """Record inference metrics."""
        self.inference_total.labels(
            model_id=model_id,
            status=status
        ).inc()
 
        self.inference_latency.labels(
            model_id=model_id
        ).observe(latency)
 
        self.confidence_score.labels(
            model_id=model_id
        ).observe(confidence)
 
    def record_security_alert(self, model_id: str,
                             category: str, severity: str):
        """Record security alert."""
        self.security_alerts.labels(
            model_id=model_id,
            category=category,
            severity=severity
        ).inc()
 
    def record_blocked_request(self, model_id: str, reason: str):
        """Record blocked request."""
        self.blocked_requests.labels(
            model_id=model_id,
            reason=reason
        ).inc()
 
    def record_prompt_injection(self, model_id: str, injection_type: str):
        """Record prompt injection attempt."""
        self.prompt_injection_attempts.labels(
            model_id=model_id,
            injection_type=injection_type
        ).inc()
 
    def update_drift_score(self, model_id: str, score: float):
        """Update model drift score."""
        self.model_drift_score.labels(model_id=model_id).set(score)
 
    def update_anomaly_score(self, model_id: str, score: float):
        """Update anomaly score."""
        self.anomaly_score.labels(model_id=model_id).set(score)
 
 
# Grafana dashboard configuration
GRAFANA_DASHBOARD = {
    "title": "AI Security Monitoring",
    "panels": [
        {
            "title": "Inference Rate",
            "type": "graph",
            "query": "rate(ai_inference_total[5m])"
        },
        {
            "title": "Security Alerts by Category",
            "type": "piechart",
            "query": "sum by (category) (ai_security_alerts_total)"
        },
        {
            "title": "Prompt Injection Attempts",
            "type": "stat",
            "query": "sum(increase(ai_prompt_injection_attempts_total[1h]))"
        },
        {
            "title": "Model Drift Score",
            "type": "gauge",
            "query": "ai_model_drift_score",
            "thresholds": [0.3, 0.7]
        },
        {
            "title": "P99 Latency",
            "type": "graph",
            "query": "histogram_quantile(0.99, rate(ai_inference_latency_seconds_bucket[5m]))"
        },
        {
            "title": "Blocked Requests",
            "type": "table",
            "query": "topk(10, sum by (reason) (ai_blocked_requests_total))"
        }
    ]
}
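
To make these metrics scrapeable, the exporter's registry can be served with prometheus_client's built-in HTTP server. A sketch, with run_model again standing in for the real inference call and the port chosen arbitrarily:

from prometheus_client import start_http_server
import time

exporter = AIMetricsExporter()
start_http_server(8000, registry=exporter.registry)  # exposes /metrics for Prometheus

def instrumented_predict(model_id: str, payload: bytes):
    start = time.perf_counter()
    try:
        prediction, confidence = run_model(payload)  # placeholder inference call
        exporter.record_inference(model_id, time.perf_counter() - start, confidence)
        return prediction
    except Exception:
        # Count the failure without polluting the confidence histogram.
        exporter.inference_total.labels(model_id=model_id, status="error").inc()
        raise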

Alerting Configuration

# prometheus-alert-rules.yaml (evaluated by Prometheus; alerts routed via Alertmanager)
groups:
  - name: ai-security-alerts
    rules:
      - alert: HighPromptInjectionRate
        expr: rate(ai_prompt_injection_attempts_total[5m]) > 10
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High rate of prompt injection attempts"
          description: "More than 10 prompt injection attempts per second detected"
 
      - alert: ModelDriftDetected
        expr: ai_model_drift_score > 0.7
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Model drift detected"
          description: "Model {{ $labels.model_id }} drift score is {{ $value }}"
 
      - alert: AnomalousInferencePattern
        expr: ai_anomaly_score > 0.8
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Anomalous inference pattern detected"
          description: "Model {{ $labels.model_id }} showing anomalous behavior"
 
      - alert: HighBlockedRequestRate
        expr: rate(ai_blocked_requests_total[5m]) > 50
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High rate of blocked requests"
          description: "Possible attack in progress"
 
      - alert: InferenceLatencyHigh
        expr: histogram_quantile(0.99, rate(ai_inference_latency_seconds_bucket[5m])) > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "High inference latency"
          description: "P99 latency is {{ $value }}s for model {{ $labels.model_id }}"

Best Practices

Monitoring Strategy

  1. Baseline establishment: Collect normal behavior metrics before enabling alerting
  2. Multi-layer detection: Combine statistical, rule-based, and ML-based detection
  3. Context preservation: Log full request context for forensic analysis
  4. Tunable thresholds: Allow adjustment based on model and traffic patterns (see the configuration sketch below)
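
The thresholds used by the detection rules above are plain configuration keys, so they can be tuned per model and traffic profile. A sketch with illustrative (not recommended) starting values:

MONITOR_CONFIG = {
    "frequency_window": 60,          # seconds of traffic considered per check
    "frequency_threshold": 100,      # requests per window before alerting
    "input_anomaly_threshold": 3,    # z-score cutoff for input-size anomalies
    "latency_anomaly_threshold": 4,  # z-score cutoff for latency anomalies
    "repeat_window_minutes": 5,      # lookback for repeated-query detection
    "repeat_threshold": 10,          # identical queries tolerated per window
}

monitor = AISecurityMonitor(MONITOR_CONFIG)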

Alert Management

  • Use severity levels consistently: reserve CRITICAL for alerts that require immediate action
  • Aggregate and deduplicate related alerts to prevent fatigue
  • Create runbooks for each alert type
  • Review and tune alert thresholds regularly

Need help building AI monitoring infrastructure? DeviDevs implements production MLOps platforms with comprehensive security monitoring. Get a free assessment →
