summaryrefslogtreecommitdiff
path: root/java/broker-plugins
diff options
context:
space:
mode:
authorAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 13:02:49 +0000
committerAndrew Donald Kennedy <grkvlt@apache.org>2010-07-22 13:02:49 +0000
commitcfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (patch)
tree0e5e6ca88df77af065657042c331e1b9283c63b7 /java/broker-plugins
parent6e1e7490054ed9575a8b4090f29632cbbc6f730e (diff)
downloadqpid-python-cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc.tar.gz
QPID-2679: cache queues that are configured on a per-virtualhost basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966634 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java86
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java43
2 files changed, 114 insertions, 15 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
new file mode 100644
index 0000000000..d947e9a367
--- /dev/null
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java
@@ -0,0 +1,86 @@
+package org.apache.qpid.server.virtualhost.plugin;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
+import org.apache.qpid.server.exchange.AbstractExchange;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.Exchange.BindingListener;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * This is a listener that caches queues that are configured for slow consumer disconnection.
+ *
+ * There should be one listener per virtual host, which can be added to all exchanges on
+ * that host.
+ *
+ * TODO In future, it will be possible to configure the policy at runtime, so only the queue
+ * itself is cached, and the configuration looked up by the housekeeping thread. This means
+ * that there may be occasions where the copy of the cache contents retrieved by the thread
+ * does not contain queues that are configured, or that configured queues are not present.
+ *
+ * @see BindingListener
+ */
+public class ConfiguredQueueBindingListener implements BindingListener
+{
+ private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class);
+
+ private String _vhostName;
+ private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>());
+
+ public ConfiguredQueueBindingListener(String vhostName)
+ {
+ _vhostName = vhostName;
+ }
+
+ /**
+ * @see BindingListener#bindingAdded(Exchange, Binding)
+ */
+ public void bindingAdded(Exchange exchange, Binding binding)
+ {
+ processBinding(binding);
+ }
+
+ /**
+ * @see BindingListener#bindingRemoved(Exchange, Binding)
+ */
+ public void bindingRemoved(Exchange exchange, Binding binding)
+ {
+ processBinding(binding);
+ }
+
+ private void processBinding(Binding binding)
+ {
+ AMQQueue queue = binding.getQueue();
+
+ SlowConsumerDetectionQueueConfiguration config =
+ queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
+ if (config != null)
+ {
+ _cache.add(queue);
+ }
+ else
+ {
+ _cache.remove(queue);
+ }
+ }
+
+ /**
+ * Lookup and return the cache of configured {@link AMQQueue}s.
+ *
+ * Note that when accessing the cached queues, the {@link Iterator} is not thread safe
+ * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the
+ * cache is returned.
+ *
+ * @return a copy of the cached {@link java.util.Set} of queues
+ */
+ public Set<AMQQueue> getQueueCache()
+ {
+ return new HashSet<AMQQueue>(_cache);
+ }
+}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
index cac52c2fdf..7de95bbfa7 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
@@ -1,6 +1,5 @@
/*
*
- * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -20,12 +19,16 @@
*/
package org.apache.qpid.server.virtualhost.plugin;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages;
@@ -35,6 +38,7 @@ import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
private SlowConsumerDetectionConfiguration _config;
+ private ConfiguredQueueBindingListener _listener;
public static class SlowConsumerFactory implements VirtualHostPluginFactory
{
@@ -53,9 +57,21 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
}
}
+ /**
+ * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
+ * cirtual host to record all the configured queues in a cache for processing by the housekeeping
+ * thread.
+ *
+ * @see Plugin#configure(ConfigurationPlugin)
+ */
public void configure(ConfigurationPlugin config)
{
_config = (SlowConsumerDetectionConfiguration) config;
+ _listener = new ConfiguredQueueBindingListener(_virtualhost.getName());
+ for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames())
+ {
+ _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+ }
}
public SlowConsumerDetection(VirtualHost vhost)
@@ -63,19 +79,19 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
super(vhost);
}
- @Override
public void execute()
{
- SlowConsumerDetectionMessages.RUNNING();
-
- for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+ CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING());
+
+ Set<AMQQueue> cache = _listener.getQueueCache();
+ for (AMQQueue q : cache)
{
- SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName());
+ CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()));
+
try
{
SlowConsumerDetectionQueueConfiguration config =
- q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
-
+ q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
if (checkQueueStatus(q, config))
{
config.getPolicy().performPolicy(q);
@@ -83,15 +99,12 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
}
catch (Exception e)
{
- _logger.error("Exception in SlowConsumersDetection " +
- "for queue: " +
- q.getNameShortString().toString(), e);
- //Don't throw exceptions as this will stop the
- // house keeping task from running.
+ // Don't throw exceptions as this will stop the house keeping task from running.
+ _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e);
}
}
- SlowConsumerDetectionMessages.COMPLETE();
+ CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE());
}
public long getDelay()