본문으로 건너뛰기
Advertisement

Celery 비동기 작업

Django + Celery 조합은 이메일 발송, 보고서 생성, 정기 작업 등 시간이 걸리는 작업을 백그라운드에서 처리합니다.


설치와 설정

pip install celery redis django-celery-results django-celery-beat
# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings.development")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")

# Django 앱의 tasks.py 자동 검색
app.autodiscover_tasks()
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "django-db" # DB에 결과 저장

INSTALLED_APPS = [
...
"django_celery_results",
"django_celery_beat",
]

CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True

태스크 정의

# users/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
import time

logger = get_task_logger(__name__)
User = get_user_model()


# 기본 태스크
@shared_task
def send_welcome_email(user_id: int) -> str:
user = User.objects.get(pk=user_id)
send_mail(
subject="환영합니다!",
message=f"안녕하세요 {user.name}님, 가입을 환영합니다.",
from_email="noreply@example.com",
recipient_list=[user.email],
)
return f"이메일 발송 완료: {user.email}"


# 재시도 설정
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60, # 60초 후 재시도
autoretry_for=(Exception,),
retry_backoff=True, # 지수 백오프
)
def process_payment(self, order_id: int) -> dict:
logger.info(f"결제 처리 시작: order={order_id}")
try:
# 결제 API 호출 시뮬레이션
time.sleep(1)
return {"order_id": order_id, "status": "paid"}
except ConnectionError as exc:
logger.warning(f"결제 연결 실패, 재시도 중... (시도 {self.request.retries + 1})")
raise self.retry(exc=exc)


# 진행 상태 업데이트
@shared_task(bind=True)
def generate_report(self, user_id: int) -> str:
total_steps = 5
for step in range(1, total_steps + 1):
time.sleep(1)
self.update_state(
state="PROGRESS",
meta={"current": step, "total": total_steps, "step": f"단계 {step} 처리 중"},
)

return f"사용자 {user_id} 보고서 생성 완료"

Beat 스케줄러 — 정기 작업

# products/tasks.py
from celery import shared_task
from django.utils import timezone
from .models import Product


@shared_task
def cleanup_inactive_products() -> int:
"""30일 이상 비활성 상품 삭제"""
threshold = timezone.now() - timezone.timedelta(days=30)
count, _ = Product.objects.filter(
is_active=False,
updated_at__lt=threshold,
).delete()
return count


@shared_task
def update_stock_alerts() -> list:
"""재고 부족 상품 알림"""
low_stock = Product.objects.filter(
is_active=True,
stock__lt=10,
).values_list("id", "name", "stock")
return list(low_stock)


@shared_task
def send_daily_report() -> str:
from django.contrib.auth import get_user_model
User = get_user_model()
total_users = User.objects.count()
total_products = Product.objects.filter(is_active=True).count()
return f"일일 리포트: 사용자 {total_users}명, 활성 상품 {total_products}개"
# settings.py — 정적 스케줄 설정
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
# 매일 자정 비활성 상품 정리
"cleanup-inactive-products": {
"task": "products.tasks.cleanup_inactive_products",
"schedule": crontab(hour=0, minute=0),
},
# 매시간 재고 체크
"check-stock-alerts": {
"task": "products.tasks.update_stock_alerts",
"schedule": crontab(minute=0),
},
# 매일 오전 8시 일일 보고서
"daily-report": {
"task": "products.tasks.send_daily_report",
"schedule": crontab(hour=8, minute=0),
},
# 매 10분마다
"periodic-task": {
"task": "some.task",
"schedule": 600, # 초 단위
},
}

태스크 호출과 상태 확인

# views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from celery.result import AsyncResult
from .tasks import send_welcome_email, generate_report


@api_view(["POST"])
def register_user(request):
# ... 사용자 생성 ...
user = ...

# 비동기 태스크 예약
send_welcome_email.delay(user.id)

# 지연 실행 (60초 후)
# send_welcome_email.apply_async(args=[user.id], countdown=60)

# 특정 시간에 실행
# from datetime import datetime, timedelta
# eta = datetime.utcnow() + timedelta(hours=1)
# send_welcome_email.apply_async(args=[user.id], eta=eta)

return Response({"message": "회원가입 완료"})


@api_view(["POST"])
def start_report(request):
task = generate_report.delay(request.user.id)
return Response({"task_id": task.id})


@api_view(["GET"])
def task_status(request, task_id: str):
result = AsyncResult(task_id)
return Response({
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None,
"info": result.info,
})

실행 명령

# Celery 워커 실행
celery -A myproject worker --loglevel=info

# Beat 스케줄러 실행 (별도 프로세스)
celery -A myproject beat --loglevel=info

# 워커 + Beat 동시 실행 (개발용)
celery -A myproject worker --beat --loglevel=info

# 모니터링 (Flower)
pip install flower
celery -A myproject flower --port=5555

정리

구성 요소역할
@shared_taskDjango 앱 독립적 태스크 정의
task.delay(args)즉시 태스크 예약
task.apply_async지연·예약 실행
bind=Trueself 접근 (재시도, 상태 업데이트)
celery beat정기 스케줄 실행
CELERY_BEAT_SCHEDULE정적 스케줄 설정
DatabaseSchedulerAdmin에서 스케줄 관리
Flower실시간 모니터링 웹 UI

Celery Beat의 DatabaseScheduler를 사용하면 Django Admin에서 스케줄을 동적으로 추가·수정할 수 있습니다.

Advertisement