Skip to content

Check

Validation of Model data before saving to the database.

CheckMixin

Bases: BoolGroupMixin, ChoiceGroupMixin, DateGroupMixin, FileGroupMixin, IDGroupMixin, ImgGroupMixin, NumGroupMixin, PassGroupMixin, SlugGroupMixin, TextGroupMixin

Validation of Model data before saving to the database.

Source code in src/ramifice/paladins/check.py
class CheckMixin(
    BoolGroupMixin,
    ChoiceGroupMixin,
    DateGroupMixin,
    FileGroupMixin,
    IDGroupMixin,
    ImgGroupMixin,
    NumGroupMixin,
    PassGroupMixin,
    SlugGroupMixin,
    TextGroupMixin,
):
    """Validation of Model data before saving to the database."""

    async def check(
        self,
        is_save: bool = False,
        collection: AsyncCollection | None = None,
        is_migration_process: bool = False,
    ) -> dict[str, Any]:
        """Validation of Model data before saving to the database.

        It is also used to verify Models that do not migrate to the database.
        """
        cls_model = self.__class__

        # Get the document ID.
        doc_id: ObjectId | None = self._id.value
        # Does the document exist in the database?
        is_update: bool = doc_id is not None
        # Create an identifier for a new document.
        if is_save and not is_update:
            doc_id = ObjectId()
            self._id.value = doc_id

        result_map: dict[str, Any] = {}
        # Errors from additional validation of fields.
        error_map: dict[str, str] = await self.add_validation()
        # Get Model collection.
        if collection is None:
            collection = constants.MONGO_DATABASE[cls_model.META["collection_name"]]
        # Create params for *_group methods.
        params: dict[str, Any] = {
            "doc_id": doc_id,
            "is_save": is_save,
            "is_update": is_update,  # Does the document exist in the database?
            "is_error_symptom": False,  # Is there any incorrect data?
            "result_map": result_map,  # Data to save or update to the database.
            "collection": collection,
            "field_data": None,
            "full_model_name": cls_model.META["full_model_name"],
            "is_migration_process": is_migration_process,
            "curr_doc": (await collection.find_one({"_id": doc_id}) if is_save and is_update else None),
        }

        # Run checking fields.
        for field_name, field_data in self.__dict__.items():
            if callable(field_data):
                continue
            # Reset a field errors to exclude duplicates.
            field_data.errors = []
            # Check additional validation.
            err_msg = error_map.get(field_name)
            if bool(err_msg):
                field_data.errors.append(err_msg)
                if not params["is_error_symptom"]:
                    params["is_error_symptom"] = True
            # Checking the fields by groups.
            if not field_data.ignored:
                params["field_data"] = field_data
                match field_data.group:
                    case "text":
                        await self.text_group(params)
                    case "num":
                        await self.num_group(params)
                    case "date":
                        self.date_group(params)
                    case "img":
                        await self.img_group(params)
                    case "file":
                        await self.file_group(params)
                    case "choice":
                        self.choice_group(params)
                    case "bool":
                        self.bool_group(params)
                    case "id":
                        self.id_group(params)
                    case "slug":
                        await self.slug_group(params)
                    case "pass":
                        self.pass_group(params)
                    case _ as unreachable:
                        msg: str = f"Unacceptable group `{unreachable}`!"
                        logger.critical(msg)
                        assert_never(unreachable)

        # Actions in case of error.
        if is_save:
            if params["is_error_symptom"]:
                # Reset the ObjectId for a new document.
                if not is_update:
                    self._id.value = None
                # Delete orphaned files.
                curr_doc: dict[str, Any] | None = params["curr_doc"]
                for field_name, field_data in self.__dict__.items():
                    if callable(field_data) or field_data.ignored:
                        continue
                    match field_data.group:
                        case "file":
                            file_data = result_map.get(field_name)
                            if file_data is not None:
                                if file_data["is_new_file"]:
                                    await to_thread.run_sync(remove, file_data["path"])
                                field_data.value = None
                            if curr_doc is not None:
                                field_data.value = curr_doc[field_name]
                        case "img":
                            img_data = result_map.get(field_name)
                            if img_data is not None:
                                if img_data["is_new_img"]:
                                    await to_thread.run_sync(rmtree, img_data["imgs_dir_path"])
                                field_data.value = None
                            if curr_doc is not None:
                                field_data.value = curr_doc[field_name]
            else:
                for field_name, field_data in self.__dict__.items():
                    if callable(field_data) or field_data.ignored:
                        continue
                    match field_data.group:
                        case "file":
                            file_data = result_map.get(field_name)
                            if file_data is not None:
                                file_data["is_new_file"] = False
                        case "img":
                            img_data = result_map.get(field_name)
                            if img_data is not None:
                                img_data["is_new_img"] = False
        #
        return {
            "data": result_map,
            "is_valid": not params["is_error_symptom"],
            "is_update": is_update,
        }

