Skip to content

Check

Validation of Model data before saving to the database.

CheckMixin

Bases: BoolGroupMixin, ChoiceGroupMixin, DateGroupMixin, FileGroupMixin, IDGroupMixin, ImgGroupMixin, NumGroupMixin, PassGroupMixin, SlugGroupMixin, TextGroupMixin

Validation of Model data before saving to the database.

Source code in src\ramifice\paladins\check.py
class CheckMixin(
    BoolGroupMixin,
    ChoiceGroupMixin,
    DateGroupMixin,
    FileGroupMixin,
    IDGroupMixin,
    ImgGroupMixin,
    NumGroupMixin,
    PassGroupMixin,
    SlugGroupMixin,
    TextGroupMixin,
):
    """Validate Model data before it is saved to the database."""

    async def check(
        self,
        is_save: bool = False,
        collection: AsyncCollection | None = None,
        is_migration_process: bool = False,
    ) -> dict[str, Any]:
        """Validate every field of the Model and collect the data to store.

        It is also used to verify Models that do not migrate to the database.

        Args:
            is_save: When true, a fresh ObjectId is assigned to a Model
                without one, the stored document is loaded for updates, and
                file/image bookkeeping runs after validation.
            collection: Collection to work with; when omitted it is taken
                from `constants.MONGO_DATABASE` by the Model's
                `collection_name`.
            is_migration_process: Forwarded unchanged to the `*_group`
                methods through `params`.

        Returns:
            A dict with the keys `data` (values gathered by the group
            methods), `is_valid` and `is_update`.
        """
        model_cls = self.__class__

        # A Model that already carries an ID is treated as an update.
        current_id: ObjectId | None = self._id.value
        is_update: bool = current_id is not None
        if is_save and not is_update:
            # A new document gets its identifier up front.
            current_id = ObjectId()
            self._id.value = current_id

        result_map: dict[str, Any] = {}
        # Messages produced by the Model's additional validation.
        extra_errors: dict[str, str] = await self.add_validation()
        if collection is None:
            collection = constants.MONGO_DATABASE[model_cls.META["collection_name"]]

        full_model_name = model_cls.META["full_model_name"]
        # The stored document is only needed when an existing one is re-saved.
        stored_doc = None
        if is_save and is_update:
            stored_doc = await collection.find_one({"_id": current_id})

        # Shared state handed to every *_group method.
        params: dict[str, Any] = {
            "doc_id": current_id,
            "is_save": is_save,
            "is_update": is_update,
            "is_error_symptom": False,  # Flipped to True once any error is found.
            "result_map": result_map,  # Data to save or update to the database.
            "collection": collection,
            "field_data": None,
            "full_model_name": full_model_name,
            "is_migration_process": is_migration_process,
            "curr_doc": stored_doc,
        }

        # Validate the fields one by one.
        for name, field in self.__dict__.items():
            if callable(field):
                continue
            # Start from a clean list so repeated checks do not pile up errors.
            field.errors = []
            extra_msg = extra_errors.get(name)
            if extra_msg:
                field.errors.append(extra_msg)
                params["is_error_symptom"] = True
            if field.ignored:
                continue
            # Dispatch to the validator of the field's group.
            params["field_data"] = field
            group = field.group
            if group == "text":
                await self.text_group(params)
            elif group == "num":
                await self.num_group(params)
            elif group == "date":
                self.date_group(params)
            elif group == "img":
                await self.img_group(params)
            elif group == "file":
                await self.file_group(params)
            elif group == "choice":
                self.choice_group(params)
            elif group == "bool":
                self.bool_group(params)
            elif group == "id":
                self.id_group(params)
            elif group == "slug":
                await self.slug_group(params)
            elif group == "pass":
                self.pass_group(params)
            else:
                msg: str = f"Unacceptable group `{group}`!"
                logger.critical(msg)
                assert_never(group)

        has_errors = params["is_error_symptom"]

        if is_save and has_errors:
            # Saving failed: a freshly generated ObjectId must not leak out.
            if not is_update:
                self._id.value = None
            # Remove files created during this check and put back stored values.
            curr_doc: dict[str, Any] | None = params["curr_doc"]
            for name, field in self.__dict__.items():
                if callable(field) or field.ignored:
                    continue
                group = field.group
                if group == "file":
                    pending_file = result_map.get(name)
                    if pending_file is not None:
                        if pending_file["is_new_file"]:
                            await to_thread.run_sync(remove, pending_file["path"])
                        field.value = None
                    if curr_doc is not None:
                        field.value = curr_doc[name]
                elif group == "img":
                    pending_img = result_map.get(name)
                    if pending_img is not None:
                        if pending_img["is_new_img"]:
                            await to_thread.run_sync(rmtree, pending_img["imgs_dir_path"])
                        field.value = None
                    if curr_doc is not None:
                        field.value = curr_doc[name]
        elif is_save:
            # Validation passed: clear the "new" markers on files and images.
            for name, field in self.__dict__.items():
                if callable(field) or field.ignored:
                    continue
                group = field.group
                if group == "file":
                    accepted_file = result_map.get(name)
                    if accepted_file is not None:
                        accepted_file["is_new_file"] = False
                elif group == "img":
                    accepted_img = result_map.get(name)
                    if accepted_img is not None:
                        accepted_img["is_new_img"] = False

        return {
            "data": result_map,
            "is_valid": not has_errors,
            "is_update": is_update,
        }

