Parallel Fan-Out Fan-In Patterns: Processing Multiple Sub-Tasks Simultaneously
Implement fan-out fan-in patterns for AI agents to distribute work across parallel sub-tasks, aggregate results, handle partial failures gracefully, and enforce timeouts on straggler tasks.
The Fan-Out Fan-In Pattern
Many agent tasks naturally decompose into independent sub-tasks. A research agent might need to search five databases simultaneously. A code review agent might analyze ten files in parallel. A customer support agent might check order status, payment history, and shipping details all at once.
The fan-out fan-in pattern distributes work across multiple concurrent sub-tasks (fan-out) and then collects and merges the results (fan-in). This pattern dramatically reduces total execution time — instead of running N tasks sequentially in N * T seconds, you run them in parallel in roughly T seconds.
Basic Fan-Out with asyncio.gather
The simplest implementation uses asyncio.gather:
import asyncio
from typing import Any, Callable, Awaitable
async def fan_out_gather(
tasks: list[Callable[[], Awaitable[Any]]],
) -> list[Any]:
"""Run all tasks in parallel and collect results."""
return await asyncio.gather(*[task() for task in tasks])
# Example: search multiple sources in parallel
async def search_arxiv(query: str) -> dict:
await asyncio.sleep(1) # Simulate API call
return {"source": "arxiv", "results": ["paper1", "paper2"]}
async def search_scholar(query: str) -> dict:
await asyncio.sleep(1.5)
return {"source": "scholar", "results": ["paper3"]}
async def search_semantic(query: str) -> dict:
await asyncio.sleep(0.8)
return {"source": "semantic_scholar", "results": ["paper4", "paper5"]}
query = "agentic AI workflows"
results = await fan_out_gather([
lambda: search_arxiv(query),
lambda: search_scholar(query),
lambda: search_semantic(query),
])
# All three searches complete in ~1.5s (the slowest) instead of ~3.3s
The problem with plain gather is that one failed task raises an exception and cancels everything. Production systems need better error handling.
Robust Fan-Out with Partial Failure Handling
Use asyncio.gather(return_exceptions=True) or a custom wrapper to handle individual task failures without aborting the entire batch:
from dataclasses import dataclass
from typing import TypeVar, Generic
T = TypeVar("T")
@dataclass
class TaskResult(Generic[T]):
task_id: str
success: bool
result: T | None = None
error: str | None = None
duration_ms: float = 0.0
async def robust_fan_out(
tasks: dict[str, Callable[[], Awaitable[Any]]],
timeout: float | None = None,
) -> dict[str, TaskResult]:
"""Fan-out with per-task error handling and optional timeout."""
import time
async def wrapped(task_id: str, fn: Callable) -> TaskResult:
start = time.monotonic()
try:
result = await fn()
elapsed = (time.monotonic() - start) * 1000
return TaskResult(task_id, True, result, duration_ms=elapsed)
except Exception as e:
elapsed = (time.monotonic() - start) * 1000
return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)
coros = [wrapped(tid, fn) for tid, fn in tasks.items()]
if timeout:
done, pending = await asyncio.wait(
[asyncio.create_task(c) for c in coros],
timeout=timeout,
)
# Cancel timed-out tasks
for task in pending:
task.cancel()
results = {}
for task in done:
r = task.result()
results[r.task_id] = r
for task in pending:
# Mark timed-out tasks
results[f"timeout_{id(task)}"] = TaskResult(
"unknown", False, error="Task timed out"
)
return results
else:
raw_results = await asyncio.gather(*coros)
return {r.task_id: r for r in raw_results}
Now individual task failures are captured in the TaskResult without crashing the entire operation.
The Fan-In Aggregator
After fan-out completes, the fan-in stage merges partial results into a coherent output:
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
class ResultAggregator:
"""Merge results from parallel sub-tasks."""
def __init__(self, min_success_ratio: float = 0.5):
self.min_success_ratio = min_success_ratio
def aggregate(
self, results: dict[str, TaskResult]
) -> dict[str, Any]:
successful = {
tid: r for tid, r in results.items() if r.success
}
failed = {
tid: r for tid, r in results.items() if not r.success
}
total = len(results)
success_count = len(successful)
success_ratio = success_count / total if total else 0
if success_ratio < self.min_success_ratio:
raise InsufficientResultsError(
f"Only {success_count}/{total} tasks succeeded "
f"({success_ratio:.0%}), minimum is {self.min_success_ratio:.0%}"
)
return {
"merged_results": [r.result for r in successful.values()],
"success_count": success_count,
"failure_count": len(failed),
"failed_tasks": {
tid: r.error for tid, r in failed.items()
},
"total_duration_ms": max(
r.duration_ms for r in results.values()
),
}
The min_success_ratio parameter controls how many tasks must succeed before the aggregated result is considered valid. For a research agent querying five sources, maybe three out of five is acceptable. For a financial reconciliation, you might need all tasks to succeed.
Bounded Concurrency with Semaphores
Unbounded fan-out can overwhelm downstream services. Use a semaphore to limit concurrency:
async def bounded_fan_out(
tasks: dict[str, Callable[[], Awaitable[Any]]],
max_concurrency: int = 5,
) -> dict[str, TaskResult]:
"""Fan-out with bounded concurrency."""
semaphore = asyncio.Semaphore(max_concurrency)
import time
async def limited(task_id: str, fn: Callable) -> TaskResult:
async with semaphore:
start = time.monotonic()
try:
result = await fn()
elapsed = (time.monotonic() - start) * 1000
return TaskResult(task_id, True, result, duration_ms=elapsed)
except Exception as e:
elapsed = (time.monotonic() - start) * 1000
return TaskResult(task_id, False, error=str(e), duration_ms=elapsed)
coros = [limited(tid, fn) for tid, fn in tasks.items()]
raw_results = await asyncio.gather(*coros)
return {r.task_id: r for r in raw_results}
With max_concurrency=5, at most five tasks run simultaneously even if you fan out to fifty sub-tasks.
Complete Agent Example: Multi-Source Research
Putting the pattern together for a research agent that queries multiple sources, handles failures, and synthesizes results:
class ResearchAgent:
def __init__(self, llm_client, sources: dict):
self.llm = llm_client
self.sources = sources
self.aggregator = ResultAggregator(min_success_ratio=0.4)
async def research(self, query: str) -> str:
# Fan-out: query all sources in parallel
tasks = {
name: lambda n=name: self._search_source(n, query)
for name in self.sources
}
results = await bounded_fan_out(tasks, max_concurrency=3)
# Fan-in: aggregate results
aggregated = self.aggregator.aggregate(results)
# Synthesize with LLM
return await self._synthesize(query, aggregated)
async def _search_source(self, source_name: str, query: str) -> list:
search_fn = self.sources[source_name]
return await search_fn(query)
async def _synthesize(self, query: str, aggregated: dict) -> str:
all_results = []
for source_results in aggregated["merged_results"]:
all_results.extend(source_results)
prompt = f"""Synthesize these research results for the query: {query}
Results from {aggregated['success_count']} sources:
{json.dumps(all_results, indent=2)}
Note: {aggregated['failure_count']} sources failed and are excluded.
Provide a comprehensive summary."""
response = await self.llm.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
FAQ
How do I decide the right max_concurrency value?
Start with the most restrictive downstream limit. If you are calling an API with a rate limit of 10 requests per second, set max_concurrency to 10 or lower. If you are calling multiple different APIs, each with its own limit, use separate semaphores per API. For LLM APIs, check your tier's rate limit (requests per minute) and set concurrency accordingly. Monitor for 429 (rate limit) errors and adjust down if they appear.
Should I use asyncio.gather, asyncio.TaskGroup, or asyncio.wait?
Use gather with return_exceptions=True for the simplest case where you want all results including errors. Use TaskGroup (Python 3.11 and later) when you want structured concurrency with automatic cleanup — if one task fails, all others are cancelled. Use wait when you need fine-grained control over timeouts or want to process results as they complete rather than waiting for all tasks.
What happens if the slowest sub-task takes much longer than the others?
This is the "straggler" problem. Set a timeout on the entire fan-out operation. When the timeout fires, cancel the straggler and proceed with the results you have. The aggregator checks whether enough tasks succeeded to produce a meaningful result. For research tasks, getting four out of five sources in two seconds is often better than waiting 30 seconds for the fifth source.
#FanOutFanIn #ParallelProcessing #Concurrency #Asyncio #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.