From 449d9d326b7cea02934c3e7e8ea7e84a817e47ac Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 17 Feb 2009 20:18:38 +0000 Subject: Minor fixes. client/SubscriptionManager: made it thread safe, was causing latencytest to crash with --rate and --time-limit. cluster/Cluster.cpp: don't call cpg_leave during shutdown. Not required and a problem if shutdown was caused by a cpg error. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@745226 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/SubscriptionManager.cpp | 8 ++++++-- cpp/src/qpid/client/SubscriptionManager.h | 13 +++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 1d1d23056e..b016109ead 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -41,6 +41,7 @@ SubscriptionManager::SubscriptionManager(const Session& s) Subscription SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { + sys::Mutex::ScopedLock l(lock); std::string name=n.empty() ? q:n; boost::intrusive_ptr si = new SubscriptionImpl(*this, q, ss, name, &listener); dispatcher.listen(si); @@ -52,6 +53,7 @@ Subscription SubscriptionManager::subscribe( Subscription SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { + sys::Mutex::ScopedLock l(lock); std::string name=n.empty() ? q:n; boost::intrusive_ptr si = new SubscriptionImpl(*this, q, ss, name, 0); lq.queue=si->divert(); @@ -74,13 +76,14 @@ Subscription SubscriptionManager::subscribe( void SubscriptionManager::cancel(const std::string& dest) { + sys::Mutex::ScopedLock l(lock); std::map::iterator i = subscriptions.find(dest); if (i != subscriptions.end()) { sync(session).messageCancel(dest); dispatcher.cancel(dest); Subscription s = i->second; - if (s.isValid()) subscriptions[dest].impl->cancelDiversion(); - subscriptions.erase(dest); + if (s.isValid()) s.impl->cancelDiversion(); + subscriptions.erase(i); } } @@ -131,6 +134,7 @@ Message SubscriptionManager::get(const std::string& queue, sys::Duration timeout Session SubscriptionManager::getSession() const { return session; } Subscription SubscriptionManager::getSubscription(const std::string& name) const { + sys::Mutex::ScopedLock l(lock); std::map::const_iterator i = subscriptions.find(name); if (i == subscriptions.end()) throw Exception(QPID_MSG("Subscription not found: " << name)); diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 89823a11bc..6b45092931 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -95,14 +95,6 @@ namespace client { */ class SubscriptionManager : public sys::Runnable { - typedef sys::Mutex::ScopedLock Lock; - typedef sys::Mutex::ScopedUnlock Unlock; - - qpid::client::Dispatcher dispatcher; - qpid::client::AsyncSession session; - bool autoStop; - SubscriptionSettings defaultSettings; - public: /** Create a new SubscriptionManager associated with a session */ SubscriptionManager(const Session& session); @@ -271,6 +263,11 @@ class SubscriptionManager : public sys::Runnable Session getSession() const; private: + mutable sys::Mutex lock; + qpid::client::Dispatcher dispatcher; + qpid::client::AsyncSession session; + bool autoStop; + SubscriptionSettings defaultSettings; std::map subscriptions; }; -- cgit v1.2.1