Claude API Batching: Processing Thousands of Requests Cost-Effectively
Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.
What Is the Message Batches API?
The Claude Message Batches API allows you to submit up to 10,000 requests in a single batch and receive results asynchronously. Each request in the batch gets a 50% discount on both input and output tokens compared to the standard Messages API.
The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.
Ideal Use Cases
- Document classification across thousands of files
- Bulk content moderation
- Dataset annotation and labeling
- Nightly report generation
- Mass email personalization
- Code analysis across a large codebase
- Evaluation and testing of prompts at scale
Submitting a Batch
from anthropic import Anthropic

client = Anthropic()

# Each request in the batch follows the standard Messages API format.
# custom_id is echoed back in the results so outputs can be matched to inputs.
requests = [
    {
        "custom_id": f"doc-{i}",  # Your identifier for tracking
        "params": {
            # FIX: "claude-sonnet-4-5-20250514" is not a valid model ID --
            # Sonnet 4.5's dated snapshot is 20250929 (20250514 is Sonnet 4's).
            "model": "claude-sonnet-4-5-20250929",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }],
        },
    }
    for i, document in enumerate(documents)
]

# Submit the batch; processing happens asynchronously server-side.
batch = client.messages.batches.create(requests=requests)
print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")
Polling for Results
import time

def wait_for_batch(batch_id: str, poll_interval: int = 30, timeout: float | None = None):
    """Poll until the batch reaches the terminal "ended" status.

    Args:
        batch_id: ID returned by client.messages.batches.create().
        poll_interval: Seconds to sleep between status checks.
        timeout: Optional cap in seconds; raises TimeoutError when exceeded.
            None (the default) waits forever, matching the original behavior.

    Returns:
        The final batch object (not a dict -- the original ``-> dict``
        annotation was wrong); its request_counts gives per-request outcomes.
    """
    start = time.monotonic()
    while True:
        batch = client.messages.batches.retrieve(batch_id)
        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")
        # "ended" is the only terminal processing_status; per-request
        # successes/errors do not change the batch-level status.
        if batch.processing_status == "ended":
            return batch
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Batch {batch_id} did not finish within {timeout}s")
        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)
Retrieving Results
def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch, keyed by custom_id.

    Returns a mapping from custom_id to a result record:
      - succeeded: {"status": "success", "text", "input_tokens", "output_tokens"}
      - errored:   {"status": "error", "error"}
      - expired:   {"status": "expired"}
      - anything else (e.g. canceled): {"status": <result type>}

    (The original ``dict[str, str]`` annotation was wrong -- values are dicts.)
    """
    results: dict[str, dict] = {}
    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        outcome = result.result
        if outcome.type == "succeeded":
            message = outcome.message
            # Assumes the first content block is text -- holds for the plain
            # text-only requests submitted above.
            text = message.content[0].text
            results[custom_id] = {
                "status": "success",
                "text": text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif outcome.type == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(outcome.error),
            }
        elif outcome.type == "expired":
            results[custom_id] = {
                "status": "expired",
            }
        else:
            # FIX: the original if/elif chain silently dropped "canceled"
            # (or any future) result types; record them instead.
            results[custom_id] = {"status": outcome.type}
    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")
Production Batch Pipeline
Here is a complete pipeline for batch-processing a dataset:
import json
import asyncio  # kept from the original snippet; unused here
import time  # FIX: _wait() calls time.sleep but the snippet never imported time
from pathlib import Path
from datetime import datetime, timezone


class BatchPipeline:
    """End-to-end batch workflow: prepare -> submit -> wait -> collect.

    Per-batch metadata and results are persisted as JSON under ``output_dir``,
    so a crashed run can still recover results using only the saved batch ID.
    """

    # Documented per-batch request maximum, enforced in submit().
    MAX_REQUESTS = 10_000

    def __init__(self, client: Anthropic, output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path does not crash on first run.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        # FIX: the original default "claude-sonnet-4-5-20250514" is not a
        # valid model ID; Sonnet 4.5's dated snapshot is 20250929.
        model: str = "claude-sonnet-4-5-20250929",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format.

        Each item must supply every placeholder used by ``user_template``.
        An optional "id" key becomes the custom_id, falling back to the
        item's position in the list.
        """
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                },
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit the batch and return its ID.

        Raises:
            ValueError: if more than MAX_REQUESTS requests are supplied.
        """
        if len(requests) > self.MAX_REQUESTS:
            raise ValueError(f"Too many requests: {len(requests)} (max 10,000)")
        batch = self.client.messages.batches.create(requests=requests)
        # Persist the batch ID immediately -- it is the only handle for
        # retrieving results later.
        metadata = {
            "batch_id": batch.id,
            # FIX: datetime.utcnow() is deprecated (3.12+) and returns a
            # naive datetime; use an explicit timezone-aware UTC timestamp.
            "submitted_at": datetime.now(timezone.utc).isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)
        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion, collect all results, and persist them as JSON.

        Successful entries carry "output" and "usage"; failed entries carry
        "error" (the result type, e.g. "errored" or "expired").
        """
        self._wait(batch_id)
        results = []
        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                # Assumes the first content block is text (true for the
                # text-only requests built by prepare_requests).
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type
            results.append(entry)
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)
        return results

    def _wait(self, batch_id: str, poll_interval: int = 30):
        """Block until the batch's processing_status is the terminal "ended"."""
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(poll_interval)
Usage Example
pipeline = BatchPipeline(client)

