
Prefect for AI Agent Pipelines: Modern Python Workflow Orchestration

Learn how to build AI agent pipelines with Prefect. Covers flow definition, task decorators, deployments, scheduling, and real-time monitoring for agent workloads.

Why Prefect Fits AI Agent Workloads

Prefect takes a Python-native approach to workflow orchestration. Unlike systems that require you to learn a new DSL or configuration language, Prefect lets you turn any Python function into a tracked, retryable, observable workflow step by adding a single decorator. For AI engineers already writing agent logic in Python, this means near-zero friction to go from a working script to a production pipeline.

Prefect 3.x introduced native async support, improved caching, and a completely redesigned task runner — all features that align well with the async, IO-heavy nature of AI agent workloads.

Setting Up Prefect

pip install prefect
prefect server start  # Local server with UI at http://localhost:4200

Defining Flows and Tasks

A Prefect flow is the top-level orchestration function. Tasks are individual units of work within a flow that get their own retry logic, caching, and observability.

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import os
import httpx

API_KEY = os.environ["OPENAI_API_KEY"]

@task(
    retries=3,
    retry_delay_seconds=[10, 30, 60],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
async def call_llm(prompt: str, model: str = "gpt-4") -> str:
    """Call an LLM with automatic retries and response caching."""
    async with httpx.AsyncClient(timeout=90) as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
            },
        )
        response.raise_for_status()
        result = response.json()["choices"][0]["message"]["content"]
        print(f"LLM returned {len(result)} chars")
        return result

@task(retries=2, retry_delay_seconds=5)
async def fetch_context(query: str) -> list[dict]:
    """Retrieve relevant documents from a vector store."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            "http://localhost:8000/search",
            json={"query": query, "top_k": 5},
        )
        response.raise_for_status()
        return response.json()["results"]

@task
async def format_report(answer: str, sources: list[dict]) -> str:
    """Format the agent output as a structured report."""
    source_list = "\n".join(
        f"- {s['title']}: {s['snippet']}" for s in sources
    )
    return f"## Answer\n\n{answer}\n\n## Sources\n\n{source_list}"

Building the Agent Flow

@flow(
    name="research-agent",
    description="Multi-step research agent with RAG",
    log_prints=True,
    timeout_seconds=600,
)
async def research_agent_flow(query: str) -> str:
    # Step 1: Retrieve context
    context = await fetch_context(query)
    print(f"Retrieved {len(context)} context documents")

    # Step 2: Build prompt with context
    context_text = "\n".join(
        f"[{c['title']}]: {c['snippet']}" for c in context
    )
    prompt = (
        f"Answer this question using the provided context.\n\n"
        f"Question: {query}\n\nContext:\n{context_text}"
    )

    # Step 3: Generate answer
    answer = await call_llm(prompt)

    # Step 4: Format and return
    report = await format_report(answer, context)
    return report

# Run locally
if __name__ == "__main__":
    import asyncio
    result = asyncio.run(
        research_agent_flow("What is retrieval-augmented generation?")
    )
    print(result)

Deploying with Schedules

Prefect deployments let you trigger flows on schedules, via API, or from events.

from prefect import serve

if __name__ == "__main__":
    research_deployment = research_agent_flow.to_deployment(
        name="scheduled-research",
        cron="0 */6 * * *",  # Every 6 hours
        parameters={"query": "latest AI agent frameworks"},
        tags=["research", "production"],
    )
    # Blocks and polls for scheduled runs. Trigger one manually with:
    #   prefect deployment run 'research-agent/scheduled-research'
    serve(research_deployment)

Parallel Task Execution

Prefect supports concurrent task execution for independent agent steps. Because async tasks return awaitables, you can fan them out with standard asyncio primitives.


from prefect import flow, task
import asyncio

@flow
async def multi_query_agent(queries: list[str]) -> list[str]:
    """Run multiple research queries in parallel."""
    coros = [call_llm(q) for q in queries]  # task calls run concurrently under gather
    results = await asyncio.gather(*coros)
    return list(results)

Monitoring in the Prefect UI

Prefect provides a built-in dashboard at http://localhost:4200 showing flow runs, task states, logs, and timing. Each task run displays its status (Completed, Failed, Retrying, Cached), duration, and any logged output. You can filter by flow name, deployment, or tags.

For programmatic monitoring, query the Prefect API:

from prefect.client.orchestration import get_client
from prefect.client.schemas.sorting import FlowRunSort

async def check_recent_runs():
    async with get_client() as client:
        runs = await client.read_flow_runs(
            limit=10,
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )
        for run in runs:
            print(f"{run.name}: {run.state_name} ({run.total_run_time})")

FAQ

How does Prefect handle task failures differently from Temporal?

Prefect retries tasks within the same process by default, while Temporal dispatches activities to separate workers. Prefect is simpler to set up but does not provide the same cross-process durability. If your worker process dies, Prefect loses in-progress task state unless you configure external result storage.

Can I cache LLM responses across flow runs?

Yes. Use the cache_key_fn=task_input_hash parameter on your task decorator. Prefect hashes the task inputs and returns the cached result if the same inputs appear within the cache_expiration window. This is particularly useful for deterministic LLM calls with temperature=0.

Is Prefect Cloud required for production use?

No. Prefect runs entirely self-hosted with prefect server start. Prefect Cloud adds managed infrastructure, RBAC, automations, and push work pools, but the open-source server covers all core orchestration features.
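A minimal self-hosted setup points the client at your own server (assuming the default port):

```shell
prefect server start &
prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
```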


#Prefect #WorkflowOrchestration #Python #AIPipelines #MLOps #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
