Celery Async Tasks
The Django + Celery combination handles time-consuming tasks like email sending, report generation, and scheduled jobs in the background.
Installation and Setup
pip install celery redis django-celery-results django-celery-beat
# myproject/celery.py
import os
from celery import Celery
# Must run BEFORE Celery() is created: Django settings have to be resolvable
# when config_from_object reads them.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings.development")
app = Celery("myproject")
# namespace="CELERY" means only settings prefixed with CELERY_ are consumed
# (e.g. CELERY_BROKER_URL -> broker_url).
app.config_from_object("django.conf:settings", namespace="CELERY")
# Auto-discover tasks.py in Django apps listed in INSTALLED_APPS
app.autodiscover_tasks()
# myproject/__init__.py
# Load the Celery app whenever Django starts so @shared_task can bind to it.
from .celery import app as celery_app
__all__ = ("celery_app",)
# settings.py
# Redis database 0 as the message broker.
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "django-db"  # Store task results in the Django DB (django-celery-results)
INSTALLED_APPS = [
...
"django_celery_results",  # DB-backed result storage
"django_celery_beat",  # DB-backed periodic schedules (editable in Admin)
]
# Read periodic schedules from the database instead of CELERY_BEAT_SCHEDULE only.
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# JSON on both sides keeps payloads language-neutral and avoids pickle security issues.
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
# Report a STARTED state (not just PENDING -> SUCCESS) — useful for progress UIs.
CELERY_TASK_TRACK_STARTED = True
Task Definition
# users/tasks.py
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
import time
# Task-aware logger: log records carry the task name and id.
logger = get_task_logger(__name__)
# Resolve the active user model once at import time (supports custom user models).
User = get_user_model()
# Basic task
@shared_task
def send_welcome_email(user_id: int) -> str:
user = User.objects.get(pk=user_id)
send_mail(
subject="Welcome!",
message=f"Hello {user.name}, welcome to our service.",
from_email="noreply@example.com",
recipient_list=[user.email],
)
return f"Email sent: {user.email}"
# Retry configuration
@shared_task(
bind=True,
max_retries=3,
default_retry_delay=60, # retry after 60 seconds
autoretry_for=(Exception,),
retry_backoff=True, # exponential backoff
)
def process_payment(self, order_id: int) -> dict:
logger.info(f"Payment processing started: order={order_id}")
try:
# Simulate payment API call
time.sleep(1)
return {"order_id": order_id, "status": "paid"}
except ConnectionError as exc:
logger.warning(f"Payment connection failed, retrying... (attempt {self.request.retries + 1})")
raise self.retry(exc=exc)
# Progress state updates
@shared_task(bind=True)
def generate_report(self, user_id: int) -> str:
    """Build a report in five steps, publishing a PROGRESS state after each.

    Clients can poll the task id and read ``meta`` to render a progress bar.
    """
    step_count = 5
    for step in range(1, step_count + 1):
        time.sleep(1)
        progress = {
            "current": step,
            "total": step_count,
            "step": f"Processing step {step}",
        }
        self.update_state(state="PROGRESS", meta=progress)
    return f"Report generated for user {user_id}"
Beat Scheduler — Scheduled Tasks
# products/tasks.py
from celery import shared_task
from django.utils import timezone
from .models import Product
@shared_task
def cleanup_inactive_products() -> int:
    """Delete products inactive for 30+ days.

    Returns the number of rows removed.
    """
    cutoff = timezone.now() - timezone.timedelta(days=30)
    stale = Product.objects.filter(is_active=False, updated_at__lt=cutoff)
    deleted_count, _ = stale.delete()
    return deleted_count
@shared_task
def update_stock_alerts() -> list:
    """Alert for low-stock products.

    Returns a list of (id, name, stock) tuples for active products
    whose stock has fallen below 10 units.
    """
    threshold = 10
    low_stock = Product.objects.filter(is_active=True, stock__lt=threshold)
    return list(low_stock.values_list("id", "name", "stock"))
@shared_task
def send_daily_report() -> str:
    """Summarize total users and active products for the daily report."""
    # Local import keeps this module importable before the auth app is loaded.
    from django.contrib.auth import get_user_model

    user_total = get_user_model().objects.count()
    product_total = Product.objects.filter(is_active=True).count()
    return f"Daily report: {user_total} users, {product_total} active products"
# settings.py — static schedule configuration
# These entries are loaded by celery beat; with DatabaseScheduler they seed
# the DB on first run and can then be edited from the Django Admin.
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# Clean up inactive products at midnight daily
"cleanup-inactive-products": {
"task": "products.tasks.cleanup_inactive_products",
"schedule": crontab(hour=0, minute=0),
},
# Check stock every hour, on the hour
"check-stock-alerts": {
"task": "products.tasks.update_stock_alerts",
"schedule": crontab(minute=0),
},
# Daily report at 8 AM (in CELERY_TIMEZONE)
"daily-report": {
"task": "products.tasks.send_daily_report",
"schedule": crontab(hour=8, minute=0),
},
# Every 10 minutes — a plain number is interpreted as an interval in seconds
"periodic-task": {
"task": "some.task",
"schedule": 600, # seconds
},
}
Task Invocation and Status Check
# views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from celery.result import AsyncResult
from .tasks import send_welcome_email, generate_report
@api_view(["POST"])
def register_user(request):
    """Create the user, then queue the welcome e-mail outside the request cycle."""
    # ... create user ...
    user = ...
    # Schedule async task immediately
    send_welcome_email.delay(user.id)
    # Delayed execution (after 60 seconds)
    # send_welcome_email.apply_async(args=[user.id], countdown=60)
    # Execute at specific time — use an aware datetime (datetime.utcnow() is deprecated)
    # from datetime import datetime, timedelta, timezone
    # eta = datetime.now(timezone.utc) + timedelta(hours=1)
    # send_welcome_email.apply_async(args=[user.id], eta=eta)
    return Response({"message": "Registration complete"})
@api_view(["POST"])
def start_report(request):
    """Kick off report generation and hand the task id back for polling."""
    async_result = generate_report.delay(request.user.id)
    return Response({"task_id": async_result.id})
@api_view(["GET"])
def task_status(request, task_id: str):
    """Report the current Celery state of *task_id* for client-side polling."""
    result = AsyncResult(task_id)
    payload = {
        "task_id": task_id,
        "status": result.status,
        # The result value is only meaningful once the task has finished.
        "result": result.result if result.ready() else None,
        "info": result.info,
    }
    return Response(payload)
Run Commands
# Start Celery worker
celery -A myproject worker --loglevel=info
# Start Beat scheduler (separate process)
celery -A myproject beat --loglevel=info
# Run worker + Beat together (development)
celery -A myproject worker --beat --loglevel=info
# Monitoring (Flower)
pip install flower
celery -A myproject flower --port=5555
Summary
| Component | Role |
|---|---|
| `@shared_task` | Define task independent of Django app |
| `task.delay(args)` | Schedule task immediately |
| `task.apply_async` | Delayed/scheduled execution |
| `bind=True` | Access `self` (retry, state updates) |
| `celery beat` | Run periodic schedules |
| `CELERY_BEAT_SCHEDULE` | Static schedule configuration |
| `DatabaseScheduler` | Manage schedules via Admin |
| `Flower` | Real-time monitoring web UI |
Using Celery Beat's DatabaseScheduler lets you dynamically add and modify schedules from Django Admin.