Skip to content

mixins

Package contains functionalities divided by features. All mixins are combined into ModelTableProxy which is one of the parents of Model. The split into mixins was done to ease the maintainability of the proxy class, as it became quite complicated over time.

AliasMixin

Used to translate field names into database column names.

Source code in ormar/models/mixins/alias_mixin.py
Python
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
class AliasMixin:
    """
    Used to translate field names into database column names.

    Two lookup dicts are cached on the class. Every public method reads the
    cached dict first and calls ``_build_alias_cache`` only when that read
    raises ``AttributeError`` (i.e. the cache attribute is not set yet).
    """

    if TYPE_CHECKING:  # pragma: no cover
        from ormar.models.ormar_config import OrmarConfig

        ormar_config: OrmarConfig

    _alias_to_field_map: dict[str, str]
    _field_to_alias_map: dict[str, str]

    @classmethod
    def _build_alias_cache(cls) -> None:
        """
        Build and cache alias mappings for this model class.

        Builds two dicts:
        - _field_to_alias_map: field_name -> db_alias, containing only fields
          whose ``get_alias()`` returns a truthy value
        - _alias_to_field_map: the value returned by
          ``ormar_rust_utils.build_reverse_alias_map`` for the dict above
          (db_alias -> field_name)
        """
        field_to_alias = {}
        for field_name, field in cls.ormar_config.model_fields.items():
            alias = field.get_alias()
            if alias:
                field_to_alias[field_name] = alias
        cls._field_to_alias_map = field_to_alias
        cls._alias_to_field_map = ormar_rust_utils.build_reverse_alias_map(
            field_to_alias
        )

    @classmethod
    def get_column_alias(cls, field_name: str) -> str:
        """
        Returns db alias (column name in db) for given ormar field.
        For fields without alias field name is returned.

        :param field_name: name of the field to get alias from
        :type field_name: str
        :return: alias (db name) if set, otherwise passed name
        :rtype: str
        """
        try:
            return cls._field_to_alias_map.get(field_name, field_name)
        except AttributeError:
            # cache attribute missing - build it once and retry the lookup
            cls._build_alias_cache()
            return cls._field_to_alias_map.get(field_name, field_name)

    @classmethod
    def get_column_name_from_alias(cls, alias: str) -> str:
        """
        Returns ormar field name for given db alias (column name in db).
        If no field is registered under the alias, the alias is returned as is.

        :param alias: db alias (column name) to translate
        :type alias: str
        :return: field name if set, otherwise passed alias (db name)
        :rtype: str
        """
        try:
            return cls._alias_to_field_map.get(alias, alias)
        except AttributeError:
            # cache attribute missing - build it once and retry the lookup
            cls._build_alias_cache()
            return cls._alias_to_field_map.get(alias, alias)

    @classmethod
    def translate_columns_to_aliases(cls, new_kwargs: dict) -> dict:
        """
        Translates dictionary of model fields changing field names into aliases.
        If field has no alias the field name remains intact.
        Only fields present in the dictionary are translated.

        The passed dictionary is modified in place and returned.

        :param new_kwargs: dict with fields names and their values
        :type new_kwargs: dict
        :return: the same dict object, keyed by aliases
        :rtype: dict
        """
        try:
            field_to_alias = cls._field_to_alias_map
        except AttributeError:
            cls._build_alias_cache()
            field_to_alias = cls._field_to_alias_map
        # Phase 1: detach every entry that needs renaming BEFORE writing any
        # alias key. Renaming one key at a time would let an alias that equals
        # another pending field name overwrite that field's value and then
        # get renamed again (a -> b, b -> c would collapse into one entry).
        renamed = []
        for field_name in list(new_kwargs):
            alias = field_to_alias.get(field_name)
            if alias and alias != field_name:
                renamed.append((alias, new_kwargs.pop(field_name)))
        # Phase 2: re-insert under the alias. Untouched keys keep their
        # position and renamed ones are appended in iteration order, exactly
        # as a sequential pop/insert would leave them when nothing collides.
        for alias, value in renamed:
            new_kwargs[alias] = value
        return new_kwargs

    @classmethod
    def translate_aliases_to_columns(cls, new_kwargs: dict) -> dict:
        """
        Translates dictionary of model fields changing aliases into field names.
        If field has no alias the alias is already a field name.
        Only fields present in the dictionary are translated.

        The passed dictionary is modified in place and returned.

        :param new_kwargs: dict with aliases and their values
        :type new_kwargs: dict
        :return: the same dict object, keyed by field names
        :rtype: dict
        """
        try:
            alias_to_field = cls._alias_to_field_map
        except AttributeError:  # pragma: nocover
            cls._build_alias_cache()
            alias_to_field = cls._alias_to_field_map
        # Two phases for the same reason as in translate_columns_to_aliases:
        # a field name that is also a pending alias must not be overwritten
        # and renamed a second time.
        renamed = []
        for key in list(new_kwargs):
            field_name = alias_to_field.get(key)
            if field_name and field_name != key:
                renamed.append((field_name, new_kwargs.pop(key)))
        for field_name, value in renamed:
            new_kwargs[field_name] = value
        return new_kwargs

get_column_alias(field_name) classmethod

Returns db alias (column name in db) for given ormar field. For fields without alias field name is returned. :param field_name: name of the field to get alias from :type field_name: str :return: alias (db name) if set, otherwise passed name :rtype: str

Source code in ormar/models/mixins/alias_mixin.py
Python
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@classmethod
def get_column_alias(cls, field_name: str) -> str:
    """
    Look up the database column name (alias) registered for an ormar field.

    When no alias is registered for the field, the field name itself is
    handed back unchanged.

    :param field_name: name of the field to get alias from
    :type field_name: str
    :return: alias (db name) if set, otherwise passed name
    :rtype: str
    """
    try:
        alias_by_field = cls._field_to_alias_map
    except AttributeError:
        # the cache attribute is missing - build it and read it again
        cls._build_alias_cache()
        alias_by_field = cls._field_to_alias_map
    return alias_by_field.get(field_name, field_name)

get_column_name_from_alias(alias) classmethod

Returns ormar field name for given db alias (column name in db). If the field does not have an alias, the alias is returned as is. :param alias: :type alias: str :return: field name if set, otherwise passed alias (db name) :rtype: str

Source code in ormar/models/mixins/alias_mixin.py
Python
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@classmethod
def get_column_name_from_alias(cls, alias: str) -> str:
    """
    Look up the ormar field name registered for a db alias (column name).

    When no field is registered under the alias, the alias itself is handed
    back unchanged.

    :param alias: db alias (column name) to translate
    :type alias: str
    :return: field name if set, otherwise passed alias (db name)
    :rtype: str
    """
    try:
        field_by_alias = cls._alias_to_field_map
    except AttributeError:
        # the cache attribute is missing - build it and read it again
        cls._build_alias_cache()
        field_by_alias = cls._alias_to_field_map
    return field_by_alias.get(alias, alias)

translate_aliases_to_columns(new_kwargs) classmethod

Translates dictionary of model fields changing aliases into field names. If field has no alias the alias is already a field name. Only fields present in the dictionary are translated. :param new_kwargs: dict with aliases and their values :type new_kwargs: dict :return: dict with fields names and their values :rtype: dict

Source code in ormar/models/mixins/alias_mixin.py
Python
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@classmethod
def translate_aliases_to_columns(cls, new_kwargs: dict) -> dict:
    """
    Translates dictionary of model fields changing aliases into field names.
    If field has no alias the alias is already a field name.
    Only fields present in the dictionary are translated.

    The passed dictionary is modified in place and returned.

    :param new_kwargs: dict with aliases and their values
    :type new_kwargs: dict
    :return: the same dict object, keyed by field names
    :rtype: dict
    """
    try:
        alias_to_field = cls._alias_to_field_map
    except AttributeError:  # pragma: nocover
        cls._build_alias_cache()
        alias_to_field = cls._alias_to_field_map
    # Phase 1: detach every entry that needs renaming BEFORE writing any
    # field-name key. Renaming one key at a time would let a field name that
    # equals another pending alias overwrite that entry and then be renamed
    # a second time, silently dropping a value.
    renamed = []
    for key in list(new_kwargs):
        field_name = alias_to_field.get(key)
        if field_name and field_name != key:
            renamed.append((field_name, new_kwargs.pop(key)))
    # Phase 2: re-insert under the field name; key order matches a
    # sequential pop/insert whenever nothing collides.
    for field_name, value in renamed:
        new_kwargs[field_name] = value
    return new_kwargs

translate_columns_to_aliases(new_kwargs) classmethod

Translates dictionary of model fields changing field names into aliases. If field has no alias the field name remains intact. Only fields present in the dictionary are translated. :param new_kwargs: dict with fields names and their values :type new_kwargs: dict :return: dict with aliases and their values :rtype: dict

