Task Scheduling
Automate recurring tasks using schedule and APScheduler, including cron integration.
Installation
pip install schedule apscheduler
schedule — Simple In-Process Scheduler
import schedule
import time
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)
# Define job functions
def send_daily_report():
    """Deliver the daily report, logging when the job starts and finishes."""
    logger.info("Starting daily report delivery")
    # ... actual work
    logger.info("Done")
def backup_database():
    """Back up the database, logging the start timestamp and completion."""
    logger.info(f"DB backup started: {datetime.now()}")
    # ... backup logic
    logger.info("DB backup complete")
def health_check():
    """Log that a health check is running (placeholder for a real probe)."""
    logger.info("Running health check")
# Register schedules
# Each line builds its own job; the order below is the registration order.
schedule.every().day.at("09:00").do(send_daily_report)  # Daily at 09:00
schedule.every().monday.at("08:00").do(backup_database)  # Every Monday at 08:00
schedule.every(30).minutes.do(health_check)  # Every 30 minutes
schedule.every(5).seconds.do(lambda: print("ping"))  # Every 5s (for testing)
# Second daily run of the same report job, at 18:00
schedule.every().day.at("18:00").do(send_daily_report)
# Run loop: wake up once per second and execute whatever jobs are due.
# This blocks forever, so it must be the last thing the script does.
while True:
    schedule.run_pending()
    time.sleep(1)
schedule — Advanced Patterns
import schedule
import time
import functools
from typing import Callable
# Error-catching decorator
def catch_exceptions(job_func: Callable) -> Callable:
    """Wrap a job so that an exception is logged instead of propagating.

    Args:
        job_func: The callable to protect.

    Returns:
        A wrapper that forwards all arguments to ``job_func`` and returns its
        result, or ``None`` when ``job_func`` raised an ``Exception``.
    """
    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception as exc:
            # functools.partial objects and other callables may not define
            # __name__; fall back to repr() so the handler itself cannot fail.
            job_name = getattr(job_func, "__name__", repr(job_func))
            # logger.exception logs at ERROR level and appends the traceback,
            # which a plain error message with str(exc) would discard.
            logger.exception("Job failed [%s]: %s", job_name, exc)
            return None

    return wrapper
@catch_exceptions
def risky_job():
    """Always raise, to demonstrate a failing job wrapped by the decorator."""
    raise ValueError("Test error")
# One-shot job
def run_once():
    """Log a message and ask the scheduler to drop this job afterwards."""
    logger.info("Running one time only")
    # Returning this removes the job
    return schedule.CancelJob
schedule.every().day.at("12:00").do(run_once)

# Cancel a job: keep the handle returned by do() and pass it to cancel_job()
job = schedule.every(10).seconds.do(health_check)
schedule.cancel_job(job)

# Manage jobs with tags (a job may carry several)
schedule.every().day.do(send_daily_report).tag("report", "daily")
schedule.every().week.do(backup_database).tag("backup")

# Cancel by tag: removes every job tagged "backup"
schedule.clear("backup")

# Check next run time of every job that is still registered
for job in schedule.get_jobs():
    print(f"{job.job_func.__name__}: next run {job.next_run}")
APScheduler — Production-Ready Scheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
import pytz
TZ = pytz.timezone("America/New_York")
# ── Job functions ───────────────────────────────────────
def generate_report(report_type: str):
    """Print a timestamped (in TZ) message announcing the given report type."""
    print(f"[{datetime.now(TZ)}] Generating {report_type} report")
def cleanup_temp_files():
    """Print a message standing in for the temp-file cleanup work."""
    print("Cleaning up temp files")
def send_notification(user_id: int, message: str):
    """Print ``message`` addressed to the user identified by ``user_id``."""
    print(f"Notification → user {user_id}: {message}")
# ── Scheduler configuration ─────────────────────────────
# Jobs are stored in a SQLite file so they persist across restarts.
jobstores = {"default": SQLAlchemyJobStore(url="sqlite:///jobs.db")}

# Jobs run on a thread pool of at most 10 workers.
executors = {"default": ThreadPoolExecutor(max_workers=10)}

