
Streaming AI Agent Responses with FastAPI: SSE and StreamingResponse

Implement real-time token-by-token streaming from AI agents using FastAPI's StreamingResponse and Server-Sent Events. Covers async generators, error handling during streams, and JavaScript client integration.

Why Streaming Matters for AI Agents

When an AI agent takes 5 to 15 seconds to generate a complete response, making the user stare at a loading spinner destroys the experience. Streaming sends tokens to the client as they are generated, so the user sees the response forming in real time. This is the same pattern that powers ChatGPT, Claude, and every modern AI chat interface.

FastAPI provides two mechanisms for streaming: StreamingResponse for raw HTTP streaming and Server-Sent Events (SSE) for structured event streams. For AI agent backends, SSE is usually the better choice because it provides built-in reconnection, event typing, and a clean browser API via EventSource.

Basic StreamingResponse with an Async Generator

The simplest streaming approach wraps an async generator that yields chunks from your LLM:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def generate_stream(prompt: str):
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        generate_stream(request.message),
        media_type="text/plain",
    )

This works, but it has limitations. The client has no structured way to know when the stream ends, whether an error occurred mid-stream, or to distinguish between different types of events like tokens versus tool calls.

Server-Sent Events for Structured Streaming

SSE solves these problems by sending typed, newline-delimited events. Install the sse-starlette package, which integrates cleanly with FastAPI:

pip install sse-starlette

Now build a proper SSE endpoint. Here LLMService is a thin wrapper around your LLM client, injected through FastAPI's dependency system:


import json
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

async def agent_event_stream(
    message: str,
    session_id: str,
    llm_service: LLMService,
):
    try:
        # Send a start event
        yield {
            "event": "start",
            "data": json.dumps({"session_id": session_id}),
        }

        # Stream LLM tokens
        full_response = ""
        async for token in llm_service.stream_generate(message):
            full_response += token
            yield {
                "event": "token",
                "data": json.dumps({"content": token}),
            }

        # Send completion event with metadata (a whitespace split is
        # only a rough approximation of the true token count)
        yield {
            "event": "done",
            "data": json.dumps({
                "total_tokens": len(full_response.split()),
                "session_id": session_id,
            }),
        }

    except Exception as e:
        yield {
            "event": "error",
            "data": json.dumps({"message": str(e)}),
        }

@router.post("/chat/stream")
async def stream_agent_response(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service),
):
    return EventSourceResponse(
        agent_event_stream(
            message=request.message,
            session_id=request.session_id,
            llm_service=llm_service,
        )
    )

Each event has a typed event field and a JSON data payload. The client can handle token, done, and error events differently.
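For reference, the wire format that EventSourceResponse produces from each yielded dict looks roughly like this. The snippet below is a simplified sketch of that serialization, not sse-starlette's actual implementation, which also handles event ids, retry hints, and multi-line data:

```python
import json

def format_sse_event(event: str, data: dict) -> str:
    # An SSE event is an "event:" line plus a "data:" line,
    # terminated by a blank line.
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

wire = format_sse_event("token", {"content": "Hi"})
# wire == 'event: token\ndata: {"content": "Hi"}\n\n'
```

The blank line is what delimits events, which is why the client parser below splits on newlines and watches for `event:` and `data:` prefixes.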

Streaming Tool Call Results

AI agents often invoke tools mid-response. You can stream tool execution as separate events so the frontend can render tool status indicators:

async def agent_with_tools_stream(message: str, agent: Agent):
    yield {"event": "start", "data": "{}"}

    async for event in agent.run_stream(message):
        if event.type == "token":
            yield {
                "event": "token",
                "data": json.dumps({"content": event.content}),
            }
        elif event.type == "tool_call":
            yield {
                "event": "tool_call",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "args": event.arguments,
                }),
            }
        elif event.type == "tool_result":
            yield {
                "event": "tool_result",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "result": event.result,
                }),
            }

    yield {"event": "done", "data": "{}"}

JavaScript Client Integration

On the frontend, the native EventSource API only supports GET, so for POST-based SSE read the response body directly with the fetch API:

async function streamChat(message) {
  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: "abc123" }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  let eventType = "message";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // Stream-decode and buffer, because a chunk can end mid-line
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop(); // keep the last, possibly partial, line

    for (const line of lines) {
      if (line.startsWith("event: ")) {
        eventType = line.slice(7).trim();
      } else if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (eventType === "token") appendToChat(data.content);
        else if (eventType === "error") showError(data.message);
      }
    }
  }
}

Error Handling in Streams

Errors during streaming require special handling because the HTTP status code has already been sent as 200. You cannot change it mid-stream. Instead, send an error event and close the stream:

async def safe_stream(message: str, llm: LLMService):
    try:
        async for token in llm.stream_generate(message):
            yield {"event": "token", "data": json.dumps({"content": token})}
    except openai.RateLimitError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "rate_limited",
                "message": "Too many requests. Please retry.",
                "retry_after": 30,
            }),
        }
    except openai.APIError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "llm_error",
                "message": "Agent encountered an error.",
            }),
        }

FAQ

Can I use SSE with POST requests?

Standard EventSource in the browser only supports GET requests. For POST-based SSE, use the fetch API with a ReadableStream reader as shown above, or use a library like @microsoft/fetch-event-source which provides an EventSource-like API for POST requests. Most AI chat interfaces use POST because you need to send the conversation history in the request body.

How do I handle client disconnections during streaming?

FastAPI and Starlette detect client disconnections automatically. When the client closes the connection, the async generator receives a GeneratorExit or CancelledError exception. You can catch this to clean up resources. The sse-starlette library also supports a ping parameter that sends periodic keepalive messages to detect dead connections early.
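A minimal sketch of that cleanup pattern, where llm_service and the cleanup callback are placeholders for your own service layer:

```python
import asyncio
import json

async def stream_with_cleanup(message: str, llm_service, cleanup):
    """Yield SSE token events; run cleanup if the client disconnects."""
    try:
        async for token in llm_service.stream_generate(message):
            yield {"event": "token", "data": json.dumps({"content": token})}
    except asyncio.CancelledError:
        # Thrown into the generator when the client drops the connection.
        await cleanup()
        raise
```

Pair this with `EventSourceResponse(stream_with_cleanup(...), ping=15)` so keepalive comments go out every 15 seconds and dead connections surface quickly.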

Should I buffer the full response before saving it to the database?

Yes. Accumulate tokens in a string variable as you stream them. After the stream completes successfully, save the full response to your database in the done event handler. Do not write individual tokens to the database as they arrive since that would create excessive database writes for no benefit.
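A sketch of that accumulate-then-persist flow, where save_message stands in for your own database write:

```python
import json

async def stream_and_persist(message, session_id, llm_service, save_message):
    full_response = ""
    async for token in llm_service.stream_generate(message):
        full_response += token
        yield {"event": "token", "data": json.dumps({"content": token})}
    # One database write after the stream completes, not one per token.
    await save_message(session_id, full_response)
    yield {"event": "done", "data": json.dumps({"session_id": session_id})}
```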



CallSphere Team

Expert insights on AI voice agents and customer communication automation.
