diff options
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 42 |
1 files changed, 34 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 3fa75a54ac..324b11e1df 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -25,6 +25,7 @@ #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> +#include <qpid/framing/Uuid.h> #include <set> #include <sstream> @@ -34,35 +35,48 @@ namespace client { SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), - messages(UNLIMITED), bytes(UNLIMITED), window(true), + flowControl(UNLIMITED, UNLIMITED, false), acceptMode(0), acquireMode(0), autoStop(true) {} void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest) + const std::string& q, const std::string& dest, const FlowControl& fc) { session.messageSubscribe( arg::queue=q, arg::destination=dest, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - setFlowControl(dest, messages, bytes, window); + if (fc.messages || fc.bytes) // No need to set if all 0. + setFlowControl(dest, fc); } void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { + subscribe(listener, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { + subscribe(lq, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; lq.session=session; lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::setFlowControl( @@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl( session.sync(); } +void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { + setFlowControl(dest, fc.messages, fc.bytes, fc.window); +} + +void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } + void SubscriptionManager::setFlowControl( uint32_t messages_, uint32_t bytes_, bool window_) { - messages=messages_; - bytes=bytes_; - window=window_; + setFlowControl(FlowControl(messages_, bytes_, window_)); } +const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } + void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } @@ -109,6 +129,12 @@ void SubscriptionManager::stop() dispatcher.stop(); } +Message SubscriptionManager::get(const std::string& queue) { + LocalQueue lq; + subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str()); + return lq.get(); +} + }} // namespace qpid::client #endif |