summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp111
1 files changed, 56 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 607dc09fe8..3a609d67f1 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -34,6 +34,8 @@
#include <sstream>
+#include <boost/enable_shared_from_this.hpp>
+
using namespace qpid::broker;
using namespace qpid::framing;
@@ -44,47 +46,34 @@ namespace {
template <typename T>
void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue)
{
- if (resume > stop) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
+ if (stop) {
+ if (resume > stop) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type
<< "=" << resume
- << " must be less than qpid.flow_stop_" << type
+ << " must be less or equal to qpid.flow_stop_" << type
<< "=" << stop));
- }
- if (resume == 0) resume = stop;
- if (max != 0 && (max < stop)) {
- throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
+ }
+ if (resume == 0) resume = stop;
+ if (max != 0 && (max < stop)) {
+ throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type
<< "=" << stop
<< " must be less than qpid.max_" << type
<< "=" << max));
+ }
}
}
}
-QueueFlowLimit::QueueFlowLimit(Queue *_queue,
+QueueFlowLimit::QueueFlowLimit(const std::string& _queueName,
uint32_t _flowStopCount, uint32_t _flowResumeCount,
uint64_t _flowStopSize, uint64_t _flowResumeSize)
- : queue(_queue), queueName("<unknown>"),
+ : queue(0), queueName(_queueName),
flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
flowStopped(false), count(0), size(0), broker(0)
{
- uint32_t maxCount(0);
- uint64_t maxSize(0);
-
- if (queue) {
- queueName = _queue->getName();
- if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount();
- if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize();
- broker = queue->getBroker();
- queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject());
- if (queueMgmtObj) {
- queueMgmtObj->set_flowStopped(isFlowControlActive());
- }
- }
- validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName );
- validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName );
QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
<< ", flowResumeCount=" << flowResumeCount
<< ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
@@ -245,50 +234,62 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint
}
-void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings)
+void QueueFlowLimit::observe(Queue& queue)
{
- QueueFlowLimit *ptr = createLimit( &queue, settings );
- if (ptr) {
- boost::shared_ptr<QueueFlowLimit> observer(ptr);
- queue.addObserver(observer);
+ /* set up management stuff */
+ broker = queue.getBroker();
+ queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue.GetManagementObject());
+ if (queueMgmtObj) {
+ queueMgmtObj->set_flowStopped(isFlowControlActive());
}
+
+ /* set up the observer */
+ queue.addObserver(shared_from_this());
}
/** returns ptr to a QueueFlowLimit, else 0 if no limit */
-QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings)
+boost::shared_ptr<QueueFlowLimit> QueueFlowLimit::createLimit(const std::string& queueName, const QueueSettings& settings)
{
if (settings.dropMessagesAtLimit) {
// The size of a RING queue is limited by design - no need for flow control.
- return 0;
+ return boost::shared_ptr<QueueFlowLimit>();
}
+ if ((!settings.flowStop.hasCount()) && (!settings.flowStop.hasSize()) && (settings.flowResume.hasCount() || settings.flowResume.hasSize()))
+ QPID_LOG(warning, "queue " << queueName << ": user-configured flow limits are ignored as no stop limits provided");
- if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) {
- // user provided (some) flow settings manually...
- if (settings.flowStop.getCount() || settings.flowStop.getSize()) {
- return new QueueFlowLimit(queue,
- settings.flowStop.getCount(),
- settings.flowResume.getCount(),
- settings.flowStop.getSize(),
- settings.flowResume.getSize());
- } else {
- //don't have a non-zero value for either the count or the
- //size to stop at, yet at least one of these settings was
- //provided, i.e it was set to 0 explicitly which we treat
- //as turning it off
- return 0;
- }
- }
+ uint32_t flowStopCount(0), flowResumeCount(0), maxMsgCount(settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0);
+ uint64_t flowStopSize(0), flowResumeSize(0), maxByteCount(settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize);
+ // pre-fill by defaults, if exist
if (defaultFlowStopRatio) { // broker has a default ratio setup...
- uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize;
- uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
- uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
- uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0;
- uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
- uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
- return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize);
+ flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5);
+ flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5);
+ }
+
+ if (defaultFlowResumeRatio) { // broker has a default ratio setup...
+ flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0));
+ flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0));
+ }
+
+ // update by user-specified thresholds
+ if (settings.flowStop.hasCount())
+ flowStopCount = settings.flowStop.getCount();
+ if (settings.flowStop.hasSize())
+ flowStopSize = settings.flowStop.getSize();
+ if (settings.flowResume.hasCount())
+ flowResumeCount = settings.flowResume.getCount();
+ if (settings.flowResume.hasSize())
+ flowResumeSize = settings.flowResume.getSize();
+
+ if (flowStopCount || flowStopSize) {
+ validateFlowConfig(maxMsgCount, flowStopCount, flowResumeCount, "count", queueName );
+ validateFlowConfig(maxByteCount, flowStopSize, flowResumeSize, "size", queueName );
+ return boost::shared_ptr<QueueFlowLimit>(new QueueFlowLimit(queueName, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize));
}
- return 0;
+ else
+ //don't have a non-zero value for either the count or the
+ //size to stop at, so no flow limit applicable
+ return boost::shared_ptr<QueueFlowLimit>();
}
namespace qpid {