Skip to content
Back to Blog
Agentic AI · 5 min read

Claude API Batching: Processing Thousands of Requests Cost-Effectively

Master the Claude Message Batches API for high-volume, cost-effective processing. Learn how to submit batch jobs, poll for results, handle errors, and save 50% on Claude API costs for non-real-time workloads.

What Is the Message Batches API?

The Claude Message Batches API allows you to submit up to 10,000 requests in a single batch and receive results asynchronously. Each request in the batch gets a 50% discount on both input and output tokens compared to the standard Messages API.

The tradeoff: batches can take up to 24 hours to complete (though most finish within 1-2 hours). This makes the Batch API ideal for workloads that do not require real-time responses.

Ideal Use Cases

  • Document classification across thousands of files
  • Bulk content moderation
  • Dataset annotation and labeling
  • Nightly report generation
  • Mass email personalization
  • Code analysis across a large codebase
  • Evaluation and testing of prompts at scale

Submitting a Batch

from anthropic import Anthropic

client = Anthropic()

# Each entry pairs a caller-chosen custom_id with a standard
# Messages API parameter payload under "params".
requests = [
    {
        "custom_id": f"doc-{idx}",  # Your identifier for tracking
        "params": {
            "model": "claude-sonnet-4-5-20250514",
            "max_tokens": 1024,
            "messages": [{
                "role": "user",
                "content": f"Classify this document into one of: [legal, financial, technical, marketing].\n\nDocument:\n{document}"
            }],
        },
    }
    for idx, document in enumerate(documents)
]

# Submit the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.total}")

Polling for Results

import time