Source code in ormar/models/mixins/alias_mixin.py
Python
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@classmethod
def translate_columns_to_aliases(cls, new_kwargs: dict) -> dict:
    """
    Translates dictionary of model fields changing field names into aliases.
    If field has no alias the field name remains intact.
    Only fields present in the dictionary are translated.

    The passed dictionary is modified in place and returned.

    :param new_kwargs: dict with fields names and their values
    :type new_kwargs: dict
    :return: the same dict object, keyed by aliases
    :rtype: dict
    """
    try:
        field_to_alias = cls._field_to_alias_map
    except AttributeError:
        cls._build_alias_cache()
        field_to_alias = cls._field_to_alias_map
    # Phase 1: detach every entry that needs renaming BEFORE writing any
    # alias key. Renaming one key at a time would let an alias that equals
    # another pending field name overwrite that field's value and then get
    # renamed again (a -> b, b -> c would collapse into a single entry).
    renamed = []
    for field_name in list(new_kwargs):
        alias = field_to_alias.get(field_name)
        if alias and alias != field_name:
            renamed.append((alias, new_kwargs.pop(field_name)))
    # Phase 2: re-insert under the alias; key order matches a sequential
    # pop/insert whenever nothing collides.
    for alias, value in renamed:
        new_kwargs[alias] = value
    return new_kwargs

ExcludableMixin

Bases: RelationMixin

Used to include/exclude given set of fields on models during load and dict() calls.

Source code in ormar/models/mixins/excludable_mixin.py
Python
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class ExcludableMixin(RelationMixin):
    """
    Mixin deciding which fields/columns of a model take part in loading rows
    and in dict() generation, based on include/exclude definitions.
    """

    if TYPE_CHECKING:  # pragma: no cover
        from ormar import Model
        from ormar.models import ModelRow

    @staticmethod
    def _populate_pk_column(
        model: Union[type["Model"], type["ModelRow"]],
        columns: list[str],
        use_alias: bool = False,
    ) -> list[str]:
        """
        Makes sure the primary key is part of the selected columns.

        The passed list is extended in place when the pk entry is missing.

        :param model: model on columns are selected
        :type model: type["Model"]
        :param columns: list of columns names
        :type columns: list[str]
        :param use_alias: when set the pk alias is used, otherwise the pk name
        :type use_alias: bool
        :return: the same list, containing the pk column
        :rtype: list[str]
        """
        pk_entry = model.ormar_config.pkname
        if use_alias:
            pk_entry = model.get_column_alias(pk_entry)
        if pk_entry not in columns:
            columns.append(pk_entry)
        return columns

    @staticmethod
    def _get_table_column_pairs(
        model: Union[type["Model"], type["ModelRow"]],
    ) -> list[tuple[str, str]]:
        """
        Returns (col_name, field_name) tuples for all columns of model's table.

        The list is stored on the model as ``_table_column_pairs`` the first
        time it is computed and read from there on later calls.

        :param model: model to get column pairs for
        :type model: type["Model"]
        :return: list of (column_name, field_name) tuples
        :rtype: list[tuple[str, str]]
        """
        pairs = getattr(model, "_table_column_pairs", None)
        if pairs is None:
            pairs = []
            for column in model.ormar_config.table.columns:
                pairs.append(
                    (column.name, model.get_column_name_from_alias(column.name))
                )
            model._table_column_pairs = pairs  # type: ignore[union-attr]
        return pairs

    @classmethod
    def own_table_columns(
        cls,
        model: Union[type["Model"], type["ModelRow"]],
        excludable: ExcludableItems,
        alias: str = "",
        use_alias: bool = False,
        add_pk_columns: bool = True,
    ) -> list[str]:
        """
        Returns list of aliases or field names of the given model's own table,
        the choice between the two being driven by use_alias flag.

        A non-empty include definition limits the result to included fields,
        a non-empty exclude definition removes excluded fields.

        With add_pk_columns set the primary key is appended even when the
        include/exclude definitions filtered it out.

        :param add_pk_columns: flag if add primary key - always yes if ormar parses data
        :type add_pk_columns: bool
        :param alias: relation prefix
        :type alias: str
        :param excludable: structure of fields to include and exclude
        :type excludable: ExcludableItems
        :param model: model on columns are selected
        :type model: type["Model"]
        :param use_alias: flag if aliases or field names should be used
        :type use_alias: bool
        :return: list of column field names or aliases
        :rtype: list[str]
        """
        rules = excludable.get(model_cls=model, alias=alias)  # type: ignore
        check_include = bool(rules.include)
        check_exclude = bool(rules.exclude)

        selected = [
            col_name if use_alias else field_name
            for col_name, field_name in cls._get_table_column_pairs(model)
            if (not check_include or rules.is_included(field_name))
            and not (check_exclude and rules.is_excluded(field_name))
        ]

        if not add_pk_columns:
            return selected
        # always has to return pk column for ormar to work
        return cls._populate_pk_column(
            model=model, columns=selected, use_alias=use_alias
        )

    @classmethod
    def _update_excluded_with_related(cls, exclude: Union[set, dict, None]) -> set:
        """
        Used during generation of the dict().
        Extends the excluded names with the names returned by
        ``extract_related_names`` so that related models are left out.

        A set is copied before it is extended; from a dict only the keys
        whose value is Ellipsis are taken (relations are handled in ormar).
        A falsy value is treated as an empty set. A truthy value of any other
        type is returned unchanged.

        :param exclude: set/dict with fields to exclude
        :type exclude: Union[set, dict, None]
        :return: set with excluded fields and related names
        :rtype: set
        """
        exclude = exclude or set()
        related_names = cls.extract_related_names()
        if isinstance(exclude, set):
            return set(exclude).union(related_names)
        if isinstance(exclude, dict):
            # relations are handled in ormar - take only own fields (ellipsis in dict)
            own_fields = {key for key, value in exclude.items() if value is Ellipsis}
            return own_fields.union(related_names)
        return exclude

    @classmethod
    def _update_excluded_with_pks_and_through(
        cls, exclude: set, exclude_primary_keys: bool, exclude_through_models: bool
    ) -> set:
        """
        Extends excluded names with the pk name and/or through model names,
        depending on the flags.

        The pk name is added to the passed set in place; through names are
        merged into a new set.

        :param exclude: set of names to exclude
        :type exclude: set
        :param exclude_primary_keys: flag if the primary keys should be excluded
        :type exclude_primary_keys: bool
        :param exclude_through_models: flag if through models should be excluded
        :type exclude_through_models: bool
        :return: set updated according to the flags
        :rtype: set
        """
        if exclude_primary_keys:
            exclude.add(cls.ormar_config.pkname)
        if not exclude_through_models:
            return exclude
        return exclude.union(cls.extract_through_names())

    @classmethod
    def get_names_to_exclude(cls, excludable: ExcludableItems, alias: str) -> set:
        """
        Returns a set of the model's own db field names that should be
        explicitly excluded during model initialization.

        Those fields will be set to None to avoid ormar/pydantic setting default
        values on them. They should be returned as None in any case.

        Used in parsing data from database rows that construct Models by initializing
        them with dicts constructed from those db rows.

        The primary key name is never part of the result.

        :param alias: alias of current relation
        :type alias: str
        :param excludable: structure of fields to include and exclude
        :type excludable: ExcludableItems
        :return: set of field names that should be excluded
        :rtype: set
        """
        rules = excludable.get(model_cls=cast(type["Model"], cls), alias=alias)
        own_fields = cls.extract_db_own_fields()

        # without an include definition every own field is kept
        kept = rules.include.intersection(own_fields) if rules.include else own_fields
        dropped = own_fields - kept

        if rules.exclude:
            dropped = dropped.union(rules.exclude.intersection(own_fields))

        return dropped - {cls.ormar_config.pkname}

get_names_to_exclude(excludable, alias) classmethod

Returns a set of models field names that should be explicitly excluded during model initialization.

Those fields will be set to None to avoid ormar/pydantic setting default values on them. They should be returned as None in any case.

Used in parsing data from database rows that construct Models by initializing them with dicts constructed from those db rows.

:param alias: alias of current relation :type alias: str :param excludable: structure of fields to include and exclude :type excludable: ExcludableItems :return: set of field names that should be excluded :rtype: set

Source code in ormar/models/mixins/excludable_mixin.py
Python
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
@classmethod
def get_names_to_exclude(cls, excludable: ExcludableItems, alias: str) -> set:
    """
    Returns a set of the model's own db field names that should be
    explicitly excluded during model initialization.

    Those fields will be set to None to avoid ormar/pydantic setting default
    values on them. They should be returned as None in any case.

    Used in parsing data from database rows that construct Models by initializing
    them with dicts constructed from those db rows.

    The primary key name is never part of the result.

    :param alias: alias of current relation
    :type alias: str
    :param excludable: structure of fields to include and exclude
    :type excludable: ExcludableItems
    :return: set of field names that should be excluded
    :rtype: set
    """
    rules = excludable.get(model_cls=cast(type["Model"], cls), alias=alias)
    own_fields = cls.extract_db_own_fields()

    # without an include definition every own field is kept
    kept = rules.include.intersection(own_fields) if rules.include else own_fields
    dropped = own_fields - kept

    if rules.exclude:
        dropped = dropped.union(rules.exclude.intersection(own_fields))

    return dropped - {cls.ormar_config.pkname}

