
LangGraph Agent Patterns 2026: Building Stateful Multi-Step AI Workflows

Complete LangGraph tutorial covering state machines for agents, conditional edges, human-in-the-loop patterns, checkpointing, and parallel execution with full code examples.

Why LangGraph Exists

LangChain made it easy to chain LLM calls together. But real-world agents are not chains — they are graphs. An agent that processes a customer refund request needs to verify the purchase, check the refund policy, determine if manager approval is required, wait for that approval, process the refund, and send a confirmation. Some of these steps happen conditionally. Some happen in parallel. Some require human input. A linear chain cannot model this.

LangGraph extends LangChain with a graph-based execution engine built on state machines. Each node in the graph is a function that reads and writes to a shared state object. Edges connect nodes — either unconditionally (always go from A to B) or conditionally (go to B if the amount is under $100, go to C if it needs approval). The graph compiles into an executable workflow that handles branching, looping, parallel execution, and persistence out of the box.

Core Concepts: State, Nodes, and Edges

Every LangGraph workflow starts with a state definition. The state is a TypedDict (or Pydantic model) that holds all data flowing through the workflow. Nodes are functions that receive the current state and return updates. Edges define the flow between nodes.
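Before reading the full example, it helps to see how update merging works. A node returns a partial dict; LangGraph merges it into the state, applying a reducer for annotated keys (like add_messages, which appends) and plain overwrite for everything else. A rough pure-Python sketch of that merge logic (the helper names here are illustrative, not LangGraph APIs):

```python
from typing import Annotated, TypedDict, get_type_hints

def append_reducer(existing: list, update: list) -> list:
    """Stand-in for add_messages: accumulate rather than overwrite."""
    return existing + update

class DemoState(TypedDict):
    messages: Annotated[list, append_reducer]  # merged via reducer
    current_step: str                          # plain key: overwritten

def apply_node_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state, reducer-aware."""
    hints = get_type_hints(DemoState, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        metadata = getattr(hints.get(key), "__metadata__", None)
        if metadata:  # Annotated key: apply its reducer
            merged[key] = metadata[0](state.get(key, []), value)
        else:         # plain key: last write wins
            merged[key] = value
    return merged

state = {"messages": [], "current_step": ""}
state = apply_node_update(state, {"messages": ["hi"], "current_step": "analysis"})
state = apply_node_update(state, {"messages": ["there"], "current_step": "processed"})
# state["messages"] == ["hi", "there"]; state["current_step"] == "processed"
```

This is why nodes in the example below return small dicts rather than the whole state: the messages key accumulates across nodes while scalar keys like current_step simply take the latest value.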

from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_openai import ChatOpenAI

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]
    current_step: str
    tool_results: dict
    needs_approval: bool
    approved: bool | None

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def analyze_request(state: AgentState) -> dict:
    """First node: analyze the user request."""
    messages = state["messages"]
    response = llm.invoke(
        [{"role": "system", "content": "Analyze the user request. "
          "Determine if it needs manager approval (amount > $100)."}]
        + messages
    )
    # Naive keyword heuristic for the demo; production code should use
    # structured output to extract the amount reliably
    needs_approval = "$" in response.content and "approval" in response.content.lower()
    return {
        "messages": [response],
        "current_step": "analysis",
        "needs_approval": needs_approval,
    }

def process_directly(state: AgentState) -> dict:
    """Process request without approval."""
    response = llm.invoke(
        [{"role": "system", "content": "Process this request directly. "
          "Generate a confirmation message."}]
        + state["messages"]
    )
    return {"messages": [response], "current_step": "processed"}

def request_approval(state: AgentState) -> dict:
    """Route to human approval."""
    return {
        "messages": [{"role": "assistant",
                       "content": "This request requires manager approval. "
                       "Waiting for approval..."}],
        "current_step": "awaiting_approval",
    }

def process_after_approval(state: AgentState) -> dict:
    """Process after receiving approval."""
    if state.get("approved"):
        response = llm.invoke(
            [{"role": "system", "content": "The request has been approved. "
              "Process it and generate confirmation."}]
            + state["messages"]
        )
    else:
        response = llm.invoke(
            [{"role": "system", "content": "The request was denied. "
              "Generate a polite denial message."}]
            + state["messages"]
        )
    return {"messages": [response], "current_step": "completed"}

# Define the routing function
def route_after_analysis(state: AgentState) -> Literal["process_directly", "request_approval"]:
    if state["needs_approval"]:
        return "request_approval"
    return "process_directly"

# Build the graph
graph = StateGraph(AgentState)

# Add nodes
graph.add_node("analyze", analyze_request)
graph.add_node("process_directly", process_directly)
graph.add_node("request_approval", request_approval)
graph.add_node("process_after_approval", process_after_approval)

# Add edges
graph.add_edge(START, "analyze")
graph.add_conditional_edges("analyze", route_after_analysis)
graph.add_edge("process_directly", END)
graph.add_edge("request_approval", "process_after_approval")
graph.add_edge("process_after_approval", END)

# Compile
app = graph.compile()

Human-in-the-Loop with Interrupts

One of LangGraph's most powerful features is its interrupt mechanism. You can pause execution at any node, persist the state, wait for human input (hours or days later), and resume exactly where you left off. This is essential for approval workflows, review steps, and escalation patterns.

from langgraph.checkpoint.memory import MemorySaver

# Compile with checkpointing and interrupt
memory = MemorySaver()
app = graph.compile(
    checkpointer=memory,
    interrupt_before=["process_after_approval"],
)

# Run until interrupt
config = {"configurable": {"thread_id": "request-123"}}
result = app.invoke(
    {"messages": [{"role": "user", "content": "I need a refund for $250"}],
     "needs_approval": False, "approved": None, "tool_results": {},
     "current_step": ""},
    config=config,
)
# Execution pauses before process_after_approval

# Later: inject human decision and resume
app.update_state(
    config,
    {"approved": True},
    as_node="request_approval",
)
result = app.invoke(None, config=config)
# Execution resumes from the interrupt point

The key insight is that LangGraph serializes the entire state to the checkpointer. When you call invoke with None and the same thread_id, it loads the saved state and continues from where it stopped. This works across process restarts — if you use a persistent checkpointer (PostgreSQL, Redis), your workflows survive server crashes.

Tool Integration with LangGraph

Agents need tools. LangGraph integrates with LangChain tools through a prebuilt ToolNode that handles tool execution automatically.

from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode

@tool
def get_order_status(order_id: str) -> str:
    """Look up the current status of an order."""
    # In production, query your database
    orders = {
        "ORD-001": "shipped",
        "ORD-002": "processing",
        "ORD-003": "delivered",
    }
    return orders.get(order_id, "not found")

@tool
def process_refund(order_id: str, amount: float, reason: str) -> str:
    """Process a refund for an order."""
    return f"Refund of ${amount:.2f} processed for {order_id}. Reason: {reason}"

@tool
def send_notification(email: str, message: str) -> str:
    """Send an email notification to a customer."""
    return f"Notification sent to {email}: {message}"

tools = [get_order_status, process_refund, send_notification]
tool_node = ToolNode(tools)
llm_with_tools = llm.bind_tools(tools)

def agent_node(state: AgentState) -> dict:
    response = llm_with_tools.invoke(state["messages"])
    return {"messages": [response]}

def should_use_tool(state: AgentState) -> Literal["tools", "end"]:
    last_message = state["messages"][-1]
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
        return "tools"
    return "end"

# Build agent with tool loop
tool_graph = StateGraph(AgentState)
tool_graph.add_node("agent", agent_node)
tool_graph.add_node("tools", tool_node)
tool_graph.add_edge(START, "agent")
tool_graph.add_conditional_edges("agent", should_use_tool, {
    "tools": "tools",
    "end": END,
})
tool_graph.add_edge("tools", "agent")  # Loop back after tool execution

tool_app = tool_graph.compile()

This creates the classic ReAct loop: the agent decides whether to call a tool, the tool executes, the result feeds back to the agent, and the agent decides again. The loop continues until the agent responds without calling a tool.
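Stripped of LangGraph, the control flow of that loop is just the following (a schematic sketch; agent_step and execute_tools are placeholders, not library functions):

```python
def react_loop(agent_step, execute_tools, messages, max_iterations=10):
    """Schematic ReAct loop: think, act, observe, repeat."""
    for _ in range(max_iterations):
        response = agent_step(messages)              # "agent" node
        messages.append(response)
        tool_calls = response.get("tool_calls")
        if not tool_calls:                           # should_use_tool -> "end"
            break
        messages.extend(execute_tools(tool_calls))   # "tools" node, then loop back
    return messages

# Fake agent: requests one tool call, then answers once it sees a tool result
def fake_agent(messages):
    if any(m.get("role") == "tool" for m in messages):
        return {"role": "assistant", "content": "ORD-001 has shipped."}
    return {"role": "assistant", "content": "",
            "tool_calls": [{"name": "get_order_status"}]}

def fake_tools(calls):
    return [{"role": "tool", "content": "shipped"} for _ in calls]

history = react_loop(fake_agent, fake_tools,
                     [{"role": "user", "content": "Where is ORD-001?"}])
# history[-1]["content"] == "ORD-001 has shipped."
```

The max_iterations cap is worth keeping in production too: LangGraph enforces a similar bound via its recursion_limit config to stop a confused agent from looping forever.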


Parallel Execution with Fan-Out

LangGraph supports parallel node execution for independent tasks. When multiple sub-tasks do not depend on each other, you can fan out to process them simultaneously and fan in to collect results.

from langgraph.graph import StateGraph, START, END
from typing import TypedDict

class ParallelState(TypedDict):
    query: str
    web_results: str
    db_results: str
    api_results: str
    final_answer: str

def search_web(state: ParallelState) -> dict:
    # Simulate web search
    return {"web_results": f"Web results for: {state['query']}"}

def search_database(state: ParallelState) -> dict:
    # Simulate database query
    return {"db_results": f"DB results for: {state['query']}"}

def call_external_api(state: ParallelState) -> dict:
    # Simulate API call
    return {"api_results": f"API results for: {state['query']}"}

def synthesize(state: ParallelState) -> dict:
    combined = f"""Based on:
    Web: {state['web_results']}
    Database: {state['db_results']}
    API: {state['api_results']}"""

    response = llm.invoke(
        f"Synthesize these results into a comprehensive answer: {combined}"
    )
    return {"final_answer": response.content}

parallel_graph = StateGraph(ParallelState)
parallel_graph.add_node("web", search_web)
parallel_graph.add_node("db", search_database)
parallel_graph.add_node("api", call_external_api)
parallel_graph.add_node("synthesize", synthesize)

# Fan out: START -> all three search nodes
parallel_graph.add_edge(START, "web")
parallel_graph.add_edge(START, "db")
parallel_graph.add_edge(START, "api")

# Fan in: all search nodes -> synthesize
parallel_graph.add_edge("web", "synthesize")
parallel_graph.add_edge("db", "synthesize")
parallel_graph.add_edge("api", "synthesize")
parallel_graph.add_edge("synthesize", END)

parallel_app = parallel_graph.compile()

LangGraph detects that web, db, and api nodes have no dependencies between them and executes them concurrently. The synthesize node waits until all three complete before running.
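Conceptually, the fan-out/fan-in is equivalent to an asyncio.gather over the independent branches followed by a join (a plain-Python analogy, not how LangGraph is implemented internally):

```python
import asyncio

async def search_web(query: str) -> str:
    return f"Web results for: {query}"

async def search_database(query: str) -> str:
    return f"DB results for: {query}"

async def call_external_api(query: str) -> str:
    return f"API results for: {query}"

async def fan_out_fan_in(query: str) -> dict:
    # fan out: all three branches run concurrently
    web, db, api = await asyncio.gather(
        search_web(query), search_database(query), call_external_api(query)
    )
    # fan in: the join point sees every branch's result
    return {"web_results": web, "db_results": db, "api_results": api}

results = asyncio.run(fan_out_fan_in("refund policy"))
# results["db_results"] == "DB results for: refund policy"
```

Note that in the LangGraph version each branch writes to a different state key, so the concurrent updates never conflict; if parallel nodes wrote to the same key, that key would need a reducer.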

Subgraphs: Composing Complex Workflows

Large agent systems benefit from modularity. LangGraph supports subgraphs — complete graph workflows that are embedded as a single node in a parent graph. This lets you build reusable agent components.

# Define a reusable research subgraph
def build_research_subgraph():
    class ResearchState(TypedDict):
        topic: str
        sources: list[str]
        summary: str

    def find_sources(state: ResearchState) -> dict:
        return {"sources": [f"Source about {state['topic']}"]}

    def summarize_sources(state: ResearchState) -> dict:
        return {"summary": f"Summary of {len(state['sources'])} sources on {state['topic']}"}

    sub = StateGraph(ResearchState)
    sub.add_node("find", find_sources)
    sub.add_node("summarize", summarize_sources)
    sub.add_edge(START, "find")
    sub.add_edge("find", "summarize")
    sub.add_edge("summarize", END)
    return sub.compile()

research_agent = build_research_subgraph()

# Use as a node in the parent graph
class MainState(TypedDict):
    user_query: str
    research_result: str
    final_response: str

def do_research(state: MainState) -> dict:
    result = research_agent.invoke({"topic": state["user_query"], "sources": [], "summary": ""})
    return {"research_result": result["summary"]}

def generate_response(state: MainState) -> dict:
    return {"final_response": f"Based on research: {state['research_result']}"}

main = StateGraph(MainState)
main.add_node("research", do_research)
main.add_node("respond", generate_response)
main.add_edge(START, "research")
main.add_edge("research", "respond")
main.add_edge("respond", END)
main_app = main.compile()

Production Deployment Patterns

For production, replace MemorySaver with a persistent checkpointer. LangGraph provides PostgreSQL and Redis checkpointers that survive process restarts.

from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

async def run_production_app():
    # from_conn_string returns an async context manager that owns the connection
    async with AsyncPostgresSaver.from_conn_string(
        "postgresql://user:pass@localhost:5432/langgraph"
    ) as checkpointer:
        await checkpointer.setup()  # creates checkpoint tables on first run

        app = graph.compile(
            checkpointer=checkpointer,
            interrupt_before=["process_after_approval"],
        )
        # invoke the app here, while the connection is still open
        ...

Add observability by integrating with LangSmith for tracing every node execution, state transition, and tool call. This is critical for debugging workflows that span hours or days.
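Assuming the standard environment-variable setup (variable names may differ across LangSmith versions; check the current docs), enabling tracing requires no code changes:

```shell
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY="<your-langsmith-api-key>"
export LANGCHAIN_PROJECT="refund-agent"   # optional: group traces by project
```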

FAQ

How does LangGraph differ from a plain state machine library?

LangGraph is purpose-built for LLM-based workflows. While it uses state machine concepts, it adds LLM-specific features: native tool execution with the ToolNode, message history management with add_messages reducers, built-in streaming of both tokens and state updates, and checkpointing designed for long-running AI workflows. A generic state machine library would require you to implement all of these from scratch.

Can LangGraph handle workflows that run for days or weeks?

Yes, this is one of LangGraph's primary design goals. With a persistent checkpointer (PostgreSQL or Redis), workflow state survives process restarts, server crashes, and deployments. You can start a workflow, interrupt it for human approval, and resume it days later. The thread_id identifies each workflow instance, and the checkpointer stores the full state at each step. You can even replay a workflow from any checkpoint for debugging.

How do I handle errors in LangGraph nodes?

Wrap node logic in try/except blocks and write error information to the state. Then use conditional edges to route to error-handling nodes. For transient failures (API timeouts, rate limits), use LangGraph's built-in retry mechanism by configuring retry_policy on individual nodes. For permanent failures, route to a human escalation node that interrupts the workflow and waits for manual intervention.
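A minimal sketch of that pattern: catch inside the node, record the error in state, and route on it (the node, key, and service names here are illustrative):

```python
def call_flaky_service(query: str) -> str:
    """Stand-in for an external call that can fail."""
    if not query:
        raise ValueError("empty query")
    return f"result for {query}"

def risky_node(state: dict) -> dict:
    """Node that converts exceptions into state instead of crashing the graph."""
    try:
        return {"result": call_flaky_service(state["query"]), "error": None}
    except Exception as exc:
        return {"result": None, "error": str(exc)}

def route_on_error(state: dict) -> str:
    """Conditional edge: escalate on failure, continue otherwise."""
    return "handle_error" if state.get("error") else "respond"

ok = risky_node({"query": "ORD-001"})
bad = risky_node({"query": ""})
# route_on_error(ok) == "respond"; route_on_error(bad) == "handle_error"
```

Because the error lives in state rather than in a raised exception, it survives checkpointing, so an escalation node can interrupt the workflow and a human can inspect exactly what failed.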

What is the performance overhead of LangGraph compared to calling the LLM directly?

The graph execution overhead is negligible — microseconds per node transition. The real cost is checkpointing: writing state to PostgreSQL adds 5-15ms per node execution. For workflows where each node involves an LLM call (200-2000ms), this overhead is invisible. For high-throughput workflows with many lightweight nodes, consider batching checkpoint writes or using an in-memory checkpointer for non-critical workflows.


#LangGraph #LangChain #AgentWorkflows #StateMachine #Python #AIAgents #HumanInTheLoop #MultiStepAI

Written by

CallSphere Team