Skip to content

Update documents

Update documents

main.py
"""Update one or more documents matching the filter.

The search is based on the effect of a quantum loop.
The search effectiveness depends on the number of processor threads.
"""

import anyio
from datetime import datetime
from zoneinfo import ZoneInfo
from typing import Annotated
from pydantic import EmailStr, Field
from pydantic_extra_types.phone_numbers import PhoneNumber, PhoneNumberValidator
from scruby import Scruby, ScrubyModel, ScrubyConfig

# Global Scruby settings, assigned on the `ScrubyConfig` class before any
# collection is opened in `main()`.
# Every value shown here equals the default noted beside it, so this block
# works as a template: edit only the lines you need to change.
# NOTE(review): the exact meaning of each option is defined by Scruby itself;
# confirm against the Scruby documentation before changing a value.
ScrubyConfig.db_root = "ScrubyDB"  # Default: "ScrubyDB"
ScrubyConfig.HASH_REDUCE_LEFT = 6  # Default: 6
ScrubyConfig.max_workers = None  # Default: None
ScrubyConfig.plugins = []  # Default: []


class User(ScrubyModel):
    """Schema of a user document.

    When `key` is omitted, it is filled in from the `phone` value of the
    same document. Both `phone` and `key` are frozen.
    """

    first_name: str = Field(strict=True)
    last_name: str = Field(strict=True)
    birthday: datetime = Field(strict=True)
    email: EmailStr = Field(strict=True)
    phone: Annotated[
        PhoneNumber,
        PhoneNumberValidator(number_format="E164"),
    ] = Field(frozen=True)
    # Keep `key` as the last field: its default factory reads `phone` from
    # the data of the fields declared above it.
    key: str = Field(
        default_factory=lambda validated: validated["phone"],
        frozen=True,
        strict=True,
    )


async def main() -> None:
    """Create nine users, update them in bulk, and print the outcome.

    Steps:
        1. Obtain the `User` collection.
        2. Add nine users that differ in birthday, email and phone.
        3. Set `first_name` to "Georg" through `update_many` and print the
           number of updated documents.
        4. Read the users back and print each `first_name`.
        5. Delete the whole database.
    """
    # Get collection `User`.
    user_coll = await Scruby.collection(User)

    # Create users.
    for num in range(1, 10):
        user = User(
            first_name="John",
            last_name="Smith",
            birthday=datetime(1970, 1, num, tzinfo=ZoneInfo("UTC")),
            email=f"John_Smith_{num}@gmail.com",
            phone=f"+44798612345{num}",
        )
        # Store the document in the collection obtained above.
        await user_coll.add_doc(user)

    # Only `new_data` is passed here; the expected count of 9 indicates that
    # all stored users are updated.
    number_updated_users = await user_coll.update_many(
        new_data={"first_name": "Georg"},
    )
    print(number_updated_users)  # => 9

    users: list[User] | None = await user_coll.find_many()
    # The result is annotated as possibly `None`, so fall back to an empty
    # list instead of iterating `None`.
    for user in users or []:
        print(user.first_name)  # => Georg

    # Full database deletion.
    # Hint: The main purpose is tests.
    Scruby.napalm()


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    # Hand the coroutine function itself (not a coroutine object) to anyio.
    anyio.run(main)