Skip to content
Learn Agentic AI10 min read1 views

Server-Sent Events for Agent Streaming: Pushing Token-by-Token Responses to Clients

Implement Server-Sent Events (SSE) to stream AI agent responses token by token to browser clients using FastAPI StreamingResponse, EventSource API, and proper reconnection handling.

Why SSE for Agent Streaming

When a user sends a message to an AI agent, waiting 10-30 seconds for a complete response creates a terrible experience. Streaming tokens as they are generated makes the agent feel responsive and intelligent. Server-Sent Events (SSE) is the simplest protocol for this — it is HTTP-based, works through proxies and firewalls, auto-reconnects on failure, and requires zero client-side libraries.

Unlike WebSockets, SSE is unidirectional: the server pushes events to the client. This is a perfect fit for the most common agent pattern — the user sends a message (via a regular POST), and the server streams back the response token by token.

The SSE Protocol

SSE follows a simple text format. Each event is a block of lines separated by a blank line:

event: token
data: {"content": "Hello"}

event: token
data: {"content": " world"}

event: done
data: {"session_id": "abc-123", "total_tokens": 47}

The event: field names the event type. The data: field contains the payload. Clients parse these automatically with the browser EventSource API.

FastAPI Streaming Endpoint

Use StreamingResponse with an async generator to produce SSE events:

# app/routes/stream.py
from fastapi import APIRouter, Request
from fastapi.responses import StreamingResponse
from agents import Agent, Runner
import json

router = APIRouter(prefix="/api/v1/agent", tags=["Streaming"])

agent = Agent(
    name="assistant",
    instructions="You are a helpful assistant.",
    model="gpt-4o",
)

async def event_generator(message: str):
    """Yield SSE-formatted events as the agent generates tokens."""
    result = Runner.run_streamed(agent, message)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            delta = event.data
            if hasattr(delta, "delta") and hasattr(delta.delta, "content"):
                text = delta.delta.content
                if text:
                    yield f"event: token\ndata: {json.dumps({'content': text})}\n\n"

    yield f"event: done\ndata: {json.dumps({'content': result.final_output})}\n\n"

@router.post("/stream")
async def stream_agent(request: Request):
    body = await request.json()
    message = body.get("message", "")

    return StreamingResponse(
        event_generator(message),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        },
    )

The X-Accel-Buffering: no header is critical when running behind nginx or similar reverse proxies — without it, the proxy buffers the entire response and delivers it at once, defeating the purpose of streaming.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Client-Side with EventSource

The browser EventSource API handles SSE natively, but it only supports GET requests. For POST-based streaming, use the fetch API with a stream reader:

# Using fetch for POST-based SSE (JavaScript)
# Note: This shows the client-side approach in pseudocode
#
# async function streamAgent(message) {
#   const response = await fetch("/api/v1/agent/stream", {
#     method: "POST",
#     headers: {"Content-Type": "application/json"},
#     body: JSON.stringify({message}),
#   });
#   const reader = response.body.getReader();
#   const decoder = new TextDecoder();
#   while (true) {
#     const {done, value} = await reader.read();
#     if (done) break;
#     const text = decoder.decode(value);
#     parseSSEChunks(text);
#   }
# }

Handling Backpressure

If the client reads slower than the server produces tokens, you need backpressure handling to avoid unbounded memory growth:

import asyncio

async def event_generator_with_backpressure(message: str):
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    done = asyncio.Event()

    async def producer():
        result = Runner.run_streamed(agent, message)
        async for event in result.stream_events():
            if event.type == "raw_response_event":
                delta = event.data
                if hasattr(delta, "delta") and hasattr(delta.delta, "content"):
                    text = delta.delta.content
                    if text:
                        await queue.put(text)  # Blocks when queue is full
        done.set()
        await queue.put(None)  # Sentinel

    asyncio.create_task(producer())

    while True:
        token = await queue.get()
        if token is None:
            break
        yield f"event: token\ndata: {json.dumps({'content': token})}\n\n"

    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

Adding Reconnection Support

SSE has built-in reconnection. Use the id: field so clients can resume from where they left off:

token_index = 0

async def event_generator_with_ids(message: str, last_id: int = 0):
    token_index = 0
    result = Runner.run_streamed(agent, message)

    async for event in result.stream_events():
        if event.type == "raw_response_event":
            delta = event.data
            if hasattr(delta, "delta") and hasattr(delta.delta, "content"):
                text = delta.delta.content
                if text:
                    token_index += 1
                    if token_index <= last_id:
                        continue  # Skip already-sent tokens
                    yield f"id: {token_index}\nevent: token\ndata: {json.dumps({'content': text})}\n\n"

    yield f"event: done\ndata: {json.dumps({'status': 'complete'})}\n\n"

When the connection drops, the browser EventSource sends a Last-Event-ID header on reconnect. Your endpoint reads this header and skips already-delivered tokens.

FAQ

When should I use SSE instead of WebSockets for AI agents?

Use SSE when the communication is primarily server-to-client — the most common agent pattern where the user sends a message and receives a streamed response. SSE is simpler to implement, works through all HTTP proxies, and auto-reconnects natively. Use WebSockets when you need true bidirectional communication, such as allowing users to interrupt or redirect the agent mid-generation.

How do I handle SSE through a load balancer?

Most load balancers support SSE out of the box since it is standard HTTP. Disable response buffering in your reverse proxy (nginx: proxy_buffering off;, AWS ALB: works by default). Set idle timeout on the load balancer higher than your maximum response time. Use sticky sessions if your agent maintains in-memory state across requests.

What is the maximum number of concurrent SSE connections a browser supports?

Browsers limit concurrent SSE connections to 6 per domain when using HTTP/1.1. With HTTP/2 the limit increases to 100 or more. If your application needs many simultaneous streams, use HTTP/2 or multiplex multiple agent streams over a single SSE connection with event type prefixes.


#SSE #Streaming #AIAgents #FastAPI #Frontend #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.