From de08fcfd5568f9eecc5e59e7ba00617dbfa6d017 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 7 May 2010 15:16:27 +0000 Subject: QPID-1447 : Fix Slow Consumer Detection directory Name and package name git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@942117 13f79535-47bb-0310-9956-ffa450edef68 --- .../SlowConsumerDisconnect/MANIFEST.MF | 22 -- .../experimental/SlowConsumerDisconnect/build.xml | 32 --- .../virtualhost/plugin/SlowConsumer/Activator.java | 54 ----- .../plugin/SlowConsumer/SlowConsumerDetection.java | 123 ----------- .../SlowConsumerDetectionConfiguration.java | 72 ------- .../SlowConsumerDetectionPolicyConfiguration.java | 81 ------- .../SlowConsumerDetectionQueueConfiguration.java | 125 ----------- .../SlowConsumer/SlowConsumerPolicyPlugin.java | 28 --- .../SlowConsumerPolicyPluginFactory.java | 29 --- .../plugin/SlowConsumer/TopicDeletePolicy.java | 113 ---------- .../org/apache/qpid/systest/SlowConsumerTest.java | 237 --------------------- .../experimental/slowconsumerdetection/MANIFEST.MF | 22 ++ .../experimental/slowconsumerdetection/build.xml | 32 +++ .../plugin/SlowConsumerDetectionConfiguration.java | 72 +++++++ .../SlowConsumerDetectionPolicyConfiguration.java | 81 +++++++ .../SlowConsumerDetectionQueueConfiguration.java | 127 +++++++++++ .../qpid/server/virtualhost/plugin/Activator.java | 59 +++++ .../virtualhost/plugin/SlowConsumerDetection.java | 126 +++++++++++ .../plugin/policies/TopicDeletePolicy.java | 116 ++++++++++ .../policies/SlowConsumerPolicyPlugin.java | 28 +++ .../policies/SlowConsumerPolicyPluginFactory.java | 30 +++ .../org/apache/qpid/systest/SlowConsumerTest.java | 237 +++++++++++++++++++++ 22 files changed, 930 insertions(+), 916 deletions(-) delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java delete mode 100644 qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java create mode 100644 qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF deleted file mode 100644 index ff98e7bdca..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF +++ /dev/null @@ -1,22 +0,0 @@ -Manifest-Version: 1.0 -Bundle-ManifestVersion: 2 -Bundle-Name: Qpid Slow Consumer Detection -Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true -Bundle-Version: 1.0.0 -Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator -Import-Package: org.osgi.framework, - org.apache.qpid.server.configuration.plugins, - org.apache.qpid.server.configuration, - org.apache.qpid.server.virtualhost.plugins, - org.apache.qpid.server.virtualhost, - org.apache.qpid.server.queue, - org.apache.qpid.server.registry, - org.apache.qpid.server.plugins, - org.apache.qpid, - org.apache.log4j, - org.apache.commons.configuration -Bundle-RequiredExecutionEnvironment: JavaSE-1.6 -Bundle-ClassPath: . -Bundle-ActivationPolicy: lazy -Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework" - diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml deleted file mode 100644 index 4d01b54f2d..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml +++ /dev/null @@ -1,32 +0,0 @@ - - - - - - - - - - - - - diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java deleted file mode 100644 index c1f5a32c1e..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; -import org.osgi.framework.BundleActivator; -import org.osgi.framework.BundleContext; - -/** - * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin. - * - * This includes Configuration - * - * @author ritchiem - */ -public class Activator implements BundleActivator -{ - public void start(BundleContext ctx) throws Exception - { - if (null != ctx) - { - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null); - ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null); - ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null); - ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null); - } - } - - public void stop(BundleContext ctx) throws Exception - { - // no need to do anything here, osgi will unregister the service for us - } - -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java deleted file mode 100644 index 40b21aec33..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; -import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; - -class SlowConsumerDetection implements VirtualHostPlugin -{ - Logger _logger = Logger.getLogger(SlowConsumerDetection.class); - private VirtualHost _virtualhost; - private SlowConsumerDetectionConfiguration _config; - private SlowConsumerPolicyPlugin _policy; - - public static class SlowConsumerFactory implements VirtualHostPluginFactory - { - public VirtualHostPlugin newInstance(VirtualHost vhost) - { - return new SlowConsumerDetection(vhost); - } - } - - public SlowConsumerDetection(VirtualHost vhost) - { - _virtualhost = vhost; - _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); - if (_config == null) - { - throw new IllegalArgumentException("Plugin has not been configured"); - } - - } - - public void run() - { - _logger.info("Starting the SlowConsumersDetection job"); - try - { - for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) - { - _logger.debug("Checking consumer status for queue: " - + q.getName()); - try - { - SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); - - if (checkQueueStatus(q, config)) - { - config.getPolicy().performPolicy(q); - } - } - catch (Exception e) - { - _logger.error("Exception in SlowConsumersDetection " + - "for queue: " + - q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. - } - } - _logger.info("SlowConsumersDetection job completed."); - } - catch (Exception e) - { - _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e); - } - catch (Error e) - { - _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e); - } - } - - public long getDelay() - { - return _config.getDelay(); - } - - public String getTimeUnit() - { - return _config.getTimeUnit(); - } - - /** - * Check the depth,messageSize,messageAge,messageCount values for this q - * - * @param q the queue to check - * @param config - * - * @return true if the queue has reached a threshold. - */ - private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) - { - - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - - return config != null && - (q.getMessageCount() >= config.getMessageCount() || - q.getQueueDepth() >= config.getDepth() || - q.getOldestMessageArrivalTime() >= config.getMessageAge()); - } -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java deleted file mode 100644 index 262edcea16..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.concurrent.TimeUnit; - -public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin -{ - public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public String[] getParentPaths() - { - return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"}; - } - } - - public String[] getElementsProcessed() - { - return new String[]{"delay", - "timeunit"}; - } - - public long getDelay() - { - return _configuration.getLong("delay", 10); - } - - public String getTimeUnit() - { - return _configuration.getString("timeunit", TimeUnit.SECONDS.toString()); - } - - @Override - public void setConfiguration(String path, Configuration configuration) throws ConfigurationException - { - super.setConfiguration(path, configuration); - - System.out.println("Configured SCDC"); - System.out.println("Delay:" + getDelay()); - System.out.println("TimeUnit:" + getTimeUnit()); - } -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java deleted file mode 100644 index e758ef965e..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; - -import java.util.List; - -public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin -{ - - public static class SlowConsumerDetectionPolicyConfigurationFactory - implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, - Configuration config) - throws ConfigurationException - { - SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = - new SlowConsumerDetectionPolicyConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public String[] getParentPaths() - { - return new String[]{ - "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"}; - } - } - - public String[] getElementsProcessed() - { - // NOTE: the use of '@name]' rather than '[@name]' this appears to be - // a bug in commons configuration. - //fixme - Simple test case needs raised and JIRA raised on Commons - return new String[]{"@name]", "options"}; - } - - public String getPolicyName() - { - return _configuration.getString("[@name]"); - } - - public String getOption(String option) - { - List options = _configuration.getList("options.option[@name]"); - - if (options != null && options.contains(option)) - { - return _configuration.getString("options.option[@value]" + - "(" + options.indexOf(option) + ")"); - } - - return null; - } -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java deleted file mode 100644 index 5230f7940c..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; -import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; -import org.apache.qpid.server.plugins.PluginManager; -import org.apache.qpid.server.registry.ApplicationRegistry; - -import java.util.Map; - -public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin -{ - private SlowConsumerPolicyPlugin _policyPlugin; - - public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory - { - public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException - { - SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); - slowConsumerConfig.setConfiguration(path, config); - return slowConsumerConfig; - } - - public String[] getParentPaths() - { - return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", - "virtualhosts.virtualhost.topics.slow-consumer-detection", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"}; - } - - } - - public String[] getElementsProcessed() - { - return new String[]{"messageAge", - "depth", - "messageCount"}; - } - - public int getMessageAge() - { - return (int) getConfigurationValue("messageAge"); - } - - public long getDepth() - { - return getConfigurationValue("depth"); - } - - public long getMessageCount() - { - return getConfigurationValue("messageCount"); - } - - public SlowConsumerPolicyPlugin getPolicy() - { - return _policyPlugin; - } - - @Override - public void setConfiguration(String path, Configuration configuration) throws ConfigurationException - { - super.setConfiguration(path, configuration); - - SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class); - - PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); - Map factories = - pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); - - if (_logger.isDebugEnabled()) - { - _logger.debug("Configured SCDQC"); - _logger.debug("Age:" + getMessageAge()); - _logger.debug("Depth:" + getDepth()); - _logger.debug("Count:" + getMessageCount()); - _logger.debug("Policy:" + policyConfig.getPolicyName()); - _logger.debug("Available factories:" + factories); - } - - SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); - - if (pluginFactory == null) - { - throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); - } - - _policyPlugin = pluginFactory.newInstance(policyConfig); - } - - private long getConfigurationValue(String property) - { - // The _configuration we are given is a munged configurated - // so the queue will already have queue-queues munging - - // we then need to ensure that the TopicsConfiguration - // and TopicConfiguration classes correctly munge their configuration: - // queue-queues -> topic-topics - - return _configuration.getLong(property, 0); - } - -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java deleted file mode 100644 index 2e1c799a59..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.qpid.server.queue.AMQQueue; - -public interface SlowConsumerPolicyPlugin -{ - public void performPolicy(AMQQueue Queue); -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java deleted file mode 100644 index 1454971f52..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.qpid.server.plugins.PluginFactory; - -public interface SlowConsumerPolicyPluginFactory extends PluginFactory -{ - - public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration); -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java deleted file mode 100644 index f1e7142fdd..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; - -import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.exchange.TopicExchange; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.queue.AMQQueue; - -public class TopicDeletePolicy implements SlowConsumerPolicyPlugin -{ - Logger _logger = Logger.getLogger(TopicDeletePolicy.class); - private SlowConsumerDetectionPolicyConfiguration _configuration; - - public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory - { - - public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration) - { - return new TopicDeletePolicy(configuration); - } - - public String getPluginName() - { - return "topicdelete"; - } - } - - public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config) - { - _configuration = config; - } - - public void performPolicy(AMQQueue q) - { - AMQSessionModel owner = q.getExclusiveOwningSession(); - - // Only process exclusive queues - if (owner == null) - { - return; - } - - //Only process Topics - if(!validateQueueIsATopic(q)) - { - return; - } - - try - { - owner.getConnectionModel(). - closeSession(owner, AMQConstant.RESOURCE_ERROR, - "Consuming to slow."); - - String option = _configuration.getOption("delete-persistent"); - - boolean deletePersistent = option != null && Boolean.parseBoolean(option); - - if (!q.isAutoDelete() && deletePersistent) - { - q.delete(); - } - - } - catch (AMQException e) - { - _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); - } - - } - - /** - * Check the queue bindings to validate the queue is bound to the - * topic exchange. - * - * @param q the Queue - * @return true iff Q is bound to a TopicExchange - */ - private boolean validateQueueIsATopic(AMQQueue q) - { - for (Binding binding : q.getBindings()) - { - if (binding.getExchange() instanceof TopicExchange) - { - return true; - } - } - - return false; - } -} diff --git a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java deleted file mode 100644 index d5b2b1858b..0000000000 --- a/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ /dev/null @@ -1,237 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.systest; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.AMQChannelClosedException; -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.test.utils.QpidTestCase; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.NamingException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * QPID-1447 : Add slow consumer detection and disconnection. - * - * Slow consumers should on a topic should expect to receive a - * 506 : Resource Error if the hit a predefined threshold. - */ -public class SlowConsumerTest extends QpidTestCase implements ExceptionListener -{ - Destination _destination; - private CountDownLatch _disconnectionLatch = new CountDownLatch(1); - private int MAX_QUEUE_MESSAGE_COUNT; - private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; - - private Thread _publisher; - private static final long DISCONNECTION_WAIT = 5; - private Exception _publisherError = null; - private JMSException _connectionException = null; - - @Override - public void setUp() throws Exception, ConfigurationException, NamingException - { - // Set the houseKeepingThread period to be every 500 - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.delay", "1"); - - setConfigurationProperty("virtualhosts.virtualhost." - + getConnectionURL().getVirtualHost().substring(1) + - ".slow-consumer-detection.timeunit", "SECONDS"); - - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection." + - "policy[@name]", "TopicDelete"); - - /** - * Queue Configuration - - - - 4235264 - - - 600000 - - - 50 - - - - - - - - - */ - - /** - * Plugin Configuration - * - - 1 - MINUTES - - - */ - - super.setUp(); - } - - public void exclusiveTransientQueue(int ackMode) throws Exception - { - - } - - public void tempQueue(int ackMode) throws Exception - { - - } - - public void topicConsumer(int ackMode) throws Exception - { - Connection connection = getConnection(); - - connection.setExceptionListener(this); - - Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); - - _destination = session.createTopic(getName()); - - MessageConsumer consumer = session.createConsumer(_destination); - - connection.start(); - - // Start the consumer pre-fetching - // Don't care about response as we will fill the broker up with messages - // after this point and ensure that the client is disconnected at the - // right point. - consumer.receiveNoWait(); - startPublisher(_destination); - - boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); - - System.out.println("Validating"); - - if (!disconnected && isBroker010()) - { - try - { - ((AMQSession_0_10) session).sync(); - } - catch (AMQException amqe) - { - JMSException jmsException = new JMSException(amqe.getMessage()); - jmsException.setLinkedException(amqe); - _connectionException = jmsException; - } - } - - System.err.println("ConnectionException:" + _connectionException); - - assertTrue("Client was not disconnected.", _connectionException != null); - - Exception linked = _connectionException.getLinkedException(); - - System.err.println("Linked:" + linked); - - _publisher.join(); - - //Validate publishing occurred ok - if (_publisherError != null) - { - throw _publisherError; - } - - assertNotNull("No error received onException listener.", _connectionException); - - assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); - - assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); - - AMQChannelClosedException ccException = (AMQChannelClosedException) linked; - - assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); - } - - private void startPublisher(final Destination destination) - { - _publisher = new Thread(new Runnable() - { - - public void run() - { - try - { - Connection connection = getConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - - MessageProducer publisher = session.createProducer(destination); - - setMessageSize(MESSAGE_SIZE); - - for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) - { - publisher.send(createNextMessage(session, count)); - } - } - catch (Exception e) - { - _publisherError = e; - _disconnectionLatch.countDown(); - } - } - }); - } - - public void testAutoAckTopicConsumerMessageCount() throws Exception - { - MAX_QUEUE_MESSAGE_COUNT = 10; - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection" + - "messageCount", "9"); - - setMessageSize(MESSAGE_SIZE); - - topicConsumer(Session.AUTO_ACKNOWLEDGE); - } - - public void onException(JMSException e) - { - _connectionException = e; - - _disconnectionLatch.countDown(); - } -} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF new file mode 100644 index 0000000000..ff98e7bdca --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF @@ -0,0 +1,22 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Qpid Slow Consumer Detection +Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true +Bundle-Version: 1.0.0 +Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator +Import-Package: org.osgi.framework, + org.apache.qpid.server.configuration.plugins, + org.apache.qpid.server.configuration, + org.apache.qpid.server.virtualhost.plugins, + org.apache.qpid.server.virtualhost, + org.apache.qpid.server.queue, + org.apache.qpid.server.registry, + org.apache.qpid.server.plugins, + org.apache.qpid, + org.apache.log4j, + org.apache.commons.configuration +Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-ClassPath: . +Bundle-ActivationPolicy: lazy +Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework" + diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml b/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml new file mode 100644 index 0000000000..4d01b54f2d --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/build.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java new file mode 100644 index 0000000000..e42eba9fc6 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionConfiguration.java @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.concurrent.TimeUnit; + +public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"}; + } + } + + public String[] getElementsProcessed() + { + return new String[]{"delay", + "timeunit"}; + } + + public long getDelay() + { + return _configuration.getLong("delay", 10); + } + + public String getTimeUnit() + { + return _configuration.getString("timeunit", TimeUnit.SECONDS.toString()); + } + + @Override + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + super.setConfiguration(path, configuration); + + System.out.println("Configured SCDC"); + System.out.println("Delay:" + getDelay()); + System.out.println("TimeUnit:" + getTimeUnit()); + } +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java new file mode 100644 index 0000000000..b7719875a0 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; + +import java.util.List; + +public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin +{ + + public static class SlowConsumerDetectionPolicyConfigurationFactory + implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, + Configuration config) + throws ConfigurationException + { + SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = + new SlowConsumerDetectionPolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{ + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"}; + } + } + + public String[] getElementsProcessed() + { + // NOTE: the use of '@name]' rather than '[@name]' this appears to be + // a bug in commons configuration. + //fixme - Simple test case needs raised and JIRA raised on Commons + return new String[]{"@name]", "options"}; + } + + public String getPolicyName() + { + return _configuration.getString("[@name]"); + } + + public String getOption(String option) + { + List options = _configuration.getList("options.option[@name]"); + + if (options != null && options.contains(option)) + { + return _configuration.getString("options.option[@value]" + + "(" + options.indexOf(option) + ")"); + } + + return null; + } +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java new file mode 100644 index 0000000000..a652539f14 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration.plugin; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +import java.util.Map; + +public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin +{ + private SlowConsumerPolicyPlugin _policyPlugin; + + public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", + "virtualhosts.virtualhost.topics.slow-consumer-detection", + "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"}; + } + + } + + public String[] getElementsProcessed() + { + return new String[]{"messageAge", + "depth", + "messageCount"}; + } + + public int getMessageAge() + { + return (int) getConfigurationValue("messageAge"); + } + + public long getDepth() + { + return getConfigurationValue("depth"); + } + + public long getMessageCount() + { + return getConfigurationValue("messageCount"); + } + + public SlowConsumerPolicyPlugin getPolicy() + { + return _policyPlugin; + } + + @Override + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + super.setConfiguration(path, configuration); + + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class); + + PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + Map factories = + pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Configured SCDQC"); + _logger.debug("Age:" + getMessageAge()); + _logger.debug("Depth:" + getDepth()); + _logger.debug("Count:" + getMessageCount()); + _logger.debug("Policy:" + policyConfig.getPolicyName()); + _logger.debug("Available factories:" + factories); + } + + SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); + + if (pluginFactory == null) + { + throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); + } + + _policyPlugin = pluginFactory.newInstance(policyConfig); + } + + private long getConfigurationValue(String property) + { + // The _configuration we are given is a munged configurated + // so the queue will already have queue-queues munging + + // we then need to ensure that the TopicsConfiguration + // and TopicConfiguration classes correctly munge their configuration: + // queue-queues -> topic-topics + + return _configuration.getLong(property, 0); + } + +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java new file mode 100644 index 0000000000..a6ab59a230 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/Activator.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.configuration.plugins.ConfigurationPluginFactory; +import org.apache.qpid.server.virtualhost.plugin.policies.TopicDeletePolicy; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +/** + * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin. + * + * This includes Configuration + * + * @author ritchiem + */ +public class Activator implements BundleActivator +{ + public void start(BundleContext ctx) throws Exception + { + if (null != ctx) + { + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null); + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null); + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null); + ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null); + ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null); + } + } + + public void stop(BundleContext ctx) throws Exception + { + // no need to do anything here, osgi will unregister the service for us + } + +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java new file mode 100644 index 0000000000..cc6d310c90 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin; +import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; + +class SlowConsumerDetection implements VirtualHostPlugin +{ + Logger _logger = Logger.getLogger(SlowConsumerDetection.class); + private VirtualHost _virtualhost; + private SlowConsumerDetectionConfiguration _config; + private SlowConsumerPolicyPlugin _policy; + + public static class SlowConsumerFactory implements VirtualHostPluginFactory + { + public VirtualHostPlugin newInstance(VirtualHost vhost) + { + return new SlowConsumerDetection(vhost); + } + } + + public SlowConsumerDetection(VirtualHost vhost) + { + _virtualhost = vhost; + _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); + if (_config == null) + { + throw new IllegalArgumentException("Plugin has not been configured"); + } + + } + + public void run() + { + _logger.info("Starting the SlowConsumersDetection job"); + try + { + for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) + { + _logger.debug("Checking consumer status for queue: " + + q.getName()); + try + { + SlowConsumerDetectionQueueConfiguration config = + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); + + if (checkQueueStatus(q, config)) + { + config.getPolicy().performPolicy(q); + } + } + catch (Exception e) + { + _logger.error("Exception in SlowConsumersDetection " + + "for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + _logger.info("SlowConsumersDetection job completed."); + } + catch (Exception e) + { + _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e); + } + catch (Error e) + { + _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e); + } + } + + public long getDelay() + { + return _config.getDelay(); + } + + public String getTimeUnit() + { + return _config.getTimeUnit(); + } + + /** + * Check the depth,messageSize,messageAge,messageCount values for this q + * + * @param q the queue to check + * @param config + * + * @return true if the queue has reached a threshold. + */ + private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) + { + + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + + return config != null && + (q.getMessageCount() >= config.getMessageCount() || + q.getQueueDepth() >= config.getDepth() || + q.getOldestMessageArrivalTime() >= config.getMessageAge()); + } +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java new file mode 100644 index 0000000000..fb9dfed103 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/policies/TopicDeletePolicy.java @@ -0,0 +1,116 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.policies; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; + +public class TopicDeletePolicy implements SlowConsumerPolicyPlugin +{ + Logger _logger = Logger.getLogger(TopicDeletePolicy.class); + private SlowConsumerDetectionPolicyConfiguration _configuration; + + public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory + { + + public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration) + { + return new TopicDeletePolicy(configuration); + } + + public String getPluginName() + { + return "topicdelete"; + } + } + + public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config) + { + _configuration = config; + } + + public void performPolicy(AMQQueue q) + { + AMQSessionModel owner = q.getExclusiveOwningSession(); + + // Only process exclusive queues + if (owner == null) + { + return; + } + + //Only process Topics + if(!validateQueueIsATopic(q)) + { + return; + } + + try + { + owner.getConnectionModel(). + closeSession(owner, AMQConstant.RESOURCE_ERROR, + "Consuming to slow."); + + String option = _configuration.getOption("delete-persistent"); + + boolean deletePersistent = option != null && Boolean.parseBoolean(option); + + if (!q.isAutoDelete() && deletePersistent) + { + q.delete(); + } + + } + catch (AMQException e) + { + _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); + } + + } + + /** + * Check the queue bindings to validate the queue is bound to the + * topic exchange. + * + * @param q the Queue + * @return true iff Q is bound to a TopicExchange + */ + private boolean validateQueueIsATopic(AMQQueue q) + { + for (Binding binding : q.getBindings()) + { + if (binding.getExchange() instanceof TopicExchange) + { + return true; + } + } + + return false; + } +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java new file mode 100644 index 0000000000..0d80ef41dc --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPlugin.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.queue.AMQQueue; + +public interface SlowConsumerPolicyPlugin +{ + public void performPolicy(AMQQueue Queue); +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java new file mode 100644 index 0000000000..2bd7b67e91 --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/slowconsumerdetection/policies/SlowConsumerPolicyPluginFactory.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.slowconsumerdetection.policies; + +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionPolicyConfiguration; +import org.apache.qpid.server.plugins.PluginFactory; + +public interface SlowConsumerPolicyPluginFactory extends PluginFactory +{ + + public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration); +} diff --git a/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java new file mode 100644 index 0000000000..d5b2b1858b --- /dev/null +++ b/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java @@ -0,0 +1,237 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.systest; + +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.AMQChannelClosedException; +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.test.utils.QpidTestCase; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.NamingException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * QPID-1447 : Add slow consumer detection and disconnection. + * + * Slow consumers should on a topic should expect to receive a + * 506 : Resource Error if the hit a predefined threshold. + */ +public class SlowConsumerTest extends QpidTestCase implements ExceptionListener +{ + Destination _destination; + private CountDownLatch _disconnectionLatch = new CountDownLatch(1); + private int MAX_QUEUE_MESSAGE_COUNT; + private int MESSAGE_SIZE = DEFAULT_MESSAGE_SIZE; + + private Thread _publisher; + private static final long DISCONNECTION_WAIT = 5; + private Exception _publisherError = null; + private JMSException _connectionException = null; + + @Override + public void setUp() throws Exception, ConfigurationException, NamingException + { + // Set the houseKeepingThread period to be every 500 + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.delay", "1"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".slow-consumer-detection.timeunit", "SECONDS"); + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + "queues.slow-consumer-detection." + + "policy[@name]", "TopicDelete"); + + /** + * Queue Configuration + + + + 4235264 + + + 600000 + + + 50 + + + + + + + + + */ + + /** + * Plugin Configuration + * + + 1 + MINUTES + + + */ + + super.setUp(); + } + + public void exclusiveTransientQueue(int ackMode) throws Exception + { + + } + + public void tempQueue(int ackMode) throws Exception + { + + } + + public void topicConsumer(int ackMode) throws Exception + { + Connection connection = getConnection(); + + connection.setExceptionListener(this); + + Session session = connection.createSession(ackMode == Session.SESSION_TRANSACTED, ackMode); + + _destination = session.createTopic(getName()); + + MessageConsumer consumer = session.createConsumer(_destination); + + connection.start(); + + // Start the consumer pre-fetching + // Don't care about response as we will fill the broker up with messages + // after this point and ensure that the client is disconnected at the + // right point. + consumer.receiveNoWait(); + startPublisher(_destination); + + boolean disconnected = _disconnectionLatch.await(DISCONNECTION_WAIT, TimeUnit.SECONDS); + + System.out.println("Validating"); + + if (!disconnected && isBroker010()) + { + try + { + ((AMQSession_0_10) session).sync(); + } + catch (AMQException amqe) + { + JMSException jmsException = new JMSException(amqe.getMessage()); + jmsException.setLinkedException(amqe); + _connectionException = jmsException; + } + } + + System.err.println("ConnectionException:" + _connectionException); + + assertTrue("Client was not disconnected.", _connectionException != null); + + Exception linked = _connectionException.getLinkedException(); + + System.err.println("Linked:" + linked); + + _publisher.join(); + + //Validate publishing occurred ok + if (_publisherError != null) + { + throw _publisherError; + } + + assertNotNull("No error received onException listener.", _connectionException); + + assertNotNull("No linked exception set on:" + _connectionException.getMessage(), linked); + + assertEquals("Incorrect linked exception received.", AMQChannelClosedException.class, linked.getClass()); + + AMQChannelClosedException ccException = (AMQChannelClosedException) linked; + + assertEquals("Channel was not closed with correct code.", AMQConstant.RESOURCE_ERROR, ccException.getErrorCode()); + } + + private void startPublisher(final Destination destination) + { + _publisher = new Thread(new Runnable() + { + + public void run() + { + try + { + Connection connection = getConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageProducer publisher = session.createProducer(destination); + + setMessageSize(MESSAGE_SIZE); + + for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) + { + publisher.send(createNextMessage(session, count)); + } + } + catch (Exception e) + { + _publisherError = e; + _disconnectionLatch.countDown(); + } + } + }); + } + + public void testAutoAckTopicConsumerMessageCount() throws Exception + { + MAX_QUEUE_MESSAGE_COUNT = 10; + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + "queues.slow-consumer-detection" + + "messageCount", "9"); + + setMessageSize(MESSAGE_SIZE); + + topicConsumer(Session.AUTO_ACKNOWLEDGE); + } + + public void onException(JMSException e) + { + _connectionException = e; + + _disconnectionLatch.countDown(); + } +} -- cgit v1.2.1