summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py79
1 files changed, 51 insertions, 28 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index c3577ca626..9a39bdf979 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest):
sb.send(m)
return tx
+ def tx_subscriptions(self, broker):
+ """Return list of queue names for tx subscriptions"""
+ return [q for q in broker.agent().repsub_queues()
+ if q.startswith("qpid.ha-tx")]
+
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
+ self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
cluster[1].assert_browse_backup("b", ['0', '1', '2'])
@@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest):
tx.commit()
tx.sync()
- for b in cluster:
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+ for b in cluster: self.assert_simple_commit_outcome(b)
+ self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+ def assert_simple_commit_outcome(self, b):
+ b.assert_browse_backup("a", [], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
@@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest):
<dequeue a z tx=1>
<commit tx=1>
"""
- self.assertEqual(expect, open_read(cluster[0].store_log))
- self.assertEqual(expect, open_read(cluster[1].store_log))
+ self.assertEqual(expect, open_read(b.store_log), msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.acknowledge()
tx.rollback()
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def assert_simple_rollback_outcome(self, b):
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
<enqueue a z>
"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
+ self.assertEqual(open_read(b.store_log), expect, msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_failover(self):
- cluster = HaCluster(self, 2, test_store=True)
+ cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready")
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ cluster[0].wait_status("ready") # Restarted.
+ cluster[1].wait_status("active") # Promoted.
+ cluster[2].wait_status("ready") # Failed over.
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def test_tx_no_backups(self):
+ """Test the special case of a TX where there are no backups"""
+
+ # Test commit
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+ self.assert_simple_commit_outcome(cluster[0])
+
+ # Test rollback
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.rollback()
+ tx.sync()
+ self.assert_simple_rollback_outcome(cluster[0])
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_backup_fail(self):
cluster = HaCluster(
@@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest):
"""Test cluster members joining/leaving cluster.
Also check that tx-queues are cleaned up at end of transaction."""
- def tx_queues(broker):
- return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
-
cluster = HaCluster(self, 3)
# Leaving
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+ self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
self.assertRaises(ServerError, tx.commit)
- self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+ self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- tx_q = tx_queues(cluster[0])[0]
+ tx_q = cluster[0].agent().tx_queues()[0]
cluster.restart(1)
# Verify the new member should not be in the transaction.
# but should receive the result of the transaction via normal replication.