Streaming AI Agent Responses with FastAPI: SSE and StreamingResponse
Implement real-time token-by-token streaming from AI agents using FastAPI's StreamingResponse and Server-Sent Events. Covers async generators, error handling during streams, and JavaScript client integration.
Why Streaming Matters for AI Agents
When an AI agent takes 5 to 15 seconds to generate a complete response, making the user stare at a loading spinner destroys the experience. Streaming sends tokens to the client as they are generated, so the user sees the response forming in real time. This is the same pattern that powers ChatGPT, Claude, and every modern AI chat interface.
FastAPI provides two mechanisms for streaming: StreamingResponse for raw HTTP streaming and Server-Sent Events (SSE) for structured event streams. For AI agent backends, SSE is usually the better choice because it provides built-in reconnection, event typing, and a clean browser API via EventSource.
Basic StreamingResponse with an Async Generator
The simplest streaming approach wraps an async generator that yields chunks from your LLM:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai

app = FastAPI()

class ChatRequest(BaseModel):
    message: str

async def generate_stream(prompt: str):
    client = openai.AsyncOpenAI()
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield delta.content

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        generate_stream(request.message),
        media_type="text/plain",
    )
This works, but it has limitations: the client has no structured way to know when the stream ends or whether an error occurred mid-stream, and it cannot distinguish between different types of events, such as tokens versus tool calls.
Server-Sent Events for Structured Streaming
SSE solves these problems by sending typed, newline-delimited events. Install the sse-starlette package, which integrates cleanly with FastAPI:
pip install sse-starlette
Now build a proper SSE endpoint:
import json
from fastapi import APIRouter, Depends
from sse_starlette.sse import EventSourceResponse

router = APIRouter()

async def agent_event_stream(
    message: str,
    session_id: str,
    llm_service: LLMService,
):
    try:
        # Send a start event
        yield {
            "event": "start",
            "data": json.dumps({"session_id": session_id}),
        }

        # Stream LLM tokens
        full_response = ""
        async for token in llm_service.stream_generate(message):
            full_response += token
            yield {
                "event": "token",
                "data": json.dumps({"content": token}),
            }

        # Send completion event with metadata
        yield {
            "event": "done",
            "data": json.dumps({
                "total_tokens": len(full_response.split()),
                "session_id": session_id,
            }),
        }
    except Exception as e:
        yield {
            "event": "error",
            "data": json.dumps({"message": str(e)}),
        }

@router.post("/chat/stream")
async def stream_agent_response(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service),
):
    return EventSourceResponse(
        agent_event_stream(
            message=request.message,
            session_id=request.session_id,
            llm_service=llm_service,
        )
    )
Each event has a typed event field and a JSON data payload. The client can handle token, done, and error events differently.
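On the wire, each yielded dict becomes a standard SSE frame: an `event:` line naming the type, a `data:` line carrying the JSON payload, and a blank line terminating the event. A minimal sketch of that framing (`to_sse_frame` is illustrative; sse-starlette does this serialization for you):

```python
import json

def to_sse_frame(event: dict) -> str:
    # SSE frame: "event: <type>\ndata: <payload>\n\n"
    # The trailing blank line tells the client the event is complete.
    return f"event: {event['event']}\ndata: {event['data']}\n\n"

frame = to_sse_frame({
    "event": "token",
    "data": json.dumps({"content": "Hello"}),
})
# frame == 'event: token\ndata: {"content": "Hello"}\n\n'
```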
Streaming Tool Call Results
AI agents often invoke tools mid-response. You can stream tool execution as separate events so the frontend can render tool status indicators:
async def agent_with_tools_stream(message: str, agent: Agent):
    yield {"event": "start", "data": "{}"}

    async for event in agent.run_stream(message):
        if event.type == "token":
            yield {
                "event": "token",
                "data": json.dumps({"content": event.content}),
            }
        elif event.type == "tool_call":
            yield {
                "event": "tool_call",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "args": event.arguments,
                }),
            }
        elif event.type == "tool_result":
            yield {
                "event": "tool_result",
                "data": json.dumps({
                    "tool": event.tool_name,
                    "result": event.result,
                }),
            }

    yield {"event": "done", "data": "{}"}
JavaScript Client Integration
On the frontend, the browser's native EventSource API only supports GET requests, so for this POST endpoint read the response stream with the fetch API instead:
async function streamChat(message) {
  const response = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message, session_id: "abc123" }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters intact across chunk boundaries
    buffer += decoder.decode(value, { stream: true });
    // A network chunk can end mid-event, so hold back the last partial line
    const lines = buffer.split("\n");
    buffer = lines.pop();
    for (const line of lines) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (data.content) appendToChat(data.content);
      }
    }
  }
}
Error Handling in Streams
Errors during streaming require special handling because the HTTP status code has already been sent as 200. You cannot change it mid-stream. Instead, send an error event and close the stream:
async def safe_stream(message: str, llm: LLMService):
    try:
        async for token in llm.stream_generate(message):
            yield {"event": "token", "data": json.dumps({"content": token})}
    except openai.RateLimitError:
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "rate_limited",
                "message": "Too many requests. Please retry.",
                "retry_after": 30,
            }),
        }
    except openai.APIError:
        # Don't leak internal error details to the client
        yield {
            "event": "error",
            "data": json.dumps({
                "code": "llm_error",
                "message": "Agent encountered an error.",
            }),
        }
FAQ
Can I use SSE with POST requests?
Standard EventSource in the browser only supports GET requests. For POST-based SSE, use the fetch API with a ReadableStream reader as shown above, or use a library like @microsoft/fetch-event-source which provides an EventSource-like API for POST requests. Most AI chat interfaces use POST because you need to send the conversation history in the request body.
How do I handle client disconnections during streaming?
FastAPI and Starlette detect client disconnections automatically. When the client closes the connection, the async generator receives a GeneratorExit or CancelledError exception. You can catch this to clean up resources. The sse-starlette library also supports a ping parameter that sends periodic keepalive messages to detect dead connections early.
Should I buffer the full response before saving it to the database?
Yes. Accumulate tokens in a string variable as you stream them. After the stream completes successfully, save the full response to your database in the done event handler. Do not write individual tokens to the database as they arrive since that would create excessive database writes for no benefit.
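A minimal sketch of that accumulate-then-save pattern, with stub LLM and DB classes standing in for real services (FakeLLM, FakeDB, and save_message are illustrative names, not a real API):

```python
import asyncio
import json

class FakeLLM:
    """Stub for a streaming LLM client."""
    async def stream_generate(self, message: str):
        for token in ["Hello", " ", "world"]:
            yield token

class FakeDB:
    """Stub repository; a real one would issue a single INSERT."""
    def __init__(self):
        self.saved = []

    async def save_message(self, prompt: str, response: str):
        self.saved.append((prompt, response))

async def stream_and_persist(message, llm, db):
    """Accumulate tokens in memory while streaming, persist once at the end."""
    full_response = ""
    async for token in llm.stream_generate(message):
        full_response += token
        yield {"event": "token", "data": json.dumps({"content": token})}
    # One database write per response, not one per token
    await db.save_message(message, full_response)
    yield {"event": "done", "data": json.dumps({"chars": len(full_response)})}

async def drain(gen):
    # Consume the generator the way EventSourceResponse would
    return [event async for event in gen]

db = FakeDB()
events = asyncio.run(drain(stream_and_persist("hi", FakeLLM(), db)))
```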
CallSphere Team
Expert insights on AI voice agents and customer communication automation.