summaryrefslogtreecommitdiff
path: root/qpid/java/perftests
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-12-27 10:16:13 +0000
committerKeith Wall <kwall@apache.org>2012-12-27 10:16:13 +0000
commita49cadaefb3efc5f966cca00fbe5a8a58ee956dc (patch)
treecd3dfa265fd5564364df304eff841df2fcf7996f /qpid/java/perftests
parent0c0a95ec35af64197af588c0e981fb5e026bef53 (diff)
downloadqpid-python-a49cadaefb3efc5f966cca00fbe5a8a58ee956dc.tar.gz
NO-JIRA: [Java Broker] Perf Tests - tweak queue drain algorithm to better handle a slow broker (exposed by new batch size tests)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1426152 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java10
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java28
2 files changed, 25 insertions, 13 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index 8c69e5694b..8d25f86b77 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -103,16 +103,22 @@ public class ConsumerParticipant implements Participant
}
Date end = new Date();
- int numberOfMessagesSent = _totalNumberOfMessagesReceived.get();
+ int numberOfMessagesReceived = _totalNumberOfMessagesReceived.get();
long totalPayloadSize = _totalPayloadSizeOfAllMessagesReceived.get();
int payloadSize = getPayloadSizeForResultIfConstantOrZeroOtherwise(_allConsumedPayloadSizes);
+ if (LOGGER.isInfoEnabled())
+ {
+ LOGGER.info("Consumer {} finished consuming. Number of messages consumed: {}",
+ getName(), numberOfMessagesReceived);
+ }
+
ConsumerParticipantResult result = _resultFactory.createForConsumer(
getName(),
registeredClientName,
_command,
acknowledgeMode,
- numberOfMessagesSent,
+ numberOfMessagesReceived,
payloadSize,
totalPayloadSize,
start, end, _messageLatencies);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
index 0b906d228f..ef2cfb6cd4 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/jms/QpidQueueCreator.java
@@ -25,7 +25,6 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.disttest.DistributedTestException;
@@ -34,12 +33,13 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class QpidQueueCreator implements QueueCreator
{
private static final Logger LOGGER = LoggerFactory.getLogger(QpidQueueCreator.class);
private static final FieldTable EMPTY_QUEUE_BIND_ARGUMENTS = new FieldTable();
private static final String QUEUE_CREATOR_DRAIN_POLL_TIMEOUT = "qpid.disttest.queue.creator.drainPollTime";
- private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 5000);
+ private static int _drainPollTimeout = Integer.getInteger(QUEUE_CREATOR_DRAIN_POLL_TIMEOUT, 500);
@Override
public void createQueues(Connection connection, Session session, List<QueueConfig> configs)
@@ -61,10 +61,8 @@ public class QpidQueueCreator implements QueueCreator
// drainQueue method is added because deletion of queue with a lot
// of messages takes time and might cause the timeout exception
- if (queueHasMessages(amqSession, destination))
- {
- drainQueue(connection, destination);
- }
+ drainQueue(connection, destination);
+
deleteQueue(amqSession, destination.getAMQQueueName());
}
}
@@ -81,13 +79,12 @@ public class QpidQueueCreator implements QueueCreator
}
}
- private boolean queueHasMessages(AMQSession<?, ?> amqSession, AMQDestination destination)
+ private long getQueueDepth(AMQSession<?, ?> amqSession, AMQDestination destination)
{
try
{
long queueDepth = amqSession.getQueueDepth(destination);
- LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), queueDepth);
- return queueDepth > 0;
+ return queueDepth;
}
catch (Exception e)
{
@@ -103,10 +100,19 @@ public class QpidQueueCreator implements QueueCreator
LOGGER.debug("About to drain the queue {}", destination.getQueueName());
noAckSession = connection.createSession(false, org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
MessageConsumer messageConsumer = noAckSession.createConsumer(destination);
+
+ long currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
int counter = 0;
- while(messageConsumer.receive(_drainPollTimeout) != null)
+ while (currentQueueDepth > 0)
{
- counter++;
+ LOGGER.info("Queue {} has {} message(s)", destination.getQueueName(), currentQueueDepth);
+
+ while(messageConsumer.receive(_drainPollTimeout) != null)
+ {
+ counter++;
+ }
+
+ currentQueueDepth = getQueueDepth((AMQSession<?,?>)noAckSession, destination);
}
LOGGER.info("Drained {} message(s) from queue {} ", counter, destination.getQueueName());
messageConsumer.close();