스케줄링
schedule, APScheduler로 반복 작업을 자동화하고 cron 연동까지 다룹니다.
설치
pip install schedule apscheduler
schedule — 단순 스케줄러
import schedule
import time
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)
# 작업 함수 정의
def send_daily_report():
logger.info("일일 보고서 발송 시작")
# ... 실제 작업
logger.info("완료")
def backup_database():
logger.info(f"DB 백업 시작: {datetime.now()}")
# ... 백업 로직
logger.info("DB 백업 완료")
def health_check():
logger.info("헬스 체크 실행")
# 스케줄 등록
schedule.every().day.at("09:00").do(send_daily_report) # 매일 09:00
schedule.every().monday.at("08:00").do(backup_database) # 매주 월요일
schedule.every(30).minutes.do(health_check) # 30분마다
schedule.every(5).seconds.do(lambda: print("ping")) # 5초마다 (테스트용)
# 특정 시간대 실행
schedule.every().day.at("18:00").do(send_daily_report)
# 실행 루프
while True:
schedule.run_pending()
time.sleep(1)
schedule — 고급 패턴
import schedule
import time
import functools
from typing import Callable
# 에러 잡기 데코레이터
def catch_exceptions(job_func: Callable) -> Callable:
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except Exception as e:
logger.error(f"작업 실패 [{job_func.__name__}]: {e}")
return wrapper
@catch_exceptions
def risky_job():
raise ValueError("테스트 에러")
# 한 번만 실행 (one-shot)
def run_once():
logger.info("한 번만 실행")
return schedule.CancelJob # 반환하면 스케줄에서 제거
schedule.every().day.at("12:00").do(run_once)
# 작업 취소
job = schedule.every(10).seconds.do(health_check)
schedule.cancel_job(job)
# 태그로 관리
schedule.every().day.do(send_daily_report).tag("report", "daily")
schedule.every().week.do(backup_database).tag("backup")
# 태그별 취소
schedule.clear("backup")
# 다음 실행 시각 확인
for job in schedule.get_jobs():
print(f"{job.job_func.__name__}: 다음 실행 {job.next_run}")
APScheduler — 운영 환경용
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from datetime import datetime, timedelta
import pytz
KST = pytz.timezone("Asia/Seoul")
# ── 작업 함수 ──────────────────────────────────────────
def generate_report(report_type: str):
print(f"[{datetime.now(KST)}] {report_type} 보고서 생성")
def cleanup_temp_files():
import shutil
print("임시 파일 정리")
def send_notification(user_id: int, message: str):
print(f"알림 발송 → 사용자 {user_id}: {message}")
# ── 스케줄러 설정 ───────────────────────────────────────
jobstores = {
"default": SQLAlchemyJobStore(url="sqlite:///jobs.db"), # 재시작 후에도 유지
}
executors = {
"default": ThreadPoolExecutor(max_workers=10),
}
job_defaults = {
"coalesce": True, # 밀린 작업 하나로 합치기
"max_instances": 1, # 동시 실행 제한
"misfire_grace_time": 60, # 1분 이내 지연은 허용
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone=KST,
)
# ── 트리거 종류 ─────────────────────────────────────────
# 1. Cron 트리거 (복잡한 주기)
scheduler.add_job(
generate_report,
CronTrigger(hour=9, minute=0, day_of_week="mon-fri"),
args=["daily"],
id="daily_report",
name="평일 일일 보고서",
replace_existing=True,
)
# 2. Interval 트리거 (고정 간격)
scheduler.add_job(
cleanup_temp_files,
IntervalTrigger(hours=6),
id="cleanup",
name="임시 파일 정리",
)
# 3. Date 트리거 (특정 시각 1회)
scheduler.add_job(
send_notification,
DateTrigger(run_date=datetime.now(KST) + timedelta(seconds=10)),
args=[42, "시스템 점검 완료"],
id="one_time_notify",
)
# ── 시작 / 종료 ─────────────────────────────────────────
scheduler.start()
try:
import time
while True:
time.sleep(60)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()
print("스케줄러 종료")
APScheduler — FastAPI 연동
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
import httpx
async def fetch_exchange_rates():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/rates")
data = response.json()
print(f"환율 업데이트: {data}")
@asynccontextmanager
async def lifespan(app: FastAPI):
# 앱 시작 시 스케줄러 시작
scheduler = AsyncIOScheduler()
scheduler.add_job(
fetch_exchange_rates,
CronTrigger(hour="*", minute=0), # 매 시 정각
id="exchange_rates",
)
scheduler.start()
yield # 앱 실행 중
# 앱 종료 시 스케줄러 정지
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
@app.get("/jobs")
async def list_jobs():
scheduler = app.state.scheduler if hasattr(app.state, "scheduler") else None
return {"message": "스케줄러 실행 중"}
cron 연동 (시스템 레벨)
# crontab 편집
crontab -e
# 형식: 분 시 일 월 요일 명령어
# ┌──────── 분 (0-59)
# │ ┌────── 시 (0-23)
# │ │ ┌──── 일 (1-31)
# │ │ │ ┌── 월 (1-12)
# │ │ │ │ ┌ 요일 (0=일, 6=토)
# │ │ │ │ │
0 9 * * 1-5 /usr/bin/python3 /app/scripts/daily_report.py >> /var/log/report.log 2>&1
0 * * * * /usr/bin/python3 /app/scripts/health_check.py
0 2 * * 0 /usr/bin/python3 /app/scripts/weekly_backup.py
# cron에서 실행될 Python 스크립트 (scripts/daily_report.py)
#!/usr/bin/env python3
import sys
import logging
from pathlib import Path
# 프로젝트 루트를 경로에 추가
sys.path.insert(0, str(Path(__file__).parent.parent))
logging.basicConfig(
filename="/var/log/daily_report.log",
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
def main():
logger.info("=== 일일 보고서 시작 ===")
try:
# 실제 작업
from app.reports import generate_daily_report
generate_daily_report()
logger.info("=== 완료 ===")
sys.exit(0)
except Exception as e:
logger.error(f"실패: {e}", exc_info=True)
sys.exit(1)
if __name__ == "__main__":
main()
정리
| 도구 | 용도 |
|---|---|
schedule | 간단한 인프로세스 스케줄링 |
APScheduler | 운영 환경, Job 영속성, FastAPI 연동 |
cron | 시스템 레벨, OS 스케줄러 |
- 단순 스크립트 →
schedule - 웹 서버 내 작업 →
APScheduler(BackgroundScheduler / AsyncIOScheduler) - 독립 실행 스크립트 →
cron+ Python 스크립트