Background Tasks
FastAPI's BackgroundTasks runs work after the response is sent. Celery + Redis handles large-scale distributed task queues.
BackgroundTasks — Lightweight Background Work
# App setup for the BackgroundTasks examples below.
from fastapi import FastAPI, BackgroundTasks, Depends
import time
import logging
app = FastAPI()
# One logger per module, keyed by module name (standard logging convention).
logger = logging.getLogger(__name__)
def send_welcome_email(email: str, username: str) -> None:
    """Simulate sending a welcome email (use SMTP/SendGrid in production).

    Blocks for 2 seconds to mimic network latency; because FastAPI runs it
    as a background task, the client response is not delayed.

    Args:
        email: Recipient address.
        username: Display name included in the console output.
    """
    time.sleep(2)
    # Lazy %-style args: the message is only formatted if this level is enabled.
    logger.info("[Email] Welcome email sent to %s", email)
    print(f"✉️ Welcome email sent to {email} ({username})")
def log_access(user_id: int, action: str) -> None:
    """Write an audit-log line recording *action* performed by *user_id*."""
    # Lazy %-args avoid formatting work when the log level is disabled.
    logger.info("[Audit] user=%s action=%s", user_id, action)
def generate_report(report_id: str, data: dict) -> None:
    """Produce a heavy report; the 5-second sleep simulates the real work."""
    delay_seconds = 5
    time.sleep(delay_seconds)
    message = f"[Report] {report_id} complete: {data}"
    print(message)
@app.post("/users/register")
async def register_user(
    username: str,
    email: str,
    background_tasks: BackgroundTasks,
):
    """Register a user, queueing the welcome email and audit log for later.

    The response returns immediately; queued callables run only after the
    response body has been sent to the client.
    """
    # Demo payload with a fixed id (no database in this example).
    new_user = {"id": 1, "username": username, "email": email}
    deferred = (
        (send_welcome_email, (email, username)),
        (log_access, (new_user["id"], "register")),
    )
    for task, args in deferred:
        background_tasks.add_task(task, *args)
    return {"message": "Registration complete", "user": new_user}
@app.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    background_tasks: BackgroundTasks,
):
    """Delete a user, writing the audit entry after the response is sent."""
    background_tasks.add_task(log_access, user_id, "delete")
    response = {"message": f"User {user_id} deleted"}
    return response
Celery + Redis — Distributed Task Queue
pip install celery redis
# Run Redis: docker run -d -p 6379:6379 redis
# celery_app.py
from celery import Celery
import time
# Celery application: Redis DB 0 carries the task queue, DB 1 stores results.
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",  # task queue
    backend="redis://localhost:6379/1",  # result storage
)
# JSON serialization keeps payloads portable; task_track_started adds a
# STARTED state so status queries can distinguish queued from running tasks.
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    task_track_started=True,
)
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str) -> dict:
    """Send an email, retrying up to 3 times with exponential backoff."""
    try:
        time.sleep(1)  # simulate SMTP
        print(f"Email sent: {to}")
    except Exception as exc:
        # Back off 2, 4, 8 seconds across successive retry attempts.
        backoff = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff)
    else:
        return {"status": "sent", "to": to}
@celery_app.task
def process_image(image_path: str, width: int, height: int) -> str:
    """Resize an image (CPU-bound; simulated with a 3-second delay)."""
    time.sleep(3)
    print(f"Image processed: {image_path} → {width}x{height}")
    resized_path = f"{image_path}_resized"
    return resized_path
# main.py — FastAPI + Celery integration
from fastapi import FastAPI
# NOTE(review): only the tasks are imported, not the Celery instance itself;
# AsyncResult lookups below appear to rely on the instance becoming Celery's
# implicit current app as an import side effect — confirm.
from celery_app import send_email_task, process_image
from celery.result import AsyncResult
app = FastAPI()
@app.post("/send-email")
def queue_email(to: str, subject: str, body: str):
    """Enqueue the email task on the Celery broker and return its task id."""
    # .delay(...) is shorthand for .apply_async(args=(...)).
    queued = send_email_task.delay(to, subject, body)
    payload = {
        "task_id": queued.id,
        "status": "queued",
        "message": "Email queued for delivery",
    }
    return payload
@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    """Return the current status (and result, if finished) of a Celery task.

    Binds AsyncResult explicitly to the task's app instead of relying on
    Celery's implicit current-app global — this module never imports
    `celery_app` itself, so the implicit lookup only worked as an import
    side effect.
    """
    result = AsyncResult(task_id, app=send_email_task.app)
    return {
        "task_id": task_id,
        "status": result.status,
        # ready() is also True on FAILURE, where .result holds the raised
        # exception (not JSON-serializable); only expose successful results.
        "result": result.result if result.successful() else None,
    }
# Run Celery worker
celery -A celery_app worker --loglevel=info
# Monitoring (Flower)
pip install flower
celery -A celery_app flower # http://localhost:5555
Comparison
BackgroundTasks
✅ No setup needed, simple tasks
✅ Runs immediately after response
❌ Lost if process restarts
❌ No retries or scheduling
→ Email sending, logging, cache updates
Celery + Redis
✅ Task persistence, retries, priority
✅ Distributed processing, scheduling (celery beat)
✅ Monitoring (Flower)
❌ Requires Redis/RabbitMQ infrastructure
→ Large image processing, scheduled reports, payment processing
Summary
| Tool | Best For |
|---|---|
| BackgroundTasks | Lightweight, short-lived work (logging, notifications) |
| Celery | Heavy, long-running, or retry-required tasks |
| celery beat | Scheduled recurring tasks |
| Flower | Real-time Celery monitoring |
Use background tasks to keep API responses fast while processing heavy work asynchronously.