diff options
| author | Keith Wall <kwall@apache.org> | 2012-12-27 10:16:13 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2012-12-27 10:16:13 +0000 |
| commit | a49cadaefb3efc5f966cca00fbe5a8a58ee956dc (patch) | |
| tree | cd3dfa265fd5564364df304eff841df2fcf7996f /qpid/java/perftests | |
| parent | 0c0a95ec35af64197af588c0e981fb5e026bef53 (diff) | |
| download | qpid-python-a49cadaefb3efc5f966cca00fbe5a8a58ee956dc.tar.gz | |
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better handle a slow broker (exposed by new batch size tests)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1426152 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests')
| -rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java | 10 | ||||
| -rw-r--r-- | qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java | 28 |
2 files changed, 25 insertions, 13 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java index 8c69e5694b..8d25f86b77 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java @@ -103,16 +103,22 @@ public class ConsumerParticipant implements Participant } Date end = new Date(); - int numberOfMessagesSent = _totalNumberOfMessagesReceived.get(); + int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get(); long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get(); int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes); + if (LOGGER.isInfoEnabled()) + { + LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}", + getName(), numberOfMessagesReceived); + } + ConsumerParticipantResult result = _resultFactory.createForConsumer( getName(), registeredClientName, _command, acknowledgeMode, - numberOfMessagesSent, + numberOfMessagesReceived, payloadSize, totalPayloadSize, start, end, _messageLatencies); diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java index 0b906d228f..ef2cfb6cd4 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java @@ -25,7 +25,6 @@ import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; -import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; import org.apache.qpid.disttest.DistributedTestException; @@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class QpidQueueCreator implements QueueCreator { private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class); private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable(); private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime"; - private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000); + private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500); @Override public void createQueues(Connection connection, Session session, List<QueueConfig> configs) @@ -61,10 +61,8 @@ public class QpidQueueCreator implements QueueCreator // drainQueue method is added because deletion of queue with a lot // of messages takes time and might cause the timeout exception - if (queueHasMessages(amqSession, destination)) - { - drainQueue(connection, destination); - } + drainQueue(connection, destination); + deleteQueue(amqSession, destination.getAMQQueueName()); } } @@ -81,13 +79,12 @@ public class QpidQueueCreator implements QueueCreator } } - private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination) + private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination) { try { long queueDepth = amqSession.getQueueDepth(destination); - LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth); - return queueDepth > 0; + return queueDepth; } catch (Exception e) { @@ -103,10 +100,19 @@ public class QpidQueueCreator implements QueueCreator LOGGER.debug("About to drain the queue {}", destination.getQueueName()); noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE); MessageConsumer messageConsumer = noAckSession.createConsumer(destination); + + long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination); int counter = 0; - while(messageConsumer.receive(_drainPollTimeout) != null) + while (currentQueueDepth > 0) { - counter++; + LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth); + + while(messageConsumer.receive(_drainPollTimeout) != null) + { + counter++; + } + + currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination); } LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName()); messageConsumer.close(); |
