Skip to content

relation_proxy

RelationProxy

Bases: Generic[T], list[T]

Proxy of the Relation that is a list with special methods.

Source code in ormar/relations/relation_proxy.py
Python
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
class RelationProxy(Generic[T], list[T]):
    """
    Proxy of the Relation that is a list with special methods.

    Next to the list content the proxy maintains ``_relation_cache``, a mapping
    of ``child.__hash__()`` to the child's position in the list, which backs
    ``index`` and ``__contains__``. Attributes not defined on the proxy (or on
    ``list``) are delegated to a lazily constructed ``QuerysetProxy``.
    """

    def __init__(
        self,
        relation: "Relation",
        type_: "RelationType",
        to: type["T"],
        field_name: str,
        data_: Any = None,
    ) -> None:
        """
        Stores the relation metadata and populates the list from ``data_``.

        Elements of ``data_`` whose ``__hash__`` raises ``ReferenceError`` are
        skipped and are present neither in the list nor in the cache.

        :param relation: relation exposed by this proxy
        :type relation: Relation
        :param type_: type of the relation
        :type type_: RelationType
        :param to: class of the related models
        :type to: type[T]
        :param field_name: name of the relation field on the owner model
        :type field_name: str
        :param data_: optional iterable of initial children
        :type data_: Any
        """
        self.relation: "Relation[T]" = relation
        self.type_: "RelationType" = type_
        self.field_name = field_name
        self._owner: "Model" = self.relation.manager.owner
        self._to: type["T"] = to
        # both are filled lazily by the properties of the same (public) name
        self._queryset_proxy: Optional[QuerysetProxy[T]] = None
        self._related_field_name: Optional[str] = None

        # hash of the child -> position of the child in the list
        self._relation_cache: dict[int, int] = {}

        validated_data = []
        if data_ is not None:
            idx = 0
            for d in data_:
                try:
                    self._relation_cache[d.__hash__()] = idx
                    validated_data.append(d)
                    idx += 1
                except ReferenceError:
                    # hashing failed on a dead weak reference - drop the entry
                    pass
        super().__init__(validated_data or ())

    @property
    def queryset_proxy(self) -> "QuerysetProxy[T]":
        """
        Builds the underlying ``QuerysetProxy`` on first access. Most
        ``RelationProxy`` instances are constructed during row materialization
        and never have any queryset method invoked on them, so deferring this
        allocation skips a non-trivial dict/setattr pair per relation.

        :return: lazily constructed (and cached) QuerysetProxy
        :rtype: QuerysetProxy
        """
        proxy = self._queryset_proxy
        if proxy is None:
            proxy = QuerysetProxy[T](
                relation=self.relation, to=self._to, type_=self.type_
            )
            self._queryset_proxy = proxy
        return proxy

    @property
    def related_field_name(self) -> str:
        """
        On first access calculates the name of the related field, later stored
        in the ``_related_field_name`` attribute.

        :return: name of the related field
        :rtype: str
        """
        if self._related_field_name:
            return self._related_field_name
        owner_field = self._owner.ormar_config.model_fields[self.field_name]
        self._related_field_name = owner_field.get_related_name()

        return self._related_field_name

    def __getitem__(self, item: Any) -> "T":  # type: ignore
        """
        Returns the element(s) stored under ``item``, exactly like ``list``.

        :param item: index or slice
        :type item: Any
        :return: element (or list of elements for a slice)
        :rtype: T
        """
        return super().__getitem__(item)

    def append(self, item: "T") -> None:
        """
        Appends an item to the list in place and records its position
        in the relation cache.

        :param item: The generic item of the list
        :type item: T
        """
        idx = len(self)
        self._relation_cache[item.__hash__()] = idx
        super().append(item)

    def update_cache(self, prev_hash: int, new_hash: int) -> None:
        """
        Updates the cache from the old hash to the new one.
        This maintains the index cache, which allows O(1) indexing and
        existence checks. Does nothing when ``prev_hash`` is not cached.

        :param prev_hash: The hash to update
        :type prev_hash: int
        :param new_hash: The new hash to update to
        :type new_hash: int
        """
        try:
            idx = self._relation_cache.pop(prev_hash)
            self._relation_cache[new_hash] = idx
        except KeyError:
            pass

    def index(self, item: T, *args: Any) -> int:
        """
        Gets the index of the item in the list from the relation cache.

        The optional ``start``/``stop`` arguments of ``list.index`` are
        accepted through ``*args`` but ignored.

        :param item: The item to get the index of
        :type item: "T"
        :return: position of the item in the list
        :rtype: int
        :raises KeyError: when the hash of the item is not in the cache
        """
        return self._relation_cache[item.__hash__()]

    def _get_list_of_missing_weakrefs(self) -> set[int]:
        """
        Iterates through the list and checks for weakrefs.

        An attribute of every child is touched; a child for which that raises
        ``ReferenceError`` is reported by its index.

        :return: The set of missing weakref indices
        :rtype: set[int]
        """
        to_remove = set()
        for ind, relation_child in enumerate(self[:]):
            try:
                relation_child.__repr__.__self__  # type: ignore
            except ReferenceError:  # pragma no cover
                to_remove.add(ind)

        return to_remove

    def pop(self, index: SupportsIndex = 0) -> T:
        """
        Pops the index off the list and returns it. By default,
        it pops off the element at index 0.
        This also clears the value from the relation cache and moves the
        cached positions of all later elements one step down.

        Negative indices are supported and count from the end of the list.

        :param index: The index to pop
        :type index: SupportsIndex
        :return: The item at the provided index
        :rtype: "T"
        :raises IndexError: when the index is out of range
        """
        item = self[index]

        # Normalise to a non-negative position: the cache stores non-negative
        # positions only. ``self[index]`` above already raised IndexError for
        # anything out of range, so the result is a valid position.
        position = int(index)
        if position < 0:
            position += len(self)

        # Try to delete it, but do it a long way
        # if weakly-referenced thing doesn't exist
        try:
            self._relation_cache.pop(item.__hash__())
        except ReferenceError:
            for hash_, idx in self._relation_cache.items():
                if idx == position:
                    self._relation_cache.pop(hash_)
                    break

        # Shift by stored position instead of re-hashing the later children,
        # so a dead weak reference further down the list cannot abort the
        # update half-way. Only values of existing keys change, which is safe
        # while iterating over the dict.
        for hash_, idx in self._relation_cache.items():
            if idx > position:
                self._relation_cache[hash_] = idx - 1

        return super().pop(index)

    def __contains__(self, item: object) -> bool:
        """
        Checks whether the item exists in self. This relies
        on the relation cache, which is a hashmap of values
        in the list. It runs in O(1) time.

        :param item: The item to check if the list contains
        :type item: object
        :return: True if the hash of the item is cached, False otherwise
            (also when hashing the item raises ReferenceError)
        :rtype: bool
        """
        try:
            return item.__hash__() in self._relation_cache
        except ReferenceError:
            return False

    async def count(self, distinct: bool = True) -> int:  # type: ignore[override]
        """
        Returns count of related models. Delegates to ``QuerysetProxy.count``.

        Defined explicitly to shadow ``list.count`` so attribute lookup resolves
        to the relation-aware version without going through ``__getattribute__``
        on every attribute access.

        :param distinct: flag if the primary table rows should be distinct
        :type distinct: bool
        :return: number of related models
        :rtype: int
        """
        self._initialize_queryset()
        return await self.queryset_proxy.count(distinct=distinct)

    async def clear(self, keep_reversed: bool = True) -> int:  # type: ignore[override]
        """
        Removes all related models from the relation. Delegates to
        ``QuerysetProxy.clear``.

        Defined explicitly to shadow ``list.clear`` so attribute lookup resolves
        to the relation-aware version without going through ``__getattribute__``
        on every attribute access.

        :param keep_reversed: keep reversed FK rows in the database
        :type keep_reversed: bool
        :return: number of removed relation entries
        :rtype: int
        """
        self._initialize_queryset()
        return await self.queryset_proxy.clear(keep_reversed=keep_reversed)

    def __getattr__(self, item: str) -> Any:
        """
        Delegates calls for non existing attributes to QuerySetProxy.

        :param item: name of attribute/method
        :type item: str
        :return: method from QuerySetProxy if exists
        :rtype: method
        :raises AttributeError: when the proxy's own ``_queryset_proxy`` slot
            is requested but was never assigned
        """
        if item == "_queryset_proxy":
            # __getattr__ only runs when normal lookup failed, so getting here
            # means __init__ never assigned the slot (e.g. the instance is
            # being rebuilt by copy/pickle). _initialize_queryset() reads this
            # very attribute, so delegating would recurse without bound.
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        self._initialize_queryset()
        return getattr(self.queryset_proxy, item)

    def _clear(self) -> None:
        """
        Empties the list and the relation cache in memory only
        (``clear`` is the coroutine that delegates to the queryset proxy).
        """
        self._relation_cache.clear()
        super().clear()

    def _initialize_queryset(self) -> None:
        """
        Initializes the QuerySetProxy if not yet initialized.
        """
        if not self._check_if_queryset_is_initialized():
            self.queryset_proxy.queryset = self._set_queryset()

    def _check_if_queryset_is_initialized(self) -> bool:
        """
        Checks if the QuerySetProxy is already set and ready. Reads the
        backing ``_queryset_proxy`` slot directly so the check itself does
        not force lazy construction of the proxy.

        :return: result of the check
        :rtype: bool
        """
        proxy = self._queryset_proxy
        return proxy is not None and proxy._queryset is not None

    def _check_if_model_saved(self) -> None:
        """
        Verifies if the parent model of the relation has been already saved.
        Otherwise QuerySetProxy cannot filter by parent primary key.

        :raises RelationshipInstanceError: when the owner has a falsy pk
        """
        pk_value = self._owner.pk
        if not pk_value:
            raise RelationshipInstanceError(
                "You cannot query relationships from unsaved model."
            )

    def _set_queryset(self) -> "QuerySet[T]":
        """
        Creates new QuerySet with relation model and pre filters it with currents
        parent model primary key, so all queries by definition are already related
        to the parent model only, without need for user to filter them.

        :return: initialized QuerySet
        :rtype: QuerySet
        :raises RelationshipInstanceError: when the owner model is not saved
        """
        related_field_name = self.related_field_name
        pkname = self._owner.get_column_alias(self._owner.ormar_config.pkname)
        self._check_if_model_saved()
        kwargs = {f"{related_field_name}__{pkname}": self._owner.pk}
        queryset = (
            ormar.QuerySet(
                model_cls=self.relation.to, proxy_source_model=self._owner.__class__
            )
            .select_related(related_field_name)
            .filter(**kwargs)
        )
        return queryset

    async def remove(  # type: ignore
        self, item: "T", keep_reversed: bool = True
    ) -> None:
        """
        Removes the related from relation with parent.

        Through models are automatically deleted for m2m relations.

        For reverse FK relations keep_reversed flag marks if the reversed models
        should be kept or deleted from the database too (False means that models
        will be deleted, and not only removed from relation).

        The ``pre_relation_remove`` and ``post_relation_remove`` signals of the
        owner are sent before and after the removal.

        :param item: child to remove from relation
        :type item: Model
        :param keep_reversed: flag if the reversed model should be kept or deleted too
        :type keep_reversed: bool
        :raises NoMatch: when the item is not part of the relation
        """
        if item not in self:
            raise NoMatch(
                f"Object {self._owner.get_name()} has no "
                f"{item.get_name()} with given primary key!"
            )
        await self._owner.signals.pre_relation_remove.send(
            sender=self._owner.__class__,
            instance=self._owner,
            child=item,
            relation_name=self.field_name,
        )

        index_to_remove = self._relation_cache[item.__hash__()]
        self.pop(index_to_remove)

        relation_name = self.related_field_name
        relation = item._orm._get(relation_name)
        # a missing reverse relation on the child is tolerated, not an error
        if relation:
            relation.remove(self._owner)
        self.relation.remove(item)
        if self.type_ == ormar.RelationType.MULTIPLE:
            await self.queryset_proxy.delete_through_instance(item)
        else:
            if keep_reversed:
                setattr(item, relation_name, None)
                await item.update()
            else:
                await item.delete()
        await self._owner.signals.post_relation_remove.send(
            sender=self._owner.__class__,
            instance=self._owner,
            child=item,
            relation_name=self.field_name,
        )

    async def add(self, item: "T", **kwargs: Any) -> None:
        """
        Adds child model to relation.

        For ManyToMany relations through instance is automatically created.

        The ``pre_relation_add`` and ``post_relation_add`` signals of the
        owner are sent before and after the child is added.

        :param kwargs: dict of additional keyword arguments for through instance
        :type kwargs: Any
        :param item: child to add to relation
        :type item: Model
        :raises RelationshipInstanceError: when the owner model is not saved
        """
        # position is fixed up front: the current one for a known child,
        # otherwise the slot right after the current last element
        new_idx = len(self) if item not in self else self.index(item)
        relation_name = self.related_field_name
        await self._owner.signals.pre_relation_add.send(
            sender=self._owner.__class__,
            instance=self._owner,
            child=item,
            relation_name=self.field_name,
            passed_kwargs=kwargs,
        )
        self._check_if_model_saved()
        if self.type_ == ormar.RelationType.MULTIPLE:
            await self.queryset_proxy.create_through_instance(item, **kwargs)
            setattr(self._owner, self.field_name, item)
        else:
            setattr(item, relation_name, self._owner)
            await item.upsert()
        self._relation_cache[item.__hash__()] = new_idx
        await self._owner.signals.post_relation_add.send(
            sender=self._owner.__class__,
            instance=self._owner,
            child=item,
            relation_name=self.field_name,
            passed_kwargs=kwargs,
        )

