Skip to content

Plugin

Scruby-Return-Dict Plugin.

ReturnDict

Bases: ScrubyPlugin

Scruby-Return-Dict Plugin.

In search methods, returns result as dictionaries.

Source code in src/scruby_return_dict/plugin.py
class ReturnDict(ScrubyPlugin):
    """Scruby-Return-Dict Plugin.

    In search methods, returns result as dictionaries.
    """

    def __init__(self, scruby_self: Scruby) -> None:  # noqa: D107
        ScrubyPlugin.__init__(self, scruby_self)

    @final
    @staticmethod
    async def _task_find(
        branch_number: int,
        filter_fn: Callable,
        hash_reduce_left: str,
        db_root: str,
        class_model: Any,
    ) -> list[dict[str, Any]] | None:
        """Task for find documents.

        This method is for internal use.

        Returns:
            List of documents as dictionaries or None.
        """
        branch_number_as_hash: str = f"{branch_number:08x}"[hash_reduce_left:]
        separated_hash: str = "/".join(list(branch_number_as_hash))
        leaf_path = Path(
            *(
                db_root,
                class_model.__name__,
                separated_hash,
                "leaf.json",
            ),
        )
        docs: list[dict[str, Any]] = []
        if await leaf_path.exists():
            data_json: bytes = await leaf_path.read_bytes()
            data: dict[str, str] = orjson.loads(data_json) or {}
            for _, val in data.items():
                doc = class_model.model_validate_json(val)
                if filter_fn(doc):
                    docs.append(doc.model_dump())
        return docs or None

    @final
    async def find_one(
        self,
        filter_fn: Callable,
    ) -> dict[str, Any] | None:
        """Asynchronous method for find one document matching the filter.

        Attention:
            - The search is based on the effect of a quantum loop.
            - The search effectiveness depends on the number of processor threads.

        Args:
            filter_fn (Callable): A function that execute the conditions of filtering.

        Returns:
            One document as dictionary or None.
        """
        # Get Scruby instance
        scruby_self = self.scruby_self()
        # Variable initialization
        search_task_fn: Callable = self._task_find
        branch_numbers: range = range(scruby_self._max_number_branch)
        hash_reduce_left: int = scruby_self._hash_reduce_left
        db_root: str = scruby_self._db_root
        class_model: Any = scruby_self._class_model
        # Run quantum loop
        with concurrent.futures.ThreadPoolExecutor(scruby_self._max_workers) as executor:
            for branch_number in branch_numbers:
                future = executor.submit(
                    search_task_fn,
                    branch_number,
                    filter_fn,
                    hash_reduce_left,
                    db_root,
                    class_model,
                )
                docs = await future.result()
                if docs is not None:
                    return docs[0]
        return None

    @final
    async def find_many(
        self,
        filter_fn: Callable = lambda _: True,
        limit_docs: int = 100,
        page_number: int = 1,
    ) -> list[dict[str, Any]] | None:
        """Asynchronous method for find many documents matching the filter.

        Attention:
            - The search is based on the effect of a quantum loop.
            - The search effectiveness depends on the number of processor threads.

        Args:
            filter_fn (Callable): A function that execute the conditions of filtering.
                                  By default it searches for all documents.
            limit_docs (int): Limiting the number of documents. By default = 100.
            page_number (int): For pagination. By default = 1.
                               Number of documents per page = limit_docs.

        Returns:
            List of documents as dictionaries or None.
        """
        # The `page_number` parameter must not be less than one
        assert page_number > 0, "`find_many` => The `page_number` parameter must not be less than one."
        # Get Scruby instance
        scruby_self = self.scruby_self()
        # Variable initialization
        search_task_fn: Callable = self._task_find
        branch_numbers: range = range(scruby_self._max_number_branch)
        hash_reduce_left: int = scruby_self._hash_reduce_left
        db_root: str = scruby_self._db_root
        class_model: Any = scruby_self._class_model
        counter: int = 0
        number_docs_skippe: int = limit_docs * (page_number - 1) if page_number > 1 else 0
        result: list[dict[str, Any]] = []
        # Run quantum loop
        with concurrent.futures.ThreadPoolExecutor(scruby_self._max_workers) as executor:
            for branch_number in branch_numbers:
                if number_docs_skippe == 0 and counter >= limit_docs:
                    return result[:limit_docs]
                future = executor.submit(
                    search_task_fn,
                    branch_number,
                    filter_fn,
                    hash_reduce_left,
                    db_root,
                    class_model,
                )
                docs = await future.result()
                if docs is not None:
                    for doc in docs:
                        if number_docs_skippe == 0:
                            if counter >= limit_docs:
                                return result[:limit_docs]
                            result.append(doc)
                            counter += 1
                        else:
                            number_docs_skippe -= 1
        return result or None

