From 8a0970ddc4fa4e9dae4d6c0a519e6de54ce60d43 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 1 May 2013 12:36:05 +0000 Subject: QPID-4786: Only have one thread processing session queue at a time git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1477975 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/MessagingSessionTests.cpp | 37 ++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'qpid/cpp/src/tests/MessagingSessionTests.cpp') diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 55cff046e2..5c3c5c41b1 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1217,6 +1217,43 @@ QPID_AUTO_TEST_CASE(testLinkBindingCleanup) BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); } +namespace { +struct Fetcher : public qpid::sys::Runnable { + Receiver receiver; + Message message; + bool result; + + Fetcher(Receiver r) : receiver(r), result(false) {} + void run() + { + result = receiver.fetch(message, Duration::SECOND*10); + } +}; +} + +QPID_AUTO_TEST_CASE(testConcurrentFetch) +{ + MessagingFixture fix; + Sender sender = fix.session.createSender("my-test-queue;{create:always, node : { x-declare : { auto-delete: true}}}"); + Receiver receiver = fix.session.createReceiver("my-test-queue"); + Fetcher fetcher(fix.session.createReceiver("amq.fanout")); + qpid::sys::Thread runner(fetcher); + Message out("test-message"); + for (int i = 0; i < 10; i++) {//try several times to make sure + sender.send(out, true); + //since the message is now on the queue, it should take less than the timeout to actually fetch it + qpid::sys::AbsTime start = qpid::sys::AbsTime::now(); + Message in; + BOOST_CHECK(receiver.fetch(in, qpid::messaging::Duration::SECOND*2)); + qpid::sys::Duration time(start, qpid::sys::AbsTime::now()); + BOOST_CHECK(time < qpid::sys::TIME_SEC*2); + if (time >= qpid::sys::TIME_SEC*2) break;//if we failed, no need to keep testing + } + fix.session.createSender("amq.fanout").send(out); + runner.join(); + BOOST_CHECK(fetcher.result); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests -- cgit v1.2.1