diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /cpp/src/qpid/broker/QueueFlowLimit.cpp | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r-- | cpp/src/qpid/broker/QueueFlowLimit.cpp | 407 |
1 files changed, 0 insertions, 407 deletions
diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp deleted file mode 100644 index b2e2e54bdf..0000000000 --- a/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ /dev/null @@ -1,407 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/broker/QueueFlowLimit.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/Exception.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/reply_exceptions.h" -#include "qpid/log/Statement.h" -#include "qpid/sys/Mutex.h" -#include "qpid/broker/SessionState.h" -#include "qpid/sys/ClusterSafe.h" - -#include "qmf/org/apache/qpid/broker/Queue.h" - -#include <sstream> - -using namespace qpid::broker; -using namespace qpid::framing; - -namespace { - /** ensure that the configured flow control stop and resume values are - * valid with respect to the maximum queue capacity, and each other - */ - template <typename T> - void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) - { - if (resume > stop) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type - << "=" << resume - << " must be less than qpid.flow_stop_" << type - << "=" << stop)); - } - if (resume == 0) resume = stop; - if (max != 0 && (max < stop)) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type - << "=" << stop - << " must be less than qpid.max_" << type - << "=" << max)); - } - } - - /** extract a capacity value as passed in an argument map - */ - uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) - { - FieldTable::ValuePtr v = settings.get(key); - - int64_t result = 0; - - if (!v) return defaultValue; - if (v->getType() == 0x23) { - QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); - } else if (v->getType() == 0x33) { - QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); - } else if (v->convertsTo<int64_t>()) { - result = v->get<int64_t>(); - QPID_LOG(debug, "Got integer value for " << key << ": " << result); - if (result >= 0) return result; - } else if (v->convertsTo<string>()) { - string s(v->get<string>()); - QPID_LOG(debug, "Got string value for " << key << ": " << s); - std::istringstream convert(s); - if (convert >> result && result >= 0) return result; - } - - QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); - return defaultValue; - } -} - - - -QueueFlowLimit::QueueFlowLimit(Queue *_queue, - uint32_t _flowStopCount, uint32_t _flowResumeCount, - uint64_t _flowStopSize, uint64_t _flowResumeSize) - : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), queueName("<unknown>"), - flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), - flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), - flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) -{ - uint32_t maxCount(0); - uint64_t maxSize(0); - - if (queue) { - queueName = _queue->getName(); - if (queue->getPolicy()) { - maxSize = _queue->getPolicy()->getMaxSize(); - maxCount = _queue->getPolicy()->getMaxCount(); - } - broker = queue->getBroker(); - queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - } - validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); - validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); - QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount - << ", flowResumeCount=" << flowResumeCount - << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); -} - - -QueueFlowLimit::~QueueFlowLimit() -{ - sys::Mutex::ScopedLock l(indexLock); - if (!index.empty()) { - // we're gone - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - itr != index.end(); ++itr) - if (itr->second) - try { - itr->second->getIngressCompletion().finishCompleter(); - } catch (...) {} // ignore - not safe for a destructor to throw. - index.clear(); - } -} - - -void QueueFlowLimit::enqueued(const QueuedMessage& msg) -{ - sys::Mutex::ScopedLock l(indexLock); - - ++count; - size += msg.payload->contentSize(); - - if (!flowStopped) { - if (flowStopCount && count > flowStopCount) { - flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); - } else if (flowStopSize && size > flowStopSize) { - flowStopped = true; - QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); - } - if (flowStopped && queueMgmtObj) { - queueMgmtObj->set_flowStopped(true); - queueMgmtObj->inc_flowStoppedCount(); - } - } - - if (flowStopped || !index.empty()) { - // ignore flow control if we are populating the queue due to cluster replication: - if (broker && broker->isClusterUpdatee()) { - QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position); - return; - } - QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); - msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second; - assert(unique); - } -} - - - -void QueueFlowLimit::dequeued(const QueuedMessage& msg) -{ - sys::Mutex::ScopedLock l(indexLock); - - if (count > 0) { - --count; - } else { - throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName)); - } - - uint64_t _size = msg.payload->contentSize(); - if (_size <= size) { - size -= _size; - } else { - throw Exception(QPID_MSG("Flow limit size underflow on dequeue. Queue=" << queueName)); - } - - if (flowStopped && - (flowResumeSize == 0 || size < flowResumeSize) && - (flowResumeCount == 0 || count < flowResumeCount)) { - flowStopped = false; - if (queueMgmtObj) - queueMgmtObj->set_flowStopped(false); - QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); - } - - if (!index.empty()) { - if (!flowStopped) { - // flow enabled - release all pending msgs - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - itr != index.end(); ++itr) - if (itr->second) - itr->second->getIngressCompletion().finishCompleter(); - index.clear(); - } else { - // even if flow controlled, we must release this msg as it is being dequeued - std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.position); - if (itr != index.end()) { // this msg is flow controlled, release it: - msg.payload->getIngressCompletion().finishCompleter(); - index.erase(itr); - } - } - } -} - - -void QueueFlowLimit::encode(Buffer& buffer) const -{ - buffer.putLong(flowStopCount); - buffer.putLong(flowResumeCount); - buffer.putLongLong(flowStopSize); - buffer.putLongLong(flowResumeSize); - buffer.putLong(count); - buffer.putLongLong(size); -} - - -void QueueFlowLimit::decode ( Buffer& buffer ) -{ - flowStopCount = buffer.getLong(); - flowResumeCount = buffer.getLong(); - flowStopSize = buffer.getLongLong(); - flowResumeSize = buffer.getLongLong(); - count = buffer.getLong(); - size = buffer.getLongLong(); -} - - -uint32_t QueueFlowLimit::encodedSize() const { - return sizeof(uint32_t) + // flowStopCount - sizeof(uint32_t) + // flowResumecount - sizeof(uint64_t) + // flowStopSize - sizeof(uint64_t) + // flowResumeSize - sizeof(uint32_t) + // count - sizeof(uint64_t); // size -} - - -const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count"); -const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count"); -const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size"); -const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size"); -uint64_t QueueFlowLimit::defaultMaxSize; -uint QueueFlowLimit::defaultFlowStopRatio; -uint QueueFlowLimit::defaultFlowResumeRatio; - - -void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint flowResumeRatio) -{ - defaultMaxSize = maxQueueSize; - defaultFlowStopRatio = flowStopRatio; - defaultFlowResumeRatio = flowResumeRatio; - - /** @todo KAG: Verify valid range on Broker::Options instead of here */ - if (flowStopRatio > 100 || flowResumeRatio > 100) - throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" - << " flowStopRatio=" << flowStopRatio - << " flowResumeRatio=" << flowResumeRatio)); - if (flowResumeRatio > flowStopRatio) - throw InvalidArgumentException(QPID_MSG("Default queue flow stop ratio must be >= flow resume ratio:" - << " flowStopRatio=" << flowStopRatio - << " flowResumeRatio=" << flowResumeRatio)); -} - - -void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings) -{ - QueueFlowLimit *ptr = createLimit( &queue, settings ); - if (ptr) { - boost::shared_ptr<QueueFlowLimit> observer(ptr); - queue.addObserver(observer); - } -} - -/** returns ptr to a QueueFlowLimit, else 0 if no limit */ -QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings) -{ - std::string type(QueuePolicy::getType(settings)); - - if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) { - // The size of a RING queue is limited by design - no need for flow control. - return 0; - } - - if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey) || - settings.get(flowResumeCountKey) || settings.get(flowResumeSizeKey)) { - // user provided (some) flow settings manually... - uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); - uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); - uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); - uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); - if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control - return 0; - } - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); - } - - if (defaultFlowStopRatio) { // broker has a default ratio setup... - uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); - uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); - uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); - uint32_t maxMsgCount = getCapacity(settings, QueuePolicy::maxCountKey, 0); // no size by default - uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); - uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); - - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); - } - return 0; -} - -/* Cluster replication */ - -namespace { - /** pack a set of sequence number ranges into a framing::Array */ - void buildSeqRangeArray(qpid::framing::Array *seqs, - const qpid::framing::SequenceNumber& first, - const qpid::framing::SequenceNumber& last) - { - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(first))); - seqs->push_back(qpid::framing::Array::ValuePtr(new Unsigned32Value(last))); - } -} - -/** Runs on UPDATER to snapshot current state */ -void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const -{ - sys::Mutex::ScopedLock l(indexLock); - state.clear(); - - framing::SequenceSet ss; - if (!index.empty()) { - /* replicate the set of messages pending flow control */ - for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> >::const_iterator itr = index.begin(); - itr != index.end(); ++itr) { - ss.add(itr->first); - } - framing::Array seqs(TYPE_CODE_UINT32); - typedef boost::function<void(framing::SequenceNumber, framing::SequenceNumber)> arrayBuilder; - ss.for_each((arrayBuilder)boost::bind(&buildSeqRangeArray, &seqs, _1, _2)); - state.setArray("pendingMsgSeqs", seqs); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating pending msgs, range=" << ss); -} - - -/** called on UPDATEE to set state from snapshot */ -void QueueFlowLimit::setState(const qpid::framing::FieldTable& state) -{ - sys::Mutex::ScopedLock l(indexLock); - index.clear(); - - framing::SequenceSet fcmsg; - framing::Array seqArray(TYPE_CODE_UINT32); - if (state.getArray("pendingMsgSeqs", seqArray)) { - assert((seqArray.count() & 0x01) == 0); // must be even since they are sequence ranges - framing::Array::const_iterator i = seqArray.begin(); - while (i != seqArray.end()) { - framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>()); - ++i; - fcmsg.add(first, last); - for (SequenceNumber seq = first; seq <= last; ++seq) { - QueuedMessage msg(queue->find(seq)); // fyi: msg.payload may be null if msg is delivered & unacked - bool unique; - unique = index.insert(std::pair<framing::SequenceNumber, boost::intrusive_ptr<Message> >(seq, msg.payload)).second; - assert(unique); - } - } - } - - flowStopped = index.size() != 0; - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the pending msgs, range=" << fcmsg) -} - - -namespace qpid { - namespace broker { - -std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) -{ - out << "; flowStopCount=" << f.flowStopCount << ", flowResumeCount=" << f.flowResumeCount; - out << "; flowStopSize=" << f.flowStopSize << ", flowResumeSize=" << f.flowResumeSize; - return out; -} - - } -} - |