diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-10-08 15:20:48 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-10-10 01:17:25 -0400 |
commit | 2665a0c4cb3e94e6545d0b9bbcbcc39ccffebaba (patch) | |
tree | ed25383ce7e5899d7d643a11df0f8aee9f2ab959 /lib/sqlalchemy/ext/asyncio/engine.py | |
parent | bcc17b1d6e2cac3b0e45c0b17a62cf2d5fc5c5ab (diff) | |
download | sqlalchemy-2665a0c4cb3e94e6545d0b9bbcbcc39ccffebaba.tar.gz |
generalize scoped_session proxying and apply to asyncio elements
Reworked the proxy creation used by scoped_session() to be
based on fully copied code with augmented docstrings and
moved it into langhelpers. asyncio session, engine,
connection can now take
advantage of it so that all non-async methods are availble.
Overall implementation of most important accessors / methods
on AsyncConnection, etc. , including awaitable versions
of invalidate, execution_options, etc.
In order to support an event dispatcher on the async
classes while still allowing them to hold __slots__,
make some adjustments to the event system to allow
that to be present, at least rudimentally.
Fixes: #5628
Change-Id: I5eb6929fc1e4fdac99e4b767dcfd49672d56e2b2
Diffstat (limited to 'lib/sqlalchemy/ext/asyncio/engine.py')
-rw-r--r-- | lib/sqlalchemy/ext/asyncio/engine.py | 153 |
1 files changed, 140 insertions, 13 deletions
diff --git a/lib/sqlalchemy/ext/asyncio/engine.py b/lib/sqlalchemy/ext/asyncio/engine.py index 4a92fb1f2..9e4851dfc 100644 --- a/lib/sqlalchemy/ext/asyncio/engine.py +++ b/lib/sqlalchemy/ext/asyncio/engine.py @@ -8,12 +8,11 @@ from .base import StartableContext from .result import AsyncResult from ... import exc from ... import util -from ...engine import Connection from ...engine import create_engine as _create_engine -from ...engine import Engine from ...engine import Result from ...engine import Transaction -from ...engine.base import OptionEngineMixin +from ...future import Connection +from ...future import Engine from ...sql import Executable from ...util.concurrency import greenlet_spawn @@ -41,7 +40,24 @@ def create_async_engine(*arg, **kw): return AsyncEngine(sync_engine) -class AsyncConnection(StartableContext): +class AsyncConnectable: + __slots__ = "_slots_dispatch" + + +@util.create_proxy_methods( + Connection, + ":class:`_future.Connection`", + ":class:`_asyncio.AsyncConnection`", + classmethods=[], + methods=[], + attributes=[ + "closed", + "invalidated", + "dialect", + "default_isolation_level", + ], +) +class AsyncConnection(StartableContext, AsyncConnectable): """An asyncio proxy for a :class:`_engine.Connection`. :class:`_asyncio.AsyncConnection` is acquired using the @@ -58,15 +74,23 @@ class AsyncConnection(StartableContext): """ # noqa + # AsyncConnection is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncConnection that matches this one given only the + # "sync" elements. __slots__ = ( "sync_engine", "sync_connection", ) def __init__( - self, sync_engine: Engine, sync_connection: Optional[Connection] = None + self, + async_engine: "AsyncEngine", + sync_connection: Optional[Connection] = None, ): - self.sync_engine = sync_engine + self.engine = async_engine + self.sync_engine = async_engine.sync_engine self.sync_connection = sync_connection async def start(self): @@ -79,6 +103,34 @@ class AsyncConnection(StartableContext): self.sync_connection = await (greenlet_spawn(self.sync_engine.connect)) return self + @property + def connection(self): + """Not implemented for async; call + :meth:`_asyncio.AsyncConnection.get_raw_connection`. + + """ + raise exc.InvalidRequestError( + "AsyncConnection.connection accessor is not implemented as the " + "attribute may need to reconnect on an invalidated connection. " + "Use the get_raw_connection() method." + ) + + async def get_raw_connection(self): + """Return the pooled DBAPI-level connection in use by this + :class:`_asyncio.AsyncConnection`. + + This is typically the SQLAlchemy connection-pool proxied connection + which then has an attribute .connection that refers to the actual + DBAPI-level connection. + """ + conn = self._sync_connection() + + return await greenlet_spawn(getattr, conn, "connection") + + @property + def _proxied(self): + return self.sync_connection + def _sync_connection(self): if not self.sync_connection: self._raise_for_not_started() @@ -94,6 +146,43 @@ class AsyncConnection(StartableContext): self._sync_connection() return AsyncTransaction(self, nested=True) + async def invalidate(self, exception=None): + """Invalidate the underlying DBAPI connection associated with + this :class:`_engine.Connection`. + + See the method :meth:`_engine.Connection.invalidate` for full + detail on this method. + + """ + + conn = self._sync_connection() + return await greenlet_spawn(conn.invalidate, exception=exception) + + async def get_isolation_level(self): + conn = self._sync_connection() + return await greenlet_spawn(conn.get_isolation_level) + + async def set_isolation_level(self): + conn = self._sync_connection() + return await greenlet_spawn(conn.get_isolation_level) + + async def execution_options(self, **opt): + r"""Set non-SQL options for the connection which take effect + during execution. + + This returns this :class:`_asyncio.AsyncConnection` object with + the new options added. + + See :meth:`_future.Connection.execution_options` for full details + on this method. + + """ + + conn = self._sync_connection() + c2 = await greenlet_spawn(conn.execution_options, **opt) + assert c2 is conn + return self + async def commit(self): """Commit the transaction that is currently in progress. @@ -287,7 +376,19 @@ class AsyncConnection(StartableContext): await self.close() -class AsyncEngine: +@util.create_proxy_methods( + Engine, + ":class:`_future.Engine`", + ":class:`_asyncio.AsyncEngine`", + classmethods=[], + methods=[ + "clear_compiled_cache", + "update_execution_options", + "get_execution_options", + ], + attributes=["url", "pool", "dialect", "engine", "name", "driver", "echo"], +) +class AsyncEngine(AsyncConnectable): """An asyncio proxy for a :class:`_engine.Engine`. :class:`_asyncio.AsyncEngine` is acquired using the @@ -301,7 +402,12 @@ class AsyncEngine: """ # noqa - __slots__ = ("sync_engine",) + # AsyncEngine is a thin proxy; no state should be added here + # that is not retrievable from the "sync" engine / connection, e.g. + # current transaction, info, etc. It should be possible to + # create a new AsyncEngine that matches this one given only the + # "sync" elements. + __slots__ = ("sync_engine", "_proxied") _connection_cls = AsyncConnection @@ -327,7 +433,7 @@ class AsyncEngine: await self.conn.close() def __init__(self, sync_engine: Engine): - self.sync_engine = sync_engine + self.sync_engine = self._proxied = sync_engine def begin(self): """Return a context manager which when entered will deliver an @@ -363,7 +469,7 @@ class AsyncEngine: """ - return self._connection_cls(self.sync_engine) + return self._connection_cls(self) async def raw_connection(self) -> Any: """Return a "raw" DBAPI connection from the connection pool. @@ -375,12 +481,33 @@ class AsyncEngine: """ return await greenlet_spawn(self.sync_engine.raw_connection) + def execution_options(self, **opt): + """Return a new :class:`_asyncio.AsyncEngine` that will provide + :class:`_asyncio.AsyncConnection` objects with the given execution + options. + + Proxied from :meth:`_future.Engine.execution_options`. See that + method for details. + + """ + + return AsyncEngine(self.sync_engine.execution_options(**opt)) -class AsyncOptionEngine(OptionEngineMixin, AsyncEngine): - pass + async def dispose(self): + """Dispose of the connection pool used by this + :class:`_asyncio.AsyncEngine`. + This will close all connection pool connections that are + **currently checked in**. See the documentation for the underlying + :meth:`_future.Engine.dispose` method for further notes. + + .. seealso:: + + :meth:`_future.Engine.dispose` + + """ -AsyncEngine._option_cls = AsyncOptionEngine + return await greenlet_spawn(self.sync_engine.dispose) class AsyncTransaction(StartableContext): |