summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-08-23 22:15:42 +0000
committerKeith Wall <kwall@apache.org>2012-08-23 22:15:42 +0000
commit2c5d0ff2702109d419511a83a5ece876885e8839 (patch)
tree75a6a5b5352ad070b67c73ad6dc0faabd8188757 /qpid/java/perftests/src/main
parent0fec9c39916981c735d565694d8c25d121768256 (diff)
downloadqpid-python-2c5d0ff2702109d419511a83a5ece876885e8839.tar.gz
QPID-4053: Change performance test qpid queue creator to drain the queue before the deletion to avoid timeouts
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1376735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java4
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java5
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java72
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java5
4 files changed, 72 insertions, 14 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
index c80e641e5c..782f7ae2fd 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java
@@ -224,12 +224,12 @@ public class ControllerJmsDelegate
public void createQueues(List<QueueConfig> queues)
{
- _queueCreator.createQueues(_session, queues);
+ _queueCreator.createQueues(_connection, _session, queues);
}
public void deleteQueues(List<QueueConfig> queues)
{
- _queueCreator.deleteQueues(_session, queues);
+ _queueCreator.deleteQueues(_connection, _session, queues);
}
public void addCommandListener(CommandListener commandListener)
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
index 4d4850eccf..d7e0007b28 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java
@@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public class NoOpQueueCreator implements QueueCreator
{
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 6874abe7d4..4ce8efeae2 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -20,11 +20,15 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.controller.config.QueueConfig;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +38,7 @@ public class QpidQueueCreator implements QueueCreator
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
@Override
- public void createQueues(Session session, List<QueueConfig> configs)
+ public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
@@ -44,12 +48,65 @@ public class QpidQueueCreator implements QueueCreator
}
@Override
- public void deleteQueues(Session session, List<QueueConfig> configs)
+ public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs)
{
AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session;
for (QueueConfig queueConfig : configs)
{
- deleteQueue(amqSession, queueConfig);
+ AMQDestination destination = createAMQDestination(amqSession, queueConfig);
+
+ // drainQueue method is added because deletion of queue with a lot
+ // of messages takes time and might cause the timeout exception
+ drainQueue(connection, destination);
+ deleteQueue(amqSession, destination.getAMQQueueName());
+ }
+ }
+
+ private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig)
+ {
+ try
+ {
+ return (AMQDestination) amqSession.createQueue(queueConfig.getName());
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e);
+ }
+ }
+
+ private void drainQueue(Connection connection, AMQDestination destination)
+ {
+ Session noAckSession = null;
+ try
+ {
+ LOGGER.debug("About to drain the queue " + destination);
+ noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+ int counter = 0;
+ while(messageConsumer.receive(1000l) != null)
+ {
+ counter++;
+ }
+ LOGGER.debug("Drained " + counter + " messages from queue " + destination);
+ messageConsumer.close();
+ }
+ catch (Exception e)
+ {
+ throw new DistributedTestException("Failed to drain queue:" + destination, e);
+ }
+ finally
+ {
+ if (noAckSession != null)
+ {
+ try
+ {
+ noAckSession.close();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e);
+ }
+ }
}
}
@@ -74,20 +131,19 @@ public class QpidQueueCreator implements QueueCreator
}
}
- private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig)
+ private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName)
{
try
{
// The Qpid AMQSession API currently makes the #deleteQueue method protected and the
// raw protocol method public. This should be changed then we should switch the below to
// use #deleteQueue.
- AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName());
- session.sendQueueDelete(destination.getAMQQueueName());
- LOGGER.debug("Deleted queue " + queueConfig.getName());
+ session.sendQueueDelete(queueName);
+ LOGGER.debug("Deleted queue " + queueName);
}
catch (Exception e)
{
- throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e);
+ throw new DistributedTestException("Failed to delete queue:" + queueName, e);
}
}
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
index 0947dd53cb..a37cd7888c 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java
@@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms;
import java.util.List;
+import javax.jms.Connection;
import javax.jms.Session;
import org.apache.qpid.disttest.controller.config.QueueConfig;
public interface QueueCreator
{
- public void createQueues(final Session session, final List<QueueConfig> configs);
- public void deleteQueues(final Session session, final List<QueueConfig> configs);
+ void createQueues(Connection connection, Session session, List<QueueConfig> configs);
+ void deleteQueues(Connection connection, Session session, List<QueueConfig> configs);
}