own_table_columns(model, excludable, alias='', use_alias=False, add_pk_columns=True) classmethod

Returns list of aliases or field names for given model. Aliases/names switch is use_alias flag.

If provided only fields included in fields will be returned. If provided fields in exclude_fields will be excluded in return.

Primary key field is always added and cannot be excluded (will be added anyway).

:param add_pk_columns: flag if add primary key - always yes if ormar parses data :type add_pk_columns: bool :param alias: relation prefix :type alias: str :param excludable: structure of fields to include and exclude :type excludable: ExcludableItems :param model: model on columns are selected :type model: type["Model"] :param use_alias: flag if aliases or field names should be used :type use_alias: bool :return: list of column field names or aliases :rtype: list[str]

Source code in ormar/models/mixins/excludable_mixin.py
Python
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@classmethod
def own_table_columns(
    cls,
    model: Union[type["Model"], type["ModelRow"]],
    excludable: ExcludableItems,
    alias: str = "",
    use_alias: bool = False,
    add_pk_columns: bool = True,
) -> list[str]:
    """
    Returns list of aliases or field names of the given model's own table,
    the choice between the two being driven by use_alias flag.

    A non-empty include definition limits the result to included fields,
    a non-empty exclude definition removes excluded fields.

    With add_pk_columns set the primary key is appended even when the
    include/exclude definitions filtered it out.

    :param add_pk_columns: flag if add primary key - always yes if ormar parses data
    :type add_pk_columns: bool
    :param alias: relation prefix
    :type alias: str
    :param excludable: structure of fields to include and exclude
    :type excludable: ExcludableItems
    :param model: model on columns are selected
    :type model: type["Model"]
    :param use_alias: flag if aliases or field names should be used
    :type use_alias: bool
    :return: list of column field names or aliases
    :rtype: list[str]
    """
    rules = excludable.get(model_cls=model, alias=alias)  # type: ignore
    check_include = bool(rules.include)
    check_exclude = bool(rules.exclude)

    selected = [
        col_name if use_alias else field_name
        for col_name, field_name in cls._get_table_column_pairs(model)
        if (not check_include or rules.is_included(field_name))
        and not (check_exclude and rules.is_excluded(field_name))
    ]

    if not add_pk_columns:
        return selected
    # always has to return pk column for ormar to work
    return cls._populate_pk_column(
        model=model, columns=selected, use_alias=use_alias
    )

MergeModelMixin

Used to merge model instances returned by the database but already initialized as ormar Models.

Models can duplicate during joins when parent model has multiple child rows, in the end all parent (main) models should be unique.

Source code in ormar/models/mixins/merge_mixin.py
Python
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
class MergeModelMixin:
    """
    Used to merge model instances returned by the database,
    but already initialized as ormar Models.

    Models can duplicate during joins when parent model has multiple child rows,
    in the end all parent (main) models should be unique.
    """

    @classmethod
    def _recursive_add(cls, model_group: list["Model"]) -> list["Model"]:
        """
        Reduces a group of models to a single merged model by merging
        neighbours pairwise, round after round, until at most one is left.

        In every round items at positions (0, 1), (2, 3), ... are combined with
        ``merge_two_instances(next_item, item)``; an unpaired trailing item is
        carried over to the next round untouched.

        :param model_group: models to merge together
        :type model_group: list["Model"]
        :return: the passed list if it has at most one item, otherwise a new
            one-element list with the merged model
        :rtype: list["Model"]
        """
        remaining = model_group
        while len(remaining) > 1:
            reduced = []
            walker = iter(remaining)
            for current in walker:
                # pulls the partner from the same iterator, so the outer loop
                # advances two items at a time
                partner = next(walker, None)
                if partner is None:
                    reduced.append(current)
                else:
                    reduced.append(cls.merge_two_instances(partner, current))
            remaining = reduced
        return remaining

    @classmethod
    def merge_instances_list(
        cls,
        result_rows: list["Model"],
        *,
        excludable: "ExcludableItems",
    ) -> list["Model"]:
        """
        Merges a list of models into list of unique models.

        Models can duplicate during joins when parent model has multiple child rows,
        in the end all parent (main) models should be unique.

        Rows are grouped by ``pk`` with ``ormar_rust_utils.group_by_pk`` and
        every group is collapsed into one model.

        Each merged top-level model also has the queryset's ``ExcludableItems``
        attached via the ``__ormar_excludable__`` slot so a bare ``model_dump()``
        on the result can resolve the (lazily cached) flatten map without going
        back through the queryset.

        :param result_rows: list of already initialized Models with child models
            populated, each instance is one row in db and some models can duplicate
        :type result_rows: list["Model"]
        :param excludable: ExcludableItems to attach to every merged top-level
            instance via the ``__ormar_excludable__`` slot
        :type excludable: ExcludableItems
        :return: list of merged models where each main model is unique
        :rtype: list["Model"]
        """
        if not result_rows:
            return []

        unique_models: list["Model"] = []
        row_pks = [row.pk for row in result_rows]
        for group_indices in ormar_rust_utils.group_by_pk(row_pks):
            if len(group_indices) == 1:
                # Single-row groups are the common case for queries with no
                # parent duplication (``Model.objects.all()`` and similar);
                # nothing to merge, so take the row as is.
                model = result_rows[group_indices[0]]
            else:
                duplicates = [result_rows[i] for i in group_indices]
                model = cls._recursive_add(duplicates)[0]
            object.__setattr__(model, "__ormar_excludable__", excludable)
            unique_models.append(model)
        return unique_models

    @classmethod
    def merge_two_instances(
        cls, one: "Model", other: "Model", relation_map: Optional[dict] = None
    ) -> "Model":
        """
        Merges current (other) Model and previous one (one) and returns the current
        Model instance with data merged from previous one.

        List-valued relations are merged item by item; a single related model
        is merged recursively only when both sides hold an ormar Model with the
        same pk. Finally the save status of the current model is set to True.

        :param relation_map: map of models relations to follow, built from
            ``one._iterate_related_models()`` when not provided
        :type relation_map: dict
        :param one: previous model instance
        :type one: Model
        :param other: current model instance
        :type other: Model
        :return: current Model instance with data merged from previous one.
        :rtype: Model
        """
        if relation_map is None:
            relation_map = translate_list_to_dict(one._iterate_related_models())

        for field_name in relation_map:
            previous_value = getattr(one, field_name)
            current_value = getattr(other, field_name, [])

            if isinstance(previous_value, list):
                merged_items = cls._merge_items_lists(
                    field_name=field_name,
                    current_field=previous_value,
                    other_value=current_value,
                    relation_map=relation_map,
                )
                setattr(other, field_name, merged_items)
                continue

            is_same_child = (
                isinstance(previous_value, ormar.Model)
                and isinstance(current_value, ormar.Model)
                and previous_value.pk == current_value.pk
            )
            if not is_same_child:
                continue

            nested_map = cast(
                Optional[dict],
                skip_ellipsis(relation_map, field_name, default=dict()),
            )
            merged_child = cls.merge_two_instances(
                previous_value, current_value, relation_map=nested_map
            )
            setattr(other, field_name, merged_child)

        other.set_save_status(True)
        return other

    @classmethod
    def _merge_items_lists(
        cls,
        field_name: str,
        current_field: list,
        other_value: list,
        relation_map: Optional[dict],
    ) -> list:
        """
        Merges two lists of nested models, going deeper according to the map.

        The pairing of items is computed by
        ``ormar_rust_utils.plan_merge_items_lists`` from the ``pk`` values of
        both lists. For every pair with a match in other's list the two models
        are merged (following relations from the map) and stored at the
        matched position; items of one's list without a match are appended.

        :param field_name: name of the current relation field
        :type field_name: str
        :param current_field: list of nested models from one model
        :type current_field: list[Model]
        :param other_value: list of nested models from other model
        :type other_value: list[Model]
        :param relation_map: map of relations to follow
        :type relation_map: dict
        :return: merged list of models
        :rtype: list[Model]
        """
        pks_of_one = [getattr(item, "pk", None) for item in current_field]
        pks_of_other = [getattr(item, "pk", None) for item in other_value]
        merge_plan = ormar_rust_utils.plan_merge_items_lists(pks_of_one, pks_of_other)

        # work on a copy so the list held by the other model is not modified
        merged = list(other_value)
        nested_map = cast(
            Optional[dict],
            skip_ellipsis(relation_map, field_name, default=dict()),
        )
        for one_index, other_index in merge_plan:
            incoming = current_field[one_index]
            if other_index is None:
                merged.append(incoming)
                continue
            # ``other_index`` is the destination slot provided by the plan -
            # the merged instance is written there in place.
            merged[other_index] = cls.merge_two_instances(
                incoming,
                cast("Model", other_value[other_index]),
                relation_map=nested_map,
            )
        return merged

merge_instances_list(result_rows, *, excludable) classmethod

Merges a list of models into list of unique models.

Models can duplicate during joins when parent model has multiple child rows, in the end all parent (main) models should be unique.

