diff options
Diffstat (limited to 'java/perftests/src/test')
| -rw-r--r-- | java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java | 288 | ||||
| -rw-r--r-- | java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java | 60 |
2 files changed, 320 insertions, 28 deletions
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..d3ce064831 --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.ping; + +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.TimingController; + +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import javax.jms.JMSException; +import javax.jms.Message; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; +import org.apache.log4j.Logger; + +import java.util.concurrent.CountDownLatch; + +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + private TimingController _timingController; + + private final CountDownLatch _completedLock = new CountDownLatch(1); + + private AsyncMessageListener _listener; + + private volatile boolean _done = false; + + public PingAsyncTestPerf(String name) + { + super(name); + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + protected void setUp() throws Exception + { + // Create the test setups on a per thread basis, only if they have not already been created. + + if (threadSetup.get() == null) + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(BROKER_PROPNAME); + String username = "guest"; + String password = "guest"; + String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME); + int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME)); + String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME); + boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)); + boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)); + String selector = null; + boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME)); + int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)); + + boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT)); + boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT)); + boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND)); + boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND)); + boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE)); + + int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE)); + int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE)); + + int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME)); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently + synchronized (this) + { + // Establish a client to ping a Queue and listen the reply back from same Queue + perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath, + queueName, selector, transacted, persistent, + messageSize, verbose, + afterCommit, beforeCommit, afterSend, beforeSend, failOnce, + commitbatchSize, queueCount, rate); + + + _listener = new AsyncMessageListener(batchSize); + + perThreadSetup._pingItselfClient.setMessageListener(_listener); + // Start the client connection + perThreadSetup._pingItselfClient.getConnection().start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + } + } + + + public void testAsyncPingOk(int numPings) + { + _timingController = this.getTimingController(); + + _listener.setTotalMessages(numPings); + + PerThreadSetup perThreadSetup = threadSetup.get(); + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + ObjectMessage msg = null; + + try + { + msg = perThreadSetup._pingItselfClient.getTestMessage(null, + Integer.parseInt(testParameters.getProperty( + MESSAGE_SIZE_PROPNAME)), + Boolean.parseBoolean(testParameters.getProperty( + PERSISTENT_MODE_PROPNAME))); + } + catch (JMSException e) + { + + } + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME)); + + try + { + perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings); + } + catch (JMSException e) + { + e.printStackTrace(); + Assert.fail("JMS Exception Recevied" + e); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + + while (!_done) + { + try + { + _logger.info("awating test finish"); + + _completedLock.await(); + } + catch (InterruptedException e) + { + //ignore + } + } + + // Fail the test if the timeout was exceeded. + int numReplies = _listener.getReplyCount(); + + _logger.info("Test Finished"); + + if (numReplies != numPings) + + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); + try + { + _timingController.completeTest(false); + } + catch (InterruptedException e) + { + //ignore + } + } + } + + + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + public TimingController getTimingController() + { + return _timingController; + } + + + private class AsyncMessageListener implements MessageListener + { + private int _messageRecevied; + private int _totalMessages; + private int _batchSize; + + public AsyncMessageListener(int batchSize, int totalMessages) + { + _batchSize = batchSize; + _totalMessages = totalMessages; + _messageRecevied = 0; + } + + public AsyncMessageListener(int batchSize) + { + _batchSize = batchSize; + _totalMessages = -1; + _messageRecevied = 0; + } + + public void setTotalMessages(int newTotal) + { + _totalMessages = newTotal; + } + + public void onMessage(Message message) + { + _logger.info("Message Recevied"); + try + { + _messageRecevied++; + if (_messageRecevied == _batchSize) + { + if (_timingController != null) + { + _timingController.completeTest(true); + } + } + } + catch (InterruptedException e) + { + doDone(); + } + + if (_totalMessages == -1 || _messageRecevied == _totalMessages) + { + _logger.info("Test Completed.. signalling"); + doDone(); + } + } + + private void doDone() + { + _done = true; + _completedLock.countDown(); + try + { + _timingController.completeTest(true); + } + catch (InterruptedException e) + { + //ignore + } + } + + public int getReplyCount() + { + return _messageRecevied; + } + } + +} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index c9896ce063..402d72d6db 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -42,94 +42,96 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll /**
* Holds the name of the property to get the test message size from.
*/
- private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+ protected static final String MESSAGE_SIZE_PROPNAME = "messageSize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
/**
* holds the queue count, if the test is being performed with multiple queues
*/
- private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+ protected static final String PING_QUEUE_COUNT_PROPNAME = "queues";
/**
* Holds the name of the property to get the test delivery mode from.
*/
- private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+ protected static final String PERSISTENT_MODE_PROPNAME = "persistent";
/**
* Holds the name of the property to get the test transactional mode from.
*/
- private static final String TRANSACTED_PROPNAME = "transacted";
+ protected static final String TRANSACTED_PROPNAME = "transacted";
/**
* Holds the name of the property to get the test broker url from.
*/
- private static final String BROKER_PROPNAME = "broker";
+ protected static final String BROKER_PROPNAME = "broker";
/**
* Holds the name of the property to get the test broker virtual path.
*/
- private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+ protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
/**
* Holds the name of the property to get the waiting timeout for response messages.
*/
- private static final String TIMEOUT_PROPNAME = "timeout";
+ protected static final String TIMEOUT_PROPNAME = "timeout";
/** Holds the name of the property to get the message rate from. */
- private static final String RATE_PROPNAME = "rate";
+ protected static final String RATE_PROPNAME = "rate";
- private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+ protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
/**
* Holds the size of message body to attach to the ping messages.
*/
- private static final int MESSAGE_SIZE_DEFAULT = 0;
+ protected static final int MESSAGE_SIZE_DEFAULT = 0;
- private static final int BATCH_SIZE_DEFAULT = 2;
+ protected static final int BATCH_SIZE_DEFAULT = 2;
+ protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT;
/**
* Holds the name of the queue to which pings are sent.
*/
- private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+ protected static final String PING_QUEUE_NAME_DEFAULT = "ping";
/**
* Holds the message delivery mode to use for the test.
*/
- private static final boolean PERSISTENT_MODE_DEFAULT = false;
+ protected static final boolean PERSISTENT_MODE_DEFAULT = false;
/**
* Holds the transactional mode to use for the test.
*/
- private static final boolean TRANSACTED_DEFAULT = false;
+ protected static final boolean TRANSACTED_DEFAULT = false;
/**
* Holds the default broker url for the test.
*/
- private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+ protected static final String BROKER_DEFAULT = "tcp://localhost:5672";
/**
* Holds the default virtual path for the test.
*/
- private static final String VIRTUAL_PATH_DEFAULT = "/test";
+ protected static final String VIRTUAL_PATH_DEFAULT = "/test";
/**
* Sets a default ping timeout.
*/
- private static final long TIMEOUT_DEFAULT = 3000;
+ protected static final long TIMEOUT_DEFAULT = 3000;
/** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
private static final int RATE_DEFAULT = 0;
- private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
- private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
- private static final String FAIL_AFTER_SEND = "FailAfterSend";
- private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
- private static final String BATCH_SIZE = "BatchSize";
- private static final String FAIL_ONCE = "FailOnce";
+ protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+ protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+ protected static final String FAIL_AFTER_SEND = "FailAfterSend";
+ protected static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+ protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize";
+ protected static final String BATCH_SIZE = "BatchSize";
+ protected static final String FAIL_ONCE = "FailOnce";
/**
* Thread local to hold the per-thread test setup fields.
@@ -139,7 +141,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results.
- private Properties testParameters = System.getProperties();
+ protected Properties testParameters = System.getProperties();
//private Properties testParameters = new ContextualProperties(System.getProperties());
public PingTestPerf(String name)
@@ -154,6 +156,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll setSystemPropertyIfNull(FAIL_ONCE, "true");
setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
+ setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
@@ -181,7 +184,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll //return new junit.framework.TestSuite(PingTestPerf.class);
}
- private static void setSystemPropertyIfNull(String propName, String propValue)
+ protected static void setSystemPropertyIfNull(String propName, String propValue)
{
if (System.getProperty(propName) == null)
{
@@ -223,6 +226,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll }
}
+
protected void setUp() throws Exception
{
// Log4j will propagate the test name as a thread local in all log output.
@@ -293,11 +297,11 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll }
}
- private static class PerThreadSetup
+ protected static class PerThreadSetup
{
/**
* Holds the test ping client.
*/
- private TestPingItself _pingItselfClient;
+ protected TestPingItself _pingItselfClient;
}
}
|
