Background Tasks in FastAPI for AI Agents: Async Processing and Task Queues
Implement background processing for AI agent workloads using FastAPI BackgroundTasks, Celery integration, and custom task queues. Learn task status tracking, webhook notifications, and long-running agent job management.
Why Background Tasks for AI Agents
Not every AI agent interaction fits a synchronous request-response cycle. Research agents that scrape and summarize dozens of pages, batch processing of documents through an LLM, training custom embeddings, or generating lengthy reports can take minutes. Forcing users to hold an HTTP connection open for that long is unreliable and frustrating.
Background tasks let you accept the request immediately, return a task ID, and process the work asynchronously. The client polls for status or receives a webhook notification when the work completes. This pattern is essential for production AI agent systems.
FastAPI Built-in BackgroundTasks
For lightweight tasks that complete in seconds, FastAPI's built-in BackgroundTasks is the simplest option:
```python
import time
from datetime import datetime

from fastapi import BackgroundTasks, Depends


async def log_agent_interaction(
    session_id: str,
    user_message: str,
    agent_response: str,
    latency_ms: float,
):
    """Save interaction to analytics database."""
    async with get_db_session() as db:
        log = InteractionLog(
            session_id=session_id,
            user_message=user_message,
            agent_response=agent_response,
            latency_ms=latency_ms,
            created_at=datetime.utcnow(),
        )
        db.add(log)
        await db.commit()


@router.post("/chat")
async def chat(
    request: ChatRequest,
    background_tasks: BackgroundTasks,
    llm_service: LLMService = Depends(get_llm_service),
):
    start = time.monotonic()
    response = await llm_service.generate(request.messages)
    latency = (time.monotonic() - start) * 1000
    # Log asynchronously - the response returns immediately
    background_tasks.add_task(
        log_agent_interaction,
        session_id=request.session_id,
        user_message=request.messages[-1].content,
        agent_response=response,
        latency_ms=latency,
    )
    return {"response": response}
```
The response is sent to the client immediately. The logging happens afterward in the background. However, BackgroundTasks runs in the same process, so if the server restarts, pending tasks are lost.
Task Status Tracking with In-Memory Store
For tasks that take longer, implement a status tracking system:
```python
import uuid
from datetime import datetime
from enum import Enum
from typing import Optional

from fastapi import BackgroundTasks, HTTPException
from pydantic import BaseModel


class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


class TaskInfo(BaseModel):
    task_id: str
    status: TaskStatus
    result: Optional[dict] = None
    error: Optional[str] = None
    created_at: datetime
    completed_at: Optional[datetime] = None


# In production, use Redis instead - this dict is lost on restart
# and is not shared across worker processes
task_store: dict[str, TaskInfo] = {}


async def run_research_task(task_id: str, query: str):
    task_store[task_id].status = TaskStatus.RUNNING
    try:
        result = await research_agent.deep_research(query)
        task_store[task_id].status = TaskStatus.COMPLETED
        task_store[task_id].result = result
        task_store[task_id].completed_at = datetime.utcnow()
    except Exception as e:
        task_store[task_id].status = TaskStatus.FAILED
        task_store[task_id].error = str(e)


@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    task_store[task_id] = TaskInfo(
        task_id=task_id,
        status=TaskStatus.PENDING,
        created_at=datetime.utcnow(),
    )
    background_tasks.add_task(run_research_task, task_id, request.query)
    return {"task_id": task_id, "status": "pending"}


@router.get("/research/{task_id}")
async def get_research_status(task_id: str):
    task = task_store.get(task_id)
    if not task:
        raise HTTPException(404, "Task not found")
    return task
```
The endpoint returns HTTP 202 Accepted with a task ID. The client polls GET /research/{task_id} to check progress.
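On the client side, the poll loop should cap its total wait time rather than spin forever. A minimal sketch of that loop; `fetch_status` is a hypothetical callable standing in for whatever HTTP client performs the GET request (e.g. httpx), injected here so the polling logic stays independent of transport:

```python
import time
from typing import Callable


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval_s: float = 2.0,
    timeout_s: float = 300.0,
) -> dict:
    """Poll a task status endpoint until it reaches a terminal state."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        task = fetch_status()  # e.g. GET /research/{task_id}
        if task["status"] in ("completed", "failed"):
            return task
        time.sleep(interval_s)
    raise TimeoutError("task did not finish within the polling window")
```

For long-running jobs, consider exponential backoff on `interval_s` so clients don't hammer the status endpoint.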
Celery for Distributed Task Queues
For production workloads, use Celery with Redis as the broker. This gives you persistent task queues, automatic retries, worker scaling, and task monitoring:
```python
from celery import Celery

celery_app = Celery(
    "agent_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    task_track_started=True,
    task_time_limit=600,       # 10 minute hard limit
    task_soft_time_limit=540,  # 9 minute soft limit
)


@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
)
def process_document_batch(self, document_ids: list[str]):
    try:
        results = []
        for i, doc_id in enumerate(document_ids):
            result = analyze_document_sync(doc_id)
            results.append(result)
            # Report progress so clients polling the result backend can see it
            self.update_state(
                state="PROGRESS",
                meta={"current": i + 1, "total": len(document_ids)},
            )
        return {"results": results, "count": len(results)}
    except ExternalServiceError as e:
        raise self.retry(exc=e)
```
Integrate Celery tasks into your FastAPI endpoints:
```python
@router.post("/batch-analyze", status_code=202)
async def batch_analyze(request: BatchAnalyzeRequest):
    task = process_document_batch.delay(request.document_ids)
    return {"task_id": task.id, "status": "queued"}


@router.get("/batch-analyze/{task_id}")
async def get_batch_status(task_id: str):
    result = celery_app.AsyncResult(task_id)
    response = {"task_id": task_id, "status": result.status}
    if result.status == "PROGRESS":
        response["progress"] = result.info
    elif result.status == "SUCCESS":
        response["result"] = result.result
    elif result.status == "FAILURE":
        response["error"] = str(result.result)
    return response
```
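Nothing executes until at least one worker process is running alongside the API server. Assuming the Celery app above lives in a module named `tasks.py` (the module name is illustrative), a worker is started with:

```shell
# Start a worker consuming from the default queue with 4 worker processes
celery -A tasks worker --loglevel=info --concurrency=4
```

Workers scale horizontally: run more worker processes (on the same or different machines pointed at the same Redis broker) to increase throughput.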
Webhook Notifications
Instead of polling, let clients register a webhook URL to receive notifications when tasks complete:
```python
import httpx


async def notify_webhook(
    webhook_url: str, task_id: str, result: dict
):
    async with httpx.AsyncClient() as client:
        await client.post(
            webhook_url,
            json={
                "task_id": task_id,
                "status": "completed",
                "result": result,
            },
            timeout=10.0,
        )
```
```python
@router.post("/research", status_code=202)
async def start_research(
    request: ResearchRequest,
    background_tasks: BackgroundTasks,
):
    task_id = str(uuid.uuid4())
    background_tasks.add_task(
        run_and_notify,
        task_id,
        request.query,
        request.webhook_url,
    )
    return {"task_id": task_id}
```
FAQ
When should I use BackgroundTasks versus Celery?
Use FastAPI BackgroundTasks for fire-and-forget operations that take under 30 seconds and where losing a task on server restart is acceptable, like logging, sending notifications, or cache warming. Use Celery for anything that takes longer, needs retries, requires progress tracking, or must survive server restarts. If you are processing user-submitted documents through an LLM, that is a Celery task. If you are logging an API call, that is a BackgroundTask.
How do I prevent duplicate task submissions?
Use an idempotency key. Have clients send a unique key with each request. Before creating a new task, check if a task with that key already exists in your store. If it does, return the existing task ID instead of creating a duplicate. Store the mapping from idempotency key to task ID in Redis with a TTL matching your task retention period.
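That check in miniature (a hedged sketch with illustrative names; in production `idempotency_to_task` would be a Redis key with a TTL set via SET NX, not process memory):

```python
import uuid

# Maps client-supplied idempotency keys to task IDs.
idempotency_to_task: dict[str, str] = {}


def get_or_create_task(idempotency_key: str) -> tuple[str, bool]:
    """Return (task_id, created); created is False if the key was seen before."""
    existing = idempotency_to_task.get(idempotency_key)
    if existing is not None:
        return existing, False
    task_id = str(uuid.uuid4())
    idempotency_to_task[idempotency_key] = task_id
    return task_id, True
```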
Can background tasks access FastAPI dependencies?
FastAPI BackgroundTasks functions do not have access to the dependency injection system. Dependencies like database sessions from Depends(get_db) are closed before the background task runs. You must create new database sessions and clients inside the background task function itself, or pass the necessary data as plain arguments rather than injected dependencies.
#FastAPI #BackgroundTasks #Celery #AIAgents #AsyncProcessing #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.