summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/ha_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
1 files changed, 27 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ad546afc62..79024d48e3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
+ tx.close()
for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
- def assert_tx_cleanup(self, b, tx_queues):
+ def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
-
- self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
- self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
-
- # TX exchanges don't show up in management so test for existence by name.
- s = b.connect_admin().session()
- try:
- for q in tx_queues:
- try:
- s.sender("%s;{node:{type:topic}}"%q)
- self.fail("Found tx exchange %s on %s "%(q,b))
- except NotFound: pass
- finally: s.connection.close()
+ queues=[]
+ def txq(): queues = b.agent().tx_queues(); return not queues
+ assert retry(txq), "%s: unexpected %s"%(b,queues)
+ subs=[]
+ def txs(): subs = self.tx_subscriptions(b); return not subs
+ assert retry(txs), "%s: unexpected %s"%(b,subs)
+ # TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
@@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
@@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest):
tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
+ tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def assert_simple_rollback_outcome(self, b, tx_queues):
@@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
@@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
tx_queues = cluster[0].agent().tx_queues()
+ tx.close()
self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
@@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.rollback()
tx.sync()
+ tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def assert_commit_raises(self, tx):
@@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
def test_tx_join_leave(self):
@@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
self.assert_commit_raises(tx)
- self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
-
+ for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
cluster.restart(1)
tx.commit()
+ tx.close()
+ for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
@@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest):
for t in threads: t.join()
for s in sessions: s.connection.close()
+ def test_broker_tx_tests(self):
+ cluster = HaCluster(self, 3)
+ print "Running python broker tx tests"
+ p = subprocess.Popen(["qpid-python-test",
+ "-m", "qpid_tests.broker_0_10",
+ "-b", "localhost:%s"%(cluster[0].port()),
+ "*.tx.*"])
+ assert not p.wait()
+ print "Finished python broker tx tests"
if __name__ == "__main__":
outdir = "ha_tests.tmp"