Microservices for AI Agents: Service Decomposition and Inter-Agent Communication
How to structure AI agents as microservices with proper service boundaries, gRPC communication, circuit breakers, health checks, and service mesh integration.
Why Microservices for AI Agents?
When your AI system grows beyond a single monolithic agent, you face the same scaling challenges that drove the microservices revolution in traditional software. Different agents have different resource profiles — a research agent needs high network throughput, a coding agent needs CPU for running tests, and a writing agent needs large context windows, which translate to high memory usage. Running them all in a single process wastes resources and creates a single point of failure.
Decomposing agents into microservices lets you scale each independently, deploy them on appropriate hardware, update them without downtime, and isolate failures. A bug in the research agent does not crash the writing agent. A spike in coding requests does not slow down email processing.
This article covers how to decompose an agent system into microservices, communicate between them using gRPC, implement resilience patterns, and deploy the whole thing with proper health monitoring.
Service Decomposition Strategy
The first decision is how to draw service boundaries. For AI agents, there are three natural decomposition strategies:
Strategy 1: Agent-Per-Service
Each specialist agent runs as its own service. This is the most common and usually the best starting point.
┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐
│ Gateway │ │ Research │ │ Writing │ │ Code │
│ Service │ │ Agent │ │ Agent │ │ Agent │
│ (Router) │ │ Service │ │ Service │ │ Service │
└─────┬──────┘ └──────┬─────┘ └──────┬─────┘ └──────┬─────┘
│ │ │ │
└────────────────┴───────────────┴───────────────┘
Shared Message Bus
Strategy 2: Capability-Per-Service
Group by capability rather than agent identity. Tools, LLM inference, and orchestration each get their own service.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Orchestrator │ │ LLM Gateway │ │ Tool Runner │
│ Service │──│ Service │ │ Service │
│ │ │ (Multi-model)│ │ (Sandboxed) │
└──────────────┘ └──────────────┘ └──────────────┘
Strategy 3: Domain-Per-Service
Decompose by business domain, with each service containing the agents relevant to that domain.
The right choice depends on your scale. Start with agent-per-service and refactor to capability-per-service as you grow.
Defining Service Contracts with Protocol Buffers
gRPC provides type-safe, high-performance communication between agent services. Define the contracts first:
// proto/agent_service.proto
syntax = "proto3";

package agent;

service AgentService {
  rpc ProcessTask (TaskRequest) returns (TaskResponse);
  rpc StreamProcess (TaskRequest) returns (stream TaskChunk);
  rpc HealthCheck (HealthRequest) returns (HealthResponse);
}

message TaskRequest {
  string task_id = 1;
  string correlation_id = 2;
  string agent_type = 3;
  string input = 4;
  map<string, string> metadata = 5;
  int32 max_tokens = 6;
  float temperature = 7;
}

message TaskResponse {
  string task_id = 1;
  string output = 2;
  Status status = 3;
  int32 tokens_used = 4;
  int64 latency_ms = 5;
  repeated ToolCall tool_calls = 6;
}

message TaskChunk {
  string chunk = 1;
  bool is_final = 2;
}

message ToolCall {
  string tool_name = 1;
  string arguments = 2;
  string result = 3;
  int64 latency_ms = 4;
}

message HealthRequest {}

message HealthResponse {
  bool healthy = 1;
  string agent_name = 2;
  string version = 3;
  map<string, string> checks = 4;
}

enum Status {
  SUCCESS = 0;
  PARTIAL = 1;
  FAILED = 2;
  TIMEOUT = 3;
}
Generate the Python stubs. Note that generated/ must be an importable package — create an empty __init__.py — because the generated gRPC module imports its sibling by absolute name:

pip install grpcio grpcio-tools
mkdir -p generated && touch generated/__init__.py
python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated proto/agent_service.proto
Implementing a gRPC Agent Service
Each agent service implements the AgentService interface:
# services/research_service.py
import asyncio
import time
from concurrent import futures

import grpc

from generated import agent_service_pb2 as pb2
from generated import agent_service_pb2_grpc as pb2_grpc
from agents import Agent, Runner


class ResearchAgentServicer(pb2_grpc.AgentServiceServicer):
    def __init__(self):
        self.agent = Agent(
            name="Research Agent",
            instructions="You are a research specialist...",
            tools=[],
            model="gpt-4o",
        )
        self.request_count = 0
        self.error_count = 0

    def ProcessTask(self, request, context):
        start = time.time()
        self.request_count += 1
        try:
            # Each gRPC worker thread has no running event loop, so
            # asyncio.run() is safe here and tears the loop down cleanly
            result = asyncio.run(Runner.run(self.agent, request.input))
            latency = int((time.time() - start) * 1000)
            return pb2.TaskResponse(
                task_id=request.task_id,
                output=result.final_output,
                status=pb2.Status.SUCCESS,
                latency_ms=latency,
            )
        except Exception as e:
            self.error_count += 1
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(str(e))
            return pb2.TaskResponse(
                task_id=request.task_id,
                output=str(e),
                status=pb2.Status.FAILED,
                latency_ms=int((time.time() - start) * 1000),
            )

    def HealthCheck(self, request, context):
        return pb2.HealthResponse(
            healthy=True,
            agent_name="research-agent",
            version="1.0.0",
            checks={
                "total_requests": str(self.request_count),
                "total_errors": str(self.error_count),
                "error_rate": f"{(self.error_count / max(self.request_count, 1)) * 100:.1f}%",
            },
        )


def serve():
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ("grpc.max_receive_message_length", 10 * 1024 * 1024),
            ("grpc.max_send_message_length", 10 * 1024 * 1024),
            ("grpc.keepalive_time_ms", 30000),
            ("grpc.keepalive_timeout_ms", 10000),
        ],
    )
    pb2_grpc.add_AgentServiceServicer_to_server(ResearchAgentServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    print("Research Agent service listening on port 50051")
    server.wait_for_termination()


if __name__ == "__main__":
    serve()
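The servicer above leaves StreamProcess unimplemented, so callers would receive UNIMPLEMENTED. A simple (if unsophisticated) approach is to run the task to completion and then stream the output in fixed-size chunks; true token-level streaming depends on your agent framework's streaming API. The chunking itself can be sketched as a plain generator (iter_chunks is a hypothetical helper, not part of any library):

```python
from typing import Iterator, Tuple


def iter_chunks(text: str, size: int = 512) -> Iterator[Tuple[str, bool]]:
    """Yield (chunk, is_final) pairs suitable for building TaskChunk messages."""
    if not text:
        # Even empty output produces one final (empty) chunk so the
        # client always sees a terminating message
        yield "", True
        return
    for start in range(0, len(text), size):
        yield text[start:start + size], start + size >= len(text)
```

Inside a StreamProcess implementation, each pair would be wrapped in a pb2.TaskChunk(chunk=chunk, is_final=is_final) and yielded to the caller.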
Implementing a gRPC Agent Client with Circuit Breakers
The client side includes circuit breaker logic to handle service failures gracefully:
# clients/agent_client.py
import time
from enum import Enum

import grpc

from generated import agent_service_pb2 as pb2
from generated import agent_service_pb2_grpc as pb2_grpc


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 30,
                 half_open_max: int = 3):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = 0
        self.half_open_calls = 0

    def can_execute(self) -> bool:
        if self.state == CircuitState.CLOSED:
            return True
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
                return True
            return False
        if self.state == CircuitState.HALF_OPEN:
            return self.half_open_calls < self.half_open_max
        return False

    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.state == CircuitState.HALF_OPEN:
            # A single failure while probing reopens the circuit immediately
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
class AgentServiceClient:
    def __init__(self, address: str):
        self.address = address
        self.channel = grpc.insecure_channel(address)
        self.stub = pb2_grpc.AgentServiceStub(self.channel)
        self.breaker = CircuitBreaker()

    def process_task(self, task_id: str, input_text: str,
                     correlation_id: str = "", timeout: float = 60.0) -> pb2.TaskResponse:
        if not self.breaker.can_execute():
            raise Exception(
                f"Circuit breaker is OPEN — service at {self.address} is unavailable. "
                f"Will retry in {self.breaker.recovery_timeout}s."
            )
        try:
            response = self.stub.ProcessTask(
                pb2.TaskRequest(
                    task_id=task_id,
                    correlation_id=correlation_id,
                    input=input_text,
                ),
                timeout=timeout,
            )
            self.breaker.record_success()
            return response
        except grpc.RpcError as e:
            self.breaker.record_failure()
            raise Exception(
                f"Agent service call failed: {e.code()} — {e.details()}"
            ) from e

    def health_check(self) -> pb2.HealthResponse:
        return self.stub.HealthCheck(pb2.HealthRequest(), timeout=5.0)

    def close(self):
        self.channel.close()
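Circuit breakers guard against sustained outages; brief transient failures (a pod restarting, a dropped connection) are better handled by retrying with exponential backoff and jitter before the breaker ever trips. A minimal sketch — the with_retries decorator is a hypothetical helper, not part of grpcio:

```python
import random
import time
from functools import wraps


def with_retries(max_attempts: int = 3, base_delay: float = 0.5,
                 retriable: tuple = (Exception,)):
    """Retry the wrapped call with exponential backoff and full jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except retriable:
                    if attempt == max_attempts:
                        raise  # out of attempts, surface the error
                    # Full jitter: sleep a random amount up to the backoff cap
                    time.sleep(random.uniform(0, base_delay * 2 ** (attempt - 1)))
        return wrapper
    return decorator
```

Only wrap idempotent calls, and only retry on transient conditions such as UNAVAILABLE; never retry a call the circuit breaker has rejected, or the retries will defeat the breaker's purpose.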
Gateway Service: Routing Requests to Specialist Agents
The gateway routes incoming requests to the appropriate specialist agent:
# services/gateway.py
import os
import uuid

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from clients.agent_client import AgentServiceClient
from generated import agent_service_pb2 as pb2

app = FastAPI(title="Agent Gateway")

# Agent registry — in production, use service discovery
AGENT_REGISTRY = {
    "research": AgentServiceClient(os.getenv("RESEARCH_AGENT_ADDR", "localhost:50051")),
    "writing": AgentServiceClient(os.getenv("WRITING_AGENT_ADDR", "localhost:50052")),
    "code": AgentServiceClient(os.getenv("CODE_AGENT_ADDR", "localhost:50053")),
}


class TaskInput(BaseModel):
    input: str
    agent_type: str = "research"
    correlation_id: str = ""


class TaskOutput(BaseModel):
    task_id: str
    output: str
    status: str
    latency_ms: int


# Plain `def` rather than `async def`: FastAPI runs sync endpoints in a
# thread pool, so the blocking gRPC call does not stall the event loop
@app.post("/api/v1/tasks", response_model=TaskOutput)
def create_task(task: TaskInput):
    client = AGENT_REGISTRY.get(task.agent_type)
    if not client:
        raise HTTPException(404, f"Unknown agent type: {task.agent_type}")
    try:
        response = client.process_task(
            task_id=f"task-{uuid.uuid4().hex}",
            input_text=task.input,
            correlation_id=task.correlation_id,
        )
        return TaskOutput(
            task_id=response.task_id,
            output=response.output,
            # Proto3 enum fields arrive as ints; Status.Name() recovers the label
            status=pb2.Status.Name(response.status),
            latency_ms=response.latency_ms,
        )
    except Exception as e:
        raise HTTPException(503, str(e))


@app.get("/api/v1/health")
def health():
    statuses = {}
    for name, client in AGENT_REGISTRY.items():
        try:
            resp = client.health_check()
            statuses[name] = {
                "healthy": resp.healthy,
                "version": resp.version,
                "checks": dict(resp.checks),
            }
        except Exception as e:
            statuses[name] = {"healthy": False, "error": str(e)}
    return statuses
Kubernetes Deployment
Deploy each agent as a separate Kubernetes Deployment with proper resource limits:
# k8s/research-agent.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: research-agent
  labels:
    app: research-agent
spec:
  replicas: 2
  selector:
    matchLabels:
      app: research-agent
  template:
    metadata:
      labels:
        app: research-agent
    spec:
      containers:
        - name: research-agent
          image: agents/research:1.0.0
          ports:
            - containerPort: 50051
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          env:
            - name: OPENAI_API_KEY
              valueFrom:
                secretKeyRef:
                  name: agent-secrets
                  key: openai-api-key
          # Native gRPC probes speak the standard grpc.health.v1 protocol,
          # so the server must register that health service as well
          readinessProbe:
            grpc:
              port: 50051
            initialDelaySeconds: 10
            periodSeconds: 5
          livenessProbe:
            grpc:
              port: 50051
            initialDelaySeconds: 15
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: research-agent
spec:
  selector:
    app: research-agent
  ports:
    - port: 50051
      targetPort: 50051
  type: ClusterIP
Service Mesh Integration
For production, use a service mesh like Istio or Linkerd to get automatic mTLS, traffic management, and observability without modifying application code:
# k8s/istio-config.yaml
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: research-agent-dr
spec:
  host: research-agent
  trafficPolicy:
    connectionPool:
      tcp:
        maxConnections: 100
      http:
        h2UpgradePolicy: UPGRADE
    outlierDetection:
      consecutive5xxErrors: 3
      interval: 30s
      baseEjectionTime: 30s
      maxEjectionPercent: 50
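The DestinationRule handles connection pooling and outlier ejection; a companion VirtualService can add per-request deadlines and retries at the mesh layer. The values below are illustrative starting points, not tuned recommendations:

```yaml
# k8s/istio-virtualservice.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: research-agent-vs
spec:
  hosts:
    - research-agent
  http:
    - route:
        - destination:
            host: research-agent
      timeout: 60s          # overall per-request deadline
      retries:
        attempts: 2
        perTryTimeout: 30s
        retryOn: unavailable,deadline-exceeded
```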
FAQ
When should I use gRPC versus REST for agent communication?
Use gRPC for internal agent-to-agent communication where you control both sides. It provides type safety through Protocol Buffers, streaming support for long-running agent tasks, and significantly lower overhead than JSON-based REST. Use REST only for external-facing APIs where clients may not support gRPC.
How do I handle agent service discovery in Kubernetes?
Kubernetes provides built-in service discovery via DNS. When you create a Service resource for each agent, other pods can reach it at agent-name.namespace.svc.cluster.local. For more advanced routing, use a service mesh that provides weighted routing, canary deployments, and automatic retries.
What is the right number of replicas for each agent service?
Start with 2 replicas for high availability and scale based on observed latency and queue depth. Agent services that call LLM APIs are typically IO-bound, so they can handle many concurrent requests per replica. Monitor the p99 latency and scale up when it exceeds your SLA.
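Since the right replica count shifts with load, a HorizontalPodAutoscaler can adjust it automatically. A sketch scaling on CPU — though for IO-bound agent services a custom metric such as queue depth or in-flight requests is usually a better signal:

```yaml
# k8s/research-agent-hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: research-agent-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: research-agent
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```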
How do I test a microservices agent system locally?
Use Docker Compose to run all services locally. Define each agent as a service in docker-compose.yml with the same environment variables as production. For the gRPC connections, use the Docker Compose service names as hostnames. This gives you a realistic local environment without needing Kubernetes.
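A minimal compose file for the local setup described above might look like this (build paths and service names are illustrative):

```yaml
# docker-compose.yml — hypothetical local stack
services:
  research-agent:
    build: ./services/research
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    ports:
      - "50051:50051"

  gateway:
    build: ./services/gateway
    environment:
      - RESEARCH_AGENT_ADDR=research-agent:50051
    ports:
      - "8000:8000"
    depends_on:
      - research-agent
```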
Written by
CallSphere Team
Expert insights on AI voice agents and customer communication automation.