Workflow Observability: Monitoring, Alerting, and Debugging Agent Orchestration
Learn how to build observability into AI agent orchestration systems. Covers dashboard design, metric collection, alert rules, trace correlation, and debugging strategies for agent workflows.
Why Agent Workflows Need Specialized Observability
Traditional application monitoring tracks request latency, error rates, and throughput. AI agent workflows add unique challenges:
- Non-deterministic execution: The same input produces different step counts, different LLM calls, and different durations each run
- Long execution times: A workflow might run for minutes or hours, making real-time dashboards essential
- Cost visibility: Every LLM call has a dollar cost that must be tracked alongside performance metrics
- Quality signals: Beyond "did it succeed," you need to know "was the output good"
Effective observability for agent systems requires three pillars: metrics (what is happening), logs (why it happened), and traces (how it happened across steps).
Metric Collection
Define and collect the metrics that matter most for agent workflows.
import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any
@dataclass
class WorkflowMetrics:
workflow_id: str
workflow_name: str
start_time: float = field(default_factory=time.time)
end_time: float | None = None
step_metrics: list[dict] = field(default_factory=list)
llm_calls: list[dict] = field(default_factory=list)
total_tokens: int = 0
total_cost_usd: float = 0.0
error_count: int = 0
retry_count: int = 0
@property
def duration_seconds(self) -> float | None:
if self.end_time is None:
return time.time() - self.start_time
return self.end_time - self.start_time
class MetricsCollector:
"""Collects and exposes workflow metrics."""
def __init__(self):
self._active_workflows: dict[str, WorkflowMetrics] = {}
self._completed: list[WorkflowMetrics] = []
self._counters: dict[str, int] = defaultdict(int)
def start_workflow(self, workflow_id: str, name: str) -> WorkflowMetrics:
metrics = WorkflowMetrics(
workflow_id=workflow_id,
workflow_name=name,
)
self._active_workflows[workflow_id] = metrics
self._counters["workflows_started"] += 1
return metrics
def record_step(
self,
workflow_id: str,
step_name: str,
duration_ms: float,
status: str,
metadata: dict | None = None,
):
metrics = self._active_workflows.get(workflow_id)
if not metrics:
return
metrics.step_metrics.append({
"step": step_name,
"duration_ms": duration_ms,
"status": status,
"timestamp": time.time(),
**(metadata or {}),
})
if status == "failed":
metrics.error_count += 1
if status == "retried":
metrics.retry_count += 1
def record_llm_call(
self,
workflow_id: str,
model: str,
input_tokens: int,
output_tokens: int,
duration_ms: float,
cost_usd: float,
):
metrics = self._active_workflows.get(workflow_id)
if not metrics:
return
metrics.llm_calls.append({
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"duration_ms": duration_ms,
"cost_usd": cost_usd,
"timestamp": time.time(),
})
metrics.total_tokens += input_tokens + output_tokens
metrics.total_cost_usd += cost_usd
def complete_workflow(self, workflow_id: str, status: str):
metrics = self._active_workflows.pop(workflow_id, None)
if metrics:
metrics.end_time = time.time()
self._completed.append(metrics)
self._counters[f"workflows_{status}"] += 1
def get_summary(self) -> dict:
return {
"active_workflows": len(self._active_workflows),
"counters": dict(self._counters),
"recent_completed": [
{
"id": m.workflow_id,
"name": m.workflow_name,
"duration_s": round(m.duration_seconds, 2),
"steps": len(m.step_metrics),
"tokens": m.total_tokens,
"cost_usd": round(m.total_cost_usd, 4),
"errors": m.error_count,
}
for m in self._completed[-20:]
],
}
Prometheus Integration
Export metrics in Prometheus format for Grafana dashboards.
from prometheus_client import Counter, Histogram, Gauge, Info
# Workflow-level metrics: lifecycle counts, total run time, concurrency.
workflow_started = Counter(
    name="agent_workflow_started_total",
    documentation="Total workflows started",
    labelnames=("workflow_name",),
)
workflow_completed = Counter(
    name="agent_workflow_completed_total",
    documentation="Total workflows completed",
    labelnames=("workflow_name", "status"),
)
workflow_duration = Histogram(
    name="agent_workflow_duration_seconds",
    documentation="Workflow execution duration",
    labelnames=("workflow_name",),
    # Upper bucket bounds in seconds: 1 s up to 10 min.
    buckets=(1, 5, 10, 30, 60, 120, 300, 600),
)
active_workflows = Gauge(
    name="agent_active_workflows",
    documentation="Currently running workflows",
    labelnames=("workflow_name",),
)

