Skip to content

connection

DatabaseConnection module - provides async database connection management.

DatabaseConnection

Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API.

Source code in ormar/databases/connection.py
Python
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class DatabaseConnection:
    """
    Wrapper around SQLAlchemy AsyncEngine that provides a databases-compatible API.

    The engine is created by :meth:`connect` and disposed by :meth:`disconnect`;
    the instance can also be used as an async context manager that performs
    both steps. When constructed with ``force_rollback=True`` a single global
    ``Transaction`` is entered on connect and exited on disconnect.
    """

    def __init__(self, url: str, **options: Any) -> None:
        """
        Initialize database connection.

        No engine is created here; that happens in :meth:`connect`.

        :param url: Database URL with an async driver (e.g., postgresql+asyncpg://)
        :param options: Additional engine options. ``force_rollback`` is
            consumed by this class; everything else is forwarded to
            ``create_async_engine``.
        """
        self._force_rollback = options.pop("force_rollback", False)
        self._url = url
        # Set reasonable pool defaults if not provided
        if "pool_size" not in options:
            options["pool_size"] = 5
        if "max_overflow" not in options:
            options["max_overflow"] = 10
        self._options = options
        self._engine: Optional[AsyncEngine] = None
        self._autocommit_engine: Optional[AsyncEngine] = None

        # Only populated while connected with force_rollback enabled.
        self._global_transaction: Optional[Transaction] = None

    async def connect(self) -> None:
        """
        Connect to the database by creating the async engine.

        Calling this while an engine already exists is a no-op.
        """
        if self._engine is None:
            self._engine = create_async_engine(self._url, **self._options)
            # View of the same engine/pool in AUTOCOMMIT mode. Standalone
            # queries use this to avoid a BEGIN/COMMIT round-trip per call,
            # matching legacy `databases`-library semantics. Explicit
            # Transactions keep using ``self._engine`` so BEGIN / SAVEPOINT
            # still work.
            self._autocommit_engine = self._engine.execution_options(
                isolation_level="AUTOCOMMIT"
            )

            # Set up SQLite foreign keys pragma if using SQLite
            if self._engine.dialect.name == "sqlite":  # pragma: nocover

                @event.listens_for(self._engine.sync_engine, "connect")
                def set_sqlite_pragma(dbapi_conn: Any, connection_record: Any) -> None:
                    cursor = dbapi_conn.cursor()
                    cursor.execute("PRAGMA foreign_keys=ON")
                    cursor.close()

            if self._force_rollback:
                assert self._global_transaction is None
                self._global_transaction = Transaction(
                    self, force_rollback=self._force_rollback
                )
                await self._global_transaction.__aenter__()

    async def disconnect(self) -> None:
        """
        Disconnect from the database and dispose of the engine.
        In case of force_rollback, also roll back the global transaction.

        The engine is disposed even when exiting the global transaction
        raises; the original exception still propagates afterwards. Calling
        this while not connected is a no-op.
        """
        if self._engine is None:
            return

        try:
            if self._force_rollback:
                assert self._global_transaction is not None
                await self._global_transaction.__aexit__()
        finally:
            # Always release the pool and reset state, otherwise a failed
            # rollback would leave the engine alive and the object stuck in
            # a "connected" state that can never be cleaned up.
            self._global_transaction = None
            await self._engine.dispose()
            self._engine = None
            self._autocommit_engine = None

    @property
    def is_connected(self) -> bool:
        """Check if the engine is connected."""
        return self._engine is not None

    @property
    def engine(self) -> AsyncEngine:
        """Get the async engine; asserts that :meth:`connect` has been called."""
        assert self._engine is not None, "DatabaseConnection not connected"
        return self._engine

    @property
    def dialect(self) -> Any:
        """Get the database dialect of the connected engine."""
        return self.engine.dialect

    @property
    def url(self) -> str:
        """Get the database URL."""
        return self._url

    @asynccontextmanager
    async def connection(self) -> AsyncIterator[AsyncConnection]:
        """
        Get a connection from the pool.
        If inside a transaction, returns the transaction connection.
        """
        trans_conn = _transaction_connection.get()
        if trans_conn is not None:
            # Reused connection: its lifetime belongs to the transaction,
            # so it is not closed here.
            yield trans_conn
        else:
            async with self.engine.connect() as conn:
                yield conn

    def transaction(self, force_rollback: bool = False) -> Transaction:
        """
        Create a transaction context manager.

        :param force_rollback: If True, always rollback (used for testing)
        """
        return Transaction(self, force_rollback=force_rollback)

    @asynccontextmanager
    async def get_query_executor(
        self, *, transactional: bool = False
    ) -> AsyncIterator[QueryExecutor]:
        """
        Get connection, reusing transaction connection if in transaction.

        :param transactional: If True, open an explicit transaction for the
            scope of the executor. Use for server-side cursors (``iterate``)
            and multi-row ``execute_many`` batches, which need a real
            transaction either because AUTOCOMMIT would commit each row
            separately or because the driver (asyncpg) requires a
            transaction for streaming.
        :type transactional: bool
        :return: QueryExecutor wrapping a connection
        :rtype: QueryExecutor
        """
        trans_conn = self.get_transaction_connection()
        if trans_conn is not None:
            # Inside a transaction - reuse the transaction's connection
            yield QueryExecutor(trans_conn)
        elif transactional:
            async with self.transaction():
                # Entering the transaction is expected to have published its
                # connection; the assert guards that expectation.
                conn = self.get_transaction_connection()
                assert conn is not None
                yield QueryExecutor(conn)
        else:
            # Outside transaction: use AUTOCOMMIT view so each statement
            # commits at the driver level with no extra BEGIN/COMMIT
            # round-trip.
            assert (
                self._autocommit_engine is not None
            ), "DatabaseConnection not connected"
            async with self._autocommit_engine.connect() as conn:
                yield QueryExecutor(conn)

    def get_transaction_connection(self) -> Optional[AsyncConnection]:
        """Get the current transaction connection if in a transaction."""
        return _transaction_connection.get()

    def set_transaction_connection(self, conn: Optional[AsyncConnection]) -> None:
        """Set the current transaction connection."""
        _transaction_connection.set(conn)

    async def __aenter__(self) -> "DatabaseConnection":
        """Async context manager entry - connect to a database."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Async context manager exit - disconnect from database."""
        await self.disconnect()

