Celery 비동기 작업
Django + Celery 조합은 이메일 발송, 보고서 생성, 정기 작업 등 시간이 걸리는 작업을 백그라운드에서 처리합니다.
설치와 설정
pip install celery redis django-celery-results django-celery-beat
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings.development")
app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
# Django 앱의 tasks.py 자동 검색
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "django-db" # DB에 결과 저장
INSTALLED_APPS = [
...
"django_celery_results",
"django_celery_beat",
]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
태스크 정의
# users/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
import time
logger = get_task_logger(__name__)
User = get_user_model()
# 기본 태스크
@shared_task
def send_welcome_email(user_id: int) -> str:
user = User.objects.get(pk=user_id)
send_mail(
subject="환영합니다!",
message=f"안녕하세요 {user.name}님, 가입을 환영합니다.",
from_email="noreply@example.com",
recipient_list=[user.email],
)
return f"이메일 발송 완료: {user.email}"
# 재시도 설정
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60, # 60초 후 재시도
autoretry_for=(Exception,),
retry_backoff=True, # 지수 백오프
)
def process_payment(self, order_id: int) -> dict:
logger.info(f"결제 처리 시작: order={order_id}")
try:
# 결제 API 호출 시뮬레이션
time.sleep(1)
return {"order_id": order_id, "status": "paid"}
except ConnectionError as exc:
logger.warning(f"결제 연결 실패, 재시도 중... (시도 {self.request.retries + 1})")
raise self.retry(exc=exc)
# 진행 상태 업데이트
@shared_task(bind=True)
def generate_report(self, user_id: int) -> str:
total_steps = 5
for step in range(1, total_steps + 1):
time.sleep(1)
self.update_state(
state="PROGRESS",
meta={"current": step, "total": total_steps, "step": f"단계 {step} 처리 중"},
)
return f"사용자 {user_id} 보고서 생성 완료"
Beat 스케줄러 — 정기 작업
# products/tasks.py
from celery import shared_task
from django.utils import timezone
from .models import Product
@shared_task
def cleanup_inactive_products() -> int:
"""30일 이상 비활성 상품 삭제"""
threshold = timezone.now() - timezone.timedelta(days=30)
count, _ = Product.objects.filter(
is_active=False,
updated_at__lt=threshold,
).delete()
return count
@shared_task
def update_stock_alerts() -> list:
"""재고 부족 상품 알림"""
low_stock = Product.objects.filter(
is_active=True,
stock__lt=10,
).values_list("id", "name", "stock")
return list(low_stock)
@shared_task
def send_daily_report() -> str:
from django.contrib.auth import get_user_model
User = get_user_model()
total_users = User.objects.count()
total_products = Product.objects.filter(is_active=True).count()
return f"일일 리포트: 사용자 {total_users}명, 활성 상품 {total_products}개"
# settings.py — 정적 스케줄 설정
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# 매일 자정 비활성 상품 정리
"cleanup-inactive-products": {
"task": "products.tasks.cleanup_inactive_products",
"schedule": crontab(hour=0, minute=0),
},
# 매시간 재고 체크
"check-stock-alerts": {
"task": "products.tasks.update_stock_alerts",
"schedule": crontab(minute=0),
},
# 매일 오전 8시 일일 보고서
"daily-report": {
"task": "products.tasks.send_daily_report",
"schedule": crontab(hour=8, minute=0),
},
# 매 10분마다
"periodic-task": {
"task": "some.task",
"schedule": 600, # 초 단위
},
}
태스크 호출과 상태 확인
# views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from celery.result import AsyncResult
from .tasks import send_welcome_email, generate_report
@api_view(["POST"])
def register_user(request):
# ... 사용자 생성 ...
user = ...
# 비동기 태스크 예약
send_welcome_email.delay(user.id)
# 지연 실행 (60초 후)
# send_welcome_email.apply_async(args=[user.id], countdown=60)
# 특정 시간에 실행
# from datetime import datetime, timedelta
# eta = datetime.utcnow() + timedelta(hours=1)
# send_welcome_email.apply_async(args=[user.id], eta=eta)
return Response({"message": "회원가입 완료"})
@api_view(["POST"])
def start_report(request):
task = generate_report.delay(request.user.id)
return Response({"task_id": task.id})
@api_view(["GET"])
def task_status(request, task_id: str):
result = AsyncResult(task_id)
return Response({
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
"info": result.info,
})
실행 명령
# Celery 워커 실행
celery -A myproject worker --loglevel=info
# Beat 스케줄러 실행 (별도 프로세스)
celery -A myproject beat --loglevel=info
# 워커 + Beat 동시 실행 (개발용)
celery -A myproject worker --beat --loglevel=info
# 모니터링 (Flower)
pip install flower
celery -A myproject flower --port=5555
정리
| 구성 요소 | 역할 |
|---|---|
@shared_task | Django 앱 독립적 태스크 정의 |
task.delay(args) | 즉시 태스크 예약 |
task.apply_async | 지연·예약 실행 |
bind=True | self 접근 (재시도, 상태 업데이트) |
celery beat | 정기 스케줄 실행 |
CELERY_BEAT_SCHEDULE | 정적 스케줄 설정 |
DatabaseScheduler | Admin에서 스케줄 관리 |
Flower | 실시간 모니터링 웹 UI |
Celery Beat의 DatabaseScheduler를 사용하면 Django Admin에서 스케줄을 동적으로 추가·수정할 수 있습니다.