diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-14 12:50:11 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-05-17 16:32:22 -0400 |
| commit | 0e53221eef50b3274841fbd1eb41e32f5dfc4e69 (patch) | |
| tree | e1c13d067b0db5e0aeee8b934dc0f934154ef9f3 /test | |
| parent | 79de84b25e87bbb7fa94f0dd513b4abc76e05a7e (diff) | |
| download | sqlalchemy-0e53221eef50b3274841fbd1eb41e32f5dfc4e69.tar.gz | |
Update transaction / connection handling
step one, do away with __connection attribute and using
awkward AttributeError logic
step two, move all management of "connection._transaction"
into the transaction objects themselves where it's easier
to follow.
build MarkerTransaction that takes the role of
"do-nothing block"
new connection datamodel is: connection._transaction, always
a root, connection._nested_transaction, always a nested.
nested transactions still chain to each other as this
is still sort of necessary but they consider the root
transaction separately, and the marker transactions
not at all.
introduce new InvalidRequestError subclass
PendingRollbackError. Apply to connection and session
for all cases where a transaction needs to be rolled
back before continuing. Within Connection,
both PendingRollbackError as well as ResourceClosedError
are now raised directly without being handled by
handle_dbapi_error(); this removes these two exception
cases from the handle_error event handler as well as
from StatementError wrapping, as these two exceptions are
not statement oriented and are instead programmatic
issues, that the application is failing to handle database
errors properly.
Revise savepoints so that when a release fails, they set
themselves as inactive so that their rollback() method
does not throw another exception.
Give savepoints another go on MySQL, can't get release working
however get support for basic round trip going
Fixes: #5327
Change-Id: Ia3cbbf56d4882fcc7980f90519412f1711fae74d
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_reconnect.py | 193 | ||||
| -rw-r--r-- | test/engine/test_transaction.py | 663 | ||||
| -rw-r--r-- | test/orm/test_transaction.py | 27 | ||||
| -rw-r--r-- | test/requirements.py | 22 |
4 files changed, 653 insertions, 252 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index a09b04748..f0d0a9b2f 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -103,8 +103,21 @@ def mock_connection(): else: return + def commit(): + if conn.explode == "commit": + raise MockDisconnect("Lost the DB connection on commit") + elif conn.explode == "commit_no_disconnect": + raise MockError( + "something broke on commit but we didn't lose the " + "connection" + ) + else: + return + conn = Mock( - rollback=Mock(side_effect=rollback), cursor=Mock(side_effect=cursor()) + rollback=Mock(side_effect=rollback), + commit=Mock(side_effect=commit), + cursor=Mock(side_effect=cursor()), ) return conn @@ -420,7 +433,7 @@ class MockReconnectTest(fixtures.TestBase): [[call()], [call()], []], ) - def test_invalidate_trans(self): + def test_invalidate_on_execute_trans(self): conn = self.db.connect() trans = conn.begin() self.dbapi.shutdown() @@ -432,7 +445,7 @@ class MockReconnectTest(fixtures.TestBase): assert conn.invalidated assert trans.is_active assert_raises_message( - tsa.exc.StatementError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", conn.execute, select([1]), @@ -440,12 +453,30 @@ class MockReconnectTest(fixtures.TestBase): assert trans.is_active assert_raises_message( - tsa.exc.InvalidRequestError, + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + # now it's inactive... + assert not trans.is_active + + # but still associated with the connection + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + # still can't commit... error stays the same + assert_raises_message( + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", trans.commit, ) - assert trans.is_active trans.rollback() assert not trans.is_active conn.execute(select([1])) @@ -455,6 +486,104 @@ class MockReconnectTest(fixtures.TestBase): [[call()], []], ) + def test_invalidate_on_commit_trans(self): + conn = self.db.connect() + trans = conn.begin() + self.dbapi.shutdown("commit") + + assert_raises(tsa.exc.DBAPIError, trans.commit) + + assert not conn.closed + assert conn.invalidated + assert not trans.is_active + + # error stays consistent + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + trans.rollback() + assert not trans.is_active + conn.execute(select([1])) + assert not conn.invalidated + + def test_commit_fails_contextmanager(self): + # this test is also performed in test/engine/test_transaction.py + # using real connections + conn = self.db.connect() + + def go(): + with conn.begin(): + self.dbapi.shutdown("commit_no_disconnect") + + assert_raises(tsa.exc.DBAPIError, go) + + assert not conn.in_transaction() + + def test_commit_fails_trans(self): + # this test is also performed in test/engine/test_transaction.py + # using real connections + + conn = self.db.connect() + trans = conn.begin() + self.dbapi.shutdown("commit_no_disconnect") + + assert_raises(tsa.exc.DBAPIError, trans.commit) + + assert not conn.closed + assert not conn.invalidated + assert not trans.is_active + + # error stays consistent + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + conn.execute, + select([1]), + ) + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + trans.commit, + ) + + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + conn.execute, + select([1]), + ) + assert not trans.is_active + + trans.rollback() + assert not trans.is_active + conn.execute(select([1])) + assert not conn.invalidated + def test_invalidate_dont_call_finalizer(self): conn = self.db.connect() finalizer = mock.Mock() @@ -497,9 +626,9 @@ class MockReconnectTest(fixtures.TestBase): conn.close() assert conn.closed - assert conn.invalidated + assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -544,7 +673,7 @@ class MockReconnectTest(fixtures.TestBase): assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -594,10 +723,10 @@ class MockReconnectTest(fixtures.TestBase): ) assert conn.closed - assert conn.invalidated + assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -955,7 +1084,7 @@ class RealReconnectTest(fixtures.TestBase): _assert_invalidated(c1_branch.execute, select([1])) assert not c1_branch.closed - assert not c1_branch._connection_is_valid + assert not c1_branch._still_open_and_dbapi_connection_is_valid def test_ensure_is_disconnect_gets_connection(self): def is_disconnect(e, conn, cursor): @@ -1062,6 +1191,7 @@ class RealReconnectTest(fixtures.TestBase): def test_with_transaction(self): conn = self.engine.connect() trans = conn.begin() + assert trans.is_valid eq_(conn.execute(select([1])).scalar(), 1) assert not conn.closed self.engine.test_shutdown() @@ -1069,21 +1199,56 @@ class RealReconnectTest(fixtures.TestBase): assert not conn.closed assert conn.invalidated assert trans.is_active + assert not trans.is_valid + assert_raises_message( - tsa.exc.StatementError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", conn.execute, select([1]), ) assert trans.is_active + assert not trans.is_valid + assert_raises_message( - tsa.exc.InvalidRequestError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", trans.commit, ) - assert trans.is_active + + # becomes inactive + assert not trans.is_active + assert not trans.is_valid + + # still asks us to rollback + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + + # still asks us.. + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + # still...it's being consistent in what it is asking. + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + + # OK! trans.rollback() assert not trans.is_active + assert not trans.is_valid + + # conn still invalid but we can reconnect assert conn.invalidated eq_(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index fbc1ffd83..164604cd6 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -11,8 +11,10 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import text +from sqlalchemy import util from sqlalchemy import VARCHAR from sqlalchemy.future import select as future_select +from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings @@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase): def teardown_class(cls): users.drop(testing.db) - def test_commits(self): - connection = testing.db.connect() + @testing.fixture + def local_connection(self): + with testing.db.connect() as conn: + yield conn + + def test_commits(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.commit() @@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase): transaction.commit() connection.close() - def test_rollback(self): + def test_rollback(self, local_connection): """test a basic rollback""" - connection = testing.db.connect() + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_raise(self): - connection = testing.db.connect() + def test_raise(self, local_connection): + connection = local_connection transaction = connection.begin() try: @@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase): result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_nested_rollback(self): - connection = testing.db.connect() + def test_nested_rollback(self, local_connection): + connection = local_connection try: transaction = connection.begin() try: @@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() raise except Exception as e: - try: - # and not "This transaction is inactive" - # comment moved here to fix pep8 - assert str(e) == "uh oh" - finally: - connection.close() + # and not "This transaction is inactive" + # comment moved here to fix pep8 + assert str(e) == "uh oh" + else: + assert False - def test_branch_nested_rollback(self): - connection = testing.db.connect() - try: - connection.begin() - branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() + def test_branch_nested_rollback(self, local_connection): + connection = local_connection + connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() - assert_raises_message( - exc.InvalidRequestError, - "This connection is on an inactive transaction. Please", - connection.exec_driver_sql, - "select 1", - ) + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive transaction. Please", + connection.exec_driver_sql, + "select 1", + ) - finally: - connection.close() + def test_no_marker_on_inactive_trans(self, local_connection): + conn = local_connection + conn.begin() - def test_inactive_due_to_subtransaction_no_commit(self): - connection = testing.db.connect() + mk1 = conn.begin() + + mk1.rollback() + + assert_raises_message( + exc.InvalidRequestError, + "the current transaction on this connection is inactive.", + conn.begin, + ) + + @testing.requires.savepoints + def test_savepoint_cancelled_by_toplevel_marker(self, local_connection): + conn = local_connection + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + mk1 = conn.begin() + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + mk1.rollback() + + assert not sp1.is_active + assert not trans.is_active + assert conn._transaction is trans + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) + + def test_inactive_due_to_subtransaction_no_commit(self, local_connection): + connection = local_connection trans = connection.begin() trans2 = connection.begin() trans2.rollback() assert_raises_message( exc.InvalidRequestError, + "This connection is on an inactive transaction. Please rollback", + trans.commit, + ) + + trans.rollback() + + assert_raises_message( + exc.InvalidRequestError, "This transaction is inactive", trans.commit, ) - def test_branch_autorollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - try: - branched.execute(users.insert(), user_id=1, user_name="user1") - except exc.DBAPIError: - pass - finally: - connection.close() + @testing.requires.savepoints + def test_inactive_due_to_subtransaction_on_nested_no_commit( + self, local_connection + ): + connection = local_connection + trans = connection.begin() - def test_branch_orig_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + nested = connection.begin_nested() - finally: - connection.close() + trans2 = connection.begin() + trans2.rollback() - def test_branch_autocommit(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - finally: - connection.close() + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive savepoint transaction. " + "Please rollback", + nested.commit, + ) + trans.commit() + + assert_raises_message( + exc.InvalidRequestError, + "This nested transaction is inactive", + nested.commit, + ) + + def test_branch_autorollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + assert_raises( + exc.DBAPIError, + branched.execute, + users.insert(), + dict(user_id=1, user_name="user1"), + ) + # can continue w/o issue + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + + def test_branch_orig_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + nested = branched.begin() + assert branched.in_transaction() + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + nested.rollback() eq_( - testing.db.execute( - text("select count(*) from query_users") + connection.exec_driver_sql( + "select count(*) from query_users" ).scalar(), 1, ) - @testing.requires.savepoints - def test_branch_savepoint_rollback(self): - connection = testing.db.connect() - try: - trans = connection.begin() + @testing.requires.independent_connections + def test_branch_autocommit(self, local_connection): + with testing.db.connect() as connection: branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_nested() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert connection.in_transaction() - trans.commit() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") ) - finally: - connection.close() + eq_( + local_connection.execute( + text("select count(*) from query_users") + ).scalar(), + 1, + ) + + @testing.requires.savepoints + def test_branch_savepoint_rollback(self, local_connection): + connection = local_connection + trans = connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_nested() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert connection.in_transaction() + trans.commit() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) @testing.requires.two_phase_transactions - def test_branch_twophase_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - assert not branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_twophase() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + def test_branch_twophase_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + assert not branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_twophase() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) + + def test_commit_fails_flat(self, local_connection): + connection = local_connection + + t1 = connection.begin() + + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)), + ): + assert_raises_message(exc.DBAPIError, r"failure", t1.commit) + + assert not t1.is_active + t1.rollback() # no error + + def test_commit_fails_ctxmanager(self, local_connection): + connection = local_connection + + transaction = [None] + + def go(): + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock( + side_effect=exc.DBAPIError("failure", None, None, None) + ), + ): + with connection.begin() as t1: + transaction[0] = t1 + + assert_raises_message(exc.DBAPIError, r"failure", go) + + t1 = transaction[0] + assert not t1.is_active + t1.rollback() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_rollback_fails_flat(self, local_connection): + connection = local_connection + t1 = connection.begin() + + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback + ) - finally: - connection.close() + assert not s1.is_active + + with testing.expect_warnings("nested transaction already"): + s1.rollback() # no error (though it warns) + + t1.commit() # no error - @testing.requires.python2 @testing.requires.savepoints_w_release - def test_savepoint_release_fails_warning(self): + def test_savepoint_release_fails_flat(self): with testing.db.connect() as connection: - connection.begin() + t1 = connection.begin() - with expect_warnings( - "An exception has occurred during handling of a previous " - "exception. The previous exception " - r"is:.*..SQL\:.*RELEASE SAVEPOINT" - ): + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit + ) - def go(): - with connection.begin_nested() as savepoint: - connection.dialect.do_release_savepoint( - connection, savepoint._savepoint - ) + assert not s1.is_active + s1.rollback() # no error. prior to 1.4 this would try to rollback - assert_raises_message( - exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go + t1.commit() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_release_fails_ctxmanager(self, local_connection): + connection = local_connection + connection.begin() + + savepoint = [None] + + def go(): + + with connection.begin_nested() as sp: + savepoint[0] = sp + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint( + connection, sp._savepoint ) - def test_retains_through_options(self): - connection = testing.db.connect() - try: - transaction = connection.begin() - connection.execute(users.insert(), user_id=1, user_name="user1") - conn2 = connection.execution_options(dummy=True) - conn2.execute(users.insert(), user_id=2, user_name="user2") - transaction.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 0, - ) - finally: - connection.close() + # prior to SQLAlchemy 1.4, the above release would fail + # and then the savepoint would try to rollback, and that failed + # also, causing a long exception chain that under Python 2 + # was particularly hard to diagnose, leading to issue + # #2696 which eventually impacted Openstack, and we + # had to add warnings that show what the "context" for an + # exception was. The SQL for the exception was + # ROLLBACK TO SAVEPOINT, and up the exception chain would be + # the RELEASE failing. + # + # now, when the savepoint "commit" fails, it sets itself as + # inactive. so it does not try to rollback and it cleans + # itself out appropriately. + # + + exc_ = assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go + ) + savepoint = savepoint[0] + assert not savepoint.is_active - def test_nesting(self): - connection = testing.db.connect() + if util.py3k: + # driver error + assert exc_.__cause__ + + # and that's it, no other context + assert not exc_.__cause__.__context__ + + def test_retains_through_options(self, local_connection): + connection = local_connection + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name="user1") + conn2 = connection.execution_options(dummy=True) + conn2.execute(users.insert(), user_id=2, user_name="user2") + transaction.rollback() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 0, + ) + + def test_nesting(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_with_interface(self): - connection = testing.db.connect() + def test_with_interface(self, local_connection): + connection = local_connection trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase): ).scalar() == 1 ) - connection.close() - def test_close(self): - connection = testing.db.connect() + def test_close(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 5 - connection.close() - def test_close2(self): - connection = testing.db.connect() + def test_close2(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_rollback(self): - connection = testing.db.connect() + def test_nested_subtransaction_rollback(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_commit(self): - connection = testing.db.connect() + def test_nested_subtransaction_commit(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_rollback_to_subtransaction(self): - connection = testing.db.connect() + def test_rollback_to_subtransaction(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase): "select 1", ) trans2.rollback() + assert connection._nested_transaction is None connection.execute(users.insert(), user_id=4, user_name="user4") transaction.commit() @@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (4,)], ) - connection.close() @testing.requires.two_phase_transactions - def test_two_phase_transaction(self): - connection = testing.db.connect() + def test_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.prepare() @@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,)], ) - connection.close() # PG emergency shutdown: # select * from pg_prepared_xacts @@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase): # MySQL emergency shutdown: # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 | # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done - @testing.crashes("mysql", "Crashing on 5.5, not worth it") @testing.requires.skip_mysql_on_windows @testing.requires.two_phase_transactions @testing.requires.savepoints - def test_mixed_two_phase_transaction(self): - connection = testing.db.connect() + def test_mixed_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction2 = connection.begin() @@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (5,)], ) - connection.close() @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery def test_two_phase_recover(self): - # MySQL recovery doesn't currently seem to work correctly - # Prepared transactions disappear when connections are closed - # and even when they aren't it doesn't seem possible to use the - # recovery id. + # 2020, still can't get this to work w/ modern MySQL or MariaDB. + # the XA RECOVER comes back as bytes, OK, convert to string, + # XA COMMIT then says Unknown XID. Also, the drivers seem to be + # killing off the XID if I use the connection.invalidate() before + # trying to access in another connection. Not really worth it + # unless someone wants to step through how mysqlclient / pymysql + # support this correctly. connection = testing.db.connect() + transaction = connection.begin_twophase() - connection.execute(users.insert(), user_id=1, user_name="user1") + connection.execute(users.insert(), dict(user_id=1, user_name="user1")) transaction.prepare() connection.invalidate() - connection2 = testing.db.connect() - eq_( - connection2.execution_options(autocommit=True) - .execute(select([users.c.user_id]).order_by(users.c.user_id)) - .fetchall(), - [], - ) - recoverables = connection2.recover_twophase() - assert transaction.xid in recoverables - connection2.commit_prepared(transaction.xid, recover=True) - eq_( - connection2.execute( - select([users.c.user_id]).order_by(users.c.user_id) - ).fetchall(), - [(1,)], - ) - connection2.close() + with testing.db.connect() as connection2: + eq_( + connection2.execution_options(autocommit=True) + .execute(select([users.c.user_id]).order_by(users.c.user_id)) + .fetchall(), + [], + ) + recoverables = connection2.recover_twophase() + assert transaction.xid in recoverables + connection2.commit_prepared(transaction.xid, recover=True) + eq_( + connection2.execute( + select([users.c.user_id]).order_by(users.c.user_id) + ).fetchall(), + [(1,)], + ) @testing.requires.two_phase_transactions - def test_multiple_two_phase(self): - conn = testing.db.connect() + def test_multiple_two_phase(self, local_connection): + conn = local_connection xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name="user1") xa.prepare() @@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase): select([users.c.user_name]).order_by(users.c.user_id) ) eq_(result.fetchall(), [("user1",), ("user4",)]) - conn.close() @testing.requires.two_phase_transactions def test_reset_rollback_two_phase_no_rollback(self): @@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase): with expect_warnings("Reset agent is not active"): conn.close() - def test_trans_commit_reset_agent_broken_ensure(self): + def test_trans_commit_reset_agent_broken_ensure_pool(self): eng = testing_engine(options={"pool_reset_on_return": "commit"}) conn = eng.connect() trans = conn.begin() @@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() @@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() + + assert connection._nested_transaction is None + assert connection._transaction is None + assert connection.connection._reset_agent is None assert not t1.is_active @@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.rollback() + assert connection._transaction is None assert connection.connection._reset_agent is None + assert not t2.is_active assert not t1.is_active @testing.requires.savepoints def test_begin_nested_close(self): with testing.db.connect() as connection: trans = connection.begin_nested() - assert connection.connection._reset_agent is trans + assert ( + connection.connection._reset_agent is connection._transaction + ) assert not trans.is_active @testing.requires.savepoints @@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase): trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans - assert trans2.is_active # was never closed + assert not trans2.is_active assert not trans.is_active @testing.requires.savepoints @@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase): class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): - """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled - back upon close. Therefore the whole "reset agent" thing can go away. - this suite runs through all the reset agent tests to ensure the state - of the transaction is maintained while the "reset agent" feature is not - needed at all. + """Still some debate over if the "reset agent" should apply to the + future connection or not. + """ @@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans + assert not trans.is_active eq_(canary.mock_calls, [mock.call(connection)]) @@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans.is_active # it's a savepoint + assert ( + connection.connection._reset_agent is connection._transaction + ) + # it's a savepoint, but root made sure it closed + assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @testing.requires.savepoints @@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans2.is_active # was never closed + assert connection.connection._reset_agent is trans + assert not trans2.is_active assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() # this is not a connection level event - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( canary.mock_calls, [ - mock.call.rollback_savepoint(connection, mock.ANY, trans), + mock.call.rollback_savepoint(connection, mock.ANY, None), mock.call.commit(connection), ], ) @@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans assert not trans.is_active eq_( canary.mock_calls, @@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) event.listen(connection, "commit_twophase", canary.commit_twophase) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( @@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_( @@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.invalidate() assert_raises_message( - exc.StatementError, + exc.PendingRollbackError, "Can't reconnect", conn.execute, select([1]), @@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): with testing.db.begin() as conn: conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.begin_nested() + sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) sp2 = conn.begin_nested() @@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp2.rollback() + assert not sp2.is_active + assert sp1.is_active assert conn.in_transaction() + assert not sp1.is_active + with testing.db.connect() as conn: eq_( conn.scalar(future_select(func.count(1)).select_from(users)), @@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + assert conn._nested_transaction is sp1 + sp2 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + assert conn._nested_transaction is sp2 + sp2.commit() + assert conn._nested_transaction is sp1 + sp1.rollback() + assert conn._nested_transaction is None + assert conn.in_transaction() with testing.db.connect() as conn: @@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.scalar(future_select(func.count(1)).select_from(users)), 1, ) + + @testing.requires.savepoints + def test_savepoint_seven(self): + users = self.tables.users + + conn = testing.db.connect() + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + assert conn.in_transaction() + + trans.close() + + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py index 78a62199a..22e7363b0 100644 --- a/test/orm/test_transaction.py +++ b/test/orm/test_transaction.py @@ -367,13 +367,25 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): sess.add(u) sess.flush() c1 = sess.connection(User) + dbapi_conn = c1.connection + assert dbapi_conn.is_valid sess.invalidate() - assert c1.invalidated + + # Connection object is closed + assert c1.closed + + # "invalidated" is not part of "closed" state + assert not c1.invalidated + + # but the DBAPI conn (really ConnectionFairy) + # is invalidated + assert not dbapi_conn.is_valid eq_(sess.query(User).all(), []) c2 = sess.connection(User) assert not c2.invalidated + assert c2.connection.is_valid def test_subtransaction_on_noautocommit(self): User, users = self.classes.User, self.tables.users @@ -859,7 +871,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): except Exception: trans2.rollback(_capture_exception=True) assert_raises_message( - sa_exc.InvalidRequestError, + sa_exc.PendingRollbackError, r"This Session's transaction has been rolled back due to a " r"previous exception during flush. To begin a new transaction " r"with this Session, first issue Session.rollback\(\). " @@ -1001,7 +1013,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): for i in range(5): assert_raises_message( - sa_exc.InvalidRequestError, + sa_exc.PendingRollbackError, "^This Session's transaction has been " r"rolled back due to a previous exception " "during flush. To " @@ -1037,7 +1049,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): with expect_warnings(".*during handling of a previous exception.*"): session.begin_nested() - savepoint = session.connection()._transaction._savepoint + savepoint = session.connection()._nested_transaction._savepoint # force the savepoint to disappear session.connection().dialect.do_release_savepoint( @@ -1708,7 +1720,12 @@ class SavepointTest(_LocalFixture): nested_trans._do_commit() is_(s.transaction, trans) - assert_raises(sa_exc.DBAPIError, s.rollback) + + with expect_warnings("nested transaction already deassociated"): + # this previously would raise + # "savepoint "sa_savepoint_1" does not exist", however as of + # #5327 the savepoint already knows it's inactive + s.rollback() assert u1 not in s.new diff --git a/test/requirements.py b/test/requirements.py index ed047d790..c07717aa8 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -720,7 +720,7 @@ class DefaultRequirements(SuiteRequirements): def pg_prepared_transaction(config): if not against(config, "postgresql"): - return False + return True with config.db.connect() as conn: try: @@ -743,19 +743,19 @@ class DefaultRequirements(SuiteRequirements): "oracle", "two-phase xact not implemented in SQLA/oracle" ), no_support( - "drizzle", "two-phase xact not supported by database" - ), - no_support( "sqlite", "two-phase xact not supported by database" ), no_support( "sybase", "two-phase xact not supported by drivers/SQLA" ), - no_support( - "mysql", - "recent MySQL communiity editions have too many issues " - "(late 2016), disabling for now", - ), + # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d + # we are evaluating which modern MySQL / MariaDB versions + # can handle two-phase testing without too many problems + # no_support( + # "mysql", + # "recent MySQL communiity editions have too many issues " + # "(late 2016), disabling for now", + # ), NotPredicate( LambdaPredicate( pg_prepared_transaction, @@ -768,7 +768,9 @@ class DefaultRequirements(SuiteRequirements): @property def two_phase_recovery(self): return self.two_phase_transactions + ( - skip_if("mysql", "crashes on most mariadb and mysql versions") + skip_if( + "mysql", "still can't get recover to work w/ MariaDB / MySQL" + ) ) @property |
