summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2023-02-06 23:52:23 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2023-02-06 23:52:23 +0000
commit54abda842e005b3aec48c48eb1643eefb096ecbc (patch)
treece8a6ffd79dfe825051d80ff26092521b776b36b /test
parent557c4a97a8657b86ee383ce8891d59f1ce3f2ec8 (diff)
parent17f1b30a94bf5c20db5036a712dc682ec0814dab (diff)
downloadsqlalchemy-54abda842e005b3aec48c48eb1643eefb096ecbc.tar.gz
Merge "do not return asyncio connections to the pool under gc" into main
Diffstat (limited to 'test')
-rw-r--r--test/engine/test_pool.py83
-rw-r--r--test/engine/test_transaction.py160
-rw-r--r--test/ext/asyncio/test_engine_py3k.py63
3 files changed, 209 insertions, 97 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py
index 4fddcc871..6730d7012 100644
--- a/test/engine/test_pool.py
+++ b/test/engine/test_pool.py
@@ -23,6 +23,7 @@ from sqlalchemy.testing import assert_raises_context_ok
from sqlalchemy.testing import assert_warns_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
+from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_none
@@ -456,6 +457,13 @@ class PoolEventsTest(PoolTestBase):
)
canary = []
+ @event.listens_for(p, "reset")
+ def reset(conn, rec, state):
+ canary.append(
+ f"""reset_{'rollback_ok'
+ if state.asyncio_safe else 'no_rollback'}"""
+ )
+
@event.listens_for(p, "checkin")
def checkin(*arg, **kw):
canary.append("checkin")
@@ -668,7 +676,7 @@ class PoolEventsTest(PoolTestBase):
c1 = p.connect()
eq_(canary, [])
c1.close()
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
def test_reset_event(self):
p, canary = self._reset_event_fixture()
@@ -728,11 +736,13 @@ class PoolEventsTest(PoolTestBase):
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True,), (False,), argnames="is_asyncio")
- @testing.combinations((True,), (False,), argnames="has_terminate")
+ @testing.variation("is_asyncio", [True, False])
+ @testing.variation("has_terminate", [True, False])
def test_checkin_event_gc(self, is_asyncio, has_terminate):
+ """tests for #8419, which have been modified for 2.0 in #9237"""
+
p, canary = self._checkin_event_fixture(
- _is_asyncio=is_asyncio, _has_terminate=has_terminate
+ _is_asyncio=bool(is_asyncio), _has_terminate=bool(has_terminate)
)
c1 = p.connect()
@@ -740,18 +750,38 @@ class PoolEventsTest(PoolTestBase):
dbapi_connection = weakref.ref(c1.dbapi_connection)
eq_(canary, [])
- del c1
- lazy_gc()
- detach_gced = is_asyncio and not has_terminate
+ if is_asyncio:
+ if has_terminate:
+ with expect_warnings(
+ "The garbage collector is trying to clean up.*which will "
+ "be terminated."
+ ):
+ del c1
+ lazy_gc()
+ else:
+ with expect_warnings(
+ "The garbage collector is trying to clean up.*which will "
+ "be dropped, as it cannot be safely terminated."
+ ):
+ del c1
+ lazy_gc()
+ else:
+ del c1
+ lazy_gc()
+
+ detach_gced = is_asyncio
if detach_gced:
- # "close_detached" is not called because for asyncio the
- # connection is just lost.
- eq_(canary, ["detach"])
+ if has_terminate:
+ eq_(canary, ["reset_no_rollback", "detach", "close_detached"])
+ else:
+ # "close_detached" is not called because for asyncio without
+ # terminate the connection is just lost.
+ eq_(canary, ["reset_no_rollback", "detach"])
else:
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
gc_collect()
if detach_gced:
@@ -769,10 +799,13 @@ class PoolEventsTest(PoolTestBase):
eq_(canary, [])
c1.close()
- eq_(canary, ["checkin"])
+ eq_(canary, ["reset_rollback_ok", "checkin"])
c2.close()
- eq_(canary, ["checkin", "checkin"])
+ eq_(
+ canary,
+ ["reset_rollback_ok", "checkin", "reset_rollback_ok", "checkin"],
+ )
def test_listen_targets_scope(self):
canary = []
@@ -1686,28 +1719,32 @@ class QueuePoolTest(PoolTestBase):
raise tsa.exc.DisconnectionError()
conn = pool.connect()
- old_dbapi_conn = conn.dbapi_connection
+ normally_closed_dbapi_conn = conn.dbapi_connection
conn.close()
- eq_(old_dbapi_conn.mock_calls, [call.rollback()])
+ eq_(normally_closed_dbapi_conn.mock_calls, [call.rollback()])
- old_dbapi_conn.boom = "yes"
+ normally_closed_dbapi_conn.boom = "yes"
conn = pool.connect()
- dbapi_conn = conn.dbapi_connection
+
+ # normally closed conn was checked out again but had a problem,
+ # so was replaced
+ eq_(
+ normally_closed_dbapi_conn.mock_calls,
+ [call.rollback(), call.close()],
+ )
+
+ not_closed_dbapi_conn = conn.dbapi_connection
del conn
gc_collect()
if detach_gced:
# new connection was detached + abandoned on return
- eq_(dbapi_conn.mock_calls, [])
+ eq_(not_closed_dbapi_conn.mock_calls, [])
else:
# new connection reset and returned to pool
- eq_(dbapi_conn.mock_calls, [call.rollback()])
-
- # old connection was just closed - did not get an
- # erroneous reset on return
- eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()])
+ eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()])
@testing.requires.timing_intensive
def test_recycle_pool_no_race(self):
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index e24c84820..1b3c4c17e 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -1069,25 +1069,25 @@ class TransactionTest(fixtures.TablesTest):
def test_savepoint_seven(self):
users = self.tables.users
- conn = testing.db.connect()
- trans = conn.begin()
- conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ with testing.db.connect() as conn:
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- sp1 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
- sp2 = conn.begin_nested()
- conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
- assert conn.in_transaction()
+ assert conn.in_transaction()
- trans.close()
+ trans.close()
- assert not sp1.is_active
- assert not sp2.is_active
- assert not trans.is_active
- assert conn._transaction is None
- assert conn._nested_transaction is None
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
with testing.db.connect() as conn:
eq_(
@@ -1163,41 +1163,47 @@ class IsolationLevelTest(fixtures.TestBase):
def test_engine_param_stays(self):
eng = testing_engine()
- isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- )
+ with eng.connect() as conn:
+ isolation_level = eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ )
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
- eq_(
- eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- ),
- level,
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
# check that it stays
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- level,
- )
- conn.close()
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- level,
- )
- conn.close()
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ level,
+ )
def test_default_level(self):
eng = testing_engine(options=dict())
- isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection.dbapi_connection
- )
+
+ with eng.connect() as conn:
+ isolation_level = eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ )
eq_(isolation_level, self._default_isolation_level())
def test_reset_level(self):
@@ -1335,16 +1341,16 @@ class IsolationLevelTest(fixtures.TestBase):
def test_connection_invalidated(self):
eng = testing_engine()
- conn = eng.connect()
- c2 = conn.execution_options(
- isolation_level=self._non_default_isolation_level()
- )
- c2.invalidate()
- c2.connection
+ with eng.connect() as conn:
+ c2 = conn.execution_options(
+ isolation_level=self._non_default_isolation_level()
+ )
+ c2.invalidate()
+ c2.connection
- # TODO: do we want to rebuild the previous isolation?
- # for now, this is current behavior so we will leave it.
- eq_(c2.get_isolation_level(), self._default_isolation_level())
+ # TODO: do we want to rebuild the previous isolation?
+ # for now, this is current behavior so we will leave it.
+ eq_(c2.get_isolation_level(), self._default_isolation_level())
def test_per_connection(self):
from sqlalchemy.pool import QueuePool
@@ -1384,24 +1390,26 @@ class IsolationLevelTest(fixtures.TestBase):
def test_exception_in_transaction(self):
eng = testing_engine()
- c1 = eng.connect()
- with expect_raises_message(
- exc.InvalidRequestError,
- r"This connection has already initialized a SQLAlchemy "
- r"Transaction\(\) object via begin\(\) or autobegin; "
- r"isolation_level may not be altered unless rollback\(\) or "
- r"commit\(\) is called first.",
- ):
- with c1.begin():
- c1 = c1.execution_options(
- isolation_level=self._non_default_isolation_level()
- )
+ with eng.connect() as c1:
+ with expect_raises_message(
+ exc.InvalidRequestError,
+ r"This connection has already initialized a SQLAlchemy "
+ r"Transaction\(\) object via begin\(\) or autobegin; "
+ r"isolation_level may not be altered unless rollback\(\) or "
+ r"commit\(\) is called first.",
+ ):
+ with c1.begin():
+ c1 = c1.execution_options(
+ isolation_level=self._non_default_isolation_level()
+ )
- # was never set, so we are on original value
- eq_(
- eng.dialect.get_isolation_level(c1.connection.dbapi_connection),
- self._default_isolation_level(),
- )
+ # was never set, so we are on original value
+ eq_(
+ eng.dialect.get_isolation_level(
+ c1.connection.dbapi_connection
+ ),
+ self._default_isolation_level(),
+ )
def test_per_statement_bzzt(self):
assert_raises_message(
@@ -1424,22 +1432,26 @@ class IsolationLevelTest(fixtures.TestBase):
}
),
)
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- self._non_default_isolation_level(),
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ self._non_default_isolation_level(),
+ )
def test_per_option_engine(self):
eng = testing_engine(testing.db.url).execution_options(
isolation_level=self._non_default_isolation_level()
)
- conn = eng.connect()
- eq_(
- eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
- self._non_default_isolation_level(),
- )
+ with eng.connect() as conn:
+ eq_(
+ eng.dialect.get_isolation_level(
+ conn.connection.dbapi_connection
+ ),
+ self._non_default_isolation_level(),
+ )
def test_isolation_level_accessors_connection_default(self):
eng = testing_engine(testing.db.url)
diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py
index 2eebb433d..bce669e4f 100644
--- a/test/ext/asyncio/test_engine_py3k.py
+++ b/test/ext/asyncio/test_engine_py3k.py
@@ -31,6 +31,7 @@ from sqlalchemy.testing import combinations
from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import eq_regex
from sqlalchemy.testing import expect_raises
from sqlalchemy.testing import expect_raises_message
from sqlalchemy.testing import fixtures
@@ -339,6 +340,68 @@ class AsyncEngineTest(EngineFixture):
is_false(t1 == None)
+ @testing.variation("simulate_gc", [True, False])
+ def test_appropriate_warning_for_gced_connection(
+ self, async_engine, simulate_gc
+ ):
+ """test #9237 which builds upon a not really complete solution
+ added for #8419."""
+
+ async def go():
+ conn = await async_engine.connect()
+ await conn.begin()
+ await conn.execute(select(1))
+ pool_connection = await conn.get_raw_connection()
+ return pool_connection
+
+ from sqlalchemy.util.concurrency import await_only
+
+ pool_connection = await_only(go())
+
+ rec = pool_connection._connection_record
+ ref = rec.fairy_ref
+ pool = pool_connection._pool
+ echo = False
+
+ if simulate_gc:
+ # not using expect_warnings() here because we also want to do a
+ # negative test for warnings, and we want to absolutely make sure
+ # the thing here that emits the warning is the correct path
+ from sqlalchemy.pool.base import _finalize_fairy
+
+ with mock.patch.object(
+ pool._dialect,
+ "do_rollback",
+ mock.Mock(side_effect=Exception("can't run rollback")),
+ ), mock.patch("sqlalchemy.util.warn") as m:
+
+ _finalize_fairy(
+ None, rec, pool, ref, echo, transaction_was_reset=False
+ )
+
+ if async_engine.dialect.has_terminate:
+ expected_msg = (
+ "The garbage collector is trying to clean up.*which will "
+ "be terminated."
+ )
+ else:
+ expected_msg = (
+ "The garbage collector is trying to clean up.*which will "
+ "be dropped, as it cannot be safely terminated."
+ )
+
+ # [1] == .args, not in 3.7
+ eq_regex(m.mock_calls[0][1][0], expected_msg)
+ else:
+ # the warning emitted by the pool is inside of a try/except:
+ # so it's impossible right now to have this warning "raise".
+ # for now, test by using mock.patch
+
+ with mock.patch("sqlalchemy.util.warn") as m:
+ pool_connection.close()
+
+ eq_(m.mock_calls, [])
+
def test_clear_compiled_cache(self, async_engine):
async_engine.sync_engine._compiled_cache["foo"] = "bar"
eq_(async_engine.sync_engine._compiled_cache["foo"], "bar")