summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-08-23 12:10:42 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-08-23 12:10:42 +0000
commit17754a0a51b6d103f3db84af629faae9a398d963 (patch)
tree40bc01e419e5fd151d0ff02ade976f1bad66a36e /java
parent7204cc0539a8cef2c0519d533c6a3c3874913b7b (diff)
downloadqpid-python-17754a0a51b6d103f3db84af629faae9a398d963.tar.gz
updated from M2 branch
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568952 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java25
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java (renamed from java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java)20
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java25
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java38
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java (renamed from java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java)649
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java2
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java (renamed from java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java)392
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java10
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java478
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (renamed from java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java)502
10 files changed, 1136 insertions, 1005 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
index eeb4021f34..64ccb719b6 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
@@ -1,18 +1,21 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.client.message;
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 6c7f22c19a..06081e6ebf 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -20,15 +20,6 @@
*/
package org.apache.qpid.ping;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
import junit.framework.Test;
import junit.framework.TestSuite;
@@ -38,7 +29,14 @@ import org.apache.qpid.requestreply.PingPongProducer;
import uk.co.thebadgerset.junit.extensions.TimingController;
import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
/**
* PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller
@@ -239,7 +237,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
*
* @throws JMSException Any underlying JMSException is allowed to fall through.
*/
- public void onMessage(Message message, int remainingCount) throws JMSException
+ public void onMessage(Message message, int remainingCount, long latency) throws JMSException
{
// Check if a batch boundary has been crossed.
if ((remainingCount % _batchSize) == 0)
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index e3b0249ed3..ac12436951 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.ping;
-import java.util.List;
-import java.util.Properties;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
import javax.jms.Destination;
-import org.apache.qpid.requestreply.PingPongProducer;
+import java.util.List;
+import java.util.Properties;
/**
* PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer}
@@ -36,7 +38,7 @@ import org.apache.qpid.requestreply.PingPongProducer;
* are created they will all be run in parallel and be active in sending and consuming pings at the same time.
* If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear
* pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of
- * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these
+ * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these
* conditions.
*
* <p/><table id="crc"><caption>CRC Card</caption>
@@ -47,6 +49,9 @@ import org.apache.qpid.requestreply.PingPongProducer;
*/
public class PingClient extends PingPongProducer
{
+ /** Used for debugging. */
+ private final Logger log = Logger.getLogger(PingClient.class);
+
/** Used to count the number of ping clients created. */
private static int _pingClientCount;
@@ -82,15 +87,21 @@ public class PingClient extends PingPongProducer
*
* @return The scaling up of the number of expected pub/sub pings.
*/
- public int getConsumersPerTopic()
+ public int getConsumersPerDestination()
{
+ log.debug("public int getConsumersPerDestination(): called");
+
if (_isUnique)
{
- return 1;
+ log.debug("1 consumer per destination.");
+
+ return _noOfConsumers;
}
else
{
- return _pingClientCount;
+ log.debug(_pingClientCount + " consumers per destination.");
+
+ return _pingClientCount * _noOfConsumers;
}
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
index 2765986868..db6f384914 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
@@ -20,18 +20,6 @@
*/
package org.apache.qpid.ping;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
import org.apache.log4j.Logger;
import org.apache.qpid.requestreply.PingPongProducer;
@@ -40,6 +28,15 @@ import org.apache.qpid.util.CommandLineParser;
import uk.co.thebadgerset.junit.extensions.util.MathUtils;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.*;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
* receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
@@ -167,7 +164,8 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
try
{
// Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
PingDurableClient pingProducer = new PingDurableClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
@@ -219,7 +217,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
// Establish the connection and the message producer.
establishConnection(true, false);
- getConnection().start();
+ _connection.start();
Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -329,8 +327,8 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
_queueSharedID = new AtomicInteger();
establishConnection(false, true);
- _consumer.setMessageListener(null);
- _connection.start();
+ _consumer[0].setMessageListener(null);
+ _consumerConnection[0].start();
// Try to receive all of the pings that were successfully sent.
int messagesReceived = 0;
@@ -339,7 +337,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
while (!endCondition)
{
// Message received = _consumer.receiveNoWait();
- Message received = _consumer.receive(TIME_OUT);
+ Message received = _consumer[0].receive(TIME_OUT);
log.debug("received = " + received);
if (received != null)
@@ -362,11 +360,11 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
}
// Ensure messages received are committed.
- if (_transacted)
+ if (_consTransacted)
{
try
{
- _consumerSession.commit();
+ _consumerSession[0].commit();
System.out.println("Committed for all messages received.");
}
catch (JMSException e)
@@ -375,7 +373,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList
System.out.println("Error during commit.");
try
{
- _consumerSession.rollback();
+ _consumerSession[0].rollback();
System.out.println("Rolled back on all messages received.");
}
catch (JMSException e2)
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
index af612d5430..55414664da 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java
@@ -1,335 +1,314 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.message.AMQMessage;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.requestreply.PingPongProducer;
-
-import uk.co.thebadgerset.junit.extensions.TimingController;
-import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
-
-/**
- * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
- * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
- * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
- * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
- * waiting until all expected replies are received.
- *
- * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
- * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The
- * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
- * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
- *
- * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
- * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
- * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
- * messages and output timings for sampled individual pings. </table>
- */
-public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
-{
- private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
-
- /** Holds the name of the property to get the test results logging batch size. */
- public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
-
- /** Holds the default test results logging batch size. */
- public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
-
- /** Used to hold the timing controller passed from the test runner. */
- private TimingController _timingController;
-
- /** Used to generate unique correlation ids for each test run. */
- private AtomicLong corellationIdGenerator = new AtomicLong();
-
- /**
- * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
- * controler.
- */
- private Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
-
- /** Holds the batched results listener, that does logging on batch boundaries. */
- private BatchedResultsListener batchedResultsListener = null;
-
- /**
- * Creates a new asynchronous ping performance test with the specified name.
- *
- * @param name The test name.
- */
- public PingLatencyTestPerf(String name)
- {
- super(name);
-
- // Sets up the test parameters with defaults.
- ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
- Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
- }
-
- /** Compile all the tests into a test suite. */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping Latency Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingLatencyTestPerf("testPingLatency"));
-
- return suite;
- }
-
- /**
- * Accepts a timing controller from the test runner.
- *
- * @param timingController The timing controller to register mutliple timings with.
- */
- public void setTimingController(TimingController timingController)
- {
- _timingController = timingController;
- }
-
- /**
- * Gets the timing controller passed in by the test runner.
- *
- * @return The timing controller passed in by the test runner.
- */
- public TimingController getTimingController()
- {
- return _timingController;
- }
-
- /**
- * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
- * replies have been received or a time out occurs before exiting this method.
- *
- * @param numPings The number of pings to send.
- */
- public void testPingLatency(int numPings) throws Exception
- {
- _logger.debug("public void testPingLatency(int numPings): called");
-
- // Ensure that at least one ping was requeusted.
- if (numPings == 0)
- {
- _logger.error("Number of pings requested was zero.");
- }
-
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
- PingClient pingClient = perThreadSetup._pingClient;
-
- // Advance the correlation id of messages to send, to make it unique for this run.
- String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
- _logger.debug("messageCorrelationId = " + messageCorrelationId);
-
- // Initialize the count and timing controller for the new correlation id.
- PerCorrelationId perCorrelationId = new PerCorrelationId();
- TimingController tc = getTimingController().getControllerForCurrentThread();
- perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = numPings;
- perCorrelationIds.put(messageCorrelationId, perCorrelationId);
-
- // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
- // messages.
- pingClient.setChainedMessageListener(batchedResultsListener);
-
- // Generate a sample message of the specified size.
- Message msg =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // Send the requested number of messages, and wait until they have all been received.
- long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
-
- // Check that all the replies were received and log a fail if they were not.
- if (numReplies < numPings)
- {
- tc.completeTest(false, 0);
- }
-
- // Remove the chained message listener from the ping producer.
- pingClient.removeChainedMessageListener();
-
- // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
- perCorrelationIds.remove(messageCorrelationId);
- }
-
- /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
- public void threadSetUp()
- {
- _logger.debug("public void threadSetUp(): called");
-
- try
- {
- // Call the set up method in the super class. This creates a PingClient pinger.
- super.threadSetUp();
-
- // Create the chained message listener, only if it has not already been created. This is set up with the
- // batch size property, to tell it what batch size to output results on. A synchronized block is used to
- // ensure that only one thread creates this.
- synchronized (this)
- {
- if (batchedResultsListener == null)
- {
- int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
- batchedResultsListener = new BatchedResultsListener(batchSize);
- }
- }
-
- // Get the set up that the super class created.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Register the chained message listener on the pinger to do its asynchronous test timings from.
- perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
- * be attached to the pinger, in order to receive notifications about every message received and the number
- * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
- * outputs a test timing for the actual number of messages received in the current batch.
- */
- private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
- {
- /** The test results logging batch size. */
- int _batchSize;
- private boolean _strictAMQP;
-
- /**
- * Creates a results listener on the specified batch size.
- *
- * @param batchSize The batch size to use.
- */
- public BatchedResultsListener(int batchSize)
- {
- _batchSize = batchSize;
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
- AMQSession.STRICT_AMQP_DEFAULT));
- }
-
- /**
- * This callback method is called from all of the pingers that this test creates. It uses the correlation id
- * from the message to identify the timing controller for the test thread that was responsible for sending those
- * messages.
- *
- * @param message The message.
- * @param remainingCount The count of messages remaining to be received with a particular correlation id.
- *
- * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
- */
- public void onMessage(Message message, int remainingCount) throws JMSException
- {
- _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
-
- // Check if a batch boundary has been crossed.
- if ((remainingCount % _batchSize) == 0)
- {
- // Extract the correlation id from the message.
- String correlationId = message.getJMSCorrelationID();
-
- // Get the details for the correlation id and check that they are not null. They can become null
- // if a test times out.
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
- if (perCorrelationId != null)
- {
- // Get the timing controller and expected count for this correlation id.
- TimingController tc = perCorrelationId._tc;
- int expected = perCorrelationId._expectedCount;
-
- // Extract the send time from the message and work out from the current time, what the ping latency was.
- // The ping producer time stamps messages in nanoseconds.
- long startTime;
-
- if (_strictAMQP)
- {
- Long value =
- ((AMQMessage) message).getTimestampProperty(new AMQShortString(
- PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME));
-
- startTime = ((value == null) ? 0L : value);
- }
- else
- {
- startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
- }
-
- long now = System.nanoTime();
- long pingTime = now - startTime;
-
- // Calculate how many messages were actually received in the last batch. This will be the batch size
- // except where the number expected is not a multiple of the batch size and this is the first remaining
- // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
- // size.
- int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
-
- // Register a test result for the correlation id.
- try
- {
-
- tc.completeTest(true, receivedInBatch, pingTime);
- }
- catch (InterruptedException e)
- {
- // Ignore this. It means the test runner wants to stop as soon as possible.
- _logger.warn("Got InterruptedException.", e);
- }
- }
- // Else ignore, test timed out. Should log a fail here?
- }
- }
- }
-
- /**
- * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
- * total expected number of messages, and the timing controller for the thread sending those message ids.
- */
- private static class PerCorrelationId
- {
- public int _expectedCount;
- public TimingController _tc;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.TimingController;
+import uk.co.thebadgerset.junit.extensions.TimingControllerAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
+ * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
+ * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
+ * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
+ * waiting until all expected replies are received.
+ *
+ * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
+ * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The
+ * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
+ * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
+ *
+ * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
+ * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
+ * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
+ * messages and output timings for sampled individual pings. </table>
+ */
+public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
+
+ /** Holds the name of the property to get the test results logging batch size. */
+ public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
+
+ /** Holds the default test results logging batch size. */
+ public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
+
+ /** Used to hold the timing controller passed from the test runner. */
+ private TimingController _timingController;
+
+ /** Used to generate unique correlation ids for each test run. */
+ private AtomicLong corellationIdGenerator = new AtomicLong();
+
+ /**
+ * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
+ * controler.
+ */
+ private Map<String, PerCorrelationId> perCorrelationIds =
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+
+ /** Holds the batched results listener, that does logging on batch boundaries. */
+ private BatchedResultsListener batchedResultsListener = null;
+
+ /**
+ * Creates a new asynchronous ping performance test with the specified name.
+ *
+ * @param name The test name.
+ */
+ public PingLatencyTestPerf(String name)
+ {
+ super(name);
+
+ // Sets up the test parameters with defaults.
+ ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
+ Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
+ }
+
+ /** Compile all the tests into a test suite. */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Latency Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingLatencyTestPerf("testPingLatency"));
+
+ return suite;
+ }
+
+ /**
+ * Accepts a timing controller from the test runner.
+ *
+ * @param timingController The timing controller to register mutliple timings with.
+ */
+ public void setTimingController(TimingController timingController)
+ {
+ _timingController = timingController;
+ }
+
+ /**
+ * Gets the timing controller passed in by the test runner.
+ *
+ * @return The timing controller passed in by the test runner.
+ */
+ public TimingController getTimingController()
+ {
+ return _timingController;
+ }
+
+ /**
+ * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
+ * replies have been received or a time out occurs before exiting this method.
+ *
+ * @param numPings The number of pings to send.
+ */
+ public void testPingLatency(int numPings) throws Exception
+ {
+ _logger.debug("public void testPingLatency(int numPings): called");
+
+ // Ensure that at least one ping was requeusted.
+ if (numPings == 0)
+ {
+ _logger.error("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+ PingClient pingClient = perThreadSetup._pingClient;
+
+ // Advance the correlation id of messages to send, to make it unique for this run.
+ String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
+ _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
+ // Initialize the count and timing controller for the new correlation id.
+ PerCorrelationId perCorrelationId = new PerCorrelationId();
+ TimingController tc = getTimingController().getControllerForCurrentThread();
+ perCorrelationId._tc = tc;
+ perCorrelationId._expectedCount = numPings;
+ perCorrelationIds.put(messageCorrelationId, perCorrelationId);
+
+ // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
+ // messages.
+ pingClient.setChainedMessageListener(batchedResultsListener);
+
+ // Generate a sample message of the specified size.
+ Message msg =
+ pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the requested number of messages, and wait until they have all been received.
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
+
+ // Check that all the replies were received and log a fail if they were not.
+ if (numReplies < numPings)
+ {
+ tc.completeTest(false, 0);
+ }
+
+ // Remove the chained message listener from the ping producer.
+ pingClient.removeChainedMessageListener();
+
+ // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
+ perCorrelationIds.remove(messageCorrelationId);
+ }
+
+ /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ // Call the set up method in the super class. This creates a PingClient pinger.
+ super.threadSetUp();
+
+ // Create the chained message listener, only if it has not already been created. This is set up with the
+ // batch size property, to tell it what batch size to output results on. A synchronized block is used to
+ // ensure that only one thread creates this.
+ synchronized (this)
+ {
+ if (batchedResultsListener == null)
+ {
+ int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
+ batchedResultsListener = new BatchedResultsListener(batchSize);
+ }
+ }
+
+ // Get the set up that the super class created.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Register the chained message listener on the pinger to do its asynchronous test timings from.
+ perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
+ * be attached to the pinger, in order to receive notifications about every message received and the number
+ * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
+ * outputs a test timing for the actual number of messages received in the current batch.
+ */
+ private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
+ {
+ /** The test results logging batch size. */
+ int _batchSize;
+ private boolean _strictAMQP;
+
+ /**
+ * Creates a results listener on the specified batch size.
+ *
+ * @param batchSize The batch size to use.
+ */
+ public BatchedResultsListener(int batchSize)
+ {
+ _batchSize = batchSize;
+ _strictAMQP =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
+ AMQSession.STRICT_AMQP_DEFAULT));
+ }
+
+ /**
+ * This callback method is called from all of the pingers that this test creates. It uses the correlation id
+ * from the message to identify the timing controller for the test thread that was responsible for sending those
+ * messages.
+ *
+ * @param message The message.
+ * @param remainingCount The count of messages remaining to be received with a particular correlation id.
+ *
+ * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount, long latency) throws JMSException
+ {
+ _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
+
+ // Check if a batch boundary has been crossed.
+ if ((remainingCount % _batchSize) == 0)
+ {
+ // Extract the correlation id from the message.
+ String correlationId = message.getJMSCorrelationID();
+
+ // Get the details for the correlation id and check that they are not null. They can become null
+ // if a test times out.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
+ if (perCorrelationId != null)
+ {
+ // Get the timing controller and expected count for this correlation id.
+ TimingController tc = perCorrelationId._tc;
+ int expected = perCorrelationId._expectedCount;
+
+ // Calculate how many messages were actually received in the last batch. This will be the batch size
+ // except where the number expected is not a multiple of the batch size and this is the first remaining
+ // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
+ // size.
+ int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
+
+ // Register a test result for the correlation id.
+ try
+ {
+ tc.completeTest(true, receivedInBatch, latency);
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore this. It means the test runner wants to stop as soon as possible.
+ _logger.warn("Got InterruptedException.", e);
+ }
+ }
+ // Else ignore, test timed out. Should log a fail here?
+ }
+ }
+ }
+
+ /**
+ * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
+ * total expected number of messages, and the timing controller for the thread sending those message ids.
+ */
+ private static class PerCorrelationId
+ {
+ public int _expectedCount;
+ public TimingController _tc;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
index bbe337ca0a..2879f0c322 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
@@ -57,7 +57,7 @@ public class PingSendOnlyClient extends PingDurableClient
try
{
// Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
PingSendOnlyClient pingProducer = new PingSendOnlyClient(options);
// Create a shutdown hook to terminate the ping-pong producer.
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
index 46333db844..375007584b 100644
--- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
@@ -1,196 +1,196 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-
-import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
-import uk.co.thebadgerset.junit.extensions.TestThreadAware;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
-import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
-
-import javax.jms.*;
-
-/**
- * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
- * simultaneously to simluate many clients/producers/connections.
- *
- * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
- * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
- *
- * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
- * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
- * except if the connection is lost in which case an attempt to re-establish the setup is made.
- *
- * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
- * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
- * temporary queue.
- *
- * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
-{
- private static Logger _logger = Logger.getLogger(PingTestPerf.class);
-
- /** Thread local to hold the per-thread test setup fields. */
- ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-
- /** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters =
- TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
-
- public PingTestPerf(String name)
- {
- super(name);
-
- _logger.debug("testParameters = " + testParameters);
- }
-
- /**
- * Compile all the tests into a test suite.
- * @return The test method testPingOk.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingTestPerf("testPingOk"));
-
- return suite;
- }
-
- public void testPingOk(int numPings) throws Exception
- {
- if (numPings == 0)
- {
- Assert.fail("Number of pings requested was zero.");
- }
-
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- if (perThreadSetup == null)
- {
- Assert.fail("Could not get per thread test setup, it was null.");
- }
-
- // Generate a sample message. This message is already time stamped and has its reply-to destination set.
- Message msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // start the test
- long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
-
- // Fail the test if the timeout was exceeded.
- if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
- {
- Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
- + numReplies);
- }
- }
-
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
- public void threadSetUp()
- {
- _logger.debug("public void threadSetUp(): called");
-
- try
- {
- PerThreadSetup perThreadSetup = new PerThreadSetup();
-
- // This is synchronized because there is a race condition, which causes one connection to sleep if
- // all threads try to create connection concurrently.
- synchronized (this)
- {
- // Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingClient = new PingClient(testParameters);
- perThreadSetup._pingClient.establishConnection(true, true);
- }
- // Start the client connection
- perThreadSetup._pingClient.getConnection().start();
-
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * Performs test fixture clean
- */
- public void threadTearDown()
- {
- _logger.debug("public void threadTearDown(): called");
-
- try
- {
- // Get the per thread test fixture.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Close the pingers so that it cleans up its connection cleanly.
- synchronized (this)
- {
- if ((perThreadSetup != null) && (perThreadSetup._pingClient != null))
- {
- perThreadSetup._pingClient.close();
- }
- }
- }
- catch (JMSException e)
- {
- _logger.warn("There was an exception during per thread tear down.");
- }
- finally
- {
- // Ensure the per thread fixture is reclaimed.
- threadSetup.remove();
- }
- }
-
- protected static class PerThreadSetup
- {
- /**
- * Holds the test ping client.
- */
- protected PingClient _pingClient;
- protected String _correlationId;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.ping;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.requestreply.PingPongProducer;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.TestThreadAware;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
+ * simultaneously to simluate many clients/producers/connections.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
+{
+ private static Logger _logger = Logger.getLogger(PingTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ /** Holds a property reader to extract the test parameters from. */
+ protected ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
+
+ public PingTestPerf(String name)
+ {
+ super(name);
+
+ _logger.debug("testParameters = " + testParameters);
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ * @return The test method testPingOk.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingTestPerf("testPingOk"));
+
+ return suite;
+ }
+
+ public void testPingOk(int numPings) throws Exception
+ {
+ if (numPings == 0)
+ {
+ Assert.fail("Number of pings requested was zero.");
+ }
+
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ if (perThreadSetup == null)
+ {
+ Assert.fail("Could not get per thread test setup, it was null.");
+ }
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ Message msg =
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // start the test
+ long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
+ int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
+ {
+ Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ + numReplies);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ _logger.debug("public void threadSetUp(): called");
+
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // This is synchronized because there is a race condition, which causes one connection to sleep if
+ // all threads try to create connection concurrently.
+ synchronized (this)
+ {
+ // Establish a client to ping a Destination and listen the reply back from same Destination
+ perThreadSetup._pingClient = new PingClient(testParameters);
+ perThreadSetup._pingClient.establishConnection(true, true);
+ }
+ // Start the client connection
+ perThreadSetup._pingClient.start();
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ if ((perThreadSetup != null) && (perThreadSetup._pingClient != null))
+ {
+ perThreadSetup._pingClient.close();
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ finally
+ {
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping client.
+ */
+ protected PingClient _pingClient;
+ protected String _correlationId;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
index 78ab7c4c73..82b36bf233 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
@@ -92,10 +92,10 @@ public class PingPongBouncer implements MessageListener
/** The producer for sending replies with. */
private MessageProducer _replyProducer;
- /** The consumer session. */
+ /** The consumer controlSession. */
private Session _consumerSession;
- /** The producer session. */
+ /** The producer controlSession. */
private Session _producerSession;
/** Holds the connection to the broker. */
@@ -149,7 +149,7 @@ public class PingPongBouncer implements MessageListener
// Set up the failover notifier.
getConnection().setConnectionListener(new FailoverNotifier());
- // Create a session to listen for messages on and one to send replies on, transactional depending on the
+ // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
// command line option.
_consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
_producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
@@ -323,8 +323,8 @@ public class PingPongBouncer implements MessageListener
}
/**
- * Convenience method to commit the transaction on the specified session. If the session to commit on is not
- * a transactional session, this method does nothing.
+ * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
+ * a transactional controlSession, this method does nothing.
*
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
* commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index d5d1c304e9..bd34fd8f20 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -20,21 +20,8 @@
*/
package org.apache.qpid.requestreply;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.*;
-
import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
@@ -51,6 +38,18 @@ import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
import uk.co.thebadgerset.junit.extensions.Throttle;
import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import javax.jms.*;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
* either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens
@@ -86,10 +85,11 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* <tr><td> username <td> guest <td> The username to access the broker with.
* <tr><td> password <td> guest <td> The password to access the broker with.
* <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of receivers listening to the pings.
+ * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to.
+ * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination.
* <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receiver only listens to one ping destination or all.
+ * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all.
* <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
* <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
* 0 - SESSION_TRANSACTED
@@ -98,6 +98,10 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
+ * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value
+ * as the 'transacted' option if not seperately defined.
+ * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
+ * value as 'ackMode' if not seperately defined.
* <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
* Limits the volume of messages currently buffered on the client
* or broker. Can help scale test clients by limiting amount of buffered
@@ -131,8 +135,9 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
* Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written
* faster than it can be sent.
*/
-public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener
{
+ /** Used for debugging. */
private static final Logger log = Logger.getLogger(PingPongProducer.class);
/** Holds the name of the property to get the test message size from. */
@@ -159,6 +164,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+ public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
+
/** Holds the name of the property to get the test broker url from. */
public static final String BROKER_PROPNAME = "broker";
@@ -237,12 +245,18 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the default message selector. */
public static final String SELECTOR_DEFAULT = "";
- /** Holds the name of the proeprty to get the destination count from. */
+ /** Holds the name of the property to get the destination count from. */
public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
/** Defines the default number of destinations to ping. */
public static final int DESTINATION_COUNT_DEFAULT = 1;
+ /** Holds the name of the property to get the number of consumers per destination from. */
+ public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
+
+ /** Defines the default number consumers per destination. */
+ public static final int NUM_CONSUMERS_DEFAULT = 1;
+
/** Holds the name of the property to get the waiting timeout for response messages. */
public static final String TIMEOUT_PROPNAME = "timeout";
@@ -270,6 +284,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+ public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+
public static final String MAX_PENDING_PROPNAME = "maxPending";
public static final int MAX_PENDING_DEFAULT = 0;
@@ -297,8 +314,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
+ defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
@@ -311,6 +330,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
+ defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
@@ -323,12 +343,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
protected String _destinationName;
protected String _selector;
protected boolean _transacted;
+ protected boolean _consTransacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
/** Holds the acknowledgement mode used for sending and receiving messages. */
- private int _ackMode;
+ protected int _ackMode;
+
+ protected int _consAckMode;
/** Determines what size of messages this producer sends. */
protected int _messageSize;
@@ -363,7 +386,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the number of sends that should be performed in every transaction when using transactions. */
protected int _txBatchSize;
+ /** Holds the number of destinations to ping. */
protected int _noOfDestinations;
+
+ /** Holds the number of consumers per destination. */
+ protected int _noOfConsumers;
+
+ /** Holds the maximum send rate in herz. */
protected int _rate;
/**
@@ -373,7 +402,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
protected int _maxPendingSize;
/**
- * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected
+ * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
* to wait until the number of unreceived message is reduced before continuing to send.
*/
protected Object _sendPauseMonitor = new Object();
@@ -397,10 +426,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/** Holds the connection to the broker. */
protected Connection _connection;
- /** Holds the session on which ping replies are received. */
- protected Session _consumerSession;
+ /** Holds the consumer connections. */
+ protected Connection[] _consumerConnection;
- /** Holds the producer session, needed to create ping messages. */
+ /** Holds the controlSession on which ping replies are received. */
+ protected Session[] _consumerSession;
+
+ /** Holds the producer controlSession, needed to create ping messages. */
protected Session _producerSession;
/** Holds the destination where the response messages will arrive. */
@@ -434,18 +466,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
protected MessageProducer _producer;
/** Holds the message consumer to receive the ping replies through. */
- protected MessageConsumer _consumer;
-
- /**
- * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the
- * attached clients
- */
- static int _consumersPerTopic = 1;
+ protected MessageConsumer[] _consumer;
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
private String _clientID;
+ /** Keeps count of the total messages sent purely for debugging purposes. */
+ private static AtomicInteger numSent = new AtomicInteger();
+
/**
* Creates a ping producer with the specified parameters, of which there are many. See the class level comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
@@ -457,7 +486,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public PingPongProducer(Properties overrides) throws Exception
{
- log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
+ // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
// Create a set of parsed properties from the defaults overriden by the passed in values.
ParsedProperties properties = new ParsedProperties(defaults);
@@ -471,6 +500,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
_selector = properties.getProperty(SELECTOR_PROPNAME);
_transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
+ _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
_persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
_messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
_verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
@@ -481,11 +511,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
_txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
_noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
+ _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
_rate = properties.getPropertyAsInteger(RATE_PROPNAME);
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
@@ -516,7 +548,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void establishConnection(boolean producer, boolean consumer) throws Exception
{
- log.debug("public void establishConnection(): called");
+ // log.debug("public void establishConnection(): called");
// Generate a unique identifying name for this client, based on it ip address and the current time.
InetAddress address = InetAddress.getLocalHost();
@@ -526,11 +558,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
createConnection(_clientID);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(_transacted, _ackMode);
- _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode);
+ _producerSession = (Session) _connection.createSession(_transacted, _ackMode);
+
+ _consumerSession = new Session[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode);
+ }
// Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession.createTemporaryQueue();
+ _replyDestination = _consumerSession[0].createTemporaryQueue();
createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
@@ -557,7 +595,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
protected void createConnection(String clientID) throws AMQException, URLSyntaxException
{
+ // log.debug("protected void createConnection(String clientID = " + clientID + "): called");
+
+ // log.debug("Creating a connection for the message producer.");
+
_connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+
+ // log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
+
+ _consumerConnection = new Connection[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath);
+ }
}
/**
@@ -570,20 +621,21 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
try
{
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ Properties options =
+ CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
pingProducer.establishConnection(true, true);
// Start the ping producers dispatch thread running.
- pingProducer.getConnection().start();
+ pingProducer._connection.start();
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer.getConnection().setExceptionListener(pingProducer);
+ pingProducer._connection.setExceptionListener(pingProducer);
// Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
Thread pingThread = new Thread(pingProducer);
@@ -624,12 +676,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public List<Destination> getReplyDestinations()
{
- log.debug("public List<Destination> getReplyDestinations(): called");
+ // log.debug("public List<Destination> getReplyDestinations(): called");
List<Destination> replyDestinations = new ArrayList<Destination>();
replyDestinations.add(_replyDestination);
- log.debug("replyDestinations = " + replyDestinations);
+ // log.debug("replyDestinations = " + replyDestinations);
return replyDestinations;
}
@@ -642,12 +694,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void createProducer() throws JMSException
{
- log.debug("public void createProducer(): called");
+ // log.debug("public void createProducer(): called");
_producer = (MessageProducer) _producerSession.createProducer(null);
_producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
+ // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
}
/**
@@ -665,14 +717,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
boolean durable) throws JMSException, AMQException
{
- log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
+ selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
- + durable + "): called");
+ + durable + "): called");*/
_pingDestinations = new ArrayList<Destination>();
// Create the desired number of ping destinations and consumers for them.
- log.debug("Creating " + noOfDestinations + " destinations to ping.");
+ // log.debug("Creating " + noOfDestinations + " destinations to ping.");
for (int i = 0; i < noOfDestinations; i++)
{
@@ -683,12 +735,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
- log.debug("Creating unique destinations.");
+ // log.debug("Creating unique destinations.");
id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
- log.debug("Creating shared destinations.");
+ // log.debug("Creating shared destinations.");
id = "_" + _queueSharedID.incrementAndGet();
}
@@ -698,14 +750,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (!durable)
{
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
- log.debug("Created non-durable topic " + destination);
+ // log.debug("Created non-durable topic " + destination);
}
else
{
destination =
AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
_clientID, (AMQConnection) _connection);
- log.debug("Created durable topic " + destination);
+ // log.debug("Created durable topic " + destination);
}
}
// Otherwise this is a p2p pinger, in which case create queues.
@@ -719,7 +771,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
ExchangeDefaults.DIRECT_EXCHANGE_NAME);
- log.debug("Created queue " + destination);
+ // log.debug("Created queue " + destination);
}
// Keep the destination.
@@ -737,20 +789,36 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
{
- log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
+ /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");*/
- log.debug("Creating " + destinations.size() + " reply consumers.");
+ // log.debug("There are " + destinations.size() + " destinations.");
+ // log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
+ // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
for (Destination destination : destinations)
{
- // Create a consumer for the destination and set this pinger to listen to its messages.
- _consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
- _consumer.setMessageListener(this);
+ _consumer = new MessageConsumer[_noOfConsumers];
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ _consumer[i] =
+ _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
+
+ final int consumerNo = i;
- log.debug("Set this to listen to replies sent to destination: " + destination);
+ _consumer[i].setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ onMessageWithConsumerNo(message, consumerNo);
+ }
+ });
+
+ // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
+ }
}
}
@@ -761,97 +829,123 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*
* @param message The received message.
*/
- public void onMessage(Message message)
+ public void onMessageWithConsumerNo(Message message, int consumerNo)
{
- // log.debug("public void onMessage(Message message): called");
-
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
try
{
+ long now = System.nanoTime();
+ long timestamp = getTimestamp(message);
+ long pingTime = now - timestamp;
+
+ // NDC.push("cons" + consumerNo);
+
// Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
// log.debug("correlationID = " + correlationID);
- // Countdown on the traffic light if there is one for the matching correlation id.
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
+ int num = message.getIntProperty("MSG_NUM");
+ // log.info("Message " + num + " received.");
+
+ boolean isRedelivered = message.getJMSRedelivered();
+ // log.debug("isRedelivered = " + isRedelivered);
- if (perCorrelationId != null)
+ if (!isRedelivered)
{
- CountDownLatch trafficLight = perCorrelationId.trafficLight;
+ // Countdown on the traffic light if there is one for the matching correlation id.
+ PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
- // Restart the timeout timer on every message.
- perCorrelationId.timeOutStart = System.nanoTime();
+ if (perCorrelationId != null)
+ {
+ CountDownLatch trafficLight = perCorrelationId.trafficLight;
- // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+ // Restart the timeout timer on every message.
+ perCorrelationId.timeOutStart = System.nanoTime();
- // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
- // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for the remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
- synchronized (trafficLight)
- {
- trafficLight.countDown();
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
+ synchronized (trafficLight)
+ {
+ trafficLight.countDown();
- // Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
- // Release a waiting sender if there is one.
- synchronized (_sendPauseMonitor)
- {
- if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
- // && (_sendPauseBarrier.getNumberWaiting() == 1))
+ // Decrement the count of sent but not yet received messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+
+ // Release a waiting sender if there is one.
+ synchronized (_sendPauseMonitor)
{
- log.debug("unreceived size estimate under limit = " + unreceivedSize);
-
- // Wait on the send pause barrier for the limit to be re-established.
- /*try
- {*/
- // _sendPauseBarrier.await();
- _sendPauseMonitor.notify();
- /*}
- catch (InterruptedException e)
+ if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize))
+ // && (_sendPauseBarrier.getNumberWaiting() == 1))
{
- throw new RuntimeException(e);
+ // log.debug("unreceived size estimate under limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ /*try
+ {*/
+ // _sendPauseBarrier.await();
+ _sendPauseMonitor.notify();
+ /*}
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }*/
}
- catch (BrokenBarrierException e)
- {
- throw new RuntimeException(e);
- }*/
}
- }
- // log.debug("remainingCount = " + remainingCount);
- // log.debug("trueCount = " + trueCount);
+ // NDC.push("/rem" + remainingCount);
- // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
- // blocked, even on the last message.
- if ((remainingCount % _txBatchSize) == 0)
- {
- commitTx(_consumerSession);
- }
+ // log.debug("remainingCount = " + remainingCount);
+ // log.debug("trueCount = " + trueCount);
- // Forward the message and remaining count to any interested chained message listener.
- if (_chainedMessageListener != null)
- {
- _chainedMessageListener.onMessage(message, (int) remainingCount);
- }
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+ // blocked, even on the last message.
+ // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
+ // each batch boundary. For pub/sub each consumer gets every message so no division is done.
+ long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+ // log.debug("commitCount = " + commitCount);
- // Check if this is the last message, in which case release any waiting producers. This is done
- // after the transaction has been committed and any listeners notified.
- if (trueCount == 1)
- {
- trafficLight.countDown();
+ if ((commitCount % _txBatchSize) == 0)
+ {
+ // log.debug("Trying commit for consumer " + consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ }
+
+ // Forward the message and remaining count to any interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
+ }
+
+ // Check if this is the last message, in which case release any waiting producers. This is done
+ // after the transaction has been committed and any listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
}
}
+ else
+ {
+ log.warn("Got unexpected message with correlationId: " + correlationID);
+ }
}
else
{
- log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Got redelivered message, ignoring.");
}
// Print out ping times for every message in verbose mode only.
@@ -870,8 +964,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
log.warn("There was a JMSException: " + e.getMessage(), e);
}
-
- // log.debug("public void onMessage(Message message): ending");
+ finally
+ {
+ // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
+ // NDC.clear();
+ }
}
/**
@@ -893,8 +990,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
- log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
if (messageCorrelationId == null)
@@ -904,6 +1001,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
try
{
+ // NDC.push("prod");
+
// Create a count down latch to count the number of replies with. This is created before the messages are
// sent so that the replies cannot be received before the count down is created.
// One is added to this, so that the last reply becomes a special case. The special case is that the
@@ -935,16 +1034,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- log.debug("numReplies = " + numReplies);
- log.debug("allMessagesReceived = " + allMessagesReceived);
+ // log.debug("numReplies = " + numReplies);
+ // log.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
long lastMessageReceievedAt = perCorrelationId.timeOutStart;
timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
- log.debug("now = " + now);
- log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
+ // log.debug("now = " + now);
+ // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
}
while (!timedOut && !allMessagesReceived);
@@ -957,9 +1056,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
log.info("Got all replies on id, " + messageCorrelationId);
}
- commitTx(_consumerSession);
+ // commitTx(_consumerSession);
- log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
+ // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
@@ -967,6 +1066,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// so will be a memory leak if this is not done.
finally
{
+ // NDC.pop();
perCorrelationIds.remove(messageCorrelationId);
}
}
@@ -982,8 +1082,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
if (message == null)
{
@@ -1067,7 +1167,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (unreceivedSize > _maxPendingSize)
{
- log.debug("unreceived size estimate over limit = " + unreceivedSize);
+ // log.debug("unreceived size estimate over limit = " + unreceivedSize);
// Wait on the send pause barrier for the limit to be re-established.
try
@@ -1096,11 +1196,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Send the message either to its round robin destination, or its default destination.
if (destination == null)
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
_producer.send(message);
+ // log.info("Message " + num + " sent.");
}
else
{
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
_producer.send(destination, message);
+ // log.info("Message " + num + " sent.");
}
// Increase the unreceived size, this may actually happen aftern the message is recevied.
@@ -1118,6 +1226,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
if (((i + 1) % _txBatchSize) == 0)
{
+ // log.debug("Trying commit on producer session.");
committed = commitTx(_producerSession);
}
@@ -1135,7 +1244,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
// Generate a sample message and time stamp it.
Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
- setTimestamp(msg);
+ // setTimestamp(msg);
// Send the message and wait for a reply.
pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
@@ -1143,12 +1252,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
catch (JMSException e)
{
_publish = false;
- log.debug("There was a JMSException: " + e.getMessage(), e);
+ // log.debug("There was a JMSException: " + e.getMessage(), e);
}
catch (InterruptedException e)
{
_publish = false;
- log.debug("There was an interruption: " + e.getMessage(), e);
+ // log.debug("There was an interruption: " + e.getMessage(), e);
}
}
@@ -1186,7 +1295,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Timestamp the message in nanoseconds.
- setTimestamp(msg);
+ // setTimestamp(msg);
return msg;
}
@@ -1227,6 +1336,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_publish = false;
}
+ public void start() throws JMSException
+ {
+ _connection.start();
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ _consumerConnection[i].start();
+ }
+ }
+
/** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
@@ -1245,7 +1364,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public void onException(JMSException e)
{
- log.debug("public void onException(JMSException e = " + e + "): called", e);
+ // log.debug("public void onException(JMSException e = " + e + "): called", e);
_publish = false;
}
@@ -1267,30 +1386,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public Connection getConnection()
- {
- return _connection;
- }
-
- /**
* Closes the pingers connection.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
public void close() throws JMSException
{
- log.debug("public void close(): called");
+ // log.debug("public void close(): called");
try
{
if (_connection != null)
{
_connection.close();
- log.debug("Close connection.");
+ // log.debug("Close connection.");
+ }
+
+ for (int i = 0; i < _noOfConsumers; i++)
+ {
+ if (_consumerConnection[i] != null)
+ {
+ _consumerConnection[i].close();
+ // log.debug("Closed consumer connection.");
+ }
}
}
finally
@@ -1298,6 +1416,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_connection = null;
_producerSession = null;
_consumerSession = null;
+ _consumerConnection = null;
_producer = null;
_consumer = null;
_pingDestinations = null;
@@ -1306,8 +1425,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Convenience method to commit the transaction on the specified session. If the session to commit on is not a
- * transactional session, this method does nothing (unless the failover after send flag is set).
+ * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
+ * transactional controlSession, this method does nothing (unless the failover after send flag is set).
*
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
* applied. This flag applies whether the pinger is transactional or not.
@@ -1316,9 +1435,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
* commit is applied. These flags will only apply if using a transactional pinger.
*
- * @param session The session to commit
+ * @param session The controlSession to commit
*
- * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
+ * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
*
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
*
@@ -1347,6 +1466,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if (session.getTransacted())
{
+ // log.debug("Session is transacted.");
+
try
{
if (_failBeforeCommit)
@@ -1360,10 +1481,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
waitForUser(KILL_BROKER_PROMPT);
}
- // long l = System.nanoTime();
+ long start = System.nanoTime();
session.commit();
committed = true;
- // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms");
+ // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
if (_failAfterCommit)
{
@@ -1376,26 +1497,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
waitForUser(KILL_BROKER_PROMPT);
}
- // log.trace("Session Commited.");
+ // log.debug("Session Commited.");
}
catch (JMSException e)
{
- log.debug("JMSException on commit:" + e.getMessage(), e);
+ // log.debug("JMSException on commit:" + e.getMessage(), e);
// Warn that the bounce back client is not available.
if (e.getLinkedException() instanceof AMQNoConsumersException)
{
- log.debug("No consumers on queue.");
+ // log.debug("No consumers on queue.");
}
try
{
session.rollback();
- log.debug("Message rolled back.");
+ // log.debug("Message rolled back.");
}
catch (JMSException jmse)
{
- log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
+ // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
// Both commit and rollback failed. Throw the rollback exception.
throw jmse;
@@ -1428,23 +1549,34 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * This value will be changed by PingClient to represent the number of clients connected to each topic.
+ * Gets the number of consumers that are listening to each destination in the test.
*
* @return int The number of consumers subscribing to each topic.
*/
- public int getConsumersPerTopic()
+ public int getConsumersPerDestination()
{
- return _consumersPerTopic;
+ return _noOfConsumers;
}
+ /**
+ * Calculates how many pings are expected to be received for the given number sent.
+ *
+ * @param numpings The number of pings that will be sent.
+ *
+ * @return The number that should be received, for the test to pass.
+ */
public int getExpectedNumPings(int numpings)
{
- return numpings * getConsumersPerTopic();
+ // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
+
+ // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
+
+ return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
}
/**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
- * PingPongProducer#onMessage} method is called, the chained listener set through the {@link
+ * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
* PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
* messages with that correlation id.
*
@@ -1454,7 +1586,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
public static interface ChainedMessageListener
{
- public void onMessage(Message message, int remainingCount) throws JMSException;
+ /**
+ * Notifies interested listeners about message arrival and important test stats, the number of messages
+ * remaining in the test, and the messages send timestamp.
+ *
+ * @param message The newly arrived message.
+ * @param remainingCount The number of messages left to complete the test.
+ * @param latency The nanosecond latency of the message.
+ *
+ * @throws JMSException Any JMS exceptions is allowed to fall through.
+ */
+ public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
}
/**
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
index f4a6dc6554..f289fe0db2 100644
--- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -1,251 +1,251 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import javax.jms.*;
-
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
-import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
-import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
-
-/**
- * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
- * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
- * a producer to a conumer, then the consumer replies to the message on a temporary queue.
- *
- * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
- * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
- * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
- * information on how to do this.
- *
- * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
- * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
- * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
- *
- * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
- * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
- * back on the temporary queue.
- *
- * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public class PingPongTestPerf extends AsymptoticTestCase
-{
- private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
-
- /** Thread local to hold the per-thread test setup fields. */
- ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-
- // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
- // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
- // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
- // private Properties testParameters = System.getProperties();
- private ParsedProperties testParameters =
- TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
-
- public PingPongTestPerf(String name)
- {
- super(name);
-
- _logger.debug(testParameters);
-
- // Sets up the test parameters with defaults.
- /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
- testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
- Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
- Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
- Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
- Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
- Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
- }
-
- /**
- * Compile all the tests into a test suite.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingPongTestPerf("testPingPongOk"));
-
- return suite;
- }
-
- private static void setSystemPropertyIfNull(String propName, String propValue)
- {
- if (System.getProperty(propName) == null)
- {
- System.setProperty(propName, propValue);
- }
- }
-
- public void testPingPongOk(int numPings) throws Exception
- {
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Generate a sample message. This message is already time stamped and has its reply-to destination set.
- Message msg =
- perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // Send the message and wait for a reply.
- int numReplies =
- perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
-
- // Fail the test if the timeout was exceeded.
- if (numReplies != numPings)
- {
- Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
- }
- }
-
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
- public void threadSetUp()
- {
- try
- {
- PerThreadSetup perThreadSetup = new PerThreadSetup();
-
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
- String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
- String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
- String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
- String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
- boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
- boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
- String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
- boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
-
- synchronized (this)
- {
- // Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer =
- new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
- transacted, selector, verbose, pubsub);
-
- // Start the connections for client and producer running.
- perThreadSetup._testPingBouncer.getConnection().start();
-
- // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
- perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
- perThreadSetup._testPingProducer.establishConnection(true, true);
- perThreadSetup._testPingProducer.getConnection().start();
- }
-
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * Performs test fixture clean
- */
- public void threadTearDown()
- {
- _logger.debug("public void threadTearDown(): called");
-
- try
- {
- // Get the per thread test fixture.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Close the pingers so that it cleans up its connection cleanly.
- synchronized (this)
- {
- perThreadSetup._testPingProducer.close();
- // perThreadSetup._testPingBouncer.close();
- }
-
- // Ensure the per thread fixture is reclaimed.
- threadSetup.remove();
- }
- catch (JMSException e)
- {
- _logger.warn("There was an exception during per thread tear down.");
- }
- }
-
- protected static class PerThreadSetup
- {
- /**
- * Holds the test ping-pong producer.
- */
- private PingPongProducer _testPingProducer;
-
- /**
- * Holds the test ping client.
- */
- private PingPongBouncer _testPingBouncer;
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import junit.framework.Assert;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+
+import org.apache.log4j.Logger;
+
+import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase;
+import uk.co.thebadgerset.junit.extensions.util.ParsedProperties;
+import uk.co.thebadgerset.junit.extensions.util.TestContextProperties;
+
+import javax.jms.*;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
+ * a producer to a conumer, then the consumer replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
+ * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
+ * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more
+ * information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
+ * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
+ * back on the temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ */
+public class PingPongTestPerf extends AsymptoticTestCase
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Thread local to hold the per-thread test setup fields. */
+ ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
+ // private Properties testParameters = System.getProperties();
+ private ParsedProperties testParameters =
+ TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+
+ _logger.debug(testParameters);
+
+ // Sets up the test parameters with defaults.
+ /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
+ PingPongProducer.PING_QUEUE_NAME_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
+ Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
+ Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
+ Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
+ Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
+ Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
+ Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
+ PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
+ PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
+ PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
+ PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
+ testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
+ Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
+ Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
+ testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
+ PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
+ }
+
+ /**
+ * Compile all the tests into a test suite.
+ */
+ public static Test suite()
+ {
+ // Build a new test suite
+ TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
+
+ // Run performance tests in read committed mode.
+ suite.addTest(new PingPongTestPerf("testPingPongOk"));
+
+ return suite;
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk(int numPings) throws Exception
+ {
+ // Get the per thread test setup to run the test through.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ Message msg =
+ perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
+
+ // Send the message and wait for a reply.
+ int numReplies =
+ perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
+
+ // Fail the test if the timeout was exceeded.
+ if (numReplies != numPings)
+ {
+ Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
+ }
+ }
+
+ /**
+ * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
+ */
+ public void threadSetUp()
+ {
+ try
+ {
+ PerThreadSetup perThreadSetup = new PerThreadSetup();
+
+ // Extract the test set up paramaeters.
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
+ String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
+ String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
+ String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
+ boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
+ boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
+ String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+ boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
+
+ synchronized (this)
+ {
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ perThreadSetup._testPingBouncer =
+ new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
+ transacted, selector, verbose, pubsub);
+
+ // Start the connections for client and producer running.
+ perThreadSetup._testPingBouncer.getConnection().start();
+
+ // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
+ perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
+ perThreadSetup._testPingProducer.establishConnection(true, true);
+ perThreadSetup._testPingProducer.start();
+ }
+
+ // Attach the per-thread set to the thread.
+ threadSetup.set(perThreadSetup);
+ }
+ catch (Exception e)
+ {
+ _logger.warn("There was an exception during per thread setup.", e);
+ }
+ }
+
+ /**
+ * Performs test fixture clean
+ */
+ public void threadTearDown()
+ {
+ _logger.debug("public void threadTearDown(): called");
+
+ try
+ {
+ // Get the per thread test fixture.
+ PerThreadSetup perThreadSetup = threadSetup.get();
+
+ // Close the pingers so that it cleans up its connection cleanly.
+ synchronized (this)
+ {
+ perThreadSetup._testPingProducer.close();
+ // perThreadSetup._testPingBouncer.close();
+ }
+
+ // Ensure the per thread fixture is reclaimed.
+ threadSetup.remove();
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was an exception during per thread tear down.");
+ }
+ }
+
+ protected static class PerThreadSetup
+ {
+ /**
+ * Holds the test ping-pong producer.
+ */
+ private PingPongProducer _testPingProducer;
+
+ /**
+ * Holds the test ping client.
+ */
+ private PingPongBouncer _testPingBouncer;
+ }
+}