diff options
Diffstat (limited to 'java/perftests/src/test')
4 files changed, 537 insertions, 714 deletions
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index bd39ec34a1..e10e6353b7 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -1,311 +1,302 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.ping; -//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -//import uk.co.thebadgerset.junit.extensions.TimingController; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.ObjectMessage; -import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; + import org.apache.log4j.Logger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.CountDownLatch; +import org.apache.qpid.requestreply.PingPongProducer; +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><td> Responsibilities <th> Collaborations + * <tr><td> Send many ping messages and output timings asynchronously on batches received. + * </table> + */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware { -// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; -// private TimingController _timingController; + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; -// private AsyncMessageListener _listener; + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ public PingAsyncTestPerf(String name) { super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testAsyncPingOk(int numPings) throws Exception + { + _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + ObjectMessage msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } -// /** -// * Compile all the tests into a test suite. -// */ -// public static Test suite() -// { -// // Build a new test suite -// TestSuite suite = new TestSuite("Ping Performance Tests"); -// -// // Run performance tests in read committed mode. -// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); -// -// return suite; -// } -// -// protected void setUp() throws Exception -// { -// // Create the test setups on a per thread basis, only if they have not already been created. -// -// if (threadSetup.get() == null) -// { -// PerThreadSetup perThreadSetup = new PerThreadSetup(); -// -// // Extract the test set up paramaeters. -// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); -// String username = "guest"; -// String password = "guest"; -// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); -// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); -// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); -// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); -// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); -// String selector = null; -// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); -// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); -// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); -// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); -// -// -// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); -// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); -// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); -// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); -// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); -// -// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); -// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); -// -// // This is synchronized because there is a race condition, which causes one connection to sleep if -// // all threads try to create connection concurrently -// synchronized (this) -// { -// // Establish a client to ping a Queue and listen the reply back from same Queue -// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, -// destinationname, selector, transacted, persistent, -// messageSize, verbose, -// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, -// commitbatchSize, destinationscount, rate, pubsub); -// } -// -// // Attach the per-thread set to the thread. -// threadSetup.set(perThreadSetup); -// -// _listener = new AsyncMessageListener(batchSize); -// -// perThreadSetup._pingItselfClient.setMessageListener(_listener); -// // Start the client connection -// perThreadSetup._pingItselfClient.getConnection().start(); -// -// } -// } -// -// -// public void testAsyncPingOk(int numPings) -// { -// _timingController = this.getTimingController(); -// -// _listener.setTotalMessages(numPings); -// -// PerThreadSetup perThreadSetup = threadSetup.get(); -// if (numPings == 0) -// { -// _logger.error("Number of pings requested was zero."); -// fail("Number of pings requested was zero."); -// } -// -// // Generate a sample message. This message is already time stamped and has its reply-to destination set. -// ObjectMessage msg = null; -// -// try -// { -// msg = perThreadSetup._pingItselfClient.getTestMessage(null, -// Integer.parseInt(testParameters.getProperty( -// MESSAGE_SIZE_PROPNAME)), -// Boolean.parseBoolean(testParameters.getProperty( -// PERSISTENT_MODE_PROPNAME))); -// } -// catch (JMSException e) -// { -// -// } -// -// // start the test -// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); -// -// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); -// -// try -// { -// _logger.debug("Sending messages"); -// -// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); -// -// _logger.debug("All sent"); -// } -// catch (JMSException e) -// { -// e.printStackTrace(); -// Assert.fail("JMS Exception Received" + e); -// } -// catch (InterruptedException e) -// { -// e.printStackTrace(); -// } -// -// try -// { -// _logger.debug("Awating test finish"); -// -// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); -// -// if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) -// { -// _logger.error("Timeout occured"); -// } -// //Allow the time out to exit the loop. -// } -// catch (InterruptedException e) -// { -// //ignore -// _logger.error("Awaiting test end was interrupted."); -// -// } -// -// // Fail the test if the timeout was exceeded. -// int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount(); -// -// _logger.info("Test Finished"); -// -// if (numReplies != numPings) -// { -// try -// { -// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); -// } -// catch (JMSException e) -// { -// _logger.error("Error commiting received messages", e); -// } -// try -// { -// if (_timingController != null) -// { -// _logger.trace("Logging missing message count"); -// _timingController.completeTest(false, numPings - numReplies); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); -// } -// } -// -// public void setTimingController(TimingController timingController) -// { -// _timingController = timingController; -// } -// -// public TimingController getTimingController() -// { -// return _timingController; -// } -// -// -// private class AsyncMessageListener implements MessageListener -// { -// private volatile int _totalMessages; -// private int _batchSize; -// PerThreadSetup _perThreadSetup; -// -// public AsyncMessageListener(int batchSize) -// { -// this(batchSize, -1); -// } -// -// public AsyncMessageListener(int batchSize, int totalMessages) -// { -// _batchSize = batchSize; -// _totalMessages = totalMessages; -// _perThreadSetup = threadSetup.get(); -// } -// -// public void setTotalMessages(int newTotal) -// { -// _totalMessages = newTotal; -// } -// -// public void onMessage(Message message) -// { -// try -// { -// _logger.trace("Message Received"); -// -// CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID()); -// -// if (count != null) -// { -// int messagesLeft = (int) count.getCount() - 1;// minus one as we haven't yet counted the current message -// -// if ((messagesLeft % _batchSize) == 0) -// { -// doDone(_batchSize); -// } -// else if (messagesLeft == 0) -// { -// doDone(_totalMessages % _batchSize); -// } -// } -// -// } -// catch (JMSException e) -// { -// _logger.warn("There was a JMSException", e); -// } -// -// } -// -// private void doDone(int messageCount) -// { -// _logger.trace("Messages received:" + messageCount); -// _logger.trace("Total Messages :" + _totalMessages); -// -// try -// { -// if (_timingController != null) -// { -// _timingController.completeTest(true, messageCount); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// } + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index fbc67881c2..c4e72f4bb6 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -1,7 +1,25 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.ping;
-import java.util.Properties;
-
import javax.jms.*;
import junit.framework.Assert;
@@ -10,169 +28,85 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger;
+import org.apache.qpid.requestreply.PingPongProducer;
+
import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
/**
* PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
* simultaneously to simluate many clients/producers/connections.
- * <p/>
+ *
* <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
* full round trip ping. This test may be scaled up using a suitable JUnit test runner.
- * <p/>
+ *
* <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
* temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
* except if the connection is lost in which case an attempt to re-establish the setup is made.
- * <p/>
+ *
* <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
* is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
* temporary queue.
- * <p/>
+ *
* <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- * <p/>
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*
* @author Rupert Smith
*/
-public class PingTestPerf extends AsymptoticTestCase //implements TimingControllerAware
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
{
private static Logger _logger = Logger.getLogger(PingTestPerf.class);
- /**
- * Holds the name of the property to get the test message size from.
- */
- protected static final String MESSAGE_SIZE_PROPNAME = "messagesize";
-
- /**
- * Holds the name of the property to get the ping queue name from.
- */
- protected static final String PING_DESTINATION_NAME_PROPNAME = "destinationname";
-
- /**
- * holds the queue count, if the test is being performed with multiple queues
- */
- protected static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
-
- /**
- * Holds the name of the property to get the test delivery mode from.
- */
- protected static final String PERSISTENT_MODE_PROPNAME = "persistent";
-
- /**
- * Holds the name of the property to get the test transactional mode from.
- */
- protected static final String TRANSACTED_PROPNAME = "transacted";
-
- /**
- * Holds the name of the property to get the test broker url from.
- */
- protected static final String BROKER_PROPNAME = "broker";
-
- /**
- * Holds the name of the property to get the test broker virtual path.
- */
- protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
-
- /**
- * Holds the name of the property to get the waiting timeout for response messages.
- */
- protected static final String TIMEOUT_PROPNAME = "timeout";
-
- /** Holds the name of the property to get the message rate from. */
- protected static final String RATE_PROPNAME = "rate";
-
- protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
-
- /** Holds the true or false depending on wether it is P2P test or PubSub */
- protected static final String IS_PUBSUB_PROPNAME = "pubsub";
- /**
- * Holds the size of message body to attach to the ping messages.
- */
- protected static final int MESSAGE_SIZE_DEFAULT = 1024;
-
- protected static final int BATCH_SIZE_DEFAULT = 1000;
-
- protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT;
-
- /**
- * Holds the name of the queue to which pings are sent.
- */
- private static final String PING_DESTINATION_NAME_DEFAULT = "ping";
-
- /**
- * Holds the message delivery mode to use for the test.
- */
- protected static final boolean PERSISTENT_MODE_DEFAULT = false;
-
- /**
- * Holds the transactional mode to use for the test.
- */
- protected static final boolean TRANSACTED_DEFAULT = false;
-
- /**
- * Holds the default broker url for the test.
- */
- protected static final String BROKER_DEFAULT = "tcp://localhost:5672";
-
- /**
- * Holds the default virtual path for the test.
- */
- protected static final String VIRTUAL_PATH_DEFAULT = "/test";
-
- /**
- * Sets a default ping timeout.
- */
- protected static final long TIMEOUT_DEFAULT = 3000;
-
- /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
- private static final int RATE_DEFAULT = 0;
-
- protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
- protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
- protected static final String FAIL_AFTER_SEND = "FailAfterSend";
- protected static final String FAIL_BEFORE_SEND = "FailBeforeSend";
- protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize";
- protected static final String BATCH_SIZE = "BatchSize";
- protected static final String FAIL_ONCE = "FailOnce";
-
- /**
- * Thread local to hold the per-thread test setup fields.
- */
+ /** Thread local to hold the per-thread test setup fields. */
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
- Object _lock = new Object();
-
- // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
- // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
- // of the test parameters to log with the results.
- protected Properties testParameters = System.getProperties();
- //private Properties testParameters = new ContextualProperties(System.getProperties());
+ /** Holds a property reader to extract the test parameters from. */
+ protected ParsedProperties testParameters = new ParsedProperties(System.getProperties());
public PingTestPerf(String name)
{
super(name);
- // Sets up the test parameters with defaults.
- setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false");
- setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false");
- setSystemPropertyIfNull(FAIL_AFTER_SEND, "false");
- setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false");
- setSystemPropertyIfNull(FAIL_ONCE, "true");
-
- setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
- setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
- setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
- setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT);
- setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
- setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
- setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
- setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
- setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT));
- setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(0));
- setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
- setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
- setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
}
/**
@@ -187,20 +121,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll suite.addTest(new PingTestPerf("testPingOk"));
return suite;
- //return new junit.framework.TestSuite(PingTestPerf.class);
- }
-
- protected static void setSystemPropertyIfNull(String propName, String propValue)
- {
- if (System.getProperty(propName) == null)
- {
- System.setProperty(propName, propValue);
- }
- }
-
- public void testPing(int jim) throws Exception
- {
- testPingOk(1);
}
public void testPingOk(int numPings) throws Exception
@@ -214,15 +134,15 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingItselfClient.getTestMessage(null,
- Integer.parseInt(testParameters.getProperty(
- MESSAGE_SIZE_PROPNAME)),
- Boolean.parseBoolean(testParameters.getProperty(
- PERSISTENT_MODE_PROPNAME)));
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
- long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout);
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
@@ -232,75 +152,87 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll }
}
-
- protected void setUp() throws Exception
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
{
- // Log4j will propagate the test name as a thread local in all log output.
- // Carefull when using this, it can cause memory leaks when not cleaned up properly.
- //NDC.push(getName());
+ _logger.debug("public void threadSetUp(): called");
- // Create the test setups on a per thread basis, only if they have not already been created.
-
- if (threadSetup.get() == null)
+ try
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
// Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
- String username = "guest";
- String password = "guest";
- String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
- int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME));
- String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME);
- boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
- boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
- String selector = null;
- boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
- int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
- int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
- boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
- boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
- boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
- boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
- boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
- boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
- int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+ // Extract the test set up paramaeters.
+ int destinationscount =
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
// This is synchronized because there is a race condition, which causes one connection to sleep if
- // all threads try to create connection concurrently
- synchronized (_lock)
+ // all threads try to create connection concurrently.
+ synchronized (this)
{
// Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
- destinationname, selector, transacted, persistent,
- messageSize, verbose, afterCommit, beforeCommit,
- afterSend, beforeSend, failOnce, batchSize, destinationscount,
- rate, pubsub);
+ perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
+ selector, transacted, persistent, messageSize, verbose,
+ failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, destinationscount, rate, pubsub);
}
// Start the client connection
- perThreadSetup._pingItselfClient.getConnection().start();
+ perThreadSetup._pingClient.getConnection().start();
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);
}
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
}
- protected void tearDown() throws Exception
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
{
+ _logger.debug("public void threadTearDown(): called");
+
try
{
- /*
- if ((_pingItselfClient != null) && (_pingItselfClient.getConnection() != null))
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
{
- _pingItselfClient.getConnection().close();
+ perThreadSetup._pingClient.close();
}
- */
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
}
- finally
+ catch (JMSException e)
{
- //NDC.pop();
+ _logger.warn("There was an exception during per thread tear down.");
}
}
@@ -309,6 +241,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /**
* Holds the test ping client.
*/
- protected TestPingItself _pingItselfClient;
+ protected PingClient _pingClient;
}
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java deleted file mode 100644 index 274c8b5fc8..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.qpid.ping;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
-
-/**
- * Tests the {@link Throttle} implementation. Test timings can be taken using this test class to confirm that the
- * throttle works as it should, and what the maximum rate is that it works reliably.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Enable test timings to be taken to confirm that the throttle works at the correct rate.
- * </table>
- *
- * @author Rupert Smith
- */
-public class ThrottleTestPerf extends AsymptoticTestCase
-{
- ThreadLocal<Throttle> threadSetup = new ThreadLocal<Throttle>();
-
- public ThrottleTestPerf(String name)
- {
- super(name);
- }
-
- /**
- * Compile all the tests into a test suite.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new ThrottleTestPerf("testThrottle"));
-
- return suite;
- }
-
- public void testThrottle(int opsPerSecond)
- {
- Throttle throttle = threadSetup.get();
-
- // Setting this on every test call won't cause any harm, convenient to use the size parameter for this.
- throttle.setRate(opsPerSecond);
-
- // Run the test at the throttled rate, do this for the num of opsPerSecond, then every test should take 1 second.
- for (int i = 0; i < opsPerSecond; i++)
- {
- throttle.throttle();
- }
- }
-
- protected void setUp()
- {
- if (threadSetup.get() == null)
- {
- threadSetup.set(new Throttle());
- }
- }
-}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index fca133c425..81967d332a 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -1,7 +1,25 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.requestreply;
-import java.util.Properties;
-
import javax.jms.*;
import junit.framework.Assert;
@@ -11,149 +29,87 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger;
import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
/**
* PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
* many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
* a producer to a conumer, then the consumer replies to the message on a temporary queue.
- * <p/>
+ *
* <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
* of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
- * up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how
- * to do this.
- * <p/>
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
* <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
* temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
* gets its own connection/producer/consumer, this is only re-established if the connection is lost.
- * <p/>
+ *
* <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
* is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
* back on the temporary queue.
- * <p/>
+ *
* <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- * <p/>
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* </table>
*
* @author Rupert Smith
*/
-public class PingPongTestPerf extends AsymptoticTestCase //implements TimingControllerAware
+public class PingPongTestPerf extends AsymptoticTestCase
{
private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
- /**
- * Holds the name of the property to get the test message size from.
- */
- private static final String MESSAGE_SIZE_PROPNAME = "messagesize";
-
- /**
- * Holds the name of the property to get the ping queue name from.
- */
- private static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
-
- /**
- * Holds the name of the property to get the test delivery mode from.
- */
- private static final String PERSISTENT_MODE_PROPNAME = "persistent";
-
- /**
- * Holds the name of the property to get the test transactional mode from.
- */
- private static final String TRANSACTED_PROPNAME = "transacted";
-
- /**
- * Holds the name of the property to get the test broker url from.
- */
- private static final String BROKER_PROPNAME = "broker";
-
- /**
- * Holds the name of the property to get the test broker virtual path.
- */
- private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
-
- /**
- * Holds the size of message body to attach to the ping messages.
- */
- private static final int MESSAGE_SIZE_DEFAULT = 0;
-
- private static final int BATCH_SIZE_DEFAULT = 2;
-
- /**
- * Holds the name of the queue to which pings are sent.
- */
- private static final String PING_QUEUE_NAME_DEFAULT = "ping";
-
- /**
- * Holds the message delivery mode to use for the test.
- */
- private static final boolean PERSISTENT_MODE_DEFAULT = false;
-
- /**
- * Holds the transactional mode to use for the test.
- */
- private static final boolean TRANSACTED_DEFAULT = false;
-
- /**
- * Holds the default broker url for the test.
- */
- private static final String BROKER_DEFAULT = "tcp://localhost:5672";
-
- /**
- * Holds the default virtual path for the test.
- */
- private static final String VIRTUAL_PATH_DEFAULT = "/test";
-
- /**
- * Sets a default ping timeout.
- */
- private static final long TIMEOUT = 15000;
-
- /** Holds the name of the property to get the message rate from. */
- private static final String RATE_PROPNAME = "rate";
-
- private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
-
- /** Holds the true or false depending on wether it is P2P test or PubSub */
- private static final String IS_PUBSUB_PROPNAME = "pubsub";
-
- /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
- private static final int RATE_DEFAULT = 0;
-
- private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
- private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
- private static final String FAIL_AFTER_SEND = "FailAfterSend";
- private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
- private static final String BATCH_SIZE = "BatchSize";
- private static final String FAIL_ONCE = "FailOnce";
-
- /**
- * Thread local to hold the per-thread test setup fields.
- */
+ /** Thread local to hold the per-thread test setup fields. */
ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
- Object _lock = new Object();
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
- private Properties testParameters = System.getProperties();
- //private Properties testParameters = new ContextualProperties(System.getProperties());
+ //private Properties testParameters = System.getProperties();
+ private ParsedProperties testParameters = new ParsedProperties(System.getProperties());
public PingPongTestPerf(String name)
{
super(name);
// Sets up the test parameters with defaults.
- setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
- setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
- setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
- setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
- setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
- setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
- setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
- setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false));
- setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT));
- setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
}
/**
@@ -185,19 +141,15 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(),
- Integer.parseInt(testParameters.getProperty(
- MESSAGE_SIZE_PROPNAME)),
- Boolean.parseBoolean(testParameters.getProperty(
- PERSISTENT_MODE_PROPNAME)));
-
- // Use the test timing controller to reset the test timer now and obtain the current time.
- // This can be used to remove the message creation time from the test.
- //TestTimingController timingUtils = getTimingController();
- //long startTime = timingUtils.restart();
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// Send the message and wait for a reply.
- int numReplies = perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, TIMEOUT);
+ int numReplies =
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT);
// Fail the test if the timeout was exceeded.
if (numReplies != numPings)
@@ -206,82 +158,93 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont }
}
- protected void setUp() throws Exception
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
{
- // Log4j will propagate the test name as a thread local in all log output.
- // Carefull when using this, it can cause memory leaks when not cleaned up properly.
- //NDC.push(getName());
-
- // Create the test setups on a per thread basis, only if they have not already been created.
- if (threadSetup.get() == null)
+ try
{
PerThreadSetup perThreadSetup = new PerThreadSetup();
// Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
- String username = "guest";
- String password = "guest";
- String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
- String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
- boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
- boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
- String selector = null;
- boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
- int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
- int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
- boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME));
-
- boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
- boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
- boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
- boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
- int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
- Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
-
- synchronized(_lock)
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+
+ synchronized (this)
{
// Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath,
- queueName, persistent, transacted, selector, verbose, pubsub);
+ perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
+ destinationName, persistent, transacted, selector,
+ verbose, pubsub);
// Start the connections for client and producer running.
perThreadSetup._testPingBouncer.getConnection().start();
// Establish a ping-pong client on the ping queue to send the pings with.
- perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath,
- queueName, selector, transacted, persistent, messageSize,
- verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, 0, rate, pubsub);
+ perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
+ destinationName, selector, transacted, persistent,
+ messageSize, verbose, failAfterCommit,
+ failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, 0, rate, pubsub);
perThreadSetup._testPingProducer.getConnection().start();
}
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);
}
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
}
- protected void tearDown() throws Exception
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
{
+ _logger.debug("public void threadTearDown(): called");
+
try
{
- /**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null))
- {
- _testPingBouncer.getConnection().close();
- }
-
- if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null))
- {
- _testPingProducer.getConnection().close();
- }*/
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._testPingProducer.close();
+ //perThreadSetup._testPingBouncer.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
}
- finally
+ catch (JMSException e)
{
- //NDC.pop();
+ _logger.warn("There was an exception during per thread tear down.");
}
}
- private static class PerThreadSetup
+ protected static class PerThreadSetup
{
/**
* Holds the test ping-pong producer.
|
