Production AI Agent Monitoring and Alerting

/* Article page styles: base typography, inline/blocked code, quotes, tables and tag pills. */
body{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,sans-serif;max-width:900px;margin:0 auto;padding:2rem;line-height:1.7;color:#1a1a1a}
h1{color:#1a1a1a;border-bottom:3px solid #6366f1;padding-bottom:.5rem}
h2{color:#334155;margin-top:2rem}
/* Inline code is light-on-light; <pre> blocks use a dark theme. */
code{background:#f1f5f9;padding:.2rem .5rem;border-radius:4px;font-size:.9em}
pre{background:#1e293b;color:#e2e8f0;padding:1.5rem;border-radius:8px;overflow-x:auto;font-size:.9em}
blockquote{border-left:4px solid #6366f1;padding-left:1rem;color:#64748b;font-style:italic}
table{border-collapse:collapse;width:100%;margin:1rem 0}
th,td{border:1px solid #e2e8f0;padding:.75rem;text-align:left}
th{background:#f8fafc}
/* Pill-shaped label. */
.tag{display:inline-block;background:#e0e7ff;color:#4338ca;padding:.2rem .6rem;border-radius:999px;font-size:.85em;margin-right:.5rem}

Production AI Agent Monitoring and Alerting

Reviewed: June 4, 2026

Published: May 26, 2026 | Reading time: 11 min | Monitoring Production AI Observability

AI agents in production don’t fail like traditional software. They degrade slowly — giving wrong answers, drifting off-topic, or burning tokens on infinite loops. Effective monitoring catches these failures before users do.

The Four Pillars of Agent Observability

Traditional application monitoring tracks CPU, memory, and error rates. Agent monitoring needs additional dimensions:

| Pillar | What to Monitor | Key Metrics |
|---|---|---|
| Quality | Output correctness, relevance, hallucination rate | Accuracy score, human eval rating, refusal rate |
| Performance | Latency, token usage, tool call efficiency | P95 latency, tokens/request, tool call count |
| Cost | Per-request and aggregate spending | $/request, daily burn rate, budget utilization |
| Safety | Policy violations, injection attempts, data leakage | Violation rate, blocked requests, PII detection |

Quality Monitoring

class AgentQualityMonitor:
    """Track per-response metrics for AI agents and summarize them.

    Every recorded response is kept in memory in ``self.responses`` as a
    plain dict; nothing is persisted or evicted by this class.
    """

    def __init__(self):
        # One dict per recorded response, in insertion order.
        self.responses = []

    @staticmethod
    def _utc_now():
        """Return the current UTC time as a naive ``datetime``."""
        # Local import keeps the class usable without module-level imports.
        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the stored ISO strings in the same naive-UTC format as before.
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def record_response(self, request_id: str, user_input: str,
                        response: str, metadata: dict):
        """Append one response record, stamped with the current UTC time.

        Args:
            request_id: Identifier stored with the record.
            user_input: The user's input; only its length is stored.
            response: The agent's output; only its length is stored.
            metadata: Optional keys ``tokens_used`` (default 0),
                ``tools_called`` (default ``[]``), ``latency_ms`` (default 0)
                and ``model`` (default ``"unknown"``).
        """
        entry = {
            "request_id": request_id,
            "timestamp": self._utc_now().isoformat(),
            "input_length": len(user_input),
            "response_length": len(response),
            "tokens_used": metadata.get("tokens_used", 0),
            "tools_called": metadata.get("tools_called", []),
            "latency_ms": metadata.get("latency_ms", 0),
            "model": metadata.get("model", "unknown"),
        }
        self.responses.append(entry)

    def get_quality_stats(self, window_hours: int = 24) -> dict:
        """Summarize the responses recorded in the last ``window_hours`` hours.

        Args:
            window_hours: Size of the look-back window, in hours.

        Returns:
            A dict with ``total_requests``, ``avg_latency_ms``,
            ``p95_latency_ms``, ``avg_tokens``, ``total_tokens`` and
            ``avg_tools_per_request``; or ``{"error": "No data in window"}``
            when no response falls inside the window.
        """
        from datetime import datetime, timedelta

        cutoff = self._utc_now() - timedelta(hours=window_hours)
        recent = [
            r for r in self.responses
            if datetime.fromisoformat(r["timestamp"]) > cutoff
        ]

        if not recent:
            return {"error": "No data in window"}

        count = len(recent)
        latencies = sorted(r["latency_ms"] for r in recent)
        tokens = [r["tokens_used"] for r in recent]

        # Nearest-rank percentile: rank = ceil(0.95 * n), computed in integer
        # arithmetic. The previous index int(n * 0.95) was one position too
        # high whenever 0.95 * n is a whole number (e.g. it returned the
        # maximum for n == 20).
        p95_rank = (95 * count + 99) // 100

        return {
            "total_requests": count,
            "avg_latency_ms": sum(latencies) / count,
            "p95_latency_ms": latencies[p95_rank - 1],
            "avg_tokens": sum(tokens) / count,
            "total_tokens": sum(tokens),
            "avg_tools_per_request": sum(len(r["tools_called"]) for r in recent) / count,
        }

Cost Monitoring and Budget Alerts

class AgentCostMonitor:
    """Monitor AI agent spending per UTC day and alert on budget thresholds.

    NOTE(review): the original text of ``_send_alert`` and the header of the
    report method were lost to markup stripping. The alert message and the
    method name ``get_daily_report`` are reconstructions; confirm them
    against any existing callers.
    """

    def __init__(self, daily_budget: float, alert_thresholds: list[float] = None):
        """Create a monitor.

        Args:
            daily_budget: Budget per UTC day; must be positive.
            alert_thresholds: Fractions of the budget at which to alert.
                Defaults to ``[0.5, 0.75, 0.9, 1.0]``.

        Raises:
            ValueError: If ``daily_budget`` is not positive. Utilization is
                ``total / daily_budget``, so a zero budget would otherwise
                fail later, after spending had already been recorded.
        """
        if daily_budget <= 0:
            raise ValueError("daily_budget must be positive")
        self.daily_budget = daily_budget
        self.alert_thresholds = alert_thresholds or [0.5, 0.75, 0.9, 1.0]
        # "YYYY-MM-DD" -> {"total": float, "by_model": {model: float}}
        self.spending = {}
        # "YYYY-MM-DD" -> thresholds that have already produced an alert.
        self._alerted = {}

    @staticmethod
    def _today() -> str:
        """Return the current UTC date as ``YYYY-MM-DD``."""
        # Local import keeps the class usable without module-level imports.
        from datetime import datetime, timezone

        # datetime.utcnow() is deprecated since Python 3.12.
        return datetime.now(timezone.utc).strftime("%Y-%m-%d")

    def record_cost(self, cost: float, model: str = "unknown"):
        """Add ``cost`` to today's total and per-model totals, then check alerts.

        Args:
            cost: Amount to add, in the same unit as ``daily_budget``.
            model: Key under which the cost is accumulated in ``by_model``.
        """
        today = self._today()
        day = self.spending.setdefault(today, {"total": 0.0, "by_model": {}})

        day["total"] += cost
        day["by_model"][model] = day["by_model"].get(model, 0) + cost

        self._check_alerts(today)

    def _check_alerts(self, today: str):
        """Send one alert for each threshold newly reached on ``today``."""
        total = self.spending[today]["total"]
        utilization = total / self.daily_budget
        already_sent = self._alerted.setdefault(today, set())

        for threshold in self.alert_thresholds:
            # Without this bookkeeping every threshold already crossed would
            # re-alert on every subsequent record_cost() call.
            if utilization >= threshold and threshold not in already_sent:
                already_sent.add(threshold)
                self._send_alert(threshold, total, self.daily_budget)

    def _send_alert(self, threshold: float, spent: float, budget: float):
        """Print a budget alert; thresholds of 100% or more are CRITICAL."""
        level = "WARNING" if threshold < 1.0 else "CRITICAL"
        print(
            f"[{level}] Budget alert: {threshold:.0%} threshold reached "
            f"(${spent:.2f} of ${budget:.2f} daily budget)"
        )

    def get_daily_report(self) -> dict:
        """Return today's spending record.

        Returns:
            The stored ``{"total": ..., "by_model": {...}}`` dict for the
            current UTC date, or ``{"total": 0, "by_model": {}}`` when
            nothing has been recorded today.
        """
        today = self._today()
        return self.spending.get(today, {"total": 0, "by_model": {}})

Anomaly Detection for Agent Behavior

class AgentAnomalyDetector:
    """Detect unusual agent behavior by z-score against a rolling baseline.

    NOTE(review): the statistics section of ``check_anomaly`` was lost to
    markup stripping in the original text and has been reconstructed. The
    3-sigma cut-off and the severity split at z > 5 come from the surviving
    code; the minimum sample count (10) is an assumption and is exposed as
    ``min_samples`` so it can be adjusted.
    """

    # Maximum number of samples retained per metric.
    MAX_SAMPLES = 1000

    def __init__(self, baseline_window_days: int = 7, min_samples: int = 10,
                 z_threshold: float = 3.0):
        """Create a detector with an empty baseline.

        Args:
            baseline_window_days: Stored as ``self.window``. It is not
                consulted when trimming; the baseline is bounded by sample
                count (``MAX_SAMPLES``), not by age.
            min_samples: Metrics with fewer baseline samples are not checked.
            z_threshold: A metric is anomalous when its z-score exceeds this.
        """
        self.baseline = {}  # metric name -> list of recent observed values
        self.window = baseline_window_days
        self.min_samples = min_samples
        self.z_threshold = z_threshold

    def update_baseline(self, metrics: dict):
        """Append each metric value to its history, keeping the newest samples.

        Args:
            metrics: Mapping of metric name to a numeric observation.
        """
        for key, value in metrics.items():
            history = self.baseline.setdefault(key, [])
            history.append(value)
            # Bound memory: retain only the most recent MAX_SAMPLES values.
            self.baseline[key] = history[-self.MAX_SAMPLES:]

    def check_anomaly(self, current: dict) -> list[dict]:
        """Compare current metric values against the baseline.

        Metrics with no baseline, or fewer than ``min_samples`` samples, are
        skipped. The z-score is ``abs(value - mean) / std`` using the
        population standard deviation of the baseline.

        Args:
            current: Mapping of metric name to its current numeric value.

        Returns:
            One dict per anomalous metric with keys ``metric``, ``current``,
            ``expected_mean``, ``z_score`` and ``severity`` (``"high"`` when
            the z-score exceeds 5, otherwise ``"medium"``).
        """
        anomalies = []

        for key, value in current.items():
            samples = self.baseline.get(key)
            if not samples or len(samples) < self.min_samples:
                continue

            mean = sum(samples) / len(samples)
            variance = sum((x - mean) ** 2 for x in samples) / len(samples)
            std = variance ** 0.5
            deviation = abs(value - mean)

            if std > 0:
                z_score = deviation / std
            elif deviation > 0:
                # A perfectly constant baseline has zero spread; any
                # departure from it is treated as maximally anomalous
                # instead of dividing by zero.
                z_score = float("inf")
            else:
                continue

            if z_score > self.z_threshold:  # 3-sigma rule by default
                anomalies.append({
                    "metric": key,
                    "current": value,
                    "expected_mean": mean,
                    "z_score": z_score,
                    "severity": "high" if z_score > 5 else "medium",
                })

        return anomalies

# Usage: check one snapshot of current metrics against the detector's
# baseline and print a line for every metric flagged as anomalous.
detector = AgentAnomalyDetector()
# After collecting baseline data...
current_metrics = {
    "avg_latency_ms": 5000,  # Way above normal
    "tokens_per_request": 15000,  # Unusual
    "error_rate": 0.15,  # Above normal
}
anomalies = detector.check_anomaly(current_metrics)
for a in anomalies:
    print(f"ANOMALY: {a['metric']} = {a['current']} (expected ~{a['expected_mean']:.0f}, z={a['z_score']:.1f})")

Alert Routing and Escalation

class AlertRouter:
    """Route alerts to notification channels based on their severity."""

    # Severity -> channels notified, in order. Severities not listed here
    # are sent to the log only.
    SEVERITY_CHANNELS = {
        "info": ["log"],
        "warning": ["log", "slack"],
        "high": ["log", "slack", "pagerduty"],
        "critical": ["log", "slack", "pagerduty", "sms"]
    }

    def route(self, alert: dict):
        """Deliver ``alert`` to every channel configured for its severity.

        Args:
            alert: Dict with optional ``severity`` (default ``"info"``) and
                ``message`` keys.
        """
        severity = alert.get("severity", "info")
        channels = self.SEVERITY_CHANNELS.get(severity, ["log"])

        handlers = {
            "log": self._log,
            "slack": self._slack,
            "pagerduty": self._pagerduty,
            "sms": self._sms,
        }
        for channel in channels:
            handler = handlers.get(channel)
            # Channel names without a handler are ignored.
            if handler is not None:
                handler(alert)

    def _log(self, alert):
        """Print the alert as ``[ALERT:<SEVERITY>] <message>``."""
        # Use the same defaults as route(): indexing alert['severity'] here
        # raised KeyError for exactly the alerts route() treats as "info",
        # and alert['message'] raised for alerts that carry no message.
        severity = str(alert.get("severity", "info")).upper()
        message = alert.get("message", "")
        print(f"[ALERT:{severity}] {message}")

    def _slack(self, alert):
        # Placeholder: POST to Slack webhook (not implemented).
        pass

    def _pagerduty(self, alert):
        # Placeholder: trigger PagerDuty incident (not implemented).
        pass

    def _sms(self, alert):
        # Placeholder: send SMS for critical alerts (not implemented).
        pass

Dashboard Metrics to Track

| Metric | Warning Threshold | Critical Threshold |
|---|---|---|
| P95 Response Latency | > 5 seconds | > 15 seconds |
| Error Rate | > 2% | > 10% |
| Token Usage (avg/request) | > 2x baseline | > 5x baseline |
| Daily Budget Utilization | > 75% | > 95% |
| Hallucination Rate (sampled) | > 5% | > 15% |
| Tool Call Failure Rate | > 5% | > 20% |
| Unique Users (drop) | > 30% decrease | > 60% decrease |

Part of the AI Agent Security series on DataGate.ch. See also: Agent Observability Guide.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert