Concurrent LLM Calls with asyncio.gather: Processing Multiple Prompts in Parallel
Learn how to make parallel LLM API calls using asyncio.gather with proper error handling, rate limiting, and result ordering for production AI agent systems.
The Case for Parallel LLM Calls
Most AI agent workflows involve multiple LLM calls: extracting entities, summarizing documents, classifying intent, generating responses. When these calls are independent, running them sequentially wastes massive amounts of time. A typical LLM API call takes 500ms to 3 seconds. Five sequential calls means 5-15 seconds of wall-clock time. Running them in parallel brings that down to the duration of the single slowest call.
asyncio.gather() is the primary tool for this pattern. It takes multiple coroutines, schedules them concurrently, and returns their results in the original order.
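To see the speedup without touching a real API, here is a minimal sketch that simulates each call with asyncio.sleep (fake_llm_call and the 0.1-second latency are stand-ins, not real API behavior):

```python
import asyncio
import time

async def fake_llm_call(prompt: str, latency: float = 0.1) -> str:
    # Stand-in for an LLM API call: just sleep for the given latency.
    await asyncio.sleep(latency)
    return f"response to: {prompt}"

async def demo() -> tuple[list[str], float]:
    prompts = ["a", "b", "c", "d", "e"]
    start = time.monotonic()
    # All five "calls" overlap, so total time is ~0.1s, not ~0.5s.
    results = await asyncio.gather(*[fake_llm_call(p) for p in prompts])
    return results, time.monotonic() - start

results, elapsed = asyncio.run(demo())
print(f"{len(results)} results in {elapsed:.2f}s")
```

Sequentially, five 0.1-second sleeps would take 0.5 seconds; gathered, they finish in roughly the time of one.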
Basic Parallel LLM Calls
import asyncio
import os
import time

import httpx

API_URL = "https://api.openai.com/v1/chat/completions"
API_KEY = os.environ["OPENAI_API_KEY"]

async def call_openai(
    client: httpx.AsyncClient,
    prompt: str,
    model: str = "gpt-4o",
) -> str:
    """Make a single LLM API call."""
    response = await client.post(
        API_URL,
        json={
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 256,
        },
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]

async def parallel_llm_calls(prompts: list[str]) -> list[str]:
    """Process multiple prompts concurrently."""
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        results = await asyncio.gather(
            *[call_openai(client, prompt) for prompt in prompts]
        )
        return results

async def main():
    prompts = [
        "Summarize the key benefits of microservices architecture.",
        "List 5 common Python antipatterns.",
        "Explain the CAP theorem in 3 sentences.",
        "What is the difference between OLTP and OLAP?",
    ]
    start = time.monotonic()
    results = await parallel_llm_calls(prompts)
    elapsed = time.monotonic() - start
    for prompt, result in zip(prompts, results):
        print(f"Q: {prompt[:50]}...")
        print(f"A: {result[:100]}...\n")
    print(f"Total time: {elapsed:.2f}s for {len(prompts)} calls")

asyncio.run(main())
Notice we share a single httpx.AsyncClient across all calls. This reuses the underlying TCP connection pool, avoiding the overhead of establishing new connections for each request.
Error Handling with return_exceptions
By default, asyncio.gather() propagates the first exception immediately to the caller; the remaining tasks keep running in the background, but their results are discarded. Use return_exceptions=True to collect errors alongside successes.
async def safe_parallel_calls(
    client: httpx.AsyncClient,
    prompts: list[str],
) -> list[str]:
    """Process prompts in parallel, capturing errors per-prompt."""
    results = await asyncio.gather(
        *[call_openai(client, p) for p in prompts],
        return_exceptions=True,
    )
    processed = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Prompt {i} failed: {result}")
            processed.append(f"[ERROR] {type(result).__name__}")
        else:
            processed.append(result)
    return processed
This pattern is critical for production agents. You do not want one failed API call to discard the successful results of the other four calls in a batch.
Chunked Processing with Rate Limiting
LLM APIs enforce rate limits. Sending 100 requests simultaneously will trigger 429 errors. Process prompts in chunks to stay within limits.
async def chunked_parallel_calls(
    prompts: list[str],
    chunk_size: int = 5,
    delay_between_chunks: float = 1.0,
) -> list[str | Exception]:
    """Process prompts in rate-limited chunks."""
    all_results: list[str | Exception] = []
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=30.0,
    ) as client:
        for i in range(0, len(prompts), chunk_size):
            chunk = prompts[i : i + chunk_size]
            print(
                f"Processing chunk {i // chunk_size + 1} "
                f"({len(chunk)} prompts)"
            )
            results = await asyncio.gather(
                *[call_openai(client, p) for p in chunk],
                return_exceptions=True,
            )
            all_results.extend(results)
            # Rate limit: wait between chunks
            if i + chunk_size < len(prompts):
                await asyncio.sleep(delay_between_chunks)
    return all_results
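An alternative to fixed chunks is capping concurrency with asyncio.Semaphore: every prompt is submitted to one gather, but at most N calls are in flight at a time, so a slow call in one "chunk" never stalls the rest. This is a sketch; limited_parallel and the worker parameter are illustrative names, and in this article's setup you would pass something like lambda p: call_openai(client, p) as the worker.

```python
import asyncio
from typing import Awaitable, Callable

async def limited_parallel(
    prompts: list[str],
    worker: Callable[[str], Awaitable[str]],
    max_concurrent: int = 5,
) -> list[str]:
    """Run all prompts concurrently, with at most max_concurrent in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def bounded(prompt: str) -> str:
        # Blocks here while max_concurrent calls are already running.
        async with sem:
            return await worker(prompt)

    # One gather over everything; the semaphore does the throttling.
    return await asyncio.gather(*[bounded(p) for p in prompts])
```

Unlike chunking, the semaphore starts a new call the moment any slot frees up, which keeps the pipeline full.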
Retry Logic for Failed Calls
Individual calls may fail due to transient errors. Wrap each call with retry logic.
async def call_with_retry(
    client: httpx.AsyncClient,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """Call LLM with exponential backoff retry."""
    for attempt in range(max_retries):
        try:
            return await call_openai(client, prompt)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = float(
                    e.response.headers.get("retry-after", base_delay)
                )
                wait = retry_after * (2 ** attempt)
                print(f"Rate limited. Retrying in {wait:.1f}s...")
                await asyncio.sleep(wait)
            elif e.response.status_code >= 500:
                wait = base_delay * (2 ** attempt)
                await asyncio.sleep(wait)
            else:
                raise
        except httpx.TimeoutException:
            wait = base_delay * (2 ** attempt)
            await asyncio.sleep(wait)
    raise RuntimeError(f"Failed after {max_retries} retries: {prompt[:50]}")
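The retry wrapper composes directly with gather, since each prompt retries on its own schedule. A self-contained sketch of that composition, where flaky_call and retry_then_gather are hypothetical stand-ins that mimic the pattern above without a real API:

```python
import asyncio

async def flaky_call(prompt: str, failures_left: dict[str, int]) -> str:
    # Fail while the prompt still has failures budgeted, then succeed.
    if failures_left.get(prompt, 0) > 0:
        failures_left[prompt] -= 1
        raise TimeoutError(f"transient failure for {prompt!r}")
    return f"ok: {prompt}"

async def retry_then_gather(
    prompts: list[str],
    failures_left: dict[str, int],
    max_retries: int = 3,
    base_delay: float = 0.01,
) -> list[str]:
    async def with_retry(prompt: str) -> str:
        for attempt in range(max_retries):
            try:
                return await flaky_call(prompt, failures_left)
            except TimeoutError:
                # Exponential backoff before the next attempt.
                await asyncio.sleep(base_delay * (2 ** attempt))
        return "[ERROR]"

    # Each prompt retries independently; one retrying call
    # does not delay prompts that succeeded on the first try.
    return await asyncio.gather(*[with_retry(p) for p in prompts])
```

A prompt that fails twice still lands in the right output slot once its third attempt succeeds, because gather preserves input order.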
Result Ordering Guarantee
A key property of asyncio.gather() is that results are returned in the same order as the input coroutines, regardless of completion order. This means you can safely zip results back to their original prompts without any additional tracking.
async def analyze_documents(
    client: httpx.AsyncClient,
    docs: list[str],
) -> list[dict]:
    """Analyze multiple documents with ordered results."""
    tasks = [
        call_openai(client, f"Analyze this document: {doc}")
        for doc in docs
    ]
    analyses = await asyncio.gather(*tasks)
    # Results are guaranteed to match input order
    return [
        {"document": doc, "analysis": analysis}
        for doc, analysis in zip(docs, analyses)
    ]
FAQ
What happens if one call in asyncio.gather takes much longer than the others?
All results are returned only when every coroutine completes. If one call takes 10 seconds while others take 1 second, you wait the full 10 seconds. To avoid this, wrap slow calls with asyncio.wait_for(coroutine, timeout=5.0) to enforce per-call timeouts, or use asyncio.as_completed() to process results as they arrive.
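A minimal sketch of the per-call timeout approach, with asyncio.sleep standing in for a fast and a slow call (slow_call and call_with_timeout are illustrative helpers):

```python
import asyncio

async def slow_call(prompt: str, latency: float) -> str:
    # Stand-in for an LLM call with a known latency.
    await asyncio.sleep(latency)
    return f"done: {prompt}"

async def call_with_timeout(
    prompt: str, latency: float, timeout: float = 0.05
) -> str:
    try:
        # Cancel the call if it exceeds the per-call timeout.
        return await asyncio.wait_for(slow_call(prompt, latency), timeout=timeout)
    except asyncio.TimeoutError:
        return "[TIMEOUT]"

async def demo_timeouts() -> list[str]:
    return await asyncio.gather(
        call_with_timeout("fast", 0.01),
        call_with_timeout("slow", 0.5),
    )

results = asyncio.run(demo_timeouts())
```

The batch now finishes in roughly the timeout rather than the slowest call's full latency, with the timed-out slot marked instead of lost.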
Should I create a new httpx.AsyncClient per call or share one?
Always share a single client across calls. httpx.AsyncClient maintains a connection pool internally, so reusing it avoids TCP handshake overhead and reduces latency. Create one client at the start of your batch and pass it to all coroutines.
How do I handle different models or parameters for each parallel call?
Pass different parameters to each coroutine in the gather call. Since each coroutine is independent, you can mix models, temperatures, and token limits freely: asyncio.gather(call_openai(client, p1, model="gpt-4o"), call_openai(client, p2, model="gpt-4o-mini")).
#Python #Asyncio #LLMAPI #ParallelProcessing #AIAgents #AgenticAI #LearnAI #AIEngineering
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.