MongoDB Integration
MongoDB is a document-oriented NoSQL database. It features flexible schemas and horizontal scalability.
Installation
pip install pymongo # synchronous
pip install motor # async (asyncio-based)
pip install "pymongo[srv]" # Atlas (mongodb+srv:// DNS SRV) support; only needed on PyMongo < 4.3, newer releases install dnspython automatically
PyMongo — Synchronous Client
from datetime import datetime, timezone

from bson import ObjectId
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.collection import Collection
# Connect to a local MongoDB server. `client`, `db` and `products_col` are
# module-level handles reused by every example below.
client = MongoClient("mongodb://localhost:27017/")
db = client["myapp"]  # databases are addressed by name with []
products_col: Collection = db["products"]  # collections likewise; annotated for type checkers
# Atlas connection (mongodb+srv:// URI). In real code, read the URI and
# credentials from configuration instead of hard-coding them.
# client = MongoClient("mongodb+srv://user:pass@cluster.mongodb.net/")
Basic CRUD
# CREATE
# Timestamps are timezone-aware UTC: datetime.utcnow() returns a naive value and
# is deprecated since Python 3.12. PyMongo stores both forms as the same UTC instant.
result = products_col.insert_one({
    "name": "Python Book",
    "price": 35000,
    "stock": 100,
    "tags": ["python", "programming"],
    "category": {"id": 1, "name": "Books"},
    "is_active": True,  # the READ / aggregation examples below filter on this field
    "created_at": datetime.now(timezone.utc),
})
product_id = result.inserted_id  # ObjectId generated for the new document
# Insert multiple documents
result = products_col.insert_many([
    {"name": "FastAPI Guide", "price": 28000, "is_active": True},
    {"name": "Django Pro", "price": 32000, "is_active": True},
])
# result.inserted_ids lists the generated _ids in the order the documents were given
# READ
# Single document: find_one returns the first matching document, or None.
# product_id is already an ObjectId here; ObjectId() also accepts a 24-char hex string.
product = products_col.find_one({"_id": ObjectId(product_id)})
product = products_col.find_one({"name": "Python Book"})
# Multiple documents: find() returns a lazy cursor; list() drains it into memory
all_products = list(products_col.find({"stock": {"$gt": 0}}))
# Field selection (projection): 1 = include, 0 = exclude; _id is returned unless excluded
names_only = list(products_col.find({}, {"name": 1, "price": 1, "_id": 0}))
# Sort + pagination: newest first, skip the first page of 20 and return the second.
# NOTE: the server still walks the skipped documents, so deep skip() pages get slower;
# range filters on an indexed field (e.g. created_at < last_seen) scale better.
page = list(
    products_col.find({"is_active": True})
    .sort("created_at", DESCENDING)
    .skip(20)
    .limit(20)
)
# Count documents matching the filter
total = products_col.count_documents({"is_active": True})
# UPDATE
# Update a single document: $set adds or overwrites only the listed fields.
# The timestamp is timezone-aware UTC (datetime.utcnow() is deprecated since Python 3.12).
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$set": {"price": 38000, "updated_at": datetime.now(timezone.utc)}},
)
# Increment/decrement numbers on the server (no read-modify-write race in the client)
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$inc": {"stock": -1}},
)
# Array operations
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$push": {"tags": "bestseller"}},  # append to array; duplicates allowed ($addToSet avoids them)
)
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$pull": {"tags": "draft"}},  # remove every matching element from the array
)
# Update multiple documents
products_col.update_many(
    {"category.name": "Books"},
    {"$set": {"discount": 0.1}},
)
# DELETE
# delete_one removes at most one matching document
products_col.delete_one({"_id": ObjectId(product_id)})
# delete_many removes every document matching both conditions (implicit AND)
products_col.delete_many({"is_active": False, "stock": 0})
Query Operators
# Comparison
# Note: find() only builds a cursor; no query is sent until the cursor is iterated.
products_col.find({"price": {"$gte": 10000, "$lte": 50000}})  # inclusive range
products_col.find({"stock": {"$gt": 0}})
products_col.find({"category.name": {"$ne": "Electronics"}})  # also matches docs without the field
# Logic
# AND (default): several keys in one filter must all match
products_col.find({"is_active": True, "stock": {"$gt": 0}})
# OR
products_col.find({"$or": [{"price": {"$lt": 10000}}, {"tags": "free"}]})
# NOT: also matches documents that have no price field at all
products_col.find({"price": {"$not": {"$gt": 100000}}})
# IN / NIN (on an array field, $in matches if any element is in the list)
products_col.find({"tags": {"$in": ["python", "backend"]}})
products_col.find({"category.name": {"$nin": ["Deleted", "Hidden"]}})
# Regex: case-insensitive substring match (such regexes cannot use an index efficiently)
import re
products_col.find({"name": {"$regex": re.compile("python", re.IGNORECASE)}})
# Array queries
products_col.find({"tags": "python"})  # array contains value
products_col.find({"tags": {"$all": ["python", "api"]}})  # contains all
products_col.find({"tags": {"$size": 3}})  # exact array length (no range form)
Aggregation Pipeline
# Statistics by category
pipeline = [
    {"$match": {"is_active": True}},  # filter first so later stages see fewer documents
    {"$group": {
        "_id": "$category.name",  # group key (becomes _id of each output document)
        "count": {"$sum": 1},  # count documents per group
        "avg_price": {"$avg": "$price"},  # average (missing / non-numeric values ignored)
        "total_stock": {"$sum": "$stock"},  # sum
        "max_price": {"$max": "$price"},  # max
    }},
    {"$sort": {"count": -1}},  # largest groups first
    {"$limit": 10},  # top 10
]
results = list(products_col.aggregate(pipeline))
# Join ($lookup): attach each product's reviews where reviews.product_id == products._id.
# NOTE(review): assumes reviews store product_id as the same ObjectId type — confirm.
# This pipeline is only defined here; run it with products_col.aggregate(pipeline).
pipeline = [
    {"$lookup": {
        "from": "reviews",
        "localField": "_id",
        "foreignField": "product_id",
        "as": "reviews",  # always an array; empty when nothing matches
    }},
    {"$addFields": {
        "review_count": {"$size": "$reviews"},
        "avg_rating": {"$avg": "$reviews.rating"},  # null when there are no reviews
    }},
    {"$project": {"reviews": 0}},  # exclude reviews array
]
# Unwind (array → individual documents), then count how often each tag occurs.
# By default $unwind drops documents whose tags field is missing or an empty array.
pipeline = [
    {"$unwind": "$tags"},  # flatten tags array: one document per element
    {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
Index Management
# Single-field indexes
products_col.create_index("name")
products_col.create_index([("price", ASCENDING)])
# Compound index (field order matters: it serves queries on its leading fields)
products_col.create_index([("category.name", ASCENDING), ("is_active", ASCENDING)])
# Unique index. A plain unique index stores null for every document missing "sku",
# so a second sku-less document (like the examples above) would make the build fail
# with a duplicate-key error. The partial filter only indexes documents that have a sku.
products_col.create_index(
    "sku",
    unique=True,
    partialFilterExpression={"sku": {"$exists": True}},
)
# TTL index (auto-expiration): must be on a date field; a background task removes
# expired documents periodically, so deletion is not instantaneous.
logs_col = db["logs"]
logs_col.create_index("created_at", expireAfterSeconds=86400)  # delete ~24 hours after created_at
# Text index (full-text search); a collection can have at most one text index
products_col.create_index([("name", "text"), ("description", "text")])
results = list(products_col.find({"$text": {"$search": "python api"}}))
# List indexes
print(list(products_col.list_indexes()))
Motor — Async Client
import motor.motor_asyncio
# Async client: same collection API shape as PyMongo, but database calls are awaited
# (see the functions below). These names rebind the synchronous client/db/products_col.
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
db = client["myapp"]
products_col = db["products"]
async def get_product(product_id: str) -> dict | None:
    """Fetch one product by the hex string form of its ``_id``.

    Returns the matching document, or ``None`` when nothing matches. A
    malformed id string makes ``ObjectId`` raise before any query is sent.
    """
    object_id = ObjectId(product_id)
    found = await products_col.find_one({"_id": object_id})
    return found
async def get_products(skip: int = 0, limit: int = 20) -> list[dict]:
    """Return one page of active products.

    Args:
        skip: Number of matching documents to skip before the page starts.
        limit: Page size; also caps how many documents ``to_list`` collects.
    """
    active_only = {"is_active": True}
    paged = products_col.find(active_only)
    paged = paged.skip(skip)
    paged = paged.limit(limit)
    documents = await paged.to_list(length=limit)
    return documents
async def create_product(data: dict) -> str:
    """Insert a new product and return its generated id as a string.

    The caller's ``data`` dict is not modified. A shallow copy is stamped with a
    timezone-aware UTC ``created_at`` and inserted instead. ``insert_one`` also
    writes the generated ``_id`` back into the document it receives, so
    inserting ``data`` directly would mutate it twice.

    Args:
        data: Product fields to store. A ``created_at`` key in it is overwritten.

    Returns:
        The string form of the inserted document's ``_id``.
    """
    # Later keys win in a dict display, so created_at overrides any caller value,
    # matching the original assignment semantics.
    document = {**data, "created_at": datetime.now(timezone.utc)}
    result = await products_col.insert_one(document)
    return str(result.inserted_id)
# Async aggregation
async def get_category_stats() -> list[dict]:
    """Count products per ``category.name``, most populous category first.

    Each result has the category name in ``_id`` and its document count in
    ``count``; all groups are returned (no length cap).
    """
    group_by_category = {"$group": {"_id": "$category.name", "count": {"$sum": 1}}}
    biggest_first = {"$sort": {"count": -1}}
    stats_cursor = products_col.aggregate([group_by_category, biggest_first])
    stats = await stats_cursor.to_list(length=None)
    return stats
Summary
| Concept | Description |
|---|---|
| Collection | Container of documents, analogous to a table |
| Document | BSON (binary JSON) document, analogous to a row |
| $set, $inc | Update operators |
| $match, $group | Aggregation pipeline stages |
| $lookup | Join with another collection |
| create_index() | Creates an index to speed up queries (or enforce uniqueness / TTL expiry) |
| Motor | asyncio-based async driver |
MongoDB is well suited to services with semi-structured or evolving data models, rapid development cycles, and horizontal scaling needs.