Streaming Responses from OpenAI: Real-Time Token-by-Token Output

Learn how to stream OpenAI responses token-by-token using the Python SDK, implement async streaming for web applications, and display incremental results to users.

Why Streaming Matters

When a model generates a long response, the standard (non-streaming) API makes you wait for the entire completion before returning anything. For a 500-token response, that can mean several seconds of silence before any text appears. Streaming changes this by delivering tokens as they are generated, giving users the familiar "typing" experience seen in ChatGPT.

Streaming is essential for chatbots, real-time UIs, and any application where perceived latency matters.

Basic Synchronous Streaming

Enable streaming by setting stream=True:

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Write a short guide to Python decorators."},
    ],
    stream=True,
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)

print()  # newline after streaming completes

Each chunk is a ChatCompletionChunk object. Its delta field holds only what is new since the previous chunk, usually one or a few tokens. The first chunk typically carries the role with empty content, later chunks carry content, and the final chunk has an empty delta with finish_reason set.
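To make the chunk sequence concrete without calling the API, here is a minimal sketch that rebuilds a message from stand-in objects. The fake_chunk helper and the sample tokens are hypothetical; only the attribute names (choices, delta, role, content, finish_reason) mirror what the SDK returns.

```python
from types import SimpleNamespace

def fake_chunk(role=None, content=None, finish_reason=None):
    """Stand-in for a ChatCompletionChunk; only the fields read below are modeled."""
    delta = SimpleNamespace(role=role, content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])

# A plausible sequence: role first, then content pieces, then the finish marker.
chunks = [
    fake_chunk(role="assistant", content=""),
    fake_chunk(content="Decorators"),
    fake_chunk(content=" wrap"),
    fake_chunk(content=" functions."),
    fake_chunk(finish_reason="stop"),
]

def assemble(chunks):
    """Return (role, text) rebuilt from the deltas."""
    role, pieces = None, []
    for chunk in chunks:
        delta = chunk.choices[0].delta
        if delta.role:
            role = delta.role
        if delta.content:  # skips both None and the empty string
            pieces.append(delta.content)
    return role, "".join(pieces)

role, text = assemble(chunks)
print(role, "->", text)
```

The truthiness check on delta.content is what lets the same loop pass over the role-only first chunk and the empty final chunk without special cases.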

Async Streaming

For web applications built on an async framework such as FastAPI (or Django's async views), use the SDK's async client so that waiting on the model does not block other requests:

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def stream_response(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )

    full_response = ""
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            full_response += delta.content
            print(delta.content, end="", flush=True)

    print()
    return full_response

result = asyncio.run(stream_response("Explain async generators in Python."))

The async client uses async for to iterate over chunks without blocking the event loop, which means your server can handle other requests concurrently during generation.
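The concurrency claim can be checked without network access. The sketch below swaps the SDK stream for a hypothetical fake_stream async generator and runs two consumers on one event loop; the names consume and main are illustrative only.

```python
import asyncio

async def fake_stream(tokens):
    """Hypothetical stand-in for the SDK's async stream; yields plain strings."""
    for token in tokens:
        await asyncio.sleep(0)  # hands control back to the event loop, like a network wait
        yield token

async def consume(name, tokens, log):
    """Read one stream to the end, recording which consumer saw each token."""
    parts = []
    async for token in fake_stream(tokens):
        log.append((name, token))
        parts.append(token)
    return "".join(parts)

async def main():
    log = []
    results = await asyncio.gather(
        consume("A", ["Hello", ", ", "world"], log),
        consume("B", ["Hi", " ", "there"], log),
    )
    return results, log

results, log = asyncio.run(main())
print(results)
print([name for name, _ in log])
```

The second line of output shows entries from A and B interleaved rather than grouped, which is the behavior a server relies on when several clients stream at once.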

Building an SSE Endpoint with FastAPI

Server-Sent Events (SSE) are the standard way to push streaming responses to a browser:

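import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

async def generate_stream(prompt: str):
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )

    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            # JSON-encode each piece. A raw newline inside the content would
            # otherwise break SSE framing, where a blank line ends an event.
            # The client decodes each data payload as JSON.
            payload = json.dumps({"content": delta.content})
            yield f"data: {payload}\n\n"

    yield "data: [DONE]\n\n"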

@app.get("/stream")
async def stream_endpoint(prompt: str):
    return StreamingResponse(
        generate_stream(prompt),
        media_type="text/event-stream",
    )

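On the frontend, consume this with the EventSource API or a fetch-based SSE reader. Close the connection when the [DONE] event arrives: EventSource treats a stream closed by the server as a dropped connection and reconnects, which would re-run the request.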
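For readers who want to see what an SSE reader does with the bytes on the wire, here is a minimal sketch of the parsing step in Python. It assumes the [DONE] sentinel used by the endpoint above and handles only the data field; parse_sse is a hypothetical helper, not part of any library.

```python
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each event until the [DONE] sentinel."""
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                payload = "\n".join(data_lines)
                data_lines = []
                if payload == "[DONE]":
                    return
                yield payload
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # only one leading space is stripped
        if field == "data":
            data_lines.append(value)

wire = (
    "data: Hello\n\n"
    "data:  world\n\n"
    "data: line1\ndata: line2\n\n"
    "data: [DONE]\n\n"
    "data: ignored\n\n"
)
events = list(parse_sse(wire.splitlines(keepends=True)))
print(events)
```

Two details matter for token streams: only a single space after the colon is removed, so a token's own leading space survives, and several data lines within one event are joined with newlines. If the server sends JSON payloads, decode each yielded string afterwards.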

Collecting the Full Response While Streaming

A common pattern is to display tokens in real-time while also building up the complete response for storage or further processing:

from openai import OpenAI

client = OpenAI()

def stream_and_collect(messages: list[dict]) -> str:
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        stream=True,
    )

    collected_content = []
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            collected_content.append(delta.content)
            print(delta.content, end="", flush=True)

    print()
    return "".join(collected_content)

full_text = stream_and_collect([
    {"role": "user", "content": "Summarize the Python GIL in 3 sentences."},
])
# full_text now contains the entire response

Handling Stream Interruptions

Network issues can interrupt a stream mid-response. Wrap your streaming code in proper error handling:

from openai import OpenAI, APIConnectionError, APITimeoutError

client = OpenAI()

def safe_stream(messages: list[dict]) -> str:
    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True,
        )

        parts = []
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                parts.append(delta.content)

        return "".join(parts)

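    # APITimeoutError is a subclass of APIConnectionError, so it must be
    # caught first or its handler is never reached.
    except APITimeoutError:
        return "Request timed out. Please retry."
    except APIConnectionError:
        return "Connection lost during streaming. Please retry."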
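When a stream drops halfway, the text that already arrived is often worth keeping, for example to show the user or to decide whether a retry is needed. The sketch below is one possible shape for that, using a hypothetical flaky_stream and the built-in ConnectionError as a stand-in; with the real SDK you would pass retryable=(APIConnectionError,) instead.

```python
from types import SimpleNamespace

def chunk(text):
    """Stand-in for a streamed chunk with a single choice."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

def flaky_stream():
    """Hypothetical stream that drops after two chunks."""
    yield chunk("The GIL is")
    yield chunk(" a mutex")
    raise ConnectionError("simulated network drop")

def collect_with_partial(stream, retryable=(ConnectionError,)):
    """Return (text, completed). On a retryable error, keep what arrived."""
    parts = []
    try:
        for c in stream:
            delta = c.choices[0].delta
            if delta.content:
                parts.append(delta.content)
    except retryable:
        return "".join(parts), False
    return "".join(parts), True

text, completed = collect_with_partial(flaky_stream())
print(repr(text), completed)
```

Returning a completed flag alongside the text keeps error messages out of the response string, so callers cannot mistake an error notice for model output.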

FAQ

Does streaming cost more tokens than non-streaming?

No. Token usage is identical whether you stream or not. The only difference is how the response is delivered to your client.
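Streamed chunks do not report usage by default. If you pass stream_options={"include_usage": True} to the create call, the API sends one extra final chunk whose choices list is empty and whose usage field is populated. The sketch below shows how a reader loop might handle that chunk, using hypothetical stand-in objects and made-up token counts rather than a live call.

```python
from types import SimpleNamespace

def content_chunk(text):
    """Stand-in for a normal chunk; usage is None on these."""
    delta = SimpleNamespace(content=text)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)], usage=None)

def usage_chunk(prompt_tokens, completion_tokens):
    """Stand-in for the trailing usage chunk, which has no choices."""
    usage = SimpleNamespace(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    return SimpleNamespace(choices=[], usage=usage)

def read_stream(chunks):
    """Return (text, usage); usage stays None if no usage chunk arrives."""
    parts, usage = [], None
    for chunk in chunks:
        if chunk.usage is not None:
            usage = chunk.usage
        if not chunk.choices:  # indexing choices[0] here would raise IndexError
            continue
        delta = chunk.choices[0].delta
        if delta.content:
            parts.append(delta.content)
    return "".join(parts), usage

text, usage = read_stream(
    [content_chunk("Hello"), content_chunk(" there"), usage_chunk(12, 2)]
)
print(text, usage.total_tokens)
```

The guard on chunk.choices matters once usage reporting is enabled: a loop that indexes choices[0] unconditionally fails on the last chunk.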

Can I use streaming with function calling?

Yes. When the model decides to call a function, the tool call arguments are streamed incrementally in the delta.tool_calls field. You accumulate the argument string across chunks and parse it once complete.
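The accumulation step can be sketched as follows. The stand-in objects mirror the fields of delta.tool_calls entries (index, id, function.name, function.arguments); the get_weather tool, the call_1 id, and the helper names are hypothetical.

```python
import json
from types import SimpleNamespace

def tool_delta(index, id=None, name=None, arguments=""):
    """Stand-in for one entry of delta.tool_calls."""
    function = SimpleNamespace(name=name, arguments=arguments)
    return SimpleNamespace(index=index, id=id, function=function)

def chunk(*tool_calls):
    """Stand-in chunk whose delta carries only tool call fragments."""
    delta = SimpleNamespace(content=None, tool_calls=list(tool_calls) or None)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])

def accumulate_tool_calls(chunks):
    """Merge fragments by index, then parse each argument string as JSON."""
    calls = {}
    for c in chunks:
        delta = c.choices[0].delta
        for tc in delta.tool_calls or []:
            entry = calls.setdefault(
                tc.index, {"id": None, "name": None, "arguments": ""}
            )
            if tc.id:
                entry["id"] = tc.id
            if tc.function is None:
                continue
            if tc.function.name:
                entry["name"] = tc.function.name
            if tc.function.arguments:
                entry["arguments"] += tc.function.arguments
    for entry in calls.values():
        # Parse only after the stream ends; fragments alone are not valid JSON.
        entry["arguments"] = json.loads(entry["arguments"] or "{}")
    return [calls[i] for i in sorted(calls)]

stream = [
    chunk(tool_delta(0, id="call_1", name="get_weather")),
    chunk(tool_delta(0, arguments='{"city": ')),
    chunk(tool_delta(0, arguments='"Paris"}')),
]
print(accumulate_tool_calls(stream))
```

Keying on index rather than id is deliberate: the id and name usually appear only in the first fragment of a call, while later fragments carry just the index and more argument text.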

How do I know when the stream is finished?

The stream ends when iteration completes. The last chunk will have a finish_reason set on choices[0] (e.g., stop or tool_calls). If you are sending SSE, emit a [DONE] event as a signal to the frontend.
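Recording finish_reason while collecting text lets you tell a complete answer from a truncated one. This is a minimal sketch with hypothetical stand-in chunks; the value length indicates the response hit the token limit.

```python
from types import SimpleNamespace

def chunk(content=None, finish_reason=None):
    """Stand-in for a streamed chunk with a single choice."""
    delta = SimpleNamespace(content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])

def collect(chunks):
    """Return (text, finish_reason); finish_reason is None if never set."""
    parts, finish_reason = [], None
    for c in chunks:
        choice = c.choices[0]
        if choice.delta.content:
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason
    return "".join(parts), finish_reason

text, reason = collect(
    [chunk("Partial"), chunk(" answer"), chunk(finish_reason="length")]
)
if reason == "length":
    print("Response was cut off by the token limit.")
```

A finish_reason of None after iteration ends suggests the stream stopped before the model finished, which is worth treating as an interruption rather than a normal completion.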


CallSphere Team
