From f3dc157e59ec686e42334bb2f6bae3c1f97b2daf Mon Sep 17 00:00:00 2001 From: Robert Greig Date: Tue, 30 Jan 2007 16:40:20 +0000 Subject: (Submitted by Rupert Smith) Ping tests refactored. Unused ping test classes removed. JUnit-toolkit 0.5-SNAPSHOT added to the build. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@501455 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 555 ++++++++++----------- .../java/org/apache/qpid/ping/PingTestPerf.java | 332 +++++------- .../org/apache/qpid/ping/ThrottleTestPerf.java | 63 --- .../apache/qpid/requestreply/PingPongTestPerf.java | 301 +++++------ 4 files changed, 537 insertions(+), 714 deletions(-) delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java (limited to 'java/perftests/src/test') diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index bd39ec34a1..e10e6353b7 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -1,311 +1,302 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.ping; -//import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -//import uk.co.thebadgerset.junit.extensions.TimingController; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.ObjectMessage; -import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; + import org.apache.log4j.Logger; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.CountDownLatch; +import org.apache.qpid.requestreply.PingPongProducer; +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -public class PingAsyncTestPerf extends PingTestPerf //implements TimingControllerAware +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send many ping messages and output timings asynchronously on batches received. + *
+ */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware { -// private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; -// private TimingController _timingController; + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; -// private AsyncMessageListener _listener; + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ public PingAsyncTestPerf(String name) { super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testAsyncPingOk(int numPings) throws Exception + { + _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + ObjectMessage msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } -// /** -// * Compile all the tests into a test suite. -// */ -// public static Test suite() -// { -// // Build a new test suite -// TestSuite suite = new TestSuite("Ping Performance Tests"); -// -// // Run performance tests in read committed mode. -// suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); -// -// return suite; -// } -// -// protected void setUp() throws Exception -// { -// // Create the test setups on a per thread basis, only if they have not already been created. -// -// if (threadSetup.get() == null) -// { -// PerThreadSetup perThreadSetup = new PerThreadSetup(); -// -// // Extract the test set up paramaeters. -// String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); -// String username = "guest"; -// String password = "guest"; -// String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); -// int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); -// String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); -// boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); -// boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); -// String selector = null; -// boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); -// int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); -// int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); -// boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); -// -// -// boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); -// boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); -// boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); -// boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); -// boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); -// -// int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); -// int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); -// -// // This is synchronized because there is a race condition, which causes one connection to sleep if -// // all threads try to create connection concurrently -// synchronized (this) -// { -// // Establish a client to ping a Queue and listen the reply back from same Queue -// perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, -// destinationname, selector, transacted, persistent, -// messageSize, verbose, -// afterCommit, beforeCommit, afterSend, beforeSend, failOnce, -// commitbatchSize, destinationscount, rate, pubsub); -// } -// -// // Attach the per-thread set to the thread. -// threadSetup.set(perThreadSetup); -// -// _listener = new AsyncMessageListener(batchSize); -// -// perThreadSetup._pingItselfClient.setMessageListener(_listener); -// // Start the client connection -// perThreadSetup._pingItselfClient.getConnection().start(); -// -// } -// } -// -// -// public void testAsyncPingOk(int numPings) -// { -// _timingController = this.getTimingController(); -// -// _listener.setTotalMessages(numPings); -// -// PerThreadSetup perThreadSetup = threadSetup.get(); -// if (numPings == 0) -// { -// _logger.error("Number of pings requested was zero."); -// fail("Number of pings requested was zero."); -// } -// -// // Generate a sample message. This message is already time stamped and has its reply-to destination set. -// ObjectMessage msg = null; -// -// try -// { -// msg = perThreadSetup._pingItselfClient.getTestMessage(null, -// Integer.parseInt(testParameters.getProperty( -// MESSAGE_SIZE_PROPNAME)), -// Boolean.parseBoolean(testParameters.getProperty( -// PERSISTENT_MODE_PROPNAME))); -// } -// catch (JMSException e) -// { -// -// } -// -// // start the test -// long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); -// -// String correlationID = Long.toString(perThreadSetup._pingItselfClient.getNewID()); -// -// try -// { -// _logger.debug("Sending messages"); -// -// perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings, correlationID); -// -// _logger.debug("All sent"); -// } -// catch (JMSException e) -// { -// e.printStackTrace(); -// Assert.fail("JMS Exception Received" + e); -// } -// catch (InterruptedException e) -// { -// e.printStackTrace(); -// } -// -// try -// { -// _logger.debug("Awating test finish"); -// -// perThreadSetup._pingItselfClient.getEndLock(correlationID).await(timeout, TimeUnit.MILLISECONDS); -// -// if (perThreadSetup._pingItselfClient.getEndLock(correlationID).getCount() != 0) -// { -// _logger.error("Timeout occured"); -// } -// //Allow the time out to exit the loop. -// } -// catch (InterruptedException e) -// { -// //ignore -// _logger.error("Awaiting test end was interrupted."); -// -// } -// -// // Fail the test if the timeout was exceeded. -// int numReplies = numPings - (int) perThreadSetup._pingItselfClient.removeLock(correlationID).getCount(); -// -// _logger.info("Test Finished"); -// -// if (numReplies != numPings) -// { -// try -// { -// perThreadSetup._pingItselfClient.commitTx(perThreadSetup._pingItselfClient.getConsumerSession()); -// } -// catch (JMSException e) -// { -// _logger.error("Error commiting received messages", e); -// } -// try -// { -// if (_timingController != null) -// { -// _logger.trace("Logging missing message count"); -// _timingController.completeTest(false, numPings - numReplies); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); -// } -// } -// -// public void setTimingController(TimingController timingController) -// { -// _timingController = timingController; -// } -// -// public TimingController getTimingController() -// { -// return _timingController; -// } -// -// -// private class AsyncMessageListener implements MessageListener -// { -// private volatile int _totalMessages; -// private int _batchSize; -// PerThreadSetup _perThreadSetup; -// -// public AsyncMessageListener(int batchSize) -// { -// this(batchSize, -1); -// } -// -// public AsyncMessageListener(int batchSize, int totalMessages) -// { -// _batchSize = batchSize; -// _totalMessages = totalMessages; -// _perThreadSetup = threadSetup.get(); -// } -// -// public void setTotalMessages(int newTotal) -// { -// _totalMessages = newTotal; -// } -// -// public void onMessage(Message message) -// { -// try -// { -// _logger.trace("Message Received"); -// -// CountDownLatch count = _perThreadSetup._pingItselfClient.getEndLock(message.getJMSCorrelationID()); -// -// if (count != null) -// { -// int messagesLeft = (int) count.getCount() - 1;// minus one as we haven't yet counted the current message -// -// if ((messagesLeft % _batchSize) == 0) -// { -// doDone(_batchSize); -// } -// else if (messagesLeft == 0) -// { -// doDone(_totalMessages % _batchSize); -// } -// } -// -// } -// catch (JMSException e) -// { -// _logger.warn("There was a JMSException", e); -// } -// -// } -// -// private void doDone(int messageCount) -// { -// _logger.trace("Messages received:" + messageCount); -// _logger.trace("Total Messages :" + _totalMessages); -// -// try -// { -// if (_timingController != null) -// { -// _timingController.completeTest(true, messageCount); -// } -// } -// catch (InterruptedException e) -// { -// //ignore -// } -// } -// -// } + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index fbc67881c2..c4e72f4bb6 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -1,7 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.ping; -import java.util.Properties; - import javax.jms.*; import junit.framework.Assert; @@ -10,169 +28,85 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; +import org.apache.qpid.requestreply.PingPongProducer; + import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.TestThreadAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; /** * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times * simultaneously to simluate many clients/producers/connections. - *

+ * *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - *

+ * *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, * except if the connection is lost in which case an attempt to re-establish the setup is made. - *

+ * *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the * temporary queue. - *

+ * *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - *

+ * *

*
CRC Card
Responsibilities Collaborations *
* * @author Rupert Smith */ -public class PingTestPerf extends AsymptoticTestCase //implements TimingControllerAware +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware { private static Logger _logger = Logger.getLogger(PingTestPerf.class); - /** - * Holds the name of the property to get the test message size from. - */ - protected static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - - /** - * Holds the name of the property to get the ping queue name from. - */ - protected static final String PING_DESTINATION_NAME_PROPNAME = "destinationname"; - - /** - * holds the queue count, if the test is being performed with multiple queues - */ - protected static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount"; - - /** - * Holds the name of the property to get the test delivery mode from. - */ - protected static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** - * Holds the name of the property to get the test transactional mode from. - */ - protected static final String TRANSACTED_PROPNAME = "transacted"; - - /** - * Holds the name of the property to get the test broker url from. - */ - protected static final String BROKER_PROPNAME = "broker"; - - /** - * Holds the name of the property to get the test broker virtual path. - */ - protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - - /** - * Holds the name of the property to get the waiting timeout for response messages. - */ - protected static final String TIMEOUT_PROPNAME = "timeout"; - - /** Holds the name of the property to get the message rate from. */ - protected static final String RATE_PROPNAME = "rate"; - - protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - - /** Holds the true or false depending on wether it is P2P test or PubSub */ - protected static final String IS_PUBSUB_PROPNAME = "pubsub"; - /** - * Holds the size of message body to attach to the ping messages. - */ - protected static final int MESSAGE_SIZE_DEFAULT = 1024; - - protected static final int BATCH_SIZE_DEFAULT = 1000; - - protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT; - - /** - * Holds the name of the queue to which pings are sent. - */ - private static final String PING_DESTINATION_NAME_DEFAULT = "ping"; - - /** - * Holds the message delivery mode to use for the test. - */ - protected static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** - * Holds the transactional mode to use for the test. - */ - protected static final boolean TRANSACTED_DEFAULT = false; - - /** - * Holds the default broker url for the test. - */ - protected static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** - * Holds the default virtual path for the test. - */ - protected static final String VIRTUAL_PATH_DEFAULT = "/test"; - - /** - * Sets a default ping timeout. - */ - protected static final long TIMEOUT_DEFAULT = 3000; - - /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */ - private static final int RATE_DEFAULT = 0; - - protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; - protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; - protected static final String FAIL_AFTER_SEND = "FailAfterSend"; - protected static final String FAIL_BEFORE_SEND = "FailBeforeSend"; - protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize"; - protected static final String BATCH_SIZE = "BatchSize"; - protected static final String FAIL_ONCE = "FailOnce"; - - /** - * Thread local to hold the per-thread test setup fields. - */ + /** Thread local to hold the per-thread test setup fields. */ ThreadLocal threadSetup = new ThreadLocal(); - Object _lock = new Object(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. - protected Properties testParameters = System.getProperties(); - //private Properties testParameters = new ContextualProperties(System.getProperties()); + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = new ParsedProperties(System.getProperties()); public PingTestPerf(String name) { super(name); - // Sets up the test parameters with defaults. - setSystemPropertyIfNull(FAIL_AFTER_COMMIT, "false"); - setSystemPropertyIfNull(FAIL_BEFORE_COMMIT, "false"); - setSystemPropertyIfNull(FAIL_AFTER_SEND, "false"); - setSystemPropertyIfNull(FAIL_BEFORE_SEND, "false"); - setSystemPropertyIfNull(FAIL_ONCE, "true"); - - setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); - setSystemPropertyIfNull(PING_DESTINATION_NAME_PROPNAME, PING_DESTINATION_NAME_DEFAULT); - setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); - setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); - setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); - setSystemPropertyIfNull(TIMEOUT_PROPNAME, Long.toString(TIMEOUT_DEFAULT)); - setSystemPropertyIfNull(PING_DESTINATION_COUNT_PROPNAME, Integer.toString(0)); - setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false)); - setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT)); - setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false)); + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.DEFAULT_PING_DESTINATION_NAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_VERBOSE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_RATE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PUBSUB)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, + Long.toString(PingPongProducer.DEFAULT_TIMEOUT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE); } /** @@ -187,20 +121,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll suite.addTest(new PingTestPerf("testPingOk")); return suite; - //return new junit.framework.TestSuite(PingTestPerf.class); - } - - protected static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPing(int jim) throws Exception - { - testPingOk(1); } public void testPingOk(int numPings) throws Exception @@ -214,15 +134,15 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._pingItselfClient.getTestMessage(null, - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test - long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingItselfClient.pingAndWaitForReply(msg, numPings, timeout); + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout); // Fail the test if the timeout was exceeded. if (numReplies != numPings) @@ -232,75 +152,87 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll } } - - protected void setUp() throws Exception + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() { - // Log4j will propagate the test name as a thread local in all log output. - // Carefull when using this, it can cause memory leaks when not cleaned up properly. - //NDC.push(getName()); + _logger.debug("public void threadSetUp(): called"); - // Create the test setups on a per thread basis, only if they have not already been created. - - if (threadSetup.get() == null) + try { PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); - String username = "guest"; - String password = "guest"; - String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); - int destinationscount = Integer.parseInt(testParameters.getProperty(PING_DESTINATION_COUNT_PROPNAME)); - String destinationname = testParameters.getProperty(PING_DESTINATION_NAME_PROPNAME); - boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); - boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); - String selector = null; - boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); - int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); - boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); - - boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); - boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); - boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); - boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); - boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME); + int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); + int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME); + boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); + boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); + boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); + boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); + int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME); + Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); - int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + // Extract the test set up paramaeters. + int destinationscount = + Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently - synchronized (_lock) + // all threads try to create connection concurrently. + synchronized (this) { // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, - destinationname, selector, transacted, persistent, - messageSize, verbose, afterCommit, beforeCommit, - afterSend, beforeSend, failOnce, batchSize, destinationscount, - rate, pubsub); + perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName, + selector, transacted, persistent, messageSize, verbose, + failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend, + failOnce, batchSize, destinationscount, rate, pubsub); } // Start the client connection - perThreadSetup._pingItselfClient.getConnection().start(); + perThreadSetup._pingClient.getConnection().start(); // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } - protected void tearDown() throws Exception + /** + * Performs test fixture clean + */ + public void threadTearDown() { + _logger.debug("public void threadTearDown(): called"); + try { - /* - if ((_pingItselfClient != null) && (_pingItselfClient.getConnection() != null)) + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) { - _pingItselfClient.getConnection().close(); + perThreadSetup._pingClient.close(); } - */ + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); } - finally + catch (JMSException e) { - //NDC.pop(); + _logger.warn("There was an exception during per thread tear down."); } } @@ -309,6 +241,6 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /** * Holds the test ping client. */ - protected TestPingItself _pingItselfClient; + protected PingClient _pingClient; } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java deleted file mode 100644 index 274c8b5fc8..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/ThrottleTestPerf.java +++ /dev/null @@ -1,63 +0,0 @@ -package org.apache.qpid.ping; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; - -/** - * Tests the {@link Throttle} implementation. Test timings can be taken using this test class to confirm that the - * throttle works as it should, and what the maximum rate is that it works reliably. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Enable test timings to be taken to confirm that the throttle works at the correct rate. - *
- * - * @author Rupert Smith - */ -public class ThrottleTestPerf extends AsymptoticTestCase -{ - ThreadLocal threadSetup = new ThreadLocal(); - - public ThrottleTestPerf(String name) - { - super(name); - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new ThrottleTestPerf("testThrottle")); - - return suite; - } - - public void testThrottle(int opsPerSecond) - { - Throttle throttle = threadSetup.get(); - - // Setting this on every test call won't cause any harm, convenient to use the size parameter for this. - throttle.setRate(opsPerSecond); - - // Run the test at the throttled rate, do this for the num of opsPerSecond, then every test should take 1 second. - for (int i = 0; i < opsPerSecond; i++) - { - throttle.throttle(); - } - } - - protected void setUp() - { - if (threadSetup.get() == null) - { - threadSetup.set(new Throttle()); - } - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java index fca133c425..81967d332a 100644 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -1,7 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ package org.apache.qpid.requestreply; -import java.util.Properties; - import javax.jms.*; import junit.framework.Assert; @@ -11,149 +29,87 @@ import junit.framework.TestSuite; import org.apache.log4j.Logger; import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; /** * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from * a producer to a conumer, then the consumer replies to the message on a temporary queue. - *

+ * *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link TKTestRunner} or {@link PPTestRunner} for more information on how - * to do this. - *

+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more + * information on how to do this. + * *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - *

+ * *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come * back on the temporary queue. - *

+ * *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - *

+ * *

*
CRC Card
Responsibilities Collaborations *
* * @author Rupert Smith */ -public class PingPongTestPerf extends AsymptoticTestCase //implements TimingControllerAware +public class PingPongTestPerf extends AsymptoticTestCase { private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - /** - * Holds the name of the property to get the test message size from. - */ - private static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - - /** - * Holds the name of the property to get the ping queue name from. - */ - private static final String PING_QUEUE_NAME_PROPNAME = "destinationname"; - - /** - * Holds the name of the property to get the test delivery mode from. - */ - private static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** - * Holds the name of the property to get the test transactional mode from. - */ - private static final String TRANSACTED_PROPNAME = "transacted"; - - /** - * Holds the name of the property to get the test broker url from. - */ - private static final String BROKER_PROPNAME = "broker"; - - /** - * Holds the name of the property to get the test broker virtual path. - */ - private static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - - /** - * Holds the size of message body to attach to the ping messages. - */ - private static final int MESSAGE_SIZE_DEFAULT = 0; - - private static final int BATCH_SIZE_DEFAULT = 2; - - /** - * Holds the name of the queue to which pings are sent. - */ - private static final String PING_QUEUE_NAME_DEFAULT = "ping"; - - /** - * Holds the message delivery mode to use for the test. - */ - private static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** - * Holds the transactional mode to use for the test. - */ - private static final boolean TRANSACTED_DEFAULT = false; - - /** - * Holds the default broker url for the test. - */ - private static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** - * Holds the default virtual path for the test. - */ - private static final String VIRTUAL_PATH_DEFAULT = "/test"; - - /** - * Sets a default ping timeout. - */ - private static final long TIMEOUT = 15000; - - /** Holds the name of the property to get the message rate from. */ - private static final String RATE_PROPNAME = "rate"; - - private static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - - /** Holds the true or false depending on wether it is P2P test or PubSub */ - private static final String IS_PUBSUB_PROPNAME = "pubsub"; - - /** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */ - private static final int RATE_DEFAULT = 0; - - private static final String FAIL_AFTER_COMMIT = "FailAfterCommit"; - private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit"; - private static final String FAIL_AFTER_SEND = "FailAfterSend"; - private static final String FAIL_BEFORE_SEND = "FailBeforeSend"; - private static final String BATCH_SIZE = "BatchSize"; - private static final String FAIL_ONCE = "FailOnce"; - - /** - * Thread local to hold the per-thread test setup fields. - */ + /** Thread local to hold the per-thread test setup fields. */ ThreadLocal threadSetup = new ThreadLocal(); - Object _lock = new Object(); // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - private Properties testParameters = System.getProperties(); - //private Properties testParameters = new ContextualProperties(System.getProperties()); + //private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = new ParsedProperties(System.getProperties()); public PingPongTestPerf(String name) { super(name); // Sets up the test parameters with defaults. - setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT)); - setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT)); - setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT)); - setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT)); - setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT); - setSystemPropertyIfNull(VERBOSE_OUTPUT_PROPNAME, Boolean.toString(false)); - setSystemPropertyIfNull(RATE_PROPNAME, Integer.toString(RATE_DEFAULT)); - setSystemPropertyIfNull(IS_PUBSUB_PROPNAME, Boolean.toString(false)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.DEFAULT_PING_DESTINATION_NAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_VERBOSE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_RATE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.DEFAULT_PUBSUB)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, + Long.toString(PingPongProducer.DEFAULT_TIMEOUT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT)); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_AFTER_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.DEFAULT_FAIL_BEFORE_SEND); + ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE); } /** @@ -185,19 +141,15 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestination(), - Integer.parseInt(testParameters.getProperty( - MESSAGE_SIZE_PROPNAME)), - Boolean.parseBoolean(testParameters.getProperty( - PERSISTENT_MODE_PROPNAME))); - - // Use the test timing controller to reset the test timer now and obtain the current time. - // This can be used to remove the message creation time from the test. - //TestTimingController timingUtils = getTimingController(); - //long startTime = timingUtils.restart(); + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the message and wait for a reply. - int numReplies = perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, TIMEOUT); + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT); // Fail the test if the timeout was exceeded. if (numReplies != numPings) @@ -206,82 +158,93 @@ public class PingPongTestPerf extends AsymptoticTestCase //implements TimingCont } } - protected void setUp() throws Exception + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() { - // Log4j will propagate the test name as a thread local in all log output. - // Carefull when using this, it can cause memory leaks when not cleaned up properly. - //NDC.push(getName()); - - // Create the test setups on a per thread basis, only if they have not already been created. - if (threadSetup.get() == null) + try { PerThreadSetup perThreadSetup = new PerThreadSetup(); // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); - String username = "guest"; - String password = "guest"; - String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); - String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); - boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); - boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); - String selector = null; - boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); - int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); - int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); - boolean pubsub = Boolean.parseBoolean(testParameters.getProperty(IS_PUBSUB_PROPNAME)); - - boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); - boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); - boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); - boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); - int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); - Boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); - - synchronized(_lock) + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME); + int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME); + int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME); + boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME); + boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME); + boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME); + boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME); + int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME); + Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME); + + synchronized (this) { // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualpath, - queueName, persistent, transacted, selector, verbose, pubsub); + perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath, + destinationName, persistent, transacted, selector, + verbose, pubsub); // Start the connections for client and producer running. perThreadSetup._testPingBouncer.getConnection().start(); // Establish a ping-pong client on the ping queue to send the pings with. - perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualpath, - queueName, selector, transacted, persistent, messageSize, - verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize, 0, rate, pubsub); + perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath, + destinationName, selector, transacted, persistent, + messageSize, verbose, failAfterCommit, + failBeforeCommit, failAfterSend, failBeforeSend, + failOnce, batchSize, 0, rate, pubsub); perThreadSetup._testPingProducer.getConnection().start(); } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } } - protected void tearDown() throws Exception + /** + * Performs test fixture clean + */ + public void threadTearDown() { + _logger.debug("public void threadTearDown(): called"); + try { - /**if ((_testPingBouncer != null) && (_testPingBouncer.getConnection() != null)) - { - _testPingBouncer.getConnection().close(); - } - - if ((_testPingProducer != null) && (_testPingProducer.getConnection() != null)) - { - _testPingProducer.getConnection().close(); - }*/ + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + //perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); } - finally + catch (JMSException e) { - //NDC.pop(); + _logger.warn("There was an exception during per thread tear down."); } } - private static class PerThreadSetup + protected static class PerThreadSetup { /** * Holds the test ping-pong producer. -- cgit v1.2.1