From 9b7442210d74846fac84e5e86236f0f2fc21886c Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Thu, 28 Apr 2011 12:25:59 +0000 Subject: QPID-3076: enable flow control for clustered broker configurations. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1097432 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 8 +- cpp/src/qpid/broker/Queue.h | 5 ++ cpp/src/qpid/broker/QueueFlowLimit.cpp | 135 ++++++++++++++++------------ cpp/src/qpid/broker/QueueFlowLimit.h | 11 ++- cpp/src/qpid/broker/StatefulQueueObserver.h | 63 +++++++++++++ 5 files changed, 152 insertions(+), 70 deletions(-) create mode 100644 cpp/src/qpid/broker/StatefulQueueObserver.h (limited to 'cpp/src/qpid/broker') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 764da735e3..240766c443 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -248,13 +248,7 @@ Broker::Broker(const Broker::Options& conf) : // Early-Initialize plugins Plugin::earlyInitAll(*this); - /** todo KAG - remove once cluster support for flow control done */ - if (isInCluster()) { - QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default."); - QueueFlowLimit::setDefaults(0, 0, 0); - } else { - QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); - } + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); // If no plugin store module registered itself, set up the null store. if (NullMessageStore::isNullStore(store.get())) diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 73d52ec9ca..c4f1bcc07e 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -348,6 +348,11 @@ class Queue : public boost::enable_shared_from_this, bindings.eachBinding(f); } + /** Apply f to each Observer on the queue */ + template void eachObserver(F f) { + std::for_each(observers.begin(), observers.end(), f); + } + /** Set the position sequence number for the next message on the queue. * Must be >= the current sequence number. * Used by cluster to replicate queues. diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp index 3494288f7b..20679972ff 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -92,7 +92,7 @@ namespace { QueueFlowLimit::QueueFlowLimit(Queue *_queue, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queue(_queue), queueName(""), + : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName(""), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) @@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_queue, void QueueFlowLimit::enqueued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); ++count; @@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes - index.insert(msg.payload); + bool unique; + unique = index.insert(std::pair >(msg.position, msg.payload)).second; + assert(unique); } } @@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) void QueueFlowLimit::dequeued(const QueuedMessage& msg) { - if (!msg.payload) return; - sys::Mutex::ScopedLock l(indexLock); if (count > 0) { @@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) if (!index.empty()) { if (!flowStopped) { // flow enabled - release all pending msgs - while (!index.empty()) { - std::set< boost::intrusive_ptr >::iterator itr = index.begin(); - (*itr)->getIngressCompletion().finishCompleter(); - index.erase(itr); - } + for (std::map >::iterator itr = index.begin(); + itr != index.end(); ++itr) + if (itr->second) + itr->second->getIngressCompletion().finishCompleter(); + index.clear(); } else { // even if flow controlled, we must release this msg as it is being dequeued - std::set< boost::intrusive_ptr >::iterator itr = index.find(msg.payload); + std::map >::iterator itr = index.find(msg.position); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getIngressCompletion().finishCompleter(); + msg.payload->getIngressCompletion().finishCompleter(); index.erase(itr); } } @@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) } -/** used by clustering: is the given message's completion blocked due to flow - * control? True if message is blocked. (for the clustering updater: done - * after msgs have been replicated to the updatee). - */ -bool QueueFlowLimit::getState(const QueuedMessage& msg) const -{ - sys::Mutex::ScopedLock l(indexLock); - return (index.find(msg.payload) != index.end()); -} - - -/** artificially force the flow control state of a given message - * (for the clustering updatee: done after msgs have been replicated to - * the updatee's queue) - */ -void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) -{ - if (blocked && msg.payload) { - - sys::Mutex::ScopedLock l(indexLock); - assert(index.find(msg.payload) == index.end()); - - QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); - index.insert(msg.payload); - } -} - - void QueueFlowLimit::encode(Buffer& buffer) const { buffer.putLong(flowStopCount); @@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint defaultFlowStopRatio = flowStopRatio; defaultFlowResumeRatio = flowResumeRatio; - /** @todo Verify valid range on Broker::Options instead of here */ + /** @todo KAG: Verify valid range on Broker::Options instead of here */ if (flowStopRatio > 100 || flowResumeRatio > 100) throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" << " flowStopRatio=" << flowStopRatio @@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control return 0; } - /** @todo KAG - remove once cluster support for flow control done. */ - // TODO aconway 2011-02-16: is queue==0 only in tests? - // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; - } return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } @@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::F uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + } + return 0; +} - /** todo KAG - remove once cluster support for flow control done. */ - if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { - QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " - << queue->getName()); - return 0; +/* Cluster replication */ + +namespace { + /** pack a set of sequence number ranges into a framing::Array */ + void buildSeqRangeArray(qpid::framing::Array *seqs, + const qpid::framing::SequenceNumber first, + const qpid::framing::SequenceNumber last) + { + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); + seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); + } +} + +/** Runs on UPDATER to snapshot current state */ +void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const +{ + sys::Mutex::ScopedLock l(indexLock); + state.clear(); + + framing::SequenceSet ss; + if (!index.empty()) { + /* replicate the set of messages pending flow control */ + for (std::map >::const_iterator itr = index.begin(); + itr != index.end(); ++itr) { + ss.add(itr->first); } + framing::Array seqs(TYPE_CODE_UINT32); + ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); + state.setArray("pendingMsgSeqs", seqs); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); +} - return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + +/** called on UPDATEE to set state from snapshot */ +void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) +{ + sys::Mutex::ScopedLock l(indexLock); + index.clear(); + + framing::SequenceSet fcmsg; + framing::Array seqArray(TYPE_CODE_UINT32); + if (state.getArray("pendingMsgSeqs", seqArray)) { + assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges + framing::Array::const_iterator i = seqArray.begin(); + while (i != seqArray.end()) { + framing::SequenceNumber first((*i)->getIntegerValue()); + ++i; + framing::SequenceNumber last((*i)->getIntegerValue()); + ++i; + fcmsg.add(first, last); + for (SequenceNumber seq = first; seq <= last; ++seq) { + QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked + bool unique; + unique = index.insert(std::pair >(seq, msg.payload)).second; + assert(unique); + } + } } - return 0; + + flowStopped = index.size() != 0; + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } + QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) } diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h index 69d91df45a..5fdae39c29 100644 --- a/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/cpp/src/qpid/broker/QueueFlowLimit.h @@ -27,7 +27,7 @@ #include #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/QueuedMessage.h" -#include "qpid/broker/QueueObserver.h" +#include "qpid/broker/StatefulQueueObserver.h" #include "qpid/framing/FieldTable.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" @@ -53,7 +53,7 @@ class Broker; * passing _either_ level may turn flow control ON, but _both_ must be * below level before flow control will be turned OFF. */ - class QueueFlowLimit : public QueueObserver + class QueueFlowLimit : public StatefulQueueObserver { static uint64_t defaultMaxSize; static uint defaultFlowStopRatio; @@ -86,9 +86,8 @@ class Broker; QPID_BROKER_EXTERN void dequeued(const QueuedMessage&); /** for clustering: */ - /** true if the given message is flow controlled, and cannot be completed. */ - bool getState(const QueuedMessage&) const; - void setState(const QueuedMessage&, bool blocked); + QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const; + QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&); uint32_t getFlowStopCount() const { return flowStopCount; } uint32_t getFlowResumeCount() const { return flowResumeCount; } @@ -111,7 +110,7 @@ class Broker; protected: // msgs waiting for flow to become available. - std::set< boost::intrusive_ptr > index; + std::map > index; mutable qpid::sys::Mutex indexLock; _qmfBroker::Queue *queueMgmtObj; diff --git a/cpp/src/qpid/broker/StatefulQueueObserver.h b/cpp/src/qpid/broker/StatefulQueueObserver.h new file mode 100644 index 0000000000..c682d460b7 --- /dev/null +++ b/cpp/src/qpid/broker/StatefulQueueObserver.h @@ -0,0 +1,63 @@ +#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H +#define QPID_BROKER_STATEFULQUEUEOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueObserver.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace broker { + +/** + * Specialized type of QueueObserver that maintains internal state that has to + * be replicated across clustered brokers. + */ +class StatefulQueueObserver : public QueueObserver +{ + public: + StatefulQueueObserver(std::string _id) : id(_id) {} + virtual ~StatefulQueueObserver() {} + + /** This identifier must uniquely identify this particular observer amoung + * all observers on a queue. For cluster replication, this id will be used + * to identify the peer queue observer for synchronization across + * brokers. + */ + const std::string& getId() const { return id; } + + /** This method should return the observer's internal state as an opaque + * map. + */ + virtual void getState(qpid::framing::FieldTable& state ) const = 0; + + /** The input map represents the internal state of the peer observer that + * this observer should synchonize to. + */ + virtual void setState(const qpid::framing::FieldTable&) = 0; + + + private: + std::string id; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/ -- cgit v1.2.1