From 14de0a97b80cc0f63208b61654dd0348fd3da8b1 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 3 Sep 2015 18:59:05 +0000 Subject: QPID-5855 - Simplified HA transaction logic. Removed complex and incorrect HA+TX logic, reverted to the following limitation: You can use transactions in a HA cluster, but there are limitations on the transactional guarantees. Transactions function normally with the *primary* broker but replication to the backups is not covered by the atomic guarantee. The following situations are all safe: - Client rolls back a transaction. - Client successfully commits a transaction. - Primary fails during a transaction *before* the client sends a commit. - Transaction contains only one message. The problem case is when all of the following occur: - transaction contains multiple actions (enqueues or dequeues) - primary fails between client sending commit and receiving commit-complete. In this case it is possible that only part of the transaction was replicated to the backups, so on fail-over partial transaction results may be visible. 
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1701109 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/ha_test.py | 2 + qpid/cpp/src/tests/ha_tests.py | 206 ++++------------------------------------- 2 files changed, 20 insertions(+), 188 deletions(-) (limited to 'qpid/cpp/src/tests') diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index 82ca808cb1..ace225a509 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -267,6 +267,8 @@ acl allow all all c = self.connect_admin() try: wait_address(c, queue) + if not "msg" in kwargs: + kwargs["msg"]=str(self) assert_browse_retry(c.session(), queue, expected, **kwargs) finally: c.close() diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2ee2e291e2..0efb8182ec 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest): sb.close() return tx - def tx_subscriptions(self, broker): - """Return list of queue names for tx subscriptions""" - return [q for q in broker.agent.repsub_queues() - if q.startswith("qpid.ha-tx")] - def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True, wait=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() - - # NOTE: backup does not process transactional dequeues until prepare - cluster[1].assert_browse_backup("a", ["x","y","z"]) - cluster[1].assert_browse_backup("b", ['0', '1', '2']) - tx.acknowledge() + # Pre transaction - messages are acquired on primary but not yet dequeued + # so still there on backup. + cluster[0].assert_browse_backup("a", []) + cluster[1].assert_browse_backup("a", ['x', 'y', 'z']) + for b in cluster: + b.assert_browse_backup("b", ['0', '1', '2']) tx.commit() tx.sync() tx.close() + # Post transaction: all synced. 
for b in cluster: - self.assert_simple_commit_outcome(b, tx_queues) + b.assert_browse_backup("a", []) + b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"]) # Verify non-tx dequeue is replicated correctly c = cluster.connect(0, protocol=self.tx_protocol) @@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest): c.close() tx.connection.close() - - def check_enq_deq(self, cluster, queue, expect): - for b in cluster: - q = b.agent.getQueue(queue) - self.assertEqual( - (b.name,)+expect, - (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues)) - - def test_tx_enq_notx_deq(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - c = cluster.connect(0, protocol=self.tx_protocol) - - tx = c.session(transactional=True) - c.session().sender("qq;{create:always}").send("m1") - tx.sender("qq;{create:always}").send("tx") - tx.commit() - tx.close() - c.session().sender("qq;{create:always}").send("m2") - self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0)) - - notx = c.session() - self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))]) - notx.acknowledge() - self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - for b in cluster: self.assert_tx_clean(b) - - def test_tx_enq_notx_deq_qpid_send(self): - """Verify that a non-tx dequeue of a tx enqueue is replicated correctly""" - cluster = HaCluster(self, 2, test_store=True) - - self.popen( - ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1', - '--content-string=foo'] - ).assert_exit_ok() - for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b) - self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0)) - - self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok() - self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0)) - for b in cluster: b.assert_browse_backup('qq', [], msg=b) - 
for b in cluster: self.assert_tx_clean(b) - - def assert_tx_clean(self, b): - """Verify that there are no transaction artifacts - (exchanges, queues, subscriptions) on b.""" - class FunctionCache: # Call a function and cache the result. - def __init__(self, f): self.f, self.value = f, None - def __call__(self): self.value = self.f(); return self.value - - txq= FunctionCache(b.agent.tx_queues) - assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value) - txsub = FunctionCache(lambda: self.tx_subscriptions(b)) - assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value) - # TODO aconway 2013-10-15: TX exchanges don't show up in management. - - def assert_simple_commit_outcome(self, b, tx_queues): - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) - # Check for expected actions on the store - expect = """ - - - - - - - -""" - self.assertEqual(expect, open_read(b.store_log), msg=b) - self.assert_tx_clean(b) - def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster) tx.sync() - tx_queues = cluster[0].agent.tx_queues() tx.acknowledge() tx.rollback() - tx.close() # For clean test. 
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) + + tx.close() tx.connection.close() - def assert_simple_rollback_outcome(self, b, tx_queues): - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) - # Check for expected actions on the store - expect = """ - - -""" - self.assertEqual(open_read(b.store_log), expect, msg=b) - self.assert_tx_clean(b) def test_tx_simple_failure(self): - """Verify we throw TransactionAborted if there is a store error during a transaction""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - cluster.bounce(0) # Should cause roll-back - tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - self.assertRaises(qm.TransactionAborted, tx.sync) - self.assertRaises(qm.TransactionAborted, tx.commit) - try: tx.connection.close() - except qm.TransactionAborted: pass # Occasionally get exception on close. 
- for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - finally: l.restore() - - def test_tx_simple_failover(self): """Verify we throw TransactionAborted if there is a fail-over during a transaction""" cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster) @@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest): try: cluster.bounce(0) # Should cause roll-back tx.connection.session() # Wait for reconnect - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) self.assertRaises(qm.TransactionAborted, tx.sync) self.assertRaises(qm.TransactionAborted, tx.commit) try: tx.connection.close() except qm.TransactionAborted: pass # Occasionally get exception on close. - for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) + for b in cluster: + b.assert_browse_backup("a", ["x","y","z"]) + b.assert_browse_backup("b", ['0', '1', '2']) finally: l.restore() - def test_tx_unknown_failover(self): - """Verify we throw TransactionUnknown if there is a failure during commit""" - cluster = HaCluster(self, 3, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - l = LogLevel(ERROR) # Hide expected WARNING log messages from failover. - try: - os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response - class CommitThread(Thread): - def run(self): - try: tx.commit() - except Exception, e: - self.error = e - t = CommitThread() - t.start() # Commit in progress - t.join(timeout=0.01) - self.assertTrue(t.isAlive()) - cluster.bounce(0) - os.kill(cluster[2].pid, signal.SIGCONT) - t.join() - try: raise t.error - except qm.TransactionUnknown: pass - for b in cluster: self.assert_tx_clean(b) - try: tx.connection.close() - except qm.TransactionUnknown: pass # Occasionally get exception on close. 
- finally: l.restore() - - def test_tx_no_backups(self): - """Test the special case of a TX where there are no backups""" - - # Test commit - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.acknowledge() - tx.commit() - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.close() - self.assert_simple_commit_outcome(cluster[0], tx_queues) - - # Test rollback - cluster = HaCluster(self, 1, test_store=True) - tx = self.tx_simple_setup(cluster) - tx.sync() - tx_queues = cluster[0].agent.tx_queues() - tx.acknowledge() - tx.rollback() - tx.sync() - tx.close() - self.assert_simple_rollback_outcome(cluster[0], tx_queues) - - def test_tx_backup_fail(self): - cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]]) - c = cluster[0].connect(protocol=self.tx_protocol) - tx = c.session(transactional=True) - s = tx.sender("q;{create:always,node:{durable:true}}") - for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True)) - def commit_sync(): tx.commit(); tx.sync() - self.assertRaises(qm.TransactionAborted, commit_sync) - for b in cluster: b.assert_browse_backup("q", []) - self.assertEqual(open_read(cluster[0].store_log), "\n\n\n\n\n") - self.assertEqual(open_read(cluster[1].store_log), "\n\n\n\n") - def test_tx_join_leave(self): """Test cluster members joining/leaving cluster. 
Also check that tx-queues are cleaned up at end of transaction.""" @@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest): tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") tx.commit() tx.connection.close() for b in [cluster[0],cluster[2]]: - self.assert_tx_clean(b) b.assert_browse_backup("q", ["a","b"], msg=b) # Joining tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True) @@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest): cluster.restart(1) # Not a part of the current transaction. tx.commit() tx.connection.close() - for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b) @@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest): for s in sessions: sn = s.sender("qq;{create:always,node:{durable:true}}") sn.send(qm.Message("foo", durable=True)) - self.assertEqual(n, len(cluster[1].agent.tx_queues())) threads = [ Thread(target=s.commit) for s in sessions] for t in threads: t.start() cluster[0].ready(timeout=1) # Check for deadlock -- cgit v1.2.1