summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-10-24 12:37:53 +0000
committerGordon Sim <gsim@apache.org>2013-10-24 12:37:53 +0000
commit1188148f10e2e814c58ca10e387cf1391b70ca35 (patch)
tree332bcb53d7dd6ce120e39fb1b3b8dfecfd4aed0b /qpid/cpp/src/tests
parent842f77bfeb53f5ceeb63cd9dc65b80defdb2266d (diff)
downloadqpid-python-1188148f10e2e814c58ca10e387cf1391b70ca35.tar.gz
QPID-4265: test closing of receiver with concurrent fetch
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1535355 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp49
1 files changed, 47 insertions, 2 deletions
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 323d7e8231..605faf7fdd 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1243,11 +1243,18 @@ struct Fetcher : public qpid::sys::Runnable {
Receiver receiver;
Message message;
bool result;
+ qpid::messaging::Duration timeout;
+ bool timedOut;
- Fetcher(Receiver r) : receiver(r), result(false) {}
+ Fetcher(Receiver r) : receiver(r), result(false), timeout(Duration::SECOND*10), timedOut(false) {}
void run()
{
- result = receiver.fetch(message, Duration::SECOND*10);
+ qpid::sys::AbsTime start(qpid::sys::now());
+ try {
+ result = receiver.fetch(message, timeout);
+ } catch (const MessagingException&) {}
+ qpid::sys::Duration timeTaken(start, qpid::sys::now());
+ timedOut = (uint64_t) timeTaken >= timeout.getMilliseconds() * qpid::sys::TIME_MSEC;
}
};
}
@@ -1387,6 +1394,44 @@ QPID_AUTO_TEST_CASE(testRollbackWithFullPrefetch)
txsession.commit();
}
+QPID_AUTO_TEST_CASE(testCloseAndConcurrentFetch)
+{
+ QueueFixture fix;
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Fetcher fetcher(receiver);
+ qpid::sys::Thread runner(fetcher);
+ ::usleep(500);
+ receiver.close();
+ runner.join();
+ BOOST_CHECK(!fetcher.timedOut);
+}
+
+QPID_AUTO_TEST_CASE(testCloseAndMultipleConcurrentFetches)
+{
+ QueueFixture fix;
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Receiver receiver2 = fix.session.createReceiver("amq.fanout");
+ Receiver receiver3 = fix.session.createReceiver("amq.fanout");
+ Fetcher fetcher(receiver);
+ Fetcher fetcher2(receiver2);
+ Fetcher fetcher3(receiver3);
+ qpid::sys::Thread runner(fetcher);
+ qpid::sys::Thread runner2(fetcher2);
+ qpid::sys::Thread runner3(fetcher3);
+ ::usleep(500);
+ receiver.close();
+ Message message("Test");
+ fix.session.createSender("amq.fanout").send(message);
+ runner2.join();
+ BOOST_CHECK(fetcher2.result);
+ BOOST_CHECK_EQUAL(fetcher2.message.getContent(), message.getContent());
+ runner3.join();
+ BOOST_CHECK(fetcher3.result);
+ BOOST_CHECK_EQUAL(fetcher3.message.getContent(), message.getContent());
+ runner.join();
+ BOOST_CHECK(!fetcher.timedOut);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests