diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 35 |
1 files changed, 30 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 8e060c62d7..e66dc5915c 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) return process(&handler, timeout); } +bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout) +{ + //if there is not already a received message, we must wait for one + if (received.empty() && !wait(timeout)) return false; + //else we have a message in received; return the corresponding destination + destination = received.front()->as<MessageTransferBody>()->getDestination(); + return true; +} + void IncomingMessages::accept() { acceptTracker.accept(session); @@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination) } /** - * Get a frameset from session queue, waiting for up to the specified - * duration and returning true if this could be achieved, false - * otherwise. If a destination is supplied, only return a message for - * that destination. In this case messages from other destinations - * will be held on a received queue. + * Get a frameset that is accepted by the specified handler from + * session queue, waiting for up to the specified duration and + * returning true if this could be achieved, false otherwise. Messages + * that are not accepted by the handler are pushed onto received queue + * for later retrieval. */ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) { @@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +bool IncomingMessages::wait(qpid::sys::Duration duration) +{ + AbsTime deadline(AbsTime::now(), duration); + FrameSet::shared_ptr content; + for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + if (content->isA<MessageTransferBody>()) { + QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); + received.push_back(content); + return true; + } else { + //TODO: handle other types of commands (e.g. message-accept, message-flow etc) + } + } + return false; +} + uint32_t IncomingMessages::pendingAccept() { return acceptTracker.acceptsPending(); |
