
Inngest for AI Agent Functions: Event-Driven Serverless Agent Workflows

Learn how to build event-driven AI agent workflows with Inngest. Covers event triggers, step functions, automatic retries, fan-out patterns, and rate limiting for production agent systems.

Why Inngest for AI Agent Workflows

Inngest takes a unique approach to workflow orchestration: event-driven, serverless, and step-based. Instead of managing workers, queues, and schedulers, you define functions that respond to events. Each function is composed of steps — individually retryable, checkpointed units of work that Inngest manages automatically.

This model is particularly well-suited for AI agents because it eliminates the infrastructure overhead while providing the durability guarantees that long-running LLM workflows need. You write your agent logic as a series of steps, deploy it to any Python server, and Inngest handles retries, concurrency, rate limiting, and fan-out.

Setting Up Inngest with Python

pip install inngest

Initialize the Inngest client and create your first function:

import inngest
import httpx

# Initialize the client
client = inngest.Inngest(
    app_id="ai-agent-platform",
    event_key="your-event-key",
)

Defining Step Functions

Inngest functions are composed of steps. Each step is independently retryable — if step 3 fails, Inngest retries only step 3, not the entire function.

@client.create_function(
    fn_id="research-agent",
    trigger=inngest.TriggerEvent(event="agent/research.requested"),
    retries=3,
)
async def research_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    query = ctx.event.data["query"]
    user_id = ctx.event.data["user_id"]

    # Step 1: Plan the research
    plan = await step.run(
        "plan-research",
        lambda: call_planning_llm(query),
    )

    # Step 2: Gather sources
    sources = await step.run(
        "gather-sources",
        lambda: search_knowledge_base(plan["search_queries"]),
    )

    # Step 3: Synthesize answer
    answer = await step.run(
        "synthesize",
        lambda: call_synthesis_llm(query, sources),
    )

    # Step 4: Store result
    await step.run(
        "store-result",
        lambda: save_to_database(user_id, query, answer),
    )

    return {"answer": answer, "source_count": len(sources)}


import json
import os

API_KEY = os.environ["OPENAI_API_KEY"]


async def call_planning_llm(query: str) -> dict:
    async with httpx.AsyncClient(timeout=60) as http:
        response = await http.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                # JSON mode requires a model that supports response_format
                "model": "gpt-4o",
                "messages": [
                    {
                        "role": "system",
                        "content": (
                            "Generate 3 search queries for research. "
                            'Respond as JSON: {"search_queries": [...]}'
                        ),
                    },
                    {"role": "user", "content": query},
                ],
                "response_format": {"type": "json_object"},
            },
        )
        response.raise_for_status()
        # The model returns a JSON string; parse it into a dict so the
        # caller can read plan["search_queries"]
        return json.loads(response.json()["choices"][0]["message"]["content"])
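The research agent also references three helpers that are not shown above. A minimal, illustrative sketch — the in-memory knowledge base and naive keyword matching are stand-ins for a real search backend and datastore:

```python
# Illustrative stand-ins for the remaining helpers. A real system would
# query a search index and persist results to a database.
KNOWLEDGE_BASE = [
    {"id": 1, "text": "Inngest checkpoints after each completed step."},
    {"id": 2, "text": "Steps are retried independently on failure."},
]


async def search_knowledge_base(search_queries: list[str]) -> list[dict]:
    # Naive keyword match over the in-memory corpus
    matched: list[dict] = []
    for query in search_queries:
        words = query.lower().split()
        for doc in KNOWLEDGE_BASE:
            if any(w in doc["text"].lower() for w in words):
                matched.append(doc)
    # De-duplicate by id, preserving order
    seen: set[int] = set()
    unique: list[dict] = []
    for doc in matched:
        if doc["id"] not in seen:
            seen.add(doc["id"])
            unique.append(doc)
    return unique


async def call_synthesis_llm(query: str, sources: list[dict]) -> str:
    # Placeholder: a real implementation would call an LLM with the
    # query plus source texts, as call_planning_llm does above
    context = "\n".join(doc["text"] for doc in sources)
    return f"Answer to {query!r} based on {len(sources)} sources:\n{context}"


async def save_to_database(user_id: str, query: str, answer: str) -> None:
    # Placeholder: persist to your datastore of choice
    pass
```

Because each helper runs inside `step.run`, any of them can fail and be retried without re-running the others.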

Fan-Out Patterns

Fan-out lets you execute multiple sub-tasks in parallel, then collect results. This is ideal for agents that need to process multiple data sources simultaneously.


import datetime


@client.create_function(
    fn_id="multi-source-agent",
    trigger=inngest.TriggerEvent(event="agent/multi-source.requested"),
)
async def multi_source_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    sources = ctx.event.data["sources"]

    # Fan out: send an event for each source
    events = [
        inngest.Event(
            name="agent/source.process",
            data={"source": source, "parent_id": ctx.event.id},
        )
        for source in sources
    ]
    await step.send_event("fan-out-sources", events)

    # Note: wait_for_event resolves on the first matching event, not all
    # of them. To collect every sub-result, aggregate in a datastore or
    # wait once per expected completion.
    results = await step.wait_for_event(
        "collect-results",
        event="agent/source.completed",
        match="data.parent_id",
        timeout=datetime.timedelta(minutes=10),
    )

    # Synthesize all results
    synthesis = await step.run(
        "synthesize-all",
        lambda: synthesize_sources(results),
    )
    return {"synthesis": synthesis}
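The fan-out above assumes a child function that consumes `agent/source.process` and emits `agent/source.completed`. A minimal sketch, reusing the `client` defined earlier; `process_source` is a hypothetical helper:

```python
@client.create_function(
    fn_id="source-processor",
    trigger=inngest.TriggerEvent(event="agent/source.process"),
)
async def source_processor(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    source = ctx.event.data["source"]
    parent_id = ctx.event.data["parent_id"]

    # process_source is a hypothetical helper (e.g. fetch + summarize)
    result = await step.run(
        "process-source",
        lambda: process_source(source),
    )

    # Emit the completion event the parent waits on; parent_id lets the
    # parent correlate it with its own run
    await step.send_event(
        "emit-completed",
        inngest.Event(
            name="agent/source.completed",
            data={"parent_id": parent_id, "result": result},
        ),
    )
    return {"source": source}
```

Each child run is its own durable function, so a failure in one source does not affect the others.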

Rate Limiting and Concurrency Control

AI agents often interact with rate-limited APIs. Inngest provides these controls declaratively: rate_limit skips executions beyond the limit (lossy), throttle queues and smooths them, and concurrency caps how many runs execute at once.

import datetime


@client.create_function(
    fn_id="rate-limited-agent",
    trigger=inngest.TriggerEvent(event="agent/process.requested"),
    rate_limit=inngest.RateLimit(
        limit=10,
        period=datetime.timedelta(minutes=1),  # skip runs beyond 10/minute
    ),
    concurrency=[
        inngest.Concurrency(
            limit=5,  # Max 5 concurrent executions
            scope="env",
        ),
    ],
    throttle=inngest.Throttle(
        limit=100,
        period=datetime.timedelta(hours=1),
        burst=20,
    ),
)
async def rate_limited_agent(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    result = await step.run(
        "call-llm",
        lambda: call_llm(ctx.event.data["prompt"]),
    )
    return {"result": result}

Triggering Events

Send events to trigger agent functions from anywhere:

# From your API endpoint
async def handle_request(query: str, user_id: str):
    await client.send(
        inngest.Event(
            name="agent/research.requested",
            data={
                "query": query,
                "user_id": user_id,
                "priority": "high",
            },
        )
    )
    return {"status": "processing"}

Serving with FastAPI

from fastapi import FastAPI
import inngest.fast_api

app = FastAPI()

inngest.fast_api.serve(
    app,
    client,
    [research_agent, multi_source_agent, rate_limited_agent],
)

Inngest connects to your server, discovers your functions, and manages execution. You deploy your code as a normal web server — no separate worker processes needed.
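For local development, the Inngest Dev Server can discover and invoke these functions. Assuming the FastAPI app above runs on port 8000 with the default /api/inngest serve path:

```shell
# Start the Inngest Dev Server and point it at the local app
npx inngest-cli@latest dev -u http://localhost:8000/api/inngest
```

The Dev Server provides a local UI for sending test events and inspecting step-by-step execution.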

Scheduled Agent Runs

@client.create_function(
    fn_id="daily-digest-agent",
    trigger=inngest.TriggerCron(cron="0 8 * * *"),  # 8 AM daily
)
async def daily_digest(
    ctx: inngest.Context,
    step: inngest.Step,
) -> dict:
    news = await step.run("fetch-news", fetch_latest_news)
    digest = await step.run("generate-digest", lambda: summarize(news))
    await step.run("send-digest", lambda: send_email(digest))
    return {"status": "sent"}
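The digest function references three helpers not shown above. A minimal sketch with placeholder data — in a real system, summarize would call an LLM and fetch_latest_news would hit a news API:

```python
async def fetch_latest_news() -> list[dict]:
    # Placeholder: a real implementation would pull from RSS or a news API
    return [{"title": "Example headline", "url": "https://example.com"}]


def summarize(news: list[dict]) -> str:
    # Placeholder summary; swap in an LLM call for real digests
    lines = [f"- {item['title']} ({item['url']})" for item in news]
    return "Daily digest:\n" + "\n".join(lines)


async def send_email(digest: str) -> None:
    # Placeholder: send via your email provider
    pass
```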

FAQ

How does Inngest differ from a traditional message queue like RabbitMQ?

Inngest is a higher-level abstraction. With RabbitMQ, you manage queues, consumers, acknowledgments, dead-letter routing, and retry logic yourself. Inngest handles all of that automatically. You define functions with steps, and Inngest manages the execution lifecycle including retries, concurrency, rate limiting, and observability.

What happens if my server goes down during a function execution?

Inngest checkpoints after each completed step. When your server comes back online, Inngest resumes the function from the last completed step. You do not lose progress, and completed steps are not re-executed.
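One practical consequence: non-deterministic work (timestamps, random IDs, API calls) should live inside `step.run`, so the memoized result is replayed rather than recomputed when the function resumes. A minimal sketch:

```python
import datetime


async def timestamped_step(step) -> dict:
    # The timestamp is captured inside step.run, so after a crash the
    # checkpointed value is replayed instead of being recomputed
    started_at = await step.run(
        "record-start",
        lambda: datetime.datetime.now(datetime.timezone.utc).isoformat(),
    )
    return {"started_at": started_at}
```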

Can I use Inngest with my existing FastAPI or Flask application?

Yes. Inngest provides framework integrations for FastAPI, Flask, and Django. You register your functions with your existing app and define them in the same codebase. No separate worker deployment needed — Inngest calls your server to execute each step.


#Inngest #EventDriven #Serverless #AIAgents #Python #AgenticAI #LearnAI #AIEngineering
