summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-05-01 12:36:05 +0000
committerGordon Sim <gsim@apache.org>2013-05-01 12:36:05 +0000
commit8a0970ddc4fa4e9dae4d6c0a519e6de54ce60d43 (patch)
treebb667fbc14c78c7fbdda6525cda2bf26b4a49554 /qpid/cpp
parentd5b676fcd2b16ba2d416963b39df2d22828fc7d5 (diff)
downloadqpid-python-8a0970ddc4fa4e9dae4d6c0a519e6de54ce60d43.tar.gz
QPID-4786: Only have one thread processing session queue at a time
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1477975 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp66
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h8
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp37
3 files changed, 96 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index e832cd2567..db6e843cf6 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -100,8 +100,24 @@ struct Match
}
}
};
+
+struct ScopedRelease
+{
+ bool& flag;
+ qpid::sys::Monitor& lock;
+
+ ScopedRelease(bool& f, qpid::sys::Monitor& l) : flag(f), lock(l) {}
+ ~ScopedRelease()
+ {
+ sys::Monitor::ScopedLock l(lock);
+ flag = false;
+ lock.notifyAll();
+ }
+};
}
+IncomingMessages::IncomingMessages() : inUse(false) {}
+
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
sys::Mutex::ScopedLock l(lock);
@@ -110,10 +126,11 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s)
acceptTracker.reset();
}
-bool IncomingMessages::get(Handler& handler, Duration timeout)
+bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
{
- {
- sys::Mutex::ScopedLock l(lock);
+ sys::Mutex::ScopedLock l(lock);
+ AbsTime deadline(AbsTime::now(), timeout);
+ do {
//search through received list for any transfer of interest:
for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
{
@@ -123,19 +140,42 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
return true;
}
}
- }
- //none found, check incoming:
- return process(&handler, timeout);
+ if (inUse) {
+ //someone is already waiting on the incoming session queue, wait for them to finish
+ lock.wait(deadline);
+ } else {
+ inUse = true;
+ ScopedRelease release(inUse, lock);
+ sys::Mutex::ScopedUnlock l(lock);
+ //wait for suitable new message to arrive
+ return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline));
+ }
+ } while (AbsTime::now() < deadline);
+ return false;
}
-bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
{
sys::Mutex::ScopedLock l(lock);
- //if there is not already a received message, we must wait for one
- if (received.empty() && !wait(timeout)) return false;
- //else we have a message in received; return the corresponding destination
- destination = received.front()->as<MessageTransferBody>()->getDestination();
- return true;
+ AbsTime deadline(AbsTime::now(), timeout);
+ while (received.empty() && AbsTime::now() < deadline) {
+ if (inUse) {
+ //someone is already waiting on the sessions incoming queue
+ lock.wait(deadline);
+ } else {
+ inUse = true;
+ ScopedRelease release(inUse, lock);
+ sys::Mutex::ScopedUnlock l(lock);
+ //wait for an incoming message
+ wait(timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline));
+ }
+ }
+ if (!received.empty()) {
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+ } else {
+ return false;
+ }
}
void IncomingMessages::accept()
@@ -206,6 +246,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
sys::Mutex::ScopedLock l(lock);
received.push_back(content);
+ lock.notifyAll();
}
} else {
//TODO: handle other types of commands (e.g. message-accept, message-flow etc)
@@ -225,6 +266,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
sys::Mutex::ScopedLock l(lock);
received.push_back(content);
+ lock.notifyAll();
return true;
} else {
//TODO: handle other types of commands (e.g. message-accept, message-flow etc)
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 9053b70312..0f1bbd2720 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -10,9 +10,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -68,6 +68,7 @@ class IncomingMessages
virtual bool accept(MessageTransfer& transfer) = 0;
};
+ IncomingMessages();
void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
@@ -84,9 +85,10 @@ class IncomingMessages
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
- sys::Mutex lock;
+ sys::Monitor lock;
qpid::client::AsyncSession session;
boost::shared_ptr< sys::BlockingQueue<FrameSetPtr> > incoming;
+ bool inUse;
FrameSetQueue received;
AcceptTracker acceptTracker;
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 55cff046e2..5c3c5c41b1 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -1217,6 +1217,43 @@ QPID_AUTO_TEST_CASE(testLinkBindingCleanup)
BOOST_CHECK(!receiver2.fetch(in, Duration::IMMEDIATE));
}
+namespace {
+struct Fetcher : public qpid::sys::Runnable {
+ Receiver receiver;
+ Message message;
+ bool result;
+
+ Fetcher(Receiver r) : receiver(r), result(false) {}
+ void run()
+ {
+ result = receiver.fetch(message, Duration::SECOND*10);
+ }
+};
+}
+
+QPID_AUTO_TEST_CASE(testConcurrentFetch)
+{
+ MessagingFixture fix;
+ Sender sender = fix.session.createSender("my-test-queue;{create:always, node : { x-declare : { auto-delete: true}}}");
+ Receiver receiver = fix.session.createReceiver("my-test-queue");
+ Fetcher fetcher(fix.session.createReceiver("amq.fanout"));
+ qpid::sys::Thread runner(fetcher);
+ Message out("test-message");
+ for (int i = 0; i < 10; i++) {//try several times to make sure
+ sender.send(out, true);
+ //since the message is now on the queue, it should take less than the timeout to actually fetch it
+ qpid::sys::AbsTime start = qpid::sys::AbsTime::now();
+ Message in;
+ BOOST_CHECK(receiver.fetch(in, qpid::messaging::Duration::SECOND*2));
+ qpid::sys::Duration time(start, qpid::sys::AbsTime::now());
+ BOOST_CHECK(time < qpid::sys::TIME_SEC*2);
+ if (time >= qpid::sys::TIME_SEC*2) break;//if we failed, no need to keep testing
+ }
+ fix.session.createSender("amq.fanout").send(out);
+ runner.join();
+ BOOST_CHECK(fetcher.result);
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests