diff options
| author | Gordon Sim <gsim@apache.org> | 2008-11-29 16:01:54 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2008-11-29 16:01:54 +0000 |
| commit | 92f646b3c0b5fa9e4e243d7402a7488cb4e17533 (patch) | |
| tree | c29d3f8670631b3f0da9cf1819864318fac8007f /qpid/cpp/src | |
| parent | 16578074ba43ea5858c85be75665222a4af526e8 (diff) | |
| download | qpid-python-92f646b3c0b5fa9e4e243d7402a7488cb4e17533.tar.gz | |
QPID-1280: fixed performance regression for multiple subscribers on shared queue
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@721685 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.cpp | 73 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueListeners.h | 68 |
5 files changed, 169 insertions, 27 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 60e163e8af..04541c8dc2 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -347,6 +347,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Exchange.cpp \ qpid/broker/Queue.cpp \ qpid/broker/QueueCleaner.cpp \ + qpid/broker/QueueListeners.cpp \ qpid/broker/PersistableMessage.cpp \ qpid/broker/Bridge.cpp \ qpid/broker/Connection.cpp \ @@ -473,6 +474,7 @@ nobase_include_HEADERS = \ qpid/broker/SessionAdapter.h \ qpid/broker/Exchange.h \ qpid/broker/Queue.h \ + qpid/broker/QueueListeners.h \ qpid/broker/QueueCleaner.h \ qpid/broker/BrokerSingleton.h \ qpid/broker/Bridge.h \ diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d664e6e141..b090ffef43 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -109,12 +109,12 @@ Queue::~Queue() void Queue::notifyDurableIOComplete() { - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); - listeners.swap(copy); + listeners.populate(copy); } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -187,14 +187,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ void Queue::requeue(const QueuedMessage& msg){ if (policy.get() && !policy->isEnqueued(msg)) return; - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); - listeners.swap(copy); + listeners.populate(copy); } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } void Queue::clearLVQIndex(const QueuedMessage& msg){ @@ -240,7 +240,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) if (messages.empty()) { //no message available, register consumer for notification //when this changes - addListener(c); + listeners.addListener(c); return false; } else { QueuedMessage msg = getFront(); @@ -248,7 +248,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c) //though a message is on the queue, it has not yet been //enqueued and so is not available for consumption yet, //register consumer for notification when this changes - addListener(c); + listeners.addListener(c); return false; } else { //check that consumer has sufficient credit for the @@ -266,7 +266,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) Mutex::ScopedLock locker(messageLock); if (messages.empty()) { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); - addListener(c); + listeners.addListener(c); return false; } else { QueuedMessage msg = getFront(); @@ -323,15 +323,15 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c) void Queue::removeListener(Consumer::shared_ptr c) { - Mutex::ScopedLock locker(messageLock); - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); - if (i != listeners.end()) listeners.erase(i); -} - -void Queue::addListener(Consumer::shared_ptr c) -{ - Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); - if (i == listeners.end()) listeners.push_back(c); + QueueListeners::NotificationSet set; + { + Mutex::ScopedLock locker(messageLock); + listeners.removeListener(c); + if (messages.size()) { + listeners.populate(set); + } + } + set.notify(); } bool Queue::dispatch(Consumer::shared_ptr c) @@ -361,7 +361,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) { return true; } } - addListener(c); + listeners.addListener(c); return false; } @@ -491,7 +491,7 @@ void Queue::popMsg(QueuedMessage& qmsg) } void Queue::push(boost::intrusive_ptr<Message>& msg){ - Listeners copy; + QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); @@ -505,17 +505,17 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){ i = lvq.find(key); if (i == lvq.end()){ messages.push_back(qm); - listeners.swap(copy); + listeners.populate(copy); lvq[key] = msg; }else { i->second->setReplacementMessage(msg,this); } }else { messages.push_back(qm); - listeners.swap(copy); + listeners.populate(copy); } } - for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); + copy.notify(); } QueuedMessage Queue::getFront() diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 76d9a59c3e..1f619c8812 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -27,6 +27,7 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" +#include "QueueListeners.h" #include "RateTracker.h" #include "qpid/framing/FieldTable.h" @@ -64,7 +65,6 @@ namespace qpid { class Queue : public boost::enable_shared_from_this<Queue>, public PersistableQueue, public management::Manageable { - typedef std::list<Consumer::shared_ptr> Listeners; typedef std::deque<QueuedMessage> Messages; typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; @@ -80,7 +80,7 @@ namespace qpid { bool inLastNodeFailure; std::string traceId; std::vector<std::string> traceExclude; - Listeners listeners; + QueueListeners listeners; Messages messages; LVQ lvq; mutable qpid::sys::Mutex consumerLock; @@ -94,7 +94,7 @@ namespace qpid { boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; -RateTracker dequeueTracker; + RateTracker dequeueTracker; void push(boost::intrusive_ptr<Message>& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); @@ -104,7 +104,6 @@ RateTracker dequeueTracker; bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); void removeListener(Consumer::shared_ptr); - void addListener(Consumer::shared_ptr); bool isExcluded(boost::intrusive_ptr<Message>& msg); diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp new file mode 100644 index 0000000000..7baca7d0f4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueListeners.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace broker { + +void QueueListeners::addListener(Consumer::shared_ptr c) +{ + if (c->preAcquires()) { + add(consumers, c); + } else { + add(browsers, c); + } +} + +void QueueListeners::removeListener(Consumer::shared_ptr c) +{ + if (c->preAcquires()) { + remove(consumers, c); + } else { + remove(browsers, c); + } +} + +void QueueListeners::populate(NotificationSet& set) +{ + if (consumers.size()) { + set.consumer = consumers.front(); + consumers.pop_front(); + } else { + browsers.swap(set.browsers); + } +} + +void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c) +{ + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); + if (i == listeners.end()) listeners.push_back(c); +} + +void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c) +{ + Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c); + if (i != listeners.end()) listeners.erase(i); +} + +void QueueListeners::NotificationSet::notify() +{ + if (consumer) consumer->notify(); + else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify)); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h new file mode 100644 index 0000000000..53ed6a17e4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/QueueListeners.h @@ -0,0 +1,68 @@ +#ifndef QPID_BROKER_QUEUELISTENERS_H +#define QPID_BROKER_QUEUELISTENERS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Consumer.h" +#include <list> + +namespace qpid { +namespace broker { + +/** + * Track and notify components that wish to be notified of messages + * that become available on a queue. + * + * None of the methods defined here are protected by locking. However + * the populate method allows a 'snapshot' to be taken of the + * listeners to be notified. NotificationSet::notify() may then be + * called outside of any lock that protects the QueueListeners + * instance from concurrent access. + */ +class QueueListeners +{ + public: + typedef std::list<Consumer::shared_ptr> Listeners; + + class NotificationSet + { + public: + void notify(); + private: + Listeners browsers; + Consumer::shared_ptr consumer; + friend class QueueListeners; + }; + + void addListener(Consumer::shared_ptr); + void removeListener(Consumer::shared_ptr); + void populate(NotificationSet&); + private: + Listeners consumers; + Listeners browsers; + + void add(Listeners&, Consumer::shared_ptr); + void remove(Listeners&, Consumer::shared_ptr); + +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_QUEUELISTENERS_H*/ |
