summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
committerAlan Conway <aconway@apache.org>2013-08-30 14:43:06 +0000
commit54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch)
treef9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/tests
parent27d31ba355acfef3ec66c23e48864e88a358014b (diff)
downloadqpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover. - TxReplicator cancel subscriptions when transaction is finished. - TxReplicator rollback if destroyed prematurely. - Handle special case of no backups for a tx. - ha_tests.py: new and modified tests to cover the new functionality. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/brokertest.py4
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py17
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py79
3 files changed, 71 insertions, 29 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index b07a5b5d11..a282f59b13 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -415,6 +415,10 @@ class BrokerTest(TestCase):
Provides a well-known working directory for each test.
"""
+ def __init__(self, *args, **kwargs):
+ self.longMessage = True # Enable long messages for assert*(..., msg=xxx)
+ TestCase.__init__(self, *args, **kwargs)
+
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
ha_lib = os.getenv("HA_LIB")
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index ab63602655..4a7b538edd 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -48,9 +48,24 @@ class QmfAgent(object):
address, client_properties={"qpid.ha-admin":1}, **kwargs)
self._agent = BrokerAgent(self._connection)
- def get_queues(self):
+ def queues(self):
return [q.values['name'] for q in self._agent.getAllQueues()]
+ def repsub_queue(self, sub):
+ """If QMF subscription sub is a replicating subscription return
+ the name of the replicated queue, else return None"""
+ session_name = self.getSession(sub.sessionRef).name
+ m = re.search("qpid.ha-q:(.*)\.", session_name)
+ return m and m.group(1)
+
+ def repsub_queues(self):
+ """Return queue names for all replicating subscriptions"""
+ return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()])
+
+ def tx_queues(self):
+ """Return names of all tx-queues"""
+ return [q for q in self.queues() if q.startswith("qpid.ha-tx")]
+
def __getattr__(self, name):
a = getattr(self._agent, name)
return a
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index c3577ca626..9a39bdf979 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest):
sb.send(m)
return tx
+ def tx_subscriptions(self, broker):
+ """Return list of queue names for tx subscriptions"""
+ return [q for q in broker.agent().repsub_queues()
+ if q.startswith("qpid.ha-tx")]
+
def test_tx_simple_commit(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
+ self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
cluster[1].assert_browse_backup("b", ['0', '1', '2'])
@@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest):
tx.commit()
tx.sync()
- for b in cluster:
- b.assert_browse_backup("a", [], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
+ for b in cluster: self.assert_simple_commit_outcome(b)
+ self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+ def assert_simple_commit_outcome(self, b):
+ b.assert_browse_backup("a", [], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
@@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest):
<dequeue a z tx=1>
<commit tx=1>
"""
- self.assertEqual(expect, open_read(cluster[0].store_log))
- self.assertEqual(expect, open_read(cluster[1].store_log))
+ self.assertEqual(expect, open_read(b.store_log), msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.acknowledge()
tx.rollback()
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def assert_simple_rollback_outcome(self, b):
+ b.assert_browse_backup("a", ["x","y","z"], msg=b)
+ b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
# Check for expected actions on the store
expect = """<enqueue a x>
<enqueue a y>
<enqueue a z>
"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
+ self.assertEqual(open_read(b.store_log), expect, msg=b)
+ # Check that transaction artifacts are cleaned up.
+ self.assertEqual([], b.agent().tx_queues(), msg=b)
def test_tx_simple_failover(self):
- cluster = HaCluster(self, 2, test_store=True)
+ cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
- cluster[0].wait_status("ready")
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"], msg=b)
- b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
+ cluster[0].wait_status("ready") # Restarted.
+ cluster[1].wait_status("active") # Promoted.
+ cluster[2].wait_status("ready") # Failed over.
+ for b in cluster: self.assert_simple_rollback_outcome(b)
+
+ def test_tx_no_backups(self):
+ """Test the special case of a TX where there are no backups"""
+
+ # Test commit
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.commit()
+ tx.sync()
+ self.assert_simple_commit_outcome(cluster[0])
+
+ # Test rollback
+ cluster = HaCluster(self, 1, test_store=True)
+ tx = self.tx_simple_setup(cluster[0])
+ tx.acknowledge()
+ tx.rollback()
+ tx.sync()
+ self.assert_simple_rollback_outcome(cluster[0])
- # Check for expected actions on the store
- expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
- self.assertEqual(open_read(cluster[0].store_log), expect)
- self.assertEqual(open_read(cluster[1].store_log), expect)
def test_tx_backup_fail(self):
cluster = HaCluster(
@@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest):
"""Test cluster members joining/leaving cluster.
Also check that tx-queues are cleaned up at end of transaction."""
- def tx_queues(broker):
- return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")]
-
cluster = HaCluster(self, 3)
# Leaving
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("a", sync=True)
- self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster])
+ self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster])
cluster[1].kill(final=False)
s.send("b")
self.assertRaises(ServerError, tx.commit)
- self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]])
+ self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
- tx_q = tx_queues(cluster[0])[0]
+ tx_q = cluster[0].agent().tx_queues()[0]
cluster.restart(1)
# Verify the new member should not be in the transaction.
# but should receive the result of the transaction via normal replication.