queryset_proxy property

Builds the underlying QuerysetProxy on first access. Most RelationProxy instances are constructed during row materialization and never have any queryset method invoked on them, so deferring this allocation skips a non-trivial dict/setattr pair per relation.

:return: lazily constructed (and cached) QuerysetProxy :rtype: QuerysetProxy

related_field_name property

On first access, calculates the name of the related field and stores it in the _related_field_name attribute for later calls.

:return: name of the related field :rtype: str

__contains__(item)

Checks whether the item exists in self. This relies on the relation cache, which is a hashmap of values in the list. It runs in O(1) time.

:param item: The item to check if the list contains :type item: object

Source code in ormar/relations/relation_proxy.py
Python
173
174
175
176
177
178
179
180
181
182
183
184
185
def __contains__(self, item: object) -> bool:
    """
    Tells whether ``item`` is registered in the relation.

    Membership is decided by looking up the hash of the item among the keys
    of the relation cache instead of scanning the list.

    :param item: The item to look for
    :type item: object
    :return: True when the hash of the item is cached; False when it is not,
        or when hashing the item raises ReferenceError
    :rtype: bool
    """
    try:
        found = item.__hash__() in self._relation_cache
    except ReferenceError:
        # the weakly referenced item is gone, so it cannot be a member
        found = False
    return found

