diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerObservers.h | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionObservers.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Observers.h | 32 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 78 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFactory.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueObservers.h | 77 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueReplicator.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueSnapshots.h | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 6 |
13 files changed, 138 insertions, 116 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerObservers.h b/qpid/cpp/src/qpid/broker/BrokerObservers.h index 8a9bdd4fd4..67427adcb1 100644 --- a/qpid/cpp/src/qpid/broker/BrokerObservers.h +++ b/qpid/cpp/src/qpid/broker/BrokerObservers.h @@ -24,19 +24,14 @@ #include "BrokerObserver.h" #include "Observers.h" -#include "qpid/sys/Mutex.h" namespace qpid { namespace broker { /** - * A broker observer that delegates to a collection of broker observers. - * - * THREAD SAFE + * Collection of BrokerObserver. */ -class BrokerObservers : public BrokerObserver, - public Observers<BrokerObserver> -{ +class BrokerObservers : public Observers<BrokerObserver> { public: void queueCreate(const boost::shared_ptr<Queue>& q) { each(boost::bind(&BrokerObserver::queueCreate, _1, q)); @@ -54,15 +49,13 @@ class BrokerObservers : public BrokerObserver, const boost::shared_ptr<Queue>& queue, const std::string& key, const framing::FieldTable& args) { - each(boost::bind( - &BrokerObserver::bind, _1, exchange, queue, key, args)); + each(boost::bind(&BrokerObserver::bind, _1, exchange, queue, key, args)); } void unbind(const boost::shared_ptr<Exchange>& exchange, const boost::shared_ptr<Queue>& queue, const std::string& key, const framing::FieldTable& args) { - each(boost::bind( - &BrokerObserver::unbind, _1, exchange, queue, key, args)); + each(boost::bind(&BrokerObserver::unbind, _1, exchange, queue, key, args)); } void startTx(const boost::intrusive_ptr<TxBuffer>& tx) { each(boost::bind(&BrokerObserver::startTx, _1, tx)); @@ -70,6 +63,9 @@ class BrokerObservers : public BrokerObserver, void startDtx(const boost::intrusive_ptr<DtxBuffer>& dtx) { each(boost::bind(&BrokerObserver::startDtx, _1, dtx)); } + + private: + template <class F> void each(F f) { Observers<BrokerObserver>::each(f); } }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ConnectionObservers.h b/qpid/cpp/src/qpid/broker/ConnectionObservers.h index e9014c80c3..8b9fb67aa5 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionObservers.h +++ b/qpid/cpp/src/qpid/broker/ConnectionObservers.h @@ -30,11 +30,8 @@ namespace broker { /** * A collection of connection observers. - * Calling a ConnectionObserver function will call that function on each observer. - * THREAD SAFE. */ -class ConnectionObservers : public ConnectionObserver, - public Observers<ConnectionObserver> +class ConnectionObservers : public Observers<ConnectionObserver> { public: void connection(Connection& c) { diff --git a/qpid/cpp/src/qpid/broker/Observers.h b/qpid/cpp/src/qpid/broker/Observers.h index d50c21e559..b7b26a0d38 100644 --- a/qpid/cpp/src/qpid/broker/Observers.h +++ b/qpid/cpp/src/qpid/broker/Observers.h @@ -24,7 +24,7 @@ #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> -#include <vector> +#include <set> #include <algorithm> namespace qpid { @@ -37,19 +37,21 @@ template <class Observer> class Observers { public: - void add(boost::shared_ptr<Observer> observer) { + typedef boost::shared_ptr<Observer> ObserverPtr; + + void add(const ObserverPtr& observer) { sys::Mutex::ScopedLock l(lock); - observers.push_back(observer); + observers.insert(observer); } - void remove(boost::shared_ptr<Observer> observer) { + void remove(const ObserverPtr& observer) { sys::Mutex::ScopedLock l(lock); - typename List::iterator i = std::find(observers.begin(), observers.end(), observer); - observers.erase(i); + observers.erase(observer) ; } + /** Iterate over the observers. */ template <class F> void each(F f) { - List copy; + Set copy; // Make a copy and iterate outside the lock. { sys::Mutex::ScopedLock l(lock); copy = observers; @@ -58,10 +60,20 @@ class Observers } protected: - typedef std::vector<boost::shared_ptr<Observer> > List; + typedef std::set<ObserverPtr> Set; + Observers() : lock(myLock) {} + + /** Specify a lock for the Observers to use */ + Observers(sys::Mutex& l) : lock(l) {} + + /** Iterate over the observers without taking the lock, caller must hold the lock */ + template <class F> void each(F f, const sys::Mutex::ScopedLock&) { + std::for_each(observers.begin(), observers.end(), f); + } - sys::Mutex lock; - List observers; + mutable sys::Mutex myLock; + mutable sys::Mutex& lock; + Set observers; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3eaab30394..74d23fdabf 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -195,6 +195,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, persistenceId(0), settings(b ? merge(_settings, b->getOptions()) : _settings), eventMode(0), + observers(name, messageLock), broker(b), deleted(false), barrier(*this), @@ -988,68 +989,38 @@ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, Sc { current -= QueueDepth(1, msg.getMessageSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->dequeued(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of dequeue for queue " << getName() << ": " << e.what()); - } - } + observers.dequeued(msg, lock); if (autodelete && isEmpty(lock)) autodelete->check(lock); } /** updates queue observers when a message has become unavailable for transfer. * Requires messageLock be held by caller. */ -void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeAcquire(const Message& msg, const Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->acquired(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of message removal for queue " << getName() << ": " << e.what()); - } - } + observers.acquired(msg, l); } /** updates queue observers when a message has become re-available for transfer * Requires messageLock be held by caller. */ -void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock&) +void Queue::observeRequeue(const Message& msg, const Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->requeued(msg); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of message requeue for queue " << getName() << ": " << e.what()); - } - } + observers.requeued(msg, l); } /** updates queue observers when a new consumer has subscribed to this queue. */ -void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeConsumerAdd( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerAdded(c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of new consumer for queue " << getName() << ": " << e.what()); - } - } + observers.consumerAdded(c, l); } /** updates queue observers when a consumer has unsubscribed from this queue. */ -void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock&) +void Queue::observeConsumerRemove( const Consumer& c, const qpid::sys::Mutex::ScopedLock& l) { - for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { - try{ - (*i)->consumerRemoved(c); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of removed consumer for queue " << getName() << ": " << e.what()); - } - } + observers.consumerRemoved(c, l); } @@ -1133,12 +1104,9 @@ void Queue::destroyed() if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); { - Mutex::ScopedLock lock(messageLock); - for_each(observers.begin(), observers.end(), - boost::bind(&QueueObserver::destroy, _1)); - observers.clear(); + Mutex::ScopedLock l(messageLock); + observers.destroy(l); } - if (mgmtObject != 0) { mgmtObject->resourceDestroy(); if (brokerMgmtObject) @@ -1513,15 +1481,9 @@ void Queue::recoveryComplete(ExchangeRegistry& exchanges) /** updates queue observers and state when a message has become available for transfer * Requires messageLock be held by caller. */ -void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock&) +void Queue::observeEnqueue(const Message& m, const Mutex::ScopedLock& l) { - for (Observers::iterator i = observers.begin(); i != observers.end(); ++i) { - try { - (*i)->enqueued(m); - } catch (const std::exception& e) { - QPID_LOG(warning, "Exception on notification of enqueue for queue " << getName() << ": " << e.what()); - } - } + observers.enqueued(m, l); mgntEnqStats(m, mgmtObject, brokerMgmtObject); } @@ -1538,18 +1500,6 @@ bool Queue::isDeleted() const return deleted; } -void Queue::addObserver(boost::shared_ptr<QueueObserver> observer) -{ - Mutex::ScopedLock lock(messageLock); - observers.insert(observer); -} - -void Queue::removeObserver(boost::shared_ptr<QueueObserver> observer) -{ - Mutex::ScopedLock lock(messageLock); - observers.erase(observer); -} - void Queue::flush() { ScopedUse u(barrier); diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 3622b06dbd..6e7b17e1c5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -31,7 +31,7 @@ #include "qpid/broker/PersistableQueue.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/QueueListeners.h" -#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/QueueObservers.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/TxOp.h" @@ -169,7 +169,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, bool eligible; }; - typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; typedef boost::function1<void, Message&> MessageFunctor; @@ -211,7 +210,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, qmf::org::apache::qpid::broker::Broker::shared_ptr brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; - Observers observers; + QueueObservers observers; MessageInterceptors interceptors; std::string seqNoKey; Broker* broker; @@ -446,12 +445,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, bindings.eachBinding(f); } - /** Apply f to each Observer on the queue */ - template <class F> void eachObserver(F f) { - sys::Mutex::ScopedLock l(messageLock); - std::for_each<Observers::iterator, F>(observers.begin(), observers.end(), f); - } - /** * Set the sequence number for the back of the queue, the * next message enqueued will be pos+1. @@ -484,11 +477,12 @@ class Queue : public boost::enable_shared_from_this<Queue>, SubscriptionType type=CONSUMER ); - QPID_BROKER_EXTERN void addObserver(boost::shared_ptr<QueueObserver>); - QPID_BROKER_EXTERN void removeObserver(boost::shared_ptr<QueueObserver>); + QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); QPID_BROKER_EXTERN MessageInterceptors& getMessageInterceptors() { return interceptors; } + QPID_BROKER_EXTERN QueueObservers& getObservers() { return observers; } + /** * Notify queue that recovery has completed. */ diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index e5d9431555..d31b120cae 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -90,7 +90,7 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que if (settings.groupKey.size()) { boost::shared_ptr<MessageGroupManager> mgm(MessageGroupManager::create( name, *(queue->messages), settings)); queue->allocator = mgm; - queue->addObserver(mgm); + queue->getObservers().add(mgm); } else { queue->allocator = boost::shared_ptr<MessageDistributor>(new FifoDistributor( *(queue->messages) )); } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 3a609d67f1..9c215d197f 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -244,7 +244,7 @@ void QueueFlowLimit::observe(Queue& queue) } /* set up the observer */ - queue.addObserver(shared_from_this()); + queue.getObservers().add(shared_from_this()); } /** returns ptr to a QueueFlowLimit, else 0 if no limit */ diff --git a/qpid/cpp/src/qpid/broker/QueueObservers.h b/qpid/cpp/src/qpid/broker/QueueObservers.h new file mode 100644 index 0000000000..1bf5f1f696 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueObservers.h @@ -0,0 +1,77 @@ +#ifndef QPID_BROKER_QUEUEOBSERVERS_H +#define QPID_BROKER_QUEUEOBSERVERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Observers.h" +#include "QueueObserver.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +/** + * A collection of queue observers. + */ +class QueueObservers : public Observers<QueueObserver> { + public: + typedef Observers<QueueObserver> Base; + + // The only public functions are inherited from Observers<QueueObserver> + using Base::each; // Avoid function hiding. + + friend class Queue; + + typedef const sys::Mutex::ScopedLock& Lock; + + QueueObservers(const std::string& q, sys::Mutex& lock) : Base(lock), qname(q) {} + + template <class T> void each(void (QueueObserver::*f)(const T&), const T& arg, const char* fname, Lock l) { + Base::each(boost::bind(&QueueObservers::wrap<T>, this, f, boost::cref(arg), fname, _1), l); + } + + template <class T> void wrap(void (QueueObserver::*f)(const T&), const T& arg, const char* fname, const ObserverPtr& o) { + try { (o.get()->*f)(arg); } + catch (const std::exception& e) { + QPID_LOG(warning, "Exception on " << fname << " for queue " << qname << ": " << e.what()); + } + } + + + // Calls are locked externally by caller. + void enqueued(const Message& m, Lock l) { each(&QueueObserver::enqueued, m, "enqueue", l); } + void dequeued(const Message& m, Lock l) { each(&QueueObserver::dequeued, m, "dequeue", l); } + void acquired(const Message& m, Lock l) { each(&QueueObserver::acquired, m, "acquire", l); } + void requeued(const Message& m, Lock l) { each(&QueueObserver::requeued, m, "requeue", l); } + void consumerAdded(const Consumer& c, Lock l) { each(&QueueObserver::consumerAdded, c, "consumer added", l); } + void consumerRemoved(const Consumer& c, Lock l) { each(&QueueObserver::consumerRemoved, c, "consumer removed", l); } + void destroy(Lock l) { + Base::each(boost::bind(&QueueObserver::destroy, _1), l); + observers.clear(); + } + + std::string qname; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUEOBSERVERS_H*/ diff --git a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp index afb9d9ff4e..41d20342c8 100644 --- a/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp +++ b/qpid/cpp/src/qpid/broker/ThresholdAlerts.cpp @@ -93,7 +93,7 @@ void ThresholdAlerts::observe(Queue& queue, qpid::management::ManagementAgent& a boost::shared_ptr<QueueObserver> observer( new ThresholdAlerts(queue.getName(), agent, ctu, ctd, stu, std, (_ctd == 0 && _std == 0)) ); - queue.addObserver(observer); + queue.getObservers().add(observer); } } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index d2792e5e17..6ffd53ff21 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -55,8 +55,8 @@ QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) info.printId(os) << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); - queue.addObserver(observer); - // Set first after calling addObserver so we know that the back of the + queue.getObservers().add(observer); + // Set first after adding the observer so we know that the back of the // queue+1 is (or will be) a guarded position. QueuePosition front, back; q.getRange(front, back, broker::REPLICATOR); @@ -86,7 +86,7 @@ void QueueGuard::dequeued(const Message& m) { } void QueueGuard::cancel() { - queue.removeObserver(observer); + queue.getObservers().remove(observer); Mutex::ScopedLock l(lock); if (cancelled) return; QPID_LOG(debug, logPrefix << "Cancelled"); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 50f2ececdb..b43658365c 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -175,7 +175,7 @@ void QueueReplicator::activate() { boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this()))); // Enable callback to destroy() - queue->addObserver( + queue->getObservers().add( boost::shared_ptr<QueueObserver>(new QueueObserver(shared_from_this()))); } diff --git a/qpid/cpp/src/qpid/ha/QueueSnapshots.h b/qpid/cpp/src/qpid/ha/QueueSnapshots.h index 8711f9234c..6612c71f6a 100644 --- a/qpid/cpp/src/qpid/ha/QueueSnapshots.h +++ b/qpid/cpp/src/qpid/ha/QueueSnapshots.h @@ -45,18 +45,14 @@ class QueueSnapshots : public broker::BrokerObserver public: boost::shared_ptr<QueueSnapshot> get(const boost::shared_ptr<broker::Queue>& q) const { boost::shared_ptr<QueueSnapshot> qs; - q->eachObserver( + q->getObservers().each( boost::bind(QueueSnapshots::saveQueueSnapshot, _1, boost::ref(qs))); return qs; } // BrokerObserver overrides. void queueCreate(const boost::shared_ptr<broker::Queue>& q) { - q->addObserver(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); - } - - void queueDestroy(const boost::shared_ptr<broker::Queue>& q) { - q->removeObserver(get(q)); + q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot)); } private: diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 95215e1e59..2db7845067 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -150,12 +150,12 @@ void ReplicatingSubscription::initialize() { // However we must attach the observer _before_ we snapshot for // initial dequeues to be sure we don't miss any dequeues // between the snapshot and attaching the observer. - queue->addObserver( + queue->getObservers().add( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue); // There may be no snapshot if the queue is being deleted concurrently. if (!snapshot) { - queue->removeObserver( + queue->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted"); } @@ -254,7 +254,7 @@ void ReplicatingSubscription::cancel() } QPID_LOG(debug, logPrefix << "Cancelled"); if (primary) primary->removeReplica(*this); - getQueue()->removeObserver( + getQueue()->getObservers().remove( boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this())); guard->cancel(); ConsumerImpl::cancel(); |
