
Apache Airflow for AI Agent Scheduling: DAG-Based Workflow Management

Learn how to orchestrate AI agent workflows with Apache Airflow. Covers DAG design patterns, custom operators for LLM calls, XCom data passing, sensors, and scheduling strategies.

Airflow and AI Agents: A Natural Fit for Batch Workflows

Apache Airflow is one of the most widely deployed workflow orchestration platforms, used by thousands of companies to schedule and monitor data pipelines. Its DAG-based model maps naturally to AI agent workflows that run on schedules — nightly report generation, periodic data analysis, scheduled content creation, and batch inference pipelines.

Airflow excels at scheduled, batch-oriented agent work. If your agent needs to run every night at 2 AM, process yesterday's data, generate a report, and email it to stakeholders, Airflow is a battle-tested choice.

Designing a DAG for an AI Agent

A DAG (Directed Acyclic Graph) defines the dependency structure of your workflow. Each node is a task, and edges define execution order.
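Conceptually, executing a DAG means walking the graph in topological order: a task runs only after all its upstream tasks finish. A minimal sketch with Python's standard-library graphlib, using hypothetical task names that mirror the pipeline built below:

```python
from graphlib import TopologicalSorter

# Each task maps to the set of upstream tasks it depends on
deps = {
    "summarize_articles": {"gather_news"},
    "format_report": {"summarize_articles"},
    "send_report": {"format_report"},
}

# static_order() yields a valid execution order and raises CycleError
# if the graph contains a cycle -- the "acyclic" in DAG
order = list(TopologicalSorter(deps).static_order())
print(order)  # gather_news runs first, send_report last
```

Airflow's scheduler does the same resolution, plus retries, timeouts, and parallel execution of independent branches.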

from airflow import DAG
from airflow.decorators import task
from datetime import datetime, timedelta

default_args = {
    "owner": "ai-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=2),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=30),
    "execution_timeout": timedelta(minutes=10),
}

with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    description="Daily research agent that summarizes industry news",
    schedule_interval="0 6 * * *",  # 6 AM daily
    start_date=datetime(2024, 1, 1),  # static date; days_ago() is deprecated
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    pass  # Tasks defined below

Building Tasks with the TaskFlow API

Airflow 2.x introduced the TaskFlow API, which lets you define tasks as decorated Python functions — much cleaner than the older operator-based approach.

import openai
import json

@task(retries=3, retry_delay=timedelta(seconds=30))
def gather_news(topic: str) -> list[dict]:
    """Fetch recent news articles on a topic."""
    import requests
    from airflow.models import Variable

    response = requests.get(
        "https://newsapi.org/v2/everything",
        params={
            "q": topic,
            "sortBy": "publishedAt",
            "pageSize": 10,
            # Jinja templates are not rendered inside TaskFlow function
            # bodies, so read the secret with Variable.get() at runtime
            "apiKey": Variable.get("news_api_key"),
        },
        timeout=30,
    )
    response.raise_for_status()
    articles = response.json()["articles"]
    return [
        {"title": a["title"], "description": a["description"]}
        for a in articles
    ]

@task(retries=2, retry_delay=timedelta(seconds=60))
def summarize_articles(articles: list[dict]) -> str:
    """Use an LLM to summarize the collected articles."""
    client = openai.OpenAI()
    articles_text = "\n".join(
        f"- {a['title']}: {a['description']}" for a in articles
    )
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {
                "role": "system",
                "content": "Summarize these news articles into a brief digest.",
            },
            {"role": "user", "content": articles_text},
        ],
        temperature=0.3,
    )
    return response.choices[0].message.content

@task
def format_report(summary: str, topic: str) -> str:
    """Format the summary as an HTML email report."""
    return f"""
    <h2>Daily {topic} Digest</h2>
    <p>{summary}</p>
    <hr>
    <small>Generated by AI Research Agent</small>
    """

@task
def send_report(report: str) -> None:
    """Send the report via email."""
    from airflow.utils.email import send_email
    send_email(
        to=["team@company.com"],
        subject="Daily AI Research Digest",
        html_content=report,
    )

Wiring the DAG

with DAG(
    dag_id="daily_research_agent",
    default_args=default_args,
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ai-agent", "research"],
) as dag:
    topic = "artificial intelligence agents"
    articles = gather_news(topic)
    summary = summarize_articles(articles)
    report = format_report(summary, topic)
    send_report(report)

Data flows between tasks automatically via XComs (cross-communications). Each task's return value is serialized and stored in the Airflow metadata database, then deserialized as the input to downstream tasks.
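One practical consequence: with the default XCom backend, task return values must be JSON-serializable (pickling is disabled by default in Airflow 2). A quick way to check whether a return value will survive the round trip:

```python
import json

# What gather_news returns -- plain dicts and strings, safe for XCom
articles = [{"title": "Example headline", "description": "Example summary"}]

# The default XCom backend stores a JSON serialization in the metadata DB
payload = json.dumps(articles)
restored = json.loads(payload)
assert restored == articles

# Objects like an open OpenAI client are not JSON-serializable and would
# fail this round trip -- create clients inside tasks, don't return them
```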


Custom Operators for LLM Calls

For reusable LLM integration, build a custom operator:

import openai
from airflow.models import BaseOperator

class LLMOperator(BaseOperator):
    def __init__(
        self,
        prompt_template: str,
        model: str = "gpt-4",
        temperature: float = 0.3,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.prompt_template = prompt_template
        self.model = model
        self.temperature = temperature

    def execute(self, context):
        prompt = self.prompt_template.format(**context["params"])
        client = openai.OpenAI()
        response = client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=self.temperature,
        )
        result = response.choices[0].message.content
        self.log.info(f"LLM returned {len(result)} characters")
        return result
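The operator builds its prompt from the DAG run's params via plain str.format, so one class can back many differently-parameterized tasks. The substitution step in isolation (field names here are illustrative):

```python
# How execute() expands the template from context["params"]
prompt_template = "Summarize these {metric_type} metrics for {audience}."
params = {"metric_type": "revenue", "audience": "executives"}

prompt = prompt_template.format(**params)
print(prompt)  # Summarize these revenue metrics for executives.
```

Because params can be supplied per-run (e.g. via a triggered run's configuration), the same operator instance definition serves ad-hoc and scheduled invocations alike.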

Sensors for Event-Driven Triggers

Sensors wait for an external condition before proceeding. Use them to trigger agent workflows when new data arrives.

from airflow.sensors.filesystem import FileSensor

wait_for_data = FileSensor(
    task_id="wait_for_upload",
    filepath="/data/uploads/latest.csv",
    poke_interval=60,
    timeout=3600,
    mode="reschedule",  # Free the worker slot while waiting
)

FAQ

Is Airflow suitable for real-time AI agent workflows?

Airflow is designed for batch scheduling, not real-time execution. Its minimum practical scheduling interval is about one minute, and DAG parsing adds overhead. For real-time or event-driven agent workflows, consider Temporal, Inngest, or a custom solution. Airflow works best for agents that run on a schedule.

How do I handle large XCom payloads from LLM responses?

By default, XComs are stored in the Airflow metadata database, which is not designed for large payloads. For LLM responses exceeding a few kilobytes, configure a remote XCom backend using S3, GCS, or a custom backend that stores payloads externally and passes references through XCom.
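The reference-passing pattern itself is simple: persist the payload externally and return only a small key through XCom. A local-filesystem sketch of the idea (substitute S3/GCS client calls in production; the function names are illustrative, not an Airflow API):

```python
import json
import pathlib
import tempfile
import uuid

STORE = pathlib.Path(tempfile.gettempdir())  # stand-in for an S3 bucket

def store_payload(value) -> str:
    """Persist a large LLM response externally; return a small reference."""
    ref = f"xcom-{uuid.uuid4().hex}.json"
    (STORE / ref).write_text(json.dumps(value))
    return ref  # this short string is all that goes through XCom

def load_payload(ref: str):
    """Downstream tasks resolve the reference back to the payload."""
    return json.loads((STORE / ref).read_text())
```

A custom XCom backend applies the same idea transparently, so task code keeps returning values directly.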

Can I run multiple agent DAGs concurrently?

Yes. Airflow's scheduler manages concurrency at the DAG level, task level, and pool level. Use the max_active_runs parameter on the DAG to control how many instances run simultaneously, and use Airflow pools to limit concurrent LLM API calls across all DAGs.
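For the pool-based cap, create a shared pool once and point LLM tasks at it (the pool name and slot count below are illustrative):

```shell
# Create a 5-slot pool; every task that declares pool="llm_api"
# competes for these slots, across all DAGs
airflow pools set llm_api 5 "Concurrent LLM API calls"
```

Tasks then opt in with @task(pool="llm_api"), while max_active_runs on the DAG object caps concurrent runs of that one DAG.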


#ApacheAirflow #DAG #WorkflowScheduling #AIAgents #Python #AgenticAI #LearnAI #AIEngineering


CallSphere Team

Expert insights on AI voice agents and customer communication automation.