# Prepare 5,000 classification requests
items = [{"id": f"doc-{i}", "text": doc} for i, doc in enumerate(documents)]
requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    # Use Haiku for simple classification.
    # FIX: "claude-haiku-4-5-20250514" is not a valid model ID; Haiku 4.5's
    # dated snapshot is 20251001.
    model="claude-haiku-4-5-20251001",
    max_tokens=256,
)
batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)

# Analyze results: collect_results marks successes with "output" and
# failures with "error".
succeeded = [r for r in results if "output" in r]
failed = [r for r in results if "error" in r]
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")
Cost Comparison
Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:
| Method | Input Cost | Output Cost | Total | Time |
|---|---|---|---|---|
| Standard API (Sonnet) | $15.00 | $15.00 | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet) | $7.50 | $7.50 | $15.00 | 1-2 hours |
| Standard API (Haiku) | $5.00 | $5.00 | $10.00 | ~1 hour |
| Batch API (Haiku) | $2.50 | $2.50 | $5.00 | 1-2 hours |
The Batch API saves 50% on cost with comparable or better throughput for large workloads.
Error Handling and Retries
Batches can have partial failures. Always handle errors per-request:
def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed (errored or expired) requests for retry.

    Returns a list of {"custom_id": ..., "error": ...} records; succeeded
    and canceled results are skipped.
    """
    failed = []
    for result in client.messages.batches.results(batch_id):
        outcome = result.result
        if outcome.type == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(outcome.error),
            })
        elif outcome.type == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed

# FIX: the original snippet indexed `original_requests` by custom_id without
# ever defining it; build the lookup from the submitted request list.
original_requests = {req["custom_id"]: req for req in requests}

# Retry failed requests in a new batch
failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)
Canceling a Batch
If you need to stop a batch that is in progress:
# Cancel a running batch. NOTE(review): cancellation appears to be
# asynchronous server-side (the batch winds down rather than stopping
# instantly) -- confirm against the Batches API reference.
client.messages.batches.cancel(batch_id)
# Results for already-completed requests are still available
# Only pending requests are canceled
Best Practices
- Use meaningful custom_ids that map back to your data source for easy result matching
- Save batch IDs immediately after submission -- you need them to retrieve results
- Monitor batch progress with periodic polling, especially for time-sensitive workflows
- Implement idempotency -- design your pipeline so resubmitting the same batch is safe
- Chunk large datasets into multiple batches of 10,000 if needed
- Use the cheapest model that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks
NYC News
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.