Long-Running API Operations for AI Agents: Async Tasks, Polling, and Webhooks
Implement long-running operations in AI agent APIs using async task patterns, polling endpoints, and webhook callbacks. Covers task lifecycle management, timeout handling, and FastAPI implementation with background workers.
When Synchronous Requests Are Not Enough
Many AI agent operations take too long for a synchronous HTTP request. Fine-tuning a model takes hours. Batch processing thousands of documents takes minutes. Running an evaluation suite across multiple test cases can take tens of minutes. Holding an HTTP connection open for that long is unreliable: proxies time out, clients disconnect, and server resources stay tied up.
The solution is the async task pattern: accept the request immediately, return a task ID, and let the client check back for results via polling or receive a callback via webhooks.
The Async Task Pattern
The pattern has three components: a submission endpoint that returns immediately, a status endpoint for polling, and an optional webhook for push notification.
```python
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, HttpUrl
from enum import Enum
import uuid
import asyncio
from datetime import datetime

app = FastAPI()

class TaskStatus(str, Enum):
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"

class TaskRecord(BaseModel):
    id: str
    status: TaskStatus
    created_at: str
    started_at: str | None = None
    completed_at: str | None = None
    progress: float = 0.0
    result: dict | None = None
    error: dict | None = None

# In production, use Redis or a database
task_store: dict[str, TaskRecord] = {}

class BatchEvalRequest(BaseModel):
    agent_id: str
    test_suite_id: str
    webhook_url: HttpUrl | None = None

@app.post("/v1/evaluations", status_code=202)
async def submit_evaluation(
    body: BatchEvalRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    task = TaskRecord(
        id=task_id,
        status=TaskStatus.pending,
        created_at=datetime.utcnow().isoformat(),
    )
    task_store[task_id] = task
    background_tasks.add_task(
        run_evaluation, task_id, body.agent_id,
        body.test_suite_id, body.webhook_url,
    )
    return {
        "task_id": task_id,
        "status": "pending",
        "status_url": f"/v1/evaluations/{task_id}",
        "cancel_url": f"/v1/evaluations/{task_id}/cancel",
    }
```
The key detail is the 202 Accepted status code. It tells the client that the request was accepted for processing but is not yet complete. The response includes URLs for polling status and cancelling the task.
Background Worker Implementation
The background worker updates the task record as it progresses. This enables clients to track completion percentage.
```python
import httpx

async def run_evaluation(
    task_id: str,
    agent_id: str,
    test_suite_id: str,
    webhook_url: str | None,
):
    task = task_store[task_id]
    task.status = TaskStatus.running
    task.started_at = datetime.utcnow().isoformat()
    try:
        test_cases = await load_test_cases(test_suite_id)
        results = []
        for i, test_case in enumerate(test_cases):
            # Cooperative cancellation: stop if the cancel endpoint
            # flipped the status since the last iteration
            if task_store[task_id].status == TaskStatus.cancelled:
                return
            result = await evaluate_single(agent_id, test_case)
            results.append(result)
            task.progress = (i + 1) / len(test_cases)
        task.status = TaskStatus.completed
        task.completed_at = datetime.utcnow().isoformat()
        task.result = {
            "total": len(results),
            "passed": sum(1 for r in results if r["passed"]),
            "failed": sum(1 for r in results if not r["passed"]),
            "details": results,
        }
    except Exception as e:
        task.status = TaskStatus.failed
        task.completed_at = datetime.utcnow().isoformat()
        task.error = {"message": str(e), "type": type(e).__name__}
    # Send webhook notification if configured
    if webhook_url:
        await send_webhook(webhook_url, task)

async def send_webhook(url: str, task: TaskRecord):
    async with httpx.AsyncClient() as client:
        try:
            await client.post(
                str(url),
                json={
                    # Reflect the terminal state in the event name
                    "event": f"evaluation.{task.status.value}",
                    "task_id": task.id,
                    "status": task.status,
                    "result": task.result,
                    "error": task.error,
                },
                timeout=10.0,
            )
        except httpx.RequestError:
            pass  # Log but do not fail the task
```
Polling Endpoint with Retry-After
The status endpoint returns the current task state. The Retry-After header tells clients how long to wait before polling again, reducing unnecessary requests.
```python
from fastapi.responses import JSONResponse

@app.get("/v1/evaluations/{task_id}")
async def get_evaluation_status(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    response = JSONResponse(content=task.model_dump())
    if task.status in (TaskStatus.pending, TaskStatus.running):
        retry_seconds = 5 if task.progress > 0.8 else 15
        response.headers["Retry-After"] = str(retry_seconds)
    return response
```
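On the client side, an agent can honor the Retry-After header when scheduling its next poll. A minimal sketch, with the HTTP call abstracted behind a `fetch` callable (a hypothetical helper, not part of the API above) so the loop stays transport-agnostic:

```python
import asyncio

TERMINAL_STATES = {"completed", "failed", "cancelled"}

def next_poll_delay(headers: dict, default: float = 15.0, cap: float = 60.0) -> float:
    """Parse a seconds-form Retry-After header, with a fallback and a cap."""
    raw = headers.get("Retry-After")
    try:
        return min(max(float(raw), 1.0), cap)
    except (TypeError, ValueError):
        return default

async def poll_until_done(fetch, sleep=asyncio.sleep, max_polls: int = 240) -> dict:
    """fetch() -> (body: dict, headers: dict); returns the terminal body."""
    for _ in range(max_polls):
        body, headers = await fetch()
        if body["status"] in TERMINAL_STATES:
            return body
        await sleep(next_poll_delay(headers))
    raise TimeoutError("task did not finish within the polling budget")
```

The `sleep` parameter is injected so the loop can be tested (or rate-limited) without real waiting.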
Task Cancellation
AI agents need to cancel tasks that are no longer needed. Implement cancellation as a cooperative mechanism: the worker checks a cancellation flag periodically.
```python
@app.post("/v1/evaluations/{task_id}/cancel")
async def cancel_evaluation(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    if task.status in (TaskStatus.completed, TaskStatus.failed, TaskStatus.cancelled):
        raise HTTPException(
            status_code=409,
            detail=f"Cannot cancel task in '{task.status}' state",
        )
    task.status = TaskStatus.cancelled
    task.completed_at = datetime.utcnow().isoformat()
    return {"status": "cancelled"}
```
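The worker honors this by checking the flag between units of work, so a cancel request takes effect at the next iteration rather than interrupting mid-item. A toy sketch of the cooperative check, using a plain dict in place of the TaskRecord model so it stands alone:

```python
import asyncio

async def cancellable_worker(task: dict, items: list, process) -> int:
    """Process items one at a time, stopping once task["status"]
    has been flipped to "cancelled"; returns the count processed."""
    done = 0
    for item in items:
        if task["status"] == "cancelled":
            break  # stop promptly; partial progress is preserved
        await process(item)
        done += 1
        task["progress"] = done / len(items)
    return done
```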
Timeout Handling
Set maximum durations for tasks and fail them if they exceed the limit. This prevents resource leaks from hung operations.
```python
TASK_TIMEOUT_SECONDS = 3600  # 1 hour

async def run_with_timeout(task_id: str, coro):
    try:
        await asyncio.wait_for(coro, timeout=TASK_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        task = task_store.get(task_id)
        if task:
            task.status = TaskStatus.failed
            task.error = {
                "message": f"Task exceeded {TASK_TIMEOUT_SECONDS}s timeout",
                "type": "TimeoutError",
            }
            task.completed_at = datetime.utcnow().isoformat()
```
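The failure path can be exercised in isolation. A toy sketch with a short timeout and a plain dict standing in for the task record (the names here are illustrative, not part of the API above):

```python
import asyncio

async def fail_on_timeout(task: dict, coro, timeout: float):
    """Await the worker coroutine, marking the task failed if it exceeds the budget."""
    try:
        await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        task["status"] = "failed"
        task["error"] = {
            "message": f"Task exceeded {timeout}s timeout",
            "type": "TimeoutError",
        }

async def hung_worker():
    await asyncio.sleep(60)  # simulates an operation that never finishes in time

task = {"status": "running", "error": None}
asyncio.run(fail_on_timeout(task, hung_worker(), timeout=0.01))
```

`asyncio.wait_for` cancels the underlying coroutine on timeout, which is what frees the hung worker's resources.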
FAQ
Should I use polling or webhooks for AI agent integrations?
Use both. Provide webhooks as the primary notification mechanism for agent platforms that can receive callbacks. Provide polling as a fallback for environments where incoming HTTP connections are blocked (like serverless functions or development machines behind NATs). Many production systems register a webhook but also poll as a safety net in case the webhook delivery fails.
How do I handle webhook delivery failures?
Implement retry with exponential backoff: try again after 1 minute, 5 minutes, 30 minutes, then hourly for up to 24 hours. Log all delivery attempts and their HTTP status codes. Provide a webhook event log endpoint where consumers can see delivery history and manually replay failed events. After all retries are exhausted, mark the delivery as permanently failed but keep the result available via the polling endpoint.
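The schedule described above can be made explicit. A sketch with the HTTP send and the sleep both injected (hypothetical callables), so the retry logic is independent of the transport:

```python
import asyncio

def webhook_retry_schedule(max_age_seconds: int = 86_400) -> list[int]:
    """Delays between attempts: 1 min, 5 min, 30 min, then hourly for up to 24h."""
    delays = [60, 300, 1_800]
    elapsed = sum(delays)
    while elapsed + 3_600 <= max_age_seconds:
        delays.append(3_600)
        elapsed += 3_600
    return delays

async def deliver_with_retries(send, payload, sleep=asyncio.sleep) -> bool:
    """Try immediately, then retry per the schedule; True on any 2xx response."""
    for delay in [0, *webhook_retry_schedule()]:
        if delay:
            await sleep(delay)
        try:
            if 200 <= await send(payload) < 300:
                return True
        except Exception:
            pass  # record the attempt and its outcome in the delivery log
    return False
```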
What should the task TTL be before cleanup?
Keep completed task records for at least 7 days so agents can retrieve results even after delays. For failed tasks, retain them for 30 days for debugging purposes. Use a background cleanup job that removes expired records. Always document the retention policy in your API documentation so consumers know how long results are available.
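A sketch of such a cleanup pass, using plain dicts rather than the TaskRecord model so it runs standalone; the retention windows follow the policy above (assumed values):

```python
from datetime import datetime, timedelta

# Assumed retention policy: 7 days for completed/cancelled tasks, 30 for failed
RETENTION = {
    "completed": timedelta(days=7),
    "cancelled": timedelta(days=7),
    "failed": timedelta(days=30),
}

def purge_expired(store: dict, now: datetime) -> int:
    """Remove terminal tasks older than their retention window; return the count removed."""
    expired = [
        task_id
        for task_id, task in store.items()
        if (ttl := RETENTION.get(task["status"])) is not None
        and task.get("completed_at")
        and now - datetime.fromisoformat(task["completed_at"]) > ttl
    ]
    for task_id in expired:
        del store[task_id]
    return len(expired)
```

In a real deployment this would run on a schedule (cron, Celery beat, or a looping asyncio task) against Redis or the database rather than an in-memory dict.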
#AsyncAPIs #BackgroundTasks #Webhooks #Polling #FastAPI #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.