Building an Agent Orchestration Dashboard: Visualizing Workflow Status and Performance
Learn how to build a real-time orchestration dashboard for AI agent workflows. Covers UI components, status tracking, timeline views, error drill-down, and the backend API that powers it all.
Why Build a Custom Dashboard
Off-the-shelf orchestration platforms include their own dashboards, but custom dashboards serve a different purpose. They surface domain-specific insights — agent quality scores, LLM cost breakdowns, model performance comparisons — that generic workflow UIs do not provide.
A well-designed orchestration dashboard answers three questions at a glance: What is running right now? What failed recently and why? How much is this costing?
Dashboard API with FastAPI
The backend API provides endpoints for the dashboard frontend to consume. Start with the data models and core endpoints.
from datetime import datetime, timedelta
from typing import Optional

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# Single FastAPI application instance that serves every dashboard endpoint below.
app = FastAPI(title="Agent Orchestration Dashboard API")
class WorkflowSummary(BaseModel):
    """One row in the dashboard's workflow list view."""

    id: str
    name: str
    status: str  # e.g. "running" / "completed" / "failed" (the values queried below)
    started_at: datetime
    duration_seconds: float | None  # presumably None while still running -- confirm against store
    step_count: int
    current_step: str | None  # name of the first step currently running, if any
    error: str | None  # error message of the most recent failed step, if any
    total_tokens: int
    cost_usd: float
class DashboardOverview(BaseModel):
    """Aggregate statistics for the dashboard's top-level overview panel.

    NOTE(review): the "*_today" field names are filled from a rolling
    `hours` window (default 24h, up to 168h) by /api/dashboard/overview,
    not a calendar day -- confirm the naming is intentional.
    """

    active_workflows: int
    completed_today: int
    failed_today: int
    success_rate: float  # completed / (completed + failed); 1.0 when nothing finished
    total_cost_today_usd: float
    total_tokens_today: int
    avg_duration_seconds: float
@app.get("/api/dashboard/overview")
async def get_overview(
    hours: int = Query(default=24, ge=1, le=168),
) -> DashboardOverview:
    """Return aggregate statistics for the dashboard overview panel.

    All figures except `active_workflows` are restricted to the trailing
    `hours` window (1-168, default 24).
    """
    window_start = datetime.utcnow() - timedelta(hours=hours)

    running_count = await workflow_store.count(status="running")
    completed_count = await workflow_store.count(status="completed", since=window_start)
    failed_count = await workflow_store.count(status="failed", since=window_start)

    finished = completed_count + failed_count
    # Report a perfect rate when nothing finished in the window (avoids 0/0).
    rate = 1.0 if finished == 0 else completed_count / finished

    total_cost = await metrics_store.sum_cost(since=window_start)
    total_tokens = await metrics_store.sum_tokens(since=window_start)
    mean_duration = await metrics_store.avg_duration(since=window_start)

    return DashboardOverview(
        active_workflows=running_count,
        completed_today=completed_count,
        failed_today=failed_count,
        success_rate=round(rate, 4),
        total_cost_today_usd=round(total_cost, 2),
        total_tokens_today=total_tokens,
        avg_duration_seconds=round(mean_duration, 2),
    )
Workflow List and Filtering
class WorkflowFilter(BaseModel):
    """Declarative filter for workflow queries.

    NOTE(review): not referenced by any endpoint visible in this file --
    /api/dashboard/workflows takes individual query parameters instead.
    Presumably consumed elsewhere (e.g. a POST search endpoint); verify.

    Types use the `X | None` spelling for consistency with the other
    models in this file (WorkflowSummary, TimelineStep, ErrorDetail).
    """

    status: str | None = None  # exact status match, e.g. "running"
    name: str | None = None
    since: datetime | None = None  # lower bound on start time
    until: datetime | None = None  # upper bound on start time
    min_cost_usd: float | None = None
    has_errors: bool | None = None
