Rate Limiting AI Agents: Preventing Abuse and Controlling API Costs
Implement per-user rate limiting, token budgets, sliding window algorithms, and graceful degradation strategies to protect your AI agent system from abuse while controlling LLM API costs.
Why AI Agents Need Rate Limiting
A single LLM API call can cost anywhere from $0.01 to $0.50 depending on the model and token count. An AI agent that makes multiple LLM calls per user request — reasoning, tool execution, re-planning — can burn through $2-5 per interaction. Without rate limiting, a single abusive user or a runaway loop can generate thousands of dollars in API costs within minutes.
Rate limiting for AI agents goes beyond traditional API rate limiting because you need to track not just request counts but also token consumption, tool call frequency, and per-user spending budgets. This post builds a comprehensive rate limiting system using Redis and Python.
The Sliding Window Algorithm
Sliding window rate limiting is fairer than fixed windows because it prevents burst traffic at window boundaries:
import time
import uuid
from dataclasses import dataclass
from typing import Optional

import redis
@dataclass
class RateLimitConfig:
    """Per-user limits covering request rate, token throughput, and daily spend."""

    # Sliding-window request caps.
    requests_per_minute: int = 20
    requests_per_hour: int = 200
    # Sliding-window LLM token caps.
    tokens_per_minute: int = 50_000
    tokens_per_hour: int = 500_000
    # Hard ceiling on estimated spend per calendar day.
    max_daily_cost_usd: float = 10.0
@dataclass
class RateLimitResult:
    """Outcome of a rate-limit or budget check."""

    allowed: bool
    # Seconds the caller should wait before retrying; only set on denial.
    retry_after_seconds: Optional[int] = None
    remaining_requests: int = 0
    remaining_tokens: int = 0
    # Human-readable explanation of which limit was hit (None when allowed).
    reason: Optional[str] = None
class SlidingWindowRateLimiter:
    """Sliding-window request limiter backed by Redis sorted sets.

    Each user gets one sorted set per window (minute / hour). Members are
    unique per request and scored with the request's Unix timestamp, so a
    ZCOUNT over [now - window, now] yields the in-window request count.
    """

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_request(self, user_id: str) -> RateLimitResult:
        """Check if a request is allowed under all rate limits.

        Denies (with retry_after_seconds and a reason) as soon as either the
        per-minute or per-hour window is full; otherwise records the request
        in both windows and reports the remaining allowance.

        NOTE(review): check-then-record is not atomic, so two concurrent
        requests can both pass the check. Acceptable as a soft limit; move
        the logic into a Lua script if a strict limit is required.
        """
        now = time.time()

        # Check requests per minute.
        minute_key = f"rl:{user_id}:rpm"
        minute_count = self._count_in_window(minute_key, now, 60)
        if minute_count >= self.config.requests_per_minute:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(minute_key, now, 60),
                reason="Per-minute request limit exceeded",
            )

        # Check requests per hour.
        hour_key = f"rl:{user_id}:rph"
        hour_count = self._count_in_window(hour_key, now, 3600)
        if hour_count >= self.config.requests_per_hour:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(hour_key, now, 3600),
                reason="Per-hour request limit exceeded",
            )

        # Record this request. The member must be unique per request: keying
        # on the timestamp alone would collapse two requests arriving in the
        # same clock tick into a single entry and undercount.
        member = f"{now}:{uuid.uuid4().hex}"
        pipe = self.redis.pipeline()
        pipe.zadd(minute_key, {member: now})
        # Prune entries that have aged out so the sets stay bounded instead
        # of accumulating a full TTL's worth of stale members.
        pipe.zremrangebyscore(minute_key, 0, now - 60)
        pipe.expire(minute_key, 120)
        pipe.zadd(hour_key, {member: now})
        pipe.zremrangebyscore(hour_key, 0, now - 3600)
        pipe.expire(hour_key, 7200)
        pipe.execute()

        return RateLimitResult(
            allowed=True,
            remaining_requests=self.config.requests_per_minute - minute_count - 1,
            remaining_tokens=self.config.tokens_per_minute,
        )

    def _count_in_window(self, key: str, now: float, window_seconds: int) -> int:
        """Count entries whose score falls inside the trailing window."""
        window_start = now - window_seconds
        return self.redis.zcount(key, window_start, now)

    def _time_until_slot(self, key: str, now: float, window_seconds: int) -> int:
        """Seconds until the oldest in-window entry ages out, freeing a slot."""
        window_start = now - window_seconds
        # Read the score (the timestamp) instead of parsing the member
        # string, so the member format and bytes-vs-str decoding of the
        # client are irrelevant here.
        oldest = self.redis.zrangebyscore(
            key, window_start, now, start=0, num=1, withscores=True
        )
        if oldest:
            oldest_time = oldest[0][1]
            return max(1, int(oldest_time + window_seconds - now) + 1)
        return 1
