Building a Real-Time AI Dashboard: Live Metrics, Streaming Logs, and Agent Status
Build a production-grade real-time dashboard for monitoring AI agents, featuring live metrics pipelines, streaming log aggregation, agent health indicators, and efficient frontend rendering with React.
Why AI Agents Need Real-Time Dashboards
Monitoring AI agents in production requires more than traditional APM tools. You need to see token throughput, model latency percentiles, tool call success rates, agent reasoning traces, and cost accumulation — all updating in real time. A well-built dashboard transforms a black-box AI system into an observable one where you can spot degradation before users notice.
The architecture follows three layers: a metrics collection backend that aggregates data from running agents, a streaming transport layer that pushes updates to the browser, and a frontend that renders efficiently without choking on high-frequency updates.
Backend: Metrics Collection and Aggregation
Start by instrumenting your agents to emit structured events. Each event carries a timestamp, agent ID, event type, and a payload with type-specific data.
import asyncio
import time
import json
from dataclasses import dataclass, asdict
from typing import Optional
from collections import defaultdict, deque


@dataclass
class AgentMetricEvent:
    agent_id: str
    event_type: str  # "token", "tool_call", "error", "completion"
    timestamp: float
    payload: dict


class MetricsAggregator:
    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.events: deque[AgentMetricEvent] = deque()
        self.subscribers: list[asyncio.Queue] = []

    def record(self, event: AgentMetricEvent):
        self.events.append(event)
        self._prune_old_events()
        snapshot = self._compute_snapshot()
        for queue in self.subscribers:
            try:
                queue.put_nowait(snapshot)
            except asyncio.QueueFull:
                pass  # Drop if subscriber is slow

    def _prune_old_events(self):
        cutoff = time.time() - self.window
        while self.events and self.events[0].timestamp < cutoff:
            self.events.popleft()

    def _compute_snapshot(self) -> dict:
        now = time.time()
        recent = [e for e in self.events if e.timestamp > now - self.window]
        tokens = [e for e in recent if e.event_type == "token"]
        tool_calls = [e for e in recent if e.event_type == "tool_call"]
        errors = [e for e in recent if e.event_type == "error"]
        completions = [e for e in recent if e.event_type == "completion"]
        latencies = [
            e.payload.get("latency_ms", 0) for e in completions
        ]
        latencies.sort()
        return {
            "timestamp": now,
            "tokens_per_second": len(tokens) / max(self.window, 1),
            "tool_calls_total": len(tool_calls),
            "error_rate": len(errors) / max(len(recent), 1),
            "completions": len(completions),
            "p50_latency_ms": latencies[len(latencies) // 2] if latencies else 0,
            "p99_latency_ms": latencies[int(len(latencies) * 0.99)] if latencies else 0,
            "active_agents": len(set(e.agent_id for e in recent)),
        }

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue(maxsize=100)
        self.subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue):
        self.subscribers.remove(queue)


aggregator = MetricsAggregator(window_seconds=60)
The aggregator uses a sliding window deque for memory efficiency. Old events are pruned on each insertion, keeping memory usage bounded. Subscribers receive computed snapshots rather than raw events, reducing frontend processing load.
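The pruning behavior is easy to exercise in isolation. Here is a minimal sketch of the same sliding-window idea using plain `(timestamp, label)` tuples instead of `AgentMetricEvent` objects (the helper name `prune_window` is illustrative, not part of the aggregator above):

```python
import time
from collections import deque


def prune_window(events: deque, window_seconds: float, now: float) -> None:
    """Drop events older than the sliding window, mirroring _prune_old_events."""
    cutoff = now - window_seconds
    while events and events[0][0] < cutoff:  # events are (timestamp, label) tuples
        events.popleft()


# Three events spread over 90 seconds, pruned against a 60-second window.
now = time.time()
events = deque([(now - 90, "old"), (now - 30, "recent"), (now, "new")])
prune_window(events, 60, now)
print([label for _, label in events])  # the 90-second-old event is gone
```

Because events arrive in timestamp order, pruning only ever needs to look at the left end of the deque, which keeps each `record()` call cheap.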
Streaming Transport with SSE
For a monitoring dashboard, Server-Sent Events (SSE) are the right transport: the data flows one direction (server to browser), and the browser's EventSource API handles reconnection automatically.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


async def metrics_stream():
    queue = aggregator.subscribe()
    try:
        while True:
            snapshot = await queue.get()
            data = json.dumps(snapshot)
            yield f"event: metrics\ndata: {data}\n\n"
    finally:
        aggregator.unsubscribe(queue)


@app.get("/api/dashboard/stream")
async def dashboard_stream():
    return StreamingResponse(
        metrics_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
Streaming Logs Endpoint
Agent logs need their own stream. Structured log events let the frontend filter and highlight based on severity or agent ID.
from collections import deque

log_buffer: deque[dict] = deque(maxlen=1000)
log_subscribers: list[asyncio.Queue] = []


def emit_agent_log(agent_id: str, level: str, message: str, metadata: dict | None = None):
    entry = {
        "timestamp": time.time(),
        "agent_id": agent_id,
        "level": level,
        "message": message,
        "metadata": metadata or {},
    }
    log_buffer.append(entry)
    for q in log_subscribers:
        try:
            q.put_nowait(entry)
        except asyncio.QueueFull:
            pass


async def log_stream():
    queue = asyncio.Queue(maxsize=200)
    log_subscribers.append(queue)
    try:
        # Send recent history first. Iterate over a snapshot: the generator is
        # suspended at each yield, so emit_agent_log may append to the deque
        # mid-iteration, and mutating a deque during iteration raises RuntimeError.
        for entry in list(log_buffer):
            yield f"event: log\ndata: {json.dumps(entry)}\n\n"
        # Then stream new entries
        while True:
            entry = await queue.get()
            yield f"event: log\ndata: {json.dumps(entry)}\n\n"
    finally:
        log_subscribers.remove(queue)
Sending the recent buffer on connection lets newly opened dashboards see immediate context instead of staring at a blank screen.
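The replay-then-live ordering can be tested without a browser or a server. Below is a self-contained sketch of the pattern (names like `replay_then_live` and the `limit` cutoff are illustrative; the real `log_stream` runs until the client disconnects):

```python
import asyncio
from collections import deque


async def replay_then_live(buffer: deque, queue: asyncio.Queue, limit: int):
    """Yield buffered history first, then live entries, stopping after `limit` total."""
    sent = 0
    for entry in list(buffer):  # snapshot so concurrent appends can't mutate mid-loop
        yield entry
        sent += 1
        if sent >= limit:
            return
    while sent < limit:
        yield await queue.get()
        sent += 1


async def demo():
    history = deque(["boot", "ready"], maxlen=1000)
    live: asyncio.Queue = asyncio.Queue()
    live.put_nowait("request received")
    return [e async for e in replay_then_live(history, live, limit=3)]


print(asyncio.run(demo()))  # ['boot', 'ready', 'request received']
```

History always arrives before live entries, so a dashboard that colors log lines by recency can treat everything before the first live entry as context.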
Frontend: Efficient React Rendering
High-frequency updates can overwhelm React if every SSE event triggers a re-render. Batch updates and use requestAnimationFrame to align rendering with the browser's paint cycle.
import { useState, useEffect, useRef, useCallback } from "react";

interface DashboardMetrics {
  tokens_per_second: number;
  error_rate: number;
  p50_latency_ms: number;
  p99_latency_ms: number;
  active_agents: number;
}

function useMetricsStream(url: string): DashboardMetrics | null {
  const [metrics, setMetrics] = useState<DashboardMetrics | null>(null);
  const latestRef = useRef<DashboardMetrics | null>(null);
  const rafRef = useRef<number>(0);

  const scheduleUpdate = useCallback(() => {
    if (rafRef.current) return;
    rafRef.current = requestAnimationFrame(() => {
      rafRef.current = 0;
      if (latestRef.current) {
        setMetrics({ ...latestRef.current });
      }
    });
  }, []);

  useEffect(() => {
    const source = new EventSource(url);
    // Custom event types arrive as MessageEvent, but the generic
    // addEventListener overload types them as Event, so narrow explicitly.
    source.addEventListener("metrics", (event) => {
      latestRef.current = JSON.parse((event as MessageEvent).data);
      scheduleUpdate();
    });
    return () => {
      source.close();
      if (rafRef.current) cancelAnimationFrame(rafRef.current);
    };
  }, [url, scheduleUpdate]);

  return metrics;
}
This hook stores the latest event in a ref (no re-render) and schedules a single state update per animation frame. Even if the server sends 30 events per second, React only re-renders at the display refresh rate.
FAQ
How do you keep the dashboard responsive when hundreds of agents are producing metrics?
Use server-side aggregation to pre-compute summary statistics rather than pushing raw events to the browser. The MetricsAggregator pattern shown above computes totals and percentiles server-side, so the browser receives one compact snapshot per update regardless of how many agents are running. For drill-down views, let the user select specific agents and open filtered streams that only include events from those agents.
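A drill-down filter can be as simple as a predicate applied before an event is enqueued for a subscriber. A minimal sketch (the helper name `make_agent_filter` and the event dicts are illustrative):

```python
def make_agent_filter(agent_ids: set[str]):
    """Return a predicate that passes only events from the selected agents."""
    def allow(event: dict) -> bool:
        return event["agent_id"] in agent_ids
    return allow


events = [
    {"agent_id": "agent-1", "event_type": "token"},
    {"agent_id": "agent-2", "event_type": "error"},
    {"agent_id": "agent-1", "event_type": "completion"},
]
allow = make_agent_filter({"agent-1"})
filtered = [e for e in events if allow(e)]
print(len(filtered))  # 2
```

Attaching the predicate per-subscriber keeps filtering server-side, so a drill-down view never pays bandwidth for agents it is not watching.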
What happens if the metrics aggregator crashes and loses in-memory data?
For production systems, persist metrics to a time-series database like TimescaleDB or InfluxDB alongside the in-memory aggregator. The in-memory layer serves real-time streaming, while the database provides historical data for trend analysis and post-incident investigation. On restart, the aggregator begins with an empty window and fills naturally within one window period (typically 60 seconds).
How do you test a real-time dashboard during development without running actual AI agents?
Build a metrics simulator that generates realistic event patterns — bursts of token events, periodic tool calls, occasional errors, and varying latency distributions. Run the simulator as a script that calls the same aggregator.record() method your real agents use. This lets you test the full pipeline including edge cases like error rate spikes and latency degradation without consuming API credits.
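A simulator along those lines might look like the sketch below. The event mix, rates, and agent names are made up for illustration; in practice each generated dict would be wrapped in an AgentMetricEvent and passed to aggregator.record():

```python
import random
import time


def simulate_events(n: int, error_rate: float = 0.05, seed: int = 42) -> list[dict]:
    """Generate a synthetic event stream with a controlled error ratio."""
    rng = random.Random(seed)  # fixed seed keeps test runs reproducible
    events = []
    now = time.time()
    for _ in range(n):
        if rng.random() < error_rate:
            event_type = "error"
        else:
            # Weight toward token events, as a real agent stream would be.
            event_type = rng.choice(["token", "token", "tool_call", "completion"])
        events.append({
            "agent_id": f"sim-agent-{rng.randint(1, 5)}",
            "event_type": event_type,
            "timestamp": now - rng.uniform(0, 60),  # spread across the window
            "payload": {"latency_ms": rng.gauss(800, 200)}
                       if event_type == "completion" else {},
        })
    return events


events = simulate_events(1000)
observed = sum(e["event_type"] == "error" for e in events) / len(events)
print(f"observed error rate: {observed:.3f}")  # close to the configured 0.05
```

Varying `error_rate` or the latency distribution mid-run is an easy way to confirm the dashboard actually surfaces a degradation instead of averaging it away.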
CallSphere Team
Expert insights on AI voice agents and customer communication automation.