summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp35
1 files changed, 30 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 8e060c62d7..e66dc5915c 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
return process(&handler, timeout);
}
+bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+{
+ //if there is not already a received message, we must wait for one
+ if (received.empty() && !wait(timeout)) return false;
+ //else we have a message in received; return the corresponding destination
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+}
+
void IncomingMessages::accept()
{
acceptTracker.accept(session);
@@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination)
}
/**
- * Get a frameset from session queue, waiting for up to the specified
- * duration and returning true if this could be achieved, false
- * otherwise. If a destination is supplied, only return a message for
- * that destination. In this case messages from other destinations
- * will be held on a received queue.
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
*/
bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
@@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ return true;
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
uint32_t IncomingMessages::pendingAccept()
{
return acceptTracker.acceptsPending();