Skip to content
Learn Agentic AI10 min read0 views

Scheduled Agent Tasks: Cron Jobs, Recurring Analysis, and Periodic Reports

Learn how to schedule AI agent tasks with cron expressions, implement idempotent recurring analyses, prevent overlapping runs, and build periodic reporting pipelines that run reliably in production.

Why Agents Need Schedules

Not all agent work is triggered by user requests. Many valuable agent applications run on schedules: daily market analysis reports, hourly anomaly detection on server logs, weekly customer churn predictions, and monthly compliance audits. These are autonomous agents that operate on a clock rather than a prompt.

Building scheduled agent tasks correctly requires handling cron expressions, ensuring idempotency (running the same job twice produces the same result), and preventing overlapping runs when a job takes longer than the schedule interval.

APScheduler: The Python Scheduling Library

APScheduler (Advanced Python Scheduler) provides cron-like scheduling with support for multiple backends:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import asyncio

scheduler = AsyncIOScheduler()

async def daily_market_analysis():
    """Agent task: analyze market data and produce a report."""
    print("Starting daily market analysis...")
    data = await fetch_market_data()
    analysis = await run_llm_analysis(data)
    await store_report(analysis)
    print("Market analysis complete.")

# Run every day at 6:00 AM UTC
scheduler.add_job(
    daily_market_analysis,
    CronTrigger(hour=6, minute=0, timezone="UTC"),
    id="daily_market_analysis",
    name="Daily Market Analysis",
    replace_existing=True,
)

scheduler.start()
asyncio.get_event_loop().run_forever()

The replace_existing=True parameter ensures that restarting the process does not create duplicate job entries.

Understanding Cron Expressions

Cron expressions define schedules using five fields: minute, hour, day-of-month, month, and day-of-week. Here are common patterns for agent tasks:

# Every 15 minutes — real-time monitoring agent
CronTrigger(minute="*/15")

# Every weekday at 9 AM — morning briefing agent
CronTrigger(hour=9, minute=0, day_of_week="mon-fri")

# First day of every month at midnight — monthly compliance audit
CronTrigger(day=1, hour=0, minute=0)

# Every Sunday at 11 PM — weekly churn prediction
CronTrigger(day_of_week="sun", hour=23, minute=0)

# Every 6 hours — periodic data refresh
CronTrigger(hour="*/6", minute=0)

Ensuring Idempotency

If your scheduler fires the same job twice (due to a restart, clock skew, or retry), the job must produce the same result without side effects. Use an idempotency key based on the schedule window:

See AI Voice Agents Handle Real Calls

Book a free demo or calculate how much you can save with AI voice automation.

from datetime import datetime, timezone
import hashlib

def get_idempotency_key(job_name: str, window: str) -> str:
    """Generate a unique key for this job's execution window."""
    # window could be "2026-03-17" for daily, "2026-03-17T06" for hourly
    raw = f"{job_name}:{window}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

async def idempotent_job(job_name: str, execute_fn):
    """Run a job only if it has not already completed for this window."""
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    key = get_idempotency_key(job_name, today)

    if await check_completed(key):
        print(f"Job {job_name} already completed for {today}, skipping.")
        return

    try:
        result = await execute_fn()
        await mark_completed(key, result)
    except Exception as e:
        await mark_failed(key, str(e))
        raise

The check_completed and mark_completed functions should use a persistent store like Redis or a database table. This ensures that even if the process crashes and restarts, the job does not re-execute for the same window.

Preventing Overlapping Runs

When a job takes longer than its schedule interval, the scheduler might fire a second instance while the first is still running. Use a distributed lock to prevent this:

import redis.asyncio as redis

class SchedulerLock:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    async def acquire(self, job_name: str, ttl_seconds: int = 3600) -> bool:
        """Try to acquire a lock. Returns True if successful."""
        lock_key = f"agent_lock:{job_name}"
        acquired = await self.redis.set(
            lock_key, "locked", ex=ttl_seconds, nx=True
        )
        return acquired is not None

    async def release(self, job_name: str):
        lock_key = f"agent_lock:{job_name}"
        await self.redis.delete(lock_key)

# Usage in a scheduled job
lock = SchedulerLock(redis.from_url("redis://localhost:6379/0"))

async def protected_job():
    if not await lock.acquire("daily_analysis", ttl_seconds=7200):
        print("Previous run still in progress, skipping.")
        return

    try:
        await run_analysis()
    finally:
        await lock.release("daily_analysis")

The TTL on the lock acts as a safety valve. If the worker crashes without releasing the lock, it automatically expires after the TTL, allowing the next scheduled run to proceed.

Complete Scheduled Agent Example

Putting it all together — a production-ready scheduled agent with idempotency and overlap prevention:

async def build_weekly_report():
    """Complete scheduled agent: weekly churn analysis."""
    week = datetime.now(timezone.utc).strftime("%Y-W%W")
    key = get_idempotency_key("churn_report", week)

    if await check_completed(key):
        return

    if not await lock.acquire("churn_report"):
        return

    try:
        customers = await fetch_customer_metrics()
        churn_risks = await analyze_churn_with_llm(customers)
        report = await generate_report(churn_risks)
        await send_report_email(report, recipients=["team@company.com"])
        await mark_completed(key, {"customers_analyzed": len(customers)})
    finally:
        await lock.release("churn_report")

scheduler.add_job(
    build_weekly_report,
    CronTrigger(day_of_week="mon", hour=7, minute=0, timezone="UTC"),
    id="weekly_churn_report",
    replace_existing=True,
)

FAQ

How do I handle timezone issues with scheduled agent tasks?

Always store and schedule in UTC internally. Convert to local timezones only for display. APScheduler accepts a timezone parameter on triggers. If your report must arrive at "9 AM New York time," use CronTrigger(hour=9, timezone="America/New_York") — APScheduler handles DST transitions automatically.

What happens when a scheduled job fails? Should it retry automatically?

It depends on the job type. For idempotent jobs (like report generation), automatic retries are safe — just schedule a retry after a delay. For non-idempotent jobs (like sending notifications), log the failure and alert an operator. APScheduler supports misfire_grace_time which controls how late a misfired job can still run, and you can add retry decorators to the job function itself.

How do I monitor whether scheduled agent tasks are actually running?

Implement a heartbeat pattern. Each job writes a "last_run" timestamp to a monitoring store after completion. A separate health check compares the last_run timestamp against the expected schedule. If a daily job has not run in 25 hours, trigger an alert. Services like Cronitor or Healthchecks.io can receive pings from your jobs and alert on missed runs.


#Scheduling #CronJobs #PeriodicTasks #Idempotency #Python #AgenticAI #LearnAI #AIEngineering

Share this article
C

CallSphere Team

Expert insights on AI voice agents and customer communication automation.

Try CallSphere AI Voice Agents

See how AI voice agents work for your industry. Live demo available -- no signup required.