
MCP Authentication and Authorization: Securing Tool Access for AI Agents

Implement robust security for MCP servers with OAuth2 integration, API key validation, permission scopes, and token management to ensure AI agents only access tools they are authorized to use.

Why MCP Security Is Non-Negotiable

An MCP server that exposes database queries, file system access, or API integrations to AI agents is a high-value target. Without authentication, anyone who can reach the server can execute tools. Without authorization, an authenticated agent can access every tool the server exposes — including destructive operations it should never touch.

The MCP specification includes an authorization framework based on OAuth 2.1. For HTTP transport servers, this provides a standardized way to authenticate clients and scope their permissions. For stdio transport, security relies on the process environment since the server runs as a local subprocess.

API Key Authentication

The simplest authentication pattern is API key validation. For HTTP-transport MCP servers, implement this as middleware that runs before the MCP handler:

# secured_server.py
from mcp.server.fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
import hashlib

VALID_API_KEYS = {
    # Demo only: keys are hashed inline for readability.
    # In production, store just the precomputed hashes.
    hashlib.sha256(b"agent-key-readonly-001").hexdigest(): {
        "name": "readonly-agent",
        "scopes": ["tools:read", "resources:read"],
    },
    hashlib.sha256(b"agent-key-admin-002").hexdigest(): {
        "name": "admin-agent",
        "scopes": ["tools:read", "tools:write", "resources:read"],
    },
}


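class APIKeyMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Expect "Authorization: Bearer <key>"; anything else counts as missing
        header = request.headers.get("Authorization", "")
        scheme, _, api_key = header.partition(" ")
        api_key = api_key.strip()
        if scheme.lower() != "bearer" or not api_key:
            return JSONResponse(
                {"error": "Missing API key"}, status_code=401
            )

        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        client = VALID_API_KEYS.get(key_hash)

        if not client:
            # Unrecognized credentials are an authentication failure (401);
            # 403 is for valid callers that lack a required scope
            return JSONResponse(
                {"error": "Invalid API key"}, status_code=401
            )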

        # Attach client info to request state for scope checking
        request.state.client = client
        return await call_next(request)

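This middleware hashes the incoming API key and looks the hash up among the stored hashes. Never store API keys in plaintext; store only their hashes. The sample keys appear inline here purely for readability, and a real deployment would load precomputed hashes from configuration or a secrets store.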
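As a rough sketch of how such keys might be issued and checked, the snippet below generates a random key, derives the SHA-256 hash the server would keep, and verifies a presented key in constant time. The helper names `issue_api_key` and `key_matches` are made up for this illustration and are not part of the MCP SDK.

```python
import hashlib
import hmac
import secrets


def issue_api_key() -> tuple[str, str]:
    """Return (plaintext_key, sha256_hex). Only the hash should be stored."""
    key = secrets.token_urlsafe(32)  # 32 random bytes, URL-safe encoded
    return key, hashlib.sha256(key.encode()).hexdigest()


def key_matches(presented_key: str, stored_hash: str) -> bool:
    """Hash the presented key and compare against the stored hash in constant time."""
    presented_hash = hashlib.sha256(presented_key.encode()).hexdigest()
    return hmac.compare_digest(presented_hash, stored_hash)
```

The plaintext key would be shown to the agent operator once at issuance, while the server keeps only the hash. A plain SHA-256 is reasonable here because the keys are long and random; it would not be a good fit for human-chosen passwords.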

Permission Scopes and Tool-Level Authorization

Authentication tells you who the caller is. Authorization tells you what they can do. Implement tool-level permission checks that enforce scopes:

from functools import wraps
import json

mcp_server = FastMCP(name="SecuredDatabase")

# Scope definitions
TOOL_SCOPES = {
    "query_db": "tools:read",
    "list_tables": "tools:read",
    "insert_record": "tools:write",
    "delete_record": "tools:write",
    "drop_table": "tools:admin",
}


def require_scope(scope: str):
    """Decorator that checks if the current client has the required scope."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
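            # Simplified illustration: _client_scopes is a placeholder keyword
            # argument. A real server must supply the caller's scopes from the
            # authenticated request context; if nothing passes it, the list is
            # empty and every call is denied.
            client_scopes = kwargs.pop("_client_scopes", [])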
            if scope not in client_scopes:
                return json.dumps({
                    "error": f"Permission denied. Required scope: {scope}",
                    "your_scopes": client_scopes,
                })
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@mcp_server.tool()
@require_scope("tools:read")
async def query_db(sql: str) -> str:
    """Execute a read-only SQL query."""
    # Query logic here
    return json.dumps({"result": "query results"})


@mcp_server.tool()
@require_scope("tools:write")
async def insert_record(table: str, data: dict) -> str:
    """Insert a record into a table. Requires write permission."""
    return json.dumps({"inserted": True})
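One possible way to give the decorator access to the caller's scopes without a placeholder keyword argument is a context variable that the request-handling layer sets before the tool runs. The sketch below is standalone and does not use FastMCP; `current_scopes` and `handle_request` are hypothetical names, and whether a value set in HTTP middleware is actually visible inside a tool handler depends on how your server dispatches requests, so treat that as an assumption to verify.

```python
import asyncio
import contextvars
import json
from functools import wraps

# Hypothetical per-request holder for the authenticated client's scopes.
current_scopes: contextvars.ContextVar[tuple[str, ...]] = contextvars.ContextVar(
    "current_scopes", default=()
)


def require_scope(scope: str):
    """Deny the call unless the current request context carries the scope."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            if scope not in current_scopes.get():
                return json.dumps(
                    {"error": f"Permission denied. Required scope: {scope}"}
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@require_scope("tools:write")
async def insert_record(table: str, data: dict) -> str:
    return json.dumps({"inserted": True})


async def handle_request(scopes: list[str]) -> str:
    """Stand-in for the layer that authenticates a request, then calls a tool."""
    token = current_scopes.set(tuple(scopes))
    try:
        return await insert_record("users", {"name": "Ada"})
    finally:
        current_scopes.reset(token)  # do not leak scopes into other requests
```

Calling `asyncio.run(handle_request(["tools:read"]))` yields the permission-denied payload, while adding `"tools:write"` to the list lets the insert through.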

OAuth 2.1 Integration

For production deployments, MCP supports OAuth 2.1 with the authorization code flow. The MCP server acts as a resource server that validates access tokens issued by your identity provider:

import httpx
from datetime import datetime, timezone

class OAuth2Validator:
    """Validate OAuth 2.1 access tokens against an authorization server."""

    def __init__(self, issuer_url: str, audience: str):
        self.issuer_url = issuer_url
        self.audience = audience
        self._jwks_cache = None
        self._cache_expiry = None

    async def validate_token(self, token: str) -> dict | None:
        """Validate a Bearer token and return claims if valid."""
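        # Use the token introspection endpoint (RFC 7662). The path below is
        # an example; check your provider's metadata for the real one. Most
        # authorization servers also require the resource server to
        # authenticate on this call, which is omitted here.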
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.issuer_url}/oauth/introspect",
                data={
                    "token": token,
                    "token_type_hint": "access_token",
                },
                headers={
                    "Content-Type": "application/x-www-form-urlencoded",
                },
            )

            if response.status_code != 200:
                return None

            claims = response.json()

            if not claims.get("active"):
                return None

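            # "aud" may be a single string or a list of audiences
            aud = claims.get("aud")
            audiences = [aud] if isinstance(aud, str) else (aud or [])
            if self.audience not in audiences:
                return None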

            exp = claims.get("exp", 0)
            if datetime.fromtimestamp(exp, tz=timezone.utc) < datetime.now(
                tz=timezone.utc
            ):
                return None

            return claims

    def extract_scopes(self, claims: dict) -> list[str]:
        """Extract MCP permission scopes from token claims."""
        scope_string = claims.get("scope", "")
        return scope_string.split() if scope_string else []
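To connect the validated claims back to tool-level checks, one option is a small lookup that maps each tool to its required scope and denies anything unlisted. This is only a sketch: it reuses the `TOOL_SCOPES` table from the earlier section, `is_tool_allowed` is a hypothetical helper, and deny-by-default for unknown tools is a design choice made here rather than something the article prescribes.

```python
# Same mapping as in the tool-level authorization section.
TOOL_SCOPES = {
    "query_db": "tools:read",
    "list_tables": "tools:read",
    "insert_record": "tools:write",
    "delete_record": "tools:write",
    "drop_table": "tools:admin",
}


def extract_scopes(claims: dict) -> set[str]:
    """Split the space-delimited OAuth scope claim into a set."""
    return set((claims.get("scope") or "").split())


def is_tool_allowed(tool_name: str, claims: dict) -> bool:
    """Return True only if the token carries the scope the tool requires."""
    required = TOOL_SCOPES.get(tool_name)
    if required is None:
        return False  # unknown tools are denied rather than silently allowed
    return required in extract_scopes(claims)
```

With claims of `{"scope": "tools:read resources:read"}`, `query_db` is allowed while `insert_record` and `drop_table` are refused.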

Token Management for Agent Runtimes

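On the agent side, the runtime must manage tokens: acquiring them, refreshing them before they expire, and attaching them to MCP requests. The example uses the client credentials grant, which suits unattended agents that act under their own identity rather than on behalf of a user: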

from datetime import datetime, timedelta, timezone

import httpx


class MCPTokenManager:
    """Manage OAuth tokens for MCP server connections."""

    def __init__(self, token_url: str, client_id: str, client_secret: str):
        self.token_url = token_url
        self.client_id = client_id
        self.client_secret = client_secret
        self._access_token = None
        self._expires_at = None

    async def get_token(self) -> str:
        """Get a valid access token, refreshing if needed."""
        if self._access_token and self._expires_at:
            if datetime.now(tz=timezone.utc) < self._expires_at:
                return self._access_token

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": "tools:read tools:write resources:read",
                },
            )
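            # Fail loudly on a rejected token request instead of a KeyError below
            response.raise_for_status()
            data = response.json()
            self._access_token = data["access_token"]
            # Treat the token as expired 30 seconds early to absorb
            # clock skew and request latency
            self._expires_at = datetime.now(
                tz=timezone.utc
            ) + timedelta(seconds=data["expires_in"] - 30)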

            return self._access_token

Audit Logging

Every authenticated tool invocation should be logged for security auditing:

import logging
from datetime import datetime, timezone

audit_logger = logging.getLogger("mcp.audit")

def log_tool_invocation(
    client_name: str,
    tool_name: str,
    arguments: dict,
    result_status: str,
):
    """Log every tool call for security auditing."""
    audit_logger.info(
        "MCP tool invocation",
        extra={
            "client": client_name,
            "tool": tool_name,
            "arguments": arguments,
            "status": result_status,
            # datetime.utcnow() is deprecated; use an aware UTC timestamp
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
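One caveat is that fields passed through `extra` are attached to the log record but are not printed by the default formatter. A minimal sketch of a formatter that emits them as JSON follows; `JsonAuditFormatter` and `configure_audit_logger` are illustrative names, and the field list simply mirrors the keys used in `log_tool_invocation`.

```python
import json
import logging

# Keys that log_tool_invocation passes via `extra`.
AUDIT_FIELDS = ("client", "tool", "arguments", "status", "timestamp")


class JsonAuditFormatter(logging.Formatter):
    """Render the message plus any audit fields on the record as one JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"message": record.getMessage(), "level": record.levelname}
        for field in AUDIT_FIELDS:
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        # default=str keeps non-JSON-serializable argument values from raising
        return json.dumps(payload, default=str)


def configure_audit_logger(stream) -> logging.Logger:
    """Attach a JSON handler to the "mcp.audit" logger, replacing existing ones."""
    logger = logging.getLogger("mcp.audit")
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonAuditFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep audit entries out of the root logger
    return logger
```

Since tool arguments can contain sensitive values, it may be worth redacting or truncating them before they reach the audit log.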

FAQ

How does authentication work for stdio MCP servers?

Stdio servers inherit the security context of the process that spawns them. The agent runtime starts the server as a subprocess, so the server runs with the same permissions as the agent. Authentication is implicit — if the agent can start the process, it has access. For additional security, the server can check environment variables or local credential files that the agent runtime provisions.
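The environment-variable check mentioned above could look roughly like this. It is a sketch under assumptions: the variable name `MCP_LAUNCH_TOKEN` and the helper `launch_token_ok` are invented for illustration, and the expected hash would come from wherever the server keeps its configuration.

```python
import hashlib
import hmac
from typing import Mapping


def launch_token_ok(
    environ: Mapping[str, str],
    expected_sha256: str,
    var: str = "MCP_LAUNCH_TOKEN",  # hypothetical variable name
) -> bool:
    """Check that the spawning process provisioned the expected token."""
    presented = environ.get(var, "")
    if not presented:
        return False
    digest = hashlib.sha256(presented.encode()).hexdigest()
    return hmac.compare_digest(digest, expected_sha256)
```

A stdio server might call `launch_token_ok(os.environ, expected_hash)` at startup and exit if it returns `False`.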

Can different agents have different permission levels on the same server?

Yes. With OAuth or API key scopes, each agent authenticates with its own credentials and receives a different set of permissions. A read-only analytics agent gets tools:read scope, while an admin agent gets full tools:read tools:write tools:admin scopes. The server enforces these scopes on every tool call.

What happens when an agent's token expires mid-conversation?

The agent runtime should implement token refresh logic. When a tool call returns a 401 response, the runtime refreshes the access token and retries the call once. Many OAuth client libraries can handle this automatically. A 403 means the token is valid but lacks the required scope, so refreshing will not help and the runtime should surface the error instead. This only works if the MCP server returns clear 401 and 403 errors rather than generic failures, so the runtime can tell a refreshable failure from a permanent one.
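The refresh-and-retry behavior can be sketched independently of any particular HTTP client. In the snippet below, `Unauthorized` is a hypothetical exception that the transport layer would raise on a 401, and `get_token` is assumed to accept a flag that forces a refresh, which the `MCPTokenManager` shown earlier does not have as written.

```python
import asyncio
from typing import Awaitable, Callable


class Unauthorized(Exception):
    """Hypothetical error raised by the transport layer on an HTTP 401."""


async def call_with_refresh(
    call: Callable[[str], Awaitable[str]],
    get_token: Callable[[bool], Awaitable[str]],
) -> str:
    """Run a tool call; on a 401, force one token refresh and retry once."""
    token = await get_token(False)  # use the cached token if still valid
    try:
        return await call(token)
    except Unauthorized:
        token = await get_token(True)  # force a refresh
        return await call(token)  # a second 401 propagates to the caller
```

Retrying only once avoids a loop when the credentials themselves have been revoked.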


#MCP #Authentication #Authorization #Security #AIAgents #AgenticAI #LearnAI #AIEngineering

CallSphere Team
