Data Versioning for AI Agents: Tracking Changes to Knowledge Bases Over Time
Learn how to implement data versioning for AI agent knowledge bases using DVC, content-addressable storage, and lineage tracking to ensure reproducibility and auditability.
Why Data Versioning Matters for AI Agents
When your agent suddenly starts giving worse answers, you need to answer a fundamental question: did the model change, the prompts change, or the data change? Without data versioning, that question is unanswerable. You have no way to compare today's knowledge base to last week's, no way to roll back a bad data update, and no way to reproduce the exact behavior a user experienced yesterday.
Data versioning for AI agents tracks every change to the knowledge base — what was added, what was modified, what was deleted — so you can audit, compare, and reproduce any point in time.
Content-Addressable Storage
The foundation of data versioning is content-addressable storage: every version of every document gets a unique identifier derived from its content, not its filename or location.
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
@dataclass
class VersionedDocument:
    """A knowledge-base document whose identity is tied to its content.

    ``content_hash`` is always (re)derived from ``content`` in
    ``__post_init__``, so any value passed by the caller is overwritten —
    the hash can never drift out of sync with the text.
    """

    id: str           # stable logical document id (survives content edits)
    content: str      # full document text; the only input to the hash
    metadata: dict    # free-form caller-supplied metadata
    content_hash: str = ""  # derived; do not set manually
    version: int = 1        # monotonically bumped by the caller on edits
    created_at: str = ""    # ISO-8601 UTC timestamp; auto-filled if empty

    def __post_init__(self):
        # Content-addressable identity: SHA-256 of the raw text.
        self.content_hash = hashlib.sha256(
            self.content.encode("utf-8")
        ).hexdigest()
        if not self.created_at:
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+)
            # and produced naive timestamps that are ambiguous to compare.
            self.created_at = datetime.now(timezone.utc).isoformat()
@dataclass
class DataSnapshot:
    """Immutable manifest of the entire knowledge base at one point in time.

    A snapshot does not copy document content; it only maps each document
    id to the content hash stored in the ContentAddressableStore, so
    unchanged documents cost nothing extra per snapshot.
    """

    # Short identifier derived from the snapshot's contents.
    snapshot_id: str
    # ISO-8601 creation time of the snapshot.
    timestamp: str
    document_hashes: Dict[str, str]  # doc_id -> content_hash
    # Number of documents captured (== len(document_hashes)).
    total_documents: int
    # Human-readable note, e.g. the pipeline run that produced it.
    description: str
    # snapshot_id of the predecessor, forming a lineage chain; None for the first.
    parent_snapshot: Optional[str] = None
