Error Handling in Async Agent Code: Timeouts, Cancellation, and Graceful Shutdown
Master error handling in async Python for AI agents. Learn asyncio.timeout, task cancellation, cleanup patterns, and exception groups for robust agent systems.
Why Async Error Handling Is Different
Synchronous error handling is straightforward: exceptions propagate up the call stack, and a single try/except catches them. Async code introduces new failure modes. A coroutine can be cancelled externally. Multiple concurrent tasks can fail simultaneously. An event loop shutdown must clean up dozens of in-flight operations. LLM API calls can hang indefinitely without proper timeouts.
Getting error handling right in async agent code is the difference between an agent that recovers gracefully and one that silently drops user requests.
Timeouts: The First Line of Defense
LLM APIs can hang — network partitions, overloaded servers, malformed requests that never complete. Always enforce timeouts.
import asyncio
import httpx
async def call_llm_with_timeout(
client: httpx.AsyncClient,
prompt: str,
timeout_seconds: float = 30.0,
) -> str:
"""Call LLM with a strict timeout."""
try:
async with asyncio.timeout(timeout_seconds):
response = await client.post(
"https://api.openai.com/v1/chat/completions",
json={
"model": "gpt-4o",
"messages": [{"role": "user", "content": prompt}],
},
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except TimeoutError:
print(f"LLM call timed out after {timeout_seconds}s")
raise
except httpx.HTTPStatusError as e:
print(f"HTTP error {e.response.status_code}: {e.response.text}")
raise
async def agent_step_with_fallback(
client: httpx.AsyncClient,
prompt: str,
) -> str:
"""Agent step with timeout and fallback."""
try:
return await call_llm_with_timeout(client, prompt, timeout_seconds=15.0)
except (TimeoutError, httpx.HTTPStatusError):
# Fallback to a faster, simpler model
return await call_llm_with_timeout(
client,
prompt,
timeout_seconds=10.0,
)
asyncio.timeout() (Python 3.11+) creates a context manager that raises TimeoutError if the block does not complete within the specified duration. It is the recommended replacement for the older asyncio.wait_for().
Task Cancellation
Tasks can be cancelled externally — for example, when a user disconnects or a parent operation times out. Handle cancellation explicitly.
async def cancellable_agent_workflow(session_id: str) -> str:
"""Agent workflow that handles cancellation cleanly."""
resources = []
try:
# Acquire resources
db_conn = await get_db_connection()
resources.append(db_conn)
# Long-running LLM work
context = await retrieve_context(session_id)
response = await generate_response(context)
await save_response(db_conn, session_id, response)
return response
except asyncio.CancelledError:
# Clean up any partial state
print(f"Workflow cancelled for session {session_id}")
await mark_session_cancelled(session_id)
raise # Always re-raise CancelledError
finally:
# Release resources regardless of outcome
for resource in resources:
await resource.close()
The critical rule: always re-raise CancelledError. Swallowing it prevents the event loop from properly shutting down the task.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
Exception Groups (Python 3.11+)
When asyncio.gather() runs with return_exceptions=False (the default), only the first exception propagates. Python 3.11 introduced TaskGroup with exception groups to capture all failures.
async def robust_parallel_calls(prompts: list[str]) -> list[str]:
"""Process prompts with proper multi-exception handling."""
results = [None] * len(prompts)
async with httpx.AsyncClient(
headers={"Authorization": f"Bearer {API_KEY}"},
timeout=30.0,
) as client:
try:
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(
call_llm_with_timeout(client, prompt),
name=f"prompt_{i}",
)
for i, prompt in enumerate(prompts)
]
except* httpx.HTTPStatusError as eg:
print(f"{len(eg.exceptions)} HTTP errors occurred:")
for exc in eg.exceptions:
print(f" - {exc.response.status_code}")
except* TimeoutError as eg:
print(f"{len(eg.exceptions)} timeouts occurred")
else:
results = [task.result() for task in tasks]
return results
The except* syntax matches specific exception types within an ExceptionGroup, letting you handle different failure classes separately.
Graceful Shutdown
When your agent service receives a shutdown signal, it must finish in-flight requests, clean up resources, and exit cleanly.
import signal
class AgentService:
def __init__(self):
self._shutdown_event = asyncio.Event()
self._active_tasks: set[asyncio.Task] = set()
async def handle_request(self, request: dict) -> dict:
"""Process a single agent request."""
task = asyncio.current_task()
self._active_tasks.add(task)
try:
result = await self._run_agent_workflow(request)
return {"status": "success", "result": result}
except asyncio.CancelledError:
return {"status": "cancelled"}
finally:
self._active_tasks.discard(task)
async def shutdown(self, grace_period: float = 30.0):
"""Gracefully shut down the service."""
print(f"Shutting down. {len(self._active_tasks)} tasks in flight.")
self._shutdown_event.set()
if self._active_tasks:
# Wait for active tasks to complete
print(f"Waiting up to {grace_period}s for tasks...")
try:
async with asyncio.timeout(grace_period):
await asyncio.gather(
*self._active_tasks,
return_exceptions=True,
)
except TimeoutError:
# Force cancel remaining tasks
print("Grace period expired. Cancelling tasks.")
for task in self._active_tasks:
task.cancel()
await asyncio.gather(
*self._active_tasks,
return_exceptions=True,
)
print("Shutdown complete.")
async def run(self):
"""Main service loop."""
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGTERM,
lambda: asyncio.create_task(self.shutdown()),
)
loop.add_signal_handler(
signal.SIGINT,
lambda: asyncio.create_task(self.shutdown()),
)
# Service loop
while not self._shutdown_event.is_set():
await asyncio.sleep(0.1)
Structured Error Context
Wrap errors with context to make debugging async agent failures tractable.
class AgentStepError(Exception):
"""Error with agent step context for debugging."""
def __init__(self, step: str, session_id: str, cause: Exception):
self.step = step
self.session_id = session_id
self.cause = cause
super().__init__(
f"Step '{step}' failed for session {session_id}: {cause}"
)
async def run_step_with_context(
step_name: str,
session_id: str,
coro,
):
"""Run a step with structured error wrapping."""
try:
return await coro
except asyncio.CancelledError:
raise # Never wrap cancellation
except Exception as e:
raise AgentStepError(step_name, session_id, e) from e
FAQ
Should I use asyncio.timeout or httpx's built-in timeout?
Use both. httpx's timeout handles connection-level failures (connect timeout, read timeout). asyncio.timeout wraps the entire operation including retries, parsing, and any processing you do with the response. They serve different purposes: httpx catches slow networks, asyncio.timeout catches slow business logic.
How do I debug tasks that silently disappear?
Tasks that raise unhandled exceptions outside of an await are logged as warnings but easily missed. Always store task references and check their results: task = asyncio.create_task(coro()); task.add_done_callback(handle_task_result). In the callback, check task.exception() and log it explicitly. TaskGroup in Python 3.11+ makes this easier by propagating all exceptions.
When should I catch CancelledError vs let it propagate?
Catch it only to perform cleanup (closing connections, saving state, rolling back transactions), then always re-raise it. The only exception is top-level request handlers where you want to return a "cancelled" response to the client. Never silently swallow CancelledError — it breaks asyncio's task management.
#Python #ErrorHandling #Asyncio #Timeouts #AIAgents #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.