diff options
| author | Alan Conway <aconway@apache.org> | 2010-05-13 18:54:39 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2010-05-13 18:54:39 +0000 |
| commit | 7e13897c9238d0e5a6a64df64eeddceb14c36002 (patch) | |
| tree | 3e91072bbb28ba7f2403477b2c7f2f945cb3699c /cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | |
| parent | 211eb170c15e6951f28e900e0ea8284e4e1456eb (diff) | |
| download | qpid-python-7e13897c9238d0e5a6a64df64eeddceb14c36002.tar.gz | |
Fix deadlocks & thread safety in new API classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index c26b2eb09f..b5d7bf78f4 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -104,6 +104,7 @@ struct Match void IncomingMessages::setSession(qpid::client::AsyncSession s) { + sys::Mutex::ScopedLock l(lock); session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); acceptTracker.reset(); @@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s) bool IncomingMessages::get(Handler& handler, Duration timeout) { - //search through received list for any transfer of interest: - for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) { - MessageTransfer transfer(*i, *this); - if (handler.accept(transfer)) { - received.erase(i); - return true; + sys::Mutex::ScopedLock l(lock); + //search through received list for any transfer of interest: + for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++) + { + MessageTransfer transfer(*i, *this); + if (handler.accept(transfer)) { + received.erase(i); + return true; + } } } //none found, check incoming: @@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) { + sys::Mutex::ScopedLock l(lock); //if there is not already a received message, we must wait for one if (received.empty() && !wait(timeout)) return false; //else we have a message in received; return the corresponding destination @@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestination(std::string& destination, Duration tim void IncomingMessages::accept() { + sys::Mutex::ScopedLock l(lock); acceptTracker.accept(session); } void IncomingMessages::releaseAll() { - //first process any received messages... - while (!received.empty()) { - retrieve(received.front(), 0); - received.pop_front(); + { + //first process any received messages... + sys::Mutex::ScopedLock l(lock); + while (!received.empty()) { + retrieve(received.front(), 0); + received.pop_front(); + } } //then pump out any available messages from incoming queue... GetAny handler; while (process(&handler, 0)) ; //now release all messages + sys::Mutex::ScopedLock l(lock); acceptTracker.release(session); } @@ -158,6 +168,7 @@ void IncomingMessages::releasePending(const std::string& destination) while (process(0, 0)) ; //now remove all messages for this destination from received list, recording their ids... + sys::Mutex::ScopedLock l(lock); MatchAndTrack match(destination); for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ; //now release those messages @@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) } else { //received message for another destination, keep for later QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); received.push_back(content); } } else { @@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + sys::Mutex::ScopedLock l(lock); received.push_back(content); return true; } else { @@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) uint32_t IncomingMessages::pendingAccept() { + sys::Mutex::ScopedLock l(lock); return acceptTracker.acceptsPending(); } uint32_t IncomingMessages::pendingAccept(const std::string& destination) { + sys::Mutex::ScopedLock l(lock); return acceptTracker.acceptsPending(destination); } @@ -223,6 +238,7 @@ uint32_t IncomingMessages::available() //first pump all available messages from incoming to received... while (process(0, 0)) {} //return the count of received messages + sys::Mutex::ScopedLock l(lock); return received.size(); } @@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(const std::string& destination) while (process(0, 0)) {} //count all messages for this destination from received list + sys::Mutex::ScopedLock l(lock); return std::for_each(received.begin(), received.end(), Match(destination)).matched; } |
