Skip to content
Learn Agentic AI13 min read0 views

Streaming with Async Generators: Building Real-Time AI Response Pipelines

Build real-time streaming AI pipelines using Python async generators. Learn yield patterns, consumer chains, and backpressure for delivering LLM tokens to users instantly.

Why Streaming Matters for AI Agents

Users perceive LLM responses as faster when tokens arrive incrementally. A 3-second response that streams token-by-token from 200ms feels instant. The same response delivered as a single block after 3 seconds feels slow. Streaming also enables real-time processing pipelines where downstream steps begin working before the LLM finishes generating.

Python async generators provide a natural abstraction for streaming data through processing stages. Each stage yields results as they become available, creating a pipeline where data flows continuously rather than in batch.

Async Generator Fundamentals

An async generator is a function that uses both async def and yield. It produces values lazily and asynchronously.

import asyncio

async def stream_llm_tokens(prompt: str):
    """Simulate streaming LLM response token by token."""
    tokens = [
        "Async", " generators", " enable", " real-time",
        " streaming", " of", " LLM", " responses",
        " to", " users.", ""
    ]
    for token in tokens:
        await asyncio.sleep(0.1)  # Simulate token generation delay
        yield token

async def main():
    async for token in stream_llm_tokens("Explain streaming"):
        print(token, end="", flush=True)
    print()  # Newline at the end

asyncio.run(main())

Each yield pauses the generator and delivers a value to the consumer. The generator resumes when the consumer requests the next value via async for.

Streaming from Real LLM APIs

Most LLM APIs support server-sent events (SSE) for streaming. Here is how to consume them with httpx.

import httpx
import json

async def stream_openai_response(
    client: httpx.AsyncClient,
    messages: list[dict],
    model: str = "gpt-4o",
):
    """Stream tokens from OpenAI's API."""
    async with client.stream(
        "POST",
        "https://api.openai.com/v1/chat/completions",
        json={
            "model": model,
            "messages": messages,
            "stream": True,
        },
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Strip "data: " prefix
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            delta = chunk["choices"][0].get("delta", {})
            content = delta.get("content", "")
            if content:
                yield content

Building Processing Chains

The real power of async generators emerges when you chain them. Each stage transforms the stream and yields results to the next stage.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

async def stream_tokens(prompt: str):
    """Stage 1: Generate raw tokens from LLM."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield token

async def buffer_sentences(token_stream):
    """Stage 2: Buffer tokens into complete sentences."""
    buffer = ""
    async for token in token_stream:
        buffer += token
        # Yield complete sentences
        while ". " in buffer or buffer.endswith("."):
            if ". " in buffer:
                sentence, buffer = buffer.split(". ", 1)
                yield sentence.strip() + "."
            elif buffer.endswith("."):
                yield buffer.strip()
                buffer = ""
                break
    if buffer.strip():
        yield buffer.strip()

async def add_metadata(sentence_stream):
    """Stage 3: Enrich sentences with metadata."""
    index = 0
    async for sentence in sentence_stream:
        yield {
            "index": index,
            "text": sentence,
            "word_count": len(sentence.split()),
            "timestamp": time.monotonic(),
        }
        index += 1

async def main():
    # Chain the pipeline stages
    tokens = stream_tokens("Explain three benefits of async Python")
    sentences = buffer_sentences(tokens)
    enriched = add_metadata(sentences)

    async for item in enriched:
        print(f"[{item['index']}] ({item['word_count']} words) "
              f"{item['text']}")

asyncio.run(main())

FastAPI Streaming Response

Integrate async generators directly with FastAPI's StreamingResponse for real-time delivery to clients.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def generate_sse_stream(prompt: str):
    """Generate Server-Sent Events from LLM stream."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
    ) as client:
        async for token in stream_openai_response(
            client,
            [{"role": "user", "content": prompt}],
        ):
            yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/api/chat/stream")
async def chat_stream(prompt: str):
    return StreamingResponse(
        generate_sse_stream(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )

Backpressure in Streaming Pipelines

When a consumer is slower than a producer, tokens accumulate in memory. Use an asyncio.Queue to bound the buffer.

async def bounded_stream(
    source,
    max_buffer: int = 50,
):
    """Apply backpressure to a stream with a bounded buffer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_buffer)

    async def producer():
        async for item in source:
            await queue.put(item)  # Blocks when queue is full
        await queue.put(None)  # Sentinel

    producer_task = asyncio.create_task(producer())

    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

    await producer_task

Multiplexing Multiple Streams

An agent might need to stream from multiple sources and merge the results.

async def merge_streams(*streams):
    """Merge multiple async streams into one, yielding as available."""
    queue: asyncio.Queue = asyncio.Queue()
    active = len(streams)

    async def feed(stream, source_id):
        nonlocal active
        async for item in stream:
            await queue.put((source_id, item))
        active -= 1
        if active == 0:
            await queue.put(None)

    tasks = [
        asyncio.create_task(feed(stream, i))
        for i, stream in enumerate(streams)
    ]

    while True:
        item = await queue.get()
        if item is None:
            break
        yield item

    for task in tasks:
        await task

FAQ

How do I handle errors in the middle of a streaming response?

Once you have started sending a streaming HTTP response, you cannot change the status code. The standard pattern is to send an error event in the SSE stream: yield f"data: {json.dumps({'error': str(e)})}\n\n". The client-side JavaScript should watch for error events and handle them appropriately. For critical errors, close the stream after sending the error event.

What is the memory overhead of async generators compared to collecting all results into a list?

Async generators use constant memory regardless of the total data volume — they hold only the current yielded value. A list holds every item simultaneously. For streaming 10,000 LLM tokens, a generator uses memory for one token at a time while a list stores all 10,000. This makes generators essential for long-running or high-volume streams.

Can I replay or tee an async generator to send data to multiple consumers?

Async generators are single-use — once consumed, the data is gone. To send the same stream to multiple consumers, use an asyncio.Queue per consumer and a producer task that reads from the generator once and puts each item into all queues. Libraries like aiostream provide a stream.tee() utility for this pattern.


#Python #Streaming #AsyncGenerators #RealTime #AIAgents #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.