Rate Limiting AI Agents: Preventing Abuse and Controlling API Costs

Implement per-user rate limiting, token budgets, sliding window algorithms, and graceful degradation strategies to protect your AI agent system from abuse while controlling LLM API costs.

Why AI Agents Need Rate Limiting

A single LLM API call can cost anywhere from $0.01 to $0.50 depending on the model and token count. An AI agent that makes multiple LLM calls per user request — reasoning, tool execution, re-planning — can burn through $2-5 per interaction. Without rate limiting, a single abusive user or a runaway loop can generate thousands of dollars in API costs within minutes.

Rate limiting for AI agents goes beyond traditional API rate limiting because you need to track not just request counts but also token consumption, tool call frequency, and per-user spending budgets. This post builds a comprehensive rate limiting system using Redis and Python.

The Sliding Window Algorithm

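Sliding window rate limiting is fairer than fixed windows. With fixed windows, a client can spend a full quota at the end of one window and another full quota at the start of the next, doubling the effective burst at the boundary. A sliding window always counts the trailing interval (the last 60 seconds or the last hour), so that boundary burst is not possible: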

import time
import redis
from dataclasses import dataclass
from typing import Optional

@dataclass
class RateLimitConfig:
    requests_per_minute: int = 20
    requests_per_hour: int = 200
    tokens_per_minute: int = 50_000
    tokens_per_hour: int = 500_000
    max_daily_cost_usd: float = 10.0

@dataclass
class RateLimitResult:
    allowed: bool
    retry_after_seconds: Optional[int] = None
    remaining_requests: int = 0
    remaining_tokens: int = 0
    reason: Optional[str] = None

class SlidingWindowRateLimiter:
    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_request(self, user_id: str) -> RateLimitResult:
        """Check if a request is allowed under all rate limits."""
        now = time.time()

        # Check requests per minute
        minute_key = f"rl:{user_id}:rpm"
        minute_count = self._count_in_window(minute_key, now, 60)
        if minute_count >= self.config.requests_per_minute:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(minute_key, now, 60),
                reason="Per-minute request limit exceeded",
            )

        # Check requests per hour
        hour_key = f"rl:{user_id}:rph"
        hour_count = self._count_in_window(hour_key, now, 3600)
        if hour_count >= self.config.requests_per_hour:
            return RateLimitResult(
                allowed=False,
                retry_after_seconds=self._time_until_slot(hour_key, now, 3600),
                reason="Per-hour request limit exceeded",
            )

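        # Record this request and prune entries that have left each window,
        # so the sorted sets of an active user do not grow without bound.
        # Note: the checks above and this write are not atomic; concurrent
        # requests can slightly exceed a limit unless wrapped in a Lua script.
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(minute_key, 0, now - 60)
        pipe.zadd(minute_key, {f"{now}": now})
        pipe.expire(minute_key, 120)
        pipe.zremrangebyscore(hour_key, 0, now - 3600)
        pipe.zadd(hour_key, {f"{now}": now})
        pipe.expire(hour_key, 7200)
        pipe.execute()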

        return RateLimitResult(
            allowed=True,
            remaining_requests=self.config.requests_per_minute - minute_count - 1,
            remaining_tokens=self.config.tokens_per_minute,
        )

    def _count_in_window(self, key: str, now: float, window_seconds: int) -> int:
        window_start = now - window_seconds
        return self.redis.zcount(key, window_start, now)

    def _time_until_slot(self, key: str, now: float, window_seconds: int) -> int:
        window_start = now - window_seconds
        oldest = self.redis.zrangebyscore(key, window_start, now, start=0, num=1)
        if oldest:
            oldest_time = float(oldest[0])
            return max(1, int(oldest_time + window_seconds - now) + 1)
        return 1
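
To make the boundary-burst argument concrete, here is a minimal in-memory sketch that needs no Redis. `FixedWindowCounter`, `SlidingWindowLog`, and `accepted_in_burst` are hypothetical names introduced only for this illustration, and timestamps are passed in explicitly so the result is deterministic. It sends 20 requests just before the 60-second mark and 20 just after it, against a limit of 20 per minute.

```python
from collections import deque


class FixedWindowCounter:
    """Hypothetical helper: counts requests per aligned window (0-59s, 60-119s, ...)."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.current_window = None
        self.count = 0

    def allow(self, now: float) -> bool:
        window = int(now // self.window_seconds)
        if window != self.current_window:
            # A new aligned window starts with a fresh quota.
            self.current_window = window
            self.count = 0
        if self.count >= self.limit:
            return False
        self.count += 1
        return True


class SlidingWindowLog:
    """Hypothetical helper: keeps timestamps of accepted requests in the trailing window."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        self.timestamps = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the trailing window.
        while self.timestamps and self.timestamps[0] < now - self.window_seconds:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True


def accepted_in_burst(limiter, times) -> int:
    """Number of requests the limiter accepts for the given timestamps."""
    return sum(limiter.allow(t) for t in times)


# 20 requests in the last second of one minute, 20 in the first second of the next.
burst = [59 + i * 0.05 for i in range(20)] + [60 + i * 0.05 for i in range(20)]

print(accepted_in_burst(FixedWindowCounter(20, 60), burst))  # 40
print(accepted_in_burst(SlidingWindowLog(20, 60), burst))    # 20
```

The fixed-window counter accepts all 40 requests within two seconds, while the sliding log stops at 20. The Redis sorted-set version above behaves like the sliding log, with the set shared across processes.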

Token Budget Tracking

Token budgets prevent users from consuming excessive LLM tokens even within request limits:

class TokenBudgetTracker:
    """Track token consumption per user with rolling budgets."""

    # Approximate costs per 1K tokens (input + output blended)
    MODEL_COSTS = {
        "gpt-4o": 0.0075,
        "gpt-4o-mini": 0.0003,
        "claude-sonnet-4-20250514": 0.009,
    }

    def __init__(self, redis_client: redis.Redis, config: RateLimitConfig):
        self.redis = redis_client
        self.config = config

    def check_token_budget(self, user_id: str) -> RateLimitResult:
        """Check if user has remaining token budget."""
        now = time.time()

        minute_key = f"tokens:{user_id}:tpm"
        minute_tokens = self._sum_in_window(minute_key, now, 60)

        if minute_tokens >= self.config.tokens_per_minute:
            return RateLimitResult(
                allowed=False,
                reason="Per-minute token budget exhausted",
                remaining_tokens=0,
            )

        return RateLimitResult(
            allowed=True,
            remaining_tokens=self.config.tokens_per_minute - int(minute_tokens),
        )

    def record_usage(self, user_id: str, tokens_used: int, model: str) -> None:
        """Record token usage after a successful LLM call."""
        now = time.time()
        cost_per_1k = self.MODEL_COSTS.get(model, 0.01)
        cost = (tokens_used / 1000) * cost_per_1k

        pipe = self.redis.pipeline()

        # Track tokens
        minute_key = f"tokens:{user_id}:tpm"
        pipe.zadd(minute_key, {f"{now}:{tokens_used}": now})
        pipe.expire(minute_key, 120)

        # Track daily cost
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        pipe.incrbyfloat(daily_key, cost)
        pipe.expire(daily_key, 86400 * 2)

        pipe.execute()

    def check_daily_budget(self, user_id: str) -> RateLimitResult:
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(self.redis.get(daily_key) or 0)

        if current_cost >= self.config.max_daily_cost_usd:
            return RateLimitResult(
                allowed=False,
                reason=f"Daily budget of ${self.config.max_daily_cost_usd} exhausted",
            )

        return RateLimitResult(allowed=True)

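    def _sum_in_window(self, key: str, now: float, window_seconds: int) -> int:
        window_start = now - window_seconds
        entries = self.redis.zrangebyscore(key, window_start, now)
        # Members are "<timestamp>:<tokens>"; redis-py returns bytes by default
        # and str when the client was created with decode_responses=True.
        members = (e.decode() if isinstance(e, bytes) else e for e in entries)
        return sum(int(m.rsplit(":", 1)[1]) for m in members)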
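
As a worked example of the cost arithmetic in `record_usage`, the sketch below computes the cost of one call and how many such calls fit in a daily budget. The prices are the approximate blended figures from `MODEL_COSTS` above; `call_cost_usd` and `calls_within_budget` are hypothetical helper names, and the 4,000-token call size is an assumption chosen for illustration.

```python
import math

# Approximate blended prices per 1K tokens, copied from MODEL_COSTS above.
MODEL_COSTS = {"gpt-4o": 0.0075, "gpt-4o-mini": 0.0003}


def call_cost_usd(tokens_used: int, model: str, default_per_1k: float = 0.01) -> float:
    """Cost of one call, using the same formula as record_usage."""
    return (tokens_used / 1000) * MODEL_COSTS.get(model, default_per_1k)


def calls_within_budget(tokens_per_call: int, model: str, daily_budget_usd: float) -> int:
    """How many calls of this size fit in the daily budget (hypothetical helper)."""
    if tokens_per_call <= 0:
        raise ValueError("tokens_per_call must be positive")
    return math.floor(daily_budget_usd / call_cost_usd(tokens_per_call, model))


# A 4,000-token call on gpt-4o costs about $0.03, so a $10 budget covers 333 calls.
print(round(call_cost_usd(4000, "gpt-4o"), 4))         # 0.03
print(calls_within_budget(4000, "gpt-4o", 10.0))       # 333
print(calls_within_budget(4000, "gpt-4o-mini", 10.0))  # 8333
```

Under these assumed prices the same budget stretches roughly 25 times further on the smaller model, which is the lever the next section relies on.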

Graceful Degradation

Instead of hard-blocking users the moment a limit is reached, degrade service quality as they approach it:

class GracefulDegradation:
    """Downgrade service instead of blocking when approaching limits."""

    def usage_ratio(self, user_id: str, budget_tracker: TokenBudgetTracker) -> float:
        """Fraction of the daily budget the user has spent so far."""
        daily_key = f"cost:{user_id}:daily:{time.strftime('%Y-%m-%d')}"
        current_cost = float(budget_tracker.redis.get(daily_key) or 0)
        max_cost = budget_tracker.config.max_daily_cost_usd
        return current_cost / max_cost if max_cost > 0 else 0.0

    def select_model(self, user_id: str, budget_tracker: TokenBudgetTracker) -> str:
        if self.usage_ratio(user_id, budget_tracker) < 0.5:
            return "gpt-4o"        # Full quality
        # Above 50% of the budget, switch to the cheaper model. Past 80%,
        # get_max_tokens() also shortens responses.
        return "gpt-4o-mini"

    def get_max_tokens(self, usage_ratio: float) -> int:
        if usage_ratio < 0.5:
            return 4096
        elif usage_ratio < 0.8:
            return 2048
        else:
            return 1024
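
The model choice and the token cap use the same 50% and 80% thresholds, so a caller may prefer to resolve both from a single usage ratio. The sketch below shows one way to do that; `ServicePlan` and `plan_for_usage` are hypothetical names, not part of the class above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ServicePlan:
    """Hypothetical container for the settings chosen at a given usage level."""
    model: str
    max_tokens: int


def plan_for_usage(usage_ratio: float) -> ServicePlan:
    """Map the fraction of daily budget spent to a model and a response cap."""
    if usage_ratio < 0.5:
        return ServicePlan("gpt-4o", 4096)       # Full quality
    if usage_ratio < 0.8:
        return ServicePlan("gpt-4o-mini", 2048)  # Reduced quality
    return ServicePlan("gpt-4o-mini", 1024)      # Minimal, shorter responses


print(plan_for_usage(0.65))  # ServicePlan(model='gpt-4o-mini', max_tokens=2048)
```

Keeping the thresholds in one function avoids the two settings drifting apart when the tiers are tuned later.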

FastAPI Integration

from fastapi import FastAPI, Depends, HTTPException, Request

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6379, db=0)
config = RateLimitConfig()
limiter = SlidingWindowRateLimiter(redis_client, config)
budget = TokenBudgetTracker(redis_client, config)

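# Declared with plain `def` so FastAPI runs it in a threadpool; the Redis
# client used here is synchronous and would otherwise block the event loop.
def rate_limit_dependency(request: Request) -> str:
    # X-User-ID is client-supplied. In production, derive the user ID from
    # verified authentication (session or token) instead of trusting a header.
    user_id = request.headers.get("X-User-ID", "anonymous")

    result = limiter.check_request(user_id)
    if not result.allowed:
        raise HTTPException(
            status_code=429,
            detail=result.reason,
            headers={"Retry-After": str(result.retry_after_seconds)},
        )

    budget_result = budget.check_daily_budget(user_id)
    if not budget_result.allowed:
        raise HTTPException(status_code=429, detail=budget_result.reason)

    return user_id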

@app.post("/agent/chat")
async def chat(request: Request, user_id: str = Depends(rate_limit_dependency)):
    body = await request.json()
    # Process agent request with rate-limited user
    return {"response": "Agent response here"}

FAQ

How do I set appropriate rate limits for different user tiers?

Start by measuring actual usage patterns from real users for at least a week before setting limits. Create tiers based on your business model — free users might get 20 requests per hour and a $0.50 daily budget, while paid users get 200 requests per hour and a $20 daily budget. Store tier configurations in your database and load them dynamically rather than hardcoding values.
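
A minimal sketch of a tier lookup, using the example numbers from this answer. `TierLimits`, `TIER_LIMITS`, and `limits_for_tier` are hypothetical names, and the in-code dictionary stands in for whatever database table would hold the real configuration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierLimits:
    """Hypothetical per-tier limits; field names are illustrative."""
    requests_per_hour: int
    max_daily_cost_usd: float


# Example values from the text above. In practice, load these from a database
# so they can change without a deploy.
TIER_LIMITS = {
    "free": TierLimits(requests_per_hour=20, max_daily_cost_usd=0.50),
    "paid": TierLimits(requests_per_hour=200, max_daily_cost_usd=20.0),
}


def limits_for_tier(tier: str) -> TierLimits:
    """Return the limits for a tier, falling back to the most restrictive one."""
    return TIER_LIMITS.get(tier, TIER_LIMITS["free"])


print(limits_for_tier("paid").requests_per_hour)     # 200
print(limits_for_tier("unknown").max_daily_cost_usd)  # 0.5
```

Falling back to the free tier for unknown values means a misconfigured account is limited too tightly, not left unlimited.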

Should I rate limit per user, per API key, or per IP address?

Use a combination. Per-user limits are the primary control for authenticated users. Per-API-key limits protect against compromised keys. Per-IP limits catch unauthenticated abuse and credential stuffing. Apply the most restrictive matching limit. For agents specifically, also consider per-session limits to prevent a single long-running conversation from consuming too many resources.
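
"Apply the most restrictive matching limit" can be sketched as taking the minimum remaining quota across every scope that matches the request. The function names and the scope labels below are assumptions for illustration; the remaining counts would come from whichever limiter tracks each scope.

```python
from typing import Dict, Tuple


def most_restrictive(remaining_by_scope: Dict[str, int]) -> Tuple[str, int]:
    """Return (scope, remaining) for the scope with the least quota left.

    Expects at least one scope; an empty dict raises ValueError.
    """
    scope = min(remaining_by_scope, key=remaining_by_scope.get)
    return scope, remaining_by_scope[scope]


def is_allowed(remaining_by_scope: Dict[str, int]) -> bool:
    """A request passes only if every matching scope still has quota."""
    return most_restrictive(remaining_by_scope)[1] > 0


# Hypothetical remaining quotas for one incoming request.
remaining = {"user": 12, "api_key": 150, "ip": 0}

print(most_restrictive(remaining))  # ('ip', 0)
print(is_allowed(remaining))        # False
```

Returning the scope along with the count makes it possible to tell the client which limit was hit, which helps when debugging false positives on shared IP addresses.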

How do I handle rate limiting in multi-agent workflows where one user request triggers many LLM calls?

Track the entire workflow as a single "request" for the user-facing rate limit, but track individual LLM calls for the token budget. This way, a user sees consistent request limits regardless of how many internal agent calls their request triggers, but the token budget still provides cost control. Pre-estimate the maximum tokens a workflow might consume and check the budget before starting the workflow, not after each call.
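
One way to pre-estimate a workflow is to multiply the expected number of LLM calls by a typical call size and add a safety margin, then compare the result with the user's remaining token budget before starting. This is a sketch under those assumptions; the helper names, the 20% margin, and the example numbers are all hypothetical.

```python
def estimate_workflow_tokens(
    steps: int, tokens_per_step: int, safety_margin_percent: int = 20
) -> int:
    """Upper-bound estimate for a workflow, rounded up to a whole token.

    Integer arithmetic avoids floating-point rounding in the margin.
    """
    scaled = steps * tokens_per_step * (100 + safety_margin_percent)
    return -(-scaled // 100)  # ceiling division


def can_start_workflow(remaining_tokens: int, estimated_tokens: int) -> bool:
    """Start only if the whole estimated workflow fits in the remaining budget."""
    return estimated_tokens <= remaining_tokens


# Assumed workflow: 5 LLM calls of about 3,000 tokens each, plus a 20% margin.
estimate = estimate_workflow_tokens(steps=5, tokens_per_step=3000)
print(estimate)                              # 18000
print(can_start_workflow(15_000, estimate))  # False
print(can_start_workflow(50_000, estimate))  # True
```

Rejecting up front avoids the worse outcome of running three of five steps, paying for them, and then failing on the fourth.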


CallSphere Team
