diff options
Diffstat (limited to 'java')
21 files changed, 335 insertions, 98 deletions
diff --git a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java index 59fbaa4a34..e6e0059902 100644 --- a/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java +++ b/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/AccessControl.java @@ -60,7 +60,7 @@ public class AccessControl extends AbstractPlugin public AccessControl newInstance(ConfigurationPlugin config) throws ConfigurationException { - AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class); + AccessControlConfiguration configuration = config.getConfiguration(AccessControlConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java index c094c7eb6d..0949204a33 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -95,7 +95,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin "('messageAge','depth' or 'messageCount') must be specified."); } - SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class); + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index 77819f8dbf..cac52c2fdf 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -40,7 +40,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { public SlowConsumerDetection newInstance(VirtualHost vhost) { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); + SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); if (config == null) { @@ -74,7 +74,7 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin try { SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java index 3c67f6e6d7..c5275aa0bf 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java @@ -45,7 +45,7 @@ public class TopicDeletePolicy implements SlowConsumerPolicyPlugin public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException { TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class); + configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); TopicDeletePolicy policy = new TopicDeletePolicy(); policy.configure(config); diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java index f0be3d2db0..513bafa8ad 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java @@ -47,7 +47,7 @@ import java.util.concurrent.TimeUnit; * Slow consumers should on a topic should expect to receive a * 506 : Resource Error if the hit a predefined threshold. */ -public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener +public class GlobalQueuesTest extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener { Topic _destination; private CountDownLatch _disconnectionLatch = new CountDownLatch(1); @@ -59,6 +59,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis private Exception _publisherError = null; private JMSException _connectionException = null; private static final long JOIN_WAIT = 5000; + protected String CONFIG_SECTION = ".queues"; @Override public void setUp() throws IOException, ConfigurationException, NamingException @@ -71,32 +72,26 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis + getConnectionURL().getVirtualHost().substring(1) + ".slow-consumer-detection.timeunit", "SECONDS"); - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.name", "TopicDelete"); - - /** * Queue Configuration <slow-consumer-detection> - <!-- The depth before which the policy will be applied--> - <depth>4235264</depth> - - <!-- The message age before which the policy will be applied--> - <messageAge>600000</messageAge> - - <!-- The number of message before which the policy will be applied--> - <messageCount>50</messageCount> - - <!-- Policies configuration --> - <policy> - <name>TopicDelete</name> - <topicDelete> - <delete-persistent/> - </topicDelete> - </policy> + <!-- The depth before which the policy will be applied--> + <depth>4235264</depth> + + <!-- The message age before which the policy will be applied--> + <messageAge>600000</messageAge> + + <!-- The number of message before which the policy will be applied--> + <messageCount>50</messageCount> + + <!-- Policies configuration --> + <policy> + <name>TopicDelete</name> + <topicDelete> + <delete-persistent/> + </topicDelete> + </policy> </slow-consumer-detection> */ @@ -105,8 +100,8 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis * VirtualHost Plugin Configuration <slow-consumer-detection> - <delay>1</delay> - <timeunit>MINUTES</timeunit> + <delay>1</delay> + <timeunit>MINUTES</timeunit> </slow-consumer-detection> */ @@ -132,6 +127,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis * Clients should not have to modify their code based on the protocol in use. * * @param ackMode @see javax.jms.Session + * * @throws Exception */ public void topicConsumer(int ackMode, boolean durable) throws Exception @@ -260,6 +256,27 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis } + public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + "policy.name", "TopicDelete"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + property, value); + + if (deleteDurable) + { + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + CONFIG_SECTION + ".slow-consumer-detection." + + "policy.topicdelete.delete-persistent", ""); + } + } + /** * Test that setting messageCount takes affect on topics * @@ -271,10 +288,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); //Start the broker super.setUp(); @@ -295,10 +309,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "depth", String.valueOf(MESSAGE_SIZE * 9)); + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); //Start the broker super.setUp(); @@ -321,10 +332,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageAge", String.valueOf(DISCONNECTION_WAIT / 2)); + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); //Start the broker super.setUp(); @@ -346,15 +354,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); + setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); //Start the broker super.setUp(); @@ -377,15 +377,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "depth", String.valueOf(MESSAGE_SIZE * 9)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); + setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); //Start the broker super.setUp(); @@ -395,7 +387,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis topicConsumer(Session.AUTO_ACKNOWLEDGE, true); } - /** + /** * Test that setting messageAge has an effect on topics * * Ensure we set the delete-persistent option @@ -410,15 +402,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis { MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "messageAge", String.valueOf(DISCONNECTION_WAIT / 5)); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".queues.slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); + setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); //Start the broker super.setUp(); @@ -438,6 +422,7 @@ public class SlowConsumerTest extends QpidBrokerTestCase implements ExceptionLis _disconnectionLatch.countDown(); } + /// Connection Listener public void bytesSent(long count) diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java new file mode 100644 index 0000000000..1f8103fa3c --- /dev/null +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java @@ -0,0 +1,36 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; + +import javax.naming.NamingException; +import java.io.IOException; + +public class GlobalTopicsTest extends GlobalQueuesTest +{ + @Override + public void setUp() throws NamingException, IOException, ConfigurationException + { + CONFIG_SECTION = ".topics"; + super.setUp(); + } +} diff --git a/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java b/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java index 6fe0d03741..ae2baa95ca 100644 --- a/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java +++ b/java/broker-plugins/firewall/src/main/java/org/apache/qpid/server/security/access/plugins/Firewall.java @@ -44,7 +44,7 @@ public class Firewall extends AbstractPlugin { public Firewall newInstance(ConfigurationPlugin config) throws ConfigurationException { - FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class); + FirewallConfiguration configuration = config.getConfiguration(FirewallConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java index 1bf8761978..c9a476c5f2 100644 --- a/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java +++ b/java/broker-plugins/simple-xml/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java @@ -52,7 +52,7 @@ public class SimpleXML extends AbstractPlugin { public SimpleXML newInstance(ConfigurationPlugin config) throws ConfigurationException { - SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class); + SimpleXMLConfiguration configuration = config.getConfiguration(SimpleXMLConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 50377eaf52..b24a326ed3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.configuration.BindingConfig; import org.apache.qpid.server.configuration.BindingConfigType; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; @@ -201,6 +203,15 @@ public class BindingFactory exchange.addBinding(b); getConfigStore().addConfiguredObject(b); b.logCreation(); + + //Reconfigure the queue for to reflect this new binding. + ConfigurationPlugin config = queue.getVirtualHost().getConfiguration().getQueueConfiguration(queue); + + if (config != null) + { + // Reconfigure with new config. + queue.configure(config); + } return true; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java new file mode 100644 index 0000000000..0218bf7273 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class TopicConfiguration extends ConfigurationPlugin +{ + public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory(); + + private static final String VIRTUALHOSTS_VIRTUALHOST_TOPICS = "virtualhosts.virtualhost.topics"; + + public static class TopicConfigurationFactory implements ConfigurationPluginFactory + { + + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + TopicConfiguration topicsConfig = new TopicConfiguration(); + topicsConfig.setConfiguration(path, config); + return topicsConfig; + } + + public List<String> getParentPaths() + { + return Arrays.asList(VIRTUALHOSTS_VIRTUALHOST_TOPICS); + } + } + + Map<String, TopicConfig> _topics = new HashMap<String, TopicConfig>(); + + public String[] getElementsProcessed() + { + return new String[]{"topic"}; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (_configuration.isEmpty()) + { + throw new ConfigurationException("Topics section cannot be empty."); + } + + int topics = _configuration.getList("topic.name").size(); + + for(int index=0; index<topics;index++) + { + TopicConfig topic = new TopicConfig(); + topic.setConfiguration(VIRTUALHOSTS_VIRTUALHOST_TOPICS + ".topic", _configuration.subset("topic(" + index + ")")); + + String topicName = _configuration.getString("topic(" + index + ").name"); + if(_topics.containsKey(topicName)) + { + throw new ConfigurationException("Topics section cannot contain two entries for the same topic."); + } + else + { + _topics.put(topicName, topic); + } + } + } + + public String toString() + { + return getClass().getName() + ": Defined Topics:" + _topics.size(); + } + + public static class TopicConfig extends ConfigurationPlugin + { + @Override + public String[] getElementsProcessed() + { + return new String[]{"name"}; + } + + public String getName() + { + // If we don't specify a topic name then match all topics + String configName = getStringValue("name"); + return configName == null ? "#" : configName; + } + + + public void validateConfiguration() throws ConfigurationException + { + if(_configuration.isEmpty()) + { + throw new ConfigurationException("Topic section cannot be empty."); + } + } + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java index 2be3311403..7dab02aee7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.configuration; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -29,7 +31,15 @@ import org.apache.commons.configuration.CompositeConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.binding.BindingFactory; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.ExchangeType; +import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; @@ -147,6 +157,72 @@ public class VirtualHostConfiguration extends ConfigurationPlugin } } + public ConfigurationPlugin getQueueConfiguration(AMQQueue queue) + { + VirtualHostConfiguration hostConfig = queue.getVirtualHost().getConfiguration(); + + // First check if we have a named queue configuration (the easy case) + if (Arrays.asList(hostConfig.getQueueNames()).contains(queue.getName())) + { + return null; + } + + // We don't have an explicit queue config we must find out what we need. + ArrayList<Binding> bindings = new ArrayList<Binding>(queue.getBindings()); + + List<AMQShortString> exchangeClasses = new ArrayList<AMQShortString>(bindings.size()); + + //Remove default exchange + for (int index = 0; index < bindings.size(); index++) + { + // Ignore the DEFAULT Exchange binding + if (bindings.get(index).getExchange().getNameShortString().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)) + { + bindings.remove(index); + } + else + { + exchangeClasses.add(bindings.get(index).getExchange().getType().getName()); + + if (exchangeClasses.size() > 1) + { + // If we have more than 1 class of exchange then we can only use the global queue configuration. + // and this will be returned from the default getQueueConfiguration + return null; + } + } + } + + // If we are just bound the the default exchange then use the default. + if (bindings.isEmpty()) + { + return null; + } + + // If we are bound to only one type of exchange then we are going + // to have to resolve the configuration for that exchange. + + String exchangeName = bindings.get(0).getExchange().getType().getName().toString(); + + // Lookup a Configuration handler for this Exchange. + + // Build the expected class name. <Exchangename>sConfiguration + // i.e. TopicConfiguration or HeadersConfiguration + String exchangeClass = "org.apache.qpid.server.configuration." + + exchangeName.substring(0, 1).toUpperCase() + + exchangeName.substring(1) + "Configuration"; + + ConfigurationPlugin configPlugin + = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass); + + + // now need to perform the queue-topic-topics-queue magic. + + System.err.println("*********** Reconfiguring queue with config:"+configPlugin); + + return configPlugin; + } + public long getMemoryUsageMaximum() { return getLongValue("queues.maximumMemoryUsage"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java index 9024c6aec6..1da1459f70 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -38,8 +38,8 @@ public abstract class ConfigurationPlugin { protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class); - private Map<Class<? extends ConfigurationPlugin>, ConfigurationPlugin> - _pluginConfiguration = new HashMap<Class<? extends ConfigurationPlugin>, ConfigurationPlugin>(); + private Map<String, ConfigurationPlugin> + _pluginConfiguration = new HashMap<String, ConfigurationPlugin>(); protected Configuration _configuration; @@ -69,7 +69,7 @@ public abstract class ConfigurationPlugin return _configuration; } - public <C extends ConfigurationPlugin> C getConfiguration(Class<C> plugin) + public <C extends ConfigurationPlugin> C getConfiguration(String plugin) { return (C) _pluginConfiguration.get(plugin); } @@ -155,7 +155,7 @@ public abstract class ConfigurationPlugin List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled); for (ConfigurationPlugin plugin : handlers) { - _pluginConfiguration.put(plugin.getClass(), plugin); + _pluginConfiguration.put(plugin.getClass().getName(), plugin); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 466bc9e228..8364635632 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -34,6 +34,8 @@ import org.apache.felix.framework.Felix; import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; +import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; @@ -79,6 +81,7 @@ public class PluginManager implements Closeable _securityPlugins.put(pluginFactory.getPluginName(), pluginFactory); } for (ConfigurationPluginFactory configFactory : Arrays.asList( + TopicConfiguration.FACTORY, SecurityManager.SecurityConfiguration.FACTORY, AllowAll.AllowAllConfiguration.FACTORY, DenyAll.DenyAllConfiguration.FACTORY, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 3b60734a2e..225fbec930 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -274,9 +275,9 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer public void doTask(AMQQueue queue) throws AMQException; } - void configure(QueueConfiguration config); + void configure(ConfigurationPlugin config); - QueueConfiguration getConfiguration(); + ConfigurationPlugin getConfiguration(); ManagedObject getManagedObject(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index f81a4a6911..451d59b2e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.ConfigStore; @@ -185,7 +186,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - private QueueConfiguration _queueConfiguration; + private ConfigurationPlugin _queueConfiguration; @@ -2065,24 +2066,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void configure(QueueConfiguration config) + public void configure(ConfigurationPlugin config) { if (config != null) { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - _capacity = config.getCapacity(); - _flowResumeCapacity = config.getFlowResumeCapacity(); + if (config instanceof QueueConfiguration) + { + + setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge()); + setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth()); + setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); + setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); + setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); + _capacity = ((QueueConfiguration)config).getCapacity(); + _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); + } _queueConfiguration = config; + } } - public QueueConfiguration getConfiguration() + public ConfigurationPlugin getConfiguration() { return _queueConfiguration; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 362d919a5e..ff28d76053 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -153,7 +153,7 @@ public class SecurityManager public Map<String, SecurityPlugin> configurePlugins(ConfigurationPlugin hostConfig) throws ConfigurationException { Map<String, SecurityPlugin> plugins = new HashMap<String, SecurityPlugin>(); - SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class); + SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class.getName()); // If we have no security Configuration then there is nothing to configure. if (securityConfig != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java index d4777b8cb3..db18a89231 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -66,7 +66,7 @@ public class AllowAll extends BasicPlugin { public AllowAll newInstance(ConfigurationPlugin config) throws ConfigurationException { - AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class); + AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java index cd68511730..6c0fb1eaa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java @@ -66,7 +66,7 @@ public class DenyAll extends BasicPlugin { public DenyAll newInstance(ConfigurationPlugin config) throws ConfigurationException { - DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class); + DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java index 1250cdcb1b..bd99cdd1fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java @@ -60,7 +60,7 @@ public class LegacyAccess extends BasicPlugin { public LegacyAccess newInstance(ConfigurationPlugin config) throws ConfigurationException { - LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class); + LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java index 7a8cabf512..956bb6f8fa 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java @@ -38,7 +38,7 @@ public class LogMessageTest extends TestCase { Locale usLocal = Locale.US; Locale.setDefault(usLocal); - ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages", + ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages", usLocal); assertNotNull("Unable to load ResourceBundle", _messages); @@ -55,7 +55,7 @@ public class LogMessageTest extends TestCase Locale.setDefault(japanese); try { - ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages", + ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages", japanese); assertNotNull("Unable to load ResourceBundle", _messages); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 70d146437f..51b049787c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.*; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.subscription.Subscription; @@ -36,7 +36,6 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; -import javax.swing.*; import java.util.List; import java.util.Set; import java.util.Map; @@ -520,12 +519,12 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } - public void configure(QueueConfiguration config) + public void configure(ConfigurationPlugin config) { } - public QueueConfiguration getConfiguration() + public ConfigurationPlugin getConfiguration() { return null; //To change body of implemented methods use File | Settings | File Templates. } |
