diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 111 |
1 files changed, 56 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 607dc09fe8..3a609d67f1 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -34,6 +34,8 @@ #include <sstream> +#include <boost/enable_shared_from_this.hpp> + using namespace qpid::broker; using namespace qpid::framing; @@ -44,47 +46,34 @@ namespace { template <typename T> void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) { - if (resume > stop) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + if (stop) { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type << "=" << resume - << " must be less than qpid.flow_stop_" << type + << " must be less or equal to qpid.flow_stop_" << type << "=" << stop)); - } - if (resume == 0) resume = stop; - if (max != 0 && (max < stop)) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type << "=" << stop << " must be less than qpid.max_" << type << "=" << max)); + } } } } -QueueFlowLimit::QueueFlowLimit(Queue *_queue, +QueueFlowLimit::QueueFlowLimit(const std::string& _queueName, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queue(_queue), queueName("<unknown>"), + : queue(0), queueName(_queueName), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), broker(0) { - uint32_t maxCount(0); - uint64_t maxSize(0); - - if (queue) { - queueName = _queue->getName(); - if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); - if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); - broker = queue->getBroker(); - queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - } - validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); - validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount << ", flowResumeCount=" << flowResumeCount << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); @@ -245,50 +234,62 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint } -void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings) +void QueueFlowLimit::observe(Queue& queue) { - QueueFlowLimit *ptr = createLimit( &queue, settings ); - if (ptr) { - boost::shared_ptr<QueueFlowLimit> observer(ptr); - queue.addObserver(observer); + /* set up management stuff */ + broker = queue.getBroker(); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue.GetManagementObject()); + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); } + + /* set up the observer */ + queue.addObserver(shared_from_this()); } /** returns ptr to a QueueFlowLimit, else 0 if no limit */ -QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings) +boost::shared_ptr<QueueFlowLimit> QueueFlowLimit::createLimit(const std::string& queueName, const QueueSettings& settings) { if (settings.dropMessagesAtLimit) { // The size of a RING queue is limited by design - no need for flow control. - return 0; + return boost::shared_ptr<QueueFlowLimit>(); } + if ((!settings.flowStop.hasCount()) && (!settings.flowStop.hasSize()) && (settings.flowResume.hasCount() || settings.flowResume.hasSize())) + QPID_LOG(warning, "queue " << queueName << ": user-configured flow limits are ignored as no stop limits provided"); - if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) { - // user provided (some) flow settings manually... - if (settings.flowStop.getCount() || settings.flowStop.getSize()) { - return new QueueFlowLimit(queue, - settings.flowStop.getCount(), - settings.flowResume.getCount(), - settings.flowStop.getSize(), - settings.flowResume.getSize()); - } else { - //don't have a non-zero value for either the count or the - //size to stop at, yet at least one of these settings was - //provided, i.e it was set to 0 explicitly which we treat - //as turning it off - return 0; - } - } + uint32_t flowStopCount(0), flowResumeCount(0), maxMsgCount(settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0); + uint64_t flowStopSize(0), flowResumeSize(0), maxByteCount(settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize); + // pre-fill by defaults, if exist if (defaultFlowStopRatio) { // broker has a default ratio setup... - uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize; - uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); - uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); - uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0; - uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); - uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); + flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); + } + + if (defaultFlowResumeRatio) { // broker has a default ratio setup... + flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); + } + + // update by user-specified thresholds + if (settings.flowStop.hasCount()) + flowStopCount = settings.flowStop.getCount(); + if (settings.flowStop.hasSize()) + flowStopSize = settings.flowStop.getSize(); + if (settings.flowResume.hasCount()) + flowResumeCount = settings.flowResume.getCount(); + if (settings.flowResume.hasSize()) + flowResumeSize = settings.flowResume.getSize(); + + if (flowStopCount || flowStopSize) { + validateFlowConfig(maxMsgCount, flowStopCount, flowResumeCount, "count", queueName ); + validateFlowConfig(maxByteCount, flowStopSize, flowResumeSize, "size", queueName ); + return boost::shared_ptr<QueueFlowLimit>(new QueueFlowLimit(queueName, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } - return 0; + else + //don't have a non-zero value for either the count or the + //size to stop at, so no flow limit applicable + return boost::shared_ptr<QueueFlowLimit>(); } namespace qpid { |
