.. _asyncio_toplevel:

Asynchronous I/O (asyncio)
==========================

SQLAlchemy supports Python asyncio for both Core and ORM usage, using
asyncio-compatible dialects.

.. versionadded:: 1.4

.. note:: The asyncio extension as of SQLAlchemy 1.4.3 can now be considered to
   be **beta level** software. API details are subject to change; however, at this
   point significant backwards-incompatible changes are unlikely.

.. seealso::

    :ref:`change_3414` - initial feature announcement

    :ref:`examples_asyncio` - example scripts illustrating working examples
    of Core and ORM use within the asyncio extension.

.. _asyncio_install:

Asyncio Platform Installation Notes
------------------------------------

The asyncio extension requires at least Python version 3.6. It also depends
upon the `greenlet <https://pypi.org/project/greenlet/>`_ library. This
dependency is installed by default on common machine platforms including::

    x86_64 aarch64 ppc64le amd64 win32

For the above platforms, ``greenlet`` is known to supply pre-built wheel files.
To ensure the ``greenlet`` dependency is present on other platforms, the
``[asyncio]`` extra may be installed as follows, which will include an attempt
to build and install ``greenlet``::

  pip install sqlalchemy[asyncio]
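
To check from Python whether ``greenlet`` is importable in the current
environment, a small standard-library sketch such as the following may be
used. The ``has_module`` helper is a hypothetical name introduced here for
illustration and is not part of SQLAlchemy:

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module called ``name`` can be imported."""
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    if has_module("greenlet"):
        print("greenlet is available")
    else:
        print("greenlet is missing; try: pip install sqlalchemy[asyncio]")
```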


Synopsis - Core
---------------

For Core use, the :func:`_asyncio.create_async_engine` function creates an
instance of :class:`_asyncio.AsyncEngine` which then offers an async version of
the traditional :class:`_engine.Engine` API.   The
:class:`_asyncio.AsyncEngine` delivers an :class:`_asyncio.AsyncConnection` via
its :meth:`_asyncio.AsyncEngine.connect` and :meth:`_asyncio.AsyncEngine.begin`
methods which both deliver asynchronous context managers.   The
:class:`_asyncio.AsyncConnection` can then invoke statements using either the
:meth:`_asyncio.AsyncConnection.execute` method to deliver a buffered
:class:`_engine.Result`, or the :meth:`_asyncio.AsyncConnection.stream` method
to deliver a streaming server-side :class:`_asyncio.AsyncResult`::

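    import asyncio

    from sqlalchemy import Column
    from sqlalchemy import MetaData
    from sqlalchemy import select
    from sqlalchemy import String
    from sqlalchemy import Table
    from sqlalchemy.ext.asyncio import create_async_engine

    # table metadata used by the example below
    meta = MetaData()
    t1 = Table("t1", meta, Column("name", String(50), primary_key=True))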

    async def async_main():
        engine = create_async_engine(
            "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
        )

        async with engine.begin() as conn:
            await conn.run_sync(meta.drop_all)
            await conn.run_sync(meta.create_all)

            await conn.execute(
                t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
            )

        async with engine.connect() as conn:

            # select a Result, which will be delivered with buffered
            # results
            result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))

            print(result.fetchall())

        # for AsyncEngine created in function scope, close and
        # clean-up pooled connections
        await engine.dispose()

    asyncio.run(async_main())

Above, the :meth:`_asyncio.AsyncConnection.run_sync` method may be used to
invoke special DDL functions such as :meth:`_schema.MetaData.create_all` that
don't include an awaitable hook.

.. tip:: It's advisable to invoke the :meth:`_asyncio.AsyncEngine.dispose` method
   using ``await`` when using the :class:`_asyncio.AsyncEngine` object in a
   scope that will go out of context and be garbage collected, as illustrated in the
   ``async_main`` function in the above example.  This ensures that any
   connections held open by the connection pool will be properly disposed
   within an awaitable context.   Unlike when using blocking IO, SQLAlchemy
   cannot properly dispose of these connections within methods like ``__del__``
   or weakref finalizers as there is no opportunity to invoke ``await``.
   Failing to explicitly dispose of the engine when it falls out of scope
   may result in warnings emitted to standard out resembling the form
   ``RuntimeError: Event loop is closed`` within garbage collection.
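
One way to make sure the dispose call is reached even when the enclosed code
raises is to wrap the engine in an async context manager. The sketch below
assumes only that the object has an awaitable ``dispose()`` method, as
:class:`_asyncio.AsyncEngine` does. ``disposing`` and ``FakeEngine`` are
hypothetical names used for illustration, with ``FakeEngine`` standing in for
a real engine so that the snippet runs without a database:

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def disposing(engine):
    """Yield ``engine`` and always await ``engine.dispose()`` on exit."""
    try:
        yield engine
    finally:
        await engine.dispose()


class FakeEngine:
    """Stand-in for AsyncEngine so the sketch runs without a database."""

    def __init__(self):
        self.disposed = False

    async def dispose(self):
        self.disposed = True


async def main():
    engine = FakeEngine()
    try:
        async with disposing(engine):
            raise RuntimeError("simulated failure while using the engine")
    except RuntimeError:
        pass
    # dispose() ran even though the block raised
    return engine.disposed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real engine, the same helper would be used as
``async with disposing(create_async_engine(url)) as engine:``.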

The :class:`_asyncio.AsyncConnection` also features a "streaming" API via
the :meth:`_asyncio.AsyncConnection.stream` method that returns an
:class:`_asyncio.AsyncResult` object.  This result object uses a server-side
cursor and provides an async/await API, such as an async iterator::

    async with engine.connect() as conn:
        async_result = await conn.stream(select(t1))

        async for row in async_result:
            print("row: %s" % (row, ))

.. _asyncio_orm:


Synopsis - ORM
---------------

Using :term:`2.0 style` querying, the :class:`_asyncio.AsyncSession` class
provides full ORM functionality. Within the default mode of use, special care
must be taken to avoid :term:`lazy loading` or other expired-attribute access
involving ORM relationships and column attributes; the next
section :ref:`asyncio_orm_avoid_lazyloads` details this.   The complete example
below includes mapper and session configuration::

    import asyncio

    from sqlalchemy import Column
    from sqlalchemy import DateTime
    from sqlalchemy import ForeignKey
    from sqlalchemy import func
    from sqlalchemy import Integer
    from sqlalchemy import String
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.future import select
    from sqlalchemy.orm import declarative_base
    from sqlalchemy.orm import relationship
    from sqlalchemy.orm import selectinload
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()


    class A(Base):
        __tablename__ = "a"

        id = Column(Integer, primary_key=True)
        data = Column(String)
        create_date = Column(DateTime, server_default=func.now())
        bs = relationship("B")

        # required in order to access columns with server defaults
        # or SQL expression defaults, subsequent to a flush, without
        # triggering an expired load
        __mapper_args__ = {"eager_defaults": True}


    class B(Base):
        __tablename__ = "b"
        id = Column(Integer, primary_key=True)
        a_id = Column(ForeignKey("a.id"))
        data = Column(String)


    async def async_main():
        engine = create_async_engine(
            "postgresql+asyncpg://scott:tiger@localhost/test",
            echo=True,
        )

        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        # expire_on_commit=False will prevent attributes from being expired
        # after commit.
        async_session = sessionmaker(
            engine, expire_on_commit=False, class_=AsyncSession
        )

        async with async_session() as session:
            async with session.begin():
                session.add_all(
                    [
                        A(bs=[B(), B()], data="a1"),
                        A(bs=[B()], data="a2"),
                        A(bs=[B(), B()], data="a3"),
                    ]
                )

            stmt = select(A).options(selectinload(A.bs))

            result = await session.execute(stmt)

            for a1 in result.scalars():
                print(a1)
                print(f"created at: {a1.create_date}")
                for b1 in a1.bs:
                    print(b1)

            result = await session.execute(select(A).order_by(A.id))

            a1 = result.scalars().first()

            a1.data = "new data"

            await session.commit()

            # access attribute subsequent to commit; this is what
            # expire_on_commit=False allows
            print(a1.data)

        # for AsyncEngine created in function scope, close and
        # clean-up pooled connections
        await engine.dispose()


    asyncio.run(async_main())

In the example above, the :class:`_asyncio.AsyncSession` is instantiated using
the optional :class:`_orm.sessionmaker` helper, and associated with an
:class:`_asyncio.AsyncEngine` against a particular database URL. It is
then used in a Python asynchronous context manager (i.e. ``async with:``
statement) so that it is automatically closed at the end of the block; this is
equivalent to calling the :meth:`_asyncio.AsyncSession.close` method.

.. _asyncio_orm_avoid_lazyloads:

Preventing Implicit IO when Using AsyncSession
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Using traditional asyncio, the application needs to avoid any points at which
IO-on-attribute access may occur. Above, the following measures are taken to
prevent this:

* The :func:`_orm.selectinload` eager loader is employed in order to eagerly
  load the ``A.bs`` collection within the scope of the
  ``await session.execute()`` call::

      stmt = select(A).options(selectinload(A.bs))

  ..

  If the default loader strategy of "lazyload" were left in place, the access
  of the ``A.bs`` attribute would raise an asyncio exception.
  There are a variety of ORM loader options available, which may be configured
  at the default mapping level or used on a per-query basis, documented at
  :ref:`loading_toplevel`.


* The :class:`_asyncio.AsyncSession` is configured using
  :paramref:`_orm.Session.expire_on_commit` set to False, so that we may access
  attributes on an object subsequent to a call to
  :meth:`_asyncio.AsyncSession.commit`, as in the line at the end where we
  access an attribute::

      # create AsyncSession with expire_on_commit=False
      async_session = AsyncSession(engine, expire_on_commit=False)

      # sessionmaker version
      async_session = sessionmaker(
          engine, expire_on_commit=False, class_=AsyncSession
      )

      async with async_session() as session:

          result = await session.execute(select(A).order_by(A.id))

          a1 = result.scalars().first()

          # commit would normally expire all attributes
          await session.commit()

          # access attribute subsequent to commit; this is what
          # expire_on_commit=False allows
          print(a1.data)

* The :paramref:`_schema.Column.server_default` value on the ``create_date``
  column will not be refreshed by default after an INSERT; instead, it is
  normally
  :ref:`expired so that it can be loaded when needed <orm_server_defaults>`.
  Similar behavior applies to a column where the
  :paramref:`_schema.Column.default` parameter is assigned to a SQL expression
  object. To access this value with asyncio, it has to be refreshed within the
  flush process, which is achieved by setting the
  :paramref:`_orm.mapper.eager_defaults` parameter on the mapping::


    class A(Base):
        # ...

        # column with a server_default, or SQL expression default
        create_date = Column(DateTime, server_default=func.now())

        # add this so that it can be accessed
        __mapper_args__ = {"eager_defaults": True}

Other guidelines include:

* Methods like :meth:`_asyncio.AsyncSession.expire` should be avoided in favor of
  :meth:`_asyncio.AsyncSession.refresh`

* Avoid using the ``all`` cascade option documented at :ref:`unitofwork_cascades`
  in favor of listing out the desired cascade features explicitly.   The
  ``all`` cascade option implies among others the :ref:`cascade_refresh_expire`
  setting, which means that the :meth:`.AsyncSession.refresh` method will
  expire the attributes on related objects, but not necessarily refresh those
  related objects assuming eager loading is not configured within the
  :func:`_orm.relationship`, leaving them in an expired state.   A future
  release may introduce the ability to indicate eager loader options when
  invoking :meth:`.Session.refresh` and/or :meth:`.AsyncSession.refresh`.

* Appropriate loader options should be employed for :func:`_orm.deferred`
  columns, if used at all, in addition to that of :func:`_orm.relationship`
  constructs as noted above.  See :ref:`deferred` for background on
  deferred column loading.

* The "dynamic" relationship loader strategy described at
  :ref:`dynamic_relationship` is not compatible with the asyncio approach and
  cannot be used, unless invoked within the
  :meth:`_asyncio.AsyncSession.run_sync` method described at
  :ref:`session_run_sync`.

.. _session_run_sync:

Running Synchronous Methods and Functions under asyncio
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. deepalchemy::  This approach is essentially exposing publicly the
   mechanism by which SQLAlchemy is able to provide the asyncio interface
   in the first place.   While there is no technical issue with doing so, overall
   the approach can probably be considered "controversial" as it works against
   some of the central philosophies of the asyncio programming model, which
   is essentially that any programming statement that can potentially result
   in IO being invoked **must** have an ``await`` call, so that the program
   makes explicitly clear every line at which IO may occur.
   This approach does not change that general idea, except that it allows
   a series of synchronous IO instructions to be exempted from this rule
   within the scope of a function call, essentially bundled up into a single
   awaitable.

As an alternative means of integrating traditional SQLAlchemy "lazy loading"
within an asyncio event loop, an **optional** method known as
:meth:`_asyncio.AsyncSession.run_sync` is provided which will run any
Python function inside of a greenlet, where traditional synchronous
programming concepts will be translated to use ``await`` when they reach the
database driver.   A hypothetical approach here is that an asyncio-oriented
application packages up database-related methods into functions that are
invoked using :meth:`_asyncio.AsyncSession.run_sync`.

Altering the above example, if we didn't use :func:`_orm.selectinload`
for the ``A.bs`` collection, we could accomplish our treatment of these
attribute accesses within a separate function::

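    import asyncio

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.asyncio import create_async_engine

    # A, B and Base are the mapped classes and declarative base
    # defined in the previous example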

    def fetch_and_update_objects(session):
        """run traditional sync-style ORM code in a function that will be
        invoked within an awaitable.

        """

        # the session object here is a traditional ORM Session.
        # all features are available here including legacy Query use.

        stmt = select(A)

        result = session.execute(stmt)
        for a1 in result.scalars():
            print(a1)

            # lazy loads
            for b1 in a1.bs:
                print(b1)

        # legacy Query use
        a1 = session.query(A).order_by(A.id).first()

        a1.data = "new data"


    async def async_main():
        engine = create_async_engine(
            "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
        )
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        async with AsyncSession(engine) as session:
            async with session.begin():
                session.add_all(
                    [
                        A(bs=[B(), B()], data="a1"),
                        A(bs=[B()], data="a2"),
                        A(bs=[B(), B()], data="a3"),
                    ]
                )

            await session.run_sync(fetch_and_update_objects)

            await session.commit()

        # for AsyncEngine created in function scope, close and
        # clean-up pooled connections
        await engine.dispose()

    asyncio.run(async_main())

The above approach of running certain functions within a "sync" runner
has some parallels to an application that runs a SQLAlchemy application
on top of an event-based programming library such as ``gevent``.  The
differences are as follows:

1. Unlike when using ``gevent``, we can continue to use the standard Python
   asyncio event loop, or any custom event loop, without the need to integrate
   into the ``gevent`` event loop.

2. There is no "monkeypatching" whatsoever.   The above example makes use of
   a real asyncio driver and the underlying SQLAlchemy connection pool is also
   using the Python built-in ``asyncio.Queue`` for pooling connections.

3. The program can freely switch between async/await code and contained
   functions that use sync code with virtually no performance penalty.  There
   is no "thread executor" or any additional waiters or synchronization in use.

4. The underlying network drivers are also using pure Python asyncio
   concepts; no third party networking libraries such as those that
   ``gevent`` and ``eventlet`` provide are in use.

Using multiple asyncio event loops
----------------------------------

An application that makes use of multiple event loops, for example by combining asyncio
with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
with different event loops when using the default pool implementation.

If an :class:`_asyncio.AsyncEngine` is to be passed from one event loop to another,
the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
re-used on a new event loop. Failing to do so may lead to a ``RuntimeError``
along the lines of
``Task <Task pending ...> got Future attached to a different loop``.

If the same engine must be shared between different loops, it should be configured
to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine
from using any connection more than once::

    from sqlalchemy.pool import NullPool
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
    )
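
Another way an application might follow the rule above is to keep a separate
engine for each event loop. The following is a minimal sketch of that idea
using only the standard library. ``resource_for_running_loop`` is a
hypothetical helper, and the ``factory`` argument is assumed to be a callable
such as ``lambda: create_async_engine(url)``. It is not part of SQLAlchemy,
and each engine created this way would still need to be disposed on its own
loop:

```python
import asyncio
import threading
import weakref

# one cached resource per event loop; an entry disappears once its
# loop is garbage collected
_per_loop = weakref.WeakKeyDictionary()
_lock = threading.Lock()


def resource_for_running_loop(factory):
    """Return the resource for the running loop, creating it on first use."""
    loop = asyncio.get_running_loop()
    with _lock:
        try:
            return _per_loop[loop]
        except KeyError:
            resource = _per_loop[loop] = factory()
            return resource


async def use_twice(factory):
    # both calls run on the same loop, so they return the same object
    first = resource_for_running_loop(factory)
    second = resource_for_running_loop(factory)
    return first, second
```

Each call to ``asyncio.run()`` creates a new event loop, so two separate
``asyncio.run(use_twice(factory))`` calls receive two different resources.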


.. _asyncio_scoped_session:

Using asyncio scoped session
----------------------------

The usage of :class:`_asyncio.async_scoped_session` is mostly similar to
:class:`.scoped_session`. However, since there's no "thread-local" concept in
the asyncio context, the "scopefunc" parameter must be provided to the
constructor::

    from asyncio import current_task

    from sqlalchemy.orm import sessionmaker
    from sqlalchemy.ext.asyncio import async_scoped_session
    from sqlalchemy.ext.asyncio import AsyncSession

    async_session_factory = sessionmaker(some_async_engine, class_=AsyncSession)
    AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=current_task)

    some_async_session = AsyncScopedSession()

:class:`_asyncio.async_scoped_session` also includes **proxy
behavior** similar to that of :class:`.scoped_session`, which means it can be
treated as a :class:`_asyncio.AsyncSession` directly, keeping in mind that
the usual ``await`` keywords are necessary, including for the
:meth:`_asyncio.async_scoped_session.remove` method::

    async def some_function(some_async_session, some_object):
        # use the AsyncSession directly
        some_async_session.add(some_object)

        # use the AsyncSession via the context-local proxy
        await AsyncScopedSession.commit()

        # "remove" the current proxied AsyncSession for the local context
        await AsyncScopedSession.remove()

.. versionadded:: 1.4.19

.. currentmodule:: sqlalchemy.ext.asyncio


.. _asyncio_inspector:

Using the Inspector to inspect schema objects
---------------------------------------------------

SQLAlchemy does not yet offer an asyncio version of the
:class:`_reflection.Inspector` (introduced at :ref:`metadata_reflection_inspector`),
however the existing interface may be used in an asyncio context by
leveraging the :meth:`_asyncio.AsyncConnection.run_sync` method of
:class:`_asyncio.AsyncConnection`::

    import asyncio

    from sqlalchemy import inspect
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test"
    )

    def use_inspector(conn):
        inspector = inspect(conn)
        # use the inspector
        print(inspector.get_view_names())
        # return any value to the caller
        return inspector.get_table_names()

    async def async_main():
        async with engine.connect() as conn:
            tables = await conn.run_sync(use_inspector)

    asyncio.run(async_main())

.. seealso::

    :ref:`metadata_reflection`

    :ref:`inspection_toplevel`

Engine API Documentation
-------------------------

.. autofunction:: create_async_engine

.. autoclass:: AsyncEngine
   :members:

.. autoclass:: AsyncConnection
   :members:

.. autoclass:: AsyncTransaction
   :members:

Result Set API Documentation
----------------------------------

The :class:`_asyncio.AsyncResult` object is an async-adapted version of the
:class:`_result.Result` object.  It is only returned when using the
:meth:`_asyncio.AsyncConnection.stream` or :meth:`_asyncio.AsyncSession.stream`
methods, which return a result object backed by an active database
cursor.

.. autoclass:: AsyncResult
   :members:

.. autoclass:: AsyncScalarResult
   :members:

.. autoclass:: AsyncMappingResult
   :members:

ORM Session API Documentation
-----------------------------

.. autofunction:: async_object_session

.. autofunction:: async_session

.. autoclass:: async_scoped_session
   :members:
   :inherited-members:

.. autoclass:: AsyncSession
   :members:

.. autoclass:: AsyncSessionTransaction
   :members: