Parallel Tool Execution: Running Multiple Tools Simultaneously in Agents
Learn how to execute multiple tool calls in parallel to dramatically speed up AI agent workflows. Covers async execution with asyncio.gather, handling partial failures, result aggregation, and timeout management.
The Serial Execution Bottleneck
When an LLM returns multiple tool calls in a single response, the naive approach executes them one at a time. If the agent calls three APIs that each take 2 seconds, the total wait is 6 seconds. With parallel execution, all three run simultaneously and the total wait is roughly 2 seconds. For agents that frequently call multiple tools per turn, this is a significant performance improvement.
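The speedup is easy to demonstrate with a self-contained sketch. Here `fake_api_call` is a stand-in for any awaitable tool (an API request, a database query), with the delays shortened so the example runs quickly:

```python
import asyncio
import time

async def fake_api_call(delay: float) -> str:
    # Stand-in for an I/O-bound tool call (API request, DB query)
    await asyncio.sleep(delay)
    return f"done after {delay}s"

async def main() -> tuple[float, float]:
    start = time.perf_counter()
    for _ in range(3):
        await fake_api_call(0.2)  # serial: one at a time
    serial = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(fake_api_call(0.2) for _ in range(3)))  # all at once
    parallel = time.perf_counter() - start
    return serial, parallel

serial, parallel = asyncio.run(main())
print(f"serial: {serial:.2f}s, parallel: {parallel:.2f}s")
```

The serial pass takes roughly the sum of the delays; the parallel pass takes roughly the longest single delay.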
Modern LLMs like GPT-4o and Claude commonly generate multiple tool calls in a single response. Your agent loop needs to handle this efficiently.
Detecting Parallel Tool Calls
The OpenAI API returns multiple tool calls in the tool_calls array of a single message. Each call has its own id, function.name, and function.arguments:
```python
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    tools=tool_schemas,
)

message = response.choices[0].message
if message.tool_calls:
    print(f"LLM requested {len(message.tool_calls)} tool calls")
    for tc in message.tool_calls:
        print(f"  - {tc.function.name}({tc.function.arguments})")
```
When there are multiple entries in tool_calls, the model is telling you these calls are independent and can run concurrently.
Basic Parallel Execution with asyncio.gather
The core pattern uses asyncio.gather to run all tool calls simultaneously:
```python
import asyncio
import json

async def execute_tool(name: str, arguments: str) -> str:
    args = json.loads(arguments)
    if name == "search_web":
        return await search_web(**args)
    elif name == "query_database":
        return await query_database(**args)
    elif name == "fetch_weather":
        return await fetch_weather(**args)
    else:
        return f"Error: Unknown tool {name}"

async def execute_tools_parallel(tool_calls) -> list[dict]:
    tasks = [
        execute_tool(tc.function.name, tc.function.arguments)
        for tc in tool_calls
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    tool_results = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, Exception):
            content = f"Error: Tool execution failed - {str(result)}"
        else:
            content = result
        tool_results.append({
            "role": "tool",
            "tool_call_id": tc.id,
            "content": content,
        })
    return tool_results
```
The return_exceptions=True argument is critical. Without it, asyncio.gather raises the first exception immediately; the remaining tasks keep running, but their results are lost, and the conversation breaks because the API expects a tool result message for every tool_call_id. With it, exceptions come back as values in the results list alongside the successful results.
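A toy example makes the behavior concrete. The `ok` and `boom` coroutines below are throwaway stand-ins, not part of the agent:

```python
import asyncio

async def ok() -> str:
    return "success"

async def boom() -> str:
    raise RuntimeError("tool failed")

async def main() -> list:
    # The RuntimeError comes back as a list element instead of
    # propagating and discarding the sibling's result
    return await asyncio.gather(ok(), boom(), return_exceptions=True)

results = asyncio.run(main())
print(results)  # ['success', RuntimeError('tool failed')]
```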
The Complete Parallel Agent Loop
Here is a full agent loop that handles parallel execution:
```python
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def run_agent(user_message: str, tools: list, system_prompt: str) -> str:
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_message},
    ]
    max_iterations = 10

    for _ in range(max_iterations):
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools,
        )
        message = response.choices[0].message
        messages.append(message)

        if not message.tool_calls:
            return message.content

        # Execute all tool calls in parallel
        tool_results = await execute_tools_parallel(message.tool_calls)
        messages.extend(tool_results)

    return "Error: Agent exceeded maximum iterations"
```
Each iteration either returns a final text response or executes all tool calls in parallel and feeds the results back to the LLM.
Handling Partial Failures
In production, some tool calls succeed while others fail. The LLM needs to know which succeeded and which failed so it can decide whether to retry, use partial results, or ask the user for help:
```python
async def execute_tools_with_status(tool_calls) -> list[dict]:
    tasks = []
    for tc in tool_calls:
        task = asyncio.create_task(
            execute_tool(tc.function.name, tc.function.arguments)
        )
        tasks.append((tc, task))

    tool_results = []
    for tc, task in tasks:
        try:
            result = await asyncio.wait_for(task, timeout=30.0)
            tool_results.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": result,
            })
        except asyncio.TimeoutError:
            tool_results.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": f"Error: {tc.function.name} timed out after 30 seconds",
            })
        except Exception as e:
            tool_results.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": f"Error: {tc.function.name} failed - {str(e)}",
            })
    return tool_results
```
Per-task timeouts ensure one slow tool does not hold up the entire batch. The individual error messages let the LLM reason about what data it has and what is missing.
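The timeout behavior can be sketched in isolation. `slow_tool` and `fast_tool` are placeholders, and the timeout is shortened so the example runs quickly:

```python
import asyncio

async def slow_tool() -> str:
    await asyncio.sleep(10)  # will never finish within the timeout
    return "never reached"

async def fast_tool() -> str:
    await asyncio.sleep(0.05)
    return "fast result"

async def main() -> list[str]:
    # Both tasks start running as soon as they are created;
    # wait_for cancels the slow one when its timeout expires
    tasks = [asyncio.create_task(slow_tool()), asyncio.create_task(fast_tool())]
    out = []
    for task in tasks:
        try:
            out.append(await asyncio.wait_for(task, timeout=0.2))
        except asyncio.TimeoutError:
            out.append("Error: timed out")
    return out

results = asyncio.run(main())
print(results)  # ['Error: timed out', 'fast result']
```

One subtlety of this pattern: because the tasks are awaited one after another, a later task's timeout window starts only after the earlier awaits return, so each task effectively gets at least the configured timeout even though all of them run concurrently from the moment create_task was called.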
Semaphore-Based Concurrency Limits
Unlimited parallelism can overwhelm external services. Use a semaphore to cap concurrent executions:
```python
class ParallelExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_tool_limited(self, name: str, arguments: str) -> str:
        async with self.semaphore:
            return await execute_tool(name, arguments)

    async def execute_batch(self, tool_calls) -> list[dict]:
        tasks = [
            self.execute_tool_limited(tc.function.name, tc.function.arguments)
            for tc in tool_calls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            {
                "role": "tool",
                "tool_call_id": tc.id,
                "content": str(r) if isinstance(r, Exception) else r,
            }
            for tc, r in zip(tool_calls, results)
        ]

executor = ParallelExecutor(max_concurrent=5)
```
A semaphore of 5 means at most 5 tools run simultaneously. If the LLM requests 10 tool calls, the first 5 start immediately and the remaining 5 start as slots become available.
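You can verify the cap with a small sketch that records the peak number of coroutines inside the semaphore at once. `tracked_tool` is a stand-in for a real tool:

```python
import asyncio

async def tracked_tool(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.05)  # simulate I/O while holding a slot
        state["active"] -= 1

async def main() -> int:
    sem = asyncio.Semaphore(5)
    state = {"active": 0, "peak": 0}
    # 10 requested calls, but at most 5 in flight at any moment
    await asyncio.gather(*(tracked_tool(sem, state) for _ in range(10)))
    return state["peak"]

peak = asyncio.run(main())
print(peak)  # 5
```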
Result Aggregation Patterns
When tools return related data, you may want to aggregate results before passing them back:
```python
async def aggregate_search_results(tool_calls, results) -> list[dict]:
    aggregated = []
    for tc, result in zip(tool_calls, results):
        if isinstance(result, str) and result.startswith("Error"):
            aggregated.append({"role": "tool", "tool_call_id": tc.id, "content": result})
            continue

        summary = f"Results from {tc.function.name}:\n"
        try:
            data = json.loads(result)
            if isinstance(data, list):
                summary += f"Found {len(data)} items.\n"
                summary += json.dumps(data[:10], indent=2)
            else:
                summary += json.dumps(data, indent=2)
        except json.JSONDecodeError:
            summary += result[:2000]

        aggregated.append({"role": "tool", "tool_call_id": tc.id, "content": summary})
    return aggregated
```
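The per-result logic is easy to exercise on its own. `summarize_result` below is a hypothetical helper that mirrors the truncation above for a single tool result:

```python
import json

def summarize_result(tool_name: str, raw: str, max_items: int = 10) -> str:
    # Same shape as the aggregation logic above, for one result
    summary = f"Results from {tool_name}:\n"
    try:
        data = json.loads(raw)
        if isinstance(data, list):
            summary += f"Found {len(data)} items.\n"
            summary += json.dumps(data[:max_items], indent=2)
        else:
            summary += json.dumps(data, indent=2)
    except json.JSONDecodeError:
        summary += raw[:2000]  # plain-text fallback, truncated
    return summary

raw = json.dumps([{"title": f"Result {i}"} for i in range(25)])
summary = summarize_result("search_web", raw)
print(summary.splitlines()[1])  # Found 25 items.
```

The item count tells the LLM how much data exists even though only the first ten entries are passed back, which keeps large result sets from flooding the context window.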
FAQ
Do all LLMs support parallel tool calls?
Most frontier models do. GPT-4o, GPT-4 Turbo, and Claude 3.5/4 all generate multiple tool calls in a single response when they detect the calls are independent. Older models and some open-source models may only generate one tool call at a time. Your agent loop should handle both cases — the parallel execution code works fine with a single tool call too.
What happens if I return tool results in a different order than the tool calls?
The LLM matches results to calls using the tool_call_id field, not by position. You can return results in any order as long as each result has the correct tool_call_id. This is important for parallel execution where faster tools finish before slower ones.
Should I parallelize CPU-bound tools too?
Use asyncio.gather for I/O-bound tools (API calls, database queries, file reads). For CPU-bound tools (data processing, computation), use asyncio.to_thread or concurrent.futures.ProcessPoolExecutor to avoid blocking the event loop. Mixing both types is common and the executor pattern handles it cleanly.
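Here is a sketch of the mixed case, assuming nothing beyond the standard library. `cpu_bound_tool` is a made-up hashing loop standing in for real computation:

```python
import asyncio
import hashlib

def cpu_bound_tool(seed: bytes, rounds: int = 50_000) -> str:
    # Pure CPU work: would starve the event loop if awaited inline
    digest = seed
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()

async def io_bound_tool() -> str:
    await asyncio.sleep(0.05)  # simulated network call
    return "io done"

async def main() -> list[str]:
    # to_thread pushes the CPU work onto a worker thread so the
    # I/O coroutine keeps making progress on the event loop
    return await asyncio.gather(
        asyncio.to_thread(cpu_bound_tool, b"seed"),
        io_bound_tool(),
    )

cpu_result, io_result = asyncio.run(main())
```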
#ParallelExecution #AsyncPython #Performance #ToolDesign #AIAgents #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.