Each merged top-level model also has the queryset's ExcludableItems attached via the __ormar_excludable__ slot so a bare model_dump() on the result can resolve the (lazily cached) flatten map without going back through the queryset.

:param result_rows: list of already initialized Models with child models populated, each instance is one row in db and some models can duplicate :type result_rows: list["Model"] :param excludable: ExcludableItems to attach to every merged top-level instance via the __ormar_excludable__ slot :type excludable: ExcludableItems :return: list of merged models where each main model is unique :rtype: list["Model"]

Source code in ormar/models/mixins/merge_mixin.py
Python
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@classmethod
def merge_instances_list(
    cls,
    result_rows: list["Model"],
    *,
    excludable: "ExcludableItems",
) -> list["Model"]:
    """
    Collapses joined result rows into a list of unique top-level models.

    A join repeats the parent model once per child row, so rows are grouped
    by primary key and every group is reduced to a single instance.

    The queryset's ``ExcludableItems`` is stored on each returned model in
    the ``__ormar_excludable__`` slot, so a bare ``model_dump()`` on the
    result can resolve the (lazily cached) flatten map without going back
    through the queryset.

    :param result_rows: list of already initialized Models with child models
    populated, each instance is one row in db and some models can duplicate
    :type result_rows: list["Model"]
    :param excludable: ExcludableItems to attach to every merged top-level
        instance via the ``__ormar_excludable__`` slot
    :type excludable: ExcludableItems
    :return: list of merged models where each main model is unique
    :rtype: list["Model"]
    """
    if not result_rows:
        return []

    unique_models: list["Model"] = []
    primary_keys = [row.pk for row in result_rows]
    for positions in ormar_rust_utils.group_by_pk(primary_keys):
        if len(positions) != 1:
            # Several rows share this pk - fold them into one instance.
            duplicates = [result_rows[position] for position in positions]
            merged = cls._recursive_add(duplicates)[0]
        else:
            # A lone row needs no merging, so the wrapper list and the
            # ``_recursive_add`` call are skipped entirely.
            merged = result_rows[positions[0]]
        # ``object.__setattr__`` writes the slot directly, without going
        # through the model's own ``__setattr__``.
        object.__setattr__(merged, "__ormar_excludable__", excludable)
        unique_models.append(merged)

    return unique_models

merge_two_instances(one, other, relation_map=None) classmethod

Merges current (other) Model and previous one (one) and returns the current Model instance with data merged from previous one.

If needed, it calls itself recursively and also merges child models.

:param relation_map: map of models relations to follow :type relation_map: dict :param one: previous model instance :type one: Model :param other: current model instance :type other: Model :return: current Model instance with data merged from previous one. :rtype: Model

Source code in ormar/models/mixins/merge_mixin.py
Python
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@classmethod
def merge_two_instances(
    cls, one: "Model", other: "Model", relation_map: Optional[dict] = None
) -> "Model":
    """
    Folds the data of the previous model (one) into the current model (other)
    and returns the current model.

    Relations listed in the relation map are walked one by one: list values
    are merged through ``_merge_items_lists``, and single related models that
    share the same pk are merged by a recursive call of this method.

    :param relation_map: map of models relations to follow
    :type relation_map: dict
    :param one: previous model instance
    :type one: Model
    :param other: current model instance
    :type other: Model
    :return: current Model instance with data merged from previous one.
    :rtype: Model
    """
    if relation_map is None:
        # No explicit map given - follow every relation reachable from ``one``.
        relation_map = translate_list_to_dict(one._iterate_related_models())

    for relation_name in relation_map:
        previous_value = getattr(one, relation_name)
        current_value = getattr(other, relation_name, [])

        if isinstance(previous_value, list):
            merged_items = cls._merge_items_lists(
                field_name=relation_name,
                current_field=previous_value,
                other_value=current_value,
                relation_map=relation_map,
            )
            setattr(other, relation_name, merged_items)
            continue

        same_child = (
            isinstance(previous_value, ormar.Model)
            and isinstance(current_value, ormar.Model)
            and previous_value.pk == current_value.pk
        )
        if not same_child:
            # Anything else is left untouched on ``other``.
            continue

        nested_map = cast(
            Optional[dict],
            skip_ellipsis(relation_map, relation_name, default=dict()),
        )
        merged_child = cls.merge_two_instances(
            previous_value, current_value, relation_map=nested_map
        )
        setattr(other, relation_name, merged_child)

    other.set_save_status(True)
    return other

PydanticMixin

Bases: RelationMixin

Source code in ormar/models/mixins/pydantic_mixin.py
Python
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
class PydanticMixin(RelationMixin):
    """
    Builds plain pydantic models out of ormar models.
    """

    # Generated pydantic models keyed by model name plus the stringified
    # include/exclude dicts. The dict is declared on the mixin itself, so
    # every class that does not override it shares the same cache.
    __cache__: dict[str, type[pydantic.BaseModel]] = {}

    if TYPE_CHECKING:  # pragma: no cover
        __pydantic_decorators__: DecoratorInfos

    @classmethod
    def get_pydantic(
        cls,
        *,
        include: Union[set, dict, None] = None,
        exclude: Union[set, dict, None] = None,
    ) -> type[pydantic.BaseModel]:
        """
        Returns a pydantic model out of ormar model.

        Converts also nested ormar models into pydantic models.

        Can be used to fully exclude certain fields in fastapi response and requests.

        :param include: fields of own and nested models to include
        :type include: Union[set, dict, None]
        :param exclude: fields of own and nested models to exclude
        :type exclude: Union[set, dict, None]
        :return: generated pydantic model class
        :rtype: type[pydantic.BaseModel]
        """
        relation_map = translate_list_to_dict(cls._iterate_related_models())

        return cls._convert_ormar_to_pydantic(
            include=include, exclude=exclude, relation_map=relation_map
        )

    @classmethod
    def _convert_ormar_to_pydantic(
        cls,
        relation_map: dict[str, Any],
        include: Union[set, dict, None] = None,
        exclude: Union[set, dict, None] = None,
    ) -> type[pydantic.BaseModel]:
        """
        Builds (or fetches from cache) the pydantic model for this ormar model.

        The generated class is named after the ormar model with three random
        uppercase letters appended, and is cached under a key made of the
        model name and the normalized include/exclude dicts.

        :param relation_map: map of relations that should be followed
        :type relation_map: dict[str, Any]
        :param include: fields of own and nested models to include
        :type include: Union[set, dict, None]
        :param exclude: fields of own and nested models to exclude
        :type exclude: Union[set, dict, None]
        :return: generated pydantic model class
        :rtype: type[pydantic.BaseModel]
        """
        include = normalize_to_dict(include)
        exclude = normalize_to_dict(exclude)

        # The cache key only needs the normalized include/exclude, so look it
        # up before doing any field filtering or sorting.
        cache_key = f"{cls.__name__}_{str(include)}_{str(exclude)}"
        if cache_key in cls.__cache__:
            return cls.__cache__[cache_key]

        model_fields = cls.ormar_config.model_fields
        fields_to_process = filter_not_excluded_fields(
            fields={*model_fields.keys()},
            include=include,
            exclude=exclude,
        )
        # Restore declaration order. The positions are computed once instead
        # of rebuilding the key list and scanning it for every sorted element.
        declaration_order = {
            field_name: position for position, field_name in enumerate(model_fields)
        }
        fields_to_process.sort(key=declaration_order.__getitem__)

        fields_dict: dict[str, Any] = dict()
        defaults: dict[str, Any] = dict()
        for name in fields_to_process:
            field = cls._determine_pydantic_field_type(
                name=name,
                defaults=defaults,
                include=include,
                exclude=exclude,
                relation_map=relation_map,
            )
            if field is not None:
                fields_dict[name] = field
        model = type(
            f"{cls.__name__}_{''.join(choices(string.ascii_uppercase, k=3))}",
            (pydantic.BaseModel,),
            {"__annotations__": fields_dict, **defaults},
        )
        model = cast(type[pydantic.BaseModel], model)
        cls._copy_field_validators(model=model)
        cls.__cache__[cache_key] = model
        return model

    @classmethod
    def _determine_pydantic_field_type(
        cls,
        name: str,
        defaults: dict,
        include: Union[set, dict, None],
        exclude: Union[set, dict, None],
        relation_map: dict[str, Any],
    ) -> Any:
        """
        Resolves the annotation to use for a single field of the new model.

        Relation fields present in the relation map are converted to nested
        pydantic models; relation fields missing from the map yield ``None``.
        For regular fields the pydantic default is recorded in ``defaults``
        and the ormar field type is used. Nullable fields are wrapped in
        ``Optional``.

        :param name: name of the field
        :type name: str
        :param defaults: dict of default values, mutated in place
        :type defaults: dict
        :param include: fields of own and nested models to include
        :type include: Union[set, dict, None]
        :param exclude: fields of own and nested models to exclude
        :type exclude: Union[set, dict, None]
        :param relation_map: map of relations that should be followed
        :type relation_map: dict[str, Any]
        :return: annotation for the field or None if the field is skipped
        :rtype: Any
        """
        field = cls.ormar_config.model_fields[name]
        target: Any = None
        if field.is_relation and name in relation_map:
            # The helper also returns ``defaults``; it is the same dict that
            # was passed in, so the second element is not needed here.
            target, _ = cls._determined_included_relation_field_type(
                name=name,
                field=field,
                include=include,
                exclude=exclude,
                defaults=defaults,
                relation_map=relation_map,
            )
        elif not field.is_relation:
            defaults[name] = cls.model_fields[name].default  # type: ignore
            target = field.__type__
        if target is not None and field.nullable:
            target = Optional[target]
        return target

    @classmethod
    def _determined_included_relation_field_type(
        cls,
        name: str,
        field: Union[BaseField, ForeignKeyField, ManyToManyField],
        include: Union[set, dict, None],
        exclude: Union[set, dict, None],
        defaults: dict,
        relation_map: dict[str, Any],
    ) -> tuple[type[BaseModel], dict]:
        """
        Converts the related model of a relation field into a pydantic model.

        Multi and virtual relations are wrapped in ``list``. For nullable
        relations a ``None`` default is recorded in ``defaults``.

        :param name: name of the relation field
        :type name: str
        :param field: ormar relation field
        :type field: Union[BaseField, ForeignKeyField, ManyToManyField]
        :param include: fields of own and nested models to include
        :type include: Union[set, dict, None]
        :param exclude: fields of own and nested models to exclude
        :type exclude: Union[set, dict, None]
        :param defaults: dict of default values, mutated in place
        :type defaults: dict
        :param relation_map: map of relations that should be followed
        :type relation_map: dict[str, Any]
        :return: tuple of the target annotation and the defaults dict
        :rtype: tuple[type[BaseModel], dict]
        """
        target = field.to._convert_ormar_to_pydantic(
            include=skip_ellipsis(include, name),
            exclude=skip_ellipsis(exclude, name),
            relation_map=cast(
                dict[str, Any], skip_ellipsis(relation_map, name, default=dict())
            ),
        )
        if field.is_multi or field.virtual:
            target = list[target]  # type: ignore
        if field.nullable:
            defaults[name] = None
        return target, defaults

    @classmethod
    def _copy_field_validators(cls, model: type[pydantic.BaseModel]) -> None:
        """
        Copy field validators from ormar model to generated pydantic model.

        Field-level validators are copied only for fields present on the
        generated model; root and model validators are deep-copied as a
        whole. The model is rebuilt afterwards.

        :param model: generated pydantic model to copy the validators onto
        :type model: type[pydantic.BaseModel]
        """
        field_names = list(model.model_fields.keys())
        cls.copy_selected_validators_type(
            model=model, fields=field_names, validator_type="field_validators"
        )
        cls.copy_selected_validators_type(
            model=model, fields=field_names, validator_type="validators"
        )

        class_validators = cls.__pydantic_decorators__.root_validators
        model.__pydantic_decorators__.root_validators.update(
            copy.deepcopy(class_validators)
        )
        model_validators = cls.__pydantic_decorators__.model_validators
        model.__pydantic_decorators__.model_validators.update(
            copy.deepcopy(model_validators)
        )
        model.model_rebuild(force=True)

    @classmethod
    def copy_selected_validators_type(
        cls, model: type[pydantic.BaseModel], fields: list[str], validator_type: str
    ) -> None:
        """
        Copy validators of one decorator type from ormar model to generated
        pydantic model.

        Only validators that reference at least one of the given fields are
        copied, and the copy is narrowed down to those fields.

        :param model: generated pydantic model to copy the validators onto
        :type model: type[pydantic.BaseModel]
        :param fields: names of the fields present on the generated model
        :type fields: list[str]
        :param validator_type: name of the decorators attribute to copy
        :type validator_type: str
        """
        validators = getattr(cls.__pydantic_decorators__, validator_type)
        for name, decorator in validators.items():
            if any(field_name in decorator.info.fields for field_name in fields):
                # Deep copy so narrowing the fields does not touch the
                # decorator registered on the ormar model.
                copied_decorator = copy.deepcopy(decorator)
                copied_decorator.info.fields = [
                    field_name
                    for field_name in decorator.info.fields
                    if field_name in fields
                ]
                getattr(model.__pydantic_decorators__, validator_type)[name] = (
                    copied_decorator
                )

