summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/TransactionObserverTest.cpp9
-rw-r--r--qpid/cpp/src/tests/TxBufferTest.cpp11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
3 files changed, 42 insertions, 24 deletions
diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp
index 2a7d94b1ae..80ef494c21 100644
--- a/qpid/cpp/src/tests/TransactionObserverTest.cpp
+++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp
@@ -79,8 +79,10 @@ struct MockBrokerObserver : public BrokerObserver {
MockBrokerObserver(bool prep_=true) : prep(prep_) {}
void startTx(const intrusive_ptr<TxBuffer>& buffer) {
- tx.reset(new MockTransactionObserver(prep));
- buffer->setObserver(tx);
+ if (!tx) { // Don't overwrite first tx with automatically started second tx.
+ tx.reset(new MockTransactionObserver(prep));
+ buffer->setObserver(tx);
+ }
}
};
@@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFixture& fix) {
return txSession;
}
-QPID_AUTO_TEST_CASE(tesTxtCommit) {
+QPID_AUTO_TEST_CASE(testTxCommit) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver);
fix.broker->getBrokerObservers().add(brokerObserver);
@@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) {
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
txSession.commit();
BOOST_FAIL("Expected exception");
} catch(...) {}
diff --git a/qpid/cpp/src/tests/TxBufferTest.cpp b/qpid/cpp/src/tests/TxBufferTest.cpp
index 4807026ab7..3f052d213e 100644
--- a/qpid/cpp/src/tests/TxBufferTest.cpp
+++ b/qpid/cpp/src/tests/TxBufferTest.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/TxBuffer.h"
#include "unit_test.h"
+#include "test_tools.h"
#include <iostream>
#include <vector>
#include "TxMocks.h"
@@ -50,7 +51,8 @@ QPID_AUTO_TEST_CASE(testCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(buffer.commitLocal(&store));
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
store.check();
BOOST_CHECK(store.isCommitted());
opA->check();
@@ -75,7 +77,12 @@ QPID_AUTO_TEST_CASE(testFailOnCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(!buffer.commitLocal(&store));
+ try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
+ BOOST_FAIL("Expected exception");
+ } catch (...) {}
BOOST_CHECK(store.isAborted());
store.check();
opA->check();
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ad546afc62..79024d48e3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
+ tx.close()
for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
- def assert_tx_cleanup(self, b, tx_queues):
+ def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
-
- self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
- self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
-
- # TX exchanges don't show up in management so test for existence by name.
- s = b.connect_admin().session()
- try:
- for q in tx_queues:
- try:
- s.sender("%s;{node:{type:topic}}"%q)
- self.fail("Found tx exchange %s on %s "%(q,b))
- except NotFound: pass
- finally: s.connection.close()
+ queues=[]
+ def txq(): queues = b.agent().tx_queues(); return not queues
+ assert retry(txq), "%s: unexpected %s"%(b,queues)
+ subs=[]
+ def txs(): subs = self.tx_subscriptions(b); return not subs
+ assert retry(txs), "%s: unexpected %s"%(b,subs)
+ # TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
@@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
@@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest):
tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
+ tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def assert_simple_rollback_outcome(self, b, tx_queues):
@@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
@@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
tx_queues = cluster[0].agent().tx_queues()
+ tx.close()
self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
@@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.rollback()
tx.sync()
+ tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def assert_commit_raises(self, tx):
@@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
def test_tx_join_leave(self):
@@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
self.assert_commit_raises(tx)
- self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
-
+ for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
cluster.restart(1)
tx.commit()
+ tx.close()
+ for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
@@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest):
for t in threads: t.join()
for s in sessions: s.connection.close()
+ def test_broker_tx_tests(self):
+ cluster = HaCluster(self, 3)
+ print "Running python broker tx tests"
+ p = subprocess.Popen(["qpid-python-test",
+ "-m", "qpid_tests.broker_0_10",
+ "-b", "localhost:%s"%(cluster[0].port()),
+ "*.tx.*"])
+ assert not p.wait()
+ print "Finished python broker tx tests"
if __name__ == "__main__":
outdir = "ha_tests.tmp"