diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-07-05 11:15:02 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-07-05 11:15:02 +0000 |
| commit | 4145f04675f48c3df16e3ac83ddc188ac3ec87cf (patch) | |
| tree | b97d82c9325e94142097b4a4ee9bd8077d535850 /java | |
| parent | 56089d6922fe520311a3610c99a52585bad943b8 (diff) | |
| download | qpid-python-4145f04675f48c3df16e3ac83ddc188ac3ec87cf.tar.gz | |
QPID-2581 : Process new topic configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@960544 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 309 insertions, 21 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java new file mode 100644 index 0000000000..b4ccc75f10 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; + +public class TopicConfig extends ConfigurationPlugin +{ + @Override + public String[] getElementsProcessed() + { + return new String[]{"name", "subscriptionName"}; + } + + public String getName() + { + // If we don't have a specific topic then this config is for all topics. + return getStringValue("name", "#"); + } + + public String getSubscriptionName() + { + return getStringValue("subscriptionName"); + } + + public void validateConfiguration() throws ConfigurationException + { + if (_configuration.isEmpty()) + { + throw new ConfigurationException("Topic section cannot be empty."); + } + + if (getStringValue("name") == null && getSubscriptionName() == null) + { + throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element."); + } + + System.err.println("********* Created TC:"+this); + } + + + @Override + public String formatToString() + { + String response = "Topic:"+getName(); + if (getSubscriptionName() != null) + { + response += ", SubscriptionName:"+getSubscriptionName(); + } + + return response; + } +}
\ No newline at end of file diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java index 0218bf7273..a16198788f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java @@ -22,8 +22,11 @@ package org.apache.qpid.server.configuration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.queue.AMQQueue; import java.util.Arrays; import java.util.HashMap; @@ -31,7 +34,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -public class TopicConfiguration extends ConfigurationPlugin +public class TopicConfiguration extends ConfigurationPlugin implements ExchangeConfigurationPlugin { public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory(); @@ -54,6 +57,7 @@ public class TopicConfiguration extends ConfigurationPlugin } Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>(); + Map<String, Map<String, TopicConfig>> _subscriptions = new HashMap<String, Map<String, TopicConfig>>(); public String[] getElementsProcessed() { @@ -68,53 +72,186 @@ public class TopicConfiguration extends ConfigurationPlugin throw new ConfigurationException("Topics section cannot be empty."); } - int topics = _configuration.getList("topic.name").size(); + int topics = _configuration.getList("topic.name").size() + + _configuration.getList("topic.subscriptionName").size(); - for(int index=0; index<topics;index++) + for (int index = 0; index < topics; index++) { TopicConfig topic = new TopicConfig(); - topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", _configuration.subset("topic(" + index + ")")); - String topicName = _configuration.getString("topic(" + index + ").name"); - if(_topics.containsKey(topicName)) + Configuration topicSubset = _configuration.subset("topic(" + index + ")"); + + // This will occur when we have a subscriptionName that is bound to a + // topic. + if (topicSubset.isEmpty()) + { + break; + } + + topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", topicSubset ); + + String name = _configuration.getString("topic(" + index + ").name"); + String subscriptionName = _configuration.getString("topic(" + index + ").subscriptionName"); + + // Record config if subscriptionName is there + if (subscriptionName != null) { - throw new ConfigurationException("Topics section cannot contain two entries for the same topic."); + processSubscription(subscriptionName, topic); } else { - _topics.put(topicName, topic); + // Otherwise record config as topic if we have the name + if (name != null) + { + processTopic(name, topic); + } } } } - public String toString() + /** + * @param name + * @param topic + * + * @throws org.apache.commons.configuration.ConfigurationException + * + */ + private void processTopic(String name, TopicConfig topic) throws ConfigurationException { - return getClass().getName() + ": Defined Topics:" + _topics.size(); + if (_topics.containsKey(name)) + { + throw new ConfigurationException("Topics section cannot contain two entries for the same topic."); + } + else + { + _topics.put(name, topic); + } } - public static class TopicConfig extends ConfigurationPlugin + + private void processSubscription(String name, TopicConfig topic) throws ConfigurationException { - @Override - public String[] getElementsProcessed() + Map<String,TopicConfig> topics; + if (_subscriptions.containsKey(name)) + { + topics = _subscriptions.get(name); + + if (topics.containsKey(topic.getName())) + { + throw new ConfigurationException("Subcription cannot contain two entries for the same topic."); + } + } + else { - return new String[]{"name"}; + topics = new HashMap<String,TopicConfig>(); } - public String getName() + topics.put(topic.getName(),topic); + _subscriptions.put(name, topics); + + } + + @Override + public String formatToString() + { + return "Topics:" + _topics + ", Subscriptions:" + _subscriptions; + } + + /** + * This processes the given queue and apply configuration in the following + * order: + * + * Global Topic Values -> Topic Values -> Subscription Values + * + * @param queue + * + * @return + */ + public ConfigurationPlugin getConfiguration(AMQQueue queue) + { + TopicConfig config = new TopicConfig(); + + // Add global topic configuration + config.addConfiguration(this); + + // Process Topic Bindings as these are more generic than subscriptions + List<TopicConfig> boundToTopics = new LinkedList<TopicConfig>(); + + //Merge the configuration in the order that they are bound + for (Binding binding : queue.getBindings()) { - // If we don't specify a topic name then match all topics - String configName = getStringValue("name"); - return configName == null ? "#" : configName; + if (binding.getExchange().getType().equals(TopicExchange.TYPE)) + { + // Identify topic for the binding key + TopicConfig topicConfig = getTopicConfigForRoutingKey(binding.getBindingKey()); + if (topicConfig != null) + { + boundToTopics.add(topicConfig); + } + } } + // If the Queue is bound to a number of topics then only use the global + // topic configuration. + // todo - What does it mean in terms of configuration to be bound to a + // number of topics? Do we try and merge? + // YES - right thing to do would be to merge from generic to specific. + // Means we need to be able to get an ordered list of topics for this + // binding. + if (boundToTopics.size() == 1) + { + config.addConfiguration(boundToTopics.get(0)); + } - public void validateConfiguration() throws ConfigurationException + // Apply subscription configurations + if (_subscriptions.containsKey(queue.getName())) { - if(_configuration.isEmpty()) + Map<String, TopicConfig> topics = _subscriptions.get(queue.getName()); + + TopicConfig subscriptionSpecificConfig = null; + + // See if we have a TopicConfig in topics for a topic we are bound to. + for (Binding binding : queue.getBindings()) + { + if (binding.getExchange().getType().equals(TopicExchange.TYPE)) + { + //todo - What does it mean to have multiple matches? + // Take the first match we get + if (subscriptionSpecificConfig == null) + { + // lookup the binding to see if we have a match in the subscription configs + subscriptionSpecificConfig = topics.get(binding.getBindingKey()); + } + } + } + + // Apply subscription specfic config. + if (subscriptionSpecificConfig != null) { - throw new ConfigurationException("Topic section cannot be empty."); + config.addConfiguration(subscriptionSpecificConfig); } } + + return config; + } + + /** + * This method should perform the same heuristics as the TopicExchange + * to attempt to identify a piece of configuration for the give routingKey. + * + * i.e. If we have 'stocks.*' defined in the config + * and we bind 'stocks.appl' then we should return the 'stocks.*' + * configuration. + * + * @param routingkey the key to lookup + * + * @return the TopicConfig if found. + */ + private TopicConfig getTopicConfigForRoutingKey(String routingkey) + { + //todo actually perform TopicExchange style lookup not just straight + // lookup as we are just now. + return _topics.get(routingkey); } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java new file mode 100644 index 0000000000..2cc49f7c93 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/TopicConfigurationTest.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQSecurityException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.AMQQueueFactory; +import org.apache.qpid.server.util.InternalBrokerBaseCase; + +public class TopicConfigurationTest extends InternalBrokerBaseCase +{ + + @Override + public void configure() + { + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic.name", "stocks.nyse.appl"); + + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(1).subscriptionName", "testSubscriptionCreation:stockSubscription"); + + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).name", "stocks.nyse.orcl"); + _configXml.addProperty("virtualhosts.virtualhost.test.topics.topic(2).subscriptionName", getName()+":stockSubscription"); + } + + public void testTopicCreation() throws ConfigurationException, AMQSecurityException + { + Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + _virtualHost.getBindingFactory().addBinding("stocks.nyse.appl", _queue, topicExchange, null); + + TopicConfig config = _queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); + + assertNotNull("Queue should have topic configuration bound to it.", config); + assertEquals("Configuration name not correct", "stocks.nyse.appl", config.getName()); + } + + public void testSubscriptionCreation() throws ConfigurationException, AMQException + { + + AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), + false, false, _virtualHost, null); + + _virtualHost.getQueueRegistry().registerQueue(queue); + Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); + _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); + + + Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); + _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); + + TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); + + assertNotNull("Queue should have topic configuration bound to it.", config); + assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); + } + + +} |
