summaryrefslogtreecommitdiff
path: root/java/broker-plugins
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-05-18 14:44:43 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-05-18 14:44:43 +0000
commitcb881fa7637a654697c46513e27d3072a0e3d195 (patch)
treecbd8eb17eca3fbdae9572ea97879cf3eb0a391e0 /java/broker-plugins
parent6768b7db27e89ee454693849cea6883aef48f941 (diff)
downloadqpid-python-cb881fa7637a654697c46513e27d3072a0e3d195.tar.gz
QPID-1447 : Update Plugins to use changes to ConfigurationPlugin, Update Test to correctly run and prevent failover.
Update excludes to include test in Java Broker runs but not CPP or 010. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945683 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker-plugins')
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF10
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java9
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java22
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java24
-rw-r--r--java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java97
5 files changed, 122 insertions, 40 deletions
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
index ff98e7bdca..3b2fe3d37d 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
+++ b/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
@@ -3,20 +3,26 @@ Bundle-ManifestVersion: 2
Bundle-Name: Qpid Slow Consumer Detection
Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
Bundle-Version: 1.0.0
-Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator
Import-Package: org.osgi.framework,
org.apache.qpid.server.configuration.plugins,
org.apache.qpid.server.configuration,
org.apache.qpid.server.virtualhost.plugins,
org.apache.qpid.server.virtualhost,
org.apache.qpid.server.queue,
+ org.apache.qpid.server.binding,
+ org.apache.qpid.server.exchange,
org.apache.qpid.server.registry,
org.apache.qpid.server.plugins,
+ org.apache.qpid.server.protocol,
+ org.apache.qpid.protocol,
+ org.apache.qpid.framing,
org.apache.qpid,
org.apache.log4j,
org.apache.commons.configuration
Bundle-RequiredExecutionEnvironment: JavaSE-1.6
Bundle-ClassPath: .
Bundle-ActivationPolicy: lazy
-Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework"
+Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework",
+ org.apache.qpid.server.virtualhost.plugin.policies
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
index b7719875a0..a8ebd024b1 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
@@ -47,18 +47,13 @@ public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugi
{
return new String[]{
"virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
- "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy"};
}
}
public String[] getElementsProcessed()
{
- // NOTE: the use of '@name]' rather than '[@name]' this appears to be
- // a bug in commons configuration.
- //fixme - Simple test case needs raised and JIRA raised on Commons
- return new String[]{"@name]", "options"};
+ return new String[]{"[@name]", "options"};
}
public String getPolicyName()
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
index a652539f14..c5281c626d 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
@@ -29,7 +29,10 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
{
@@ -47,9 +50,7 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
public String[] getParentPaths()
{
return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
- "virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
- "virtualhosts.virtualhost.topics.slow-consumer-detection",
- "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+ "virtualhosts.virtualhost.queues.queue.slow-consumer-detection"};
}
}
@@ -92,6 +93,21 @@ public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin
Map<String, SlowConsumerPolicyPluginFactory> factories =
pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+ Iterator<?> keys = policyConfig.getConfig().getKeys();
+
+ while (keys.hasNext())
+ {
+ String key = (String) keys.next();
+
+ _logger.debug("Policy Keys:" + key);
+
+ }
+
+ if (policyConfig == null)
+ {
+ throw new ConfigurationException("No Slow Consumer Policy specified at:" + path + ". Known Policies:" + factories.keySet());
+ }
+
if (_logger.isDebugEnabled())
{
_logger.debug("Configured SCDQC");
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
index fbbb205ff0..e69925f2b1 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
@@ -103,12 +103,26 @@ class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
*/
private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config)
{
+ if (config != null)
+ {
- _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
- return config != null &&
- (q.getMessageCount() >= config.getMessageCount() ||
- q.getQueueDepth() >= config.getDepth() ||
- q.getOldestMessageArrivalTime() >= config.getMessageAge());
+ if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) ||
+ (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) ||
+ (config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge()))
+ {
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Detected Slow Consumer on Queue(" + q.getName() + ")");
+ _logger.info("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount());
+ _logger.info("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth());
+ _logger.info("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
+ }
+
+ return true;
+ }
+ }
+ return false;
}
}
diff --git a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
index 0789367320..21d1f265fd 100644
--- a/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
+++ b/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.AMQChannelClosedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidTestCase;
@@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit;
* Slow consumers should on a topic should expect to receive a
* 506 : Resource Error if the hit a predefined threshold.
*/
-public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
+public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, ConnectionListener
{
Destination _destination;
private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
@@ -55,6 +56,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
private static final long DISCONNECTION_WAIT = 5;
private Exception _publisherError = null;
private JMSException _connectionException = null;
+ private static final long JOIN_WAIT = 5000;
@Override
public void setUp() throws Exception, ConfigurationException, NamingException
@@ -68,30 +70,43 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
+ getConnectionURL().getVirtualHost().substring(1) +
".slow-consumer-detection.timeunit", "SECONDS");
- setConfigurationProperty("virtualhosts.virtualhost." +
- getConnectionURL().getVirtualHost().substring(1) +
- "queues.slow-consumer-detection." +
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.slow-consumer-detection." +
"policy[@name]", "TopicDelete");
+ setConfigurationProperty("virtualhosts.virtualhost."
+ + getConnectionURL().getVirtualHost().substring(1) +
+ ".queues.maximumMessageCount", "1");
+
+
+
/**
* Queue Configuration
<slow-consumer-detection>
<!-- The depth before which the policy will be applied-->
- <depth>4235264</depth>
-
- <!-- The message age before which the policy will be applied-->
- <messageAge>600000</messageAge>
-
- <!-- The number of message before which the policy will be applied-->
- <messageCount>50</messageCount>
-
- <!-- Policies configuration -->
- <policy name="TopicDelete">
- <options>
- <option name="delete-persistent" value="true"/>
- </options>
- </policy>
+ <depth>4235264</depth>
+
+ <!-- The message age before which the policy will be applied-->
+ <messageAge>600000</messageAge>
+
+ <!-- The number of message before which the policy will be applied-->
+ <messageCount>50</messageCount>
+
+ <!-- Policies configuration -->
+ <policy name="TopicDelete">
+ <options>
+ <option name="delete-persistent" value="true"/>
+ </options>
+ </policy>
+
+ <policy>
+ <name>TopicDelete"</name>
+ <topicDelete>
+ <delete-persistent/>
+ </topicDelete>
+ </policy>
</slow-consumer-detection>
*/
@@ -105,8 +120,6 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
</slow-consumer-detection>
*/
-
- super.setUp();
}
public void exclusiveTransientQueue(int ackMode) throws Exception
@@ -167,8 +180,10 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
System.err.println("Linked:" + linked);
- _publisher.join();
+ _publisher.join(JOIN_WAIT);
+ assertFalse("Publisher still running", _publisher.isAlive());
+
//Validate publishing occurred ok
if (_publisherError != null)
{
@@ -205,6 +220,7 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++)
{
publisher.send(createNextMessage(session, count));
+ session.commit();
}
}
catch (Exception e)
@@ -214,15 +230,22 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
}
}
});
+
+ _publisher.start();
}
public void testAutoAckTopicConsumerMessageCount() throws Exception
{
MAX_QUEUE_MESSAGE_COUNT = 10;
+
setConfigurationProperty("virtualhosts.virtualhost." +
getConnectionURL().getVirtualHost().substring(1) +
- "queues.slow-consumer-detection" +
- "messageCount", "9");
+ ".queues.slow-consumer-detection." +
+ "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+
+ //Start the broker
+ super.setUp();
+
setMessageSize(MESSAGE_SIZE);
@@ -233,6 +256,34 @@ public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
{
_connectionException = e;
+ System.out.println("***** SCT Received Exception: "+e);
+ e.printStackTrace();
+
_disconnectionLatch.countDown();
}
+
+ /// Connection Listener
+
+ public void bytesSent(long count)
+ {
+ }
+
+ public void bytesReceived(long count)
+ {
+ }
+
+ public boolean preFailover(boolean redirect)
+ {
+ // Prevent Failover
+ return false;
+ }
+
+ public boolean preResubscribe()
+ {
+ return false;
+ }
+
+ public void failoverComplete()
+ {
+ }
}