Skip to content

query_executor

QueryExecutor module - executes database queries using SQLAlchemy async API.

QueryExecutor

Executes database queries using SQLAlchemy async API. Provides a databases-compatible interface.

Source code in ormar/databases/query_executor.py
Python
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class QueryExecutor:
    """
    Executes database queries using SQLAlchemy async API.
    Provides a databases-compatible interface.

    Every method delegates to the wrapped connection; this class neither
    opens, commits nor closes it.
    """

    def __init__(self, connection: AsyncConnection) -> None:
        """
        Initialize query executor.

        :param connection: SQLAlchemy async connection used by every method
        """
        self._connection = connection

    async def fetch_all(self, query: Executable) -> List[Any]:
        """
        Execute a query and fetch all rows.

        :param query: SQLAlchemy query expression
        :return: List of mapping rows (as produced by ``result.mappings()``),
            empty when the query yields nothing
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        return list(result.mappings().all())

    async def fetch_one(self, query: Executable) -> Optional[RowMapping]:
        """
        Execute a query and fetch one row.

        :param query: SQLAlchemy query expression
        :return: First row as a mapping, or ``None`` when there are no rows
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        row = result.mappings().first()
        return row

    async def fetch_val(self, query: Executable, column: int = 0) -> Optional[Any]:
        """
        Execute a query and fetch a single value from the first row.

        :param query: SQLAlchemy query expression
        :param column: Positional index of the column to read from the first
            row (default 0)
        :return: Value at ``column`` in the first row, or ``None`` when the
            query produced no rows
        """
        result: CursorResult[Any] = await self._connection.execute(query)
        # ``scalar()`` can only ever return the first column, which silently
        # ignored ``column``; read the whole first row and index it instead.
        row = result.first()
        if row is None:
            return None
        return row[column]

    async def execute(self, query: Executable) -> Any:
        """
        Execute a query (INSERT, UPDATE, DELETE).

        :param query: SQLAlchemy query expression
        :return: For INSERT, the inserted primary key or ``None`` if the backend
            cannot return one (e.g. Oracle MySQL inserting into a
            non-AUTO_INCREMENT pk with a server default — no RETURNING support).
            For UPDATE/DELETE, the row count (``0`` when the result reports
            ``None``).
        """
        result: CursorResult[Any] = await self._connection.execute(query)

        # For INSERT queries, try to get the inserted primary key via the
        # dialect's best-available mechanism (RETURNING on PostgreSQL / SQLite
        # 3.35+ / MariaDB 10.5+, LAST_INSERT_ID() on MySQL AUTO_INCREMENT).
        # Do NOT fall back to rowcount here: rowcount is not a pk, and
        # returning it would silently corrupt `Model.pk` in `save()`.
        if result.context and result.context.isinsert:
            if result.inserted_primary_key:
                pk_value = result.inserted_primary_key[0]
                if pk_value is not None:
                    return pk_value

            if hasattr(result, "lastrowid") and result.lastrowid:  # pragma: no cover
                return result.lastrowid

            return None  # pragma: no cover

        return result.rowcount if result.rowcount is not None else 0

    async def execute_many(
        self, query: Union[Executable, str], values: Sequence[Mapping[str, Any]]
    ) -> None:
        """
        Execute a query multiple times with different parameter sets.

        :param query: SQLAlchemy query expression or SQL string; a string is
            wrapped with ``text()`` before execution
        :param values: Sequence of parameter mappings, passed to the
            connection in a single ``execute`` call
        """
        exec_query = text(query) if isinstance(query, str) else query
        await self._connection.execute(exec_query, values)

    async def iterate(self, query: Executable) -> AsyncIterator[Any]:
        """
        Execute a query and iterate over results.

        :param query: SQLAlchemy query expression
        :return: Async iterator yielding one mapping row at a time from the
            connection's ``stream()`` result
        """
        async with self._connection.stream(query) as result:
            async for row in result.mappings():
                yield row

__init__(connection)

Initialize query executor.

:param connection: SQLAlchemy async connection

Source code in ormar/databases/query_executor.py
Python
19
20
21
22
23
24
25
def __init__(self, connection: AsyncConnection) -> None:
    """
    Initialize query executor.

    Only stores the connection on ``self._connection``; nothing is executed
    here.

    :param connection: SQLAlchemy async connection
    """
    self._connection = connection

execute(query) async

Execute a query (INSERT, UPDATE, DELETE).

:param query: SQLAlchemy query expression

:return: For INSERT, the inserted primary key, or None if the backend cannot return one (e.g. Oracle MySQL inserting into a non-AUTO_INCREMENT pk with a server default — no RETURNING support). For UPDATE/DELETE, the row count.

