
Python Generators and Iterators for Streaming AI Responses

Master Python generators and async iterators for building efficient streaming AI response pipelines with memory-efficient processing, backpressure handling, and real-time output.

Why Streaming Matters for AI Applications

When an LLM generates a 2,000-token response, waiting for the full completion before showing anything creates a poor user experience. Streaming sends tokens as they are generated, cutting perceived latency from the full generation time down to the time to first token. Python generators are the natural abstraction for this pattern — they produce values lazily, one at a time, without holding the entire response in memory.

Beyond user experience, generators enable memory-efficient processing of large datasets for embeddings, batch inference, and document chunking — all critical operations in AI pipelines.

Generator Basics for Token Streaming

A generator function uses yield instead of return. Each yield pauses the function and produces a value. The function resumes from where it paused on the next iteration.

from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

# Lazy evaluation - only one chunk in memory at a time
for token in stream_tokens("The agent analyzed the document carefully."):
    print(token, end="", flush=True)
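To make the pause/resume behavior concrete, here is the same generator driven by hand with next() — each call runs the body only up to the next yield:

```python
from typing import Generator

def stream_tokens(text: str, chunk_size: int = 4) -> Generator[str, None, None]:
    """Simulate token streaming by yielding chunks of text."""
    for i in range(0, len(text), chunk_size):
        yield text[i:i + chunk_size]

gen = stream_tokens("abcdefgh", chunk_size=3)
first = next(gen)   # runs the body until the first yield: "abc"
second = next(gen)  # resumes right after that yield: "def"
rest = list(gen)    # consumes the remainder: ["gh"]
```

Once the body finishes, the generator raises StopIteration, which is how for loops and list() know the stream is exhausted.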

Async Generators for Real LLM Streaming

Real LLM APIs stream over HTTP using server-sent events. Async generators handle this naturally.

import httpx
import json
from typing import AsyncGenerator

async def stream_chat_completion(
    messages: list[dict],
    model: str = "gpt-4o",
    api_key: str = "",
) -> AsyncGenerator[str, None]:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}"},
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break  # OpenAI signals end-of-stream with this sentinel
                chunk = json.loads(payload)
                delta = chunk["choices"][0].get("delta", {})
                if content := delta.get("content"):
                    yield content

# Usage
async def main():
    messages = [{"role": "user", "content": "Explain agentic AI"}]
    full_response = []
    async for token in stream_chat_completion(messages, api_key="sk-..."):
        print(token, end="", flush=True)
        full_response.append(token)

    complete = "".join(full_response)

Pipeline Composition with Generators

Generators compose into processing pipelines where each stage transforms the stream without buffering everything.

from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    # Flush whatever remains so the final sentence is not silently dropped
    if buffer.strip():
        yield buffer.strip()

async def add_citations(
    sentence_stream: AsyncGenerator[str, None],
    knowledge_base: dict,
) -> AsyncGenerator[str, None]:
    async for sentence in sentence_stream:
        # Check if sentence needs a citation
        for keyword, citation in knowledge_base.items():
            if keyword.lower() in sentence.lower():
                sentence += f" [{citation}]"
                break
        yield sentence

# Compose the pipeline
async def enriched_stream(messages, api_key, kb):
    raw_tokens = stream_chat_completion(messages, api_key=api_key)
    sentences = chunk_by_sentence(raw_tokens)
    enriched = add_citations(sentences, kb)
    async for sentence in enriched:
        yield sentence
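Because each stage is just an async generator, the pipeline can be tested offline by substituting a fake token stream for the live API — a minimal sketch (the fake_tokens source and sample strings are illustrative):

```python
import asyncio
from typing import AsyncGenerator

async def chunk_by_sentence(
    token_stream: AsyncGenerator[str, None]
) -> AsyncGenerator[str, None]:
    buffer = ""
    async for token in token_stream:
        buffer += token
        while ". " in buffer:
            sentence, buffer = buffer.split(". ", 1)
            yield sentence.strip() + "."
    if buffer.strip():
        yield buffer.strip()  # flush the trailing sentence

async def fake_tokens() -> AsyncGenerator[str, None]:
    # Stand-in for a live LLM stream: text arrives in arbitrary chunks
    for piece in ["The agent ", "ran. It then ", "stopped."]:
        yield piece

async def collect() -> list[str]:
    return [s async for s in chunk_by_sentence(fake_tokens())]

sentences = asyncio.run(collect())
```

Swapping the source generator is all it takes to unit-test the downstream stages without network access.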

Memory-Efficient Batch Processing

When generating embeddings for thousands of documents, generators prevent loading everything into memory at once.

from typing import Generator
from pathlib import Path

def read_documents(directory: Path) -> Generator[str, None, None]:
    for file_path in directory.glob("*.txt"):
        yield file_path.read_text()

def batch_items(items: Generator, batch_size: int = 32) -> Generator[list, None, None]:
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

async def embed_directory(directory: Path):
    # embedding_api and vector_db stand in for your embedding client and
    # vector store; only one 32-document batch is in memory at any time
    documents = read_documents(directory)
    for batch in batch_items(documents, batch_size=32):
        embeddings = await embedding_api.embed(batch)
        await vector_db.upsert(embeddings)
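The same batching pattern can also be written with itertools.islice, which pulls directly from the iterator without an accumulator variable (on Python 3.12+, itertools.batched does this out of the box) — a sketch, not part of the original pipeline:

```python
from itertools import islice
from typing import Iterable, Iterator

def batch_iter(items: Iterable, size: int) -> Iterator[list]:
    """Yield lists of up to `size` items; the final batch may be shorter."""
    it = iter(items)
    # islice pulls at most `size` items; an empty list ends the loop
    while batch := list(islice(it, size)):
        yield batch
```

Either version keeps memory bounded by the batch size rather than the dataset size.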

Generator-Based Streaming in FastAPI

FastAPI supports StreamingResponse with generators for real-time AI output.

import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    messages: list[dict]

@app.post("/chat/stream")
async def chat_stream(request: ChatRequest):
    async def generate():
        async for token in stream_chat_completion(
            request.messages, api_key=settings.api_key  # settings holds your config
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

FAQ

What is the difference between a generator and an iterator in Python?

Every generator is an iterator, but not every iterator is a generator. An iterator is any object that implements the __iter__ and __next__ methods. A generator is the object returned by calling a generator function (one that uses yield); Python builds the iterator protocol and state management for you, so generators are usually the more concise option.
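A side-by-side sketch of the two forms, producing identical output:

```python
class CountDown:
    """A hand-written iterator: implements __iter__ and __next__ itself."""
    def __init__(self, n: int):
        self.n = n

    def __iter__(self):
        return self

    def __next__(self):
        if self.n <= 0:
            raise StopIteration
        value = self.n
        self.n -= 1
        return value

def count_down(n: int):
    """The generator equivalent: state lives in the paused stack frame."""
    while n > 0:
        yield n
        n -= 1
```

The class version must track its state in instance attributes and raise StopIteration manually; the generator gets both for free.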

How do I handle errors in a streaming pipeline without losing partial results?

Wrap the consumer loop in a try/except and process whatever has been yielded so far. You can also use generator.throw() to inject exceptions into a running generator from the outside, allowing it to handle errors and potentially recover or yield a fallback value.
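A minimal sketch of the throw() mechanism — the generator catches the injected exception at its paused yield and emits a fallback value (the "[fallback]" string is illustrative):

```python
def resilient_stream(tokens):
    """Yield tokens; if the consumer throws ValueError in, emit a fallback and stop."""
    for t in tokens:
        try:
            yield t  # throw() raises the exception right here, mid-pause
        except ValueError:
            yield "[fallback]"
            return

gen = resilient_stream(["a", "b", "c"])
first = next(gen)                   # normal consumption: "a"
recovered = gen.throw(ValueError)   # generator handles it, yields "[fallback]"
```

throw() returns the next value the generator yields after handling the exception, so the consumer sees the fallback in-band with the stream.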

Can generators cause memory leaks if not fully consumed?

Yes. If you break out of a generator loop early, the generator object stays in memory with its full stack frame until garbage collected. Call generator.close() explicitly or use it inside a with statement via contextlib.closing to ensure cleanup.
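The contextlib.closing pattern in a minimal sketch — the finally block runs even when the consumer abandons the stream early (the cleanup flag is only there to make the effect observable):

```python
from contextlib import closing

cleanup = {"done": False}

def token_source():
    try:
        yield from ["a", "b", "c"]
    finally:
        # Runs when the generator is exhausted OR closed early
        cleanup["done"] = True

collected = []
with closing(token_source()) as gen:
    for token in gen:
        collected.append(token)
        if token == "b":
            break  # closing() calls gen.close() here, triggering the finally
```

Under the hood, close() raises GeneratorExit inside the paused frame, which is what lets finally blocks and context managers inside the generator clean up.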


#Python #Generators #Streaming #AsyncProgramming #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
