Python Generators and Iterators for Streaming AI Responses
Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output.
Why Streaming Matters for AI Applications
When an LLM generates a 2,000-token response, waiting for the full completion before showing anything creates a poor user experience. Streaming sends tokens as they are generated, so perceived latency drops from the time of the full response to the time of the first token. Python generators are the natural abstraction for this pattern: they produce values lazily, one at a time, without holding the entire response in memory.
Beyond user experience, generators enable memory-efficient processing of large datasets for embeddings, batch inference, and document chunking — all critical operations in AI pipelines.
Generator Basics for Token Streaming
A generator function uses yield instead of return. Each yield pauses the function and produces a value. The function resumes from where it paused on the next iteration.
```python
from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

# Lazy evaluation - only one chunk in memory at a time
for token in stream_tokens("The agent analyzed the document carefully."):
    print(token, end="", flush=True)
```
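To see the pause/resume behavior directly, the same generator can be driven by hand with next() (a small illustration):

```python
def stream_tokens(text: str, chunk_size: int = 4):
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

gen = stream_tokens("Hello, agent!", chunk_size=5)
print(next(gen))  # "Hello" - the function runs until the first yield
print(next(gen))  # ", age" - it resumes right after the previous yield
print(next(gen))  # "nt!"  - the final, shorter chunk
# One more next(gen) would raise StopIteration, which for-loops catch silently.
```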
Async Generators for Real LLM Streaming
Real LLM APIs stream over HTTP using server-sent events. Async generators handle this naturally.
```python
import httpx
import json
from typing import AsyncGenerator

async def stream_chat_completion(
    messages: list[dict],
    model: str = "gpt-4o",
    api_key: str = "",
) -> AsyncGenerator[str, None]:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[6:])
                    delta = chunk["choices"][0].get("delta", {})
                    if content := delta.get("content"):
                        yield content

# Usage
async def main():
    messages = [{"role": "user", "content": "Explain agentic AI"}]
    full_response = []
    async for token in stream_chat_completion(messages, api_key="sk-..."):
        print(token, end="", flush=True)
        full_response.append(token)
    complete = "".join(full_response)  # full text for logging or caching
```
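Loops like the one above are pull-based: the producer only advances when the consumer requests the next token, which gives natural backpressure. When producer and consumer run concurrently at different speeds, a bounded asyncio.Queue makes that backpressure explicit, since put() blocks once the queue is full. A minimal sketch (the names producer and consumer are illustrative, not from any API):

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(6):
        # put() suspends once the queue holds maxsize items, throttling the producer
        await queue.put(f"token-{i}")
    await queue.put(None)  # sentinel: no more tokens

async def consumer(queue: asyncio.Queue) -> list[str]:
    received = []
    while (token := await queue.get()) is not None:
        received.append(token)
    return received

async def main() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # bounded = backpressure
    _, received = await asyncio.gather(producer(queue), consumer(queue))
    return received

print(asyncio.run(main()))
```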
Pipeline Composition with Generators
Generators compose into processing pipelines where each stage transforms the stream without buffering everything.
```python
from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    if buffer.strip():
        yield buffer.strip()  # flush the final sentence, which has no trailing ". "

async def add_citations(
    sentence_stream: AsyncGenerator[str, None],
    knowledge_base: dict,
) -> AsyncGenerator[str, None]:
    async for sentence in sentence_stream:
        # Append a citation for the first matching keyword, if any
        for keyword, citation in knowledge_base.items():
            if keyword.lower() in sentence.lower():
                sentence += f" [{citation}]"
                break
        yield sentence

# Compose the pipeline
async def enriched_stream(messages, api_key, kb):
    raw_tokens = stream_chat_completion(messages, api_key=api_key)
    sentences = chunk_by_sentence(raw_tokens)
    enriched = add_citations(sentences, kb)
    async for sentence in enriched:
        yield sentence
```
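The composition can be exercised without a live API by swapping in a stub token source; fake_tokens below is a stand-in for stream_chat_completion:

```python
import asyncio
from typing import AsyncGenerator

async def fake_tokens() -> AsyncGenerator[str, None]:
    # Stand-in for stream_chat_completion: emits a response in small chunks
    for chunk in ["RAG gro", "unds ans", "wers. ", "Agents pl", "an steps. "]:
        yield chunk

async def chunk_by_sentence(token_stream):
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."

async def demo():
    async for sentence in chunk_by_sentence(fake_tokens()):
        print(sentence)

asyncio.run(demo())
# RAG grounds answers.
# Agents plan steps.
```

Because every stage is an async generator, the stub slots in anywhere the real API client would go, which makes pipeline stages unit-testable in isolation.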
Memory-Efficient Batch Processing
When generating embeddings for thousands of documents, generators prevent loading everything into memory at once.
```python
from typing import Generator
from pathlib import Path

def read_documents(directory: Path) -> Generator[str, None, None]:
    for file_path in directory.glob("*.txt"):
        yield file_path.read_text()

def batch_items(items: Generator, batch_size: int = 32) -> Generator[list, None, None]:
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final, possibly smaller batch

# embedding_api and vector_db stand in for your embedding and vector-store clients
async def embed_directory(directory: Path):
    documents = read_documents(directory)
    for batch in batch_items(documents, batch_size=32):
        embeddings = await embedding_api.embed(batch)
        await vector_db.upsert(embeddings)
    # Only 32 documents in memory at any time
```
Generator-Based Streaming in FastAPI
FastAPI supports StreamingResponse with generators for real-time AI output.
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        # settings.api_key comes from your app's config (e.g. pydantic-settings)
        async for token in stream_chat_completion(
            request.messages, api_key=settings.api_key
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
```
FAQ
What is the difference between a generator and an iterator in Python?
Every generator is an iterator, but not every iterator is a generator. An iterator is any object that implements the __iter__ and __next__ methods. A generator is the iterator object you get back from calling a function that contains yield; Python builds the state machine for you. Generators are more concise and handle state management automatically.
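A short side-by-side makes the distinction concrete: the class spells out __iter__ and __next__ by hand, while the generator gets both for free.

```python
class Countdown:
    """Class-based iterator: state is managed by hand."""
    def __init__(self, start: int):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value

def countdown(start: int):
    """Generator: same behavior, state kept in the suspended frame."""
    while start > 0:
        yield start
        start -= 1

print(list(Countdown(3)))   # [3, 2, 1]
print(list(countdown(3)))   # [3, 2, 1]
```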
How do I handle errors in a streaming pipeline without losing partial results?
Wrap the consumer loop in a try/except and process whatever has been yielded so far. You can also use generator.throw() to inject exceptions into a running generator from the outside, allowing it to handle errors and potentially recover or yield a fallback value.
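A sketch of the throw() pattern; resilient_stream is a hypothetical wrapper that catches the injected exception and yields a fallback value in its place:

```python
def resilient_stream(tokens):
    for token in tokens:
        try:
            yield token
        except ValueError:
            # Raised at the yield above via gen.throw(); recover with a fallback
            yield "[redacted]"

gen = resilient_stream(["a", "b", "c"])
print(next(gen))              # a
print(gen.throw(ValueError))  # [redacted] - generator handled the injected error
print(next(gen))              # b - the stream keeps going
```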
Can generators cause memory leaks if not fully consumed?
Yes, in effect. If you break out of a generator loop early, the generator object keeps its suspended stack frame (and any resources it holds, such as open files or connections) alive until it is garbage collected. Call generator.close() explicitly, or wrap the generator with contextlib.closing in a with statement, so cleanup runs deterministically.
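A minimal illustration of contextlib.closing guaranteeing cleanup when the consumer stops early (events is just a list used to observe the finally block):

```python
from contextlib import closing

events = []

def token_stream():
    try:
        yield from ["a", "b", "c"]
    finally:
        events.append("closed")  # runs even though we abandon the stream early

with closing(token_stream()) as gen:
    first = next(gen)   # take one token, then leave the with block

print(first, events)  # a ['closed']
```

Exiting the with block calls gen.close(), which raises GeneratorExit inside the suspended generator, so the finally clause runs immediately instead of waiting for garbage collection.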
#Python #Generators #Streaming #AsyncProgramming #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.