diff options
| author | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-08-30 14:43:06 +0000 |
| commit | 54cdb4dcada8cfeb23d756e4980e701ebb382c13 (patch) | |
| tree | f9ce23279cffe298d1a3953489355214b827e530 /qpid/cpp/src/tests | |
| parent | 27d31ba355acfef3ec66c23e48864e88a358014b (diff) | |
| download | qpid-python-54cdb4dcada8cfeb23d756e4980e701ebb382c13.tar.gz | |
QPID-4327: HA clean up transaction artifacts at end of TX.
- Backups delete transactions on failover.
- TxReplicator cancel subscriptions when transaction is finished.
- TxReplicator rollback if destroyed prematurely.
- Handle special case of no backups for a tx.
- ha_tests.py: new and modified tests to cover the new functionality.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518982 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 4 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 17 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 79 |
3 files changed, 71 insertions, 29 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index b07a5b5d11..a282f59b13 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -415,6 +415,10 @@ class BrokerTest(TestCase): Provides a well-known working directory for each test. """ + def __init__(self, *args, **kwargs): + self.longMessage = True # Enable long messages for assert*(..., msg=xxx) + TestCase.__init__(self, *args, **kwargs) + # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) ha_lib = os.getenv("HA_LIB") diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py index ab63602655..4a7b538edd 100755 --- a/qpid/cpp/src/tests/ha_test.py +++ b/qpid/cpp/src/tests/ha_test.py @@ -48,9 +48,24 @@ class QmfAgent(object): address, client_properties={"qpid.ha-admin":1}, **kwargs) self._agent = BrokerAgent(self._connection) - def get_queues(self): + def queues(self): return [q.values['name'] for q in self._agent.getAllQueues()] + def repsub_queue(self, sub): + """If QMF subscription sub is a replicating subscription return + the name of the replicated queue, else return None""" + session_name = self.getSession(sub.sessionRef).name + m = re.search("qpid.ha-q:(.*)\.", session_name) + return m and m.group(1) + + def repsub_queues(self): + """Return queue names for all replicating subscriptions""" + return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()]) + + def tx_queues(self): + """Return names of all tx-queues""" + return [q for q in self.queues() if q.startswith("qpid.ha-tx")] + def __getattr__(self, name): a = getattr(self._agent, name) return a diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index c3577ca626..9a39bdf979 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1320,11 +1320,18 @@ class TransactionTests(BrokerTest): sb.send(m) return tx + def tx_subscriptions(self, broker): + """Return list of queue names for tx subscriptions""" + return [q for q in broker.agent().repsub_queues() + if q.startswith("qpid.ha-tx")] + def test_tx_simple_commit(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() + self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction + # NOTE: backup does not process transactional dequeues until prepare cluster[1].assert_browse_backup("a", ["x","y","z"]) cluster[1].assert_browse_backup("b", ['0', '1', '2']) @@ -1333,10 +1340,12 @@ class TransactionTests(BrokerTest): tx.commit() tx.sync() - for b in cluster: - b.assert_browse_backup("a", [], msg=b) - b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) + for b in cluster: self.assert_simple_commit_outcome(b) + self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled. + def assert_simple_commit_outcome(self, b): + b.assert_browse_backup("a", [], msg=b) + b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) # Check for expected actions on the store expect = """<enqueue a x> <enqueue a y> @@ -1347,42 +1356,59 @@ class TransactionTests(BrokerTest): <dequeue a z tx=1> <commit tx=1> """ - self.assertEqual(expect, open_read(cluster[0].store_log)) - self.assertEqual(expect, open_read(cluster[1].store_log)) + self.assertEqual(expect, open_read(b.store_log), msg=b) + # Check that transaction artifacts are cleaned up. + self.assertEqual([], b.agent().tx_queues(), msg=b) def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.acknowledge() tx.rollback() - for b in cluster: - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + for b in cluster: self.assert_simple_rollback_outcome(b) + + def assert_simple_rollback_outcome(self, b): + b.assert_browse_backup("a", ["x","y","z"], msg=b) + b.assert_browse_backup("b", ['0', '1', '2'], msg=b) # Check for expected actions on the store expect = """<enqueue a x> <enqueue a y> <enqueue a z> """ - self.assertEqual(open_read(cluster[0].store_log), expect) - self.assertEqual(open_read(cluster[1].store_log), expect) + self.assertEqual(open_read(b.store_log), expect, msg=b) + # Check that transaction artifacts are cleaned up. + self.assertEqual([], b.agent().tx_queues(), msg=b) def test_tx_simple_failover(self): - cluster = HaCluster(self, 2, test_store=True) + cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() tx.acknowledge() cluster.bounce(0) # Should cause roll-back - cluster[0].wait_status("ready") - for b in cluster: - b.assert_browse_backup("a", ["x","y","z"], msg=b) - b.assert_browse_backup("b", ['0', '1', '2'], msg=b) + cluster[0].wait_status("ready") # Restarted. + cluster[1].wait_status("active") # Promoted. + cluster[2].wait_status("ready") # Failed over. + for b in cluster: self.assert_simple_rollback_outcome(b) + + def test_tx_no_backups(self): + """Test the special case of a TX where there are no backups""" + + # Test commit + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.commit() + tx.sync() + self.assert_simple_commit_outcome(cluster[0]) + + # Test rollback + cluster = HaCluster(self, 1, test_store=True) + tx = self.tx_simple_setup(cluster[0]) + tx.acknowledge() + tx.rollback() + tx.sync() + self.assert_simple_rollback_outcome(cluster[0]) - # Check for expected actions on the store - expect = """<enqueue a x> -<enqueue a y> -<enqueue a z> -""" - self.assertEqual(open_read(cluster[0].store_log), expect) - self.assertEqual(open_read(cluster[1].store_log), expect) def test_tx_backup_fail(self): cluster = HaCluster( @@ -1400,26 +1426,23 @@ class TransactionTests(BrokerTest): """Test cluster members joining/leaving cluster. Also check that tx-queues are cleaned up at end of transaction.""" - def tx_queues(broker): - return [q for q in broker.agent().get_queues() if q.startswith("qpid.ha-tx")] - cluster = HaCluster(self, 3) # Leaving tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("a", sync=True) - self.assertEqual([1,1,1], [len(tx_queues(b)) for b in cluster]) + self.assertEqual([1,1,1], [len(b.agent().tx_queues()) for b in cluster]) cluster[1].kill(final=False) s.send("b") self.assertRaises(ServerError, tx.commit) - self.assertEqual([[],[]], [tx_queues(b) for b in [cluster[0],cluster[2]]]) + self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") - tx_q = tx_queues(cluster[0])[0] + tx_q = cluster[0].agent().tx_queues()[0] cluster.restart(1) # Verify the new member should not be in the transaction. # but should receive the result of the transaction via normal replication. |