copy_selected_validators_type(model, fields, validator_type) classmethod

Copy field validators from ormar model to generated pydantic model.

Source code in ormar/models/mixins/pydantic_mixin.py
Python
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@classmethod
def copy_selected_validators_type(
    cls, model: type[pydantic.BaseModel], fields: list[str], validator_type: str
) -> None:
    """
    Copy validators of the given decorator type from ormar model to the
    generated pydantic model.

    A validator is copied only when it references at least one of ``fields``;
    the copy keeps just the referenced fields that are in ``fields``.
    """
    source_decorators = getattr(cls.__pydantic_decorators__, validator_type)
    for decorator_name, source_decorator in source_decorators.items():
        kept_fields = [
            candidate
            for candidate in source_decorator.info.fields
            if candidate in fields
        ]
        if not kept_fields:
            # The validator touches none of the generated model's fields.
            continue
        # Deep copy so the decorator on the ormar model stays untouched.
        narrowed = copy.deepcopy(source_decorator)
        narrowed.info.fields = kept_fields
        target_decorators = getattr(model.__pydantic_decorators__, validator_type)
        target_decorators[decorator_name] = narrowed

get_pydantic(*, include=None, exclude=None) classmethod

Returns a pydantic model out of ormar model.

Converts also nested ormar models into pydantic models.

Can be used to fully exclude certain fields in fastapi response and requests.

:param include: fields of own and nested models to include :type include: Union[set, dict, None] :param exclude: fields of own and nested models to exclude :type exclude: Union[set, dict, None]

Source code in ormar/models/mixins/pydantic_mixin.py
Python
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@classmethod
def get_pydantic(
    cls,
    *,
    include: Union[set, dict, None] = None,
    exclude: Union[set, dict, None] = None,
) -> type[pydantic.BaseModel]:
    """
    Builds a pydantic model that mirrors this ormar model.

    Nested ormar models are converted into pydantic models as well.

    Can be used to fully exclude certain fields in fastapi response and requests.

    :param include: fields of own and nested models to include
    :type include: Union[set, dict, None]
    :param exclude: fields of own and nested models to exclude
    :type exclude: Union[set, dict, None]
    """
    # Every relation reachable from this model is followed.
    return cls._convert_ormar_to_pydantic(
        relation_map=translate_list_to_dict(cls._iterate_related_models()),
        include=include,
        exclude=exclude,
    )

SavePrepareMixin

Bases: RelationMixin, AliasMixin

Used to prepare models to be saved in database

