diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-05-18 14:44:43 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-05-18 14:44:43 +0000 |
| commit | cb881fa7637a654697c46513e27d3072a0e3d195 (patch) | |
| tree | cbd8eb17eca3fbdae9572ea97879cf3eb0a391e0 /java/broker-plugins/experimental | |
| parent | 6768b7db27e89ee454693849cea6883aef48f941 (diff) | |
| download | qpid-python-cb881fa7637a654697c46513e27d3072a0e3d195.tar.gz | |
QPID-1447 : Update Plugins to use changes to ConfigurationPlugin, Update Test to correctly run and prevent failover.
Update excludes to include test in Java Broker runs but not CPP or 010.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins/experimental')
5 files changed, 122 insertions, 40 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF index ff98e7bdca..3b2fe3d37d 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF +++ b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF @@ -3,20 +3,26 @@ Bundle-ManifestVersion: 2 Bundle-Name: Qpid Slow Consumer Detection Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true Bundle-Version: 1.0.0 -Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator +Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator Import-Package: org.osgi.framework, org.apache.qpid.server.configuration.plugins, org.apache.qpid.server.configuration, org.apache.qpid.server.virtualhost.plugins, org.apache.qpid.server.virtualhost, org.apache.qpid.server.queue, + org.apache.qpid.server.binding, + org.apache.qpid.server.exchange, org.apache.qpid.server.registry, org.apache.qpid.server.plugins, + org.apache.qpid.server.protocol, + org.apache.qpid.protocol, + org.apache.qpid.framing, org.apache.qpid, org.apache.log4j, org.apache.commons.configuration Bundle-RequiredExecutionEnvironment: JavaSE-1.6 Bundle-ClassPath: . Bundle-ActivationPolicy: lazy -Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework" +Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework", + org.apache.qpid.server.virtualhost.plugin.policies diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java index b7719875a0..a8ebd024b1 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java @@ -47,18 +47,13 @@ public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugi { return new String[]{ "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"}; + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy"}; } } public String[] getElementsProcessed() { - // NOTE: the use of '@name]' rather than '[@name]' this appears to be - // a bug in commons configuration. - //fixme - Simple test case needs raised and JIRA raised on Commons - return new String[]{"@name]", "options"}; + return new String[]{"[@name]", "options"}; } public String getPolicyName() diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java index a652539f14..c5281c626d 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java @@ -29,7 +29,10 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin { @@ -47,9 +50,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin public String[] getParentPaths() { return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", - "virtualhosts.virtualhost.topics.slow-consumer-detection", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"}; + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection"}; } } @@ -92,6 +93,21 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); + Iterator<?> keys = policyConfig.getConfig().getKeys(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + _logger.debug("Policy Keys:" + key); + + } + + if (policyConfig == null) + { + throw new ConfigurationException("No Slow Consumer Policy specified at:" + path + ". Known Policies:" + factories.keySet()); + } + if (_logger.isDebugEnabled()) { _logger.debug("Configured SCDQC"); diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java index fbbb205ff0..e69925f2b1 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java @@ -103,12 +103,26 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin */ private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) { + if (config != null) + { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - return config != null && - (q.getMessageCount() >= config.getMessageCount() || - q.getQueueDepth() >= config.getDepth() || - q.getOldestMessageArrivalTime() >= config.getMessageAge()); + if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) || + (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || + (config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge())) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Detected Slow Consumer on Queue(" + q.getName() + ")"); + _logger.info("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); + _logger.info("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); + _logger.info("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); + } + + return true; + } + } + return false; } } diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java index 0789367320..21d1f265fd 100644 --- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java +++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java @@ -24,6 +24,7 @@ import org.apache.commons.configuration.ConfigurationException; import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidTestCase; @@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit; * Slow consumers should on a topic should expect to receive a * 506 : Resource Error if the hit a predefined threshold. */ -public class SlowConsumerTest extends QpidTestCase implements ExceptionListener +public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, ConnectionListener { Destination _destination; private CountDownLatch _disconnectionLatch = new CountDownLatch(1); @@ -55,6 +56,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener private static final long DISCONNECTION_WAIT = 5; private Exception _publisherError = null; private JMSException _connectionException = null; + private static final long JOIN_WAIT = 5000; @Override public void setUp() throws Exception, ConfigurationException, NamingException @@ -68,30 +70,43 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener + getConnectionURL().getVirtualHost().substring(1) + ".slow-consumer-detection.timeunit", "SECONDS"); - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection." + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + "policy[@name]", "TopicDelete"); + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.maximumMessageCount", "1"); + + + /** * Queue Configuration <slow-consumer-detection> <!-- The depth before which the policy will be applied--> - <depth>4235264</depth> - - <!-- The message age before which the policy will be applied--> - <messageAge>600000</messageAge> - - <!-- The number of message before which the policy will be applied--> - <messageCount>50</messageCount> - - <!-- Policies configuration --> - <policy name="TopicDelete"> - <options> - <option name="delete-persistent" value="true"/> - </options> - </policy> + <depth>4235264</depth> + + <!-- The message age before which the policy will be applied--> + <messageAge>600000</messageAge> + + <!-- The number of message before which the policy will be applied--> + <messageCount>50</messageCount> + + <!-- Policies configuration --> + <policy name="TopicDelete"> + <options> + <option name="delete-persistent" value="true"/> + </options> + </policy> + + <policy> + <name>TopicDelete"</name> + <topicDelete> + <delete-persistent/> + </topicDelete> + </policy> </slow-consumer-detection> */ @@ -105,8 +120,6 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener </slow-consumer-detection> */ - - super.setUp(); } public void exclusiveTransientQueue(int ackMode) throws Exception @@ -167,8 +180,10 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener System.err.println("Linked:" + linked); - _publisher.join(); + _publisher.join(JOIN_WAIT); + assertFalse("Publisher still running", _publisher.isAlive()); + //Validate publishing occurred ok if (_publisherError != null) { @@ -205,6 +220,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) { publisher.send(createNextMessage(session, count)); + session.commit(); } } catch (Exception e) @@ -214,15 +230,22 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener } } }); + + _publisher.start(); } public void testAutoAckTopicConsumerMessageCount() throws Exception { MAX_QUEUE_MESSAGE_COUNT = 10; + setConfigurationProperty("virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection" + - "messageCount", "9"); + ".queues.slow-consumer-detection." + + "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); + + //Start the broker + super.setUp(); + setMessageSize(MESSAGE_SIZE); @@ -233,6 +256,34 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener { _connectionException = e; + System.out.println("***** SCT Received Exception: "+e); + e.printStackTrace(); + _disconnectionLatch.countDown(); } + + /// Connection Listener + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + // Prevent Failover + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } } |
