Building an Agent-to-Agent API: Standardized Communication Between AI Services

Design and implement a standardized API for agent-to-agent communication, covering interface contracts, service discovery, authentication between agents, and message formats that enable seamless multi-agent orchestration.

The Need for Standardized Agent Communication

When you build a multi-agent system, agents need to talk to each other reliably. The triage agent routes to the billing agent. The research agent asks the search agent for data. The orchestrator assigns tasks to specialist workers. Without a standardized communication protocol, each integration becomes a bespoke point-to-point connection that breaks when either side changes.

A well-designed agent-to-agent API establishes a common contract: a shared language for requesting work, reporting results, and handling failures. With that contract in place, you can add new agents, swap implementations, and scale individual services independently.

Defining the Agent Interface Contract

Every agent in the system should expose the same base interface, regardless of what it does internally. This is the foundational principle:

from pydantic import BaseModel, Field
from typing import Any, Optional
from enum import Enum
from datetime import datetime

class AgentCapability(str, Enum):
    CHAT = "chat"
    TASK_EXECUTION = "task_execution"
    TOOL_USE = "tool_use"
    CODE_GENERATION = "code_generation"
    DATA_ANALYSIS = "data_analysis"

class AgentCard(BaseModel):
    """Self-description that every agent publishes."""
    agent_id: str
    name: str
    version: str
    capabilities: list[AgentCapability]
    accepted_input_types: list[str]
    output_types: list[str]
    max_concurrent_tasks: int = 10
    avg_response_ms: int = 0
    endpoint: str

class TaskMessage(BaseModel):
    """Standard message format for agent-to-agent requests."""
    task_id: str
    source_agent: str
    target_agent: str
    action: str
    payload: dict[str, Any]
    context: dict[str, Any] = Field(default_factory=dict)
    priority: int = Field(default=5, ge=1, le=10)
    deadline: Optional[datetime] = None
    correlation_id: str = ""

class TaskResult(BaseModel):
    """Standard response from any agent."""
    task_id: str
    agent_id: str
    status: str = Field(..., pattern="^(completed|failed|delegated|pending)$")
    result: Any = None
    error: Optional[str] = None
    execution_ms: int = 0
    delegated_to: Optional[str] = None

With this contract, any agent can send a TaskMessage to any other agent and receive a TaskResult back, regardless of the receiving agent's internal implementation.

Service Discovery with a Registry

Agents need to find each other. A central registry lets agents announce their capabilities and discover peers:

from fastapi import FastAPI, HTTPException

app = FastAPI(title="Agent Registry")

registry: dict[str, AgentCard] = {}

@app.post("/registry/agents", status_code=201)
async def register_agent(card: AgentCard):
    registry[card.agent_id] = card
    return {"registered": card.agent_id}

@app.get("/registry/agents")
async def list_agents(capability: AgentCapability | None = None):
    agents = list(registry.values())
    if capability:
        agents = [a for a in agents if capability in a.capabilities]
    return {"agents": agents}

@app.get("/registry/agents/{agent_id}")
async def get_agent(agent_id: str):
    if agent_id not in registry:
        raise HTTPException(status_code=404, detail="Agent not registered")
    return registry[agent_id]

@app.delete("/registry/agents/{agent_id}", status_code=204)
async def deregister_agent(agent_id: str):
    registry.pop(agent_id, None)

Each agent registers on startup and deregisters on shutdown. The orchestrator queries the registry to find agents with the right capability for each task.
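
One way the orchestrator might choose among the agents the registry returns is sketched below. It uses only the standard library: `CardSummary` is a hypothetical, trimmed-down stand-in for AgentCard, and `pick_agent` with its lowest-latency rule is an assumption for illustration, not part of the registry above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CardSummary:
    """Hypothetical subset of AgentCard, enough for routing decisions."""
    agent_id: str
    capabilities: tuple[str, ...]
    avg_response_ms: int
    endpoint: str


def pick_agent(cards: list[CardSummary], capability: str) -> Optional[CardSummary]:
    """Return the agent with the lowest advertised latency for `capability`."""
    candidates = [c for c in cards if capability in c.capabilities]
    if not candidates:
        return None
    # Assumption: lower advertised latency is better. Real routing may also
    # weigh current load, cost, or version.
    return min(candidates, key=lambda c: c.avg_response_ms)


# Example data; agent IDs and endpoints are made up.
cards = [
    CardSummary("billing-1", ("task_execution",), 120, "http://billing-1:8000"),
    CardSummary("billing-2", ("task_execution", "chat"), 80, "http://billing-2:8000"),
    CardSummary("search-1", ("tool_use",), 40, "http://search-1:8000"),
]
chosen = pick_agent(cards, "task_execution")
print(chosen.agent_id if chosen else "no agent available")
```

Returning None when no agent matches lets the orchestrator decide whether to queue the task, retry later, or fail fast.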

Agent-to-Agent Authentication

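Agents must authenticate with each other to prevent unauthorized task injection. Use short-lived JWTs for this. The example below signs them with a shared HS256 key for simplicity; in production, a central authority would typically issue the tokens and the signing key would live in a secrets manager: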

import jwt  # PyJWT
from datetime import datetime, timedelta, timezone
from fastapi import Header, HTTPException

AGENT_SECRET = "shared-agent-signing-key"  # In production, load this from a vault

def create_agent_token(agent_id: str) -> str:
    # Use timezone-aware timestamps; datetime.utcnow() is deprecated since Python 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        "sub": agent_id,
        "type": "agent",
        "iat": now,
        "exp": now + timedelta(minutes=15),  # short-lived: expires after 15 minutes
    }
    return jwt.encode(payload, AGENT_SECRET, algorithm="HS256")

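async def verify_agent_token(authorization: str = Header(...)) -> str:
    # Expect "Authorization: Bearer <token>"; reject any other scheme.
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing bearer token")
    token = authorization.removeprefix("Bearer ")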
    try:
        payload = jwt.decode(token, AGENT_SECRET, algorithms=["HS256"])
        if payload.get("type") != "agent":
            raise HTTPException(status_code=403, detail="Not an agent token")
        return payload["sub"]
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

Building the Agent Base Class

Create a reusable base class so every agent exposes the same HTTP interface:

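import time
from fastapi import Depends, FastAPI, HTTPException

# AgentCard, TaskMessage, TaskResult, and verify_agent_token are defined in the sections above.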

class BaseAgent:
    def __init__(self, card: AgentCard):
        self.card = card

    async def handle_task(self, message: TaskMessage) -> TaskResult:
        raise NotImplementedError

    def register_routes(self, app: FastAPI):
        @app.get("/agent/card")
        async def get_card():
            return self.card

        @app.post("/agent/tasks")
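        async def receive_task(
            message: TaskMessage,
            caller: str = Depends(verify_agent_token),
        ):
            # The token subject must match the claimed sender, so one agent
            # cannot submit tasks in another agent's name.
            if caller != message.source_agent:
                raise HTTPException(status_code=403, detail="source_agent does not match token")
            start = time.perf_counter()
            result = await self.handle_task(message)
            result.execution_ms = int((time.perf_counter() - start) * 1000)
            return result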

Specialist agents inherit from BaseAgent and implement handle_task:

class BillingAgent(BaseAgent):
    async def handle_task(self, message: TaskMessage) -> TaskResult:
        if message.action == "check_balance":
            # fetch_balance is an application-specific coroutine, not shown here.
            balance = await fetch_balance(message.payload["account_id"])
            return TaskResult(
                task_id=message.task_id,
                agent_id=self.card.agent_id,
                status="completed",
                result={"balance": balance},
            )
        return TaskResult(
            task_id=message.task_id,
            agent_id=self.card.agent_id,
            status="failed",
            error=f"Unknown action: {message.action}",
        )
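
From the calling side, a request to such an agent could be assembled as sketched below. This uses only the standard library and builds the request without sending it. `build_task_request`, the endpoint URL, and the placeholder token string are illustrative assumptions; in practice the token would come from create_agent_token and the endpoint from the target's AgentCard.

```python
import json
import urllib.request


def build_task_request(endpoint: str, token: str, message: dict) -> urllib.request.Request:
    """Build (but do not send) an authenticated POST to another agent."""
    body = json.dumps(message).encode("utf-8")
    return urllib.request.Request(
        url=f"{endpoint.rstrip('/')}/agent/tasks",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_task_request(
    "http://billing-agent:8000",         # hypothetical endpoint from the registry
    "<token from create_agent_token>",   # placeholder, not a real JWT
    {
        "task_id": "t-1",
        "source_agent": "triage",
        "target_agent": "billing",
        "action": "check_balance",
        "payload": {"account_id": "acct-42"},
    },
)
print(request.get_method(), request.full_url)
```

Passing the request to `urllib.request.urlopen` (or an async HTTP client of your choice) would send it; the response body is the JSON form of a TaskResult.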

FAQ

How do I handle delegation chains where Agent A asks Agent B, which asks Agent C?

Use the correlation_id field to trace the entire chain. Agent A sets the correlation ID when it creates the task. Agent B passes the same correlation ID when it delegates to Agent C. All logs and results share this ID, making the full execution chain traceable.
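
A minimal sketch of that propagation, assuming each hop gets its own task_id while the correlation ID stays fixed. `Msg`, `new_chain`, and `delegate` are hypothetical names; `Msg` carries only the TaskMessage fields needed to show the idea.

```python
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Msg:
    """Hypothetical subset of TaskMessage, enough to show ID handling."""
    source_agent: str
    target_agent: str
    action: str
    correlation_id: str
    task_id: str = field(default_factory=lambda: str(uuid.uuid4()))


def new_chain(source: str, target: str, action: str) -> Msg:
    """Agent A starts a chain: fresh task_id and fresh correlation_id."""
    return Msg(source, target, action, correlation_id=str(uuid.uuid4()))


def delegate(incoming: Msg, new_target: str, action: str) -> Msg:
    """Agent B forwards work: new task_id, same correlation_id."""
    return Msg(
        source_agent=incoming.target_agent,
        target_agent=new_target,
        action=action,
        correlation_id=incoming.correlation_id,
    )


a_to_b = new_chain("agent-a", "agent-b", "resolve_invoice")
b_to_c = delegate(a_to_b, "agent-c", "fetch_ledger")
print(a_to_b.correlation_id == b_to_c.correlation_id)  # same chain
print(a_to_b.task_id != b_to_c.task_id)                # distinct hops
```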

What happens when a target agent is down or unreachable?

Implement a circuit breaker pattern in the calling agent. After a configurable number of consecutive failures (typically 3-5), mark the target agent as unhealthy and stop sending requests for a cooldown period. Check the registry for alternative agents with the same capability and route to them instead.
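
The breaker itself can be small. The sketch below is one possible shape, not a production implementation: the class name, the default threshold and cooldown, and the injectable `clock` parameter (there to make the example testable) are all assumptions.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal per-target breaker: opens after N consecutive failures."""

    def __init__(
        self,
        failure_threshold: int = 3,
        cooldown_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        """True if the target may be called right now."""
        if self._opened_at is None:
            return True
        # Once the cooldown has elapsed, let a trial request through (half-open).
        return self._clock() - self._opened_at >= self.cooldown_seconds

    def record_success(self) -> None:
        """Close the breaker and reset the failure count."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure; open (or re-open) the breaker at the threshold."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Demo with a fake clock so no real waiting is needed.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=3, cooldown_seconds=30.0, clock=lambda: now[0])
for _ in range(3):
    breaker.record_failure()
print(breaker.allow_request())  # False: breaker is open
now[0] = 31.0
print(breaker.allow_request())  # True: cooldown elapsed, trial request allowed
```

The caller keeps one breaker per target agent, checks `allow_request()` before each call, and falls back to another agent from the registry when it returns False.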

Should agents communicate synchronously or asynchronously?

Use synchronous HTTP calls for tasks that complete in under a few seconds. For longer tasks like LLM inference or data processing, use an async pattern: the calling agent sends the task, receives a 202 Accepted with a task ID, and either polls for the result or receives a callback when it completes.
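
The submit-then-poll flow can be illustrated without any HTTP layer. In this standard-library sketch, `TaskStore`, `submit`, and `poll` are hypothetical names: `submit` stands in for the handler that would return 202 Accepted with a task ID, and `poll` for a status endpoint. The status values reuse those allowed by TaskResult.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable


class TaskStore:
    """In-memory status table for long-running tasks (sketch only)."""

    def __init__(self) -> None:
        self._status: dict[str, dict[str, Any]] = {}
        self._running: set[asyncio.Task] = set()

    def submit(self, work: Callable[[], Awaitable[Any]]) -> str:
        """Start `work` in the background and return a task ID immediately."""
        task_id = str(uuid.uuid4())
        self._status[task_id] = {"status": "pending", "result": None, "error": None}
        bg = asyncio.create_task(self._run(task_id, work))
        # Keep a reference so the background task is not garbage-collected.
        self._running.add(bg)
        bg.add_done_callback(self._running.discard)
        return task_id

    async def _run(self, task_id: str, work: Callable[[], Awaitable[Any]]) -> None:
        try:
            result = await work()
            self._status[task_id] = {"status": "completed", "result": result, "error": None}
        except Exception as exc:
            self._status[task_id] = {"status": "failed", "result": None, "error": str(exc)}

    def poll(self, task_id: str) -> dict[str, Any]:
        """What a status endpoint would return for this task."""
        return self._status[task_id]


async def demo() -> tuple[str, str]:
    store = TaskStore()

    async def slow_inference() -> str:
        await asyncio.sleep(0.05)  # stands in for an LLM call
        return "summary text"

    task_id = store.submit(slow_inference)
    first = store.poll(task_id)["status"]   # checked right after submit
    while store.poll(task_id)["status"] == "pending":
        await asyncio.sleep(0.01)           # the caller polls until done
    return first, store.poll(task_id)["status"]


print(asyncio.run(demo()))
```

A real deployment would persist task state outside the process so that results survive restarts, and would add a timeout to the polling loop.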

