summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/ext/asyncio/engine.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-10-08 15:20:48 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2020-10-10 01:17:25 -0400
commit2665a0c4cb3e94e6545d0b9bbcbcc39ccffebaba (patch)
treeed25383ce7e5899d7d643a11df0f8aee9f2ab959 /lib/sqlalchemy/ext/asyncio/engine.py
parentbcc17b1d6e2cac3b0e45c0b17a62cf2d5fc5c5ab (diff)
downloadsqlalchemy-2665a0c4cb3e94e6545d0b9bbcbcc39ccffebaba.tar.gz
generalize scoped_session proxying and apply to asyncio elements
Reworked the proxy creation used by scoped_session() to be based on fully copied code with augmented docstrings and moved it into langhelpers. asyncio session, engine, connection can now take advantage of it so that all non-async methods are availble. Overall implementation of most important accessors / methods on AsyncConnection, etc. , including awaitable versions of invalidate, execution_options, etc. In order to support an event dispatcher on the async classes while still allowing them to hold __slots__, make some adjustments to the event system to allow that to be present, at least rudimentally. Fixes: #5628 Change-Id: I5eb6929fc1e4fdac99e4b767dcfd49672d56e2b2
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r--lib/sqlalchemy/ext/asyncio/engine.py153
1 files changed, 140 insertions, 13 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py
index 4a92fb1f2..9e4851dfc 100644
--- a/lib/sqlalchemy/ext/asyncio/engine.py
+++ b/lib/sqlalchemy/ext/asyncio/engine.py
@@ -8,12 +8,11 @@ from .base import StartableContext
from .result import AsyncResult
from ... import exc
from ... import util
-from ...engine import Connection
from ...engine import create_engine as _create_engine
-from ...engine import Engine
from ...engine import Result
from ...engine import Transaction
-from ...engine.base import OptionEngineMixin
+from ...future import Connection
+from ...future import Engine
from ...sql import Executable
from ...util.concurrency import greenlet_spawn
@@ -41,7 +40,24 @@ def create_async_engine(*arg, **kw):
return AsyncEngine(sync_engine)
-class AsyncConnection(StartableContext):
+class AsyncConnectable:
+ __slots__ = "_slots_dispatch"
+
+
+@util.create_proxy_methods(
+ Connection,
+ ":class:`_future.Connection`",
+ ":class:`_asyncio.AsyncConnection`",
+ classmethods=[],
+ methods=[],
+ attributes=[
+ "closed",
+ "invalidated",
+ "dialect",
+ "default_isolation_level",
+ ],
+)
+class AsyncConnection(StartableContext, AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Connection`.
:class:`_asyncio.AsyncConnection` is acquired using the
@@ -58,15 +74,23 @@ class AsyncConnection(StartableContext):
""" # noqa
+ # AsyncConnection is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncConnection that matches this one given only the
+ # "sync" elements.
__slots__ = (
"sync_engine",
"sync_connection",
)
def __init__(
- self, sync_engine: Engine, sync_connection: Optional[Connection] = None
+ self,
+ async_engine: "AsyncEngine",
+ sync_connection: Optional[Connection] = None,
):
- self.sync_engine = sync_engine
+ self.engine = async_engine
+ self.sync_engine = async_engine.sync_engine
self.sync_connection = sync_connection
async def start(self):
@@ -79,6 +103,34 @@ class AsyncConnection(StartableContext):
self.sync_connection = await (greenlet_spawn(self.sync_engine.connect))
return self
+ @property
+ def connection(self):
+ """Not implemented for async; call
+ :meth:`_asyncio.AsyncConnection.get_raw_connection`.
+
+ """
+ raise exc.InvalidRequestError(
+ "AsyncConnection.connection accessor is not implemented as the "
+ "attribute may need to reconnect on an invalidated connection. "
+ "Use the get_raw_connection() method."
+ )
+
+ async def get_raw_connection(self):
+ """Return the pooled DBAPI-level connection in use by this
+ :class:`_asyncio.AsyncConnection`.
+
+ This is typically the SQLAlchemy connection-pool proxied connection
+ which then has an attribute .connection that refers to the actual
+ DBAPI-level connection.
+ """
+ conn = self._sync_connection()
+
+ return await greenlet_spawn(getattr, conn, "connection")
+
+ @property
+ def _proxied(self):
+ return self.sync_connection
+
def _sync_connection(self):
if not self.sync_connection:
self._raise_for_not_started()
@@ -94,6 +146,43 @@ class AsyncConnection(StartableContext):
self._sync_connection()
return AsyncTransaction(self, nested=True)
+ async def invalidate(self, exception=None):
+ """Invalidate the underlying DBAPI connection associated with
+ this :class:`_engine.Connection`.
+
+ See the method :meth:`_engine.Connection.invalidate` for full
+ detail on this method.
+
+ """
+
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.invalidate, exception=exception)
+
+ async def get_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ async def set_isolation_level(self):
+ conn = self._sync_connection()
+ return await greenlet_spawn(conn.get_isolation_level)
+
+ async def execution_options(self, **opt):
+ r"""Set non-SQL options for the connection which take effect
+ during execution.
+
+ This returns this :class:`_asyncio.AsyncConnection` object with
+ the new options added.
+
+ See :meth:`_future.Connection.execution_options` for full details
+ on this method.
+
+ """
+
+ conn = self._sync_connection()
+ c2 = await greenlet_spawn(conn.execution_options, **opt)
+ assert c2 is conn
+ return self
+
async def commit(self):
"""Commit the transaction that is currently in progress.
@@ -287,7 +376,19 @@ class AsyncConnection(StartableContext):
await self.close()
-class AsyncEngine:
+@util.create_proxy_methods(
+ Engine,
+ ":class:`_future.Engine`",
+ ":class:`_asyncio.AsyncEngine`",
+ classmethods=[],
+ methods=[
+ "clear_compiled_cache",
+ "update_execution_options",
+ "get_execution_options",
+ ],
+ attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"],
+)
+class AsyncEngine(AsyncConnectable):
"""An asyncio proxy for a :class:`_engine.Engine`.
:class:`_asyncio.AsyncEngine` is acquired using the
@@ -301,7 +402,12 @@ class AsyncEngine:
""" # noqa
- __slots__ = ("sync_engine",)
+ # AsyncEngine is a thin proxy; no state should be added here
+ # that is not retrievable from the "sync" engine / connection, e.g.
+ # current transaction, info, etc. It should be possible to
+ # create a new AsyncEngine that matches this one given only the
+ # "sync" elements.
+ __slots__ = ("sync_engine", "_proxied")
_connection_cls = AsyncConnection
@@ -327,7 +433,7 @@ class AsyncEngine:
await self.conn.close()
def __init__(self, sync_engine: Engine):
- self.sync_engine = sync_engine
+ self.sync_engine = self._proxied = sync_engine
def begin(self):
"""Return a context manager which when entered will deliver an
@@ -363,7 +469,7 @@ class AsyncEngine:
"""
- return self._connection_cls(self.sync_engine)
+ return self._connection_cls(self)
async def raw_connection(self) -> Any:
"""Return a "raw" DBAPI connection from the connection pool.
@@ -375,12 +481,33 @@ class AsyncEngine:
"""
return await greenlet_spawn(self.sync_engine.raw_connection)
+ def execution_options(self, **opt):
+ """Return a new :class:`_asyncio.AsyncEngine` that will provide
+ :class:`_asyncio.AsyncConnection` objects with the given execution
+ options.
+
+ Proxied from :meth:`_future.Engine.execution_options`. See that
+ method for details.
+
+ """
+
+ return AsyncEngine(self.sync_engine.execution_options(**opt))
-class AsyncOptionEngine(OptionEngineMixin, AsyncEngine):
- pass
+ async def dispose(self):
+ """Dispose of the connection pool used by this
+ :class:`_asyncio.AsyncEngine`.
+ This will close all connection pool connections that are
+ **currently checked in**. See the documentation for the underlying
+ :meth:`_future.Engine.dispose` method for further notes.
+
+ .. seealso::
+
+ :meth:`_future.Engine.dispose`
+
+ """
-AsyncEngine._option_cls = AsyncOptionEngine
+ return await greenlet_spawn(self.sync_engine.dispose)
class AsyncTransaction(StartableContext):