Skip to content
Learn Agentic AI10 min read0 views

Timeout Management for AI Agent Pipelines: Preventing Hung Requests

Implement comprehensive timeout strategies for AI agent pipelines including cascading timeouts, deadline propagation, and proper cleanup of abandoned requests to prevent resource leaks.

The Silent Killer: Requests That Never Finish

The most insidious failure in an AI agent system is not a crash — it is a request that hangs forever. A stuck LLM call holds an open connection, consumes a worker thread, and leaves the user staring at a spinner. In production, hung requests accumulate, exhaust connection pools, and eventually bring down the entire service.

Proper timeout management ensures every operation has a maximum duration, nested operations share a global deadline, and abandoned work is cleaned up.

Layered Timeout Architecture

An AI agent pipeline has multiple layers, each needing its own timeout. From outer to inner:

  1. Request timeout — total time the user is willing to wait (e.g., 30 seconds)
  2. Agent loop timeout — maximum time for all reasoning iterations (e.g., 25 seconds)
  3. LLM call timeout — single model inference (e.g., 15 seconds)
  4. Tool execution timeout — single tool call (e.g., 10 seconds)
import asyncio
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class Deadline:
    """A shared deadline that propagates through the call chain."""
    absolute_time: float

    @classmethod
    def from_timeout(cls, timeout_seconds: float) -> "Deadline":
        return cls(absolute_time=time.monotonic() + timeout_seconds)

    @property
    def remaining(self) -> float:
        return max(0, self.absolute_time - time.monotonic())

    @property
    def expired(self) -> bool:
        return self.remaining <= 0

    def child_timeout(self, max_timeout: float) -> float:
        """Return the lesser of the requested timeout and remaining deadline."""
        return min(max_timeout, self.remaining)

Deadline Propagation

The key pattern is passing the deadline down through every layer. Each layer calculates its own timeout as the minimum of its desired timeout and the remaining deadline.

class TimeoutAwareAgent:
    def __init__(self, llm_timeout: float = 15.0, tool_timeout: float = 10.0):
        self.llm_timeout = llm_timeout
        self.tool_timeout = tool_timeout

    async def run(self, query: str, deadline: Deadline) -> str:
        """Main agent loop with deadline awareness."""
        if deadline.expired:
            raise TimeoutError("Request deadline already expired")

        max_iterations = 5
        messages = [{"role": "user", "content": query}]

        for i in range(max_iterations):
            if deadline.expired:
                return self._partial_response(messages)

            # LLM call with propagated timeout
            llm_timeout = deadline.child_timeout(self.llm_timeout)
            try:
                response = await asyncio.wait_for(
                    self._call_llm(messages),
                    timeout=llm_timeout,
                )
            except asyncio.TimeoutError:
                return self._partial_response(messages)

            if response.get("tool_calls"):
                tool_timeout = deadline.child_timeout(self.tool_timeout)
                try:
                    tool_results = await asyncio.wait_for(
                        self._execute_tools(response["tool_calls"]),
                        timeout=tool_timeout,
                    )
                    messages.append({"role": "tool", "content": str(tool_results)})
                except asyncio.TimeoutError:
                    messages.append({
                        "role": "tool",
                        "content": "Tool execution timed out. Summarize with available info.",
                    })
            else:
                return response["content"]

        return self._partial_response(messages)

    def _partial_response(self, messages: list) -> str:
        return (
            "I was not able to complete my full analysis within the time limit. "
            "Here is what I have so far based on the information gathered."
        )

    async def _call_llm(self, messages: list) -> dict:
        # Placeholder for actual LLM call
        await asyncio.sleep(0.5)
        return {"content": "response", "tool_calls": None}

    async def _execute_tools(self, tool_calls: list) -> list:
        await asyncio.sleep(0.3)
        return [{"result": "data"}]

Parallel Tool Execution with Per-Tool Timeouts

When an agent calls multiple tools, each tool should have an independent timeout, with a global cap from the deadline.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

async def execute_tools_parallel(
    tool_calls: list[dict],
    tool_registry: dict,
    deadline: Deadline,
    per_tool_timeout: float = 10.0,
) -> list[dict]:
    """Execute tools in parallel, each with its own timeout."""
    results = []
    timeout = deadline.child_timeout(per_tool_timeout)

    async def run_one(tool_call: dict) -> dict:
        tool_name = tool_call["name"]
        tool_fn = tool_registry.get(tool_name)
        if not tool_fn:
            return {"tool": tool_name, "error": "Unknown tool"}
        try:
            result = await asyncio.wait_for(tool_fn(tool_call["args"]), timeout=timeout)
            return {"tool": tool_name, "result": result}
        except asyncio.TimeoutError:
            return {"tool": tool_name, "error": f"Timed out after {timeout:.1f}s"}
        except Exception as exc:
            return {"tool": tool_name, "error": str(exc)}

    tasks = [run_one(tc) for tc in tool_calls]
    results = await asyncio.gather(*tasks)
    return list(results)

Cleaning Up After Timeouts

Timeouts that cancel an asyncio task do not automatically close HTTP connections, database cursors, or file handles. Always use structured cleanup.

class ManagedHTTPClient:
    """HTTP client that tracks and cleans up outstanding requests."""

    def __init__(self):
        self.client = None
        self.pending_requests: set = set()

    async def start(self):
        import httpx
        self.client = httpx.AsyncClient(timeout=30.0)

    async def request(self, method: str, url: str, **kwargs):
        task = asyncio.current_task()
        self.pending_requests.add(task)
        try:
            return await self.client.request(method, url, **kwargs)
        finally:
            self.pending_requests.discard(task)

    async def cleanup(self):
        for task in list(self.pending_requests):
            task.cancel()
        if self.client:
            await self.client.aclose()

FAQ

What happens if the LLM is mid-stream when the timeout fires?

With asyncio.wait_for, the coroutine is cancelled. If you are using streaming responses, you will have a partial response buffer. The best practice is to capture whatever tokens have arrived so far and use them as a partial response. Never leave a streaming connection open without a timeout — it can hold resources indefinitely.

How should I set timeout values for a user-facing agent?

Start from the user experience backward. If users expect a response within 10 seconds, set the request deadline to 10 seconds, allocate 8 seconds to the agent loop, and let the LLM call and tool execution compete for that budget. Measure actual p95 latencies in production and tune from there. Most LLM calls complete in 2-5 seconds, so a 15-second LLM timeout with a 30-second request deadline is a reasonable starting point.

Should I return partial results or an error when a timeout occurs?

Always prefer partial results over a generic error. If the agent gathered useful information from one tool before the second tool timed out, return what you have with a note about the incomplete analysis. Users find partial answers far more useful than "request timed out" errors.


#TimeoutManagement #PipelineDesign #AsyncPython #AIAgents #Resilience #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.