summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2021-09-01 08:58:06 -0400
committermike bayer <mike_mp@zzzcomputing.com>2021-09-02 14:19:20 +0000
commitd640192877e4d1da75e8dea34d2374c404e80538 (patch)
tree9bb87b162c50ad6385949bc47369388752f020f5
parentbf1fe670513abeb1596bc5266f50db1ffe62f3bd (diff)
downloadsqlalchemy-d640192877e4d1da75e8dea34d2374c404e80538.tar.gz
add asyncio.gather() example; add connection opts
while I dont like this approach very much, people will likely be asking for it a lot, so represent the most correct and efficient form we can handle right now. Added missing ``**kw`` arguments to the :meth:`_asyncio.AsyncSession.connection` method. Change-Id: Idadae2a02a4d96ecb96a5723ce64d017ab4c6217 References: https://github.com/sqlalchemy/sqlalchemy/discussions/6965
-rw-r--r--doc/build/changelog/unreleased_14/async_conn.rst5
-rw-r--r--examples/asyncio/gather_orm_statements.py118
-rw-r--r--lib/sqlalchemy/ext/asyncio/session.py9
-rw-r--r--test/ext/asyncio/test_session_py3k.py11
4 files changed, 141 insertions, 2 deletions
diff --git a/doc/build/changelog/unreleased_14/async_conn.rst b/doc/build/changelog/unreleased_14/async_conn.rst
new file mode 100644
index 000000000..7acb147ad
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/async_conn.rst
@@ -0,0 +1,5 @@
+.. change::
+ :tags: bug, asyncio
+
+ Added missing ``**kw`` arguments to the
+ :meth:`_asyncio.AsyncSession.connection` method.
diff --git a/examples/asyncio/gather_orm_statements.py b/examples/asyncio/gather_orm_statements.py
new file mode 100644
index 000000000..edcdc1fe8
--- /dev/null
+++ b/examples/asyncio/gather_orm_statements.py
@@ -0,0 +1,118 @@
+"""
+Illustrates how to run many statements concurrently using ``asyncio.gather()``
+along many asyncio database connections, merging ORM results into a single
+``AsyncSession``.
+
+Note that this pattern loses all transactional safety and is also not
+necessarily any more performant than using a single Session, as it adds
+significant CPU-bound work both to maintain more database connections
+and sessions, as well as within the merging of results from external sessions
+into one.
+
+Python is a CPU-intensive language even in trivial cases, so it is strongly
+recommended that any workarounds for "speed" such as the one below are
+carefully vetted to show that they do in fact improve performance vs a
+traditional approach.
+
+"""
+
+import asyncio
+import random
+
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import String
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.ext.asyncio import create_async_engine
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.future import select
+from sqlalchemy.orm import merge_frozen_result
+from sqlalchemy.orm import sessionmaker
+
+Base = declarative_base()
+
+
+class A(Base):
+ __tablename__ = "a"
+
+ id = Column(Integer, primary_key=True)
+ data = Column(String)
+
+
+async def run_out_of_band(
+ sessionmaker, session, statement, merge_results=True
+):
+ """run an ORM statement in a distinct session, merging the result
+ back into the given session.
+
+ """
+
+ async with sessionmaker() as oob_session:
+
+ # use AUTOCOMMIT for each connection to reduce transaction
+ # overhead / contention
+ await oob_session.connection(
+ execution_options={"isolation_level": "AUTOCOMMIT"}
+ )
+
+ # pre 1.4.24
+ # await oob_session.run_sync(
+ # lambda sync_session: sync_session.connection(
+ # execution_options={"isolation_level": "AUTOCOMMIT"}
+ # )
+ # )
+
+ result = await oob_session.execute(statement)
+
+ if merge_results:
+ # merge_results means the ORM objects from the result
+ # will be merged back into the original session.
+ # load=False means we can use the objects directly without
+ # re-selecting them. however this merge operation is still
+ # more expensive CPU-wise than a regular ORM load because the
+ # objects are copied into new instances
+ return (
+ await session.run_sync(
+ merge_frozen_result,
+ statement,
+ result.freeze(),
+ load=False,
+ )
+ )()
+ else:
+ await result.close()
+
+
+async def async_main():
+
+ engine = create_async_engine(
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
+ )
+
+ async with engine.begin() as conn:
+ await conn.run_sync(Base.metadata.drop_all)
+ await conn.run_sync(Base.metadata.create_all)
+
+ async_session = sessionmaker(
+ engine, expire_on_commit=False, class_=AsyncSession
+ )
+
+ async with async_session() as session, session.begin():
+ session.add_all([A(data="a_%d" % i) for i in range(100)])
+
+ statements = [
+ select(A).where(A.data == "a_%d" % random.choice(range(100)))
+ for i in range(30)
+ ]
+
+ results = await asyncio.gather(
+ *(
+ run_out_of_band(async_session, session, statement)
+ for statement in statements
+ )
+ )
+ print(f"results: {[r.all() for r in results]}")
+
+
+asyncio.run(async_main())
diff --git a/lib/sqlalchemy/ext/asyncio/session.py b/lib/sqlalchemy/ext/asyncio/session.py
index a62c7177c..6b18e3d7c 100644
--- a/lib/sqlalchemy/ext/asyncio/session.py
+++ b/lib/sqlalchemy/ext/asyncio/session.py
@@ -330,13 +330,18 @@ class AsyncSession(ReversibleProxy):
else:
return None
- async def connection(self):
+ async def connection(self, **kw):
r"""Return a :class:`_asyncio.AsyncConnection` object corresponding to
this :class:`.Session` object's transactional state.
+ .. versionadded:: 1.4.24 Added **kw arguments which are passed through
+ to the underlying :meth:`_orm.Session.connection` method.
+
"""
- sync_connection = await greenlet_spawn(self.sync_session.connection)
+ sync_connection = await greenlet_spawn(
+ self.sync_session.connection, **kw
+ )
return engine.AsyncConnection._retrieve_proxy_for_target(
sync_connection
)
diff --git a/test/ext/asyncio/test_session_py3k.py b/test/ext/asyncio/test_session_py3k.py
index 459d95ea6..4165991d4 100644
--- a/test/ext/asyncio/test_session_py3k.py
+++ b/test/ext/asyncio/test_session_py3k.py
@@ -615,6 +615,17 @@ class AsyncProxyTest(AsyncFixture):
is_(c1.engine, c2.engine)
@async_test
+ async def test_get_connection_kws(self, async_session):
+ c1 = await async_session.connection(
+ execution_options={"isolation_level": "AUTOCOMMIT"}
+ )
+
+ eq_(
+ c1.sync_connection._execution_options,
+ {"isolation_level": "AUTOCOMMIT"},
+ )
+
+ @async_test
async def test_get_connection_connection_bound(self, async_engine):
async with async_engine.begin() as conn:
async_session = AsyncSession(conn)