
Claude Message Batches: Processing Thousands of Agent Tasks with 50% Cost Savings

Learn how to use the Claude Message Batches API to process thousands of agent tasks asynchronously with 50% cost reduction, including job monitoring, result processing, and error handling.

Why Batch Processing Matters for Agents

Many agent workloads are not real-time. Nightly data classification, bulk document summarization, mass email personalization, and dataset labeling can all tolerate minutes to hours of latency. The Claude Message Batches API is designed for exactly these scenarios — it processes up to 100,000 requests per batch at 50% of the standard API cost with a 24-hour processing window.

For agent systems, this means you can run thousands of independent agent tasks in parallel without managing rate limits, connection pools, or retry logic yourself. Anthropic handles the queuing and execution; you just submit the batch and poll for results.

How the Batches API Works

The flow is straightforward: create a batch of message requests, submit them, poll for completion, and retrieve results. Each request in the batch is a complete Messages API call — it can include tools, system prompts, multi-turn conversations, and all other features.

import anthropic
import json
import time

client = anthropic.Anthropic()

# Step 1: Define individual requests
requests = []
documents = load_documents()  # Your list of documents to process

for i, doc in enumerate(documents):
    requests.append({
        "custom_id": f"doc-{i}",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 1024,
            "messages": [
                {
                    "role": "user",
                    "content": f"Classify this document and extract key entities:\n\n{doc['text']}"
                }
            ],
        }
    })

Each request needs a custom_id that you use to match results back to inputs. The params field mirrors the standard Messages API parameters exactly.
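Because results come back keyed by custom_id, it is worth building an index of the original requests up front; the resubmission helper later in this article assumes such a mapping exists. A minimal sketch (the helper name and sample data are illustrative):

```python
def index_by_custom_id(requests: list) -> dict:
    """Map custom_id -> original request, so batch results (and any
    failures that need resubmitting) can be matched back to inputs."""
    return {req["custom_id"]: req for req in requests}

original_requests = index_by_custom_id([
    {"custom_id": "doc-0", "params": {"max_tokens": 1024}},
    {"custom_id": "doc-1", "params": {"max_tokens": 1024}},
])
```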

Submitting a Batch

# Step 2: Create the batch
batch = client.messages.batches.create(requests=requests)

print(f"Batch ID: {batch.id}")
print(f"Status: {batch.processing_status}")
print(f"Total requests: {batch.request_counts.processing}")

The batch is now queued for processing. Batches run within a 24-hour window; any requests still unprocessed at that deadline are marked as expired. Most batches finish much faster, and small batches (under 1,000 requests) typically complete in minutes.

Monitoring Batch Progress

Poll the batch status to track progress:

def wait_for_batch(batch_id: str, poll_interval: int = 30):
    """Poll batch status until processing ends, then return the batch object."""
    while True:
        batch = client.messages.batches.retrieve(batch_id)

        counts = batch.request_counts
        done = counts.succeeded + counts.errored + counts.canceled + counts.expired
        total = done + counts.processing

        print(f"Progress: {done}/{total} "
              f"(succeeded: {counts.succeeded}, errored: {counts.errored})")

        if batch.processing_status == "ended":
            return batch

        time.sleep(poll_interval)

completed_batch = wait_for_batch(batch.id)

For production systems, replace polling with webhooks or a task queue like Celery that checks batch status on a schedule.
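Whichever scheduler you use, each tick only needs a few fields from the retrieved batch. A minimal sketch; summarize_batch and the stubbed object are illustrative, and the real batch comes from client.messages.batches.retrieve:

```python
from types import SimpleNamespace

def summarize_batch(batch) -> dict:
    """Reduce a retrieved batch to the fields a scheduled task acts on:
    whether processing has ended and the per-state request counts."""
    counts = batch.request_counts
    return {
        "done": batch.processing_status == "ended",
        "processing": counts.processing,
        "succeeded": counts.succeeded,
        "errored": counts.errored,
    }

# Stubbed batch standing in for a real retrieve() response:
stub = SimpleNamespace(
    processing_status="ended",
    request_counts=SimpleNamespace(processing=0, succeeded=98, errored=2),
)
print(summarize_batch(stub))
```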


Retrieving and Processing Results

Once the batch completes, stream the results:

# Step 3: Retrieve results
results = {}
for result in client.messages.batches.results(completed_batch.id):
    custom_id = result.custom_id

    if result.result.type == "succeeded":
        message = result.result.message
        text = message.content[0].text
        results[custom_id] = {"status": "success", "output": text}

    elif result.result.type == "errored":
        error = result.result.error
        results[custom_id] = {"status": "error", "error": str(error)}

    elif result.result.type in ("canceled", "expired"):
        results[custom_id] = {"status": result.result.type}

print(f"Processed {len(results)} results")
print(f"Succeeded: {sum(1 for r in results.values() if r['status'] == 'success')}")
print(f"Failed: {sum(1 for r in results.values() if r['status'] != 'success')}")

Results stream back as an iterator, so you can process them without loading everything into memory at once.
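You can lean on that by writing each result to disk as it arrives, for example one JSON line per request, instead of accumulating a dict. A minimal sketch (the file path and record shape are arbitrary choices, not part of the SDK):

```python
import json

def write_results_jsonl(result_pairs, path: str) -> int:
    """Stream (custom_id, record) pairs to a JSON Lines file, one record
    per request, so memory stays flat regardless of batch size."""
    n = 0
    with open(path, "w") as f:
        for custom_id, record in result_pairs:
            f.write(json.dumps({"custom_id": custom_id, **record}) + "\n")
            n += 1
    return n

# Usage with any iterable of (custom_id, record) pairs:
rows = write_results_jsonl(
    [("doc-0", {"status": "success", "output": "legal"}),
     ("doc-1", {"status": "error", "error": "overloaded"})],
    "batch_results.jsonl",
)
```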

Batch Requests with Tool Use

Batch requests support the full tool use API. This means you can run agent-like workflows in batch mode, though each batch request gets a single turn — no iterative agent loop:

classification_tool = {
    "name": "classify_document",
    "description": "Classify a document into categories",
    "input_schema": {
        "type": "object",
        "properties": {
            "category": {
                "type": "string",
                "enum": ["legal", "financial", "technical", "marketing", "other"]
            },
            "confidence": {"type": "number"},
            "entities": {
                "type": "array",
                "items": {"type": "string"}
            }
        },
        "required": ["category", "confidence", "entities"]
    }
}

# Force structured output via tool_choice
batch_requests = []
for i, doc in enumerate(documents):
    batch_requests.append({
        "custom_id": f"classify-{i}",
        "params": {
            "model": "claude-sonnet-4-20250514",
            "max_tokens": 512,
            "tools": [classification_tool],
            "tool_choice": {"type": "tool", "name": "classify_document"},
            "messages": [
                {"role": "user", "content": f"Classify this document:\n\n{doc['text']}"}
            ],
        }
    })

By forcing tool use with tool_choice, every response will contain a structured tool_use block that you can parse directly — no text extraction needed.
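Parsing that means scanning the message content for the tool_use block and reading its input dict. A minimal sketch; the content-block shape mirrors the Messages API, but the stubbed message here is illustrative:

```python
from types import SimpleNamespace

def extract_tool_input(message) -> dict:
    """Return the input dict of the first tool_use block in a message.
    With tool_choice forcing the tool, exactly one such block is expected."""
    for block in message.content:
        if block.type == "tool_use":
            return block.input
    raise ValueError("no tool_use block in message")

# Stubbed message mimicking a forced classify_document response:
msg = SimpleNamespace(content=[
    SimpleNamespace(type="tool_use", name="classify_document",
                    input={"category": "legal", "confidence": 0.92,
                           "entities": ["Acme Corp"]}),
])
print(extract_tool_input(msg)["category"])
```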

Error Handling and Retries

Build resilience into your batch pipeline:

def submit_with_retry(requests: list, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            batch = client.messages.batches.create(requests=requests)
            return batch.id
        except anthropic.APIError as e:
            if attempt == max_retries - 1:
                raise
            print(f"Attempt {attempt + 1} failed: {e}. Retrying...")
            time.sleep(2 ** attempt)

def resubmit_failures(batch_id: str, original_requests: dict):
    """Collect failed requests and resubmit them as a new batch."""
    failed_requests = []
    for result in client.messages.batches.results(batch_id):
        if result.result.type != "succeeded":
            # Find the original request by custom_id
            original = original_requests[result.custom_id]
            failed_requests.append(original)

    if not failed_requests:
        return None

    print(f"Resubmitting {len(failed_requests)} failed requests")
    return submit_with_retry(failed_requests)

FAQ

What is the maximum batch size?

Each batch can contain up to 100,000 requests. If you have more than that, split them into multiple batches and submit them concurrently. Each request can use up to the model's full context window and max output tokens.
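Splitting an oversized workload is a simple chunking step. A minimal sketch (the 100,000 cap comes from the limit above; the sample data is illustrative):

```python
def chunk_requests(requests: list, limit: int = 100_000) -> list:
    """Split a request list into batch-sized chunks, preserving order."""
    return [requests[i:i + limit] for i in range(0, len(requests), limit)]

chunks = chunk_requests([{"custom_id": f"doc-{i}"} for i in range(250)], limit=100)
print([len(c) for c in chunks])  # → [100, 100, 50]
```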

Can I cancel a running batch?

Yes, call client.messages.batches.cancel(batch_id) to cancel a batch in progress. Requests that have already completed will still be available in the results. Requests that were not yet processed will be marked as canceled.

How much does batch processing actually save?

Batch processing costs exactly 50% of the standard API pricing for both input and output tokens. For a workflow processing 10,000 documents at an average of 2,000 input tokens and 500 output tokens each, that is 20 million input and 5 million output tokens per run, so the discount halves a substantial bill, with the dollar savings scaling with the model's per-token price.
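The arithmetic is easy to parameterize. A sketch with illustrative per-million-token prices — the $3/$15 figures below are assumptions, so check the current pricing page for real numbers:

```python
def batch_savings(n_requests: int, in_tokens: int, out_tokens: int,
                  in_price_per_m: float, out_price_per_m: float) -> float:
    """Dollar savings of batch pricing (50% discount) vs. standard pricing."""
    standard = (
        (n_requests * in_tokens / 1e6) * in_price_per_m
        + (n_requests * out_tokens / 1e6) * out_price_per_m
    )
    return standard * 0.5

# 10,000 docs x (2,000 in + 500 out) tokens at assumed $3/$15 per M tokens:
print(batch_savings(10_000, 2_000, 500, 3.0, 15.0))  # → 67.5
```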


#Claude #BatchProcessing #CostOptimization #Async #Python #AgenticAI #LearnAI #AIEngineering
