diff options
| author | Gordon Sim <gsim@apache.org> | 2013-05-01 12:36:05 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-05-01 12:36:05 +0000 |
| commit | 8a0970ddc4fa4e9dae4d6c0a519e6de54ce60d43 (patch) | |
| tree | bb667fbc14c78c7fbdda6525cda2bf26b4a49554 /qpid/cpp | |
| parent | d5b676fcd2b16ba2d416963b39df2d22828fc7d5 (diff) | |
| download | qpid-python-8a0970ddc4fa4e9dae4d6c0a519e6de54ce60d43.tar.gz | |
QPID-4786: Only have one thread processing session queue at a time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1477975 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 66 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 37 |
3 files changed, 96 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index e832cd2567..db6e843cf6 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -100,8 +100,24 @@ struct Match } } }; + +struct ScopedRelease +{ + bool& flag; + qpid::sys::Monitor& lock; + + ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {} + ~ScopedRelease() + { + sys::Monitor::ScopedLock l(lock); + flag = false; + lock.notifyAll(); + } +}; } +IncomingMessages::IncomingMessages() : inUse(false) {} + void IncomingMessages::setSession(qpid::client::AsyncSession s) { sys::Mutex::ScopedLock l(lock); @@ -110,10 +126,11 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s) acceptTracker.reset(); } -bool IncomingMessages::get(Handler& handler, Duration timeout) +bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) { - { - sys::Mutex::ScopedLock l(lock); + sys::Mutex::ScopedLock l(lock); + AbsTime deadline(AbsTime::now(), timeout); + do { //search through received list for any transfer of interest: for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) { @@ -123,19 +140,42 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) return true; } } - } - //none found, check incoming: - return process(&handler, timeout); + if (inUse) { + //someone is already waiting on the incoming session queue, wait for them to finish + lock.wait(deadline); + } else { + inUse = true; + ScopedRelease release(inUse, lock); + sys::Mutex::ScopedUnlock l(lock); + //wait for suitable new message to arrive + return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline)); + } + } while (AbsTime::now() < deadline); + return false; } -bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) +bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout) { sys::Mutex::ScopedLock l(lock); - //if there is not already a received message, we must wait for one - if (received.empty() && !wait(timeout)) return false; - //else we have a message in received; return the corresponding destination - destination = received.front()->as<MessageTransferBody>()->getDestination(); - return true; + AbsTime deadline(AbsTime::now(), timeout); + while (received.empty() && AbsTime::now() < deadline) { + if (inUse) { + //someone is already waiting on the sessions incoming queue + lock.wait(deadline); + } else { + inUse = true; + ScopedRelease release(inUse, lock); + sys::Mutex::ScopedUnlock l(lock); + //wait for an incoming message + wait(timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline)); + } + } + if (!received.empty()) { + destination = received.front()->as<MessageTransferBody>()->getDestination(); + return true; + } else { + return false; + } } void IncomingMessages::accept() @@ -206,6 +246,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); sys::Mutex::ScopedLock l(lock); received.push_back(content); + lock.notifyAll(); } } else { //TODO: handle other types of commands (e.g. message-accept, message-flow etc) @@ -225,6 +266,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); sys::Mutex::ScopedLock l(lock); received.push_back(content); + lock.notifyAll(); return true; } else { //TODO: handle other types of commands (e.g. message-accept, message-flow etc) diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 9053b70312..0f1bbd2720 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -68,6 +68,7 @@ class IncomingMessages virtual bool accept(MessageTransfer& transfer) = 0; }; + IncomingMessages(); void setSession(qpid::client::AsyncSession session); bool get(Handler& handler, qpid::sys::Duration timeout); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); @@ -84,9 +85,10 @@ class IncomingMessages private: typedef std::deque<FrameSetPtr> FrameSetQueue; - sys::Mutex lock; + sys::Monitor lock; qpid::client::AsyncSession session; boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming; + bool inUse; FrameSetQueue received; AcceptTracker acceptTracker; diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 55cff046e2..5c3c5c41b1 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1217,6 +1217,43 @@ QPID_AUTO_TEST_CASE(testLinkBindingCleanup) BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE)); } +namespace { +struct Fetcher : public qpid::sys::Runnable { + Receiver receiver; + Message message; + bool result; + + Fetcher(Receiver r) : receiver(r), result(false) {} + void run() + { + result = receiver.fetch(message, Duration::SECOND*10); + } +}; +} + +QPID_AUTO_TEST_CASE(testConcurrentFetch) +{ + MessagingFixture fix; + Sender sender = fix.session.createSender("my-test-queue;{create:always, node : { x-declare : { auto-delete: true}}}"); + Receiver receiver = fix.session.createReceiver("my-test-queue"); + Fetcher fetcher(fix.session.createReceiver("amq.fanout")); + qpid::sys::Thread runner(fetcher); + Message out("test-message"); + for (int i = 0; i < 10; i++) {//try several times to make sure + sender.send(out, true); + //since the message is now on the queue, it should take less than the timeout to actually fetch it + qpid::sys::AbsTime start = qpid::sys::AbsTime::now(); + Message in; + BOOST_CHECK(receiver.fetch(in, qpid::messaging::Duration::SECOND*2)); + qpid::sys::Duration time(start, qpid::sys::AbsTime::now()); + BOOST_CHECK(time < qpid::sys::TIME_SEC*2); + if (time >= qpid::sys::TIME_SEC*2) break;//if we failed, no need to keep testing + } + fix.session.createSender("amq.fanout").send(out); + runner.join(); + BOOST_CHECK(fetcher.result); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
