summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFederico Caselli <cfederico87@gmail.com>2021-01-09 13:25:55 +0100
committerFederico Caselli <cfederico87@gmail.com>2021-01-21 21:42:58 +0100
commite56534995de2a97210d9c3d58183e8d245cdae94 (patch)
tree8b820ef993bb4157b107322a6bba8f3c2d78961d
parent851a3a362ee5e05b8438f92e2e1df63c68f79d68 (diff)
downloadsqlalchemy-e56534995de2a97210d9c3d58183e8d245cdae94.tar.gz
Fix a couple of bugs in the asyncio implementation
Log an informative message if a connection is not closed and the gc is reclaiming it when using an async dpapi, that does not support running IO at that stage. The ``AsyncAdaptedQueue`` used by default on async dpapis should instantiate a queue only when it's first used to avoid binding it to a possibly wrong event loop. Fixes: #5823 Change-Id: Ibfc50e209b1937ae3d6599ae7997f028c7a92c33
-rw-r--r--doc/build/changelog/unreleased_14/5823.rst4
-rw-r--r--doc/build/orm/extensions/asyncio.rst23
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py1
-rw-r--r--lib/sqlalchemy/pool/base.py33
-rw-r--r--lib/sqlalchemy/pool/impl.py6
-rw-r--r--lib/sqlalchemy/util/queue.py32
-rw-r--r--test/base/test_concurrency_py3k.py50
-rw-r--r--test/engine/test_pool.py26
8 files changed, 146 insertions, 29 deletions
diff --git a/doc/build/changelog/unreleased_14/5823.rst b/doc/build/changelog/unreleased_14/5823.rst
index 74debdaa9..e5f43db45 100644
--- a/doc/build/changelog/unreleased_14/5823.rst
+++ b/doc/build/changelog/unreleased_14/5823.rst
@@ -10,4 +10,6 @@
the connection including transaction rollback or connection close as this
will often be outside of the event loop.
-
+ The ``AsyncAdaptedQueue`` used by default on async dpapis
+ should instantiate a queue only when it's first used
+ to avoid binding it to a possibly wrong event loop.
diff --git a/doc/build/orm/extensions/asyncio.rst b/doc/build/orm/extensions/asyncio.rst
index aed01678a..2fa274fcd 100644
--- a/doc/build/orm/extensions/asyncio.rst
+++ b/doc/build/orm/extensions/asyncio.rst
@@ -255,6 +255,29 @@ differences are as follows:
concepts, no third party networking libraries as ``gevent`` and ``eventlet``
provides are in use.
+Using multiple asyncio event loops
+----------------------------------
+
+An application that makes use of multiple event loops, for example by combining asyncio
+with multithreading, should not share the same :class:`_asyncio.AsyncEngine`
+with different event loops when using the default pool implementation.
+
+If an :class:`_asyncio.AsyncEngine` is be passed from one event loop to another,
+the method :meth:`_asyncio.AsyncEngine.dispose()` should be called before it's
+re-used on a new event loop. Failing to do so may lead to a ``RuntimeError``
+along the lines of
+``Task <Task pending ...> got Future attached to a different loop``
+
+If the same engine must be shared between different loop, it should be configured
+to disable pooling using :class:`~sqlalchemy.pool.NullPool`, preventing the Engine
+from using any connection more than once:
+
+ from sqlalchemy.pool import NullPool
+ engine = create_async_engine(
+ "postgresql+asyncpg://user:pass@host/dbname", poolclass=NullPool
+ )
+
+
.. currentmodule:: sqlalchemy.ext.asyncio
Engine API Documentation
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index 829e89b71..aa7e60dfb 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -494,7 +494,6 @@ class AsyncEngine(ProxyComparable, AsyncConnectable):
.. versionadded:: 1.4
-
""" # noqa
# AsyncEngine is a thin proxy; no state should be added here
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index 47d9e2cba..9b4e61fc3 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -26,7 +26,6 @@ reset_none = util.symbol("reset_none")
class _ConnDialect(object):
-
"""partial implementation of :class:`.Dialect`
which provides DBAPI connection methods.
@@ -36,6 +35,8 @@ class _ConnDialect(object):
"""
+ is_async = False
+
def do_rollback(self, dbapi_connection):
dbapi_connection.rollback()
@@ -606,11 +607,20 @@ class _ConnectionRecord(object):
def _finalize_fairy(
- connection, connection_record, pool, ref, echo, fairy=None
+ connection,
+ connection_record,
+ pool,
+ ref, # this is None when called directly, not by the gc
+ echo,
+ fairy=None,
):
"""Cleanup for a :class:`._ConnectionFairy` whether or not it's already
been garbage collected.
+ When using an async dialect no IO can happen here (without using
+ a dedicated thread), since this is called outside the greenlet
+ context and with an already running loop. In this case function
+ will only log a message and raise a warning.
"""
if ref:
@@ -624,7 +634,8 @@ def _finalize_fairy(
assert connection is None
connection = connection_record.connection
- dont_restore_gced = pool._is_asyncio
+ # null pool is not _is_asyncio but can be used also with async dialects
+ dont_restore_gced = pool._dialect.is_async
if dont_restore_gced:
detach = not connection_record or ref
@@ -658,11 +669,17 @@ def _finalize_fairy(
pool._close_connection(connection)
else:
- util.warn(
- "asyncio connection is being garbage "
- "collected without being properly closed: %r"
- % connection
- )
+ message = (
+ "The garbage collector is trying to clean up "
+ "connection %r. This feature is unsupported on async "
+ "dbapi, since no IO can be performed at this stage to "
+ "reset the connection. Please close out all "
+ "connections when they are no longer used, calling "
+ "``close()`` or using a context manager to "
+ "manage their lifetime."
+ ) % connection
+ pool.logger.error(message)
+ util.warn(message)
except BaseException as e:
pool.logger.error(
diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py
index 825ac0307..08371a31a 100644
--- a/lib/sqlalchemy/pool/impl.py
+++ b/lib/sqlalchemy/pool/impl.py
@@ -13,6 +13,7 @@
import traceback
import weakref
+from .base import _ConnDialect
from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import Pool
@@ -221,9 +222,14 @@ class QueuePool(Pool):
return self._pool.maxsize - self._pool.qsize() + self._overflow
+class _AsyncConnDialect(_ConnDialect):
+ is_async = True
+
+
class AsyncAdaptedQueuePool(QueuePool):
_is_asyncio = True
_queue_class = sqla_queue.AsyncAdaptedQueue
+ _dialect = _AsyncConnDialect()
class FallbackAsyncAdaptedQueuePool(AsyncAdaptedQueuePool):
diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py
index ca5a3abde..30e388248 100644
--- a/lib/sqlalchemy/util/queue.py
+++ b/lib/sqlalchemy/util/queue.py
@@ -26,6 +26,7 @@ from .compat import threading
from .concurrency import asyncio
from .concurrency import await_fallback
from .concurrency import await_only
+from .langhelpers import memoized_property
__all__ = ["Empty", "Full", "Queue"]
@@ -206,15 +207,32 @@ class AsyncAdaptedQueue:
await_ = staticmethod(await_only)
def __init__(self, maxsize=0, use_lifo=False):
- if use_lifo:
- self._queue = asyncio.LifoQueue(maxsize=maxsize)
- else:
- self._queue = asyncio.Queue(maxsize=maxsize)
self.use_lifo = use_lifo
self.maxsize = maxsize
- self.empty = self._queue.empty
- self.full = self._queue.full
- self.qsize = self._queue.qsize
+
+ def empty(self):
+ return self._queue.empty()
+
+ def full(self):
+ return self._queue.full()
+
+ def qsize(self):
+ return self._queue.qsize()
+
+ @memoized_property
+ def _queue(self):
+ # Delay creation of the queue until it is first used, to avoid
+ # binding it to a possibly wrong event loop.
+ # By delaying the creation of the pool we accommodate the common
+ # usage pattern of instanciating the engine at module level, where a
+ # different event loop is in present compared to when the application
+ # is actually run.
+
+ if self.use_lifo:
+ queue = asyncio.LifoQueue(maxsize=self.maxsize)
+ else:
+ queue = asyncio.Queue(maxsize=self.maxsize)
+ return queue
def put_nowait(self, item):
try:
diff --git a/test/base/test_concurrency_py3k.py b/test/base/test_concurrency_py3k.py
index e7ae8c9ad..8eabece92 100644
--- a/test/base/test_concurrency_py3k.py
+++ b/test/base/test_concurrency_py3k.py
@@ -1,12 +1,18 @@
+import threading
+
from sqlalchemy import exc
from sqlalchemy import testing
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_true
+from sqlalchemy.util import asyncio
from sqlalchemy.util import await_fallback
from sqlalchemy.util import await_only
from sqlalchemy.util import greenlet_spawn
+from sqlalchemy.util import queue
try:
from greenlet import greenlet
@@ -152,3 +158,47 @@ class TestAsyncioCompat(fixtures.TestBase):
"The current operation required an async execution but none was",
):
await greenlet_spawn(run, _require_await=True)
+
+
+class TestAsyncAdaptedQueue(fixtures.TestBase):
+ def test_lazy_init(self):
+ run = [False]
+
+ def thread_go(q):
+ def go():
+ q.get(timeout=0.1)
+
+ with expect_raises(queue.Empty):
+ asyncio.run(greenlet_spawn(go))
+ run[0] = True
+
+ t = threading.Thread(
+ target=thread_go, args=[queue.AsyncAdaptedQueue()]
+ )
+ t.start()
+ t.join()
+
+ is_true(run[0])
+
+ def test_error_other_loop(self):
+ run = [False]
+
+ def thread_go(q):
+ def go():
+ eq_(q.get(block=False), 1)
+ q.get(timeout=0.1)
+
+ with expect_raises_message(
+ RuntimeError, "Task .* attached to a different loop"
+ ):
+ asyncio.run(greenlet_spawn(go))
+
+ run[0] = True
+
+ q = queue.AsyncAdaptedQueue()
+ q.put_nowait(1)
+ t = threading.Thread(target=thread_go, args=[q])
+ t.start()
+ t.join()
+
+ is_true(run[0])
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index b43a29fae..f29373e95 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -10,6 +10,7 @@ from sqlalchemy import pool
from sqlalchemy import select
from sqlalchemy import testing
from sqlalchemy.engine import default
+from sqlalchemy.pool.impl import _AsyncConnDialect
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_raises_message
@@ -89,10 +90,12 @@ class PoolTestBase(fixtures.TestBase):
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
- return (
- dbapi,
- pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw),
- )
+ _is_asyncio = kw.pop("_is_asyncio", False)
+ p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
+ if _is_asyncio:
+ p._is_asyncio = True
+ p._dialect = _AsyncConnDialect()
+ return dbapi, p
class PoolTest(PoolTestBase):
@@ -283,6 +286,8 @@ class PoolDialectTest(PoolTestBase):
canary = []
class PoolDialect(object):
+ is_async = False
+
def do_rollback(self, dbapi_connection):
canary.append("R")
dbapi_connection.rollback()
@@ -361,8 +366,8 @@ class PoolEventsTest(PoolTestBase):
return p, canary
- def _checkin_event_fixture(self):
- p = self._queuepool_fixture()
+ def _checkin_event_fixture(self, _is_asyncio=False):
+ p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
canary = []
@event.listens_for(p, "checkin")
@@ -639,10 +644,7 @@ class PoolEventsTest(PoolTestBase):
@testing.combinations((True, testing.requires.python3), (False,))
def test_checkin_event_gc(self, detach_gced):
- p, canary = self._checkin_event_fixture()
-
- if detach_gced:
- p._is_asyncio = True
+ p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
c1 = p.connect()
@@ -1517,11 +1519,11 @@ class QueuePoolTest(PoolTestBase):
@testing.combinations((True, testing.requires.python3), (False,))
def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
dbapi, pool = self._queuepool_dbapi_fixture(
- pool_size=1, max_overflow=2
+ pool_size=1, max_overflow=2, _is_asyncio=detach_gced
)
if detach_gced:
- pool._is_asyncio = True
+ pool._dialect.is_async = True
@event.listens_for(pool, "checkout")
def handle_checkout_event(dbapi_con, con_record, con_proxy):