diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index ad546afc62..79024d48e3 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest): tx.commit() tx.sync() + tx.close() for b in cluster: self.assert_simple_commit_outcome(b, tx_queues) - def assert_tx_cleanup(self, b, tx_queues): + def assert_tx_clean(self, b): """Verify that there are no transaction artifacts (exchanges, queues, subscriptions) on b.""" - - self.assertEqual(0, len(b.agent().tx_queues()), msg=b) - self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b) - - # TX exchanges don't show up in management so test for existence by name. - s = b.connect_admin().session() - try: - for q in tx_queues: - try: - s.sender("%s;{node:{type:topic}}"%q) - self.fail("Found tx exchange %s on %s "%(q,b)) - except NotFound: pass - finally: s.connection.close() + queues=[] + def txq(): queues = b.agent().tx_queues(); return not queues + assert retry(txq), "%s: unexpected %s"%(b,queues) + subs=[] + def txs(): subs = self.tx_subscriptions(b); return not subs + assert retry(txs), "%s: unexpected %s"%(b,subs) + # TODO aconway 2013-10-15: TX exchanges don't show up in management. def assert_simple_commit_outcome(self, b, tx_queues): b.assert_browse_backup("a", [], msg=b) @@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest): <commit tx=1> """ self.assertEqual(expect, open_read(b.store_log), msg=b) - self.assert_tx_cleanup(b, tx_queues) + self.assert_tx_clean(b) def test_tx_simple_rollback(self): cluster = HaCluster(self, 2, test_store=True) @@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest): tx_queues = cluster[0].agent().tx_queues() tx.acknowledge() tx.rollback() + tx.close() # For clean test. for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues) def assert_simple_rollback_outcome(self, b, tx_queues): @@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest): <enqueue a z> """ self.assertEqual(open_read(b.store_log), expect, msg=b) - self.assert_tx_cleanup(b, tx_queues) + self.assert_tx_clean(b) def test_tx_simple_failover(self): cluster = HaCluster(self, 3, test_store=True) @@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest): tx.commit() tx.sync() tx_queues = cluster[0].agent().tx_queues() + tx.close() self.assert_simple_commit_outcome(cluster[0], tx_queues) # Test rollback @@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest): tx.acknowledge() tx.rollback() tx.sync() + tx.close() self.assert_simple_rollback_outcome(cluster[0], tx_queues) def assert_commit_raises(self, tx): @@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest): for m in ["foo","bang","bar"]: s.send(Message(m, durable=True)) self.assert_commit_raises(tx) for b in cluster: b.assert_browse_backup("q", []) - self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n") + self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n") self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n") def test_tx_join_leave(self): @@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest): cluster[1].kill(final=False) s.send("b") self.assert_commit_raises(tx) - self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]]) - + for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b) # Joining tx = cluster[0].connect().session(transactional=True) s = tx.sender("q;{create:always}") s.send("foo") cluster.restart(1) tx.commit() + tx.close() + for b in cluster: self.assert_tx_clean(b) # The new member is not in the tx but receives the results normal replication. for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b) @@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest): for t in threads: t.join() for s in sessions: s.connection.close() + def test_broker_tx_tests(self): + cluster = HaCluster(self, 3) + print "Running python broker tx tests" + p = subprocess.Popen(["qpid-python-test", + "-m", "qpid_tests.broker_0_10", + "-b", "localhost:%s"%(cluster[0].port()), + "*.tx.*"]) + assert not p.wait() + print "Finished python broker tx tests" if __name__ == "__main__": outdir = "ha_tests.tmp" |
