Building a Knowledge Graph Construction Agent: Extracting Entities and Relations from Documents
Build an AI agent that reads documents, extracts named entities and their relationships, constructs a knowledge graph stored in Neo4j, and provides a natural language query interface over the graph.
Why Knowledge Graphs for AI Agents
RAG retrieves document chunks. Knowledge graphs retrieve structured facts. When a user asks "which companies has Dr. Sarah Chen co-authored papers with in the last 3 years," a RAG system must search through dozens of paper chunks and hope the LLM connects the dots. A knowledge graph stores the relationship directly: (Dr. Sarah Chen)-[CO_AUTHORED]->(Paper X)<-[PUBLISHED_BY]-(Company Y) and returns precise answers in milliseconds.
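For concreteness, the graph pattern above corresponds to a Cypher query along these lines (the labels, property names, and `year` filter are hypothetical; your extraction schema will differ):

```cypher
// Sketch: find companies linked to Dr. Sarah Chen through co-authored papers.
MATCH (p:Entity {name: 'Dr. Sarah Chen'})-[:CO_AUTHORED]->(paper:Entity)
      <-[:PUBLISHED_BY]-(org:Entity {type: 'ORGANIZATION'})
WHERE paper.year >= date().year - 3
RETURN DISTINCT org.name
```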
A knowledge graph construction agent automates the labor-intensive process of reading documents, extracting entities, identifying relationships, and building the graph. Once built, the graph serves as a structured memory that any downstream agent can query.
Entity and Relation Extraction with Structured Output
The first step is extracting entities and relationships from text. Use the LLM with structured output to ensure consistent extraction.
```python
from pydantic import BaseModel

from agents import Agent, Runner


class Entity(BaseModel):
    name: str
    type: str  # PERSON, ORGANIZATION, TECHNOLOGY, CONCEPT, LOCATION
    description: str


class Relation(BaseModel):
    source: str
    target: str
    relation_type: str  # WORKS_AT, FOUNDED, USES, COMPETES_WITH, etc.
    confidence: float
    evidence: str


class ExtractionResult(BaseModel):
    entities: list[Entity]
    relations: list[Relation]


extractor = Agent(
    name="Entity Extractor",
    instructions="""Extract all named entities and their relationships from the text.

    Entity types: PERSON, ORGANIZATION, TECHNOLOGY, CONCEPT, LOCATION, EVENT, PRODUCT
    Relation types: WORKS_AT, FOUNDED, ACQUIRED, PARTNERS_WITH, COMPETES_WITH,
        USES, DEVELOPED, LOCATED_IN, PART_OF, CAUSED

    Rules:
    - Only extract explicitly stated relationships, not inferred ones
    - Set confidence between 0.0 and 1.0 based on how clearly the text states the relation
    - Include the exact text evidence for each relation
    - Normalize entity names (e.g., "Google" and "Google LLC" -> "Google")""",
    output_type=ExtractionResult,
)
```
Chunking Documents for Extraction
Large documents need to be chunked before extraction, with overlap to catch cross-boundary entities.
```python
def chunk_document(text: str, chunk_size: int = 1500, overlap: int = 200) -> list[str]:
    """Split document into overlapping chunks for entity extraction."""
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunk = " ".join(words[start:end])
        chunks.append(chunk)
        start += chunk_size - overlap
    return chunks
```
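To see the overlap behavior concretely, here is the same function applied to a toy input with deliberately small sizes (the real defaults of 1500/200 work the same way):

```python
def chunk_document(text: str, chunk_size: int = 1500, overlap: int = 200) -> list[str]:
    """Split document into overlapping word-based chunks (repeated here for a runnable demo)."""
    words = text.split()
    chunks = []
    start = 0
    while start < len(words):
        end = min(start + chunk_size, len(words))
        chunks.append(" ".join(words[start:end]))
        start += chunk_size - overlap
    return chunks

# Toy example: 10 words, chunks of 4 with 1 word of overlap.
text = "w0 w1 w2 w3 w4 w5 w6 w7 w8 w9"
chunks = chunk_document(text, chunk_size=4, overlap=1)
# Each chunk starts on the last word of the previous one, so an entity
# mention straddling a boundary appears whole in at least one chunk.
```

Note the final chunk can be much shorter than `chunk_size`; that tail is harmless for extraction.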
```python
async def extract_from_document(document_text: str) -> ExtractionResult:
    """Extract entities and relations from a full document."""
    chunks = chunk_document(document_text)
    all_entities: dict[str, Entity] = {}
    all_relations: list[Relation] = []
    for chunk in chunks:
        result = await Runner.run(extractor, chunk)
        extraction = result.final_output_as(ExtractionResult)
        # Deduplicate entities by name
        for entity in extraction.entities:
            key = entity.name.lower().strip()
            if key not in all_entities:
                all_entities[key] = entity
        all_relations.extend(extraction.relations)
    # Deduplicate relations
    unique_relations = deduplicate_relations(all_relations)
    return ExtractionResult(
        entities=list(all_entities.values()),
        relations=unique_relations,
    )


def deduplicate_relations(relations: list[Relation]) -> list[Relation]:
    """Merge duplicate relations, keeping the highest confidence."""
    seen: dict[str, Relation] = {}
    for rel in relations:
        key = f"{rel.source}|{rel.relation_type}|{rel.target}"
        if key not in seen or rel.confidence > seen[key].confidence:
            seen[key] = rel
    return list(seen.values())
```
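A quick runnable illustration of the "keep the highest confidence" rule, using a minimal dataclass in place of the pydantic `Relation` model so the snippet stands alone:

```python
from dataclasses import dataclass


@dataclass
class Relation:
    """Minimal stand-in for the pydantic Relation model above."""
    source: str
    target: str
    relation_type: str
    confidence: float
    evidence: str = ""


def deduplicate_relations(relations: list[Relation]) -> list[Relation]:
    """Merge duplicate (source, type, target) triples, keeping the highest confidence."""
    seen: dict[str, Relation] = {}
    for rel in relations:
        key = f"{rel.source}|{rel.relation_type}|{rel.target}"
        if key not in seen or rel.confidence > seen[key].confidence:
            seen[key] = rel
    return list(seen.values())


rels = [
    Relation("Alice", "Acme", "WORKS_AT", 0.6),
    Relation("Alice", "Acme", "WORKS_AT", 0.9),  # duplicate triple, higher confidence
    Relation("Acme", "Beta", "ACQUIRED", 0.8),
]
deduped = deduplicate_relations(rels)
# Two relations survive; the WORKS_AT edge keeps confidence 0.9.
```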
Storing in Neo4j
Neo4j is the natural storage layer for knowledge graphs. The Cypher query language makes both insertion and querying intuitive.
```python
from neo4j import AsyncGraphDatabase


class KnowledgeGraphStore:
    def __init__(self, uri: str, user: str, password: str):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def store_extraction(self, extraction: ExtractionResult):
        # Note: the queries below use APOC procedures (apoc.create.addLabels,
        # apoc.merge.relationship) to set labels and relationship types
        # dynamically, so the APOC plugin must be installed on the server.
        async with self.driver.session() as session:
            # Create entity nodes
            for entity in extraction.entities:
                await session.run(
                    """
                    MERGE (e:Entity {name: $name})
                    SET e.type = $type, e.description = $description
                    WITH e
                    CALL apoc.create.addLabels(e, [$type]) YIELD node
                    RETURN node
                    """,
                    name=entity.name,
                    type=entity.type,
                    description=entity.description,
                )
            # Create relationship edges
            for rel in extraction.relations:
                await session.run(
                    """
                    MATCH (source:Entity {name: $source})
                    MATCH (target:Entity {name: $target})
                    CALL apoc.merge.relationship(
                        source, $rel_type, {confidence: $confidence,
                        evidence: $evidence}, {}, target, {}
                    ) YIELD rel
                    RETURN rel
                    """,
                    source=rel.source,
                    target=rel.target,
                    rel_type=rel.relation_type,
                    confidence=rel.confidence,
                    evidence=rel.evidence,
                )

    async def query(self, cypher: str, params: dict | None = None) -> list[dict]:
        async with self.driver.session() as session:
            result = await session.run(cypher, params or {})
            return [record.data() async for record in result]

    async def close(self):
        await self.driver.close()
```
Natural Language Query Interface
Let the agent translate natural language questions into Cypher queries.
```python
import json

from agents import Agent, function_tool

graph_store = KnowledgeGraphStore(
    uri="bolt://localhost:7687", user="neo4j", password="password"
)


@function_tool
async def query_knowledge_graph(cypher_query: str) -> str:
    """Execute a Cypher query against the knowledge graph and return results."""
    try:
        results = await graph_store.query(cypher_query)
        return json.dumps(results, indent=2, default=str)
    except Exception as e:
        return f"Query error: {e}"


@function_tool
async def get_graph_schema() -> str:
    """Get the current schema of the knowledge graph."""
    results = await graph_store.query(
        "CALL db.schema.visualization() YIELD nodes, relationships RETURN *"
    )
    return json.dumps(results, default=str)


query_agent = Agent(
    name="Knowledge Graph Query Agent",
    instructions="""You answer questions using a Neo4j knowledge graph.
    First call get_graph_schema to understand the available entity types
    and relationships. Then construct a Cypher query to answer the question.

    Cypher tips:
    - Use MATCH patterns: (a:Entity)-[r:RELATION]->(b:Entity)
    - Use WHERE for filtering: WHERE a.type = 'PERSON'
    - Use RETURN to specify output columns
    - Use ORDER BY and LIMIT for ranking
    """,
    tools=[query_knowledge_graph, get_graph_schema],
)
```
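For a question like "Which organizations are working on transformer architectures?", the agent might emit something along these lines; treat it as a sketch, since the exact node names depend on what extraction produced:

```cypher
MATCH (org:Entity {type: 'ORGANIZATION'})-[:USES|DEVELOPED]->(t:Entity)
WHERE toLower(t.name) CONTAINS 'transformer'
RETURN DISTINCT org.name
```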
Running the Full Pipeline
```python
async def build_and_query_graph():
    # Step 1: Extract from documents.
    # load_documents is assumed here: a helper that yields objects
    # with .text and .name attributes for each file in the directory.
    documents = load_documents("./research_papers/")
    for doc in documents:
        extraction = await extract_from_document(doc.text)
        await graph_store.store_extraction(extraction)
        print(f"Stored {len(extraction.entities)} entities, "
              f"{len(extraction.relations)} relations from {doc.name}")

    # Step 2: Query the graph
    result = await Runner.run(
        query_agent,
        "Which organizations are working on transformer architectures?"
    )
    print(result.final_output)
```
FAQ
How do you handle entity resolution when the same entity appears with different names?
Entity resolution (also called entity linking) requires a normalization step. After extraction, run a secondary LLM pass that compares entity names and descriptions to identify duplicates. Use Levenshtein distance for similar spellings and cosine similarity of entity descriptions for semantic matching. When a match is found, merge the entities in Neo4j using MERGE with a canonical name.
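A minimal sketch of the spelling-similarity half of that pipeline, using the standard library's `difflib.SequenceMatcher` in place of a dedicated Levenshtein package (the 0.7 threshold is illustrative; production systems should also compare descriptions semantically):

```python
from difflib import SequenceMatcher


def merge_candidates(names: list[str], threshold: float = 0.7) -> dict[str, str]:
    """Map each entity name to a canonical name based on string similarity.

    Sketch only: real entity resolution should also use semantic similarity
    of entity descriptions, not just surface-form matching.
    """
    canonical: list[str] = []
    mapping: dict[str, str] = {}
    for name in names:
        match = None
        for c in canonical:
            # ratio() is in [0, 1]; 1.0 means identical strings.
            if SequenceMatcher(None, name.lower(), c.lower()).ratio() >= threshold:
                match = c
                break
        if match is None:
            canonical.append(name)
            mapping[name] = name
        else:
            mapping[name] = match
    return mapping


m = merge_candidates(["Google", "Google LLC", "Microsoft"])
# "Google LLC" collapses onto "Google"; "Microsoft" stays distinct.
```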
How large can the knowledge graph get before query performance degrades?
Neo4j handles millions of nodes and relationships efficiently with proper indexing. Create indexes on Entity.name and Entity.type. For graphs with over 10 million edges, use Neo4j's query profiling (PROFILE prefix) to identify slow traversals and add targeted composite indexes. Most natural language queries translate to 2-3 hop traversals, which remain fast even on large graphs.
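The indexes mentioned above can be created with standard Neo4j 5.x DDL (index names are arbitrary):

```cypher
CREATE INDEX entity_name IF NOT EXISTS FOR (e:Entity) ON (e.name);
CREATE INDEX entity_type IF NOT EXISTS FOR (e:Entity) ON (e.type);
```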
Can you incrementally update the graph as new documents arrive?
Yes, and that is the primary advantage of MERGE over CREATE in the Cypher queries. MERGE creates the node or relationship only if it does not already exist. When a new document mentions an existing entity with new relationships, only the new edges are added. Track document provenance by adding PROCESSED_FROM relationships between entities and source document nodes.
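A provenance link of that kind might look like the following sketch, with `$doc_name` and `$entity_name` supplied as query parameters:

```cypher
// Sketch: link an extracted entity back to its source document.
MERGE (d:Document {name: $doc_name})
WITH d
MATCH (e:Entity {name: $entity_name})
MERGE (e)-[:PROCESSED_FROM]->(d)
```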
#KnowledgeGraphs #EntityExtraction #Neo4j #NLP #GraphDatabases #AIAgents #StructuredData #InformationExtraction
CallSphere Team