Source code in ormar/models/mixins/save_mixin.py
Python
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
class SavePrepareMixin(RelationMixin, AliasMixin):
    """
    Used to prepare models to be saved in database
    """

    if TYPE_CHECKING:  # pragma: nocover
        _json_fields: set[str]
        _bytes_fields: set[str]
        _onupdate_fields: set[str]
        __pydantic_core_schema__: CoreSchema
        __ormar_fields_validators__: Optional[
            dict[str, Union[SchemaValidator, PluggableSchemaValidator]]
        ]

    @classmethod
    def prepare_model_to_save(cls, new_kwargs: dict) -> dict:
        """
        Combines all preparation methods before saving.
        Removes primary key for if it's nullable or autoincrement pk field,
        and it's set to None.
        Removes keys that are not ormar fields.
        Substitute related models with their primary key values as fk column.
        Populates the default values for field with default set and no value.
        Converts bytes fields passed as strings back into bytes.
        Translate columns into aliases (db names).

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict[str, str]
        :return: dictionary of model that is about to be saved
        :rtype: dict[str, str]
        """
        new_kwargs = cls._remove_pk_from_kwargs(new_kwargs)
        new_kwargs = cls._remove_not_ormar_fields(new_kwargs)
        new_kwargs = cls.substitute_models_with_pks(new_kwargs)
        new_kwargs = cls.populate_default_values(new_kwargs)
        new_kwargs = cls.reconvert_str_to_bytes(new_kwargs)
        new_kwargs = cls.translate_columns_to_aliases(new_kwargs)
        return new_kwargs

    @classmethod
    def prepare_model_to_update(cls, new_kwargs: dict) -> dict:
        """
        Combines all preparation methods before updating.
        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict[str, str]
        :return: dictionary of model that is about to be updated
        :rtype: dict[str, str]
        """
        new_kwargs = cls.parse_non_db_fields(new_kwargs)
        new_kwargs = cls.substitute_models_with_pks(new_kwargs)
        new_kwargs = cls.reconvert_str_to_bytes(new_kwargs)
        new_kwargs = cls.dump_all_json_fields_to_str(new_kwargs)
        new_kwargs = cls.translate_columns_to_aliases(new_kwargs)
        new_kwargs = cls.translate_enum_columns(new_kwargs)
        return new_kwargs

    @classmethod
    def translate_enum_columns(cls, new_kwargs: dict) -> dict:
        """
        Replaces every ``Enum`` value in the dictionary with the name of the
        enum member. The dictionary is modified in place.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict
        :return: the same dictionary with enum values replaced by their names
        :rtype: dict
        """
        # Only values of existing keys are reassigned, so the dict does not
        # change size while it is being iterated.
        for key, value in new_kwargs.items():
            if isinstance(value, Enum):
                new_kwargs[key] = value.name
        return new_kwargs

    @classmethod
    def _remove_not_ormar_fields(cls, new_kwargs: dict) -> dict:
        """
        Removes all keys that are not names of ormar fields of this model.

        The set of field names is computed once per class and cached on the
        class as ``_ormar_fields_set``.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict[str, str]
        :return: new dictionary with only the ormar fields kept
        :rtype: dict[str, str]
        """
        # Read the cache from the class's own namespace. A plain attribute
        # lookup would also find a set cached on a parent class, and a
        # subclass with additional fields would then lose them.
        ormar_fields = cls.__dict__.get("_ormar_fields_set")
        if ormar_fields is None:
            ormar_fields = set(cls.ormar_config.model_fields.keys())
            cls._ormar_fields_set = ormar_fields  # type: ignore[attr-defined]
        return {k: v for k, v in new_kwargs.items() if k in ormar_fields}

    @classmethod
    def _remove_pk_from_kwargs(cls, new_kwargs: dict) -> dict:
        """
        Removes primary key for if it's nullable or autoincrement pk field,
        and it's set to None.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict[str, str]
        :return: dictionary of model that is about to be saved
        :rtype: dict[str, str]
        """
        pkname = cls.ormar_config.pkname
        pk = cls.ormar_config.model_fields[pkname]
        # ``ormar.Undefined`` as the default tells a missing pk apart from a
        # pk that was explicitly set to None.
        if new_kwargs.get(pkname, ormar.Undefined) is None and (
            pk.nullable or pk.autoincrement
        ):
            del new_kwargs[pkname]
        return new_kwargs

    @classmethod
    def parse_non_db_fields(cls, model_dict: dict) -> dict:
        """
        Receives dictionary of model that is about to be saved and changes uuid fields
        to strings in bulk_update.

        Only values that are ``uuid.UUID`` instances are converted; ``None``
        and values that are not UUIDs are left untouched. Unknown uuid
        formats leave the value unchanged as well.

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: dict
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        parsers: dict[str, Callable[..., Any]] = {
            "string": str,
            "hex": lambda value: "%.32x" % value.int,
        }
        for name, field in cls.ormar_config.model_fields.items():
            if field.__type__ != uuid.UUID or name not in model_dict:
                continue
            value = model_dict[name]
            if not isinstance(value, uuid.UUID):
                # ``str(None)`` would store the text "None" and ``None.int``
                # would raise, so anything that is not a UUID is skipped.
                continue
            parser = parsers.get(field.column_type.uuid_format)
            if parser is not None:
                model_dict[name] = parser(value)
        return model_dict

    @classmethod
    def substitute_models_with_pks(cls, model_dict: dict) -> dict:  # noqa  CCR001
        """
        Receives dictionary of model that is about to be saved and changes all related
        models that are stored as foreign keys to their fk value.

        Non-empty lists and dicts are replaced by the pk value(s) read from
        them; any other non-None value that is not a model is removed.

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: dict
        :raises ModelPersistenceError: if a related model has no pk set
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        for field in cls.extract_related_names():
            field_value = model_dict.get(field, None)
            if field_value is not None:
                target_field = cls.ormar_config.model_fields[field]
                target_pkname = target_field.to.ormar_config.pkname
                if isinstance(field_value, ormar.Model):  # pragma: no cover
                    pk_value = getattr(field_value, target_pkname)
                    if not pk_value:
                        raise ModelPersistenceError(
                            f"You cannot save {field_value.get_name()} "
                            f"model without pk set!"
                        )
                    model_dict[field] = pk_value
                elif isinstance(field_value, (list, dict)) and field_value:
                    if isinstance(field_value, list):
                        model_dict[field] = [
                            target.get(target_pkname) for target in field_value
                        ]
                    else:
                        model_dict[field] = field_value.get(target_pkname)
                else:
                    model_dict.pop(field, None)
        return model_dict

    @classmethod
    def reconvert_str_to_bytes(cls, model_dict: dict) -> dict:
        """
        Receives dictionary of model that is about to be saved and changes
        all bytes fields that are represented as strings back into bytes.

        Fields represented as base64 strings are base64-decoded, other bytes
        fields are encoded as utf-8.

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: dict
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        bytes_base64_fields = {
            name
            for name, field in cls.ormar_config.model_fields.items()
            if field.represent_as_base64_str
        }
        for key, value in model_dict.items():
            if key in cls._bytes_fields and isinstance(value, str):
                model_dict[key] = (
                    value.encode("utf-8")
                    if key not in bytes_base64_fields
                    else base64.b64decode(value)
                )
        return model_dict

    @classmethod
    def dump_all_json_fields_to_str(cls, model_dict: dict) -> dict:
        """
        Receives dictionary of model that is about to be saved and changes
        all json fields into strings

        :param model_dict: dictionary of model that is about to be saved
        :type model_dict: dict
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        for key, value in model_dict.items():
            if key in cls._json_fields:
                model_dict[key] = encode_json(value)
        return model_dict

    @classmethod
    def serialize_nested_models_json_fields(cls, kwargs: dict) -> dict:
        """
        Serializes JSON fields in nested model instances to strings.
        This ensures that when models with JSON fields are nested in other models,
        the JSON values are properly serialized as strings for Pydantic validation.

        Needed as sqlalchemy auto parses the JSON fields.

        :param kwargs: dictionary of keyword arguments
        :type kwargs: Dict
        :return: modified kwargs with serialized JSON fields in nested models
        :rtype: Dict
        """
        for value in kwargs.values():
            if isinstance(value, ormar.Model) and hasattr(value, "_json_fields"):
                # Works on the nested instance's ``__dict__`` in place.
                value.dump_all_json_fields_to_str(value.__dict__)
        return kwargs

    @classmethod
    def populate_default_values(cls, new_kwargs: dict) -> dict:
        """
        Receives dictionary of model that is about to be saved and populates the default
        value on the fields that have the default value set, but no actual value was
        passed by the user.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        for field_name, field in cls.ormar_config.model_fields.items():
            if field_name not in new_kwargs and field.has_default(use_server=False):
                new_kwargs[field_name] = field.get_default()
            # clear fields with server_default set as None
            if (
                field.server_default is not None
                and new_kwargs.get(field_name, None) is None
            ):
                new_kwargs.pop(field_name, None)
        return new_kwargs

    @classmethod
    def populate_onupdate_value(
        cls, new_kwargs: dict, explicit_fields: Optional[set[str]] = None
    ) -> dict:
        """
        Populates on_update values for fields that have on_update configured
        and were not provided explicitly.

        If ``explicit_fields`` is omitted the keys of ``new_kwargs`` are
        treated as explicitly provided. Callers pass the set explicitly when
        ``new_kwargs`` is a full ``model_dump`` (e.g. ``bulk_update``) so that
        fields the user actually mutated can be distinguished from values that
        are just present by virtue of the dump.

        :param new_kwargs: dictionary of values about to be used for the update
        :type new_kwargs: dict
        :param explicit_fields: fields that should be considered user-provided
        :type explicit_fields: Optional[set[str]]
        :return: new_kwargs with on_update values populated
        :rtype: dict
        """
        if explicit_fields is None:
            explicit_fields = set(new_kwargs.keys())
        for field_name in cls._onupdate_fields - explicit_fields:
            field = cls.ormar_config.model_fields[field_name]
            new_kwargs[field_name] = field.get_on_update()
        return new_kwargs

    @classmethod
    def validate_enums(cls, new_kwargs: dict) -> dict:
        """
        Receives dictionary of model that is about to be saved and validates the
        fields with choices set to see if the value is allowed.

        :param new_kwargs: dictionary of model that is about to be saved
        :type new_kwargs: dict
        :return: dictionary of model that is about to be saved
        :rtype: dict
        """
        validators = cls._build_individual_schema_validator()
        for key, value in new_kwargs.items():
            if key in validators:
                validators[key].validate_python(value)
        return new_kwargs

    @classmethod
    def _build_individual_schema_validator(cls) -> Any:
        """
        Returns a dict of per-field schema validators for this model.

        The validators are built on first use and stored on the class in
        ``__ormar_fields_validators__``; later calls return the stored dict.

        :return: dict of field name to schema validator
        :rtype: Any
        """
        if cls.__ormar_fields_validators__ is not None:
            return cls.__ormar_fields_validators__
        field_validators = {}
        for key, field in cls._extract_pydantic_fields().items():
            if cls.__pydantic_core_schema__["type"] == "definitions":
                # Keep the definitions next to the field schema so that
                # references inside it can still be resolved.
                schema = {
                    "type": "definitions",
                    "schema": field["schema"],
                    "definitions": cls.__pydantic_core_schema__["definitions"],
                }
            else:
                schema = field["schema"]
            field_validators[key] = create_schema_validator(
                schema, cls, cls.__module__, cls.__qualname__, "BaseModel"
            )
        cls.__ormar_fields_validators__ = field_validators
        return cls.__ormar_fields_validators__

    @classmethod
    def _extract_pydantic_fields(cls) -> Any:
        """
        Extracts the fields mapping out of the model's pydantic core schema.

        Handles core schemas of type ``model`` and ``definitions``; for any
        other schema type the method falls through and returns None.

        :return: fields part of the pydantic core schema
        :rtype: Any
        """
        if cls.__pydantic_core_schema__["type"] == "model":
            return cls.__pydantic_core_schema__["schema"]["fields"]
        elif cls.__pydantic_core_schema__["type"] == "definitions":
            main_schema = cls.__pydantic_core_schema__["schema"]
            if "schema_ref" in main_schema:  # pragma: no cover
                reference_id = main_schema["schema_ref"]
                return next(
                    ref
                    for ref in cls.__pydantic_core_schema__["definitions"]
                    if ref["ref"] == reference_id
                )["schema"]["fields"]
            return main_schema["schema"]["fields"]

    @staticmethod
    async def _upsert_model(
        instance: "Model",
        save_all: bool,
        previous_model: Optional["Model"],
        relation_field: Optional["ForeignKeyField"],
        update_count: int,
    ) -> int:
        """
        Method upserts given instance if:

        * instance is not saved or
        * instance have no pk or
        * save_all=True flag is set

        and instance is not __pk_only__.

        If relation leading to instance is a ManyToMany also the through model is saved

        :param instance: current model to upsert
        :type instance: Model
        :param save_all: flag if all models should be saved or only not saved ones
        :type save_all: bool
        :param relation_field: field with relation
        :type relation_field: Optional[ForeignKeyField]
        :param previous_model: previous model from which method came
        :type previous_model: Model
        :param update_count: no of updated models
        :type update_count: int
        :return: no of updated models
        :rtype: int
        """
        if (
            save_all or not instance.pk or not instance.saved
        ) and not instance.__pk_only__:
            await instance.upsert(__force_save__=True)
            if relation_field and relation_field.is_multi:
                await instance._upsert_through_model(
                    instance=instance,
                    relation_field=relation_field,
                    previous_model=cast("Model", previous_model),
                )
            update_count += 1
        return update_count

    @staticmethod
    async def _upsert_through_model(
        instance: "Model", previous_model: "Model", relation_field: "ForeignKeyField"
    ) -> None:
        """
        Upsert through model for m2m relation.

        :param instance: current model to upsert
        :type instance: Model
        :param relation_field: field with relation
        :type relation_field: Optional[ForeignKeyField]
        :param previous_model: previous model from which method came
        :type previous_model: Model
        """
        through_name = previous_model.ormar_config.model_fields[
            relation_field.name
        ].through.get_name()
        through = getattr(instance, through_name)
        if through:
            # Relation fields of the through model are left out of the dump.
            through_dict = through.model_dump(exclude=through.extract_related_names())
        else:
            through_dict = {}
        await getattr(
            previous_model, relation_field.name
        ).queryset_proxy.upsert_through_instance(instance, **through_dict)

    async def _update_relation_list(
        self,
        fields_list: Collection["ForeignKeyField"],
        follow: bool,
        save_all: bool,
        relation_map: dict,
        update_count: int,
    ) -> int:
        """
        Internal method used in save_related to follow deeper from
        related models and update numbers of updated related instances.

        :param save_all: flag if all models should be saved
        :type save_all: bool
        :param fields_list: list of ormar fields to follow and save
        :type fields_list: Collection["ForeignKeyField"]
        :param relation_map: map of relations to follow
        :type relation_map: dict
        :param follow: flag to trigger deep save -
        by default only directly related models are saved
        with follow=True also related models of related models are saved
        :type follow: bool
        :param update_count: internal parameter for recursive calls -
        number of updated instances
        :type update_count: int
        :return: number of updated instances
        :rtype: int
        """
        for field in fields_list:
            values = self._get_field_values(name=field.name)
            for value in values:
                if follow:
                    update_count = await value.save_related(
                        follow=follow,
                        save_all=save_all,
                        relation_map=skip_ellipsis(
                            relation_map, field.name, default={}
                        ),
                        update_count=update_count,
                        previous_model=self,
                        relation_field=field,
                    )
                else:
                    update_count = await value._upsert_model(
                        instance=value,
                        save_all=save_all,
                        previous_model=self,
                        relation_field=field,
                        update_count=update_count,
                    )
        return update_count

    def _get_field_values(self, name: str) -> list:
        """
        Extract field values and ensures it is a list.

        A falsy value results in an empty list, a single value is wrapped
        into a one-element list.

        :param name: name of the field
        :type name: str
        :return: list of values
        :rtype: list
        """
        values = getattr(self, name) or []
        if not isinstance(values, list):
            values = [values]
        return values

dump_all_json_fields_to_str(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all json fields into strings

:param model_dict: dictionary of model that is about to be saved :type model_dict: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
@classmethod
def dump_all_json_fields_to_str(cls, model_dict: dict) -> dict:
    """
    Encode the value of every json field found in the given dictionary,
    replacing it in place with the result of ``encode_json``.

    Keys that are not listed in ``cls._json_fields`` are left untouched.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: dict
    :return: the same dictionary with json field values encoded
    :rtype: dict
    """
    # only values of existing keys are reassigned, so iterating is safe
    for field_name in model_dict:
        if field_name not in cls._json_fields:
            continue
        model_dict[field_name] = encode_json(model_dict[field_name])
    return model_dict

parse_non_db_fields(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes uuid fields to strings in bulk_update.

:param model_dict: dictionary of model that is about to be saved :type model_dict: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@classmethod
def parse_non_db_fields(cls, model_dict: dict) -> dict:
    """
    Receives dictionary of model that is about to be saved and changes uuid fields
    to their column representation in bulk_update.

    Every field whose ``__type__`` is ``uuid.UUID`` and whose name is present in
    ``model_dict`` is converted according to the ``uuid_format`` of its column
    type: ``"string"`` uses ``str(value)`` and ``"hex"`` uses the zero padded,
    32 character hexadecimal form. Any other format leaves the value untouched.

    ``None`` values are kept as ``None``, so that an unset uuid is not turned
    into the literal text ``"None"`` (string format) and does not raise
    ``AttributeError`` (hex format).

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: dict
    :return: dictionary of model that is about to be saved
    :rtype: dict
    """
    # the conversion table does not depend on the field, so build it once
    parsers: dict[str, Callable[..., Any]] = {
        "string": str,
        "hex": lambda value: "%.32x" % value.int,
    }
    for name, field in cls.ormar_config.model_fields.items():
        if field.__type__ != uuid.UUID or name not in model_dict:
            continue
        value = model_dict[name]
        if value is None:
            # nothing to convert - keep the missing value as it is
            continue
        parser = parsers.get(field.column_type.uuid_format)
        if parser is not None:
            model_dict[name] = parser(value)
    return model_dict

populate_default_values(new_kwargs) classmethod

Receives dictionary of model that is about to be saved and populates the default value on the fields that have the default value set, but no actual value was passed by the user.

:param new_kwargs: dictionary of model that is about to be saved :type new_kwargs: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@classmethod
def populate_default_values(cls, new_kwargs: dict) -> dict:
    """
    Fills in defaults for the fields the user did not pass a value for.

    For every model field that is missing from ``new_kwargs`` and reports a
    (non server) default, the default value is stored under the field name.
    Afterwards, a field that declares a ``server_default`` and whose value in
    ``new_kwargs`` is ``None`` (or absent) is removed from the dictionary.

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: dict
    :return: dictionary of model that is about to be saved
    :rtype: dict
    """
    for name, definition in cls.ormar_config.model_fields.items():
        is_missing = name not in new_kwargs
        if is_missing and definition.has_default(use_server=False):
            new_kwargs[name] = definition.get_default()

        if definition.server_default is None:
            continue
        # drop the None placeholder of a field that has server_default set
        if new_kwargs.get(name, None) is None:
            new_kwargs.pop(name, None)
    return new_kwargs

populate_onupdate_value(new_kwargs, explicit_fields=None) classmethod

Populates on_update values for fields that have on_update configured and were not provided explicitly.

If explicit_fields is omitted the keys of new_kwargs are treated as explicitly provided. Callers pass the set explicitly when new_kwargs is a full model_dump (e.g. bulk_update) so that fields the user actually mutated can be distinguished from values that are just present by virtue of the dump.

:param new_kwargs: dictionary of values about to be used for the update :type new_kwargs: dict :param explicit_fields: fields that should be considered user-provided :type explicit_fields: Optional[set[str]] :return: new_kwargs with on_update values populated :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
@classmethod
def populate_onupdate_value(
    cls, new_kwargs: dict, explicit_fields: Optional[set[str]] = None
) -> dict:
    """
    Stores the ``on_update`` value for every field listed in
    ``cls._onupdate_fields`` that was not provided explicitly.

    When ``explicit_fields`` is not given, the keys of ``new_kwargs`` are
    taken as the explicitly provided fields. Passing the set separately lets
    a caller whose ``new_kwargs`` is a full ``model_dump`` (e.g.
    ``bulk_update``) tell apart the fields the user actually mutated from
    the values that are only present because of the dump.

    :param new_kwargs: dictionary of values about to be used for the update
    :type new_kwargs: dict
    :param explicit_fields: fields that should be considered user-provided
    :type explicit_fields: Optional[set[str]]
    :return: new_kwargs with on_update values populated
    :rtype: dict
    """
    provided = set(new_kwargs.keys()) if explicit_fields is None else explicit_fields
    pending = cls._onupdate_fields - provided
    for name in pending:
        definition = cls.ormar_config.model_fields[name]
        new_kwargs[name] = definition.get_on_update()
    return new_kwargs

prepare_model_to_save(new_kwargs) classmethod

Combines all preparation methods before saving. Removes the primary key if it is a nullable or autoincrement pk field and it is set to None. Substitutes related models with their primary key values as fk columns. Populates the default values for fields with a default set and no value. Translates columns into aliases (db names).

:param new_kwargs: dictionary of model that is about to be saved :type new_kwargs: dict[str, str] :return: dictionary of model that is about to be saved :rtype: dict[str, str]

Source code in ormar/models/mixins/save_mixin.py
Python
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@classmethod
def prepare_model_to_save(cls, new_kwargs: dict) -> dict:
    """
    Runs the dictionary through all preparation steps needed before saving.

    The steps are applied one after another, each receiving the result of
    the previous one:

    * ``_remove_pk_from_kwargs``
    * ``_remove_not_ormar_fields``
    * ``substitute_models_with_pks`` - related models become their pk values
    * ``populate_default_values`` - defaults for fields without a value
    * ``reconvert_str_to_bytes`` - str values of bytes fields back to bytes
    * ``translate_columns_to_aliases`` - field names into db names

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: dict[str, str]
    :return: dictionary of model that is about to be saved
    :rtype: dict[str, str]
    """
    pipeline = (
        cls._remove_pk_from_kwargs,
        cls._remove_not_ormar_fields,
        cls.substitute_models_with_pks,
        cls.populate_default_values,
        cls.reconvert_str_to_bytes,
        cls.translate_columns_to_aliases,
    )
    for step in pipeline:
        new_kwargs = step(new_kwargs)
    return new_kwargs

prepare_model_to_update(new_kwargs) classmethod

Combines all preparation methods before updating.

:param new_kwargs: dictionary of model that is about to be saved :type new_kwargs: dict[str, str] :return: dictionary of model that is about to be updated :rtype: dict[str, str]

Source code in ormar/models/mixins/save_mixin.py
Python
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@classmethod
def prepare_model_to_update(cls, new_kwargs: dict) -> dict:
    """
    Runs the dictionary through all preparation steps needed before updating.

    The steps are applied one after another, each receiving the result of
    the previous one:

    * ``parse_non_db_fields`` - uuid values into their stored form
    * ``substitute_models_with_pks`` - related models become their pk values
    * ``reconvert_str_to_bytes`` - str values of bytes fields back to bytes
    * ``dump_all_json_fields_to_str`` - json field values encoded
    * ``translate_columns_to_aliases`` - field names into db names
    * ``translate_enum_columns``

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: dict[str, str]
    :return: dictionary of model that is about to be updated
    :rtype: dict[str, str]
    """
    pipeline = (
        cls.parse_non_db_fields,
        cls.substitute_models_with_pks,
        cls.reconvert_str_to_bytes,
        cls.dump_all_json_fields_to_str,
        cls.translate_columns_to_aliases,
        cls.translate_enum_columns,
    )
    for step in pipeline:
        new_kwargs = step(new_kwargs)
    return new_kwargs

reconvert_str_to_bytes(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all bytes fields that are represented as strings back into bytes.

:param model_dict: dictionary of model that is about to be saved :type model_dict: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@classmethod
def reconvert_str_to_bytes(cls, model_dict: dict) -> dict:
    """
    Turns the ``str`` values of bytes fields in the given dictionary back into
    ``bytes``, in place.

    A value is converted only if its key is in ``cls._bytes_fields`` and it
    is a ``str``. Fields flagged with ``represent_as_base64_str`` are base64
    decoded, all the other ones are encoded as utf-8.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: dict
    :return: dictionary of model that is about to be saved
    :rtype: dict
    """
    base64_names = set()
    for name, definition in cls.ormar_config.model_fields.items():
        if definition.represent_as_base64_str:
            base64_names.add(name)

    for field_name, raw in model_dict.items():
        if field_name not in cls._bytes_fields or not isinstance(raw, str):
            continue
        if field_name in base64_names:
            model_dict[field_name] = base64.b64decode(raw)
        else:
            model_dict[field_name] = raw.encode("utf-8")
    return model_dict

serialize_nested_models_json_fields(kwargs) classmethod

Serializes JSON fields in nested model instances to strings. This ensures that when models with JSON fields are nested in other models, the JSON values are properly serialized as strings for Pydantic validation.

Needed as sqlalchemy auto parses the JSON fields.

:param kwargs: dictionary of keyword arguments :type kwargs: Dict :return: modified kwargs with serialized JSON fields in nested models :rtype: Dict

Source code in ormar/models/mixins/save_mixin.py
Python
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
@classmethod
def serialize_nested_models_json_fields(cls, kwargs: dict) -> dict:
    """
    Encodes the JSON fields of the nested model instances found among the
    values of ``kwargs``.

    For every value that is an ``ormar.Model`` and has a ``_json_fields``
    attribute, ``dump_all_json_fields_to_str`` is called on the instance
    ``__dict__``, so the nested instance is modified in place. According to
    the original notes this is needed for Pydantic validation, as sqlalchemy
    auto parses the JSON fields.

    :param kwargs: dictionary of keyword arguments
    :type kwargs: Dict
    :return: the same kwargs, with JSON fields of nested models serialized
    :rtype: Dict
    """
    for nested in kwargs.values():
        if not isinstance(nested, ormar.Model):
            continue
        if hasattr(nested, "_json_fields"):
            nested.dump_all_json_fields_to_str(nested.__dict__)
    return kwargs

substitute_models_with_pks(model_dict) classmethod

Receives dictionary of model that is about to be saved and changes all related models that are stored as foreign keys to their fk value.

:param model_dict: dictionary of model that is about to be saved :type model_dict: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@classmethod
def substitute_models_with_pks(cls, model_dict: dict) -> dict:  # noqa  CCR001
    """
    Replaces, in place, the related models stored in the given dictionary
    with the primary key value(s) of the related model.

    For every related name whose value in ``model_dict`` is not ``None``:

    * a model instance is replaced with its pk value,
    * a non empty list is replaced with the list of pk values read from its
      items,
    * a non empty dict is replaced with the pk value read from it,
    * any other value (including an empty list or dict) is removed.

    :param model_dict: dictionary of model that is about to be saved
    :type model_dict: dict
    :raises ModelPersistenceError: if a related model instance has no pk set
    :return: dictionary of model that is about to be saved
    :rtype: dict
    """
    for relation_name in cls.extract_related_names():
        related = model_dict.get(relation_name, None)
        if related is None:
            continue

        relation_field = cls.ormar_config.model_fields[relation_name]
        pk_name = relation_field.to.ormar_config.pkname

        if isinstance(related, ormar.Model):  # pragma: no cover
            pk_value = getattr(related, pk_name)
            if not pk_value:
                raise ModelPersistenceError(
                    f"You cannot save {related.get_name()} "
                    f"model without pk set!"
                )
            model_dict[relation_name] = pk_value
        elif isinstance(related, list) and related:
            model_dict[relation_name] = [item.get(pk_name) for item in related]
        elif isinstance(related, dict) and related:
            model_dict[relation_name] = related.get(pk_name)
        else:
            model_dict.pop(relation_name, None)
    return model_dict

validate_enums(new_kwargs) classmethod

Receives dictionary of model that is about to be saved and validates the fields with choices set to see if the value is allowed.

:param new_kwargs: dictionary of model that is about to be saved :type new_kwargs: dict :return: dictionary of model that is about to be saved :rtype: dict

Source code in ormar/models/mixins/save_mixin.py
Python
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
@classmethod
def validate_enums(cls, new_kwargs: dict) -> dict:
    """
    Validates the values of the given dictionary with the per-field
    validators returned by ``_build_individual_schema_validator``.

    Only the keys that have a validator are checked, by calling its
    ``validate_python`` with the value. The result of the validation is not
    used, so the dictionary is returned unchanged; an error raised by a
    validator is not caught here.

    :param new_kwargs: dictionary of model that is about to be saved
    :type new_kwargs: dict
    :return: dictionary of model that is about to be saved
    :rtype: dict
    """
    schema_validators = cls._build_individual_schema_validator()
    for name, candidate in new_kwargs.items():
        if name not in schema_validators:
            continue
        schema_validators[name].validate_python(candidate)
    return new_kwargs