Class Methods

Examples of frequently used methods:

main.py
# Gets an estimate of the count of documents in a collection using collection metadata.
count: int = await User.estimated_document_count()

# Gets the number of documents in a collection that match the filter.
q_filter = {"first_name": "John"}
count: int = await User.count_documents(q_filter)

# Runs an aggregation framework pipeline.
from bson.son import SON
pipeline = [
    {"$unwind": "$tags"},
    {"$group": {"_id": "$tags", "count": {"$sum": 1}}},
    {"$sort": SON([("count", -1), ("_id", -1)])},
]
docs = await User.aggregate(pipeline)

# Finds the distinct values for a specified field across a single collection.
q_filter = "key_name"
values = await User.distinct(q_filter)

# Get collection name.
name = await User.collection_name()

# The full name is of the form database_name.collection_name.
name = await User.collection_full_name()

# Get AsyncDatabase for the current Model.
database = await User.database()

# Get AsyncCollection for the current Model.
collection = await User.collection()

# Find a single document.
q_filter = {"email": "John_Smith@gmail.com"}
mongo_doc = await User.find_one(q_filter)

# Create object instance from Mongo document.
q_filter = {"email": "John_Smith@gmail.com"}
mongo_doc = await User.find_one(q_filter)
user = User.from_mongo_doc(mongo_doc)

# Find a single document and convert it to a raw document.
q_filter = {"email": "John_Smith@gmail.com"}
raw_doc = await User.find_one_to_raw_doc(q_filter)

# Find a single document and convert it to a Model instance.
q_filter = {"email": "John_Smith@gmail.com"}
user = await User.find_one_to_instance(q_filter)

# Find a single document and convert it to a JSON string.
q_filter = {"email": "John_Smith@gmail.com"}
json = await User.find_one_to_json(q_filter)

# Find a single document and delete it.
q_filter = {"email": "John_Smith@gmail.com"}
delete_result = await User.delete_one(q_filter)

# Find a single document, delete it, and return the original document.
q_filter = {"email": "John_Smith@gmail.com"}
mongo_doc = await User.find_one_and_delete(q_filter)

# Find documents.
q_filter = {"first_name": "John"}
mongo_docs = await User.find_many(q_filter)

# Find documents and convert them to raw documents.
q_filter = {"first_name": "John"}
raw_docs = await User.find_many_to_raw_docs(q_filter)

# Find documents and convert them to a JSON string.
q_filter = {"email": "John_Smith@gmail.com"}
json = await User.find_many_to_json(q_filter)

# Find documents matching the filter and delete them.
q_filter = {"email": "John_Smith@gmail.com"}
delete_result = await User.delete_many(q_filter)

# Creates an index on this collection.
from pymongo import ASCENDING
keys = [("email", ASCENDING)]
result: str = await User.create_index(keys, name="idx_email")

# Drops the specified index on this collection.
await User.drop_index("idx_email")

# Create one or more indexes on this collection.
from pymongo import ASCENDING, DESCENDING, IndexModel
index_1 = IndexModel([("username", DESCENDING), ("email", ASCENDING)], name="idx_username_email")
index_2 = IndexModel([("first_name", DESCENDING)], name="idx_first_name")
result: list[str] = await User.create_indexes([index_1, index_2])

# Drops all indexes on this collection.
await User.drop_indexes()

# Get information on this collection’s indexes.
result = await User.index_information()

# Get a cursor over the index documents for this collection.
async for index in await User.list_indexes():
    print(index)

# Units Management.
# Management for `choices` parameter in dynamic field types.
# Units are stored in a separate collection.
from ramifice import Unit
unit = Unit(
  field="field_name",  # The name of the dynamic field.
  title={"en": "Title", "ru": "Заголовок"},  # The name of the choice item.
  value="Some text ...",  # The value of the choice item.
                          # Hint: float | int | str
  is_delete=False, # True - if you need to remove the item of choice.
                   # by default = False (add item to choice)
)
await User.unit_manager(unit)