diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2023-02-06 23:52:23 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2023-02-06 23:52:23 +0000 |
| commit | 54abda842e005b3aec48c48eb1643eefb096ecbc (patch) | |
| tree | ce8a6ffd79dfe825051d80ff26092521b776b36b /test | |
| parent | 557c4a97a8657b86ee383ce8891d59f1ce3f2ec8 (diff) | |
| parent | 17f1b30a94bf5c20db5036a712dc682ec0814dab (diff) | |
| download | sqlalchemy-54abda842e005b3aec48c48eb1643eefb096ecbc.tar.gz | |
Merge "do not return asyncio connections to the pool under gc" into main
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_pool.py | 83 | ||||
| -rw-r--r-- | test/engine/test_transaction.py | 160 | ||||
| -rw-r--r-- | test/ext/asyncio/test_engine_py3k.py | 63 |
3 files changed, 209 insertions, 97 deletions
diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 4fddcc871..6730d7012 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -23,6 +23,7 @@ from sqlalchemy.testing import assert_raises_context_ok from sqlalchemy.testing import assert_warns_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_raises +from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_none @@ -456,6 +457,13 @@ class PoolEventsTest(PoolTestBase): ) canary = [] + @event.listens_for(p, "reset") + def reset(conn, rec, state): + canary.append( + f"""reset_{'rollback_ok' + if state.asyncio_safe else 'no_rollback'}""" + ) + @event.listens_for(p, "checkin") def checkin(*arg, **kw): canary.append("checkin") @@ -668,7 +676,7 @@ class PoolEventsTest(PoolTestBase): c1 = p.connect() eq_(canary, []) c1.close() - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) def test_reset_event(self): p, canary = self._reset_event_fixture() @@ -728,11 +736,13 @@ class PoolEventsTest(PoolTestBase): assert canary.call_args_list[0][0][0] is dbapi_con assert canary.call_args_list[0][0][2] is exc - @testing.combinations((True,), (False,), argnames="is_asyncio") - @testing.combinations((True,), (False,), argnames="has_terminate") + @testing.variation("is_asyncio", [True, False]) + @testing.variation("has_terminate", [True, False]) def test_checkin_event_gc(self, is_asyncio, has_terminate): + """tests for #8419, which have been modified for 2.0 in #9237""" + p, canary = self._checkin_event_fixture( - _is_asyncio=is_asyncio, _has_terminate=has_terminate + _is_asyncio=bool(is_asyncio), _has_terminate=bool(has_terminate) ) c1 = p.connect() @@ -740,18 +750,38 @@ class PoolEventsTest(PoolTestBase): dbapi_connection = weakref.ref(c1.dbapi_connection) eq_(canary, []) - del c1 - lazy_gc() - detach_gced = is_asyncio and not has_terminate + if is_asyncio: + if has_terminate: + with expect_warnings( + "The garbage collector is trying to clean up.*which will " + "be terminated." + ): + del c1 + lazy_gc() + else: + with expect_warnings( + "The garbage collector is trying to clean up.*which will " + "be dropped, as it cannot be safely terminated." + ): + del c1 + lazy_gc() + else: + del c1 + lazy_gc() + + detach_gced = is_asyncio if detach_gced: - # "close_detached" is not called because for asyncio the - # connection is just lost. - eq_(canary, ["detach"]) + if has_terminate: + eq_(canary, ["reset_no_rollback", "detach", "close_detached"]) + else: + # "close_detached" is not called because for asyncio without + # terminate the connection is just lost. + eq_(canary, ["reset_no_rollback", "detach"]) else: - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) gc_collect() if detach_gced: @@ -769,10 +799,13 @@ class PoolEventsTest(PoolTestBase): eq_(canary, []) c1.close() - eq_(canary, ["checkin"]) + eq_(canary, ["reset_rollback_ok", "checkin"]) c2.close() - eq_(canary, ["checkin", "checkin"]) + eq_( + canary, + ["reset_rollback_ok", "checkin", "reset_rollback_ok", "checkin"], + ) def test_listen_targets_scope(self): canary = [] @@ -1686,28 +1719,32 @@ class QueuePoolTest(PoolTestBase): raise tsa.exc.DisconnectionError() conn = pool.connect() - old_dbapi_conn = conn.dbapi_connection + normally_closed_dbapi_conn = conn.dbapi_connection conn.close() - eq_(old_dbapi_conn.mock_calls, [call.rollback()]) + eq_(normally_closed_dbapi_conn.mock_calls, [call.rollback()]) - old_dbapi_conn.boom = "yes" + normally_closed_dbapi_conn.boom = "yes" conn = pool.connect() - dbapi_conn = conn.dbapi_connection + + # normally closed conn was checked out again but had a problem, + # so was replaced + eq_( + normally_closed_dbapi_conn.mock_calls, + [call.rollback(), call.close()], + ) + + not_closed_dbapi_conn = conn.dbapi_connection del conn gc_collect() if detach_gced: # new connection was detached + abandoned on return - eq_(dbapi_conn.mock_calls, []) + eq_(not_closed_dbapi_conn.mock_calls, []) else: # new connection reset and returned to pool - eq_(dbapi_conn.mock_calls, [call.rollback()]) - - # old connection was just closed - did not get an - # erroneous reset on return - eq_(old_dbapi_conn.mock_calls, [call.rollback(), call.close()]) + eq_(not_closed_dbapi_conn.mock_calls, [call.rollback()]) @testing.requires.timing_intensive def test_recycle_pool_no_race(self): diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index e24c84820..1b3c4c17e 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -1069,25 +1069,25 @@ class TransactionTest(fixtures.TablesTest): def test_savepoint_seven(self): users = self.tables.users - conn = testing.db.connect() - trans = conn.begin() - conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + with testing.db.connect() as conn: + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - sp1 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) - sp2 = conn.begin_nested() - conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) - assert conn.in_transaction() + assert conn.in_transaction() - trans.close() + trans.close() - assert not sp1.is_active - assert not sp2.is_active - assert not trans.is_active - assert conn._transaction is None - assert conn._nested_transaction is None + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None with testing.db.connect() as conn: eq_( @@ -1163,41 +1163,47 @@ class IsolationLevelTest(fixtures.TestBase): def test_engine_param_stays(self): eng = testing_engine() - isolation_level = eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ) + with eng.connect() as conn: + isolation_level = eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ) level = self._non_default_isolation_level() ne_(isolation_level, level) eng = testing_engine(options=dict(isolation_level=level)) - eq_( - eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ), - level, - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) # check that it stays - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - level, - ) - conn.close() + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - level, - ) - conn.close() + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + level, + ) def test_default_level(self): eng = testing_engine(options=dict()) - isolation_level = eng.dialect.get_isolation_level( - eng.connect().connection.dbapi_connection - ) + + with eng.connect() as conn: + isolation_level = eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ) eq_(isolation_level, self._default_isolation_level()) def test_reset_level(self): @@ -1335,16 +1341,16 @@ class IsolationLevelTest(fixtures.TestBase): def test_connection_invalidated(self): eng = testing_engine() - conn = eng.connect() - c2 = conn.execution_options( - isolation_level=self._non_default_isolation_level() - ) - c2.invalidate() - c2.connection + with eng.connect() as conn: + c2 = conn.execution_options( + isolation_level=self._non_default_isolation_level() + ) + c2.invalidate() + c2.connection - # TODO: do we want to rebuild the previous isolation? - # for now, this is current behavior so we will leave it. - eq_(c2.get_isolation_level(), self._default_isolation_level()) + # TODO: do we want to rebuild the previous isolation? + # for now, this is current behavior so we will leave it. + eq_(c2.get_isolation_level(), self._default_isolation_level()) def test_per_connection(self): from sqlalchemy.pool import QueuePool @@ -1384,24 +1390,26 @@ class IsolationLevelTest(fixtures.TestBase): def test_exception_in_transaction(self): eng = testing_engine() - c1 = eng.connect() - with expect_raises_message( - exc.InvalidRequestError, - r"This connection has already initialized a SQLAlchemy " - r"Transaction\(\) object via begin\(\) or autobegin; " - r"isolation_level may not be altered unless rollback\(\) or " - r"commit\(\) is called first.", - ): - with c1.begin(): - c1 = c1.execution_options( - isolation_level=self._non_default_isolation_level() - ) + with eng.connect() as c1: + with expect_raises_message( + exc.InvalidRequestError, + r"This connection has already initialized a SQLAlchemy " + r"Transaction\(\) object via begin\(\) or autobegin; " + r"isolation_level may not be altered unless rollback\(\) or " + r"commit\(\) is called first.", + ): + with c1.begin(): + c1 = c1.execution_options( + isolation_level=self._non_default_isolation_level() + ) - # was never set, so we are on original value - eq_( - eng.dialect.get_isolation_level(c1.connection.dbapi_connection), - self._default_isolation_level(), - ) + # was never set, so we are on original value + eq_( + eng.dialect.get_isolation_level( + c1.connection.dbapi_connection + ), + self._default_isolation_level(), + ) def test_per_statement_bzzt(self): assert_raises_message( @@ -1424,22 +1432,26 @@ class IsolationLevelTest(fixtures.TestBase): } ), ) - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - self._non_default_isolation_level(), - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + self._non_default_isolation_level(), + ) def test_per_option_engine(self): eng = testing_engine(testing.db.url).execution_options( isolation_level=self._non_default_isolation_level() ) - conn = eng.connect() - eq_( - eng.dialect.get_isolation_level(conn.connection.dbapi_connection), - self._non_default_isolation_level(), - ) + with eng.connect() as conn: + eq_( + eng.dialect.get_isolation_level( + conn.connection.dbapi_connection + ), + self._non_default_isolation_level(), + ) def test_isolation_level_accessors_connection_default(self): eng = testing_engine(testing.db.url) diff --git a/test/ext/asyncio/test_engine_py3k.py b/test/ext/asyncio/test_engine_py3k.py index 2eebb433d..bce669e4f 100644 --- a/test/ext/asyncio/test_engine_py3k.py +++ b/test/ext/asyncio/test_engine_py3k.py @@ -31,6 +31,7 @@ from sqlalchemy.testing import combinations from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ +from sqlalchemy.testing import eq_regex from sqlalchemy.testing import expect_raises from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures @@ -339,6 +340,68 @@ class AsyncEngineTest(EngineFixture): is_false(t1 == None) + @testing.variation("simulate_gc", [True, False]) + def test_appropriate_warning_for_gced_connection( + self, async_engine, simulate_gc + ): + """test #9237 which builds upon a not really complete solution + added for #8419.""" + + async def go(): + conn = await async_engine.connect() + await conn.begin() + await conn.execute(select(1)) + pool_connection = await conn.get_raw_connection() + return pool_connection + + from sqlalchemy.util.concurrency import await_only + + pool_connection = await_only(go()) + + rec = pool_connection._connection_record + ref = rec.fairy_ref + pool = pool_connection._pool + echo = False + + if simulate_gc: + # not using expect_warnings() here because we also want to do a + # negative test for warnings, and we want to absolutely make sure + # the thing here that emits the warning is the correct path + from sqlalchemy.pool.base import _finalize_fairy + + with mock.patch.object( + pool._dialect, + "do_rollback", + mock.Mock(side_effect=Exception("can't run rollback")), + ), mock.patch("sqlalchemy.util.warn") as m: + + _finalize_fairy( + None, rec, pool, ref, echo, transaction_was_reset=False + ) + + if async_engine.dialect.has_terminate: + expected_msg = ( + "The garbage collector is trying to clean up.*which will " + "be terminated." + ) + else: + expected_msg = ( + "The garbage collector is trying to clean up.*which will " + "be dropped, as it cannot be safely terminated." + ) + + # [1] == .args, not in 3.7 + eq_regex(m.mock_calls[0][1][0], expected_msg) + else: + # the warning emitted by the pool is inside of a try/except: + # so it's impossible right now to have this warning "raise". + # for now, test by using mock.patch + + with mock.patch("sqlalchemy.util.warn") as m: + pool_connection.close() + + eq_(m.mock_calls, []) + def test_clear_compiled_cache(self, async_engine): async_engine.sync_engine._compiled_cache["foo"] = "bar" eq_(async_engine.sync_engine._compiled_cache["foo"], "bar") |
