Skip to main content
Advertisement

MongoDB Integration

MongoDB is a document-oriented NoSQL database. It features flexible schemas and horizontal scalability.


Installation

pip install pymongo           # synchronous driver
pip install motor             # async driver (asyncio-based)
pip install "pymongo[srv]"    # adds DNS SRV support for MongoDB Atlas (mongodb+srv://) connections

PyMongo — Synchronous Client

from datetime import datetime, timezone

from bson import ObjectId
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.collection import Collection

# Connect to the MongoDB server at localhost, port 27017
client = MongoClient("mongodb://localhost:27017/")
# Select the "myapp" database, then its "products" collection
db = client["myapp"]
products_col: Collection = db["products"]

# Atlas connection (user, pass, and the cluster host are placeholders)
# client = MongoClient("mongodb+srv://user:pass@cluster.mongodb.net/")

Basic CRUD

# CREATE
# Insert a single document. created_at is a timezone-aware UTC timestamp;
# datetime.utcnow() is deprecated since Python 3.12 and returns a naive value.
result = products_col.insert_one({
    "name": "Python Book",
    "price": 35000,
    "stock": 100,
    "tags": ["python", "programming"],
    "category": {"id": 1, "name": "Books"},
    "created_at": datetime.now(timezone.utc),
})
product_id = result.inserted_id  # ObjectId

# Insert multiple documents
result = products_col.insert_many([
    {"name": "FastAPI Guide", "price": 28000},
    {"name": "Django Pro", "price": 32000},
])


# READ
# Single document (matched by _id, or by any other field)
product = products_col.find_one({"_id": ObjectId(product_id)})
product = products_col.find_one({"name": "Python Book"})

# Multiple documents
all_products = list(products_col.find({"stock": {"$gt": 0}}))

# Field selection (projection)
names_only = list(products_col.find({}, {"name": 1, "price": 1, "_id": 0}))

# Sort + pagination
# created_at is not unique, so _id is added as a tiebreaker: without it,
# documents sharing a timestamp may be repeated or skipped between pages.
page_number = 2  # 1-based; page 2 skips the first 20 documents
page_size = 20
page = list(
    products_col.find({"is_active": True})
    .sort([("created_at", DESCENDING), ("_id", DESCENDING)])
    .skip((page_number - 1) * page_size)
    .limit(page_size)
)

# Count
total = products_col.count_documents({"is_active": True})


# UPDATE
# Update single document: set a new price and a timezone-aware UTC
# updated_at (datetime.utcnow() is deprecated since Python 3.12).
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$set": {"price": 38000, "updated_at": datetime.now(timezone.utc)}},
)

# Increment/decrement numbers
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$inc": {"stock": -1}},
)

# Array operations
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$push": {"tags": "bestseller"}},  # add to array
)
products_col.update_one(
    {"_id": ObjectId(product_id)},
    {"$pull": {"tags": "draft"}},  # remove from array
)

# Update multiple documents
products_col.update_many(
    {"category.name": "Books"},
    {"$set": {"discount": 0.1}},
)


# DELETE
# Remove the one document matching this _id
by_id = {"_id": ObjectId(product_id)}
products_col.delete_one(by_id)

# Remove every document that is inactive and has zero stock
inactive_and_out_of_stock = {"is_active": False, "stock": 0}
products_col.delete_many(inactive_and_out_of_stock)

Query Operators

# NOTE: each find() below only builds a cursor; wrap it in list(...) or
# iterate it to read the matches, as the READ examples do.

# Comparison
# price between 10000 and 50000 ($gte / $lte)
products_col.find({"price": {"$gte": 10000, "$lte": 50000}})
# stock greater than 0
products_col.find({"stock": {"$gt": 0}})
# nested field (dot notation) not equal to "Electronics"
products_col.find({"category.name": {"$ne": "Electronics"}})

# Logic
# AND (default): every key in the filter must match
products_col.find({"is_active": True, "stock": {"$gt": 0}})

# OR: at least one of the listed conditions must match
products_col.find({"$or": [{"price": {"$lt": 10000}}, {"tags": "free"}]})

# NOT: negates the wrapped operator expression
products_col.find({"price": {"$not": {"$gt": 100000}}})

# IN / NIN: value is (or is not) one of the listed values
products_col.find({"tags": {"$in": ["python", "backend"]}})
products_col.find({"category.name": {"$nin": ["Deleted", "Hidden"]}})

# Regex: case-insensitive match on "python"
import re
products_col.find({"name": {"$regex": re.compile("python", re.IGNORECASE)}})

# Array queries
products_col.find({"tags": "python"}) # array contains value
products_col.find({"tags": {"$all": ["python", "api"]}}) # contains all
products_col.find({"tags": {"$size": 3}}) # array length

Aggregation Pipeline

