summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-12 10:30:53 +0000
committerGordon Sim <gsim@apache.org>2009-11-12 10:30:53 +0000
commit51e77c8bc7dc4d71422b421135ded1cb33bb5c55 (patch)
tree0588f217faf0babf5d4de255d113c05214a997ae /cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
parentf3de267295dc756a0abaa6187562396374cba41c (diff)
downloadqpid-python-51e77c8bc7dc4d71422b421135ded1cb33bb5c55.tar.gz
Merge branch 'next_receiver_changes' into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp35
1 files changed, 30 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 8e060c62d7..e66dc5915c 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
return process(&handler, timeout);
}
+bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+{
+ //if there is not already a received message, we must wait for one
+ if (received.empty() && !wait(timeout)) return false;
+ //else we have a message in received; return the corresponding destination
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+}
+
void IncomingMessages::accept()
{
acceptTracker.accept(session);
@@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination)
}
/**
- * Get a frameset from session queue, waiting for up to the specified
- * duration and returning true if this could be achieved, false
- * otherwise. If a destination is supplied, only return a message for
- * that destination. In this case messages from other destinations
- * will be held on a received queue.
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
*/
bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
@@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ return true;
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
uint32_t IncomingMessages::pendingAccept()
{
return acceptTracker.acceptsPending();