Skip to main content
Advertisement

Background Tasks

FastAPI's BackgroundTasks runs work after the response is sent. Celery + Redis handles large-scale distributed task queues.


BackgroundTasks — Lightweight Background Work

from fastapi import FastAPI, BackgroundTasks, Depends
import time
import logging

app = FastAPI()
logger = logging.getLogger(__name__)


def send_welcome_email(email: str, username: str) -> None:
    """Simulate delivering a welcome email to a newly registered user.

    The delivery itself is faked with a two-second sleep; a production
    version would hand the message to SMTP or a provider such as SendGrid.

    Args:
        email: Recipient address, echoed in the log entry and console line.
        username: Account name, echoed in the console line only.
    """
    simulated_delivery_seconds = 2
    time.sleep(simulated_delivery_seconds)

    log_entry = f"[Email] Welcome email sent to {email}"
    logger.info(log_entry)

    console_line = f"✉️ Welcome email sent to {email} ({username})"
    print(console_line)


def log_access(user_id: int, action: str) -> None:
    """Record an audit entry for an action performed on a user.

    Args:
        user_id: Identifier of the user the action relates to.
        action: Short label for what happened (e.g. "register", "delete").
    """
    audit_entry = f"[Audit] user={user_id} action={action}"
    logger.info(audit_entry)


def generate_report(report_id: str, data: dict) -> None:
    """Simulate an expensive report build and announce its completion.

    The work is faked with a five-second sleep, after which the report id
    and its input data are printed.

    Args:
        report_id: Identifier shown in the completion line.
        data: Report input, printed verbatim in the completion line.
    """
    simulated_build_seconds = 5
    time.sleep(simulated_build_seconds)

    completion_line = f"[Report] {report_id} complete: {data}"
    print(completion_line)


@app.post("/users/register")
async def register_user(
    username: str,
    email: str,
    background_tasks: BackgroundTasks,
):
    """Register a user and schedule the follow-up work as background tasks.

    The user record is a hard-coded example (id is always 1). The welcome
    email and the audit entry are queued on ``background_tasks`` so they
    run after the response has been sent.

    Args:
        username: Name of the account being created.
        email: Address the welcome email is sent to.
        background_tasks: FastAPI-injected queue for post-response work.

    Returns:
        A dict with a confirmation message and the created user record.
    """
    new_user_id = 1
    user = {"id": new_user_id, "username": username, "email": email}

    # Each entry is (callable, positional args); scheduled in this order.
    deferred_jobs = (
        (send_welcome_email, (email, username)),
        (log_access, (new_user_id, "register")),
    )
    for job, job_args in deferred_jobs:
        background_tasks.add_task(job, *job_args)

    return {"message": "Registration complete", "user": user}


@app.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    background_tasks: BackgroundTasks,
):
    """Acknowledge a user deletion and schedule its audit entry.

    No deletion is performed in this example handler; it only queues a
    "delete" audit entry and returns a confirmation message.

    Args:
        user_id: Identifier taken from the URL path.
        background_tasks: FastAPI-injected queue for post-response work.

    Returns:
        A dict containing the confirmation message.
    """
    background_tasks.add_task(log_access, user_id, "delete")

    confirmation = f"User {user_id} deleted"
    return {"message": confirmation}

Celery + Redis — Distributed Task Queue

pip install celery redis
# Run Redis: docker run -d -p 6379:6379 redis
# celery_app.py
from celery import Celery
import time

# Celery application for the tasks defined below.
# Redis database 0 carries the task queue; database 1 stores task results.
celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# JSON for both task messages and stored results, and track the started
# state so status lookups can distinguish queued from running tasks.
celery_app.conf.task_serializer = "json"
celery_app.conf.result_serializer = "json"
celery_app.conf.task_track_started = True