check(is_save=False, collection=None, is_migration_process=False) async

Validation of Model data before saving to the database.

It is also used to verify Models that do not migrate to the database.

Source code in src/ramifice/paladins/check.py
async def check(
    self,
    is_save: bool = False,
    collection: AsyncCollection | None = None,
    is_migration_process: bool = False,
) -> dict[str, Any]:
    """Validation of Model data before saving to the database.

    It is also used to verify Models that do not migrate to the database.
    """
    cls_model = self.__class__

    # Get the document ID.
    doc_id: ObjectId | None = self._id.value
    # Does the document exist in the database?
    is_update: bool = doc_id is not None
    # Create an identifier for a new document.
    if is_save and not is_update:
        doc_id = ObjectId()
        self._id.value = doc_id

    result_map: dict[str, Any] = {}
    # Errors from additional validation of fields.
    error_map: dict[str, str] = await self.add_validation()
    # Get Model collection.
    if collection is None:
        collection = constants.MONGO_DATABASE[cls_model.META["collection_name"]]
    # Create params for *_group methods.
    params: dict[str, Any] = {
        "doc_id": doc_id,
        "is_save": is_save,
        "is_update": is_update,  # Does the document exist in the database?
        "is_error_symptom": False,  # Is there any incorrect data?
        "result_map": result_map,  # Data to save or update to the database.
        "collection": collection,
        "field_data": None,
        "full_model_name": cls_model.META["full_model_name"],
        "is_migration_process": is_migration_process,
        "curr_doc": (await collection.find_one({"_id": doc_id}) if is_save and is_update else None),
    }

    # Run checking fields.
    for field_name, field_data in self.__dict__.items():
        if callable(field_data):
            continue
        # Reset a field errors to exclude duplicates.
        field_data.errors = []
        # Check additional validation.
        err_msg = error_map.get(field_name)
        if bool(err_msg):
            field_data.errors.append(err_msg)
            if not params["is_error_symptom"]:
                params["is_error_symptom"] = True
        # Checking the fields by groups.
        if not field_data.ignored:
            params["field_data"] = field_data
            match field_data.group:
                case "text":
                    await self.text_group(params)
                case "num":
                    await self.num_group(params)
                case "date":
                    self.date_group(params)
                case "img":
                    await self.img_group(params)
                case "file":
                    await self.file_group(params)
                case "choice":
                    self.choice_group(params)
                case "bool":
                    self.bool_group(params)
                case "id":
                    self.id_group(params)
                case "slug":
                    await self.slug_group(params)
                case "pass":
                    self.pass_group(params)
                case _ as unreachable:
                    msg: str = f"Unacceptable group `{unreachable}`!"
                    logger.critical(msg)
                    assert_never(unreachable)

    # Actions in case of error.
    if is_save:
        if params["is_error_symptom"]:
            # Reset the ObjectId for a new document.
            if not is_update:
                self._id.value = None
            # Delete orphaned files.
            curr_doc: dict[str, Any] | None = params["curr_doc"]
            for field_name, field_data in self.__dict__.items():
                if callable(field_data) or field_data.ignored:
                    continue
                match field_data.group:
                    case "file":
                        file_data = result_map.get(field_name)
                        if file_data is not None:
                            if file_data["is_new_file"]:
                                await to_thread.run_sync(remove, file_data["path"])
                            field_data.value = None
                        if curr_doc is not None:
                            field_data.value = curr_doc[field_name]
                    case "img":
                        img_data = result_map.get(field_name)
                        if img_data is not None:
                            if img_data["is_new_img"]:
                                await to_thread.run_sync(rmtree, img_data["imgs_dir_path"])
                            field_data.value = None
                        if curr_doc is not None:
                            field_data.value = curr_doc[field_name]
        else:
            for field_name, field_data in self.__dict__.items():
                if callable(field_data) or field_data.ignored:
                    continue
                match field_data.group:
                    case "file":
                        file_data = result_map.get(field_name)
                        if file_data is not None:
                            file_data["is_new_file"] = False
                    case "img":
                        img_data = result_map.get(field_name)
                        if img_data is not None:
                            img_data["is_new_img"] = False
    #
    return {
        "data": result_map,
        "is_valid": not params["is_error_symptom"],
        "is_update": is_update,
    }