Implementing Agent Checkpoints: Save and Resume Long-Running Agent Tasks
Learn how to implement checkpointing for AI agents — serializing agent state, persisting progress to disk or database, and resuming interrupted tasks with idempotent operations and recovery strategies.
Why Agents Need Checkpoints
Long-running agents face real-world interruptions: server restarts, network timeouts, API rate limits, or simple crashes. Without checkpoints, a research agent that spent 30 minutes gathering and analyzing data loses all progress and must start from scratch. Checkpointing saves the agent's state at regular intervals so it can resume from the last saved point rather than the beginning.
Checkpointing is especially critical for agents that perform expensive operations — LLM calls, API requests, or database writes — because resuming from a checkpoint avoids repeating those costs.
Designing the Checkpoint System
A checkpoint captures everything needed to resume the agent: the current task state, completed steps, intermediate results, and any accumulated context.
import hashlib
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class Checkpoint:
    """Snapshot of an agent task from which the task can be resumed.

    Every field value must be JSON-serializable: ``compute_checksum`` runs
    the fields through ``json.dumps`` and raises ``TypeError`` otherwise.
    """

    task_id: str
    # Position in the step list at which execution should resume.
    step_index: int
    state: Dict[str, Any]
    completed_steps: List[str]
    intermediate_results: Dict[str, Any]
    # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that does not say it is UTC.
    created_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    # Filled in by the store on save; excluded from the digest itself.
    checksum: str = ""

    def compute_checksum(self) -> str:
        """Return a digest of every field except ``checksum``.

        The fields are serialized with sorted keys so the digest does not
        depend on dict insertion order, then hashed with SHA-256 and
        truncated to the first 16 hex characters.

        Returns:
            A 16-character lowercase hex string.

        Raises:
            TypeError: If a field holds a value ``json.dumps`` cannot encode.
        """
        payload = {
            name: value
            for name, value in asdict(self).items()
            if name != "checksum"
        }
        serialized = json.dumps(payload, sort_keys=True)
        return hashlib.sha256(serialized.encode()).hexdigest()[:16]
class CheckpointStore:
    """File-backed checkpoint storage: one ``<task_id>.json`` file per task."""

    def __init__(self, storage_dir: str = "./checkpoints"):
        """Use ``storage_dir`` for checkpoint files, creating it if needed."""
        self.storage_dir = Path(storage_dir)
        self.storage_dir.mkdir(parents=True, exist_ok=True)

    def _path_for(self, task_id: str) -> Path:
        """Return the checkpoint file path for ``task_id``."""
        return self.storage_dir / f"{task_id}.json"

    def save(self, checkpoint: Checkpoint) -> None:
        """Stamp ``checkpoint`` with its checksum and write it to disk.

        The JSON is written to a sibling ``.tmp`` file and then moved over
        the real file, so a process killed mid-write leaves the previous
        checkpoint in place rather than a truncated one.
        """
        checkpoint.checksum = checkpoint.compute_checksum()
        path = self._path_for(checkpoint.task_id)
        tmp_path = path.with_suffix(".tmp")
        tmp_path.write_text(json.dumps(asdict(checkpoint), indent=2))
        # Path.replace overwrites an existing target on every platform.
        # Path.rename raises FileExistsError on Windows when the target
        # exists, which would break every save after the first one.
        tmp_path.replace(path)

    def load(self, task_id: str) -> Optional[Checkpoint]:
        """Return the stored checkpoint for ``task_id``, or ``None`` if absent.

        Raises:
            ValueError: If the stored checksum does not match the checksum
                recomputed from the loaded fields (also raised by the JSON
                decoder for malformed file content).
        """
        path = self._path_for(task_id)
        # Read directly instead of checking exists() first, so a file
        # removed between the check and the read cannot cause a crash.
        try:
            raw = path.read_text()
        except FileNotFoundError:
            return None
        checkpoint = Checkpoint(**json.loads(raw))
        # Verify integrity
        expected = checkpoint.compute_checksum()
        if checkpoint.checksum != expected:
            raise ValueError(
                f"Checkpoint corrupted: expected {expected}, got {checkpoint.checksum}"
            )
        return checkpoint

    def delete(self, task_id: str) -> None:
        """Remove the checkpoint for ``task_id``; do nothing if there is none."""
        try:
            self._path_for(task_id).unlink()
        except FileNotFoundError:
            # Already gone: deleting a missing checkpoint is not an error.
            pass
The atomic write pattern (write to a temp file, then rename) prevents data corruption if the process is killed mid-write.
Building a Checkpointed Agent Runner
The runner wraps your agent logic, automatically saving checkpoints after each step and resuming from the last checkpoint on restart.
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
from typing import Callable, Awaitable
@dataclass
class AgentStep:
    """One named unit of work in an agent pipeline."""

    # Key under which the runner records this step's result.
    name: str
    # Async callable that performs the step; the runner calls it with
    # (state, results).
    execute: Callable[..., Awaitable[Any]]
    # safe to re-execute without side effects?
    idempotent: bool = field(default=True)
class CheckpointedRunner:
    """Execute agent steps in order, persisting a checkpoint after each one.

    When ``run()`` finds a stored checkpoint for ``task_id``, it restores
    ``state`` and ``results`` from it and continues at the recorded step
    index instead of starting from step 0.
    """

    def __init__(
        self,
        task_id: str,
        steps: List[AgentStep],
        store: Optional[CheckpointStore] = None,
    ):
        """Set up a runner for ``task_id``.

        Args:
            task_id: Identifier used to store and look up the checkpoint.
            steps: Steps to execute, in order.
            store: Checkpoint storage; a default ``CheckpointStore`` is
                created when omitted.
        """
        self.task_id = task_id
        self.steps = steps
        # Compare against None so that only an omitted store is replaced.
        self.store = CheckpointStore() if store is None else store
        self.results: Dict[str, Any] = {}
        self.state: Dict[str, Any] = {}

    def _save_checkpoint(self, step_index: int, completed: List[str]) -> None:
        """Persist current state and results, resuming at ``step_index``."""
        self.store.save(Checkpoint(
            task_id=self.task_id,
            step_index=step_index,
            state=self.state,
            completed_steps=completed,
            intermediate_results=self.results,
        ))

    async def run(self) -> Dict[str, Any]:
        """Run the remaining steps and return the results keyed by step name.

        Returns:
            The ``results`` dict, mapping each step name to its return value.

        Raises:
            RuntimeError: If a step raises. A checkpoint pointing at that
                step is saved first so the next ``run()`` retries it; the
                original exception is chained as the cause.
        """
        # Try to resume from checkpoint
        checkpoint = self.store.load(self.task_id)
        start_index = 0

        if checkpoint:
            start_index = checkpoint.step_index
            self.results = checkpoint.intermediate_results
            self.state = checkpoint.state
            print(
                f"Resuming task {self.task_id} from step {start_index} "
                f"({checkpoint.completed_steps[-1] if checkpoint.completed_steps else 'start'})"
            )

        completed = list(self.results.keys())

        for i in range(start_index, len(self.steps)):
            step = self.steps[i]
            print(f"Executing step {i}: {step.name}")

            # Only the step call is guarded. A failure while writing the
            # checkpoint must not be reported as a step failure, and must
            # not trigger a second write attempt from the handler.
            # NOTE: step.idempotent is not consulted; a step that failed is
            # always re-executed on resume.
            try:
                result = await step.execute(self.state, self.results)
            except Exception as e:
                # Save checkpoint at the failed step so we can retry it
                self._save_checkpoint(i, completed)
                raise RuntimeError(
                    f"Step '{step.name}' failed: {e}. "
                    f"Checkpoint saved at step {i}."
                ) from e

            self.results[step.name] = result
            completed.append(step.name)
            # Save checkpoint after each successful step
            self._save_checkpoint(i + 1, completed)

        # Clean up checkpoint on success
        self.store.delete(self.task_id)
        return self.results
