Skip to content
Learn Agentic AI14 min read0 views

Building Your First RAG Pipeline in Python: End-to-End Tutorial

A complete hands-on tutorial that walks you through building a working RAG pipeline from scratch — loading documents, chunking, embedding, storing in a vector database, retrieving, and generating answers.

What You Will Build

By the end of this tutorial, you will have a fully working RAG pipeline that can answer questions about any collection of documents. The pipeline includes six stages: load, chunk, embed, store, retrieve, and generate. Every line of code is explained.

Prerequisites

Install the required packages:

pip install langchain langchain-openai langchain-community chromadb pypdf python-dotenv

Set your OpenAI API key:

export OPENAI_API_KEY="sk-proj-your-key-here"

Step 1: Load Documents

We will use LangChain's document loaders to read PDF files from a directory. The same pattern works for markdown, HTML, CSV, and dozens of other formats.

from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader

loader = DirectoryLoader(
    "./docs",
    glob="**/*.pdf",
    loader_cls=PyPDFLoader,
    show_progress=True,
)

raw_documents = loader.load()
print(f"Loaded {len(raw_documents)} pages from PDF files")

# Each document has page_content (text) and metadata (source, page number)
for doc in raw_documents[:2]:
    print(f"Source: {doc.metadata['source']}, Page: {doc.metadata.get('page', 'N/A')}")
    print(f"Content preview: {doc.page_content[:150]}...")
    print()

Step 2: Chunk Documents

Split the loaded documents into smaller, semantically coherent chunks using recursive character splitting:

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=64,
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len,
)

chunks = text_splitter.split_documents(raw_documents)
print(f"Split {len(raw_documents)} pages into {len(chunks)} chunks")

# Inspect chunk size distribution
sizes = [len(c.page_content) for c in chunks]
print(f"Chunk sizes — min: {min(sizes)}, max: {max(sizes)}, avg: {sum(sizes)//len(sizes)}")

Each chunk retains the metadata from its parent document (source file, page number), which is critical for source attribution in answers.

Step 3: Create Embeddings and Store in Vector DB

We use OpenAI's text-embedding-3-small model and Chroma as the vector store:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma

embedding_model = OpenAIEmbeddings(
    model="text-embedding-3-small",  # 1536 dimensions, $0.02/1M tokens
)

# Build the vector store — this embeds all chunks and stores them
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embedding_model,
    persist_directory="./chroma_rag_db",
    collection_name="my_documents",
)

print(f"Stored {len(chunks)} chunks in Chroma vector store")

This step makes API calls to OpenAI to generate embeddings for every chunk. For 1,000 chunks of 512 characters each, the cost is roughly $0.01.

Step 4: Build the Retriever

The retriever wraps the vector store and provides a clean interface for finding relevant chunks:

retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={
        "k": 5,  # return top 5 most similar chunks
    },
)

# Test retrieval
test_query = "What are the main product features?"
retrieved_docs = retriever.invoke(test_query)

print(f"Retrieved {len(retrieved_docs)} chunks for: '{test_query}'")
for i, doc in enumerate(retrieved_docs):
    print(f"\n--- Chunk {i+1} (from {doc.metadata.get('source', 'unknown')}) ---")
    print(doc.page_content[:200])

Step 5: Build the Generation Chain

Now we connect retrieval to generation. The LLM receives the retrieved context and produces a grounded answer:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o", temperature=0)

RAG_PROMPT = ChatPromptTemplate.from_template("""You are a helpful assistant that answers questions based on the provided context.

Instructions:
- Answer the question using ONLY the information in the context below.
- If the context does not contain enough information, say "I don't have sufficient information to answer that question."
- Cite the source document when possible.
- Be concise and direct.

Context:
{context}

Question: {question}

Answer:""")

def format_docs(docs):
    formatted = []
    for i, doc in enumerate(docs):
        source = doc.metadata.get("source", "unknown")
        formatted.append(f"[Source: {source}]\n{doc.page_content}")
    return "\n\n---\n\n".join(formatted)

Step 6: Put It All Together

def ask(question: str) -> str:
    """Complete RAG pipeline: retrieve then generate."""
    # Retrieve relevant chunks
    docs = retriever.invoke(question)

    # Format context
    context = format_docs(docs)

    # Generate answer
    prompt = RAG_PROMPT.format(context=context, question=question)
    response = llm.invoke(prompt)

    return response.content

# Try it
answer = ask("What are the main product features?")
print(answer)

answer = ask("What is the pricing for the enterprise plan?")
print(answer)

Adding Source Attribution

A production-quality RAG system should tell users where the answer came from:

def ask_with_sources(question: str) -> dict:
    """RAG pipeline that returns answer with sources."""
    docs = retriever.invoke(question)
    context = format_docs(docs)

    prompt = RAG_PROMPT.format(context=context, question=question)
    response = llm.invoke(prompt)

    sources = list(set(
        doc.metadata.get("source", "unknown") for doc in docs
    ))

    return {
        "answer": response.content,
        "sources": sources,
        "num_chunks_used": len(docs),
    }

result = ask_with_sources("What is the refund policy?")
print(f"Answer: {result['answer']}")
print(f"Sources: {', '.join(result['sources'])}")
print(f"Chunks used: {result['num_chunks_used']}")

Testing Your Pipeline

Verify the pipeline works correctly by testing edge cases:

test_questions = [
    "What is the refund policy?",           # should find answer
    "What is the capital of Mars?",          # should say not in context
    "Summarize the main features",           # broad question
]

for q in test_questions:
    result = ask_with_sources(q)
    print(f"Q: {q}")
    print(f"A: {result['answer'][:200]}")
    print(f"Sources: {result['sources']}")
    print()

FAQ

How much does it cost to run this pipeline?

Embedding costs are minimal — roughly $0.02 per million tokens with text-embedding-3-small. The main cost is the generation LLM call. With GPT-4o, each query costs about $0.005-0.02 depending on context length. For most applications this totals a few dollars per thousand queries.

Can I use a local LLM instead of OpenAI for generation?

Yes. Replace ChatOpenAI with any LangChain-compatible LLM wrapper. For local models, use ChatOllama with Llama or Mistral. The retrieval pipeline remains identical — only the generation step changes.

How do I update the knowledge base when documents change?

Re-run the ingestion pipeline (Steps 1-3) on the new or updated documents. For incremental updates, add new chunks to the existing Chroma collection using vectorstore.add_documents(new_chunks). For deletions, use Chroma's delete API with document IDs.


#RAG #Python #Tutorial #LangChain #VectorSearch #OpenAI #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.