find_many(filter_fn=lambda _: True, limit_docs=100, page_number=1) async

Asynchronous method for find many documents matching the filter.

Attention
  • The search is based on the effect of a quantum loop.
  • The search effectiveness depends on the number of processor threads.

Parameters:

Name Type Description Default
filter_fn Callable

A function that execute the conditions of filtering. By default it searches for all documents.

lambda _: True
limit_docs int

Limiting the number of documents. By default = 100.

100
page_number int

For pagination. By default = 1. Number of documents per page = limit_docs.

1

Returns:

Type Description
list[dict[str, Any]] | None

List of documents as dictionaries or None.

Source code in src/scruby_return_dict/plugin.py
@final
async def find_many(
    self,
    filter_fn: Callable = lambda _: True,
    limit_docs: int = 100,
    page_number: int = 1,
) -> list[dict[str, Any]] | None:
    """Asynchronous method for find many documents matching the filter.

    Attention:
        - The search is based on the effect of a quantum loop.
        - The search effectiveness depends on the number of processor threads.

    Args:
        filter_fn (Callable): A function that execute the conditions of filtering.
                              By default it searches for all documents.
        limit_docs (int): Limiting the number of documents. By default = 100.
        page_number (int): For pagination. By default = 1.
                           Number of documents per page = limit_docs.

    Returns:
        List of documents as dictionaries or None.
    """
    # The `page_number` parameter must not be less than one
    assert page_number > 0, "`find_many` => The `page_number` parameter must not be less than one."
    # Get Scruby instance
    scruby_self = self.scruby_self()
    # Variable initialization
    search_task_fn: Callable = self._task_find
    branch_numbers: range = range(scruby_self._max_number_branch)
    hash_reduce_left: int = scruby_self._hash_reduce_left
    db_root: str = scruby_self._db_root
    class_model: Any = scruby_self._class_model
    counter: int = 0
    number_docs_skippe: int = limit_docs * (page_number - 1) if page_number > 1 else 0
    result: list[dict[str, Any]] = []
    # Run quantum loop
    with concurrent.futures.ThreadPoolExecutor(scruby_self._max_workers) as executor:
        for branch_number in branch_numbers:
            if number_docs_skippe == 0 and counter >= limit_docs:
                return result[:limit_docs]
            future = executor.submit(
                search_task_fn,
                branch_number,
                filter_fn,
                hash_reduce_left,
                db_root,
                class_model,
            )
            docs = await future.result()
            if docs is not None:
                for doc in docs:
                    if number_docs_skippe == 0:
                        if counter >= limit_docs:
                            return result[:limit_docs]
                        result.append(doc)
                        counter += 1
                    else:
                        number_docs_skippe -= 1
    return result or None

find_one(filter_fn) async

Asynchronous method for find one document matching the filter.

Attention
  • The search is based on the effect of a quantum loop.
  • The search effectiveness depends on the number of processor threads.

Parameters:

Name Type Description Default
filter_fn Callable

A function that execute the conditions of filtering.

required

Returns:

Type Description
dict[str, Any] | None

One document as dictionary or None.

Source code in src/scruby_return_dict/plugin.py
@final
async def find_one(
    self,
    filter_fn: Callable,
) -> dict[str, Any] | None:
    """Asynchronous method for find one document matching the filter.

    Attention:
        - The search is based on the effect of a quantum loop.
        - The search effectiveness depends on the number of processor threads.

    Args:
        filter_fn (Callable): A function that execute the conditions of filtering.

    Returns:
        One document as dictionary or None.
    """
    # Get Scruby instance
    scruby_self = self.scruby_self()
    # Variable initialization
    search_task_fn: Callable = self._task_find
    branch_numbers: range = range(scruby_self._max_number_branch)
    hash_reduce_left: int = scruby_self._hash_reduce_left
    db_root: str = scruby_self._db_root
    class_model: Any = scruby_self._class_model
    # Run quantum loop
    with concurrent.futures.ThreadPoolExecutor(scruby_self._max_workers) as executor:
        for branch_number in branch_numbers:
            future = executor.submit(
                search_task_fn,
                branch_number,
                filter_fn,
                hash_reduce_left,
                db_root,
                class_model,
            )
            docs = await future.result()
            if docs is not None:
                return docs[0]
    return None