def wait_for_batch(batch_id: str, poll_interval: int = 30, timeout: float | None = None):
    """Poll the Batches API until the batch reaches the "ended" state.

    Args:
        batch_id: ID returned by ``client.messages.batches.create()``.
        poll_interval: Seconds to sleep between status checks.
        timeout: Optional overall limit in seconds; raises ``TimeoutError``
            once exceeded. ``None`` (default) waits indefinitely, matching
            the original behavior.

    Returns:
        The final batch object from the SDK (it exposes
        ``.processing_status`` and ``.request_counts`` — it is not a plain
        dict, so no ``-> dict`` annotation here).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        print(f"Status: {batch.processing_status}")
        print(f"  Succeeded: {batch.request_counts.succeeded}")
        print(f"  Errored: {batch.request_counts.errored}")
        print(f"  Processing: {batch.request_counts.processing}")

        if batch.processing_status == "ended":
            return batch

        # Check the deadline before sleeping so we fail promptly.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Batch {batch_id} did not finish within {timeout}s")

        time.sleep(poll_interval)

batch_result = wait_for_batch(batch.id)

Retrieving Results

def get_batch_results(batch_id: str) -> dict[str, dict]:
    """Retrieve all results from a completed batch.

    Returns:
        Mapping of custom_id -> result record. Every record carries a
        "status" key ("success", "error", or "expired"); successful
        records additionally carry "text" and token usage counts.
        (Annotation fixed: values are dicts, not strings.)
    """
    results = {}

    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id
        outcome = result.result.type

        if outcome == "succeeded":
            message = result.result.message
            results[custom_id] = {
                "status": "success",
                # Assumes the first content block is text — TODO confirm for
                # requests that could produce tool-use blocks.
                "text": message.content[0].text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif outcome == "errored":
            results[custom_id] = {
                "status": "error",
                "error": str(result.result.error),
            }
        elif outcome == "expired":
            results[custom_id] = {
                "status": "expired",
            }

    return results

results = get_batch_results(batch.id)
for custom_id, result in results.items():
    if result["status"] == "success":
        print(f"{custom_id}: {result['text'][:100]}...")

Production Batch Pipeline

Here is a complete pipeline for batch-processing a dataset:

import asyncio
import json
import time
from datetime import datetime, timezone
from pathlib import Path

class BatchPipeline:
    """End-to-end helper: prepare requests, submit a batch, persist results.

    Two JSON artifacts are written into ``output_dir`` per batch:
    ``<batch_id>_metadata.json`` at submission time and
    ``<batch_id>_results.json`` after collection.
    """

    # Documented per-batch request limit of the Message Batches API.
    MAX_BATCH_SIZE = 10_000

    def __init__(self, client: "Anthropic", output_dir: str = "./batch_results"):
        self.client = client
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path (e.g. "runs/2024/results")
        # does not fail on first use; exist_ok keeps reruns idempotent.
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def prepare_requests(
        self,
        items: list[dict],
        system_prompt: str,
        user_template: str,
        model: str = "claude-sonnet-4-5-20250514",
        max_tokens: int = 1024,
    ) -> list[dict]:
        """Convert items into batch request format.

        Each item's fields are interpolated into ``user_template`` via
        ``str.format``; an item's "id" becomes its custom_id, falling back
        to its position in the batch when absent.
        """
        requests = []
        for item in items:
            user_content = user_template.format(**item)
            requests.append({
                "custom_id": str(item.get("id", len(requests))),
                "params": {
                    "model": model,
                    "max_tokens": max_tokens,
                    "system": system_prompt,
                    "messages": [{"role": "user", "content": user_content}],
                }
            })
        return requests

    def submit(self, requests: list[dict]) -> str:
        """Submit batch and return batch ID.

        Raises:
            ValueError: if the request count exceeds the API limit.
        """
        if len(requests) > self.MAX_BATCH_SIZE:
            raise ValueError(f"Too many requests: {len(requests)} (max 10,000)")

        batch = self.client.messages.batches.create(requests=requests)

        # Persist the batch ID immediately — it is required to retrieve
        # results later. Timestamp is timezone-aware UTC (datetime.utcnow()
        # is deprecated and produces naive timestamps).
        metadata = {
            "batch_id": batch.id,
            "submitted_at": datetime.now(timezone.utc).isoformat(),
            "total_requests": len(requests),
        }
        with open(self.output_dir / f"{batch.id}_metadata.json", "w") as f:
            json.dump(metadata, f)

        return batch.id

    def collect_results(self, batch_id: str) -> list[dict]:
        """Wait for completion and collect all results.

        Successful entries carry "output" and "usage"; failed entries carry
        "error" with the result type ("errored" or "expired").
        """
        batch = self._wait(batch_id)
        results = []

        for result in self.client.messages.batches.results(batch_id):
            entry = {"custom_id": result.custom_id}
            if result.result.type == "succeeded":
                msg = result.result.message
                entry["output"] = msg.content[0].text
                entry["usage"] = {
                    "input": msg.usage.input_tokens,
                    "output": msg.usage.output_tokens,
                }
            else:
                entry["error"] = result.result.type

            results.append(entry)

        # Save results next to the metadata for later analysis.
        with open(self.output_dir / f"{batch_id}_results.json", "w") as f:
            json.dump(results, f, indent=2)

        return results

    def _wait(self, batch_id: str, poll_interval: int = 30):
        """Block until the batch reaches "ended", polling every poll_interval seconds."""
        while True:
            batch = self.client.messages.batches.retrieve(batch_id)
            if batch.processing_status == "ended":
                return batch
            time.sleep(poll_interval)

Usage Example

pipeline = BatchPipeline(client)

# Wrap each document in the {id, text} shape prepare_requests expects.
items = []
for index, doc in enumerate(documents):
    items.append({"id": f"doc-{index}", "text": doc})

requests = pipeline.prepare_requests(
    items=items,
    system_prompt="Classify documents into categories. Return JSON with 'category' and 'confidence'.",
    user_template="Classify this document:\n\n{text}",
    model="claude-haiku-4-5-20250514",  # Use Haiku for simple classification
    max_tokens=256,
)

batch_id = pipeline.submit(requests)
results = pipeline.collect_results(batch_id)

# Summarize outcomes: successes carry "output", failures carry "error".
succeeded = []
failed = []
for entry in results:
    if "output" in entry:
        succeeded.append(entry)
    if "error" in entry:
        failed.append(entry)
print(f"Success: {len(succeeded)}, Failed: {len(failed)}")

Cost Comparison

Processing 10,000 documents with an average of 500 input tokens and 100 output tokens each:

| Method                | Input Cost | Output Cost | Total  | Time                    |
|-----------------------|------------|-------------|--------|-------------------------|
| Standard API (Sonnet) | $15.00     | $15.00      | $30.00 | ~2 hours (rate limited) |
| Batch API (Sonnet)    | $7.50      | $7.50       | $15.00 | 1-2 hours               |
| Standard API (Haiku)  | $5.00      | $5.00       | $10.00 | ~1 hour                 |
| Batch API (Haiku)     | $2.50      | $2.50       | $5.00  | 1-2 hours               |

The Batch API saves 50% on cost with comparable or better throughput for large workloads.

Error Handling and Retries

Batches can have partial failures. Always handle errors per-request:

def handle_batch_errors(batch_id: str) -> list[dict]:
    """Collect failed requests for retry.

    Returns:
        One record per non-successful request, each with "custom_id" and
        an "error" string (the API error for "errored" results, the literal
        "expired" for requests the batch never processed).
    """
    failed = []
    for result in client.messages.batches.results(batch_id):
        outcome = result.result.type
        if outcome == "errored":
            failed.append({
                "custom_id": result.custom_id,
                "error": str(result.result.error),
            })
        elif outcome == "expired":
            failed.append({
                "custom_id": result.custom_id,
                "error": "expired",
            })
    return failed

# Retry failed requests in a new batch. Build a custom_id-keyed lookup from
# the originally submitted list first — without it, indexing by custom_id
# would fail (the original `requests` is a plain list).
original_requests = {req["custom_id"]: req for req in requests}
failed = handle_batch_errors(batch_id)
if failed:
    retry_requests = [
        original_requests[r["custom_id"]]
        for r in failed
        if r["custom_id"] in original_requests
    ]
    if retry_requests:
        retry_batch = client.messages.batches.create(requests=retry_requests)

Canceling a Batch

If you need to stop a batch that is in progress:

# Cancel a running batch by ID (the batch must still be in progress)
client.messages.batches.cancel(batch_id)

# Results for already-completed requests are still available
# Only pending requests are canceled

Best Practices

  1. Use meaningful custom_ids that map back to your data source for easy result matching
  2. Save batch IDs immediately after submission -- you need them to retrieve results
  3. Monitor batch progress with periodic polling, especially for time-sensitive workflows
  4. Implement idempotency -- design your pipeline so resubmitting the same batch is safe
  5. Chunk large datasets into multiple batches of 10,000 if needed
  6. Use the cheapest model that meets your quality requirements -- Haiku with Batch API is extremely cost-effective for classification and extraction tasks
Share this article
NYC News

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.