Claude Batch API: Processing Thousands of Agent Tasks Cost-Effectively
Use Claude's Batch API to process thousands of agent tasks at 50% reduced cost. Learn how to create batches, poll for completion, retrieve results, and design batch-friendly agent workflows.
When to Use the Batch API
The Claude Batch API is designed for workloads that do not require real-time responses. Instead of processing each request immediately and returning results in milliseconds, the Batch API accepts large collections of requests, processes them asynchronously over hours, and returns all results at once — at 50% of the standard per-token price.
This makes it ideal for data processing pipelines, content generation at scale, evaluation harnesses, document classification, and any agent workflow where latency is less important than cost and throughput. If you have 500 documents to analyze or 10,000 support tickets to classify, the Batch API is the right tool.
Creating a Batch
Batch requests use the same message format as the standard API, wrapped in a batch structure:
```python
import anthropic

client = anthropic.Anthropic()

# Define batch requests - each is a standard Messages API call with a custom_id
requests = [
    {
        "custom_id": "doc-001",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "Summarize this article: Artificial intelligence is transforming healthcare..."}
            ]
        }
    },
    {
        "custom_id": "doc-002",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "Summarize this article: The global chip shortage continues to affect..."}
            ]
        }
    },
    {
        "custom_id": "doc-003",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": "Summarize this article: Renewable energy investments reached record levels..."}
            ]
        }
    }
]

# Create the batch
batch = client.messages.batches.create(requests=requests)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
```
Each request gets a custom_id that you define — this is how you match results back to your input data. The params object accepts all the same parameters as messages.create, including system prompts, tools, and max_tokens.
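Because results are matched to inputs only through custom_id, duplicate IDs silently make results ambiguous. A small helper like the sketch below (not part of the SDK — the function name and defaults are illustrative) can build request dicts from input records while enforcing uniqueness:

```python
def build_requests(records: list[dict],
                   model: str = "claude-sonnet-4-20250514",
                   max_tokens: int = 1024) -> list[dict]:
    """Build batch request dicts from records with 'id' and 'prompt' fields,
    rejecting duplicate custom_ids so results can always be matched back."""
    seen = set()
    requests = []
    for record in records:
        custom_id = record["id"]
        if custom_id in seen:
            raise ValueError(f"Duplicate custom_id: {custom_id}")
        seen.add(custom_id)
        requests.append({
            "custom_id": custom_id,
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "messages": [{"role": "user", "content": record["prompt"]}],
            },
        })
    return requests
```

Validating IDs before submission is cheap; debugging mismatched results after a multi-hour batch run is not.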
Polling for Completion
Batches process asynchronously. Poll the status until completion:
```python
import anthropic
import time

client = anthropic.Anthropic()

def wait_for_batch(batch_id: str, poll_interval: int = 30) -> None:
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        status = batch.processing_status
        counts = batch.request_counts
        total = counts.processing + counts.succeeded + counts.errored + counts.canceled + counts.expired
        completed = counts.succeeded + counts.errored
        print(f"Status: {status} | Completed: {completed}/{total} "
              f"(succeeded: {counts.succeeded}, errors: {counts.errored})")
        if status == "ended":
            return
        time.sleep(poll_interval)

# Usage
wait_for_batch("msgbatch_abc123")
```
The batch transitions through states: in_progress while processing, then ended when all requests are complete. Each request can independently succeed or fail, so check both succeeded and errored counts.
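Since small batches often finish in minutes while large ones can take hours, a fixed polling interval is either too aggressive or too slow. One option is an exponential-backoff schedule: poll frequently at first, then stretch the interval toward a cap. This generator is a sketch (the name and defaults are illustrative, not part of the SDK):

```python
def backoff_intervals(initial: float = 5.0, factor: float = 2.0,
                      cap: float = 300.0):
    """Yield an ever-growing sequence of sleep intervals, capped at `cap`
    seconds, for polling a long-running batch."""
    interval = initial
    while True:
        yield interval
        interval = min(interval * factor, cap)
```

Used in a polling loop, you would `time.sleep(interval)` with each yielded value until `processing_status` reaches "ended", instead of sleeping a fixed 30 seconds.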
Retrieving Results
Once the batch is complete, iterate over the results:
```python
import anthropic

client = anthropic.Anthropic()

def get_batch_results(batch_id: str) -> dict:
    results = {}
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        if result.result.type == "succeeded":
            message = result.result.message
            text = "".join(
                block.text for block in message.content if hasattr(block, "text")
            )
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        elif result.result.type == "expired":
            results[custom_id] = {
                "status": "expired",
            }
    return results

results = get_batch_results("msgbatch_abc123")
for doc_id, result in results.items():
    if result["status"] == "success":
        print(f"{doc_id}: {result['text'][:100]}...")
    else:
        print(f"{doc_id}: FAILED - {result.get('error', result['status'])}")
Results are streamed as an iterator, so you can process them without loading the entire result set into memory — important when dealing with thousands of responses.
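To keep memory flat, you can write each result to a JSONL file as it arrives instead of collecting everything into a dict. The sketch below takes any iterable of (custom_id, payload) pairs, so in real use you would adapt each SDK result object into that shape before passing it in; the function name is illustrative:

```python
import json

def write_results_jsonl(results, path: str) -> int:
    """Append each (custom_id, payload) pair to `path` as one JSON line,
    returning the number of rows written."""
    count = 0
    with open(path, "w") as f:
        for custom_id, payload in results:
            f.write(json.dumps({"custom_id": custom_id, **payload}) + "\n")
            count += 1
    return count
```

JSONL also makes downstream processing resumable: each line is independently parseable, so a partial file from an interrupted run is still usable.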
Building a Batch Agent Pipeline
Here is a complete pipeline for batch document classification:
```python
import anthropic
import json
import time

client = anthropic.Anthropic()

def batch_classify_documents(documents: list[dict]) -> dict:
    """
    Classify a list of documents using the Batch API.
    Each document should have 'id' and 'text' fields.
    """
    # Step 1: Build batch requests
    requests = []
    for doc in documents:
        requests.append({
            "custom_id": doc["id"],
            "params": {
                "model": "claude-sonnet-4-20250514",
                "max_tokens": 256,
                "system": (
                    "Classify the document into exactly one category.\n"
                    'Return JSON: {"category": "...", "confidence": 0.0-1.0, "reasoning": "..."}\n'
                    "Categories: technology, finance, healthcare, legal, general"
                ),
                "messages": [
                    {"role": "user", "content": f"Classify this document:\n\n{doc['text']}"}
                ]
            }
        })

    # Step 2: Submit the batch
    batch = client.messages.batches.create(requests=requests)
    print(f"Submitted batch {batch.id} with {len(requests)} requests")

    # Step 3: Wait for completion
    while True:
        batch = client.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        completed = batch.request_counts.succeeded + batch.request_counts.errored
        total = len(requests)
        print(f"Progress: {completed}/{total}")
        time.sleep(30)

    # Step 4: Collect results
    classifications = {}
    for result in client.messages.batches.results(batch.id):
        if result.result.type == "succeeded":
            text = result.result.message.content[0].text
            try:
                classifications[result.custom_id] = json.loads(text)
            except json.JSONDecodeError:
                classifications[result.custom_id] = {"category": "parse_error", "raw": text}
        else:
            classifications[result.custom_id] = {"category": "error"}
    return classifications

# Example usage
docs = [
    {"id": "doc-001", "text": "The Federal Reserve raised interest rates by 25 basis points..."},
    {"id": "doc-002", "text": "NVIDIA announced its next-generation GPU architecture..."},
    {"id": "doc-003", "text": "New clinical trials show promising results for mRNA vaccines..."},
]
results = batch_classify_documents(docs)
for doc_id, classification in results.items():
    print(f"{doc_id}: {classification['category']} (confidence: {classification.get('confidence', 'N/A')})")
```
This pipeline handles the full lifecycle: request construction, submission, polling, and result parsing with error handling.
Cost Comparison
The savings from batch processing are straightforward to calculate:
```python
def estimate_batch_savings(
    num_requests: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    model: str = "claude-sonnet-4-20250514"
):
    # Sonnet pricing (per million tokens)
    standard_input_cost = 3.0   # $/M tokens
    standard_output_cost = 15.0 # $/M tokens
    batch_discount = 0.5        # 50% discount

    total_input = num_requests * avg_input_tokens
    total_output = num_requests * avg_output_tokens

    standard_cost = (
        (total_input / 1_000_000) * standard_input_cost +
        (total_output / 1_000_000) * standard_output_cost
    )
    batch_cost = standard_cost * batch_discount

    print(f"Requests: {num_requests:,}")
    print(f"Standard API cost: ${standard_cost:.2f}")
    print(f"Batch API cost: ${batch_cost:.2f}")
    print(f"Savings: ${standard_cost - batch_cost:.2f} ({(1 - batch_discount) * 100:.0f}%)")

# Example: 5,000 document classifications
estimate_batch_savings(
    num_requests=5000,
    avg_input_tokens=800,
    avg_output_tokens=150
)
```
For 5,000 classification tasks, the Batch API saves roughly 50% compared to real-time API calls. The trade-off is latency — batches may take minutes to hours instead of seconds.
Batch API with Tools
You can include tool definitions in batch requests for agentic tasks, though each batch request is a single turn:
```python
import anthropic

client = anthropic.Anthropic()

requests = [
    {
        "custom_id": "extract-001",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "tools": [
                {
                    "name": "save_contact",
                    "description": "Save extracted contact information.",
                    "input_schema": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "email": {"type": "string"},
                            "company": {"type": "string"},
                            "role": {"type": "string"}
                        },
                        "required": ["name"]
                    }
                }
            ],
            "tool_choice": {"type": "tool", "name": "save_contact"},
            "messages": [
                {"role": "user", "content": "Extract contact info: John Smith, CTO at Acme Corp, john@acme.com"}
            ]
        }
    }
]

batch = client.messages.batches.create(requests=requests)
```
Using tool_choice to force a specific tool call ensures Claude returns structured data in every batch response, making result parsing predictable.
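When the tool call is forced, each succeeded result's message content contains a tool_use block whose input field holds the structured data. A small extractor like this sketch pulls it out; it works on any content blocks exposing type, name, and input attributes, as the SDK's content blocks do (the helper name is illustrative):

```python
def extract_tool_input(content_blocks, tool_name: str):
    """Return the input dict of the first tool_use block matching
    `tool_name`, or None if no such block is present."""
    for block in content_blocks:
        if getattr(block, "type", None) == "tool_use" and block.name == tool_name:
            return block.input
    return None
```

Returning None instead of raising keeps the result loop simple: a missing tool call can be logged alongside errored and expired requests.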
FAQ
How long does a batch take to process?
Anthropic targets processing within 24 hours, but most batches complete much faster — often within minutes for small batches and a few hours for large ones. The processing time depends on current system load and batch size. Do not design workflows that require batch results within a specific time window.
Is there a maximum batch size?
Yes. Batches can contain up to 100,000 requests. For larger workloads, split into multiple batches and process them in parallel. Each batch is independent, so you can submit several simultaneously.
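Splitting an oversized workload is a one-liner. The chunk size below is configurable since you may prefer batches well under the 100,000-request cap for faster turnaround; the helper is a sketch, not an SDK function:

```python
def chunk_requests(requests: list, chunk_size: int = 100_000) -> list[list]:
    """Split a request list into consecutive chunks of at most
    `chunk_size` items, each submittable as its own batch."""
    return [requests[i:i + chunk_size] for i in range(0, len(requests), chunk_size)]
```

Each chunk would then be passed to client.messages.batches.create independently, and the resulting batch IDs polled in parallel.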
Can I cancel a batch in progress?
Yes. Call client.messages.batches.cancel(batch_id) to cancel a batch. Requests that have already been processed will still be available in the results, but pending requests will be marked as canceled. This is useful for aborting a batch if you discover an error in your request construction.
#Anthropic #Claude #BatchAPI #CostOptimization #Scalability #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.