dialect property

Get the database dialect.

engine property

Get the async engine.

is_connected property

Check if the engine is connected.

url property

Get the database URL.

__aenter__() async

Async context manager entry - connect to a database.

Source code in ormar/databases/connection.py
Python
173
174
175
176
async def __aenter__(self) -> "DatabaseConnection":
    """
    Enter the ``async with`` block.

    Awaits :meth:`connect` and then hands this same object back so it can be
    bound by the ``as`` clause.
    """
    await self.connect()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit - disconnect from database.

Source code in ormar/databases/connection.py
Python
178
179
180
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """
    Leave the ``async with`` block by awaiting :meth:`disconnect`.

    The exception arguments are accepted but not inspected, and nothing is
    returned.
    """
    await self.disconnect()

__init__(url, **options)

Initialize database connection.

Parameters:

- `url`: Database URL with an async driver (e.g., `postgresql+asyncpg://`).
- `options`: Additional engine options.

Source code in ormar/databases/connection.py
Python
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def __init__(self, url: str, **options: Any) -> None:
    """
    Store connection settings; no engine is created at this point.

    :param url: Database URL with an async driver (e.g., postgresql+asyncpg://)
    :param options: Additional engine options. ``force_rollback`` is removed
        from them and kept separately; ``pool_size`` and ``max_overflow``
        receive defaults when absent.
    """
    self._url = url
    self._force_rollback = options.pop("force_rollback", False)

    # Fill in pool sizing only where the caller did not choose a value.
    options.setdefault("pool_size", 5)
    options.setdefault("max_overflow", 10)
    self._options = options

    # Runtime state, populated once the connection is opened.
    self._engine: Optional[AsyncEngine] = None
    self._autocommit_engine: Optional[AsyncEngine] = None
    self._global_transaction: Optional[Transaction] = None

connect() async

Connect to the database by creating the async engine.

Source code in ormar/databases/connection.py
Python
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def connect(self) -> None:
    """
    Create the async engine and its AUTOCOMMIT view.

    Does nothing when an engine already exists. With ``force_rollback``
    enabled, a global ``Transaction`` is also created and entered.
    """
    if self._engine is not None:
        return

    engine = create_async_engine(self._url, **self._options)
    self._engine = engine
    # Same engine/pool, but in AUTOCOMMIT mode: standalone queries go through
    # this view to skip a BEGIN/COMMIT round-trip per call (legacy
    # `databases`-library semantics), while explicit Transactions stay on
    # ``self._engine`` so BEGIN / SAVEPOINT keep working.
    self._autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")

    # SQLite needs foreign keys switched on for every new DBAPI connection.
    if engine.dialect.name == "sqlite":  # pragma: nocover

        @event.listens_for(engine.sync_engine, "connect")
        def set_sqlite_pragma(dbapi_conn: Any, connection_record: Any) -> None:
            pragma_cursor = dbapi_conn.cursor()
            pragma_cursor.execute("PRAGMA foreign_keys=ON")
            pragma_cursor.close()

    if not self._force_rollback:
        return

    assert self._global_transaction is None
    self._global_transaction = Transaction(self, force_rollback=self._force_rollback)
    await self._global_transaction.__aenter__()

