Building Your First RAG Pipeline in Python: End-to-End Tutorial
A complete hands-on tutorial that walks you through building a working RAG pipeline from scratch — loading documents, chunking, embedding, storing in a vector database, retrieving, and generating answers.
What You Will Build
By the end of this tutorial, you will have a fully working RAG pipeline that can answer questions about any collection of documents. The pipeline includes six stages: load, chunk, embed, store, retrieve, and generate. Every line of code is explained.
Prerequisites
Install the required packages:
pip install langchain langchain-openai langchain-community chromadb pypdf python-dotenv
Set your OpenAI API key:
export OPENAI_API_KEY="sk-proj-your-key-here"
Step 1: Load Documents
We will use LangChain's document loaders to read PDF files from a directory. The same pattern works for markdown, HTML, CSV, and dozens of other formats.
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
loader = DirectoryLoader(
    "./docs",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader,
    show_progress=True,
)
raw_documents = loader.load()
print(f"Loaded {len(raw_documents)} pages from PDF files")
# Each document has page_content (text) and metadata (source, page number)
for doc in raw_documents[:2]:
    print(f"Source: {doc.metadata['source']}, Page: {doc.metadata.get('page', 'N/A')}")
    print(f"Content preview: {doc.page_content[:150]}...")
    print()
Step 2: Chunk Documents
Split the loaded documents into smaller, semantically coherent chunks using recursive character splitting:
from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=64,
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len,
)
chunks = text_splitter.split_documents(raw_documents)
print(f"Split {len(raw_documents)} pages into {len(chunks)} chunks")
# Inspect chunk size distribution
sizes = [len(c.page_content) for c in chunks]
print(f"Chunk sizes — min: {min(sizes)}, max: {max(sizes)}, avg: {sum(sizes)//len(sizes)}")
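The "recursive" part of the strategy is worth understanding: the splitter tries the first separator in the list, and any piece that is still too large gets re-split with the next separator, down to individual characters. A minimal pure-Python sketch of the idea (not LangChain's actual implementation, which also merges small pieces and applies overlap):

```python
def recursive_split(text, separators, chunk_size):
    """Split on the first separator; recurse on oversized pieces with the rest."""
    if len(text) <= chunk_size or not separators:
        return [text]
    sep, rest = separators[0], separators[1:]
    pieces = text.split(sep) if sep else list(text)
    chunks = []
    for piece in pieces:
        if len(piece) <= chunk_size:
            chunks.append(piece)
        else:
            chunks.extend(recursive_split(piece, rest, chunk_size))
    return chunks

doc = "First paragraph about features.\n\nSecond paragraph. It has two sentences."
chunks = recursive_split(doc, ["\n\n", ". ", " ", ""], 40)
```

Here the paragraph break is enough to bring both pieces under the size limit, so the sentence-level separator is never needed; a longer paragraph would fall through to ". " and beyond.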
Each chunk retains the metadata from its parent document (source file, page number), which is critical for source attribution in answers.
Step 3: Create Embeddings and Store in Vector DB
We use OpenAI's text-embedding-3-small model and Chroma as the vector store:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
embedding_model = OpenAIEmbeddings(
    model="text-embedding-3-small",  # 1536 dimensions, $0.02/1M tokens
)
# Build the vector store — this embeds all chunks and stores them
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_rag_db",
    collection_name="my_documents",
)
print(f"Stored {len(chunks)} chunks in Chroma vector store")
This step makes API calls to OpenAI to generate embeddings for every chunk. For 1,000 chunks of 512 characters each (roughly 128,000 tokens), the cost is a fraction of a cent.
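You can estimate the embedding bill before running the step. A rough sketch, assuming the common ~4-characters-per-token heuristic for English text and the $0.02-per-million-token rate for text-embedding-3-small:

```python
def estimate_embedding_cost(chunk_char_sizes, usd_per_million_tokens=0.02, chars_per_token=4):
    """Rough embedding-cost estimate from chunk sizes in characters."""
    total_tokens = sum(chunk_char_sizes) / chars_per_token
    return total_tokens / 1_000_000 * usd_per_million_tokens

# 1,000 chunks of 512 characters ~= 128,000 tokens
cost = estimate_embedding_cost([512] * 1000)
print(f"Estimated cost: ${cost:.4f}")
```

The character-to-token ratio varies by content (code and non-English text tokenize less efficiently), so treat this as an order-of-magnitude check, not an invoice.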
Step 4: Build the Retriever
The retriever wraps the vector store and provides a clean interface for finding relevant chunks:
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,  # return top 5 most similar chunks
    },
)
# Test retrieval
test_query = "What are the main product features?"
retrieved_docs = retriever.invoke(test_query)
print(f"Retrieved {len(retrieved_docs)} chunks for: '{test_query}'")
for i, doc in enumerate(retrieved_docs):
    print(f"\n--- Chunk {i+1} (from {doc.metadata.get('source', 'unknown')}) ---")
    print(doc.page_content[:200])
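Under the hood, similarity search ranks chunks by how close their embedding vectors are to the query vector, typically using cosine similarity. A toy pure-Python sketch with made-up 3-dimensional vectors (real embeddings from text-embedding-3-small have 1,536 dimensions):

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 = same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

query = [0.9, 0.1, 0.0]
chunk_vectors = {
    "pricing chunk":  [0.8, 0.2, 0.1],  # points roughly the same way as the query
    "features chunk": [0.1, 0.9, 0.3],
}
# Rank chunks by similarity to the query, highest first
ranked = sorted(chunk_vectors, key=lambda name: cosine_similarity(query, chunk_vectors[name]), reverse=True)
print(ranked)
```

This is exactly what `search_type="similarity"` with `k=5` does at scale: score every stored vector against the query vector and return the top 5.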
Step 5: Build the Generation Chain
Now we connect retrieval to generation. The LLM receives the retrieved context and produces a grounded answer:
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
llm = ChatOpenAI(model="gpt-4o", temperature=0)
RAG_PROMPT = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions based on the provided context.
Instructions:
- Answer the question using ONLY the information in the context below.
- If the context does not contain enough information, say "I don't have sufficient information to answer that question."
- Cite the source document when possible.
- Be concise and direct.
Context:
{context}
Question: {question}
Answer:""")
def format_docs(docs):
    formatted = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        formatted.append(f"[Source: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)
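format_docs only relies on each document having page_content and metadata attributes, so you can sanity-check it without touching the vector store. A quick sketch using SimpleNamespace as a stand-in for LangChain's Document class:

```python
from types import SimpleNamespace

def format_docs(docs):
    formatted = []
    for doc in docs:
        source = doc.metadata.get("source", "unknown")
        formatted.append(f"[Source: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)

fake_docs = [
    SimpleNamespace(page_content="Refunds within 30 days.", metadata={"source": "policy.pdf"}),
    SimpleNamespace(page_content="Enterprise plan: custom pricing.", metadata={}),
]
print(format_docs(fake_docs))
```

Note the second doc has no source key, so the `.get("source", "unknown")` fallback kicks in; that is the behavior you want when some loaders omit metadata.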
Step 6: Put It All Together
def ask(question: str) -> str:
    """Complete RAG pipeline: retrieve then generate."""
    # Retrieve relevant chunks
    docs = retriever.invoke(question)
    # Format context
    context = format_docs(docs)
    # Generate answer (format_messages keeps the chat-message structure,
    # unlike .format(), which flattens the prompt to a plain string)
    messages = RAG_PROMPT.format_messages(context=context, question=question)
    response = llm.invoke(messages)
    return response.content
# Try it
answer = ask("What are the main product features?")
print(answer)
answer = ask("What is the pricing for the enterprise plan?")
print(answer)
Adding Source Attribution
A production-quality RAG system should tell users where the answer came from:
def ask_with_sources(question: str) -> dict:
    """RAG pipeline that returns answer with sources."""
    docs = retriever.invoke(question)
    context = format_docs(docs)
    messages = RAG_PROMPT.format_messages(context=context, question=question)
    response = llm.invoke(messages)
    sources = list(set(
        doc.metadata.get("source", "unknown") for doc in docs
    ))
    return {
        "answer": response.content,
        "sources": sources,
        "num_chunks_used": len(docs),
    }
result = ask_with_sources("What is the refund policy?")
print(f"Answer: {result['answer']}")
print(f"Sources: {', '.join(result['sources'])}")
print(f"Chunks used: {result['num_chunks_used']}")
Testing Your Pipeline
Verify the pipeline works correctly by testing edge cases:
test_questions = [
    "What is the refund policy?",    # should find answer
    "What is the capital of Mars?",  # should say not in context
    "Summarize the main features",   # broad question
]
for q in test_questions:
    result = ask_with_sources(q)
    print(f"Q: {q}")
    print(f"A: {result['answer'][:200]}")
    print(f"Sources: {result['sources']}")
    print()
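To turn these spot checks into automated assertions, one lightweight approach is to classify each answer as a refusal or not, then assert the expected behavior per question. A sketch — looks_like_refusal is a hypothetical helper keyed to the refusal phrasing baked into RAG_PROMPT:

```python
def looks_like_refusal(answer: str) -> bool:
    """Heuristic: does the answer match the prompt's refusal phrasing?"""
    markers = [
        "don't have sufficient information",
        "not in the context",
        "cannot answer",
    ]
    answer_lower = answer.lower()
    return any(m in answer_lower for m in markers)

# For a live check, replace these strings with ask(question) calls
assert looks_like_refusal("I don't have sufficient information to answer that question.")
assert not looks_like_refusal("Refunds are available within 30 days of purchase.")
```

If you change the refusal wording in the prompt, update the markers to match; string heuristics like this are brittle but cheap, and a good first step before investing in LLM-based evaluation.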
FAQ
How much does it cost to run this pipeline?
Embedding costs are minimal — roughly $0.02 per million tokens with text-embedding-3-small. The main cost is the generation LLM call. With GPT-4o, each query costs about $0.005-0.02 depending on context length. For most applications this totals a few dollars per thousand queries.
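As a worked example of where those per-query numbers come from (the rates below are assumptions as of writing — always check OpenAI's current pricing page): with k=5 chunks of roughly 128 tokens each plus prompt overhead, a query sends on the order of 800 input tokens and gets back perhaps 150 output tokens.

```python
# Hypothetical GPT-4o rates in USD per million tokens; verify against current pricing
INPUT_RATE, OUTPUT_RATE = 2.50, 10.00

input_tokens = 5 * 128 + 160   # 5 retrieved chunks + prompt template and question
output_tokens = 150            # a short grounded answer
cost = input_tokens / 1e6 * INPUT_RATE + output_tokens / 1e6 * OUTPUT_RATE
print(f"${cost:.4f} per query")
```

Longer retrieved chunks, a larger k, or verbose answers push the figure toward the top of the quoted range.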
Can I use a local LLM instead of OpenAI for generation?
Yes. Replace ChatOpenAI with any LangChain-compatible LLM wrapper. For local models, use ChatOllama with Llama or Mistral. The retrieval pipeline remains identical — only the generation step changes.
How do I update the knowledge base when documents change?
Re-run the ingestion pipeline (Steps 1-3) on the new or updated documents. For incremental updates, add new chunks to the existing Chroma collection using vectorstore.add_documents(new_chunks). For deletions, use Chroma's delete API with document IDs.
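For incremental updates, it helps to give each chunk a deterministic ID so that re-ingesting unchanged content does not create duplicates. One common approach is hashing the source, page, and content — a sketch (the ID scheme here is a convention of ours, not something Chroma requires):

```python
import hashlib

def chunk_id(source: str, page: int, content: str) -> str:
    """Stable ID: the same chunk always hashes to the same ID, so re-ingestion dedupes."""
    key = f"{source}|{page}|{content}".encode("utf-8")
    return hashlib.sha256(key).hexdigest()[:16]

id_a = chunk_id("policy.pdf", 3, "Refunds within 30 days.")
id_b = chunk_id("policy.pdf", 3, "Refunds within 30 days.")  # identical input
id_c = chunk_id("policy.pdf", 3, "Refunds within 60 days.")  # content changed
print(id_a == id_b, id_a == id_c)  # True False
```

Pass these as the ids argument to vectorstore.add_documents, and use the same IDs with the delete API when a source document is removed.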
CallSphere Team