diff options
| author | Keith Wall <kwall@apache.org> | 2012-08-23 22:15:42 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-08-23 22:15:42 +0000 |
| commit | 82a0f4e085a32dd0e2e3e5bdf19f772806ca353f (patch) | |
| tree | c1bcc56a96fbf680afeff0435cd73a9f879ec1f5 /java/perftests | |
| parent | 4b785ac3e36581bd2cf56f8225965526f9c3239f (diff) | |
| download | qpid-python-82a0f4e085a32dd0e2e3e5bdf19f772806ca353f.tar.gz | |
QPID-4053: Change performance test qpid queue creator to drain the queue before the deletion to avoid timeouts
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1376735 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests')
5 files changed, 85 insertions, 43 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java index c80e641e5c..782f7ae2fd 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/ControllerJmsDelegate.java @@ -224,12 +224,12 @@ public class ControllerJmsDelegate public void createQueues(List<QueueConfig> queues) { - _queueCreator.createQueues(_session, queues); + _queueCreator.createQueues(_connection, _session, queues); } public void deleteQueues(List<QueueConfig> queues) { - _queueCreator.deleteQueues(_session, queues); + _queueCreator.deleteQueues(_connection, _session, queues); } public void addCommandListener(CommandListener commandListener) diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java index 4d4850eccf..d7e0007b28 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/NoOpQueueCreator.java @@ -20,18 +20,19 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; import javax.jms.Session; import org.apache.qpid.disttest.controller.config.QueueConfig; public class NoOpQueueCreator implements QueueCreator { @Override - public void createQueues(Session session, List<QueueConfig> configs) + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) { } @Override - public void deleteQueues(Session session, List<QueueConfig> configs) + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) { } } diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 6874abe7d4..4ce8efeae2 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -20,11 +20,15 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.controller.config.QueueConfig; +import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +38,7 @@ public class QpidQueueCreator implements QueueCreator private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); @Override - public void createQueues(Session session, List<QueueConfig> configs) + public void createQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) @@ -44,12 +48,65 @@ public class QpidQueueCreator implements QueueCreator } @Override - public void deleteQueues(Session session, List<QueueConfig> configs) + public void deleteQueues(Connection connection, Session session, List<QueueConfig> configs) { AMQSession<?, ?> amqSession = (AMQSession<?, ?>)session; for (QueueConfig queueConfig : configs) { - deleteQueue(amqSession, queueConfig); + AMQDestination destination = createAMQDestination(amqSession, queueConfig); + + // drainQueue method is added because deletion of queue with a lot + // of messages takes time and might cause the timeout exception + drainQueue(connection, destination); + deleteQueue(amqSession, destination.getAMQQueueName()); + } + } + + private AMQDestination createAMQDestination(AMQSession<?, ?> amqSession, QueueConfig queueConfig) + { + try + { + return (AMQDestination) amqSession.createQueue(queueConfig.getName()); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to create amq destionation object:" + queueConfig, e); + } + } + + private void drainQueue(Connection connection, AMQDestination destination) + { + Session noAckSession = null; + try + { + LOGGER.debug("About to drain the queue " + destination); + noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); + MessageConsumer messageConsumer = noAckSession.createConsumer(destination); + int counter = 0; + while(messageConsumer.receive(1000l) != null) + { + counter++; + } + LOGGER.debug("Drained " + counter + " messages from queue " + destination); + messageConsumer.close(); + } + catch (Exception e) + { + throw new DistributedTestException("Failed to drain queue:" + destination, e); + } + finally + { + if (noAckSession != null) + { + try + { + noAckSession.close(); + } + catch (JMSException e) + { + throw new DistributedTestException("Failed to close n/a session:" + noAckSession, e); + } + } } } @@ -74,20 +131,19 @@ public class QpidQueueCreator implements QueueCreator } } - private void deleteQueue(AMQSession<?, ?> session, QueueConfig queueConfig) + private void deleteQueue(AMQSession<?, ?> session, AMQShortString queueName) { try { // The Qpid AMQSession API currently makes the #deleteQueue method protected and the // raw protocol method public. This should be changed then we should switch the below to // use #deleteQueue. - AMQDestination destination = (AMQDestination) session.createQueue(queueConfig.getName()); - session.sendQueueDelete(destination.getAMQQueueName()); - LOGGER.debug("Deleted queue " + queueConfig.getName()); + session.sendQueueDelete(queueName); + LOGGER.debug("Deleted queue " + queueName); } catch (Exception e) { - throw new DistributedTestException("Failed to delete queue:" + queueConfig.getName(), e); + throw new DistributedTestException("Failed to delete queue:" + queueName, e); } } } diff --git a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java index 0947dd53cb..a37cd7888c 100644 --- a/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java +++ b/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QueueCreator.java @@ -20,12 +20,13 @@ package org.apache.qpid.disttest.jms; import java.util.List; +import javax.jms.Connection; import javax.jms.Session; import org.apache.qpid.disttest.controller.config.QueueConfig; public interface QueueCreator { - public void createQueues(final Session session, final List<QueueConfig> configs); - public void deleteQueues(final Session session, final List<QueueConfig> configs); + void createQueues(Connection connection, Session session, List<QueueConfig> configs); + void deleteQueues(Connection connection, Session session, List<QueueConfig> configs); } diff --git a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java index 784e43469e..59396d46c0 100644 --- a/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java +++ b/java/perftests/src/test/java/org/apache/qpid/systest/disttest/QpidQueueCreatorTest.java @@ -29,7 +29,6 @@ import javax.jms.Session; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; -import org.apache.qpid.disttest.DistributedTestException; import org.apache.qpid.disttest.controller.config.QueueConfig; import org.apache.qpid.disttest.jms.QpidQueueCreator; @@ -37,6 +36,9 @@ public class QpidQueueCreatorTest extends DistributedTestSystemTestBase { private static final Map<String, Object> EMPTY_ATTRIBUTES = Collections.emptyMap(); + private static final boolean QUEUE_DURABILITY = true; + + private Connection _connection; private QpidQueueCreator _creator; private Session _session; private List<QueueConfig> _configs; @@ -46,20 +48,20 @@ public class QpidQueueCreatorTest extends DistributedTestSystemTestBase public void setUp() throws Exception { super.setUp(); - Connection connection = getConnection(); - _session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + _connection = getConnection(); + _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); _creator = new QpidQueueCreator(); _configs = new ArrayList<QueueConfig>(); - _queueName = "direct://amq.direct//" + getTestQueueName(); + _queueName = "direct://amq.direct//" + getTestQueueName() + "?durable='" + QUEUE_DURABILITY + "'"; } public void testCreateQueueWithoutAttributes() throws Exception { - _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES)); + _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES)); assertQueueBound(_queueName, false); - _creator.createQueues(_session, _configs); + _creator.createQueues(_connection, _session, _configs); assertQueueBound(_queueName, true); } @@ -68,46 +70,28 @@ public class QpidQueueCreatorTest extends DistributedTestSystemTestBase { Map<String, Object> attributes = new HashMap<String, Object>(); attributes.put("x-qpid-priorities", Integer.valueOf(5)); - _configs.add(new QueueConfig(_queueName, true, attributes)); + _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, attributes)); assertQueueBound(_queueName, false); - _creator.createQueues(_session, _configs); + _creator.createQueues(_connection, _session, _configs); assertQueueBound(_queueName, true); } public void testDeleteQueues() throws Exception { - _configs.add(new QueueConfig(_queueName, true, EMPTY_ATTRIBUTES)); + _configs.add(new QueueConfig(_queueName, QUEUE_DURABILITY, EMPTY_ATTRIBUTES)); assertQueueBound(_queueName, false); - _creator.createQueues(_session, _configs); + _creator.createQueues(_connection, _session, _configs); assertQueueBound(_queueName, true); - _creator.deleteQueues(_session, _configs); + _creator.deleteQueues(_connection, _session, _configs); assertQueueBound(_queueName, false); } - public void testDeleteQueueThatDoesNotExist() throws Exception - { - String queueThatDoesNotExist = _queueName; - List<QueueConfig> configs = new ArrayList<QueueConfig>(); - Map<String, Object> attributes = Collections.emptyMap(); - configs.add(new QueueConfig(queueThatDoesNotExist, true, attributes)); - - try - { - _creator.deleteQueues(_session, configs); - fail("Exception not thrown"); - } - catch (DistributedTestException e) - { - // PASS - } - } - private void assertQueueBound(String queueName, boolean isBound) throws Exception { AMQDestination destination = (AMQDestination)_session.createQueue(queueName); |
