diff options
| author | Keith Wall <kwall@apache.org> | 2012-08-23 22:15:42 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-08-23 22:15:42 +0000 |
| commit | 2c5d0ff2702109d419511a83a5ece876885e8839 (patch) | |
| tree | 75a6a5b5352ad070b67c73ad6dc0faabd8188757 /qpid/java/perftests/src/main | |
| parent | 0fec9c39916981c735d565694d8c25d121768256 (diff) | |
| download | qpid-python-2c5d0ff2702109d419511a83a5ece876885e8839.tar.gz | |
QPID-4053: Change performance test qpid queue creator to drain the queue before the deletion to avoid timeouts
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1376735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
4 files changed, 72 insertions, 14 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java index c80e641e5c..782f7ae2fd 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java @@ -224,12 +224,12 @@ public class ControllerJmsDelegate public void createQueues(List<QueueConfig> queues) { - _queueCreator.createQueues(_session, queues); + _queueCreator.createQueues(_connection, _session, queues); } public void deleteQueues(List<QueueConfig> queues) { - _queueCreator.deleteQueues(_session, queues); + _queueCreator.deleteQueues(_connection, _session, queues); } public void addCommandListener(CommandListener commandListener) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java index 4d4850eccf..d7e0007b28 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java @@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; import javax.jms.Session; import org.apache.qpid.disttest.controller.config.QueueConfig; public class NoOpQueueCreator implements QueueCreator { @Override - public void createQueues(Session session, List<QueueConfig> configs) + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) { } @Override - public void deleteQueues(Session session, List<QueueConfig> configs) + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) { } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 6874abe7d4..4ce8efeae2 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -20,11 +20,15 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +38,7 @@ public class QpidQueueCreator implements QueueCreator private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); @Override - public void createQueues(Session session, List<QueueConfig> configs) + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) @@ -44,12 +48,65 @@ public class QpidQueueCreator implements QueueCreator } @Override - public void deleteQueues(Session session, List<QueueConfig> configs) + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) { - deleteQueue(amqSession, queueConfig); + AMQDestination destination = createAMQDestination(amqSession, queueConfig); + + // drainQueue method is added because deletion of queue with a lot + // of messages takes time and might cause the timeout exception + drainQueue(connection, destination); + deleteQueue(amqSession, destination.getAMQQueueName()); + } + } + + private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig) + { + try + { + return (AMQDestination) amqSession.createQueue(queueConfig.getName()); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e); + } + } + + private void drainQueue(Connection connection, AMQDestination destination) + { + Session noAckSession = null; + try + { + LOGGER.debug("About to drain the queue " + destination); + noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + MessageConsumer messageConsumer = noAckSession.createConsumer(destination); + int counter = 0; + while(messageConsumer.receive(1000l) != null) + { + counter++; + } + LOGGER.debug("Drained " + counter + " messages from queue " + destination); + messageConsumer.close(); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to drain queue:" + destination, e); + } + finally + { + if (noAckSession != null) + { + try + { + noAckSession.close(); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e); + } + } } } @@ -74,20 +131,19 @@ public class QpidQueueCreator implements QueueCreator } } - private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig) + private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName) { try { // The Qpid AMQSession API currently makes the #deleteQueue method protected and the // raw protocol method public. This should be changed then we should switch the below to // use #deleteQueue. - AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName()); - session.sendQueueDelete(destination.getAMQQueueName()); - LOGGER.debug("Deleted queue " + queueConfig.getName()); + session.sendQueueDelete(queueName); + LOGGER.debug("Deleted queue " + queueName); } catch (Exception e) { - throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e); + throw new DistributedTestException("Failed to delete queue:" + queueName, e); } } } diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java index 0947dd53cb..a37cd7888c 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java @@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; import javax.jms.Session; import org.apache.qpid.disttest.controller.config.QueueConfig; public interface QueueCreator { - public void createQueues(final Session session, final List<QueueConfig> configs); - public void deleteQueues(final Session session, final List<QueueConfig> configs); + void createQueues(Connection connection, Session session, List<QueueConfig> configs); + void deleteQueues(Connection connection, Session session, List<QueueConfig> configs); } |
