Apache Airflow for AI Agent Scheduling: DAG-Based Workflow Management
Learn how to orchestrate AI agent workflows with Apache Airflow. Covers DAG design patterns, custom operators for LLM calls, XCom data passing, sensors, and scheduling strategies.
Airflow and AI Agents: A Natural Fit for Batch Workflows
Apache Airflow is one of the most widely deployed workflow orchestration platforms, used by thousands of companies to schedule and monitor data pipelines. Its DAG-based model maps naturally to AI agent workflows that run on schedules — nightly report generation, periodic data analysis, scheduled content creation, and batch inference pipelines.
Airflow excels at scheduled, batch-oriented agent work. If your agent needs to run every night at 2 AM, process yesterday's data, generate a report, and email it to stakeholders, Airflow is a battle-tested choice.
Designing a DAG for an AI Agent
A DAG (Directed Acyclic Graph) defines the dependency structure of your workflow. Each node is a task, and edges define execution order.
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from datetime import timedelta

default_args = {
    "owner": "ai-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(minutes=10),
}

with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    description="Daily research agent that summarizes industry news",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=days_ago(1),
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    pass  # Tasks defined below
Building Tasks with the TaskFlow API
Airflow 2.x introduced the TaskFlow API, which lets you define tasks as decorated Python functions — much cleaner than the older operator-based approach.
import openai
from airflow.models import Variable


@task(retries=3, retry_delay=timedelta(seconds=30))
def gather_news(topic: str) -> list[dict]:
    """Fetch recent news articles on a topic."""
    import requests

    response = requests.get(
        "https://newsapi.org/v2/everything",
        params={
            "q": topic,
            "sortBy": "publishedAt",
            "pageSize": 10,
            # Jinja templates are not rendered inside TaskFlow function
            # bodies, so read the key from an Airflow Variable at runtime.
            "apiKey": Variable.get("news_api_key"),
        },
        timeout=30,
    )
    response.raise_for_status()
    articles = response.json()["articles"]
    return [
        {"title": a["title"], "description": a["description"]}
        for a in articles
    ]
@task(retries=2, retry_delay=timedelta(seconds=60))
def summarize_articles(articles: list[dict]) -> str:
    """Use an LLM to summarize the collected articles."""
    client = openai.OpenAI()
    articles_text = "\n".join(
        f"- {a['title']}: {a['description']}" for a in articles
    )
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Summarize these news articles into a brief digest.",
            },
            {"role": "user", "content": articles_text},
        ],
        temperature=0.3,
    )
    return response.choices[0].message.content
@task
def format_report(summary: str, topic: str) -> str:
    """Format the summary as an HTML email report."""
    return f"""
    <h2>Daily {topic} Digest</h2>
    <p>{summary}</p>
    <hr>
    <small>Generated by AI Research Agent</small>
    """


@task
def send_report(report: str) -> None:
    """Send the report via email."""
    from airflow.utils.email import send_email

    send_email(
        to=["team@company.com"],
        subject="Daily AI Research Digest",
        html_content=report,
    )
Wiring the DAG
with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    schedule_interval="0 6 * * *",
    start_date=days_ago(1),
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    topic = "artificial intelligence agents"
    articles = gather_news(topic)
    summary = summarize_articles(articles)
    report = format_report(summary, topic)
    send_report(report)
Data flows between tasks automatically via XComs (cross-communications). Each task's return value is serialized and stored in the Airflow metadata database, then deserialized as the input to downstream tasks.
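Because the default XCom backend JSON-serializes return values, anything a task returns must survive a JSON round trip — plain data, not live objects like API clients or generators. A quick sanity check (plain Python, no Airflow needed):

```python
import json

# A gather_news-style return value: a list of small dicts.
articles = [
    {"title": "Agents in production", "description": "A field report"},
    {"title": "Scheduling LLM jobs", "description": "Batch patterns"},
]

# The default XCom backend serializes the value to JSON and back
# between tasks; this round trip must preserve the data exactly.
roundtripped = json.loads(json.dumps(articles))
assert roundtripped == articles  # safe to pass between tasks
```

If this round trip fails for a value (open file handles, client objects, sets), return a plain-data representation instead.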
Custom Operators for LLM Calls
For reusable LLM integration, build a custom operator:
from airflow.models import BaseOperator


class LLMOperator(BaseOperator):
    # Declare the prompt as a template field so Jinja expressions
    # like "{{ ds }}" are rendered before execute() runs.
    template_fields = ("prompt_template",)

    def __init__(
        self,
        prompt_template: str,
        model: str = "gpt-4",
        temperature: float = 0.3,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.prompt_template = prompt_template
        self.model = model
        self.temperature = temperature

    def execute(self, context):
        prompt = self.prompt_template.format(**context["params"])
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
        )
        result = response.choices[0].message.content
        self.log.info(f"LLM returned {len(result)} characters")
        return result
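The `execute` method fills the prompt from the DAG run's `params` dict via `str.format`. In isolation (with hypothetical template and parameter values, no Airflow required), that substitution step behaves like:

```python
# Hypothetical values mirroring what LLMOperator.execute reads
# from context["params"] at runtime.
prompt_template = "Summarize the latest news about {topic} in {n} bullets."
params = {"topic": "AI agents", "n": 3}

# The same call execute() performs before hitting the LLM API.
prompt = prompt_template.format(**params)
print(prompt)
```

Any placeholder missing from `params` raises a `KeyError` at execution time, so keep template fields and DAG `params` in sync.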
Sensors for Event-Driven Triggers
Sensors wait for an external condition before proceeding. Use them to trigger agent workflows when new data arrives.
from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id="wait_for_upload",
    filepath="/data/uploads/latest.csv",
    poke_interval=60,
    timeout=3600,
    mode="reschedule",  # Free the worker slot while waiting
)
FAQ
Is Airflow suitable for real-time AI agent workflows?
Airflow is designed for batch scheduling, not real-time execution. Its minimum practical scheduling interval is about one minute, and DAG parsing adds overhead. For real-time or event-driven agent workflows, consider Temporal, Inngest, or a custom solution. Airflow works best for agents that run on a schedule.
How do I handle large XCom payloads from LLM responses?
By default, XComs are stored in the Airflow metadata database, which is not designed for large payloads. For LLM responses exceeding a few kilobytes, configure a remote XCom backend using S3, GCS, or a custom backend that stores payloads externally and passes references through XCom.
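The reference-passing pattern behind a remote XCom backend can be illustrated without any cloud SDK: write the large payload to external storage (here a plain dict stands in for S3/GCS, and the helper names are hypothetical), and let only a short key travel through the XCom table:

```python
import uuid

# Stand-in for an object store such as S3 or GCS.
object_store: dict[str, str] = {}


def push_large_payload(payload: str) -> str:
    """Store the payload externally; return a small reference
    suitable for the XCom metadata table."""
    key = f"xcom/{uuid.uuid4()}"
    object_store[key] = payload
    return key  # only this short string goes through XCom


def pull_large_payload(key: str) -> str:
    """A downstream task resolves the reference back to the payload."""
    return object_store[key]


long_summary = "word " * 50_000  # a large LLM response (~250 KB)
ref = push_large_payload(long_summary)
assert pull_large_payload(ref) == long_summary
assert len(ref) < 100  # the XCom row stays tiny
```

A real custom XCom backend applies the same idea inside its serialize/deserialize hooks, so task code stays unchanged.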
Can I run multiple agent DAGs concurrently?
Yes. Airflow's scheduler manages concurrency at the DAG level, task level, and pool level. Use the max_active_runs parameter on the DAG to control how many instances run simultaneously, and use Airflow pools to limit concurrent LLM API calls across all DAGs.
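A pool with N slots behaves like a counting semaphore shared across all DAGs: at most N tasks assigned to that pool run at once, and the rest queue. The throttling effect can be sketched in plain Python (a conceptual analogy, not Airflow internals):

```python
import threading
import time

MAX_CONCURRENT_LLM_CALLS = 4  # analogous to an "llm_api" pool with 4 slots
pool = threading.BoundedSemaphore(MAX_CONCURRENT_LLM_CALLS)

in_flight = 0
peak = 0
lock = threading.Lock()


def llm_task():
    """Simulated task that occupies one pool slot while it runs."""
    global in_flight, peak
    with pool:
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        time.sleep(0.01)  # simulate the LLM API call
        with lock:
            in_flight -= 1


# 16 tasks contend for 4 slots; concurrency never exceeds the limit.
threads = [threading.Thread(target=llm_task) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert peak <= MAX_CONCURRENT_LLM_CALLS
```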
CallSphere Team
Expert insights on AI voice agents and customer communication automation.