diff options
author | Gordon Sim <gsim@apache.org> | 2010-07-06 17:27:58 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-07-06 17:27:58 +0000 |
commit | dda1aa80a4fa0ddc72057881e21276197b3d344e (patch) | |
tree | 324dea0e36a43038556ff80c000216a2e5906da8 /cpp | |
parent | 35510d734264a82a5e477e826c3666e9fdbadc62 (diff) | |
download | qpid-python-dda1aa80a4fa0ddc72057881e21276197b3d344e.tar.gz |
QPID-664: Don't hold lock while waiting for incoming message in nextReceiver() call.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960951 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/MessagingThreadTests.cpp | 37 |
2 files changed, 38 insertions, 1 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index a6067097bb..800c3269b9 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -323,11 +323,11 @@ bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messaging::Duration timeout) { - qpid::sys::Mutex::ScopedLock l(lock); while (true) { try { std::string destination; if (incoming.getNextDestination(destination, adjust(timeout))) { + qpid::sys::Mutex::ScopedLock l(lock); Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); diff --git a/cpp/src/tests/MessagingThreadTests.cpp b/cpp/src/tests/MessagingThreadTests.cpp index a355ba7800..48264735b1 100644 --- a/cpp/src/tests/MessagingThreadTests.cpp +++ b/cpp/src/tests/MessagingThreadTests.cpp @@ -54,6 +54,25 @@ struct ReceiveThread : public sys::Runnable { } }; +struct NextReceiverThread : public sys::Runnable { + Session session; + vector<string> received; + string error; + + NextReceiverThread(Session s) : session(s) {} + void run() { + try { + while(true) { + Message m = session.nextReceiver(Duration::SECOND*5).fetch(); + if (m.getContent() == "END") break; + received.push_back(m.getContent()); + } + } catch (const std::exception& e) { + error = e.what(); + } + } +}; + QPID_AUTO_TEST_CASE(testConcurrentSendReceive) { MessagingFixture fix; @@ -103,5 +122,23 @@ QPID_AUTO_TEST_CASE(testCloseSessionBusyReceiver) { BOOST_CHECK_THROW(r.fetch(Duration(0)), NoMessageAvailable); } +QPID_AUTO_TEST_CASE(testConcurrentSendNextReceiver) { + MessagingFixture fix; + Receiver r = fix.session.createReceiver("concurrent;{create:always,link:{reliability:unreliable}}"); + const size_t COUNT=100; + r.setCapacity(COUNT); + NextReceiverThread rt(fix.session); + sys::Thread thread(rt); + sys::usleep(1000); // Give the receive thread time to block. + Sender s = fix.session.createSender("concurrent;{create:always}"); + for (size_t i = 0; i < COUNT; ++i) { + s.send(Message()); + } + s.send(Message("END")); + thread.join(); + BOOST_CHECK_EQUAL(rt.error, string()); + BOOST_CHECK_EQUAL(COUNT, rt.received.size()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |