diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:09:56 +0000 |
| commit | 8dbbb9ad305ca936fb924420ec31e27e9a9d0caf (patch) | |
| tree | 949ab5a030574ea199ee9821367eedb0c84131b6 /java/broker-plugins | |
| parent | cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (diff) | |
| download | qpid-python-8dbbb9ad305ca936fb924420ec31e27e9a9d0caf.tar.gz | |
QPID-2682: Move slow consumer disconnection mechanism to the broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966637 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
25 files changed, 0 insertions, 2865 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF deleted file mode 100644 index 3d3d91381b..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF +++ /dev/null @@ -1,30 +0,0 @@ -Manifest-Version: 1.0 -Bundle-ManifestVersion: 2 -Bundle-Name: Qpid Slow Consumer Detection -Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true -Bundle-Version: 1.0.0 -Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator -Import-Package: org.osgi.framework, - org.apache.qpid.server.configuration.plugins, - org.apache.qpid.server.configuration, - org.apache.qpid.server.virtualhost.plugins, - org.apache.qpid.server.virtualhost, - org.apache.qpid.server.queue, - org.apache.qpid.server.binding, - org.apache.qpid.server.exchange, - org.apache.qpid.server.registry, - org.apache.qpid.server.plugins, - org.apache.qpid.server.protocol, - org.apache.qpid.server.logging, - org.apache.qpid.server.logging.actors, - org.apache.qpid.protocol, - org.apache.qpid.framing, - org.apache.qpid, - org.apache.log4j, - org.apache.commons.configuration -Bundle-RequiredExecutionEnvironment: JavaSE-1.6 -Bundle-ClassPath: . -Bundle-ActivationPolicy: lazy -Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework", - org.apache.qpid.server.virtualhost.plugin.policies - diff --git a/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/java/broker-plugins/experimental/slowconsumerdetection/build.xml deleted file mode 100644 index 06ebc58030..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/build.xml +++ /dev/null @@ -1,34 +0,0 @@ -<!-- - - - - Licensed to the Apache Software Foundation (ASF) under one -nn - or more contributor license agreements. See the NOTICE file - -n distributed with this work for additional information - - regarding copyright ownership. The ASF licenses this file - - to you under the Apache License, Version 2.0 (the - - "License"); you may not use this file except in compliance - - with the License. You may obtain a copy of the License at - - - - http://www.apache.org/licenses/LICENSE-2.0 - - - - Unless required by applicable law or agreed to in writing, - - software distributed under the License is distributed on an - - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - - KIND, either express or implied. See the License for the - - specific language governing permissions and limitations - - under the License. - - - --> -<project name="Slow Consumer Disconnect" default="build"> - - <property name="module.depends" value="common broker broker-plugins"/> - <property name="module.test.depends" value="broker/test common/test systests client management/common"/> - <property name="module.manifest" value="MANIFEST.MF"/> - <property name="module.plugin" value="true"/> - - <import file="../../../module.xml"/> - - <target name="bundle" depends="bundle-tasks"/> - - <target name="precompile" depends="gen_logging"/> - -</project> diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java deleted file mode 100644 index dd63c9b698..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.ConversionException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin -{ - public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList("virtualhosts.virtualhost.slow-consumer-detection"); - } - } - - //Set Default time unit to seconds - TimeUnit _timeUnit = TimeUnit.SECONDS; - - public String[] getElementsProcessed() - { - return new String[]{"delay", - "timeunit"}; - } - - public long getDelay() - { - return getLongValue("delay", 10); - } - - public TimeUnit getTimeUnit() - { - return _timeUnit; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - validatePositiveLong("delay"); - - String timeUnit = getStringValue("timeunit"); - - if (timeUnit != null) - { - try - { - _timeUnit = TimeUnit.valueOf(timeUnit.toUpperCase()); - } - catch (IllegalArgumentException iae) - { - throw new ConfigurationException("Unable to configure Slow Consumer Detection invalid TimeUnit:" + timeUnit); - } - } - - System.out.println("Configured SCDC"); - System.out.println("Delay:" + getDelay()); - System.out.println("TimeUnit:" + getTimeUnit()); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java deleted file mode 100644 index 8e2ecff6fb..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.Arrays; -import java.util.List; - -public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin -{ - public static class SlowConsumerDetectionPolicyConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = new SlowConsumerDetectionPolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"name"}; - } - - public String getPolicyName() - { - return getStringValue("name"); - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - if (getPolicyName() == null) - { - throw new ConfigurationException("No Slow consumer policy defined."); - } - } - - @Override - public String formatToString() - { - return "Policy:"+getPolicyName(); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java deleted file mode 100644 index e825556e61..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.configuration.plugin; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin -{ - private SlowConsumerPolicyPlugin _policyPlugin; - - public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", - "virtualhosts.virtualhost.topics.slow-consumer-detection", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"messageAge", - "depth", - "messageCount"}; - } - - public long getMessageAge() - { - return getLongValue("messageAge"); - } - - public long getDepth() - { - return getLongValue("depth"); - } - - public long getMessageCount() - { - return getLongValue("messageCount"); - } - - public SlowConsumerPolicyPlugin getPolicy() - { - return _policyPlugin; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - if (!containsPositiveLong("messageAge") && - !containsPositiveLong("depth") && - !containsPositiveLong("messageCount")) - { - throw new ConfigurationException("At least one configuration property" + - "('messageAge','depth' or 'messageCount') must be specified."); - } - - SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); - - PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); - Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); - - if (policyConfig == null) - { - throw new ConfigurationException("No Slow Consumer Policy specified. Known Policies:" + factories.keySet()); - } - - if (_logger.isDebugEnabled()) - { - Iterator<?> keys = policyConfig.getConfig().getKeys(); - - while (keys.hasNext()) - { - String key = (String) keys.next(); - - _logger.debug("Policy Keys:" + key); - } - - } - - SlowConsumerPolicyPluginFactory<SlowConsumerPolicyPlugin> pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); - - if (pluginFactory == null) - { - throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); - } - - _policyPlugin = pluginFactory.newInstance(policyConfig); - - // Debug the creation of this Config - _logger.debug(this); - } - - public String formatToString() - { - StringBuilder sb = new StringBuilder(); - if (getMessageAge() > 0) - { - sb.append("Age:").append(getMessageAge()).append(":"); - } - if (getDepth() > 0) - { - sb.append("Depth:").append(getDepth()).append(":"); - } - if (getMessageCount() > 0) - { - sb.append("Count:").append(getMessageCount()).append(":"); - } - - sb.append("Policy[").append(getPolicy()).append("]"); - return sb.toString(); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java deleted file mode 100644 index 7b0168d436..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy; -import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicyConfiguration; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; - -/** - * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin. - * - * This includes Configuration - * - * @author ritchiem - */ -public class Activator implements BundleActivator -{ - public void start(BundleContext ctx) throws Exception - { - if (null != ctx) - { - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null); - ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null); - - ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.TopicDeletePolicyFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new TopicDeletePolicyConfiguration.TopicDeletePolicyConfigurationFactory(), null); - } - } - - public void stop(BundleContext ctx) throws Exception - { - // no need to do anything here, osgi will unregister the service for us - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java deleted file mode 100644 index d947e9a367..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.apache.qpid.server.virtualhost.plugin; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.exchange.AbstractExchange; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.exchange.Exchange.BindingListener; -import org.apache.qpid.server.queue.AMQQueue; - -/** - * This is a listener that caches queues that are configured for slow consumer disconnection. - * - * There should be one listener per virtual host, which can be added to all exchanges on - * that host. - * - * TODO In future, it will be possible to configure the policy at runtime, so only the queue - * itself is cached, and the configuration looked up by the housekeeping thread. This means - * that there may be occasions where the copy of the cache contents retrieved by the thread - * does not contain queues that are configured, or that configured queues are not present. - * - * @see BindingListener - */ -public class ConfiguredQueueBindingListener implements BindingListener -{ - private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); - - private String _vhostName; - private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>()); - - public ConfiguredQueueBindingListener(String vhostName) - { - _vhostName = vhostName; - } - - /** - * @see BindingListener#bindingAdded(Exchange, Binding) - */ - public void bindingAdded(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - /** - * @see BindingListener#bindingRemoved(Exchange, Binding) - */ - public void bindingRemoved(Exchange exchange, Binding binding) - { - processBinding(binding); - } - - private void processBinding(Binding binding) - { - AMQQueue queue = binding.getQueue(); - - SlowConsumerDetectionQueueConfiguration config = - queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (config != null) - { - _cache.add(queue); - } - else - { - _cache.remove(queue); - } - } - - /** - * Lookup and return the cache of configured {@link AMQQueue}s. - * - * Note that when accessing the cached queues, the {@link Iterator} is not thread safe - * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the - * cache is returned. - * - * @return a copy of the cached {@link java.util.Set} of queues - */ - public Set<AMQQueue> getQueueCache() - { - return new HashSet<AMQQueue>(_cache); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java deleted file mode 100644 index 7de95bbfa7..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostHouseKeepingPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; - -class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin -{ - private SlowConsumerDetectionConfiguration _config; - private ConfiguredQueueBindingListener _listener; - - public static class SlowConsumerFactory implements VirtualHostPluginFactory - { - public SlowConsumerDetection newInstance(VirtualHost vhost) - { - SlowConsumerDetectionConfiguration config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class.getName()); - - if (config == null) - { - return null; - } - - SlowConsumerDetection plugin = new SlowConsumerDetection(vhost); - plugin.configure(config); - return plugin; - } - } - - /** - * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping - * thread. - * - * @see Plugin#configure(ConfigurationPlugin) - */ - public void configure(ConfigurationPlugin config) - { - _config = (SlowConsumerDetectionConfiguration) config; - _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); - for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) - { - _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); - } - } - - public SlowConsumerDetection(VirtualHost vhost) - { - super(vhost); - } - - public void execute() - { - CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); - - Set<AMQQueue> cache = _listener.getQueueCache(); - for (AMQQueue q : cache) - { - CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); - - try - { - SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - if (checkQueueStatus(q, config)) - { - config.getPolicy().performPolicy(q); - } - } - catch (Exception e) - { - // Don't throw exceptions as this will stop the house keeping task from running. - _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); - } - } - - CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); - } - - public long getDelay() - { - return _config.getDelay(); - } - - public TimeUnit getTimeUnit() - { - return _config.getTimeUnit(); - } - - /** - * Check the depth,messageSize,messageAge,messageCount values for this q - * - * @param q the queue to check - * @param config the queue configuration to compare against the queue state - * - * @return true if the queue has reached a threshold. - */ - private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) - { - if (config != null) - { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - - int count = q.getMessageCount(); - - // First Check message counts - if ((config.getMessageCount() != 0 && count >= config.getMessageCount()) || - // The check queue depth - (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || - // finally if we have messages on the queue check Arrival time. - // We must check count as OldestArrival time is Long.MAX_LONG when - // there are no messages. - (config.getMessageAge() != 0 && - ((count > 0) && q.getOldestMessageArrivalTime() >= config.getMessageAge()))) - { - - if (_logger.isDebugEnabled()) - { - _logger.debug("Detected Slow Consumer on Queue(" + q.getName() + ")"); - _logger.debug("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); - _logger.debug("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); - _logger.debug("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); - } - - return true; - } - } - return false; - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties deleted file mode 100644 index 2714935a71..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/SlowConsumerDetection_logmessages.properties +++ /dev/null @@ -1,4 +0,0 @@ -#SlowConsumerDetection.logMessages -RUNNING = SCD-1001 : Running -COMPLETE = SCD-1002 : Complete -CHECKING_QUEUE = SCD-1003 : Checking Status of Queue {0}
\ No newline at end of file diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties deleted file mode 100644 index d0f5965c39..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/logging/TopicDeletePolicy_logmessages.properties +++ /dev/null @@ -1,3 +0,0 @@ -#TopicDeletePolicy.logMessages -DELETING_QUEUE = TDP-1001 : Deleting Queue -DISCONNECTING = TDP-1002 : Disconnecting Session
\ No newline at end of file diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java deleted file mode 100644 index 3bd4ae8d4e..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.plugin.logging.TopicDeletePolicyMessages; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; -import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; - -public class TopicDeletePolicy implements SlowConsumerPolicyPlugin -{ - Logger _logger = Logger.getLogger(TopicDeletePolicy.class); - private TopicDeletePolicyConfiguration _configuration; - - public static class TopicDeletePolicyFactory implements SlowConsumerPolicyPluginFactory - { - public TopicDeletePolicy newInstance(ConfigurationPlugin configuration) throws ConfigurationException - { - TopicDeletePolicyConfiguration config = - configuration.getConfiguration(TopicDeletePolicyConfiguration.class.getName()); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(config); - return policy; - } - - public String getPluginName() - { - return "topicdelete"; - } - - public Class<TopicDeletePolicy> getPluginClass() - { - return TopicDeletePolicy.class; - } - } - - public void performPolicy(AMQQueue q) - { - if (q == null) - { - return; - } - - AMQSessionModel owner = q.getExclusiveOwningSession(); - - // Only process exclusive queues - if (owner == null) - { - return; - } - - //Only process Topics - if (!validateQueueIsATopic(q)) - { - return; - } - - try - { - CurrentActor.get().message(owner.getLogSubject(),TopicDeletePolicyMessages.DISCONNECTING()); - // Close the consumer . this will cause autoDelete Queues to be purged - owner.getConnectionModel(). - closeSession(owner, AMQConstant.RESOURCE_ERROR, - "Consuming to slow."); - - // Actively delete non autoDelete queues if deletePersistent is set - if (!q.isAutoDelete() && (_configuration != null && _configuration.deletePersistent())) - { - CurrentActor.get().message(q.getLogSubject(), TopicDeletePolicyMessages.DELETING_QUEUE()); - q.delete(); - } - - } - catch (AMQException e) - { - _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); - } - - } - - /** - * Check the queue bindings to validate the queue is bound to the - * topic exchange. - * - * @param q the Queue - * - * @return true iff Q is bound to a TopicExchange - */ - private boolean validateQueueIsATopic(AMQQueue q) - { - for (Binding binding : q.getBindings()) - { - if (binding.getExchange() instanceof TopicExchange) - { - return true; - } - } - - return false; - } - - public void configure(ConfigurationPlugin config) - { - _configuration = (TopicDeletePolicyConfiguration) config; - } - - @Override - public String toString() - { - return "TopicDelete" + (_configuration == null ? "" : "[" + _configuration + "]"); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java deleted file mode 100644 index e6ad1cbcc3..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfiguration.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -public class TopicDeletePolicyConfiguration extends ConfigurationPlugin -{ - - public static class TopicDeletePolicyConfigurationFactory - implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, - Configuration config) - throws ConfigurationException - { - TopicDeletePolicyConfiguration slowConsumerConfig = - new TopicDeletePolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public List<String> getParentPaths() - { - return Arrays.asList( - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy.topicDelete", - "virtualhosts.virtualhost.topics.topic.slow-consumer-detection.policy.topicDelete"); - } - } - - public String[] getElementsProcessed() - { - return new String[]{"delete-persistent"}; - } - - @Override - public void validateConfiguration() throws ConfigurationException - { - // No validation required. - } - - public boolean deletePersistent() - { - // If we don't have configuration then we don't deletePersistent Queues - return (hasConfiguration() && contains("delete-persistent")); - } - - @Override - public String formatToString() - { - return (deletePersistent()?"delete-durable":""); - } - - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java deleted file mode 100644 index 7f600abdc9..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.slowconsumerdetection.policies; - -import org.apache.qpid.server.plugins.Plugin; -import org.apache.qpid.server.queue.AMQQueue; - -public interface SlowConsumerPolicyPlugin extends Plugin -{ - public void performPolicy(AMQQueue Queue); -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java deleted file mode 100644 index b2fe6766a6..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.slowconsumerdetection.policies; - -import org.apache.qpid.server.plugins.PluginFactory; - -public interface SlowConsumerPolicyPluginFactory<P extends SlowConsumerPolicyPlugin> extends PluginFactory<P> -{ -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java deleted file mode 100644 index 40dc382d30..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionConfigurationTest.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -import java.util.concurrent.TimeUnit; - -/** - * Provide Unit Test coverage of the virtualhost SlowConsumer Configuration - * This is what controls how often the plugin will execute - */ -public class SlowConsumerDetectionConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Default Testing: - * - * Provide a fully complete and valid configuration specifying 'delay' and - * 'timeunit' and ensure that it is correctly processed. - * - * Ensure no exceptions are thrown and that we get the same values back that - * were put into the configuration. - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - String TIMEUNIT=TimeUnit.MICROSECONDS.toString(); - xmlconfig.addProperty("delay", String.valueOf(DELAY)); - xmlconfig.addProperty("timeunit", TIMEUNIT); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("TimeUnit not correctly returned.", - TIMEUNIT, String.valueOf(config.getTimeUnit())); - } - - /** - * Default Testing: - * - * Test Missing TimeUnit value gets default. - * - * The TimeUnit value is optional and default to SECONDS. - * - * Test that if we do not specify a TimeUnit then we correctly get seconds. - * - * Also verify that relying on the default does not impact the setting of - * the 'delay' value. - * - */ - public void testConfigLoadingMissingTimeUnitDefaults() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - xmlconfig.addProperty("delay", String.valueOf(DELAY)); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("Default TimeUnit incorrect", TimeUnit.SECONDS, config.getTimeUnit()); - } - - /** - * Input Testing: - * - * TimeUnit parsing requires the String value be in UpperCase. - * Ensure we can handle when the user doesn't know this. - * - * Same test as 'testConfigLoadingValidConfig' but checking that - * the timeunit field is not case sensitive. - * i.e. the toUpper is being correctly applied. - */ - public void testConfigLoadingValidConfigStrangeTimeUnit() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - long DELAY=10; - - xmlconfig.addProperty("delay", DELAY); - xmlconfig.addProperty("timeunit", "MiCrOsEcOnDs"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Delay not correctly returned.", DELAY, config.getDelay()); - assertEquals("TimeUnit not correctly returned.", - TimeUnit.MICROSECONDS.toString(), String.valueOf(config.getTimeUnit())); - - } - - /** - * Failure Testing: - * - * Test that delay must be long not a string value. - * Provide a delay as a written value not a long. 'ten'. - * - * This should throw a configuration exception which is being trapped and - * verified to be the right exception, a NumberFormatException. - * - */ - public void testConfigLoadingInValidDelayString() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "ten"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertEquals("Cause not correct", NumberFormatException.class, cause.getClass()); - } - } - - /** - * Failure Testing: - * - * Test that negative delays are invalid. - * - * Delay must be a positive value as negative delay means doesn't make sense. - * - * Configuration exception with a useful message should be thrown here. - * - */ - public void testConfigLoadingInValidDelayNegative() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "-10"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertNotNull("Configuration Exception must not be null.", cause); - assertEquals("Cause not correct", - ConfigurationException.class, cause.getClass()); - assertEquals("Incorrect message.", - "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", - cause.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that delay cannot be 0. - * - * A zero delay means run constantly. This is not how VirtualHostTasks - * are designed to be run so we dis-allow the use of 0 delay. - * - * Same test as 'testConfigLoadingInValidDelayNegative' but with a 0 value. - * - */ - public void testConfigLoadingInValidDelayZero() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "0"); - xmlconfig.addProperty("timeunit", TimeUnit.MICROSECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - Throwable cause = e.getCause(); - - assertNotNull("Configuration Exception must not be null.", cause); - assertEquals("Cause not correct", - ConfigurationException.class, cause.getClass()); - assertEquals("Incorrect message.", - "SlowConsumerDetectionConfiguration: 'delay' must be a Positive Long value.", - cause.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that missing delay fails. - * If we have no delay then we do not pick a default. So a Configuration - * Exception is thrown. - * - * */ - public void testConfigLoadingInValidMissingDelay() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("timeunit", TimeUnit.SECONDS.toString()); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - assertEquals("Incorrect message.", "SlowConsumerDetectionConfiguration: unable to configure invalid delay:null", e.getMessage()); - } - } - - /** - * Failure Testing: - * - * Test that erroneous TimeUnit fails. - * - * Valid TimeUnit values vary based on the JVM version i.e. 1.6 added HOURS/DAYS etc. - * - * We don't test the values for TimeUnit are accepted other than MILLISECONDS in the - * positive testing at the start. - * - * Here we ensure that an erroneous for TimeUnit correctly throws an exception. - * - * We test with 'foo', which will never be a TimeUnit - * - */ - public void testConfigLoadingInValidTimeUnit() - { - SlowConsumerDetectionConfiguration config = new SlowConsumerDetectionConfiguration(); - - String TIMEUNIT = "foo"; - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delay", "10"); - xmlconfig.addProperty("timeunit", TIMEUNIT); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - try - { - config.setConfiguration("", composite); - fail("Configuration should fail to validate"); - } - catch (ConfigurationException e) - { - assertEquals("Incorrect message.", "Unable to configure Slow Consumer Detection invalid TimeUnit:" + TIMEUNIT, e.getMessage()); - } - } - - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java deleted file mode 100644 index 67c177f099..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionPolicyConfigurationTest.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Test class to ensure that the policy configuration can be processed. - */ -public class SlowConsumerDetectionPolicyConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Input Testing: - * - * Test that a given String can be set and retrieved through the configuration - * - * No validation is being performed to ensure that the policy exists. Only - * that a value can be set for the policy. - * - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - String policyName = "TestPolicy"; - xmlconfig.addProperty("name", policyName); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - fail(e.getMessage()); - } - - assertEquals("Policy name not retrieved as expected.", - policyName, config.getPolicyName()); - } - - /** - * Failure Testing: - * - * Test that providing a configuration section without the 'name' field - * causes an exception to be thrown. - * - * An empty configuration is provided and the thrown exception message - * is checked to confirm the right reason. - * - */ - public void testConfigLoadingInValidConfig() - { - SlowConsumerDetectionPolicyConfiguration config = new SlowConsumerDetectionPolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("Config is invalid so won't validate."); - } - catch (ConfigurationException e) - { - e.printStackTrace(); - assertEquals("Exception message not as expected.", "No Slow consumer policy defined.", e.getMessage()); - } - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java deleted file mode 100644 index 57e3233eeb..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetectionQueueConfigurationTest.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Unit test the QueueConfiguration processing. - * - * This is slightly awkward as the SCDQC requries that a policy be available. - * - * So all the Valid test much catch the ensuing ConfigurationException and - * validate that the error is due to a lack of a valid Policy - */ -public class SlowConsumerDetectionQueueConfigurationTest extends InternalBrokerBaseCase -{ - - /** - * Test a fully loaded configuration file. - * - * It is not an error to have all control values specified. - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingValidConfig() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("messageAge", "60000"); - xmlconfig.addProperty("depth", "1024"); - xmlconfig.addProperty("messageCount", "10"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * When we do not specify any control value then a ConfigurationException - * must be thrown to remind us. - */ - public void testConfigLoadingMissingConfig() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - - assertEquals("At least one configuration property('messageAge','depth'" + - " or 'messageCount') must be specified.", e.getMessage()); - } - } - - /** - * Setting messageAge on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingMessageAgeOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("messageAge", "60000"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * Setting depth on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingDepthOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("depth", "1024"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } - - /** - * Setting messageCount on its own is enough to have a valid configuration - * - * Here we need to catch the ConfigurationException that ensures due to lack - * of a Policy Plugin - */ - public void testConfigLoadingMessageCountOk() - { - SlowConsumerDetectionQueueConfiguration config = new SlowConsumerDetectionQueueConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - xmlconfig.addProperty("messageCount", "10"); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("", composite); - fail("No Policies are avaialbe to load in a unit test"); - } - catch (ConfigurationException e) - { - assertEquals("No Slow Consumer Policy specified. Known Policies:[]", - e.getMessage()); - } - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java deleted file mode 100644 index 8b729a0f43..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyConfigurationTest.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.CompositeConfiguration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.server.util.InternalBrokerBaseCase; - -/** - * Test to ensure TopicDelete Policy configuration can be loaded. - */ -public class TopicDeletePolicyConfigurationTest extends InternalBrokerBaseCase -{ - /** - * Test without any configuration being provided that the - * deletePersistent option is disabled. - */ - public void testNoConfigNoDeletePersistent() - { - TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); - - assertFalse("TopicDelete Configuration with no config should not delete persistent queues.", - config.deletePersistent()); - } - - /** - * Test that with the correct configuration the deletePersistent option can - * be enabled. - * - * Test creates a new Configuration object and passes in the xml snippet - * that the ConfigurationPlugin would receive during normal execution. - * This is the XML that would be matched for this plugin: - * <topicdelete> - * <delete-persistent> - * <topicdelete> - * - * So it would be subset and passed in as just: - * <delete-persistent> - * - * - * The property should therefore be enabled. - * - */ - public void testConfigDeletePersistent() - { - TopicDeletePolicyConfiguration config = new TopicDeletePolicyConfiguration(); - - XMLConfiguration xmlconfig = new XMLConfiguration(); - - xmlconfig.addProperty("delete-persistent",""); - - // Create a CompositeConfiguration as this is what the broker uses - CompositeConfiguration composite = new CompositeConfiguration(); - composite.addConfiguration(xmlconfig); - - try - { - config.setConfiguration("",composite); - } - catch (ConfigurationException e) - { - fail(e.getMessage()); - } - - assertTrue("A configured TopicDelete should delete persistent queues.", - config.deletePersistent()); - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java deleted file mode 100644 index 364766dfa7..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicyTest.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.policies; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.XMLConfiguration; -import org.apache.qpid.AMQException; -import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.protocol.AMQProtocolSession; -import org.apache.qpid.server.protocol.InternalTestProtocolSession; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MockAMQQueue; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.util.InternalBrokerBaseCase; -import org.apache.qpid.server.virtualhost.VirtualHost; - -public class TopicDeletePolicyTest extends InternalBrokerBaseCase -{ - - TopicDeletePolicyConfiguration _config; - - VirtualHost _defaultVhost; - InternalTestProtocolSession _connection; - - public void setUp() throws Exception - { - super.setUp(); - - _defaultVhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getDefaultVirtualHost(); - - _connection = new InternalTestProtocolSession(_defaultVhost); - - _config = new TopicDeletePolicyConfiguration(); - - XMLConfiguration config = new XMLConfiguration(); - - _config.setConfiguration("", config); - } - - private MockAMQQueue createOwnedQueue() - { - MockAMQQueue queue = new MockAMQQueue("testQueue"); - - _defaultVhost.getQueueRegistry().registerQueue(queue); - - try - { - AMQChannel channel = new AMQChannel(_connection, 0, null); - _connection.addChannel(channel); - - queue.setExclusiveOwningSession(channel); - } - catch (AMQException e) - { - fail("Unable to create Channel:" + e.getMessage()); - } - - return queue; - } - - private void setQueueToAutoDelete(final AMQQueue queue) - { - ((MockAMQQueue) queue).setAutoDelete(true); - - queue.setDeleteOnNoConsumers(true); - final AMQProtocolSession.Task deleteQueueTask = - new AMQProtocolSession.Task() - { - public void doTask(AMQProtocolSession session) throws AMQException - { - queue.delete(); - } - }; - - ((AMQChannel) queue.getExclusiveOwningSession()).getProtocolSession().addSessionCloseTask(deleteQueueTask); - } - - /** Check that a null queue passed in does not upset the policy. */ - public void testNullQueueParameter() throws ConfigurationException - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - try - { - policy.performPolicy(null); - } - catch (Exception e) - { - fail("Exception should not be thrown:" + e.getMessage()); - } - - } - - /** - * Set a owning Session to null which means this is not an exclusive queue - * so the queue should not be deleted - */ - public void testNonExclusiveQueue() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.setExclusiveOwningSession(null); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertFalse("Connection should not be closed", _connection.isClosed()); - } - - /** - * Test that exclusive JMS Queues are not deleted. - * Bind the queue to the direct exchange (so it is a JMS Queue). - * - * JMS Queues are not to be processed so this should not delete the queue. - */ - public void testQueuesAreNotProcessed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new DirectExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertFalse("Connection should not be closed", _connection.isClosed()); - } - - /** - * Give a non auto-delete queue is bound to the topic exchange the - * TopicDeletePolicy will close the connection and delete the queue, - */ - public void testNonAutoDeleteTopicIsNotClosed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - queue.setAutoDelete(false); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a auto-delete queue bound to the topic exchange the TopicDeletePolicy will - * close the connection and delete the queue - */ - public void testTopicIsClosed() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - final MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - setQueueToAutoDelete(queue); - - policy.performPolicy(queue); - - assertTrue("Queue should be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy will - * close the connection and NOT delete the queue - */ - public void testNonAutoDeleteTopicIsClosedNotDeleted() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy suitably - * configured with the delete-persistent tag will close the connection - * and delete the queue - */ - public void testPersistentTopicIsClosedAndDeleted() - { - //Set the config to delete persistent queues - _config.getConfig().addProperty("delete-persistent", ""); - - TopicDeletePolicy policy = new TopicDeletePolicy(); - policy.configure(_config); - - assertTrue("Config was not updated to delete Persistent topics", - _config.deletePersistent()); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertTrue("Queue should be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - /** - * Give a queue bound to the topic exchange the TopicDeletePolicy not - * configured to close a persistent queue - */ - public void testPersistentTopicIsClosedAndDeletedNullConfig() - { - TopicDeletePolicy policy = new TopicDeletePolicy(); - // Explicity say we are not configuring the policy. - policy.configure(null); - - MockAMQQueue queue = createOwnedQueue(); - - queue.addBinding(new Binding(null, "bindingKey", queue, new TopicExchange(), null)); - - policy.performPolicy(queue); - - assertFalse("Queue should not be deleted", queue.isDeleted()); - assertTrue("Connection should be closed", _connection.isClosed()); - } - - public void testNonExclusiveQueueNullConfig() - { - _config = null; - testNonExclusiveQueue(); - } - - public void testQueuesAreNotProcessedNullConfig() - { - _config = null; - testQueuesAreNotProcessed(); - } - - public void testNonAutoDeleteTopicIsNotClosedNullConfig() - { - _config = null; - testNonAutoDeleteTopicIsNotClosed(); - } - - public void testTopicIsClosedNullConfig() - { - _config = null; - testTopicIsClosed(); - } - - public void testNonAutoDeleteTopicIsClosedNotDeletedNullConfig() throws AMQException - { - _config = null; - testNonAutoDeleteTopicIsClosedNotDeleted(); - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java deleted file mode 100644 index e0934faf44..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalQueuesTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.jms.Session; -import javax.naming.NamingException; -import java.io.IOException; - -/** - * QPID-1447 : Add slow consumer detection and disconnection. - * - * Slow consumers should on a topic should expect to receive a - * 506 : Resource Error if the hit a predefined threshold. - */ -public class GlobalQueuesTest extends TestingBaseCase -{ - - protected String CONFIG_SECTION = ".queues"; - - /** - * Queue Configuration - - <slow-consumer-detection> - <!-- The depth before which the policy will be applied--> - <depth>4235264</depth> - - <!-- The message age before which the policy will be applied--> - <messageAge>600000</messageAge> - - <!-- The number of message before which the policy will be applied--> - <messageCount>50</messageCount> - - <!-- Policies configuration --> - <policy> - <name>TopicDelete</name> - <topicDelete> - <delete-persistent/> - </topicDelete> - </policy> - </slow-consumer-detection> - - */ - - /** - * VirtualHost Plugin Configuration - - <slow-consumer-detection> - <delay>1</delay> - <timeunit>MINUTES</timeunit> - </slow-consumer-detection> - - */ - - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + ".slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - } - - /** - * Test that setting messageCount takes affect on topics - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - public void testTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), false); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting depth has an effect on topics - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), false); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Sets the messageAge to be half the disconnection wait timeout - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 2), false); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, false); - } - - /** - * Test that setting messageCount takes affect on a durable Consumer - * - * Ensure we set the delete-persistent option - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - - public void testTopicDurableConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting depth has an effect on durable consumer topics - * - * Ensure we set the delete-persistent option - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Ensure we set the delete-persistent option - * - * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java deleted file mode 100644 index aff5d1b1b8..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/GlobalTopicsTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.naming.NamingException; -import java.io.IOException; - -public class GlobalTopicsTest extends GlobalQueuesTest -{ - @Override - public void setUp() throws Exception - { - CONFIG_SECTION = ".topics"; - super.setUp(); - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java deleted file mode 100644 index e4efac60f8..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/MergeConfigurationTest.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.NamingException; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class MergeConfigurationTest extends TestingBaseCase -{ - - protected int topicCount = 0; - - - public void configureTopic(String topic, int msgCount) throws NamingException, IOException, ConfigurationException - { - - setProperty(".topics.topic("+topicCount+").name", topic); - setProperty(".topics.topic("+topicCount+").slow-consumer-detection.messageCount", String.valueOf(msgCount)); - setProperty(".topics.topic("+topicCount+").slow-consumer-detection.policy.name", "TopicDelete"); - topicCount++; - } - - - /** - * Test that setting messageCount takes affect on topics - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - public void testTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT * 4) - 1); - - //Configure topic as a subscription - setProperty(".topics.topic("+topicCount+").subscriptionName", "clientid:"+getTestQueueName()); - configureTopic(getName(), (MAX_QUEUE_MESSAGE_COUNT - 1)); - - - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - -// -// public void testMerge() throws ConfigurationException, AMQException -// { -// -// AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString(getName()+":stockSubscription"), false, new AMQShortString("testowner"), -// false, false, _virtualHost, null); -// -// _virtualHost.getQueueRegistry().registerQueue(queue); -// Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); -// _virtualHost.getBindingFactory().addBinding(getName(), queue, defaultExchange, null); -// -// -// Exchange topicExchange = _virtualHost.getExchangeRegistry().getExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME); -// _virtualHost.getBindingFactory().addBinding("stocks.nyse.orcl", queue, topicExchange, null); -// -// TopicConfig config = queue.getConfiguration().getConfiguration(TopicConfig.class.getName()); -// -// assertNotNull("Queue should have topic configuration bound to it.", config); -// assertEquals("Configuration name not correct", getName() + ":stockSubscription", config.getSubscriptionName()); -// -// ConfigurationPlugin scdConfig = queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); -// if (scdConfig instanceof org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration) -// { -// System.err.println("********************** scd is a SlowConsumerDetectionQueueConfiguration."); -// } -// else -// { -// System.err.println("********************** Test SCD "+SlowConsumerDetectionQueueConfiguration.class.getClassLoader()); -// System.err.println("********************** Broker SCD "+scdConfig.getClass().getClassLoader()); -// System.err.println("********************** Broker SCD "+scdConfig.getClass().isAssignableFrom(SlowConsumerDetectionQueueConfiguration.class)); -// System.err.println("********************** is a "+scdConfig.getClass()); -// } -// -// assertNotNull("Queue should have scd configuration bound to it.", scdConfig); -// assertEquals("MessageCount is not correct", 10 , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getMessageCount()); -// assertEquals("Policy is not correct", TopicDeletePolicy.class.getName() , ((SlowConsumerDetectionQueueConfiguration)scdConfig).getPolicy().getClass().getName()); -// } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java deleted file mode 100644 index 9e9375fd44..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SubscriptionTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.jms.Session; -import javax.naming.NamingException; -import java.io.IOException; - -/** - * Test SCD when configured with Subscription details. - * - * We run the subscription based tests here to validate that the - * subscriptionname value is correctly associated with the subscription. - * - * - */ -public class SubscriptionTest extends TestingBaseCase -{ - private int _count=0; - protected String CONFIG_SECTION = ".topics.topic"; - - /** - * Add configuration for the queue that relates just to this test. - * We use the getTestQueueName() as our subscription. To ensure the - * config sections do not overlap we identify each section with a _count - * value. - * - * This would allow each test to configure more than one section. - * - * @param property to set - * @param value the value to set - * @param deleteDurable should deleteDurable be set. - * @throws NamingException - * @throws IOException - * @throws ConfigurationException - */ - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + "("+_count+").subscriptionName", "clientid:"+getTestQueueName()); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - _count++; - } - - - /** - * Test that setting messageCount takes affect on a durable Consumer - * - * Ensure we set the delete-persistent option - * - * We send 10 messages and disconnect at 9 - * - * @throws Exception - */ - - public void testTopicDurableConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting depth has an effect on durable consumer topics - * - * Ensure we set the delete-persistent option - * - * Sets the message size for the test - * Sets the depth to be 9 * the depth - * Ensure that sending 10 messages causes the disconnection - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageSize() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("depth", String.valueOf(MESSAGE_SIZE * 9), true); - - //Start the broker - startBroker(); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - - /** - * Test that setting messageAge has an effect on topics - * - * Ensure we set the delete-persistent option - * - * Sets the messageAge to be 1/5 the disconnection wait timeout (or 1sec) - * Send 10 messages and then ensure that we get disconnected as we will - * wait for the full timeout. - * - * @throws Exception - */ - public void testTopicDurableConsumerMessageAge() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - - setConfig("messageAge", String.valueOf(DISCONNECTION_WAIT / 5), true); - - //Start the broker - startBroker(); - - topicConsumer(Session.AUTO_ACKNOWLEDGE, true); - } - -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java deleted file mode 100644 index 9831c74574..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TestingBaseCase.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidBrokerTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.naming.NamingException; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class TestingBaseCase extends QpidBrokerTestCase implements ExceptionListener, ConnectionListener -{ - - Topic _destination; - protected CountDownLatch _disconnectionLatch = new CountDownLatch(1); - protected int MAX_QUEUE_MESSAGE_COUNT; - protected int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; - - private Thread _publisher; - protected static final long DISCONNECTION_WAIT = 5; - protected Exception _publisherError = null; - protected JMSException _connectionException = null; - private static final long JOIN_WAIT = 5000; - - @Override - public void setUp() throws Exception - { - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.delay", "1"); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.timeunit", "SECONDS"); - - } - - - protected void setProperty(String property, String value) throws NamingException, IOException, ConfigurationException - { - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - property, value); - } - - - /** - * Create and start an asynchrounous publisher that will send MAX_QUEUE_MESSAGE_COUNT - * messages to the provided destination. Messages are sent in a new connection - * on a transaction. Any error is captured and the test is signalled to exit. - * - * @param destination - */ - private void startPublisher(final Destination destination) - { - _publisher = new Thread(new Runnable() - { - - public void run() - { - try - { - Connection connection = getConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer publisher = session.createProducer(destination); - - for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) - { - publisher.send(createNextMessage(session, count)); - session.commit(); - } - } - catch (Exception e) - { - _publisherError = e; - _disconnectionLatch.countDown(); - } - } - }); - - _publisher.start(); - } - - - - /** - * Perform the Main test of a topic Consumer with the given AckMode. - * - * Test creates a new connection and sets up the connection to prevent - * failover - * - * A new consumer is connected and started so that it will prefetch msgs. - * - * An asynchrounous publisher is started to fill the broker with messages. - * - * We then wait to be notified of the disconnection via the ExceptionListener - * - * 0-10 does not have the same notification paths but sync() apparently should - * give us the exception, currently it doesn't, so the test is excluded from 0-10 - * - * We should ensure that this test has the same path for all protocol versions. - * - * Clients should not have to modify their code based on the protocol in use. - * - * @param ackMode @see javax.jms.Session - * - * @throws Exception - */ - protected void topicConsumer(int ackMode, boolean durable) throws Exception - { - Connection connection = getConnection(); - - connection.setExceptionListener(this); - - Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); - - _destination = session.createTopic(getName()); - - MessageConsumer consumer; - - if (durable) - { - consumer = session.createDurableSubscriber(_destination, getTestQueueName()); - } - else - { - consumer = session.createConsumer(_destination); - } - - connection.start(); - - // Start the consumer pre-fetching - // Don't care about response as we will fill the broker up with messages - // after this point and ensure that the client is disconnected at the - // right point. - consumer.receiveNoWait(); - startPublisher(_destination); - - boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - jmsException.initCause(amqe); - _connectionException = jmsException; - } - } - - assertTrue("Client was not disconnected.", _connectionException != null); - - Exception linked = _connectionException.getLinkedException(); - - _publisher.join(JOIN_WAIT); - - assertFalse("Publisher still running", _publisher.isAlive()); - - //Validate publishing occurred ok - if (_publisherError != null) - { - throw _publisherError; - } - - // NOTE these exceptions will need to be modeled so that they are not - // 0-8 specific. e.g. JMSSessionClosedException - - assertNotNull("No error received onException listener.", _connectionException); - - assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); - - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; - - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); - } - - - // Exception Listener - - public void onException(JMSException e) - { - _connectionException = e; - - e.printStackTrace(); - - _disconnectionLatch.countDown(); - } - - /// Connection Listener - - public void bytesSent(long count) - { - } - - public void bytesReceived(long count) - { - } - - public boolean preFailover(boolean redirect) - { - // Prevent Failover - return false; - } - - public boolean preResubscribe() - { - return false; - } - - public void failoverComplete() - { - } -} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java deleted file mode 100644 index 09c849cfde..0000000000 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/TopicTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; - -import javax.naming.NamingException; -import java.io.IOException; - -/** - * This Topic test extends the Global queue test so it will run all the topic - * and subscription tests. - * - * We redefine the CONFIG_SECTION here so that the configuration is written - * against a topic element. - * - * To complete the migration to testing 'topic' elements we also override - * the setConfig to use the test name as the topic name. - * - */ -public class TopicTest extends GlobalQueuesTest -{ - private int _count=0; - - @Override - public void setUp() throws Exception - { - CONFIG_SECTION = ".topics.topic"; - super.setUp(); - } - - /** - * Add configuration for the queue that relates just to this test. - * We use the getTestQueueName() as our subscription. To ensure the - * config sections do not overlap we identify each section with a _count - * value. - * - * This would allow each test to configure more than one section. - * - * @param property to set - * @param value the value to set - * @param deleteDurable should deleteDurable be set. - * @throws NamingException - * @throws IOException - * @throws ConfigurationException - */ - @Override - public void setConfig(String property, String value, boolean deleteDurable) throws NamingException, IOException, ConfigurationException - { - setProperty(CONFIG_SECTION + "("+_count+").name", getName()); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.name", "TopicDelete"); - - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - property, value); - - if (deleteDurable) - { - setProperty(CONFIG_SECTION + "("+_count+").slow-consumer-detection." + - "policy.topicdelete.delete-persistent", ""); - } - _count++; - } - - -} |