# Step-level metrics: per-step timing and error counts.
step_duration = Histogram(
    name="agent_step_duration_seconds",
    documentation="Individual step duration",
    labelnames=("workflow_name", "step_name"),
    # Upper bucket bounds in seconds: 100 ms up to 1 min.
    buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60),
)
step_errors = Counter(
    name="agent_step_errors_total",
    documentation="Step execution errors",
    labelnames=("workflow_name", "step_name", "error_type"),
)

# LLM-specific metrics: call latency, token usage, spend.
llm_call_duration = Histogram(
    name="agent_llm_call_duration_seconds",
    documentation="LLM API call duration",
    labelnames=("model",),
    buckets=(0.5, 1, 2, 5, 10, 30),
)
llm_tokens_used = Counter(
    name="agent_llm_tokens_total",
    documentation="Total tokens consumed",
    labelnames=("model", "direction"),  # direction: input or output
)
llm_cost = Counter(
    name="agent_llm_cost_usd_total",
    documentation="Total LLM cost in USD",
    labelnames=("model",),
)
Alert Rules
Define alerts that catch real problems without creating noise.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
# Alert rule definitions keyed by alert name. Each rule carries the PromQL
# expression ("expr"), how long it must hold before firing ("for"), a
# severity label and a human-readable summary.
alert_rules = {
    "high_failure_rate": {
        # Both sides are aggregated to the same label set (workflow_name).
        # The completed counter also carries a `status` label, so dividing
        # the raw vectors would find no matching series and never fire.
        "expr": (
            "sum by (workflow_name) ("
            "rate(agent_workflow_completed_total{status='failed'}[5m])) / "
            "sum by (workflow_name) ("
            "rate(agent_workflow_started_total[5m])) > 0.1"
        ),
        "for": "5m",
        "severity": "critical",
        "summary": "More than 10% of agent workflows are failing",
    },
    "workflow_stuck": {
        # NOTE(review): agent_workflow_last_step_timestamp is not among the
        # metrics defined alongside these rules; it has to be exported
        # separately (a gauge holding the unix time of the latest step)
        # for this rule to ever evaluate.
        "expr": (
            "time() - agent_workflow_last_step_timestamp > 600"
        ),
        "for": "1m",
        "severity": "warning",
        "summary": "Agent workflow has not progressed in 10 minutes",
    },
    "llm_latency_spike": {
        "expr": (
            "histogram_quantile(0.95, "
            "rate(agent_llm_call_duration_seconds_bucket[5m])) > 15"
        ),
        "for": "3m",
        "severity": "warning",
        "summary": "P95 LLM call latency exceeds 15 seconds",
    },
    "cost_spike": {
        # increase() yields dollars spent over the last hour. rate() would
        # be dollars per *second*, i.e. the threshold would only trip at
        # $36,000/hour. sum() adds up the per-model series.
        "expr": (
            "sum(increase(agent_llm_cost_usd_total[1h])) > 10"
        ),
        "for": "5m",
        "severity": "critical",
        "summary": "LLM spending exceeds $10/hour",
    },
}
Trace Correlation
Link individual steps across a workflow execution using trace IDs. This lets you follow the full execution path in your logging system.
import uuid
import logging
import contextvars
# Context-local holder for the current trace id; reads as "" until a value
# has been set in the current context.
trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")
class TraceContext:
    """Trace identity and open-span bookkeeping for one workflow run."""

    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        # Random UUID4 in its canonical string form.
        self.trace_id = str(uuid.uuid4())
        # Ids of the currently open spans, innermost last.
        self.span_stack: list[str] = []

    def start_span(self, step_name: str) -> str:
        """Open a new span and return its 8-character id.

        Also publishes this trace's id through ``trace_id_var``.
        ``step_name`` is accepted but not stored.
        """
        new_span = uuid.uuid4().hex[:8]
        trace_id_var.set(self.trace_id)
        self.span_stack.append(new_span)
        return new_span

    def end_span(self):
        """Close the innermost open span; do nothing when none is open."""
        if not self.span_stack:
            return
        del self.span_stack[-1]
