
Microservices for AI Agents: Service Decomposition and Inter-Agent Communication

How to structure AI agents as microservices with proper service boundaries, gRPC communication, circuit breakers, health checks, and service mesh integration.

Why Microservices for AI Agents?

When your AI system grows beyond a single monolithic agent, you face the same scaling challenges that drove the microservices revolution in traditional software. Different agents have different resource profiles — a research agent needs high network throughput, a coding agent needs CPU for running tests, and a writing agent needs large context windows which translate to high memory usage. Running them all in a single process wastes resources and creates a single point of failure.

Decomposing agents into microservices lets you scale each independently, deploy them on appropriate hardware, update them without downtime, and isolate failures. A bug in the research agent does not crash the writing agent. A spike in coding requests does not slow down email processing.

This article covers how to decompose an agent system into microservices, communicate between them using gRPC, implement resilience patterns, and deploy the whole thing with proper health monitoring.

Service Decomposition Strategy

The first decision is how to draw service boundaries. For AI agents, there are three natural decomposition strategies:

Strategy 1: Agent-Per-Service

Each specialist agent runs as its own service. This is the most common and usually the best starting point.

┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  Gateway    │  │  Research   │  │  Writing    │  │  Code       │
│  Service    │  │  Agent      │  │  Agent      │  │  Agent      │
│  (Router)   │  │  Service    │  │  Service    │  │  Service    │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │                │
       └────────────────┴────────────────┴────────────────┘
                       Shared Message Bus

Strategy 2: Capability-Per-Service

Group by capability rather than agent identity. Tools, LLM inference, and orchestration each get their own service.

┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│  Orchestrator │──│  LLM Gateway  │  │  Tool Runner  │
│  Service      │  │  Service      │  │  Service      │
│               │  │  (Multi-model)│  │  (Sandboxed)  │
└───────────────┘  └───────────────┘  └───────────────┘

Strategy 3: Domain-Per-Service

Decompose by business domain, with each service containing the agents relevant to that domain.

The right choice depends on your scale. Start with agent-per-service and refactor to capability-per-service as you grow.


Defining Service Contracts with Protocol Buffers

gRPC provides type-safe, high-performance communication between agent services. Define the contracts first:

// proto/agent_service.proto
syntax = "proto3";
package agent;

service AgentService {
  rpc ProcessTask (TaskRequest) returns (TaskResponse);
  rpc StreamProcess (TaskRequest) returns (stream TaskChunk);
  rpc HealthCheck (HealthRequest) returns (HealthResponse);
}

message TaskRequest {
  string task_id = 1;
  string correlation_id = 2;
  string agent_type = 3;
  string input = 4;
  map<string, string> metadata = 5;
  int32 max_tokens = 6;
  float temperature = 7;
}

message TaskResponse {
  string task_id = 1;
  string output = 2;
  Status status = 3;
  int32 tokens_used = 4;
  int64 latency_ms = 5;
  repeated ToolCall tool_calls = 6;
}

message TaskChunk {
  string chunk = 1;
  bool is_final = 2;
}

message ToolCall {
  string tool_name = 1;
  string arguments = 2;
  string result = 3;
  int64 latency_ms = 4;
}

message HealthRequest {}
message HealthResponse {
  bool healthy = 1;
  string agent_name = 2;
  string version = 3;
  map<string, string> checks = 4;
}

enum Status {
  SUCCESS = 0;
  PARTIAL = 1;
  FAILED = 2;
  TIMEOUT = 3;
}

Generate the Python stubs. One gotcha: the generated agent_service_pb2_grpc.py imports its _pb2 sibling by bare module name, so create generated/__init__.py and keep the generated directory on PYTHONPATH. The grpcio-health-checking package is included here because it provides the standard gRPC health service that the Kubernetes probes later in this article expect:

pip install grpcio grpcio-tools grpcio-health-checking
mkdir -p generated && touch generated/__init__.py
python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated proto/agent_service.proto

Implementing a gRPC Agent Service

Each agent service implements the AgentService interface:

# services/research_service.py
import grpc
from concurrent import futures
import time
from generated import agent_service_pb2 as pb2
from generated import agent_service_pb2_grpc as pb2_grpc
from agents import Agent, Runner
import asyncio

class ResearchAgentServicer(pb2_grpc.AgentServiceServicer):
    def __init__(self):
        self.agent = Agent(
            name="Research Agent",
            instructions="You are a research specialist...",
            tools=[],
            model="gpt-4o",
        )
        self.request_count = 0
        self.error_count = 0

    def ProcessTask(self, request, context):
        start = time.time()
        self.request_count += 1

        try:
            # Each gRPC worker thread has no running event loop,
            # so asyncio.run is the simplest way to drive the async agent
            result = asyncio.run(Runner.run(self.agent, request.input))

            latency = int((time.time() - start) * 1000)
            return pb2.TaskResponse(
                task_id=request.task_id,
                output=result.final_output,
                status=pb2.Status.SUCCESS,
                latency_ms=latency,
            )
        except Exception as e:
            self.error_count += 1
            # Surface the failure as a gRPC error. Once a non-OK status code
            # is set, the client receives an RpcError and any returned message
            # is discarded, so a minimal response is enough here.
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return pb2.TaskResponse(
                task_id=request.task_id,
                status=pb2.Status.FAILED,
                latency_ms=int((time.time() - start) * 1000),
            )

    def HealthCheck(self, request, context):
        return pb2.HealthResponse(
            healthy=True,
            agent_name="research-agent",
            version="1.0.0",
            checks={
                "total_requests": str(self.request_count),
                "total_errors": str(self.error_count),
                "error_rate": f"{(self.error_count / max(self.request_count, 1)) * 100:.1f}%",
            },
        )

def serve():
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
            ("grpc.max_send_message_length", 10 * 1024 * 1024),
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 10000),
        ],
    )
    pb2_grpc.add_AgentServiceServicer_to_server(ResearchAgentServicer(), server)

    # Kubernetes gRPC probes call the standard grpc.health.v1.Health service,
    # not our custom HealthCheck RPC, so register the stock health servicer
    # as well (requires the grpcio-health-checking package).
    from grpc_health.v1 import health, health_pb2, health_pb2_grpc
    health_servicer = health.HealthServicer()
    health_servicer.set("", health_pb2.HealthCheckResponse.SERVING)
    health_pb2_grpc.add_HealthServicer_to_server(health_servicer, server)

    server.add_insecure_port("[::]:50051")
    server.start()
    print("Research Agent service listening on port 50051")
    server.wait_for_termination()

if __name__ == "__main__":
    serve()

Implementing a gRPC Agent Client with Circuit Breakers

The client side includes circuit breaker logic to handle service failures gracefully:

# clients/agent_client.py
import grpc
import time
from enum import Enum
from generated import agent_service_pb2 as pb2
from generated import agent_service_pb2_grpc as pb2_grpc

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30,
                 half_open_max: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            # Count probe attempts here so at most half_open_max requests
            # are allowed through while we test whether the service recovered
            if self.half_open_calls < self.half_open_max:
                self.half_open_calls += 1
                return True
            return False
        return False

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            # Enough successful probes: close the circuit again
            if self.half_open_calls >= self.half_open_max:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        # A failed probe in HALF_OPEN reopens the circuit immediately;
        # in CLOSED we wait until the failure threshold is reached.
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

class AgentServiceClient:
    def __init__(self, address: str):
        self.address = address  # keep the address for readable error messages
        self.channel = grpc.insecure_channel(address)
        self.stub = pb2_grpc.AgentServiceStub(self.channel)
        self.breaker = CircuitBreaker()

    def process_task(self, task_id: str, input_text: str,
                     correlation_id: str = "", timeout: float = 60.0) -> pb2.TaskResponse:
        if not self.breaker.can_execute():
            raise Exception(
                f"Circuit breaker is OPEN: service at {self.address} is unavailable. "
                f"Will retry in {self.breaker.recovery_timeout}s."
            )

        try:
            response = self.stub.ProcessTask(
                pb2.TaskRequest(
                    task_id=task_id,
                    correlation_id=correlation_id,
                    input=input_text,
                ),
                timeout=timeout,
            )
            self.breaker.record_success()
            return response
        except grpc.RpcError as e:
            self.breaker.record_failure()
            raise Exception(
                f"Agent service call failed: {e.code()} — {e.details()}"
            ) from e

    def health_check(self) -> pb2.HealthResponse:
        return self.stub.HealthCheck(pb2.HealthRequest(), timeout=5.0)

    def close(self):
        self.channel.close()
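Circuit breakers stop calls to a service that is clearly down; for transient failures, a retry with exponential backoff and jitter is the usual companion. A minimal stdlib sketch, where the call_with_retries helper and its parameters are illustrative rather than part of the client above:

```python
import random
import time

def call_with_retries(fn, max_attempts: int = 4, base_delay: float = 0.5,
                      max_delay: float = 8.0):
    """Retry fn() on exception with exponential backoff and full jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise
            # Full jitter: sleep a random amount up to the current backoff cap
            delay = min(max_delay, base_delay * (2 ** (attempt - 1)))
            time.sleep(random.uniform(0, delay))

# Example: a flaky call that succeeds on the third attempt
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(call_with_retries(flaky, base_delay=0.01))  # prints "ok" after two retries
```

Retries and circuit breakers compose naturally: retry only while the breaker is closed, and avoid retrying tasks that are not idempotent, since a timed-out agent call may still have executed.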

Gateway Service: Routing Requests to Specialist Agents

The gateway routes incoming requests to the appropriate specialist agent:

# services/gateway.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from clients.agent_client import AgentServiceClient
from generated import agent_service_pb2 as pb2
import os
import uuid

app = FastAPI(title="Agent Gateway")

# Agent registry — in production, use service discovery
AGENT_REGISTRY = {
    "research": AgentServiceClient(os.getenv("RESEARCH_AGENT_ADDR", "localhost:50051")),
    "writing": AgentServiceClient(os.getenv("WRITING_AGENT_ADDR", "localhost:50052")),
    "code": AgentServiceClient(os.getenv("CODE_AGENT_ADDR", "localhost:50053")),
}

class TaskInput(BaseModel):
    input: str
    agent_type: str = "research"
    correlation_id: str = ""

class TaskOutput(BaseModel):
    task_id: str
    output: str
    status: str
    latency_ms: int

@app.post("/api/v1/tasks", response_model=TaskOutput)
def create_task(task: TaskInput):
    # Plain `def` rather than `async def`: FastAPI runs sync endpoints in a
    # threadpool, so the blocking gRPC call does not stall the event loop.
    client = AGENT_REGISTRY.get(task.agent_type)
    if not client:
        raise HTTPException(404, f"Unknown agent type: {task.agent_type}")

    try:
        response = client.process_task(
            task_id=f"task-{uuid.uuid4().hex}",  # id(task) is not unique across requests
            input_text=task.input,
            correlation_id=task.correlation_id,
        )
        return TaskOutput(
            task_id=response.task_id,
            output=response.output,
            status=pb2.Status.Name(response.status),  # enum fields are ints in Python
            latency_ms=response.latency_ms,
        )
    except Exception as e:
        raise HTTPException(503, str(e))

@app.get("/api/v1/health")
def health():  # sync `def`: the gRPC health checks below are blocking calls
    statuses = {}
    for name, client in AGENT_REGISTRY.items():
        try:
            resp = client.health_check()
            statuses[name] = {
                "healthy": resp.healthy,
                "version": resp.version,
                "checks": dict(resp.checks),
            }
        except Exception as e:
            statuses[name] = {"healthy": False, "error": str(e)}
    return statuses
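The registry above routes on an explicit agent_type supplied by the caller. When callers do not specify one, a simple keyword classifier can pick a sensible default. A minimal sketch, where route_agent and its keyword lists are illustrative placeholders:

```python
def route_agent(text: str, default: str = "research") -> str:
    """Pick an agent type via crude keyword matching (illustrative only)."""
    rules = {
        "code": ["bug", "function", "refactor", "stack trace", "unit test"],
        "writing": ["draft", "blog post", "rewrite", "summarize", "email"],
    }
    lowered = text.lower()
    for agent_type, keywords in rules.items():
        if any(kw in lowered for kw in keywords):
            return agent_type
    return default

print(route_agent("Fix the bug in the parser"))       # code
print(route_agent("Draft an email to the customer"))  # writing
print(route_agent("What changed in EU AI policy?"))   # research
```

In production you would likely replace this with an LLM-based or embedding-based classifier, but deterministic rules are cheap to test and easy to debug at the gateway layer.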

Kubernetes Deployment

Deploy each agent as a separate Kubernetes Deployment with proper resource limits. Note that the native grpc: probes (Kubernetes 1.24+) call the standard grpc.health.v1.Health service, so each agent container must register the stock health servicer from grpcio-health-checking alongside its own API:

# k8s/research-agent.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: research-agent
  labels:
    app: research-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: research-agent
  template:
    metadata:
      labels:
        app: research-agent
    spec:
      containers:
        - name: research-agent
          image: agents/research:1.0.0
          ports:
            - containerPort: 50051
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          readinessProbe:
            grpc:
              port: 50051
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            grpc:
              port: 50051
            initialDelaySeconds: 15
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: research-agent
spec:
  selector:
    app: research-agent
  ports:
    - port: 50051
      targetPort: 50051
  type: ClusterIP

Service Mesh Integration

For production, use a service mesh like Istio or Linkerd to get automatic mTLS, traffic management, and observability without modifying application code:

# k8s/istio-config.yaml
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: research-agent-dr
spec:
  host: research-agent
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        h2UpgradePolicy: UPGRADE
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
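Outlier detection ejects misbehaving endpoints; pairing it with a VirtualService adds per-request timeouts and mesh-level retries. A sketch against the same research-agent host (the timeout and retry values are illustrative):

```yaml
# k8s/istio-virtualservice.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: research-agent-vs
spec:
  hosts:
    - research-agent
  http:
    - route:
        - destination:
            host: research-agent
      timeout: 60s
      retries:
        attempts: 2
        perTryTimeout: 30s
        retryOn: unavailable,reset
```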

FAQ

When should I use gRPC versus REST for agent communication?

Use gRPC for internal agent-to-agent communication where you control both sides. It provides type safety through Protocol Buffers, streaming support for long-running agent tasks, and significantly lower overhead than JSON-based REST. Use REST only for external-facing APIs where clients may not support gRPC.

How do I handle agent service discovery in Kubernetes?

Kubernetes provides built-in service discovery via DNS. When you create a Service resource for each agent, other pods can reach it at agent-name.namespace.svc.cluster.local. For more advanced routing, use a service mesh that provides weighted routing, canary deployments, and automatic retries.

What is the right number of replicas for each agent service?

Start with 2 replicas for high availability and scale based on observed latency and queue depth. Agent services that call LLM APIs are typically IO-bound, so they can handle many concurrent requests per replica. Monitor the p99 latency and scale up when it exceeds your SLA.

How do I test a microservices agent system locally?

Use Docker Compose to run all services locally. Define each agent as a service in docker-compose.yml with the same environment variables as production. For the gRPC connections, use the Docker Compose service names as hostnames. This gives you a realistic local environment without needing Kubernetes.
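The Docker Compose setup described above might look like the following, where the service names, build paths, and ports are illustrative:

```yaml
# docker-compose.yml (illustrative service and image layout)
services:
  research-agent:
    build: ./services/research
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    ports:
      - "50051:50051"
  gateway:
    build: ./services/gateway
    environment:
      # Compose service names double as hostnames on the default network
      - RESEARCH_AGENT_ADDR=research-agent:50051
    ports:
      - "8000:8000"
    depends_on:
      - research-agent
```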

Written by

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
