summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp42
1 files changed, 34 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 3fa75a54ac..324b11e1df 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -25,6 +25,7 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
#include <set>
#include <sstream>
@@ -34,35 +35,48 @@ namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ flowControl(UNLIMITED, UNLIMITED, false),
acceptMode(0), acquireMode(0),
autoStop(true)
{}
void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest)
+ const std::string& q, const std::string& dest, const FlowControl& fc)
{
session.messageSubscribe(
arg::queue=q, arg::destination=dest,
arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- setFlowControl(dest, messages, bytes, window);
+ if (fc.messages || fc.bytes) // No need to set if all 0.
+ setFlowControl(dest, fc);
}
void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
+ subscribe(listener, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
+ subscribe(lq, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
lq.session=session;
lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::setFlowControl(
@@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl(
session.sync();
}
+void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
+ setFlowControl(dest, fc.messages, fc.bytes, fc.window);
+}
+
+void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
+
void SubscriptionManager::setFlowControl(
uint32_t messages_, uint32_t bytes_, bool window_)
{
- messages=messages_;
- bytes=bytes_;
- window=window_;
+ setFlowControl(FlowControl(messages_, bytes_, window_));
}
+const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
+
void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
@@ -109,6 +129,12 @@ void SubscriptionManager::stop()
dispatcher.stop();
}
+Message SubscriptionManager::get(const std::string& queue) {
+ LocalQueue lq;
+ subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str());
+ return lq.get();
+}
+
}} // namespace qpid::client
#endif