Skip to content
Learn Agentic AI
Learn Agentic AI13 min read0 views

Concurrent LLM Calls with asyncio.gather: Processing Multiple Prompts in Parallel

Learn how to make parallel LLM API calls using asyncio.gather with proper error handling, rate limiting, and result ordering for production AI agent systems.

The Case for Parallel LLM Calls

Most AI agent workflows involve multiple LLM calls: extracting entities, summarizing documents, classifying intent, generating responses. When these calls are independent, running them sequentially wastes massive amounts of time. A typical LLM API call takes 500ms to 3 seconds. Five sequential calls means 5-15 seconds of wall-clock time. Running them in parallel brings that down to the duration of the single slowest call.

asyncio.gather() is the primary tool for this pattern. It takes multiple coroutines, schedules them concurrently, and returns their results in the original order.

Basic Parallel LLM Calls

import asyncio
import httpx
import time

API_URL = "https://api.openai.com/v1/chat/completions"

async def call_openai(
    client: httpx.AsyncClient,
    prompt: str,
    model: str = "gpt-4o",
) -> str:
    """Make a single LLM API call."""
    response = await client.post(
        API_URL,
        json={
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 256,
        },
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]

async def parallel_llm_calls(prompts: list[str]) -> list[str]:
    """Process multiple prompts concurrently."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        results = await asyncio.gather(
            *[call_openai(client, prompt) for prompt in prompts]
        )
    return results

async def main():
    prompts = [
        "Summarize the key benefits of microservices architecture.",
        "List 5 common Python antipatterns.",
        "Explain the CAP theorem in 3 sentences.",
        "What is the difference between OLTP and OLAP?",
    ]
    start = time.monotonic()
    results = await parallel_llm_calls(prompts)
    elapsed = time.monotonic() - start

    for prompt, result in zip(prompts, results):
        print(f"Q: {prompt[:50]}...")
        print(f"A: {result[:100]}...\n")
    print(f"Total time: {elapsed:.2f}s for {len(prompts)} calls")

asyncio.run(main())

Notice we share a single httpx.AsyncClient across all calls. This reuses the underlying TCP connection pool, avoiding the overhead of establishing new connections for each request.

flowchart TD
    START["Concurrent LLM Calls with asyncio.gather: Process…"] --> A
    A["The Case for Parallel LLM Calls"]
    A --> B
    B["Basic Parallel LLM Calls"]
    B --> C
    C["Error Handling with return_exceptions"]
    C --> D
    D["Chunked Processing with Rate Limiting"]
    D --> E
    E["Retry Logic for Failed Calls"]
    E --> F
    F["Result Ordering Guarantee"]
    F --> G
    G["FAQ"]
    G --> DONE["Key Takeaways"]
    style START fill:#4f46e5,stroke:#4338ca,color:#fff
    style DONE fill:#059669,stroke:#047857,color:#fff

Error Handling with return_exceptions

By default, asyncio.gather() cancels all remaining tasks when one raises an exception. Use return_exceptions=True to collect errors alongside successes.

async def safe_parallel_calls(
    client: httpx.AsyncClient,
    prompts: list[str],
) -> list[str | Exception]:
    """Process prompts in parallel, capturing errors per-prompt."""
    results = await asyncio.gather(
        *[call_openai(client, p) for p in prompts],
        return_exceptions=True,
    )

    processed = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Prompt {i} failed: {result}")
            processed.append(f"[ERROR] {type(result).__name__}")
        else:
            processed.append(result)
    return processed

This pattern is critical for production agents. You do not want one failed API call to discard the successful results of the other four calls in a batch.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Chunked Processing with Rate Limiting

LLM APIs enforce rate limits. Sending 100 requests simultaneously will trigger 429 errors. Process prompts in chunks to stay within limits.

async def chunked_parallel_calls(
    prompts: list[str],
    chunk_size: int = 5,
    delay_between_chunks: float = 1.0,
) -> list[str]:
    """Process prompts in rate-limited chunks."""
    all_results: list[str] = []

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        for i in range(0, len(prompts), chunk_size):
            chunk = prompts[i : i + chunk_size]
            print(f"Processing chunk {i // chunk_size + 1} "
                  f"({len(chunk)} prompts)")

            results = await asyncio.gather(
                *[call_openai(client, p) for p in chunk],
                return_exceptions=True,
            )
            all_results.extend(results)

            # Rate limit: wait between chunks
            if i + chunk_size < len(prompts):
                await asyncio.sleep(delay_between_chunks)

    return all_results

Retry Logic for Failed Calls

Individual calls may fail due to transient errors. Wrap each call with retry logic.

async def call_with_retry(
    client: httpx.AsyncClient,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """Call LLM with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await call_openai(client, prompt)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = float(
                    e.response.headers.get("retry-after", base_delay)
                )
                wait = retry_after * (2 ** attempt)
                print(f"Rate limited. Retrying in {wait:.1f}s...")
                await asyncio.sleep(wait)
            elif e.response.status_code >= 500:
                wait = base_delay * (2 ** attempt)
                await asyncio.sleep(wait)
            else:
                raise
        except httpx.TimeoutException:
            wait = base_delay * (2 ** attempt)
            await asyncio.sleep(wait)

    raise RuntimeError(f"Failed after {max_retries} retries: {prompt[:50]}")

Result Ordering Guarantee

A key property of asyncio.gather() is that results are returned in the same order as the input coroutines, regardless of completion order. This means you can safely zip results back to their original prompts without any additional tracking.

async def analyze_documents(docs: list[str]) -> list[dict]:
    """Analyze multiple documents with ordered results."""
    tasks = [
        call_openai(client, f"Analyze this document: {doc}")
        for doc in docs
    ]
    analyses = await asyncio.gather(*tasks)

    # Results are guaranteed to match input order
    return [
        {"document": doc, "analysis": analysis}
        for doc, analysis in zip(docs, analyses)
    ]

FAQ

What happens if one call in asyncio.gather takes much longer than the others?

All results are returned only when every coroutine completes. If one call takes 10 seconds while others take 1 second, you wait the full 10 seconds. To avoid this, wrap slow calls with asyncio.wait_for(coroutine, timeout=5.0) to enforce per-call timeouts, or use asyncio.as_completed() to process results as they arrive.

Should I create a new httpx.AsyncClient per call or share one?

Always share a single client across calls. httpx.AsyncClient maintains a connection pool internally, so reusing it avoids TCP handshake overhead and reduces latency. Create one client at the start of your batch and pass it to all coroutines.

How do I handle different models or parameters for each parallel call?

Pass different parameters to each coroutine in the gather call. Since each coroutine is independent, you can mix models, temperatures, and token limits freely: asyncio.gather(call_openai(client, p1, model="gpt-4o"), call_openai(client, p2, model="gpt-4o-mini")).


#Python #Asyncio #LLMAPI #ParallelProcessing #AIAgents #AgenticAI #LearnAI #AIEngineering

Share
C

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.