summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-07-05 11:15:25 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-07-05 11:15:25 +0000
commitca2413955f751c1201f1be2f2e1c7c21e75b047e (patch)
tree4a10a30f5472ca64aadbc4ba01fadc988e2b93c2 /qpid/java
parent985afcea2d2dc14ad67863e6ac4413e9633bd2b2 (diff)
downloadqpid-python-ca2413955f751c1201f1be2f2e1c7c21e75b047e.tar.gz
QPID-2681 : Provide ability to merge configurations. This does simple merging of sub configuration elements. Currently the last value to be merged is taken as is. No explicit merging is performed. Merging is performed in the order Queues->(Exchange)->Queue, Where a configured Exchange Configuration component can decide how to perform its merge. TopicConfiguration performs the order Topics->Topic->Subscriptions, this allows Global Topic configuration to be overwritten by a specific topic version. Currently the Topic is only identified by a straight string wild card matching has not yet been implemented.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@960546 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java124
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java142
5 files changed, 348 insertions, 34 deletions
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
new file mode 100644
index 0000000000..e4efac60f8
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MergeConfigurationTest extends TestingBaseCase
+{
+
+ protected int topicCount = 0;
+
+
+ public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException
+ {
+
+ setProperty(".topics.topic("+topicCount+").name", topic);
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount));
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete");
+ topicCount++;
+ }
+
+
+ /**
+ * Test that setting messageCount takes affect on topics
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1);
+
+ //Configure topic as a subscription
+ setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName());
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1));
+
+
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+
+//
+// public void testMerge() throws ConfigurationException, AMQException
+// {
+//
+// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"),
+// false, false, _virtualHost, null);
+//
+// _virtualHost.getQueueRegistry().registerQueue(queue);
+// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null);
+//
+//
+// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null);
+//
+// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+//
+// assertNotNull("Queue should have topic configuration bound to it.", config);
+// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName());
+//
+// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration)
+// {
+// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration.");
+// }
+// else
+// {
+// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class));
+// System.err.println("********************** is a "+scdConfig.getClass());
+// }
+//
+// assertNotNull("Queue should have scd configuration bound to it.", scdConfig);
+// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount());
+// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName());
+// }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
new file mode 100644
index 0000000000..bfb2de4235
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface ExchangeConfigurationPlugin
+{
+ ConfigurationPlugin getConfiguration(AMQQueue queue);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index f62022922a..4512de6fb4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.configuration;
import java.util.List;
import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
@@ -167,4 +166,43 @@ public class QueueConfiguration extends ConfigurationPlugin
{
return getStringValue("lvqKey", null);
}
+
+
+ public static class QueueConfig extends ConfigurationPlugin
+ {
+ @Override
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"name"};
+ }
+
+ public String getName()
+ {
+ return getStringValue("name");
+ }
+
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if (_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Queue section cannot be empty.");
+ }
+
+ if (getName() == null)
+ {
+ throw new ConfigurationException("Queue section must have a 'name' element.");
+ }
+
+ }
+
+
+ @Override
+ public String formatToString()
+ {
+ return "Name:"+getName();
+ }
+
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 7dab02aee7..967e8a03f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -34,11 +34,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
@@ -212,15 +208,48 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
+ exchangeName.substring(0, 1).toUpperCase()
+ exchangeName.substring(1) + "Configuration";
- ConfigurationPlugin configPlugin
- = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
-
+ ExchangeConfigurationPlugin exchangeConfiguration
+ = (ExchangeConfigurationPlugin) queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
// now need to perform the queue-topic-topics-queue magic.
+ // So make a new ConfigurationObject that will hold all the configuration for this queue.
+ ConfigurationPlugin queueConfig = new QueueConfiguration.QueueConfig();
+
+ // Initialise the queue with any Global values we may have
+ QueueConfiguration config = getConfiguration(QueueConfiguration.class.getName());
+ if (config == null)
+ {
+ PropertiesConfiguration newQueueConfig = new PropertiesConfiguration();
+ newQueueConfig.setProperty("name", queue.getName());
- System.err.println("*********** Reconfiguring queue with config:"+configPlugin);
+ try
+ {
+ queueConfig.setConfiguration("", newQueueConfig);
+ }
+ catch (ConfigurationException e)
+ {
+ // This will not occur as queues only require a name.
+ _logger.error("QueueConfiguration requirements have changed.");
+ }
+ }
+ else
+ {
+ queueConfig.addConfiguration(config);
+ }
+
+ // Merge any configuration the Exchange wishes to apply
+ if (exchangeConfiguration != null)
+ {
+ queueConfig.addConfiguration(exchangeConfiguration.getConfiguration(queue));
+ }
+
+ //Finally merge in any specific queue configuration we have.
+ if (_queues.containsKey(queue.getName()))
+ {
+ queueConfig.addConfiguration(_queues.get(queue.getName()));
+ }
- return configPlugin;
+ return queueConfig;
}
public long getMemoryUsageMaximum()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index 1da1459f70..82b576ea51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -18,6 +18,13 @@
*/
package org.apache.qpid.server.configuration.plugins;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConversionException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.ConfigurationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,13 +34,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConversionException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.ConfigurationManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
public abstract class ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class);
@@ -45,20 +45,18 @@ public abstract class ConfigurationPlugin
/**
* The Elements that this Plugin can process.
- *
+ *
* For a Queues plugin that would be a list containing:
* <ul>
* <li>queue - the queue entries
* <li>the alerting values for defaults
* <li>exchange - the default exchange
* <li>durable - set the default durablity
- * </ul>
+ * </ul>
*/
abstract public String[] getElementsProcessed();
-
- /**
- * Performs configuration validation.
- */
+
+ /** Performs configuration validation. */
public void validateConfiguration() throws ConfigurationException
{
// Override in sub-classes
@@ -145,14 +143,20 @@ public abstract class ConfigurationPlugin
{
ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
-
+
String configurationElement = element;
if (path.length() > 0)
{
- configurationElement = path + "." + configurationElement;
+ configurationElement = path + "." + configurationElement;
}
List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled);
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("For '" + element + "' found handlers (" + handlers.size() + "):" + handlers);
+ }
+
for (ConfigurationPlugin plugin : handlers)
{
_pluginConfiguration.put(plugin.getClass().getName(), plugin);
@@ -161,10 +165,8 @@ public abstract class ConfigurationPlugin
validateConfiguration();
}
-
- /**
- * Helper method to print out list of keys in a {@link Configuration}.
- */
+
+ /** Helper method to print out list of keys in a {@link Configuration}. */
public static final void showKeys(Configuration config)
{
if (config.isEmpty())
@@ -186,7 +188,7 @@ public abstract class ConfigurationPlugin
{
return _configuration != null;
}
-
+
/// Getters
protected double getDoubleValue(String property)
@@ -199,7 +201,6 @@ public abstract class ConfigurationPlugin
return _configuration.getDouble(property, defaultValue);
}
-
protected long getLongValue(String property)
{
return getLongValue(property, 0);
@@ -250,8 +251,6 @@ public abstract class ConfigurationPlugin
return _configuration.getList(property, defaultValue);
}
-
-
/// Validation Helpers
protected boolean contains(String property)
@@ -259,7 +258,6 @@ public abstract class ConfigurationPlugin
return _configuration.getProperty(property) != null;
}
-
/**
* Provide mechanism to validate Configuration contains a Postiive Long Value
*
@@ -354,6 +352,102 @@ public abstract class ConfigurationPlugin
}
}
+ /**
+ * Given another configuration merge the configuration into our own config
+ *
+ * The new values being merged in will take precedence over existing values.
+ *
+ * In the simplistic case this means something like:
+ *
+ * So if we have configuration set
+ * name = 'fooo'
+ *
+ * And the new configuration contains a name then that will be reset.
+ * name = 'new'
+ *
+ * However this plugin will simply contain other plugins so the merge will
+ * be called until we end up at a base plugin that understand how to merge
+ * items. i.e Alerting values. Where the provided configuration will take
+ * precedence.
+ *
+ * @param configuration the config to merge in to our own.
+ */
+ public void addConfiguration(ConfigurationPlugin configuration)
+ {
+ // If given configuration is null then there is nothing to process.
+ if (configuration == null)
+ {
+ return;
+ }
+
+ // Merge all the sub configuration items
+ for (Map.Entry<String, ConfigurationPlugin> newPlugins : configuration._pluginConfiguration.entrySet())
+ {
+ String key = newPlugins.getKey();
+ ConfigurationPlugin config = newPlugins.getValue();
+
+ if (_pluginConfiguration.containsKey(key))
+ {
+ //Merge the configuration if we already have this type of config
+ _pluginConfiguration.get(key).mergeConfiguration(config);
+ }
+ else
+ {
+ //otherwise just add it to our config.
+ _pluginConfiguration.put(key, config);
+ }
+ }
+
+ //Merge the configuration itself
+ String key = configuration.getClass().getName();
+ if (_pluginConfiguration.containsKey(key))
+ {
+ //Merge the configuration if we already have this type of config
+ _pluginConfiguration.get(key).mergeConfiguration(configuration);
+ }
+ else
+ {
+ //If we are adding a configuration of our own type then merge
+ if (configuration.getClass() == this.getClass())
+ {
+ mergeConfiguration(configuration);
+ }
+ else
+ {
+ // just store this in case someone else needs it.
+ _pluginConfiguration.put(key, configuration);
+ }
+
+ }
+
+ }
+
+ protected void mergeConfiguration(ConfigurationPlugin configuration)
+ {
+ _configuration = configuration.getConfig();
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("\n").append(getClass().getSimpleName());
+ sb.append("=[ (").append(formatToString()).append(")");
+
+ for(Map.Entry<String,ConfigurationPlugin> item : _pluginConfiguration.entrySet())
+ {
+ sb.append("\n").append(item.getValue());
+ }
+
+ sb.append("]\n");
+
+ return sb.toString();
+ }
+
+ public String formatToString()
+ {
+ return super.toString();
+ }
}