class ContentAddressableStore:
    """Git-like object store for versioned documents.

    Documents are stored under ``objects/<hh>/<hash>`` (two-character
    fan-out to keep directories small), keyed by the SHA-256 of their
    content, so identical content is stored exactly once. Snapshots are
    JSON manifests under ``snapshots/``.
    """

    def __init__(self, base_path: str = "./data_versions"):
        self.base = Path(base_path)
        self.objects_dir = self.base / "objects"
        self.snapshots_dir = self.base / "snapshots"
        self.objects_dir.mkdir(parents=True, exist_ok=True)
        self.snapshots_dir.mkdir(parents=True, exist_ok=True)

    def _object_path(self, content_hash: str) -> Path:
        # Two-char prefix fan-out, mirroring git's .git/objects layout.
        return self.objects_dir / content_hash[:2] / content_hash

    def store(self, doc: "VersionedDocument") -> str:
        """Persist *doc* by content hash and return that hash.

        If an object with the same hash already exists, the write is
        skipped entirely — this is what makes deduplication free, rather
        than rewriting identical bytes on every snapshot.
        """
        obj_path = self._object_path(doc.content_hash)
        if obj_path.exists():
            return doc.content_hash
        obj_path.parent.mkdir(parents=True, exist_ok=True)
        obj_path.write_text(
            json.dumps({
                "id": doc.id,
                "content": doc.content,
                "metadata": doc.metadata,
                "version": doc.version,
                "created_at": doc.created_at,
            }),
            encoding="utf-8",  # explicit: write_text default is locale-dependent
        )
        return doc.content_hash

    def retrieve(self, content_hash: str) -> Optional[dict]:
        """Return the stored object dict for *content_hash*, or None."""
        obj_path = self._object_path(content_hash)
        if obj_path.exists():
            return json.loads(obj_path.read_text(encoding="utf-8"))
        return None

    def create_snapshot(
        self,
        documents: List["VersionedDocument"],
        description: str,
        parent: Optional[str] = None,
    ) -> "DataSnapshot":
        """Store every document and write a snapshot manifest.

        The snapshot id hashes both the document manifest and the parent
        pointer (like a git commit). Hashing only the documents would make
        two snapshots with identical content but different lineage collide
        and silently overwrite each other's manifest file.
        """
        doc_hashes = {}
        for doc in documents:
            self.store(doc)
            doc_hashes[doc.id] = doc.content_hash
        identity = json.dumps(
            {"documents": doc_hashes, "parent": parent},
            sort_keys=True,
        )
        snapshot_id = hashlib.sha256(identity.encode("utf-8")).hexdigest()[:16]
        snapshot = DataSnapshot(
            snapshot_id=snapshot_id,
            timestamp=datetime.now(timezone.utc).isoformat(),  # utcnow() is deprecated
            document_hashes=doc_hashes,
            total_documents=len(documents),
            description=description,
            parent_snapshot=parent,
        )
        snap_path = self.snapshots_dir / f"{snapshot_id}.json"
        snap_path.write_text(
            json.dumps({
                "snapshot_id": snapshot.snapshot_id,
                "timestamp": snapshot.timestamp,
                "document_hashes": snapshot.document_hashes,
                "total_documents": snapshot.total_documents,
                "description": snapshot.description,
                "parent_snapshot": snapshot.parent_snapshot,
            }, indent=2),
            encoding="utf-8",
        )
        return snapshot
Comparing Versions with Diff
The ability to diff two snapshots is the most operationally useful feature. It tells you exactly what changed between any two points in time.
@dataclass
class SnapshotDiff:
    """Outcome of comparing two snapshots: what entered, left, or changed."""

    added: List[str]     # doc ids present only in the newer snapshot
    removed: List[str]   # doc ids present only in the older snapshot
    modified: List[str]  # doc ids whose content hash differs
    unchanged: int       # count of identical documents

    @property
    def summary(self) -> str:
        """One-line human-readable change summary."""
        pieces = [
            f"+{len(self.added)} added",
            f"-{len(self.removed)} removed",
            f"~{len(self.modified)} modified",
            f"={self.unchanged} unchanged",
        ]
        return ", ".join(pieces)
def diff_snapshots(
    old: "DataSnapshot", new: "DataSnapshot"
) -> "SnapshotDiff":
    """Compute which documents were added, removed, or modified between
    two snapshots.

    The id lists are sorted: set-difference iteration order is not
    deterministic across interpreter runs, and a tool built for
    reproducibility should produce the same diff output every time.
    """
    old_ids = set(old.document_hashes.keys())
    new_ids = set(new.document_hashes.keys())
    added = sorted(new_ids - old_ids)
    removed = sorted(old_ids - new_ids)
    modified = []
    unchanged = 0
    for doc_id in old_ids & new_ids:
        # Same id in both snapshots: content changed iff the hashes differ.
        if old.document_hashes[doc_id] != new.document_hashes[doc_id]:
            modified.append(doc_id)
        else:
            unchanged += 1
    return SnapshotDiff(
        added=added,
        removed=removed,
        modified=sorted(modified),
        unchanged=unchanged,
    )