Token Budget Tracking
Token budgets prevent users from consuming excessive LLM tokens even within request limits:
See AI Voice Agents Handle Real Calls
Book a free demo or calculate how much you can save with AI voice automation.
class TokenBudgetTracker:
    """Track token consumption per user with rolling budgets.

    Tokens are recorded in a per-user sorted set whose members embed the
    token count (``timestamp:tokens:nonce``) and whose scores are the usage
    timestamps; daily cost is a plain float counter keyed by date.
    """

    # Approximate costs per 1K tokens (input + output blended).
    MODEL_COSTS = {
        "gpt-4o": 0.0075,
        "gpt-4o-mini": 0.0003,
        "claude-sonnet-4-20250514": 0.009,
    }

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_token_budget(self, user_id: str) -> RateLimitResult:
        """Check if user has remaining per-minute token budget."""
        now = time.time()
        minute_key = f"tokens:{user_id}:tpm"
        minute_tokens = self._sum_in_window(minute_key, now, 60)
        if minute_tokens >= self.config.tokens_per_minute:
            return RateLimitResult(
                allowed=False,
                reason="Per-minute token budget exhausted",
                remaining_tokens=0,
            )
        return RateLimitResult(
            allowed=True,
            remaining_tokens=self.config.tokens_per_minute - int(minute_tokens),
        )

    def record_usage(self, user_id: str, tokens_used: int, model: str) -> None:
        """Record token usage after a successful LLM call."""
        now = time.time()
        # Unknown models fall back to a conservative $0.01 / 1K tokens.
        cost_per_1k = self.MODEL_COSTS.get(model, 0.01)
        cost = (tokens_used / 1000) * cost_per_1k

        pipe = self.redis.pipeline()
        # Track tokens. The nonce keeps the member unique: without it, two
        # identical (timestamp, tokens) usages overwrite each other in the
        # zset and the window sum undercounts.
        minute_key = f"tokens:{user_id}:tpm"
        member = f"{now}:{tokens_used}:{uuid.uuid4().hex}"
        pipe.zadd(minute_key, {member: now})
        # Prune aged-out entries so the set stays bounded within the TTL.
        pipe.zremrangebyscore(minute_key, 0, now - 60)
        pipe.expire(minute_key, 120)

        # Track daily cost. NOTE(review): strftime uses local time, so the
        # daily budget resets at local midnight — confirm that is intended.
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        pipe.incrbyfloat(daily_key, cost)
        pipe.expire(daily_key, 86400 * 2)
        pipe.execute()

    def check_daily_budget(self, user_id: str) -> RateLimitResult:
        """Check the user's accumulated cost against the daily cap."""
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(self.redis.get(daily_key) or 0)
        if current_cost >= self.config.max_daily_cost_usd:
            return RateLimitResult(
                allowed=False,
                reason=f"Daily budget of ${self.config.max_daily_cost_usd} exhausted",
            )
        return RateLimitResult(allowed=True)

    def _sum_in_window(self, key: str, now: float, window_seconds: int) -> float:
        """Sum the token counts embedded in members inside the trailing window."""
        window_start = now - window_seconds
        entries = self.redis.zrangebyscore(key, window_start, now)
        total = 0
        for entry in entries:
            # redis-py returns bytes unless the client was created with
            # decode_responses=True; accept both rather than crashing on str.
            if isinstance(entry, bytes):
                entry = entry.decode()
            # Member format is "timestamp:tokens[:nonce]"; the timestamp
            # contains no ':', so index 1 is always the token count.
            total += int(entry.split(":")[1])
        return total