__getattr__(item)

Delegates calls for non existing attributes to QuerySetProxy.

:param item: name of attribute/method :type item: str :return: method from QuerySetProxy if exists :rtype: method

Source code in ormar/relations/relation_proxy.py
Python
220
221
222
223
224
225
226
227
228
229
230
def __getattr__(self, item: str) -> Any:
    """
    Delegates calls for non existing attributes to QuerySetProxy.

    :param item: name of attribute/method
    :type item: str
    :return: method from QuerySetProxy if exists
    :rtype: method
    :raises AttributeError: when the proxy's own ``_queryset_proxy`` slot
        is requested but was never assigned
    """
    if item == "_queryset_proxy":
        # __getattr__ only runs when normal lookup failed, so getting here
        # means the slot was never assigned (e.g. the instance is being
        # rebuilt by copy/pickle without __init__). _initialize_queryset()
        # reads this very attribute, so delegating would recurse without bound.
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {item!r}"
        )
    self._initialize_queryset()
    return getattr(self.queryset_proxy, item)

add(item, **kwargs) async

Adds child model to relation.

For ManyToMany relations through instance is automatically created.

:param kwargs: dict of additional keyword arguments for through instance :type kwargs: Any :param item: child to add to relation :type item: Model

Source code in ormar/relations/relation_proxy.py
Python
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
async def add(self, item: "T", **kwargs: Any) -> None:
    """
    Registers ``item`` as a child of the owner model in this relation.

    For ManyToMany relations the through instance is created with the
    passed ``kwargs``; otherwise the owner is set on the child and the child
    is upserted. The ``pre_relation_add`` / ``post_relation_add`` signals of
    the owner are sent around the operation.

    :param kwargs: dict of additional keyword arguments for through instance
    :type kwargs: Any
    :param item: child to add to relation
    :type item: Model
    """
    # keep the position of an already known child, otherwise use the next slot
    if item in self:
        position = self.index(item)
    else:
        position = len(self)
    related_name = self.related_field_name

    owner = self._owner
    signal_payload = {
        "sender": owner.__class__,
        "instance": owner,
        "child": item,
        "relation_name": self.field_name,
        "passed_kwargs": kwargs,
    }
    await owner.signals.pre_relation_add.send(**signal_payload)

    self._check_if_model_saved()
    is_many_to_many = self.type_ == ormar.RelationType.MULTIPLE
    if is_many_to_many:
        await self.queryset_proxy.create_through_instance(item, **kwargs)
        setattr(owner, self.field_name, item)
    else:
        setattr(item, related_name, owner)
        await item.upsert()
    self._relation_cache[item.__hash__()] = position

    await owner.signals.post_relation_add.send(**signal_payload)

