Skip to content

utils

SliceBounds dataclass

Normalized pagination parameters produced by :func:normalize_slice.

reverse=True signals that the ORDER BY directions should be flipped at query build time and the fetched list reversed in memory so the caller still sees rows in the original ordering. This is how negative indices and negative slice bounds are emulated without a COUNT(*) round-trip.

:ivar limit: row count for LIMIT; None means "no limit" :vartype limit: Optional[int] :ivar offset: row count for OFFSET; always non-negative :vartype offset: int :ivar reverse: whether the query ordering must be flipped and the result list reversed in memory :vartype reverse: bool

Source code in ormar/queryset/utils.py
Python
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@dataclass(frozen=True)
class SliceBounds:
    """
    Normalized pagination parameters produced by :func:`normalize_slice`.

    ``reverse=True`` signals that the ORDER BY directions should be flipped
    at query build time and the fetched list reversed in memory so the
    caller still sees rows in the original ordering. This is how negative
    indices and negative slice bounds are emulated without a ``COUNT(*)``
    round-trip.

    :ivar limit: row count for ``LIMIT``; ``None`` means "no limit"
    :vartype limit: Optional[int]
    :ivar offset: row count for ``OFFSET``; always non-negative
    :vartype offset: int
    :ivar reverse: whether the query ordering must be flipped and the
        result list reversed in memory
    :vartype reverse: bool
    """

    limit: Optional[int]
    offset: int
    reverse: bool

build_flatten_map(paths)

Build a nested-Ellipsis dict from pre-split flatten path tuples — the runtime representation threaded through model_dump recursion.

Example::

Text Only
1
2
3
4
5
6
7
8
9
paths = [
    ("author", "name"),
    ("author", "email"),
    ("category", "title"),
]
build_flatten_map(paths) == {
    "author": {"name": ..., "email": ...},
    "category": {"title": ...},
}

:param paths: iterable of tuple paths (each path already split on __) :type paths: Iterable[PathParts] :return: nested dict where leaves are ... :rtype: dict

Source code in ormar/queryset/utils.py
Python
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def build_flatten_map(paths: Iterable[PathParts]) -> dict:
    """
    Build a nested-Ellipsis dict from pre-split flatten path tuples — the
    runtime representation threaded through ``model_dump`` recursion.

    Example::

        paths = [
            ("author", "name"),
            ("author", "email"),
            ("category", "title"),
        ]
        build_flatten_map(paths) == {
            "author": {"name": ..., "email": ...},
            "category": {"title": ...},
        }

    :param paths: iterable of tuple paths (each path already split on ``__``)
    :type paths: Iterable[PathParts]
    :return: nested dict where leaves are ``...``
    :rtype: dict
    """
    result: dict = {}
    for parts in paths:
        current = result
        last = len(parts) - 1
        for i, part in enumerate(parts):
            if i == last:
                current[part] = ...
            else:
                current = current.setdefault(part, {})
    return result

convert_set_to_required_dict(set_to_convert)

Converts set to dictionary of required keys. Required key is Ellipsis.

:param set_to_convert: set to convert to dict :type set_to_convert: set :return: set converted to dict of ellipsis :rtype: dict

Source code in ormar/queryset/utils.py
Python
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def convert_set_to_required_dict(set_to_convert: set) -> dict:
    """
    Converts set to dictionary of required keys.
    Required key is Ellipsis.

    :param set_to_convert: set to convert to dict
    :type set_to_convert: set
    :return: set converted to dict of ellipsis
    :rtype: dict
    """
    new_dict = dict()
    for key in set_to_convert:
        new_dict[key] = Ellipsis
    return new_dict

extract_access_chains(value)

Unwrap FieldAccessor inputs (or lists that contain them) into their underlying dunder-path strings so downstream parsers see a uniform shape. Anything that isn't an accessor (or list of accessors) is returned unchanged — sets, tuples, dicts, and plain strings all pass through.

:param value: user input for a relation-spec method (select_related, prefetch_related, flatten_fields) :type value: Any :return: a dunder string, a list with each accessor replaced by its chain, or the original value unchanged :rtype: Any

Source code in ormar/queryset/utils.py
Python
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def extract_access_chains(value: Any) -> Any:
    """
    Unwrap ``FieldAccessor`` inputs (or lists that contain them) into their
    underlying dunder-path strings so downstream parsers see a uniform shape.
    Anything that isn't an accessor (or list of accessors) is returned
    unchanged — sets, tuples, dicts, and plain strings all pass through.

    :param value: user input for a relation-spec method (``select_related``,
        ``prefetch_related``, ``flatten_fields``)
    :type value: Any
    :return: a dunder string, a list with each accessor replaced by its chain,
        or the original value unchanged
    :rtype: Any
    """
    # Late import to avoid circular dependency with queryset package.
    from ormar.queryset.field_accessor import FieldAccessor

    if isinstance(value, FieldAccessor):
        return [value._access_chain]
    if isinstance(value, list):
        return [
            item._access_chain if isinstance(item, FieldAccessor) else item
            for item in value
        ]
    return value

