author     Mike Bayer <mike_mp@zzzcomputing.com>    2022-08-23 09:28:06 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>    2022-08-24 13:02:23 -0400
commit     776abf43d7404a3fa165588fd1e1e2d5ef9a9f04 (patch)
tree       135f6055d98c0a956f32378d53d6ea6c6a358ad9
parent     27bf1c1c287debb69c4644bf6dc35e3bad5470ad (diff)
download   sqlalchemy-776abf43d7404a3fa165588fd1e1e2d5ef9a9f04.tar.gz
integrate connection.terminate() for supporting dialects

Integrated support for asyncpg's ``terminate()`` method call for cases
where the connection pool is recycling a possibly timed-out connection,
where a connection is being garbage collected that wasn't gracefully
closed, as well as when the connection has been invalidated. This allows
asyncpg to abandon the connection without waiting for a response that may
incur long timeouts.

Fixes: #8419
Change-Id: Ia575af779d5733b483a72dff3690b8bbbad2bb05
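For background, a minimal sketch (not part of this commit) of the asyncpg distinction the change relies on: ``Connection.close()`` is a coroutine that performs IO and can stall on a dead or timed-out connection, while ``terminate()`` tears the connection down immediately with no IO. The ``discard_connection`` helper below is purely illustrative and assumes asyncpg is installed.

    import asyncio

    import asyncpg  # assumption: asyncpg is available

    async def discard_connection(conn: asyncpg.Connection, graceful: bool) -> None:
        if graceful:
            # close() performs IO and can hang on a dead connection,
            # so it needs its own timeout guard
            await asyncio.wait_for(conn.close(), timeout=30)
        else:
            # terminate() abandons the connection without awaiting a response
            conn.terminate()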
-rw-r--r--   doc/build/changelog/unreleased_14/8419.rst       10
-rw-r--r--   lib/sqlalchemy/dialects/postgresql/asyncpg.py     7
-rw-r--r--   lib/sqlalchemy/engine/default.py                  5
-rw-r--r--   lib/sqlalchemy/engine/interfaces.py              21
-rw-r--r--   lib/sqlalchemy/pool/base.py                      38
-rw-r--r--   test/engine/test_logging.py                       2
-rw-r--r--   test/engine/test_pool.py                         20
7 files changed, 87 insertions(+), 16 deletions(-)
diff --git a/doc/build/changelog/unreleased_14/8419.rst b/doc/build/changelog/unreleased_14/8419.rst
new file mode 100644
index 000000000..a095d858d
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/8419.rst
@@ -0,0 +1,10 @@
+.. change::
+ :tags: bug, asyncio
+ :tickets: 8419
+
+ Integrated support for asyncpg's ``terminate()`` method call for cases
+ where the connection pool is recycling a possibly timed-out connection,
+ where a connection is being garbage collected that wasn't gracefully
+ closed, as well as when the connection has been invalidated. This allows
+ asyncpg to abandon the connection without waiting for a response that may
+ incur long timeouts.
diff --git a/lib/sqlalchemy/dialects/postgresql/asyncpg.py b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
index 6888959f0..a84bece4f 100644
--- a/lib/sqlalchemy/dialects/postgresql/asyncpg.py
+++ b/lib/sqlalchemy/dialects/postgresql/asyncpg.py
@@ -793,6 +793,9 @@ class AsyncAdapt_asyncpg_connection(AdaptedConnection):
self.await_(self._connection.close())
+ def terminate(self):
+ self._connection.terminate()
+
class AsyncAdaptFallback_asyncpg_connection(AsyncAdapt_asyncpg_connection):
__slots__ = ()
@@ -895,6 +898,7 @@ class PGDialect_asyncpg(PGDialect):
supports_server_side_cursors = True
render_bind_cast = True
+ has_terminate = True
default_paramstyle = "format"
supports_sane_multi_rowcount = False
@@ -981,6 +985,9 @@ class PGDialect_asyncpg(PGDialect):
def get_deferrable(self, connection):
return connection.deferrable
+ def do_terminate(self, dbapi_connection) -> None:
+ dbapi_connection.terminate()
+
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py
index 80e687c32..9ad0ebbfc 100644
--- a/lib/sqlalchemy/engine/default.py
+++ b/lib/sqlalchemy/engine/default.py
@@ -237,6 +237,8 @@ class DefaultDialect(Dialect):
is_async = False
+ has_terminate = False
+
# TODO: this is not to be part of 2.0. implement rudimentary binary
# literals for SQLite, PostgreSQL, MySQL only within
# _Binary.literal_processor
@@ -620,6 +622,9 @@ class DefaultDialect(Dialect):
def do_commit(self, dbapi_connection):
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection):
+ self.do_close(dbapi_connection)
+
def do_close(self, dbapi_connection):
dbapi_connection.close()
diff --git a/lib/sqlalchemy/engine/interfaces.py b/lib/sqlalchemy/engine/interfaces.py
index 778c07592..01b266d68 100644
--- a/lib/sqlalchemy/engine/interfaces.py
+++ b/lib/sqlalchemy/engine/interfaces.py
@@ -966,6 +966,10 @@ class Dialect(EventTarget):
is_async: bool
"""Whether or not this dialect is intended for asyncio use."""
+ has_terminate: bool
+ """Whether or not this dialect has a separate "terminate" implementation
+ that does not block or require awaiting."""
+
engine_config_types: Mapping[str, Any]
"""a mapping of string keys that can be in an engine config linked to
type conversion functions.
@@ -1784,6 +1788,23 @@ class Dialect(EventTarget):
raise NotImplementedError()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ """Provide an implementation of ``connection.close()`` that tries as
+ much as possible to not block, given a DBAPI
+ connection.
+
+ In the vast majority of cases this just calls .close(), however
+ for some asyncio dialects may call upon different API features.
+
+ This hook is called by the :class:`_pool.Pool`
+ when a connection is being recycled or has been invalidated.
+
+ .. versionadded:: 1.4.41
+
+ """
+
+ raise NotImplementedError()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
"""Provide an implementation of ``connection.close()``, given a DBAPI
connection.
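To illustrate the new ``Dialect.has_terminate`` / ``do_terminate()`` contract shown above, here is a rough sketch of how a third-party asyncio dialect might opt in. ``MyAsyncDialect`` and ``hard_close()`` are invented names for illustration, not real SQLAlchemy or driver API.

    from sqlalchemy.engine.default import DefaultDialect

    class MyAsyncDialect(DefaultDialect):
        is_async = True
        has_terminate = True  # advertise a non-blocking teardown path

        def do_terminate(self, dbapi_connection) -> None:
            # called by the pool on recycle/invalidation; must not block or await
            dbapi_connection.hard_close()  # hypothetical driver call

        def do_close(self, dbapi_connection) -> None:
            # the graceful path remains in use for ordinary disposal
            dbapi_connection.close()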
diff --git a/lib/sqlalchemy/pool/base.py b/lib/sqlalchemy/pool/base.py
index 51eb5e9f5..41f2f03b2 100644
--- a/lib/sqlalchemy/pool/base.py
+++ b/lib/sqlalchemy/pool/base.py
@@ -72,6 +72,7 @@ class _ConnDialect:
"""
is_async = False
+ has_terminate = False
def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.rollback()
@@ -79,6 +80,9 @@ class _ConnDialect:
def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
dbapi_connection.commit()
+ def do_terminate(self, dbapi_connection: DBAPIConnection) -> None:
+ dbapi_connection.close()
+
def do_close(self, dbapi_connection: DBAPIConnection) -> None:
dbapi_connection.close()
@@ -310,10 +314,19 @@ class Pool(log.Identified, event.EventTarget):
creator_fn = cast(_CreatorFnType, creator)
return lambda rec: creator_fn()
- def _close_connection(self, connection: DBAPIConnection) -> None:
- self.logger.debug("Closing connection %r", connection)
+ def _close_connection(
+ self, connection: DBAPIConnection, *, terminate: bool = False
+ ) -> None:
+ self.logger.debug(
+ "%s connection %r",
+ "Hard-closing" if terminate else "Closing",
+ connection,
+ )
try:
- self._dialect.do_close(connection)
+ if terminate:
+ self._dialect.do_terminate(connection)
+ else:
+ self._dialect.do_close(connection)
except Exception:
self.logger.error(
"Exception closing connection %r", connection, exc_info=True
@@ -742,7 +755,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
if soft:
self._soft_invalidate_time = time.time()
else:
- self.__close()
+ self.__close(terminate=True)
self.dbapi_connection = None
def get_connection(self) -> DBAPIConnection:
@@ -789,7 +802,7 @@ class _ConnectionRecord(ConnectionPoolEntry):
recycle = True
if recycle:
- self.__close()
+ self.__close(terminate=True)
self.info.clear() # type: ignore # our info is always present
self.__connect()
@@ -804,12 +817,14 @@ class _ConnectionRecord(ConnectionPoolEntry):
or (self._soft_invalidate_time > self.starttime)
)
- def __close(self) -> None:
+ def __close(self, *, terminate: bool = False) -> None:
self.finalize_callback.clear()
if self.__pool.dispatch.close:
self.__pool.dispatch.close(self.dbapi_connection, self)
assert self.dbapi_connection is not None
- self.__pool._close_connection(self.dbapi_connection)
+ self.__pool._close_connection(
+ self.dbapi_connection, terminate=terminate
+ )
self.dbapi_connection = None
def __connect(self) -> None:
@@ -877,7 +892,9 @@ def _finalize_fairy(
dbapi_connection = connection_record.dbapi_connection
# null pool is not _is_asyncio but can be used also with async dialects
- dont_restore_gced = pool._dialect.is_async
+ dont_restore_gced = (
+ pool._dialect.is_async and not pool._dialect.has_terminate
+ )
if dont_restore_gced:
detach = connection_record is None or is_gc_cleanup
@@ -923,8 +940,9 @@ def _finalize_fairy(
message = (
"The garbage collector is trying to clean up "
f"connection {dbapi_connection!r}. This feature is "
- "unsupported on async "
- "dbapi, since no IO can be performed at this stage to "
+ "unsupported on asyncio "
+ 'dbapis that lack a "terminate" feature, since no '
+ "IO can be performed at this stage to "
"reset the connection. Please close out all "
"connections when they are no longer used, calling "
"``close()`` or using a context manager to "
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index c6fd85684..38e1c436c 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -453,7 +453,7 @@ class PoolLoggingTest(fixtures.TestBase):
"Connection %r checked out from pool",
"Connection %r being returned to pool%s",
"Connection %s rollback-on-return",
- "Closing connection %r",
+ "%s connection %r",
]
+ (["Pool disposed. %s"] if dispose else []),
)
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 2bbb976a8..473d462a3 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -87,10 +87,13 @@ class PoolTestBase(fixtures.TestBase):
def _queuepool_dbapi_fixture(self, **kw):
dbapi = MockDBAPI()
_is_asyncio = kw.pop("_is_asyncio", False)
+ _has_terminate = kw.pop("_has_terminate", False)
p = pool.QueuePool(creator=lambda: dbapi.connect("foo.db"), **kw)
if _is_asyncio:
p._is_asyncio = True
p._dialect = _AsyncConnDialect()
+ if _has_terminate:
+ p._dialect.has_terminate = True
return dbapi, p
@@ -445,8 +448,10 @@ class PoolEventsTest(PoolTestBase):
return p, canary
- def _checkin_event_fixture(self, _is_asyncio=False):
- p = self._queuepool_fixture(_is_asyncio=_is_asyncio)
+ def _checkin_event_fixture(self, _is_asyncio=False, _has_terminate=False):
+ p = self._queuepool_fixture(
+ _is_asyncio=_is_asyncio, _has_terminate=_has_terminate
+ )
canary = []
@event.listens_for(p, "checkin")
@@ -721,9 +726,12 @@ class PoolEventsTest(PoolTestBase):
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True,), (False,))
- def test_checkin_event_gc(self, detach_gced):
- p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
+ @testing.combinations((True,), (False,), argnames="is_asyncio")
+ @testing.combinations((True,), (False,), argnames="has_terminate")
+ def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ p, canary = self._checkin_event_fixture(
+ _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ )
c1 = p.connect()
@@ -733,6 +741,8 @@ class PoolEventsTest(PoolTestBase):
del c1
lazy_gc()
+ detach_gced = is_asyncio and not has_terminate
+
if detach_gced:
# "close_detached" is not called because for asyncio the
# connection is just lost.