#ifndef QPID_CLIENT_SUBSCRIPTIONMANAGER_H #define QPID_CLIENT_SUBSCRIPTIONMANAGER_H /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "qpid/client/Session.h" #include "qpid/client/Subscription.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Thread.h" #include "qpid/client/ClientImportExport.h" #include "qpid/client/MessageListener.h" #include "qpid/client/LocalQueue.h" #include "qpid/client/Handle.h" #include namespace qpid { namespace client { class SubscriptionManagerImpl; /** * A class to help create and manage subscriptions. * * Set up your subscriptions, then call run() to have messages * delivered. * * \ingroup clientapi * * \details * *

Subscribing and canceling subscriptions

* * * *

Waiting for messages (and returning)

* * * */ class QPID_CLIENT_CLASS_EXTERN SubscriptionManager : public sys::Runnable, public Handle { public: /** Create a new SubscriptionManager associated with a session */ QPID_CLIENT_EXTERN SubscriptionManager(const Session& session); QPID_CLIENT_EXTERN SubscriptionManager(const SubscriptionManager&); QPID_CLIENT_EXTERN ~SubscriptionManager(); QPID_CLIENT_EXTERN SubscriptionManager& operator=(const SubscriptionManager&); /** * Subscribe a MessagesListener to receive messages from queue. * * Provide your own subclass of MessagesListener to process * incoming messages. It will be called for each message received. * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. *@param settings settings for the subscription. *@param name unique destination name for the subscription, defaults to queue name. */ QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener, const std::string& queue, const SubscriptionSettings& settings, const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. * * Incoming messages are stored in the queue for you to retrieve. * *@param queue Name of the queue to subscribe to. *@param flow initial FlowControl for the subscription. *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue, const std::string& queue, const SubscriptionSettings& settings, const std::string& name=std::string()); /** * Subscribe a MessagesListener to receive messages from queue. * * Provide your own subclass of MessagesListener to process * incoming messages. It will be called for each message received. * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ QPID_CLIENT_EXTERN Subscription subscribe(MessageListener& listener, const std::string& queue, const std::string& name=std::string()); /** * Subscribe a LocalQueue to receive messages from queue. * * Incoming messages are stored in the queue for you to retrieve. * *@param queue Name of the queue to subscribe to. *@param name unique destination name for the subscription, defaults to queue name. * If not specified, the queue name is used. */ QPID_CLIENT_EXTERN Subscription subscribe(LocalQueue& localQueue, const std::string& queue, const std::string& name=std::string()); /** Get a single message from a queue. * (Note: this currently uses a subscription per invocation and is * thus relatively expensive. The subscription is cancelled as * part of each call which can trigger auto-deletion). *@param result is set to the message from the queue. *@param timeout wait up this timeout for a message to appear. *@return true if result was set, false if no message available after timeout. */ QPID_CLIENT_EXTERN bool get(Message& result, const std::string& queue, sys::Duration timeout=0); /** Get a single message from a queue. * (Note: this currently uses a subscription per invocation and is * thus relatively expensive. The subscription is cancelled as * part of each call which can trigger auto-deletion). *@param timeout wait up this timeout for a message to appear. *@return message from the queue. *@throw Exception if the timeout is exceeded. */ QPID_CLIENT_EXTERN Message get(const std::string& queue, sys::Duration timeout=sys::TIME_INFINITE); /** Get a subscription by name. *@throw Exception if not found. */ QPID_CLIENT_EXTERN Subscription getSubscription(const std::string& name) const; /** Cancel a subscription. See also: Subscription.cancel() */ QPID_CLIENT_EXTERN void cancel(const std::string& name); /** Deliver messages in the current thread until stop() is called. * Only one thread may be running in a SubscriptionManager at a time. * @see run */ QPID_CLIENT_EXTERN void run(); /** Start a new thread to deliver messages. * Only one thread may be running in a SubscriptionManager at a time. * @see start */ QPID_CLIENT_EXTERN void start(); /** * Wait for the thread started by a call to start() to complete. */ QPID_CLIENT_EXTERN void wait(); /** If set true, run() will stop when all subscriptions * are cancelled. If false, run will only stop when stop() * is called. True by default. */ QPID_CLIENT_EXTERN void setAutoStop(bool set=true); /** Stop delivery. Causes run() to return, or the thread started with start() to exit. */ QPID_CLIENT_EXTERN void stop(); static const uint32_t UNLIMITED=0xFFFFFFFF; /** Set the flow control for a subscription. */ QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, const FlowControl& flow); /** Set the flow control for a subscription. *@param name: name of the subscription. *@param messages: message credit. *@param bytes: byte credit. *@param window: if true use window-based flow control. */ QPID_CLIENT_EXTERN void setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window=true); /** Set the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ QPID_CLIENT_EXTERN void setDefaultSettings(const SubscriptionSettings& s); /** Get the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ QPID_CLIENT_EXTERN const SubscriptionSettings& getDefaultSettings() const; /** Get the default settings for subscribe() calls that don't * include a SubscriptionSettings parameter. */ QPID_CLIENT_EXTERN SubscriptionSettings& getDefaultSettings(); /** * Set the default flow control settings for subscribe() calls * that don't include a SubscriptionSettings parameter. * *@param messages: message credit. *@param bytes: byte credit. *@param window: if true use window-based flow control. */ QPID_CLIENT_EXTERN void setFlowControl(uint32_t messages, uint32_t bytes, bool window=true); /** *Set the default accept-mode for subscribe() calls that don't *include a SubscriptionSettings parameter. */ QPID_CLIENT_EXTERN void setAcceptMode(AcceptMode mode); /** * Set the default acquire-mode subscribe()s that don't specify SubscriptionSettings. */ QPID_CLIENT_EXTERN void setAcquireMode(AcquireMode mode); QPID_CLIENT_EXTERN void registerFailoverHandler ( boost::function fh ); QPID_CLIENT_EXTERN Session getSession() const; SubscriptionManager(SubscriptionManagerImpl*); ///<@internal private: typedef SubscriptionManagerImpl Impl; friend class PrivateImplRef; }; /** AutoCancel cancels a subscription in its destructor */ class AutoCancel { public: AutoCancel(SubscriptionManager&, const std::string& tag); ~AutoCancel(); private: SubscriptionManager& sm; std::string tag; }; }} // namespace qpid::client #endif /*!QPID_CLIENT_SUBSCRIPTIONMANAGER_H*/