summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.h')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h128
1 files changed, 70 insertions, 58 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 07faa48fee..8b27a2c9b9 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -25,9 +25,10 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Completion.h>
#include <qpid/client/Session.h>
+#include <qpid/client/AsyncSession.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
-#include <qpid/client/FlowControl.h>
+#include <qpid/client/Subscription.h>
#include <qpid/sys/Runnable.h>
#include <set>
#include <sstream>
@@ -48,15 +49,10 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
-
qpid::client::Dispatcher dispatcher;
qpid::client::AsyncSession session;
- FlowControl flowControl;
- AckPolicy autoAck;
- bool acceptMode;
- bool acquireMode;
bool autoStop;
+ SubscriptionSettings defaultSettings;
public:
/** Create a new SubscriptionManager associated with a session */
@@ -70,14 +66,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
- * If not specified, the queue name is used.
+ *@param settings settings for the subscription.
+ *@param name unique destination name for the subscription, defaults to queue name.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -86,13 +81,13 @@ class SubscriptionManager : public sys::Runnable
*
*@param queue Name of the queue to subscribe to.
*@param flow initial FlowControl for the subscription.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const FlowControl& flow,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const SubscriptionSettings& settings,
+ const std::string& name=std::string());
/**
* Subscribe a MessagesListener to receive messages from queue.
@@ -102,12 +97,12 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(MessageListener& listener,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(MessageListener& listener,
+ const std::string& queue,
+ const std::string& name=std::string());
/**
* Subscribe a LocalQueue to receive messages from queue.
@@ -115,12 +110,12 @@ class SubscriptionManager : public sys::Runnable
* Incoming messages are stored in the queue for you to retrieve.
*
*@param queue Name of the queue to subscribe to.
- *@param tag Unique destination tag for the listener.
+ *@param name unique destination name for the subscription, defaults to queue name.
* If not specified, the queue name is used.
*/
- void subscribe(LocalQueue& localQueue,
- const std::string& queue,
- const std::string& tag=std::string());
+ Subscription subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const std::string& name=std::string());
/** Get a single message from a queue.
@@ -131,8 +126,13 @@ class SubscriptionManager : public sys::Runnable
*/
bool get(Message& result, const std::string& queue, sys::Duration timeout=0);
- /** Cancel a subscription. */
- void cancel(const std::string tag);
+ /** Get a subscription by name, returns a null Subscription handle
+ * if not found.
+ */
+ Subscription getSubscription(const std::string& name) const;
+
+ /** Cancel a subscription. See also: Subscription.cancel() */
+ void cancel(const std::string& name);
/** Deliver messages in the current thread until stop() is called.
* Only one thread may be running in a SubscriptionManager at a time.
@@ -157,53 +157,65 @@ class SubscriptionManager : public sys::Runnable
static const uint32_t UNLIMITED=0xFFFFFFFF;
- /** Set the flow control for destination. */
- void setFlowControl(const std::string& destintion, const FlowControl& flow);
-
- /** Set the default initial flow control for subscriptions that do not specify it. */
- void setFlowControl(const FlowControl& flow);
+ /** Set the flow control for a subscription. */
+ void setFlowControl(const std::string& name, const FlowControl& flow) {
+ getSubscription(name).setFlowControl(flow);
+ }
- /** Get the default flow control for new subscriptions that do not specify it. */
- const FlowControl& getFlowControl() const;
-
- /** Set the flow control for destination tag.
- *@param tag: name of the destination.
+ /** Set the flow control for a subscription.
+ *@param name: name of the subscription.
*@param messages: message credit.
*@param bytes: byte credit.
*@param window: if true use window-based flow control.
*/
- void setFlowControl(const std::string& tag, uint32_t messages, uint32_t bytes, bool window=true);
+ void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true) {
+ setFlowControl(name, messages, bytes, window);
+ }
- /** Set the initial flow control settings to be applied to each new subscribtion.
- *@param messages: message credit.
- *@param bytes: byte credit.
- *@param window: if true use window-based flow control.
+ /** Set the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
+ */
+ void setDefaultSettings(const SubscriptionSettings& s) { defaultSettings = s; }
+
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true);
+ const SubscriptionSettings& getDefaultSettings() const { return defaultSettings; }
- /** Set the accept-mode for new subscriptions. Defaults to true.
- *@param required: if true messages must be confirmed by calling
- *Message::acknowledge() or automatically via an AckPolicy, see setAckPolicy()
+ /** Get the default settings for subscribe() calls that don't
+ * include a SubscriptionSettings parameter.
*/
- void setAcceptMode(bool required);
+ SubscriptionSettings& getDefaultSettings() { return defaultSettings; }
- /** Set the acquire-mode for new subscriptions. Defaults to false.
- *@param acquire: if false messages pre-acquired, if true
- * messages are dequed on acknowledgement or on transfer
- * depending on acceptMode.
+ /**
+ * Set the default flow control settings for subscribe() calls
+ * that don't include a SubscriptionSettings parameter.
+ *
+ *@param messages: message credit.
+ *@param bytes: byte credit.
+ *@param window: if true use window-based flow control.
*/
- void setAcquireMode(bool acquire);
+ void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true) {
+ defaultSettings.flowControl = FlowControl(messages, bytes, window);
+ }
- /** Set the acknowledgement policy for new subscriptions.
- * Default is to acknowledge every message automatically.
+ /**
+ *Set the default accept-mode for subscribe() calls that don't
+ *include a SubscriptionSettings parameter.
*/
- void setAckPolicy(const AckPolicy& autoAck);
+ void setAcceptMode(AcceptMode mode) { defaultSettings.acceptMode = mode; }
- AckPolicy& getAckPolicy();
+ /**
+ * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings.
+ */
+ void setAcquireMode(AcquireMode mode) { defaultSettings.acquireMode = mode; }
void registerFailoverHandler ( boost::function<void ()> fh );
Session getSession() const;
+
+ private:
+ std::map<std::string, Subscription> subscriptions;
};
/** AutoCancel cancels a subscription in its destructor */