diff options
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.h')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 128 |
1 files changed, 70 insertions, 58 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 07faa48fee..8b27a2c9b9 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -25,9 +25,10 @@ #include <qpid/client/Dispatcher.h> #include <qpid/client/Completion.h> #include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> -#include <qpid/client/FlowControl.h> +#include <qpid/client/Subscription.h> #include <qpid/sys/Runnable.h> #include <set> #include <sstream> @@ -48,15 +49,10 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&); - qpid::client::Dispatcher dispatcher; qpid::client::AsyncSession session; - FlowControl flowControl; - AckPolicy autoAck; - bool acceptMode; - bool acquireMode; bool autoStop; + SubscriptionSettings defaultSettings; public: /** Create a new SubscriptionManager associated with a session */ @@ -70,14 +66,13 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. - *@param flow initial FlowControl for the subscription. - *@param tag Unique destination tag for the listener. - * If not specified, the queue name is used. + *@param settings settings for the subscription. + *@param name unique destination name for the subscription, defaults to queue name. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const FlowControl& flow, - const std::string& tag=std::string()); + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -86,13 +81,13 @@ class SubscriptionManager : public sys::Runnable * *@param queue Name of the queue to subscribe to. *@param flow initial FlowControl for the subscription. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, - const std::string& queue, - const FlowControl& flow, - const std::string& tag=std::string()); + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const SubscriptionSettings& settings, + const std::string& name=std::string()); /** * Subscribe a MessagesListener to receive messages from queue. @@ -102,12 +97,12 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(MessageListener& listener, - const std::string& queue, - const std::string& tag=std::string()); + Subscription subscribe(MessageListener& listener, + const std::string& queue, + const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. @@ -115,12 +110,12 @@ class SubscriptionManager : public sys::Runnable * Incoming messages are stored in the queue for you to retrieve. * *@param queue Name of the queue to subscribe to. - *@param tag Unique destination tag for the listener. + *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ - void subscribe(LocalQueue& localQueue, - const std::string& queue, - const std::string& tag=std::string()); + Subscription subscribe(LocalQueue& localQueue, + const std::string& queue, + const std::string& name=std::string()); /** Get a single message from a queue. @@ -131,8 +126,13 @@ class SubscriptionManager : public sys::Runnable */ bool get(Message& result, const std::string& queue, sys::Duration timeout=0); - /** Cancel a subscription. */ - void cancel(const std::string tag); + /** Get a subscription by name, returns a null Subscription handle + * if not found. + */ + Subscription getSubscription(const std::string& name) const; + + /** Cancel a subscription. See also: Subscription.cancel() */ + void cancel(const std::string& name); /** Deliver messages in the current thread until stop() is called. * Only one thread may be running in a SubscriptionManager at a time. @@ -157,53 +157,65 @@ class SubscriptionManager : public sys::Runnable static const uint32_t UNLIMITED=0xFFFFFFFF; - /** Set the flow control for destination. */ - void setFlowControl(const std::string& destintion, const FlowControl& flow); - - /** Set the default initial flow control for subscriptions that do not specify it. */ - void setFlowControl(const FlowControl& flow); + /** Set the flow control for a subscription. */ + void setFlowControl(const std::string& name, const FlowControl& flow) { + getSubscription(name).setFlowControl(flow); + } - /** Get the default flow control for new subscriptions that do not specify it. */ - const FlowControl& getFlowControl() const; - - /** Set the flow control for destination tag. - *@param tag: name of the destination. + /** Set the flow control for a subscription. + *@param name: name of the subscription. *@param messages: message credit. *@param bytes: byte credit. *@param window: if true use window-based flow control. */ - void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true); + void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true) { + setFlowControl(name, messages, bytes, window); + } - /** Set the initial flow control settings to be applied to each new subscribtion. - *@param messages: message credit. - *@param bytes: byte credit. - *@param window: if true use window-based flow control. + /** Set the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. + */ + void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; } + + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. */ - void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); + const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; } - /** Set the accept-mode for new subscriptions. Defaults to true. - *@param required: if true messages must be confirmed by calling - *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy() + /** Get the default settings for subscribe() calls that don't + * include a SubscriptionSettings parameter. */ - void setAcceptMode(bool required); + SubscriptionSettings& getDefaultSettings() { return defaultSettings; } - /** Set the acquire-mode for new subscriptions. Defaults to false. - *@param acquire: if false messages pre-acquired, if true - * messages are dequed on acknowledgement or on transfer - * depending on acceptMode. + /** + * Set the default flow control settings for subscribe() calls + * that don't include a SubscriptionSettings parameter. + * + *@param messages: message credit. + *@param bytes: byte credit. + *@param window: if true use window-based flow control. */ - void setAcquireMode(bool acquire); + void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) { + defaultSettings.flowControl = FlowControl(messages, bytes, window); + } - /** Set the acknowledgement policy for new subscriptions. - * Default is to acknowledge every message automatically. + /** + *Set the default accept-mode for subscribe() calls that don't + *include a SubscriptionSettings parameter. */ - void setAckPolicy(const AckPolicy& autoAck); + void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; } - AckPolicy& getAckPolicy(); + /** + * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. + */ + void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; } void registerFailoverHandler ( boost::function<void ()> fh ); Session getSession() const; + + private: + std::map<std::string, Subscription> subscriptions; }; /** AutoCancel cancels a subscription in its destructor */ |