diff options
author | Alan Conway <aconway@apache.org> | 2009-05-04 17:22:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-05-04 17:22:33 +0000 |
commit | cf3a9eb8cf578be00f0556ff5d93bfdf7c12aec8 (patch) | |
tree | 2b67937230ab3aaa1770f85125a77376d616dc05 /cpp/src/qpid/client/SubscriptionManager.cpp | |
parent | 2407f9f523b0eeeb8988e30c797fdb46039fc3d3 (diff) | |
download | qpid-python-cf3a9eb8cf578be00f0556ff5d93bfdf7c12aec8.tar.gz |
Applied PIMPL pattern to SubscriptionManager.
Cleaned up some sloppy #includes.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771366 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/SubscriptionManager.cpp')
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 129 |
1 files changed, 35 insertions, 94 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 999b9c6ba7..06aa0dcdf8 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -18,144 +18,85 @@ * under the License. * */ -#ifndef _Subscription_ -#define _Subscription_ #include "SubscriptionManager.h" -#include "SubscriptionImpl.h" -#include "LocalQueueImpl.h" +#include "SubscriptionManagerImpl.h" #include "PrivateImplRef.h" -#include <qpid/client/Dispatcher.h> -#include <qpid/client/Session.h> -#include <qpid/client/MessageListener.h> -#include <qpid/framing/Uuid.h> -#include <set> -#include <sstream> namespace qpid { namespace client { -SubscriptionManager::SubscriptionManager(const Session& s) - : dispatcher(s), session(s), autoStop(true) -{} +typedef PrivateImplRef<SubscriptionManager> PI; + +SubscriptionManager::SubscriptionManager(const Session& s) { PI::ctor(*this, new SubscriptionManagerImpl(s)); } +SubscriptionManager::SubscriptionManager(SubscriptionManagerImpl* i) { PI::ctor(*this, i); } +SubscriptionManager::SubscriptionManager(const SubscriptionManager& x) : Handle<SubscriptionManagerImpl>() { PI::copy(*this, x); } +SubscriptionManager::~SubscriptionManager() { PI::dtor(*this); } +SubscriptionManager& SubscriptionManager::operator=(const SubscriptionManager& x) { return PI::assign(*this, x); } Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) -{ - sys::Mutex::ScopedLock l(lock); - std::string name=n.empty() ? q:n; - boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, &listener); - dispatcher.listen(si); - //issue subscription request after listener is registered with dispatcher - si->subscribe(); - return subscriptions[name] = Subscription(si.get()); -} +{ return impl->subscribe(listener, q, ss, n); } Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) -{ - sys::Mutex::ScopedLock l(lock); - std::string name=n.empty() ? q:n; - boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); - boost::intrusive_ptr<LocalQueueImpl> lqi = PrivateImplRef<LocalQueue>::get(lq); - lqi->queue=si->divert(); - si->subscribe(); - lqi->subscription = Subscription(si.get()); - return subscriptions[name] = lqi->subscription; -} +{ return impl->subscribe(lq, q, ss, n); } + Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& n) -{ - return subscribe(listener, q, defaultSettings, n); -} +{ return impl->subscribe(listener, q, n); } + Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& n) -{ - return subscribe(lq, q, defaultSettings, n); -} +{ return impl->subscribe(lq, q, n); } -void SubscriptionManager::cancel(const std::string& dest) -{ - sys::Mutex::ScopedLock l(lock); - std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); - if (i != subscriptions.end()) { - sync(session).messageCancel(dest); - dispatcher.cancel(dest); - Subscription s = i->second; - if (s.isValid()) s.impl->cancelDiversion(); - subscriptions.erase(i); - } -} +void SubscriptionManager::cancel(const std::string& dest) { return impl->cancel(dest); } -void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } +void SubscriptionManager::setAutoStop(bool set) { impl->setAutoStop(set); } -void SubscriptionManager::run() -{ - dispatcher.setAutoStop(autoStop); - dispatcher.run(); -} +void SubscriptionManager::run() { impl->run(); } -void SubscriptionManager::start() -{ - dispatcher.setAutoStop(autoStop); - dispatcher.start(); -} +void SubscriptionManager::start() { impl->start(); } -void SubscriptionManager::wait() -{ - dispatcher.wait(); -} +void SubscriptionManager::wait() { impl->wait(); } -void SubscriptionManager::stop() -{ - dispatcher.stop(); -} +void SubscriptionManager::stop() { impl->stop(); } bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Duration timeout) { - LocalQueue lq; - std::string unique = framing::Uuid(true).str(); - subscribe(lq, queue, SubscriptionSettings(FlowControl::messageCredit(1)), unique); - AutoCancel ac(*this, unique); - //first wait for message to be delivered if a timeout has been specified - if (timeout && lq.get(result, timeout)) - return true; - //make sure message is not on queue before final check - sync(session).messageFlush(unique); - return lq.get(result, 0); + return impl->get(result, queue, timeout); } Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout) { - Message result; - if (!get(result, queue, timeout)) - throw Exception("Timed out waiting for a message"); - return result; + return impl->get(queue, timeout); } -Session SubscriptionManager::getSession() const { return session; } +Session SubscriptionManager::getSession() const { return impl->getSession(); } Subscription SubscriptionManager::getSubscription(const std::string& name) const { - sys::Mutex::ScopedLock l(lock); - std::map<std::string, Subscription>::const_iterator i = subscriptions.find(name); - if (i == subscriptions.end()) - throw Exception(QPID_MSG("Subscription not found: " << name)); - return i->second; + return impl->getSubscription(name); } - void SubscriptionManager::registerFailoverHandler (boost::function<void ()> fh) { - dispatcher.registerFailoverHandler(fh); + impl->registerFailoverHandler(fh); } void SubscriptionManager::setFlowControl(const std::string& name, const FlowControl& flow) { - getSubscription(name).setFlowControl(flow); + impl->setFlowControl(name, flow); } void SubscriptionManager::setFlowControl(const std::string& name, uint32_t messages, uint32_t bytes, bool window) { - setFlowControl(name, FlowControl(messages, bytes, window)); + impl->setFlowControl(name, FlowControl(messages, bytes, window)); } +void SubscriptionManager::setFlowControl(uint32_t messages, uint32_t bytes, bool window) { + impl->setFlowControl(messages, bytes, window); +} + +void SubscriptionManager::setAcceptMode(AcceptMode mode) { impl->setAcceptMode(mode); } +void SubscriptionManager::setAcquireMode(AcquireMode mode) { impl->setAcquireMode(mode); } + }} // namespace qpid::client -#endif + |