summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-07-06 17:27:58 +0000
committerGordon Sim <gsim@apache.org>2010-07-06 17:27:58 +0000
commitdda1aa80a4fa0ddc72057881e21276197b3d344e (patch)
tree324dea0e36a43038556ff80c000216a2e5906da8 /cpp
parent35510d734264a82a5e477e826c3666e9fdbadc62 (diff)
downloadqpid-python-dda1aa80a4fa0ddc72057881e21276197b3d344e.tar.gz
QPID-664: Don't hold lock while waiting for incoming message in nextReceiver() call.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp2
-rw-r--r--cpp/src/tests/MessagingThreadTests.cpp37
2 files changed, 38 insertions, 1 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index a6067097bb..800c3269b9 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -323,11 +323,11 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message,
bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout)
{
- qpid::sys::Mutex::ScopedLock l(lock);
while (true) {
try {
std::string destination;
if (incoming.getNextDestination(destination, adjust(timeout))) {
+ qpid::sys::Mutex::ScopedLock l(lock);
Receivers::const_iterator i = receivers.find(destination);
if (i == receivers.end()) {
throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination));
diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp
index a355ba7800..48264735b1 100644
--- a/cpp/src/tests/MessagingThreadTests.cpp
+++ b/cpp/src/tests/MessagingThreadTests.cpp
@@ -54,6 +54,25 @@ struct ReceiveThread : public sys::Runnable {
}
};
+struct NextReceiverThread : public sys::Runnable {
+ Session session;
+ vector<string> received;
+ string error;
+
+ NextReceiverThread(Session s) : session(s) {}
+ void run() {
+ try {
+ while(true) {
+ Message m = session.nextReceiver(Duration::SECOND*5).fetch();
+ if (m.getContent() == "END") break;
+ received.push_back(m.getContent());
+ }
+ } catch (const std::exception& e) {
+ error = e.what();
+ }
+ }
+};
+
QPID_AUTO_TEST_CASE(testConcurrentSendReceive) {
MessagingFixture fix;
@@ -103,5 +122,23 @@ QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) {
BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable);
}
+QPID_AUTO_TEST_CASE(testConcurrentSendNextReceiver) {
+ MessagingFixture fix;
+ Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}");
+ const size_t COUNT=100;
+ r.setCapacity(COUNT);
+ NextReceiverThread rt(fix.session);
+ sys::Thread thread(rt);
+ sys::usleep(1000); // Give the receive thread time to block.
+ Sender s = fix.session.createSender("concurrent;{create:always}");
+ for (size_t i = 0; i < COUNT; ++i) {
+ s.send(Message());
+ }
+ s.send(Message("END"));
+ thread.join();
+ BOOST_CHECK_EQUAL(rt.error, string());
+ BOOST_CHECK_EQUAL(COUNT, rt.received.size());
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests