Claude Message Batches: Processing Thousands of Agent Tasks with 50% Cost Savings
Learn how to use the Claude Message Batches API to process thousands of agent tasks asynchronously with 50% cost reduction, including job monitoring, result processing, and error handling.
Why Batch Processing Matters for Agents
Many agent workloads are not real-time. Nightly data classification, bulk document summarization, mass email personalization, and dataset labeling can all tolerate minutes to hours of latency. The Claude Message Batches API is designed for exactly these scenarios — it processes up to 100,000 requests per batch at 50% of the standard API cost with a 24-hour processing window.
For agent systems, this means you can run thousands of independent agent tasks in parallel without managing rate limits, connection pools, or retry logic yourself. Anthropic handles the queuing and execution; you just submit the batch and poll for results.
How the Batches API Works
The flow is straightforward: create a batch of message requests, submit them, poll for completion, and retrieve results. Each request in the batch is a complete Messages API call — it can include tools, system prompts, multi-turn conversations, and all other features.
```python
import anthropic
import json
import time

client = anthropic.Anthropic()

# Step 1: Define individual requests
requests = []
documents = load_documents()  # Your list of documents to process

for i, doc in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {
                    "role": "user",
                    "content": f"Classify this document and extract key entities:\n\n{doc['text']}"
                }
            ],
        }
    })
```
Each request needs a `custom_id` that you use to match results back to inputs. The `params` field mirrors the standard Messages API parameters exactly.
Submitting a Batch
```python
# Step 2: Create the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.processing}")
```
The batch is now queued for processing. Batches are processed within a 24-hour window; requests still unfinished when the window closes are marked as expired. Most batches finish much faster, and small batches (under 1,000 requests) typically complete in minutes.
Monitoring Batch Progress
Poll the batch status to track progress:
```python
def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll batch status until processing ends; returns the final batch object."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        counts = batch.request_counts
        finished = (counts.succeeded + counts.errored
                    + counts.canceled + counts.expired)
        total = counts.processing + finished
        print(f"Progress: {finished}/{total} "
              f"(succeeded: {counts.succeeded}, errored: {counts.errored})")
        if batch.processing_status == "ended":
            return batch
        time.sleep(poll_interval)

completed_batch = wait_for_batch(batch.id)
```
For production systems, replace polling with webhooks or a task queue like Celery that checks batch status on a schedule.
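One way to structure the scheduled variant is a single non-blocking check that the scheduler calls once per tick. A minimal sketch, assuming the same `request_counts` fields used above; `check_batch_once` is a name invented here, and the client is passed in so the function is easy to unit-test:

```python
def check_batch_once(client, batch_id: str) -> dict:
    """One non-blocking status check; the caller decides whether to reschedule."""
    batch = client.messages.batches.retrieve(batch_id)
    counts = batch.request_counts
    return {
        "ended": batch.processing_status == "ended",
        "succeeded": counts.succeeded,
        "errored": counts.errored,
        "pending": counts.processing,
    }
```

A Celery beat or cron job would call this every minute or so and re-enqueue itself until `ended` is true, then hand off to the result-processing step.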
Retrieving and Processing Results
Once the batch completes, stream the results:
```python
# Step 3: Retrieve results
results = {}

for result in client.messages.batches.results(completed_batch.id):
    custom_id = result.custom_id
    if result.result.type == "succeeded":
        message = result.result.message
        text = message.content[0].text
        results[custom_id] = {"status": "success", "output": text}
    elif result.result.type == "errored":
        error = result.result.error
        results[custom_id] = {"status": "error", "error": str(error)}
    elif result.result.type == "expired":
        results[custom_id] = {"status": "expired"}
    elif result.result.type == "canceled":
        results[custom_id] = {"status": "canceled"}

print(f"Processed {len(results)} results")
print(f"Succeeded: {sum(1 for r in results.values() if r['status'] == 'success')}")
print(f"Failed: {sum(1 for r in results.values() if r['status'] != 'success')}")
```
Results stream back as an iterator, so you can process them without loading everything into memory at once.
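To exploit that, you can write each result to disk as it arrives instead of accumulating a dict. A sketch using JSON Lines, assuming result objects shaped like the SDK's (a `custom_id` plus a `result.type`); `stream_results_to_jsonl` is a helper name invented here:

```python
import json

def stream_results_to_jsonl(result_iter, path: str) -> int:
    """Write one JSON object per result; memory stays flat regardless of batch size."""
    written = 0
    with open(path, "w") as f:
        for result in result_iter:
            record = {"custom_id": result.custom_id, "type": result.result.type}
            if result.result.type == "succeeded":
                record["output"] = result.result.message.content[0].text
            f.write(json.dumps(record) + "\n")
            written += 1
    return written
```

Downstream jobs can then grep, sort, or bulk-load the JSONL file without ever holding the full batch in memory.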
Batch Requests with Tool Use
Batch requests support the full tool use API. This means you can run agent-like workflows in batch mode, though each batch request gets a single turn — no iterative agent loop:
```python
classification_tool = {
    "name": "classify_document",
    "description": "Classify a document into categories",
    "input_schema": {
        "type": "object",
        "properties": {
            "category": {
                "type": "string",
                "enum": ["legal", "financial", "technical", "marketing", "other"]
            },
            "confidence": {"type": "number"},
            "entities": {
                "type": "array",
                "items": {"type": "string"}
            }
        },
        "required": ["category", "confidence", "entities"]
    }
}

# Force structured output via tool_choice
batch_requests = []
for i, doc in enumerate(documents):
    batch_requests.append({
        "custom_id": f"classify-{i}",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 512,
            "tools": [classification_tool],
            "tool_choice": {"type": "tool", "name": "classify_document"},
            "messages": [
                {"role": "user", "content": f"Classify this document:\n\n{doc['text']}"}
            ],
        }
    })
```
By forcing tool use with `tool_choice`, every response will contain a structured `tool_use` block that you can parse directly, with no text extraction needed.
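Parsing then reduces to locating the `tool_use` block in each successful result. A sketch, where `parse_classification` is a helper name invented here and the block shapes mirror the SDK's message objects:

```python
def parse_classification(message) -> dict:
    """Return the structured input of the forced classify_document tool call."""
    for block in message.content:
        if block.type == "tool_use" and block.name == "classify_document":
            return block.input  # matches the input_schema: category, confidence, entities
    raise ValueError("no tool_use block in response")
```

The raised `ValueError` marks responses worth resubmitting, since a forced tool call should always produce a `tool_use` block.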
Error Handling and Retries
Build resilience into your batch pipeline:
```python
def submit_with_retry(requests: list, max_retries: int = 3) -> str:
    """Submit a batch, retrying transient API errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            batch = client.messages.batches.create(requests=requests)
            return batch.id
        except anthropic.APIError as e:
            if attempt == max_retries - 1:
                raise
            print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
            time.sleep(2 ** attempt)


def resubmit_failures(batch_id: str, original_requests: dict):
    """Collect failed requests and resubmit them as a new batch.

    Returns the new batch ID, or None if nothing failed.
    """
    failed_requests = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type != "succeeded":
            # Find the original request by custom_id
            original = original_requests[result.custom_id]
            failed_requests.append(original)
    if not failed_requests:
        return None
    print(f"Resubmitting {len(failed_requests)} failed requests")
    return submit_with_retry(failed_requests)
```
FAQ
What is the maximum batch size?
Each batch can contain up to 100,000 requests. If you have more than that, split them into multiple batches and submit them concurrently. Each request can use up to the model's full context window and max output tokens.
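Splitting is a one-liner. A sketch, where `chunk_requests` is a helper name invented here and 100,000 is the documented per-batch cap:

```python
def chunk_requests(requests: list, chunk_size: int = 100_000) -> list:
    """Split a request list into batch-sized chunks, preserving order."""
    return [requests[i:i + chunk_size] for i in range(0, len(requests), chunk_size)]
```

Each chunk then goes through the same create/poll/results cycle, and unique `custom_id`s keep results attributable across batches.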
Can I cancel a running batch?
Yes, call `client.messages.batches.cancel(batch_id)` to cancel a batch in progress. Requests that have already completed will still be available in the results. Requests that were not yet processed will be marked as canceled.
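Because cancellation is asynchronous (the batch passes through a `canceling` status before it ends), a helper can cancel, wait for the terminal state, and then drain whatever finished. A sketch; `cancel_and_collect` is a name invented here, and the status and result values mirror the polling and results sections above:

```python
import time

def cancel_and_collect(client, batch_id: str) -> list:
    """Cancel a batch, wait until it ends, return custom_ids that still succeeded."""
    client.messages.batches.cancel(batch_id)
    batch = client.messages.batches.retrieve(batch_id)
    while batch.processing_status != "ended":
        time.sleep(5)
        batch = client.messages.batches.retrieve(batch_id)
    return [r.custom_id
            for r in client.messages.batches.results(batch_id)
            if r.result.type == "succeeded"]
```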
How much does batch processing actually save?
Batch processing costs 50% of standard API pricing for both input and output tokens. For a workflow processing 10,000 documents at an average of 2,000 input tokens and 500 output tokens each, the savings scale directly with the model's per-token rates, and for larger models can reach hundreds of dollars per run compared to real-time API calls.
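The arithmetic is easy to check. A sketch using illustrative per-million-token rates of $3 input and $15 output; these are assumptions, not published pricing, so substitute the current rates for your model:

```python
def batch_savings(n_requests: int, in_tokens: int, out_tokens: int,
                  in_rate: float, out_rate: float, discount: float = 0.5) -> float:
    """Dollars saved per run at the batch discount; rates are per million tokens."""
    standard_cost = n_requests * (in_tokens * in_rate + out_tokens * out_rate) / 1_000_000
    return standard_cost * discount

# 10,000 docs x (2,000 in + 500 out) tokens at assumed $3/$15 per MTok:
# standard cost $135, batch cost $67.50, so $67.50 saved per run.
# Savings scale linearly with the rates, so pricier models save proportionally more.
```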
CallSphere Team
Expert insights on AI voice agents and customer communication automation.