append(item)

Appends an item to the list in place

:param item: The generic item of the list :type item: T

Source code in ormar/relations/relation_proxy.py
Python
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def append(self, item: "T") -> None:
    """
    Puts ``item`` at the end of the list and caches its position.

    :param item: The generic item of the list
    :type item: T
    """
    # the new element lands at the current length of the list
    self._relation_cache[item.__hash__()] = len(self)
    super().append(item)

clear(keep_reversed=True) async

Removes all related models from the relation. Delegates to QuerysetProxy.clear.

Defined explicitly to shadow list.clear so attribute lookup resolves to the relation-aware version without going through __getattribute__ on every attribute access.

:param keep_reversed: keep reversed FK rows in the database :type keep_reversed: bool :return: number of removed relation entries :rtype: int

Source code in ormar/relations/relation_proxy.py
Python
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def clear(self, keep_reversed: bool = True) -> int:  # type: ignore[override]
    """
    Drops every related model from the relation by handing the call over to
    ``QuerysetProxy.clear``.

    The method exists to shadow ``list.clear``, so that the relation-aware
    coroutine is what attribute lookup finds on the proxy.

    :param keep_reversed: keep reversed FK rows in the database
    :type keep_reversed: bool
    :return: number of removed relation entries
    :rtype: int
    """
    self._initialize_queryset()
    proxy = self.queryset_proxy
    removed = await proxy.clear(keep_reversed=keep_reversed)
    return removed

