From 785bfe3e9a9e6afe5494e48d02be2665dc599bb8 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 12 Nov 2013 13:42:50 +0000 Subject: QPID-5251: allow policies to be specified that will create topics or queues on demand if they match the specified pattern git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1541059 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/amqp.cmake | 2 + qpid/cpp/src/qpid/broker/PersistableObject.cpp | 1 + qpid/cpp/src/qpid/broker/PersistableObject.h | 1 + qpid/cpp/src/qpid/broker/QueueSettings.cpp | 5 + qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp | 5 +- qpid/cpp/src/qpid/broker/amqp/BrokerContext.h | 5 +- qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp | 325 +++++++++++++++++++++++ qpid/cpp/src/qpid/broker/amqp/NodePolicy.h | 117 ++++++++ qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 31 ++- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 44 ++- qpid/cpp/src/qpid/broker/amqp/Topic.cpp | 12 +- qpid/cpp/src/qpid/broker/amqp/Topic.h | 4 +- qpid/cpp/src/qpid/broker/management-schema.xml | 18 ++ qpid/cpp/src/qpid/sys/regex.h | 2 +- qpid/cpp/src/tests/policies.py | 209 +++++++++++++++ qpid/cpp/src/tests/swig_python_tests | 2 +- 16 files changed, 752 insertions(+), 31 deletions(-) create mode 100644 qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp create mode 100644 qpid/cpp/src/qpid/broker/amqp/NodePolicy.h create mode 100644 qpid/cpp/src/tests/policies.py (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 52316d22b3..ba6e552cab 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -104,6 +104,8 @@ if (BUILD_AMQP) qpid/broker/amqp/ManagedOutgoingLink.cpp qpid/broker/amqp/Message.h qpid/broker/amqp/Message.cpp + qpid/broker/amqp/NodePolicy.h + qpid/broker/amqp/NodePolicy.cpp qpid/broker/amqp/NodeProperties.h qpid/broker/amqp/NodeProperties.cpp qpid/broker/amqp/Outgoing.h diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.cpp b/qpid/cpp/src/qpid/broker/PersistableObject.cpp index 822f795954..575ef09270 100644 --- a/qpid/cpp/src/qpid/broker/PersistableObject.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableObject.cpp @@ -33,6 +33,7 @@ PersistableObject::PersistableObject(const std::string& n, const std::string& t, PersistableObject::PersistableObject() : id(0) {} PersistableObject::~PersistableObject() {} const std::string& PersistableObject::getName() const { return name; } +const std::string& PersistableObject::getType() const { return type; } void PersistableObject::setPersistenceId(uint64_t i) const { id = i; } uint64_t PersistableObject::getPersistenceId() const { return id; } void PersistableObject::encode(framing::Buffer& buffer) const diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.h b/qpid/cpp/src/qpid/broker/PersistableObject.h index 4d7e5e4498..da4bd44601 100644 --- a/qpid/cpp/src/qpid/broker/PersistableObject.h +++ b/qpid/cpp/src/qpid/broker/PersistableObject.h @@ -41,6 +41,7 @@ class PersistableObject : public PersistableConfig QPID_BROKER_EXTERN PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties); QPID_BROKER_EXTERN virtual ~PersistableObject(); QPID_BROKER_EXTERN const std::string& getName() const; + QPID_BROKER_EXTERN const std::string& getType() const; QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const; QPID_BROKER_EXTERN uint64_t getPersistenceId() const; QPID_BROKER_EXTERN void encode(framing::Buffer& buffer) const; diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 53194cf064..8de8539579 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -62,6 +62,7 @@ const std::string FILTER("qpid.filter"); const std::string LIFETIME_POLICY("qpid.lifetime-policy"); const std::string DELETE_IF_UNUSED_KEY("delete-if-unused"); const std::string DELETE_IF_UNUSED_AND_EMPTY_KEY("delete-if-unused-and-empty"); +const std::string MANUAL("manual"); const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); @@ -227,8 +228,12 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == LIFETIME_POLICY) { if (value.asString() == DELETE_IF_UNUSED_KEY) { lifetime = DELETE_IF_UNUSED; + autodelete = true; } else if (value.asString() == DELETE_IF_UNUSED_AND_EMPTY_KEY) { lifetime = DELETE_IF_UNUSED_AND_EMPTY; + autodelete = true; + } else if (value.asString() == MANUAL) { + autodelete = false; } else { QPID_LOG(warning, "Invalid value for " << LIFETIME_POLICY << ": " << value); } diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp index b109da961e..9f7ae17293 100644 --- a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp @@ -23,10 +23,11 @@ namespace qpid { namespace broker { namespace amqp { -BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {} -BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {} +BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, NodePolicyRegistry& np, const std::string& d) : broker(b), interconnects(i), topics(t), nodePolicies(np), domain(d) {} +BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), nodePolicies(c.nodePolicies), domain(c.domain) {} Broker& BrokerContext::getBroker() { return broker; } Interconnects& BrokerContext::getInterconnects() { return interconnects; } TopicRegistry& BrokerContext::getTopics() { return topics; } +NodePolicyRegistry& BrokerContext::getNodePolicies() { return nodePolicies; } std::string BrokerContext::getDomain() { return domain; } }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h index 81c449c68d..feb35e39c4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h @@ -29,22 +29,25 @@ class Broker; namespace amqp { class Interconnects; class TopicRegistry; +class NodePolicyRegistry; /** * Context providing access to broker scoped entities. */ class BrokerContext { public: - BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&); + BrokerContext(Broker&, Interconnects&, TopicRegistry&, NodePolicyRegistry&, const std::string&); BrokerContext(BrokerContext&); Broker& getBroker(); Interconnects& getInterconnects(); TopicRegistry& getTopics(); + NodePolicyRegistry& getNodePolicies(); std::string getDomain(); private: Broker& broker; Interconnects& interconnects; TopicRegistry& topics; + NodePolicyRegistry& nodePolicies; std::string domain; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp new file mode 100644 index 0000000000..6cefe36f67 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.cpp @@ -0,0 +1,325 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/NodePolicy.h" +#include "qpid/broker/amqp/Connection.h" +#include "qpid/broker/amqp/Topic.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Exchange.h" +#include "qpid/types/Exception.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +const std::string DURABLE("durable"); +const std::string AUTO_DELETE("auto-delete"); +const std::string LIFETIME_POLICY("qpid.lifetime-policy"); +const std::string MANUAL("manual"); +const std::string UNUSED("delete-if-unused"); +const std::string UNUSED_AND_EMPTY("delete-if-unused-and-empty"); +const std::string QUEUE_POLICY("QueuePolicy"); +const std::string TOPIC_POLICY("TopicPolicy"); +const std::string QUEUE("queue"); +const std::string TOPIC("topic"); +const std::string ALTERNATE_EXCHANGE("alternate-exchange"); +const std::string EXCHANGE_TYPE("exchange-type"); +const std::string QPID_MSG_SEQUENCE("qpid.msg_sequence"); +const std::string QPID_IVE("qpid.ive"); +const std::string EMPTY; + +template +T get(const std::string& k, const qpid::types::Variant::Map& m, T defaultValue) +{ + qpid::types::Variant::Map::const_iterator i = m.find(k); + if (i == m.end()) return defaultValue; + else return i->second; +} + +std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + return get(k, m, EMPTY); +} + +bool testProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + return get(k, m, false); +} + +qpid::types::Variant::Map filterForQueue(const qpid::types::Variant::Map& properties) +{ + qpid::types::Variant::Map filtered = properties; + filtered.erase(DURABLE); + filtered.erase(AUTO_DELETE); + filtered.erase(ALTERNATE_EXCHANGE); + return filtered; +} +qpid::types::Variant::Map filterForTopic(const qpid::types::Variant::Map& properties) +{ + qpid::types::Variant::Map filtered = properties; + filtered.erase(DURABLE); + filtered.erase(EXCHANGE_TYPE); + filtered.erase(AUTO_DELETE); + filtered.erase(QPID_IVE); + filtered.erase(QPID_MSG_SEQUENCE); + return filtered; +} +void copy(const std::string& key, const qpid::types::Variant::Map& from, qpid::types::Variant::Map& to) +{ + qpid::types::Variant::Map::const_iterator i = from.find(key); + if (i != from.end()) to.insert(*i); +} + +} +NodePolicy::NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props) + : PersistableObject(ptrn, type, props), pattern(ptrn), + durable(testProperty(DURABLE, props)), + alternateExchange(getProperty(ALTERNATE_EXCHANGE, props)), + compiled(pattern) {} + +NodePolicy::~NodePolicy() {} + +const std::string& NodePolicy::getPattern() const +{ + return pattern; +} + +bool NodePolicy::isDurable() const +{ + return durable; +} + +bool NodePolicy::match(const std::string& name) const +{ + return qpid::sys::regex_match(name, compiled); +} + +QueuePolicy::QueuePolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props) + : NodePolicy(QUEUE_POLICY, pattern, props), + queueSettings(durable, testProperty(AUTO_DELETE, props)) +{ + qpid::types::Variant::Map unused; + qpid::types::Variant::Map filtered = filterForQueue(props); + //if queue is not durable and neither lifetime policy nor + //autodelete were explicitly specified, clean it up when not + //needed by default: + if (!queueSettings.durable && props.find(LIFETIME_POLICY) == props.end() && props.find(AUTO_DELETE) == props.end()) { + filtered[LIFETIME_POLICY] = UNUSED_AND_EMPTY; + } + queueSettings.populate(filtered, unused); + qpid::amqp_0_10::translate(filtered, queueSettings.storeSettings); + + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + policy = _qmf::QueuePolicy::shared_ptr(new _qmf::QueuePolicy(agent, this, pattern)); + policy->set_properties(props); + agent->addObject(policy); + } +} +QueuePolicy::~QueuePolicy() +{ + if (policy != 0) policy->resourceDestroy(); +} + + +std::pair, boost::shared_ptr > QueuePolicy::create(const std::string& name, Connection& connection) +{ + std::pair, boost::shared_ptr > result; + result.first = connection.getBroker().createQueue(name, queueSettings, 0/*not exclusive*/, alternateExchange, connection.getUserId(), connection.getId()).first; + return result; +} + +boost::shared_ptr QueuePolicy::GetManagementObject() const +{ + return policy; +} + +TopicPolicy::TopicPolicy(Broker& broker, const std::string& pattern, const qpid::types::Variant::Map& props) + : NodePolicy(TOPIC_POLICY, pattern, props), exchangeType(getProperty(EXCHANGE_TYPE, props)), + autodelete(get(AUTO_DELETE, props, !durable)) +{ + qpid::types::Variant::Map::const_iterator i = props.find(LIFETIME_POLICY); + if (i != props.end()) { + if (i->second == MANUAL) { + autodelete = false; + } else if (i->second == UNUSED || i->second == UNUSED_AND_EMPTY/*though empty doesn't mean much for an exchange*/) { + autodelete = true; + } else { + QPID_LOG(warning, "Did not recognise lifetime policy " << i->second << " in topic policy for " << pattern); + } + } + topicSettings = filterForTopic(props); + copy(QPID_IVE, props, exchangeSettings); + copy(QPID_MSG_SEQUENCE, props, exchangeSettings); + if (exchangeType.empty()) exchangeType = TOPIC; + + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + policy = _qmf::TopicPolicy::shared_ptr(new _qmf::TopicPolicy(agent, this, pattern)); + policy->set_properties(props); + agent->addObject(policy); + } +} + +TopicPolicy::~TopicPolicy() +{ + if (policy != 0) policy->resourceDestroy(); +} + +std::pair, boost::shared_ptr > TopicPolicy::create(const std::string& name, Connection& connection) +{ + std::pair, boost::shared_ptr > result; + qpid::framing::FieldTable args; + qpid::amqp_0_10::translate(exchangeSettings, args); + boost::shared_ptr exchange = connection.getBroker().createExchange(name, exchangeType, isDurable(), autodelete, alternateExchange, + args, connection.getUserId(), connection.getId()).first; + result.second = connection.getTopics().createTopic(connection.getBroker(), name, exchange, topicSettings); + return result; +} + +boost::shared_ptr TopicPolicy::GetManagementObject() const +{ + return policy; +} + +boost::shared_ptr NodePolicyRegistry::createQueuePolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +{ + boost::shared_ptr nodePolicy(new QueuePolicy(broker, name, properties)); + add(nodePolicy); + return nodePolicy; +} + +boost::shared_ptr NodePolicyRegistry::createTopicPolicy(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +{ + boost::shared_ptr nodePolicy(new TopicPolicy(broker, name, properties)); + add(nodePolicy); + return nodePolicy; +} + +boost::shared_ptr NodePolicyRegistry::createNodePolicy(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties) +{ + if (type == QUEUE_POLICY) { + return createQueuePolicy(broker, name, properties); + } else if (type == TOPIC_POLICY) { + return createTopicPolicy(broker, name, properties); + } else { + return boost::shared_ptr(); + } +} + +bool NodePolicyRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + boost::shared_ptr nodePolicy = createNodePolicy(broker, type, name, properties); + if (nodePolicy) { + if (nodePolicy->isDurable()) broker.getStore().create(*nodePolicy); + return true; + } else { + return false; + } +} +bool NodePolicyRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map&, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == QUEUE_POLICY || type == TOPIC_POLICY) { + boost::shared_ptr nodePolicy = remove(name, type); + if (nodePolicy) { + if (nodePolicy->isDurable()) broker.getStore().destroy(*nodePolicy); + return true; + } else { + return false; + } + } else { + return false; + } +} +bool NodePolicyRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId) +{ + + boost::shared_ptr nodePolicy = createNodePolicy(broker, type, name, properties); + if (nodePolicy) { + nodePolicy->setPersistenceId(persistenceId); + return true; + } else { + return false; + } +} + +void NodePolicyRegistry::add(boost::shared_ptr nodePolicy) +{ + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::const_iterator i = nodePolicies.find(nodePolicy->getName()); + if (i == nodePolicies.end()) { + nodePolicies.insert(NodePolicies::value_type(nodePolicy->getName(), nodePolicy)); + } else { + if (i->second->getType() != nodePolicy->getType()) { + throw qpid::types::Exception(QPID_MSG("Cannot create object of type " << nodePolicy->getType() << " with key " + << nodePolicy->getName() << " as an object of type " << i->second->getType() << " already exists with the same key")); + } else { + throw qpid::types::Exception(QPID_MSG("An object of type " << nodePolicy->getType() << " with key " << nodePolicy->getName() << " already exists")); + } + } +} +boost::shared_ptr NodePolicyRegistry::remove(const std::string& pattern, const std::string& type) +{ + boost::shared_ptr result; + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::iterator i = nodePolicies.find(pattern); + if (i != nodePolicies.end()) { + if (i->second->getType() != type) { + throw qpid::types::Exception(QPID_MSG("Object with key " << i->first << " is of type " << i->second->getType() << " not " << type)); + } + result = i->second; + nodePolicies.erase(i); + } + return result; +} +boost::shared_ptr NodePolicyRegistry::get(const std::string& pattern) +{ + qpid::sys::Mutex::ScopedLock l(lock); + NodePolicies::const_iterator i = nodePolicies.find(pattern); + if (i == nodePolicies.end()) { + return boost::shared_ptr(); + } else { + return i->second; + } +} + +boost::shared_ptr NodePolicyRegistry::match(const std::string& name) +{ + qpid::sys::Mutex::ScopedLock l(lock); + boost::shared_ptr best; + for (NodePolicies::const_iterator i = nodePolicies.begin(); i != nodePolicies.end(); ++i) { + //where multiple policies match, pick the one with the longest + //pattern as a crude guesstimate of the more specific one + if (i->second->match(name) && (!best || i->first.size() > best->getPattern().size())) { + best = i->second; + } + } + return best; +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h new file mode 100644 index 0000000000..d6e987d85f --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/NodePolicy.h @@ -0,0 +1,117 @@ +#ifndef QPID_BROKER_AMQP_NODEPOLICY_H +#define QPID_BROKER_AMQP_NODEPOLICY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ObjectFactory.h" +#include "qpid/broker/PersistableObject.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/regex.h" +#include "qpid/types/Variant.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/QueuePolicy.h" +#include "qmf/org/apache/qpid/broker/TopicPolicy.h" +#include + +namespace qpid { +namespace broker { +class Broker; +class Queue; +namespace amqp { +class Connection; +class Topic; + +/** + * Policy for creation of nodes 'on-demand' + */ +class NodePolicy : public PersistableObject, public management::Manageable +{ + public: + NodePolicy(const std::string& type, const std::string& ptrn, const qpid::types::Variant::Map& props); + virtual ~NodePolicy(); + const std::string& getPattern() const; + bool match(const std::string&) const; + bool isDurable() const; + virtual std::pair, boost::shared_ptr > create(const std::string&, Connection&) = 0; + virtual boost::shared_ptr GetManagementObject() const = 0; + protected: + NodePolicy(Broker&, const std::string& type, const std::string& pattern, const qpid::types::Variant::Map& properties); + const std::string pattern; + bool durable; + std::string alternateExchange; + qpid::sys::regex compiled; +}; + +class QueuePolicy : public NodePolicy +{ + public: + QueuePolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties); + ~QueuePolicy(); + std::pair, boost::shared_ptr > create(const std::string&, Connection&); + boost::shared_ptr GetManagementObject() const; + private: + qpid::broker::QueueSettings queueSettings; + qmf::org::apache::qpid::broker::QueuePolicy::shared_ptr policy; +}; + +class TopicPolicy : public NodePolicy +{ + public: + TopicPolicy(Broker&, const std::string& pattern, const qpid::types::Variant::Map& properties); + ~TopicPolicy(); + std::pair, boost::shared_ptr > create(const std::string&, Connection&); + boost::shared_ptr GetManagementObject() const; + private: + qpid::types::Variant::Map topicSettings; + std::string exchangeType; + bool autodelete; + qpid::types::Variant::Map exchangeSettings; + qmf::org::apache::qpid::broker::TopicPolicy::shared_ptr policy; +}; + +class NodePolicyRegistry : public ObjectFactory +{ + public: + bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId); + + boost::shared_ptr match(const std::string& name); + boost::shared_ptr createQueuePolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + boost::shared_ptr createTopicPolicy(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + private: + typedef std::map > NodePolicies; + qpid::sys::Mutex lock; + NodePolicies nodePolicies; + + boost::shared_ptr createNodePolicy(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties); + void add(boost::shared_ptr nodePolicy); + boost::shared_ptr remove(const std::string& pattern, const std::string& type); + boost::shared_ptr get(const std::string& pattern); +}; + +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_NODEPOLICY_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index d0311c34d2..cd31ef7788 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -29,6 +29,7 @@ #include "qpid/broker/amqp/Connection.h" #include "qpid/broker/amqp/Interconnects.h" #include "qpid/broker/amqp/Message.h" +#include "qpid/broker/amqp/NodePolicy.h" #include "qpid/broker/amqp/Sasl.h" #include "qpid/broker/amqp/Topic.h" #include "qpid/broker/amqp/Translation.h" @@ -44,22 +45,27 @@ namespace amqp { struct Options : public qpid::Options { std::string domain; + std::vector queuePatterns; + std::vector topicPatterns; Options() : qpid::Options("AMQP 1.0 Options") { addOptions() - ("domain", optValue(domain, "DOMAIN"), "Domain of this broker"); + ("domain", optValue(domain, "DOMAIN"), "Domain of this broker") + ("queue-patterns", optValue(queuePatterns, "PATTERN"), "Pattern for on-demand queues") + ("topic-patterns", optValue(topicPatterns, "PATTERN"), "Pattern for on-demand topics"); } }; class ProtocolImpl : public BrokerContext, public Protocol { public: - ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain) - : BrokerContext(broker, *interconnects, *topics, domain) + ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, NodePolicyRegistry* policies, Broker& broker, const std::string& domain) + : BrokerContext(broker, *interconnects, *topics, *policies, domain) { interconnects->setContext(*this); broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown + broker.getObjectFactoryRegistry().add(policies);//registry deletes on shutdown } qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr translate(const qpid::broker::Message&); @@ -71,18 +77,33 @@ struct ProtocolPlugin : public Plugin { Options options; Options* getOptions() { return &options; } + NodePolicyRegistry* policies; + + ProtocolPlugin() : policies(0) {} void earlyInitialize(Plugin::Target& target) { //need to register protocol before recovery from store broker::Broker* broker = dynamic_cast(&target); if (broker) { - ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain); + policies = new NodePolicyRegistry(); + ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), policies, *broker, options.domain); broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown } } - void initialize(Plugin::Target&) {} + void initialize(Plugin::Target& target) + { + broker::Broker* broker = dynamic_cast(&target); + if (broker) { + for (std::vector::const_iterator i = options.queuePatterns.begin(); i != options.queuePatterns.end(); ++i) { + policies->createQueuePolicy(*broker, *i, qpid::types::Variant::Map()); + } + for (std::vector::const_iterator i = options.topicPatterns.begin(); i != options.topicPatterns.end(); ++i) { + policies->createTopicPolicy(*broker, *i, qpid::types::Variant::Map()); + } + } + } }; ProtocolPlugin instance; // Static initialization diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 7170da0797..ab677faac3 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -26,6 +26,7 @@ #include "Domain.h" #include "Exception.h" #include "Interconnects.h" +#include "NodePolicy.h" #include "Relay.h" #include "Topic.h" #include "qpid/amqp/descriptors.h" @@ -260,19 +261,36 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } } else { - size_t i = name.find('@'); - if (i != std::string::npos && (i+1) < name.length()) { - std::string domain = name.substr(i+1); - std::string local = name.substr(0, i); - std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); - //does this domain exist? - boost::shared_ptr d = connection.getInterconnects().findDomain(domain); - if (d) { - node.relay = boost::shared_ptr(new Relay(1000)); - if (incoming) { - d->connect(false, id, name, local, connection, node.relay); - } else { - d->connect(true, id, local, name, connection, node.relay); + boost::shared_ptr nodePolicy = connection.getNodePolicies().match(name); + if (nodePolicy) { + std::pair, boost::shared_ptr > result = nodePolicy->create(name, connection); + node.queue = result.first; + node.topic = result.second; + if (node.topic) node.exchange = node.topic->getExchange(); + + if (node.queue) { + QPID_LOG(info, "Created queue " << name << " from policy with pattern " << nodePolicy->getPattern()); + } else if (node.topic) { + QPID_LOG(info, "Created topic " << name << " from policy with pattern " << nodePolicy->getPattern()); + } else { + QPID_LOG(debug, "Created neither a topic nor a queue for " << name << " from policy with pattern " << nodePolicy->getPattern()); + } + + } else { + size_t i = name.find('@'); + if (i != std::string::npos && (i+1) < name.length()) { + std::string domain = name.substr(i+1); + std::string local = name.substr(0, i); + std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); + //does this domain exist? + boost::shared_ptr d = connection.getInterconnects().findDomain(domain); + if (d) { + node.relay = boost::shared_ptr(new Relay(1000)); + if (incoming) { + d->connect(false, id, name, local, connection, node.relay); + } else { + d->connect(true, id, local, name, connection, node.relay); + } } } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp index c04f62b3d1..4e3de21c74 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp @@ -58,8 +58,8 @@ qpid::types::Variant::Map filter(const qpid::types::Variant::Map& properties) } } -Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties) - : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties))), +Topic::Topic(Broker& broker, const std::string& n, boost::shared_ptr e, const qpid::types::Variant::Map& properties) + : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(e), alternateExchange(getProperty(ALTERNATE_EXCHANGE, properties)) { if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified."); @@ -107,9 +107,9 @@ const std::string& Topic::getAlternateExchange() const { return alternateExchange; } -boost::shared_ptr TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +boost::shared_ptr TopicRegistry::createTopic(Broker& broker, const std::string& name, boost::shared_ptr exchange, const qpid::types::Variant::Map& properties) { - boost::shared_ptr topic(new Topic(broker, name, properties)); + boost::shared_ptr topic(new Topic(broker, name, exchange, properties)); add(topic); topic->getExchange()->setDeletionListener(name, boost::bind(&TopicRegistry::remove, this, name)); return topic; @@ -119,7 +119,7 @@ bool TopicRegistry::createObject(Broker& broker, const std::string& type, const const std::string& /*userId*/, const std::string& /*connectionId*/) { if (type == TOPIC) { - boost::shared_ptr topic = createTopic(broker, name, props); + boost::shared_ptr topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, props)), props); if (topic->isDurable()) broker.getStore().create(*topic); return true; } else { @@ -147,7 +147,7 @@ bool TopicRegistry::recoverObject(Broker& broker, const std::string& type, const uint64_t persistenceId) { if (type == TOPIC) { - boost::shared_ptr topic = createTopic(broker, name, properties); + boost::shared_ptr topic = createTopic(broker, name, broker.getExchanges().get(getProperty(EXCHANGE, properties)), properties); topic->setPersistenceId(persistenceId); return true; } else { diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.h b/qpid/cpp/src/qpid/broker/amqp/Topic.h index e08830ba0f..df16f4a738 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Topic.h +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.h @@ -47,7 +47,7 @@ namespace amqp { class Topic : public PersistableObject, public management::Manageable { public: - Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + Topic(Broker&, const std::string& name, boost::shared_ptr, const qpid::types::Variant::Map& properties); ~Topic(); const std::string& getName() const; const QueueSettings& getPolicy() const; @@ -77,12 +77,12 @@ class TopicRegistry : public ObjectFactory bool add(boost::shared_ptr topic); boost::shared_ptr remove(const std::string& name); boost::shared_ptr get(const std::string& name); + boost::shared_ptr createTopic(Broker&, const std::string& name, boost::shared_ptr exchange, const qpid::types::Variant::Map& properties); private: typedef std::map > Topics; qpid::sys::Mutex lock; Topics topics; - boost::shared_ptr createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/management-schema.xml b/qpid/cpp/src/qpid/broker/management-schema.xml index 4d66b72318..bf6514b855 100644 --- a/qpid/cpp/src/qpid/broker/management-schema.xml +++ b/qpid/cpp/src/qpid/broker/management-schema.xml @@ -437,6 +437,24 @@ + + + + + + + + + +