Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough
Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable.
When to Build Custom
Off-the-shelf orchestration platforms like Temporal, Prefect, and Airflow solve 80% of workflow needs. But AI agent systems sometimes hit the other 20%:
- Sub-second latency requirements that batch-oriented schedulers cannot meet
- Custom LLM routing logic that needs to inspect token counts, model availability, and cost in real time
- Tight integration with existing infrastructure that would require fighting an orchestration framework's opinions
- Specialized retry semantics — for example, retrying with a different model when one returns low-confidence results
- Multi-tenant isolation requirements that off-the-shelf tools do not support natively
If your agent system has any of these constraints, a custom orchestrator may be the right choice. The key is to build it with clear boundaries so it remains maintainable.
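As a concrete illustration of the specialized-retry point, a fallback policy might walk an ordered list of models until one clears a confidence bar. This is a minimal sketch: the model names, threshold, and `call_model` contract are all illustrative, and `call_model` is injected so any client library can be used.

```python
# Hypothetical fallback retry: walk an ordered model chain until one
# answer clears the confidence bar. Names and threshold are illustrative.
FALLBACK_CHAIN = ["model-large", "model-medium", "model-small"]
CONFIDENCE_THRESHOLD = 0.8


def run_with_fallback(prompt: str, call_model) -> dict:
    """call_model(model, prompt) must return {"text": ..., "confidence": ...}."""
    last = {}
    for model in FALLBACK_CHAIN:
        last = call_model(model, prompt)
        if last["confidence"] >= CONFIDENCE_THRESHOLD:
            return {"model": model, **last}
    # Chain exhausted: return the final attempt as a best-effort answer
    return {"model": FALLBACK_CHAIN[-1], **last}
```

This is exactly the kind of policy that is awkward to express in a batch scheduler but trivial in a custom orchestrator.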
Core Architecture
A custom orchestrator has four components: a state machine that tracks workflow progress, a task queue that distributes work, a worker pool that executes tasks, and a persistence layer that stores state.
```python
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid


class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"


class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"


@dataclass
class Step:
    name: str
    handler: str  # Dotted path to the handler function
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    attempts: int = 0
    max_retries: int = 3
    started_at: datetime | None = None
    completed_at: datetime | None = None


@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    steps: list[Step] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
```
The State Machine
The state machine enforces valid transitions and prevents workflows from entering inconsistent states.
```python
class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow: Workflow, new_status: WorkflowStatus):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.utcnow()
```
Queue Integration
Use Redis Streams or a similar lightweight queue to distribute work to workers.
```python
import json

import redis.asyncio as redis


class TaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.stream = "agent:tasks"
        self.group = "agent-workers"

    async def initialize(self):
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, mkstream=True
            )
        except redis.ResponseError:
            pass  # Group already exists

    async def enqueue(self, workflow_id: str, step_name: str, payload: dict):
        await self.redis.xadd(
            self.stream,
            {
                "workflow_id": workflow_id,
                "step_name": step_name,
                "payload": json.dumps(payload),
            },
        )

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 5000):
        messages = await self.redis.xreadgroup(
            self.group,
            consumer,
            {self.stream: ">"},
            count=count,
            block=block_ms,
        )
        results = []
        for stream_name, entries in messages:
            for msg_id, fields in entries:
                results.append({
                    "id": msg_id,
                    "workflow_id": fields[b"workflow_id"].decode(),
                    "step_name": fields[b"step_name"].decode(),
                    "payload": json.loads(fields[b"payload"]),
                })
        return results

    async def acknowledge(self, message_id: str):
        await self.redis.xack(self.stream, self.group, message_id)
```
The Orchestrator Engine
The engine ties the pieces together: it moves workflows through the state machine, enqueues steps in order, and persists results so downstream steps can read them.
```python
import importlib


class Orchestrator:
    def __init__(self, queue: TaskQueue, store: WorkflowStore):
        self.queue = queue
        self.store = store
        self.state_machine = WorkflowStateMachine()
        self.handlers: dict[str, callable] = {}

    def register_handler(self, name: str, handler: callable):
        self.handlers[name] = handler

    async def start_workflow(self, workflow: Workflow) -> str:
        self.state_machine.transition(workflow, WorkflowStatus.RUNNING)
        await self.store.save(workflow)
        # Enqueue the first pending step
        for step in workflow.steps:
            if step.status == StepStatus.PENDING:
                await self.queue.enqueue(
                    workflow.id, step.name, workflow.context
                )
                break
        return workflow.id

    async def process_step_result(
        self, workflow_id: str, step_name: str, result: Any
    ):
        workflow = await self.store.load(workflow_id)
        current_step = next(
            s for s in workflow.steps if s.name == step_name
        )
        current_step.status = StepStatus.COMPLETED
        current_step.result = result
        current_step.completed_at = datetime.utcnow()
        # Add result to context for downstream steps
        workflow.context[f"{step_name}_result"] = result
        # Find and enqueue next pending step
        next_step = next(
            (s for s in workflow.steps if s.status == StepStatus.PENDING),
            None,
        )
        if next_step:
            await self.queue.enqueue(
                workflow.id, next_step.name, workflow.context
            )
        else:
            self.state_machine.transition(
                workflow, WorkflowStatus.COMPLETED
            )
        await self.store.save(workflow)
```
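The `Orchestrator` takes a `store: WorkflowStore` that is not shown above. Any object with async `save` and `load` methods fits; here is a minimal in-memory stand-in, useful for tests, on the assumption that a real deployment would back it with Postgres or Redis instead.

```python
import copy


class InMemoryWorkflowStore:
    """Minimal stand-in for the WorkflowStore the Orchestrator expects."""

    def __init__(self):
        self._workflows: dict[str, object] = {}

    async def save(self, workflow):
        # Deep-copy so later mutations of the live object don't silently
        # change the "persisted" snapshot
        self._workflows[workflow.id] = copy.deepcopy(workflow)

    async def load(self, workflow_id: str):
        return copy.deepcopy(self._workflows[workflow_id])
```

The deep copies mimic the serialization boundary a real database gives you for free; without them, in-memory tests can pass while hiding bugs that only appear once state round-trips through storage.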
Monitoring Hooks
Add observability from day one. Emit structured events that can feed dashboards and alerts.
```python
import logging
import time

logger = logging.getLogger("orchestrator")


class MonitoredOrchestrator(Orchestrator):
    async def process_step_result(self, workflow_id, step_name, result):
        start = time.monotonic()
        await super().process_step_result(workflow_id, step_name, result)
        duration = time.monotonic() - start
        logger.info(
            "step_completed",
            extra={
                "workflow_id": workflow_id,
                "step_name": step_name,
                "duration_ms": round(duration * 1000, 2),
                "result_size": len(str(result)),
            },
        )
```
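The `extra` fields only become useful if a log formatter serializes them; Python's default formatter drops them. A minimal JSON formatter that surfaces those fields (a sketch, with the field list mirroring the event above):

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render records as JSON so step_completed events can feed a log
    pipeline directly. Field names mirror the extra dict used above."""

    EXTRA_FIELDS = ("workflow_id", "step_name", "duration_ms", "result_size")

    def format(self, record):
        payload = {"event": record.getMessage(), "level": record.levelname}
        for name in self.EXTRA_FIELDS:
            # logging attaches extra={...} keys as attributes on the record
            if hasattr(record, name):
                payload[name] = getattr(record, name)
        return json.dumps(payload)
```

Attach it with `handler.setFormatter(JsonFormatter())` on whatever handler the `orchestrator` logger uses, and the events become grep-able and dashboard-ready.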
FAQ
How do I decide between building custom and using Temporal?
Start with Temporal or another off-the-shelf tool. Build custom only if you have confirmed that the existing tool cannot meet a specific requirement — latency, routing logic, multi-tenancy, or integration constraints. Most teams overestimate the need for custom orchestration and underestimate the maintenance cost.
What is the biggest risk with custom orchestrators?
Incomplete failure handling. Production orchestrators must handle worker crashes, partial failures, poison messages, timeout recovery, and state corruption. Off-the-shelf tools have years of hardening around these edge cases. Budget significant testing effort for failure scenarios if you build custom.
How do I migrate from a custom orchestrator to Temporal later?
Design your step handlers as pure functions that take inputs and return outputs without referencing the orchestrator directly. This makes them portable. The orchestration logic (step sequencing, retry policies, state transitions) is what changes when you migrate — the actual work functions stay the same.
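A handler in that style takes the workflow context and returns only the data it produced, with no reference to the orchestrator, queue, or store. The names here are illustrative:

```python
# A portable step handler: a pure function of its inputs. It reads the
# upstream step's result out of the context and returns new data only.
async def summarize_step(context: dict) -> dict:
    sources = context.get("fetch_result", [])
    return {"summary": " ".join(s["title"] for s in sources)}
```

Because nothing in the function knows who called it, the same code can later be registered as a Temporal activity unchanged.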
CallSphere Team
Expert insights on AI voice agents and customer communication automation.