count(distinct=True) async

Returns count of related models. Delegates to QuerysetProxy.count.

Defined explicitly to shadow list.count so attribute lookup resolves to the relation-aware version without going through __getattribute__ on every attribute access.

:param distinct: flag if the primary table rows should be distinct :type distinct: bool :return: number of related models :rtype: int

Source code in ormar/relations/relation_proxy.py
Python
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def count(self, distinct: bool = True) -> int:  # type: ignore[override]
    """
    Counts the related models by handing the call over to
    ``QuerysetProxy.count``.

    The method exists to shadow ``list.count``, so that the relation-aware
    coroutine is what attribute lookup finds on the proxy.

    :param distinct: flag if the primary table rows should be distinct
    :type distinct: bool
    :return: number of related models
    :rtype: int
    """
    self._initialize_queryset()
    proxy = self.queryset_proxy
    total = await proxy.count(distinct=distinct)
    return total

index(item, *args)

Gets the index of the item in the list

:param item: The item to get the index of :type item: "T"

Source code in ormar/relations/relation_proxy.py
Python
119
120
121
122
123
124
125
126
def index(self, item: T, *args: Any) -> int:
    """
    Looks up the position of ``item`` in the relation cache.

    Extra positional arguments are accepted but not used.

    :param item: The item to get the index of
    :type item: "T"
    :return: cached position of the item
    :rtype: int
    """
    positions = self._relation_cache
    item_hash = item.__hash__()
    return positions[item_hash]

pop(index=0)

Pops the index off the list and returns it. By default, it pops off the element at index 0. This also clears the value from the relation cache.

:param index: The index to pop :type index: SupportsIndex :return: The item at the provided index :rtype: "T"

