summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-10-29 15:23:49 +0000
committerAlan Conway <aconway@apache.org>2013-10-29 15:23:49 +0000
commit1731c3ba99577fa515985609a675afd89e5c91e4 (patch)
tree8432209a8e11f28fca72d8d7972016572da0f3c6 /qpid/cpp/src/tests
parent7033bf67ab672fafc57d374f8a727cc8e4b7c54e (diff)
downloadqpid-python-1731c3ba99577fa515985609a675afd89e5c91e4.tar.gz
QPID-5139: HA transactions block a thread, can deadlock the broker
PrimaryTxObserver::prepare used to block pending responses from each backup. With concurrent transactions this can deadlock the broker: once all worker threads are blocked in prepare, responses from backups cannot be received. This commit generalizes the async completion mechanism for messages to allow async completion of arbitrary commands. It leaves the special-case code for messages undisturbed but adds a second path (starting from SessionState::handleCommand) for async completion of other commands. In particular it implements tx.commit to allow async completion. TxBuffer is now an AsyncCompletion and commitLocal() is split into - startCommit() called by SemanticState::commit() - endCommit() called when the commit command completes TxAccept no longer holds pre-computed ranges, compute fresh each time. - Avoid range iterators going out of date during a delayed commit. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1536754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/TransactionObserverTest.cpp9
-rw-r--r--qpid/cpp/src/tests/TxBufferTest.cpp11
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py46
3 files changed, 42 insertions, 24 deletions
diff --git a/qpid/cpp/src/tests/TransactionObserverTest.cpp b/qpid/cpp/src/tests/TransactionObserverTest.cpp
index 2a7d94b1ae..80ef494c21 100644
--- a/qpid/cpp/src/tests/TransactionObserverTest.cpp
+++ b/qpid/cpp/src/tests/TransactionObserverTest.cpp
@@ -79,8 +79,10 @@ struct MockBrokerObserver : public BrokerObserver {
MockBrokerObserver(bool prep_=true) : prep(prep_) {}
void startTx(const intrusive_ptr<TxBuffer>& buffer) {
- tx.reset(new MockTransactionObserver(prep));
- buffer->setObserver(tx);
+ if (!tx) { // Don't overwrite first tx with automatically started second tx.
+ tx.reset(new MockTransactionObserver(prep));
+ buffer->setObserver(tx);
+ }
}
};
@@ -94,7 +96,7 @@ Session simpleTxTransaction(MessagingFixture& fix) {
return txSession;
}
-QPID_AUTO_TEST_CASE(tesTxtCommit) {
+QPID_AUTO_TEST_CASE(testTxCommit) {
MessagingFixture fix;
shared_ptr<MockBrokerObserver> brokerObserver(new MockBrokerObserver);
fix.broker->getBrokerObservers().add(brokerObserver);
@@ -114,6 +116,7 @@ QPID_AUTO_TEST_CASE(testTxFail) {
fix.broker->getBrokerObservers().add(brokerObserver);
Session txSession = simpleTxTransaction(fix);
try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
txSession.commit();
BOOST_FAIL("Expected exception");
} catch(...) {}
diff --git a/qpid/cpp/src/tests/TxBufferTest.cpp b/qpid/cpp/src/tests/TxBufferTest.cpp
index 4807026ab7..3f052d213e 100644
--- a/qpid/cpp/src/tests/TxBufferTest.cpp
+++ b/qpid/cpp/src/tests/TxBufferTest.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/TxBuffer.h"
#include "unit_test.h"
+#include "test_tools.h"
#include <iostream>
#include <vector>
#include "TxMocks.h"
@@ -50,7 +51,8 @@ QPID_AUTO_TEST_CASE(testCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(buffer.commitLocal(&store));
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
store.check();
BOOST_CHECK(store.isCommitted());
opA->check();
@@ -75,7 +77,12 @@ QPID_AUTO_TEST_CASE(testFailOnCommitLocal)
buffer.enlist(static_pointer_cast<TxOp>(opB));
buffer.enlist(static_pointer_cast<TxOp>(opC));
- BOOST_CHECK(!buffer.commitLocal(&store));
+ try {
+ ScopedSuppressLogging sl; // Suppress messages for expected error.
+ buffer.startCommit(&store);
+ buffer.endCommit(&store);
+ BOOST_FAIL("Expected exception");
+ } catch (...) {}
BOOST_CHECK(store.isAborted());
store.check();
opA->check();
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index ad546afc62..79024d48e3 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1346,24 +1346,19 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
+ tx.close()
for b in cluster: self.assert_simple_commit_outcome(b, tx_queues)
- def assert_tx_cleanup(self, b, tx_queues):
+ def assert_tx_clean(self, b):
"""Verify that there are no transaction artifacts
(exchanges, queues, subscriptions) on b."""
-
- self.assertEqual(0, len(b.agent().tx_queues()), msg=b)
- self.assertEqual(0, len(self.tx_subscriptions(b)), msg=b)
-
- # TX exchanges don't show up in management so test for existence by name.
- s = b.connect_admin().session()
- try:
- for q in tx_queues:
- try:
- s.sender("%s;{node:{type:topic}}"%q)
- self.fail("Found tx exchange %s on %s "%(q,b))
- except NotFound: pass
- finally: s.connection.close()
+ queues=[]
+ def txq(): queues = b.agent().tx_queues(); return not queues
+ assert retry(txq), "%s: unexpected %s"%(b,queues)
+ subs=[]
+ def txs(): subs = self.tx_subscriptions(b); return not subs
+ assert retry(txs), "%s: unexpected %s"%(b,subs)
+ # TODO aconway 2013-10-15: TX exchanges don't show up in management.
def assert_simple_commit_outcome(self, b, tx_queues):
b.assert_browse_backup("a", [], msg=b)
@@ -1379,7 +1374,7 @@ class TransactionTests(HaBrokerTest):
<commit tx=1>
"""
self.assertEqual(expect, open_read(b.store_log), msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_rollback(self):
cluster = HaCluster(self, 2, test_store=True)
@@ -1388,6 +1383,7 @@ class TransactionTests(HaBrokerTest):
tx_queues = cluster[0].agent().tx_queues()
tx.acknowledge()
tx.rollback()
+ tx.close() # For clean test.
for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
def assert_simple_rollback_outcome(self, b, tx_queues):
@@ -1399,7 +1395,7 @@ class TransactionTests(HaBrokerTest):
<enqueue a z>
"""
self.assertEqual(open_read(b.store_log), expect, msg=b)
- self.assert_tx_cleanup(b, tx_queues)
+ self.assert_tx_clean(b)
def test_tx_simple_failover(self):
cluster = HaCluster(self, 3, test_store=True)
@@ -1423,6 +1419,7 @@ class TransactionTests(HaBrokerTest):
tx.commit()
tx.sync()
tx_queues = cluster[0].agent().tx_queues()
+ tx.close()
self.assert_simple_commit_outcome(cluster[0], tx_queues)
# Test rollback
@@ -1433,6 +1430,7 @@ class TransactionTests(HaBrokerTest):
tx.acknowledge()
tx.rollback()
tx.sync()
+ tx.close()
self.assert_simple_rollback_outcome(cluster[0], tx_queues)
def assert_commit_raises(self, tx):
@@ -1448,7 +1446,7 @@ class TransactionTests(HaBrokerTest):
for m in ["foo","bang","bar"]: s.send(Message(m, durable=True))
self.assert_commit_raises(tx)
for b in cluster: b.assert_browse_backup("q", [])
- self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<abort tx=1>\n")
+ self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q bang tx=1>\n<abort tx=1>\n")
def test_tx_join_leave(self):
@@ -1465,14 +1463,15 @@ class TransactionTests(HaBrokerTest):
cluster[1].kill(final=False)
s.send("b")
self.assert_commit_raises(tx)
- self.assertEqual([[],[]], [b.agent().tx_queues() for b in [cluster[0],cluster[2]]])
-
+ for b in [cluster[0],cluster[2]]: self.assert_tx_clean(b)
# Joining
tx = cluster[0].connect().session(transactional=True)
s = tx.sender("q;{create:always}")
s.send("foo")
cluster.restart(1)
tx.commit()
+ tx.close()
+ for b in cluster: self.assert_tx_clean(b)
# The new member is not in the tx but receives the results normal replication.
for b in cluster: b.assert_browse_backup("q", ["foo"], msg=b)
@@ -1493,6 +1492,15 @@ class TransactionTests(HaBrokerTest):
for t in threads: t.join()
for s in sessions: s.connection.close()
+ def test_broker_tx_tests(self):
+ cluster = HaCluster(self, 3)
+ print "Running python broker tx tests"
+ p = subprocess.Popen(["qpid-python-test",
+ "-m", "qpid_tests.broker_0_10",
+ "-b", "localhost:%s"%(cluster[0].port()),
+ "*.tx.*"])
+ assert not p.wait()
+ print "Finished python broker tx tests"
if __name__ == "__main__":
outdir = "ha_tests.tmp"