Building Offline-Capable AI Agents: Local Models with Sync-When-Connected
Build AI agents that work fully offline using local model caching, request queuing, and intelligent sync strategies that reconcile state when connectivity returns.
Why Offline Capability Matters
Network connectivity is not guaranteed. Field technicians diagnosing equipment in basements, healthcare workers in rural clinics, warehouse staff in signal-dead zones — all need AI agents that keep working when the network drops.
An offline-capable agent must do three things: run inference locally without any network calls, store results and actions locally, and synchronize everything when connectivity returns — without data loss or conflicts.
Local Model Management
The first challenge is getting the model onto the device and keeping it updated:
import hashlib
import json
import os
from datetime import datetime, timezone
from pathlib import Path
class ModelCache:
    """Manages local model storage with version tracking.

    Models are written as ``<name>_<hash8>.onnx`` files under ``cache_dir``;
    a ``manifest.json`` alongside them records each model's filename,
    SHA-256 hash, size in bytes, and cache timestamp.
    """

    def __init__(self, cache_dir: str = "~/.agent/models"):
        """Create the cache directory if needed and load the manifest."""
        self.cache_dir = Path(cache_dir).expanduser()
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.manifest_path = self.cache_dir / "manifest.json"
        self.manifest = self._load_manifest()

    def _load_manifest(self) -> dict:
        """Load the manifest from disk, or start an empty one."""
        if self.manifest_path.exists():
            return json.loads(self.manifest_path.read_text())
        return {"models": {}}

    def _save_manifest(self) -> None:
        """Persist the in-memory manifest to disk."""
        self.manifest_path.write_text(json.dumps(self.manifest, indent=2))

    def is_cached(self, model_name: str, expected_hash: str) -> bool:
        """Check if a model is cached and matches the expected version hash."""
        entry = self.manifest.get("models", {}).get(model_name)
        if not entry:
            return False
        model_path = self.cache_dir / entry["filename"]
        # Both the file and the recorded hash must match; a stale manifest
        # entry whose file was deleted counts as "not cached".
        return model_path.exists() and entry["hash"] == expected_hash

    def get_model_path(self, model_name: str) -> Path:
        """Return the cached file path.

        Raises:
            KeyError: if the model is not in the manifest.
        """
        entry = self.manifest["models"][model_name]
        return self.cache_dir / entry["filename"]

    def store_model(self, model_name: str, model_bytes: bytes) -> Path:
        """Store model bytes, update the manifest, and return the file path."""
        file_hash = hashlib.sha256(model_bytes).hexdigest()
        # Hash prefix in the filename lets old and new versions coexist on disk.
        filename = f"{model_name}_{file_hash[:8]}.onnx"
        model_path = self.cache_dir / filename
        model_path.write_bytes(model_bytes)
        self.manifest["models"][model_name] = {
            "filename": filename,
            "hash": file_hash,
            "size_bytes": len(model_bytes),
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
            "cached_at": datetime.now(timezone.utc).isoformat(),
        }
        self._save_manifest()
        return model_path

    async def update_if_needed(self, model_name: str, remote_url: str, remote_hash: str):
        """Download the model only if the local cache is outdated.

        Raises:
            aiohttp.ClientResponseError: on a 4xx/5xx response.
            ValueError: if the downloaded bytes do not match ``remote_hash``.
        """
        if self.is_cached(model_name, remote_hash):
            return self.get_model_path(model_name)
        import aiohttp  # local import keeps the fully-offline path dependency-free

        async with aiohttp.ClientSession() as session:
            async with session.get(remote_url) as resp:
                # Fail fast on HTTP errors instead of caching an error page
                # as a "model" (the original skipped this check).
                resp.raise_for_status()
                model_bytes = await resp.read()
        # Verify integrity before trusting and persisting the bytes.
        actual_hash = hashlib.sha256(model_bytes).hexdigest()
        if actual_hash != remote_hash:
            raise ValueError(
                f"Hash mismatch for {model_name}: expected {remote_hash}, got {actual_hash}"
            )
        return self.store_model(model_name, model_bytes)
