diff options
| author | Gordon Sim <gsim@apache.org> | 2013-07-13 09:15:49 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-07-13 09:15:49 +0000 |
| commit | ce70d9dec7f82513d3d421f97c0a446987fd8107 (patch) | |
| tree | 79046ee5de7b0fd1558cd3c8c5337436bcea9d9b /qpid/cpp | |
| parent | 03349fee0dc9473750f465b92e37729299133902 (diff) | |
| download | qpid-python-ce70d9dec7f82513d3d421f97c0a446987fd8107.tar.gz | |
QPID-3247: add policy for self-struct subscription queue
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1502766 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFactory.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SelfDestructQueue.h | 45 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 26 |
8 files changed, 128 insertions, 0 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 12b7045cf1..c1d16404ef 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1311,6 +1311,7 @@ set (qpidbroker_SOURCES qpid/broker/SelectorToken.cpp qpid/broker/SelectorValue.h qpid/broker/SelectorValue.cpp + qpid/broker/SelfDestructQueue.cpp qpid/broker/SemanticState.h qpid/broker/SemanticState.cpp qpid/broker/SessionAdapter.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 8a98cea1ee..f2253e22a1 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -748,6 +748,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SelectorToken.h \ qpid/broker/SelectorValue.cpp \ qpid/broker/SelectorValue.h \ + qpid/broker/SelfDestructQueue.h \ + qpid/broker/SelfDestructQueue.cpp \ qpid/broker/SemanticState.cpp \ qpid/broker/SemanticState.h \ qpid/broker/SessionAdapter.cpp \ diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index 67499c9985..807026ef0d 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -33,6 +33,7 @@ #include "qpid/broker/PagedQueue.h" #include "qpid/broker/PriorityQueue.h" #include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/SelfDestructQueue.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/broker/FifoDistributor.h" #include "qpid/log/Statement.h" @@ -53,6 +54,8 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que boost::shared_ptr<Queue> queue; if (settings.dropMessagesAtLimit) { queue = boost::shared_ptr<Queue>(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + } else if (settings.selfDestructAtLimit) { + queue = boost::shared_ptr<Queue>(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker)); } else if (settings.lvqKey.size()) { std::auto_ptr<MessageMap> map(new MessageMap(settings.lvqKey)); queue = boost::shared_ptr<Queue>(new Lvq(name, map, settings, settings.durable ? store : 0, parent, broker)); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index fd90d11d76..30e4bb8fca 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -38,6 +38,7 @@ const std::string MAX_FILE_SIZE("qpid.file_size"); const std::string POLICY_TYPE("qpid.policy_type"); const std::string POLICY_TYPE_REJECT("reject"); const std::string POLICY_TYPE_RING("ring"); +const std::string POLICY_TYPE_SELF_DESTRUCT("self-destruct"); const std::string NO_LOCAL("no-local"); const std::string BROWSE_ONLY("qpid.browse-only"); const std::string TRACE_ID("qpid.trace.id"); @@ -96,6 +97,7 @@ QueueSettings::QueueSettings(bool d, bool a) : shareGroups(false), addTimestamp(false), dropMessagesAtLimit(false), + selfDestructAtLimit(false), paging(false), maxPages(0), pageFactor(0), @@ -120,6 +122,9 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v if (value.getString() == POLICY_TYPE_RING) { dropMessagesAtLimit = true; return true; + } else if (value.getString() == POLICY_TYPE_SELF_DESTRUCT) { + selfDestructAtLimit = true; + return true; } else if (value.getString() == POLICY_TYPE_REJECT) { //do nothing, thats the default return true; diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index 166445be18..8d72115e18 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -70,6 +70,7 @@ struct QueueSettings QueueDepth maxDepth; bool dropMessagesAtLimit;//aka ring queue policy + bool selfDestructAtLimit; //PagedQueue: bool paging; diff --git a/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp new file mode 100644 index 0000000000..f8b26a62ad --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SelfDestructQueue.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SelfDestructQueue.h" +#include "AclModule.h" +#include "Broker.h" +#include "QueueDepth.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { +SelfDestructQueue::SelfDestructQueue(const std::string& n, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) : Queue(n, s, ms, p, b) +{ + QPID_LOG_CAT(debug, model, "Self-destruct queue created: " << name); +} +bool SelfDestructQueue::checkDepth(const QueueDepth& increment, const Message&) +{ + if (settings.maxDepth && (settings.maxDepth - current < increment)) { + broker->getQueues().destroy(name); + if (broker->getAcl()) + broker->getAcl()->recordDestroyQueue(name); + QPID_LOG_CAT(debug, model, "Queue " << name << " deleted itself due to reaching limit: " << current << " (policy is " << settings.maxDepth << ")"); + destroyed(); + } + current += increment; + return true; +} +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SelfDestructQueue.h b/qpid/cpp/src/qpid/broker/SelfDestructQueue.h new file mode 100644 index 0000000000..110eb1dcf7 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/SelfDestructQueue.h @@ -0,0 +1,45 @@ +#ifndef QPID_BROKER_SELFDESTRUCTQUEUE_H +#define QPID_BROKER_SELFDESTRUCTQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Queue.h" + +namespace qpid { +namespace broker { + +/** + * Deletes itself when breaching specified maximum depth (useful as + * subscription queue for consumers that should be ejecetd from topic + * when they can't keep up). + */ +class SelfDestructQueue : public Queue +{ + public: + public: + SelfDestructQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); + bool checkDepth(const QueueDepth& increment, const Message&); + private: + private: +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_SELFDESTRUCTQUEUE_H*/ diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index b388f2c13a..9791cf9f38 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -1289,6 +1289,32 @@ QPID_AUTO_TEST_CASE(testSimpleRequestResponse) BOOST_CHECK_EQUAL(m.getSubject(), original.getSubject()); } +QPID_AUTO_TEST_CASE(testSelfDestructQueue) +{ + MessagingFixture fix; + //create receiver on temp queue for responses (using shorthand for temp queue) + Session other = fix.connection.createSession(); + Receiver r1 = other.createReceiver("amq.fanout; {link:{reliability:at-least-once, x-declare:{arguments:{qpid.max_count:10,qpid.policy_type:self-destruct}}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + //send request + Sender s = fix.session.createSender("amq.fanout"); + for (uint i = 0; i < 20; ++i) { + s.send(Message((boost::format("MSG_%1%") % (i+1)).str())); + } + try { + ScopedSuppressLogging sl; + for (uint i = 0; i < 20; ++i) { + r1.fetch(Duration::SECOND); + } + BOOST_FAIL("Expected exception."); + } catch (const qpid::messaging::MessagingException&) { + } + + for (uint i = 0; i < 20; ++i) { + BOOST_CHECK_EQUAL(r2.fetch(Duration::SECOND).getContent(), (boost::format("MSG_%1%") % (i+1)).str()); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
