Building AI Pipelines with Directed Acyclic Graphs (DAGs)
A deep technical guide to designing AI and LLM processing pipelines using DAG-based architectures for reliable, observable, and scalable agentic workflows.
Why DAGs Matter for AI Pipelines
Directed Acyclic Graphs (DAGs) have been the backbone of data engineering pipelines for years through tools like Apache Airflow, Prefect, and Dagster. In 2026, the same architectural pattern is proving essential for orchestrating complex AI and LLM workflows where tasks have dependencies, can fail independently, and need to be retried or debugged in isolation.
An AI pipeline modeled as a DAG provides three critical properties that linear chains lack: parallel execution of independent tasks, deterministic ordering of dependent tasks, and granular retry and observability at the node level.
DAG Fundamentals for AI Engineers
A DAG is a finite directed graph with no directed cycles. In practical terms, this means:
- Each node represents a discrete processing step (an LLM call, a retrieval operation, a transformation)
- Edges represent data dependencies between steps
- No circular dependencies are allowed -- the graph always flows forward
- Multiple paths can execute in parallel when there are no shared dependencies
```
                 +---> [Embed Query] ------> [Vector Search] --+
                 |                                             |
[Parse Input] ---+---> [Extract Entities] -> [Graph Lookup] ---+---> [Merge Results] ---> [LLM Synthesis] ---> [Format Output]
                 |                                             |
                 +---> [Check Cache] --------------------------+
```
In this example, three independent retrieval paths execute in parallel after input parsing, then merge their results before the final LLM synthesis step. This pattern is faster and more resilient than a linear chain.
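The execution schedule implied by such a graph can be derived with a topological sort: nodes whose dependencies are all satisfied form a batch that can run in parallel. A minimal sketch using Python's standard-library `graphlib` (the node names mirror the diagram above; each maps to its set of dependencies):

```python
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on
dag = {
    "embed_query": {"parse_input"},
    "vector_search": {"embed_query"},
    "extract_entities": {"parse_input"},
    "graph_lookup": {"extract_entities"},
    "check_cache": {"parse_input"},
    "merge_results": {"vector_search", "graph_lookup", "check_cache"},
    "llm_synthesis": {"merge_results"},
    "format_output": {"llm_synthesis"},
}

sorter = TopologicalSorter(dag)
sorter.prepare()  # raises CycleError if the graph is not acyclic
batches = []
while sorter.is_active():
    ready = sorted(sorter.get_ready())  # nodes whose dependencies are all done
    batches.append(ready)
    sorter.done(*ready)

# Nodes within a batch can run concurrently; batches run in order
```

Running this yields six sequential batches, with the three retrieval branches grouped into parallel batches -- exactly the schedule a DAG orchestrator computes for you.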
Designing AI Pipeline DAGs
Node Types
Every AI pipeline DAG contains a mix of node types:
| Node Type | Description | Example |
|---|---|---|
| Ingestion | Accept and validate input | Parse user query, validate schema |
| Retrieval | Fetch context from external sources | Vector search, database query, API call |
| Transform | Process or reshape data | Chunk text, extract entities, rerank results |
| LLM Call | Invoke a language model | Generate summary, classify intent, answer question |
| Validation | Check output quality | Hallucination detection, format verification |
| Output | Format and deliver results | Render response, write to database, send webhook |
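One way to make these node types explicit is to tag each node with its category and its dependencies. The sketch below is illustrative scaffolding, not the API of any particular framework; the `PipelineNode` and `NodeType` names are assumptions:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

class NodeType(Enum):
    INGESTION = "ingestion"
    RETRIEVAL = "retrieval"
    TRANSFORM = "transform"
    LLM_CALL = "llm_call"
    VALIDATION = "validation"
    OUTPUT = "output"

@dataclass
class PipelineNode:
    name: str
    node_type: NodeType
    fn: Callable[..., Any]
    depends_on: list[str] = field(default_factory=list)

# Example: a retrieval node declared downstream of an ingestion node
parse = PipelineNode("parse_input", NodeType.INGESTION, fn=lambda q: q.strip())
search = PipelineNode("vector_search", NodeType.RETRIEVAL,
                      fn=lambda emb: [], depends_on=["parse_input"])
```

Tagging nodes this way lets the orchestration layer apply per-type defaults, such as the retry policies discussed later.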
Defining Dependencies
The key design decision is determining which nodes depend on which. This requires understanding your data flow. The following Prefect example wires up the retrieval DAG described above:
```python
import time

from prefect import flow, task

# Assumed to be defined elsewhere in the application:
# embedding_client, vector_store, llm_client, kg_client,
# and the helpers parse_entities, deduplicate_by_content, format_context.

@task(retries=3, retry_delay_seconds=2)
async def parse_input(raw_query: str) -> dict:
    """Validate and structure the incoming query."""
    return {"query": raw_query.strip(), "timestamp": time.time()}

@task(retries=2, retry_delay_seconds=5)
async def embed_query(parsed: dict) -> list[float]:
    """Generate embedding vector for the query."""
    response = await embedding_client.embed(parsed["query"])
    return response.embedding

@task(retries=2)
async def vector_search(embedding: list[float], top_k: int = 10) -> list[dict]:
    """Search vector store for relevant documents."""
    results = await vector_store.search(embedding, limit=top_k)
    return [{"text": r.text, "score": r.score, "source": r.metadata} for r in results]

@task(retries=2)
async def extract_entities(parsed: dict) -> list[str]:
    """Extract named entities for graph lookup."""
    response = await llm_client.messages.create(
        model="claude-haiku",
        max_tokens=256,
        messages=[{"role": "user", "content": f"Extract key entities from: {parsed['query']}"}]
    )
    return parse_entities(response.content[0].text)

@task(retries=2)
async def graph_lookup(entities: list[str]) -> list[dict]:
    """Query knowledge graph for entity relationships."""
    results = []
    for entity in entities:
        neighbors = await kg_client.get_neighbors(entity, depth=2)
        results.extend(neighbors)
    return results

@task
async def merge_results(vector_results: list[dict], graph_results: list[dict]) -> str:
    """Combine and deduplicate results from multiple retrieval paths."""
    all_results = vector_results + graph_results
    deduplicated = deduplicate_by_content(all_results)
    return format_context(deduplicated[:20])

@task(retries=2, retry_delay_seconds=10)
async def llm_synthesis(query: str, context: str) -> str:
    """Generate final answer using LLM with retrieved context."""
    response = await llm_client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=2048,
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}"}]
    )
    return response.content[0].text

@flow(name="rag-pipeline")
async def rag_pipeline(raw_query: str) -> str:
    parsed = await parse_input(raw_query)

    # These two tasks run in parallel -- no dependencies between them
    embed_future = embed_query.submit(parsed)
    entity_future = extract_entities.submit(parsed)

    # Passing a future to submit() creates the edge: vector_search depends
    # on embed_query, graph_lookup depends on extract_entities
    vector_future = vector_search.submit(embed_future)
    graph_future = graph_lookup.submit(entity_future)

    # Wait for both retrieval paths to complete
    vector_results = vector_future.result()
    graph_results = graph_future.result()

    # Merge and synthesize
    context = await merge_results(vector_results, graph_results)
    answer = await llm_synthesis(parsed["query"], context)
    return answer
```
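The flow above assumes `deduplicate_by_content` and `format_context` exist elsewhere. A minimal sketch of what they might look like -- hash-based dedup on normalized text is one reasonable choice, not the only one:

```python
import hashlib

def deduplicate_by_content(results: list[dict]) -> list[dict]:
    """Drop results whose text has already been seen, keeping the first occurrence."""
    seen: set[str] = set()
    unique = []
    for r in results:
        digest = hashlib.sha256(r["text"].strip().lower().encode()).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(r)
    return unique

def format_context(results: list[dict]) -> str:
    """Render retrieved results as a numbered context block for the prompt."""
    return "\n\n".join(f"[{i + 1}] {r['text']}" for i, r in enumerate(results))
```

In production you may want fuzzier deduplication (e.g. embedding similarity), since near-duplicate chunks from different sources will not hash identically.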
Error Handling and Retry Strategies
DAG-based pipelines excel at granular error handling. Each node can have its own retry policy, timeout, and fallback behavior.
Retry Policies by Node Type
- Embedding nodes: Retry 3x with 2-second delay (transient API errors)
- LLM call nodes: Retry 2x with exponential backoff (rate limits, timeouts)
- Database nodes: Retry 3x with 1-second delay (connection pool exhaustion)
- Validation nodes: No retry (deterministic -- either passes or fails)
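The exponential backoff suggested for LLM nodes follows delay = base * 2^attempt, capped at a maximum and with jitter added to avoid synchronized retries. A framework-agnostic sketch (the parameter names and the blanket `Exception` catch are illustrative; narrow the exception types to your client library's transient errors):

```python
import asyncio
import random

async def retry_with_backoff(fn, *args, retries: int = 2, base_delay: float = 1.0,
                             max_delay: float = 30.0, **kwargs):
    """Retry an async callable with exponential backoff and jitter."""
    for attempt in range(retries + 1):
        try:
            return await fn(*args, **kwargs)
        except Exception:
            if attempt == retries:
                raise  # out of attempts, propagate the last error
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Jitter spreads retries out so concurrent nodes don't hammer the API in lockstep
            await asyncio.sleep(delay + random.uniform(0, delay * 0.1))
```

Orchestrators like Prefect expose the same idea declaratively (e.g. retry delays on the task decorator), but a helper like this is useful for code outside the orchestrator's control.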
Circuit Breakers
For production pipelines, implement circuit breakers that stop retrying after a threshold of failures:
```python
import time

class CircuitBreakerOpen(Exception):
    """Raised when the circuit is open and calls are rejected without being attempted."""

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.last_failure_time = 0.0
        self.state = "closed"  # closed = normal, open = failing, half-open = testing

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"  # allow a single trial call through
            else:
                raise CircuitBreakerOpen("Service unavailable")
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed trial call reopens the circuit immediately
            if self.state == "half-open" or self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
        # Any success closes the circuit and clears the failure count
        self.state = "closed"
        self.failure_count = 0
        return result
```
Observability and Debugging
One of the primary advantages of DAG architectures is observability. Every node execution produces structured telemetry that enables debugging.
Tracing Each Node
```python
import time

import structlog
from opentelemetry import trace

tracer = trace.get_tracer("ai-pipeline")
logger = structlog.get_logger()

async def traced_node(name: str, func, *args, **kwargs):
    with tracer.start_as_current_span(name) as span:
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            duration = time.time() - start
            span.set_attribute("duration_ms", duration * 1000)
            span.set_attribute("status", "success")
            logger.info("node_completed", node=name, duration_ms=duration * 1000)
            return result
        except Exception as e:
            span.set_attribute("status", "error")
            span.set_attribute("error.message", str(e))
            logger.error("node_failed", node=name, error=str(e))
            raise
```
Key Metrics to Track
- Node latency: P50, P95, P99 execution time per node
- Pipeline throughput: End-to-end completions per minute
- Error rate by node: Which nodes fail most frequently
- Token usage per LLM node: Cost attribution at the node level
- Parallelism efficiency: Ratio of wall-clock time to total node time
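The last metric falls out directly from per-node durations you already collect via tracing. A small sketch, using the document's definition (wall-clock time divided by summed node time, so 1.0 means fully sequential and lower means more overlap):

```python
def parallelism_efficiency(wall_clock_s: float, node_durations_s: list[float]) -> float:
    """Wall-clock time divided by total node time; 1.0 = sequential, lower = more overlap."""
    total = sum(node_durations_s)
    if total == 0:
        return 0.0
    return wall_clock_s / total

# Three 2-second nodes that overlap into 3 seconds of wall-clock time
ratio = parallelism_efficiency(3.0, [2.0, 2.0, 2.0])  # 0.5
```

A ratio that drifts toward 1.0 over time is a signal that independent branches have silently become serialized, e.g. by an accidental dependency or a saturated worker pool.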
When to Use DAGs vs. Linear Chains
DAG architectures add complexity. Use them when you have:
- Multiple independent retrieval or processing steps that benefit from parallelism
- Varying reliability across steps that need different retry policies
- Cost-sensitive workflows where you want to short-circuit expensive LLM calls
- Observability requirements that demand per-step metrics and tracing
For simple prompt-in / response-out workflows, a linear chain is simpler and sufficient. The DAG pattern pays off as pipeline complexity grows.
Conclusion
DAG-based AI pipelines bring the same reliability and observability that data engineering teams have relied on for years to the world of LLM and agentic workflows. By modeling your AI pipeline as a graph of discrete, typed, independently retryable nodes, you gain parallelism, granular error handling, and deep observability -- all of which are essential for running AI systems in production at scale.