
Semaphores and Rate Limiting: Controlling Concurrent LLM API Requests

Master asyncio.Semaphore, token bucket, and sliding window rate limiters to control concurrent LLM API requests. Includes retry-after handling and adaptive throttling.

The Rate Limiting Problem in AI Systems

LLM APIs enforce strict rate limits — typically measured in requests per minute (RPM) and tokens per minute (TPM). An agent processing 100 documents concurrently will blow past these limits immediately, triggering 429 errors, wasted retries, and degraded throughput.

Effective rate limiting requires two mechanisms: concurrency control (how many requests are in-flight simultaneously) and rate control (how many requests per time window). asyncio provides the primitives to implement both.

asyncio.Semaphore: Basic Concurrency Control

A semaphore limits the number of coroutines that can execute a critical section simultaneously. It is the simplest and most effective tool for capping concurrent API calls.

import asyncio
import os
import time

import httpx

API_KEY = os.environ["OPENAI_API_KEY"]  # assumed to be set in the environment

async def call_llm(
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
    prompt: str,
) -> str:
    """Make an LLM call with concurrency limiting."""
    async with semaphore:  # Blocks if limit reached
        print(f"[{time.monotonic():.1f}] Sending: {prompt[:30]}...")
        response = await client.post(
            "https://api.openai.com/v1/chat/completions",
            json={
                "model": "gpt-4o",
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

async def process_batch(prompts: list[str], max_concurrent: int = 5):
    """Process prompts with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=60.0,
    ) as client:
        tasks = [
            call_llm(client, semaphore, prompt)
            for prompt in prompts
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

With max_concurrent=5, only five API calls are in-flight at once. The remaining coroutines block at the async with semaphore line until a slot opens.
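To see the cap in action without hitting a real API, here is a minimal, self-contained sketch that replaces the HTTP round trip with a short sleep and records the peak number of coroutines inside the semaphore (fake_call and the counters are illustrative names, not part of any API):

```python
import asyncio

async def main() -> int:
    """Show that a Semaphore caps in-flight work, using simulated calls."""
    semaphore = asyncio.Semaphore(5)
    in_flight = 0
    peak = 0

    async def fake_call(i: int) -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
            in_flight -= 1

    await asyncio.gather(*(fake_call(i) for i in range(20)))
    return peak

peak = asyncio.run(main())
print(peak)  # never exceeds the semaphore limit of 5
```

Twenty tasks are scheduled, but the peak in-flight count equals the semaphore value, because each increment happens only after the semaphore is acquired.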

Token Bucket Rate Limiter

A semaphore controls concurrency but not rate. For true rate limiting (e.g., 60 requests per minute), implement a token bucket algorithm.

class TokenBucketRateLimiter:
    """Token bucket algorithm for rate-limited API calls."""

    def __init__(self, rate: float, capacity: int):
        """
        Args:
            rate: Tokens added per second (e.g., 1.0 = 60/min)
            capacity: Maximum burst size
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a token is available."""
        while True:
            async with self._lock:
                now = time.monotonic()
                elapsed = now - self.last_refill
                self.tokens = min(
                    self.capacity,
                    self.tokens + elapsed * self.rate,
                )
                self.last_refill = now

                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return

            # No tokens available, wait for next refill
            await asyncio.sleep(1.0 / self.rate)

# Usage: 60 requests per minute with burst of 10
limiter = TokenBucketRateLimiter(rate=1.0, capacity=10)

async def rate_limited_call(client, prompt):
    await limiter.acquire()  # Wait for rate limit token
    return await call_llm_api(client, prompt)

The token bucket allows short bursts up to capacity, then throttles to the sustained rate. This matches how most LLM APIs behave — they allow brief spikes but enforce an average rate.
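The burst-then-throttle behavior can be verified with a compact, lock-free copy of the same algorithm (a single sequential caller needs no lock; the small rate and capacity values are chosen purely to keep the demo fast):

```python
import asyncio
import time

class TokenBucket:
    """Compact token bucket (same refill logic as above) for a timing demo."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last = time.monotonic()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1.0:
                self.tokens -= 1.0
                return
            await asyncio.sleep(1.0 / self.rate)

async def main() -> tuple[float, float]:
    bucket = TokenBucket(rate=20.0, capacity=3)  # 20 tokens/s, burst of 3
    start = time.monotonic()
    for _ in range(3):          # burst: drains the 3 stored tokens instantly
        await bucket.acquire()
    burst_elapsed = time.monotonic() - start
    await bucket.acquire()      # 4th call must wait ~1/20 s for a refill
    total_elapsed = time.monotonic() - start
    return burst_elapsed, total_elapsed

burst, total = asyncio.run(main())
print(f"burst took {burst:.3f}s, 4th call completed at {total:.3f}s")
```

The first three acquires return immediately; the fourth is delayed by roughly one refill interval, which is exactly the burst-then-sustained-rate shape described above.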


Sliding Window Rate Limiter

A sliding window provides more precise rate limiting by tracking exact request timestamps.

from collections import deque

class SlidingWindowLimiter:
    """Sliding window rate limiter for precise request counting."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.timestamps: deque[float] = deque()
        self._lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request slot is available."""
        while True:
            async with self._lock:
                now = time.monotonic()
                # Remove expired timestamps
                while (self.timestamps and
                       self.timestamps[0] <= now - self.window):
                    self.timestamps.popleft()

                if len(self.timestamps) < self.max_requests:
                    self.timestamps.append(now)
                    return

                # Calculate wait time until oldest request expires
                wait = self.timestamps[0] + self.window - now

            await asyncio.sleep(wait)

# Usage: 100 requests per 60-second window
limiter = SlidingWindowLimiter(max_requests=100, window_seconds=60)
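A quick timing check confirms the window semantics. This sketch uses a compact, lock-free copy of the limiter (safe here because a single caller acquires sequentially) with a deliberately tiny window so it runs in a fraction of a second:

```python
import asyncio
import time
from collections import deque

class Window:
    """Compact sliding-window limiter (same logic as above) for a timing demo."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.stamps: deque[float] = deque()

    async def acquire(self) -> None:
        while True:
            now = time.monotonic()
            # Drop timestamps that have aged out of the window
            while self.stamps and self.stamps[0] <= now - self.window:
                self.stamps.popleft()
            if len(self.stamps) < self.max_requests:
                self.stamps.append(now)
                return
            # Sleep until the oldest timestamp expires
            await asyncio.sleep(self.stamps[0] + self.window - now)

async def main() -> float:
    limiter = Window(max_requests=3, window_seconds=0.1)
    start = time.monotonic()
    for _ in range(4):  # 4th acquire must wait for the 1st to leave the window
        await limiter.acquire()
    return time.monotonic() - start

elapsed = asyncio.run(main())
print(f"{elapsed:.3f}s")  # first three immediate, fourth after ~0.1s
```

The first three requests pass immediately; the fourth waits almost exactly one window length, because the limiter tracks real timestamps rather than a coarse refill rate.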

Combining Semaphore and Rate Limiter

Production systems need both concurrency control and rate limiting.

class LLMThrottler:
    """Combined concurrency + rate limiter for LLM APIs."""

    def __init__(
        self,
        max_concurrent: int = 10,
        max_per_minute: int = 60,
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.rate_limiter = SlidingWindowLimiter(
            max_requests=max_per_minute,
            window_seconds=60,
        )

    async def call(
        self,
        client: httpx.AsyncClient,
        prompt: str,
    ) -> str:
        # First: wait for a concurrency slot
        async with self.semaphore:
            # Then: take a rate-limit slot just before sending, so the
            # recorded timestamp matches the actual request time (acquiring
            # it earlier would consume window slots while queued)
            await self.rate_limiter.acquire()
            response = await client.post(
                "https://api.openai.com/v1/chat/completions",
                json={
                    "model": "gpt-4o",
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                },
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]

# Usage
throttler = LLMThrottler(max_concurrent=10, max_per_minute=60)

async def process_batch(prompts: list[str]):
    async with httpx.AsyncClient(
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=60.0,
    ) as client:
        return await asyncio.gather(
            *[throttler.call(client, p) for p in prompts]
        )

Handling Retry-After Headers

When you do hit a 429, respect the server's retry-after header.

async def call_with_retry_after(
    client: httpx.AsyncClient,
    throttler: LLMThrottler,
    prompt: str,
    max_retries: int = 3,
) -> str:
    for attempt in range(max_retries):
        try:
            return await throttler.call(client, prompt)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                # Retry-After may be seconds or an HTTP date;
                # numeric seconds are assumed here
                retry_after = float(
                    e.response.headers.get("retry-after", "5")
                )
                print(f"429 received. Waiting {retry_after}s")
                await asyncio.sleep(retry_after)
            else:
                raise
    raise RuntimeError(f"Exhausted retries for: {prompt[:50]}")

FAQ

How do I determine the right semaphore limit for my LLM API?

Start with the API's documented rate limits. If the limit is 60 RPM, set the semaphore to 10-15 (allowing bursts but staying well under the limit). Monitor 429 error rates in production and adjust. A good rule of thumb follows Little's law: set concurrency to rate_limit_per_second * average_latency_seconds. If your average call takes 2 seconds and the limit is 60 RPM (1 request per second), max_concurrent = 1 * 2 = 2 concurrent calls would fully saturate the limit.
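That rule of thumb is a one-line calculation. The helper below is a hypothetical illustration (the function name is not from any library):

```python
def saturating_concurrency(rpm_limit: float, avg_latency_s: float) -> float:
    """Little's law: in-flight requests = arrival rate x time in system.

    Returns the concurrency that fully saturates an RPM limit for a
    given average call latency.
    """
    return (rpm_limit / 60.0) * avg_latency_s

print(saturating_concurrency(60, 2.0))   # 60 RPM, 2 s calls -> 2.0
print(saturating_concurrency(600, 3.0))  # 600 RPM, 3 s calls -> 30.0
```

Round the result down and leave headroom below it, since real latencies vary.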

What is the difference between a semaphore and a rate limiter?

A semaphore limits how many operations happen simultaneously (concurrency). A rate limiter limits how many operations happen within a time window (throughput). If your LLM calls take 2 seconds each and you have a semaphore of 5, you can make roughly 150 requests per minute — far exceeding a 60 RPM rate limit. You need both.
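The arithmetic behind that example can be made explicit. This illustrative helper (not a library function) computes the throughput a semaphore alone permits:

```python
def effective_rpm(max_concurrent: int, avg_latency_s: float) -> float:
    """Throughput a semaphore alone allows:
    slots x completions per slot per minute."""
    return max_concurrent * (60.0 / avg_latency_s)

print(effective_rpm(5, 2.0))  # 5 slots, 2 s calls -> 150.0 RPM
```

Since 150 RPM exceeds a 60 RPM quota, the semaphore must be paired with a rate limiter, as the combined LLMThrottler does.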

Should I implement rate limiting per-API-key or per-endpoint?

Per-API-key, because that is how LLM providers enforce limits. If your application uses multiple API keys (e.g., for different tenants), create a separate throttler instance per key. If you call multiple LLM providers, each provider needs its own throttler with provider-specific limits.


#Python #RateLimiting #Asyncio #Semaphore #LLMAPI #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