Offline Request Queue
When the agent takes actions that require a server — like saving a report, updating a database, or sending a notification — those actions must be queued locally:
import sqlite3
import json
import uuid
from datetime import datetime
from enum import Enum
class SyncStatus(Enum):
    """Lifecycle states for a queued offline action.

    NOTE(review): OfflineQueue stores these as raw string literals
    ('pending', 'synced', ...) in its SQL rather than referencing this
    enum — keep the values in sync if either side changes.
    """
    PENDING = "pending"  # queued locally, not yet sent to the server
    SYNCING = "syncing"  # currently being pushed
    SYNCED = "synced"    # acknowledged by the server
    FAILED = "failed"    # a sync attempt failed
class OfflineQueue:
    """Persistent SQLite-backed queue for actions that need server sync.

    Each enqueued action starts as 'pending'. A transient failure
    (``mark_failed``) increments retry_count and keeps the action 'pending'
    until ``max_retries`` is exhausted, after which it is parked as 'failed'
    for inspection. A successful sync sets 'synced'.
    """

    def __init__(self, db_path: str = "offline_queue.db", max_retries: int = 5):
        """Open (or create) the queue database.

        Args:
            db_path: SQLite file path; use ":memory:" for an ephemeral queue.
            max_retries: failures allowed before an action is parked as 'failed'.
        """
        self.max_retries = max_retries
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS action_queue (
                id TEXT PRIMARY KEY,
                action_type TEXT NOT NULL,
                payload TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT NOT NULL,
                synced_at TEXT,
                retry_count INTEGER DEFAULT 0,
                error_message TEXT
            )
        """)
        self.conn.commit()

    def enqueue(self, action_type: str, payload: dict) -> str:
        """Add an action to the queue and return its generated id."""
        action_id = str(uuid.uuid4())
        self.conn.execute(
            """INSERT INTO action_queue (id, action_type, payload, created_at)
               VALUES (?, ?, ?, ?)""",
            (
                action_id,
                action_type,
                json.dumps(payload),
                # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated);
                # ISO-8601 strings in a single timezone sort correctly as text.
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        self.conn.commit()
        return action_id

    def get_pending(self, limit: int = 50) -> list[dict]:
        """Return up to ``limit`` pending actions, oldest first."""
        cursor = self.conn.execute(
            """SELECT id, action_type, payload, created_at, retry_count
               FROM action_queue
               WHERE status = 'pending'
               ORDER BY created_at ASC
               LIMIT ?""",
            (limit,),
        )
        return [
            {
                "id": row[0],
                "action_type": row[1],
                "payload": json.loads(row[2]),
                "created_at": row[3],
                "retry_count": row[4],
            }
            for row in cursor.fetchall()
        ]

    def mark_synced(self, action_id: str):
        """Mark an action as acknowledged by the server."""
        self.conn.execute(
            """UPDATE action_queue SET status = 'synced', synced_at = ?
               WHERE id = ?""",
            (datetime.now(timezone.utc).isoformat(), action_id),
        )
        self.conn.commit()

    def mark_failed(self, action_id: str, error: str):
        """Record a failure and re-queue the action for retry.

        Bug fix: the original set status='failed' on the FIRST error, but
        get_pending() only selects 'pending' rows — so a single transient
        failure permanently removed the action from syncing and retry_count
        was dead code. Now the action stays 'pending' until it has failed
        ``max_retries`` times, then is parked as 'failed'.
        """
        self.conn.execute(
            """UPDATE action_queue
               SET error_message = ?,
                   retry_count = retry_count + 1,
                   status = CASE WHEN retry_count + 1 >= ? THEN 'failed'
                                 ELSE 'pending' END
               WHERE id = ?""",
            (error, self.max_retries, action_id),
        )
        self.conn.commit()

    def close(self):
        """Close the underlying connection (the original leaked it)."""
        self.conn.close()
Sync-When-Connected Strategy
The sync engine monitors connectivity and processes the queue when the network is available:
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
import asyncio
import aiohttp
class SyncEngine:
    """Monitors connectivity and syncs queued actions.

    Polls ``{api_base}/health`` every 10 seconds; while the endpoint answers
    200, drains the offline queue by POSTing each action to
    ``{api_base}/sync/{action_type}``.
    """

    def __init__(self, queue: OfflineQueue, api_base: str, max_retries: int = 5):
        """Args:
            queue: the persistent offline queue to drain.
            api_base: server base URL (no trailing slash).
            max_retries: give up on an action after this many recorded failures.
        """
        self.queue = queue
        self.api_base = api_base
        self.max_retries = max_retries
        self.is_online = False

    async def start(self):
        """Run the sync loop forever (intended to run as a background task)."""
        while True:
            self.is_online = await self._check_connectivity()
            if self.is_online:
                await self._process_queue()
            await asyncio.sleep(10)

    async def _check_connectivity(self) -> bool:
        """Probe the health endpoint; any error or non-200 counts as offline."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.api_base}/health", timeout=aiohttp.ClientTimeout(total=3)
                ) as resp:
                    return resp.status == 200
        except Exception:
            # Connection errors are expected while offline; don't let them
            # propagate out of the sync loop.
            return False

    async def _process_queue(self):
        """POST pending actions, sharing one HTTP session for the batch."""
        pending = self.queue.get_pending()
        if not pending:
            return
        # One session per batch instead of one per action: reuses the
        # connection pool and avoids per-request TCP/TLS setup. The timeout
        # keeps a flaky link from hanging the loop forever (the original
        # sync POSTs had no timeout at all).
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30)
        ) as session:
            for action in pending:
                if action["retry_count"] >= self.max_retries:
                    self.queue.mark_failed(action["id"], "Max retries exceeded")
                    continue
                try:
                    async with session.post(
                        f"{self.api_base}/sync/{action['action_type']}",
                        json=action["payload"],
                    ) as resp:
                        if resp.status in (200, 201):
                            self.queue.mark_synced(action["id"])
                        else:
                            body = await resp.text()
                            self.queue.mark_failed(action["id"], f"HTTP {resp.status}: {body}")
                except Exception as e:
                    self.queue.mark_failed(action["id"], str(e))
