summaryrefslogtreecommitdiff
path: root/java/perftests/src/test
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-14 20:02:03 +0000
commita22f3f594d6eee7d610fb4f140e18cddd7c880f6 (patch)
tree5adb376ed217d2debaff1c0bdd59af1a1c93e829 /java/perftests/src/test
parent9cb1922884c5b258c961046e6fd48e5152aa79d5 (diff)
downloadqpid-python-a22f3f594d6eee7d610fb4f140e18cddd7c880f6.tar.gz
First backmerge from trunk to 0-9 branch for Java. Not all java tests passing yet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src/test')
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java302
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java317
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java246
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java259
4 files changed, 1124 insertions, 0 deletions
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
new file mode 100644
index 0000000000..e10e6353b7
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -0,0 +1,302 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller
+ * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test
+ * that it extends because it can output timings as replies are received, rather than waiting until all expected replies
+ * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending
+ * and recieving clients working asynchronously.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings asynchronously on batches received.
+ * </table>
+ */
+public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified name.
+ *
+ * @param name The test name.
+ */
+ public PingAsyncTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingAsyncTestPerf("testAsyncPingOk"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
+ * all replies have been received or a time out occurs before exiting this method.
+ *
+ * @param numPings The number of pings to send.
+ */
+ public void testAsyncPingOk(int numPings) throws Exception
+ {
+ _logger.debug("public void testAsyncPingOk(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique for this run.
+ String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+ _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc = getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount = numPings;
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+ // messages.
+ pingClient.setChainedMessageListener(batchedResultsListener);
+
+ // Generate a sample message of the specified size.
+ ObjectMessage msg =
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the requested number of messages, and wait until they have all been received.
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+ // Check that all the replies were received and log a fail if they were not.
+ if (numReplies < numPings)
+ {
+ tc.completeTest(false, 0);
+ }
+
+ // Remove the chained message listener from the ping producer.
+ pingClient.removeChainedMessageListener();
+
+ // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(messageCorrelationId);
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already been created. This is set up with the
+ // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its asynchronous test timings from.
+ perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the
+ * pinger, in order to receive notifications about every message received and the number remaining to be
+ * received. Whenever the number remaining crosses a batch size boundary this results listener outputs
+ * a test timing for the actual number of messages received in the current batch.
+ */
+ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+ *
+ * @throws JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount) throws JMSException
+ {
+ _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
+
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ // Get the details for the correlation id and check that they are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Calculate how many messages were actually received in the last batch. This will be the batch size
+ // except where the number expected is not a multiple of the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+
+ tc.completeTest(true, receivedInBatch);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test results. This consists of the count of
+ * the total expected number of messages, and the timing controller for the thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
new file mode 100644
index 0000000000..620ddd13f7
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
@@ -0,0 +1,317 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
+ * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
+ * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The frequency
+ * of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the default of every
+ * {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the individual
+ * timed ping was taken from, rather than 1 for a single message. This is so that the total throughput (messages / time)
+ * can be calculated in order to examine the relationship between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><td> Responsibilities <th> Collaborations
+ * <tr><td> Send many ping messages and output timings for sampled individual pings.
+ * </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "BatchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified name.
+ *
+ * @param name The test name.
+ */
+ public PingLatencyTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Latency Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingLatencyTestPerf("testPingLatency"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until
+ * all replies have been received or a time out occurs before exiting this method.
+ *
+ * @param numPings The number of pings to send.
+ */
+ public void testPingLatency(int numPings) throws Exception
+ {
+ _logger.debug("public void testPingLatency(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique for this run.
+ String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+ _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc = getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount = numPings;
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+ // messages.
+ pingClient.setChainedMessageListener(batchedResultsListener);
+
+ // Generate a sample message of the specified size.
+ ObjectMessage msg =
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the requested number of messages, and wait until they have all been received.
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+ // Check that all the replies were received and log a fail if they were not.
+ if (numReplies < numPings)
+ {
+ tc.completeTest(false, 0);
+ }
+
+ // Remove the chained message listener from the ping producer.
+ pingClient.removeChainedMessageListener();
+
+ // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(messageCorrelationId);
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already been created. This is set up with the
+ // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its asynchronous test timings from.
+ perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+ * be attached to the pinger, in order to receive notifications about every message received and the number remaining
+ * to be received. Whenever the number remaining crosses a batch size boundary this results listener outputs a test
+ * timing for the actual number of messages received in the current batch.
+ */
+ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+ *
+ * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount) throws JMSException
+ {
+ _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
+
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ // Get the details for the correlation id and check that they are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Extract the send time from the message and work out from the current time, what the ping latency was.
+ // The ping producer time stamps messages in nanoseconds.
+ long startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+ long now = System.nanoTime();
+ long pingTime = now - startTime;
+
+ // Calculate how many messages were actually received in the last batch. This will be the batch size
+ // except where the number expected is not a multiple of the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+
+ tc.completeTest(true, receivedInBatch, pingTime);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test results. This consists of the count of
+ * the total expected number of messages, and the timing controller for the thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
new file mode 100644
index 0000000000..c4e72f4bb6
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
@@ -0,0 +1,246 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
+ * simultaneously to simluate many clients/producers/connections.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
+{
+ private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ /** Holds a property reader to extract the test parameters from. */
+ protected ParsedProperties testParameters = new ParsedProperties(System.getProperties());
+
+ public PingTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingTestPerf("testPingOk"));
+
+ return suite;
+ }
+
+ public void testPingOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // start the test
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ + numReplies);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+
+ // Extract the test set up paramaeters.
+ int destinationscount =
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently.
+ synchronized (this)
+ {
+ // Establish a client to ping a Destination and listen the reply back from same Destination
+ perThreadSetup._pingClient = new PingClient(brokerDetails, username, password, virtualPath, destinationName,
+ selector, transacted, persistent, messageSize, verbose,
+ failAfterCommit, failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, destinationscount, rate, pubsub);
+ }
+ // Start the client connection
+ perThreadSetup._pingClient.getConnection().start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._pingClient.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping client.
+ */
+ protected PingClient _pingClient;
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
new file mode 100644
index 0000000000..81967d332a
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -0,0 +1,259 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
+ * a producer to a conumer, then the consumer replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
+ * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
+ * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
+ * back on the temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingPongTestPerf extends AsymptoticTestCase
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
+ //private Properties testParameters = System.getProperties();
+ private ParsedProperties testParameters = new ParsedProperties(System.getProperties());
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_MESSAGE_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.DEFAULT_PING_DESTINATION_NAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PERSISTENT_MODE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_TRANSACTED));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.DEFAULT_BROKER);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.DEFAULT_USERNAME);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.DEFAULT_PASSWORD);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VIRTUAL_PATH_PROPNAME, PingPongProducer.DEFAULT_VIRTUAL_PATH);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.VERBOSE_OUTPUT_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_VERBOSE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.RATE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_RATE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.IS_PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.DEFAULT_PUBSUB));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_TX_BATCH_SIZE));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME,
+ Long.toString(PingPongProducer.DEFAULT_TIMEOUT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DEFAULT_DESTINATION_COUNT));
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_COMMIT);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_AFTER_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.DEFAULT_FAIL_BEFORE_SEND);
+ ParsedProperties.setSysPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.DEFAULT_FAIL_ONCE);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingPongTestPerf("testPingPongOk"));
+
+ return suite;
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the message and wait for a reply.
+ int numReplies =
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.DEFAULT_TIMEOUT);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_PATH_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_OUTPUT_PROPNAME);
+ int messageSize = testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME);
+ int rate = testParameters.getPropertyAsInteger(PingPongProducer.RATE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.IS_PUBSUB_PROPNAME);
+ boolean failAfterCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME);
+ boolean failBeforeCommit = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME);
+ boolean failAfterSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_AFTER_SEND_PROPNAME);
+ boolean failBeforeSend = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME);
+ int batchSize = testParameters.getPropertyAsInteger(PingPongProducer.COMMIT_BATCH_SIZE_PROPNAME);
+ Boolean failOnce = testParameters.getPropertyAsBoolean(PingPongProducer.FAIL_ONCE_PROPNAME);
+
+ synchronized (this)
+ {
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ perThreadSetup._testPingBouncer = new PingPongBouncer(brokerDetails, username, password, virtualPath,
+ destinationName, persistent, transacted, selector,
+ verbose, pubsub);
+
+ // Start the connections for client and producer running.
+ perThreadSetup._testPingBouncer.getConnection().start();
+
+ // Establish a ping-pong client on the ping queue to send the pings with.
+
+ perThreadSetup._testPingProducer = new PingPongProducer(brokerDetails, username, password, virtualPath,
+ destinationName, selector, transacted, persistent,
+ messageSize, verbose, failAfterCommit,
+ failBeforeCommit, failAfterSend, failBeforeSend,
+ failOnce, batchSize, 0, rate, pubsub);
+ perThreadSetup._testPingProducer.getConnection().start();
+ }
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._testPingProducer.close();
+ //perThreadSetup._testPingBouncer.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping-pong producer.
+ */
+ private PingPongProducer _testPingProducer;
+
+ /**
+ * Holds the test ping client.
+ */
+ private PingPongBouncer _testPingBouncer;
+ }
+}