summaryrefslogtreecommitdiff
path: root/test/engine/test_deprecations.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_deprecations.py')
-rw-r--r--test/engine/test_deprecations.py351
1 files changed, 351 insertions, 0 deletions
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 47e59b55d..6acc2967d 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -425,6 +425,357 @@ class TransactionTest(fixtures.TablesTest):
with testing.db.connect() as conn:
eq_(conn.execute(users.select()).fetchall(), [(1, "user1")])
+ def test_begin_begin_rollback_rollback(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+
+ def test_begin_begin_commit_commit(self):
+ with testing.db.connect() as connection:
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ assert connection.connection._reset_agent is trans
+ trans2.commit()
+ assert connection.connection._reset_agent is trans
+ trans.commit()
+ assert connection.connection._reset_agent is None
+
+ def test_branch_nested_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ nested = branched.begin()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please",
+ connection.exec_driver_sql,
+ "select 1",
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
+ conn = local_connection
+ users = self.tables.users
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ mk1 = conn.begin()
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ mk1.rollback()
+
+ assert not sp1.is_active
+ assert not trans.is_active
+ assert conn._transaction is trans
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_rollback_to_subtransaction(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ trans2 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans3 = connection.begin()
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ trans3.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction.",
+ connection.exec_driver_sql,
+ "select 1",
+ )
+ trans2.rollback()
+ assert connection._nested_transaction is None
+
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ transaction.commit()
+ eq_(
+ connection.execute(
+ select(users.c.user_id).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,), (4,)],
+ )
+
+ # PG emergency shutdown:
+ # select * from pg_prepared_xacts
+ # ROLLBACK PREPARED '<xid>'
+ # MySQL emergency shutdown:
+ # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
+ # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
+ @testing.requires.skip_mysql_on_windows
+ @testing.requires.two_phase_transactions
+ @testing.requires.savepoints
+ def test_mixed_two_phase_transaction(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin_twophase()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ transaction2 = connection.begin()
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ transaction3 = connection.begin_nested()
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ transaction4 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ transaction4.commit()
+ transaction3.rollback()
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ transaction2.commit()
+ transaction.prepare()
+ transaction.commit()
+ eq_(
+ connection.execute(
+ select(users.c.user_id).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,), (2,), (5,)],
+ )
+
+ @testing.requires.savepoints
+ def test_inactive_due_to_subtransaction_on_nested_no_commit(
+ self, local_connection
+ ):
+ connection = local_connection
+ trans = connection.begin()
+
+ nested = connection.begin_nested()
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ trans2.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction. "
+ "Please rollback",
+ nested.commit,
+ )
+ trans.commit()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This nested transaction is inactive",
+ nested.commit,
+ )
+
+ def test_close(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ assert connection.in_transaction()
+ trans2.close()
+ assert connection.in_transaction()
+ transaction.commit()
+ assert not connection.in_transaction()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 5
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 5
+
+ def test_close2(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ assert connection.in_transaction()
+ trans2.close()
+ assert connection.in_transaction()
+ transaction.close()
+ assert not connection.in_transaction()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 0
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 0
+
+ def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
+ connection = local_connection
+ trans = connection.begin()
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ trans2.rollback()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please rollback",
+ trans.commit,
+ )
+
+ trans.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This transaction is inactive",
+ trans.commit,
+ )
+
+ def test_nested_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ try:
+ transaction = connection.begin()
+ try:
+ connection.execute(
+ users.insert(), user_id=1, user_name="user1"
+ )
+ connection.execute(
+ users.insert(), user_id=2, user_name="user2"
+ )
+ connection.execute(
+ users.insert(), user_id=3, user_name="user3"
+ )
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ try:
+ connection.execute(
+ users.insert(), user_id=4, user_name="user4"
+ )
+ connection.execute(
+ users.insert(), user_id=5, user_name="user5"
+ )
+ raise Exception("uh oh")
+ trans2.commit()
+ except Exception:
+ trans2.rollback()
+ raise
+ transaction.rollback()
+ except Exception:
+ transaction.rollback()
+ raise
+ except Exception as e:
+ # and not "This transaction is inactive"
+ # comment moved here to fix pep8
+ assert str(e) == "uh oh"
+ else:
+ assert False
+
+ def test_nesting(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), user_id=2, user_name="user2")
+ connection.execute(users.insert(), user_id=3, user_name="user3")
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ trans2 = connection.begin()
+ connection.execute(users.insert(), user_id=4, user_name="user4")
+ connection.execute(users.insert(), user_id=5, user_name="user5")
+ trans2.commit()
+ transaction.rollback()
+ self.assert_(
+ connection.exec_driver_sql(
+ "select count(*) from " "users"
+ ).scalar()
+ == 0
+ )
+ result = connection.exec_driver_sql("select * from users")
+ assert len(result.fetchall()) == 0
+
+ def test_no_marker_on_inactive_trans(self, local_connection):
+ conn = local_connection
+ conn.begin()
+
+ with testing.expect_deprecated_20(
+ r"Calling .begin\(\) when a transaction is already "
+ "begun, creating a 'sub' transaction"
+ ):
+ mk1 = conn.begin()
+
+ mk1.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "the current transaction on this connection is inactive.",
+ conn.begin,
+ )
+
def test_implicit_autocommit_compiled(self):
users = self.tables.users