Conflict Resolution
When two devices (or edge and cloud) modify the same resource offline, you need a conflict resolution strategy:
class ConflictResolver:
    """Resolves conflicts using last-write-wins with field-level merge."""

    def resolve(self, local: dict, remote: dict) -> dict:
        """Merge two versions of the same record.

        Identical fields pass through unchanged, one-sided fields are
        copied from whichever side has them, and fields modified on both
        sides are decided by comparing the records' ``updated_at``
        timestamp strings (last write wins).
        """
        # Hoist the record-level timestamps once; they decide every
        # both-sides conflict below.
        local_time = local.get("updated_at", "")
        remote_time = remote.get("updated_at", "")
        prefer_local = local_time > remote_time

        merged: dict = {}
        for field in set(local) | set(remote):
            if field == "updated_at":
                continue  # handled separately after the loop
            left = local.get(field)
            right = remote.get(field)
            if left == right:
                merged[field] = left
            elif field not in remote:
                merged[field] = left  # local-only field
            elif field not in local:
                merged[field] = right  # remote-only field
            else:
                # Both sides changed the field — newer record wins.
                merged[field] = left if prefer_local else right

        # The merged record carries the later of the two timestamps.
        merged["updated_at"] = local_time if local_time >= remote_time else remote_time
        return merged
FAQ
How much storage do offline AI models require on a device?
A quantized intent classifier (DistilBERT INT8) takes about 64 MB. A small generative model like Phi-2 quantized to 4-bit takes about 1.5 GB. For most agent use cases, a combination of a classifier, an embedding model, and a small generator fits within 2 to 3 GB — manageable on modern phones and laptops.
How do I handle model updates when the device reconnects?
Use a version manifest on the server that includes model names and SHA-256 hashes. When the device comes online, compare local hashes against the manifest. Download only changed models. Apply updates atomically — load the new model in the background, swap it in once ready, and delete the old version after confirming the new one works.
What happens if queued actions conflict with changes made on the server while offline?
Use optimistic concurrency with version numbers. Each record has a version field that increments on every update. When syncing, include the expected version. If the server version is higher, the sync fails and triggers conflict resolution — either automatic merging (for compatible changes) or flagging for manual review (for incompatible changes).
#OfflineAI #LocalModels #DataSync #EdgeAI #ConflictResolution #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available — no signup required.