Prefect for AI Agent Pipelines: Modern Python Workflow Orchestration
Learn how to build AI agent pipelines with Prefect. Covers flow definition, task decorators, deployments, scheduling, and real-time monitoring for agent workloads.
Why Prefect Fits AI Agent Workloads
Prefect takes a Python-native approach to workflow orchestration. Unlike systems that require you to learn a new DSL or configuration language, Prefect lets you turn any Python function into a tracked, retryable, observable workflow step by adding a single decorator. For AI engineers already writing agent logic in Python, this means near-zero friction to go from a working script to a production pipeline.
Prefect 3.x introduced native async support, improved caching, and a completely redesigned task runner — all features that align well with the async, IO-heavy nature of AI agent workloads.
Setting Up Prefect
pip install prefect
prefect server start # Local server with UI at http://localhost:4200
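If you run flows from a separate shell, point the Prefect client at the local server first (the URL matches the default shown above):

```shell
# Tell the Prefect client where the API lives, then verify connectivity
prefect config set PREFECT_API_URL="http://localhost:4200/api"
prefect version
```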
Defining Flows and Tasks
A Prefect flow is the top-level orchestration function. Tasks are individual units of work within a flow that get their own retry logic, caching, and observability.
import os
from datetime import timedelta

import httpx
from prefect import flow, task
from prefect.tasks import task_input_hash

API_KEY = os.environ["OPENAI_API_KEY"]

@task(
    retries=3,
    retry_delay_seconds=[10, 30, 60],
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
async def call_llm(prompt: str, model: str = "gpt-4") -> str:
    """Call an LLM with automatic retries and response caching."""
    async with httpx.AsyncClient(timeout=90) as client:
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.0,
            },
        )
        response.raise_for_status()
        result = response.json()["choices"][0]["message"]["content"]
        print(f"LLM returned {len(result)} chars")
        return result
@task(retries=2, retry_delay_seconds=5)
async def fetch_context(query: str) -> list[dict]:
    """Retrieve relevant documents from a vector store."""
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.post(
            "http://localhost:8000/search",
            json={"query": query, "top_k": 5},
        )
        response.raise_for_status()
        return response.json()["results"]
@task
async def format_report(answer: str, sources: list[dict]) -> str:
    """Format the agent output as a structured report."""
    source_list = "\n".join(
        f"- {s['title']}: {s['snippet']}" for s in sources
    )
    return f"## Answer\n\n{answer}\n\n## Sources\n\n{source_list}"
Building the Agent Flow
@flow(
    name="research-agent",
    description="Multi-step research agent with RAG",
    log_prints=True,
    timeout_seconds=600,
)
async def research_agent_flow(query: str) -> str:
    # Step 1: Retrieve context
    context = await fetch_context(query)
    print(f"Retrieved {len(context)} context documents")

    # Step 2: Build prompt with context
    context_text = "\n".join(
        f"[{c['title']}]: {c['snippet']}" for c in context
    )
    prompt = (
        f"Answer this question using the provided context.\n\n"
        f"Question: {query}\n\nContext:\n{context_text}"
    )

    # Step 3: Generate answer
    answer = await call_llm(prompt)

    # Step 4: Format and return
    report = await format_report(answer, context)
    return report

# Run locally
if __name__ == "__main__":
    import asyncio

    result = asyncio.run(
        research_agent_flow("What is retrieval-augmented generation?")
    )
    print(result)
Deploying with Schedules
Prefect deployments let you trigger flows on schedules, via API, or from events.
from prefect import serve

if __name__ == "__main__":
    research_deployment = research_agent_flow.to_deployment(
        name="scheduled-research",
        cron="0 */6 * * *",  # Every 6 hours
        parameters={"query": "latest AI agent frameworks"},
        tags=["research", "production"],
    )
    serve(research_deployment)  # Blocks and runs scheduled flows in-process
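Once served, the deployment can also be triggered on demand from the CLI (assuming the local server from earlier is running and the deployment name matches the one above):

```shell
# Trigger an ad-hoc run of the served deployment with a custom parameter
prefect deployment run 'research-agent/scheduled-research' \
  --param query="How do Prefect deployments work?"
```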
Parallel Task Execution
Prefect supports concurrent task execution for agent steps that are independent.
from prefect import flow
import asyncio

@flow
async def multi_query_agent(queries: list[str]) -> list[str]:
    """Run multiple research queries in parallel."""
    tasks = [call_llm(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return list(results)
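asyncio.gather fans out every call at once; for large query lists you may want to cap concurrency to respect provider rate limits. A small sketch using only the standard library (`bounded_gather` is an illustrative helper, not a Prefect API):

```python
import asyncio

async def bounded_gather(coros, limit: int = 3):
    """Run coroutines concurrently, but never more than `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def _run(coro):
        async with sem:  # waits while `limit` coroutines are in flight
            return await coro

    # gather preserves input order regardless of completion order
    return await asyncio.gather(*(_run(c) for c in coros))
```

Inside the flow, wrap the task coroutines: `results = await bounded_gather([call_llm(q) for q in queries], limit=3)`.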
Monitoring in the Prefect UI
Prefect provides a built-in dashboard at http://localhost:4200 showing flow runs, task states, logs, and timing. Each task run displays its status (Completed, Failed, Retrying, Cached), duration, and any logged output. You can filter by flow name, deployment, or tags.
For programmatic monitoring, query the Prefect API:
import asyncio

from prefect.client.orchestration import get_client
from prefect.client.schemas.sorting import FlowRunSort

async def check_recent_runs():
    async with get_client() as client:
        runs = await client.read_flow_runs(
            limit=10,
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )
        for run in runs:
            print(f"{run.name}: {run.state_name} ({run.total_run_time})")

asyncio.run(check_recent_runs())
FAQ
How does Prefect handle task failures differently from Temporal?
Prefect retries tasks within the same process by default, while Temporal dispatches activities to separate workers. Prefect is simpler to set up but does not provide the same cross-process durability. If your worker process dies, Prefect loses in-progress task state unless you configure external result storage.
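If that durability gap matters for your workload, you can opt into persisted results so a restarted run can reuse completed task output instead of recomputing it. A minimal configuration sketch (the `/tmp/prefect-results` path is a placeholder; `persist_result` and `result_storage` are standard task options, and any filesystem block such as S3 or GCS can stand in for `LocalFileSystem`):

```python
from prefect import task
from prefect.filesystems import LocalFileSystem

# Writes each task's return value to disk so it survives a process restart
storage = LocalFileSystem(basepath="/tmp/prefect-results")

@task(
    retries=2,
    persist_result=True,     # store the return value, not just state metadata
    result_storage=storage,  # where persisted results are written
)
async def durable_step(payload: dict) -> dict:
    ...
```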
Can I cache LLM responses across flow runs?
Yes. Use the cache_key_fn=task_input_hash parameter on your task decorator. Prefect hashes the task inputs and returns the cached result if the same inputs appear within the cache_expiration window. This is particularly useful for deterministic LLM calls with temperature=0.
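For finer control than task_input_hash, you can supply a custom cache_key_fn that hashes only the inputs that actually affect the LLM output. Prefect calls it with the task-run context and a dict of the task's parameters; a sketch (`llm_cache_key` is a name chosen here, not a Prefect built-in):

```python
import hashlib
import json

def llm_cache_key(context, parameters: dict) -> str:
    """Cache on prompt and model only, ignoring any other task arguments."""
    relevant = {
        "prompt": parameters.get("prompt"),
        "model": parameters.get("model"),
    }
    # Deterministic serialization -> stable key for identical inputs
    return hashlib.sha256(
        json.dumps(relevant, sort_keys=True).encode()
    ).hexdigest()

# Usage: @task(cache_key_fn=llm_cache_key, cache_expiration=timedelta(hours=1))
```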
Is Prefect Cloud required for production use?
No. Prefect runs entirely self-hosted with prefect server start. Prefect Cloud adds managed infrastructure, RBAC, automations, and push work pools, but the open-source server covers all core orchestration features.
#Prefect #WorkflowOrchestration #Python #AIPipelines #MLOps #AgenticAI #LearnAI #AIEngineering