From d4c28fabf668dcef1cfd67c7a73e57dc0d168dfd Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 14 Jun 2010 12:36:56 +0000 Subject: QPID-2638 : Add initial support for Topics section in configuration file. Added getQueueConfiguration(AMQQueue) which will return a new configuration for the given queue reflecting its binding status. This will allow the queue to be reconfigured during the binding process. Full Docs on this approach to appear on wiki. AMQQueue.configure and getConfiguration() have been updated to use ConfigurationPlugin rather than QueueConfiguration, The queue may be configured by a TopicConfiguration now. Update SlowConsumerTest to be GlobalQueuesTest and add a GlobalTopicsTest to match, where the config is added to the queues or topics section respectively git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954433 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/binding/BindingFactory.java | 11 ++ .../server/configuration/TopicConfiguration.java | 120 +++++++++++++++++++++ .../configuration/VirtualHostConfiguration.java | 76 +++++++++++++ .../configuration/plugins/ConfigurationPlugin.java | 8 +- .../apache/qpid/server/plugins/PluginManager.java | 3 + .../org/apache/qpid/server/queue/AMQQueue.java | 5 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 26 +++-- .../qpid/server/security/SecurityManager.java | 2 +- .../server/security/access/plugins/AllowAll.java | 2 +- .../server/security/access/plugins/DenyAll.java | 2 +- .../security/access/plugins/LegacyAccess.java | 2 +- .../apache/qpid/server/logging/LogMessageTest.java | 4 +- .../org/apache/qpid/server/queue/MockAMQQueue.java | 7 +- 13 files changed, 242 insertions(+), 26 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java (limited to 'java/broker') diff --git a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 50377eaf52..b24a326ed3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.configuration.BindingConfig; import org.apache.qpid.server.configuration.BindingConfigType; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; +import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; @@ -201,6 +203,15 @@ public class BindingFactory exchange.addBinding(b); getConfigStore().addConfiguredObject(b); b.logCreation(); + + //Reconfigure the queue for to reflect this new binding. + ConfigurationPlugin config = queue.getVirtualHost().getConfiguration().getQueueConfiguration(queue); + + if (config != null) + { + // Reconfigure with new config. + queue.configure(config); + } return true; } else diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java new file mode 100644 index 0000000000..0218bf7273 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfiguration.java @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class TopicConfiguration extends ConfigurationPlugin +{ + public static final ConfigurationPluginFactory FACTORY = new TopicConfigurationFactory(); + + private static final String VIRTUALHOSTS_VIRTUALHOST_TOPICS = "virtualhosts.virtualhost.topics"; + + public static class TopicConfigurationFactory implements ConfigurationPluginFactory + { + + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + TopicConfiguration topicsConfig = new TopicConfiguration(); + topicsConfig.setConfiguration(path, config); + return topicsConfig; + } + + public List getParentPaths() + { + return Arrays.asList(VIRTUALHOSTS_VIRTUALHOST_TOPICS); + } + } + + Map _topics = new HashMap(); + + public String[] getElementsProcessed() + { + return new String[]{"topic"}; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + if (_configuration.isEmpty()) + { + throw new ConfigurationException("Topics section cannot be empty."); + } + + int topics = _configuration.getList("topic.name").size(); + + for(int index=0; index bindings = new ArrayList(queue.getBindings()); + + List exchangeClasses = new ArrayList(bindings.size()); + + //Remove default exchange + for (int index = 0; index < bindings.size(); index++) + { + // Ignore the DEFAULT Exchange binding + if (bindings.get(index).getExchange().getNameShortString().equals(ExchangeDefaults.DEFAULT_EXCHANGE_NAME)) + { + bindings.remove(index); + } + else + { + exchangeClasses.add(bindings.get(index).getExchange().getType().getName()); + + if (exchangeClasses.size() > 1) + { + // If we have more than 1 class of exchange then we can only use the global queue configuration. + // and this will be returned from the default getQueueConfiguration + return null; + } + } + } + + // If we are just bound the the default exchange then use the default. + if (bindings.isEmpty()) + { + return null; + } + + // If we are bound to only one type of exchange then we are going + // to have to resolve the configuration for that exchange. + + String exchangeName = bindings.get(0).getExchange().getType().getName().toString(); + + // Lookup a Configuration handler for this Exchange. + + // Build the expected class name. sConfiguration + // i.e. TopicConfiguration or HeadersConfiguration + String exchangeClass = "org.apache.qpid.server.configuration." + + exchangeName.substring(0, 1).toUpperCase() + + exchangeName.substring(1) + "Configuration"; + + ConfigurationPlugin configPlugin + = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass); + + + // now need to perform the queue-topic-topics-queue magic. + + System.err.println("*********** Reconfiguring queue with config:"+configPlugin); + + return configPlugin; + } + public long getMemoryUsageMaximum() { return getLongValue("queues.maximumMemoryUsage"); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java index 9024c6aec6..1da1459f70 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java @@ -38,8 +38,8 @@ public abstract class ConfigurationPlugin { protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class); - private Map, ConfigurationPlugin> - _pluginConfiguration = new HashMap, ConfigurationPlugin>(); + private Map + _pluginConfiguration = new HashMap(); protected Configuration _configuration; @@ -69,7 +69,7 @@ public abstract class ConfigurationPlugin return _configuration; } - public C getConfiguration(Class plugin) + public C getConfiguration(String plugin) { return (C) _pluginConfiguration.get(plugin); } @@ -155,7 +155,7 @@ public abstract class ConfigurationPlugin List handlers = configurationManager.getConfigurationPlugins(configurationElement, handled); for (ConfigurationPlugin plugin : handlers) { - _pluginConfiguration.put(plugin.getClass(), plugin); + _pluginConfiguration.put(plugin.getClass().getName(), plugin); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index 466bc9e228..8364635632 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -34,6 +34,8 @@ import org.apache.felix.framework.Felix; import org.apache.felix.framework.util.StringMap; import org.apache.log4j.Logger; import org.apache.qpid.common.Closeable; +import org.apache.qpid.server.configuration.TopicConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.security.SecurityManager; @@ -79,6 +81,7 @@ public class PluginManager implements Closeable _securityPlugins.put(pluginFactory.getPluginName(), pluginFactory); } for (ConfigurationPluginFactory configFactory : Arrays.asList( + TopicConfiguration.FACTORY, SecurityManager.SecurityConfiguration.FACTORY, AllowAll.AllowAllConfiguration.FACTORY, DenyAll.DenyAllConfiguration.FACTORY, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 3b60734a2e..225fbec930 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -274,9 +275,9 @@ public interface AMQQueue extends Managable, Comparable, ExchangeRefer public void doTask(AMQQueue queue) throws AMQException; } - void configure(QueueConfiguration config); + void configure(ConfigurationPlugin config); - QueueConfiguration getConfiguration(); + ConfigurationPlugin getConfiguration(); ManagedObject getManagedObject(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index f81a4a6911..451d59b2e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.configuration.ConfigStore; @@ -185,7 +186,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - private QueueConfiguration _queueConfiguration; + private ConfigurationPlugin _queueConfiguration; @@ -2065,24 +2066,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } - public void configure(QueueConfiguration config) + public void configure(ConfigurationPlugin config) { if (config != null) { - setMaximumMessageAge(config.getMaximumMessageAge()); - setMaximumQueueDepth(config.getMaximumQueueDepth()); - setMaximumMessageSize(config.getMaximumMessageSize()); - setMaximumMessageCount(config.getMaximumMessageCount()); - setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap()); - _capacity = config.getCapacity(); - _flowResumeCapacity = config.getFlowResumeCapacity(); + if (config instanceof QueueConfiguration) + { + + setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge()); + setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth()); + setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize()); + setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount()); + setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap()); + _capacity = ((QueueConfiguration)config).getCapacity(); + _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity(); + } _queueConfiguration = config; + } } - public QueueConfiguration getConfiguration() + public ConfigurationPlugin getConfiguration() { return _queueConfiguration; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java index 362d919a5e..ff28d76053 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -153,7 +153,7 @@ public class SecurityManager public Map configurePlugins(ConfigurationPlugin hostConfig) throws ConfigurationException { Map plugins = new HashMap(); - SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class); + SecurityConfiguration securityConfig = hostConfig.getConfiguration(SecurityConfiguration.class.getName()); // If we have no security Configuration then there is nothing to configure. if (securityConfig != null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java index d4777b8cb3..db18a89231 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -66,7 +66,7 @@ public class AllowAll extends BasicPlugin { public AllowAll newInstance(ConfigurationPlugin config) throws ConfigurationException { - AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class); + AllowAllConfiguration configuration = config.getConfiguration(AllowAllConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java index cd68511730..6c0fb1eaa4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java @@ -66,7 +66,7 @@ public class DenyAll extends BasicPlugin { public DenyAll newInstance(ConfigurationPlugin config) throws ConfigurationException { - DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class); + DenyAllConfiguration configuration = config.getConfiguration(DenyAllConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java index 1250cdcb1b..bd99cdd1fa 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/LegacyAccess.java @@ -60,7 +60,7 @@ public class LegacyAccess extends BasicPlugin { public LegacyAccess newInstance(ConfigurationPlugin config) throws ConfigurationException { - LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class); + LegacyAccessConfiguration configuration = config.getConfiguration(LegacyAccessConfiguration.class.getName()); // If there is no configuration for this plugin then don't load it. if (configuration == null) diff --git a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java index 7a8cabf512..956bb6f8fa 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/logging/LogMessageTest.java @@ -38,7 +38,7 @@ public class LogMessageTest extends TestCase { Locale usLocal = Locale.US; Locale.setDefault(usLocal); - ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages", + ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages", usLocal); assertNotNull("Unable to load ResourceBundle", _messages); @@ -55,7 +55,7 @@ public class LogMessageTest extends TestCase Locale.setDefault(japanese); try { - ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.LogMessages", + ResourceBundle _messages = ResourceBundle.getBundle("org.apache.qpid.server.logging.messages.Broker_logmessages", japanese); assertNotNull("Unable to load ResourceBundle", _messages); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 70d146437f..51b049787c 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.queue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.*; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.subscription.Subscription; @@ -36,7 +36,6 @@ import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.AMQException; -import javax.swing.*; import java.util.List; import java.util.Set; import java.util.Map; @@ -520,12 +519,12 @@ public class MockAMQQueue implements AMQQueue //To change body of implemented methods use File | Settings | File Templates. } - public void configure(QueueConfiguration config) + public void configure(ConfigurationPlugin config) { } - public QueueConfiguration getConfiguration() + public ConfigurationPlugin getConfiguration() { return null; //To change body of implemented methods use File | Settings | File Templates. } -- cgit v1.2.1