From 039cf25002ca2f395da9b8ce6a45d34aab894b8c Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Thu, 2 Aug 2007 09:55:47 +0000 Subject: Moved everything from test to main. Simpler to have a single jar for manifest with classpath jar creation. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@562059 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/perftests/QpidTestThroughputPerf.java | 170 +++++++++++ .../org/apache/qpid/ping/PingAsyncTestPerf.java | 294 ++++++++++++++++++ .../org/apache/qpid/ping/PingLatencyTestPerf.java | 335 +++++++++++++++++++++ .../java/org/apache/qpid/ping/PingTestPerf.java | 196 ++++++++++++ .../apache/qpid/requestreply/PingPongTestPerf.java | 251 +++++++++++++++ .../qpid/perftests/QpidTestThroughputPerf.java | 170 ----------- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 294 ------------------ .../org/apache/qpid/ping/PingLatencyTestPerf.java | 335 --------------------- .../java/org/apache/qpid/ping/PingTestPerf.java | 196 ------------ .../apache/qpid/requestreply/PingPongTestPerf.java | 251 --------------- 10 files changed, 1246 insertions(+), 1246 deletions(-) create mode 100644 java/perftests/src/main/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (limited to 'java/perftests/src') diff --git a/java/perftests/src/main/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java b/java/perftests/src/main/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java new file mode 100644 index 0000000000..760d1c84a4 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java @@ -0,0 +1,170 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.perftests; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; + +import uk.co.thebadgerset.junit.extensions.TestThreadAware; +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import java.util.LinkedList; + +/** + * QpidTestThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of + * the time required to receive samples consisting of batches of messages. As the time samples is taken over a reasonable + * sized message batch, it measures message throughput. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ *
+ * + * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as + * the publishing end. + */ +public class QpidTestThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(QpidTestThroughputPerf.class); + + /** Holds the timing controller, used to log test timings from self-timed tests. */ + private TimingController timingController; + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + /** + * Creates a new test case with the specified name. + * + * @param name The test case name. + */ + public QpidTestThroughputPerf(String name) + { + super(name); + } + + /** + * Performs the a basic P2P test case. + * + * @param numMessages The number of messages to send in the test. + */ + public void testThroughput(int numMessages) + { + log.debug("public void testThroughput(): called"); + + PerThreadSetup setup = threadSetup.get(); + assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList())); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); + + return "DEFAULT_CIRCUIT_TEST"; + } + + /** + * Used by test runners that can supply a {@link uk.co.thebadgerset.junit.extensions.TimingController} to set the + * controller on an aware test. + * + * @param controller The timing controller. + */ + public void setTimingController(TimingController controller) + { + timingController = controller; + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + // Get the test parameters, any overrides on the command line will have been applied. + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + // Customize the test parameters. + testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); + testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); + + // Get the test circuit factory to create test circuits and run the standard test procedure through. + CircuitFactory circuitFactory = getCircuitFactory(); + + // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. + Circuit testCircuit = circuitFactory.createCircuit(testProps); + + // Store the test configuration for the thread. + PerThreadSetup setup = new PerThreadSetup(); + setup.testCircuit = testCircuit; + threadSetup.set(setup); + } + + /** + * Called when a test thread is destroyed. + */ + public void threadTearDown() + { } + + /** + * Holds the per-thread test configurations. + */ + protected static class PerThreadSetup + { + /** Holds the test circuit to run tests on. */ + Circuit testCircuit; + } + + /** + * Compiles all the tests in this class into a suite. + * + * @return The test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); + + suite.addTest(new QpidTestThroughputPerf("testThroughput")); + + return suite; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..6c7f22c19a --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,294 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send many ping messages and output timings asynchronously on batches received. + *
+ */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingAsyncTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + } + + /** + * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness + */ + public void testAsyncPingOk(int numPings) throws Exception + { + // _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); + // String messageCorrelationId = perThreadSetup._correlationId; + // _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); + perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < perCorrelationId._expectedCount) + { + perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); + } + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(perThreadSetup._correlationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId());*/ + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java new file mode 100644 index 0000000000..af612d5430 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -0,0 +1,335 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + *

This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + *

The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. + * + *

CRC Card
Responsibilities Collaborations
Send many ping + * messages and output timings for sampled individual pings.
+ */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** Compile all the tests into a test suite. */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + Message msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + private boolean _strictAMQP; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, + AMQSession.STRICT_AMQP_DEFAULT)); + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Extract the send time from the message and work out from the current time, what the ping latency was. + // The ping producer time stamps messages in nanoseconds. + long startTime; + + if (_strictAMQP) + { + Long value = + ((AMQMessage) message).getTimestampProperty(new AMQShortString( + PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME)); + + startTime = ((value == null) ? 0L : value); + } + else + { + startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + } + + long now = System.nanoTime(); + long pingTime = now - startTime; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch, pingTime); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java new file mode 100644 index 0000000000..46333db844 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.TestThreadAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times + * simultaneously to simluate many clients/producers/connections. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, + * except if the connection is lost in which case an attempt to re-establish the setup is made. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the + * temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingTestPerf(String name) + { + super(name); + + _logger.debug("testParameters = " + testParameters); + } + + /** + * Compile all the tests into a test suite. + * @return The test method testPingOk. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingTestPerf("testPingOk")); + + return suite; + } + + public void testPingOk(int numPings) throws Exception + { + if (numPings == 0) + { + Assert.fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + if (perThreadSetup == null) + { + Assert.fail("Could not get per thread test setup, it was null."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + + numReplies); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently. + synchronized (this) + { + // Establish a client to ping a Destination and listen the reply back from same Destination + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); + } + // Start the client connection + perThreadSetup._pingClient.getConnection().start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) + { + perThreadSetup._pingClient.close(); + } + } + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + finally + { + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping client. + */ + protected PingClient _pingClient; + protected String _correlationId; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java new file mode 100644 index 0000000000..f4a6dc6554 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import javax.jms.*; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +/** + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run + * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from + * a producer to a conumer, then the consumer replies to the message on a temporary queue. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number + * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled + * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more + * information on how to do this. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads + * gets its own connection/producer/consumer, this is only re-established if the connection is lost. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come + * back on the temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingPongTestPerf extends AsymptoticTestCase +{ + private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. + // private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingPongTestPerf(String name) + { + super(name); + + _logger.debug(testParameters); + + // Sets up the test parameters with defaults. + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.PING_QUEUE_NAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingPongTestPerf("testPingPongOk")); + + return suite; + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingPongOk(int numPings) throws Exception + { + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the message and wait for a reply. + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != numPings) + { + Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); + + synchronized (this) + { + // Establish a bounce back client on the ping queue to bounce back the pings. + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); + + // Start the connections for client and producer running. + perThreadSetup._testPingBouncer.getConnection().start(); + + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); + perThreadSetup._testPingProducer.getConnection().start(); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + // perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping-pong producer. + */ + private PingPongProducer _testPingProducer; + + /** + * Holds the test ping client. + */ + private PingPongBouncer _testPingBouncer; + } +} diff --git a/java/perftests/src/test/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java b/java/perftests/src/test/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java deleted file mode 100644 index 760d1c84a4..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/perftests/QpidTestThroughputPerf.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.perftests; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.test.framework.Assertion; -import org.apache.qpid.test.framework.Circuit; -import org.apache.qpid.test.framework.FrameworkBaseCase; -import org.apache.qpid.test.framework.MessagingTestConfigProperties; -import org.apache.qpid.test.framework.sequencers.CircuitFactory; - -import uk.co.thebadgerset.junit.extensions.TestThreadAware; -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import java.util.LinkedList; - -/** - * QpidTestThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of - * the time required to receive samples consisting of batches of messages. As the time samples is taken over a reasonable - * sized message batch, it measures message throughput. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- *
- * - * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as - * the publishing end. - */ -public class QpidTestThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(QpidTestThroughputPerf.class); - - /** Holds the timing controller, used to log test timings from self-timed tests. */ - private TimingController timingController; - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** - * Creates a new test case with the specified name. - * - * @param name The test case name. - */ - public QpidTestThroughputPerf(String name) - { - super(name); - } - - /** - * Performs the a basic P2P test case. - * - * @param numMessages The number of messages to send in the test. - */ - public void testThroughput(int numMessages) - { - log.debug("public void testThroughput(): called"); - - PerThreadSetup setup = threadSetup.get(); - assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList())); - } - - /** - * Should provide a translation from the junit method name of a test to its test case name as known to the test - * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test - * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case - * name "TC2_BasicP2P". - * - * @param methodName The name of the JUnit test method. - * - * @return The name of the corresponding interop test case. - */ - public String getTestCaseNameForTestMethod(String methodName) - { - log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); - - return "DEFAULT_CIRCUIT_TEST"; - } - - /** - * Used by test runners that can supply a {@link uk.co.thebadgerset.junit.extensions.TimingController} to set the - * controller on an aware test. - * - * @param controller The timing controller. - */ - public void setTimingController(TimingController controller) - { - timingController = controller; - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - // Get the test parameters, any overrides on the command line will have been applied. - ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); - - // Customize the test parameters. - testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); - testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); - - // Get the test circuit factory to create test circuits and run the standard test procedure through. - CircuitFactory circuitFactory = getCircuitFactory(); - - // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. - Circuit testCircuit = circuitFactory.createCircuit(testProps); - - // Store the test configuration for the thread. - PerThreadSetup setup = new PerThreadSetup(); - setup.testCircuit = testCircuit; - threadSetup.set(setup); - } - - /** - * Called when a test thread is destroyed. - */ - public void threadTearDown() - { } - - /** - * Holds the per-thread test configurations. - */ - protected static class PerThreadSetup - { - /** Holds the test circuit to run tests on. */ - Circuit testCircuit; - } - - /** - * Compiles all the tests in this class into a suite. - * - * @return The test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); - - suite.addTest(new QpidTestThroughputPerf("testThroughput")); - - return suite; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java deleted file mode 100644 index 6c7f22c19a..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -/** - * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller - * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test - * that it extends because it can output timings as replies are received, rather than waiting until all expected replies - * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending - * and recieving clients working asynchronously. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Send many ping messages and output timings asynchronously on batches received. - *
- */ -public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingAsyncTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); - } - - /** - * Compile all the tests into a test suite. - * @return The test suite to run. Should only contain testAsyncPingOk method. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until - * all replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - * @throws Exception pass all errors out to the test harness - */ - public void testAsyncPingOk(int numPings) throws Exception - { - // _logger.debug("public void testAsyncPingOk(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); - // String messageCorrelationId = perThreadSetup._correlationId; - // _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); - perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < perCorrelationId._expectedCount) - { - perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); - } - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(perThreadSetup._correlationId); - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the - * pinger, in order to receive notifications about every message received and the number remaining to be - * received. Whenever the number remaining crosses a batch size boundary this results listener outputs - * a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount) throws JMSException - { - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId());*/ - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - tc.completeTest(true, receivedInBatch); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of - * the total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java deleted file mode 100644 index af612d5430..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.AMQMessage; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -/** - * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing - * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for - * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from - * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than - * waiting until all expected replies are received. - * - *

This test does not output timings for every single ping message, as when running at high volume, writing the test - * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The - * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the - * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. - * - *

The size parameter logged for each individual ping is set to the size of the batch of messages that the - * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput - * (messages / time) can be calculated in order to examine the relationship between throughput and latency. - * - *

CRC Card
Responsibilities Collaborations
Send many ping - * messages and output timings for sampled individual pings.
- */ -public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** - * Holds test specifics by correlation id. This consists of the expected number of messages and the timing - * controler. - */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingLatencyTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); - } - - /** Compile all the tests into a test suite. */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Latency Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingLatencyTestPerf("testPingLatency")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all - * replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - */ - public void testPingLatency(int numPings) throws Exception - { - _logger.debug("public void testPingLatency(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); - _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these - // messages. - pingClient.setChainedMessageListener(batchedResultsListener); - - // Generate a sample message of the specified size. - Message msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) - { - tc.completeTest(false, 0); - } - - // Remove the chained message listener from the ping producer. - pingClient.removeChainedMessageListener(); - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(messageCorrelationId); - } - - /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can - * be attached to the pinger, in order to receive notifications about every message received and the number - * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener - * outputs a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - private boolean _strictAMQP; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, - AMQSession.STRICT_AMQP_DEFAULT)); - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount) throws JMSException - { - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); - - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Extract the send time from the message and work out from the current time, what the ping latency was. - // The ping producer time stamps messages in nanoseconds. - long startTime; - - if (_strictAMQP) - { - Long value = - ((AMQMessage) message).getTimestampProperty(new AMQShortString( - PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME)); - - startTime = ((value == null) ? 0L : value); - } - else - { - startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - } - - long now = System.nanoTime(); - long pingTime = now - startTime; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - - tc.completeTest(true, receivedInBatch, pingTime); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of the - * total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java deleted file mode 100644 index 46333db844..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.TestThreadAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times - * simultaneously to simluate many clients/producers/connections. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single - * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, - * except if the connection is lost in which case an attempt to re-establish the setup is made. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the - * temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware -{ - private static Logger _logger = Logger.getLogger(PingTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** Holds a property reader to extract the test parameters from. */ - protected ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingTestPerf(String name) - { - super(name); - - _logger.debug("testParameters = " + testParameters); - } - - /** - * Compile all the tests into a test suite. - * @return The test method testPingOk. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingTestPerf("testPingOk")); - - return suite; - } - - public void testPingOk(int numPings) throws Exception - { - if (numPings == 0) - { - Assert.fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - if (perThreadSetup == null) - { - Assert.fail("Could not get per thread test setup, it was null."); - } - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // start the test - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) - { - Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " - + numReplies); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently. - synchronized (this) - { - // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingClient = new PingClient(testParameters); - perThreadSetup._pingClient.establishConnection(true, true); - } - // Start the client connection - perThreadSetup._pingClient.getConnection().start(); - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) - { - perThreadSetup._pingClient.close(); - } - } - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - finally - { - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping client. - */ - protected PingClient _pingClient; - protected String _correlationId; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java deleted file mode 100644 index f4a6dc6554..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import javax.jms.*; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -/** - * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run - * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from - * a producer to a conumer, then the consumer replies to the message on a temporary queue. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number - * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more - * information on how to do this. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads - * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come - * back on the temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingPongTestPerf extends AsymptoticTestCase -{ - private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - // private Properties testParameters = System.getProperties(); - private ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingPongTestPerf(String name) - { - super(name); - - _logger.debug(testParameters); - - // Sets up the test parameters with defaults. - /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, - Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); - testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, - Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, - Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, - Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, - Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, - Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingPongTestPerf("testPingPongOk")); - - return suite; - } - - private static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPingPongOk(int numPings) throws Exception - { - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the message and wait for a reply. - int numReplies = - perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != numPings) - { - Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); - String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); - String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); - String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); - boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); - boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); - String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); - boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - - synchronized (this) - { - // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = - new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, - transacted, selector, verbose, pubsub); - - // Start the connections for client and producer running. - perThreadSetup._testPingBouncer.getConnection().start(); - - // Establish a ping-pong client on the ping queue to send the pings and receive replies with. - perThreadSetup._testPingProducer = new PingPongProducer(testParameters); - perThreadSetup._testPingProducer.establishConnection(true, true); - perThreadSetup._testPingProducer.getConnection().start(); - } - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - perThreadSetup._testPingProducer.close(); - // perThreadSetup._testPingBouncer.close(); - } - - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping-pong producer. - */ - private PingPongProducer _testPingProducer; - - /** - * Holds the test ping client. - */ - private PingPongBouncer _testPingBouncer; - } -} -- cgit v1.2.1