summaryrefslogtreecommitdiff
path: root/java/perftests/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/test')
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java288
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java60
2 files changed, 320 insertions, 28 deletions
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
new file mode 100644
index 0000000000..d3ce064831
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.ping;
+
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.TimingController;
+
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.CountDownLatch;
+
+public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+
+ private TimingController _timingController;
+
+ private final CountDownLatch _completedLock = new CountDownLatch(1);
+
+ private AsyncMessageListener _listener;
+
+ private volatile boolean _done = false;
+
+ public PingAsyncTestPerf(String name)
+ {
+ super(name);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+ return suite;
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Create the test setups on a per thread basis, only if they have not already been created.
+
+ if (threadSetup.get() == null)
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(BROKER_PROPNAME);
+ String username = "guest";
+ String password = "guest";
+ String virtualpath = testParameters.getProperty(VIRTUAL_PATH_PROPNAME);
+ int queueCount = Integer.parseInt(testParameters.getProperty(PING_QUEUE_COUNT_PROPNAME));
+ String queueName = testParameters.getProperty(PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME));
+ boolean transacted = Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME));
+ String selector = null;
+ boolean verbose = Boolean.parseBoolean(testParameters.getProperty(VERBOSE_OUTPUT_PROPNAME));
+ int messageSize = Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME));
+
+ boolean afterCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_COMMIT));
+ boolean beforeCommit = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_COMMIT));
+ boolean afterSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_AFTER_SEND));
+ boolean beforeSend = Boolean.parseBoolean(testParameters.getProperty(FAIL_BEFORE_SEND));
+ boolean failOnce = Boolean.parseBoolean(testParameters.getProperty(FAIL_ONCE));
+
+ int batchSize = Integer.parseInt(testParameters.getProperty(BATCH_SIZE));
+ int commitbatchSize = Integer.parseInt(testParameters.getProperty(COMMIT_BATCH_SIZE));
+
+ int rate = Integer.parseInt(testParameters.getProperty(RATE_PROPNAME));
+
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently
+ synchronized (this)
+ {
+ // Establish a client to ping a Queue and listen the reply back from same Queue
+ perThreadSetup._pingItselfClient = new TestPingItself(brokerDetails, username, password, virtualpath,
+ queueName, selector, transacted, persistent,
+ messageSize, verbose,
+ afterCommit, beforeCommit, afterSend, beforeSend, failOnce,
+ commitbatchSize, queueCount, rate);
+
+
+ _listener = new AsyncMessageListener(batchSize);
+
+ perThreadSetup._pingItselfClient.setMessageListener(_listener);
+ // Start the client connection
+ perThreadSetup._pingItselfClient.getConnection().start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ }
+ }
+
+
+ public void testAsyncPingOk(int numPings)
+ {
+ _timingController = this.getTimingController();
+
+ _listener.setTotalMessages(numPings);
+
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg = null;
+
+ try
+ {
+ msg = perThreadSetup._pingItselfClient.getTestMessage(null,
+ Integer.parseInt(testParameters.getProperty(
+ MESSAGE_SIZE_PROPNAME)),
+ Boolean.parseBoolean(testParameters.getProperty(
+ PERSISTENT_MODE_PROPNAME)));
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ // start the test
+ long timeout = Long.parseLong(testParameters.getProperty(TIMEOUT_PROPNAME));
+
+ try
+ {
+ perThreadSetup._pingItselfClient.pingNoWaitForReply(msg, numPings);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ Assert.fail("JMS Exception Recevied" + e);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ while (!_done)
+ {
+ try
+ {
+ _logger.info("awating test finish");
+
+ _completedLock.await();
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
+ // Fail the test if the timeout was exceeded.
+ int numReplies = _listener.getReplyCount();
+
+ _logger.info("Test Finished");
+
+ if (numReplies != numPings)
+
+ {
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies);
+ try
+ {
+ _timingController.completeTest(false);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+ }
+
+
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+
+ private class AsyncMessageListener implements MessageListener
+ {
+ private int _messageRecevied;
+ private int _totalMessages;
+ private int _batchSize;
+
+ public AsyncMessageListener(int batchSize, int totalMessages)
+ {
+ _batchSize = batchSize;
+ _totalMessages = totalMessages;
+ _messageRecevied = 0;
+ }
+
+ public AsyncMessageListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ _totalMessages = -1;
+ _messageRecevied = 0;
+ }
+
+ public void setTotalMessages(int newTotal)
+ {
+ _totalMessages = newTotal;
+ }
+
+ public void onMessage(Message message)
+ {
+ _logger.info("Message Recevied");
+ try
+ {
+ _messageRecevied++;
+ if (_messageRecevied == _batchSize)
+ {
+ if (_timingController != null)
+ {
+ _timingController.completeTest(true);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ doDone();
+ }
+
+ if (_totalMessages == -1 || _messageRecevied == _totalMessages)
+ {
+ _logger.info("Test Completed.. signalling");
+ doDone();
+ }
+ }
+
+ private void doDone()
+ {
+ _done = true;
+ _completedLock.countDown();
+ try
+ {
+ _timingController.completeTest(true);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+ }
+
+ public int getReplyCount()
+ {
+ return _messageRecevied;
+ }
+ }
+
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
index c9896ce063..402d72d6db 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -42,94 +42,96 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
/**
* Holds the name of the property to get the test message size from.
*/
- private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+ protected static final String MESSAGE_SIZE_PROPNAME = "messageSize";
/**
* Holds the name of the property to get the ping queue name from.
*/
- private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+ protected static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
/**
* holds the queue count, if the test is being performed with multiple queues
*/
- private static final String PING_QUEUE_COUNT_PROPNAME = "queues";
+ protected static final String PING_QUEUE_COUNT_PROPNAME = "queues";
/**
* Holds the name of the property to get the test delivery mode from.
*/
- private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+ protected static final String PERSISTENT_MODE_PROPNAME = "persistent";
/**
* Holds the name of the property to get the test transactional mode from.
*/
- private static final String TRANSACTED_PROPNAME = "transacted";
+ protected static final String TRANSACTED_PROPNAME = "transacted";
/**
* Holds the name of the property to get the test broker url from.
*/
- private static final String BROKER_PROPNAME = "broker";
+ protected static final String BROKER_PROPNAME = "broker";
/**
* Holds the name of the property to get the test broker virtual path.
*/
- private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+ protected static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
/**
* Holds the name of the property to get the waiting timeout for response messages.
*/
- private static final String TIMEOUT_PROPNAME = "timeout";
+ protected static final String TIMEOUT_PROPNAME = "timeout";
/** Holds the name of the property to get the message rate from. */
- private static final String RATE_PROPNAME = "rate";
+ protected static final String RATE_PROPNAME = "rate";
- private static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
+ protected static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
/**
* Holds the size of message body to attach to the ping messages.
*/
- private static final int MESSAGE_SIZE_DEFAULT = 0;
+ protected static final int MESSAGE_SIZE_DEFAULT = 0;
- private static final int BATCH_SIZE_DEFAULT = 2;
+ protected static final int BATCH_SIZE_DEFAULT = 2;
+ protected static final int COMMIT_BATCH_SIZE_DEFAULT = BATCH_SIZE_DEFAULT;
/**
* Holds the name of the queue to which pings are sent.
*/
- private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+ protected static final String PING_QUEUE_NAME_DEFAULT = "ping";
/**
* Holds the message delivery mode to use for the test.
*/
- private static final boolean PERSISTENT_MODE_DEFAULT = false;
+ protected static final boolean PERSISTENT_MODE_DEFAULT = false;
/**
* Holds the transactional mode to use for the test.
*/
- private static final boolean TRANSACTED_DEFAULT = false;
+ protected static final boolean TRANSACTED_DEFAULT = false;
/**
* Holds the default broker url for the test.
*/
- private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+ protected static final String BROKER_DEFAULT = "tcp://localhost:5672";
/**
* Holds the default virtual path for the test.
*/
- private static final String VIRTUAL_PATH_DEFAULT = "/test";
+ protected static final String VIRTUAL_PATH_DEFAULT = "/test";
/**
* Sets a default ping timeout.
*/
- private static final long TIMEOUT_DEFAULT = 3000;
+ protected static final long TIMEOUT_DEFAULT = 3000;
/** Holds the default rate. A value of zero means infinity, only values of 1 or greater are meaningfull. */
private static final int RATE_DEFAULT = 0;
- private static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
- private static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
- private static final String FAIL_AFTER_SEND = "FailAfterSend";
- private static final String FAIL_BEFORE_SEND = "FailBeforeSend";
- private static final String BATCH_SIZE = "BatchSize";
- private static final String FAIL_ONCE = "FailOnce";
+ protected static final String FAIL_AFTER_COMMIT = "FailAfterCommit";
+ protected static final String FAIL_BEFORE_COMMIT = "FailBeforeCommit";
+ protected static final String FAIL_AFTER_SEND = "FailAfterSend";
+ protected static final String FAIL_BEFORE_SEND = "FailBeforeSend";
+ protected static final String COMMIT_BATCH_SIZE = "CommitBatchSize";
+ protected static final String BATCH_SIZE = "BatchSize";
+ protected static final String FAIL_ONCE = "FailOnce";
/**
* Thread local to hold the per-thread test setup fields.
@@ -139,7 +141,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
// Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
// the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
// of the test parameters to log with the results.
- private Properties testParameters = System.getProperties();
+ protected Properties testParameters = System.getProperties();
//private Properties testParameters = new ContextualProperties(System.getProperties());
public PingTestPerf(String name)
@@ -154,6 +156,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
setSystemPropertyIfNull(FAIL_ONCE, "true");
setSystemPropertyIfNull(BATCH_SIZE, Integer.toString(BATCH_SIZE_DEFAULT));
+ setSystemPropertyIfNull(COMMIT_BATCH_SIZE, Integer.toString(COMMIT_BATCH_SIZE_DEFAULT));
setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
@@ -181,7 +184,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
//return new junit.framework.TestSuite(PingTestPerf.class);
}
- private static void setSystemPropertyIfNull(String propName, String propValue)
+ protected static void setSystemPropertyIfNull(String propName, String propValue)
{
if (System.getProperty(propName) == null)
{
@@ -223,6 +226,7 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
}
}
+
protected void setUp() throws Exception
{
// Log4j will propagate the test name as a thread local in all log output.
@@ -293,11 +297,11 @@ public class PingTestPerf extends AsymptoticTestCase //implements TimingControll
}
}
- private static class PerThreadSetup
+ protected static class PerThreadSetup
{
/**
* Holds the test ping client.
*/
- private TestPingItself _pingItselfClient;
+ protected TestPingItself _pingItselfClient;
}
}