
Building a Custom Agent Orchestrator: When Off-the-Shelf Tools Are Not Enough

Learn when and how to build a custom agent orchestrator. Covers state machine design, queue integration, monitoring hooks, and the architectural patterns that make custom orchestrators maintainable.

When to Build Custom

Off-the-shelf orchestration platforms like Temporal, Prefect, and Airflow solve 80% of workflow needs. But AI agent systems sometimes hit the other 20%:

  • Sub-second latency requirements that batch-oriented schedulers cannot meet
  • Custom LLM routing logic that needs to inspect token counts, model availability, and cost in real time
  • Tight integration with existing infrastructure that would require fighting an orchestration framework's opinions
  • Specialized retry semantics — for example, retrying with a different model when one returns low-confidence results
  • Multi-tenant isolation requirements that off-the-shelf tools do not support natively

If your agent system has any of these constraints, a custom orchestrator may be the right choice. The key is to build it with clear boundaries so it remains maintainable.

Core Architecture

A custom orchestrator has four components: a state machine that tracks workflow progress, a task queue that distributes work, a worker pool that executes tasks, and a persistence layer that stores state.

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import uuid

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class Step:
    name: str
    handler: str  # Registry key (or dotted path) used to look up the handler
    status: StepStatus = StepStatus.PENDING
    result: Any = None
    error: str | None = None
    attempts: int = 0
    max_retries: int = 3
    started_at: datetime | None = None
    completed_at: datetime | None = None

@dataclass
class Workflow:
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    steps: list[Step] = field(default_factory=list)
    context: dict = field(default_factory=dict)
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)

The State Machine

The state machine enforces valid transitions and prevents workflows from entering inconsistent states.

class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow: Workflow, new_status: WorkflowStatus):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.utcnow()
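
Exercising the guard in isolation makes the contract concrete. The snippet below re-declares condensed copies of the enum and dataclass above so it runs on its own; the behavior matches the transition table:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum

# Condensed copies of the definitions above, so this snippet runs standalone.
class WorkflowStatus(Enum):
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class Workflow:
    name: str = ""
    status: WorkflowStatus = WorkflowStatus.CREATED
    updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

class WorkflowStateMachine:
    VALID_TRANSITIONS = {
        WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
        WorkflowStatus.RUNNING: {
            WorkflowStatus.COMPLETED,
            WorkflowStatus.FAILED,
            WorkflowStatus.PAUSED,
        },
        WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
        WorkflowStatus.FAILED: {WorkflowStatus.RUNNING},  # Allow restart
    }

    def transition(self, workflow, new_status):
        allowed = self.VALID_TRANSITIONS.get(workflow.status, set())
        if new_status not in allowed:
            raise ValueError(
                f"Cannot transition from {workflow.status} to {new_status}"
            )
        workflow.status = new_status
        workflow.updated_at = datetime.now(timezone.utc)

sm = WorkflowStateMachine()
wf = Workflow(name="demo")
sm.transition(wf, WorkflowStatus.RUNNING)      # CREATED -> RUNNING: allowed
print(wf.status)                               # WorkflowStatus.RUNNING
try:
    sm.transition(wf, WorkflowStatus.CREATED)  # RUNNING -> CREATED: rejected
except ValueError as exc:
    print("rejected:", exc)
```

Because every status change funnels through `transition`, a bug elsewhere in the engine cannot silently move a workflow into an impossible state; it fails loudly instead.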

Queue Integration

Use Redis Streams or a similar lightweight queue to distribute work to workers.


import redis.asyncio as redis
import json

class TaskQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.stream = "agent:tasks"
        self.group = "agent-workers"

    async def initialize(self):
        try:
            await self.redis.xgroup_create(
                self.stream, self.group, mkstream=True
            )
        except redis.ResponseError as exc:
            # Only swallow the "consumer group already exists" error.
            if "BUSYGROUP" not in str(exc):
                raise

    async def enqueue(self, workflow_id: str, step_name: str, payload: dict):
        await self.redis.xadd(
            self.stream,
            {
                "workflow_id": workflow_id,
                "step_name": step_name,
                "payload": json.dumps(payload),
            },
        )

    async def dequeue(self, consumer: str, count: int = 1, block_ms: int = 5000):
        messages = await self.redis.xreadgroup(
            self.group,
            consumer,
            {self.stream: ">"},
            count=count,
            block=block_ms,
        )
        results = []
        for stream_name, entries in messages:
            for msg_id, fields in entries:
                results.append({
                    "id": msg_id,
                    "workflow_id": fields[b"workflow_id"].decode(),
                    "step_name": fields[b"step_name"].decode(),
                    "payload": json.loads(fields[b"payload"]),
                })
        return results

    async def acknowledge(self, message_id: str):
        await self.redis.xack(self.stream, self.group, message_id)

The Orchestrator Engine

from collections.abc import Callable

class Orchestrator:
    def __init__(self, queue: TaskQueue, store: WorkflowStore):
        self.queue = queue
        self.store = store
        self.state_machine = WorkflowStateMachine()
        self.handlers: dict[str, Callable] = {}

    def register_handler(self, name: str, handler: Callable):
        self.handlers[name] = handler

    async def start_workflow(self, workflow: Workflow) -> str:
        self.state_machine.transition(workflow, WorkflowStatus.RUNNING)
        await self.store.save(workflow)

        # Enqueue the first pending step
        for step in workflow.steps:
            if step.status == StepStatus.PENDING:
                await self.queue.enqueue(
                    workflow.id, step.name, workflow.context
                )
                break
        return workflow.id

    async def process_step_result(
        self, workflow_id: str, step_name: str, result: Any
    ):
        workflow = await self.store.load(workflow_id)
        current_step = next(
            (s for s in workflow.steps if s.name == step_name), None
        )
        if current_step is None:
            raise ValueError(
                f"Unknown step {step_name!r} in workflow {workflow_id}"
            )
        current_step.status = StepStatus.COMPLETED
        current_step.result = result
        current_step.completed_at = datetime.utcnow()

        # Add result to context for downstream steps
        workflow.context[f"{step_name}_result"] = result

        # Find and enqueue next pending step
        next_step = next(
            (s for s in workflow.steps if s.status == StepStatus.PENDING),
            None,
        )
        if next_step:
            await self.queue.enqueue(
                workflow.id, next_step.name, workflow.context
            )
        else:
            self.state_machine.transition(
                workflow, WorkflowStatus.COMPLETED
            )

        await self.store.save(workflow)

Monitoring Hooks

Add observability from day one. Emit structured events that can feed dashboards and alerts.

import logging
import time

logger = logging.getLogger("orchestrator")

class MonitoredOrchestrator(Orchestrator):
    async def process_step_result(self, workflow_id, step_name, result):
        start = time.monotonic()
        await super().process_step_result(workflow_id, step_name, result)
        duration = time.monotonic() - start

        logger.info(
            "step_completed",
            extra={
                "workflow_id": workflow_id,
                "step_name": step_name,
                "duration_ms": round(duration * 1000, 2),
                "result_size": len(str(result)),
            },
        )

FAQ

How do I decide between building custom and using Temporal?

Start with Temporal or another off-the-shelf tool. Build custom only if you have confirmed that the existing tool cannot meet a specific requirement — latency, routing logic, multi-tenancy, or integration constraints. Most teams overestimate the need for custom orchestration and underestimate the maintenance cost.

What is the biggest risk with custom orchestrators?

Incomplete failure handling. Production orchestrators must handle worker crashes, partial failures, poison messages, timeout recovery, and state corruption. Off-the-shelf tools have years of hardening around these edge cases. Budget significant testing effort for failure scenarios if you build custom.

How do I migrate from a custom orchestrator to Temporal later?

Design your step handlers as pure functions that take inputs and return outputs without referencing the orchestrator directly. This makes them portable. The orchestration logic (step sequencing, retry policies, state transitions) is what changes when you migrate — the actual work functions stay the same.
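
A concrete instance of that pattern, with an illustrative handler name; it imports nothing from the orchestrator, so it would port to a Temporal activity unchanged:

```python
import asyncio

async def classify_intent(payload: dict) -> dict:
    """Pure step handler: inputs in, outputs out, no orchestrator imports.
    A real version would call an LLM; this keyword stub keeps the
    example runnable."""
    text = payload["text"].lower()
    intent = "billing" if "invoice" in text else "general"
    return {"intent": intent, "confidence": 0.9 if intent == "billing" else 0.5}

print(asyncio.run(classify_intent({"text": "Where is my invoice?"})))
# {'intent': 'billing', 'confidence': 0.9}
```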


#AgentOrchestration #CustomArchitecture #StateMachine #Python #SystemDesign #AgenticAI #LearnAI #AIEngineering

CallSphere Team

Expert insights on AI voice agents and customer communication automation.
