Inngest for AI Agent Functions: Event-Driven Serverless Agent Workflows
Learn how to build event-driven AI agent workflows with Inngest. Covers event triggers, step functions, automatic retries, fan-out patterns, and rate limiting for production agent systems.
Why Inngest for AI Agent Workflows
Inngest takes a unique approach to workflow orchestration: event-driven, serverless, and step-based. Instead of managing workers, queues, and schedulers, you define functions that respond to events. Each function is composed of steps — individually retryable, checkpointed units of work that Inngest manages automatically.
This model is particularly well-suited for AI agents because it eliminates the infrastructure overhead while providing the durability guarantees that long-running LLM workflows need. You write your agent logic as a series of steps, deploy it to any Python server, and Inngest handles retries, concurrency, rate limiting, and fan-out.
Setting Up Inngest with Python
pip install inngest
Initialize the Inngest client and create your first function:
import inngest
import httpx

# Initialize the client
client = inngest.Inngest(
    app_id="ai-agent-platform",
    event_key="your-event-key",
)
Defining Step Functions
Inngest functions are composed of steps. Each step is independently retryable — if step 3 fails, Inngest retries only step 3, not the entire function.
@client.create_function(
    fn_id="research-agent",
    trigger=inngest.TriggerEvent(event="agent/research.requested"),
    retries=3,
)
async def research_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    query = ctx.event.data["query"]
    user_id = ctx.event.data["user_id"]

    # Step 1: Plan the research
    plan = await step.run(
        "plan-research",
        lambda: call_planning_llm(query),
    )

    # Step 2: Gather sources
    sources = await step.run(
        "gather-sources",
        lambda: search_knowledge_base(plan["search_queries"]),
    )

    # Step 3: Synthesize answer
    answer = await step.run(
        "synthesize",
        lambda: call_synthesis_llm(query, sources),
    )

    # Step 4: Store result
    await step.run(
        "store-result",
        lambda: save_to_database(user_id, query, answer),
    )

    return {"answer": answer, "source_count": len(sources)}
import json
import os

API_KEY = os.environ["OPENAI_API_KEY"]

async def call_planning_llm(query: str) -> dict:
    async with httpx.AsyncClient(timeout=60) as http:
        response = await http.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                # JSON mode requires a model that supports response_format
                "model": "gpt-4o-mini",
                "messages": [
                    {
                        "role": "system",
                        # JSON mode also requires the word "JSON" in a prompt
                        "content": "Generate 3 search queries for research. "
                        'Respond as JSON: {"search_queries": [...]}',
                    },
                    {"role": "user", "content": query},
                ],
                "response_format": {"type": "json_object"},
            },
        )
        response.raise_for_status()
        # The message content is a JSON string; parse it so callers can
        # read plan["search_queries"] as a list.
        return json.loads(response.json()["choices"][0]["message"]["content"])
Fan-Out Patterns
Fan-out lets you execute multiple sub-tasks in parallel, then collect results. This is ideal for agents that need to process multiple data sources simultaneously.
@client.create_function(
    fn_id="multi-source-agent",
    trigger=inngest.TriggerEvent(event="agent/multi-source.requested"),
)
async def multi_source_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    sources = ctx.event.data["sources"]

    # Fan out: send an event for each source
    events = [
        inngest.Event(
            name="agent/source.process",
            data={"source": source, "parent_id": ctx.event.id},
        )
        for source in sources
    ]
    await step.send_event("fan-out-sources", events)

    # Wait for a completion event carrying this run's parent_id.
    # Note: wait_for_event resolves on the first matching event; to collect
    # one result per source, issue a wait per source or aggregate elsewhere.
    results = await step.wait_for_event(
        "collect-results",
        event="agent/source.completed",
        match="data.parent_id",
        timeout="10m",
    )

    # Synthesize all results
    synthesis = await step.run(
        "synthesize-all",
        lambda: synthesize_sources(results),
    )
    return {"synthesis": synthesis}
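The fan-out above assumes a companion worker subscribed to agent/source.process that emits agent/source.completed when it finishes. In a real deployment that would be another @client.create_function handler; the sketch below shows only its core logic and the event contract (with a placeholder in place of the real model call), so the shape of the completion event is clear:

```python
# Sketch of the per-source worker's logic. In production this body runs
# inside an Inngest function triggered by "agent/source.process"; the
# summary line is a stand-in for the real LLM call.

async def process_source(event_data: dict) -> dict:
    source = event_data["source"]
    summary = f"summary of {source}"  # placeholder for the real model call

    # The completion event echoes parent_id so the waiting parent can match it.
    return {
        "name": "agent/source.completed",
        "data": {
            "parent_id": event_data["parent_id"],
            "source": source,
            "summary": summary,
        },
    }
```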
Rate Limiting and Concurrency Control
AI agents often interact with rate-limited APIs. Inngest provides built-in rate limiting and concurrency controls.
@client.create_function(
    fn_id="rate-limited-agent",
    trigger=inngest.TriggerEvent(event="agent/process.requested"),
    rate_limit=inngest.RateLimit(
        limit=10,
        period="1m",  # Max 10 executions per minute
    ),
    concurrency=[
        inngest.Concurrency(
            limit=5,  # Max 5 concurrent executions
            scope="env",
        ),
    ],
    throttle=inngest.Throttle(
        limit=100,
        period="1h",
        burst=20,
    ),
)
async def rate_limited_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    result = await step.run(
        "call-llm",
        lambda: call_llm(ctx.event.data["prompt"]),
    )
    return {"result": result}
Triggering Events
Send events to trigger agent functions from anywhere:
# From your API endpoint
async def handle_request(query: str, user_id: str):
    await client.send(
        inngest.Event(
            name="agent/research.requested",
            data={
                "query": query,
                "user_id": user_id,
                "priority": "high",
            },
        )
    )
    return {"status": "processing"}
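The payload travels with the event unchanged: whatever you put in data is what the triggered function reads from ctx.event.data. Sketched as plain data (the example values here are illustrative):

```python
# The event sent by handle_request, as plain data. Inngest delivers `data`
# intact, so inside research_agent, ctx.event.data mirrors this dict.
sent_event = {
    "name": "agent/research.requested",
    "data": {"query": "What is RAG?", "user_id": "u_123", "priority": "high"},
}

# What the function body reads:
received = sent_event["data"]
query = received["query"]      # "What is RAG?"
user_id = received["user_id"]  # "u_123"
```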
Serving with FastAPI
from fastapi import FastAPI
import inngest.fast_api

app = FastAPI()

inngest.fast_api.serve(
    app,
    client,
    [research_agent, multi_source_agent, rate_limited_agent],
)
Inngest connects to your server, discovers your functions, and manages execution. You deploy your code as a normal web server — no separate worker processes needed.
Scheduled Agent Runs
@client.create_function(
    fn_id="daily-digest-agent",
    trigger=inngest.TriggerCron(cron="0 8 * * *"),  # 8 AM daily
)
async def daily_digest(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    news = await step.run("fetch-news", fetch_latest_news)
    digest = await step.run("generate-digest", lambda: summarize(news))
    await step.run("send-digest", lambda: send_email(digest))
    return {"status": "sent"}
FAQ
How does Inngest differ from a traditional message queue like RabbitMQ?
Inngest is a higher-level abstraction. With RabbitMQ, you manage queues, consumers, acknowledgments, dead-letter routing, and retry logic yourself. Inngest handles all of that automatically. You define functions with steps, and Inngest manages the execution lifecycle including retries, concurrency, rate limiting, and observability.
What happens if my server goes down during a function execution?
Inngest checkpoints after each completed step. When your server comes back online, Inngest resumes the function from the last completed step. You do not lose progress, and completed steps are not re-executed.
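This checkpoint-and-resume behavior can be illustrated with a toy memoizer (an analogy only, not Inngest's internals): each completed step's result is saved, so a rerun after a crash skips straight past finished steps.

```python
# Toy model of step checkpointing (illustrative only, not Inngest internals).
checkpoints: dict[str, object] = {}  # durable store of completed step results
executions: list[str] = []           # records which steps actually ran

def run_step(step_id: str, fn):
    if step_id in checkpoints:
        return checkpoints[step_id]  # resumed run: reuse the saved result
    result = fn()                    # first run: execute the step...
    checkpoints[step_id] = result    # ...and checkpoint it
    executions.append(step_id)
    return result

def workflow(crash_after_plan: bool = False):
    plan = run_step("plan", lambda: "plan")
    if crash_after_plan:
        raise RuntimeError("server died")  # simulate a crash mid-function
    return run_step("synthesize", lambda: plan + " -> answer")
```

Running workflow(crash_after_plan=True) and then workflow() again executes the "plan" step only once; the retry resumes at "synthesize".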
Can I use Inngest with my existing FastAPI or Flask application?
Yes. Inngest provides middleware for FastAPI, Flask, and Django. You add the middleware to your existing app and define functions in the same codebase. No separate worker deployment needed — Inngest calls your server to execute each step.
#Inngest #EventDriven #Serverless #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.