diff options
| author | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:02:49 +0000 |
|---|---|---|
| committer | Andrew Donald Kennedy <grkvlt@apache.org> | 2010-07-22 13:02:49 +0000 |
| commit | cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc (patch) | |
| tree | 0e5e6ca88df77af065657042c331e1b9283c63b7 /java/broker-plugins | |
| parent | 6e1e7490054ed9575a8b4090f29632cbbc6f730e (diff) | |
| download | qpid-python-cfb3a1ef5b743d68a2a78754ef0fdc750378d3cc.tar.gz | |
QPID-2679: cache queues that are configured on a per-virtualhost basis
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966634 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
2 files changed, 114 insertions, 15 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java new file mode 100644 index 0000000000..d947e9a367 --- /dev/null +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/ConfiguredQueueBindingListener.java @@ -0,0 +1,86 @@ +package org.apache.qpid.server.virtualhost.plugin; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; +import org.apache.qpid.server.exchange.AbstractExchange; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.exchange.Exchange.BindingListener; +import org.apache.qpid.server.queue.AMQQueue; + +/** + * This is a listener that caches queues that are configured for slow consumer disconnection. + * + * There should be one listener per virtual host, which can be added to all exchanges on + * that host. + * + * TODO In future, it will be possible to configure the policy at runtime, so only the queue + * itself is cached, and the configuration looked up by the housekeeping thread. This means + * that there may be occasions where the copy of the cache contents retrieved by the thread + * does not contain queues that are configured, or that configured queues are not present. + * + * @see BindingListener + */ +public class ConfiguredQueueBindingListener implements BindingListener +{ + private static final Logger _log = Logger.getLogger(ConfiguredQueueBindingListener.class); + + private String _vhostName; + private Set<AMQQueue> _cache = Collections.synchronizedSet(new HashSet<AMQQueue>()); + + public ConfiguredQueueBindingListener(String vhostName) + { + _vhostName = vhostName; + } + + /** + * @see BindingListener#bindingAdded(Exchange, Binding) + */ + public void bindingAdded(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + /** + * @see BindingListener#bindingRemoved(Exchange, Binding) + */ + public void bindingRemoved(Exchange exchange, Binding binding) + { + processBinding(binding); + } + + private void processBinding(Binding binding) + { + AMQQueue queue = binding.getQueue(); + + SlowConsumerDetectionQueueConfiguration config = + queue.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); + if (config != null) + { + _cache.add(queue); + } + else + { + _cache.remove(queue); + } + } + + /** + * Lookup and return the cache of configured {@link AMQQueue}s. + * + * Note that when accessing the cached queues, the {@link Iterator} is not thread safe + * (see the {@link Collections#synchronizedSet(Set)} documentation) so a copy of the + * cache is returned. + * + * @return a copy of the cached {@link java.util.Set} of queues + */ + public Set<AMQQueue> getQueueCache() + { + return new HashSet<AMQQueue>(_cache); + } +} diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index cac52c2fdf..7de95bbfa7 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -1,6 +1,5 @@ /* * - * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file @@ -20,12 +19,16 @@ */ package org.apache.qpid.server.virtualhost.plugin; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugin.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugin.logging.SlowConsumerDetectionMessages; @@ -35,6 +38,7 @@ import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory; class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { private SlowConsumerDetectionConfiguration _config; + private ConfiguredQueueBindingListener _listener; public static class SlowConsumerFactory implements VirtualHostPluginFactory { @@ -53,9 +57,21 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin } } + /** + * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this + * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * thread. + * + * @see Plugin#configure(ConfigurationPlugin) + */ public void configure(ConfigurationPlugin config) { _config = (SlowConsumerDetectionConfiguration) config; + _listener = new ConfiguredQueueBindingListener(_virtualhost.getName()); + for (AMQShortString exchangeName : _virtualhost.getExchangeRegistry().getExchangeNames()) + { + _virtualhost.getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + } } public SlowConsumerDetection(VirtualHost vhost) @@ -63,19 +79,19 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin super(vhost); } - @Override public void execute() { - SlowConsumerDetectionMessages.RUNNING(); - - for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) + CurrentActor.get().message(SlowConsumerDetectionMessages.RUNNING()); + + Set<AMQQueue> cache = _listener.getQueueCache(); + for (AMQQueue q : cache) { - SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName()); + CurrentActor.get().message(SlowConsumerDetectionMessages.CHECKING_QUEUE(q.getName())); + try { SlowConsumerDetectionQueueConfiguration config = - q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); - + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { config.getPolicy().performPolicy(q); @@ -83,15 +99,12 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin } catch (Exception e) { - _logger.error("Exception in SlowConsumersDetection " + - "for queue: " + - q.getNameShortString().toString(), e); - //Don't throw exceptions as this will stop the - // house keeping task from running. + // Don't throw exceptions as this will stop the house keeping task from running. + _logger.error("Exception in SlowConsumersDetection for queue: " + q.getName(), e); } } - SlowConsumerDetectionMessages.COMPLETE(); + CurrentActor.get().message(SlowConsumerDetectionMessages.COMPLETE()); } public long getDelay() |
