summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-09-14 10:21:49 +0000
committerGordon Sim <gsim@apache.org>2009-09-14 10:21:49 +0000
commite5a9bace572937edff916e3d3f2205e3e54fdf05 (patch)
treeadc8e5e5fa1cd558e0868245889670c19683a93e /cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
parente2e4f4e3450d13176a84b93736abb21f9a9df1fe (diff)
downloadqpid-python-e5a9bace572937edff916e3d3f2205e3e54fdf05.tar.gz
Added available and pendingAck properties to Receiver; added capacity and pending properties to Sender.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@814562 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp53
1 files changed, 48 insertions, 5 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index b0a16674e1..d22208368b 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -81,12 +81,31 @@ struct MatchAndTrack
}
}
};
+
+struct Match
+{
+ const std::string destination;
+ uint32_t matched;
+
+ Match(const std::string& d) : destination(d), matched(0) {}
+
+ bool operator()(boost::shared_ptr<qpid::framing::FrameSet> command)
+ {
+ if (command->as<MessageTransferBody>()->getDestination() == destination) {
+ ++matched;
+ return true;
+ } else {
+ return false;
+ }
+ }
+};
}
void IncomingMessages::setSession(qpid::client::AsyncSession s)
{
session = s;
incoming = SessionBase_0_10Access(session).get()->getDemux().getDefault();
+ acceptTracker.reset();
}
bool IncomingMessages::get(Handler& handler, Duration timeout)
@@ -106,8 +125,7 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
void IncomingMessages::accept()
{
- session.messageAccept(unaccepted);
- unaccepted.clear();
+ acceptTracker.accept(session);
}
void IncomingMessages::releaseAll()
@@ -121,8 +139,7 @@ void IncomingMessages::releaseAll()
GetAny handler;
while (process(&handler, 0)) ;
//now release all messages
- session.messageRelease(unaccepted);
- unaccepted.clear();
+ acceptTracker.release(session);
}
void IncomingMessages::releasePending(const std::string& destination)
@@ -166,6 +183,32 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+uint32_t IncomingMessages::pendingAccept()
+{
+ return acceptTracker.acceptsPending();
+}
+uint32_t IncomingMessages::pendingAccept(const std::string& destination)
+{
+ return acceptTracker.acceptsPending(destination);
+}
+
+uint32_t IncomingMessages::available()
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+ //return the count of received messages
+ return received.size();
+}
+
+uint32_t IncomingMessages::available(const std::string& destination)
+{
+ //first pump all available messages from incoming to received...
+ while (process(0, 0)) {}
+
+ //count all messages for this destination from received list
+ return std::for_each(received.begin(), received.end(), Match(destination)).matched;
+}
+
void populate(qpid::messaging::Message& message, FrameSet& command);
/**
@@ -180,7 +223,7 @@ void IncomingMessages::retrieve(FrameSetPtr command, qpid::messaging::Message* m
}
const MessageTransferBody* transfer = command->as<MessageTransferBody>();
if (transfer->getAcquireMode() == ACQUIRE_MODE_PRE_ACQUIRED && transfer->getAcceptMode() == ACCEPT_MODE_EXPLICIT) {
- unaccepted.add(command->getId());
+ acceptTracker.delivered(transfer->getDestination(), command->getId());
}
session.markCompleted(command->getId(), false, false);
}