diff options
| author | Gordon Sim <gsim@apache.org> | 2009-09-14 10:21:49 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-09-14 10:21:49 +0000 |
| commit | e5a9bace572937edff916e3d3f2205e3e54fdf05 (patch) | |
| tree | adc8e5e5fa1cd558e0868245889670c19683a93e /cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | |
| parent | e2e4f4e3450d13176a84b93736abb21f9a9df1fe (diff) | |
| download | qpid-python-e5a9bace572937edff916e3d3f2205e3e54fdf05.tar.gz | |
Added available and pendingAck properties to Receiver; added capacity and pending properties to Sender.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@814562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 53 |
1 files changed, 48 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index b0a16674e1..d22208368b 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -81,12 +81,31 @@ struct MatchAndTrack } } }; + +struct Match +{ + const std::string destination; + uint32_t matched; + + Match(const std::string& d) : destination(d), matched(0) {} + + bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command) + { + if (command->as<MessageTransferBody>()->getDestination() == destination) { + ++matched; + return true; + } else { + return false; + } + } +}; } void IncomingMessages::setSession(qpid::client::AsyncSession s) { session = s; incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault(); + acceptTracker.reset(); } bool IncomingMessages::get(Handler& handler, Duration timeout) @@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout) void IncomingMessages::accept() { - session.messageAccept(unaccepted); - unaccepted.clear(); + acceptTracker.accept(session); } void IncomingMessages::releaseAll() @@ -121,8 +139,7 @@ void IncomingMessages::releaseAll() GetAny handler; while (process(&handler, 0)) ; //now release all messages - session.messageRelease(unaccepted); - unaccepted.clear(); + acceptTracker.release(session); } void IncomingMessages::releasePending(const std::string& destination) @@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) return false; } +uint32_t IncomingMessages::pendingAccept() +{ + return acceptTracker.acceptsPending(); +} +uint32_t IncomingMessages::pendingAccept(const std::string& destination) +{ + return acceptTracker.acceptsPending(destination); +} + +uint32_t IncomingMessages::available() +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + //return the count of received messages + return received.size(); +} + +uint32_t IncomingMessages::available(const std::string& destination) +{ + //first pump all available messages from incoming to received... + while (process(0, 0)) {} + + //count all messages for this destination from received list + return std::for_each(received.begin(), received.end(), Match(destination)).matched; +} + void populate(qpid::messaging::Message& message, FrameSet& command); /** @@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m } const MessageTransferBody* transfer = command->as<MessageTransferBody>(); if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) { - unaccepted.add(command->getId()); + acceptTracker.delivered(transfer->getDestination(), command->getId()); } session.markCompleted(command->getId(), false, false); } |
