Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance

Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all.

Why Build a Custom Dashboard

Off-the-shelf orchestration platforms include their own dashboards, but custom dashboards serve a different purpose. They surface domain-specific insights — agent quality scores, LLM cost breakdowns, model performance comparisons — that generic workflow UIs do not provide.

A well-designed orchestration dashboard answers three questions at a glance: What is running right now? What failed recently and why? How much is this costing?

Dashboard API with FastAPI

The backend API provides endpoints for the dashboard frontend to consume. Start with the data models and core endpoints.

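from datetime import datetime, timedelta
from typing import Optional

# HTTPException is raised by the 404 paths in the detail endpoints further down
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(title="Agent Orchestration Dashboard API")

# workflow_store and metrics_store are assumed to be async data-access
# objects defined elsewhere in your application.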

class WorkflowSummary(BaseModel):
    id: str
    name: str
    status: str
    started_at: datetime
    duration_seconds: float | None
    step_count: int
    current_step: str | None
    error: str | None
    total_tokens: int
    cost_usd: float

class DashboardOverview(BaseModel):
    active_workflows: int
    completed_today: int
    failed_today: int
    success_rate: float
    total_cost_today_usd: float
    total_tokens_today: int
    avg_duration_seconds: float

@app.get("/api/dashboard/overview")
async def get_overview(
    hours: int = Query(default=24, ge=1, le=168),
) -> DashboardOverview:
    """Get high-level statistics for the last `hours` hours (despite the *_today field names)."""
    since = datetime.utcnow() - timedelta(hours=hours)

    active = await workflow_store.count(status="running")
    completed = await workflow_store.count(
        status="completed", since=since
    )
    failed = await workflow_store.count(
        status="failed", since=since
    )
    total = completed + failed
    success_rate = completed / total if total > 0 else 1.0

    cost_data = await metrics_store.sum_cost(since=since)
    token_data = await metrics_store.sum_tokens(since=since)
    avg_duration = await metrics_store.avg_duration(since=since)

    return DashboardOverview(
        active_workflows=active,
        completed_today=completed,
        failed_today=failed,
        success_rate=round(success_rate, 4),
        # Fall back to zero in case the store returns None for an empty window
        total_cost_today_usd=round(cost_data or 0.0, 2),
        total_tokens_today=token_data or 0,
        avg_duration_seconds=round(avg_duration or 0.0, 2),
    )
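
The endpoints in this article call `workflow_store` and `metrics_store` without defining them. As a rough sketch of the interface they appear to assume, an in-memory stand-in could look like the following. The names `WorkflowRecord` and `InMemoryWorkflowStore` and the record fields are hypothetical, and a real store would be database-backed. The sketch uses timezone-aware timestamps, so it should not be mixed with naive `datetime.utcnow()` values.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class WorkflowRecord:
    """Hypothetical record shape; the real store's schema is not shown in the article."""
    id: str
    name: str
    status: str
    created_at: datetime


class InMemoryWorkflowStore:
    """Illustrative stand-in for workflow_store, backed by a plain list."""

    def __init__(self, records: list[WorkflowRecord]):
        self._records = records

    def _matches(self, wf: WorkflowRecord, status, name, since) -> bool:
        # A filter set to None means "do not filter on this field"
        return (
            (status is None or wf.status == status)
            and (name is None or wf.name == name)
            and (since is None or wf.created_at >= since)
        )

    async def count(self, status=None, name=None, since=None) -> int:
        return sum(1 for wf in self._records if self._matches(wf, status, name, since))

    async def query(self, status=None, name=None, since=None, limit=50, offset=0):
        hits = [wf for wf in self._records if self._matches(wf, status, name, since)]
        # Newest first, then apply the pagination window
        hits.sort(key=lambda wf: wf.created_at, reverse=True)
        return hits[offset:offset + limit]


def demo_counts() -> tuple[int, int, int]:
    """Count running, completed, and failed workflows in a 24-hour window."""
    now = datetime.now(timezone.utc)
    store = InMemoryWorkflowStore([
        WorkflowRecord("wf-1", "triage", "running", now),
        WorkflowRecord("wf-2", "triage", "completed", now - timedelta(hours=1)),
        WorkflowRecord("wf-3", "summarize", "failed", now - timedelta(hours=2)),
        WorkflowRecord("wf-4", "summarize", "completed", now - timedelta(days=3)),
    ])
    since = now - timedelta(hours=24)

    async def run():
        return (
            await store.count(status="running"),
            await store.count(status="completed", since=since),
            await store.count(status="failed", since=since),
        )

    return asyncio.run(run())
```

In this sample data, `wf-4` falls outside the 24-hour window, so it is excluded from the completed count.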

Workflow List and Filtering

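# Note: list_workflows below does not use this model; it takes status, name,
# and hours directly as query parameters.
class WorkflowFilter(BaseModel):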
    status: Optional[str] = None
    name: Optional[str] = None
    since: Optional[datetime] = None
    until: Optional[datetime] = None
    min_cost_usd: Optional[float] = None
    has_errors: Optional[bool] = None

@app.get("/api/dashboard/workflows")
async def list_workflows(
    status: Optional[str] = None,
    name: Optional[str] = None,
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict:
    """List workflows with filtering and pagination."""
    since = datetime.utcnow() - timedelta(hours=hours)

    workflows = await workflow_store.query(
        status=status,
        name=name,
        since=since,
        limit=limit,
        offset=offset,
    )

    total = await workflow_store.count(
        status=status, name=name, since=since
    )

    return {
        "workflows": [
            WorkflowSummary(
                id=wf.id,
                name=wf.name,
                status=wf.status,
                started_at=wf.created_at,
                duration_seconds=wf.duration_seconds,
                step_count=len(wf.steps),
                current_step=_get_current_step(wf),
                error=_get_last_error(wf),
                total_tokens=wf.metrics.total_tokens,
                cost_usd=wf.metrics.total_cost_usd,
            )
            for wf in workflows
        ],
        "total": total,
        "limit": limit,
        "offset": offset,
    }

def _get_current_step(wf) -> str | None:
    running = [s for s in wf.steps if s.status == "running"]
    return running[0].name if running else None

def _get_last_error(wf) -> str | None:
    failed = [s for s in wf.steps if s.status == "failed"]
    return failed[-1].error if failed else None
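
The list endpoint returns `total`, `limit`, and `offset` alongside the page of workflows, which is enough for a client to work out its paging controls. A minimal sketch of that arithmetic follows; the helper name `page_info` and the keys it returns are assumptions for illustration, not part of the API above.

```python
def page_info(total: int, limit: int, offset: int) -> dict:
    """Derive paging controls from the list endpoint's response envelope."""
    if limit < 1 or offset < 0 or total < 0:
        raise ValueError("limit must be >= 1; offset and total must be >= 0")

    next_offset = offset + limit
    return {
        # 1-based page number of the window that starts at `offset`
        "page": offset // limit + 1,
        # Ceiling division; an empty result set still counts as one page
        "pages": max(1, -(-total // limit)),
        # None signals that there is no further page in that direction
        "next_offset": next_offset if next_offset < total else None,
        "prev_offset": max(0, offset - limit) if offset > 0 else None,
    }


middle = page_info(total=120, limit=50, offset=50)
last = page_info(total=120, limit=50, offset=100)
empty = page_info(total=0, limit=50, offset=0)
```

Here `middle` describes page 2 of 3 with a next offset of 100, and `last` has no next offset because 150 exceeds the total.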

Timeline View API

The timeline view shows each step's duration and status as a horizontal bar chart, making it easy to spot bottlenecks.

class TimelineStep(BaseModel):
    name: str
    status: str
    started_at: datetime | None
    completed_at: datetime | None
    duration_ms: float | None
    attempts: int
    error: str | None
    llm_calls: int
    tokens_used: int

@app.get("/api/dashboard/workflows/{workflow_id}/timeline")
async def get_workflow_timeline(workflow_id: str) -> dict:
    """Get detailed timeline for a specific workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    steps = []
    for step in wf.steps:
        llm_calls_for_step = [
            call for call in wf.metrics.llm_calls
            if call.get("step_name") == step.name
        ]
        steps.append(TimelineStep(
            name=step.name,
            status=step.status,
            started_at=step.started_at,
            completed_at=step.completed_at,
            duration_ms=(
                (step.completed_at - step.started_at).total_seconds()
                * 1000
                if step.started_at and step.completed_at
                else None
            ),
            attempts=step.attempts,
            error=step.error,
            llm_calls=len(llm_calls_for_step),
            tokens_used=sum(
                c.get("input_tokens", 0) + c.get("output_tokens", 0)
                for c in llm_calls_for_step
            ),
        ))

    return {
        "workflow_id": wf.id,
        "workflow_name": wf.name,
        "total_duration_ms": (
            (wf.updated_at - wf.created_at).total_seconds() * 1000
        ),
        "steps": steps,
    }
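
To draw the horizontal bars, a frontend needs each step's start offset and width relative to the whole workflow. The sketch below shows one way to compute those as percentages. The function name `to_bar_segments` and the `left_pct` and `width_pct` keys are hypothetical, and the input is assumed to be already-parsed step dictionaries with `datetime` values rather than the raw JSON strings.

```python
from datetime import datetime, timedelta, timezone


def to_bar_segments(steps: list[dict], workflow_start: datetime, total_ms: float) -> list[dict]:
    """Convert timeline steps into left/width percentages for a bar chart."""
    bars = []
    for step in steps:
        started = step.get("started_at")
        duration_ms = step.get("duration_ms")
        # Steps that have not started or finished have no bar to draw yet
        if started is None or duration_ms is None or total_ms <= 0:
            continue
        offset_ms = (started - workflow_start).total_seconds() * 1000
        bars.append({
            "name": step["name"],
            "status": step["status"],
            "left_pct": round(100 * offset_ms / total_ms, 2),
            "width_pct": round(100 * duration_ms / total_ms, 2),
        })
    return bars


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample_steps = [
    {"name": "plan", "status": "completed", "started_at": start, "duration_ms": 2000.0},
    {"name": "search", "status": "completed",
     "started_at": start + timedelta(seconds=2), "duration_ms": 6000.0},
    {"name": "summarize", "status": "pending", "started_at": None, "duration_ms": None},
]
bars = to_bar_segments(sample_steps, start, total_ms=8000.0)
```

With these sample values, `plan` occupies the first quarter of the chart and `search` the remaining three quarters, which makes `search` the visible bottleneck.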

Error Drill-Down API

When a workflow fails, the dashboard needs to show exactly what went wrong at each level.

class ErrorDetail(BaseModel):
    workflow_id: str
    step_name: str
    error_type: str
    error_message: str
    stack_trace: str | None
    attempt_number: int
    timestamp: datetime
    context_snapshot: dict

@app.get("/api/dashboard/workflows/{workflow_id}/errors")
async def get_workflow_errors(workflow_id: str) -> list[ErrorDetail]:
    """Get detailed error information for a workflow."""
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    errors = []
    for step in wf.steps:
        if step.error:
            errors.append(ErrorDetail(
                workflow_id=wf.id,
                step_name=step.name,
                error_type=step.error_type or "Unknown",
                error_message=step.error,
                stack_trace=step.stack_trace,
                attempt_number=step.attempts,
                timestamp=step.completed_at or wf.updated_at,
                context_snapshot={
                    k: str(v)[:200]  # Truncate large values
                    for k, v in wf.context.items()
                    if not k.startswith("_")
                },
            ))

    return errors
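
The context snapshot logic is easier to reason about when pulled out on its own. The sketch below isolates it in a hypothetical helper, `snapshot_context`, with made-up sample data. Note that the underscore prefix only hides keys by naming convention; it is not a substitute for proper redaction of secrets.

```python
def snapshot_context(context: dict, max_len: int = 200) -> dict:
    """Stringify and truncate context values, skipping underscore-prefixed keys."""
    return {
        key: str(value)[:max_len]
        for key, value in context.items()
        if not key.startswith("_")
    }


# Made-up workflow context for illustration
sample_context = {
    "ticket_id": 4821,
    "transcript": "x" * 5000,   # large value that should be truncated
    "_api_key": "secret",       # private by convention, so left out
}
snapshot = snapshot_context(sample_context)
```

Every value comes back as a string, so the response stays JSON-serializable even when the context holds arbitrary Python objects.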

Cost Breakdown API

class CostBreakdown(BaseModel):
    model: str
    call_count: int
    total_tokens: int
    input_tokens: int
    output_tokens: int
    total_cost_usd: float

@app.get("/api/dashboard/costs")
async def get_cost_breakdown(
    hours: int = Query(default=24, ge=1, le=168),
    # `pattern` rejects any other value (older FastAPI releases call this `regex`)
    group_by: str = Query(default="model", pattern="^(model|workflow|step)$"),
) -> dict:
    """Get cost breakdown by model, workflow, or step."""
    since = datetime.utcnow() - timedelta(hours=hours)

    raw_data = await metrics_store.get_llm_costs(
        since=since, group_by=group_by
    )

    breakdowns = [
        CostBreakdown(
            # Holds the model, workflow, or step name, depending on group_by
            model=row["group_key"],
            call_count=row["call_count"],
            total_tokens=row["total_tokens"],
            input_tokens=row["input_tokens"],
            output_tokens=row["output_tokens"],
            total_cost_usd=round(row["total_cost"], 4),
        )
        for row in raw_data
    ]

    return {
        "period_hours": hours,
        "breakdowns": breakdowns,
        "total_cost_usd": round(
            sum(b.total_cost_usd for b in breakdowns), 2
        ),
    }
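
The endpoint above relies on `metrics_store.get_llm_costs` to return rows with `group_key`, `call_count`, token counts, and `total_cost`. The article does not show that method, so the following is only a sketch of the grouping it implies, done in memory over per-call records. The function name `aggregate_llm_costs`, the record fields, and the model names are all hypothetical.

```python
from collections import defaultdict


def aggregate_llm_costs(calls: list[dict], group_by: str = "model") -> list[dict]:
    """Group per-call records by model, workflow, or step and total them."""
    if group_by not in {"model", "workflow", "step"}:
        raise ValueError(f"unsupported group_by: {group_by!r}")

    totals = defaultdict(
        lambda: {"call_count": 0, "input_tokens": 0, "output_tokens": 0, "total_cost": 0.0}
    )
    for call in calls:
        bucket = totals[call[group_by]]
        bucket["call_count"] += 1
        bucket["input_tokens"] += call["input_tokens"]
        bucket["output_tokens"] += call["output_tokens"]
        bucket["total_cost"] += call["cost_usd"]

    rows = [
        {
            "group_key": key,
            **bucket,
            "total_tokens": bucket["input_tokens"] + bucket["output_tokens"],
        }
        for key, bucket in totals.items()
    ]
    # Most expensive group first, which is usually what a cost panel leads with
    rows.sort(key=lambda row: row["total_cost"], reverse=True)
    return rows


# Made-up call records for illustration
SAMPLE_CALLS = [
    {"model": "model-a", "workflow": "triage", "step": "classify",
     "input_tokens": 1000, "output_tokens": 200, "cost_usd": 0.012},
    {"model": "model-a", "workflow": "triage", "step": "summarize",
     "input_tokens": 500, "output_tokens": 300, "cost_usd": 0.009},
    {"model": "model-b", "workflow": "research", "step": "search",
     "input_tokens": 2000, "output_tokens": 1000, "cost_usd": 0.05},
]
rows = aggregate_llm_costs(SAMPLE_CALLS, group_by="model")
```

In production this grouping would more likely be a SQL `GROUP BY` in the metrics store; the in-memory version is only meant to make the expected row shape concrete.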

Real-Time Updates with WebSocket

For live dashboard updates, stream workflow state changes over WebSocket.

from fastapi import WebSocket, WebSocketDisconnect

class DashboardBroadcaster:
    def __init__(self):
        self.connections: list[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

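    def disconnect(self, ws: WebSocket):
        # broadcast() may already have pruned this socket, so removal must be idempotent
        if ws in self.connections:
            self.connections.remove(ws)

    async def broadcast(self, event: dict):
        dead = []
        # Iterate over a copy: clients can connect or disconnect while a send is awaited
        for ws in list(self.connections):
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)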

broadcaster = DashboardBroadcaster()

@app.websocket("/ws/dashboard")
async def dashboard_ws(ws: WebSocket):
    await broadcaster.connect(ws)
    try:
        while True:
            await ws.receive_text()  # Wait for client messages (ignored) so a disconnect is detected
    except WebSocketDisconnect:
        broadcaster.disconnect(ws)

# Call this from your orchestrator when state changes
async def on_workflow_event(event_type: str, workflow_id: str, data: dict):
    await broadcaster.broadcast({
        "type": event_type,
        "workflow_id": workflow_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    })
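
One way to check the fan-out and pruning behavior without a running server is to drive the broadcast logic with stand-in sockets. The sketch below re-declares a trimmed-down broadcaster (no `accept()` handshake) so it runs on its own. `FakeSocket`, `Broadcaster`, and the `step_completed` event type are hypothetical names used only for this illustration.

```python
import asyncio


class FakeSocket:
    """Stand-in for a WebSocket that either records events or fails on send."""

    def __init__(self, healthy: bool = True):
        self.healthy = healthy
        self.received: list[dict] = []

    async def send_json(self, event: dict) -> None:
        if not self.healthy:
            raise ConnectionError("client went away")
        self.received.append(event)


class Broadcaster:
    """Trimmed-down copy of the broadcast logic, without the FastAPI handshake."""

    def __init__(self):
        self.connections: list[FakeSocket] = []

    def connect(self, ws: FakeSocket) -> None:
        self.connections.append(ws)

    async def broadcast(self, event: dict) -> None:
        dead = []
        # Iterate over a copy so the list can change while a send is awaited
        for ws in list(self.connections):
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            if ws in self.connections:
                self.connections.remove(ws)


def demo_broadcast() -> tuple[int, int]:
    """Return (events received by the healthy client, connections left)."""
    async def run():
        hub = Broadcaster()
        alive, gone = FakeSocket(), FakeSocket(healthy=False)
        hub.connect(alive)
        hub.connect(gone)
        await hub.broadcast({"type": "step_completed", "workflow_id": "wf-1"})
        return len(alive.received), len(hub.connections)

    return asyncio.run(run())
```

The healthy client receives the event, and the failing client is dropped from the connection list after the first failed send.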

FAQ

What refresh rate should the dashboard use?

For the overview panel, poll every 5-10 seconds. For individual workflow detail views, use WebSocket connections for real-time updates. Avoid polling faster than every 2 seconds: it adds database load, and an operator watching an overview panel rarely needs fresher data than that.

Should I build the dashboard frontend in React or use Grafana?

Use Grafana for metrics-heavy dashboards (latency percentiles, throughput charts, cost trends) since it integrates natively with Prometheus and requires no frontend code. Build a custom React or Next.js dashboard for workflow-specific views like timelines, step drill-downs, and action buttons (retry, cancel, pause), which Grafana is not designed to provide out of the box. Many teams use both.

How do I handle dashboard performance with thousands of workflows?

Implement server-side pagination, filtering, and aggregation. Never load all workflows into the frontend. Use database indexes on status, created_at, and workflow_name columns. For the overview metrics, pre-compute aggregates on a schedule rather than computing them on every request. A materialized view or Redis cache updated every 30 seconds works well for dashboard summary statistics.
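
As a rough illustration of the caching idea, the sketch below wraps an expensive async loader in a small time-based cache with a 30-second lifetime. The class name `TTLCache` and the `load_overview` loader are hypothetical. In a multi-process deployment each worker would hold its own copy, which is where a shared Redis cache or a materialized view becomes the better fit.

```python
import asyncio
import time


class TTLCache:
    """Cache one async loader's result for a fixed number of seconds."""

    def __init__(self, loader, ttl_seconds: float = 30.0, clock=time.monotonic):
        self._loader = loader
        self._ttl = ttl_seconds
        self._clock = clock          # injectable so the demo can control time
        self._value = None
        self._expires_at = float("-inf")
        self._lock = asyncio.Lock()

    async def get(self):
        if self._clock() < self._expires_at:
            return self._value
        async with self._lock:
            # Re-check: another request may have refreshed while we waited
            if self._clock() >= self._expires_at:
                self._value = await self._loader()
                self._expires_at = self._clock() + self._ttl
            return self._value


def demo_cache():
    """Show that repeated reads inside the TTL hit the loader only once."""
    calls = 0
    now = [0.0]  # fake clock, advanced manually below

    async def load_overview():
        nonlocal calls
        calls += 1
        return {"active_workflows": calls}

    async def run():
        cache = TTLCache(load_overview, ttl_seconds=30.0, clock=lambda: now[0])
        first = await cache.get()    # cache is empty, so the loader runs
        second = await cache.get()   # still fresh, served from the cache
        now[0] = 31.0                # move past the 30-second lifetime
        third = await cache.get()    # expired, so the loader runs again
        return first, second, third, calls

    return asyncio.run(run())
```

The lock keeps a burst of simultaneous dashboard requests from all recomputing the aggregates at the moment the cached value expires.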


CallSphere Team