@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str) -> dict:
    """Send an email as a Celery task, retrying with exponential backoff.

    Delivery is simulated with a one-second sleep. ``subject`` and ``body``
    are accepted but not used by the simulation.

    Args:
        self: Bound task instance (from ``bind=True``); supplies the retry
            counter and the ``retry`` method.
        to: Recipient address.
        subject: Message subject (unused here).
        body: Message body (unused here).

    Returns:
        ``{"status": "sent", "to": to}`` on success.

    Raises:
        Whatever ``self.retry`` raises when any exception occurs during the
        send; the task is configured with ``max_retries=3``.
    """
    try:
        time.sleep(1)  # simulate SMTP
        print(f"Email sent: {to}")
    except Exception as exc:
        # Delay doubles with every attempt already made: 1s, 2s, 4s, ...
        backoff_seconds = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff_seconds)
    else:
        return {"status": "sent", "to": to}


@celery_app.task
def process_image(image_path: str, width: int, height: int) -> str:
    """Simulate resizing an image (CPU-bound work) as a Celery task.

    The resize is faked with a three-second sleep; no file is read or
    written.

    Args:
        image_path: Path of the image to resize.
        width: Target width, shown in the completion line.
        height: Target height, shown in the completion line.

    Returns:
        ``image_path`` with ``"_resized"`` appended.
    """
    time.sleep(3)
    # Separate the path from the dimensions; previously they were fused
    # together (e.g. "photo.png800x600"), which made the line ambiguous.
    print(f"Image processed: {image_path} -> {width}x{height}")
    return f"{image_path}_resized"
# main.py — FastAPI + Celery integration
from fastapi import FastAPI
from celery_app import send_email_task, process_image
from celery.result import AsyncResult

app = FastAPI()


@app.post("/send-email")
def queue_email(to: str, subject: str, body: str):
    """Enqueue an email for delivery by a Celery worker.

    Args:
        to: Recipient address.
        subject: Message subject.
        body: Message body.

    Returns:
        A dict with the Celery task id, a "queued" status and a
        human-readable message.
    """
    queued = send_email_task.delay(to, subject, body)

    response = {"task_id": queued.id}
    response["status"] = "queued"
    response["message"] = "Email queued for delivery"
    return response


@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    """Report the state of a Celery task and its result once finished.

    Args:
        task_id: Id previously returned when the task was queued.

    Returns:
        A dict with ``task_id``, ``status`` and ``result``. ``result`` is
        ``None`` until the task is ready; for a task that ended with an
        exception it is that exception's message as a string.
    """
    # Imported here so the lookup can be bound to the configured app without
    # touching the module-level imports.
    from celery_app import celery_app

    # Bind explicitly to the app that owns the Redis result backend. Without
    # ``app=``, Celery falls back to its implicit "current app", which may
    # not be the configured one when this handler runs in a worker thread.
    result = AsyncResult(task_id, app=celery_app)

    payload = None
    if result.ready():
        payload = result.result
        # A failed task stores the raised exception as its result; turn it
        # into text so the response stays JSON-serialisable and informative.
        if isinstance(payload, BaseException):
            payload = str(payload)

    return {
        "task_id": task_id,
        "status": result.status,
        "result": payload,
    }
# Run Celery worker
celery -A celery_app worker --loglevel=info

# Monitoring (Flower)
pip install flower
celery -A celery_app flower # http://localhost:5555

Comparison

BackgroundTasks
✅ No setup needed, simple tasks
✅ Runs immediately after response
❌ Pending tasks are lost if the process restarts
❌ No built-in retries or scheduling
→ Email sending, logging, cache updates

Celery + Redis
✅ Task persistence, retries, priority
✅ Distributed processing, scheduling (celery beat)
✅ Monitoring (Flower)
❌ Requires Redis/RabbitMQ infrastructure
→ Large image processing, scheduled reports, payment processing

Summary

| Tool | Best For |
| --- | --- |
| BackgroundTasks | Lightweight, short-lived (logging, notifications) |
| Celery | Heavy, long-running, retry-required |
| celery beat | Scheduled recurring tasks |
| Flower | Real-time Celery monitoring |

Use background tasks to keep API responses fast while processing heavy work asynchronously.

Advertisement