Source code in ormar/relations/relation_proxy.py
Python
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def pop(self, index: SupportsIndex = 0) -> T:
    """
    Pops the index off the list and returns it. By default,
    it pops off the element at index 0.
    This also clears the value from the relation cache and moves the
    cached positions of all later elements one step down.

    Negative indices are supported and count from the end of the list.

    :param index: The index to pop
    :type index: SupportsIndex
    :return: The item at the provided index
    :rtype: "T"
    :raises IndexError: when the index is out of range
    """
    item = self[index]

    # Normalise to a non-negative position: the cache stores non-negative
    # positions only. ``self[index]`` above already raised IndexError for
    # anything out of range, so the result is a valid position.
    position = int(index)
    if position < 0:
        position += len(self)

    # Try to delete it, but do it a long way
    # if weakly-referenced thing doesn't exist
    try:
        self._relation_cache.pop(item.__hash__())
    except ReferenceError:
        for hash_, idx in self._relation_cache.items():
            if idx == position:
                self._relation_cache.pop(hash_)
                break

    # Shift by stored position instead of re-hashing the later children,
    # so a dead weak reference further down the list cannot abort the
    # update half-way. Only values of existing keys change, which is safe
    # while iterating over the dict.
    for hash_, idx in self._relation_cache.items():
        if idx > position:
            self._relation_cache[hash_] = idx - 1

    return super().pop(index)

remove(item, keep_reversed=True) async

Removes the related model from the relation with the parent.

Through models are automatically deleted for m2m relations.

For reverse FK relations keep_reversed flag marks if the reversed models should be kept or deleted from the database too (False means that models will be deleted, and not only removed from relation).

:param item: child to remove from relation :type item: Model :param keep_reversed: flag if the reversed model should be kept or deleted too :type keep_reversed: bool

Source code in ormar/relations/relation_proxy.py
Python
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
async def remove(  # type: ignore
    self, item: "T", keep_reversed: bool = True
) -> None:
    """
    Detaches ``item`` from the relation with the parent model.

    For m2m relations the through instance is deleted. For reverse FK
    relations ``keep_reversed`` decides whether the child only loses its
    link to the parent (True) or is deleted from the database (False).
    The ``pre_relation_remove`` / ``post_relation_remove`` signals of the
    owner are sent around the operation.

    :param item: child to remove from relation
    :type item: Model
    :param keep_reversed: flag if the reversed model should be kept or deleted too
    :type keep_reversed: bool
    """
    if item not in self:
        raise NoMatch(
            f"Object {self._owner.get_name()} has no "
            f"{item.get_name()} with given primary key!"
        )

    owner = self._owner
    signal_payload = {
        "sender": owner.__class__,
        "instance": owner,
        "child": item,
        "relation_name": self.field_name,
    }
    await owner.signals.pre_relation_remove.send(**signal_payload)

    # drop the child from the in-memory list (and the cache) first
    position = self._relation_cache[item.__hash__()]
    self.pop(position)

    related_name = self.related_field_name
    reverse_relation = item._orm._get(related_name)
    # a missing reverse relation on the child is tolerated
    if reverse_relation:
        reverse_relation.remove(owner)
    self.relation.remove(item)

    if self.type_ == ormar.RelationType.MULTIPLE:
        await self.queryset_proxy.delete_through_instance(item)
    elif keep_reversed:
        setattr(item, related_name, None)
        await item.update()
    else:
        await item.delete()

    await owner.signals.post_relation_remove.send(**signal_payload)

update_cache(prev_hash, new_hash)

Updates the cache from the old hash to the new one. This maintains the index cache, which allows O(1) indexing and existence checks.

:param prev_hash: The hash to update :type prev_hash: int :param new_hash: The new hash to update to :type new_hash: int

Source code in ormar/relations/relation_proxy.py
Python
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def update_cache(self, prev_hash: int, new_hash: int) -> None:
    """
    Re-keys a cached position from ``prev_hash`` to ``new_hash``.

    The position stored under the old hash is moved under the new one, which
    keeps the index cache usable for lookups and existence checks. Nothing
    happens when the old hash is not cached.

    :param prev_hash: The hash to update
    :type prev_hash: int
    :param new_hash: The new hash to update to
    :type new_hash: int
    """
    positions = self._relation_cache
    if prev_hash not in positions:
        return
    positions[new_hash] = positions.pop(prev_hash)