Long-Running API Operations for AI Agents: Async Tasks, Polling, and Webhooks

Implement long-running operations in AI agent APIs using async task patterns, polling endpoints, and webhook callbacks. Covers task lifecycle management, timeout handling, and FastAPI implementation with background workers.

When Synchronous Requests Are Not Enough

Many AI agent operations take too long for a synchronous HTTP request. Fine-tuning a model takes hours. Batch processing thousands of documents takes minutes. Running an evaluation suite across multiple test cases can take tens of minutes. Holding an HTTP connection open for that long is unreliable: proxies time out, clients disconnect, and server resources stay tied up for the whole duration.

The solution is the async task pattern: accept the request immediately, return a task ID, and let the client check back for results via polling or receive a callback via webhooks.

The Async Task Pattern

The pattern has three components: a submission endpoint that returns immediately, a status endpoint for polling, and an optional webhook for push notification.

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, HttpUrl
from enum import Enum
import uuid
import asyncio
from datetime import datetime

app = FastAPI()

class TaskStatus(str, Enum):
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
    cancelled = "cancelled"

class TaskRecord(BaseModel):
    id: str
    status: TaskStatus
    created_at: str
    started_at: str | None = None
    completed_at: str | None = None
    progress: float = 0.0
    result: dict | None = None
    error: dict | None = None

# In production, use Redis or a database
task_store: dict[str, TaskRecord] = {}

class BatchEvalRequest(BaseModel):
    agent_id: str
    test_suite_id: str
    webhook_url: HttpUrl | None = None

@app.post("/v1/evaluations", status_code=202)
async def submit_evaluation(
    body: BatchEvalRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    task = TaskRecord(
        id=task_id,
        status=TaskStatus.pending,
        created_at=datetime.utcnow().isoformat(),
    )
    task_store[task_id] = task

    background_tasks.add_task(
        run_evaluation, task_id, body.agent_id,
        body.test_suite_id, body.webhook_url,
    )

    return {
        "task_id": task_id,
        "status": "pending",
        "status_url": f"/v1/evaluations/{task_id}",
        "cancel_url": f"/v1/evaluations/{task_id}/cancel",
    }

The key detail is the 202 Accepted status code. It tells the client that the request was accepted for processing but is not yet complete. The response includes URLs for polling status and cancelling the task.

Background Worker Implementation

The background worker updates the task record as it progresses. The progress field is a fraction between 0.0 and 1.0, so clients can track how much of the test suite has been evaluated.

import httpx

async def run_evaluation(
    task_id: str,
    agent_id: str,
    test_suite_id: str,
    webhook_url: str | None,
):
    task = task_store[task_id]
    task.status = TaskStatus.running
    task.started_at = datetime.utcnow().isoformat()

    try:
        test_cases = await load_test_cases(test_suite_id)
        results = []

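        for i, test_case in enumerate(test_cases):
            # Cooperative cancellation: stop if the cancel endpoint
            # changed the status while an earlier case was running.
            if task.status == TaskStatus.cancelled:
                break

            result = await evaluate_single(agent_id, test_case)
            results.append(result)

            task.progress = (i + 1) / len(test_cases)

        # Do not overwrite a cancelled task with "completed"
        if task.status != TaskStatus.cancelled:
            task.status = TaskStatus.completed
            task.completed_at = datetime.utcnow().isoformat()
            task.result = {
                "total": len(results),
                "passed": sum(1 for r in results if r["passed"]),
                "failed": sum(1 for r in results if not r["passed"]),
                "details": results,
            }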

    except Exception as e:
        task.status = TaskStatus.failed
        task.completed_at = datetime.utcnow().isoformat()
        task.error = {"message": str(e), "type": type(e).__name__}

    # Send webhook notification if configured
    if webhook_url:
        await send_webhook(webhook_url, task)

async def send_webhook(url: str, task: TaskRecord):
    async with httpx.AsyncClient() as client:
        try:
            await client.post(
                str(url),
                json={
                    "event": "evaluation.completed",
                    "task_id": task.id,
                    "status": task.status,
                    "result": task.result,
                    "error": task.error,
                },
                timeout=10.0,
            )
        except httpx.RequestError:
            # Delivery failed; record it in your logs here, but never
            # fail the task because a webhook could not be delivered.
            pass

Polling Endpoint with Retry-After

The status endpoint returns the current task state. The Retry-After header tells clients how long to wait before polling again, reducing unnecessary requests.

from fastapi.responses import JSONResponse

@app.get("/v1/evaluations/{task_id}")
async def get_evaluation_status(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    # mode="json" converts enums and other rich types to plain JSON values
    response = JSONResponse(content=task.model_dump(mode="json"))

    if task.status in (TaskStatus.pending, TaskStatus.running):
        retry_seconds = 5 if task.progress > 0.8 else 15
        response.headers["Retry-After"] = str(retry_seconds)

    return response
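
On the client side, the Retry-After hint only helps if the caller honors it. The following is a minimal sketch of such a polling loop, not a definitive client. The names `fetch_status`, `next_delay`, and `poll_until_done` are hypothetical: `fetch_status` stands in for whatever performs the GET on the status URL and returns the JSON body and the response headers as two plain dicts.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"completed", "failed", "cancelled"}

# Hypothetical callable: performs one GET on the status URL and
# returns (json_body, headers) as two plain dicts.
FetchStatus = Callable[[], Awaitable[tuple[dict, dict]]]


def next_delay(headers: dict, default: float = 15.0, cap: float = 60.0) -> float:
    """Read Retry-After (seconds) from the headers, falling back to a default."""
    raw = headers.get("Retry-After")
    try:
        delay = float(raw) if raw is not None else default
    except ValueError:
        # Retry-After may also be an HTTP date; this sketch ignores that form.
        delay = default
    return max(0.0, min(delay, cap))


async def poll_until_done(
    fetch_status: FetchStatus,
    max_wait: float = 3600.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> dict:
    """Poll until the task reaches a terminal state or max_wait is exceeded."""
    waited = 0.0
    while True:
        body, headers = await fetch_status()
        if body.get("status") in TERMINAL_STATES:
            return body
        delay = next_delay(headers)
        if waited + delay > max_wait:
            raise TimeoutError(
                f"Task still {body.get('status')!r} after {waited:.0f}s"
            )
        await sleep(delay)
        waited += delay
```

The `sleep` parameter is injectable only so the loop can be exercised without real waiting. Capping the delay protects the client from an unexpectedly large header value, and `max_wait` keeps an agent from polling forever if the task never finishes.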

Task Cancellation

AI agents need to cancel tasks that are no longer needed. Implement cancellation as a cooperative mechanism: the worker checks a cancellation flag periodically.

@app.post("/v1/evaluations/{task_id}/cancel")
async def cancel_evaluation(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")

    if task.status in (TaskStatus.completed, TaskStatus.failed):
        raise HTTPException(
            status_code=409,
            # .value yields "completed" rather than "TaskStatus.completed"
            # on newer Python versions
            detail=f"Cannot cancel task in '{task.status.value}' state",
        )

    task.status = TaskStatus.cancelled
    task.completed_at = datetime.utcnow().isoformat()
    return {"status": "cancelled"}

Timeout Handling

Set maximum durations for tasks and fail them if they exceed the limit. This prevents resource leaks from hung operations.

TASK_TIMEOUT_SECONDS = 3600  # 1 hour

async def run_with_timeout(task_id: str, coro):
    try:
        await asyncio.wait_for(coro, timeout=TASK_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        task = task_store.get(task_id)
        if task:
            task.status = TaskStatus.failed
            task.error = {
                "message": f"Task exceeded {TASK_TIMEOUT_SECONDS}s timeout",
                "type": "TimeoutError",
            }
            task.completed_at = datetime.utcnow().isoformat()
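
When `asyncio.wait_for` times out, it cancels the wrapped coroutine, so the worker sees `asyncio.CancelledError` at its current `await` and gets a chance to clean up. The sketch below illustrates that behavior in isolation. `demo_store`, `slow_job`, and `run_with_deadline` are hypothetical stand-ins that use plain dicts instead of the TaskRecord model, and the timeout is shortened so the example finishes quickly.

```python
import asyncio

# Stand-in for task_store; plain dicts keep the sketch self-contained.
demo_store: dict[str, dict] = {"t1": {"status": "running", "error": None}}


async def slow_job(task_id: str) -> None:
    """Hypothetical worker that would run far longer than the limit."""
    try:
        await asyncio.sleep(10)
        demo_store[task_id]["status"] = "completed"
    except asyncio.CancelledError:
        # wait_for cancels the wrapped coroutine on timeout. Release
        # resources here, then re-raise so the cancellation propagates.
        demo_store[task_id]["cleaned_up"] = True
        raise


async def run_with_deadline(task_id: str, coro, timeout: float) -> None:
    try:
        await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        record = demo_store[task_id]
        record["status"] = "failed"
        record["error"] = {
            "message": f"Task exceeded {timeout}s timeout",
            "type": "TimeoutError",
        }


asyncio.run(run_with_deadline("t1", slow_job("t1"), timeout=0.05))
```

In the submission endpoint, one way to apply the limit would be to schedule the wrapper instead of the worker, for example `background_tasks.add_task(run_with_timeout, task_id, run_evaluation(...))`. Treat that wiring as an assumption to verify in your own setup.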

FAQ

Should I use polling or webhooks for AI agent integrations?

Use both. Provide webhooks as the primary notification mechanism for agent platforms that can receive callbacks. Provide polling as a fallback for environments where incoming HTTP connections are blocked (like serverless functions or development machines behind NATs). Many production systems register a webhook but also poll as a safety net in case the webhook delivery fails.

How do I handle webhook delivery failures?

Retry with increasing backoff: try again after 1 minute, 5 minutes, 30 minutes, then hourly for up to 24 hours. Log every delivery attempt and its HTTP status code. Provide a webhook event log endpoint where consumers can see delivery history and manually replay failed events. After all retries are exhausted, mark the delivery as permanently failed but keep the result available via the polling endpoint.
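
A retry schedule like this one could be sketched as follows. This is an illustration under stated assumptions rather than a production delivery system: `retry_schedule` and `deliver_with_retries` are hypothetical names, `send` stands in for a single POST that returns the HTTP status code, and a real implementation would persist the attempt log and run retries in a durable queue rather than in one long-lived coroutine.

```python
import asyncio
from typing import Awaitable, Callable


def retry_schedule(max_total: float = 24 * 3600) -> list[float]:
    """Delays in seconds: 1 min, 5 min, 30 min, then hourly within max_total."""
    delays = [60.0, 300.0, 1800.0]
    elapsed = sum(delays)
    while elapsed + 3600.0 <= max_total:
        delays.append(3600.0)
        elapsed += 3600.0
    return delays


async def deliver_with_retries(
    send: Callable[[], Awaitable[int]],
    delays: list[float],
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> list[dict]:
    """Attempt delivery, retrying on errors or non-2xx. Returns the attempt log."""
    log: list[dict] = []
    # The first attempt happens immediately; each retry waits its delay first.
    for attempt, delay in enumerate([0.0, *delays], start=1):
        if delay:
            await sleep(delay)
        try:
            status = await send()
        except Exception as exc:  # sketch: treat any transport error as retryable
            log.append({"attempt": attempt, "status": None,
                        "error": type(exc).__name__})
            continue
        log.append({"attempt": attempt, "status": status, "error": None})
        if 200 <= status < 300:
            break
    return log
```

The returned log is what a delivery-history endpoint would expose. If the last entry is not a 2xx status, the caller can mark the delivery as permanently failed.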

What should the task TTL be before cleanup?

Keep completed task records for at least 7 days so agents can retrieve results even after delays. For failed tasks, retain them for 30 days for debugging purposes. Use a background cleanup job that removes expired records. Always document the retention policy in your API documentation so consumers know how long results are available.
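
A cleanup pass along these lines might look like the sketch below. `RETENTION` and `purge_expired` are hypothetical names, records are plain dicts with the same `status` and `completed_at` fields as the task record, and applying the completed-task window to cancelled tasks is an assumption, since the policy above does not mention them.

```python
from datetime import datetime, timedelta

RETENTION = {
    "completed": timedelta(days=7),
    "failed": timedelta(days=30),
}
# Assumption: cancelled tasks follow the completed-task policy.
RETENTION["cancelled"] = RETENTION["completed"]


def purge_expired(store: dict[str, dict], now: datetime) -> list[str]:
    """Delete finished records past their retention window; return removed IDs."""
    removed = []
    # Iterate over a copy so entries can be deleted during the loop.
    for task_id, record in list(store.items()):
        ttl = RETENTION.get(record["status"])
        finished = record.get("completed_at")
        if ttl is None or finished is None:
            continue  # pending and running tasks are never purged here
        if now - datetime.fromisoformat(finished) > ttl:
            del store[task_id]
            removed.append(task_id)
    return removed
```

Pass a `now` value that matches the timestamps in the store: naive if the records were written with naive timestamps, timezone-aware otherwise. A background job could call this function on a fixed interval, such as once an hour.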



CallSphere Team
