Batch Processing for Cost Reduction: When Real-Time Isn't Necessary
Learn when and how to use batch processing to cut AI agent costs by up to 50%. Covers batch API usage, queue-based architectures, priority tiers, and SLA tradeoffs for non-time-critical agent workloads.
Not Everything Needs a Real-Time Response
Many AI agent workloads do not require sub-second responses. Content generation, document summarization, bulk classification, email drafting, report generation, and data enrichment can all tolerate latency of minutes or even hours. Batch processing these workloads can reduce costs by 50% compared to synchronous API calls — OpenAI’s Batch API, for example, offers a flat 50% discount for requests processed within a 24-hour window.
The key insight is to separate your agent’s workloads into latency tiers and use the cheapest processing method for each.
OpenAI Batch API Integration
import json
from typing import List

import openai


class BatchProcessor:
    def __init__(self, client: openai.OpenAI):
        self.client = client

    def prepare_batch_file(
        self,
        requests: List[dict],
        output_path: str = "batch_input.jsonl",
    ) -> str:
        """Write requests to a JSONL file in the Batch API input format."""
        with open(output_path, "w") as f:
            for i, req in enumerate(requests):
                batch_request = {
                    "custom_id": req.get("id", f"req-{i}"),
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": req.get("model", "gpt-4o-mini"),
                        "messages": req["messages"],
                        "max_tokens": req.get("max_tokens", 1024),
                    },
                }
                f.write(json.dumps(batch_request) + "\n")
        return output_path

    def submit_batch(self, file_path: str) -> str:
        """Upload the input file and create a batch with a 24-hour completion window."""
        with open(file_path, "rb") as f:
            uploaded = self.client.files.create(file=f, purpose="batch")
        batch = self.client.batches.create(
            input_file_id=uploaded.id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )
        return batch.id

    def check_status(self, batch_id: str) -> dict:
        """Report progress counts for a submitted batch."""
        batch = self.client.batches.retrieve(batch_id)
        return {
            "id": batch.id,
            "status": batch.status,
            "total": batch.request_counts.total,
            "completed": batch.request_counts.completed,
            "failed": batch.request_counts.failed,
        }

    def retrieve_results(self, batch_id: str) -> List[dict]:
        """Download and parse the JSONL output of a completed batch."""
        batch = self.client.batches.retrieve(batch_id)
        if batch.status != "completed":
            raise ValueError(f"Batch not complete: {batch.status}")
        content = self.client.files.content(batch.output_file_id)
        return [json.loads(line) for line in content.text.strip().split("\n")]
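Before wiring this into production, it helps to see the input format concretely. Here is a minimal standalone sketch of a single JSONL input line for a hypothetical summarization request (the format mirrors prepare_batch_file above):

```python
import json

# One Batch API input line: each line is a self-contained request.
line = json.dumps({
    "custom_id": "summarize-0042",
    "method": "POST",
    "url": "/v1/chat/completions",
    "body": {
        "model": "gpt-4o-mini",
        "messages": [{"role": "user", "content": "Summarize this transcript: ..."}],
        "max_tokens": 1024,
    },
})

parsed = json.loads(line)
print(parsed["custom_id"], parsed["body"]["model"])  # summarize-0042 gpt-4o-mini
```

The custom_id is what lets you match each output line back to its originating request, since output order is not guaranteed to match input order.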
Queue-Based Processing Architecture
For more control than the batch API provides, build your own queue-based system that processes agent tasks at configurable rates.
import time
from collections import deque
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class Priority(Enum):
    CRITICAL = 0  # process immediately (real-time)
    HIGH = 1      # process within 1 minute
    NORMAL = 2    # process within 1 hour
    LOW = 3       # process within 24 hours (batch eligible)


@dataclass
class AgentTask:
    task_id: str
    priority: Priority
    payload: dict
    created_at: float = field(default_factory=time.time)
    result: dict = field(default_factory=dict)


class PriorityQueueProcessor:
    def __init__(self):
        self.queues: dict[Priority, deque] = {p: deque() for p in Priority}
        self.batch_buffer: List[AgentTask] = []
        self.batch_size = 50

    def enqueue(self, task: AgentTask):
        """Route LOW tasks to the batch buffer; everything else to its priority queue."""
        if task.priority == Priority.LOW:
            self.batch_buffer.append(task)
            if len(self.batch_buffer) >= self.batch_size:
                self._flush_batch()
        else:
            self.queues[task.priority].append(task)

    def _flush_batch(self):
        """Send accumulated low-priority tasks as a batch."""
        batch = self.batch_buffer[:self.batch_size]
        self.batch_buffer = self.batch_buffer[self.batch_size:]
        print(f"Flushing batch of {len(batch)} tasks for batch processing")
        # Submit to batch API here

    def next_task(self) -> AgentTask | None:
        """Return the oldest task from the highest-priority non-empty queue."""
        for priority in Priority:
            if priority == Priority.LOW:
                continue  # handled via batch
            if self.queues[priority]:
                return self.queues[priority].popleft()
        return None

    def stats(self) -> dict:
        return {
            p.name: len(q) for p, q in self.queues.items()
        } | {"batch_buffer": len(self.batch_buffer)}
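The draining order in next_task can be sanity-checked with a stripped-down simulation (plain deques standing in for the class; lower number means higher priority, and LOW is omitted because it goes through the batch buffer):

```python
from collections import deque

# Three non-batch tiers: 0 = CRITICAL, 1 = HIGH, 2 = NORMAL.
queues = {0: deque(), 1: deque(), 2: deque()}
queues[2].append("meeting_summary")
queues[1].append("email_draft")
queues[0].append("live_chat_response")

def next_task(queues):
    """Pop from the highest-priority (lowest-numbered) non-empty queue."""
    for priority in sorted(queues):
        if queues[priority]:
            return queues[priority].popleft()
    return None

order = [next_task(queues) for _ in range(3)]
print(order)  # ['live_chat_response', 'email_draft', 'meeting_summary']
```

Note that strict priority draining can starve NORMAL tasks under sustained CRITICAL load; an aging mechanism that promotes long-waiting tasks is a common mitigation.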
Classifying Workloads by Latency Tier
WORKLOAD_TIERS = {
    Priority.CRITICAL: [
        "live_chat_response",
        "voice_agent_reply",
        "safety_check",
    ],
    Priority.HIGH: [
        "email_draft",
        "ticket_classification",
        "escalation_decision",
    ],
    Priority.NORMAL: [
        "meeting_summary",
        "document_analysis",
        "lead_scoring",
    ],
    Priority.LOW: [
        "content_generation",
        "bulk_classification",
        "data_enrichment",
        "report_generation",
    ],
}


def classify_workload(task_type: str) -> Priority:
    for priority, types in WORKLOAD_TIERS.items():
        if task_type in types:
            return priority
    return Priority.NORMAL
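If the tier map grows large, the linear scan in classify_workload can be replaced with a direct lookup. A self-contained sketch using a compact subset of the tiers above (unknown task types still default to NORMAL):

```python
from enum import Enum

class Priority(Enum):
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3

# Inverted map: task type -> tier (subset of WORKLOAD_TIERS, for illustration).
TASK_TIER = {
    "live_chat_response": Priority.CRITICAL,
    "email_draft": Priority.HIGH,
    "meeting_summary": Priority.NORMAL,
    "content_generation": Priority.LOW,
}

def classify_workload(task_type: str) -> Priority:
    return TASK_TIER.get(task_type, Priority.NORMAL)

print(classify_workload("content_generation").name)  # LOW
print(classify_workload("unknown_type").name)        # NORMAL
```

Inverting the lists into a single dictionary makes classification O(1) and keeps each task type from accidentally appearing in two tiers.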
Cost Comparison
The economics are compelling. A typical workload mix might be 20% critical, 25% high, 30% normal, and 25% low priority. By routing the low-priority tasks through the batch API at 50% discount and queuing normal tasks for off-peak processing, total LLM spend drops 25–35% without any quality compromise.
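The blended savings can be sanity-checked with simple arithmetic. In this sketch the spend shares match the mix above, and the discounts are assumptions: LOW goes through the batch API at 50% off, and NORMAL also captures the 50% discount by riding an overnight batch window:

```python
# Assumed share of total spend per tier (matches the mix above).
mix = {"critical": 0.20, "high": 0.25, "normal": 0.30, "low": 0.25}
# Assumed discount captured per tier: full price for critical/high,
# 50% for normal and low when routed through the batch window.
discount = {"critical": 0.0, "high": 0.0, "normal": 0.50, "low": 0.50}

blended_cost = sum(share * (1 - discount[t]) for t, share in mix.items())
savings = 1 - blended_cost
print(f"Blended savings: {savings:.1%}")
```

With these assumptions the blended saving lands around 27.5%, inside the 25–35% range; a mix with more low-priority volume shifts it higher.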
SLA Tradeoffs
Every batch processing decision is an SLA tradeoff. Document these tradeoffs explicitly for your team: critical tasks get sub-second response times at full price, high-priority tasks get under-a-minute responses at full price, normal tasks can tolerate an hour and might benefit from off-peak pricing, and low-priority tasks accept 24-hour turnaround for 50% savings.
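One way to make that documentation concrete is a machine-readable SLA table that both the team and the queue monitor can consume. A sketch, with deadlines mirroring the prose above:

```python
# Per-tier SLA: maximum acceptable latency in seconds, plus pricing notes.
SLA = {
    "CRITICAL": {"max_latency_s": 1,     "pricing": "full price, synchronous"},
    "HIGH":     {"max_latency_s": 60,    "pricing": "full price, queued"},
    "NORMAL":   {"max_latency_s": 3600,  "pricing": "full or off-peak"},
    "LOW":      {"max_latency_s": 86400, "pricing": "50% batch discount"},
}

def is_overdue(tier: str, age_seconds: float) -> bool:
    """Flag a queued task that has blown past its tier's SLA deadline."""
    return age_seconds > SLA[tier]["max_latency_s"]

print(is_overdue("LOW", 90000))  # True: past the 24-hour window
print(is_overdue("HIGH", 30))    # False: still within one minute
```

Checking is_overdue on every queue sweep gives you an early-warning signal before an SLA breach turns into a customer complaint.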
FAQ
When should I NOT use batch processing?
Never batch safety-critical checks (content moderation, fraud detection), live user-facing interactions (chat, voice), or time-sensitive decisions (escalation routing, alerts). The rule is simple: if a delayed response would cause user frustration, revenue loss, or safety risk, process it synchronously.
How do I handle failures in batch processing?
Implement a dead-letter queue for failed batch items and retry them individually. Track failure rates per batch and set up alerts if failures exceed 5%. For the batch API specifically, check the failed count in the batch status response and re-submit failed items in the next batch.
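A minimal sketch of that pattern (the per-item response/error fields follow the Batch API output line format; the 5% alert threshold is from the guidance above):

```python
from collections import deque

FAILURE_ALERT_THRESHOLD = 0.05  # alert when more than 5% of a batch fails

def handle_batch_results(results: list[dict], dead_letter: deque) -> list[dict]:
    """Split batch output lines into successes and dead-lettered failures."""
    successes = [r for r in results if r.get("error") is None]
    failures = [r for r in results if r.get("error") is not None]
    dead_letter.extend(failures)  # retry these individually later
    failure_rate = len(failures) / max(len(results), 1)
    if failure_rate > FAILURE_ALERT_THRESHOLD:
        print(f"ALERT: {failure_rate:.0%} of batch failed")
    return successes

dlq = deque()
results = [
    {"custom_id": "a", "error": None, "response": {"status_code": 200}},
    {"custom_id": "b", "error": {"message": "rate limited"}},
]
ok = handle_batch_results(results, dlq)
print(len(ok), len(dlq))  # 1 1
```

Items pulled from the dead-letter queue are best retried synchronously or in the next batch with their original custom_id, so downstream consumers see exactly one result per request.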
Can I combine batch processing with model routing?
Yes, and this is a powerful combination. Route low-priority tasks to the cheapest model via the batch API for compounding savings. A task that would cost $0.01 with GPT-4o synchronously might cost $0.0003 with GPT-4o-mini via batch API — a 97% reduction.
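The compounding arithmetic is easy to verify (illustrative per-task costs, not published prices):

```python
sync_gpt4o_cost = 0.01      # assumed per-task cost with GPT-4o, synchronous
sync_mini_cost = 0.0006     # assumed per-task cost with GPT-4o-mini, synchronous
batch_mini_cost = sync_mini_cost * 0.5  # 50% batch discount on top

reduction = 1 - batch_mini_cost / sync_gpt4o_cost
print(f"Combined reduction: {reduction:.0%}")  # 97%
```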
CallSphere Team
Expert insights on AI voice agents and customer communication automation.