summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-30 13:57:47 +0000
committerKeith Wall <kwall@apache.org>2011-11-30 13:57:47 +0000
commit6e79747ac572bceb085d53d0bccbf0604d9b435f (patch)
treeefea0ea2d0dc39c960b5b518467ef24c0bbd7964 /java
parent691018b60bc4723d666e610d3c29d42bb3ac41fe (diff)
downloadqpid-python-6e79747ac572bceb085d53d0bccbf0604d9b435f.tar.gz
QPID-3601: Occasional failure to delete queue on broker shutdown
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1208434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java15
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java28
2 files changed, 35 insertions, 8 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
index 0638ea362f..6f8020fc54 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
@@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
@Override
public void validateConfiguration() throws ConfigurationException
{
+ PluginManager pluginManager;
+ try
+ {
+ pluginManager = ApplicationRegistry.getInstance().getPluginManager();
+ }
+ catch (IllegalStateException ise)
+ {
+ // We see this happen during shutdown due to asynchronous reconfig performed IO threads
+ // running at the same time as the shutdown handler.
+ _policyPlugin = null;
+ return;
+ }
+
if (!containsPositiveLong("messageAge") &&
!containsPositiveLong("depth") &&
!containsPositiveLong("messageCount"))
@@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
}
SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
-
- PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager();
Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins();
if (policyConfig == null)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
index 5c4fe0aab8..248b3b2143 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
@@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.plugins.Plugin;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
+import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
@@ -56,7 +58,7 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
/**
* Configures the slow consumer disconnect plugin by adding a listener to each exchange on this
- * cirtual host to record all the configured queues in a cache for processing by the housekeeping
+ * virtual host to record all the configured queues in a cache for processing by the housekeeping
* thread.
*
* @see Plugin#configure(ConfigurationPlugin)
@@ -65,9 +67,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
_config = (SlowConsumerDetectionConfiguration) config;
_listener = new ConfiguredQueueBindingListener(getVirtualHost().getName());
- for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames())
+ final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry();
+ for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames())
{
- getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+ exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener);
}
}
@@ -87,11 +90,21 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
try
{
- SlowConsumerDetectionQueueConfiguration config =
+ final SlowConsumerDetectionQueueConfiguration config =
q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
if (checkQueueStatus(q, config))
{
- config.getPolicy().performPolicy(q);
+ final SlowConsumerPolicyPlugin policy = config.getPolicy();
+ if (policy == null)
+ {
+ // We would only expect to see this during shutdown
+ _logger.warn("No slow consumer policy for queue " + q.getName());
+ }
+ else
+ {
+ policy.performPolicy(q);
+ }
+
}
}
catch (Exception e)
@@ -126,7 +139,10 @@ public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
{
if (config != null)
{
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+ }
int count = q.getMessageCount();