
Agent Gateway Pattern: Rate Limiting, Authentication, and Request Routing for AI Agents

Implementing an agent gateway with API key management, per-agent rate limiting, intelligent request routing, audit logging, and cost tracking for enterprise AI systems.

What Is an Agent Gateway?

As your AI agent system grows beyond a few agents, you need a single entry point that handles cross-cutting concerns: authentication, rate limiting, request routing, cost tracking, and audit logging. This is the agent gateway pattern — the same concept as an API gateway, but designed specifically for the unique requirements of AI agent systems.

AI agents introduce challenges that traditional API gateways do not handle well. Agent requests vary wildly in cost (a simple lookup versus a multi-step research task), latency (milliseconds versus minutes), and resource consumption (token counts, tool calls, external API calls). The agent gateway must be aware of these dimensions to make intelligent routing and rate limiting decisions.

Gateway Architecture

┌──────────────┐
│    Client    │
│   (API Key)  │
└──────┬───────┘
       │
       ▼
┌────────────────────────────────────────────────┐
│                 Agent Gateway                  │
│                                                │
│  ┌──────────┐ ┌──────────┐ ┌───────────────┐   │
│  │   Auth   │ │   Rate   │ │    Router     │   │
│  │  Layer   │ │  Limiter │ │ (Intelligent) │   │
│  └────┬─────┘ └────┬─────┘ └───────┬───────┘   │
│       │            │               │           │
│  ┌────┴────────────┴───────────────┴───────┐   │
│  │          Middleware Pipeline            │   │
│  │    Logging → Metrics → Cost Tracking    │   │
│  └─────────────────────────────────────────┘   │
└───────────────────────┬────────────────────────┘
                        │
            ┌───────────┼───────────┐
            ▼           ▼           ▼
       ┌─────────┐ ┌─────────┐ ┌─────────┐
       │Research │ │ Writing │ │  Code   │
       │  Agent  │ │  Agent  │ │  Agent  │
       └─────────┘ └─────────┘ └─────────┘

Step 1: Authentication and API Key Management

The gateway authenticates every request using API keys with scoped permissions:

# gateway/auth.py
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import hashlib
import secrets
from datetime import datetime
from pydantic import BaseModel, Field

security = HTTPBearer()

class APIKey(BaseModel):
    key_id: str
    key_hash: str
    client_name: str
    allowed_agents: list[str]   # Which agents this key can access
    rate_limit_rpm: int         # Requests per minute
    rate_limit_tokens: int      # Tokens per minute
    monthly_budget_usd: float   # Cost cap
    is_active: bool = True
    # default_factory ensures the timestamp is evaluated per instance,
    # not once at class definition time
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: datetime | None = None

# In production, use a database. This is for illustration.
API_KEY_STORE: dict[str, APIKey] = {}

def generate_api_key(client_name: str, allowed_agents: list[str],
                     rate_limit_rpm: int = 60,
                     monthly_budget: float = 100.0) -> tuple[str, APIKey]:
    """Generate a new API key for a client."""
    raw_key = f"csa_{secrets.token_urlsafe(32)}"
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    key_id = f"key_{secrets.token_hex(8)}"

    api_key = APIKey(
        key_id=key_id,
        key_hash=key_hash,
        client_name=client_name,
        allowed_agents=allowed_agents,
        rate_limit_rpm=rate_limit_rpm,
        rate_limit_tokens=500_000,
        monthly_budget_usd=monthly_budget,
    )
    API_KEY_STORE[key_hash] = api_key
    return raw_key, api_key

async def authenticate(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> APIKey:
    """Authenticate a request by API key."""
    token = credentials.credentials
    key_hash = hashlib.sha256(token.encode()).hexdigest()
    api_key = API_KEY_STORE.get(key_hash)

    if not api_key:
        raise HTTPException(401, "Invalid API key")
    if not api_key.is_active:
        raise HTTPException(403, "API key is disabled")
    if api_key.expires_at and api_key.expires_at < datetime.utcnow():
        raise HTTPException(403, "API key has expired")

    return api_key
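The hashing scheme above can be exercised on its own: persist only the SHA-256 digest, then re-hash whatever token the client presents on each request. A minimal standalone sketch, using the same `csa_` prefix convention:

```python
import hashlib
import secrets

def issue_key() -> tuple[str, str]:
    """Return (raw_key, stored_hash). Only the hash is ever persisted."""
    raw = f"csa_{secrets.token_urlsafe(32)}"
    return raw, hashlib.sha256(raw.encode()).hexdigest()

def verify(presented: str, stored_hash: str) -> bool:
    """Re-hash the presented token and compare digests."""
    return hashlib.sha256(presented.encode()).hexdigest() == stored_hash

raw, stored = issue_key()
assert verify(raw, stored)
assert not verify("csa_forged-token", stored)
```

Because only digests are stored, a leaked key table does not yield usable credentials. In production you would also compare digests with `hmac.compare_digest` to avoid timing side channels.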

Step 2: Token-Bucket Rate Limiting

Standard request-per-minute rate limiting is insufficient for AI agents because requests vary enormously in cost. A one-sentence query and a 10-page research task should not count equally. Implement dual-dimension rate limiting: requests AND tokens.

# gateway/rate_limiter.py
import time
import asyncio
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket rate limiter with refill."""
    capacity: float
    tokens: float
    refill_rate: float  # Tokens per second
    last_refill: float = field(default_factory=time.time)

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def try_consume(self, amount: float = 1.0) -> bool:
        self._refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

    def time_until_available(self, amount: float = 1.0) -> float:
        self._refill()
        if self.tokens >= amount:
            return 0.0
        deficit = amount - self.tokens
        return deficit / self.refill_rate

class AgentRateLimiter:
    """Per-client, per-agent rate limiter with request and token dimensions."""

    def __init__(self):
        self.request_buckets: dict[str, TokenBucket] = {}
        self.token_buckets: dict[str, TokenBucket] = {}
        self._lock = asyncio.Lock()

    def _get_bucket_key(self, client_id: str, agent_type: str) -> str:
        return f"{client_id}:{agent_type}"

    async def check_rate_limit(self, client_id: str, agent_type: str,
                                rpm_limit: int, token_limit: int,
                                estimated_tokens: int = 1000) -> tuple[bool, str]:
        async with self._lock:
            key = self._get_bucket_key(client_id, agent_type)

            # Initialize buckets if needed
            if key not in self.request_buckets:
                self.request_buckets[key] = TokenBucket(
                    capacity=rpm_limit,
                    tokens=rpm_limit,
                    refill_rate=rpm_limit / 60.0,
                )
                self.token_buckets[key] = TokenBucket(
                    capacity=token_limit,
                    tokens=token_limit,
                    refill_rate=token_limit / 60.0,
                )

            req_bucket = self.request_buckets[key]
            tok_bucket = self.token_buckets[key]

            # Check request limit
            if not req_bucket.try_consume(1):
                wait = req_bucket.time_until_available(1)
                return False, f"Request rate limit exceeded. Retry in {wait:.1f}s"

            # Check token limit
            if not tok_bucket.try_consume(estimated_tokens):
                wait = tok_bucket.time_until_available(estimated_tokens)
                return False, f"Token rate limit exceeded. Retry in {wait:.1f}s"

            return True, "OK"
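To see the bucket arithmetic in isolation, here is a trimmed copy of `TokenBucket` driven by hand. A limit of 60 requests per minute translates to a refill rate of one token per second, with the full capacity available as an initial burst:

```python
import time
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    capacity: float
    tokens: float
    refill_rate: float  # units replenished per second
    last_refill: float = field(default_factory=time.time)

    def _refill(self):
        now = time.time()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now

    def try_consume(self, amount: float = 1.0) -> bool:
        self._refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

# 60 rpm -> capacity 60, refilling at 1 token/second
bucket = TokenBucket(capacity=60, tokens=60, refill_rate=1.0)
burst = sum(bucket.try_consume() for _ in range(60))
assert burst == 60                 # the full burst is admitted
assert not bucket.try_consume()    # the 61st immediate request is rejected
```

This is why token buckets are preferred over fixed windows for agent traffic: they tolerate short bursts while still enforcing the average rate.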

Step 3: Intelligent Request Routing

The router analyzes each request and directs it to the most appropriate agent. Unlike simple URL-based routing, the agent gateway routes based on content analysis, agent capabilities, and current load:

# gateway/router.py
from pydantic import BaseModel
from enum import Enum

class AgentCapability(str, Enum):
    RESEARCH = "research"
    WRITING = "writing"
    CODE = "code"
    DATA_ANALYSIS = "data_analysis"
    CUSTOMER_SUPPORT = "customer_support"

class AgentEndpoint(BaseModel):
    name: str
    address: str
    capabilities: list[AgentCapability]
    max_concurrent: int = 10
    current_load: int = 0
    avg_latency_ms: float = 0.0
    error_rate: float = 0.0
    cost_per_request: float = 0.0

class AgentRouter:
    def __init__(self):
        self.agents: dict[str, AgentEndpoint] = {}
        self.keyword_map: dict[str, AgentCapability] = {
            "research": AgentCapability.RESEARCH,
            "find": AgentCapability.RESEARCH,
            "search": AgentCapability.RESEARCH,
            "investigate": AgentCapability.RESEARCH,
            "write": AgentCapability.WRITING,
            "draft": AgentCapability.WRITING,
            "compose": AgentCapability.WRITING,
            "edit": AgentCapability.WRITING,
            "code": AgentCapability.CODE,
            "fix bug": AgentCapability.CODE,
            "implement": AgentCapability.CODE,
            "debug": AgentCapability.CODE,
            "analyze data": AgentCapability.DATA_ANALYSIS,
            "statistics": AgentCapability.DATA_ANALYSIS,
            "chart": AgentCapability.DATA_ANALYSIS,
            "visualize": AgentCapability.DATA_ANALYSIS,
        }

    def register_agent(self, agent: AgentEndpoint):
        self.agents[agent.name] = agent

    def route(self, request_text: str,
              preferred_agent: str | None = None) -> AgentEndpoint:
        """Route a request to the best available agent."""
        if not self.agents:
            raise LookupError("No agents are registered with the router")

        # Explicit routing if client specifies an agent
        if preferred_agent and preferred_agent in self.agents:
            agent = self.agents[preferred_agent]
            if agent.current_load < agent.max_concurrent:
                return agent

        # Content-based routing
        capability = self._detect_capability(request_text)
        candidates = [
            a for a in self.agents.values()
            if capability in a.capabilities and a.current_load < a.max_concurrent
        ]

        if not candidates:
            # Fallback: route to least loaded agent
            candidates = sorted(
                self.agents.values(),
                key=lambda a: a.current_load / max(a.max_concurrent, 1),
            )

        # Select best candidate by score
        return min(candidates, key=lambda a: self._score_agent(a))

    def _detect_capability(self, text: str) -> AgentCapability:
        text_lower = text.lower()
        for keyword, capability in self.keyword_map.items():
            if keyword in text_lower:
                return capability
        return AgentCapability.RESEARCH  # Default

    def _score_agent(self, agent: AgentEndpoint) -> float:
        """Lower score is better. Considers load, latency, and error rate."""
        load_score = agent.current_load / max(agent.max_concurrent, 1)
        latency_score = agent.avg_latency_ms / 10000  # Normalize
        error_score = agent.error_rate * 10  # Heavily penalize errors
        return load_score + latency_score + error_score
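The two routing ingredients, keyword detection and load/health scoring, can each be checked independently. Below is a standalone copy of that logic with made-up agent metrics:

```python
# Trimmed copies of the keyword map and scoring formula from AgentRouter
keyword_map = {"draft": "writing", "debug": "code", "search": "research"}

def detect_capability(text: str, default: str = "research") -> str:
    text_lower = text.lower()
    for keyword, capability in keyword_map.items():
        if keyword in text_lower:
            return capability
    return default

def score(load: int, max_concurrent: int, latency_ms: float, error_rate: float) -> float:
    # Lower is better: normalized load + normalized latency + heavy error penalty
    return load / max(max_concurrent, 1) + latency_ms / 10_000 + error_rate * 10

assert detect_capability("Please debug the login handler") == "code"
# A lightly loaded, reliable agent beats a busy, error-prone one
assert score(2, 10, 800, 0.01) < score(9, 10, 800, 0.05)
```

Keyword routing is deliberately simple; a production gateway might replace `_detect_capability` with an embedding classifier or a small LLM call, while keeping the same scoring step.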

Step 4: Cost Tracking and Budget Enforcement

Every agent request has a cost. The gateway tracks spending per client and enforces budgets:

# gateway/cost_tracker.py
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import asyncio

@dataclass
class UsageRecord:
    client_id: str
    agent_name: str
    input_tokens: int
    output_tokens: int
    tool_calls: int
    cost_usd: float
    timestamp: datetime = field(default_factory=datetime.utcnow)

class CostTracker:
    # Approximate costs per 1K tokens (illustrative; check current provider pricing)
    MODEL_COSTS = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-sonnet": {"input": 0.003, "output": 0.015},
    }

    def __init__(self):
        self.records: list[UsageRecord] = []
        self._lock = asyncio.Lock()

    def estimate_cost(self, model: str, input_tokens: int,
                      output_tokens: int) -> float:
        costs = self.MODEL_COSTS.get(model, {"input": 0.003, "output": 0.015})
        return (
            (input_tokens / 1000) * costs["input"]
            + (output_tokens / 1000) * costs["output"]
        )

    async def record_usage(self, record: UsageRecord):
        async with self._lock:
            self.records.append(record)

    async def get_monthly_spend(self, client_id: str) -> float:
        month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
        return sum(
            r.cost_usd
            for r in self.records
            if r.client_id == client_id and r.timestamp >= month_start
        )

    async def check_budget(self, client_id: str, budget: float) -> tuple[bool, float]:
        spent = await self.get_monthly_spend(client_id)
        remaining = budget - spent
        return remaining > 0, remaining

    async def get_usage_report(self, client_id: str) -> dict:
        month_start = datetime.utcnow().replace(day=1, hour=0, minute=0, second=0)
        client_records = [
            r for r in self.records
            if r.client_id == client_id and r.timestamp >= month_start
        ]

        by_agent = {}
        for r in client_records:
            if r.agent_name not in by_agent:
                by_agent[r.agent_name] = {
                    "requests": 0, "tokens": 0, "cost": 0.0
                }
            by_agent[r.agent_name]["requests"] += 1
            by_agent[r.agent_name]["tokens"] += r.input_tokens + r.output_tokens
            by_agent[r.agent_name]["cost"] += r.cost_usd

        return {
            "client_id": client_id,
            "period": f"{month_start.strftime('%Y-%m')}",
            "total_requests": len(client_records),
            "total_cost_usd": sum(r.cost_usd for r in client_records),
            "by_agent": by_agent,
        }
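The per-1K-token arithmetic in `estimate_cost` is easy to sanity-check by hand. Using the gpt-4o rates from the table above, a 1,000-token prompt with a 500-token completion costs:

```python
# gpt-4o rates from the MODEL_COSTS table: $0.0025 / 1K input, $0.01 / 1K output
rates = {"input": 0.0025, "output": 0.01}
input_tokens, output_tokens = 1_000, 500

cost = (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]
assert abs(cost - 0.0075) < 1e-12   # $0.0025 + $0.0050 = $0.0075
```

At these rates a client with a $100 monthly budget gets roughly 13,000 such requests, which is the kind of back-of-envelope number budget caps should be set against.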

Step 5: Audit Logging

Every request through the gateway must be logged for compliance, debugging, and analytics:

# gateway/audit.py
from pydantic import BaseModel, Field
from datetime import datetime
import json
import os

class AuditEntry(BaseModel):
    request_id: str
    client_id: str
    client_name: str
    agent_name: str
    action: str
    input_preview: str       # First 200 chars, no sensitive data
    output_preview: str
    status: str
    latency_ms: int
    tokens_used: int
    cost_usd: float
    ip_address: str
    # default_factory gives each entry its own timestamp
    timestamp: datetime = Field(default_factory=datetime.utcnow)

class AuditLogger:
    def __init__(self, log_dir: str = "./audit_logs"):
        os.makedirs(log_dir, exist_ok=True)
        self.log_dir = log_dir

    def log(self, entry: AuditEntry):
        """Append audit entry to daily log file."""
        date_str = entry.timestamp.strftime("%Y-%m-%d")
        log_file = os.path.join(self.log_dir, f"audit_{date_str}.jsonl")

        # Sanitize: remove any potential PII from both previews
        sanitized = entry.model_copy()
        sanitized.input_preview = self._sanitize(entry.input_preview)
        sanitized.output_preview = self._sanitize(entry.output_preview)

        with open(log_file, "a") as f:
            f.write(sanitized.model_dump_json() + "\n")

    def _sanitize(self, text: str) -> str:
        """Remove potential PII patterns from preview text."""
        import re
        text = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b', '[EMAIL]', text)
        text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
        text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
        return text[:200]
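The regex scrubbing can be tried standalone (same patterns as `_sanitize` above):

```python
import re

def sanitize(text: str) -> str:
    """Standalone copy of AuditLogger._sanitize for experimentation."""
    text = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b', '[EMAIL]', text)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    return text[:200]

assert sanitize("Reach me at jane.doe@example.com or 555-867-5309") == \
    "Reach me at [EMAIL] or [PHONE]"
```

Note these patterns are US-centric and denylist-based; a production gateway handling regulated data would likely layer in a dedicated PII-detection library rather than rely on three regexes.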

Step 6: Assemble the Gateway

Bring all components together into a FastAPI application:

# gateway/main.py
from fastapi import FastAPI, Request, HTTPException, Depends
from gateway.auth import authenticate, APIKey
from gateway.rate_limiter import AgentRateLimiter
from gateway.router import AgentRouter, AgentEndpoint, AgentCapability
from gateway.cost_tracker import CostTracker
from gateway.audit import AuditLogger, AuditEntry
from pydantic import BaseModel
import time
import uuid

app = FastAPI(title="Agent Gateway", version="1.0.0")

rate_limiter = AgentRateLimiter()
router = AgentRouter()
cost_tracker = CostTracker()
audit_logger = AuditLogger()

class AgentRequest(BaseModel):
    input: str
    agent: str = ""
    model: str = "gpt-4o"
    max_tokens: int = 4096

class AgentResponse(BaseModel):
    request_id: str
    output: str
    agent_used: str
    tokens_used: int
    cost_usd: float
    latency_ms: int

@app.post("/v1/agent/invoke", response_model=AgentResponse)
async def invoke_agent(
    req: AgentRequest,
    request: Request,
    api_key: APIKey = Depends(authenticate),
):
    request_id = str(uuid.uuid4())
    start_time = time.time()

    # Check agent access
    target_agent = router.route(req.input, req.agent)
    if target_agent.name not in api_key.allowed_agents and "*" not in api_key.allowed_agents:
        raise HTTPException(
            403,
            f"API key does not have access to agent '{target_agent.name}'"
        )

    # Check rate limits
    allowed, message = await rate_limiter.check_rate_limit(
        api_key.key_id, target_agent.name,
        api_key.rate_limit_rpm, api_key.rate_limit_tokens,
    )
    if not allowed:
        raise HTTPException(429, message)

    # Check budget
    has_budget, remaining = await cost_tracker.check_budget(
        api_key.key_id, api_key.monthly_budget_usd
    )
    if not has_budget:
        raise HTTPException(
            402,
            f"Monthly budget exceeded. Budget: ${api_key.monthly_budget_usd:.2f}"
        )

    # Forward to agent (simplified — in production, use gRPC or HTTP)
    try:
        # ... call the actual agent service ...
        output = "Agent response placeholder"
        tokens_used = 1500
        cost = cost_tracker.estimate_cost(req.model, 1000, 500)
    except Exception as e:
        raise HTTPException(503, f"Agent execution failed: {str(e)}")

    latency_ms = int((time.time() - start_time) * 1000)

    # Record cost
    from gateway.cost_tracker import UsageRecord
    await cost_tracker.record_usage(UsageRecord(
        client_id=api_key.key_id,
        agent_name=target_agent.name,
        input_tokens=1000,
        output_tokens=500,
        tool_calls=0,
        cost_usd=cost,
    ))

    # Audit log
    audit_logger.log(AuditEntry(
        request_id=request_id,
        client_id=api_key.key_id,
        client_name=api_key.client_name,
        agent_name=target_agent.name,
        action="invoke",
        input_preview=req.input[:200],
        output_preview=output[:200],
        status="success",
        latency_ms=latency_ms,
        tokens_used=tokens_used,
        cost_usd=cost,
        ip_address=request.client.host if request.client else "unknown",
    ))

    return AgentResponse(
        request_id=request_id,
        output=output,
        agent_used=target_agent.name,
        tokens_used=tokens_used,
        cost_usd=cost,
        latency_ms=latency_ms,
    )

@app.get("/v1/usage", response_model=dict)
async def get_usage(api_key: APIKey = Depends(authenticate)):
    return await cost_tracker.get_usage_report(api_key.key_id)

Production Deployment Considerations

When deploying the agent gateway to production, address these concerns:

  1. High availability — Run at least 3 gateway instances behind a load balancer. Rate limiter state must be shared (use Redis instead of in-memory).
  2. TLS termination — The gateway should terminate TLS and communicate with backend agents over an internal network.
  3. Request validation — Add input sanitization to prevent prompt injection attacks through the gateway.
  4. Observability — Export metrics to Prometheus (request count, latency histograms, error rates, circuit breaker states) and traces to Jaeger or similar.
  5. Canary deployments — Route a small percentage of traffic to new agent versions before full rollout.

FAQ

How do I handle long-running agent requests that exceed typical HTTP timeouts?

Use an async job pattern. The gateway immediately returns a job ID with a 202 Accepted status. The client polls a status endpoint or receives a webhook when the agent completes. This decouples the HTTP request lifecycle from the agent execution time, allowing agents to run for minutes without timeout issues.
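The async-job flow described above can be sketched in a few lines. This is an in-memory illustration with hypothetical handler names, not the gateway's actual endpoints:

```python
import uuid

JOBS: dict[str, dict] = {}  # in production this would live in Redis or a database

def submit(task: str) -> tuple[int, dict]:
    """Gateway handler: accept the job and return 202 immediately."""
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "pending", "task": task, "result": None}
    return 202, {"job_id": job_id}

def complete(job_id: str, result: str) -> None:
    """Called by the agent worker when execution finishes."""
    JOBS[job_id].update(status="done", result=result)

def poll(job_id: str) -> dict:
    """Status endpoint the client polls."""
    return JOBS[job_id]

status, body = submit("summarize the Q3 report")
assert status == 202 and poll(body["job_id"])["status"] == "pending"
complete(body["job_id"], "Summary: ...")
assert poll(body["job_id"])["result"] == "Summary: ..."
```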

Should the gateway handle agent-to-agent communication or only external requests?

The gateway should primarily handle external client-to-agent requests. For internal agent-to-agent communication, use direct gRPC calls or a message broker. Adding gateway overhead to every internal call would increase latency unnecessarily. The exception is when you need centralized audit logging for all agent interactions, including internal ones.

How do I implement per-endpoint rate limits in addition to per-client limits?

Add a second dimension to the rate limiter keyed by the agent name. Each agent endpoint gets its own capacity limit that is shared across all clients. This prevents one client from consuming all capacity on a popular agent. The check becomes: client-level limit AND agent-level limit must both allow the request.
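The AND semantics are the important part: a request is admitted only if both buckets can cover it, and both are debited together. A toy sketch with hypothetical client and agent budgets:

```python
# Hypothetical remaining capacity for two clients and one shared agent endpoint
client_remaining = {"acme": 50.0, "globex": 5.0}
agent_remaining = {"research-agent": 30.0}

def admit(client: str, agent: str, cost: float) -> bool:
    # Admit only if BOTH the per-client and per-agent limits can cover the cost
    if client_remaining[client] < cost or agent_remaining[agent] < cost:
        return False
    client_remaining[client] -= cost
    agent_remaining[agent] -= cost
    return True

assert admit("acme", "research-agent", 25.0)        # both limits cover it
assert not admit("acme", "research-agent", 20.0)    # agent pool drained to 5.0 by the first call
assert not admit("globex", "research-agent", 10.0)  # client limit is the binding constraint
```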

How do I rotate API keys without breaking clients?

Support multiple active keys per client. When rotating, generate a new key, distribute it to the client, and set the old key to expire in 24-48 hours. The gateway accepts both keys during the overlap period. This zero-downtime rotation prevents service interruptions during key changes.

Written by CallSphere Team