
Building Agentic AI with Streaming: Real-Time Token-by-Token Output Patterns

Implement streaming in agentic AI systems with SSE, WebSockets, tool call streaming, multi-agent stream merging, and backpressure handling.

Why Streaming Changes the Agent Experience

Without streaming, the user submits a request and stares at a loading spinner while the LLM generates its entire response. For complex agent tasks that take 10-30 seconds (tool calls, multi-step reasoning, long responses), this creates a terrible user experience. The user does not know if the system is working, stuck, or has crashed.

Streaming transforms this experience. The user sees tokens appear in real-time as the LLM generates them, tool calls appear as they are dispatched, and results populate as they arrive. The perceived latency drops from the total generation time to the time-to-first-token, which is typically under one second.

This guide covers the transport mechanisms (SSE vs WebSocket), token-level streaming implementation, tool call streaming, multi-agent stream merging, client-side rendering, and backpressure handling.

SSE vs WebSocket for Agent Streaming

Two transport protocols dominate real-time agent communication. The right choice depends on your communication pattern.

Server-Sent Events (SSE)

SSE provides a unidirectional stream from server to client over a standard HTTP connection. The server sends events; the client listens. SSE is simpler to implement, works through most proxies and load balancers without special configuration, and automatically handles reconnection.

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def agent_stream(user_message: str):
    """Generate SSE events from agent processing."""
    # Signal processing start
    yield f"event: status\ndata: {json.dumps({'status': 'thinking'})}\n\n"

    async for chunk in llm_client.stream_chat(
        messages=[{"role": "user", "content": user_message}]
    ):
        if chunk.type == "content_block_delta":
            yield f"event: token\ndata: {json.dumps({'text': chunk.delta.text})}\n\n"
        elif chunk.type == "tool_use":
            yield f"event: tool_call\ndata: {json.dumps({'tool': chunk.name, 'args': chunk.input})}\n\n"

    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

from pydantic import BaseModel

class ChatRequest(BaseModel):
    message: str

@app.post("/api/agent/chat")
async def chat_endpoint(request: ChatRequest):
    return StreamingResponse(
        agent_stream(request.message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

Client-side SSE consumption:

async function streamAgentResponse(message) {
  const response = await fetch("/api/agent/chat", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop(); // Keep incomplete event in buffer

    for (const event of events) {
      const lines = event.split("\n");
      const eventType = lines[0]?.replace("event: ", "");
      const data = JSON.parse(lines[1]?.replace("data: ", "") || "{}");

      switch (eventType) {
        case "token":
          appendToResponse(data.text);
          break;
        case "tool_call":
          showToolCallIndicator(data.tool);
          break;
        case "done":
          finalizeResponse();
          break;
      }
    }
  }
}

WebSocket

WebSockets provide bidirectional communication. Both client and server can send messages at any time. Use WebSockets when you need bidirectional streaming — the user can send additional messages or interrupt the agent while it is still responding.

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/agent/chat")
async def websocket_agent(ws: WebSocket):
    await ws.accept()
    session = AgentSession()

    try:
        while True:
            # Receive user message
            data = await ws.receive_json()

            if data["type"] == "message":
                # Stream agent response
                async for event in session.process_message(data["content"]):
                    await ws.send_json(event)

            elif data["type"] == "interrupt":
                await session.cancel_current()
                await ws.send_json({"type": "interrupted"})

    except WebSocketDisconnect:
        await session.cleanup()

When to Use Which

| Feature | SSE | WebSocket |
| --- | --- | --- |
| Direction | Server to client only | Bidirectional |
| Protocol | HTTP | Custom (upgrade from HTTP) |
| Reconnection | Automatic | Manual implementation |
| Proxy compatibility | Excellent | Requires configuration |
| Barge-in / interrupt | Not native | Natural fit |
| Load balancer | Standard HTTP | Sticky sessions needed |
| Best for | Simple chat agents | Voice agents, interactive agents |

For most text-based chat agents, SSE is the better choice. It is simpler, more reliable through infrastructure, and sufficient for the server-to-client streaming pattern. Use WebSockets when you need bidirectional communication, such as voice agents where the user can interrupt or interactive agents where the user sends incremental input.

Token-Level Streaming from LLM APIs

Modern LLM APIs support streaming natively. Here is how to handle streams from the major providers.

OpenAI Streaming

async def stream_openai(messages: list[dict]):
    stream = await openai_client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield {"type": "token", "text": delta.content}
        if delta.tool_calls:
            for tc in delta.tool_calls:
                yield {
                    "type": "tool_call_delta",
                    "index": tc.index,
                    "id": tc.id,
                    "name": tc.function.name if tc.function.name else None,
                    "arguments_delta": tc.function.arguments,
                }

        if chunk.choices[0].finish_reason:
            yield {
                "type": "finish",
                "reason": chunk.choices[0].finish_reason,
            }

Anthropic Streaming

async def stream_anthropic(messages: list[dict], system: str):
    async with anthropic_client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system=system,
        messages=messages,
    ) as stream:
        async for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    yield {"type": "token", "text": event.delta.text}
                elif event.delta.type == "input_json_delta":
                    yield {
                        "type": "tool_input_delta",
                        "partial_json": event.delta.partial_json,
                    }
            elif event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    yield {
                        "type": "tool_call_start",
                        "tool_name": event.content_block.name,
                        "tool_id": event.content_block.id,
                    }
            elif event.type == "message_stop":
                yield {"type": "finish"}

Tool Call Streaming

When an agent calls a tool, the user should see the tool invocation in real-time, not just the final result. This builds trust and helps users understand what the agent is doing.


Streaming Tool Call Lifecycle

class StreamingToolExecutor:
    async def execute_and_stream(
        self,
        tool_name: str,
        tool_args: dict,
        stream_callback,
    ):
        # 1. Signal tool call start
        await stream_callback({
            "type": "tool_start",
            "tool": tool_name,
            "args": tool_args,
        })

        # 2. Execute tool
        try:
            result = await self.tools[tool_name](**tool_args)

            # 3. Signal tool result
            await stream_callback({
                "type": "tool_result",
                "tool": tool_name,
                "result": result,
                "success": True,
            })
            return result

        except Exception as e:
            await stream_callback({
                "type": "tool_error",
                "tool": tool_name,
                "error": str(e),
            })
            raise

Progressive Tool Results

Some tools can stream partial results. A database query can stream rows as they arrive rather than waiting for the full result set. A web search can stream individual results as they are fetched.

async def search_knowledge_base_streaming(
    query: str,
    stream_callback,
):
    embedding = await generate_embedding(query)
    results = await vector_store.query(vector=embedding, top_k=10)

    for i, result in enumerate(results.matches):
        await stream_callback({
            "type": "search_result",
            "index": i,
            "title": result.metadata["title"],
            "snippet": result.metadata["content"][:200],
            "score": result.score,
        })
        # Small delay for visual effect on client
        await asyncio.sleep(0.05)

Multi-Agent Stream Merging

In multi-agent architectures, multiple agents may generate output that needs to be streamed to the user as a coherent experience. A supervisor agent might dispatch work to specialist agents and merge their responses.

class MultiAgentStreamMerger:
    def __init__(self):
        self.output_queue: asyncio.Queue = asyncio.Queue()

    async def add_agent_stream(
        self,
        agent_id: str,
        stream,
    ):
        async for event in stream:
            await self.output_queue.put({
                "agent_id": agent_id,
                **event,
            })
        await self.output_queue.put({
            "agent_id": agent_id,
            "type": "agent_done",
        })

    async def merged_stream(self, agent_streams: dict):
        # agent_streams maps agent_id -> async iterable of event dicts
        # Start all agent streams concurrently
        tasks = [
            asyncio.create_task(
                self.add_agent_stream(agent_id, stream)
            )
            for agent_id, stream in agent_streams.items()
        ]

        active_agents = set(agent_streams.keys())

        while active_agents:
            event = await self.output_queue.get()

            if event["type"] == "agent_done":
                active_agents.discard(event["agent_id"])
            else:
                yield event

        # Ensure all tasks complete
        await asyncio.gather(*tasks)

Client-Side Rendering of Streamed Content

Rendering streamed tokens requires handling partial markdown, code blocks, and structured content that arrives incrementally.

Incremental Markdown Rendering

class StreamRenderer {
  constructor(containerElement) {
    this.container = containerElement;
    this.buffer = "";
    this.renderedLength = 0;
  }

  appendToken(text) {
    this.buffer += text;
    this.render();
  }

  render() {
    // Re-parse the full buffer on each token; trailing incomplete
    // markdown renders as plain text until it completes
    const html = marked.parse(this.buffer, { breaks: true });
    this.container.innerHTML = html;

    // Auto-scroll to bottom
    this.container.scrollTop = this.container.scrollHeight;
  }

  appendToolCall(toolName, args) {
    const toolEl = document.createElement("div");
    toolEl.className = "tool-call-indicator";
    toolEl.innerHTML =
      '<span class="tool-icon">&#9881;</span> ' +
      "Calling <strong>" + toolName + "</strong>...";
    this.container.appendChild(toolEl);
  }

  appendToolResult(toolName, result) {
    const resultEl = document.createElement("div");
    resultEl.className = "tool-result";
    resultEl.innerHTML =
      '<span class="check-icon">&#10003;</span> ' +
      "<strong>" + toolName + "</strong> completed";
    this.container.appendChild(resultEl);
  }
}

Backpressure Handling

Backpressure occurs when the server produces data faster than the client can consume it. Without backpressure handling, buffers grow unbounded and the server runs out of memory.

Server-Side Backpressure

class BackpressureAwareStream:
    def __init__(self, max_buffer_size: int = 100):
        self.queue = asyncio.Queue(maxsize=max_buffer_size)
        self.dropped_count = 0

    async def push(self, event: dict):
        try:
            self.queue.put_nowait(event)
        except asyncio.QueueFull:
            # Non-critical events: evict the oldest queued event to make room
            if event.get("type") in ("token", "status"):
                try:
                    self.queue.get_nowait()  # Remove oldest
                    self.queue.put_nowait(event)
                    self.dropped_count += 1
                except asyncio.QueueEmpty:
                    pass
            else:
                # Critical events (tool_result, finish) always wait
                await self.queue.put(event)

    async def consume(self):
        while True:
            event = await self.queue.get()
            yield event
            if event.get("type") == "finish":
                break

Client-Side Flow Control

On the client, implement flow control by pausing the stream reader when the render queue is too large and resuming when it drains.

class FlowControlledReader {
  constructor(reader, renderer, highWaterMark = 50) {
    this.reader = reader;
    this.renderer = renderer;
    this.highWaterMark = highWaterMark;
    this.pendingRenders = 0;
  }

  async start() {
    const decoder = new TextDecoder();
    while (true) {
      // Pause if too many pending renders
      while (this.pendingRenders > this.highWaterMark) {
        await new Promise((r) => setTimeout(r, 10));
      }

      const { done, value } = await this.reader.read();
      if (done) break;

      this.pendingRenders++;
      requestAnimationFrame(() => {
        this.renderer.appendToken(decoder.decode(value));
        this.pendingRenders--;
      });
    }
  }
}

Production Streaming Considerations

Nginx and reverse proxy configuration: Most reverse proxies buffer responses by default, which defeats streaming. Disable response buffering with X-Accel-Buffering: no for Nginx, or configure proxy_buffering off at the location level.
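A minimal Nginx location sketch for the SSE endpoint (the path and upstream name are illustrative):

```nginx
location /api/agent/ {
    proxy_pass http://agent_backend;
    proxy_buffering off;           # stream each SSE frame immediately
    proxy_cache off;
    proxy_http_version 1.1;
    proxy_set_header Connection "";
    proxy_read_timeout 300s;       # allow long-lived agent responses
}
```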

Connection timeouts: Streaming connections can be long-lived. Configure your load balancer and reverse proxy timeouts to accommodate the longest expected agent response time, plus a margin. A typical setting is 120-300 seconds.

Error recovery mid-stream: If the LLM API connection drops mid-response, the client should see an error indicator and have the option to retry. Include stream IDs so the client can request resumption from a specific point.
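One way to support resumption is to attach a monotonically increasing `id:` field to each SSE frame and keep a bounded replay buffer per stream, so a reconnecting client can send the last id it saw. A hedged sketch (the `ReplayBuffer` name and buffer size are illustrative):

```python
import json
from collections import deque

class ReplayBuffer:
    """Keep the last N events of a stream so a reconnecting client can resume."""

    def __init__(self, max_events: int = 500):
        self.events: deque = deque(maxlen=max_events)
        self.next_id = 0

    def append(self, event: dict) -> str:
        """Store the event and return it as an SSE frame with an id field."""
        event_id = self.next_id
        self.next_id += 1
        self.events.append((event_id, event))
        return f"id: {event_id}\nevent: {event['type']}\ndata: {json.dumps(event)}\n\n"

    def replay_after(self, last_event_id: int) -> list:
        """Events the client missed, based on the id it last acknowledged."""
        return [e for i, e in self.events if i > last_event_id]
```

The browser's native EventSource sends a `Last-Event-ID` header on reconnect automatically; with `fetch`-based streaming the client has to send the id itself.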

Monitoring: Track time-to-first-token, total stream duration, tokens-per-second throughput, client disconnection rates, and backpressure events. These metrics reveal both LLM performance issues and client-side rendering bottlenecks.

Frequently Asked Questions

What is the time-to-first-token for streaming LLM responses?

Time-to-first-token (TTFT) varies by model and provider. GPT-4o typically achieves 200-500ms TTFT, Claude averages 300-700ms, and Gemini is typically 200-400ms. These times can increase under load. TTFT is the most important latency metric for user experience because it determines how quickly the user sees the agent begin responding.

Should I use SSE or WebSocket for a chat-based agent?

For most chat agents, SSE is the better choice. It is simpler to implement, works reliably through proxies and load balancers without special configuration, handles reconnection automatically, and provides everything you need for server-to-client streaming. Use WebSocket only if you need bidirectional communication — for example, if the user can interrupt the agent mid-response or if you are building a voice interface.

How do you handle streaming when the agent makes tool calls mid-response?

The stream pauses text tokens while the agent executes a tool call. Send a tool_call_start event so the client can show a loading indicator, then send the tool_result event when the tool completes, followed by resuming text token streaming as the agent incorporates the tool result into its response. The client renders this as: partial text, then a tool call indicator, then more text.

What happens if the client disconnects during a stream?

On the server side, detect client disconnection through write failures or connection close events. When detected, cancel the ongoing LLM API call to stop incurring costs and free server resources. Clean up any session state. If the user reconnects and sends the same request, generate a fresh response rather than trying to resume the interrupted stream.

How do you test streaming agent endpoints?

Test at multiple levels: unit test the stream generation logic by collecting all emitted events into a list and asserting their types and order, integration test the HTTP endpoint by reading the SSE stream and validating event format, and load test with many concurrent streaming connections to verify backpressure handling and server stability under load. Tools like k6 and Artillery support SSE and WebSocket load testing.
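The unit-test level can look like this, assuming a stream that yields the normalized event dicts used throughout this guide (the fake stream is a stand-in for the real agent):

```python
import asyncio

async def collect_events(stream) -> list:
    """Drain an async event stream into a list for assertions."""
    return [event async for event in stream]

async def fake_agent_stream():
    # Stand-in for the real agent stream under test
    yield {"type": "token", "text": "Hello"}
    yield {"type": "tool_call_start", "tool_name": "search"}
    yield {"type": "tool_result", "tool": "search", "success": True}
    yield {"type": "finish"}

def test_stream_event_order():
    events = asyncio.run(collect_events(fake_agent_stream()))
    types = [e["type"] for e in events]
    assert types[0] == "token"
    assert types.index("tool_call_start") < types.index("tool_result")
    assert types[-1] == "finish"
```

Asserting on event types and ordering rather than exact token text keeps these tests stable as prompts and models change.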

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
