summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/SubscriptionManager.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-05-04 17:22:33 +0000
committerAlan Conway <aconway@apache.org>2009-05-04 17:22:33 +0000
commitcf3a9eb8cf578be00f0556ff5d93bfdf7c12aec8 (patch)
tree2b67937230ab3aaa1770f85125a77376d616dc05 /cpp/src/qpid/client/SubscriptionManager.cpp
parent2407f9f523b0eeeb8988e30c797fdb46039fc3d3 (diff)
downloadqpid-python-cf3a9eb8cf578be00f0556ff5d93bfdf7c12aec8.tar.gz
Applied PIMPL pattern to SubscriptionManager.
Cleaned up some sloppy #includes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771366 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp129
1 files changed, 35 insertions, 94 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 999b9c6ba7..06aa0dcdf8 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -18,144 +18,85 @@
* under the License.
*
*/
-#ifndef _Subscription_
-#define _Subscription_
#include "SubscriptionManager.h"
-#include "SubscriptionImpl.h"
-#include "LocalQueueImpl.h"
+#include "SubscriptionManagerImpl.h"
#include "PrivateImplRef.h"
-#include <qpid/client/Dispatcher.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/framing/Uuid.h>
-#include <set>
-#include <sstream>
namespace qpid {
namespace client {
-SubscriptionManager::SubscriptionManager(const Session& s)
- : dispatcher(s), session(s), autoStop(true)
-{}
+typedef PrivateImplRef<SubscriptionManager> PI;
+
+SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, new SubscriptionManagerImpl(s)); }
+SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { PI::ctor(*this, i); }
+SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); }
+SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); }
+SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& x) { return PI::assign(*this, x); }
Subscription SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- sys::Mutex::ScopedLock l(lock);
- std::string name=n.empty() ? q:n;
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener);
- dispatcher.listen(si);
- //issue subscription request after listener is registered with dispatcher
- si->subscribe();
- return subscriptions[name] = Subscription(si.get());
-}
+{ return impl->subscribe(listener, q, ss, n); }
Subscription SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
-{
- sys::Mutex::ScopedLock l(lock);
- std::string name=n.empty() ? q:n;
- boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
- boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq);
- lqi->queue=si->divert();
- si->subscribe();
- lqi->subscription = Subscription(si.get());
- return subscriptions[name] = lqi->subscription;
-}
+{ return impl->subscribe(lq, q, ss, n); }
+
Subscription SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& n)
-{
- return subscribe(listener, q, defaultSettings, n);
-}
+{ return impl->subscribe(listener, q, n); }
+
Subscription SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& n)
-{
- return subscribe(lq, q, defaultSettings, n);
-}
+{ return impl->subscribe(lq, q, n); }
-void SubscriptionManager::cancel(const std::string& dest)
-{
- sys::Mutex::ScopedLock l(lock);
- std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
- if (i != subscriptions.end()) {
- sync(session).messageCancel(dest);
- dispatcher.cancel(dest);
- Subscription s = i->second;
- if (s.isValid()) s.impl->cancelDiversion();
- subscriptions.erase(i);
- }
-}
+void SubscriptionManager::cancel(const std::string& dest) { return impl->cancel(dest); }
-void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }
+void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); }
-void SubscriptionManager::run()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.run();
-}
+void SubscriptionManager::run() { impl->run(); }
-void SubscriptionManager::start()
-{
- dispatcher.setAutoStop(autoStop);
- dispatcher.start();
-}
+void SubscriptionManager::start() { impl->start(); }
-void SubscriptionManager::wait()
-{
- dispatcher.wait();
-}
+void SubscriptionManager::wait() { impl->wait(); }
-void SubscriptionManager::stop()
-{
- dispatcher.stop();
-}
+void SubscriptionManager::stop() { impl->stop(); }
bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) {
- LocalQueue lq;
- std::string unique = framing::Uuid(true).str();
- subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique);
- AutoCancel ac(*this, unique);
- //first wait for message to be delivered if a timeout has been specified
- if (timeout && lq.get(result, timeout))
- return true;
- //make sure message is not on queue before final check
- sync(session).messageFlush(unique);
- return lq.get(result, 0);
+ return impl->get(result, queue, timeout);
}
Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) {
- Message result;
- if (!get(result, queue, timeout))
- throw Exception("Timed out waiting for a message");
- return result;
+ return impl->get(queue, timeout);
}
-Session SubscriptionManager::getSession() const { return session; }
+Session SubscriptionManager::getSession() const { return impl->getSession(); }
Subscription SubscriptionManager::getSubscription(const std::string& name) const {
- sys::Mutex::ScopedLock l(lock);
- std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name);
- if (i == subscriptions.end())
- throw Exception(QPID_MSG("Subscription not found: " << name));
- return i->second;
+ return impl->getSubscription(name);
}
-
void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) {
- dispatcher.registerFailoverHandler(fh);
+ impl->registerFailoverHandler(fh);
}
void SubscriptionManager::setFlowControl(const std::string& name, const FlowControl& flow) {
- getSubscription(name).setFlowControl(flow);
+ impl->setFlowControl(name, flow);
}
void SubscriptionManager::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) {
- setFlowControl(name, FlowControl(messages, bytes, window));
+ impl->setFlowControl(name, FlowControl(messages, bytes, window));
}
+void SubscriptionManager::setFlowControl(uint32_t messages, uint32_t bytes, bool window) {
+ impl->setFlowControl(messages, bytes, window);
+}
+
+void SubscriptionManager::setAcceptMode(AcceptMode mode) { impl->setAcceptMode(mode); }
+void SubscriptionManager::setAcquireMode(AcquireMode mode) { impl->setAcquireMode(mode); }
+
}} // namespace qpid::client
-#endif
+