Streaming with Async Generators: Building Real-Time AI Response Pipelines
Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly.
Why Streaming Matters for AI Agents
Users perceive LLM responses as faster when tokens arrive incrementally. A response that takes 3 seconds in total but begins streaming tokens after 200ms feels instant. The same response delivered as a single block after 3 seconds feels slow. Streaming also enables real-time processing pipelines where downstream steps begin working before the LLM finishes generating.
Python async generators provide a natural abstraction for streaming data through processing stages. Each stage yields results as they become available, creating a pipeline where data flows continuously rather than arriving in one batch at the end.
Async Generator Fundamentals
An async generator is a function that uses both async def and yield. It produces values lazily and asynchronously.
import asyncio

async def stream_llm_tokens(prompt: str):
    """Simulate streaming LLM response token by token."""
    tokens = [
        "Async", " generators", " enable", " real-time",
        " streaming", " of", " LLM", " responses",
        " to", " users.",
    ]
    for token in tokens:
        await asyncio.sleep(0.1)  # Simulate token generation delay
        yield token

async def main():
    async for token in stream_llm_tokens("Explain streaming"):
        print(token, end="", flush=True)
    print()  # Newline at the end

asyncio.run(main())
Each yield pauses the generator and delivers a value to the consumer. The generator resumes when the consumer requests the next value via async for.
Streaming from Real LLM APIs
Most LLM APIs support server-sent events (SSE) for streaming. Here is how to consume them with httpx.
import httpx
import json

async def stream_openai_response(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str = "gpt-4o",
):
    """Stream tokens from OpenAI's API."""
    async with client.stream(
        "POST",
        "https://api.openai.com/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
        },
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Strip "data: " prefix
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            delta = chunk["choices"][0].get("delta", {})
            content = delta.get("content", "")
            if content:
                yield content
Building Processing Chains
The real power of async generators emerges when you chain them. Each stage transforms the stream and yields results to the next stage.
import time

# Assumes API_KEY is defined elsewhere in your application
async def stream_tokens(prompt: str):
    """Stage 1: Generate raw tokens from LLM."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield token

async def buffer_sentences(token_stream):
    """Stage 2: Buffer tokens into complete sentences."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        # Yield complete sentences
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def add_metadata(sentence_stream):
    """Stage 3: Enrich sentences with metadata."""
    index = 0
    async for sentence in sentence_stream:
        yield {
            "index": index,
            "text": sentence,
            "word_count": len(sentence.split()),
            "timestamp": time.monotonic(),
        }
        index += 1

async def main():
    # Chain the pipeline stages
    tokens = stream_tokens("Explain three benefits of async Python")
    sentences = buffer_sentences(tokens)
    enriched = add_metadata(sentences)
    async for item in enriched:
        print(f"[{item['index']}] ({item['word_count']} words) "
              f"{item['text']}")

asyncio.run(main())
FastAPI Streaming Response
Integrate async generators directly with FastAPI's StreamingResponse for real-time delivery to clients.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate Server-Sent Events from LLM stream."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
Backpressure in Streaming Pipelines
When a consumer is slower than a producer, tokens accumulate in memory. Use an asyncio.Queue to bound the buffer.
async def bounded_stream(
    source,
    max_buffer: int = 50,
):
    """Apply backpressure to a stream with a bounded buffer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        try:
            async for item in source:
                await queue.put(item)  # Blocks when queue is full
        finally:
            await queue.put(None)  # Sentinel, even if the source errors

    producer_task = asyncio.create_task(producer())
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    await producer_task  # Propagate any producer exception
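Here is a self-contained run of the bounded-buffer pattern, pairing an instant producer with a deliberately slow consumer. The names fast_producer and the buffer size of 2 are illustrative choices for this sketch.

```python
import asyncio

async def fast_producer(n: int):
    for i in range(n):
        yield i  # No delay: would flood an unbounded consumer

async def bounded_stream(source, max_buffer: int = 2):
    """Bounded-buffer pass-through: the producer blocks when the queue is full."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        try:
            async for item in source:
                await queue.put(item)  # Suspends here while the buffer is full
        finally:
            await queue.put(None)  # Sentinel, even if the source errors

    task = asyncio.create_task(producer())
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    await task

async def main():
    received = []
    async for item in bounded_stream(fast_producer(5)):
        await asyncio.sleep(0.001)  # Simulate a slow consumer
        received.append(item)
    return received

received = asyncio.run(main())
print(received)  # [0, 1, 2, 3, 4]
```

At no point do more than max_buffer + 1 items exist outside the source, regardless of how fast the producer runs.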
Multiplexing Multiple Streams
An agent might need to stream from multiple sources and merge the results.
async def merge_streams(*streams):
    """Merge multiple async streams into one, yielding as available."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        try:
            async for item in stream:
                await queue.put((source_id, item))
        finally:
            active -= 1
            if active == 0:
                await queue.put(None)  # All sources drained

    tasks = [
        asyncio.create_task(feed(stream, i))
        for i, stream in enumerate(streams)
    ]
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    for task in tasks:
        await task  # Surface any feeder exceptions
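The queue fan-in pattern can be exercised end-to-end with two toy sources. In this self-contained sketch, slow_count and the delays are invented for illustration; items from the faster source interleave ahead of the slower one.

```python
import asyncio

async def merge_streams(*streams):
    """Queue fan-in: one feeder task per source, one shared output queue."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        try:
            async for item in stream:
                await queue.put((source_id, item))
        finally:
            active -= 1
            if active == 0:
                await queue.put(None)  # Last feeder posts the sentinel

    tasks = [asyncio.create_task(feed(s, i)) for i, s in enumerate(streams)]
    while True:
        item = await queue.get()
        if item is None:
            break
        yield item
    for t in tasks:
        await t

async def slow_count(start: int, delay: float):
    for i in range(start, start + 2):
        await asyncio.sleep(delay)
        yield i

async def main():
    seen = []
    async for source_id, item in merge_streams(
        slow_count(0, 0.01), slow_count(10, 0.02)
    ):
        seen.append((source_id, item))
    return seen

out = asyncio.run(main())
print(sorted(out))  # [(0, 0), (0, 1), (1, 10), (1, 11)]
```

Arrival order between sources depends on timing, but every item from every source appears exactly once, tagged with its source_id.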
FAQ
How do I handle errors in the middle of a streaming response?
Once you have started sending a streaming HTTP response, you cannot change the status code. The standard pattern is to send an error event in the SSE stream: yield f"data: {json.dumps({'error': str(e)})}\n\n". The client-side JavaScript should watch for error events and handle them appropriately. For critical errors, close the stream after sending the error event.
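One way to sketch that pattern is a wrapper generator that converts mid-stream exceptions into an SSE error event. The names safe_sse_stream and flaky are invented for this example; a production version would catch narrower exception types.

```python
import asyncio
import json

async def safe_sse_stream(inner):
    """Wrap a token stream; on failure, emit an error event instead of crashing."""
    try:
        async for token in inner:
            yield f"data: {json.dumps({'token': token})}\n\n"
    except Exception as e:  # Narrow this in real code (e.g. httpx.HTTPError)
        yield f"data: {json.dumps({'error': str(e)})}\n\n"
    finally:
        yield "data: [DONE]\n\n"  # Always terminate the stream cleanly

async def flaky():
    yield "ok"
    raise RuntimeError("upstream failed")

async def main():
    return [event async for event in safe_sse_stream(flaky())]

events = asyncio.run(main())
print(events[1])  # data: {"error": "upstream failed"}
```

The client sees one normal token event, one error event, and a terminator, rather than an abruptly severed connection.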
What is the memory overhead of async generators compared to collecting all results into a list?
Async generators use constant memory regardless of the total data volume — they hold only the current yielded value. A list holds every item simultaneously. For streaming 10,000 LLM tokens, a generator uses memory for one token at a time while a list stores all 10,000. This makes generators essential for long-running or high-volume streams.
Can I replay or tee an async generator to send data to multiple consumers?
Async generators are single-use — once consumed, the data is gone. To send the same stream to multiple consumers, use an asyncio.Queue per consumer and a producer task that reads from the generator once and puts each item into all queues. Libraries like aiostream provide a stream.tee() utility for this pattern.
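A minimal hand-rolled version of that fan-out, assuming unbounded per-consumer queues and a None sentinel (fan_out and consume are names invented for this sketch):

```python
import asyncio

async def fan_out(source, queues):
    """Read the generator once; copy each item into every consumer queue."""
    async for item in source:
        for q in queues:
            await q.put(item)
    for q in queues:
        await q.put(None)  # Sentinel: stream finished

async def consume(q: asyncio.Queue) -> list:
    items = []
    while (item := await q.get()) is not None:
        items.append(item)
    return items

async def tokens():
    for t in ("a", "b", "c"):
        yield t

async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    producer = asyncio.create_task(fan_out(tokens(), [q1, q2]))
    r1, r2 = await asyncio.gather(consume(q1), consume(q2))
    await producer
    return r1, r2

r1, r2 = asyncio.run(main())
print(r1, r2)  # ['a', 'b', 'c'] ['a', 'b', 'c']
```

With unbounded queues a slow consumer cannot stall the others, at the cost of buffering; bounding the queues trades that for backpressure on the whole group.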
#Python #Streaming #AsyncGenerators #RealTime #AIAgents #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.