summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-05-07 15:11:56 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-05-07 15:11:56 +0000
commit2947b0561d8ac81a9a053b4b3f60d353b36eece9 (patch)
tree5f0688ff78243961436fbd81f7ad452ea7e9e887 /java
parentef0fd0d8a919a759bcff64a026e3abd7a71db1e7 (diff)
downloadqpid-python-2947b0561d8ac81a9a053b4b3f60d353b36eece9.tar.gz
QPID-1447 : Exclude SCD testing until complete
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@942108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF22
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml32
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java54
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java123
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java72
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java81
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java125
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java28
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java29
-rw-r--r--java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java113
-rw-r--r--java/test-profiles/Excludes3
11 files changed, 682 insertions, 0 deletions
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF b/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
new file mode 100644
index 0000000000..80dcd33efb
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
@@ -0,0 +1,22 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Qpid Slow Consumer Detection
+Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
+Bundle-Version: 1.0.0
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator
+Import-Package: org.osgi.framework,
+ org.apache.qpid.server.configuration.plugin,
+ org.apache.qpid.server.configuration,
+ org.apache.qpid.server.virtualhost.plugin,
+ org.apache.qpid.server.virtualhost,
+ org.apache.qpid.server.queue,
+ org.apache.qpid.server.registry,
+ org.apache.qpid.server.plugins,
+ org.apache.qpid,
+ org.apache.log4j,
+ org.apache.commons.configuration
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: .
+Bundle-ActivationPolicy: lazy
+Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework"
+
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml b/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
new file mode 100644
index 0000000000..4d01b54f2d
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+nn - or more contributor license agreements. See the NOTICE file
+ -n distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Slow Consumer Disconnect" default="build">
+
+ <property name="module.depends" value="common broker broker-plugins"/>
+ <property name="module.test.depends" value="broker/test systests client management/common"/>
+ <property name="module.manifest" value="MANIFEST.MF"/>
+ <property name="module.plugin" value="true"/>
+
+ <import file="../../../module.xml"/>
+
+ <target name="bundle" depends="bundle-tasks"/>
+
+</project>
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
new file mode 100644
index 0000000000..62088e8bb3
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin.
+ *
+ * This includes Configuration
+ *
+ * @author ritchiem
+ */
+public class Activator implements BundleActivator
+{
+ public void start(BundleContext ctx) throws Exception
+ {
+ if (null != ctx)
+ {
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null);
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null);
+ ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null);
+ ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null);
+ ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null);
+ }
+ }
+
+ public void stop(BundleContext ctx) throws Exception
+ {
+ // no need to do anything here, osgi will unregister the service for us
+ }
+
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
new file mode 100644
index 0000000000..2f4bf1a528
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPlugin;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+
+class SlowConsumerDetection implements VirtualHostPlugin
+{
+ Logger _logger = Logger.getLogger(SlowConsumerDetection.class);
+ private VirtualHost _virtualhost;
+ private SlowConsumerDetectionConfiguration _config;
+ private SlowConsumerPolicyPlugin _policy;
+
+ public static class SlowConsumerFactory implements VirtualHostPluginFactory
+ {
+ public VirtualHostPlugin newInstance(VirtualHost vhost)
+ {
+ return new SlowConsumerDetection(vhost);
+ }
+ }
+
+ public SlowConsumerDetection(VirtualHost vhost)
+ {
+ _virtualhost = vhost;
+ _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+ if (_config == null)
+ {
+ throw new IllegalArgumentException("Plugin has not been configured");
+ }
+
+ }
+
+ public void run()
+ {
+ _logger.info("Starting the SlowConsumersDetection job");
+ try
+ {
+ for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+ {
+ _logger.debug("Checking consumer status for queue: "
+ + q.getName());
+ try
+ {
+ SlowConsumerDetectionQueueConfiguration config =
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+
+ if (checkQueueStatus(q, config))
+ {
+ config.getPolicy().performPolicy(q);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Exception in SlowConsumersDetection " +
+ "for queue: " +
+ q.getNameShortString().toString(), e);
+ //Don't throw exceptions as this will stop the
+ // house keeping task from running.
+ }
+ }
+ _logger.info("SlowConsumersDetection job completed.");
+ }
+ catch (Exception e)
+ {
+ _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e);
+ }
+ catch (Error e)
+ {
+ _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e);
+ }
+ }
+
+ public long getDelay()
+ {
+ return _config.getDelay();
+ }
+
+ public String getTimeUnit()
+ {
+ return _config.getTimeUnit();
+ }
+
+ /**
+ * Check the depth,messageSize,messageAge,messageCount values for this q
+ *
+ * @param q the queue to check
+ * @param config
+ *
+ * @return true if the queue has reached a threshold.
+ */
+ private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
+ {
+
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+
+ return config != null &&
+ (q.getMessageCount() >= config.getMessageCount() ||
+ q.getQueueDepth() >= config.getDepth() ||
+ q.getOldestMessageArrivalTime() >= config.getMessageAge());
+ }
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
new file mode 100644
index 0000000000..08dc9d2d09
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
+{
+ public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"};
+ }
+ }
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"delay",
+ "timeunit"};
+ }
+
+ public long getDelay()
+ {
+ return _configuration.getLong("delay", 10);
+ }
+
+ public String getTimeUnit()
+ {
+ return _configuration.getString("timeunit", TimeUnit.SECONDS.toString());
+ }
+
+ @Override
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path, configuration);
+
+ System.out.println("Configured SCDC");
+ System.out.println("Delay:" + getDelay());
+ System.out.println("TimeUnit:" + getTimeUnit());
+ }
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
new file mode 100644
index 0000000000..7a5f1f1bff
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.List;
+
+public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin
+{
+
+ public static class SlowConsumerDetectionPolicyConfigurationFactory
+ implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path,
+ Configuration config)
+ throws ConfigurationException
+ {
+ SlowConsumerDetectionPolicyConfiguration slowConsumerConfig =
+ new SlowConsumerDetectionPolicyConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{
+ "virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+ }
+ }
+
+ public String[] getElementsProcessed()
+ {
+ // NOTE: the use of '@name]' rather than '[@name]' this appears to be
+ // a bug in commons configuration.
+ //fixme - Simple test case needs raised and JIRA raised on Commons
+ return new String[]{"@name]", "options"};
+ }
+
+ public String getPolicyName()
+ {
+ return _configuration.getString("[@name]");
+ }
+
+ public String getOption(String option)
+ {
+ List options = _configuration.getList("options.option[@name]");
+
+ if (options != null && options.contains(option))
+ {
+ return _configuration.getString("options.option[@value]" +
+ "(" + options.indexOf(option) + ")");
+ }
+
+ return null;
+ }
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
new file mode 100644
index 0000000000..1eb79696d2
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.Map;
+
+public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
+{
+ private SlowConsumerPolicyPlugin _policyPlugin;
+
+ public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory
+ {
+ public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+ {
+ SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration();
+ slowConsumerConfig.setConfiguration(path, config);
+ return slowConsumerConfig;
+ }
+
+ public String[] getParentPaths()
+ {
+ return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
+ "virtualhosts.virtualhost.topics.slow-consumer-detection",
+ "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+ }
+
+ }
+
+ public String[] getElementsProcessed()
+ {
+ return new String[]{"messageAge",
+ "depth",
+ "messageCount"};
+ }
+
+ public int getMessageAge()
+ {
+ return (int) getConfigurationValue("messageAge");
+ }
+
+ public long getDepth()
+ {
+ return getConfigurationValue("depth");
+ }
+
+ public long getMessageCount()
+ {
+ return getConfigurationValue("messageCount");
+ }
+
+ public SlowConsumerPolicyPlugin getPolicy()
+ {
+ return _policyPlugin;
+ }
+
+ @Override
+ public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+ {
+ super.setConfiguration(path, configuration);
+
+ SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+
+ PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+ Map<String, SlowConsumerPolicyPluginFactory> factories =
+ pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Configured SCDQC");
+ _logger.debug("Age:" + getMessageAge());
+ _logger.debug("Depth:" + getDepth());
+ _logger.debug("Count:" + getMessageCount());
+ _logger.debug("Policy:" + policyConfig.getPolicyName());
+ _logger.debug("Available factories:" + factories);
+ }
+
+ SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase());
+
+ if (pluginFactory == null)
+ {
+ throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet());
+ }
+
+ _policyPlugin = pluginFactory.newInstance(policyConfig);
+ }
+
+ private long getConfigurationValue(String property)
+ {
+ // The _configuration we are given is a munged configurated
+ // so the queue will already have queue-queues munging
+
+ // we then need to ensure that the TopicsConfiguration
+ // and TopicConfiguration classes correctly munge their configuration:
+ // queue-queues -> topic-topics
+
+ return _configuration.getLong(property, 0);
+ }
+
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
new file mode 100644
index 0000000000..2e1c799a59
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+public interface SlowConsumerPolicyPlugin
+{
+ public void performPolicy(AMQQueue Queue);
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
new file mode 100644
index 0000000000..1454971f52
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.plugins.PluginFactory;
+
+public interface SlowConsumerPolicyPluginFactory extends PluginFactory
+{
+
+ public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration);
+}
diff --git a/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
new file mode 100644
index 0000000000..f1e7142fdd
--- /dev/null
+++ b/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
+{
+ Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+ private SlowConsumerDetectionPolicyConfiguration _configuration;
+
+ public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory
+ {
+
+ public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration)
+ {
+ return new TopicDeletePolicy(configuration);
+ }
+
+ public String getPluginName()
+ {
+ return "topicdelete";
+ }
+ }
+
+ public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config)
+ {
+ _configuration = config;
+ }
+
+ public void performPolicy(AMQQueue q)
+ {
+ AMQSessionModel owner = q.getExclusiveOwningSession();
+
+ // Only process exclusive queues
+ if (owner == null)
+ {
+ return;
+ }
+
+ //Only process Topics
+ if(!validateQueueIsATopic(q))
+ {
+ return;
+ }
+
+ try
+ {
+ owner.getConnectionModel().
+ closeSession(owner, AMQConstant.RESOURCE_ERROR,
+ "Consuming to slow.");
+
+ String option = _configuration.getOption("delete-persistent");
+
+ boolean deletePersistent = option != null && Boolean.parseBoolean(option);
+
+ if (!q.isAutoDelete() && deletePersistent)
+ {
+ q.delete();
+ }
+
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName());
+ }
+
+ }
+
+ /**
+ * Check the queue bindings to validate the queue is bound to the
+ * topic exchange.
+ *
+ * @param q the Queue
+ * @return true iff Q is bound to a TopicExchange
+ */
+ private boolean validateQueueIsATopic(AMQQueue q)
+ {
+ for (Binding binding : q.getBindings())
+ {
+ if (binding.getExchange() instanceof TopicExchange)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes
index 05f68cbe9d..6a2e33c1fe 100644
--- a/java/test-profiles/Excludes
+++ b/java/test-profiles/Excludes
@@ -31,3 +31,6 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
// QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail.
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
+
+// QPID-1447 : Work In Progress
+org.apache.qpid.systest.SlowConsumerTest#*