Workflow Observability: Monitoring, Alerting, and Debugging Agent Orchestration

Learn how to build observability into AI agent orchestration systems. Covers dashboard design, metric collection, alert rules, trace correlation, and debugging strategies for agent workflows.

Why Agent Workflows Need Specialized Observability

Traditional application monitoring tracks request latency, error rates, and throughput. AI agent workflows add unique challenges:

  • Non-deterministic execution: The same input can produce different step counts, different LLM calls, and different durations from run to run
  • Long execution times: A workflow might run for minutes or hours, making real-time dashboards essential
  • Cost visibility: Every LLM call has a dollar cost that must be tracked alongside performance metrics
  • Quality signals: Beyond "did it succeed," you need to know "was the output good"

Effective observability for agent systems rests on three pillars: metrics (what is happening), logs (why it happened), and traces (how it happened across steps).

Metric Collection

Define and collect the metrics that matter most for agent workflows.

import time
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Any

@dataclass
class WorkflowMetrics:
    workflow_id: str
    workflow_name: str
    start_time: float = field(default_factory=time.time)
    end_time: float | None = None
    step_metrics: list[dict] = field(default_factory=list)
    llm_calls: list[dict] = field(default_factory=list)
    total_tokens: int = 0
    total_cost_usd: float = 0.0
    error_count: int = 0
    retry_count: int = 0

    @property
    def duration_seconds(self) -> float:
        # Elapsed time so far while the workflow is running,
        # total run time once end_time has been set.
        if self.end_time is None:
            return time.time() - self.start_time
        return self.end_time - self.start_time

class MetricsCollector:
    """Collects and exposes workflow metrics."""

    def __init__(self):
        self._active_workflows: dict[str, WorkflowMetrics] = {}
        self._completed: list[WorkflowMetrics] = []
        self._counters: dict[str, int] = defaultdict(int)

    def start_workflow(self, workflow_id: str, name: str) -> WorkflowMetrics:
        metrics = WorkflowMetrics(
            workflow_id=workflow_id,
            workflow_name=name,
        )
        self._active_workflows[workflow_id] = metrics
        self._counters["workflows_started"] += 1
        return metrics

    def record_step(
        self,
        workflow_id: str,
        step_name: str,
        duration_ms: float,
        status: str,
        metadata: dict | None = None,
    ):
        metrics = self._active_workflows.get(workflow_id)
        if not metrics:
            return
        metrics.step_metrics.append({
            "step": step_name,
            "duration_ms": duration_ms,
            "status": status,
            "timestamp": time.time(),
            **(metadata or {}),
        })
        if status == "failed":
            metrics.error_count += 1
        if status == "retried":
            metrics.retry_count += 1

    def record_llm_call(
        self,
        workflow_id: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        duration_ms: float,
        cost_usd: float,
    ):
        metrics = self._active_workflows.get(workflow_id)
        if not metrics:
            return
        metrics.llm_calls.append({
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "duration_ms": duration_ms,
            "cost_usd": cost_usd,
            "timestamp": time.time(),
        })
        metrics.total_tokens += input_tokens + output_tokens
        metrics.total_cost_usd += cost_usd

    def complete_workflow(self, workflow_id: str, status: str):
        metrics = self._active_workflows.pop(workflow_id, None)
        if metrics:
            metrics.end_time = time.time()
            self._completed.append(metrics)
            self._counters[f"workflows_{status}"] += 1

    def get_summary(self) -> dict:
        return {
            "active_workflows": len(self._active_workflows),
            "counters": dict(self._counters),
            "recent_completed": [
                {
                    "id": m.workflow_id,
                    "name": m.workflow_name,
                    "duration_s": round(m.duration_seconds, 2),
                    "steps": len(m.step_metrics),
                    "tokens": m.total_tokens,
                    "cost_usd": round(m.total_cost_usd, 4),
                    "errors": m.error_count,
                }
                for m in self._completed[-20:]
            ],
        }
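record_llm_call expects a cost_usd value that the caller has already computed. One way to derive it from token counts is sketched below. The PRICES table, the ModelPrice type, the model names, and every figure in it are placeholder assumptions for illustration, not real provider prices.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ModelPrice:
    """USD per one million tokens. The values used below are placeholders."""
    input_per_million: float
    output_per_million: float


# Hypothetical price table: substitute your provider's published rates.
PRICES = {
    "example-small": ModelPrice(input_per_million=0.50, output_per_million=1.50),
    "example-large": ModelPrice(input_per_million=5.00, output_per_million=15.00),
}


def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Return the estimated cost of one call, or raise if the model is unknown."""
    try:
        price = PRICES[model]
    except KeyError:
        raise ValueError(f"no price configured for model {model!r}") from None
    return (
        input_tokens * price.input_per_million
        + output_tokens * price.output_per_million
    ) / 1_000_000
```

The result can be passed to record_llm_call as cost_usd. Raising on an unknown model is a deliberate choice in this sketch: silently recording zero would hide spend from the cost metrics.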

Prometheus Integration

Export metrics in Prometheus format for Grafana dashboards.

from prometheus_client import Counter, Histogram, Gauge

# Workflow-level metrics
workflow_started = Counter(
    "agent_workflow_started_total",
    "Total workflows started",
    ["workflow_name"],
)
workflow_completed = Counter(
    "agent_workflow_completed_total",
    "Total workflows completed",
    ["workflow_name", "status"],
)
workflow_duration = Histogram(
    "agent_workflow_duration_seconds",
    "Workflow execution duration",
    ["workflow_name"],
    buckets=[1, 5, 10, 30, 60, 120, 300, 600],
)
active_workflows = Gauge(
    "agent_active_workflows",
    "Currently running workflows",
    ["workflow_name"],
)

# Step-level metrics
step_duration = Histogram(
    "agent_step_duration_seconds",
    "Individual step duration",
    ["workflow_name", "step_name"],
    buckets=[0.1, 0.5, 1, 2, 5, 10, 30, 60],
)
step_errors = Counter(
    "agent_step_errors_total",
    "Step execution errors",
    ["workflow_name", "step_name", "error_type"],
)

# LLM-specific metrics
llm_call_duration = Histogram(
    "agent_llm_call_duration_seconds",
    "LLM API call duration",
    ["model"],
    buckets=[0.5, 1, 2, 5, 10, 30],
)
llm_tokens_used = Counter(
    "agent_llm_tokens_total",
    "Total tokens consumed",
    ["model", "direction"],  # direction: input or output
)
llm_cost = Counter(
    "agent_llm_cost_usd_total",
    "Total LLM cost in USD",
    ["model"],
)
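Defining the metrics is only half the work; each step still has to update them. The helper below is a minimal sketch of one way to do that. track_step is a hypothetical name, and it only assumes that the objects passed in behave like the step_duration Histogram and step_errors Counter above, that is, they expose labels(...).observe(...) and labels(...).inc().

```python
import time
from contextlib import contextmanager


@contextmanager
def track_step(duration_hist, error_counter, workflow_name: str, step_name: str):
    """Time a step and count failures by exception type.

    duration_hist and error_counter are duck-typed: anything with the
    labels(...).observe / labels(...).inc interface will work.
    """
    start = time.perf_counter()
    try:
        yield
    except Exception as exc:
        error_counter.labels(
            workflow_name=workflow_name,
            step_name=step_name,
            error_type=type(exc).__name__,
        ).inc()
        raise  # re-raise so the orchestrator still sees the failure
    finally:
        # Runs on success and on failure, so failed steps are timed too.
        duration_hist.labels(
            workflow_name=workflow_name,
            step_name=step_name,
        ).observe(time.perf_counter() - start)
```

With the metrics defined above, a call site would look like `with track_step(step_duration, step_errors, "research", "analyze"): ...`, where the workflow and step names are illustrative. Using the exception class name as error_type keeps label cardinality bounded, which matters because every distinct label value creates a new time series.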

Alert Rules

Define alerts that catch real problems without creating noise.

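alert_rules = {
    "high_failure_rate": {
        "expr": (
            # Aggregate both sides first: the two counters carry different
            # label sets, so an unaggregated division would match no series.
            "sum(rate(agent_workflow_completed_total{status='failed'}[5m])) / "
            "sum(rate(agent_workflow_started_total[5m])) > 0.1"
        ),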
        "for": "5m",
        "severity": "critical",
        "summary": "More than 10% of agent workflows are failing",
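    },
    "workflow_stuck": {
        "expr": (
            # Assumes a Gauge named agent_workflow_last_step_timestamp that is
            # set to time.time() after each step; it is not defined above.
            "time() - agent_workflow_last_step_timestamp > 600"
        ),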
        "for": "1m",
        "severity": "warning",
        "summary": "Agent workflow has not progressed in 10 minutes",
    },
    "llm_latency_spike": {
        "expr": (
            "histogram_quantile(0.95, "
            "rate(agent_llm_call_duration_seconds_bucket[5m])) > 15"
        ),
        "for": "3m",
        "severity": "warning",
        "summary": "P95 LLM call latency exceeds 15 seconds",
    },
    "cost_spike": {
        "expr": (
            # rate() is per second; increase() gives spend over the last hour.
            "sum(increase(agent_llm_cost_usd_total[1h])) > 10"
        ),
        "for": "5m",
        "severity": "critical",
        "summary": "LLM spending exceeds $10/hour",
    },
}
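The alert_rules dict is plain Python data, so it has to be converted before Prometheus can load it. The sketch below maps it onto the groups and rules layout used by Prometheus rule files (alert, expr, for, labels, annotations). The function name to_rule_group and the group name "agent_workflows" are assumptions made for this example, and example_rules is a one-entry stand-in so the block runs on its own.

```python
import json


def to_rule_group(group_name: str, rules: dict[str, dict]) -> dict:
    """Convert {alert_name: {expr, for, severity, summary}} into the
    groups/rules structure of a Prometheus rule file."""
    return {
        "groups": [
            {
                "name": group_name,
                "rules": [
                    {
                        "alert": name,
                        "expr": rule["expr"],
                        "for": rule["for"],
                        "labels": {"severity": rule["severity"]},
                        "annotations": {"summary": rule["summary"]},
                    }
                    for name, rule in rules.items()
                ],
            }
        ]
    }


# Stand-in for the alert_rules dict, copied from the latency rule above.
example_rules = {
    "llm_latency_spike": {
        "expr": (
            "histogram_quantile(0.95, "
            "rate(agent_llm_call_duration_seconds_bucket[5m])) > 15"
        ),
        "for": "3m",
        "severity": "warning",
        "summary": "P95 LLM call latency exceeds 15 seconds",
    },
}

rule_file = to_rule_group("agent_workflows", example_rules)
print(json.dumps(rule_file, indent=2))  # inspect the structure before writing it out
```

The resulting dict can be written out with the YAML library of your choice and checked with `promtool check rules` before deployment. Keeping the rules as data in the same repository as the metric definitions makes it easier to notice when an alert refers to a metric that no longer exists.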

Trace Correlation

Link individual steps across a workflow execution using trace IDs. This lets you follow the full execution path in your logging system.

import uuid
import logging
import contextvars

trace_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "trace_id", default=""
)

class TraceContext:
    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.trace_id = str(uuid.uuid4())
        self.span_stack: list[str] = []

    def start_span(self, step_name: str) -> str:
        span_id = str(uuid.uuid4())[:8]
        self.span_stack.append(span_id)
        trace_id_var.set(self.trace_id)
        return span_id

    def end_span(self):
        if self.span_stack:
            self.span_stack.pop()

class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def log_step(
        self,
        level: str,
        message: str,
        trace: TraceContext,
        step_name: str,
        **extra,
    ):
        self.logger.log(
            getattr(logging, level.upper()),
            message,
            extra={
                "trace_id": trace.trace_id,
                "workflow_id": trace.workflow_id,
                "step_name": step_name,
                "span_id": (
                    trace.span_stack[-1] if trace.span_stack else None
                ),
                **extra,
            },
        )

# Usage
logger = StructuredLogger("agent")
trace = TraceContext(workflow_id="wf-123")
span = trace.start_span("analyze")
logger.log_step(
    "info",
    "Starting analysis step",
    trace,
    "analyze",
    input_length=1500,
)
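Fields passed through extra are attached to the log record, but the default logging formatter does not print them, so the trace ID would never reach the log output. A minimal sketch of a formatter that emits them as JSON follows. JsonTraceFormatter and TRACE_FIELDS are hypothetical names, and only the four trace fields are exported here; other extras such as input_length would need to be added to the tuple.

```python
import json
import logging

# Fields that StructuredLogger attaches through `extra`.
TRACE_FIELDS = ("trace_id", "workflow_id", "step_name", "span_id")


class JsonTraceFormatter(logging.Formatter):
    """Render each record as one JSON object that includes the trace fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in TRACE_FIELDS:
            # Records logged without a trace simply omit these keys.
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        return json.dumps(payload, default=str)


handler = logging.StreamHandler()
handler.setFormatter(JsonTraceFormatter())
agent_logger = logging.getLogger("agent")
agent_logger.addHandler(handler)
agent_logger.setLevel(logging.INFO)

agent_logger.info(
    "Starting analysis step",
    extra={
        "trace_id": "t-1",
        "workflow_id": "wf-123",
        "step_name": "analyze",
        "span_id": "ab12cd34",
    },
)
```

One JSON object per line is a common choice because most log stores can index the fields directly, which is what makes a query like workflow_id="wf-123" possible later.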

Debugging Failed Workflows

When a workflow fails, you need to reconstruct what happened. Build a debugging utility that pulls together metrics, logs, and state.

class WorkflowDebugger:
    def __init__(self, store, metrics_collector, log_store):
        self.store = store
        self.metrics = metrics_collector
        self.logs = log_store

    async def investigate(self, workflow_id: str) -> dict:
        workflow = await self.store.load(workflow_id)
        logs = await self.logs.query(
            f'workflow_id="{workflow_id}"',
            limit=100,
        )

        failed_steps = [
            s for s in workflow.steps
            if s.status == "failed"
        ]

        return {
            "workflow": {
                "id": workflow.id,
                "status": workflow.status,
                "version": workflow.version,
                "started": workflow.created_at.isoformat(),
            },
            "failed_steps": [
                {
                    "name": s.name,
                    "error": s.error,
                    "attempts": s.attempts,
                    # completed_at may be unset if the step never finished.
                    "last_attempt": (
                        s.completed_at.isoformat() if s.completed_at else None
                    ),
                }
                for s in failed_steps
            ],
            "recent_logs": logs,
            "context_snapshot": workflow.context,
        }

FAQ

What is the single most important metric for agent workflows?

The step failure rate by step name. This tells you which specific step is causing problems and at what rate. Aggregate workflow failure rates hide whether the issue is systemic (all steps failing) or localized (one flaky API integration). Once you know the failing step, you can look at its error logs and retry behavior.
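As a rough illustration, the per-step failure rate can be computed from records shaped like the entries in WorkflowMetrics.step_metrics. The function name failure_rate_by_step and the sample records are made up for this sketch.

```python
from collections import defaultdict


def failure_rate_by_step(step_records: list[dict]) -> dict[str, float]:
    """Return {step_name: failed / total} for records that carry
    "step" and "status" keys."""
    totals: dict[str, int] = defaultdict(int)
    failures: dict[str, int] = defaultdict(int)
    for record in step_records:
        totals[record["step"]] += 1
        if record["status"] == "failed":
            failures[record["step"]] += 1
    return {step: failures[step] / totals[step] for step in totals}


# Illustrative data only.
records = [
    {"step": "fetch", "status": "completed"},
    {"step": "fetch", "status": "failed"},
    {"step": "analyze", "status": "completed"},
    {"step": "analyze", "status": "completed"},
]
rates = failure_rate_by_step(records)

# Print the worst step first.
for step, rate in sorted(rates.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{step}: {rate:.0%}")
```

Sorting by rate rather than by raw failure count keeps a rarely run but consistently failing step from being buried under a busy, mostly healthy one.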

How do I avoid alert fatigue with AI agent monitoring?

Set alerts on rates and percentiles, not individual failures. A single failed LLM call is expected. A 10% failure rate sustained for 5 minutes is a real problem. Use the for clause in Prometheus alert rules to require sustained anomalies before firing. Also, separate informational alerts (Slack notifications) from actionable alerts (PagerDuty pages).

Should I log full LLM prompts and responses?

Log them in development and staging for debugging. In production, log truncated versions (first 200 characters) or hashes, because full prompts and responses can contain sensitive user data and consume a great deal of storage. Sampling, for example logging full content for 1% of executions, preserves debugging capability without the full storage cost.


#Observability #Monitoring #Alerting #AIAgents #Python #AgenticAI #LearnAI #AIEngineering

CallSphere Team