Graceful Degradation
Instead of hard-blocking users when limits are reached, degrade service quality:
class GracefulDegradation:
    """Downgrade service quality instead of blocking when approaching limits."""

    def select_model(self, user_id: str, budget_tracker: "TokenBudgetTracker") -> str:
        """Pick a model based on how much of the user's daily budget is spent.

        Reads the current daily cost directly from the tracker's Redis and
        maps the spend ratio onto model tiers.
        """
        # The original also called budget_tracker.check_daily_budget() and
        # discarded the result — a dead Redis round-trip, removed here.
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(budget_tracker.redis.get(daily_key) or 0)
        max_cost = budget_tracker.config.max_daily_cost_usd
        usage_ratio = current_cost / max_cost if max_cost > 0 else 0

        if usage_ratio < 0.5:
            return "gpt-4o"  # Full quality
        elif usage_ratio < 0.8:
            return "gpt-4o-mini"  # Reduced quality
        else:
            # Same cheap model as the previous tier; pair with
            # get_max_tokens() to additionally shorten the output.
            return "gpt-4o-mini"

    def get_max_tokens(self, usage_ratio: float) -> int:
        """Shrink the completion budget as the user approaches the daily cap."""
        if usage_ratio < 0.5:
            return 4096
        elif usage_ratio < 0.8:
            return 2048
        return 1024
FastAPI Integration
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.responses import JSONResponse

app = FastAPI()

# One shared Redis connection plus a single config/limiter/tracker per process.
# NOTE(review): decode_responses is left False, so redis-py returns bytes.
redis_client = redis.Redis(host="localhost", port=6379, db=0)
config = RateLimitConfig()
limiter = SlidingWindowRateLimiter(redis_client, config)
budget = TokenBudgetTracker(redis_client, config)
async def rate_limit_dependency(request: Request):
    """FastAPI dependency: enforce all per-user limits before the handler runs.

    Resolves the user from the X-User-ID header (unauthenticated traffic
    shares the "anonymous" bucket) and raises HTTP 429 when any limit is hit.
    Returns the user_id for the endpoint to use.
    """
    user_id = request.headers.get("X-User-ID", "anonymous")

    result = limiter.check_request(user_id)
    if not result.allowed:
        raise HTTPException(
            status_code=429,
            detail=result.reason,
            # Fall back to a minimal hint rather than emitting the string
            # "None" if retry_after_seconds ever arrives unset.
            headers={"Retry-After": str(result.retry_after_seconds or 1)},
        )

    # Per-minute token budget: without this check the TokenBudgetTracker
    # records usage but never actually throttles anyone.
    token_result = budget.check_token_budget(user_id)
    if not token_result.allowed:
        raise HTTPException(status_code=429, detail=token_result.reason)

    budget_result = budget.check_daily_budget(user_id)
    if not budget_result.allowed:
        raise HTTPException(status_code=429, detail=budget_result.reason)

    return user_id
@app.post("/agent/chat")
async def chat(request: Request, user_id: str = Depends(rate_limit_dependency)):
    """Placeholder agent endpoint; all limiting happens in the dependency."""
    body = await request.json()
    # Process agent request with rate-limited user
    return {"response": "Agent response here"}
FAQ
How do I set appropriate rate limits for different user tiers?
Start by measuring actual usage patterns from real users for at least a week before setting limits. Create tiers based on your business model — free users might get 20 requests per hour and a $0.50 daily budget, while paid users get 200 requests per hour and a $20 daily budget. Store tier configurations in your database and load them dynamically rather than hardcoding values.
Should I rate limit per user, per API key, or per IP address?
Use a combination. Per-user limits are the primary control for authenticated users. Per-API-key limits protect against compromised keys. Per-IP limits catch unauthenticated abuse and credential stuffing. Apply the most restrictive matching limit. For agents specifically, also consider per-session limits to prevent a single long-running conversation from consuming too many resources.
How do I handle rate limiting in multi-agent workflows where one user request triggers many LLM calls?
Track the entire workflow as a single "request" for the user-facing rate limit, but track individual LLM calls for the token budget. This way, a user sees consistent request limits regardless of how many internal agent calls their request triggers, but the token budget still provides cost control. Pre-estimate the maximum tokens a workflow might consume and check the budget before starting the workflow, not after each call.
#RateLimiting #APICosts #AISafety #Redis #Python #AgenticAI #LearnAI #AIEngineering
CallSphere Team
Expert insights on AI voice agents and customer communication automation.
Try CallSphere AI Voice Agents
See how AI voice agents work for your industry. Live demo available -- no signup required.