diff options
| author | Alan Conway <aconway@apache.org> | 2013-09-03 20:52:51 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2013-09-03 20:52:51 +0000 |
| commit | bec603bc053e95788c8936d9a17253bced049920 (patch) | |
| tree | ccda8680b0e18d2b690a0cb64b5c5eb226422b8f /qpid/cpp/src/tests | |
| parent | 5d510ffc8f13c649ff9bd77cc5c4f6139aaffee5 (diff) | |
| download | qpid-python-bec603bc053e95788c8936d9a17253bced049920.tar.gz | |
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction.
Fix to transaction end logic.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 9a39bdf979..47a1be815a 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -37,6 +37,7 @@ def grep(filename, regexp): class HaBrokerTest(BrokerTest): """Base class for HA broker tests""" + def assert_log_no_errors(self, broker): log = broker.get_log() if grep(log, re.compile("] error|] critical")): @@ -1329,8 +1330,7 @@ class TransactionTests(BrokerTest): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() - - self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction + tx_queues = cluster[0].agent().tx_queues() # NOTE: backup does not process transactional dequeues until prepare cluster[1].assert_browse_backup("a", ["x","y","z"]) @@ -1340,10 +1340,26 @@ class TransactionTests(BrokerTest): tx.commit() tx.sync() - for b in cluster: self.assert_simple_commit_outcome(b) - self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled. + for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) + + def assert_tx_cleanup(self, b, tx_queues): + """Verify that there are no transaction artifacts + (exchanges, queues, subscriptions) on b.""" - def assert_simple_commit_outcome(self, b): + self.assertEqual(0, len(b.agent().tx_queues()), msg=b) + self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b) + + # TX exchanges don't show up in management so test for existence by name. + s = b.connect_admin().session() + try: + for q in tx_queues: + try: + s.sender("%s;{node:{type:topic}}"%q) + self.fail("Found tx exchange %s on %s "%(q,b)) + except NotFound: pass + finally: s.connection.close() + + def assert_simple_commit_outcome(self, b, tx_queues): b.assert_browse_backup("a", [], msg=b) b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b) # Check for expected actions on the store @@ -1357,17 +1373,18 @@ class TransactionTests(BrokerTest): <commit tx=1> """ self.assertEqual(expect, open_read(b.store_log), msg=b) - # Check that transaction artifacts are cleaned up. - self.assertEqual([], b.agent().tx_queues(), msg=b) + self.assert_tx_cleanup(b, tx_queues) def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() tx.acknowledge() tx.rollback() - for b in cluster: self.assert_simple_rollback_outcome(b) + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) - def assert_simple_rollback_outcome(self, b): + def assert_simple_rollback_outcome(self, b, tx_queues): b.assert_browse_backup("a", ["x","y","z"], msg=b) b.assert_browse_backup("b", ['0', '1', '2'], msg=b) # Check for expected actions on the store @@ -1376,19 +1393,19 @@ class TransactionTests(BrokerTest): <enqueue a z> """ self.assertEqual(open_read(b.store_log), expect, msg=b) - # Check that transaction artifacts are cleaned up. - self.assertEqual([], b.agent().tx_queues(), msg=b) + self.assert_tx_cleanup(b, tx_queues) def test_tx_simple_failover(self): cluster = HaCluster(self, 3, test_store=True) tx = self.tx_simple_setup(cluster[0]) tx.sync() + tx_queues = cluster[0].agent().tx_queues() tx.acknowledge() cluster.bounce(0) # Should cause roll-back cluster[0].wait_status("ready") # Restarted. cluster[1].wait_status("active") # Promoted. cluster[2].wait_status("ready") # Failed over. - for b in cluster: self.assert_simple_rollback_outcome(b) + for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) def test_tx_no_backups(self): """Test the special case of a TX where there are no backups""" @@ -1399,15 +1416,18 @@ class TransactionTests(BrokerTest): tx.acknowledge() tx.commit() tx.sync() - self.assert_simple_commit_outcome(cluster[0]) + tx_queues = cluster[0].agent().tx_queues() + self.assert_simple_commit_outcome(cluster[0], tx_queues) # Test rollback cluster = HaCluster(self, 1, test_store=True) tx = self.tx_simple_setup(cluster[0]) + tx.sync() + tx_queues = cluster[0].agent().tx_queues() tx.acknowledge() tx.rollback() tx.sync() - self.assert_simple_rollback_outcome(cluster[0]) + self.assert_simple_rollback_outcome(cluster[0], tx_queues) def test_tx_backup_fail(self): |
