diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-22 14:13:14 +0000 |
| commit | 80097244af5350560a787b58a5135ae54365047a (patch) | |
| tree | 40b4925a61729fa1394789f8f8c6ff5c8429c0cd /qpid/cpp/src/tests/AsyncCompletion.cpp | |
| parent | dee3a10e026896da06ff178335c0b3f86a7e6604 (diff) | |
| download | qpid-python-80097244af5350560a787b58a5135ae54365047a.tar.gz | |
QPID-5855: JAVA Client Can not recieve message with qpid ha cluster "Session exception occured while trying to commit"
The problem: the java client sets the sync flag on tx.commit and then waits for
completion of the entire transaction. According to the 0-10 spec, this is
correct, the commit (or rollback) will not complete until all of the
transactional commands have completed. However the C++ broker was sometimes
completing a commit *before* one of the the corresponding enqueues. It issued
the completions up to the commit (because the commit is makred sync) but there
is a "hole" for the incomplete enqueue. The enqueue is not marked sync so when
this hole is filled no completion is sent and the client hangs.
Fix: make tx.commit a "sync point", that is it behaves like execution.sync and
is not completed till all preceeding commands are complete. Note tx.rollback
does not need modification as it is never completed asynchronously.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619816 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/AsyncCompletion.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/AsyncCompletion.cpp | 35 |
1 files changed, 34 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp index e32097106f..dc43f10156 100644 --- a/qpid/cpp/src/tests/AsyncCompletion.cpp +++ b/qpid/cpp/src/tests/AsyncCompletion.cpp @@ -44,7 +44,8 @@ using broker::PersistableQueue; using sys::TIME_SEC; using boost::intrusive_ptr; -/** @file Unit tests for async completion. +/** @file + * Unit tests for async completion. * Using a dummy store, verify that the broker indicates async completion of * message enqueues at the correct time. */ @@ -69,6 +70,10 @@ class AsyncCompletionMessageStore : public NullMessageStore { QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite) +/** + * Send a sync after a bunch of incomplete messages, verify the sync completes + * only when all the messages are complete. + */ QPID_AUTO_TEST_CASE(testWaitTillComplete) { SessionFixture fix; AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; @@ -104,6 +109,34 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) { sync.wait(); // Should complete now, all messages are completed. } +/** + * Send a sync after all messages are complete, verify it completes immediately. + */ +QPID_AUTO_TEST_CASE(testSyncAfterComplete) { + SessionFixture fix; + AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore; + boost::shared_ptr<qpid::broker::MessageStore> p; + p.reset(store); + fix.broker->setStore(p); + AsyncSession s = fix.session; + + static const int count = 3; + + s.queueDeclare("q", arg::durable=true); + // Transfer and complete all the messages + for (int i = 0; i < count; ++i) { + Message msg(boost::lexical_cast<string>(i), "q"); + msg.getDeliveryProperties().setDeliveryMode(PERSISTENT); + Completion transfer = s.messageTransfer(arg::content=msg, arg::sync=true); + intrusive_ptr<PersistableMessage> enqueued = store->enqueued.pop(TIME_SEC); + enqueued->enqueueComplete(); + transfer.wait(); + } + // Send a sync, make sure it completes immediately + Completion sync = s.executionSync(arg::sync=true); + sync.wait(); // Should complete now, all messages are completed. +} + QPID_AUTO_TEST_CASE(testGetResult) { SessionFixture fix; AsyncSession s = fix.session; |