Integrating DVC for Large Datasets
For datasets too large for custom storage, DVC (Data Version Control) extends git with large file tracking and remote storage.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
import subprocess
class DVCManager:
    """Thin wrapper around the ``dvc`` and ``git`` CLIs for a single repo.

    Every command runs with ``check=True``, so a non-zero exit raises
    ``subprocess.CalledProcessError`` instead of failing silently.
    """

    def __init__(self, repo_path: str):
        self.repo_path = repo_path

    def _run(self, *cmd: str) -> None:
        # Single chokepoint for subprocess invocation inside the repo.
        subprocess.run(list(cmd), cwd=self.repo_path, check=True)

    def track_dataset(self, data_path: str, message: str):
        """Add a dataset to DVC tracking and commit."""
        self._run("dvc", "add", data_path)
        self._run("git", "add", f"{data_path}.dvc", ".gitignore")
        self._run("git", "commit", "-m", message)

    def push_to_remote(self):
        """Upload tracked data to the configured DVC remote."""
        self._run("dvc", "push")

    def checkout_version(self, git_ref: str):
        """Check out *git_ref*, then sync the working data to match it."""
        self._run("git", "checkout", git_ref)
        self._run("dvc", "checkout")
Lineage Tracking
Lineage tracking records how each piece of data was produced — what source it came from, what transformations were applied, and when.
@dataclass
class LineageRecord:
    """Provenance for a single document: where it came from and how it
    was produced."""

    # Id of the document this lineage describes.
    document_id: str
    # Origin of the raw data (e.g. a URL, file path, or upstream system name).
    source: str
    # Version of the processing pipeline that produced the document.
    pipeline_version: str
    # Ordered list of transformation names applied to the raw input.
    transformations: List[str]
    # ISO-8601 timestamp of when this record was created.
    created_at: str
    # Content hash of the raw input before transformation.
    input_hash: str
    # Content hash of the final stored document.
    output_hash: str
class LineageTracker:
    """In-memory registry mapping document ids to their lineage records.

    Holds at most one record per document id; recording the same id again
    overwrites the previous entry.
    """

    def __init__(self):
        # doc_id -> most recently recorded LineageRecord
        self.records: Dict[str, "LineageRecord"] = {}

    def record(
        self, doc_id: str, source: str,
        pipeline_version: str, transformations: List[str],
        input_hash: str, output_hash: str,
    ):
        """Record (or overwrite) how *doc_id* was produced."""
        self.records[doc_id] = LineageRecord(
            document_id=doc_id,
            source=source,
            pipeline_version=pipeline_version,
            transformations=transformations,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            created_at=datetime.now(timezone.utc).isoformat(),
            input_hash=input_hash,
            output_hash=output_hash,
        )

    def trace_origin(self, doc_id: str) -> Optional["LineageRecord"]:
        """Return the lineage record for *doc_id*, or None if untracked."""
        return self.records.get(doc_id)
FAQ
How do I roll back a bad data update in production?
Load the previous snapshot, compute the diff against the current state, and apply the reverse operations: delete added documents, re-insert removed ones, and overwrite modified ones with their previous versions from content-addressable storage. If using DVC, checkout the git commit before the bad update and run dvc checkout to restore the dataset.
How granular should my snapshots be — per document or per pipeline run?
Create snapshots per pipeline run, not per document change. Pipeline-level snapshots are more meaningful because they represent a coherent state of the entire knowledge base at a point in time. Tag snapshots with the pipeline run ID, timestamp, and a human-readable description so you can find the right version quickly.
How much storage does content-addressable versioning require?
Less than you might expect. Because content-addressable storage automatically deduplicates, documents that have not changed between versions are stored only once. In practice, if 90% of your knowledge base is stable between updates, versioning adds only about 10% storage overhead per snapshot rather than a full copy each time.
#DataVersioning #DVC #KnowledgeBase #Reproducibility #DataLineage #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available — no signup required.