class StructuredLogger:
    """Emit log records annotated with workflow trace-correlation fields."""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def log_step(
        self,
        level: str,
        message: str,
        trace: "TraceContext",
        step_name: str,
        **extra,
    ):
        """Log ``message`` with trace, workflow, step and span identifiers.

        Args:
            level: Name of a ``logging`` level, case-insensitive
                (e.g. ``"info"``). An unknown name raises AttributeError.
            message: The log message.
            trace: Supplies ``trace_id``, ``workflow_id`` and the span stack.
            step_name: Name of the step being logged.
            **extra: Additional fields attached to the log record. Keys that
                clash with attributes the logging system itself puts on a
                record (such as ``message`` or ``name``) are rejected by
                ``logging`` with a KeyError.
        """
        correlation = {
            "trace_id": trace.trace_id,
            "workflow_id": trace.workflow_id,
            "step_name": step_name,
            # Innermost open span, or None when no span is open.
            "span_id": trace.span_stack[-1] if trace.span_stack else None,
        }
        # Correlation fields are merged last so caller-supplied extras can
        # add context but never overwrite the ids used to stitch logs
        # together.
        fields = {**extra, **correlation}
        self.logger.log(
            getattr(logging, level.upper()),
            message,
            extra=fields,
        )
# Usage: open a span for the "analyze" step, then emit a structured log
# line that carries the trace and span identifiers.
logger = StructuredLogger("agent")
trace = TraceContext(workflow_id="wf-123")
span = trace.start_span("analyze")
logger.log_step(
    level="info",
    message="Starting analysis step",
    trace=trace,
    step_name="analyze",
    input_length=1500,  # forwarded as an extra field on the log record
)
Debugging Failed Workflows
When a workflow fails, you need to reconstruct what happened. Build a debugging utility that pulls together metrics, logs, and state.
class WorkflowDebugger:
    """Assemble a debugging report for a workflow from stored state and logs."""

    def __init__(self, store, metrics_collector, log_store):
        """Keep references to the collaborators.

        Args:
            store: Object whose awaitable ``load(workflow_id)`` returns the
                workflow.
            metrics_collector: Stored as ``self.metrics``; not read by
                ``investigate``.
            log_store: Object whose awaitable ``query(expr, limit=...)``
                returns log entries.
        """
        self.store = store
        self.metrics = metrics_collector
        self.logs = log_store

    @staticmethod
    def _iso(moment):
        """Return ``moment.isoformat()``, or None when no timestamp is set."""
        return moment.isoformat() if moment is not None else None

    async def investigate(self, workflow_id: str) -> dict:
        """Build a report describing the workflow and its failed steps.

        Returns:
            A dict with the keys ``workflow`` (id, status, version, start
            time), ``failed_steps`` (one entry per step whose status is
            ``"failed"``), ``recent_logs`` (up to 100 entries as returned
            by the log store) and ``context_snapshot``.

        NOTE(review): assumes ``store.load`` either raises or returns a
        workflow object; a ``None`` result would fail with AttributeError —
        confirm against the store implementation.
        """
        workflow = await self.store.load(workflow_id)
        logs = await self.logs.query(
            f'workflow_id="{workflow_id}"',
            limit=100,
        )

        failed_steps = [
            {
                "name": step.name,
                "error": step.error,
                "attempts": step.attempts,
                # A failed step may never have been stamped as completed;
                # report None rather than crashing the whole report.
                "last_attempt": self._iso(step.completed_at),
            }
            for step in workflow.steps
            if step.status == "failed"
        ]

        return {
            "workflow": {
                "id": workflow.id,
                "status": workflow.status,
                "version": workflow.version,
                "started": self._iso(workflow.created_at),
            },
            "failed_steps": failed_steps,
            "recent_logs": logs,
            "context_snapshot": workflow.context,
        }
FAQ
What is the single most important metric for agent workflows?
The step failure rate by step name. This tells you which specific step is causing problems and at what rate. Aggregate workflow failure rates hide whether the issue is systemic (all steps failing) or localized (one flaky API integration). Once you know the failing step, you can look at its error logs and retry behavior.
How do I avoid alert fatigue with AI agent monitoring?
Set alerts on rates and percentiles, not individual failures. A single failed LLM call is expected. A 10% failure rate sustained for 5 minutes is a real problem. Use the `for` clause in Prometheus alert rules to require sustained anomalies before firing. Also, separate informational alerts (Slack notifications) from actionable alerts (PagerDuty pages).
Should I log full LLM prompts and responses?
Log them in development and staging for debugging. In production, log truncated versions (first 200 characters) or hashes. Full prompts and responses can contain sensitive user data and consume enormous storage. Use sampling — log full content for 1% of executions — to maintain debugging capability without the storage cost.
#Observability #Monitoring #Alerting #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.