summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-05-13 18:54:39 +0000
committerAlan Conway <aconway@apache.org>2010-05-13 18:54:39 +0000
commit7e13897c9238d0e5a6a64df64eeddceb14c36002 (patch)
tree3e91072bbb28ba7f2403477b2c7f2f945cb3699c /cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
parent211eb170c15e6951f28e900e0ea8284e4e1456eb (diff)
downloadqpid-python-7e13897c9238d0e5a6a64df64eeddceb14c36002.tar.gz
Fix deadlocks & thread safety in new API classes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@943973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp37
1 files changed, 27 insertions, 10 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index c26b2eb09f..b5d7bf78f4 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -104,6 +104,7 @@ struct Match
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
+ sys::Mutex::ScopedLock l(lock);
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
acceptTracker.reset();
@@ -111,13 +112,16 @@ void IncomingMessages::setSession(qpid::client::AsyncSession s)
bool IncomingMessages::get(Handler& handler, Duration timeout)
{
- //search through received list for any transfer of interest:
- for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
{
- MessageTransfer transfer(*i, *this);
- if (handler.accept(transfer)) {
- received.erase(i);
- return true;
+ sys::Mutex::ScopedLock l(lock);
+ //search through received list for any transfer of interest:
+ for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i++)
+ {
+ MessageTransfer transfer(*i, *this);
+ if (handler.accept(transfer)) {
+ received.erase(i);
+ return true;
+ }
}
}
//none found, check incoming:
@@ -126,6 +130,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
{
+ sys::Mutex::ScopedLock l(lock);
//if there is not already a received message, we must wait for one
if (received.empty() && !wait(timeout)) return false;
//else we have a message in received; return the corresponding destination
@@ -135,20 +140,25 @@ bool IncomingMessages::getNextDestination(std::string& destination, Duration tim
void IncomingMessages::accept()
{
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
{
- //first process any received messages...
- while (!received.empty()) {
- retrieve(received.front(), 0);
- received.pop_front();
+ {
+ //first process any received messages...
+ sys::Mutex::ScopedLock l(lock);
+ while (!received.empty()) {
+ retrieve(received.front(), 0);
+ received.pop_front();
+ }
}
//then pump out any available messages from incoming queue...
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
+ sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
}
@@ -158,6 +168,7 @@ void IncomingMessages::releasePending(const std::string& destination)
while (process(0, 0)) ;
//now remove all messages for this destination from received list, recording their ids...
+ sys::Mutex::ScopedLock l(lock);
MatchAndTrack match(destination);
for (FrameSetQueue::iterator i = received.begin(); i != received.end(); i = match(*i) ? received.erase(i) : ++i) ;
//now release those messages
@@ -184,6 +195,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
}
} else {
@@ -200,6 +212,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ sys::Mutex::ScopedLock l(lock);
received.push_back(content);
return true;
} else {
@@ -211,10 +224,12 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
uint32_t IncomingMessages::pendingAccept()
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending();
}
uint32_t IncomingMessages::pendingAccept(const std::string& destination)
{
+ sys::Mutex::ScopedLock l(lock);
return acceptTracker.acceptsPending(destination);
}
@@ -223,6 +238,7 @@ uint32_t IncomingMessages::available()
//first pump all available messages from incoming to received...
while (process(0, 0)) {}
//return the count of received messages
+ sys::Mutex::ScopedLock l(lock);
return received.size();
}
@@ -232,6 +248,7 @@ uint32_t IncomingMessages::available(const std::string& destination)
while (process(0, 0)) {}
//count all messages for this destination from received list
+ sys::Mutex::ScopedLock l(lock);
return std::for_each(received.begin(), received.end(), Match(destination)).matched;
}