Learn Agentic AI

Capstone: Building an AI Document Processing Pipeline with Human Review

Build a complete document processing system with automated ingestion, AI-powered extraction and classification, a human review queue for quality assurance, and structured data export.

Pipeline Architecture

Document processing is one of the highest-value applications of AI in business. Invoices, contracts, medical records, insurance claims, and tax forms all need to be ingested, classified, mined for key fields, reviewed for accuracy, and exported to downstream systems. This capstone builds that entire pipeline.

The system has five stages: ingestion (file upload with format detection), classification (determine document type), extraction (pull structured fields from unstructured text), review (human verification with an approval queue), and export (deliver validated data to external systems via API or CSV).
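
The progression between those stages can be sketched as a small state machine. This is a hypothetical helper, not part of the pipeline code below; the stage names mirror the `DocStatus` values used by the data model.

```python
# Ordered pipeline stages; "rejected" sits outside the happy path.
PIPELINE_STAGES = ["uploaded", "classified", "extracted", "in_review", "approved", "exported"]

def next_stage(status: str, confident: bool = False) -> str:
    """Status a document moves to once the current stage completes.
    High-confidence extractions skip the human review queue."""
    if status == "extracted":
        return "approved" if confident else "in_review"
    if status == "in_review":
        return "approved"  # a reviewer may instead set "rejected"
    idx = PIPELINE_STAGES.index(status)
    return PIPELINE_STAGES[min(idx + 1, len(PIPELINE_STAGES) - 1)]
```

The only branch point is after extraction, where confidence decides whether a human sees the document at all.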

Data Model

# models.py
from sqlalchemy import Column, String, Text, Float, DateTime, ForeignKey, Enum, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import declarative_base
import uuid, enum

Base = declarative_base()

class DocStatus(str, enum.Enum):
    UPLOADED = "uploaded"
    CLASSIFIED = "classified"
    EXTRACTED = "extracted"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    REJECTED = "rejected"
    EXPORTED = "exported"

class DocumentRecord(Base):
    __tablename__ = "document_records"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    filename = Column(String(500))
    file_path = Column(String(1000))
    file_type = Column(String(20))  # pdf, image, docx
    doc_type = Column(String(100), nullable=True)  # invoice, contract, etc.
    classification_confidence = Column(Float, nullable=True)
    status = Column(Enum(DocStatus), default=DocStatus.UPLOADED)
    extracted_data = Column(JSONB, nullable=True)
    reviewer_notes = Column(Text, nullable=True)
    reviewed_by = Column(String(255), nullable=True)
    created_at = Column(DateTime, server_default=func.now())
    reviewed_at = Column(DateTime, nullable=True)

class ExtractionField(Base):
    __tablename__ = "extraction_fields"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    document_id = Column(UUID(as_uuid=True), ForeignKey("document_records.id"))
    field_name = Column(String(100))
    extracted_value = Column(Text)
    corrected_value = Column(Text, nullable=True)  # human correction
    confidence = Column(Float)
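
For concreteness, the extracted_data JSONB column holds a per-field dict shaped like the example below (invoice schema from the next section; the sample values are made up), and a small helper can flag which fields would send the document to review.

```python
# Illustrative shape of the extracted_data JSONB payload for an invoice.
sample_extracted_data = {
    "vendor_name": {"value": "Acme Corp", "confidence": 0.98},
    "invoice_number": {"value": "INV-1042", "confidence": 0.99},
    "date": {"value": "2024-03-15", "confidence": 0.97},
    "total_amount": {"value": "1,250.00", "confidence": 0.71},
    "line_items": {"value": None, "confidence": 0.0},  # not found
}

def low_confidence_fields(extracted: dict, threshold: float = 0.95) -> list:
    """Names of fields that would trigger human review (hypothetical helper;
    the 0.95 threshold matches the auto-approve rule used later)."""
    return [name for name, f in extracted.items()
            if f.get("confidence", 0) < threshold]
```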

Document Classification

After ingestion, classify each document to determine what extraction schema to apply.

# services/classifier.py
import json
import openai

from models import DocumentRecord, DocStatus
# extract_text is the shared PyMuPDF/OCR text-extraction helper used by all services.

DOCUMENT_TYPES = {
    "invoice": ["vendor_name", "invoice_number", "date", "total_amount", "line_items"],
    "contract": ["parties", "effective_date", "term_length", "key_clauses"],
    "receipt": ["merchant", "date", "total", "payment_method"],
    "medical_record": ["patient_name", "date_of_service", "diagnosis", "provider"],
}

async def classify_document(doc_id: str, db) -> str:
    doc = db.query(DocumentRecord).get(doc_id)
    text = extract_text(doc.file_path, doc.file_type)

    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""Classify this document into one of
these types: {list(DOCUMENT_TYPES.keys())}.
Return JSON with: doc_type (string), confidence (0-1)."""},
            {"role": "user", "content": text[:3000]},
        ],
        response_format={"type": "json_object"},
    )

    result = json.loads(response.choices[0].message.content)
    doc.doc_type = result["doc_type"]
    doc.classification_confidence = result["confidence"]
    doc.status = DocStatus.CLASSIFIED
    db.commit()
    return result["doc_type"]

Field Extraction

Once classified, extract the relevant fields based on the document type schema.

# services/extractor.py
import json
import openai

from models import DocumentRecord, ExtractionField, DocStatus
from services.classifier import DOCUMENT_TYPES
# extract_text is the shared PyMuPDF/OCR text-extraction helper used by all services.
async def extract_fields(doc_id: str, db) -> dict:
    doc = db.query(DocumentRecord).get(doc_id)
    text = extract_text(doc.file_path, doc.file_type)
    schema_fields = DOCUMENT_TYPES[doc.doc_type]

    field_descriptions = ", ".join(schema_fields)
    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"""Extract these fields from the document:
{field_descriptions}.
Return JSON with each field name as a key. For each field include:
value (the extracted text), confidence (0-1).
If a field is not found, set value to null and confidence to 0."""},
            {"role": "user", "content": text},
        ],
        response_format={"type": "json_object"},
    )

    extracted = json.loads(response.choices[0].message.content)
    doc.extracted_data = extracted
    doc.status = DocStatus.EXTRACTED

    # Store individual fields for granular tracking
    for field_name, field_data in extracted.items():
        ef = ExtractionField(
            document_id=doc_id,
            field_name=field_name,
            extracted_value="" if field_data.get("value") is None else str(field_data["value"]),
            confidence=field_data.get("confidence", 0),
        )
        db.add(ef)

    # Auto-approve if all fields have high confidence
    all_confident = all(
        f.get("confidence", 0) >= 0.95 for f in extracted.values()
    )
    if all_confident:
        doc.status = DocStatus.APPROVED
    else:
        doc.status = DocStatus.IN_REVIEW

    db.commit()
    return extracted

Human Review Queue

Documents with low-confidence extractions enter a review queue. The admin interface shows the original document alongside extracted fields, allowing reviewers to correct values.

# routes/review.py
from datetime import datetime

from fastapi import APIRouter, Depends
from pydantic import BaseModel

from database import get_db  # session dependency; assumed to live in an app-level module
from models import DocumentRecord, ExtractionField, DocStatus

class ReviewApproval(BaseModel):
    reviewer_email: str
    corrections: dict[str, str] = {}  # field_name -> corrected value
    notes: str | None = None

class ReviewRejection(BaseModel):
    reviewer_email: str
    reason: str

router = APIRouter(prefix="/review")

@router.get("/queue")
async def get_review_queue(page: int = 1, per_page: int = 20, db=Depends(get_db)):
    offset = (page - 1) * per_page
    docs = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.IN_REVIEW
    ).order_by(DocumentRecord.created_at).offset(offset).limit(per_page).all()
    total = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.IN_REVIEW
    ).count()
    return {"documents": docs, "total": total, "page": page}

@router.post("/{doc_id}/approve")
async def approve_document(doc_id: str, body: ReviewApproval, db=Depends(get_db)):
    doc = db.query(DocumentRecord).get(doc_id)

    # Apply any corrections
    for field_name, corrected_value in body.corrections.items():
        field = db.query(ExtractionField).filter(
            ExtractionField.document_id == doc_id,
            ExtractionField.field_name == field_name,
        ).first()
        if field:
            field.corrected_value = corrected_value

    doc.status = DocStatus.APPROVED
    doc.reviewed_by = body.reviewer_email
    doc.reviewed_at = datetime.utcnow()
    doc.reviewer_notes = body.notes
    db.commit()
    return {"status": "approved"}

@router.post("/{doc_id}/reject")
async def reject_document(doc_id: str, body: ReviewRejection, db=Depends(get_db)):
    doc = db.query(DocumentRecord).get(doc_id)
    doc.status = DocStatus.REJECTED
    doc.reviewed_by = body.reviewer_email
    doc.reviewer_notes = body.reason
    doc.reviewed_at = datetime.utcnow()
    db.commit()
    return {"status": "rejected"}

Export Pipeline

Approved documents are exported to downstream systems. The export layer uses the corrected values when available, falling back to the original extraction.

# services/exporter.py
from models import DocumentRecord, ExtractionField, DocStatus
async def export_approved_documents(db) -> list:
    docs = db.query(DocumentRecord).filter(
        DocumentRecord.status == DocStatus.APPROVED
    ).all()

    exported = []
    for doc in docs:
        fields = db.query(ExtractionField).filter(
            ExtractionField.document_id == doc.id
        ).all()
        record = {"doc_type": doc.doc_type, "filename": doc.filename}
        for f in fields:
            record[f.field_name] = f.corrected_value or f.extracted_value
        exported.append(record)

        doc.status = DocStatus.EXPORTED
    db.commit()
    return exported
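
For the CSV export target mentioned earlier, a minimal serialization sketch (a hypothetical `records_to_csv` helper) has to handle the fact that field sets differ per document type, so the header is the union of all keys seen:

```python
import csv
import io

def records_to_csv(records: list) -> str:
    """Serialize exported records to CSV; missing fields become empty cells."""
    fieldnames: list = []
    for record in records:
        for key in record:
            if key not in fieldnames:
                fieldnames.append(key)  # preserve first-seen column order
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames, restval="")
    writer.writeheader()
    writer.writerows(records)
    return buf.getvalue()
```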

FAQ

How do I handle scanned documents and images?

Use OCR as a preprocessing step before classification. PyMuPDF handles PDFs with embedded text. For scanned PDFs and images, use Tesseract OCR or a cloud service like Google Cloud Vision. Store the OCR quality score and route low-quality scans to human review regardless of extraction confidence.
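
The extract_text helper the services rely on could follow this routing. This is one hedged sketch, not the article's implementation: imports are deferred so PyMuPDF, pytesseract, Pillow, and python-docx are only needed for the formats you actually process.

```python
def needs_ocr(embedded_text: str, min_chars: int = 50) -> bool:
    """A document whose embedded text layer is nearly empty is probably a scan."""
    return len(embedded_text.strip()) < min_chars

def extract_text(file_path: str, file_type: str) -> str:
    if file_type == "pdf":
        import fitz  # PyMuPDF
        text = "\n".join(page.get_text() for page in fitz.open(file_path))
        return _ocr_pdf(file_path) if needs_ocr(text) else text
    if file_type == "image":
        return _ocr_image(file_path)
    if file_type == "docx":
        import docx  # python-docx
        return "\n".join(p.text for p in docx.Document(file_path).paragraphs)
    raise ValueError(f"unsupported file type: {file_type}")

def _ocr_image(file_path: str) -> str:
    import pytesseract
    from PIL import Image
    return pytesseract.image_to_string(Image.open(file_path))

def _ocr_pdf(file_path: str) -> str:
    import io
    import fitz, pytesseract
    from PIL import Image
    out = []
    for page in fitz.open(file_path):
        pix = page.get_pixmap(dpi=300)  # render the page as a 300-dpi image
        out.append(pytesseract.image_to_string(Image.open(io.BytesIO(pix.tobytes("png")))))
    return "\n".join(out)
```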

How do I improve extraction accuracy over time?

Use the human corrections as training signal. Track which fields are most frequently corrected and for which document types. Periodically update extraction prompts to include examples of common corrections. Consider fine-tuning an extraction model on your corrected dataset once you have several thousand reviewed documents.
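
A starting point for that feedback loop is a correction-rate report over the ExtractionField rows. A sketch, assuming the rows have been fetched joined with their document's doc_type:

```python
from collections import Counter

def correction_rates(fields: list) -> dict:
    """Fraction of extractions a human corrected, per (doc_type, field_name)."""
    corrected, total = Counter(), Counter()
    for f in fields:
        key = (f["doc_type"], f["field_name"])
        total[key] += 1
        if f.get("corrected_value") is not None:
            corrected[key] += 1
    return {key: corrected[key] / total[key] for key in total}
```

Fields with the highest rates are the first candidates for better prompt examples or fine-tuning data.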

How do I handle multi-page documents where relevant data spans pages?

Concatenate all pages into a single text block before extraction. For very long documents, use a two-pass approach: first identify which pages contain relevant fields, then extract from only those pages. Store page numbers in the extraction metadata so reviewers can quickly navigate to the source.
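
The first pass of that two-pass approach can be as simple as scoring each page for mentions of the schema's keywords. This keyword version is an illustrative sketch; a cheap LLM relevance call is a drop-in replacement for the scorer.

```python
def select_relevant_pages(pages: list, field_keywords: list, top_k: int = 3) -> list:
    """Return indices of the top_k pages most likely to contain schema fields,
    skipping pages that match no keyword at all."""
    scores = [(i, sum(kw.lower() in page.lower() for kw in field_keywords))
              for i, page in enumerate(pages)]
    ranked = sorted(scores, key=lambda s: s[1], reverse=True)
    return sorted(i for i, score in ranked[:top_k] if score > 0)
```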


#CapstoneProject #DocumentProcessing #HumanintheLoop #DataExtraction #Classification #FullStackAI #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
