Agent Gateway Pattern: Rate Limiting, Authentication, and Request Routing for AI Agents
Implementing an agent gateway with API key management, per-agent rate limiting, intelligent request routing, audit logging, and cost tracking for enterprise AI systems.
What Is an Agent Gateway?
As your AI agent system grows beyond a few agents, you need a single entry point that handles cross-cutting concerns: authentication, rate limiting, request routing, cost tracking, and audit logging. This is the agent gateway pattern — the same concept as an API gateway, but designed specifically for the unique requirements of AI agent systems.
AI agents introduce challenges that traditional API gateways do not handle well. Agent requests vary wildly in cost (a simple lookup versus a multi-step research task), latency (milliseconds versus minutes), and resource consumption (token counts, tool calls, external API calls). The agent gateway must be aware of these dimensions to make intelligent routing and rate limiting decisions.
Gateway Architecture
┌──────────────┐
│ Client │
│ (API Key) │
└──────┬───────┘
│
▼
┌──────────────────────────────────────────────┐
│ Agent Gateway │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Auth │ │ Rate │ │ Router │ │
│ │ Layer │ │ Limiter │ │ (Intelligent)│ │
│ └────┬─────┘ └────┬─────┘ └──────┬───────┘ │
│ │ │ │ │
│ ┌────┴────────────┴──────────────┴────────┐ │
│ │ Middleware Pipeline │ │
│ │ Logging → Metrics → Cost Tracking │ │
│ └──────────────────────────────────────────┘ │
└──────────────────────┬───────────────────────┘
│
┌───────────┼───────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Research │ │Writing │ │Code │
│Agent │ │Agent │ │Agent │
└─────────┘ └─────────┘ └─────────┘
Step 1: Authentication and API Key Management
The gateway authenticates every request using API keys with scoped permissions:
# gateway/auth.py
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import hashlib
import secrets
from datetime import datetime
from pydantic import BaseModel, Field

security = HTTPBearer()

class APIKey(BaseModel):
    key_id: str
    key_hash: str
    client_name: str
    allowed_agents: list[str]   # Which agents this key can access
    rate_limit_rpm: int         # Requests per minute
    rate_limit_tokens: int      # Tokens per minute
    monthly_budget_usd: float   # Cost cap
    is_active: bool = True
    # default_factory evaluates per instance; a plain datetime.utcnow()
    # default would be frozen at class-definition time
    created_at: datetime = Field(default_factory=datetime.utcnow)
    expires_at: datetime | None = None

# In production, use a database. This is for illustration.
API_KEY_STORE: dict[str, APIKey] = {}

def generate_api_key(client_name: str, allowed_agents: list[str],
                     rate_limit_rpm: int = 60,
                     monthly_budget: float = 100.0) -> tuple[str, APIKey]:
    """Generate a new API key for a client."""
    raw_key = f"csa_{secrets.token_urlsafe(32)}"
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    key_id = f"key_{secrets.token_hex(8)}"
    api_key = APIKey(
        key_id=key_id,
        key_hash=key_hash,
        client_name=client_name,
        allowed_agents=allowed_agents,
        rate_limit_rpm=rate_limit_rpm,
        rate_limit_tokens=500_000,
        monthly_budget_usd=monthly_budget,
    )
    API_KEY_STORE[key_hash] = api_key
    return raw_key, api_key

async def authenticate(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> APIKey:
    """Authenticate a request by its API key."""
    token = credentials.credentials
    key_hash = hashlib.sha256(token.encode()).hexdigest()
    api_key = API_KEY_STORE.get(key_hash)
    if not api_key:
        raise HTTPException(401, "Invalid API key")
    if not api_key.is_active:
        raise HTTPException(403, "API key is disabled")
    if api_key.expires_at and api_key.expires_at < datetime.utcnow():
        raise HTTPException(403, "API key has expired")
    return api_key
Step 2: Token-Bucket Rate Limiting
Standard request-per-minute rate limiting is insufficient for AI agents because requests vary enormously in cost. A one-sentence query and a 10-page research task should not count equally. Implement dual-dimension rate limiting: requests AND tokens.
# gateway/rate_limiter.py
import time
import asyncio
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    """Token bucket rate limiter with refill."""
    capacity: float
    tokens: float
    refill_rate: float  # Tokens per second
    last_refill: float = field(default_factory=time.time)

    def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def try_consume(self, amount: float = 1.0) -> bool:
        self._refill()
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

    def time_until_available(self, amount: float = 1.0) -> float:
        self._refill()
        if self.tokens >= amount:
            return 0.0
        deficit = amount - self.tokens
        return deficit / self.refill_rate

class AgentRateLimiter:
    """Per-client, per-agent rate limiter with request and token dimensions."""

    def __init__(self):
        self.request_buckets: dict[str, TokenBucket] = {}
        self.token_buckets: dict[str, TokenBucket] = {}
        self._lock = asyncio.Lock()

    def _get_bucket_key(self, client_id: str, agent_type: str) -> str:
        return f"{client_id}:{agent_type}"

    async def check_rate_limit(self, client_id: str, agent_type: str,
                               rpm_limit: int, token_limit: int,
                               estimated_tokens: int = 1000) -> tuple[bool, str]:
        async with self._lock:
            key = self._get_bucket_key(client_id, agent_type)
            # Initialize buckets on first use
            if key not in self.request_buckets:
                self.request_buckets[key] = TokenBucket(
                    capacity=rpm_limit,
                    tokens=rpm_limit,
                    refill_rate=rpm_limit / 60.0,
                )
                self.token_buckets[key] = TokenBucket(
                    capacity=token_limit,
                    tokens=token_limit,
                    refill_rate=token_limit / 60.0,
                )
            req_bucket = self.request_buckets[key]
            tok_bucket = self.token_buckets[key]
            # Check request limit
            if not req_bucket.try_consume(1):
                wait = req_bucket.time_until_available(1)
                return False, f"Request rate limit exceeded. Retry in {wait:.1f}s"
            # Check token limit
            if not tok_bucket.try_consume(estimated_tokens):
                wait = tok_bucket.time_until_available(estimated_tokens)
                return False, f"Token rate limit exceeded. Retry in {wait:.1f}s"
            return True, "OK"
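To see the bucket arithmetic concretely, here is a compact standalone variant with an injectable clock so the refill behavior is deterministic (the injectable clock is a testing convenience, not part of the module above):

```python
class TokenBucket:
    """Token bucket with a caller-supplied clock for deterministic tests."""

    def __init__(self, capacity: float, refill_rate: float, clock):
        self.capacity = capacity
        self.tokens = capacity          # Start full: bursts up to capacity allowed
        self.refill_rate = refill_rate  # Tokens added per second
        self.clock = clock
        self.last_refill = clock()

    def try_consume(self, amount: float = 1.0) -> bool:
        now = self.clock()
        # Lazy refill: credit tokens for the time elapsed since the last call
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last_refill) * self.refill_rate)
        self.last_refill = now
        if self.tokens >= amount:
            self.tokens -= amount
            return True
        return False

# Fake clock we can advance manually
t = [0.0]
bucket = TokenBucket(capacity=5, refill_rate=1.0, clock=lambda: t[0])

burst = [bucket.try_consume() for _ in range(6)]  # 5 succeed, the 6th is rejected
t[0] += 2.0  # Two simulated seconds pass, refilling 2 tokens
```

After the two-second advance, exactly two more requests succeed before the bucket is empty again, which is the smooth "burst then steady trickle" behavior that distinguishes token buckets from fixed windows.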
Step 3: Intelligent Request Routing
The router analyzes each request and directs it to the most appropriate agent. Unlike simple URL-based routing, the agent gateway routes based on content analysis, agent capabilities, and current load:
# gateway/router.py
from pydantic import BaseModel
from enum import Enum

class AgentCapability(str, Enum):
    RESEARCH = "research"
    WRITING = "writing"
    CODE = "code"
    DATA_ANALYSIS = "data_analysis"
    CUSTOMER_SUPPORT = "customer_support"

class AgentEndpoint(BaseModel):
    name: str
    address: str
    capabilities: list[AgentCapability]
    max_concurrent: int = 10
    current_load: int = 0
    avg_latency_ms: float = 0.0
    error_rate: float = 0.0
    cost_per_request: float = 0.0

class AgentRouter:
    def __init__(self):
        self.agents: dict[str, AgentEndpoint] = {}
        self.keyword_map: dict[str, AgentCapability] = {
            "research": AgentCapability.RESEARCH,
            "find": AgentCapability.RESEARCH,
            "search": AgentCapability.RESEARCH,
            "investigate": AgentCapability.RESEARCH,
            "write": AgentCapability.WRITING,
            "draft": AgentCapability.WRITING,
            "compose": AgentCapability.WRITING,
            "edit": AgentCapability.WRITING,
            "code": AgentCapability.CODE,
            "fix bug": AgentCapability.CODE,
            "implement": AgentCapability.CODE,
            "debug": AgentCapability.CODE,
            "analyze data": AgentCapability.DATA_ANALYSIS,
            "statistics": AgentCapability.DATA_ANALYSIS,
            "chart": AgentCapability.DATA_ANALYSIS,
            "visualize": AgentCapability.DATA_ANALYSIS,
        }

    def register_agent(self, agent: AgentEndpoint):
        self.agents[agent.name] = agent

    def route(self, request_text: str, preferred_agent: str | None = None) -> AgentEndpoint:
        """Route a request to the best available agent."""
        if not self.agents:
            raise RuntimeError("No agents registered with the router")
        # Explicit routing if the client specifies an agent
        if preferred_agent and preferred_agent in self.agents:
            agent = self.agents[preferred_agent]
            if agent.current_load < agent.max_concurrent:
                return agent
        # Content-based routing
        capability = self._detect_capability(request_text)
        candidates = [
            a for a in self.agents.values()
            if capability in a.capabilities and a.current_load < a.max_concurrent
        ]
        if not candidates:
            # Fallback: consider all agents, least loaded first
            candidates = sorted(
                self.agents.values(),
                key=lambda a: a.current_load / max(a.max_concurrent, 1),
            )
        # Select the best candidate by score
        return min(candidates, key=self._score_agent)

    def _detect_capability(self, text: str) -> AgentCapability:
        text_lower = text.lower()
        for keyword, capability in self.keyword_map.items():
            if keyword in text_lower:
                return capability
        return AgentCapability.RESEARCH  # Default

    def _score_agent(self, agent: AgentEndpoint) -> float:
        """Lower score is better. Considers load, latency, and error rate."""
        load_score = agent.current_load / max(agent.max_concurrent, 1)
        latency_score = agent.avg_latency_ms / 10_000  # Normalize to roughly [0, 1]
        error_score = agent.error_rate * 10  # Heavily penalize errors
        return load_score + latency_score + error_score
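The scoring formula is worth a worked example, because its weights decide real trade-offs. Below, the formula from _score_agent is run standalone against two hypothetical endpoints (the numbers are illustrative): a lightly loaded but flaky agent and a busier but reliable one.

```python
def score(load: int, max_concurrent: int, avg_latency_ms: float,
          error_rate: float) -> float:
    # Lower is better: normalized load + normalized latency + weighted error rate
    return (load / max(max_concurrent, 1)) + (avg_latency_ms / 10_000) + (error_rate * 10)

# Flaky agent: 20% loaded, fast, but 5% of requests fail
flaky = score(load=2, max_concurrent=10, avg_latency_ms=800, error_rate=0.05)
# Reliable agent: 60% loaded, slower, zero errors
reliable = score(load=6, max_concurrent=10, avg_latency_ms=1200, error_rate=0.0)
```

The reliable agent wins (0.72 vs 0.78) despite carrying three times the load, because the 10x error multiplier dominates: a 5% error rate costs as much as 50% extra load. If that trade-off is wrong for your workload, the weight is the single knob to tune.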
Step 4: Cost Tracking and Budget Enforcement
Every agent request has a cost. The gateway tracks spending per client and enforces budgets:
# gateway/cost_tracker.py
from datetime import datetime
from dataclasses import dataclass, field
import asyncio

@dataclass
class UsageRecord:
    client_id: str
    agent_name: str
    input_tokens: int
    output_tokens: int
    tool_calls: int
    cost_usd: float
    timestamp: datetime = field(default_factory=datetime.utcnow)

class CostTracker:
    # Approximate costs per 1K tokens (as of 2026)
    MODEL_COSTS = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-sonnet": {"input": 0.003, "output": 0.015},
    }

    def __init__(self):
        self.records: list[UsageRecord] = []
        self._lock = asyncio.Lock()

    def estimate_cost(self, model: str, input_tokens: int,
                      output_tokens: int) -> float:
        costs = self.MODEL_COSTS.get(model, {"input": 0.003, "output": 0.015})
        return (
            (input_tokens / 1000) * costs["input"]
            + (output_tokens / 1000) * costs["output"]
        )

    async def record_usage(self, record: UsageRecord):
        async with self._lock:
            self.records.append(record)

    async def get_monthly_spend(self, client_id: str) -> float:
        month_start = datetime.utcnow().replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        return sum(
            r.cost_usd
            for r in self.records
            if r.client_id == client_id and r.timestamp >= month_start
        )

    async def check_budget(self, client_id: str, budget: float) -> tuple[bool, float]:
        spent = await self.get_monthly_spend(client_id)
        remaining = budget - spent
        return remaining > 0, remaining

    async def get_usage_report(self, client_id: str) -> dict:
        month_start = datetime.utcnow().replace(
            day=1, hour=0, minute=0, second=0, microsecond=0
        )
        client_records = [
            r for r in self.records
            if r.client_id == client_id and r.timestamp >= month_start
        ]
        by_agent: dict[str, dict] = {}
        for r in client_records:
            if r.agent_name not in by_agent:
                by_agent[r.agent_name] = {"requests": 0, "tokens": 0, "cost": 0.0}
            by_agent[r.agent_name]["requests"] += 1
            by_agent[r.agent_name]["tokens"] += r.input_tokens + r.output_tokens
            by_agent[r.agent_name]["cost"] += r.cost_usd
        return {
            "client_id": client_id,
            "period": month_start.strftime("%Y-%m"),
            "total_requests": len(client_records),
            "total_cost_usd": sum(r.cost_usd for r in client_records),
            "by_agent": by_agent,
        }
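The cost arithmetic is simple enough to verify by hand. Here it is extracted from the class and run standalone; the per-1K prices are the illustrative figures from the snippet above, so check your provider's current pricing before relying on them:

```python
# Illustrative per-1K-token prices, matching the CostTracker table above
MODEL_COSTS = {
    "gpt-4o": {"input": 0.0025, "output": 0.01},
    "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    # Unknown models fall back to a conservative default price
    costs = MODEL_COSTS.get(model, {"input": 0.003, "output": 0.015})
    return (input_tokens / 1000) * costs["input"] + (output_tokens / 1000) * costs["output"]

# A request with a 12K-token prompt and a 3K-token completion
cost = estimate_cost("gpt-4o-mini", 12_000, 3_000)
```

That works out to 12 x $0.00015 + 3 x $0.0006 = $0.0036 per request. The same request on gpt-4o costs $0.06, roughly 17x more, which is why per-model routing decisions show up directly in the monthly budget.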
Step 5: Audit Logging
Every request through the gateway must be logged for compliance, debugging, and analytics:
# gateway/audit.py
from pydantic import BaseModel, Field
from datetime import datetime
import os
import re

class AuditEntry(BaseModel):
    request_id: str
    client_id: str
    client_name: str
    agent_name: str
    action: str
    input_preview: str  # First 200 chars, no sensitive data
    output_preview: str
    status: str
    latency_ms: int
    tokens_used: int
    cost_usd: float
    ip_address: str
    # default_factory gives each entry its own timestamp, rather than
    # the module-import time
    timestamp: datetime = Field(default_factory=datetime.utcnow)

class AuditLogger:
    def __init__(self, log_dir: str = "./audit_logs"):
        os.makedirs(log_dir, exist_ok=True)
        self.log_dir = log_dir

    def log(self, entry: AuditEntry):
        """Append an audit entry to the daily log file."""
        date_str = entry.timestamp.strftime("%Y-%m-%d")
        log_file = os.path.join(self.log_dir, f"audit_{date_str}.jsonl")
        # Sanitize: remove any potential PII from both previews
        sanitized = entry.model_copy()
        sanitized.input_preview = self._sanitize(entry.input_preview)
        sanitized.output_preview = self._sanitize(entry.output_preview)
        with open(log_file, "a") as f:
            f.write(sanitized.model_dump_json() + "\n")

    def _sanitize(self, text: str) -> str:
        """Replace common PII patterns in preview text."""
        text = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b', '[EMAIL]', text)
        text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
        text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
        return text[:200]
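The sanitizer is easy to check in isolation. This standalone copy of the three substitutions shows what a typical preview looks like after redaction (note the patterns are deliberately coarse and US-centric; they catch common formats, not every possible one):

```python
import re

def sanitize(text: str) -> str:
    # Order matters: the phone pattern also matches SSN-shaped digit runs,
    # so anything it catches never reaches the SSN rule
    text = re.sub(r'\b[\w.+-]+@[\w-]+\.[\w.]+\b', '[EMAIL]', text)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
    text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
    return text[:200]

preview = sanitize("Contact jane.doe@example.com or 555-123-4567 about the refund")
```

Because only the sanitized preview reaches disk, a compromised log archive leaks request shapes, not customer identities. For stricter regimes, a dedicated PII-detection library is a better fit than hand-rolled regexes.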
Step 6: Assemble the Gateway
Bring all components together into a FastAPI application:
# gateway/main.py
from fastapi import FastAPI, Request, HTTPException, Depends
from gateway.auth import authenticate, APIKey
from gateway.rate_limiter import AgentRateLimiter
from gateway.router import AgentRouter, AgentEndpoint, AgentCapability
from gateway.cost_tracker import CostTracker, UsageRecord
from gateway.audit import AuditLogger, AuditEntry
from pydantic import BaseModel
import time
import uuid

app = FastAPI(title="Agent Gateway", version="1.0.0")
rate_limiter = AgentRateLimiter()
router = AgentRouter()
cost_tracker = CostTracker()
audit_logger = AuditLogger()

class AgentRequest(BaseModel):
    input: str
    agent: str = ""
    model: str = "gpt-4o"
    max_tokens: int = 4096

class AgentResponse(BaseModel):
    request_id: str
    output: str
    agent_used: str
    tokens_used: int
    cost_usd: float
    latency_ms: int

@app.post("/v1/agent/invoke", response_model=AgentResponse)
async def invoke_agent(
    req: AgentRequest,
    request: Request,
    api_key: APIKey = Depends(authenticate),
):
    request_id = str(uuid.uuid4())
    start_time = time.time()

    # Route first, then check that the key may access the chosen agent
    target_agent = router.route(req.input, req.agent or None)
    if target_agent.name not in api_key.allowed_agents and "*" not in api_key.allowed_agents:
        raise HTTPException(
            403,
            f"API key does not have access to agent '{target_agent.name}'",
        )

    # Check rate limits
    allowed, message = await rate_limiter.check_rate_limit(
        api_key.key_id, target_agent.name,
        api_key.rate_limit_rpm, api_key.rate_limit_tokens,
    )
    if not allowed:
        raise HTTPException(429, message)

    # Check budget
    has_budget, remaining = await cost_tracker.check_budget(
        api_key.key_id, api_key.monthly_budget_usd
    )
    if not has_budget:
        raise HTTPException(
            402,
            f"Monthly budget exceeded. Budget: ${api_key.monthly_budget_usd:.2f}",
        )

    # Forward to the agent (simplified; in production, use gRPC or HTTP)
    try:
        # ... call the actual agent service ...
        output = "Agent response placeholder"
        input_tokens, output_tokens = 1000, 500
        tokens_used = input_tokens + output_tokens
        cost = cost_tracker.estimate_cost(req.model, input_tokens, output_tokens)
    except Exception as e:
        raise HTTPException(503, f"Agent execution failed: {e}")

    latency_ms = int((time.time() - start_time) * 1000)

    # Record cost
    await cost_tracker.record_usage(UsageRecord(
        client_id=api_key.key_id,
        agent_name=target_agent.name,
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        tool_calls=0,
        cost_usd=cost,
    ))

    # Audit log
    audit_logger.log(AuditEntry(
        request_id=request_id,
        client_id=api_key.key_id,
        client_name=api_key.client_name,
        agent_name=target_agent.name,
        action="invoke",
        input_preview=req.input[:200],
        output_preview=output[:200],
        status="success",
        latency_ms=latency_ms,
        tokens_used=tokens_used,
        cost_usd=cost,
        ip_address=request.client.host if request.client else "unknown",
    ))

    return AgentResponse(
        request_id=request_id,
        output=output,
        agent_used=target_agent.name,
        tokens_used=tokens_used,
        cost_usd=cost,
        latency_ms=latency_ms,
    )

@app.get("/v1/usage", response_model=dict)
async def get_usage(api_key: APIKey = Depends(authenticate)):
    return await cost_tracker.get_usage_report(api_key.key_id)
Production Deployment Considerations
When deploying the agent gateway to production, address these concerns:
- High availability — Run at least 3 gateway instances behind a load balancer. Rate limiter state must be shared (use Redis instead of in-memory).
- TLS termination — The gateway should terminate TLS and communicate with backend agents over an internal network.
- Request validation — Add input sanitization to prevent prompt injection attacks through the gateway.
- Observability — Export metrics to Prometheus (request count, latency histograms, error rates, circuit breaker states) and traces to Jaeger or similar.
- Canary deployments — Route a small percentage of traffic to new agent versions before full rollout.
FAQ
How do I handle long-running agent requests that exceed typical HTTP timeouts?
Use an async job pattern. The gateway immediately returns a job ID with a 202 Accepted status. The client polls a status endpoint or receives a webhook when the agent completes. This decouples the HTTP request lifecycle from the agent execution time, allowing agents to run for minutes without timeout issues.
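The job pattern reduces to a small amount of machinery: a job store, a background task, and a poll function. This asyncio sketch omits the HTTP layer and persistence (a real gateway would keep JOBS in Redis or a database); all names here are illustrative:

```python
import asyncio
import uuid

# In-memory job store; production would use Redis or a database
JOBS: dict[str, dict] = {}

async def submit_job(task_input: str) -> str:
    """Accept the work and return immediately (the HTTP 202 path)."""
    job_id = str(uuid.uuid4())
    JOBS[job_id] = {"status": "running", "result": None}
    asyncio.create_task(_run_agent(job_id, task_input))
    return job_id

async def _run_agent(job_id: str, task_input: str):
    await asyncio.sleep(0.01)  # Stand-in for minutes of agent work
    JOBS[job_id] = {"status": "done", "result": f"processed: {task_input}"}

def poll_job(job_id: str) -> dict:
    """The status endpoint the client polls."""
    return JOBS[job_id]

async def demo():
    job_id = await submit_job("summarize the Q3 report")
    first = poll_job(job_id)       # Immediately after submit: still running
    await asyncio.sleep(0.05)      # Client waits, then polls again
    return first, poll_job(job_id)

first, final = asyncio.run(demo())
```

Webhooks replace the second poll when the client can receive callbacks; polling is the lowest-common-denominator fallback.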
Should the gateway handle agent-to-agent communication or only external requests?
The gateway should primarily handle external client-to-agent requests. For internal agent-to-agent communication, use direct gRPC calls or a message broker. Adding gateway overhead to every internal call would increase latency unnecessarily. The exception is when you need centralized audit logging for all agent interactions, including internal ones.
How do I implement per-endpoint rate limits in addition to per-client limits?
Add a second dimension to the rate limiter keyed by the agent name. Each agent endpoint gets its own capacity limit that is shared across all clients. This prevents one client from consuming all capacity on a popular agent. The check becomes: client-level limit AND agent-level limit must both allow the request.
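The two-dimension check can be sketched in a few lines. Refill is omitted for brevity, and note the subtle requirement: if the client-level consume succeeds but the agent-level consume fails, the client token must be refunded, or clients get silently penalized for agent congestion. The limits below are hypothetical:

```python
class Bucket:
    """Bare counter bucket; refill omitted for brevity."""

    def __init__(self, capacity: int):
        self.tokens = capacity

    def try_consume(self) -> bool:
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Each client gets 5 rpm, but the shared research agent only has
# capacity for 8 rpm across all clients
client_buckets = {"a": Bucket(5), "b": Bucket(5)}
agent_bucket = Bucket(8)

def check(client_id: str) -> bool:
    # Both dimensions must allow the request
    if not client_buckets[client_id].try_consume():
        return False
    if not agent_bucket.try_consume():
        client_buckets[client_id].tokens += 1  # Refund the client token
        return False
    return True

a_ok = sum(check("a") for _ in range(5))  # All 5 fit within both limits
b_ok = sum(check("b") for _ in range(5))  # Only 3 before agent capacity runs out
```

Client "b" is throttled at 3 requests even though its own limit allows 5, which is exactly the protection the agent-level dimension provides.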
What is the recommended approach for API key rotation?
Support multiple active keys per client. When rotating, generate a new key, distribute it to the client, and set the old key to expire in 24-48 hours. The gateway accepts both keys during the overlap period. This zero-downtime rotation prevents service interruptions during key changes.
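The overlap window falls out naturally from the expires_at field in the APIKey model: set it on the old key, leave it None on the new one, and the existing expiry check in authenticate does the rest. A minimal sketch of that validity logic, with hypothetical hashes and dates:

```python
from datetime import datetime

# Hypothetical store state mid-rotation: old key expires, new key does not
keys = {
    "old-hash": {"expires_at": datetime(2026, 1, 3, 12)},
    "new-hash": {"expires_at": None},
}

def is_valid(key_hash: str, now: datetime) -> bool:
    meta = keys.get(key_hash)
    if meta is None:
        return False
    # None means no expiry; otherwise the key is valid until expires_at
    return meta["expires_at"] is None or now < meta["expires_at"]

during_overlap = datetime(2026, 1, 2, 12)   # Both keys accepted
after_overlap = datetime(2026, 1, 4, 12)    # Only the new key accepted
```

Alerting on continued use of the old key during the overlap window tells you which clients have not yet switched, before the hard cutoff breaks them.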
Written by
CallSphere Team