From 3835253610d675befdc21ff6943470c94d23f7c8 Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Wed, 31 Jan 2007 11:29:33 +0000 Subject: (Patch submitted by Rupert Smith) Added a ping latency test. Uploaded new junit-toolkit snapshot for self timed tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501804 13f79535-47bb-0310-9956-ffa450edef68 --- java/perftests/pom.xml | 1 + .../apache/qpid/requestreply/PingPongProducer.java | 18 +- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 317 +++++++++++++++++++++ 3 files changed, 329 insertions(+), 7 deletions(-) create mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java (limited to 'java/perftests') diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 712be2e34a..758678fc84 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -154,6 +154,7 @@ -n Ping-Once -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf -n Ping-Once-Async -s [1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf + -n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf -n Ping-Tx -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 310ec5f5e3..885277c533 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -198,6 +198,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the default verbose mode. */ public static final boolean DEFAULT_VERBOSE = false; + /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong idGenerator = new AtomicLong(0L); @@ -629,12 +632,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Print out ping times for every message in verbose mode only. if (_verbose) { - Long timestamp = message.getLongProperty("timestamp"); + Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME); if (timestamp != null) { - long diff = System.currentTimeMillis() - timestamp; - _logger.trace("Time for round trip: " + diff); + long diff = System.nanoTime() - timestamp; + _logger.trace("Time for round trip (nanos): " + diff); } } } @@ -762,7 +765,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis committed = false; // Re-timestamp the message. - message.setLongProperty("timestamp", System.currentTimeMillis()); + message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); // Round robin the destinations as the messages are sent. //return _destinationCount; @@ -805,7 +808,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Generate a sample message and time stamp it. ObjectMessage msg = getTestMessage(_replyDestination, _messageSize, _persistent); - msg.setLongProperty("timestamp", System.currentTimeMillis()); + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); // Send the message and wait for a reply. pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT); @@ -863,8 +866,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException { ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - // Timestamp the message. - //msg.setLongProperty("timestamp", System.currentTimeMillis()); + + // Timestamp the message in nanoseconds. + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); return msg; } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java new file mode 100644 index 0000000000..620ddd13f7 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -0,0 +1,317 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + *

This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The frequency + * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every + * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + *

The size parameter logged for each individual ping is set to the size of the batch of messages that the individual + * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time) + * can be calculated in order to examine the relationship between throughput and latency. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send many ping messages and output timings for sampled individual pings. + *
+ */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + ObjectMessage msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number remaining + * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test + * timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Extract the send time from the message and work out from the current time, what the ping latency was. + // The ping producer time stamps messages in nanoseconds. + long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + long now = System.nanoTime(); + long pingTime = now - startTime; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch, pingTime); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} -- cgit v1.2.1