diff options
| author | Pavel Moravec <pmoravec@apache.org> | 2013-11-19 15:16:11 +0000 |
|---|---|---|
| committer | Pavel Moravec <pmoravec@apache.org> | 2013-11-19 15:16:11 +0000 |
| commit | 320112d419305cf5792226e7520b3bb52f27b221 (patch) | |
| tree | 9879e1a65606b146fd02ad91b3747f60cf180196 /qpid/cpp/src | |
| parent | 5a42352ad6e33c2280fea5737b2b4db1a27b8e2a (diff) | |
| download | qpid-python-320112d419305cf5792226e7520b3bb52f27b221.tar.gz | |
QPID-5278 , QPID-5281: Queue flow limit validation ignores size parameters , Creating a queue with invalid settings results in no queue but only its management object exists
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1543449 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFactory.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 111 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.h | 14 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/QueueFlowLimitTest.cpp | 43 |
4 files changed, 85 insertions, 88 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index 90609c1f9d..e5d9431555 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -48,6 +48,7 @@ QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings) { settings.validate(); + boost::shared_ptr<QueueFlowLimit> flow_ptr(QueueFlowLimit::createLimit(name, settings)); //1. determine Queue type (i.e. whether we are subclassing Queue) // -> if 'ring' policy is in use then subclass @@ -100,7 +101,9 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que ThresholdAlerts::observe(*queue, *(broker->getManagementAgent()), settings, broker->getOptions().queueThresholdEventRatio); } //5. flow control config - QueueFlowLimit::observe(*queue, settings); + if (flow_ptr) { + flow_ptr->observe(*queue); + } return queue; } diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 607dc09fe8..3a609d67f1 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -34,6 +34,8 @@ #include <sstream> +#include <boost/enable_shared_from_this.hpp> + using namespace qpid::broker; using namespace qpid::framing; @@ -44,47 +46,34 @@ namespace { template <typename T> void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) { - if (resume > stop) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + if (stop) { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type << "=" << resume - << " must be less than qpid.flow_stop_" << type + << " must be less or equal to qpid.flow_stop_" << type << "=" << stop)); - } - if (resume == 0) resume = stop; - if (max != 0 && (max < stop)) { - throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type << "=" << stop << " must be less than qpid.max_" << type << "=" << max)); + } } } } -QueueFlowLimit::QueueFlowLimit(Queue *_queue, +QueueFlowLimit::QueueFlowLimit(const std::string& _queueName, uint32_t _flowStopCount, uint32_t _flowResumeCount, uint64_t _flowStopSize, uint64_t _flowResumeSize) - : queue(_queue), queueName("<unknown>"), + : queue(0), queueName(_queueName), flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), flowStopped(false), count(0), size(0), broker(0) { - uint32_t maxCount(0); - uint64_t maxSize(0); - - if (queue) { - queueName = _queue->getName(); - if (queue->getSettings().maxDepth.hasCount()) maxCount = queue->getSettings().maxDepth.getCount(); - if (queue->getSettings().maxDepth.hasCount()) maxSize = queue->getSettings().maxDepth.getSize(); - broker = queue->getBroker(); - queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue->GetManagementObject()); - if (queueMgmtObj) { - queueMgmtObj->set_flowStopped(isFlowControlActive()); - } - } - validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); - validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount << ", flowResumeCount=" << flowResumeCount << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); @@ -245,50 +234,62 @@ void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint } -void QueueFlowLimit::observe(Queue& queue, const QueueSettings& settings) +void QueueFlowLimit::observe(Queue& queue) { - QueueFlowLimit *ptr = createLimit( &queue, settings ); - if (ptr) { - boost::shared_ptr<QueueFlowLimit> observer(ptr); - queue.addObserver(observer); + /* set up management stuff */ + broker = queue.getBroker(); + queueMgmtObj = boost::dynamic_pointer_cast<_qmfBroker::Queue> (queue.GetManagementObject()); + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); } + + /* set up the observer */ + queue.addObserver(shared_from_this()); } /** returns ptr to a QueueFlowLimit, else 0 if no limit */ -QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const QueueSettings& settings) +boost::shared_ptr<QueueFlowLimit> QueueFlowLimit::createLimit(const std::string& queueName, const QueueSettings& settings) { if (settings.dropMessagesAtLimit) { // The size of a RING queue is limited by design - no need for flow control. - return 0; + return boost::shared_ptr<QueueFlowLimit>(); } + if ((!settings.flowStop.hasCount()) && (!settings.flowStop.hasSize()) && (settings.flowResume.hasCount() || settings.flowResume.hasSize())) + QPID_LOG(warning, "queue " << queueName << ": user-configured flow limits are ignored as no stop limits provided"); - if (settings.flowStop.hasCount() || settings.flowStop.hasSize()) { - // user provided (some) flow settings manually... - if (settings.flowStop.getCount() || settings.flowStop.getSize()) { - return new QueueFlowLimit(queue, - settings.flowStop.getCount(), - settings.flowResume.getCount(), - settings.flowStop.getSize(), - settings.flowResume.getSize()); - } else { - //don't have a non-zero value for either the count or the - //size to stop at, yet at least one of these settings was - //provided, i.e it was set to 0 explicitly which we treat - //as turning it off - return 0; - } - } + uint32_t flowStopCount(0), flowResumeCount(0), maxMsgCount(settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0); + uint64_t flowStopSize(0), flowResumeSize(0), maxByteCount(settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize); + // pre-fill by defaults, if exist if (defaultFlowStopRatio) { // broker has a default ratio setup... - uint64_t maxByteCount = settings.maxDepth.hasSize() ? settings.maxDepth.getSize() : defaultMaxSize; - uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); - uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); - uint32_t maxMsgCount = settings.maxDepth.hasCount() ? settings.maxDepth.getCount() : 0; - uint32_t flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); - uint32_t flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); - return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); + flowStopCount = (uint32_t)(maxMsgCount * (defaultFlowStopRatio/100.0) + 0.5); + } + + if (defaultFlowResumeRatio) { // broker has a default ratio setup... + flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + flowResumeCount = (uint32_t)(maxMsgCount * (defaultFlowResumeRatio/100.0)); + } + + // update by user-specified thresholds + if (settings.flowStop.hasCount()) + flowStopCount = settings.flowStop.getCount(); + if (settings.flowStop.hasSize()) + flowStopSize = settings.flowStop.getSize(); + if (settings.flowResume.hasCount()) + flowResumeCount = settings.flowResume.getCount(); + if (settings.flowResume.hasSize()) + flowResumeSize = settings.flowResume.getSize(); + + if (flowStopCount || flowStopSize) { + validateFlowConfig(maxMsgCount, flowStopCount, flowResumeCount, "count", queueName ); + validateFlowConfig(maxByteCount, flowStopSize, flowResumeSize, "size", queueName ); + return boost::shared_ptr<QueueFlowLimit>(new QueueFlowLimit(queueName, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize)); } - return 0; + else + //don't have a non-zero value for either the count or the + //size to stop at, so no flow limit applicable + return boost::shared_ptr<QueueFlowLimit>(); } namespace qpid { diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h index b9aa09ec3a..92bd7f76a0 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h @@ -33,6 +33,8 @@ #include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/broker/Queue.h" +#include <boost/enable_shared_from_this.hpp> + namespace _qmfBroker = qmf::org::apache::qpid::broker; namespace qpid { @@ -50,7 +52,7 @@ struct QueueSettings; * passing _either_ level may turn flow control ON, but _both_ must be * below level before flow control will be turned OFF. */ - class QueueFlowLimit : public QueueObserver + class QueueFlowLimit : public QueueObserver, public boost::enable_shared_from_this<QueueFlowLimit> { static uint64_t defaultMaxSize; static uint defaultFlowStopRatio; @@ -99,7 +101,8 @@ struct QueueSettings; void decode(framing::Buffer& buffer); uint32_t encodedSize() const; - static QPID_BROKER_EXTERN void observe(Queue& queue, const QueueSettings& settings); + QPID_BROKER_EXTERN void observe(Queue& queue); + static QPID_BROKER_EXTERN boost::shared_ptr<QueueFlowLimit> createLimit(const std::string& queueName, const QueueSettings& settings); static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio); friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&); @@ -113,10 +116,9 @@ struct QueueSettings; const Broker *broker; - QPID_BROKER_EXTERN QueueFlowLimit(Queue *queue, - uint32_t flowStopCount, uint32_t flowResumeCount, - uint64_t flowStopSize, uint64_t flowResumeSize); - static QPID_BROKER_EXTERN QueueFlowLimit *createLimit(Queue *queue, const QueueSettings& settings); + QPID_BROKER_EXTERN QueueFlowLimit(const std::string& _queueName, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize); }; }} diff --git a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp index 7b0a776062..b35294922d 100644 --- a/qpid/cpp/src/tests/QueueFlowLimitTest.cpp +++ b/qpid/cpp/src/tests/QueueFlowLimitTest.cpp @@ -46,7 +46,7 @@ class TestFlow : public QueueFlowLimit public: TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount, uint64_t flowStopSize, uint64_t flowResumeSize) : - QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize) + QueueFlowLimit("", flowStopCount, flowResumeCount, flowStopSize, flowResumeSize) {} virtual ~TestFlow() {} @@ -66,11 +66,11 @@ public: return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); } - static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& arguments) + static boost::shared_ptr<qpid::broker::QueueFlowLimit> getQueueFlowLimit(const qpid::framing::FieldTable& arguments) { QueueSettings settings; settings.populate(arguments, settings.storeSettings); - return QueueFlowLimit::createLimit(0, settings); + return QueueFlowLimit::createLimit("", settings); } }; @@ -357,10 +357,9 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs) 80, // 80% stop threshold 70); // 70% resume threshold FieldTable args; - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(flow); - BOOST_CHECK(ptr); - std::auto_ptr<QueueFlowLimit> flow(ptr); BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize()); BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize()); BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount()); @@ -372,17 +371,17 @@ QPID_AUTO_TEST_CASE(testFlowDefaultArgs) QPID_AUTO_TEST_CASE(testFlowOverrideArgs) { - QueueFlowLimit::setDefaults(2950001, // max queue byte count + QueueFlowLimit::setDefaults(0, // max queue byte count 80, // 80% stop threshold 70); // 70% resume threshold { FieldTable args; args.setInt(QueueFlowLimit::flowStopCountKey, 35000); args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); +// args.setInt(QueueFlowLimit::flowStopSizeKey, 0); - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(ptr); - std::auto_ptr<QueueFlowLimit> flow(ptr); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(flow); BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); @@ -396,9 +395,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs) args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(ptr); - std::auto_ptr<QueueFlowLimit> flow(ptr); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(flow); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); @@ -414,9 +412,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideArgs) args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(ptr); - std::auto_ptr<QueueFlowLimit> flow(ptr); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(flow); BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); @@ -434,9 +431,8 @@ QPID_AUTO_TEST_CASE(testFlowOverrideDefaults) 97, // stop threshold 73); // resume threshold FieldTable args; - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(ptr); - std::auto_ptr<QueueFlowLimit> flow(ptr); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(flow); BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize()); BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize()); @@ -450,14 +446,9 @@ QPID_AUTO_TEST_CASE(testFlowDisable) { FieldTable args; args.setInt(QueueFlowLimit::flowStopCountKey, 0); - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(!ptr); - } - { - FieldTable args; args.setInt(QueueFlowLimit::flowStopSizeKey, 0); - QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); - BOOST_CHECK(!ptr); + boost::shared_ptr<QueueFlowLimit> flow = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(!flow); } } |
