Skip to content
Learn Agentic AI11 min read0 views

Background Tasks in FastAPI for AI Agents: Async Processing and Task Queues

Implement background processing for AI agent workloads using FastAPI BackgroundTasks, Celery integration, and custom task queues. Learn task status tracking, webhook notifications, and long-running agent job management.

Why Background Tasks for AI Agents

Not every AI agent interaction fits a synchronous request-response cycle. Research agents that scrape and summarize dozens of pages, batch processing of documents through an LLM, training custom embeddings, or generating lengthy reports can take minutes. Forcing users to hold an HTTP connection open for that long is unreliable and frustrating.

Background tasks let you accept the request immediately, return a task ID, and process the work asynchronously. The client polls for status or receives a webhook notification when the work completes. This pattern is essential for production AI agent systems.

FastAPI Built-in BackgroundTasks

For lightweight tasks that complete in seconds, FastAPI's built-in BackgroundTasks is the simplest option:

from fastapi import BackgroundTasks

async def log_agent_interaction(
    session_id: str,
    user_message: str,
    agent_response: str,
    latency_ms: float,
):
    """Save interaction to analytics database."""
    async with get_db_session() as db:
        log = InteractionLog(
            session_id=session_id,
            user_message=user_message,
            agent_response=agent_response,
            latency_ms=latency_ms,
            created_at=datetime.utcnow(),
        )
        db.add(log)
        await db.commit()

@router.post("/chat")
async def chat(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    llm_service: LLMService = Depends(get_llm_service),
):
    start = time.monotonic()
    response = await llm_service.generate(request.messages)
    latency = (time.monotonic() - start) * 1000

    # Log asynchronously - response returns immediately
    background_tasks.add_task(
        log_agent_interaction,
        session_id=request.session_id,
        user_message=request.messages[-1].content,
        agent_response=response,
        latency_ms=latency,
    )

    return {"response": response}

The response is sent to the client immediately. The logging happens afterward in the background. However, BackgroundTasks runs in the same process, so if the server restarts, pending tasks are lost.

Task Status Tracking with In-Memory Store

For tasks that take longer, implement a status tracking system:

import uuid
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskInfo(BaseModel):
    task_id: str
    status: TaskStatus
    result: Optional[dict] = None
    error: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None

# In production, use Redis instead
task_store: dict[str, TaskInfo] = {}

async def run_research_task(task_id: str, query: str):
    task_store[task_id].status = TaskStatus.RUNNING
    try:
        result = await research_agent.deep_research(query)
        task_store[task_id].status = TaskStatus.COMPLETED
        task_store[task_id].result = result
        task_store[task_id].completed_at = datetime.utcnow()
    except Exception as e:
        task_store[task_id].status = TaskStatus.FAILED
        task_store[task_id].error = str(e)

@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    task_store[task_id] = TaskInfo(
        task_id=task_id,
        status=TaskStatus.PENDING,
        created_at=datetime.utcnow(),
    )
    background_tasks.add_task(
        run_research_task, task_id, request.query
    )
    return {"task_id": task_id, "status": "pending"}

@router.get("/research/{task_id}")
async def get_research_status(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    return task

The endpoint returns HTTP 202 Accepted with a task ID. The client polls GET /research/{task_id} to check progress.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

Celery for Distributed Task Queues

For production workloads, use Celery with Redis as the broker. This gives you persistent task queues, automatic retries, worker scaling, and task monitoring:

from celery import Celery

celery_app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
    task_time_limit=600,  # 10 minute hard limit
    task_soft_time_limit=540,  # 9 minute soft limit
)

@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document_batch(self, document_ids: list[str]):
    try:
        results = []
        for i, doc_id in enumerate(document_ids):
            result = analyze_document_sync(doc_id)
            results.append(result)
            # Update progress
            self.update_state(
                state="PROGRESS",
                meta={"current": i + 1, "total": len(document_ids)},
            )
        return {"results": results, "count": len(results)}
    except ExternalServiceError as e:
        raise self.retry(exc=e)

Integrate Celery tasks into your FastAPI endpoints:

@router.post("/batch-analyze", status_code=202)
async def batch_analyze(request: BatchAnalyzeRequest):
    task = process_document_batch.delay(request.document_ids)
    return {"task_id": task.id, "status": "queued"}

@router.get("/batch-analyze/{task_id}")
async def get_batch_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    response = {"task_id": task_id, "status": result.status}

    if result.status == "PROGRESS":
        response["progress"] = result.info
    elif result.status == "SUCCESS":
        response["result"] = result.result
    elif result.status == "FAILURE":
        response["error"] = str(result.result)

    return response

Webhook Notifications

Instead of polling, let clients register a webhook URL to receive notifications when tasks complete:

import httpx

async def notify_webhook(
    webhook_url: str, task_id: str, result: dict
):
    async with httpx.AsyncClient() as client:
        await client.post(
            webhook_url,
            json={
                "task_id": task_id,
                "status": "completed",
                "result": result,
            },
            timeout=10.0,
        )

@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    background_tasks.add_task(
        run_and_notify,
        task_id,
        request.query,
        request.webhook_url,
    )
    return {"task_id": task_id}

FAQ

When should I use BackgroundTasks versus Celery?

Use FastAPI BackgroundTasks for fire-and-forget operations that take under 30 seconds and where losing a task on server restart is acceptable, like logging, sending notifications, or cache warming. Use Celery for anything that takes longer, needs retries, requires progress tracking, or must survive server restarts. If you are processing user-submitted documents through an LLM, that is a Celery task. If you are logging an API call, that is a BackgroundTask.

How do I prevent duplicate task submissions?

Use an idempotency key. Have clients send a unique key with each request. Before creating a new task, check if a task with that key already exists in your store. If it does, return the existing task ID instead of creating a duplicate. Store the mapping from idempotency key to task ID in Redis with a TTL matching your task retention period.

Can background tasks access FastAPI dependencies?

FastAPI BackgroundTasks functions do not have access to the dependency injection system. Dependencies like database sessions from Depends(get_db) are closed before the background task runs. You must create new database sessions and clients inside the background task function itself, or pass the necessary data as plain arguments rather than injected dependencies.


#FastAPI #BackgroundTasks #Celery #AIAgents #AsyncProcessing #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.