get_relationship_alias_model_and_str(source_model, related_parts)

Walks the relation to retrieve the actual model on which the clause should be constructed, extracts alias based on last relation leading to target model. :param related_parts: list of related names extracted from string :type related_parts: Union[list, list[str]] :param source_model: model from which relation starts :type source_model: type[Model] :return: table prefix, target model and relation string :rtype: tuple[str, type["Model"], str]

Source code in ormar/queryset/utils.py
Python
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def get_relationship_alias_model_and_str(
    source_model: type["Model"], related_parts: list
) -> tuple[str, type["Model"], str, bool]:
    """
    Walks the relation to retrieve the actual model on which the clause should be
    constructed, extracts alias based on last relation leading to target model.
    :param related_parts: list of related names extracted from string
    :type related_parts: Union[list, list[str]]
    :param source_model: model from which relation starts
    :type source_model: type[Model]
    :return: table prefix, target model and relation string
    :rtype: tuple[str, type["Model"], str]
    """
    table_prefix = ""
    is_through = False
    target_model = source_model
    previous_model = target_model
    previous_models = [target_model]
    manager = target_model.ormar_config.alias_manager
    for relation in related_parts[:]:
        related_field = target_model.ormar_config.model_fields[relation]

        if related_field.is_through:
            previous_model, relation, is_through = _process_through_field(
                related_parts=related_parts,
                relation=relation,
                related_field=related_field,
                previous_model=previous_model,
                previous_models=previous_models,
            )
        if related_field.is_multi:
            previous_model = related_field.through
            relation = related_field.default_target_field_name()  # type: ignore
        table_prefix = manager.resolve_relation_alias(
            from_model=previous_model, relation_name=relation
        )
        target_model = related_field.to
        previous_model = target_model
        if not is_through:
            previous_models.append(previous_model)
    relation_str = "__".join(related_parts)

    return table_prefix, target_model, relation_str, is_through

normalize_slice(key)

Top-level dispatcher: turns a Python integer index or slice into a :class:SliceBounds suitable for QuerySet.

Delegates to a dedicated helper for each shape. Any shape that would require a COUNT(*) round-trip (step != 1, a bare [:-N], or mixed-sign bounds) raises QueryDefinitionError.

Examples::

Text Only
1
2
3
5           → SliceBounds(limit=1, offset=5,  reverse=False)
slice(2, 8) → SliceBounds(limit=6, offset=2,  reverse=False)
slice(-3, None) → SliceBounds(limit=3, offset=0, reverse=True)

:param key: integer or slice passed to QuerySet.__getitem__ :type key: int | slice :return: normalized slice parameters :rtype: SliceBounds

Source code in ormar/queryset/utils.py
Python
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def normalize_slice(key: Union[int, slice]) -> SliceBounds:
    """
    Top-level dispatcher: turns a Python integer index or slice into a
    :class:`SliceBounds` suitable for ``QuerySet``.

    Delegates to a dedicated helper for each shape. Any shape that would
    require a ``COUNT(*)`` round-trip (``step != 1``, a bare ``[:-N]``, or
    mixed-sign bounds) raises ``QueryDefinitionError``.

    Examples::

        5           → SliceBounds(limit=1, offset=5,  reverse=False)
        slice(2, 8) → SliceBounds(limit=6, offset=2,  reverse=False)
        slice(-3, None) → SliceBounds(limit=3, offset=0, reverse=True)

    :param key: integer or slice passed to ``QuerySet.__getitem__``
    :type key: int | slice
    :return: normalized slice parameters
    :rtype: SliceBounds
    """
    if isinstance(key, int):
        return _int_to_limit_offset(key)
    if isinstance(key, slice):
        return _slice_to_limit_offset(key)
    raise QueryDefinitionError(
        f"QuerySet indices must be integers or slices, not {type(key).__name__}."
    )

subtract_dict(current_dict, updating_dict)

Update one dict with another but with regard for nested keys.

That way nested sets are unionised, dicts updated and only other values are overwritten.

:param current_dict: dict to update :type current_dict: dict[str, ellipsis] :param updating_dict: dict with values to update :type updating_dict: dict :return: combination of both dicts :rtype: dict

