Skip to content
Learn Agentic AI · 10 min read · 0 views

Building Offline-Capable AI Agents: Local Models with Sync-When-Connected

Build AI agents that work fully offline using local model caching, request queuing, and intelligent sync strategies that reconcile state when connectivity returns.

Why Offline Capability Matters

Network connectivity is not guaranteed. Field technicians diagnosing equipment in basements, healthcare workers in rural clinics, warehouse staff in signal-dead zones — all need AI agents that keep working when the network drops.

An offline-capable agent must do three things: run inference locally without any network calls, store results and actions locally, and synchronize everything when connectivity returns — without data loss or conflicts.

Local Model Management

The first challenge is getting the model onto the device and keeping it updated:

import os
import hashlib
import json
from pathlib import Path
from datetime import datetime

class ModelCache:
    """Manages local model storage with version tracking.

    Model files live directly under ``cache_dir``. A ``manifest.json`` in the
    same directory maps each model name to its filename, SHA-256 hash, size in
    bytes and the time it was cached.
    """

    def __init__(self, cache_dir: str = "~/.agent/models"):
        """Create the cache directory if needed and load the manifest.

        Args:
            cache_dir: Directory for model files; ``~`` is expanded.
        """
        self.cache_dir = Path(cache_dir).expanduser()
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.manifest_path = self.cache_dir / "manifest.json"
        self.manifest = self._load_manifest()

    def _load_manifest(self) -> dict:
        """Return the on-disk manifest, or an empty one if missing/unreadable.

        A truncated or otherwise invalid manifest is treated as an empty
        cache (models get re-downloaded) instead of crashing the agent.
        """
        try:
            manifest = json.loads(self.manifest_path.read_text(encoding="utf-8"))
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {"models": {}}
        if not isinstance(manifest, dict):
            return {"models": {}}
        if not isinstance(manifest.get("models"), dict):
            # store_model() indexes manifest["models"], so it must exist.
            manifest["models"] = {}
        return manifest

    @staticmethod
    def _atomic_write(target: Path, data: bytes) -> None:
        """Write *data* to *target* via a temp file in the same directory.

        ``os.replace`` swaps the finished file into place, so readers never
        observe a partially written model or manifest.
        """
        tmp_path = target.with_name(target.name + ".tmp")
        try:
            tmp_path.write_bytes(data)
            os.replace(tmp_path, target)
        finally:
            # Only present if the write or the replace failed.
            tmp_path.unlink(missing_ok=True)

    def _save_manifest(self) -> None:
        """Persist the in-memory manifest to ``manifest.json``."""
        serialized = json.dumps(self.manifest, indent=2)
        self._atomic_write(self.manifest_path, serialized.encode("utf-8"))

    def is_cached(self, model_name: str, expected_hash: str) -> bool:
        """Check if a model is cached and matches expected version.

        Compares the hash recorded in the manifest (case-insensitively) and
        checks that the model file exists; the file contents are not
        re-hashed.
        """
        entry = self.manifest.get("models", {}).get(model_name)
        if not entry:
            return False
        model_path = self.cache_dir / entry["filename"]
        return model_path.is_file() and entry["hash"] == expected_hash.lower()

    def get_model_path(self, model_name: str) -> Path:
        """Return the path of a cached model.

        Raises:
            KeyError: If *model_name* has no manifest entry.
        """
        entry = self.manifest["models"][model_name]
        return self.cache_dir / entry["filename"]

    def store_model(self, model_name: str, model_bytes: bytes) -> Path:
        """Store model bytes and update manifest.

        Returns:
            Path of the stored model file.
        """
        file_hash = hashlib.sha256(model_bytes).hexdigest()
        # Names such as "org/model" must not create sub-paths or escape the
        # cache directory, so path separators are flattened in the filename.
        safe_name = model_name.replace("/", "_").replace("\\", "_")
        filename = f"{safe_name}_{file_hash[:8]}.onnx"
        model_path = self.cache_dir / filename

        self._atomic_write(model_path, model_bytes)
        self.manifest["models"][model_name] = {
            "filename": filename,
            "hash": file_hash,
            "size_bytes": len(model_bytes),
            "cached_at": datetime.utcnow().isoformat(),
        }
        self._save_manifest()
        return model_path

    async def update_if_needed(
        self, model_name: str, remote_url: str, remote_hash: str
    ) -> Path:
        """Download model only if local cache is outdated.

        Args:
            model_name: Manifest key of the model.
            remote_url: URL to fetch the model bytes from.
            remote_hash: Expected SHA-256 hex digest of the model.

        Returns:
            Path of the cached (possibly freshly downloaded) model file.

        Raises:
            aiohttp.ClientResponseError: If the server answers with an
                error status.
            ValueError: If the downloaded bytes do not hash to *remote_hash*.
        """
        if self.is_cached(model_name, remote_hash):
            return self.get_model_path(model_name)

        # Imported after the cache-hit return, so a cache hit never needs it.
        import aiohttp

        async with aiohttp.ClientSession() as session:
            async with session.get(remote_url) as resp:
                # Never cache an HTTP error page as if it were a model.
                resp.raise_for_status()
                model_bytes = await resp.read()

        actual_hash = hashlib.sha256(model_bytes).hexdigest()
        if actual_hash != remote_hash.lower():
            # A truncated or tampered download must not replace a good model.
            raise ValueError(
                f"Hash mismatch for {model_name!r}: "
                f"expected {remote_hash}, got {actual_hash}"
            )
        return self.store_model(model_name, model_bytes)

