summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-11-29 16:01:54 +0000
committerGordon Sim <gsim@apache.org>2008-11-29 16:01:54 +0000
commit92f646b3c0b5fa9e4e243d7402a7488cb4e17533 (patch)
treec29d3f8670631b3f0da9cf1819864318fac8007f /qpid/cpp/src
parent16578074ba43ea5858c85be75665222a4af526e8 (diff)
downloadqpid-python-92f646b3c0b5fa9e4e243d7402a7488cb4e17533.tar.gz
QPID-1280: fixed performance regression for multiple subscribers on shared queue
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@721685 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp46
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h7
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.cpp73
-rw-r--r--qpid/cpp/src/qpid/broker/QueueListeners.h68
5 files changed, 169 insertions, 27 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 60e163e8af..04541c8dc2 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -347,6 +347,7 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/QueueCleaner.cpp \
+ qpid/broker/QueueListeners.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
qpid/broker/Connection.cpp \
@@ -473,6 +474,7 @@ nobase_include_HEADERS = \
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
+ qpid/broker/QueueListeners.h \
qpid/broker/QueueCleaner.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d664e6e141..b090ffef43 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -109,12 +109,12 @@ Queue::~Queue()
void Queue::notifyDurableIOComplete()
{
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -187,14 +187,14 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
void Queue::requeue(const QueuedMessage& msg){
if (policy.get() && !policy->isEnqueued(msg)) return;
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
@@ -240,7 +240,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
if (messages.empty()) {
//no message available, register consumer for notification
//when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -248,7 +248,7 @@ bool Queue::checkForMessages(Consumer::shared_ptr c)
//though a message is on the queue, it has not yet been
//enqueued and so is not available for consumption yet,
//register consumer for notification when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
//check that consumer has sufficient credit for the
@@ -266,7 +266,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -323,15 +323,15 @@ bool Queue::browseNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
void Queue::removeListener(Consumer::shared_ptr c)
{
- Mutex::ScopedLock locker(messageLock);
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i != listeners.end()) listeners.erase(i);
-}
-
-void Queue::addListener(Consumer::shared_ptr c)
-{
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i == listeners.end()) listeners.push_back(c);
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.removeListener(c);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
}
bool Queue::dispatch(Consumer::shared_ptr c)
@@ -361,7 +361,7 @@ bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
return true;
}
}
- addListener(c);
+ listeners.addListener(c);
return false;
}
@@ -491,7 +491,7 @@ void Queue::popMsg(QueuedMessage& qmsg)
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
@@ -505,17 +505,17 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
i = lvq.find(key);
if (i == lvq.end()){
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
}
}else {
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
}
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
QueuedMessage Queue::getFront()
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index 76d9a59c3e..1f619c8812 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -27,6 +27,7 @@
#include "PersistableQueue.h"
#include "QueuePolicy.h"
#include "QueueBindings.h"
+#include "QueueListeners.h"
#include "RateTracker.h"
#include "qpid/framing/FieldTable.h"
@@ -64,7 +65,6 @@ namespace qpid {
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
@@ -80,7 +80,7 @@ namespace qpid {
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
- Listeners listeners;
+ QueueListeners listeners;
Messages messages;
LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
@@ -94,7 +94,7 @@ namespace qpid {
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
-RateTracker dequeueTracker;
+ RateTracker dequeueTracker;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -104,7 +104,6 @@ RateTracker dequeueTracker;
bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
void removeListener(Consumer::shared_ptr);
- void addListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.cpp b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
new file mode 100644
index 0000000000..7baca7d0f4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueListeners.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+void QueueListeners::addListener(Consumer::shared_ptr c)
+{
+ if (c->preAcquires()) {
+ add(consumers, c);
+ } else {
+ add(browsers, c);
+ }
+}
+
+void QueueListeners::removeListener(Consumer::shared_ptr c)
+{
+ if (c->preAcquires()) {
+ remove(consumers, c);
+ } else {
+ remove(browsers, c);
+ }
+}
+
+void QueueListeners::populate(NotificationSet& set)
+{
+ if (consumers.size()) {
+ set.consumer = consumers.front();
+ consumers.pop_front();
+ } else {
+ browsers.swap(set.browsers);
+ }
+}
+
+void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
+{
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i == listeners.end()) listeners.push_back(c);
+}
+
+void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
+{
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i != listeners.end()) listeners.erase(i);
+}
+
+void QueueListeners::NotificationSet::notify()
+{
+ if (consumer) consumer->notify();
+ else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/QueueListeners.h b/qpid/cpp/src/qpid/broker/QueueListeners.h
new file mode 100644
index 0000000000..53ed6a17e4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/QueueListeners.h
@@ -0,0 +1,68 @@
+#ifndef QPID_BROKER_QUEUELISTENERS_H
+#define QPID_BROKER_QUEUELISTENERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Consumer.h"
+#include <list>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Track and notify components that wish to be notified of messages
+ * that become available on a queue.
+ *
+ * None of the methods defined here are protected by locking. However
+ * the populate method allows a 'snapshot' to be taken of the
+ * listeners to be notified. NotificationSet::notify() may then be
+ * called outside of any lock that protects the QueueListeners
+ * instance from concurrent access.
+ */
+class QueueListeners
+{
+ public:
+ typedef std::list<Consumer::shared_ptr> Listeners;
+
+ class NotificationSet
+ {
+ public:
+ void notify();
+ private:
+ Listeners browsers;
+ Consumer::shared_ptr consumer;
+ friend class QueueListeners;
+ };
+
+ void addListener(Consumer::shared_ptr);
+ void removeListener(Consumer::shared_ptr);
+ void populate(NotificationSet&);
+ private:
+ Listeners consumers;
+ Listeners browsers;
+
+ void add(Listeners&, Consumer::shared_ptr);
+ void remove(Listeners&, Consumer::shared_ptr);
+
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUELISTENERS_H*/