Source code in ormar/queryset/utils.py
Python
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def subtract_dict(current_dict: Any, updating_dict: Any) -> dict:  # noqa: CCR001
    """
    Update one dict with another but with regard for nested keys.

    That way nested sets are unionised, dicts updated and
    only other values are overwritten.

    :param current_dict: dict to update
    :type current_dict: dict[str, ellipsis]
    :param updating_dict: dict with values to update
    :type updating_dict: dict
    :return: combination of both dicts
    :rtype: dict
    """
    for key, value in updating_dict.items():
        old_key = current_dict.get(key, {})
        new_value: Optional[Union[dict, set]] = None
        if not old_key:
            continue
        if isinstance(value, set) and isinstance(old_key, set):
            new_value = old_key.difference(value)
        elif isinstance(value, (set, collections.abc.Mapping)) and isinstance(
            old_key, (set, collections.abc.Mapping)
        ):
            value = (
                convert_set_to_required_dict(value)
                if not isinstance(value, collections.abc.Mapping)
                else value
            )
            old_key = (
                convert_set_to_required_dict(old_key)
                if not isinstance(old_key, collections.abc.Mapping)
                else old_key
            )
            new_value = subtract_dict(old_key, value)

        if new_value:
            current_dict[key] = new_value
        else:
            current_dict.pop(key, None)
    return current_dict

translate_list_to_dict(list_to_trans, default=...)

Splits the list of strings by '__' and converts them to dictionary with nested models grouped by parent model. That way each model appears only once in the whole dictionary and children are grouped under parent name.

Default required key ise Ellipsis like in pydantic.

:param list_to_trans: input list :type list_to_trans: Union[list, set] :param default: value to use as a default value :type default: Any :param is_order: flag if change affects order_by clauses are they require special default value with sort order. :type is_order: bool :return: converted to dictionary input list :rtype: dict

Source code in ormar/queryset/utils.py
Python
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def translate_list_to_dict(  # noqa: CCR001
    list_to_trans: Union[list, set], default: Any = ...
) -> dict:
    """
    Splits the list of strings by '__' and converts them to dictionary with nested
    models grouped by parent model. That way each model appears only once in the whole
    dictionary and children are grouped under parent name.

    Default required key ise Ellipsis like in pydantic.

    :param list_to_trans: input list
    :type list_to_trans: Union[list, set]
    :param default: value to use as a default value
    :type default: Any
    :param is_order: flag if change affects order_by clauses are they require special
    default value with sort order.
    :type is_order: bool
    :return: converted to dictionary input list
    :rtype: dict
    """
    return ormar_rust_utils.translate_list_to_dict(list(list_to_trans), default)

update(current_dict, updating_dict)

Update one dict with another but with regard for nested keys.

That way nested sets are unionised, dicts updated and only other values are overwritten.

:param current_dict: dict to update :type current_dict: dict[str, ellipsis] :param updating_dict: dict with values to update :type updating_dict: dict :return: combination of both dicts :rtype: dict

Source code in ormar/queryset/utils.py
Python
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
def update(current_dict: Any, updating_dict: Any) -> dict:  # noqa: CCR001
    """
    Update one dict with another but with regard for nested keys.

    That way nested sets are unionised, dicts updated and
    only other values are overwritten.

    :param current_dict: dict to update
    :type current_dict: dict[str, ellipsis]
    :param updating_dict: dict with values to update
    :type updating_dict: dict
    :return: combination of both dicts
    :rtype: dict
    """
    if current_dict is Ellipsis:
        current_dict = dict()
    for key, value in updating_dict.items():
        if isinstance(value, collections.abc.Mapping):
            old_key = current_dict.get(key, {})
            if isinstance(old_key, set):
                old_key = convert_set_to_required_dict(old_key)
            current_dict[key] = update(old_key, value)
        elif isinstance(value, set) and isinstance(current_dict.get(key), set):
            current_dict[key] = current_dict.get(key).union(value)
        else:
            current_dict[key] = value
    return current_dict

update_dict_from_list(curr_dict, list_to_update)

Converts the list into dictionary and later performs special update, where nested keys that are sets or dicts are combined and not overwritten.

:param curr_dict: dict to update :type curr_dict: dict :param list_to_update: list with values to update the dict :type list_to_update: list[str] :return: updated dict :rtype: dict

Source code in ormar/queryset/utils.py
Python
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
def update_dict_from_list(curr_dict: dict, list_to_update: Union[list, set]) -> dict:
    """
    Converts the list into dictionary and later performs special update, where
    nested keys that are sets or dicts are combined and not overwritten.

    :param curr_dict: dict to update
    :type curr_dict: dict
    :param list_to_update: list with values to update the dict
    :type list_to_update: list[str]
    :return: updated dict
    :rtype: dict
    """
    updated_dict = copy.copy(curr_dict)
    dict_to_update = translate_list_to_dict(list_to_update)
    update(updated_dict, dict_to_update)
    return updated_dict