@app.get("/api/dashboard/workflows")
async def list_workflows(
    status: Optional[str] = None,
    name: Optional[str] = None,
    hours: int = Query(default=24, ge=1, le=168),
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> dict:
    """Return a filtered, paginated page of workflow summaries.

    Response shape: {"workflows": [...], "total": N, "limit": ..., "offset": ...}
    where `total` counts all matches before pagination.
    """
    window_start = datetime.utcnow() - timedelta(hours=hours)
    # Same filter arguments feed both the page query and the total count.
    filter_args = dict(status=status, name=name, since=window_start)

    page = await workflow_store.query(**filter_args, limit=limit, offset=offset)
    match_count = await workflow_store.count(**filter_args)

    summaries = []
    for wf in page:
        summaries.append(
            WorkflowSummary(
                id=wf.id,
                name=wf.name,
                status=wf.status,
                started_at=wf.created_at,
                duration_seconds=wf.duration_seconds,
                step_count=len(wf.steps),
                current_step=_get_current_step(wf),
                error=_get_last_error(wf),
                total_tokens=wf.metrics.total_tokens,
                cost_usd=wf.metrics.total_cost_usd,
            )
        )

    return {
        "workflows": summaries,
        "total": match_count,
        "limit": limit,
        "offset": offset,
    }
def _get_current_step(wf) -> str | None:
running = [s for s in wf.steps if s.status == "running"]
return running[0].name if running else None
def _get_last_error(wf) -> str | None:
failed = [s for s in wf.steps if s.status == "failed"]
return failed[-1].error if failed else None
Timeline View API
The timeline view shows each step's duration and status as a horizontal bar chart, making it easy to spot bottlenecks.
class TimelineStep(BaseModel):
    """One step's bar in the per-workflow timeline view."""

    name: str
    status: str
    started_at: datetime | None  # None if the step never started
    completed_at: datetime | None  # None while running or if it never ran
    duration_ms: float | None  # derived from the two timestamps; None if either is missing
    attempts: int
    error: str | None
    llm_calls: int  # number of LLM call records attributed to this step
    tokens_used: int  # input + output tokens summed across those calls
@app.get("/api/dashboard/workflows/{workflow_id}/timeline")
async def get_workflow_timeline(workflow_id: str) -> dict:
    """Return per-step timing, attempt, and LLM usage data for one workflow.

    Raises a 404 HTTPException if the workflow id is unknown.
    """
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    timeline = []
    for step in wf.steps:
        # LLM call records carry the name of the step they ran under.
        calls = [c for c in wf.metrics.llm_calls if c.get("step_name") == step.name]

        if step.started_at and step.completed_at:
            elapsed_ms = (step.completed_at - step.started_at).total_seconds() * 1000
        else:
            elapsed_ms = None

        token_total = 0
        for call in calls:
            token_total += call.get("input_tokens", 0) + call.get("output_tokens", 0)

        timeline.append(
            TimelineStep(
                name=step.name,
                status=step.status,
                started_at=step.started_at,
                completed_at=step.completed_at,
                duration_ms=elapsed_ms,
                attempts=step.attempts,
                error=step.error,
                llm_calls=len(calls),
                tokens_used=token_total,
            )
        )

    return {
        "workflow_id": wf.id,
        "workflow_name": wf.name,
        "total_duration_ms": (wf.updated_at - wf.created_at).total_seconds() * 1000,
        "steps": timeline,
    }
Error Drill-Down API
When a workflow fails, the dashboard needs to show exactly what went wrong at each level.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
class ErrorDetail(BaseModel):
    """Full failure record for one failed step, shown in the error drill-down."""

    workflow_id: str
    step_name: str
    error_type: str  # "Unknown" when the step recorded no explicit type
    error_message: str
    stack_trace: str | None
    attempt_number: int  # attempt count at the time the step gave up
    timestamp: datetime  # step completion time, falling back to workflow update time
    context_snapshot: dict  # truncated, non-underscore-prefixed workflow context values
@app.get("/api/dashboard/workflows/{workflow_id}/errors")
async def get_workflow_errors(workflow_id: str) -> list[ErrorDetail]:
    """Return one ErrorDetail per failed step of the given workflow.

    Raises a 404 HTTPException if the workflow id is unknown.
    """
    wf = await workflow_store.load(workflow_id)
    if not wf:
        raise HTTPException(status_code=404, detail="Workflow not found")

    # Snapshot the workflow context once (it is the same for every error):
    # drop private ("_"-prefixed) keys and truncate each value so large
    # payloads don't bloat the API response.
    snapshot = {
        key: str(value)[:200]
        for key, value in wf.context.items()
        if not key.startswith("_")
    }

    return [
        ErrorDetail(
            workflow_id=wf.id,
            step_name=step.name,
            error_type=step.error_type or "Unknown",
            error_message=step.error,
            stack_trace=step.stack_trace,
            attempt_number=step.attempts,
            timestamp=step.completed_at or wf.updated_at,
            context_snapshot=snapshot,
        )
        for step in wf.steps
        if step.error
    ]
Cost Breakdown API
class CostBreakdown(BaseModel):
    """Aggregated LLM spend for one group (a model, workflow, or step)."""

    model: str  # group key -- despite the name, it holds whatever key `group_by` selected
    call_count: int
    total_tokens: int
    input_tokens: int
    output_tokens: int
    total_cost_usd: float  # rounded to 4 decimal places by the endpoint
@app.get("/api/dashboard/costs")
async def get_cost_breakdown(
    hours: int = Query(default=24, ge=1, le=168),
    # BUG FIX: `enum=` is not a validation argument of fastapi.Query -- it only
    # leaked into the OpenAPI schema and never rejected bad values, so an
    # arbitrary group_by reached metrics_store. `pattern` makes FastAPI return
    # 422 for anything but the three supported groupings. (On older
    # FastAPI/pydantic-v1 stacks the equivalent argument is `regex=`.)
    group_by: str = Query(default="model", pattern="^(model|workflow|step)$"),
) -> dict:
    """Get cost breakdown by model, workflow, or step.

    Returns {"period_hours": ..., "breakdowns": [...], "total_cost_usd": ...}
    covering the trailing `hours` window.
    """
    since = datetime.utcnow() - timedelta(hours=hours)
    raw_data = await metrics_store.get_llm_costs(since=since, group_by=group_by)

    breakdowns = [
        CostBreakdown(
            # The `model` field carries the group key regardless of group_by.
            model=row["group_key"],
            call_count=row["call_count"],
            total_tokens=row["total_tokens"],
            input_tokens=row["input_tokens"],
            output_tokens=row["output_tokens"],
            total_cost_usd=round(row["total_cost"], 4),
        )
        for row in raw_data
    ]
    return {
        "period_hours": hours,
        "breakdowns": breakdowns,
        "total_cost_usd": round(sum(b.total_cost_usd for b in breakdowns), 2),
    }
Real-Time Updates with WebSocket
For live dashboard updates, stream workflow state changes over WebSocket.
from fastapi import WebSocket, WebSocketDisconnect
import json
class DashboardBroadcaster:
    """Fan-out hub pushing workflow events to every connected dashboard client.

    Designed for single-event-loop use: connect()/disconnect() can run while
    broadcast() is suspended at an await, so broadcast iterates a snapshot.
    (Annotations are stringified so they need no runtime evaluation.)
    """

    def __init__(self):
        # Currently-subscribed sockets, in connection order.
        self.connections: "list[WebSocket]" = []

    async def connect(self, ws: "WebSocket"):
        """Accept the socket and start including it in broadcasts."""
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: "WebSocket"):
        """Stop broadcasting to `ws`.

        Safe to call for an already-removed socket: broadcast() may have
        pruned a dead connection before the endpoint handler calls this
        (the original unconditional remove() raised ValueError then).
        """
        if ws in self.connections:
            self.connections.remove(ws)

    async def broadcast(self, event: dict):
        """Send `event` to every client; prune sockets whose send fails.

        Iterates over a snapshot because `connections` can be mutated by
        connect()/disconnect() while this coroutine is suspended in
        send_json() (mutating a list during iteration skips elements).
        """
        dead = []
        for ws in list(self.connections):
            try:
                await ws.send_json(event)
            except Exception:
                dead.append(ws)
        for ws in dead:
            if ws in self.connections:
                self.connections.remove(ws)
