DAG-Based Agent Workflows: Directed Acyclic Graphs for Complex Task Orchestration
Learn how to model complex agent workflows as directed acyclic graphs with dependency resolution, parallel execution of independent tasks, and topological sorting for correct execution order.
Why DAGs Matter for Agent Workflows
When an AI agent must complete a complex task — say, generating a market analysis report — it faces dozens of sub-tasks with intricate dependencies. Fetching competitor data must happen before the comparison analysis. Sentiment analysis can run in parallel with financial data retrieval. The final summary depends on all preceding steps.
A directed acyclic graph (DAG) is the natural data structure for this problem. Each node represents a task, each edge represents a dependency, and the acyclic constraint guarantees no circular dependencies that would cause infinite loops.
Modeling Tasks as a DAG
Start by defining tasks with explicit dependencies:
```python
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable


@dataclass
class AgentTask:
    """A single unit of work in the agent workflow."""
    task_id: str
    name: str
    execute: Callable[..., Awaitable[Any]]
    dependencies: list[str] = field(default_factory=list)
    result: Any = None
    status: str = "pending"  # pending, running, completed, failed


class WorkflowDAG:
    """DAG-based workflow engine for agent tasks."""

    def __init__(self):
        self.tasks: dict[str, AgentTask] = {}

    def add_task(self, task: AgentTask):
        self.tasks[task.task_id] = task

    def validate(self) -> bool:
        """Ensure the graph is acyclic using DFS cycle detection."""
        visited = set()
        in_stack = set()

        def dfs(task_id: str) -> bool:
            visited.add(task_id)
            in_stack.add(task_id)
            for dep_id in self.tasks[task_id].dependencies:
                if dep_id not in visited:
                    if not dfs(dep_id):
                        return False
                elif dep_id in in_stack:
                    return False  # Cycle detected
            in_stack.discard(task_id)
            return True

        for tid in self.tasks:
            if tid not in visited:
                if not dfs(tid):
                    return False
        return True
```
The validate method uses depth-first search to detect cycles: in_stack tracks the current recursion path, so any edge that points back into that path is a back edge. If one is found, the graph contains a cycle and is rejected.
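To see the same check in isolation, here is a standalone sketch of DFS cycle detection over a plain dependency map (the has_cycle helper and its example graphs are illustrative, not part of the engine above):

```python
def has_cycle(deps: dict[str, list[str]]) -> bool:
    """Detect a cycle in a dependency map using DFS with a recursion stack."""
    visited: set[str] = set()
    in_stack: set[str] = set()  # nodes on the current DFS path

    def dfs(node: str) -> bool:
        visited.add(node)
        in_stack.add(node)
        for dep in deps.get(node, []):
            if dep not in visited:
                if dfs(dep):
                    return True
            elif dep in in_stack:
                return True  # back edge into the current path: cycle
        in_stack.discard(node)
        return False

    return any(dfs(n) for n in deps if n not in visited)


print(has_cycle({"a": ["b"], "b": ["c"], "c": []}))     # → False
print(has_cycle({"a": ["b"], "b": ["c"], "c": ["a"]}))  # → True
```

The crucial distinction is between visited (ever seen) and in_stack (on the current path): only an edge back into the current path signals a cycle; an edge to an already-finished node is harmless.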
Topological Sort for Execution Order
Topological sorting produces a linear ordering of tasks where every dependency appears before its dependent. This is essential for determining which tasks can run at each stage:
```python
from collections import deque


def topological_sort(dag: WorkflowDAG) -> list[list[str]]:
    """Return tasks grouped into levels for parallel execution."""
    # A task's in-degree is the number of dependencies it waits on.
    in_degree = {
        tid: len(task.dependencies) for tid, task in dag.tasks.items()
    }
    # Level 0: tasks with no dependencies
    queue = deque(
        [tid for tid, degree in in_degree.items() if degree == 0]
    )
    levels = []
    while queue:
        current_level = list(queue)
        levels.append(current_level)
        next_queue = deque()
        for tid in current_level:
            # Reduce in-degree for every task that depends on tid
            for other_id, other_task in dag.tasks.items():
                if tid in other_task.dependencies:
                    in_degree[other_id] -= 1
                    if in_degree[other_id] == 0:
                        next_queue.append(other_id)
        queue = next_queue
    return levels
```
Each level in the output contains tasks that can execute simultaneously because all their dependencies are satisfied by prior levels.
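As a quick check of the level grouping, here is a standalone variant on a plain dependency map. It computes the same levels, but precomputes an inverted dependents map so that finishing a task only touches its actual dependents instead of scanning every task (level_sort and the pipeline dict below are illustrative, not part of the engine):

```python
from collections import deque


def level_sort(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into levels; each level depends only on earlier ones."""
    in_degree = {tid: len(d) for tid, d in deps.items()}
    # Invert the dependency map: dependents[x] = tasks that wait on x.
    dependents: dict[str, list[str]] = {tid: [] for tid in deps}
    for tid, d in deps.items():
        for dep in d:
            dependents[dep].append(tid)
    queue = deque(tid for tid, deg in in_degree.items() if deg == 0)
    levels = []
    while queue:
        level = list(queue)
        levels.append(level)
        queue = deque()
        for tid in level:
            for child in dependents[tid]:
                in_degree[child] -= 1
                if in_degree[child] == 0:
                    queue.append(child)
    return levels


# The report pipeline from this article, as a plain dependency map:
pipeline = {
    "market_data": [],
    "competitor_data": [],
    "trend_analysis": ["market_data"],
    "report": ["trend_analysis", "competitor_data"],
}
print(level_sort(pipeline))
# → [['market_data', 'competitor_data'], ['trend_analysis'], ['report']]
```

The inverted map turns the per-level update from O(V·E) scans into O(E) total work, which matters once a workflow has hundreds of tasks.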
Parallel Execution Engine
With levels computed, executing the DAG becomes straightforward with asyncio:
```python
import asyncio


async def execute_dag(dag: WorkflowDAG) -> dict[str, Any]:
    """Execute all tasks respecting dependencies, parallelizing where possible."""
    if not dag.validate():
        raise ValueError("Workflow contains cycles")
    levels = topological_sort(dag)
    results: dict[str, Any] = {}
    for level in levels:
        # Run all tasks in this level concurrently
        coros = []
        for task_id in level:
            task = dag.tasks[task_id]
            # All dependencies sit in earlier levels, so their results exist.
            dep_results = {d: results[d] for d in task.dependencies}
            coros.append(run_task(task, dep_results, results))
        await asyncio.gather(*coros)
    return results


async def run_task(
    task: AgentTask,
    dep_results: dict,
    all_results: dict,
):
    task.status = "running"
    try:
        task.result = await task.execute(dep_results)
        task.status = "completed"
        all_results[task.task_id] = task.result
    except Exception as e:
        task.status = "failed"
        raise RuntimeError(f"Task {task.task_id} failed: {e}") from e
```
Practical Example: Research Report Pipeline
Here is a complete pipeline that uses the DAG engine to generate a research report:
```python
async def fetch_market_data(deps):
    return {"revenue": 1_200_000, "growth": 0.15}


async def fetch_competitor_data(deps):
    return [{"name": "CompA", "share": 0.3}]


async def analyze_trends(deps):
    market = deps["market_data"]
    return f"Growth rate: {market['growth'] * 100}%"


async def generate_report(deps):
    trends = deps["trend_analysis"]
    competitors = deps["competitor_data"]
    return f"Report based on {trends} and {len(competitors)} competitors"


# Build the DAG
dag = WorkflowDAG()
dag.add_task(AgentTask("market_data", "Fetch Market Data", fetch_market_data))
dag.add_task(AgentTask("competitor_data", "Fetch Competitors", fetch_competitor_data))
dag.add_task(AgentTask(
    "trend_analysis", "Analyze Trends", analyze_trends,
    dependencies=["market_data"],
))
dag.add_task(AgentTask(
    "report", "Generate Report", generate_report,
    dependencies=["trend_analysis", "competitor_data"],
))

asyncio.run(execute_dag(dag))
```
In this example, market_data and competitor_data run in parallel (level 0). trend_analysis runs next (level 1), and the final report runs last (level 2).
FAQ
When should I use a DAG workflow instead of a simple sequential pipeline?
Use a DAG when your workflow has tasks with independent branches that can benefit from parallel execution, or when the dependency structure is complex enough that a linear sequence would either waste time waiting or execute things in the wrong order. For simple three-step pipelines, sequential execution is fine.
How do I handle failures in a DAG where downstream tasks depend on the failed task?
Implement a failure propagation strategy. When a task fails, mark all its transitive dependents as "skipped" rather than attempting them. You can also add retry logic at the task level, and only propagate the failure after retries are exhausted. The key is to never run a task whose dependencies have not all completed successfully.
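The "mark transitive dependents as skipped" step can be sketched with a simple reverse reachability walk; transitive_dependents and the pipeline map below are illustrative helpers, not part of the engine in this article:

```python
def transitive_dependents(deps: dict[str, list[str]], failed: str) -> set[str]:
    """All tasks that directly or indirectly depend on the failed task."""
    skipped: set[str] = set()
    frontier = [failed]
    while frontier:
        current = frontier.pop()
        # Any task listing `current` as a dependency is unreachable now.
        for tid, d in deps.items():
            if current in d and tid not in skipped:
                skipped.add(tid)
                frontier.append(tid)
    return skipped


pipeline = {
    "market_data": [],
    "competitor_data": [],
    "trend_analysis": ["market_data"],
    "report": ["trend_analysis", "competitor_data"],
}
print(sorted(transitive_dependents(pipeline, "market_data")))
# → ['report', 'trend_analysis']
```

A scheduler would mark every id in this set as "skipped" before continuing with the remaining levels, so unrelated branches (here, competitor_data) still complete.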
Can I dynamically add tasks to a DAG during execution?
Yes, but it requires careful design. After a task completes, it can register new tasks into the DAG as long as they do not create cycles. Re-run the topological sort for remaining tasks and continue execution. This pattern is common when an agent discovers sub-tasks it could not predict upfront.
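One way to enforce the "no new cycles" rule is to validate a candidate graph before committing the mutation. The helpers below (acyclic, add_task_safely) are a hypothetical sketch on a plain dependency map, not part of the WorkflowDAG engine above:

```python
def acyclic(deps: dict[str, list[str]]) -> bool:
    """True if the dependency map has no cycles (Kahn-style check)."""
    in_degree = {t: len(d) for t, d in deps.items()}
    dependents: dict[str, list[str]] = {t: [] for t in deps}
    for t, d in deps.items():
        for dep in d:
            dependents[dep].append(t)
    ready = [t for t, deg in in_degree.items() if deg == 0]
    seen = 0
    while ready:
        t = ready.pop()
        seen += 1
        for child in dependents[t]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)
    # If some tasks were never freed, a cycle is blocking them.
    return seen == len(deps)


def add_task_safely(deps: dict[str, list[str]], task_id: str,
                    new_deps: list[str]) -> None:
    """Mutate deps with the new task only if the result stays acyclic."""
    candidate = {**deps, task_id: list(new_deps)}
    if not acyclic(candidate):
        raise ValueError(f"Adding {task_id!r} would create a cycle")
    deps[task_id] = list(new_deps)


pipeline = {"a": [], "b": ["a"]}
add_task_safely(pipeline, "c", ["b"])        # accepted
try:
    add_task_safely(pipeline, "a2", ["a2"])  # self-dependency: rejected
except ValueError as e:
    print(e)  # → Adding 'a2' would create a cycle
```

Validating a throwaway candidate copy keeps the live graph untouched on rejection, which is what lets a running executor safely accept tasks registered mid-flight.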
#DAG #WorkflowOrchestration #ParallelExecution #AgenticAI #Python #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.