Skip to content
Learn Agentic AI14 min read0 views

Hierarchical Task Networks for AI Agents: Planning Complex Multi-Step Operations

Master Hierarchical Task Network (HTN) planning for AI agents including task decomposition, method selection, plan refinement, and execution monitoring with complete Python implementations.

What Are Hierarchical Task Networks?

When you ask an AI agent to "deploy a microservice," that instruction conceals dozens of subtasks: pull the latest code, run tests, build a container, push to a registry, update Kubernetes manifests, apply the deployment, verify health checks, and notify the team. An agent that tries to plan all of this at once will either miss steps or get lost in details.

Hierarchical Task Networks (HTN) solve this by organizing tasks into a hierarchy. High-level abstract tasks decompose into lower-level subtasks through predefined methods, continuing recursively until you reach primitive actions the agent can execute directly. HTN planning has been used in game AI, military logistics, and industrial automation for decades — and it maps perfectly onto agentic AI systems.

HTN Core Components

An HTN planner has four building blocks:

  1. Primitive tasks — Actions the agent can execute directly
  2. Compound tasks — Abstract tasks that must be decomposed
  3. Methods — Recipes for decomposing a compound task into subtasks
  4. World state — The current state of the environment, used to select which method applies
from dataclasses import dataclass, field
from typing import List, Callable, Dict, Any, Optional
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    name: str
    is_primitive: bool = False
    parameters: Dict[str, Any] = field(default_factory=dict)
    status: TaskStatus = TaskStatus.PENDING

@dataclass
class Method:
    """A recipe for decomposing a compound task into subtasks."""
    name: str
    target_task: str  # Name of the compound task this method decomposes
    precondition: Callable[[Dict], bool]  # When this method applies
    subtasks: Callable[[Dict, Dict], List[Task]]  # Generate subtasks

@dataclass
class WorldState:
    facts: Dict[str, Any] = field(default_factory=dict)

    def check(self, key: str, expected: Any = True) -> bool:
        return self.facts.get(key) == expected

    def update(self, key: str, value: Any):
        self.facts[key] = value

Building the HTN Planner

The planner recursively decomposes compound tasks until only primitive tasks remain.

class HTNPlanner:
    def __init__(self):
        self.methods: Dict[str, List[Method]] = {}

    def register_method(self, method: Method):
        if method.target_task not in self.methods:
            self.methods[method.target_task] = []
        self.methods[method.target_task].append(method)

    def plan(
        self, tasks: List[Task], state: WorldState
    ) -> Optional[List[Task]]:
        plan = []
        for task in tasks:
            result = self._decompose(task, state)
            if result is None:
                return None  # Planning failed
            plan.extend(result)
        return plan

    def _decompose(
        self, task: Task, state: WorldState
    ) -> Optional[List[Task]]:
        if task.is_primitive:
            return [task]

        methods = self.methods.get(task.name, [])
        for method in methods:
            if method.precondition(state.facts):
                subtasks = method.subtasks(task.parameters, state.facts)
                result = self.plan(subtasks, state)
                if result is not None:
                    return result

        return None  # No applicable method found

Defining a Domain: Microservice Deployment

Let us define an HTN domain for deploying a microservice.

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

planner = HTNPlanner()

# Method 1: Deploy with Docker (when containerized)
planner.register_method(Method(
    name="deploy_containerized",
    target_task="deploy_service",
    precondition=lambda s: s.get("containerized", False),
    subtasks=lambda params, state: [
        Task("run_tests", is_primitive=True, parameters=params),
        Task("build_container", is_primitive=True, parameters=params),
        Task("push_to_registry", is_primitive=True, parameters=params),
        Task("apply_k8s_manifest", is_primitive=True, parameters=params),
        Task("verify_health", is_primitive=True, parameters=params),
        Task("notify_team", is_primitive=True, parameters=params),
    ],
))

# Method 2: Deploy as binary (when not containerized)
planner.register_method(Method(
    name="deploy_binary",
    target_task="deploy_service",
    precondition=lambda s: not s.get("containerized", False),
    subtasks=lambda params, state: [
        Task("run_tests", is_primitive=True, parameters=params),
        Task("build_binary", is_primitive=True, parameters=params),
        Task("upload_to_server", is_primitive=True, parameters=params),
        Task("restart_process", is_primitive=True, parameters=params),
        Task("verify_health", is_primitive=True, parameters=params),
        Task("notify_team", is_primitive=True, parameters=params),
    ],
))

# Plan for a containerized deployment
state = WorldState(facts={"containerized": True, "has_tests": True})
root_task = Task("deploy_service", parameters={"service": "user-api"})
plan = planner.plan([root_task], state)

for i, task in enumerate(plan):
    print(f"Step {i+1}: {task.name} ({task.parameters})")

Execution Monitor

Planning is only half the problem. The execution monitor runs the plan, handles failures, and triggers re-planning when the world state changes unexpectedly.

import asyncio

class ExecutionMonitor:
    def __init__(self, planner: HTNPlanner):
        self.planner = planner
        self.executors: Dict[str, Callable] = {}

    def register_executor(self, task_name: str, executor: Callable):
        self.executors[task_name] = executor

    async def execute_plan(
        self, plan: List[Task], state: WorldState
    ) -> bool:
        for task in plan:
            task.status = TaskStatus.RUNNING
            executor = self.executors.get(task.name)
            if not executor:
                print(f"No executor for {task.name}")
                task.status = TaskStatus.FAILED
                return False

            try:
                result = await executor(task.parameters, state)
                if result:
                    task.status = TaskStatus.COMPLETED
                    state.update(f"{task.name}_done", True)
                else:
                    task.status = TaskStatus.FAILED
                    return await self._handle_failure(task, plan, state)
            except Exception as e:
                print(f"Task {task.name} raised: {e}")
                task.status = TaskStatus.FAILED
                return await self._handle_failure(task, plan, state)

        return True

    async def _handle_failure(
        self, failed_task: Task, plan: List[Task], state: WorldState
    ) -> bool:
        state.update(f"{failed_task.name}_failed", True)
        remaining = [t for t in plan if t.status == TaskStatus.PENDING]
        if not remaining:
            return False
        # Attempt re-planning for remaining tasks
        new_plan = self.planner.plan(remaining, state)
        if new_plan:
            return await self.execute_plan(new_plan, state)
        return False

Dynamic Plan Refinement

The power of HTN planning is that methods can be added or modified at runtime. An LLM can generate new methods based on novel situations, expanding the planner's capabilities without code changes.

FAQ

How is HTN planning different from simple step-by-step prompting?

Step-by-step prompting asks an LLM to generate all steps at once, with no formal structure for preconditions, method selection, or failure recovery. HTN planning uses a formal decomposition hierarchy where method selection is driven by world state, enabling principled replanning when steps fail and deterministic behavior for known domains.

Can I combine HTN planning with LLM-based agents?

Absolutely. The best approach is to use HTN planning for the known, structured parts of a workflow and delegate to LLM agents for the creative or uncertain subtasks. For example, the "run_tests" primitive might be a deterministic script, while "generate_test_cases" could be an LLM-powered compound task with its own methods.

What happens when no method's preconditions match?

The planner returns None, indicating planning failure. Your system should handle this by either relaxing preconditions, asking a human for guidance, or falling back to an LLM agent to invent a novel decomposition for the task.


#HTNPlanning #TaskDecomposition #AIPlanning #AgentArchitecture #MultiAgentSystems #AgenticAI #PythonAI #AutonomousAgents

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.