Learn Agentic AI · 10 min read

Data Versioning for AI Agents: Tracking Changes to Knowledge Bases Over Time

Learn how to implement data versioning for AI agent knowledge bases using DVC, content-addressable storage, and lineage tracking to ensure reproducibility and auditability.

Why Data Versioning Matters for AI Agents

When your agent suddenly starts giving worse answers, you need to answer a fundamental question: did the model change, the prompts change, or the data change? Without data versioning, that question is unanswerable. You have no way to compare today's knowledge base to last week's, no way to roll back a bad data update, and no way to reproduce the exact behavior a user experienced yesterday.

Data versioning for AI agents tracks every change to the knowledge base — what was added, what was modified, what was deleted — so you can audit, compare, and reproduce any point in time.

Content-Addressable Storage

The foundation of data versioning is content-addressable storage: every version of every document gets a unique identifier derived from its content, not its filename or location.
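The core property in miniature: the address is a pure function of the bytes, so identical content always lands at the same address and any edit produces a new one.

```python
import hashlib

doc_a = "Refund policy: 30 days."
doc_b = "Refund policy: 14 days."

# Identical content always maps to the same address;
# a one-character edit produces a completely different one.
hash_a = hashlib.sha256(doc_a.encode()).hexdigest()
hash_b = hashlib.sha256(doc_b.encode()).hexdigest()

print(hash_a == hashlib.sha256(doc_a.encode()).hexdigest())  # True
print(hash_a == hash_b)  # False
```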

import hashlib
import json
from pathlib import Path
from dataclasses import dataclass
from typing import List, Optional, Dict
from datetime import datetime

@dataclass
class VersionedDocument:
    id: str
    content: str
    metadata: dict
    content_hash: str = ""
    version: int = 1
    created_at: str = ""

    def __post_init__(self):
        self.content_hash = hashlib.sha256(
            self.content.encode()
        ).hexdigest()
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat()

@dataclass
class DataSnapshot:
    snapshot_id: str
    timestamp: str
    document_hashes: Dict[str, str]  # doc_id -> content_hash
    total_documents: int
    description: str
    parent_snapshot: Optional[str] = None

class ContentAddressableStore:
    def __init__(self, base_path: str = "./data_versions"):
        self.base = Path(base_path)
        self.objects_dir = self.base / "objects"
        self.snapshots_dir = self.base / "snapshots"
        self.objects_dir.mkdir(parents=True, exist_ok=True)
        self.snapshots_dir.mkdir(parents=True, exist_ok=True)

    def store(self, doc: VersionedDocument) -> str:
        # Store content by hash — deduplication is automatic
        obj_path = (
            self.objects_dir
            / doc.content_hash[:2]
            / doc.content_hash
        )
        obj_path.parent.mkdir(exist_ok=True)
        obj_path.write_text(json.dumps({
            "id": doc.id,
            "content": doc.content,
            "metadata": doc.metadata,
            "version": doc.version,
            "created_at": doc.created_at,
        }))
        return doc.content_hash

    def retrieve(self, content_hash: str) -> Optional[dict]:
        obj_path = (
            self.objects_dir
            / content_hash[:2]
            / content_hash
        )
        if obj_path.exists():
            return json.loads(obj_path.read_text())
        return None

    def create_snapshot(
        self,
        documents: List[VersionedDocument],
        description: str,
        parent: Optional[str] = None,
    ) -> DataSnapshot:
        doc_hashes = {}
        for doc in documents:
            self.store(doc)
            doc_hashes[doc.id] = doc.content_hash

        snapshot_content = json.dumps(doc_hashes, sort_keys=True)
        snapshot_id = hashlib.sha256(
            snapshot_content.encode()
        ).hexdigest()[:16]

        snapshot = DataSnapshot(
            snapshot_id=snapshot_id,
            timestamp=datetime.utcnow().isoformat(),
            document_hashes=doc_hashes,
            total_documents=len(documents),
            description=description,
            parent_snapshot=parent,
        )

        snap_path = self.snapshots_dir / f"{snapshot_id}.json"
        snap_path.write_text(json.dumps({
            "snapshot_id": snapshot.snapshot_id,
            "timestamp": snapshot.timestamp,
            "document_hashes": snapshot.document_hashes,
            "total_documents": snapshot.total_documents,
            "description": snapshot.description,
            "parent_snapshot": snapshot.parent_snapshot,
        }, indent=2))

        return snapshot
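To see why content addressing keeps storage cheap, here is an illustrative in-memory sketch of the same dedup property (a simplified stand-in for the store above, not the class itself): two snapshots that share a document store that document's bytes exactly once.

```python
import hashlib

# Minimal in-memory sketch: content_hash -> content
objects: dict[str, str] = {}

def put(content: str) -> str:
    h = hashlib.sha256(content.encode()).hexdigest()
    objects[h] = content  # idempotent: same content, same slot
    return h

# Snapshot v1: two documents.
v1 = {"faq": put("Q: Do you ship abroad? A: Yes."), "policy": put("30-day returns")}
# Snapshot v2: policy changed, faq untouched.
v2 = {"faq": put("Q: Do you ship abroad? A: Yes."), "policy": put("14-day returns")}

# Two snapshots, four references, but only three stored objects.
print(len(objects))  # 3
print(v1["faq"] == v2["faq"])  # True: the unchanged doc is shared
```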

Comparing Versions with Diff

The ability to diff two snapshots is the most operationally useful feature. It tells you exactly what changed between any two points in time.

@dataclass
class SnapshotDiff:
    added: List[str]
    removed: List[str]
    modified: List[str]
    unchanged: int

    @property
    def summary(self) -> str:
        return (
            f"+{len(self.added)} added, "
            f"-{len(self.removed)} removed, "
            f"~{len(self.modified)} modified, "
            f"={self.unchanged} unchanged"
        )

def diff_snapshots(
    old: DataSnapshot, new: DataSnapshot
) -> SnapshotDiff:
    old_ids = set(old.document_hashes.keys())
    new_ids = set(new.document_hashes.keys())

    added = list(new_ids - old_ids)
    removed = list(old_ids - new_ids)

    modified = []
    unchanged = 0
    for doc_id in old_ids & new_ids:
        if old.document_hashes[doc_id] != new.document_hashes[doc_id]:
            modified.append(doc_id)
        else:
            unchanged += 1

    return SnapshotDiff(
        added=added,
        removed=removed,
        modified=modified,
        unchanged=unchanged,
    )
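The set arithmetic inside diff_snapshots can be run standalone on plain {doc_id: content_hash} mappings (hashes shortened here for readability):

```python
# Two hypothetical snapshot manifests.
old = {"faq": "a1", "policy": "b2", "pricing": "c3"}
new = {"faq": "a1", "policy": "b9", "glossary": "d4"}

added = sorted(set(new) - set(old))        # only in new
removed = sorted(set(old) - set(new))      # only in old
modified = sorted(d for d in set(old) & set(new) if old[d] != new[d])
unchanged = len(set(old) & set(new)) - len(modified)

print(f"+{len(added)} added, -{len(removed)} removed, "
      f"~{len(modified)} modified, ={unchanged} unchanged")
# +1 added, -1 removed, ~1 modified, =1 unchanged
```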

Integrating DVC for Large Datasets

For datasets too large for a custom store, DVC (Data Version Control) extends Git with large-file tracking: the data itself lives in remote storage (S3, GCS, SSH, and so on) while Git versions small .dvc pointer files.


import subprocess

class DVCManager:
    def __init__(self, repo_path: str):
        self.repo_path = repo_path

    def track_dataset(self, data_path: str, message: str):
        """Add a dataset to DVC tracking and commit the pointer file."""
        subprocess.run(
            ["dvc", "add", data_path],
            cwd=self.repo_path, check=True,
        )
        # dvc add writes data_path.dvc and updates the .gitignore
        # in the same directory as the dataset, not the repo root
        gitignore = str(Path(data_path).parent / ".gitignore")
        subprocess.run(
            ["git", "add", f"{data_path}.dvc", gitignore],
            cwd=self.repo_path, check=True,
        )
        subprocess.run(
            ["git", "commit", "-m", message],
            cwd=self.repo_path, check=True,
        )

    def push_to_remote(self):
        subprocess.run(
            ["dvc", "push"],
            cwd=self.repo_path, check=True,
        )

    def checkout_version(self, git_ref: str):
        subprocess.run(
            ["git", "checkout", git_ref],
            cwd=self.repo_path, check=True,
        )
        subprocess.run(
            ["dvc", "checkout"],
            cwd=self.repo_path, check=True,
        )

Lineage Tracking

Lineage tracking records how each piece of data was produced — what source it came from, what transformations were applied, and when.

@dataclass
class LineageRecord:
    document_id: str
    source: str
    pipeline_version: str
    transformations: List[str]
    created_at: str
    input_hash: str
    output_hash: str

class LineageTracker:
    def __init__(self):
        self.records: Dict[str, LineageRecord] = {}

    def record(
        self, doc_id: str, source: str,
        pipeline_version: str, transformations: List[str],
        input_hash: str, output_hash: str,
    ):
        self.records[doc_id] = LineageRecord(
            document_id=doc_id,
            source=source,
            pipeline_version=pipeline_version,
            transformations=transformations,
            created_at=datetime.utcnow().isoformat(),
            input_hash=input_hash,
            output_hash=output_hash,
        )

    def trace_origin(self, doc_id: str) -> Optional[LineageRecord]:
        return self.records.get(doc_id)
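What a populated record looks like in practice: the sketch below (with a hypothetical document ID and source URL) hashes a document before and after one transformation. If the input hash is unchanged but the output hash differs from the last run, the pipeline, not the source, caused the change.

```python
import hashlib
from datetime import datetime, timezone

def sha(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

# One ingestion step: record lineage around a transformation.
raw = "  Refund Policy: 30 DAYS  "
cleaned = raw.strip().lower()  # the applied transformation

lineage = {
    "document_id": "policy-001",                 # hypothetical ID
    "source": "https://example.com/returns",     # hypothetical source
    "pipeline_version": "2024.06",
    "transformations": ["strip", "lowercase"],
    "created_at": datetime.now(timezone.utc).isoformat(),
    "input_hash": sha(raw),
    "output_hash": sha(cleaned),
}

print(lineage["input_hash"] != lineage["output_hash"])  # True
```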

FAQ

How do I roll back a bad data update in production?

Load the previous snapshot, compute the diff against the current state, and apply the reverse operations: delete added documents, re-insert removed ones, and overwrite modified ones with their previous versions from content-addressable storage. If using DVC, checkout the git commit before the bad update and run dvc checkout to restore the dataset.
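The reverse-operation logic can be sketched on plain {doc_id: content_hash} mappings (a simplified stand-in for the snapshot manifests, with made-up hashes):

```python
# Roll back from `bad` to `good` by reversing the diff.
good = {"faq": "a1", "policy": "b2"}
bad = {"faq": "a1", "policy": "b9", "spam": "e5"}  # the bad update

to_delete = set(bad) - set(good)    # docs the bad update added
to_reinsert = set(good) - set(bad)  # docs the bad update removed
to_restore = {d for d in set(good) & set(bad) if good[d] != bad[d]}

print(sorted(to_delete))    # ['spam']
print(sorted(to_reinsert))  # []
print(sorted(to_restore))   # ['policy'] -> fetch hash 'b2' from the store
```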

How granular should my snapshots be — per document or per pipeline run?

Create snapshots per pipeline run, not per document change. Pipeline-level snapshots are more meaningful because they represent a coherent state of the entire knowledge base at a point in time. Tag snapshots with the pipeline run ID, timestamp, and a human-readable description so you can find the right version quickly.

How much storage does content-addressable versioning require?

Less than you might expect. Because content-addressable storage automatically deduplicates, documents that have not changed between versions are stored only once. In practice, if 90% of your knowledge base is stable between updates, versioning adds only about 10% storage overhead per snapshot rather than a full copy each time.
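The back-of-envelope arithmetic, with assumed numbers (a 1 GB knowledge base, 10% churn per update, twelve monthly snapshots):

```python
kb_size_mb = 1000   # knowledge base size (assumed)
churn = 0.10        # fraction of documents changed per update
snapshots = 12      # monthly updates for a year

# Naive full copies vs dedup: first snapshot stores everything,
# each later snapshot stores only the churned fraction.
full_copies = kb_size_mb * snapshots
deduped = kb_size_mb + kb_size_mb * churn * (snapshots - 1)

print(full_copies)  # 12000
print(deduped)      # 2100.0
```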


#DataVersioning #DVC #KnowledgeBase #Reproducibility #DataLineage #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
