Optimizing Agent Tool Calls: Reducing Round Trips and External API Latency

Learn how to minimize tool call overhead in AI agents through batch execution, parallel tool calls, result prefetching, connection pooling, and smart retry strategies for external APIs.

The Tool Call Bottleneck

In most AI agent architectures, the agent loop looks like this: the LLM decides to call a tool, the framework executes the tool, the result goes back to the LLM, and the LLM decides what to do next. Each tool call adds a full LLM round trip (typically 1-3 seconds) plus the tool execution time itself.

A typical customer service interaction might involve 3-5 tool calls: look up the customer, check orders, check inventory, apply a discount, confirm the change. With all five, that is 5 round trips to the LLM plus 5 external API calls. Optimizing this chain has an outsized impact on end-to-end response time.
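
To make the arithmetic concrete, here is a rough back-of-the-envelope model. The figures (2000 ms per LLM round trip, 200 ms per API call) and the function name are illustrative assumptions, not measurements, and the model ignores any overlap between steps.

```python
def estimate_latency_ms(llm_turns: int, llm_ms: int, tool_ms: list[int]) -> int:
    """Total time if every LLM round trip and every tool call runs back to back."""
    return llm_turns * llm_ms + sum(tool_ms)


# Assumed figures: 2000 ms per LLM round trip, 200 ms per external API call.
sequential = estimate_latency_ms(llm_turns=5, llm_ms=2000, tool_ms=[200] * 5)

# Same five API calls, but grouped behind two tool calls instead of five.
combined = estimate_latency_ms(llm_turns=2, llm_ms=2000, tool_ms=[200] * 5)

print(sequential, combined)
```

Under these assumptions the LLM round trips, not the API calls, account for most of the total, which is why reducing the number of tool calls matters so much.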

Batch Tool Calls: One Request Instead of Many

When a tool needs to fetch multiple items, batching the requests into a single call eliminates per-request overhead.

from typing import Any

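# Assumes http_client is a shared async HTTP client such as httpx.AsyncClient.
# BAD: One API call per item, awaited one after another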
async def get_order_details_slow(order_ids: list[str]) -> list[dict]:
    results = []
    for order_id in order_ids:
        response = await http_client.get(f"/api/orders/{order_id}")
        results.append(response.json())
    return results
# 10 orders = 10 HTTP requests = 10 x 100ms = 1000ms

# GOOD: Single batched API call
async def get_order_details_fast(order_ids: list[str]) -> list[dict]:
    response = await http_client.post(
        "/api/orders/batch",
        json={"ids": order_ids},
    )
    return response.json()
# 10 orders = 1 HTTP request = 100ms

When the external API does not support batch endpoints, you can still parallelize individual calls.

import asyncio

async def get_order_details_parallel(order_ids: list[str]) -> list[dict]:
    tasks = [
        http_client.get(f"/api/orders/{order_id}")
        for order_id in order_ids
    ]
    responses = await asyncio.gather(*tasks)
    return [r.json() for r in responses]
# 10 orders = 10 HTTP requests in parallel = ~100ms (not 1000ms)
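
Unbounded parallelism can overwhelm a rate-limited API when the ID list is long. One common way to cap it is a semaphore; the sketch below is a generic helper under that assumption. The names `gather_bounded` and `worker` are hypothetical, and `worker` stands in for whatever coroutine fetches a single item.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def gather_bounded(
    items: list[T],
    worker: Callable[[T], Awaitable[R]],
    max_concurrency: int = 5,
) -> list[R]:
    """Run worker(item) for every item with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: T) -> R:
        # Wait for a free slot before starting this item's request
        async with semaphore:
            return await worker(item)

    # gather returns results in the same order as the input items
    return await asyncio.gather(*(run_one(item) for item in items))
```

A tool could then call `await gather_bounded(order_ids, fetch_one_order, max_concurrency=5)`, where `fetch_one_order` is an assumed coroutine that fetches one order.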

Designing Composite Tools

Instead of exposing many small tools to the LLM, create composite tools that accomplish common multi-step operations in a single call.

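import asyncio
import json

from agents import function_tool

# Assumes db is an async database wrapper whose rows are JSON-serializable dicts.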

# BAD: Three separate tools that the LLM calls sequentially
@function_tool
async def search_customer(email: str) -> str:
    customer = await db.fetch_one("SELECT * FROM customers WHERE email = $1", email)
    return json.dumps(customer)

@function_tool
async def get_recent_orders(customer_id: str) -> str:
    orders = await db.fetch("SELECT * FROM orders WHERE customer_id = $1 LIMIT 5", customer_id)
    return json.dumps(orders)

@function_tool
async def get_open_tickets(customer_id: str) -> str:
    tickets = await db.fetch("SELECT * FROM tickets WHERE customer_id = $1 AND status = 'open'", customer_id)
    return json.dumps(tickets)

# GOOD: One composite tool that returns everything
@function_tool
async def get_customer_context(email: str) -> str:
    """Look up a customer and return their profile, recent orders, and open tickets."""
    customer = await db.fetch_one(
        "SELECT * FROM customers WHERE email = $1", email
    )
    if not customer:
        return json.dumps({"error": "Customer not found"})

    orders, tickets = await asyncio.gather(
        db.fetch(
            "SELECT * FROM orders WHERE customer_id = $1 "
            "ORDER BY created_at DESC LIMIT 5",
            customer["id"],
        ),
        db.fetch(
            "SELECT * FROM tickets WHERE customer_id = $1 AND status = 'open'",
            customer["id"],
        ),
    )

    return json.dumps({
        "customer": customer,
        "recent_orders": orders,
        "open_tickets": tickets,
    })

This reduces three LLM round trips to one. The LLM calls get_customer_context once and gets everything it needs.

Connection Pooling for External APIs

Every tool call that hits an external API benefits from connection pooling. Without it, each call pays the full TCP+TLS handshake cost.

import httpx

class ToolConnectionPool:
    def __init__(self):
        self._clients: dict[str, httpx.AsyncClient] = {}

    def get_client(self, base_url: str) -> httpx.AsyncClient:
        if base_url not in self._clients:
            self._clients[base_url] = httpx.AsyncClient(
                base_url=base_url,
                limits=httpx.Limits(
                    max_connections=10,
                    max_keepalive_connections=5,
                    keepalive_expiry=120,
                ),
                timeout=httpx.Timeout(10.0, connect=3.0),
                http2=True,  # needs the optional h2 package: pip install "httpx[http2]"
            )
        return self._clients[base_url]

    async def close_all(self):
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()

# Global pool shared across all tool executions
pool = ToolConnectionPool()

@function_tool
async def check_inventory(product_id: str) -> str:
    client = pool.get_client("https://inventory.internal")
    response = await client.get(f"/api/products/{product_id}/stock")
    return response.text

@function_tool
async def get_shipping_estimate(zip_code: str, product_id: str) -> str:
    client = pool.get_client("https://shipping.internal")
    response = await client.post(
        "/api/estimates",
        json={"zip": zip_code, "product": product_id},
    )
    return response.text

Result Prefetching

When the agent follows predictable tool chains, you can start fetching the next tool's data while the LLM is still processing the current result.

import asyncio
import json
from typing import Any

class PrefetchingToolRunner:
    def __init__(self, tool_registry: dict):
        self.tools = tool_registry
        self._prefetch_tasks: dict[str, asyncio.Task] = {}
        # Predefined chains: tool A is usually followed by tool B
        self.chains = {
            "search_customer": ("get_orders", lambda result: {"customer_id": result["id"]}),
            "get_orders": ("get_shipments", lambda result: {"order_ids": [o["id"] for o in result]}),
        }

    async def execute(self, tool_name: str, args: dict) -> Any:
        # Check if this result was prefetched
        cache_key = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
        if cache_key in self._prefetch_tasks:
            result = await self._prefetch_tasks.pop(cache_key)
            self._start_prefetch(tool_name, result)
            return result

        # Execute the tool
        result = await self.tools[tool_name](**args)

        # Start prefetching the likely next tool
        self._start_prefetch(tool_name, result)

        return result

    def _start_prefetch(self, completed_tool: str, result: Any):
        if completed_tool in self.chains:
            next_tool, arg_builder = self.chains[completed_tool]
            try:
                next_args = arg_builder(result)
                cache_key = f"{next_tool}:{json.dumps(next_args, sort_keys=True)}"
                self._prefetch_tasks[cache_key] = asyncio.create_task(
                    self.tools[next_tool](**next_args)
                )
            except (KeyError, TypeError):
                pass  # Cannot build args from result, skip prefetch
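
One gap in a prefetching runner like this: if the LLM never calls the predicted tool, the prefetch task stays in the dictionary and keeps running. A minimal sketch of a cleanup step follows, assuming it is called at the end of each agent run. The helper name `cancel_pending` is hypothetical and not part of the class above.

```python
import asyncio


async def cancel_pending(tasks: dict[str, asyncio.Task]) -> int:
    """Cancel and drain every stored prefetch task; return how many were dropped."""
    pending = list(tasks.values())
    tasks.clear()
    for task in pending:
        task.cancel()  # no-op for tasks that already finished
    # Collect results and exceptions so nothing is left un-awaited
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```

With the runner above, this could be invoked as `await cancel_pending(runner._prefetch_tasks)` once the conversation turn completes.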

Smart Retry with Exponential Backoff

External APIs fail. Good retry logic prevents a single transient error from breaking the entire agent run.

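import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
) -> T:
    """Call fn, retrying failures with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception:
            # Out of attempts: surface the last error to the caller
            if attempt == max_retries:
                raise
            # Delays grow 0.5s, 1s, 2s, ... plus up to 0.5s of jitter, capped at max_delay
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 0.5), max_delay)
            await asyncio.sleep(delay)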

# Usage in a tool
@function_tool
async def fetch_weather(city: str) -> str:
    async def _call():
        # Pass the city via params so it is URL-encoded correctly
        response = await pool.get_client("https://weather.api.com").get(
            "/v1/current", params={"city": city}
        )
        response.raise_for_status()
        return response.text

    return await retry_with_backoff(_call, max_retries=2)
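
Retrying every exception also retries errors that will never succeed, such as a 400 or 404 response. A possible variant, sketched here under assumptions, takes a predicate that decides which errors are worth retrying. `TransientError` and `retry_transient` are hypothetical names, and this version sleeps a random time up to the capped backoff rather than adding a fixed jitter range.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, 429s, 5xx)."""


def _default_retryable(exc: Exception) -> bool:
    return isinstance(exc, TransientError)


async def retry_transient(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    is_retryable: Callable[[Exception], bool] = _default_retryable,
) -> T:
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception as exc:
            # Give up immediately on permanent errors or when attempts run out
            if attempt == max_retries or not is_retryable(exc):
                raise
            backoff = min(base_delay * (2 ** attempt), max_delay)
            await asyncio.sleep(random.uniform(0, backoff))
    raise AssertionError("unreachable")  # the loop always returns or raises
```

In a tool, the inner call would translate retryable HTTP statuses into `TransientError` (or supply its own `is_retryable`) and let everything else fail fast.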

FAQ

How many tools should I expose to the LLM?

Fewer is better. Each tool adds to the system prompt size and increases the chance of the LLM choosing poorly. Aim for 5-15 well-designed composite tools rather than 30+ granular ones. If a sequence of three tools is always called together, combine them into one tool.

Should I cache tool results between agent turns?

Yes, especially for tools that fetch relatively stable data. If the agent calls get_customer_profile on turn 1 and calls it again on turn 3, serving the cached result eliminates an unnecessary API call. Use a short TTL (60-300 seconds) so the data stays fresh within a single conversation.
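
A minimal sketch of such a cache, assuming an in-memory dictionary keyed by tool name and arguments is enough for a single process. The class name `TTLToolCache` and its `clock` parameter (injectable for testing) are illustrative choices, not part of any framework.

```python
import json
import time
from typing import Any, Awaitable, Callable


class TTLToolCache:
    def __init__(
        self,
        ttl_seconds: float = 120.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._ttl = ttl_seconds
        self._clock = clock
        # cache key -> (time stored, result)
        self._entries: dict[str, tuple[float, Any]] = {}

    async def get_or_call(
        self,
        tool_name: str,
        args: dict,
        fn: Callable[..., Awaitable[Any]],
    ) -> Any:
        key = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
        hit = self._entries.get(key)
        if hit is not None and self._clock() - hit[0] < self._ttl:
            return hit[1]  # still fresh: skip the API call
        result = await fn(**args)
        self._entries[key] = (self._clock(), result)
        return result
```

Expired entries are only overwritten on the next call with the same key, so a long-running process would also need periodic eviction.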

How do I handle tool timeouts without breaking the agent loop?

Set aggressive timeouts (3-5 seconds for most tools) and return a structured error response instead of letting the timeout propagate. The LLM can then decide to retry, try an alternative tool, or inform the user. Never let a single slow tool hang the entire agent indefinitely.
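
One way to sketch this is a wrapper around `asyncio.wait_for` that converts a timeout into a JSON error string. The wrapper name `call_with_timeout` and the fields in the error payload are assumptions chosen for illustration.

```python
import asyncio
import json
from typing import Awaitable, Callable


async def call_with_timeout(
    tool_name: str,
    fn: Callable[[], Awaitable[str]],
    timeout_seconds: float = 4.0,
) -> str:
    """Run a tool body, returning a structured error instead of raising on timeout."""
    try:
        return await asyncio.wait_for(fn(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # The LLM sees this payload and can retry, switch tools, or tell the user
        return json.dumps({
            "error": "timeout",
            "tool": tool_name,
            "timeout_seconds": timeout_seconds,
            "retryable": True,
        })
```

Because `wait_for` cancels the underlying call when the deadline passes, the slow request does not keep running in the background.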


CallSphere Team
