summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/AsyncCompletion.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-22 14:13:14 +0000
committerAlan Conway <aconway@apache.org>2014-08-22 14:13:14 +0000
commit80097244af5350560a787b58a5135ae54365047a (patch)
tree40b4925a61729fa1394789f8f8c6ff5c8429c0cd /qpid/cpp/src/tests/AsyncCompletion.cpp
parentdee3a10e026896da06ff178335c0b3f86a7e6604 (diff)
downloadqpid-python-80097244af5350560a787b58a5135ae54365047a.tar.gz
QPID-5855: JAVA Client Can not recieve message with qpid ha cluster "Session exception occured while trying to commit"
The problem: the java client sets the sync flag on tx.commit and then waits for completion of the entire transaction. According to the 0-10 spec, this is correct, the commit (or rollback) will not complete until all of the transactional commands have completed. However the C++ broker was sometimes completing a commit *before* one of the the corresponding enqueues. It issued the completions up to the commit (because the commit is makred sync) but there is a "hole" for the incomplete enqueue. The enqueue is not marked sync so when this hole is filled no completion is sent and the client hangs. Fix: make tx.commit a "sync point", that is it behaves like execution.sync and is not completed till all preceeding commands are complete. Note tx.rollback does not need modification as it is never completed asynchronously. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1619816 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/AsyncCompletion.cpp')
-rw-r--r--qpid/cpp/src/tests/AsyncCompletion.cpp35
1 files changed, 34 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/AsyncCompletion.cpp b/qpid/cpp/src/tests/AsyncCompletion.cpp
index e32097106f..dc43f10156 100644
--- a/qpid/cpp/src/tests/AsyncCompletion.cpp
+++ b/qpid/cpp/src/tests/AsyncCompletion.cpp
@@ -44,7 +44,8 @@ using broker::PersistableQueue;
using sys::TIME_SEC;
using boost::intrusive_ptr;
-/** @file Unit tests for async completion.
+/** @file
+ * Unit tests for async completion.
* Using a dummy store, verify that the broker indicates async completion of
* message enqueues at the correct time.
*/
@@ -69,6 +70,10 @@ class AsyncCompletionMessageStore : public NullMessageStore {
QPID_AUTO_TEST_SUITE(AsyncCompletionTestSuite)
+/**
+ * Send a sync after a bunch of incomplete messages, verify the sync completes
+ * only when all the messages are complete.
+ */
QPID_AUTO_TEST_CASE(testWaitTillComplete) {
SessionFixture fix;
AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
@@ -104,6 +109,34 @@ QPID_AUTO_TEST_CASE(testWaitTillComplete) {
sync.wait(); // Should complete now, all messages are completed.
}
+/**
+ * Send a sync after all messages are complete, verify it completes immediately.
+ */
+QPID_AUTO_TEST_CASE(testSyncAfterComplete) {
+ SessionFixture fix;
+ AsyncCompletionMessageStore* store = new AsyncCompletionMessageStore;
+ boost::shared_ptr<qpid::broker::MessageStore> p;
+ p.reset(store);
+ fix.broker->setStore(p);
+ AsyncSession s = fix.session;
+
+ static const int count = 3;
+
+ s.queueDeclare("q", arg::durable=true);
+ // Transfer and complete all the messages
+ for (int i = 0; i < count; ++i) {
+ Message msg(boost::lexical_cast<string>(i), "q");
+ msg.getDeliveryProperties().setDeliveryMode(PERSISTENT);
+ Completion transfer = s.messageTransfer(arg::content=msg, arg::sync=true);
+ intrusive_ptr<PersistableMessage> enqueued = store->enqueued.pop(TIME_SEC);
+ enqueued->enqueueComplete();
+ transfer.wait();
+ }
+ // Send a sync, make sure it completes immediately
+ Completion sync = s.executionSync(arg::sync=true);
+ sync.wait(); // Should complete now, all messages are completed.
+}
+
QPID_AUTO_TEST_CASE(testGetResult) {
SessionFixture fix;
AsyncSession s = fix.session;