summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2012-07-02 14:26:36 +0000
committerKeith Wall <kwall@apache.org>2012-07-02 14:26:36 +0000
commit220f542e8e00932cfcd3d4c4f7832c58b7fef4cd (patch)
tree257838ce9fa6491e2333ef15fd0203c24bb23297 /qpid/java/perftests/src
parent7d96daca11cde3df099c652b671e3bf8ab2627b6 (diff)
downloadqpid-python-220f542e8e00932cfcd3d4c4f7832c58b7fef4cd.tar.gz
QPID-4089: Add latency tests into java performance test framework
Applied patch from Oleksandr Rudyy <orudyy@gmail.com>. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1356250 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java55
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java20
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java2
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java63
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java12
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java7
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java24
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java111
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java23
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java43
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java52
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java5
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv4
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java14
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json3
-rw-r--r--qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java6
16 files changed, 416 insertions, 28 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
index 1b5e8276c2..368a25c929 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ConsumerParticipant.java
@@ -20,13 +20,16 @@
package org.apache.qpid.disttest.client;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -53,12 +56,17 @@ public class ConsumerParticipant implements Participant
private long _startTime;
private volatile Exception _asyncMessageListenerException;
+ private List<Long> _messageLatencies;
public ConsumerParticipant(final ClientJmsDelegate delegate, final CreateConsumerCommand command)
{
_jmsDelegate = delegate;
_command = command;
_resultFactory = new ParticipantResultFactory();
+ if (command.isEvaluateLatency())
+ {
+ _messageLatencies = new ArrayList<Long>();
+ }
}
@Override
@@ -105,7 +113,7 @@ public class ConsumerParticipant implements Participant
numberOfMessagesSent,
payloadSize,
totalPayloadSize,
- start, end);
+ start, end, _messageLatencies);
return result;
}
@@ -130,25 +138,42 @@ public class ConsumerParticipant implements Participant
*/
private boolean processMessage(Message message)
{
- int messageCount = _totalNumberOfMessagesReceived.incrementAndGet();
- if (LOGGER.isTraceEnabled())
- {
- LOGGER.trace("message " + messageCount + " received by " + this);
- }
- int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
- _allConsumedPayloadSizes.add(messagePayloadSize);
- _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
-
+ int messageCount = message == null? _totalNumberOfMessagesReceived.get() : _totalNumberOfMessagesReceived.incrementAndGet() ;
boolean batchEnabled = _command.getBatchSize() > 0;
boolean batchComplete = batchEnabled && messageCount % _command.getBatchSize() == 0;
-
- if (!batchEnabled || batchComplete)
+ if (message != null)
{
- if (LOGGER.isTraceEnabled() && batchEnabled)
+ if (LOGGER.isTraceEnabled())
+ {
+ LOGGER.trace("message " + messageCount + " received by " + this);
+ }
+ int messagePayloadSize = _jmsDelegate.calculatePayloadSizeFrom(message);
+ _allConsumedPayloadSizes.add(messagePayloadSize);
+ _totalPayloadSizeOfAllMessagesReceived.addAndGet(messagePayloadSize);
+
+ if (_command.isEvaluateLatency())
+ {
+ long mesageTimestamp;
+ try
+ {
+ mesageTimestamp = message.getJMSTimestamp();
+ }
+ catch (JMSException e)
+ {
+ throw new DistributedTestException("Cannot get message timestamp!", e);
+ }
+ long latency = System.currentTimeMillis() - mesageTimestamp;
+ _messageLatencies.add(latency);
+ }
+
+ if (!batchEnabled || batchComplete)
{
- LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ if (LOGGER.isTraceEnabled() && batchEnabled)
+ {
+ LOGGER.trace("Committing: batch size " + _command.getBatchSize() );
+ }
+ _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
- _jmsDelegate.commitOrAcknowledgeMessage(message, _command.getSessionName());
}
boolean reachedExpectedNumberOfMessages = _command.getNumberOfMessages() > 0 && messageCount >= _command.getNumberOfMessages();
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
index 7f6b96b87c..50c0a74ccd 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/client/ParticipantResultFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.qpid.disttest.client;
+import java.util.Collection;
import java.util.Date;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
@@ -26,12 +27,24 @@ import org.apache.qpid.disttest.message.CreateParticpantCommand;
import org.apache.qpid.disttest.message.CreateProducerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
+import org.apache.qpid.disttest.results.aggregation.SeriesStatistics;
public class ParticipantResultFactory
{
- public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName, CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize, long totalPayloadReceived, Date start, Date end)
+ public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName,
+ CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize,
+ long totalPayloadReceived, Date start, Date end)
+ {
+ return createForConsumer(participantName, clientRegisteredName, command, acknowledgeMode, numberOfMessagesReceived,
+ payloadSize, totalPayloadReceived, start, end, null);
+ }
+
+ public ConsumerParticipantResult createForConsumer(String participantName, String clientRegisteredName,
+ CreateConsumerCommand command, int acknowledgeMode, int numberOfMessagesReceived, int payloadSize,
+ long totalPayloadReceived, Date start, Date end, Collection<Long> messageLatencies)
{
ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult();
+ consumerParticipantResult.setMessageLatencies(messageLatencies);
setTestProperties(consumerParticipantResult, command, participantName, clientRegisteredName, acknowledgeMode);
setTestResultProperties(consumerParticipantResult, numberOfMessagesReceived, payloadSize, totalPayloadReceived, start, end);
@@ -45,6 +58,11 @@ public class ParticipantResultFactory
consumerParticipantResult.setTotalNumberOfConsumers(1);
consumerParticipantResult.setTotalNumberOfProducers(0);
+ SeriesStatistics statistics = new SeriesStatistics(messageLatencies);
+ consumerParticipantResult.setAverageLatency(statistics.getAverage());
+ consumerParticipantResult.setMinLatency(statistics.getMinimum());
+ consumerParticipantResult.setMaxLatency(statistics.getMaximum());
+ consumerParticipantResult.setLatencyStandardDeviation(statistics.getStandardDeviation());
return consumerParticipantResult;
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
index ed47e02667..8b4503b0ad 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/controller/config/ConsumerConfig.java
@@ -10,6 +10,7 @@ public class ConsumerConfig extends ParticipantConfig
private String _selector;
private boolean _noLocal;
private boolean _synchronous;
+ private boolean _evaluateLatency;
// For Gson
public ConsumerConfig()
@@ -58,6 +59,7 @@ public class ConsumerConfig extends ParticipantConfig
createConsumerCommand.setSelector(_selector);
createConsumerCommand.setNoLocal(_noLocal);
createConsumerCommand.setSynchronous(_synchronous);
+ createConsumerCommand.setEvaluateLatency(_evaluateLatency);
return createConsumerCommand;
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
index f92e3ea538..566d4e2076 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ConsumerParticipantResult.java
@@ -25,6 +25,8 @@ import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SELECTOR;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_SYNCHRONOUS_CONSUMER;
import static org.apache.qpid.disttest.message.ParticipantAttribute.IS_TOPIC;
+import java.util.Collection;
+
public class ConsumerParticipantResult extends ParticipantResult
{
private boolean _topic;
@@ -34,6 +36,12 @@ public class ConsumerParticipantResult extends ParticipantResult
private boolean _noLocal;
private boolean _synchronousConsumer;
+ private Collection<Long> _messageLatencies;
+ private long _minLatency;
+ private long _maxLatency;
+ private double _averageLatency;
+ private double _latencyStandardDeviation;
+
public ConsumerParticipantResult()
{
super(CommandType.CONSUMER_PARTICIPANT_RESULT);
@@ -115,4 +123,59 @@ public class ConsumerParticipantResult extends ParticipantResult
{
return _topic;
}
+
+ public Collection<Long> getMessageLatencies()
+ {
+ return _messageLatencies;
+ }
+
+ public void setMessageLatencies(Collection<Long> messageLatencies)
+ {
+ _messageLatencies = messageLatencies;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.MIN_LATENCY)
+ public long getMinLatency()
+ {
+ return _minLatency;
+ }
+
+ public void setMinLatency(long minLatency)
+ {
+ _minLatency = minLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.MAX_LATENCY)
+ public long getMaxLatency()
+ {
+ return _maxLatency;
+ }
+
+ public void setMaxLatency(long maxLatency)
+ {
+ _maxLatency = maxLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.AVERAGE_LATENCY)
+ public double getAverageLatency()
+ {
+ return _averageLatency;
+ }
+
+ public void setAverageLatency(double averageLatency)
+ {
+ _averageLatency = averageLatency;
+ }
+
+ @OutputAttribute(attribute=ParticipantAttribute.LATENCY_STANDARD_DEVIATION)
+ public double getLatencyStandardDeviation()
+ {
+ return _latencyStandardDeviation;
+ }
+
+ public void setLatencyStandardDeviation(double latencyStandardDeviation)
+ {
+ _latencyStandardDeviation = latencyStandardDeviation;
+ }
+
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
index 678e428f94..68c21fbf83 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/CreateConsumerCommand.java
@@ -28,7 +28,7 @@ public class CreateConsumerCommand extends CreateParticpantCommand
private boolean _noLocal;
private boolean _synchronous;
private long _receiveTimeout = 5000;
-
+ private boolean _evaluateLatency;
public CreateConsumerCommand()
{
@@ -105,4 +105,14 @@ public class CreateConsumerCommand extends CreateParticpantCommand
{
return _receiveTimeout;
}
+
+ public boolean isEvaluateLatency()
+ {
+ return _evaluateLatency;
+ }
+
+ public void setEvaluateLatency(boolean evaluateLatency)
+ {
+ _evaluateLatency = evaluateLatency;
+ }
}
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
index ccc7c0d9fb..0644ec16a3 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/message/ParticipantAttribute.java
@@ -54,7 +54,12 @@ public enum ParticipantAttribute
TOTAL_PAYLOAD_PROCESSED("totalPayloadProcessedB"),
THROUGHPUT("throughputKbPerS"),
TIME_TAKEN("timeTakenMs"),
- ERROR_MESSAGE("errorMessage");
+ ERROR_MESSAGE("errorMessage"),
+ MIN_LATENCY("minLatency"),
+ MAX_LATENCY("maxLatency"),
+ AVERAGE_LATENCY("averageLatency"),
+ LATENCY_STANDARD_DEVIATION("latencyStandardDeviation")
+ ;
private String _displayName;
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
index 207d0131eb..dde717c71b 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/ParticipantResultAggregator.java
@@ -23,6 +23,7 @@ import java.util.Date;
import java.util.NavigableSet;
import java.util.TreeSet;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
public class ParticipantResultAggregator
@@ -44,6 +45,8 @@ public class ParticipantResultAggregator
private NavigableSet<Integer> _encounteredAcknowledgeMode = new TreeSet<Integer>();
private NavigableSet<String> _encountedTestNames = new TreeSet<String>();
+ private SeriesStatistics _latencyStatistics = new SeriesStatistics();
+
public ParticipantResultAggregator(Class<? extends ParticipantResult> targetClass, String aggregateResultName)
{
_aggregatedResultName = aggregateResultName;
@@ -56,12 +59,31 @@ public class ParticipantResultAggregator
{
rollupConstantAttributes(result);
computeVariableAttributes(result);
+ if (result instanceof ConsumerParticipantResult)
+ {
+ ConsumerParticipantResult consumerParticipantResult = (ConsumerParticipantResult)result;
+ _latencyStatistics.addMessageLatencies(consumerParticipantResult.getMessageLatencies());
+ _latencyStatistics.aggregate();
+ }
}
}
public ParticipantResult getAggregatedResult()
{
- ParticipantResult aggregatedResult = new ParticipantResult(_aggregatedResultName);
+ ParticipantResult aggregatedResult;
+ if (_targetClass == ConsumerParticipantResult.class)
+ {
+ ConsumerParticipantResult consumerParticipantResult = new ConsumerParticipantResult(_aggregatedResultName);
+ consumerParticipantResult.setAverageLatency(_latencyStatistics.getAverage());
+ consumerParticipantResult.setMinLatency(_latencyStatistics.getMinimum());
+ consumerParticipantResult.setMaxLatency(_latencyStatistics.getMaximum());
+ consumerParticipantResult.setLatencyStandardDeviation(_latencyStatistics.getStandardDeviation());
+ aggregatedResult = consumerParticipantResult;
+ }
+ else
+ {
+ aggregatedResult = new ParticipantResult(_aggregatedResultName);
+ }
setRolledUpConstantAttributes(aggregatedResult);
setComputedVariableAttributes(aggregatedResult);
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java
new file mode 100644
index 0000000000..b93c210473
--- /dev/null
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/disttest/results/aggregation/SeriesStatistics.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class SeriesStatistics
+{
+ private long _minValue;
+ private long _maxValue;
+ private double _mean;
+ private double _standardDeviation;
+ private Collection<Long> _series = new CopyOnWriteArrayList<Long>();
+
+ public SeriesStatistics()
+ {
+ super();
+ }
+
+ public SeriesStatistics(Collection<Long> messageLatencies)
+ {
+ setMessageLatencies(messageLatencies);
+ }
+
+ public void addMessageLatencies(Collection<Long> messageLatencies)
+ {
+ if (messageLatencies != null)
+ {
+ _series.addAll(messageLatencies);
+ }
+ }
+
+ public void setMessageLatencies(Collection<Long> messageLatencies)
+ {
+ _series = messageLatencies;
+ aggregate();
+ }
+
+ public void aggregate()
+ {
+ if (_series != null && _series.size() > 0)
+ {
+ long minLatency = Long.MAX_VALUE;
+ long maxLatency = Long.MIN_VALUE;
+ long totalLatency = 0;
+ for (Long latency : _series)
+ {
+ totalLatency += latency;
+ minLatency = Math.min(minLatency, latency);
+ maxLatency = Math.max(maxLatency, latency);
+ }
+ _mean = ((double) totalLatency) / (double) _series.size();
+ _minValue = minLatency;
+ _maxValue = maxLatency;
+ double sum = 0;
+ for (Long latency : _series)
+ {
+ double diff = latency - _mean;
+ sum += diff * diff;
+ }
+ long size = _series.size() == 1 ? 1: _series.size() - 1;
+ _standardDeviation = Math.sqrt(sum / (double) size);
+ }
+ else
+ {
+ _mean = 0;
+ _minValue = 0;
+ _maxValue = 0;
+ _standardDeviation = 0;
+ }
+ }
+
+ public long getMinimum()
+ {
+ return _minValue;
+ }
+
+ public long getMaximum()
+ {
+ return _maxValue;
+ }
+
+ public double getAverage()
+ {
+ return _mean;
+ }
+
+ public double getStandardDeviation()
+ {
+ return _standardDeviation;
+ }
+}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
index ff7cfd2b41..58589d36f4 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/client/ConsumerParticipantTest.java
@@ -29,6 +29,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collection;
+
import javax.jms.Message;
import javax.jms.Session;
@@ -36,6 +38,7 @@ import junit.framework.TestCase;
import org.apache.qpid.disttest.DistributedTestException;
import org.apache.qpid.disttest.jms.ClientJmsDelegate;
+import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.CreateConsumerCommand;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.mockito.InOrder;
@@ -177,4 +180,24 @@ public class ConsumerParticipantTest extends TestCase
verify(_delegate).closeTestConsumer(PARTICIPANT_NAME1);
}
+ public void testLatency() throws Exception
+ {
+ int numberOfMessages = 1;
+ long totalPayloadSize = PAYLOAD_SIZE_PER_MESSAGE * numberOfMessages;
+ _command.setNumberOfMessages(numberOfMessages);
+ _command.setEvaluateLatency(true);
+ _consumerParticipant = new ConsumerParticipant(_delegate, _command);
+ ParticipantResult result = _consumerParticipant.doIt(CLIENT_NAME);
+
+ assertExpectedConsumerResults(result, PARTICIPANT_NAME1, CLIENT_NAME, _testStartTime,
+ Session.CLIENT_ACKNOWLEDGE, null, numberOfMessages, PAYLOAD_SIZE_PER_MESSAGE, totalPayloadSize, null);
+
+ _inOrder.verify(_delegate).consumeMessage(PARTICIPANT_NAME1, RECEIVE_TIMEOUT);
+ _inOrder.verify(_delegate).calculatePayloadSizeFrom(_mockMessage);
+ _inOrder.verify(_delegate).commitOrAcknowledgeMessage(_mockMessage, SESSION_NAME1);
+ assertTrue("Unexpected consuemr results", result instanceof ConsumerParticipantResult);
+ Collection<Long> latencies = ((ConsumerParticipantResult)result).getMessageLatencies();
+ assertNotNull("Message latency is not cllected", latencies);
+ assertEquals("Unexpected message latency results", 1, latencies.size());
+ }
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java
new file mode 100644
index 0000000000..ec8da8418f
--- /dev/null
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/SeriesStatisticsTest.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.disttest.results.aggregation;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import junit.framework.TestCase;
+
+public class SeriesStatisticsTest extends TestCase
+{
+ public static Collection<Long> SERIES = Arrays.asList(new Long[] { 2l, 4l, 4l, 4l, 5l, 5l, 7l, 9l, 5l });
+
+ public void testAggregate()
+ {
+ SeriesStatistics results = new SeriesStatistics();
+ results.addMessageLatencies(SERIES);
+ results.aggregate();
+ assertEquals("Unexpected average", 5.0, results.getAverage(), 0.01);
+ assertEquals("Unexpected min", 2, results.getMinimum());
+ assertEquals("Unexpected max", 9, results.getMaximum());
+ assertEquals("Unexpected standard deviation", 2.0, results.getStandardDeviation(), 0.01);
+ }
+
+}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
index a803120cc6..9c00e7cf1c 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/aggregation/TestResultAggregatorTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.disttest.results.aggregation;
import java.util.Date;
+import java.util.List;
import junit.framework.TestCase;
@@ -26,8 +27,6 @@ import org.apache.qpid.disttest.controller.TestResult;
import org.apache.qpid.disttest.message.ConsumerParticipantResult;
import org.apache.qpid.disttest.message.ParticipantResult;
import org.apache.qpid.disttest.message.ProducerParticipantResult;
-import org.apache.qpid.disttest.results.aggregation.AggregatedTestResult;
-import org.apache.qpid.disttest.results.aggregation.TestResultAggregator;
public class TestResultAggregatorTest extends TestCase
{
@@ -105,6 +104,55 @@ public class TestResultAggregatorTest extends TestCase
assertEquals(TestResultAggregator.AGGREGATED_ERROR_MESSAGE, aggregatedTestResult.getAllParticipantResult().getErrorMessage());
}
+ public void testAggregateResultsForConsumerWithLatencyResults() throws Exception
+ {
+ TestResult originalTestResult = createResultsFromTest();
+ List<ParticipantResult> results = originalTestResult.getParticipantResults();
+ for (ParticipantResult participantResult : results)
+ {
+ if (participantResult instanceof ConsumerParticipantResult)
+ {
+ ((ConsumerParticipantResult)participantResult).setMessageLatencies(SeriesStatisticsTest.SERIES);
+ break;
+ }
+ }
+
+ int numberOfOriginalParticipantResults = originalTestResult.getParticipantResults().size();
+ int expectedNumberOfResults = numberOfOriginalParticipantResults + EXPECTED_NUMBER_OF_AGGREGATED_RESULTS;
+
+ AggregatedTestResult aggregatedTestResult = _aggregator.aggregateTestResult(originalTestResult);
+
+ aggregatedTestResult.getAllConsumerParticipantResult().getTotalPayloadProcessed();
+ assertEquals(expectedNumberOfResults, aggregatedTestResult.getParticipantResults().size());
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllConsumerParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_CONSUMED_IN_TOTAL, 2, 0);
+
+ assertLatencyAggregatedResults(aggregatedTestResult.getAllConsumerParticipantResult());
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllProducerParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_PRODUCED, 0, 1);
+
+ assertMinimalAggregatedResults(
+ aggregatedTestResult.getAllParticipantResult(),
+ TEST1_NAME, TEST1_ITERATION_NUMBER,
+ BATCH_SIZE, NUMBER_OF_MESSAGES_CONSUMED_IN_TOTAL, 2, 1);
+ }
+
+ private void assertLatencyAggregatedResults(ParticipantResult allConsumerParticipantResult)
+ {
+ assertTrue("Unexpected result", allConsumerParticipantResult instanceof ConsumerParticipantResult);
+ ConsumerParticipantResult results = (ConsumerParticipantResult)allConsumerParticipantResult;
+ assertEquals("Unexpected average", 5.0, results.getAverageLatency(), 0.01);
+ assertEquals("Unexpected min", 2, results.getMinLatency());
+ assertEquals("Unexpected max", 9, results.getMaxLatency());
+ assertEquals("Unexpected standard deviation", 2.0, results.getLatencyStandardDeviation(), 0.01);
+ }
+
private void assertMinimalAggregatedResults(ParticipantResult result, String expectedTestName, int expectedIterationNumber, int expectedBatchSize, long expectedNumberOfMessagesProcessed, int expectedTotalNumberOfConsumers, int expectedTotalNumberOfProducers)
{
assertEquals("Unexpected test name in " + result.getParticipantName(), expectedTestName, result.getTestName());
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
index 6b4157a9b3..7c6cfed402 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/CSVFormaterTest.java
@@ -118,7 +118,10 @@ public class CSVFormaterTest extends TestCase
participantAttributes.put(THROUGHPUT, 2048);
participantAttributes.put(TIME_TAKEN, 1000);
participantAttributes.put(ERROR_MESSAGE, "error");
-
+ participantAttributes.put(MIN_LATENCY, 2l);
+ participantAttributes.put(MAX_LATENCY, 9l);
+ participantAttributes.put(AVERAGE_LATENCY, 5.0f);
+ participantAttributes.put(LATENCY_STANDARD_DEVIATION, 2.0f);
return participantAttributes;
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
index cfffb1e549..ada2303d46 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/disttest/results/formatting/expectedOutput.csv
@@ -1,2 +1,2 @@
-testName,iterationNumber,clientName,participantName,numberOfMessages,payloadSizeB,priority,timeToLiveMs,acknowledgeMode,deliveryMode,batchSize,maximumDurationMs,producerStartDelayMs,producerIntervalMs,isTopic,isDurableSubscription,isBrowsingSubscription,isSelector,isNoLocal,isSynchronousConsumer,totalNumberOfConsumers,totalNumberOfProducers,totalPayloadProcessedB,throughputKbPerS,timeTakenMs,errorMessage
-TEST1,0,CONFIGURED_CLIENT1,PARTICIPANT,0,1,2,3,4,5,6,7,8,9,true,false,true,false,true,false,1,2,1024,2048,1000,error \ No newline at end of file
+testName,iterationNumber,clientName,participantName,numberOfMessages,payloadSizeB,priority,timeToLiveMs,acknowledgeMode,deliveryMode,batchSize,maximumDurationMs,producerStartDelayMs,producerIntervalMs,isTopic,isDurableSubscription,isBrowsingSubscription,isSelector,isNoLocal,isSynchronousConsumer,totalNumberOfConsumers,totalNumberOfProducers,totalPayloadProcessedB,throughputKbPerS,timeTakenMs,errorMessage,minLatency,maxLatency,averageLatency,latencyStandardDeviation
+TEST1,0,CONFIGURED_CLIENT1,PARTICIPANT,0,1,2,3,4,5,6,7,8,9,true,false,true,false,true,false,1,2,1024,2048,1000,error,2,9,5.0,2.0
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
index e06ace156f..c103af501c 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/ControllerAndClientTest.java
@@ -23,6 +23,7 @@ import static org.apache.qpid.systest.disttest.SystemTestConstants.COMMAND_RESPO
import static org.apache.qpid.systest.disttest.SystemTestConstants.REGISTRATION_TIMEOUT;
import static org.apache.qpid.systest.disttest.SystemTestConstants.TEST_RESULT_TIMEOUT;
+import java.util.Collection;
import java.util.List;
import javax.jms.Message;
@@ -73,6 +74,19 @@ public class ControllerAndClientTest extends DistributedTestSystemTestBase
List<ParticipantResult> test1ParticipantResults = testResult1.getParticipantResults();
assertEquals("Unexpected number of participant results for test 1", 2, test1ParticipantResults.size());
assertParticipantNames(test1ParticipantResults, "participantConsumer1", "participantProducer1");
+ ConsumerParticipantResult result = null;
+ for (ParticipantResult participantResult : test1ParticipantResults)
+ {
+ if (participantResult instanceof ConsumerParticipantResult)
+ {
+ result = (ConsumerParticipantResult)participantResult;
+ break;
+ }
+ }
+ assertNotNull("Consumer results not recived", result);
+ Collection<Long> latencies = result.getMessageLatencies();
+ assertNotNull("Latency results are not collected", latencies);
+ assertEquals("Unexpected latency results", 1, latencies.size());
}
public void testProducerClient() throws Exception
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
index 8d210dce84..a008dc40d8 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/controllerandclient/producerAndConsumerInSeparateClients.json
@@ -42,7 +42,8 @@
{
"_name": "participantConsumer1",
"_destinationName": "direct://amq.direct//testQueue",
- "_numberOfMessages": 1
+ "_numberOfMessages": 1,
+ "_evaluateLatency": true
}
]
}
diff --git a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
index 63c9b42858..7e58e1b5b1 100644
--- a/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
+++ b/qpid/java/perftests/src/test/java/org/apache/qpid/systest/disttest/endtoend/EndToEndTest.java
@@ -62,10 +62,10 @@ public class EndToEndTest extends QpidBrokerTestCase
assertEquals("Unexpected number of lines in CSV", numberOfExpectedRows, csvLines.length);
assertDataRowsHaveCorrectTestAndClientName("End To End 1", "producingClient", "participantProducer1", csvLines[1], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "consumingClient", "participantConsumer1", csvLines[2], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "consumingClient", "participantConsumer1", csvLines[3], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PARTICIPANTS_NAME, csvLines[3], 1);
- assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME, csvLines[4], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PARTICIPANTS_NAME, csvLines[4], 1);
+ assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_CONSUMER_PARTICIPANTS_NAME, csvLines[2], 1);
assertDataRowsHaveCorrectTestAndClientName("End To End 1", "", TestResultAggregator.ALL_PRODUCER_PARTICIPANTS_NAME, csvLines[5], 1);
}