Building an AI Agent with Tool-Use Chains: Sequential Tool Orchestration for Complex Tasks
Learn how to build AI agents that chain multiple tools together sequentially, passing intermediate results through dependency graphs while handling errors gracefully across the entire pipeline.
Why Tool Chaining Changes Everything
Most AI agent tutorials show a single tool call: the model decides to call a function, gets a result, and responds. Real-world tasks are rarely that simple. A user who asks "find the top 3 competitors for Acme Corp and draft an outreach email for each" requires your agent to chain together a web search tool, a data extraction tool, a company analysis tool, and an email drafting tool — each depending on results from the previous step.
Tool-use chains transform agents from single-step assistants into multi-step reasoning engines. The key challenge is managing the flow of intermediate results, handling partial failures, and keeping the entire chain observable.
The Architecture of a Tool Chain
A tool chain is a directed acyclic graph (DAG) where each node is a tool invocation and edges represent data dependencies. The simplest chain is linear — tool A feeds tool B feeds tool C. More complex chains fan out and converge.
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
import asyncio
@dataclass
class ToolNode:
name: str
fn: Callable[..., Awaitable[Any]]
depends_on: list[str] = field(default_factory=list)
result: Any = None
error: str | None = None
class ToolChain:
def __init__(self):
self.nodes: dict[str, ToolNode] = {}
def add(self, name: str, fn: Callable, depends_on: list[str] = None):
self.nodes[name] = ToolNode(
name=name, fn=fn, depends_on=depends_on or []
)
async def execute(self) -> dict[str, Any]:
completed: set[str] = set()
results: dict[str, Any] = {}
while len(completed) < len(self.nodes):
ready = [
n for n in self.nodes.values()
if n.name not in completed
and all(d in completed for d in n.depends_on)
]
if not ready:
raise RuntimeError("Circular dependency detected")
tasks = []
for node in ready:
dep_results = {d: results[d] for d in node.depends_on}
tasks.append(self._run_node(node, dep_results))
await asyncio.gather(*tasks)
for node in ready:
completed.add(node.name)
results[node.name] = node.result
return results
async def _run_node(self, node: ToolNode, deps: dict):
try:
node.result = await node.fn(deps)
except Exception as e:
node.error = str(e)
node.result = None
This executor resolves dependencies automatically, runs independent nodes in parallel, and captures errors per-node without crashing the entire chain.
Defining Tools with Dependencies
Each tool is an async function that receives its upstream dependencies as a dictionary. Here is a practical example — researching a company and generating a report.
async def search_company(deps: dict) -> dict:
"""Step 1: Search for company information."""
# In production, call a search API
return {
"name": "Acme Corp",
"industry": "SaaS",
"revenue": "$50M",
"employees": 200,
}
async def find_competitors(deps: dict) -> list[dict]:
"""Step 2: Find competitors based on company data."""
company = deps["search_company"]
# Use company industry and size to find competitors
return [
{"name": "Beta Inc", "overlap": "high"},
{"name": "Gamma Ltd", "overlap": "medium"},
]
async def draft_emails(deps: dict) -> list[str]:
"""Step 3: Draft outreach emails for each competitor."""
competitors = deps["find_competitors"]
company = deps["search_company"]
emails = []
for comp in competitors:
emails.append(
f"Subject: Partnership with {company['name']}\n"
f"Hi {comp['name']} team..."
)
return emails
Wiring the Chain and Running It
async def main():
chain = ToolChain()
chain.add("search_company", search_company)
chain.add("find_competitors", find_competitors, depends_on=["search_company"])
chain.add("draft_emails", draft_emails, depends_on=["find_competitors", "search_company"])
results = await chain.execute()
for email in results["draft_emails"]:
print(email)
asyncio.run(main())
The chain executor sees that search_company has no dependencies and runs it first. Then find_competitors becomes ready. Finally draft_emails runs once both of its dependencies are satisfied.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
Error Propagation Strategies
When a mid-chain tool fails, you have three options: fail the entire chain, skip downstream nodes, or substitute a fallback. A robust pattern is to mark failed nodes and let downstream tools decide.
async def _run_node(self, node: ToolNode, deps: dict):
# Check if any dependency failed
failed_deps = [d for d in node.depends_on if self.nodes[d].error]
if failed_deps:
node.error = f"Skipped: upstream failures in {failed_deps}"
return
try:
node.result = await node.fn(deps)
except Exception as e:
node.error = str(e)
node.result = None
This cascade-skip approach prevents wasted compute on tools that cannot succeed, while preserving partial results from branches that did complete.
Integrating with an LLM Agent Loop
The tool chain becomes powerful when the LLM itself decides which chain to invoke. You register the chain as a single meta-tool that the agent can call.
from agents import Agent, function_tool
@function_tool
async def competitor_research(company_name: str) -> str:
"""Research a company's competitors and draft outreach emails."""
chain = ToolChain()
chain.add("search", search_company)
chain.add("competitors", find_competitors, depends_on=["search"])
chain.add("emails", draft_emails, depends_on=["competitors", "search"])
results = await chain.execute()
return str(results["emails"])
agent = Agent(
name="Research Agent",
instructions="You help users research companies and their competitors.",
tools=[competitor_research],
)
The agent sees one tool but behind it runs an entire dependency-resolved pipeline.
FAQ
How do tool chains differ from simple sequential tool calls?
Simple sequential calls execute tools one after another in a fixed order. Tool chains model explicit data dependencies, enabling parallel execution of independent branches, automatic error propagation, and dynamic reordering. A chain with five tools where two are independent can run those two simultaneously, cutting total latency.
How should I handle timeouts in long-running chains?
Wrap each node execution with asyncio.wait_for() and a per-tool timeout. When a tool times out, treat it the same as an error — mark the node as failed and let downstream skip or fallback logic handle it. Additionally, set a global timeout on the entire chain to enforce an upper bound on total execution time.
Can the LLM modify the chain dynamically at runtime?
Yes. You can give the agent a planning tool that returns a chain specification (list of tools and dependencies), then a second tool that executes that specification. This lets the LLM reason about which tools to include before committing to execution.
#ToolChaining #AgenticAI #AIAgents #PythonAsync #Orchestration #DependencyGraphs #ToolUse #LLMTools
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.