# Module-level singleton shared by the endpoint below and on_workflow_event().
broadcaster = DashboardBroadcaster()

@app.websocket("/ws/dashboard")
async def dashboard_ws(ws: WebSocket):
    """WebSocket endpoint: register the client for broadcasts until it hangs up."""
    await broadcaster.connect(ws)
    try:
        while True:
            # Inbound messages are ignored; reading just keeps the connection
            # open until the client disconnects.
            await ws.receive_text()
    except WebSocketDisconnect:
        broadcaster.disconnect(ws)
# Call this from your orchestrator when state changes
async def on_workflow_event(event_type: str, workflow_id: str, data: dict):
    """Publish a workflow state-change event to all connected dashboard clients."""
    payload = {
        "type": event_type,
        "workflow_id": workflow_id,
        "timestamp": datetime.utcnow().isoformat(),
        "data": data,
    }
    await broadcaster.broadcast(payload)
FAQ
What refresh rate should the dashboard use?
For the overview panel, poll every 5-10 seconds. For individual workflow detail views, use WebSocket connections for real-time updates. Avoid polling faster than every 2 seconds — it creates unnecessary database load, and sub-second freshness adds no practical value for a monitoring view.
Should I build the dashboard frontend in React or use Grafana?
Use Grafana for metrics-heavy dashboards (latency percentiles, throughput charts, cost trends) since it integrates natively with Prometheus and requires no frontend code. Build a custom React or Next.js dashboard for workflow-specific views like timelines, step drill-downs, and action buttons (retry, cancel, pause) that Grafana cannot provide. Many teams use both.
How do I handle dashboard performance with thousands of workflows?
Implement server-side pagination, filtering, and aggregation. Never load all workflows into the frontend. Use database indexes on status, created_at, and workflow_name columns. For the overview metrics, pre-compute aggregates on a schedule rather than computing them on every request. A materialized view or Redis cache updated every 30 seconds works well for dashboard summary statistics.
#Dashboard #Visualization #AgentOrchestration #FastAPI #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.