summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
committerAlan Conway <aconway@apache.org>2013-09-03 20:52:51 +0000
commitbec603bc053e95788c8936d9a17253bced049920 (patch)
treeccda8680b0e18d2b690a0cb64b5c5eb226422b8f /qpid/cpp/src/tests
parent5d510ffc8f13c649ff9bd77cc5c4f6139aaffee5 (diff)
downloadqpid-python-bec603bc053e95788c8936d9a17253bced049920.tar.gz
QPID:4327: HA support for TX transactions - fix cleanup at transaction end.
Was not removing transaction exchange at end of transaction. Fix to transaction end logic. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1519846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py48
1 files changed, 34 insertions, 14 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 9a39bdf979..47a1be815a 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -37,6 +37,7 @@ def grep(filename, regexp):
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
+
def assert_log_no_errors(self, broker):
log = broker.get_log()
if grep(log, re.compile("] error|] critical")):
@@ -1329,8 +1330,7 @@ class TransactionTests(BrokerTest):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
-
- self.assertEqual(1, len(self.tx_subscriptions(cluster[0]))) # One backup of the transaction
+ tx_queues = cluster[0].agent().tx_queues()
# NOTE: backup does not process transactional dequeues until prepare
cluster[1].assert_browse_backup("a", ["x","y","z"])
@@ -1340,10 +1340,26 @@ class TransactionTests(BrokerTest):
tx.commit()
tx.sync()
- for b in cluster: self.assert_simple_commit_outcome(b)
- self.assertEqual(0, len(self.tx_subscriptions(cluster[0]))) # Backup tx subscription cancelled.
+ for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
+
+ def assert_tx_cleanup(self, b, tx_queues):
+ """Verify that there are no transaction artifacts
+ (exchanges, queues, subscriptions) on b."""
- def assert_simple_commit_outcome(self, b):
+ self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
+ self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
+
+ # TX exchanges don't show up in management so test for existence by name.
+ s = b.connect_admin().session()
+ try:
+ for q in tx_queues:
+ try:
+ s.sender("%s;{node:{type:topic}}"%q)
+ self.fail("Found tx exchange %s on %s "%(q,b))
+ except NotFound: pass
+ finally: s.connection.close()
+
+ def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
# Check for expected actions on the store
@@ -1357,17 +1373,18 @@ class TransactionTests(BrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- # Check that transaction artifacts are cleaned up.
- self.assertEqual([], b.agent().tx_queues(), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
- for b in cluster: self.assert_simple_rollback_outcome(b)
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
- def assert_simple_rollback_outcome(self, b):
+ def assert_simple_rollback_outcome(self, b, tx_queues):
b.assert_browse_backup("a", ["x","y","z"], msg=b)
b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
# Check for expected actions on the store
@@ -1376,19 +1393,19 @@ class TransactionTests(BrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- # Check that transaction artifacts are cleaned up.
- self.assertEqual([], b.agent().tx_queues(), msg=b)
+ self.assert_tx_cleanup(b, tx_queues)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
tx = self.tx_simple_setup(cluster[0])
tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
cluster.bounce(0) # Should cause roll-back
cluster[0].wait_status("ready") # Restarted.
cluster[1].wait_status("active") # Promoted.
cluster[2].wait_status("ready") # Failed over.
- for b in cluster: self.assert_simple_rollback_outcome(b)
+ for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def test_tx_no_backups(self):
"""Test the special case of a TX where there are no backups"""
@@ -1399,15 +1416,18 @@ class TransactionTests(BrokerTest):
tx.acknowledge()
tx.commit()
tx.sync()
- self.assert_simple_commit_outcome(cluster[0])
+ tx_queues = cluster[0].agent().tx_queues()
+ self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
cluster = HaCluster(self, 1, test_store=True)
tx = self.tx_simple_setup(cluster[0])
+ tx.sync()
+ tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
tx.sync()
- self.assert_simple_rollback_outcome(cluster[0])
+ self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def test_tx_backup_fail(self):