Offline Request Queue

When the agent takes actions that require a server — like saving a report, updating a database, or sending a notification — those actions must be queued locally:

import sqlite3
import json
import uuid
from datetime import datetime
from enum import Enum

class SyncStatus(Enum):
    """Status values for a queued action; each value is a lowercase string."""

    # Same string as the DEFAULT of the action_queue.status column.
    PENDING = "pending"
    # Not written by any code visible in this file.
    SYNCING = "syncing"
    # Same string OfflineQueue.mark_synced writes to the status column.
    SYNCED = "synced"
    # Same string OfflineQueue.mark_failed writes to the status column.
    FAILED = "failed"

class OfflineQueue:
    """Persistent queue for actions that need server sync.

    Actions are rows in the ``action_queue`` table of a SQLite database. A row
    starts as ``'pending'`` and moves to ``'synced'`` or ``'failed'``. The
    queue can be used as a context manager so the connection is closed::

        with OfflineQueue("offline_queue.db") as queue:
            queue.enqueue("report", {"id": 1})
    """

    def __init__(self, db_path: str = "offline_queue.db"):
        """Open (or create) the database and ensure the table exists.

        Args:
            db_path: SQLite database path; ``":memory:"`` also works.
        """
        self.conn = sqlite3.connect(db_path)
        with self.conn:
            self.conn.execute("""
                CREATE TABLE IF NOT EXISTS action_queue (
                    id TEXT PRIMARY KEY,
                    action_type TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    status TEXT DEFAULT 'pending',
                    created_at TEXT NOT NULL,
                    synced_at TEXT,
                    retry_count INTEGER DEFAULT 0,
                    error_message TEXT
                )
            """)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.conn.close()

    def enqueue(self, action_type: str, payload: dict) -> str:
        """Add an action to the queue and return its generated id.

        Args:
            action_type: Name of the action, stored as-is.
            payload: JSON-serializable data for the action.
        """
        action_id = str(uuid.uuid4())
        # "with self.conn" commits on success and rolls back on error.
        with self.conn:
            self.conn.execute(
                """INSERT INTO action_queue (id, action_type, payload, created_at)
                   VALUES (?, ?, ?, ?)""",
                (
                    action_id,
                    action_type,
                    json.dumps(payload),
                    datetime.utcnow().isoformat(),
                ),
            )
        return action_id

    def get_pending(self, limit: int = 50, *, max_retries: int = 0) -> list[dict]:
        """Return queued actions that still need syncing, oldest first.

        Args:
            limit: Maximum number of actions to return.
            max_retries: When greater than 0, actions marked ``'failed'``
                whose ``retry_count`` is below this value are returned too,
                so they can be retried. The default of 0 returns only
                ``'pending'`` actions.

        Returns:
            Dicts with ``id``, ``action_type``, ``payload`` (decoded from
            JSON), ``created_at`` and ``retry_count``.
        """
        cursor = self.conn.execute(
            # rowid breaks ties between rows with equal created_at so the
            # order is always insertion order.
            """SELECT id, action_type, payload, created_at, retry_count
               FROM action_queue
               WHERE status = 'pending'
                  OR (status = 'failed' AND retry_count < ?)
               ORDER BY created_at ASC, rowid ASC
               LIMIT ?""",
            (max_retries, limit),
        )
        return [
            {
                "id": row[0],
                "action_type": row[1],
                "payload": json.loads(row[2]),
                "created_at": row[3],
                "retry_count": row[4],
            }
            for row in cursor.fetchall()
        ]

    def mark_synced(self, action_id: str):
        """Mark an action as synced and record the sync time."""
        with self.conn:
            self.conn.execute(
                """UPDATE action_queue SET status = 'synced', synced_at = ?
                   WHERE id = ?""",
                (datetime.utcnow().isoformat(), action_id),
            )

    def mark_failed(self, action_id: str, error: str):
        """Mark an action as failed, store the error, and bump retry_count."""
        with self.conn:
            self.conn.execute(
                """UPDATE action_queue
                   SET status = 'failed', error_message = ?,
                       retry_count = retry_count + 1
                   WHERE id = ?""",
                (error, action_id),
            )

Sync-When-Connected Strategy

The sync engine monitors connectivity and processes the queue when the network is available:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

import asyncio
import aiohttp

