summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java124
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java29
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java40
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java47
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java142
5 files changed, 348 insertions, 34 deletions
diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
new file mode 100644
index 0000000000..e4efac60f8
--- /dev/null
+++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systest;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQChannelClosedException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.NamingException;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class MergeConfigurationTest extends TestingBaseCase
+{
+
+ protected int topicCount = 0;
+
+
+ public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException
+ {
+
+ setProperty(".topics.topic("+topicCount+").name", topic);
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount));
+ setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete");
+ topicCount++;
+ }
+
+
+ /**
+ * Test that setting messageCount takes affect on topics
+ *
+ * We send 10 messages and disconnect at 9
+ *
+ * @throws Exception
+ */
+ public void testTopicConsumerMessageCount() throws Exception
+ {
+ MAX_QUEUE_MESSAGE_COUNT = 10;
+
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1);
+
+ //Configure topic as a subscription
+ setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName());
+ configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1));
+
+
+
+ //Start the broker
+ startBroker();
+
+ topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
+ }
+
+
+//
+// public void testMerge() throws ConfigurationException, AMQException
+// {
+//
+// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"),
+// false, false, _virtualHost, null);
+//
+// _virtualHost.getQueueRegistry().registerQueue(queue);
+// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null);
+//
+//
+// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
+// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null);
+//
+// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
+//
+// assertNotNull("Queue should have topic configuration bound to it.", config);
+// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName());
+//
+// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration)
+// {
+// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration.");
+// }
+// else
+// {
+// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader());
+// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class));
+// System.err.println("********************** is a "+scdConfig.getClass());
+// }
+//
+// assertNotNull("Queue should have scd configuration bound to it.", scdConfig);
+// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount());
+// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName());
+// }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
new file mode 100644
index 0000000000..bfb2de4235
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfigurationPlugin.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface ExchangeConfigurationPlugin
+{
+ ConfigurationPlugin getConfiguration(AMQQueue queue);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
index f62022922a..4512de6fb4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
@@ -23,7 +23,6 @@ package org.apache.qpid.server.configuration;
import java.util.List;
import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
@@ -167,4 +166,43 @@ public class QueueConfiguration extends ConfigurationPlugin
{
return getStringValue("lvqKey", null);
}
+
+
+ public static class QueueConfig extends ConfigurationPlugin
+ {
+ @Override
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"name"};
+ }
+
+ public String getName()
+ {
+ return getStringValue("name");
+ }
+
+
+ public void validateConfiguration() throws ConfigurationException
+ {
+ if (_configuration.isEmpty())
+ {
+ throw new ConfigurationException("Queue section cannot be empty.");
+ }
+
+ if (getName() == null)
+ {
+ throw new ConfigurationException("Queue section must have a 'name' element.");
+ }
+
+ }
+
+
+ @Override
+ public String formatToString()
+ {
+ return "Name:"+getName();
+ }
+
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
index 7dab02aee7..967e8a03f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
@@ -34,11 +34,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
@@ -212,15 +208,48 @@ public class VirtualHostConfiguration extends ConfigurationPlugin
+ exchangeName.substring(0, 1).toUpperCase()
+ exchangeName.substring(1) + "Configuration";
- ConfigurationPlugin configPlugin
- = queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
-
+ ExchangeConfigurationPlugin exchangeConfiguration
+ = (ExchangeConfigurationPlugin) queue.getVirtualHost().getConfiguration().getConfiguration(exchangeClass);
// now need to perform the queue-topic-topics-queue magic.
+ // So make a new ConfigurationObject that will hold all the configuration for this queue.
+ ConfigurationPlugin queueConfig = new QueueConfiguration.QueueConfig();
+
+ // Initialise the queue with any Global values we may have
+ QueueConfiguration config = getConfiguration(QueueConfiguration.class.getName());
+ if (config == null)
+ {
+ PropertiesConfiguration newQueueConfig = new PropertiesConfiguration();
+ newQueueConfig.setProperty("name", queue.getName());
- System.err.println("*********** Reconfiguring queue with config:"+configPlugin);
+ try
+ {
+ queueConfig.setConfiguration("", newQueueConfig);
+ }
+ catch (ConfigurationException e)
+ {
+ // This will not occur as queues only require a name.
+ _logger.error("QueueConfiguration requirements have changed.");
+ }
+ }
+ else
+ {
+ queueConfig.addConfiguration(config);
+ }
+
+ // Merge any configuration the Exchange wishes to apply
+ if (exchangeConfiguration != null)
+ {
+ queueConfig.addConfiguration(exchangeConfiguration.getConfiguration(queue));
+ }
+
+ //Finally merge in any specific queue configuration we have.
+ if (_queues.containsKey(queue.getName()))
+ {
+ queueConfig.addConfiguration(_queues.get(queue.getName()));
+ }
- return configPlugin;
+ return queueConfig;
}
public long getMemoryUsageMaximum()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
index 1da1459f70..82b576ea51 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
@@ -18,6 +18,13 @@
*/
package org.apache.qpid.server.configuration.plugins;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConversionException;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.configuration.ConfigurationManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,13 +34,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConversionException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.ConfigurationManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
public abstract class ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ConfigurationPlugin.class);
@@ -45,20 +45,18 @@ public abstract class ConfigurationPlugin
/**
* The Elements that this Plugin can process.
- *
+ *
* For a Queues plugin that would be a list containing:
* <ul>
* <li>queue - the queue entries
* <li>the alerting values for defaults
* <li>exchange - the default exchange
* <li>durable - set the default durablity
- * </ul>
+ * </ul>
*/
abstract public String[] getElementsProcessed();
-
- /**
- * Performs configuration validation.
- */
+
+ /** Performs configuration validation. */
public void validateConfiguration() throws ConfigurationException
{
// Override in sub-classes
@@ -145,14 +143,20 @@ public abstract class ConfigurationPlugin
{
ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
-
+
String configurationElement = element;
if (path.length() > 0)
{
- configurationElement = path + "." + configurationElement;
+ configurationElement = path + "." + configurationElement;
}
List<ConfigurationPlugin> handlers = configurationManager.getConfigurationPlugins(configurationElement, handled);
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("For '" + element + "' found handlers (" + handlers.size() + "):" + handlers);
+ }
+
for (ConfigurationPlugin plugin : handlers)
{
_pluginConfiguration.put(plugin.getClass().getName(), plugin);
@@ -161,10 +165,8 @@ public abstract class ConfigurationPlugin
validateConfiguration();
}
-
- /**
- * Helper method to print out list of keys in a {@link Configuration}.
- */
+
+ /** Helper method to print out list of keys in a {@link Configuration}. */
public static final void showKeys(Configuration config)
{
if (config.isEmpty())
@@ -186,7 +188,7 @@ public abstract class ConfigurationPlugin
{
return _configuration != null;
}
-
+
/// Getters
protected double getDoubleValue(String property)
@@ -199,7 +201,6 @@ public abstract class ConfigurationPlugin
return _configuration.getDouble(property, defaultValue);
}
-
protected long getLongValue(String property)
{
return getLongValue(property, 0);
@@ -250,8 +251,6 @@ public abstract class ConfigurationPlugin
return _configuration.getList(property, defaultValue);
}
-
-
/// Validation Helpers
protected boolean contains(String property)
@@ -259,7 +258,6 @@ public abstract class ConfigurationPlugin
return _configuration.getProperty(property) != null;
}
-
/**
* Provide mechanism to validate Configuration contains a Postiive Long Value
*
@@ -354,6 +352,102 @@ public abstract class ConfigurationPlugin
}
}
+ /**
+ * Given another configuration merge the configuration into our own config
+ *
+ * The new values being merged in will take precedence over existing values.
+ *
+ * In the simplistic case this means something like:
+ *
+ * So if we have configuration set
+ * name = 'fooo'
+ *
+ * And the new configuration contains a name then that will be reset.
+ * name = 'new'
+ *
+ * However this plugin will simply contain other plugins so the merge will
+ * be called until we end up at a base plugin that understand how to merge
+ * items. i.e Alerting values. Where the provided configuration will take
+ * precedence.
+ *
+ * @param configuration the config to merge in to our own.
+ */
+ public void addConfiguration(ConfigurationPlugin configuration)
+ {
+ // If given configuration is null then there is nothing to process.
+ if (configuration == null)
+ {
+ return;
+ }
+
+ // Merge all the sub configuration items
+ for (Map.Entry<String, ConfigurationPlugin> newPlugins : configuration._pluginConfiguration.entrySet())
+ {
+ String key = newPlugins.getKey();
+ ConfigurationPlugin config = newPlugins.getValue();
+
+ if (_pluginConfiguration.containsKey(key))
+ {
+ //Merge the configuration if we already have this type of config
+ _pluginConfiguration.get(key).mergeConfiguration(config);
+ }
+ else
+ {
+ //otherwise just add it to our config.
+ _pluginConfiguration.put(key, config);
+ }
+ }
+
+ //Merge the configuration itself
+ String key = configuration.getClass().getName();
+ if (_pluginConfiguration.containsKey(key))
+ {
+ //Merge the configuration if we already have this type of config
+ _pluginConfiguration.get(key).mergeConfiguration(configuration);
+ }
+ else
+ {
+ //If we are adding a configuration of our own type then merge
+ if (configuration.getClass() == this.getClass())
+ {
+ mergeConfiguration(configuration);
+ }
+ else
+ {
+ // just store this in case someone else needs it.
+ _pluginConfiguration.put(key, configuration);
+ }
+
+ }
+
+ }
+
+ protected void mergeConfiguration(ConfigurationPlugin configuration)
+ {
+ _configuration = configuration.getConfig();
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("\n").append(getClass().getSimpleName());
+ sb.append("=[ (").append(formatToString()).append(")");
+
+ for(Map.Entry<String,ConfigurationPlugin> item : _pluginConfiguration.entrySet())
+ {
+ sb.append("\n").append(item.getValue());
+ }
+
+ sb.append("]\n");
+
+ return sb.toString();
+ }
+
+ public String formatToString()
+ {
+ return super.toString();
+ }
}