summaryrefslogtreecommitdiff
path: root/java/broker-plugins
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker-plugins')
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF30
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/build.xml34
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java92
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java76
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java153
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java62
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java86
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java161
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties4
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties3
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java141
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java81
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java29
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java27
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java346
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java104
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java187
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java88
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java293
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java222
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java36
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java124
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java146
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java255
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java85
25 files changed, 0 insertions, 2865 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
deleted file mode 100644
index 3d3d91381b..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
+++ /dev/null
@@ -1,30 +0,0 @@
-Manifest-Version: 1.0
-Bundle-ManifestVersion: 2
-Bundle-Name: Qpid Slow Consumer Detection
-Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
-Bundle-Version: 1.0.0
-Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator
-Import-Package: org.osgi.framework,
- org.apache.qpid.server.configuration.plugins,
- org.apache.qpid.server.configuration,
- org.apache.qpid.server.virtualhost.plugins,
- org.apache.qpid.server.virtualhost,
- org.apache.qpid.server.queue,
- org.apache.qpid.server.binding,
- org.apache.qpid.server.exchange,
- org.apache.qpid.server.registry,
- org.apache.qpid.server.plugins,
- org.apache.qpid.server.protocol,
- org.apache.qpid.server.logging,
- org.apache.qpid.server.logging.actors,
- org.apache.qpid.protocol,
- org.apache.qpid.framing,
- org.apache.qpid,
- org.apache.log4j,
- org.apache.commons.configuration
-Bundle-RequiredExecutionEnvironment: JavaSE-1.6
-Bundle-ClassPath: .
-Bundle-ActivationPolicy: lazy
-Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework",
- org.apache.qpid.server.virtualhost.plugin.policies
-
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/java/broker-plugins/experimental/slowconsumerdetection/build.xml
deleted file mode 100644
index 06ebc58030..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/build.xml
+++ /dev/null
@@ -1,34 +0,0 @@
-<!--
- -
- - Licensed to the Apache Software Foundation (ASF) under one
-nn - or more contributor license agreements. See the NOTICE file
- -n distributed with this work for additional information
- - regarding copyright ownership. The ASF licenses this file
- - to you under the Apache License, Version 2.0 (the
- - "License"); you may not use this file except in compliance
- - with the License. You may obtain a copy of the License at
- -
- - http://www.apache.org/licenses/LICENSE-2.0
- -
- - Unless required by applicable law or agreed to in writing,
- - software distributed under the License is distributed on an
- - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- - KIND, either express or implied. See the License for the
- - specific language governing permissions and limitations
- - under the License.
- -
- -->
-<project name="Slow Consumer Disconnect" default="build">
-
- <property name="module.depends" value="common broker broker-plugins"/>
- <property name="module.test.depends" value="broker/test common/test systests client management/common"/>
- <property name="module.manifest" value="MANIFEST.MF"/>
- <property name="module.plugin" value="true"/>
-
- <import file="../../../module.xml"/>
-
- <target name="bundle" depends="bundle-tasks"/>
-
- <target name="precompile" depends="gen_logging"/>
-
-</project>
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java
deleted file mode 100644
index dd63c9b698..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.plugin;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConversionException;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
-{
- public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory
- {
- public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
- {
- SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration();
- slowConsumerConfig.setConfiguration(path, config);
- return slowConsumerConfig;
- }
-
- public List<String> getParentPaths()
- {
- return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection");
- }
- }
-
- //Set Default time unit to seconds
- TimeUnit _timeUnit = TimeUnit.SECONDS;
-
- public String[] getElementsProcessed()
- {
- return new String[]{"delay",
- "timeunit"};
- }
-
- public long getDelay()
- {
- return getLongValue("delay", 10);
- }
-
- public TimeUnit getTimeUnit()
- {
- return _timeUnit;
- }
-
- @Override
- public void validateConfiguration() throws ConfigurationException
- {
- validatePositiveLong("delay");
-
- String timeUnit = getStringValue("timeunit");
-
- if (timeUnit != null)
- {
- try
- {
- _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase());
- }
- catch (IllegalArgumentException iae)
- {
- throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit);
- }
- }
-
- System.out.println("Configured SCDC");
- System.out.println("Delay:" + getDelay());
- System.out.println("TimeUnit:" + getTimeUnit());
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
deleted file mode 100644
index 8e2ecff6fb..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.plugin;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
-{
- public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory
- {
- public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
- {
- SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration();
- slowConsumerConfig.setConfiguration(path, config);
- return slowConsumerConfig;
- }
-
- public List<String> getParentPaths()
- {
- return Arrays.asList(
- "virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy");
- }
- }
-
- public String[] getElementsProcessed()
- {
- return new String[]{"name"};
- }
-
- public String getPolicyName()
- {
- return getStringValue("name");
- }
-
- @Override
- public void validateConfiguration() throws ConfigurationException
- {
- if (getPolicyName() == null)
- {
- throw new ConfigurationException("No Slow consumer policy defined.");
- }
- }
-
- @Override
- public String formatToString()
- {
- return "Policy:"+getPolicyName();
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
deleted file mode 100644
index e825556e61..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.configuration.plugin;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
-{
- private SlowConsumerPolicyPlugin _policyPlugin;
-
- public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory
- {
- public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
- {
- SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration();
- slowConsumerConfig.setConfiguration(path, config);
- return slowConsumerConfig;
- }
-
- public List<String> getParentPaths()
- {
- return Arrays.asList(
- "virtualhosts.virtualhost.queues.slow-consumer-detection",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
- "virtualhosts.virtualhost.topics.slow-consumer-detection",
- "virtualhosts.virtualhost.topics.topic.slow-consumer-detection");
- }
- }
-
- public String[] getElementsProcessed()
- {
- return new String[]{"messageAge",
- "depth",
- "messageCount"};
- }
-
- public long getMessageAge()
- {
- return getLongValue("messageAge");
- }
-
- public long getDepth()
- {
- return getLongValue("depth");
- }
-
- public long getMessageCount()
- {
- return getLongValue("messageCount");
- }
-
- public SlowConsumerPolicyPlugin getPolicy()
- {
- return _policyPlugin;
- }
-
- @Override
- public void validateConfiguration() throws ConfigurationException
- {
- if (!containsPositiveLong("messageAge") &&
- !containsPositiveLong("depth") &&
- !containsPositiveLong("messageCount"))
- {
- throw new ConfigurationException("At least one configuration property" +
- "('messageAge','depth' or 'messageCount') must be specified.");
- }
-
- SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
-
- PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
- Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
-
- if (policyConfig == null)
- {
- throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet());
- }
-
- if (_logger.isDebugEnabled())
- {
- Iterator<?> keys = policyConfig.getConfig().getKeys();
-
- while (keys.hasNext())
- {
- String key = (String) keys.next();
-
- _logger.debug("Policy Keys:" + key);
- }
-
- }
-
- SlowConsumerPolicyPluginFactory<SlowConsumerPolicyPlugin> pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
-
- if (pluginFactory == null)
- {
- throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet());
- }
-
- _policyPlugin = pluginFactory.newInstance(policyConfig);
-
- // Debug the creation of this Config
- _logger.debug(this);
- }
-
- public String formatToString()
- {
- StringBuilder sb = new StringBuilder();
- if (getMessageAge() > 0)
- {
- sb.append("Age:").append(getMessageAge()).append(":");
- }
- if (getDepth() > 0)
- {
- sb.append("Depth:").append(getDepth()).append(":");
- }
- if (getMessageCount() > 0)
- {
- sb.append("Count:").append(getMessageCount()).append(":");
- }
-
- sb.append("Policy[").append(getPolicy()).append("]");
- return sb.toString();
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java
deleted file mode 100644
index 7b0168d436..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin;
-
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy;
-import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicyConfiguration;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-
-/**
- * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin.
- *
- * This includes Configuration
- *
- * @author ritchiem
- */
-public class Activator implements BundleActivator
-{
- public void start(BundleContext ctx) throws Exception
- {
- if (null != ctx)
- {
- ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null);
- ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null);
- ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null);
- ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null);
-
- ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.TopicDeletePolicyFactory(), null);
- ctx.registerService(ConfigurationPluginFactory.class.getName(), new TopicDeletePolicyConfiguration.TopicDeletePolicyConfigurationFactory(), null);
- }
- }
-
- public void stop(BundleContext ctx) throws Exception
- {
- // no need to do anything here, osgi will unregister the service for us
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
deleted file mode 100644
index d947e9a367..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.qpid.server.virtualhost.plugin;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.exchange.AbstractExchange;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.Exchange.BindingListener;
-import org.apache.qpid.server.queue.AMQQueue;
-
-/**
- * This is a listener that caches queues that are configured for slow consumer disconnection.
- *
- * There should be one listener per virtual host, which can be added to all exchanges on
- * that host.
- *
- * TODO In future, it will be possible to configure the policy at runtime, so only the queue
- * itself is cached, and the configuration looked up by the housekeeping thread. This means
- * that there may be occasions where the copy of the cache contents retrieved by the thread
- * does not contain queues that are configured, or that configured queues are not present.
- *
- * @see BindingListener
- */
-public class ConfiguredQueueBindingListener implements BindingListener
-{
- private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
-
- private String _vhostName;
- private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
-
- public ConfiguredQueueBindingListener(String vhostName)
- {
- _vhostName = vhostName;
- }
-
- /**
- * @see BindingListener#bindingAdded(Exchange, Binding)
- */
- public void bindingAdded(Exchange exchange, Binding binding)
- {
- processBinding(binding);
- }
-
- /**
- * @see BindingListener#bindingRemoved(Exchange, Binding)
- */
- public void bindingRemoved(Exchange exchange, Binding binding)
- {
- processBinding(binding);
- }
-
- private void processBinding(Binding binding)
- {
- AMQQueue queue = binding.getQueue();
-
- SlowConsumerDetectionQueueConfiguration config =
- queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
- if (config != null)
- {
- _cache.add(queue);
- }
- else
- {
- _cache.remove(queue);
- }
- }
-
- /**
- * Lookup and return the cache of configured {@link AMQQueue}s.
- *
- * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
- * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
- * cache is returned.
- *
- * @return a copy of the cached {@link java.util.Set} of queues
- */
- public Set<AMQQueue> getQueueCache()
- {
- return new HashSet<AMQQueue>(_cache);
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
deleted file mode 100644
index 7de95bbfa7..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- *
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.plugins.Plugin;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin;
-import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-
-class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
-{
- private SlowConsumerDetectionConfiguration _config;
- private ConfiguredQueueBindingListener _listener;
-
- public static class SlowConsumerFactory implements VirtualHostPluginFactory
- {
- public SlowConsumerDetection newInstance(VirtualHost vhost)
- {
- SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName());
-
- if (config == null)
- {
- return null;
- }
-
- SlowConsumerDetection plugin = new SlowConsumerDetection(vhost);
- plugin.configure(config);
- return plugin;
- }
- }
-
- /**
- * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
- * cirtual host to record all the configured queues in a cache for processing by the housekeeping
- * thread.
- *
- * @see Plugin#configure(ConfigurationPlugin)
- */
- public void configure(ConfigurationPlugin config)
- {
- _config = (SlowConsumerDetectionConfiguration) config;
- _listener = new ConfiguredQueueBindingListener(_virtualhost.getName());
- for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames())
- {
- _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
- }
- }
-
- public SlowConsumerDetection(VirtualHost vhost)
- {
- super(vhost);
- }
-
- public void execute()
- {
- CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
-
- Set<AMQQueue> cache = _listener.getQueueCache();
- for (AMQQueue q : cache)
- {
- CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
-
- try
- {
- SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
- if (checkQueueStatus(q, config))
- {
- config.getPolicy().performPolicy(q);
- }
- }
- catch (Exception e)
- {
- // Don't throw exceptions as this will stop the house keeping task from running.
- _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
- }
- }
-
- CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
- }
-
- public long getDelay()
- {
- return _config.getDelay();
- }
-
- public TimeUnit getTimeUnit()
- {
- return _config.getTimeUnit();
- }
-
- /**
- * Check the depth,messageSize,messageAge,messageCount values for this q
- *
- * @param q the queue to check
- * @param config the queue configuration to compare against the queue state
- *
- * @return true if the queue has reached a threshold.
- */
- private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
- {
- if (config != null)
- {
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
-
- int count = q.getMessageCount();
-
- // First Check message counts
- if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) ||
- // The check queue depth
- (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
- // finally if we have messages on the queue check Arrival time.
- // We must check count as OldestArrival time is Long.MAX_LONG when
- // there are no messages.
- (config.getMessageAge() != 0 &&
- ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge())))
- {
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")");
- _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
- _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
- _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
- }
-
- return true;
- }
- }
- return false;
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties
deleted file mode 100644
index 2714935a71..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties
+++ /dev/null
@@ -1,4 +0,0 @@
-#SlowConsumerDetection.logMessages
-RUNNING = SCD-1001 : Running
-COMPLETE = SCD-1002 : Complete
-CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0} \ No newline at end of file
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties
deleted file mode 100644
index d0f5965c39..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-#TopicDeletePolicy.logMessages
-DELETING_QUEUE = TDP-1001 : Deleting Queue
-DISCONNECTING = TDP-1002 : Disconnecting Session \ No newline at end of file
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
deleted file mode 100644
index 3bd4ae8d4e..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin.policies;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.plugin.logging.TopicDeletePolicyMessages;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
-import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
-
-public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
-{
- Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
- private TopicDeletePolicyConfiguration _configuration;
-
- public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory
- {
- public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException
- {
- TopicDeletePolicyConfiguration config =
- configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName());
-
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(config);
- return policy;
- }
-
- public String getPluginName()
- {
- return "topicdelete";
- }
-
- public Class<TopicDeletePolicy> getPluginClass()
- {
- return TopicDeletePolicy.class;
- }
- }
-
- public void performPolicy(AMQQueue q)
- {
- if (q == null)
- {
- return;
- }
-
- AMQSessionModel owner = q.getExclusiveOwningSession();
-
- // Only process exclusive queues
- if (owner == null)
- {
- return;
- }
-
- //Only process Topics
- if (!validateQueueIsATopic(q))
- {
- return;
- }
-
- try
- {
- CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING());
- // Close the consumer . this will cause autoDelete Queues to be purged
- owner.getConnectionModel().
- closeSession(owner, AMQConstant.RESOURCE_ERROR,
- "Consuming to slow.");
-
- // Actively delete non autoDelete queues if deletePersistent is set
- if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent()))
- {
- CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE());
- q.delete();
- }
-
- }
- catch (AMQException e)
- {
- _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
- }
-
- }
-
- /**
- * Check the queue bindings to validate the queue is bound to the
- * topic exchange.
- *
- * @param q the Queue
- *
- * @return true iff Q is bound to a TopicExchange
- */
- private boolean validateQueueIsATopic(AMQQueue q)
- {
- for (Binding binding : q.getBindings())
- {
- if (binding.getExchange() instanceof TopicExchange)
- {
- return true;
- }
- }
-
- return false;
- }
-
- public void configure(ConfigurationPlugin config)
- {
- _configuration = (TopicDeletePolicyConfiguration) config;
- }
-
- @Override
- public String toString()
- {
- return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]");
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
deleted file mode 100644
index e6ad1cbcc3..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin.policies;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory;
-
-public class TopicDeletePolicyConfiguration extends ConfigurationPlugin
-{
-
- public static class TopicDeletePolicyConfigurationFactory
- implements ConfigurationPluginFactory
- {
- public ConfigurationPlugin newInstance(String path,
- Configuration config)
- throws ConfigurationException
- {
- TopicDeletePolicyConfiguration slowConsumerConfig =
- new TopicDeletePolicyConfiguration();
- slowConsumerConfig.setConfiguration(path, config);
- return slowConsumerConfig;
- }
-
- public List<String> getParentPaths()
- {
- return Arrays.asList(
- "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete",
- "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete");
- }
- }
-
- public String[] getElementsProcessed()
- {
- return new String[]{"delete-persistent"};
- }
-
- @Override
- public void validateConfiguration() throws ConfigurationException
- {
- // No validation required.
- }
-
- public boolean deletePersistent()
- {
- // If we don't have configuration then we don't deletePersistent Queues
- return (hasConfiguration() && contains("delete-persistent"));
- }
-
- @Override
- public String formatToString()
- {
- return (deletePersistent()?"delete-durable":"");
- }
-
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java
deleted file mode 100644
index 7f600abdc9..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.slowconsumerdetection.policies;
-
-import org.apache.qpid.server.plugins.Plugin;
-import org.apache.qpid.server.queue.AMQQueue;
-
-public interface SlowConsumerPolicyPlugin extends Plugin
-{
- public void performPolicy(AMQQueue Queue);
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java
deleted file mode 100644
index b2fe6766a6..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.slowconsumerdetection.policies;
-
-import org.apache.qpid.server.plugins.PluginFactory;
-
-public interface SlowConsumerPolicyPluginFactory<P extends SlowConsumerPolicyPlugin> extends PluginFactory<P>
-{
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java
deleted file mode 100644
index 40dc382d30..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin;
-
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Provide Unit Test coverage of the virtualhost SlowConsumer Configuration
- * This is what controls how often the plugin will execute
- */
-public class SlowConsumerDetectionConfigurationTest extends InternalBrokerBaseCase
-{
-
- /**
- * Default Testing:
- *
- * Provide a fully complete and valid configuration specifying 'delay' and
- * 'timeunit' and ensure that it is correctly processed.
- *
- * Ensure no exceptions are thrown and that we get the same values back that
- * were put into the configuration.
- */
- public void testConfigLoadingValidConfig()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- long DELAY=10;
- String TIMEUNIT=TimeUnit.MICROSECONDS.toString();
- xmlconfig.addProperty("delay", String.valueOf(DELAY));
- xmlconfig.addProperty("timeunit", TIMEUNIT);
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- }
- catch (ConfigurationException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertEquals("Delay not correctly returned.", DELAY, config.getDelay());
- assertEquals("TimeUnit not correctly returned.",
- TIMEUNIT, String.valueOf(config.getTimeUnit()));
- }
-
- /**
- * Default Testing:
- *
- * Test Missing TimeUnit value gets default.
- *
- * The TimeUnit value is optional and default to SECONDS.
- *
- * Test that if we do not specify a TimeUnit then we correctly get seconds.
- *
- * Also verify that relying on the default does not impact the setting of
- * the 'delay' value.
- *
- */
- public void testConfigLoadingMissingTimeUnitDefaults()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- long DELAY=10;
- xmlconfig.addProperty("delay", String.valueOf(DELAY));
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
- try
- {
- config.setConfiguration("", composite);
- }
- catch (ConfigurationException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertEquals("Delay not correctly returned.", DELAY, config.getDelay());
- assertEquals("Default TimeUnit incorrect", TimeUnit.SECONDS, config.getTimeUnit());
- }
-
- /**
- * Input Testing:
- *
- * TimeUnit parsing requires the String value be in UpperCase.
- * Ensure we can handle when the user doesn't know this.
- *
- * Same test as 'testConfigLoadingValidConfig' but checking that
- * the timeunit field is not case sensitive.
- * i.e. the toUpper is being correctly applied.
- */
- public void testConfigLoadingValidConfigStrangeTimeUnit()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- long DELAY=10;
-
- xmlconfig.addProperty("delay", DELAY);
- xmlconfig.addProperty("timeunit", "MiCrOsEcOnDs");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- }
- catch (ConfigurationException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertEquals("Delay not correctly returned.", DELAY, config.getDelay());
- assertEquals("TimeUnit not correctly returned.",
- TimeUnit.MICROSECONDS.toString(), String.valueOf(config.getTimeUnit()));
-
- }
-
- /**
- * Failure Testing:
- *
- * Test that delay must be long not a string value.
- * Provide a delay as a written value not a long. 'ten'.
- *
- * This should throw a configuration exception which is being trapped and
- * verified to be the right exception, a NumberFormatException.
- *
- */
- public void testConfigLoadingInValidDelayString()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("delay", "ten");
- xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString());
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("Configuration should fail to validate");
- }
- catch (ConfigurationException e)
- {
- Throwable cause = e.getCause();
-
- assertEquals("Cause not correct", NumberFormatException.class, cause.getClass());
- }
- }
-
- /**
- * Failure Testing:
- *
- * Test that negative delays are invalid.
- *
- * Delay must be a positive value as negative delay means doesn't make sense.
- *
- * Configuration exception with a useful message should be thrown here.
- *
- */
- public void testConfigLoadingInValidDelayNegative()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("delay", "-10");
- xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString());
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("Configuration should fail to validate");
- }
- catch (ConfigurationException e)
- {
- Throwable cause = e.getCause();
-
- assertNotNull("Configuration Exception must not be null.", cause);
- assertEquals("Cause not correct",
- ConfigurationException.class, cause.getClass());
- assertEquals("Incorrect message.",
- "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.",
- cause.getMessage());
- }
- }
-
- /**
- * Failure Testing:
- *
- * Test that delay cannot be 0.
- *
- * A zero delay means run constantly. This is not how VirtualHostTasks
- * are designed to be run so we dis-allow the use of 0 delay.
- *
- * Same test as 'testConfigLoadingInValidDelayNegative' but with a 0 value.
- *
- */
- public void testConfigLoadingInValidDelayZero()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("delay", "0");
- xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString());
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("Configuration should fail to validate");
- }
- catch (ConfigurationException e)
- {
- Throwable cause = e.getCause();
-
- assertNotNull("Configuration Exception must not be null.", cause);
- assertEquals("Cause not correct",
- ConfigurationException.class, cause.getClass());
- assertEquals("Incorrect message.",
- "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.",
- cause.getMessage());
- }
- }
-
- /**
- * Failure Testing:
- *
- * Test that missing delay fails.
- * If we have no delay then we do not pick a default. So a Configuration
- * Exception is thrown.
- *
- * */
- public void testConfigLoadingInValidMissingDelay()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("timeunit", TimeUnit.SECONDS.toString());
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
- try
- {
- config.setConfiguration("", composite);
- fail("Configuration should fail to validate");
- }
- catch (ConfigurationException e)
- {
- assertEquals("Incorrect message.", "SlowConsumerDetectionConfiguration: unable to configure invalid delay:null", e.getMessage());
- }
- }
-
- /**
- * Failure Testing:
- *
- * Test that erroneous TimeUnit fails.
- *
- * Valid TimeUnit values vary based on the JVM version i.e. 1.6 added HOURS/DAYS etc.
- *
- * We don't test the values for TimeUnit are accepted other than MILLISECONDS in the
- * positive testing at the start.
- *
- * Here we ensure that an erroneous for TimeUnit correctly throws an exception.
- *
- * We test with 'foo', which will never be a TimeUnit
- *
- */
- public void testConfigLoadingInValidTimeUnit()
- {
- SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration();
-
- String TIMEUNIT = "foo";
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("delay", "10");
- xmlconfig.addProperty("timeunit", TIMEUNIT);
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
- try
- {
- config.setConfiguration("", composite);
- fail("Configuration should fail to validate");
- }
- catch (ConfigurationException e)
- {
- assertEquals("Incorrect message.", "Unable to configure Slow Consumer Detection invalid TimeUnit:" + TIMEUNIT, e.getMessage());
- }
- }
-
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java
deleted file mode 100644
index 67c177f099..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin;
-
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-/**
- * Test class to ensure that the policy configuration can be processed.
- */
-public class SlowConsumerDetectionPolicyConfigurationTest extends InternalBrokerBaseCase
-{
-
- /**
- * Input Testing:
- *
- * Test that a given String can be set and retrieved through the configuration
- *
- * No validation is being performed to ensure that the policy exists. Only
- * that a value can be set for the policy.
- *
- */
- public void testConfigLoadingValidConfig()
- {
- SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- String policyName = "TestPolicy";
- xmlconfig.addProperty("name", policyName);
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- }
- catch (ConfigurationException e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- assertEquals("Policy name not retrieved as expected.",
- policyName, config.getPolicyName());
- }
-
- /**
- * Failure Testing:
- *
- * Test that providing a configuration section without the 'name' field
- * causes an exception to be thrown.
- *
- * An empty configuration is provided and the thrown exception message
- * is checked to confirm the right reason.
- *
- */
- public void testConfigLoadingInValidConfig()
- {
- SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("Config is invalid so won't validate.");
- }
- catch (ConfigurationException e)
- {
- e.printStackTrace();
- assertEquals("Exception message not as expected.", "No Slow consumer policy defined.", e.getMessage());
- }
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java
deleted file mode 100644
index 57e3233eeb..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin;
-
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-/**
- * Unit test the QueueConfiguration processing.
- *
- * This is slightly awkward as the SCDQC requries that a policy be available.
- *
- * So all the Valid test much catch the ensuing ConfigurationException and
- * validate that the error is due to a lack of a valid Policy
- */
-public class SlowConsumerDetectionQueueConfigurationTest extends InternalBrokerBaseCase
-{
-
- /**
- * Test a fully loaded configuration file.
- *
- * It is not an error to have all control values specified.
- *
- * Here we need to catch the ConfigurationException that ensures due to lack
- * of a Policy Plugin
- */
- public void testConfigLoadingValidConfig()
- {
- SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("messageAge", "60000");
- xmlconfig.addProperty("depth", "1024");
- xmlconfig.addProperty("messageCount", "10");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("No Policies are avaialbe to load in a unit test");
- }
- catch (ConfigurationException e)
- {
- assertEquals("No Slow Consumer Policy specified. Known Policies:[]",
- e.getMessage());
- }
- }
-
- /**
- * When we do not specify any control value then a ConfigurationException
- * must be thrown to remind us.
- */
- public void testConfigLoadingMissingConfig()
- {
- SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("No Policies are avaialbe to load in a unit test");
- }
- catch (ConfigurationException e)
- {
-
- assertEquals("At least one configuration property('messageAge','depth'" +
- " or 'messageCount') must be specified.", e.getMessage());
- }
- }
-
- /**
- * Setting messageAge on its own is enough to have a valid configuration
- *
- * Here we need to catch the ConfigurationException that ensures due to lack
- * of a Policy Plugin
- */
- public void testConfigLoadingMessageAgeOk()
- {
- SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
- xmlconfig.addProperty("messageAge", "60000");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("No Policies are avaialbe to load in a unit test");
- }
- catch (ConfigurationException e)
- {
- assertEquals("No Slow Consumer Policy specified. Known Policies:[]",
- e.getMessage());
- }
- }
-
- /**
- * Setting depth on its own is enough to have a valid configuration
- *
- * Here we need to catch the ConfigurationException that ensures due to lack
- * of a Policy Plugin
- */
- public void testConfigLoadingDepthOk()
- {
- SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
- xmlconfig.addProperty("depth", "1024");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("No Policies are avaialbe to load in a unit test");
- }
- catch (ConfigurationException e)
- {
- assertEquals("No Slow Consumer Policy specified. Known Policies:[]",
- e.getMessage());
- }
- }
-
- /**
- * Setting messageCount on its own is enough to have a valid configuration
- *
- * Here we need to catch the ConfigurationException that ensures due to lack
- * of a Policy Plugin
- */
- public void testConfigLoadingMessageCountOk()
- {
- SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
- xmlconfig.addProperty("messageCount", "10");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("", composite);
- fail("No Policies are avaialbe to load in a unit test");
- }
- catch (ConfigurationException e)
- {
- assertEquals("No Slow Consumer Policy specified. Known Policies:[]",
- e.getMessage());
- }
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java
deleted file mode 100644
index 8b729a0f43..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin.policies;
-
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-
-/**
- * Test to ensure TopicDelete Policy configuration can be loaded.
- */
-public class TopicDeletePolicyConfigurationTest extends InternalBrokerBaseCase
-{
- /**
- * Test without any configuration being provided that the
- * deletePersistent option is disabled.
- */
- public void testNoConfigNoDeletePersistent()
- {
- TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration();
-
- assertFalse("TopicDelete Configuration with no config should not delete persistent queues.",
- config.deletePersistent());
- }
-
- /**
- * Test that with the correct configuration the deletePersistent option can
- * be enabled.
- *
- * Test creates a new Configuration object and passes in the xml snippet
- * that the ConfigurationPlugin would receive during normal execution.
- * This is the XML that would be matched for this plugin:
- * <topicdelete>
- * <delete-persistent>
- * <topicdelete>
- *
- * So it would be subset and passed in as just:
- * <delete-persistent>
- *
- *
- * The property should therefore be enabled.
- *
- */
- public void testConfigDeletePersistent()
- {
- TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration();
-
- XMLConfiguration xmlconfig = new XMLConfiguration();
-
- xmlconfig.addProperty("delete-persistent","");
-
- // Create a CompositeConfiguration as this is what the broker uses
- CompositeConfiguration composite = new CompositeConfiguration();
- composite.addConfiguration(xmlconfig);
-
- try
- {
- config.setConfiguration("",composite);
- }
- catch (ConfigurationException e)
- {
- fail(e.getMessage());
- }
-
- assertTrue("A configured TopicDelete should delete persistent queues.",
- config.deletePersistent());
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
deleted file mode 100644
index 364766dfa7..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost.plugin.policies;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.XMLConfiguration;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.TopicExchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.protocol.InternalTestProtocolSession;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MockAMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.InternalBrokerBaseCase;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
-public class TopicDeletePolicyTest extends InternalBrokerBaseCase
-{
-
- TopicDeletePolicyConfiguration _config;
-
- VirtualHost _defaultVhost;
- InternalTestProtocolSession _connection;
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost();
-
- _connection = new InternalTestProtocolSession(_defaultVhost);
-
- _config = new TopicDeletePolicyConfiguration();
-
- XMLConfiguration config = new XMLConfiguration();
-
- _config.setConfiguration("", config);
- }
-
- private MockAMQQueue createOwnedQueue()
- {
- MockAMQQueue queue = new MockAMQQueue("testQueue");
-
- _defaultVhost.getQueueRegistry().registerQueue(queue);
-
- try
- {
- AMQChannel channel = new AMQChannel(_connection, 0, null);
- _connection.addChannel(channel);
-
- queue.setExclusiveOwningSession(channel);
- }
- catch (AMQException e)
- {
- fail("Unable to create Channel:" + e.getMessage());
- }
-
- return queue;
- }
-
- private void setQueueToAutoDelete(final AMQQueue queue)
- {
- ((MockAMQQueue) queue).setAutoDelete(true);
-
- queue.setDeleteOnNoConsumers(true);
- final AMQProtocolSession.Task deleteQueueTask =
- new AMQProtocolSession.Task()
- {
- public void doTask(AMQProtocolSession session) throws AMQException
- {
- queue.delete();
- }
- };
-
- ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask);
- }
-
- /** Check that a null queue passed in does not upset the policy. */
- public void testNullQueueParameter() throws ConfigurationException
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- try
- {
- policy.performPolicy(null);
- }
- catch (Exception e)
- {
- fail("Exception should not be thrown:" + e.getMessage());
- }
-
- }
-
- /**
- * Set a owning Session to null which means this is not an exclusive queue
- * so the queue should not be deleted
- */
- public void testNonExclusiveQueue()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.setExclusiveOwningSession(null);
-
- policy.performPolicy(queue);
-
- assertFalse("Queue should not be deleted", queue.isDeleted());
- assertFalse("Connection should not be closed", _connection.isClosed());
- }
-
- /**
- * Test that exclusive JMS Queues are not deleted.
- * Bind the queue to the direct exchange (so it is a JMS Queue).
- *
- * JMS Queues are not to be processed so this should not delete the queue.
- */
- public void testQueuesAreNotProcessed()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null));
-
- policy.performPolicy(queue);
-
- assertFalse("Queue should not be deleted", queue.isDeleted());
- assertFalse("Connection should not be closed", _connection.isClosed());
- }
-
- /**
- * Give a non auto-delete queue is bound to the topic exchange the
- * TopicDeletePolicy will close the connection and delete the queue,
- */
- public void testNonAutoDeleteTopicIsNotClosed()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
-
- queue.setAutoDelete(false);
-
- policy.performPolicy(queue);
-
- assertFalse("Queue should not be deleted", queue.isDeleted());
- assertTrue("Connection should be closed", _connection.isClosed());
- }
-
- /**
- * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will
- * close the connection and delete the queue
- */
- public void testTopicIsClosed()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- final MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
-
- setQueueToAutoDelete(queue);
-
- policy.performPolicy(queue);
-
- assertTrue("Queue should be deleted", queue.isDeleted());
- assertTrue("Connection should be closed", _connection.isClosed());
- }
-
- /**
- * Give a queue bound to the topic exchange the TopicDeletePolicy will
- * close the connection and NOT delete the queue
- */
- public void testNonAutoDeleteTopicIsClosedNotDeleted()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
-
- policy.performPolicy(queue);
-
- assertFalse("Queue should not be deleted", queue.isDeleted());
- assertTrue("Connection should be closed", _connection.isClosed());
- }
-
- /**
- * Give a queue bound to the topic exchange the TopicDeletePolicy suitably
- * configured with the delete-persistent tag will close the connection
- * and delete the queue
- */
- public void testPersistentTopicIsClosedAndDeleted()
- {
- //Set the config to delete persistent queues
- _config.getConfig().addProperty("delete-persistent", "");
-
- TopicDeletePolicy policy = new TopicDeletePolicy();
- policy.configure(_config);
-
- assertTrue("Config was not updated to delete Persistent topics",
- _config.deletePersistent());
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
-
- policy.performPolicy(queue);
-
- assertTrue("Queue should be deleted", queue.isDeleted());
- assertTrue("Connection should be closed", _connection.isClosed());
- }
-
- /**
- * Give a queue bound to the topic exchange the TopicDeletePolicy not
- * configured to close a persistent queue
- */
- public void testPersistentTopicIsClosedAndDeletedNullConfig()
- {
- TopicDeletePolicy policy = new TopicDeletePolicy();
- // Explicity say we are not configuring the policy.
- policy.configure(null);
-
- MockAMQQueue queue = createOwnedQueue();
-
- queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null));
-
- policy.performPolicy(queue);
-
- assertFalse("Queue should not be deleted", queue.isDeleted());
- assertTrue("Connection should be closed", _connection.isClosed());
- }
-
- public void testNonExclusiveQueueNullConfig()
- {
- _config = null;
- testNonExclusiveQueue();
- }
-
- public void testQueuesAreNotProcessedNullConfig()
- {
- _config = null;
- testQueuesAreNotProcessed();
- }
-
- public void testNonAutoDeleteTopicIsNotClosedNullConfig()
- {
- _config = null;
- testNonAutoDeleteTopicIsNotClosed();
- }
-
- public void testTopicIsClosedNullConfig()
- {
- _config = null;
- testTopicIsClosed();
- }
-
- public void testNonAutoDeleteTopicIsClosedNotDeletedNullConfig() throws AMQException
- {
- _config = null;
- testNonAutoDeleteTopicIsClosedNotDeleted();
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
deleted file mode 100644
index e0934faf44..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-
-import javax.jms.Session;
-import javax.naming.NamingException;
-import java.io.IOException;
-
-/**
- * QPID-1447 : Add slow consumer detection and disconnection.
- *
- * Slow consumers should on a topic should expect to receive a
- * 506 : Resource Error if the hit a predefined threshold.
- */
-public class GlobalQueuesTest extends TestingBaseCase
-{
-
- protected String CONFIG_SECTION = ".queues";
-
- /**
- * Queue Configuration
-
- <slow-consumer-detection>
- <!-- The depth before which the policy will be applied-->
- <depth>4235264</depth>
-
- <!-- The message age before which the policy will be applied-->
- <messageAge>600000</messageAge>
-
- <!-- The number of message before which the policy will be applied-->
- <messageCount>50</messageCount>
-
- <!-- Policies configuration -->
- <policy>
- <name>TopicDelete</name>
- <topicDelete>
- <delete-persistent/>
- </topicDelete>
- </policy>
- </slow-consumer-detection>
-
- */
-
- /**
- * VirtualHost Plugin Configuration
-
- <slow-consumer-detection>
- <delay>1</delay>
- <timeunit>MINUTES</timeunit>
- </slow-consumer-detection>
-
- */
-
- public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
- {
- setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
- "policy.name", "TopicDelete");
-
- setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
- property, value);
-
- if (deleteDurable)
- {
- setProperty(CONFIG_SECTION + ".slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
- }
- }
-
- /**
- * Test that setting messageCount takes affect on topics
- *
- * We send 10 messages and disconnect at 9
- *
- * @throws Exception
- */
- public void testTopicConsumerMessageCount() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
- }
-
- /**
- * Test that setting depth has an effect on topics
- *
- * Sets the message size for the test
- * Sets the depth to be 9 * the depth
- * Ensure that sending 10 messages causes the disconnection
- *
- * @throws Exception
- */
- public void testTopicConsumerMessageSize() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false);
-
- //Start the broker
- startBroker();
-
- setMessageSize(MESSAGE_SIZE);
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
- }
-
- /**
- * Test that setting messageAge has an effect on topics
- *
- * Sets the messageAge to be half the disconnection wait timeout
- * Send 10 messages and then ensure that we get disconnected as we will
- * wait for the full timeout.
- *
- * @throws Exception
- */
- public void testTopicConsumerMessageAge() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, false);
- }
-
- /**
- * Test that setting messageCount takes affect on a durable Consumer
- *
- * Ensure we set the delete-persistent option
- *
- * We send 10 messages and disconnect at 9
- *
- * @throws Exception
- */
-
- public void testTopicDurableConsumerMessageCount() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
- /**
- * Test that setting depth has an effect on durable consumer topics
- *
- * Ensure we set the delete-persistent option
- *
- * Sets the message size for the test
- * Sets the depth to be 9 * the depth
- * Ensure that sending 10 messages causes the disconnection
- *
- * @throws Exception
- */
- public void testTopicDurableConsumerMessageSize() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
-
- //Start the broker
- startBroker();
-
- setMessageSize(MESSAGE_SIZE);
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
- /**
- * Test that setting messageAge has an effect on topics
- *
- * Ensure we set the delete-persistent option
- *
- * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec)
- * Send 10 messages and then ensure that we get disconnected as we will
- * wait for the full timeout.
- *
- * @throws Exception
- */
- public void testTopicDurableConsumerMessageAge() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
deleted file mode 100644
index aff5d1b1b8..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-
-import javax.naming.NamingException;
-import java.io.IOException;
-
-public class GlobalTopicsTest extends GlobalQueuesTest
-{
- @Override
- public void setUp() throws Exception
- {
- CONFIG_SECTION = ".topics";
- super.setUp();
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
deleted file mode 100644
index e4efac60f8..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.NamingException;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class MergeConfigurationTest extends TestingBaseCase
-{
-
- protected int topicCount = 0;
-
-
- public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException
- {
-
- setProperty(".topics.topic("+topicCount+").name", topic);
- setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount));
- setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete");
- topicCount++;
- }
-
-
- /**
- * Test that setting messageCount takes affect on topics
- *
- * We send 10 messages and disconnect at 9
- *
- * @throws Exception
- */
- public void testTopicConsumerMessageCount() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1);
-
- //Configure topic as a subscription
- setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName());
- configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1));
-
-
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
-
-//
-// public void testMerge() throws ConfigurationException, AMQException
-// {
-//
-// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"),
-// false, false, _virtualHost, null);
-//
-// _virtualHost.getQueueRegistry().registerQueue(queue);
-// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
-// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null);
-//
-//
-// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME);
-// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null);
-//
-// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName());
-//
-// assertNotNull("Queue should have topic configuration bound to it.", config);
-// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName());
-//
-// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
-// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration)
-// {
-// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration.");
-// }
-// else
-// {
-// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader());
-// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader());
-// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class));
-// System.err.println("********************** is a "+scdConfig.getClass());
-// }
-//
-// assertNotNull("Queue should have scd configuration bound to it.", scdConfig);
-// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount());
-// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName());
-// }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java
deleted file mode 100644
index 9e9375fd44..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-
-import javax.jms.Session;
-import javax.naming.NamingException;
-import java.io.IOException;
-
-/**
- * Test SCD when configured with Subscription details.
- *
- * We run the subscription based tests here to validate that the
- * subscriptionname value is correctly associated with the subscription.
- *
- *
- */
-public class SubscriptionTest extends TestingBaseCase
-{
- private int _count=0;
- protected String CONFIG_SECTION = ".topics.topic";
-
- /**
- * Add configuration for the queue that relates just to this test.
- * We use the getTestQueueName() as our subscription. To ensure the
- * config sections do not overlap we identify each section with a _count
- * value.
- *
- * This would allow each test to configure more than one section.
- *
- * @param property to set
- * @param value the value to set
- * @param deleteDurable should deleteDurable be set.
- * @throws NamingException
- * @throws IOException
- * @throws ConfigurationException
- */
- public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
- {
- setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName());
-
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- "policy.name", "TopicDelete");
-
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- property, value);
-
- if (deleteDurable)
- {
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
- }
- _count++;
- }
-
-
- /**
- * Test that setting messageCount takes affect on a durable Consumer
- *
- * Ensure we set the delete-persistent option
- *
- * We send 10 messages and disconnect at 9
- *
- * @throws Exception
- */
-
- public void testTopicDurableConsumerMessageCount() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
- /**
- * Test that setting depth has an effect on durable consumer topics
- *
- * Ensure we set the delete-persistent option
- *
- * Sets the message size for the test
- * Sets the depth to be 9 * the depth
- * Ensure that sending 10 messages causes the disconnection
- *
- * @throws Exception
- */
- public void testTopicDurableConsumerMessageSize() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true);
-
- //Start the broker
- startBroker();
-
- setMessageSize(MESSAGE_SIZE);
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
- /**
- * Test that setting messageAge has an effect on topics
- *
- * Ensure we set the delete-persistent option
- *
- * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec)
- * Send 10 messages and then ensure that we get disconnected as we will
- * wait for the full timeout.
- *
- * @throws Exception
- */
- public void testTopicDurableConsumerMessageAge() throws Exception
- {
- MAX_QUEUE_MESSAGE_COUNT = 10;
-
- setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true);
-
- //Start the broker
- startBroker();
-
- topicConsumer(Session.AUTO_ACKNOWLEDGE, true);
- }
-
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java
deleted file mode 100644
index 9831c74574..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.naming.NamingException;
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener
-{
-
- Topic _destination;
- protected CountDownLatch _disconnectionLatch = new CountDownLatch(1);
- protected int MAX_QUEUE_MESSAGE_COUNT;
- protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE;
-
- private Thread _publisher;
- protected static final long DISCONNECTION_WAIT = 5;
- protected Exception _publisherError = null;
- protected JMSException _connectionException = null;
- private static final long JOIN_WAIT = 5000;
-
- @Override
- public void setUp() throws Exception
- {
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".slow-consumer-detection.delay", "1");
-
- setConfigurationProperty("virtualhosts.virtualhost."
- + getConnectionURL().getVirtualHost().substring(1) +
- ".slow-consumer-detection.timeunit", "SECONDS");
-
- }
-
-
- protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException
- {
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- property, value);
- }
-
-
- /**
- * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT
- * messages to the provided destination. Messages are sent in a new connection
- * on a transaction. Any error is captured and the test is signalled to exit.
- *
- * @param destination
- */
- private void startPublisher(final Destination destination)
- {
- _publisher = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- Connection connection = getConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer publisher = session.createProducer(destination);
-
- for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
- {
- publisher.send(createNextMessage(session, count));
- session.commit();
- }
- }
- catch (Exception e)
- {
- _publisherError = e;
- _disconnectionLatch.countDown();
- }
- }
- });
-
- _publisher.start();
- }
-
-
-
- /**
- * Perform the Main test of a topic Consumer with the given AckMode.
- *
- * Test creates a new connection and sets up the connection to prevent
- * failover
- *
- * A new consumer is connected and started so that it will prefetch msgs.
- *
- * An asynchrounous publisher is started to fill the broker with messages.
- *
- * We then wait to be notified of the disconnection via the ExceptionListener
- *
- * 0-10 does not have the same notification paths but sync() apparently should
- * give us the exception, currently it doesn't, so the test is excluded from 0-10
- *
- * We should ensure that this test has the same path for all protocol versions.
- *
- * Clients should not have to modify their code based on the protocol in use.
- *
- * @param ackMode @see javax.jms.Session
- *
- * @throws Exception
- */
- protected void topicConsumer(int ackMode, boolean durable) throws Exception
- {
- Connection connection = getConnection();
-
- connection.setExceptionListener(this);
-
- Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode);
-
- _destination = session.createTopic(getName());
-
- MessageConsumer consumer;
-
- if (durable)
- {
- consumer = session.createDurableSubscriber(_destination, getTestQueueName());
- }
- else
- {
- consumer = session.createConsumer(_destination);
- }
-
- connection.start();
-
- // Start the consumer pre-fetching
- // Don't care about response as we will fill the broker up with messages
- // after this point and ensure that the client is disconnected at the
- // right point.
- consumer.receiveNoWait();
- startPublisher(_destination);
-
- boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS);
-
- if (!disconnected && isBroker010())
- {
- try
- {
- ((AMQSession_0_10) session).sync();
- }
- catch (AMQException amqe)
- {
- JMSException jmsException = new JMSException(amqe.getMessage());
- jmsException.setLinkedException(amqe);
- jmsException.initCause(amqe);
- _connectionException = jmsException;
- }
- }
-
- assertTrue("Client was not disconnected.", _connectionException != null);
-
- Exception linked = _connectionException.getLinkedException();
-
- _publisher.join(JOIN_WAIT);
-
- assertFalse("Publisher still running", _publisher.isAlive());
-
- //Validate publishing occurred ok
- if (_publisherError != null)
- {
- throw _publisherError;
- }
-
- // NOTE these exceptions will need to be modeled so that they are not
- // 0-8 specific. e.g. JMSSessionClosedException
-
- assertNotNull("No error received onException listener.", _connectionException);
-
- assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked);
-
- assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass());
-
- AMQChannelClosedException ccException = (AMQChannelClosedException) linked;
-
- assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode());
- }
-
-
- // Exception Listener
-
- public void onException(JMSException e)
- {
- _connectionException = e;
-
- e.printStackTrace();
-
- _disconnectionLatch.countDown();
- }
-
- /// Connection Listener
-
- public void bytesSent(long count)
- {
- }
-
- public void bytesReceived(long count)
- {
- }
-
- public boolean preFailover(boolean redirect)
- {
- // Prevent Failover
- return false;
- }
-
- public boolean preResubscribe()
- {
- return false;
- }
-
- public void failoverComplete()
- {
- }
-}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java
deleted file mode 100644
index 09c849cfde..0000000000
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.systest;
-
-import org.apache.commons.configuration.ConfigurationException;
-
-import javax.naming.NamingException;
-import java.io.IOException;
-
-/**
- * This Topic test extends the Global queue test so it will run all the topic
- * and subscription tests.
- *
- * We redefine the CONFIG_SECTION here so that the configuration is written
- * against a topic element.
- *
- * To complete the migration to testing 'topic' elements we also override
- * the setConfig to use the test name as the topic name.
- *
- */
-public class TopicTest extends GlobalQueuesTest
-{
- private int _count=0;
-
- @Override
- public void setUp() throws Exception
- {
- CONFIG_SECTION = ".topics.topic";
- super.setUp();
- }
-
- /**
- * Add configuration for the queue that relates just to this test.
- * We use the getTestQueueName() as our subscription. To ensure the
- * config sections do not overlap we identify each section with a _count
- * value.
- *
- * This would allow each test to configure more than one section.
- *
- * @param property to set
- * @param value the value to set
- * @param deleteDurable should deleteDurable be set.
- * @throws NamingException
- * @throws IOException
- * @throws ConfigurationException
- */
- @Override
- public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException
- {
- setProperty(CONFIG_SECTION + "("+_count+").name", getName());
-
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- "policy.name", "TopicDelete");
-
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- property, value);
-
- if (deleteDurable)
- {
- setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." +
- "policy.topicdelete.delete-persistent", "");
- }
- _count++;
- }
-
-
-}