Idempotency Considerations
When an agent resumes from a checkpoint, it re-executes the step that failed. If that step had already sent an email or charged a credit card before it failed, or before its checkpoint was written, re-running it produces duplicate side effects. Idempotent operations avoid this: they leave the system in the same state no matter how many times they run.
class IdempotencyGuard:
    """Track completed operations to prevent duplicate execution.

    Results are cached in memory by key and mirrored to a JSON file, which
    is read back when a new guard is created for the same path.
    """

    def __init__(self, store_path: str = "./idempotency_keys.json"):
        """Load previously recorded keys from ``store_path`` if it exists."""
        self.path = Path(store_path)
        self.keys: Dict[str, Any] = {}
        if self.path.exists():
            self.keys = json.loads(self.path.read_text())

    def execute_once(self, key: str, func: Callable, *args, **kwargs) -> Any:
        """Execute func only if this key has not been executed before.

        Args:
            key: Identifier of the operation.
            func: Callable to invoke with ``*args`` and ``**kwargs``.

        Returns:
            The value returned by ``func``, or the recorded value when
            ``key`` was already executed. Values that are not JSON-native
            are written to the file via ``str``, so a value reloaded from
            disk may be the string form of the original.
        """
        if key in self.keys:
            print(f"Skipping already-executed operation: {key}")
            return self.keys[key]

        result = func(*args, **kwargs)
        self.keys[key] = result
        self._persist()
        return result

    def _persist(self) -> None:
        """Write the key map to disk without exposing a partial file."""
        payload = json.dumps(self.keys, indent=2, default=str)
        # Write to a sibling temp file and move it into place. Rewriting
        # the file directly could leave truncated JSON if the process dies
        # mid-write, and the next start-up would then fail to load it.
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(payload)
        tmp_path.replace(self.path)
# Usage in an agent step
guard = IdempotencyGuard()


async def send_notification(state, results):
    """Send the "report is ready" email through the idempotency guard.

    The guard key combines the recipient address and the task id from
    ``state``, so the send is skipped once that key has been recorded.
    """
    user_email = results["collect_info"]["email"]
    once_key = f"notify_{user_email}_{state.get('task_id')}"

    def _deliver():
        return email_service.send(user_email, "Your report is ready")

    guard.execute_once(once_key, _deliver)
Practical Example: Research Agent with Checkpoints
import asyncio
async def gather_sources(state, results):
    """Record the research query in ``state`` and return the sources found.

    Stands in for an expensive web-research step.
    """
    state.update(query="AI agent memory architectures")
    found = ["paper_A", "paper_B", "paper_C"]
    return {"sources": found}
async def analyze_sources(state, results):
    """Summarize how many gathered sources were analyzed for the query."""
    source_list = results["gather_sources"]["sources"]
    count = len(source_list)
    topic = state["query"]
    return {"analysis": f"Analyzed {count} sources on {topic}"}
async def generate_report(state, results):
    """Build the final report text from the analysis step's output."""
    summary = results["analyze_sources"]["analysis"]
    report_text = f"Final report based on: {summary}"
    return {"report": report_text}
# Define the pipeline
# Each step is named after its function: later steps look up earlier
# results under exactly these names.
steps = [
    AgentStep(name=step_fn.__name__, execute=step_fn)
    for step_fn in (gather_sources, analyze_sources, generate_report)
]

runner = CheckpointedRunner(task_id="research_001", steps=steps)
result = asyncio.run(runner.run())
If the process crashes during analyze_sources, restarting the program and calling runner.run() again picks up from the checkpoint: it skips gather_sources (already completed, with its results restored from the checkpoint) and retries analyze_sources.
FAQ
How often should I save checkpoints?
After every significant step or expensive operation. The goal is to minimize rework on failure. If a step takes 10 seconds, checkpointing after it saves 10 seconds on restart. If checkpointing itself is expensive (e.g., writing large embeddings), batch multiple lightweight steps between checkpoints.
Should I store checkpoints in files or a database?
Files work well for single-machine agents. Use a database (PostgreSQL, Redis) for distributed agents or when multiple processes need to read checkpoint state. Database-backed checkpoints also make it easier to monitor and manage long-running tasks via a dashboard.
How do I handle non-serializable state like database connections?
Separate your state into serializable data (stored in the checkpoint) and runtime resources (recreated on resume). The checkpoint should contain only the data needed to reconstruct the agent's position — IDs, results, counters — not live connections or file handles.
#Checkpointing #AgentPersistence #FaultTolerance #Idempotency #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.