check(is_save=False, collection=None, is_migration_process=False) async

Validation of Model data before saving to the database.

It is also used to verify Models that do not migrate to the database.

Source code in src\ramifice\paladins\check.py
async def check(
    self,
    is_save: bool = False,
    collection: AsyncCollection | None = None,
    is_migration_process: bool = False,
) -> dict[str, Any]:
    """Validate every field of the Model and collect the data to store.

    It is also used to verify Models that do not migrate to the database.

    Args:
        is_save: When true, a fresh ObjectId is assigned to a Model
            without one, the stored document is loaded for updates, and
            file/image bookkeeping runs after validation.
        collection: Collection to work with; when omitted it is taken
            from `constants.MONGO_DATABASE` by the Model's
            `collection_name`.
        is_migration_process: Forwarded unchanged to the `*_group`
            methods through `params`.

    Returns:
        A dict with the keys `data` (values gathered by the group
        methods), `is_valid` and `is_update`.
    """
    model_cls = self.__class__

    # A Model that already carries an ID is treated as an update.
    current_id: ObjectId | None = self._id.value
    is_update: bool = current_id is not None
    if is_save and not is_update:
        # A new document gets its identifier up front.
        current_id = ObjectId()
        self._id.value = current_id

    result_map: dict[str, Any] = {}
    # Messages produced by the Model's additional validation.
    extra_errors: dict[str, str] = await self.add_validation()
    if collection is None:
        collection = constants.MONGO_DATABASE[model_cls.META["collection_name"]]

    full_model_name = model_cls.META["full_model_name"]
    # The stored document is only needed when an existing one is re-saved.
    stored_doc = None
    if is_save and is_update:
        stored_doc = await collection.find_one({"_id": current_id})

    # Shared state handed to every *_group method.
    params: dict[str, Any] = {
        "doc_id": current_id,
        "is_save": is_save,
        "is_update": is_update,
        "is_error_symptom": False,  # Flipped to True once any error is found.
        "result_map": result_map,  # Data to save or update to the database.
        "collection": collection,
        "field_data": None,
        "full_model_name": full_model_name,
        "is_migration_process": is_migration_process,
        "curr_doc": stored_doc,
    }

    # Validate the fields one by one.
    for name, field in self.__dict__.items():
        if callable(field):
            continue
        # Start from a clean list so repeated checks do not pile up errors.
        field.errors = []
        extra_msg = extra_errors.get(name)
        if extra_msg:
            field.errors.append(extra_msg)
            params["is_error_symptom"] = True
        if field.ignored:
            continue
        # Dispatch to the validator of the field's group.
        params["field_data"] = field
        group = field.group
        if group == "text":
            await self.text_group(params)
        elif group == "num":
            await self.num_group(params)
        elif group == "date":
            self.date_group(params)
        elif group == "img":
            await self.img_group(params)
        elif group == "file":
            await self.file_group(params)
        elif group == "choice":
            self.choice_group(params)
        elif group == "bool":
            self.bool_group(params)
        elif group == "id":
            self.id_group(params)
        elif group == "slug":
            await self.slug_group(params)
        elif group == "pass":
            self.pass_group(params)
        else:
            msg: str = f"Unacceptable group `{group}`!"
            logger.critical(msg)
            assert_never(group)

    has_errors = params["is_error_symptom"]

    if is_save and has_errors:
        # Saving failed: a freshly generated ObjectId must not leak out.
        if not is_update:
            self._id.value = None
        # Remove files created during this check and put back stored values.
        curr_doc: dict[str, Any] | None = params["curr_doc"]
        for name, field in self.__dict__.items():
            if callable(field) or field.ignored:
                continue
            group = field.group
            if group == "file":
                pending_file = result_map.get(name)
                if pending_file is not None:
                    if pending_file["is_new_file"]:
                        await to_thread.run_sync(remove, pending_file["path"])
                    field.value = None
                if curr_doc is not None:
                    field.value = curr_doc[name]
            elif group == "img":
                pending_img = result_map.get(name)
                if pending_img is not None:
                    if pending_img["is_new_img"]:
                        await to_thread.run_sync(rmtree, pending_img["imgs_dir_path"])
                    field.value = None
                if curr_doc is not None:
                    field.value = curr_doc[name]
    elif is_save:
        # Validation passed: clear the "new" markers on files and images.
        for name, field in self.__dict__.items():
            if callable(field) or field.ignored:
                continue
            group = field.group
            if group == "file":
                accepted_file = result_map.get(name)
                if accepted_file is not None:
                    accepted_file["is_new_file"] = False
            elif group == "img":
                accepted_img = result_map.get(name)
                if accepted_img is not None:
                    accepted_img["is_new_img"] = False

    return {
        "data": result_map,
        "is_valid": not has_errors,
        "is_update": is_update,
    }