Skip to main content
Advertisement

Task Scheduling

Automate recurring tasks using schedule and APScheduler, including cron integration.


Installation

pip install schedule apscheduler

schedule — Simple In-Process Scheduler

import schedule
import time
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)


# Define job functions
def send_daily_report():
logger.info("Starting daily report delivery")
# ... actual work
logger.info("Done")


def backup_database():
logger.info(f"DB backup started: {datetime.now()}")
# ... backup logic
logger.info("DB backup complete")


def health_check():
logger.info("Running health check")


# Register schedules
schedule.every().day.at("09:00").do(send_daily_report) # Daily at 09:00
schedule.every().monday.at("08:00").do(backup_database) # Every Monday
schedule.every(30).minutes.do(health_check) # Every 30 minutes
schedule.every(5).seconds.do(lambda: print("ping")) # Every 5s (for testing)

# Run at a specific time
schedule.every().day.at("18:00").do(send_daily_report)

# Run loop
while True:
schedule.run_pending()
time.sleep(1)

schedule — Advanced Patterns

import schedule
import time
import functools
from typing import Callable


# Error-catching decorator
def catch_exceptions(job_func: Callable) -> Callable:
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except Exception as e:
logger.error(f"Job failed [{job_func.__name__}]: {e}")
return wrapper


@catch_exceptions
def risky_job():
raise ValueError("Test error")


# One-shot job
def run_once():
logger.info("Running one time only")
return schedule.CancelJob # Returning this removes the job


schedule.every().day.at("12:00").do(run_once)

# Cancel a job
job = schedule.every(10).seconds.do(health_check)
schedule.cancel_job(job)

# Manage jobs with tags
schedule.every().day.do(send_daily_report).tag("report", "daily")
schedule.every().week.do(backup_database).tag("backup")

# Cancel by tag
schedule.clear("backup")

# Check next run time
for job in schedule.get_jobs():
print(f"{job.job_func.__name__}: next run {job.next_run}")

APScheduler — Production-Ready Scheduler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
import pytz

TZ = pytz.timezone("America/New_York")


# ── Job functions ───────────────────────────────────────
def generate_report(report_type: str):
print(f"[{datetime.now(TZ)}] Generating {report_type} report")


def cleanup_temp_files():
print("Cleaning up temp files")


def send_notification(user_id: int, message: str):
print(f"Notification → user {user_id}: {message}")


# ── Scheduler configuration ─────────────────────────────
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.db"), # Persists across restarts
}
executors = {
"default": ThreadPoolExecutor(max_workers=10),
}
job_defaults = {
"coalesce": True, # Merge missed runs into one
"max_instances": 1, # Limit concurrent executions
"misfire_grace_time": 60, # Allow up to 60s delay
}

scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=TZ,
)

# ── Trigger types ───────────────────────────────────────

# 1. Cron trigger (complex schedules)
scheduler.add_job(
generate_report,
CronTrigger(hour=9, minute=0, day_of_week="mon-fri"),
args=["daily"],
id="daily_report",
name="Weekday Daily Report",
replace_existing=True,
)

# 2. Interval trigger (fixed intervals)
scheduler.add_job(
cleanup_temp_files,
IntervalTrigger(hours=6),
id="cleanup",
name="Temp File Cleanup",
)

# 3. Date trigger (one-time at a specific moment)
scheduler.add_job(
send_notification,
DateTrigger(run_date=datetime.now(TZ) + timedelta(seconds=10)),
args=[42, "System maintenance complete"],
id="one_time_notify",
)

# ── Start / Stop ─────────────────────────────────────────
scheduler.start()

try:
import time
while True:
time.sleep(60)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
print("Scheduler stopped")

APScheduler — FastAPI Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import httpx


async def fetch_exchange_rates():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/rates")
data = response.json()
print(f"Exchange rates updated: {data}")


@asynccontextmanager
async def lifespan(app: FastAPI):
# Start scheduler when app starts
scheduler = AsyncIOScheduler()
scheduler.add_job(
fetch_exchange_rates,
CronTrigger(hour="*", minute=0), # Every hour on the hour
id="exchange_rates",
)
scheduler.start()

yield # App is running

# Stop scheduler when app shuts down
scheduler.shutdown()


app = FastAPI(lifespan=lifespan)


@app.get("/jobs")
async def list_jobs():
return {"message": "Scheduler running"}

cron Integration (System Level)

# Edit crontab
crontab -e

# Format: min hour day month weekday command
# ┌──────── min (0-59)
# │ ┌────── hour (0-23)
# │ │ ┌──── day (1-31)
# │ │ │ ┌── month (1-12)
# │ │ │ │ ┌ weekday (0=Sun, 6=Sat)
# │ │ │ │ │
0 9 * * 1-5 /usr/bin/python3 /app/scripts/daily_report.py >> /var/log/report.log 2>&1
0 * * * * /usr/bin/python3 /app/scripts/health_check.py
0 2 * * 0 /usr/bin/python3 /app/scripts/weekly_backup.py
# Python script for cron (scripts/daily_report.py)
#!/usr/bin/env python3
import sys
import logging
from pathlib import Path

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))

logging.basicConfig(
filename="/var/log/daily_report.log",
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)


def main():
logger.info("=== Daily report started ===")
try:
from app.reports import generate_daily_report
generate_daily_report()
logger.info("=== Completed ===")
sys.exit(0)
except Exception as e:
logger.error(f"Failed: {e}", exc_info=True)
sys.exit(1)


if __name__ == "__main__":
main()

Summary

ToolUse Case
scheduleSimple in-process scheduling
APSchedulerProduction environments, job persistence, FastAPI integration
cronSystem-level, OS scheduler
  • Simple scriptsschedule
  • Inside a web serverAPScheduler (BackgroundScheduler / AsyncIOScheduler)
  • Standalone scriptscron + Python script
Advertisement