class SyncEngine:
    """Monitors connectivity and syncs queued actions.

    Every 10 seconds the engine probes ``{api_base}/health``; when the probe
    succeeds it POSTs each pending action to
    ``{api_base}/sync/{action_type}``.
    """

    def __init__(self, queue: OfflineQueue, api_base: str, max_retries: int = 5):
        """
        Args:
            queue: Queue to read pending actions from and report results to.
            api_base: Base URL of the sync API, without a trailing slash.
            max_retries: Actions whose ``retry_count`` has reached this value
                are marked failed without being sent.
        """
        self.queue = queue
        self.api_base = api_base
        self.max_retries = max_retries
        self.is_online = False

    async def start(self):
        """Run the sync loop in the background."""
        while True:
            self.is_online = await self._check_connectivity()
            if self.is_online:
                await self._process_queue()
            await asyncio.sleep(10)

    async def _check_connectivity(self) -> bool:
        """Return True when the health endpoint answers 200 within 3 seconds."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.api_base}/health", timeout=aiohttp.ClientTimeout(total=3)
                ) as resp:
                    return resp.status == 200
        except Exception:
            # Deliberately best-effort: any failure of the probe means "offline".
            return False

    async def _process_queue(self):
        """Send pending actions in order and record each outcome.

        A connection failure or timeout is a network problem, not a problem
        with the action: the action is left untouched in the queue and the
        pass stops. A server that rejects the action (non-2xx response) or
        any other error marks the action failed.
        """
        pending = self.queue.get_pending()
        if not pending:
            return

        # One session for the whole pass, not one per action.
        async with aiohttp.ClientSession() as session:
            for action in pending:
                if action["retry_count"] >= self.max_retries:
                    self.queue.mark_failed(action["id"], "Max retries exceeded")
                    continue
                try:
                    async with session.post(
                        f"{self.api_base}/sync/{action['action_type']}",
                        json=action["payload"],
                    ) as resp:
                        # Any 2xx (e.g. 202, 204) means the server took it.
                        if 200 <= resp.status < 300:
                            self.queue.mark_synced(action["id"])
                        else:
                            body = await resp.text()
                            self.queue.mark_failed(
                                action["id"], f"HTTP {resp.status}: {body}"
                            )
                except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
                    # The network dropped mid-pass. Marking the action failed
                    # would lose it, so leave it pending and stop; the next
                    # successful health check starts a new pass.
                    # NOTE(review): the server may have applied the action
                    # before the connection broke, so a retry can deliver it
                    # twice — confirm the sync endpoints tolerate that.
                    self.is_online = False
                    break
                except Exception as e:
                    self.queue.mark_failed(action["id"], str(e))

Conflict Resolution

When two devices (or edge and cloud) modify the same resource offline, you need a conflict resolution strategy:

class ConflictResolver:
    """Resolves conflicts using record-level last-write-wins over a key union.

    Fields present on only one side are always kept. Where both sides hold
    different values for a field, the value from the record with the greater
    ``updated_at`` wins; the remote value wins a tie.
    """

    def resolve(self, local: dict, remote: dict) -> dict:
        """Merge two versions of the same record.

        Args:
            local: The locally stored version.
            remote: The version received from the other side.

        Returns:
            A new dict with local fields first (in local order), then
            remote-only fields, then ``updated_at`` set to the greater of the
            two timestamps (``""`` when neither record has one).
        """
        # Timestamps are compared as-is; a missing one counts as "".
        # NOTE(review): plain comparison only orders ISO-8601 strings
        # correctly when both sides use the same format and timezone —
        # confirm that the producers of these records guarantee this.
        local_time = local.get("updated_at", "")
        remote_time = remote.get("updated_at", "")
        local_is_newer = local_time > remote_time

        merged = {}
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # merged record's field order is the same on every run (a set's
        # iteration order is not).
        for key in dict.fromkeys([*local, *remote]):
            if key == "updated_at":
                continue
            if key not in remote:
                merged[key] = local[key]  # Local-only field
            elif key not in local:
                merged[key] = remote[key]  # Remote-only field
            elif local[key] == remote[key] or local_is_newer:
                merged[key] = local[key]
            else:
                merged[key] = remote[key]

        merged["updated_at"] = max(local_time, remote_time)
        return merged

FAQ

How much storage do offline AI models require on a device?

A quantized intent classifier (DistilBERT INT8) takes about 64 MB. A small generative model like Phi-2 quantized to 4-bit takes about 1.5 GB. For most agent use cases, a combination of a classifier, an embedding model, and a small generator fits within 2 to 3 GB — manageable on modern phones and laptops.

How do I handle model updates when the device reconnects?

Use a version manifest on the server that includes model names and SHA-256 hashes. When the device comes online, compare local hashes against the manifest. Download only changed models. Apply updates atomically — load the new model in the background, swap it in once ready, and delete the old version after confirming the new one works.

What happens if queued actions conflict with changes made on the server while offline?

Use optimistic concurrency with version numbers. Each record has a version field that increments on every update. When syncing, include the expected version. If the server version is higher, the sync fails and triggers conflict resolution — either automatic merging (for compatible changes) or flagging for manual review (for incompatible changes).


#OfflineAI #LocalModels #DataSync #EdgeAI #ConflictResolution #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.