21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214 | class ExcludableMixin(RelationMixin):
"""
Used to include/exclude given set of fields on models during load and dict() calls.
"""
if TYPE_CHECKING: # pragma: no cover
from ormar import Model
from ormar.models import ModelRow
@staticmethod
def _populate_pk_column(
model: Union[type["Model"], type["ModelRow"]],
columns: list[str],
use_alias: bool = False,
) -> list[str]:
"""
Adds primary key column/alias (depends on use_alias flag) to list of
column names that are selected.
:param model: model on columns are selected
:type model: type["Model"]
:param columns: list of columns names
:type columns: list[str]
:param use_alias: flag to set if aliases or field names should be used
:type use_alias: bool
:return: list of columns names with pk column in it
:rtype: list[str]
"""
pk_alias = (
model.get_column_alias(model.ormar_config.pkname)
if use_alias
else model.ormar_config.pkname
)
if pk_alias not in columns:
columns.append(pk_alias)
return columns
@staticmethod
def _get_table_column_pairs(
model: Union[type["Model"], type["ModelRow"]],
) -> list[tuple[str, str]]:
"""
Returns cached list of (col_name, field_name) tuples for the model's table.
Built once per model class and cached to avoid repeated SA column iteration.
:param model: model to get column pairs for
:type model: type["Model"]
:return: list of (column_name, field_name) tuples
:rtype: list[tuple[str, str]]
"""
cached = getattr(model, "_table_column_pairs", None)
if cached is not None:
return cached
pairs = [
(col.name, model.get_column_name_from_alias(col.name))
for col in model.ormar_config.table.columns
]
model._table_column_pairs = pairs # type: ignore[union-attr]
return pairs
@classmethod
def own_table_columns(
cls,
model: Union[type["Model"], type["ModelRow"]],
excludable: ExcludableItems,
alias: str = "",
use_alias: bool = False,
add_pk_columns: bool = True,
) -> list[str]:
"""
Returns list of aliases or field names for given model.
Aliases/names switch is use_alias flag.
If provided only fields included in fields will be returned.
If provided fields in exclude_fields will be excluded in return.
Primary key field is always added and cannot be excluded (will be added anyway).
:param add_pk_columns: flag if add primary key - always yes if ormar parses data
:type add_pk_columns: bool
:param alias: relation prefix
:type alias: str
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:param model: model on columns are selected
:type model: type["Model"]
:param use_alias: flag if aliases or field names should be used
:type use_alias: bool
:return: list of column field names or aliases
:rtype: list[str]
"""
model_excludable = excludable.get(model_cls=model, alias=alias) # type: ignore
has_include = bool(model_excludable.include)
has_exclude = bool(model_excludable.exclude)
column_pairs = cls._get_table_column_pairs(model)
columns = []
for col_name, field_name in column_pairs:
if has_include and not model_excludable.is_included(field_name):
continue
if has_exclude and model_excludable.is_excluded(field_name):
continue
columns.append(col_name if use_alias else field_name)
# always has to return pk column for ormar to work
if add_pk_columns:
columns = cls._populate_pk_column(
model=model, columns=columns, use_alias=use_alias
)
return columns
@classmethod
def _update_excluded_with_related(cls, exclude: Union[set, dict, None]) -> set:
"""
Used during generation of the dict().
To avoid cyclical references and max recurrence limit nested models have to
exclude related models that are not mandatory.
For a main model (not nested) only nullable related field names are added to
exclusion, for nested models all related models are excluded.
:param exclude: set/dict with fields to exclude
:type exclude: Union[set, dict, None]
:return: set or dict with excluded fields added.
:rtype: Union[set, dict]
"""
exclude = exclude or set()
related_set = cls.extract_related_names()
if isinstance(exclude, set):
exclude = {s for s in exclude}
exclude = exclude.union(related_set)
elif isinstance(exclude, dict):
# relations are handled in ormar - take only own fields (ellipsis in dict)
exclude = {k for k, v in exclude.items() if v is Ellipsis}
exclude = exclude.union(related_set)
return exclude
@classmethod
def _update_excluded_with_pks_and_through(
cls, exclude: set, exclude_primary_keys: bool, exclude_through_models: bool
) -> set:
"""
Updates excluded names with name of pk column if exclude flag is set.
:param exclude: set of names to exclude
:type exclude: set
:param exclude_primary_keys: flag if the primary keys should be excluded
:type exclude_primary_keys: bool
:return: set updated with pk if flag is set
:rtype: set
"""
if exclude_primary_keys:
exclude.add(cls.ormar_config.pkname)
if exclude_through_models:
exclude = exclude.union(cls.extract_through_names())
return exclude
@classmethod
def get_names_to_exclude(cls, excludable: ExcludableItems, alias: str) -> set:
"""
Returns a set of models field names that should be explicitly excluded
during model initialization.
Those fields will be set to None to avoid ormar/pydantic setting default
values on them. They should be returned as None in any case.
Used in parsing data from database rows that construct Models by initializing
them with dicts constructed from those db rows.
:param alias: alias of current relation
:type alias: str
:param excludable: structure of fields to include and exclude
:type excludable: ExcludableItems
:return: set of field names that should be excluded
:rtype: set
"""
model = cast(type["Model"], cls)
model_excludable = excludable.get(model_cls=model, alias=alias)
fields_names = cls.extract_db_own_fields()
if model_excludable.include:
fields_to_keep = model_excludable.include.intersection(fields_names)
else:
fields_to_keep = fields_names
fields_to_exclude = fields_names - fields_to_keep
if model_excludable.exclude:
fields_to_exclude = fields_to_exclude.union(
model_excludable.exclude.intersection(fields_names)
)
fields_to_exclude = fields_to_exclude - {cls.ormar_config.pkname}
return fields_to_exclude
|