# Statistics by category
# Each stage is named first, then assembled in execution order.
active_only = {"$match": {"is_active": True}}  # filter
per_category = {
    "$group": {
        "_id": "$category.name",            # group key
        "count": {"$sum": 1},               # count
        "avg_price": {"$avg": "$price"},    # average
        "total_stock": {"$sum": "$stock"},  # sum
        "max_price": {"$max": "$price"},    # max
    }
}
largest_count_first = {"$sort": {"count": -1}}  # sort
top_ten = {"$limit": 10}                        # top 10
pipeline = [active_only, per_category, largest_count_first, top_ten]

# Run the pipeline on the products collection and collect the output into a list
results = list(products_col.aggregate(pipeline))


# Join ($lookup)
# Attach each product's reviews, derive two summary fields, then drop the
# joined array again.
join_reviews = {
    "$lookup": {
        "from": "reviews",
        "localField": "_id",
        "foreignField": "product_id",
        "as": "reviews",
    }
}
review_summary = {
    "$addFields": {
        "review_count": {"$size": "$reviews"},
        "avg_rating": {"$avg": "$reviews.rating"},
    }
}
drop_reviews = {"$project": {"reviews": 0}}  # exclude reviews array
pipeline = [join_reviews, review_summary, drop_reviews]


# Unwind (array → individual documents)
# Count how many documents carry each tag, most frequent tag first.
flatten_tags = {"$unwind": "$tags"}  # flatten tags array
count_per_tag = {"$group": {"_id": "$tags", "count": {"$sum": 1}}}
most_used_first = {"$sort": {"count": -1}}
pipeline = [flatten_tags, count_per_tag, most_used_first]

Index Management

# Single index
products_col.create_index("name")
products_col.create_index([("price", ASCENDING)])

# Compound index
products_col.create_index([("category.name", ASCENDING), ("is_active", ASCENDING)])

# Unique index
products_col.create_index("sku", unique=True)

# TTL index (auto-expiration)
# logs_col was used here without ever being defined (NameError), so bind it
# to a collection first. The collection name "logs" is illustrative.
logs_col = db["logs"]
logs_col.create_index("created_at", expireAfterSeconds=86400)  # delete after 24 hours

# Text index (full-text search)
products_col.create_index([("name", "text"), ("description", "text")])
results = list(products_col.find({"$text": {"$search": "python api"}}))

# List indexes
print(list(products_col.list_indexes()))

Motor — Async Client

import motor.motor_asyncio

# Async counterpart of the PyMongo setup: same server, database, and collection.
# NOTE: client, db, and products_col reuse the names of the synchronous objects
# from the PyMongo section; in a single module these assignments replace them.
# NOTE(review): Motor is reportedly deprecated in favor of PyMongo's own async
# API — confirm against the current Motor documentation before adopting it.
client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017/")
db = client["myapp"]
products_col = db["products"]


async def get_product(product_id: str) -> dict | None:
    """Fetch one product by its id.

    Args:
        product_id: String form of the document's ObjectId.

    Returns:
        The matching document, or None when no document has that _id or
        when product_id is not a valid ObjectId string.
    """
    # ObjectId(...) raises bson.errors.InvalidId on malformed input. A
    # malformed id cannot match any document, so treat it as "not found".
    if not ObjectId.is_valid(product_id):
        return None
    return await products_col.find_one({"_id": ObjectId(product_id)})


async def get_products(skip: int = 0, limit: int = 20) -> list[dict]:
    """Return one page of products whose is_active field is True.

    Args:
        skip: Number of matching documents to skip.
        limit: Maximum number of documents to return.

    Returns:
        The fetched documents as a list.
    """
    active_filter = {"is_active": True}
    page_cursor = products_col.find(active_filter).skip(skip).limit(limit)
    documents = await page_cursor.to_list(length=limit)
    return documents


async def create_product(data: dict) -> str:
    """Insert a new product document and return its id as a string.

    Args:
        data: Fields of the new product. Modified in place: "created_at"
            is set (overwriting any existing value) before the insert.

    Returns:
        The inserted document's id, converted with str().
    """
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive datetime.
    data["created_at"] = datetime.now(timezone.utc)
    result = await products_col.insert_one(data)
    return str(result.inserted_id)


# Async aggregation
async def get_category_stats() -> list[dict]:
    """Count products per category name, largest count first.

    Returns:
        One document per category: "_id" holds the category name and
        "count" the number of products grouped under it.
    """
    count_per_category = {"$group": {"_id": "$category.name", "count": {"$sum": 1}}}
    largest_first = {"$sort": {"count": -1}}
    stats_cursor = products_col.aggregate([count_per_category, largest_first])
    # length=None: no cap on the number of documents is passed to to_list
    return await stats_cursor.to_list(length=None)

Summary

Concept | Description
Collection | Document container equivalent to a table
Document | JSON document equivalent to a row
$set, $inc | Update operators
$match, $group | Aggregation pipeline stages
$lookup | Collection join
create_index() | Performance optimization index
Motor | asyncio-based async driver

MongoDB is best suited for services requiring unstructured data, rapid development, and horizontal scaling.

Advertisement