connection() async

Get a connection from the pool. If inside a transaction, returns the transaction connection.

Source code in ormar/databases/connection.py
Python
110
111
112
113
114
115
116
117
118
119
120
121
@asynccontextmanager
async def connection(self) -> AsyncIterator[AsyncConnection]:
    """
    Yield a database connection.

    When a transaction connection is currently set it is yielded as-is;
    otherwise a connection is opened from the engine for the duration of the
    ``async with`` block.
    """
    active = _transaction_connection.get()
    if active is None:
        async with self.engine.connect() as pooled:
            yield pooled
        return

    yield active

disconnect() async

Disconnect from the database and dispose of the engine. In case of force_rollback, also roll back the global transaction.

Source code in ormar/databases/connection.py
Python
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def disconnect(self) -> None:
    """
    Disconnect from the database and dispose of the engine.
    In case of force_rollback, also roll back the global transaction.

    The engine is disposed even when exiting the global transaction raises;
    the original exception still propagates afterwards. Calling this while
    not connected is a no-op.
    """
    if self._engine is None:
        return

    try:
        if self._force_rollback:
            assert self._global_transaction is not None
            await self._global_transaction.__aexit__()
    finally:
        # Always release the pool and reset state, otherwise a failed
        # rollback would leave the engine alive and the object stuck in a
        # "connected" state that can never be cleaned up.
        self._global_transaction = None
        await self._engine.dispose()
        self._engine = None
        self._autocommit_engine = None

get_query_executor(*, transactional=False) async

Get connection, reusing transaction connection if in transaction.

Parameters:

- `transactional` (`bool`): If True, open an explicit transaction for the scope of the executor. Use for server-side cursors (`iterate`) and multi-row `execute_many` batches, which need a real transaction either because AUTOCOMMIT would commit each row separately or because the driver (asyncpg) requires a transaction for streaming.

Returns:

- `QueryExecutor`: a QueryExecutor wrapping a connection.

Source code in ormar/databases/connection.py
Python
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@asynccontextmanager
async def get_query_executor(
    self, *, transactional: bool = False
) -> AsyncIterator[QueryExecutor]:
    """
    Yield a ``QueryExecutor`` bound to the appropriate connection.

    A connection belonging to a current transaction is reused when one is
    set. Otherwise either a fresh transaction is opened (``transactional``)
    or a connection is taken from the AUTOCOMMIT engine view.

    :param transactional: If True, open an explicit transaction for the
        scope of the executor. Use for server-side cursors (``iterate``)
        and multi-row ``execute_many`` batches, which need a real
        transaction either because AUTOCOMMIT would commit each row
        separately or because the driver (asyncpg) requires a
        transaction for streaming.
    :type transactional: bool
    :return: QueryExecutor wrapping a connection
    :rtype: QueryExecutor
    """
    ambient = self.get_transaction_connection()
    if ambient is not None:
        # Already inside a transaction: ride on its connection.
        yield QueryExecutor(ambient)
        return

    if transactional:
        async with self.transaction():
            scoped = self.get_transaction_connection()
            assert scoped is not None
            yield QueryExecutor(scoped)
        return

    # No transaction requested or active: the AUTOCOMMIT view lets each
    # statement commit at the driver level without an extra BEGIN/COMMIT
    # round-trip.
    assert self._autocommit_engine is not None
    async with self._autocommit_engine.connect() as standalone:
        yield QueryExecutor(standalone)

get_transaction_connection()

Get the current transaction connection if in a transaction.

Source code in ormar/databases/connection.py
Python
165
166
167
def get_transaction_connection(self) -> Optional[AsyncConnection]:
    """
    Return the connection currently stored for the running transaction.

    The value is read from the module-level ``_transaction_connection``
    holder; callers treat ``None`` as "no transaction connection set".
    """
    current = _transaction_connection.get()
    return current

set_transaction_connection(conn)

Set the current transaction connection.

Source code in ormar/databases/connection.py
Python
169
170
171
def set_transaction_connection(self, conn: Optional[AsyncConnection]) -> None:
    """
    Store ``conn`` as the current transaction connection.

    :param conn: Connection to record in the module-level
        ``_transaction_connection`` holder, or ``None``.
    """
    _transaction_connection.set(conn)
    return None

transaction(force_rollback=False)

Create a transaction context manager.

:param force_rollback: If True, always rollback (used for testing)

Source code in ormar/databases/connection.py
Python
123
124
125
126
127
128
129
def transaction(self, force_rollback: bool = False) -> Transaction:
    """
    Build a ``Transaction`` bound to this connection object.

    :param force_rollback: If True, always rollback (used for testing)
    :return: A new ``Transaction`` constructed with this object and the flag.
    """
    new_transaction = Transaction(self, force_rollback=force_rollback)
    return new_transaction