Streaming Agent Architectures: Real-Time Token-by-Token Output with Tool Call Interleaving
Master the architecture of streaming AI agents that deliver token-by-token output while interleaving tool calls, using Server-Sent Events and progressive rendering to create responsive user experiences.
Why Streaming Matters for Agents
A non-streaming agent calls the LLM, waits for the full response, executes any tool calls, waits again, and finally returns everything at once. The user stares at a spinner for 5-15 seconds. Streaming agents send tokens to the client the moment they are generated. When the model decides to call a tool, the client sees a tool execution indicator, and when the tool returns, the model's continuation streams immediately.
The result is an agent that feels instantaneous. The first token appears in 200-400ms. Tool calls appear as they happen. The architecture is more complex, but the user experience difference is dramatic.
Server-Sent Events: The Transport Layer
SSE is the simplest reliable protocol for server-to-client streaming. Unlike WebSockets, it works over standard HTTP, passes through most proxies and CDNs (provided response buffering is disabled), and the browser's EventSource API reconnects automatically on failure.
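The SSE wire format itself is minimal: each event is one or more `data:` lines terminated by a blank line. A small sketch (helper names are ours, not a library API) makes the framing explicit before we use it in the endpoint below:

```python
import json

def sse_format(payload: dict) -> str:
    """Frame a JSON payload as a single SSE event: a 'data:' line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"

def sse_parse(raw: str) -> list[dict]:
    """Parse a buffer of data-only SSE events back into JSON payloads."""
    events = []
    for block in raw.split("\n\n"):          # events are blank-line delimited
        for line in block.split("\n"):
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events
```

Round-tripping two events through these helpers shows the exact bytes the server emits and the client must split on.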
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
import json

app = FastAPI()

agent = Agent(
    name="Streaming Assistant",
    instructions="You are a helpful assistant. Use tools when needed.",
    tools=[search_tool, calculator_tool],  # assumes these tools are defined elsewhere
)

@app.get("/api/chat/stream")
async def stream_chat(message: str):
    async def event_generator():
        result = Runner.run_streamed(agent, message)
        async for event in result.stream_events():
            if event.type == "raw_response_event":
                # Token-by-token text output; match only text deltas so that
                # tool-argument deltas don't leak into the visible stream
                delta = event.data
                if type(delta).__name__ == "ResponseTextDeltaEvent" and delta.delta:
                    yield f"data: {json.dumps({'type': 'token', 'content': delta.delta})}\n\n"
            elif event.type == "run_item_stream_event":
                item = event.item
                item_type = type(item).__name__
                if "ToolCall" in item_type:
                    yield f"data: {json.dumps({'type': 'tool_start', 'tool': str(item)})}\n\n"
                elif "ToolOutput" in item_type:
                    yield f"data: {json.dumps({'type': 'tool_result', 'output': str(item)})}\n\n"
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )
The X-Accel-Buffering: no header is critical — it tells Nginx and similar reverse proxies not to buffer the response, which would defeat the purpose of streaming.
Client-Side Stream Consumption
On the frontend, use the EventSource API or a fetch-based reader for more control.
// Shape of the events emitted by the server above
type StreamEvent =
  | { type: "token"; content: string }
  | { type: "tool_start"; tool: string }
  | { type: "tool_result"; output: string }
  | { type: "done" };

// Using fetch for full control over the stream
async function streamChat(message: string, onEvent: (event: StreamEvent) => void) {
  const response = await fetch(
    `/api/chat/stream?message=${encodeURIComponent(message)}`
  );
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";

    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const event = JSON.parse(line.slice(6));
        onEvent(event);
      }
    }
  }
}
import { useState } from "react";

// React component that renders the stream
function ChatMessage() {
  const [tokens, setTokens] = useState("");
  const [toolCalls, setToolCalls] = useState<string[]>([]);

  // Pass this as the onEvent callback: streamChat(message, handleStream)
  const handleStream = (event: StreamEvent) => {
    switch (event.type) {
      case "token":
        setTokens((prev) => prev + event.content);
        break;
      case "tool_start":
        setToolCalls((prev) => [...prev, `Calling: ${event.tool}`]);
        break;
      case "tool_result":
        setToolCalls((prev) => [...prev, `Result: ${event.output}`]);
        break;
    }
  };

  return (
    <div>
      <p>{tokens}</p>
      {toolCalls.map((tc, i) => (
        <div key={i} className="text-sm text-gray-500">{tc}</div>
      ))}
    </div>
  );
}
Handling Partial Tool Calls
One of the trickiest parts of streaming agents is partial tool calls. The LLM might stream the tool name and arguments token by token. You need to buffer the tool call until it is complete before executing it.
import json

class ToolCallBuffer:
    """Accumulates partial tool call tokens until the call is complete."""

    def __init__(self):
        self.active_calls: dict[int, dict] = {}

    def process_delta(self, delta) -> list[dict] | None:
        """Process a streaming delta, return complete tool calls if any."""
        completed = []
        if hasattr(delta, "tool_calls") and delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if idx not in self.active_calls:
                    self.active_calls[idx] = {
                        "name": "",
                        "arguments": "",
                    }
                if tc.function and tc.function.name:
                    self.active_calls[idx]["name"] += tc.function.name
                if tc.function and tc.function.arguments:
                    self.active_calls[idx]["arguments"] += tc.function.arguments

        # Check if any calls are complete (valid JSON arguments)
        for idx in list(self.active_calls.keys()):
            call = self.active_calls[idx]
            try:
                json.loads(call["arguments"])
                completed.append(call)
                del self.active_calls[idx]
            except json.JSONDecodeError:
                pass  # Still accumulating

        return completed if completed else None
Progressive Rendering Pattern
Instead of waiting for the entire agent response, render each phase as it completes.
interface StreamPhase {
type: "thinking" | "tool_executing" | "responding";
content: string;
}
function ProgressiveResponse({ phases }: { phases: StreamPhase[] }) {
return (
<div className="space-y-2">
{phases.map((phase, i) => (
<div key={i} className={phaseStyles[phase.type]}>
{phase.type === "tool_executing" && (
<span className="animate-spin mr-2">⚙</span>
)}
{phase.content}
</div>
))}
</div>
);
}
Backpressure and Connection Management
In production, manage connections carefully to prevent resource exhaustion.
import asyncio
from contextlib import asynccontextmanager

class ConnectionManager:
    def __init__(self, max_concurrent: int = 100):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_connections: set[str] = set()

    @asynccontextmanager
    async def connect(self, connection_id: str):
        await self.semaphore.acquire()
        self.active_connections.add(connection_id)
        try:
            yield
        finally:
            self.active_connections.discard(connection_id)
            self.semaphore.release()

manager = ConnectionManager(max_concurrent=100)

@app.get("/api/chat/stream")
async def stream_chat(message: str, connection_id: str):
    # Hold the slot inside the generator: if the endpoint function held it,
    # the slot would be released as soon as the endpoint returned, while the
    # stream was still running.
    async def event_generator():
        async with manager.connect(connection_id):
            # ... streaming logic (yield SSE events here)
            yield ""
    return StreamingResponse(event_generator(), media_type="text/event-stream")
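The semaphore is what actually enforces the cap: the (max_concurrent + 1)-th connection blocks in connect() until an earlier stream finishes. A quick self-contained check of that behavior, using a limit of 2 and a counter that records peak concurrency (the helper names are ours):

```python
import asyncio

async def demo(limit: int, tasks: int) -> int:
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_stream():
        nonlocal active, peak
        async with sem:  # same acquire/release discipline as ConnectionManager
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # simulate a streaming response in flight
            active -= 1

    await asyncio.gather(*(fake_stream() for _ in range(tasks)))
    return peak

peak = asyncio.run(demo(limit=2, tasks=10))
```

Ten concurrent "streams" run, but the observed peak never exceeds the semaphore limit.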
FAQ
How do you handle client disconnects mid-stream?
When a client disconnects mid-stream, Starlette (which FastAPI builds on) cancels the task driving the StreamingResponse, so your async generator receives a cancellation at its current yield point. Wrap the generator body in try/finally (or catch asyncio.CancelledError explicitly) and use the cleanup path to cancel any in-flight LLM calls or tool executions so they stop consuming resources. To detect disconnects proactively between events, you can also poll request.is_disconnected().
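A minimal sketch of that cleanup pattern, assuming the disconnect surfaces while the generator is suspended at a yield (here simulated by closing the generator directly):

```python
import asyncio

cleanup_ran = False

async def event_generator():
    """Yields events until the consumer goes away, then releases resources."""
    global cleanup_ran
    try:
        while True:
            yield "data: ...\n\n"
            await asyncio.sleep(0.01)
    finally:
        # Runs on normal completion AND on cancellation/disconnect;
        # cancel pending LLM or tool tasks here
        cleanup_ran = True

async def consume_then_disconnect():
    gen = event_generator()
    await gen.__anext__()  # read one event
    await gen.aclose()     # simulates the client disconnecting mid-stream

asyncio.run(consume_then_disconnect())
```

Closing the generator throws into the suspended yield, so the finally block fires exactly once, mirroring what happens when Starlette tears down a cancelled response.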
What happens if a tool call takes 30 seconds during a stream?
Send heartbeat events during long tool executions to keep the connection alive and show progress. Emit a tool_start event immediately, then send periodic tool_progress events (e.g., every 2 seconds), and finally emit tool_result when the tool completes. This prevents connection timeouts and keeps the user informed.
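That heartbeat loop can be sketched with asyncio.wait_for: race the tool task against a short timeout and emit a progress event each time the timeout fires. The event names follow the article's tool_start/tool_progress/tool_result convention; the tool itself is a stand-in:

```python
import asyncio

async def slow_tool() -> str:
    await asyncio.sleep(0.25)  # stand-in for a 30-second tool call
    return "42"

async def stream_with_heartbeats(interval: float = 0.1):
    """Yield tool_start, periodic tool_progress heartbeats, then tool_result."""
    yield {"type": "tool_start"}
    task = asyncio.ensure_future(slow_tool())
    while True:
        try:
            # shield() keeps wait_for's timeout from cancelling the tool itself
            result = await asyncio.wait_for(asyncio.shield(task), timeout=interval)
            yield {"type": "tool_result", "output": result}
            return
        except asyncio.TimeoutError:
            yield {"type": "tool_progress"}  # heartbeat keeps the SSE connection warm

async def collect():
    return [e async for e in stream_with_heartbeats()]

events = asyncio.run(collect())
```

The shield is the key detail: without it, the first timeout would cancel the tool call instead of merely pausing to emit a heartbeat.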
Can you stream from multiple agents in parallel?
Yes, but it requires careful event multiplexing. Assign each agent stream a unique channel ID. On the client, demultiplex events by channel and render each agent's output in its designated area. Use async generators with asyncio.as_completed() or merge streams to interleave events from concurrent agents.
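One way to sketch that multiplexing is to pump each agent's stream onto a shared asyncio.Queue, tagging every event with its channel ID (a queue interleaves events as they arrive, which plain asyncio.as_completed over whole streams would not):

```python
import asyncio

async def agent_stream(channel: str, tokens: list[str], delay: float):
    """Stand-in for one agent's event stream, tagged with a channel ID."""
    for t in tokens:
        await asyncio.sleep(delay)
        yield {"channel": channel, "token": t}

async def merge(*streams):
    """Multiplex async generators onto one queue; a sentinel marks each stream's end."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()

    async def pump(stream):
        async for event in stream:
            await queue.put(event)
        await queue.put(DONE)

    tasks = [asyncio.ensure_future(pump(s)) for s in streams]
    remaining = len(tasks)
    while remaining:
        item = await queue.get()
        if item is DONE:
            remaining -= 1
        else:
            yield item

async def collect():
    a = agent_stream("research", ["r1", "r2"], 0.01)
    b = agent_stream("writer", ["w1", "w2"], 0.015)
    return [e async for e in merge(a, b)]

events = asyncio.run(collect())
```

Per-channel ordering is preserved even though events from different agents interleave, which is what the client-side demultiplexer relies on.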
CallSphere Team
Expert insights on AI voice agents and customer communication automation.