Source code in ormar/databases/query_executor.py
Python
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def execute(self, query: Executable) -> Any:
    """
    Run a data-modifying statement (INSERT, UPDATE, DELETE) and report its outcome.

    :param query: SQLAlchemy query expression
    :return: For INSERT, the first inserted primary key value; failing that,
        the result's ``lastrowid`` when it is set; otherwise ``None``.
        For any other statement, the row count (``0`` when the result
        reports ``None``).
    """
    outcome: CursorResult[Any] = await self._connection.execute(query)

    # Anything that is not an INSERT simply reports how many rows it touched.
    context = outcome.context
    if not (context and context.isinsert):
        affected = outcome.rowcount
        return 0 if affected is None else affected

    # INSERT path: use whatever primary key the dialect reported. Never fall
    # back to rowcount here: rowcount is not a pk, and returning it would
    # silently corrupt `Model.pk` in `save()`.
    primary_key = outcome.inserted_primary_key
    if primary_key:
        first_pk = primary_key[0]
        if first_pk is not None:
            return first_pk

    last_id = getattr(outcome, "lastrowid", None)  # pragma: no cover
    if last_id:  # pragma: no cover
        return last_id

    return None  # pragma: no cover

execute_many(query, values) async

Execute a query multiple times with different parameter sets.

:param query: SQLAlchemy query expression or SQL string

:param values: Sequence of parameter mappings

Source code in ormar/databases/query_executor.py
Python
89
90
91
92
93
94
95
96
97
98
99
async def execute_many(
    self, query: Union[Executable, str], values: Sequence[Mapping[str, Any]]
) -> None:
    """
    Run one statement against a whole batch of parameter sets.

    :param query: SQLAlchemy query expression or SQL string; a plain string
        is wrapped with ``text()`` first
    :param values: Sequence of parameter mappings, handed to the connection
        in a single ``execute`` call
    """
    if isinstance(query, str):
        statement = text(query)
    else:
        statement = query
    await self._connection.execute(statement, values)

fetch_all(query) async

Execute a query and fetch all rows.

:param query: SQLAlchemy query expression

:return: List of RowMapping objects (one dict-like mapping per row)

Source code in ormar/databases/query_executor.py
Python
27
28
29
30
31
32
33
34
35
async def fetch_all(self, query: Executable) -> List[Any]:
    """
    Run a query and collect every row it produces.

    :param query: SQLAlchemy query expression
    :return: List of mapping rows (as produced by ``result.mappings()``),
        empty when the query yields nothing
    """
    cursor: CursorResult[Any] = await self._connection.execute(query)
    mapped_rows = cursor.mappings()
    return [*mapped_rows.all()]

fetch_one(query) async

Execute a query and fetch one row.

:param query: SQLAlchemy query expression

:return: Single RowMapping object, or None if the query returned no rows

Source code in ormar/databases/query_executor.py
Python
37
38
39
40
41
42
43
44
45
46
async def fetch_one(self, query: Executable) -> Optional[RowMapping]:
    """
    Run a query and return only its first row.

    :param query: SQLAlchemy query expression
    :return: First row as a mapping, or ``None`` when there are no rows
    """
    cursor: CursorResult[Any] = await self._connection.execute(query)
    return cursor.mappings().first()

fetch_val(query, column=0) async

Execute a query and fetch a single scalar value.

:param query: SQLAlchemy query expression

:param column: Column index to fetch (default 0)

:return: Scalar value or None

Source code in ormar/databases/query_executor.py
Python
48
49
50
51
52
53
54
55
56
57
async def fetch_val(self, query: Executable, column: int = 0) -> Optional[Any]:
    """
    Execute a query and fetch a single value from the first row.

    :param query: SQLAlchemy query expression
    :param column: Positional index of the column to read from the first
        row (default 0)
    :return: Value at ``column`` in the first row, or ``None`` when the
        query produced no rows
    """
    result: CursorResult[Any] = await self._connection.execute(query)
    # ``scalar()`` can only ever return the first column, which silently
    # ignored ``column``; read the whole first row and index it instead.
    row = result.first()
    if row is None:
        return None
    return row[column]

iterate(query) async

Execute a query and iterate over results.

:param query: SQLAlchemy query expression

:return: Async iterator of RowMapping objects

Source code in ormar/databases/query_executor.py
Python
101
102
103
104
105
106
107
108
109
110
async def iterate(self, query: Executable) -> AsyncIterator[Any]:
    """
    Stream the rows of a query one at a time.

    :param query: SQLAlchemy query expression
    :return: Async iterator yielding one mapping row at a time from the
        connection's ``stream()`` result
    """
    async with self._connection.stream(query) as streamed:
        mapped_rows = streamed.mappings()
        async for record in mapped_rows:
            yield record