Building Agentic AI with Streaming: Real-Time Token-by-Token Output Patterns
Implement streaming in agentic AI systems with SSE, WebSockets, tool call streaming, multi-agent stream merging, and backpressure handling.
Why Streaming Changes the Agent Experience
Without streaming, the user submits a request and stares at a loading spinner while the LLM generates its entire response. For complex agent tasks that take 10-30 seconds (tool calls, multi-step reasoning, long responses), this creates a terrible user experience. The user does not know if the system is working, stuck, or has crashed.
Streaming transforms this experience. The user sees tokens appear in real-time as the LLM generates them, tool calls appear as they are dispatched, and results populate as they arrive. The perceived latency drops from the total generation time to the time-to-first-token, which is typically under one second.
This guide covers the transport mechanisms (SSE vs WebSocket), token-level streaming implementation, tool call streaming, multi-agent stream merging, client-side rendering, and backpressure handling.
SSE vs WebSocket for Agent Streaming
Two transport protocols dominate real-time agent communication. The right choice depends on your communication pattern.
Server-Sent Events (SSE)
SSE provides a unidirectional stream from server to client over a standard HTTP connection. The server sends events; the client listens. SSE is simpler to implement, works through most proxies and load balancers without special configuration, and automatically handles reconnection.
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def agent_stream(user_message: str):
    """Generate SSE events from agent processing."""
    # Signal processing start
    yield f"event: status\ndata: {json.dumps({'status': 'thinking'})}\n\n"
    async for chunk in llm_client.stream_chat(
        messages=[{"role": "user", "content": user_message}]
    ):
        if chunk.type == "content_block_delta":
            yield f"event: token\ndata: {json.dumps({'text': chunk.delta.text})}\n\n"
        elif chunk.type == "tool_use":
            yield f"event: tool_call\ndata: {json.dumps({'tool': chunk.name, 'args': chunk.input})}\n\n"
    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

@app.post("/api/agent/chat")
async def chat_endpoint(request: ChatRequest):
    return StreamingResponse(
        agent_stream(request.message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )
Client-side SSE consumption:
async function streamAgentResponse(message) {
  const response = await fetch("/api/agent/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop(); // Keep incomplete event in buffer
    for (const event of events) {
      const lines = event.split("\n");
      const eventType = lines[0]?.replace("event: ", "");
      const data = JSON.parse(lines[1]?.replace("data: ", "") || "{}");
      switch (eventType) {
        case "token":
          appendToResponse(data.text);
          break;
        case "tool_call":
          showToolCallIndicator(data.tool);
          break;
        case "done":
          finalizeResponse();
          break;
      }
    }
  }
}
WebSocket
WebSockets provide bidirectional communication. Both client and server can send messages at any time. Use WebSockets when you need bidirectional streaming — the user can send additional messages or interrupt the agent while it is still responding.
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent/chat")
async def websocket_agent(ws: WebSocket):
    await ws.accept()
    session = AgentSession()
    try:
        while True:
            # Receive user message
            data = await ws.receive_json()
            if data["type"] == "message":
                # Stream agent response
                async for event in session.process_message(data["content"]):
                    await ws.send_json(event)
            elif data["type"] == "interrupt":
                await session.cancel_current()
                await ws.send_json({"type": "interrupted"})
    except WebSocketDisconnect:
        await session.cleanup()
When to Use Which
| Feature | SSE | WebSocket |
|---|---|---|
| Direction | Server to client only | Bidirectional |
| Protocol | Plain HTTP | WebSocket protocol (HTTP upgrade handshake) |
| Reconnection | Automatic | Manual implementation |
| Proxy compatibility | Excellent | Requires configuration |
| Barge-in / interrupt | Not native | Natural fit |
| Load balancer | Standard HTTP | Sticky sessions needed |
| Best for | Simple chat agents | Voice agents, interactive agents |
For most text-based chat agents, SSE is the better choice. It is simpler, more reliable through infrastructure, and sufficient for the server-to-client streaming pattern. Use WebSockets when you need bidirectional communication, such as voice agents where the user can interrupt or interactive agents where the user sends incremental input.
Token-Level Streaming from LLM APIs
Modern LLM APIs support streaming natively. Here is how to handle streams from the major providers.
OpenAI Streaming
async def stream_openai(messages: list[dict]):
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield {"type": "token", "text": delta.content}
        if delta.tool_calls:
            for tc in delta.tool_calls:
                yield {
                    "type": "tool_call_delta",
                    "index": tc.index,
                    "id": tc.id,
                    "name": tc.function.name if tc.function.name else None,
                    "arguments_delta": tc.function.arguments,
                }
        if chunk.choices[0].finish_reason:
            yield {
                "type": "finish",
                "reason": chunk.choices[0].finish_reason,
            }
Anthropic Streaming
async def stream_anthropic(messages: list[dict], system: str):
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system=system,
        messages=messages,
    ) as stream:
        async for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "token", "text": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    yield {
                        "type": "tool_input_delta",
                        "partial_json": event.delta.partial_json,
                    }
            elif event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    yield {
                        "type": "tool_call_start",
                        "tool_name": event.content_block.name,
                        "tool_id": event.content_block.id,
                    }
            elif event.type == "message_stop":
                yield {"type": "finish"}
Tool Call Streaming
When an agent calls a tool, the user should see the tool invocation in real-time, not just the final result. This builds trust and helps users understand what the agent is doing.
Streaming Tool Call Lifecycle
class StreamingToolExecutor:
    async def execute_and_stream(
        self,
        tool_name: str,
        tool_args: dict,
        stream_callback,
    ):
        # 1. Signal tool call start
        await stream_callback({
            "type": "tool_start",
            "tool": tool_name,
            "args": tool_args,
        })
        # 2. Execute tool
        try:
            result = await self.tools[tool_name](**tool_args)
            # 3. Signal tool result
            await stream_callback({
                "type": "tool_result",
                "tool": tool_name,
                "result": result,
                "success": True,
            })
            return result
        except Exception as e:
            await stream_callback({
                "type": "tool_error",
                "tool": tool_name,
                "error": str(e),
            })
            raise
Progressive Tool Results
Some tools can stream partial results. A database query can stream rows as they arrive rather than waiting for the full result set. A web search can stream individual results as they are fetched.
import asyncio

async def search_knowledge_base_streaming(
    query: str,
    stream_callback,
):
    embedding = await generate_embedding(query)
    results = await vector_store.query(vector=embedding, top_k=10)
    for i, result in enumerate(results.matches):
        await stream_callback({
            "type": "search_result",
            "index": i,
            "title": result.metadata["title"],
            "snippet": result.metadata["content"][:200],
            "score": result.score,
        })
        # Small delay for visual effect on client
        await asyncio.sleep(0.05)
Multi-Agent Stream Merging
In multi-agent architectures, multiple agents may generate output that needs to be streamed to the user as a coherent experience. A supervisor agent might dispatch work to specialist agents and merge their responses.
import asyncio
from typing import Any

class MultiAgentStreamMerger:
    def __init__(self):
        self.output_queue: asyncio.Queue = asyncio.Queue()

    async def add_agent_stream(
        self,
        agent_id: str,
        stream,
    ):
        async for event in stream:
            await self.output_queue.put({
                "agent_id": agent_id,
                **event,
            })
        await self.output_queue.put({
            "agent_id": agent_id,
            "type": "agent_done",
        })

    async def merged_stream(self, agent_streams: dict[str, Any]):
        # Start all agent streams concurrently
        tasks = [
            asyncio.create_task(
                self.add_agent_stream(agent_id, stream)
            )
            for agent_id, stream in agent_streams.items()
        ]
        active_agents = set(agent_streams.keys())
        while active_agents:
            event = await self.output_queue.get()
            if event.get("type") == "agent_done":
                active_agents.discard(event["agent_id"])
            else:
                yield event
        # Ensure all tasks complete
        await asyncio.gather(*tasks)
Client-Side Rendering of Streamed Content
Rendering streamed tokens requires handling partial markdown, code blocks, and structured content that arrives incrementally.
Incremental Markdown Rendering
class StreamRenderer {
  constructor(containerElement) {
    this.container = containerElement;
    this.buffer = "";
  }

  appendToken(text) {
    this.buffer += text;
    this.render();
  }

  render() {
    // Re-parse the full buffer on each token; marked tolerates
    // incomplete markdown, so partial constructs render progressively
    const html = marked.parse(this.buffer, { breaks: true });
    this.container.innerHTML = html;
    // Auto-scroll to bottom
    this.container.scrollTop = this.container.scrollHeight;
  }

  appendToolCall(toolName, args) {
    const toolEl = document.createElement("div");
    toolEl.className = "tool-call-indicator";
    toolEl.innerHTML =
      '<span class="tool-icon">⚙</span> ' +
      "Calling <strong>" + toolName + "</strong>...";
    this.container.appendChild(toolEl);
  }

  appendToolResult(toolName, result) {
    const resultEl = document.createElement("div");
    resultEl.className = "tool-result";
    resultEl.innerHTML =
      '<span class="check-icon">✓</span> ' +
      "<strong>" + toolName + "</strong> completed";
    this.container.appendChild(resultEl);
  }
}
Backpressure Handling
Backpressure occurs when the server produces data faster than the client can consume it. Without backpressure handling, buffers grow unbounded and the server runs out of memory.
Server-Side Backpressure
import asyncio

class BackpressureAwareStream:
    def __init__(self, max_buffer_size: int = 100):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer_size)
        self.dropped_count = 0

    async def push(self, event: dict):
        try:
            self.queue.put_nowait(event)
        except asyncio.QueueFull:
            # Drop oldest non-critical events
            if event.get("type") in ("token", "status"):
                try:
                    self.queue.get_nowait()  # Remove oldest
                    self.queue.put_nowait(event)
                    self.dropped_count += 1
                except asyncio.QueueEmpty:
                    pass
            else:
                # Critical events (tool_result, finish) always wait
                await self.queue.put(event)

    async def consume(self):
        while True:
            event = await self.queue.get()
            yield event
            if event.get("type") == "finish":
                break
Client-Side Flow Control
On the client, implement flow control by pausing the stream reader when the render queue is too large and resuming when it drains.
class FlowControlledReader {
  constructor(reader, renderer, highWaterMark = 50) {
    this.reader = reader;
    this.renderer = renderer;
    this.highWaterMark = highWaterMark;
    this.pendingRenders = 0;
  }

  async start() {
    const decoder = new TextDecoder();
    while (true) {
      // Pause if too many pending renders
      while (this.pendingRenders > this.highWaterMark) {
        await new Promise((r) => setTimeout(r, 10));
      }
      const { done, value } = await this.reader.read();
      if (done) break;
      this.pendingRenders++;
      requestAnimationFrame(() => {
        this.renderer.appendToken(decoder.decode(value));
        this.pendingRenders--;
      });
    }
  }
}
Production Streaming Considerations
Nginx and reverse proxy configuration: Most reverse proxies buffer responses by default, which defeats streaming. Disable response buffering with X-Accel-Buffering: no for Nginx, or configure proxy_buffering off at the location level.
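As a sketch, an Nginx location block for the SSE endpoint above might look like the following (the upstream name and timeout value are placeholders to adapt to your deployment):

```nginx
location /api/agent/chat {
    proxy_pass http://agent_backend;
    proxy_buffering off;            # stream bytes through as they arrive
    proxy_cache off;
    proxy_http_version 1.1;
    proxy_set_header Connection ""; # keep the upstream connection open
    proxy_read_timeout 300s;        # allow long-lived agent responses
}
```

The `X-Accel-Buffering: no` response header achieves the same effect per-response, which is useful when you cannot change the proxy configuration.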
Connection timeouts: Streaming connections can be long-lived. Configure your load balancer and reverse proxy timeouts to accommodate the longest expected agent response time, plus a margin. A typical setting is 120-300 seconds.
Error recovery mid-stream: If the LLM API connection drops mid-response, the client should see an error indicator and have the option to retry. Include stream IDs so the client can request resumption from a specific point.
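One hedged sketch of the resumption mechanism: attach an `id:` field to each SSE frame so a reconnecting client can send the standard `Last-Event-ID` header and the server can replay only what was missed. The in-memory replay buffer here is an illustrative assumption; production would persist frames in Redis or similar.

```python
import json

def format_sse(event_type: str, data: dict, event_id: int) -> str:
    """Serialize one SSE frame with an id for resumption."""
    return f"id: {event_id}\nevent: {event_type}\ndata: {json.dumps(data)}\n\n"

class ResumableStream:
    def __init__(self):
        self.events: list[str] = []  # replay buffer (in-memory for this sketch)

    def emit(self, event_type: str, data: dict) -> str:
        frame = format_sse(event_type, data, event_id=len(self.events))
        self.events.append(frame)
        return frame

    def replay_after(self, last_event_id: int) -> list[str]:
        """Frames the client has not yet seen, per its Last-Event-ID header."""
        return self.events[last_event_id + 1:]
```

Browsers send `Last-Event-ID` automatically when using `EventSource`; with a `fetch`-based reader like the client above, you would send it as an explicit header on retry.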
Monitoring: Track time-to-first-token, total stream duration, tokens-per-second throughput, client disconnection rates, and backpressure events. These metrics reveal both LLM performance issues and client-side rendering bottlenecks.
Frequently Asked Questions
What is the time-to-first-token for streaming LLM responses?
Time-to-first-token (TTFT) varies by model and provider. GPT-4o typically achieves 200-500ms TTFT, Claude averages 300-700ms, and Gemini is typically 200-400ms. These times can increase under load. TTFT is the most important latency metric for user experience because it determines how quickly the user sees the agent begin responding.
Should I use SSE or WebSocket for a chat-based agent?
For most chat agents, SSE is the better choice. It is simpler to implement, works reliably through proxies and load balancers without special configuration, handles reconnection automatically, and provides everything you need for server-to-client streaming. Use WebSocket only if you need bidirectional communication — for example, if the user can interrupt the agent mid-response or if you are building a voice interface.
How do you handle streaming when the agent makes tool calls mid-response?
The stream pauses text tokens while the agent executes a tool call. Send a tool_call_start event so the client can show a loading indicator, then send the tool_result event when the tool completes, followed by resuming text token streaming as the agent incorporates the tool result into its response. The client renders this as: partial text, then a tool call indicator, then more text.
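The sequence above can be sketched as a single async generator that pauses token output around a tool call. The `fake_llm_events` feed and `run_tool` helper are hypothetical stand-ins for a real LLM stream and tool dispatcher:

```python
import asyncio

async def run_tool(name: str, args: dict) -> dict:
    """Hypothetical tool executor; a real one would dispatch by name."""
    await asyncio.sleep(0)
    return {"ok": True}

async def interleaved_stream(llm_events):
    """Yield text tokens, pausing to execute and report tool calls inline."""
    async for ev in llm_events:
        if ev["type"] == "token":
            yield ev
        elif ev["type"] == "tool_call":
            yield {"type": "tool_call_start", "tool": ev["tool"]}
            result = await run_tool(ev["tool"], ev.get("args", {}))
            yield {"type": "tool_result", "tool": ev["tool"], "result": result}
    yield {"type": "finish"}

async def fake_llm_events():
    for ev in [
        {"type": "token", "text": "Checking "},
        {"type": "tool_call", "tool": "get_weather", "args": {"city": "Oslo"}},
        {"type": "token", "text": "It is sunny."},
    ]:
        yield ev
```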
What happens if the client disconnects during a stream?
On the server side, detect client disconnection through write failures or connection close events. When detected, cancel the ongoing LLM API call to stop incurring costs and free server resources. Clean up any session state. If the user reconnects and sends the same request, generate a fresh response rather than trying to resume the interrupted stream.
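With FastAPI/Starlette, one way to sketch this is to poll `request.is_disconnected()` between events and cancel upstream work when it returns true. The disconnect check and cancellation hook are injected as callables here so the pattern is testable without a running server:

```python
async def stream_until_disconnect(events, is_disconnected, on_cancel):
    """Forward events until the client goes away, then clean up."""
    async for event in events:
        if await is_disconnected():
            # e.g. cancel the LLM API call and free session state
            await on_cancel()
            return
        yield event
```

In a real endpoint, `is_disconnected` would be `request.is_disconnected` and `on_cancel` would cancel the asyncio task wrapping the LLM stream.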
How do you test streaming agent endpoints?
Test at multiple levels: unit test the stream generation logic by collecting all emitted events into a list and asserting their types and order, integration test the HTTP endpoint by reading the SSE stream and validating event format, and load test with many concurrent streaming connections to verify backpressure handling and server stability under load. Tools like k6 and Artillery support SSE and WebSocket load testing.
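A unit-test sketch of the first level, draining a stand-in event generator into a list and asserting event order (the `sample_stream` generator is a hypothetical stand-in for your agent's stream function):

```python
import asyncio

async def sample_stream():
    """Stand-in for the agent's event generator under test."""
    yield {"type": "status", "status": "thinking"}
    yield {"type": "token", "text": "Hello"}
    yield {"type": "finish"}

def collect_events(stream):
    """Drain an async event generator into a list for assertions."""
    async def _drain():
        return [event async for event in stream]
    return asyncio.run(_drain())

def test_stream_event_order():
    events = collect_events(sample_stream())
    types = [e["type"] for e in events]
    assert types[0] == "status"   # stream opens with a status event
    assert types[-1] == "finish"  # and always terminates with finish
    assert "token" in types
```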
CallSphere Team
Expert insights on AI voice agents and customer communication automation.