Skip to content
Learn Agentic AI10 min read0 views

Batch Processing for Cost Reduction: When Real-Time Isn't Necessary

Learn when and how to use batch processing to cut AI agent costs by up to 50%. Covers batch API usage, queue-based architectures, priority tiers, and SLA tradeoffs for non-time-critical agent workloads.

Not Everything Needs a Real-Time Response

Many AI agent workloads do not require sub-second responses. Content generation, document summarization, bulk classification, email drafting, report generation, and data enrichment can all tolerate latency of minutes or even hours. Batch processing these workloads can reduce costs by 50% compared to synchronous API calls — OpenAI’s Batch API, for example, offers a flat 50% discount for requests processed within a 24-hour window.

The key insight is to separate your agent’s workloads into latency tiers and use the cheapest processing method for each.

OpenAI Batch API Integration

import json
import time
from pathlib import Path
from typing import List
import openai

class BatchProcessor:
    def __init__(self, client: openai.OpenAI):
        self.client = client

    def prepare_batch_file(
        self,
        requests: List[dict],
        output_path: str = "batch_input.jsonl",
    ) -> str:
        with open(output_path, "w") as f:
            for i, req in enumerate(requests):
                batch_request = {
                    "custom_id": req.get("id", f"req-{i}"),
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": req.get("model", "gpt-4o-mini"),
                        "messages": req["messages"],
                        "max_tokens": req.get("max_tokens", 1024),
                    },
                }
                f.write(json.dumps(batch_request) + "\n")
        return output_path

    def submit_batch(self, file_path: str) -> str:
        with open(file_path, "rb") as f:
            uploaded = self.client.files.create(file=f, purpose="batch")
        batch = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )
        return batch.id

    def check_status(self, batch_id: str) -> dict:
        batch = self.client.batches.retrieve(batch_id)
        return {
            "id": batch.id,
            "status": batch.status,
            "total": batch.request_counts.total,
            "completed": batch.request_counts.completed,
            "failed": batch.request_counts.failed,
        }

    def retrieve_results(self, batch_id: str) -> List[dict]:
        batch = self.client.batches.retrieve(batch_id)
        if batch.status != "completed":
            raise ValueError(f"Batch not complete: {batch.status}")
        content = self.client.files.content(batch.output_file_id)
        results = []
        for line in content.text.strip().split("\n"):
            results.append(json.loads(line))
        return results

Queue-Based Processing Architecture

For more control than the batch API provides, build your own queue-based system that processes agent tasks at configurable rates.

import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, List
from collections import deque

class Priority(Enum):
    CRITICAL = 0   # process immediately (real-time)
    HIGH = 1       # process within 1 minute
    NORMAL = 2     # process within 1 hour
    LOW = 3        # process within 24 hours (batch eligible)

@dataclass
class AgentTask:
    task_id: str
    priority: Priority
    payload: dict
    created_at: float = field(default_factory=time.time)
    result: dict = field(default_factory=dict)

class PriorityQueueProcessor:
    def __init__(self):
        self.queues: dict[Priority, deque] = {p: deque() for p in Priority}
        self.batch_buffer: List[AgentTask] = []
        self.batch_size = 50

    def enqueue(self, task: AgentTask):
        if task.priority == Priority.LOW:
            self.batch_buffer.append(task)
            if len(self.batch_buffer) >= self.batch_size:
                self._flush_batch()
        else:
            self.queues[task.priority].append(task)

    def _flush_batch(self):
        """Send accumulated low-priority tasks as a batch."""
        batch = self.batch_buffer[:self.batch_size]
        self.batch_buffer = self.batch_buffer[self.batch_size:]
        print(f"Flushing batch of {len(batch)} tasks for batch processing")
        # Submit to batch API here

    def next_task(self) -> AgentTask | None:
        for priority in Priority:
            if priority == Priority.LOW:
                continue  # handled via batch
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None

    def stats(self) -> dict:
        return {
            p.name: len(q) for p, q in self.queues.items()
        } | {"batch_buffer": len(self.batch_buffer)}

Classifying Workloads by Latency Tier

WORKLOAD_TIERS = {
    Priority.CRITICAL: [
        "live_chat_response",
        "voice_agent_reply",
        "safety_check",
    ],
    Priority.HIGH: [
        "email_draft",
        "ticket_classification",
        "escalation_decision",
    ],
    Priority.NORMAL: [
        "meeting_summary",
        "document_analysis",
        "lead_scoring",
    ],
    Priority.LOW: [
        "content_generation",
        "bulk_classification",
        "data_enrichment",
        "report_generation",
    ],
}

def classify_workload(task_type: str) -> Priority:
    for priority, types in WORKLOAD_TIERS.items():
        if task_type in types:
            return priority
    return Priority.NORMAL

Cost Comparison

The economics are compelling. A typical workload mix might be 20% critical, 25% high, 30% normal, and 25% low priority. By routing the low-priority tasks through the batch API at 50% discount and queuing normal tasks for off-peak processing, total LLM spend drops 25–35% without any quality compromise.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

SLA Tradeoffs

Every batch processing decision is an SLA tradeoff. Document these tradeoffs explicitly for your team: critical tasks get sub-second response times at full price, high-priority tasks get under-a-minute responses at full price, normal tasks can tolerate an hour and might benefit from off-peak pricing, and low-priority tasks accept 24-hour turnaround for 50% savings.

FAQ

When should I NOT use batch processing?

Never batch safety-critical checks (content moderation, fraud detection), live user-facing interactions (chat, voice), or time-sensitive decisions (escalation routing, alerts). The rule is simple: if a delayed response would cause user frustration, revenue loss, or safety risk, process it synchronously.

How do I handle failures in batch processing?

Implement a dead-letter queue for failed batch items and retry them individually. Track failure rates per batch and set up alerts if failures exceed 5%. For the batch API specifically, check the failed count in the batch status response and re-submit failed items in the next batch.

Can I combine batch processing with model routing?

Yes, and this is a powerful combination. Route low-priority tasks to the cheapest model via the batch API for compounding savings. A task that would cost $0.01 with GPT-4o synchronously might cost $0.0003 with GPT-4o-mini via batch API — a 97% reduction.


#BatchProcessing #CostReduction #QueueArchitecture #OpenAIBatchAPI #SLAManagement #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.