summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp25
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h3
2 files changed, 17 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index c13e0ef5e6..2ca2c85c64 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -164,12 +164,17 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
ScopedRelease release(inUse, lock);
sys::Mutex::ScopedUnlock l(lock);
//wait for suitable new message to arrive
- if (process(&handler, get_duration(timeout, deadline))) {
+ switch (process(&handler, get_duration(timeout, deadline))) {
+ case OK:
return true;
+ case CLOSED:
+ return false;
+ case EMPTY:
+ break;
}
}
if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
- } while (AbsTime::now() < deadline && !incoming->isClosed());
+ } while (AbsTime::now() < deadline);
return false;
}
namespace {
@@ -233,7 +238,7 @@ void IncomingMessages::releaseAll()
}
//then pump out any available messages from incoming queue...
GetAny handler;
- while (process(&handler, 0)) ;
+ while (process(&handler, 0) == OK) ;
//now release all messages
sys::Mutex::ScopedLock l(lock);
acceptTracker.release(session);
@@ -242,7 +247,7 @@ void IncomingMessages::releaseAll()
void IncomingMessages::releasePending(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) ;
+ while (process(0, 0) == OK) ;
//now remove all messages for this destination from received list, recording their ids...
sys::Mutex::ScopedLock l(lock);
@@ -269,7 +274,7 @@ bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration ti
* that are not accepted by the handler are pushed onto received queue
* for later retrieval.
*/
-bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
+IncomingMessages::ProcessState IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
@@ -282,7 +287,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
} else if (handler && handler->accept(transfer)) {
QPID_LOG(debug, "Delivered " << *content->getMethod() << " "
<< *content->getHeaders());
- return true;
+ return OK;
} else {
//received message for another destination, keep for later
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
@@ -295,8 +300,8 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
}
}
}
- catch (const qpid::ClosedException&) {} // Just return false if queue closed.
- return false;
+ catch (const qpid::ClosedException&) { return CLOSED; }
+ return EMPTY;
}
bool IncomingMessages::wait(qpid::sys::Duration duration)
@@ -331,7 +336,7 @@ uint32_t IncomingMessages::pendingAccept(const std::string& destination)
uint32_t IncomingMessages::available()
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//return the count of received messages
sys::Mutex::ScopedLock l(lock);
return received.size();
@@ -340,7 +345,7 @@ uint32_t IncomingMessages::available()
uint32_t IncomingMessages::available(const std::string& destination)
{
//first pump all available messages from incoming to received...
- while (process(0, 0)) {}
+ while (process(0, 0) == OK) {}
//count all messages for this destination from received list
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index c9ea0673a3..4c9ee68ece 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -87,6 +87,7 @@ class IncomingMessages
uint32_t available(const std::string& destination);
private:
typedef std::deque<FrameSetPtr> FrameSetQueue;
+ enum ProcessState {EMPTY=0,OK=1,CLOSED=2};
sys::Monitor lock;
qpid::client::AsyncSession session;
@@ -95,7 +96,7 @@ class IncomingMessages
FrameSetQueue received;
AcceptTracker acceptTracker;
- bool process(Handler*, qpid::sys::Duration);
+ ProcessState process(Handler*, qpid::sys::Duration);
bool wait(qpid::sys::Duration);
bool pop(FrameSetPtr&, qpid::sys::Duration);