Building a Research Synthesis Agent: Multi-Source Data Collection and Analysis
Build a research synthesis agent that collects data from diverse sources in parallel, resolves conflicting information, and produces coherent analytical summaries using structured prompting.
The Research Synthesis Challenge
Research tasks are among the hardest for AI agents. A human researcher reads multiple sources, evaluates their credibility, notices contradictions, weighs evidence, and synthesizes a coherent conclusion. A naive agent that simply concatenates search results produces shallow, often contradictory output.
A proper research synthesis agent needs four capabilities: source diversity (pulling from varied, complementary sources), parallel retrieval (efficiency), conflict resolution (handling contradictions), and structured synthesis (producing coherent analysis, not just summaries).
Source Management
Start by defining a registry of sources with metadata about their type and reliability:
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
from enum import Enum

class SourceType(Enum):
    ACADEMIC = "academic"
    NEWS = "news"
    DATABASE = "database"
    EXPERT = "expert"
    GOVERNMENT = "government"

@dataclass
class ResearchSource:
    name: str
    source_type: SourceType
    reliability_score: float  # 0.0 to 1.0
    fetch_fn: Callable[[str], Awaitable[list[dict]]]
    rate_limit_per_min: int = 30
    description: str = ""

@dataclass
class SourceResult:
    source_name: str
    source_type: SourceType
    reliability: float
    items: list[dict] = field(default_factory=list)
    error: str | None = None

class SourceRegistry:
    def __init__(self):
        self.sources: dict[str, ResearchSource] = {}

    def register(self, source: ResearchSource):
        self.sources[source.name] = source

    def get_sources_for_topic(
        self, topic_type: str = "general"
    ) -> list[ResearchSource]:
        """Return sources sorted by reliability."""
        return sorted(
            self.sources.values(),
            key=lambda s: s.reliability_score,
            reverse=True,
        )
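As a quick sanity check of the selection logic, here is a standalone sketch (the dataclass is trimmed to just the fields the sort uses, so it runs on its own) showing that sorting by reliability returns the strongest sources first:

```python
from dataclasses import dataclass

# Trimmed stand-in for ResearchSource: only the fields the sort needs.
@dataclass
class MiniSource:
    name: str
    reliability_score: float

class MiniRegistry:
    def __init__(self):
        self.sources = {}

    def register(self, source):
        self.sources[source.name] = source

    def by_reliability(self):
        # Same ordering rule as SourceRegistry.get_sources_for_topic
        return sorted(self.sources.values(),
                      key=lambda s: s.reliability_score, reverse=True)

registry = MiniRegistry()
registry.register(MiniSource("arxiv", 0.9))
registry.register(MiniSource("tech-news", 0.6))
registry.register(MiniSource("gov-stats", 0.85))
print([s.name for s in registry.by_reliability()])
```

The source names and scores here are illustrative; in practice reliability scores come from your own evaluation of each provider.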
Parallel Multi-Source Retrieval
Fetch from all sources simultaneously with proper error isolation:
import asyncio

class ParallelRetriever:
    def __init__(self, registry: SourceRegistry, max_concurrency: int = 5):
        self.registry = registry
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def retrieve_all(self, query: str) -> list[SourceResult]:
        sources = self.registry.get_sources_for_topic()
        tasks = [self._fetch_source(source, query) for source in sources]
        return await asyncio.gather(*tasks)

    async def _fetch_source(
        self, source: ResearchSource, query: str
    ) -> SourceResult:
        async with self.semaphore:
            try:
                items = await asyncio.wait_for(
                    source.fetch_fn(query),
                    timeout=30.0,
                )
                return SourceResult(
                    source_name=source.name,
                    source_type=source.source_type,
                    reliability=source.reliability_score,
                    items=items,
                )
            except asyncio.TimeoutError:
                return SourceResult(
                    source_name=source.name,
                    source_type=source.source_type,
                    reliability=source.reliability_score,
                    error="Timeout after 30 seconds",
                )
            except Exception as e:
                return SourceResult(
                    source_name=source.name,
                    source_type=source.source_type,
                    reliability=source.reliability_score,
                    error=str(e),
                )
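The key property of this retriever is error isolation: one slow or failing source never takes down the whole batch, because every failure mode is converted into a result object. A standalone sketch of the same pattern, with dummy fetchers standing in for real sources and a short timeout so it runs fast:

```python
import asyncio

async def guarded_fetch(name, coro, timeout=0.2):
    # Mirrors _fetch_source: convert timeouts and errors into results.
    try:
        items = await asyncio.wait_for(coro, timeout=timeout)
        return {"source": name, "items": items, "error": None}
    except asyncio.TimeoutError:
        return {"source": name, "items": [], "error": "timeout"}
    except Exception as e:
        return {"source": name, "items": [], "error": str(e)}

async def fast():
    return [{"title": "result"}]

async def slow():
    await asyncio.sleep(10)  # will exceed the 0.2s timeout
    return []

async def broken():
    raise RuntimeError("HTTP 500")

async def main():
    # gather never raises here: every branch of guarded_fetch returns a dict
    return await asyncio.gather(
        guarded_fetch("fast", fast()),
        guarded_fetch("slow", slow()),
        guarded_fetch("broken", broken()),
    )

results = asyncio.run(main())
print([r["error"] for r in results])
```

Because exceptions are caught inside each task, `asyncio.gather` needs no `return_exceptions=True` and the caller always gets one result per source.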
Claim Extraction
Before synthesis, extract discrete claims from each source so they can be compared:
import json

@dataclass
class Claim:
    statement: str
    source_name: str
    source_type: SourceType
    reliability: float
    supporting_evidence: str = ""
    confidence: float = 0.0

class ClaimExtractor:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def extract_claims(
        self, source_result: SourceResult
    ) -> list[Claim]:
        """Extract factual claims from a source's results."""
        if source_result.error or not source_result.items:
            return []
        content = json.dumps(source_result.items[:10], indent=2)
        # JSON mode returns a top-level object, not a bare array, so ask
        # for the claims under a "claims" key.
        prompt = f"""Extract distinct factual claims from this data.

Source: {source_result.source_name} ({source_result.source_type.value})

Data:
{content}

Return JSON:
{{
  "claims": [
    {{
      "statement": "clear factual claim",
      "supporting_evidence": "quote or data point",
      "confidence": 0.0 to 1.0
    }}
  ]
}}"""
        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        raw = json.loads(response.choices[0].message.content)
        # Accept either {"claims": [...]} or a bare array, in case the
        # model ignores the requested shape.
        claims_data = raw.get("claims", raw) if isinstance(raw, dict) else raw
        return [
            Claim(
                statement=c["statement"],
                source_name=source_result.source_name,
                source_type=source_result.source_type,
                reliability=source_result.reliability,
                supporting_evidence=c.get("supporting_evidence", ""),
                confidence=c.get("confidence", 0.5),
            )
            for c in claims_data
        ]
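The `raw.get("claims", raw)` fallback matters because JSON mode guarantees an object but not its exact shape. A quick standalone check that the normalization accepts both a wrapped object and a bare array:

```python
import json

def normalize_claims(payload: str) -> list:
    """Accept either {"claims": [...]} or a bare JSON array."""
    raw = json.loads(payload)
    return raw.get("claims", raw) if isinstance(raw, dict) else raw

# Two payload shapes a model might plausibly return for the same data.
wrapped = '{"claims": [{"statement": "X rose 10%", "confidence": 0.8}]}'
bare = '[{"statement": "X rose 10%", "confidence": 0.8}]'

print(normalize_claims(wrapped) == normalize_claims(bare))
```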
Conflict Resolution
When sources disagree, the agent must identify the conflict and determine which claim is more credible:
@dataclass
class ConflictGroup:
    topic: str
    claims: list[Claim]
    resolution: str = ""
    resolved_claim: str = ""

class ConflictResolver:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def find_conflicts(self, all_claims: list[Claim]) -> list[ConflictGroup]:
        """Group claims by topic and identify conflicts."""
        claims_text = json.dumps(
            [{"id": i, "statement": c.statement, "source": c.source_name}
             for i, c in enumerate(all_claims)],
            indent=2,
        )
        prompt = f"""Analyze these claims and identify groups where sources disagree.

Claims:
{claims_text}

Return JSON:
{{
  "conflict_groups": [
    {{
      "topic": "what the disagreement is about",
      "claim_ids": [0, 3, 7]
    }}
  ]
}}"""
        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        result = json.loads(response.choices[0].message.content)
        groups = []
        for group in result.get("conflict_groups", []):
            # Guard against hallucinated or negative indices.
            group_claims = [all_claims[i] for i in group["claim_ids"]
                            if 0 <= i < len(all_claims)]
            groups.append(ConflictGroup(
                topic=group["topic"],
                claims=group_claims,
            ))
        return groups

    async def resolve(self, conflict: ConflictGroup) -> ConflictGroup:
        """Resolve a conflict by weighing source reliability and evidence."""
        claims_detail = []
        for c in conflict.claims:
            claims_detail.append(
                f"- [{c.source_name}, reliability={c.reliability:.1f}] "
                f"{c.statement} (evidence: {c.supporting_evidence})"
            )
        prompt = f"""These sources disagree about: {conflict.topic}

Claims:
{chr(10).join(claims_detail)}

Resolve the conflict by:
1. Weighing source reliability scores
2. Evaluating the strength of supporting evidence
3. Considering source type (academic > news for factual claims)

Return JSON:
{{
  "resolution": "explanation of why one claim is more credible",
  "resolved_claim": "the most accurate statement based on evidence"
}}"""
        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            response_format={"type": "json_object"},
        )
        result = json.loads(response.choices[0].message.content)
        conflict.resolution = result.get("resolution", "")
        conflict.resolved_claim = result.get("resolved_claim", "")
        return conflict
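If the LLM call fails, or you want a deterministic tiebreak to sanity-check its output, one simple heuristic (hypothetical, not part of the pipeline above) is to score each claim by source reliability times extraction confidence and prefer the highest:

```python
from dataclasses import dataclass

@dataclass
class MiniClaim:
    statement: str
    reliability: float  # source reliability, 0.0-1.0
    confidence: float   # extraction confidence, 0.0-1.0

def heuristic_resolve(claims):
    """Pick the claim whose reliability * confidence product is highest."""
    return max(claims, key=lambda c: c.reliability * c.confidence)

# A high-reliability source with decent confidence beats a less
# reliable source even when the latter's extraction was more confident.
conflict = [
    MiniClaim("Market grew 5% in 2023", reliability=0.9, confidence=0.8),
    MiniClaim("Market grew 12% in 2023", reliability=0.6, confidence=0.9),
]
winner = heuristic_resolve(conflict)
print(winner.statement)
```

This ignores the evidence text entirely, so it is a fallback, not a replacement for the LLM-based resolution.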
The Synthesis Engine
The final synthesis combines resolved claims into a coherent analysis:
class SynthesisEngine:
    def __init__(self, llm_client):
        self.llm = llm_client

    async def synthesize(
        self,
        query: str,
        claims: list[Claim],
        resolved_conflicts: list[ConflictGroup],
        source_results: list[SourceResult],
    ) -> str:
        """Produce a coherent research synthesis."""
        # Bucket claims by extraction confidence
        high_confidence = [c for c in claims if c.confidence >= 0.7]
        medium_confidence = [c for c in claims if 0.4 <= c.confidence < 0.7]

        conflict_summaries = []
        for cg in resolved_conflicts:
            conflict_summaries.append(
                f"- {cg.topic}: {cg.resolved_claim} ({cg.resolution})"
            )

        successful_sources = [s for s in source_results if not s.error]
        failed_sources = [s for s in source_results if s.error]

        prompt = f"""Synthesize a comprehensive research analysis.

Research Question: {query}

High-Confidence Findings ({len(high_confidence)} claims):
{chr(10).join(f"- {c.statement} [{c.source_name}]" for c in high_confidence[:15])}

Medium-Confidence Findings ({len(medium_confidence)} claims):
{chr(10).join(f"- {c.statement} [{c.source_name}]" for c in medium_confidence[:10])}

Resolved Conflicts:
{chr(10).join(conflict_summaries) if conflict_summaries else "None identified"}

Sources consulted: {len(successful_sources)} successful, {len(failed_sources)} failed

Write a structured analysis with:
1. Executive Summary (2-3 sentences)
2. Key Findings (backed by high-confidence claims)
3. Areas of Uncertainty (medium-confidence and resolved conflicts)
4. Gaps and Limitations (failed sources, missing perspectives)
5. Recommendations for further research"""
        response = await self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "You are a research analyst. "
                 "Cite sources inline. Distinguish between established facts "
                 "and uncertain findings."},
                {"role": "user", "content": prompt},
            ],
        )
        return response.choices[0].message.content
Orchestrating the Full Pipeline
class ResearchSynthesisAgent:
    def __init__(self, llm_client, registry: SourceRegistry):
        self.retriever = ParallelRetriever(registry)
        self.extractor = ClaimExtractor(llm_client)
        self.resolver = ConflictResolver(llm_client)
        self.synthesizer = SynthesisEngine(llm_client)

    async def research(self, query: str) -> str:
        # 1. Parallel retrieval
        source_results = await self.retriever.retrieve_all(query)

        # 2. Extract claims from each source, also in parallel
        all_claims = []
        extract_tasks = [
            self.extractor.extract_claims(sr) for sr in source_results
        ]
        claim_lists = await asyncio.gather(*extract_tasks)
        for claim_list in claim_lists:
            all_claims.extend(claim_list)

        # 3. Detect and resolve conflicts
        conflicts = await self.resolver.find_conflicts(all_claims)
        resolved = []
        for conflict in conflicts:
            resolved.append(await self.resolver.resolve(conflict))

        # 4. Synthesize
        return await self.synthesizer.synthesize(
            query, all_claims, resolved, source_results
        )
FAQ
How do I ensure source diversity in the research agent?
Enforce diversity at the source registry level. Require at least one source from each SourceType category (academic, news, database). When selecting sources for a query, use stratified sampling — pick the top-reliability source from each category rather than the top N overall. This prevents the agent from relying entirely on one type of source, which introduces systematic bias.
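The stratified selection described above can be sketched as follows (standalone, using plain tuples rather than the ResearchSource dataclass; the source names are illustrative):

```python
from collections import defaultdict

def stratified_select(sources):
    """Pick the most reliable source from each category.

    sources: list of (name, category, reliability) tuples.
    """
    by_category = defaultdict(list)
    for name, category, reliability in sources:
        by_category[category].append((name, reliability))
    # One winner per category, rather than the top N overall.
    return {
        category: max(candidates, key=lambda c: c[1])[0]
        for category, candidates in by_category.items()
    }

sources = [
    ("arxiv", "academic", 0.9),
    ("pubmed", "academic", 0.85),
    ("reuters", "news", 0.7),
    ("blog-feed", "news", 0.4),
    ("census", "government", 0.95),
]
print(stratified_select(sources))
```

Note that a plain top-3 cut of this list would pick both academic sources and skip news entirely, which is exactly the bias stratification prevents.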
How does the agent handle sources that return stale or outdated information?
Include a recency field in each source result. During claim extraction, ask the LLM to identify dates mentioned in the data. During conflict resolution, newer information from reliable sources should generally take precedence over older data. Flag claims where the most recent source is more than six months old, and note this in the "Areas of Uncertainty" section of the synthesis.
What is the optimal number of sources for a research synthesis?
Five to eight sources provide a good balance between comprehensiveness and cost. Fewer than three sources give insufficient cross-validation. More than ten sources increase cost and latency with diminishing returns — most unique claims are captured by the first seven to eight sources. Adjust based on the topic complexity: a narrow technical question might need three specialized sources, while a broad market analysis benefits from eight diverse sources.
CallSphere Team