Streaming Responses from OpenAI: Real-Time Token-by-Token Output
Learn how to stream OpenAI responses token-by-token using the Python SDK, implement async streaming for web applications, and display incremental results to users.
Why Streaming Matters
When a model generates a long response, the standard (non-streaming) API makes you wait for the entire completion before returning anything. For a 500-token response, that can mean several seconds of silence before any text appears. Streaming changes this by delivering tokens as they are generated, giving users the familiar "typing" experience seen in ChatGPT.
Streaming is essential for chatbots, real-time UIs, and any application where perceived latency matters.
Basic Synchronous Streaming
Enable streaming by setting stream=True:
```python
from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Write a short guide to Python decorators."},
    ],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
print()  # newline after streaming completes
```
Each chunk is a ChatCompletionChunk object. The delta field contains the incremental content — usually one or a few tokens per chunk. The first chunk often has the role field set, and subsequent chunks contain content.
Async Streaming
For web applications built with FastAPI, Django, or similar frameworks, async streaming is the right approach:
```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    full_response = ""
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            full_response += delta.content
            print(delta.content, end="", flush=True)
    print()
    return full_response

result = asyncio.run(stream_response("Explain async generators in Python."))
```
The async client uses async for to iterate over chunks without blocking the event loop, which means your server can handle other requests concurrently during generation.
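To make the concurrency benefit concrete, here is a minimal, self-contained sketch. The `fake_stream` async generator is a stand-in for a live OpenAI stream (so the example runs without an API key); `asyncio.gather` consumes two streams concurrently, the same way a server overlaps in-flight completions:

```python
import asyncio

async def fake_stream(tokens):
    # Stand-in for an OpenAI async stream: yields tokens with simulated latency.
    for token in tokens:
        await asyncio.sleep(0.01)  # mimic network delay between chunks
        yield token

async def consume(name, tokens):
    # Accumulate one stream's tokens into a full response string.
    parts = []
    async for token in fake_stream(tokens):
        parts.append(token)
    return name, "".join(parts)

async def main():
    # Both streams are consumed concurrently; total wall time is roughly
    # one stream's time, not the sum of both.
    results = await asyncio.gather(
        consume("a", ["Hello", ", ", "world"]),
        consume("b", ["Async ", "rocks"]),
    )
    return dict(results)

print(asyncio.run(main()))  # {'a': 'Hello, world', 'b': 'Async rocks'}
```

The same `asyncio.gather` pattern applies unchanged when each `consume` wraps a real `client.chat.completions.create(..., stream=True)` call.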
Building an SSE Endpoint with FastAPI
Server-Sent Events (SSE) are the standard way to push streaming responses to a browser:
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def generate_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            yield f"data: {delta.content}\n\n"
    yield "data: [DONE]\n\n"

@app.get("/stream")
async def stream_endpoint(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
    )
```
On the frontend, consume this with the EventSource API or a fetch-based SSE reader. One caveat: a raw newline inside a token would break SSE framing, since blank lines delimit events. Production code typically JSON-encodes each payload (e.g. data: {"token": "..."}) and decodes it on the client.
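If the consumer is another Python service rather than a browser, the SSE parsing itself is simple. A minimal sketch of the `data:`-line parser (the `raw` list and the commented `requests` usage are illustrative; a real client would feed lines from a streaming HTTP response):

```python
def parse_sse(lines):
    """Yield the payload of each SSE 'data:' line, stopping at [DONE]."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # skip blank keep-alive lines, comments, other fields
        payload = line[len("data: "):]
        if payload == "[DONE]":
            return  # sentinel emitted by the endpoint above
        yield payload

# With a live endpoint you would supply the line iterator from an HTTP
# client, e.g. (hypothetical URL):
#   with requests.get("http://localhost:8000/stream", stream=True) as r:
#       for token in parse_sse(r.iter_lines(decode_unicode=True)): ...
raw = ["data: Hello", "", "data: world", "", "data: [DONE]", "data: ignored"]
print(list(parse_sse(raw)))  # ['Hello', 'world']
```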
Collecting the Full Response While Streaming
A common pattern is to display tokens in real-time while also building up the complete response for storage or further processing:
```python
from openai import OpenAI

client = OpenAI()

def stream_and_collect(messages: list[dict]) -> str:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )
    collected_content = []
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)
    print()
    return "".join(collected_content)

full_text = stream_and_collect([
    {"role": "user", "content": "Summarize the Python GIL in 3 sentences."},
])
# full_text now contains the entire response
```
Handling Stream Interruptions
Network issues can interrupt a stream mid-response. Wrap your streaming code in proper error handling:
```python
from openai import OpenAI, APIConnectionError, APITimeoutError

client = OpenAI()

def safe_stream(messages: list[dict]) -> str:
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True,
        )
        parts = []
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                parts.append(delta.content)
        return "".join(parts)
    # APITimeoutError subclasses APIConnectionError, so catch it first;
    # otherwise the timeout branch is unreachable.
    except APITimeoutError:
        return "Request timed out. Please retry."
    except APIConnectionError:
        return "Connection lost during streaming. Please retry."
```
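Returning an error message is fine for a demo, but a production handler usually retries. A minimal retry-with-backoff sketch; it uses the built-in ConnectionError and a hypothetical `flaky` stand-in so it runs standalone, where real code would catch the SDK's APIConnectionError around `safe_stream`:

```python
import time

def with_retries(fn, attempts=3, base_delay=0.01):
    """Call fn, retrying on connection errors with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the error to the caller
            time.sleep(base_delay * 2 ** attempt)

calls = {"n": 0}

def flaky():
    # Simulates a stream dropping twice before a successful completion.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("dropped mid-stream")
    return "full response text"

print(with_retries(flaky))  # succeeds on the third attempt
```

Note that retrying a stream restarts generation from the beginning; tokens already shown to the user may differ on the retry, so either buffer output until success or clearly reset the UI.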
FAQ
Does streaming cost more tokens than non-streaming?
No. Token usage is identical whether you stream or not. The only difference is how the response is delivered to your client.
Can I use streaming with function calling?
Yes. When the model decides to call a function, the tool call arguments are streamed incrementally in the delta.tool_calls field. You accumulate the argument string across chunks and parse it once complete.
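The accumulation logic can be sketched as follows. The SimpleNamespace objects are mocks shaped like `chunk.choices[0].delta` so the example runs offline, and it is simplified to a single tool call (real `delta.tool_calls` entries carry an index for parallel calls):

```python
import json
from types import SimpleNamespace as NS

def accumulate_tool_call(deltas):
    """Accumulate a streamed tool call's name and argument JSON across deltas."""
    name, arg_parts = None, []
    for delta in deltas:
        for call in delta.tool_calls or []:
            if call.function.name:
                name = call.function.name  # usually arrives in the first chunk
            if call.function.arguments:
                arg_parts.append(call.function.arguments)
    # Parse only once the stream is complete -- partial JSON won't parse.
    return name, json.loads("".join(arg_parts))

# Mock deltas: the argument string arrives as JSON fragments.
deltas = [
    NS(tool_calls=[NS(function=NS(name="get_weather", arguments=""))]),
    NS(tool_calls=[NS(function=NS(name=None, arguments='{"city": '))]),
    NS(tool_calls=[NS(function=NS(name=None, arguments='"Paris"}'))]),
]
print(accumulate_tool_call(deltas))  # ('get_weather', {'city': 'Paris'})
```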
How do I know when the stream is finished?
The stream ends when iteration completes. The last chunk will have a finish_reason set on choices[0] (e.g., stop or tool_calls). If you are sending SSE, emit a [DONE] event as a signal to the frontend.
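A small sketch of checking finish_reason while collecting content, again with SimpleNamespace mocks standing in for ChatCompletionChunk objects:

```python
from types import SimpleNamespace as NS

def stream_until_done(chunks):
    """Collect streamed content and report why the stream ended."""
    parts, finish_reason = [], None
    for chunk in chunks:
        choice = chunk.choices[0]
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason:
            finish_reason = choice.finish_reason  # e.g. 'stop' or 'tool_calls'
    return "".join(parts), finish_reason

chunks = [
    NS(choices=[NS(delta=NS(content="Hi"), finish_reason=None)]),
    NS(choices=[NS(delta=NS(content=" there"), finish_reason=None)]),
    NS(choices=[NS(delta=NS(content=None), finish_reason="stop")]),
]
print(stream_until_done(chunks))  # ('Hi there', 'stop')
```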
#OpenAI #Streaming #ServerSentEvents #AsyncPython #RealTime #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.