diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-30 13:57:47 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-30 13:57:47 +0000 |
| commit | 6e79747ac572bceb085d53d0bccbf0604d9b435f (patch) | |
| tree | efea0ea2d0dc39c960b5b518467ef24c0bbd7964 /java | |
| parent | 691018b60bc4723d666e610d3c29d42bb3ac41fe (diff) | |
| download | qpid-python-6e79747ac572bceb085d53d0bccbf0604d9b435f.tar.gz | |
QPID-3601: Occasional failure to delete queue on broker shutdown
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1208434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
2 files changed, 35 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java index 0638ea362f..6f8020fc54 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java @@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin @Override public void validateConfiguration() throws ConfigurationException { + PluginManager pluginManager; + try + { + pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + } + catch (IllegalStateException ise) + { + // We see this happen during shutdown due to asynchronous reconfig performed IO threads + // running at the same time as the shutdown handler. + _policyPlugin = null; + return; + } + if (!containsPositiveLong("messageAge") && !containsPositiveLong("depth") && !containsPositiveLong("messageCount")) @@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin } SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); - - PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins(); if (policyConfig == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java index 5c4fe0aab8..248b3b2143 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java @@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { @@ -56,7 +58,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin /** * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * virtual host to record all the configured queues in a cache for processing by the housekeeping * thread. * * @see Plugin#configure(ConfigurationPlugin) @@ -65,9 +67,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { _config = (SlowConsumerDetectionConfiguration) config; _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName()); - for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames()) + final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); + for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) { - getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener); } } @@ -87,11 +90,21 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin try { - SlowConsumerDetectionQueueConfiguration config = + final SlowConsumerDetectionQueueConfiguration config = q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { - config.getPolicy().performPolicy(q); + final SlowConsumerPolicyPlugin policy = config.getPolicy(); + if (policy == null) + { + // We would only expect to see this during shutdown + _logger.warn("No slow consumer policy for queue " + q.getName()); + } + else + { + policy.performPolicy(q); + } + } } catch (Exception e) @@ -126,7 +139,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { if (config != null) { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + if (_logger.isInfoEnabled()) + { + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + } int count = q.getMessageCount(); |