job_defaults = {
    "coalesce": True,  # Merge missed runs into one
    "max_instances": 1,  # Limit concurrent executions
    "misfire_grace_time": 60,  # Allow up to 60s delay
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone=TZ,
)
# ── Trigger types ───────────────────────────────────────
# 1. Cron trigger (complex schedules)
# The job store configured above is persistent, so every job gets an explicit
# id together with replace_existing=True. Without it, re-running this script
# tries to add a job whose id is already stored and fails with a
# conflicting-id error.
scheduler.add_job(
    generate_report,
    CronTrigger(hour=9, minute=0, day_of_week="mon-fri"),
    args=["daily"],
    id="daily_report",
    name="Weekday Daily Report",
    replace_existing=True,
)
# 2. Interval trigger (fixed intervals)
scheduler.add_job(
    cleanup_temp_files,
    IntervalTrigger(hours=6),
    id="cleanup",
    name="Temp File Cleanup",
    replace_existing=True,
)
# 3. Date trigger (one-time at a specific moment)
scheduler.add_job(
    send_notification,
    DateTrigger(run_date=datetime.now(TZ) + timedelta(seconds=10)),
    args=[42, "System maintenance complete"],
    id="one_time_notify",
    # Covers a restart that happens before the previous one-shot job has run.
    replace_existing=True,
)
# ── Start / Stop ─────────────────────────────────────────
scheduler.start()
import time

try:
    # start() returns immediately for a BackgroundScheduler, so the main
    # thread has to be kept alive for jobs to keep running.
    while True:
        time.sleep(60)
except (KeyboardInterrupt, SystemExit):
    # Ctrl+C or an exit request: stop the scheduler before leaving.
    scheduler.shutdown()
    print("Scheduler stopped")
APScheduler — FastAPI Integration
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import httpx
async def fetch_exchange_rates():
    """Fetch the latest exchange rates over HTTP and print them.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/rates")
        # Fail on error responses instead of parsing an error body and
        # reporting it as fresh exchange rates.
        response.raise_for_status()
        data = response.json()
    print(f"Exchange rates updated: {data}")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the exchange-rate scheduler for the lifetime of the FastAPI app.

    The scheduler is started before the app begins serving and is shut down
    when the app stops, including when the app exits because of an error.
    """
    # Start scheduler when app starts
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        fetch_exchange_rates,
        CronTrigger(hour="*", minute=0),  # Every hour on the hour
        id="exchange_rates",
    )
    scheduler.start()
    try:
        yield  # App is running
    finally:
        # Stop scheduler when app shuts down. The finally clause guarantees
        # this also happens when an exception is raised into the generator.
        scheduler.shutdown()
app = FastAPI(lifespan=lifespan)


@app.get("/jobs")
async def list_jobs():
    """Return a static status payload confirming the scheduler is running."""
    status = {"message": "Scheduler running"}
    return status
cron Integration (System Level)
# Edit crontab
crontab -e
# Format: min hour day month weekday command
# ┌──────── min (0-59)
# │ ┌────── hour (0-23)
# │ │ ┌──── day (1-31)
# │ │ │ ┌── month (1-12)
# │ │ │ │ ┌ weekday (0=Sun, 6=Sat)
# │ │ │ │ │
0 9 * * 1-5 /usr/bin/python3 /app/scripts/daily_report.py >> /var/log/report.log 2>&1
0 * * * * /usr/bin/python3 /app/scripts/health_check.py
0 2 * * 0 /usr/bin/python3 /app/scripts/weekly_backup.py
# Python script for cron (scripts/daily_report.py)
#!/usr/bin/env python3
import sys
import logging
from pathlib import Path
# Add project root to path
# The script lives in <root>/scripts/, so the project root is two levels up.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))

# Log to a file: cron jobs run without a terminal attached.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    filename="/var/log/daily_report.log",
)
logger = logging.getLogger(__name__)
def main():
    """Generate the daily report and exit with a status code cron can see.

    Exits with status 0 on success and 1 if importing or running the report
    raises an ``Exception``; the failure is logged with its traceback.
    """
    logger.info("=== Daily report started ===")
    try:
        # Imported here so that an import failure is logged like any other error.
        from app.reports import generate_daily_report

        generate_daily_report()
        logger.info("=== Completed ===")
    except Exception as e:
        logger.exception(f"Failed: {e}")
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
Summary
| Tool | Use Case |
|---|---|
| `schedule` | Simple in-process scheduling |
| `APScheduler` | Production environments, job persistence, FastAPI integration |
| `cron` | System-level, OS scheduler |
- Simple scripts → `schedule`
- Inside a web server → `APScheduler` (`BackgroundScheduler` / `AsyncIOScheduler`)
- Standalone scripts → `cron` + Python script