From 17754a0a51b6d103f3db84af629faae9a398d963 Mon Sep 17 00:00:00 2001 From: Arnaud Simon Date: Thu, 23 Aug 2007 12:10:42 +0000 Subject: updated from M2 branch git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@568952 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/message/TestMessageFactory.java | 25 +- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 292 +++++++++++++ .../main/java/org/apache/qpid/ping/PingClient.java | 25 +- .../org/apache/qpid/ping/PingDurableClient.java | 38 +- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 314 ++++++++++++++ .../org/apache/qpid/ping/PingSendOnlyClient.java | 2 +- .../java/org/apache/qpid/ping/PingTestPerf.java | 196 +++++++++ .../apache/qpid/requestreply/PingPongBouncer.java | 10 +- .../apache/qpid/requestreply/PingPongProducer.java | 478 +++++++++++++-------- .../apache/qpid/requestreply/PingPongTestPerf.java | 251 +++++++++++ .../org/apache/qpid/ping/PingAsyncTestPerf.java | 294 ------------- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 335 --------------- .../java/org/apache/qpid/ping/PingTestPerf.java | 196 --------- .../apache/qpid/requestreply/PingPongTestPerf.java | 251 ----------- 14 files changed, 1419 insertions(+), 1288 deletions(-) create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java create mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java delete mode 100644 java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java (limited to 'java') diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java index eeb4021f34..64ccb719b6 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java @@ -1,18 +1,21 @@ /* * - * Copyright (c) 2006 The Apache Software Foundation + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. * */ package org.apache.qpid.client.message; diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java new file mode 100644 index 0000000000..06081e6ebf --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -0,0 +1,292 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller + * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test + * that it extends because it can output timings as replies are received, rather than waiting until all expected replies + * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending + * and recieving clients working asynchronously. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
Send many ping messages and output timings asynchronously on batches received. + *
+ */ +public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingAsyncTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); + } + + /** + * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until + * all replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness + */ + public void testAsyncPingOk(int numPings) throws Exception + { + // _logger.debug("public void testAsyncPingOk(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); + // String messageCorrelationId = perThreadSetup._correlationId; + // _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); + perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < perCorrelationId._expectedCount) + { + perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); + } + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(perThreadSetup._correlationId); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the + * pinger, in order to receive notifications about every message received and the number remaining to be + * received. Whenever the number remaining crosses a batch size boundary this results listener outputs + * a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + + "): called on batch boundary for message id: " + correlationId + " with thread id: " + + Thread.currentThread().getId());*/ + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of + * the total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index e3b0249ed3..ac12436951 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -20,12 +20,14 @@ */ package org.apache.qpid.ping; -import java.util.List; -import java.util.Properties; +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; import javax.jms.Destination; -import org.apache.qpid.requestreply.PingPongProducer; +import java.util.List; +import java.util.Properties; /** * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} @@ -36,7 +38,7 @@ import org.apache.qpid.requestreply.PingPongProducer; * are created they will all be run in parallel and be active in sending and consuming pings at the same time. * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of - * active ping clients. The {@link #getConsumersPerTopic()} method is used to supply this multiplier under these + * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these * conditions. * *

@@ -47,6 +49,9 @@ import org.apache.qpid.requestreply.PingPongProducer; */ public class PingClient extends PingPongProducer { + /** Used for debugging. */ + private final Logger log = Logger.getLogger(PingClient.class); + /** Used to count the number of ping clients created. */ private static int _pingClientCount; @@ -82,15 +87,21 @@ public class PingClient extends PingPongProducer * * @return The scaling up of the number of expected pub/sub pings. */ - public int getConsumersPerTopic() + public int getConsumersPerDestination() { + log.debug("public int getConsumersPerDestination(): called"); + if (_isUnique) { - return 1; + log.debug("1 consumer per destination."); + + return _noOfConsumers; } else { - return _pingClientCount; + log.debug(_pingClientCount + " consumers per destination."); + + return _pingClientCount * _noOfConsumers; } } } diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 2765986868..db6f384914 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -20,18 +20,6 @@ */ package org.apache.qpid.ping; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; - import org.apache.log4j.Logger; import org.apache.qpid.requestreply.PingPongProducer; @@ -40,6 +28,15 @@ import org.apache.qpid.util.CommandLineParser; import uk.co.thebadgerset.junit.extensions.util.MathUtils; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import javax.jms.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + /** * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop @@ -167,7 +164,8 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList try { // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); PingDurableClient pingProducer = new PingDurableClient(options); // Create a shutdown hook to terminate the ping-pong producer. @@ -219,7 +217,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList // Establish the connection and the message producer. establishConnection(true, false); - getConnection().start(); + _connection.start(); Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); @@ -329,8 +327,8 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList _queueSharedID = new AtomicInteger(); establishConnection(false, true); - _consumer.setMessageListener(null); - _connection.start(); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); // Try to receive all of the pings that were successfully sent. int messagesReceived = 0; @@ -339,7 +337,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList while (!endCondition) { // Message received = _consumer.receiveNoWait(); - Message received = _consumer.receive(TIME_OUT); + Message received = _consumer[0].receive(TIME_OUT); log.debug("received = " + received); if (received != null) @@ -362,11 +360,11 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList } // Ensure messages received are committed. - if (_transacted) + if (_consTransacted) { try { - _consumerSession.commit(); + _consumerSession[0].commit(); System.out.println("Committed for all messages received."); } catch (JMSException e) @@ -375,7 +373,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList System.out.println("Error during commit."); try { - _consumerSession.rollback(); + _consumerSession[0].rollback(); System.out.println("Rolled back on all messages received."); } catch (JMSException e2) diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java new file mode 100644 index 0000000000..55414664da --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -0,0 +1,314 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.TimingController; +import uk.co.thebadgerset.junit.extensions.TimingControllerAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + *

This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + *

The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. + * + *

CRC Card
CRC Card
Responsibilities Collaborations
Send many ping + * messages and output timings for sampled individual pings.
+ */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ + private Map perCorrelationIds = + Collections.synchronizedMap(new HashMap()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** Compile all the tests into a test suite. */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + Message msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + private boolean _strictAMQP; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, + AMQSession.STRICT_AMQP_DEFAULT)); + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch, latency); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java index bbe337ca0a..2879f0c322 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -57,7 +57,7 @@ public class PingSendOnlyClient extends PingDurableClient try { // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); // Create a shutdown hook to terminate the ping-pong producer. diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java new file mode 100644 index 0000000000..375007584b --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -0,0 +1,196 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.TestThreadAware; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times + * simultaneously to simluate many clients/producers/connections. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, + * except if the connection is lost in which case an attempt to re-establish the setup is made. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the + * temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingTestPerf(String name) + { + super(name); + + _logger.debug("testParameters = " + testParameters); + } + + /** + * Compile all the tests into a test suite. + * @return The test method testPingOk. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingTestPerf("testPingOk")); + + return suite; + } + + public void testPingOk(int numPings) throws Exception + { + if (numPings == 0) + { + Assert.fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + if (perThreadSetup == null) + { + Assert.fail("Could not get per thread test setup, it was null."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + + numReplies); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently. + synchronized (this) + { + // Establish a client to ping a Destination and listen the reply back from same Destination + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); + } + // Start the client connection + perThreadSetup._pingClient.start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) + { + perThreadSetup._pingClient.close(); + } + } + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + finally + { + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping client. + */ + protected PingClient _pingClient; + protected String _correlationId; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index 78ab7c4c73..82b36bf233 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -92,10 +92,10 @@ public class PingPongBouncer implements MessageListener /** The producer for sending replies with. */ private MessageProducer _replyProducer; - /** The consumer session. */ + /** The consumer controlSession. */ private Session _consumerSession; - /** The producer session. */ + /** The producer controlSession. */ private Session _producerSession; /** Holds the connection to the broker. */ @@ -149,7 +149,7 @@ public class PingPongBouncer implements MessageListener // Set up the failover notifier. getConnection().setConnectionListener(new FailoverNotifier()); - // Create a session to listen for messages on and one to send replies on, transactional depending on the + // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the // command line option. _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); @@ -323,8 +323,8 @@ public class PingPongBouncer implements MessageListener } /** - * Convenience method to commit the transaction on the specified session. If the session to commit on is not - * a transactional session, this method does nothing. + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not + * a transactional controlSession, this method does nothing. * *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index d5d1c304e9..bd34fd8f20 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -20,21 +20,8 @@ */ package org.apache.qpid.requestreply; -import java.io.IOException; -import java.net.InetAddress; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.*; - import org.apache.log4j.Logger; +import org.apache.log4j.NDC; import org.apache.qpid.AMQException; import org.apache.qpid.client.*; @@ -51,6 +38,18 @@ import uk.co.thebadgerset.junit.extensions.BatchedThrottle; import uk.co.thebadgerset.junit.extensions.Throttle; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import javax.jms.*; + +import java.io.IOException; +import java.net.InetAddress; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + /** * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens @@ -86,10 +85,11 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * username guest The username to access the broker with. * password guest The password to access the broker with. * selector null Not used. Defines a message selector to filter pings with. - * destinationCount 1 The number of receivers listening to the pings. + * destinationCount 1 The number of destinations to send pings to. + * numConsumers 1 The number of consumers on each destination. * timeout 30000 In milliseconds. The timeout to stop waiting for replies. * commitBatchSize 1 The number of messages per transaction in transactional mode. - * uniqueDests true Whether each receiver only listens to one ping destination or all. + * uniqueDests true Whether each receivers only listens to one ping destination or all. * durableDests false Whether or not durable destinations are used. * ackMode AUTO_ACK The message acknowledgement mode. Possible values are: * 0 - SESSION_TRANSACTED @@ -98,6 +98,10 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * 3 - DUPS_OK_ACKNOWLEDGE * 257 - NO_ACKNOWLEDGE * 258 - PRE_ACKNOWLEDGE + * consTransacted false Whether or not consumers use transactions. Defaults to the same value + * as the 'transacted' option if not seperately defined. + * consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same + * value as 'ackMode' if not seperately defined. * maxPending 0 The maximum size in bytes, of messages sent but not yet received. * Limits the volume of messages currently buffered on the client * or broker. Can help scale test clients by limiting amount of buffered @@ -131,8 +135,9 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written * faster than it can be sent. */ -public class PingPongProducer implements Runnable, MessageListener, ExceptionListener +public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener { + /** Used for debugging. */ private static final Logger log = Logger.getLogger(PingPongProducer.class); /** Holds the name of the property to get the test message size from. */ @@ -159,6 +164,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the transactional mode to use for the test. */ public static final boolean TRANSACTED_DEFAULT = false; + public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; + /** Holds the name of the property to get the test broker url from. */ public static final String BROKER_PROPNAME = "broker"; @@ -237,12 +245,18 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the default message selector. */ public static final String SELECTOR_DEFAULT = ""; - /** Holds the name of the proeprty to get the destination count from. */ + /** Holds the name of the property to get the destination count from. */ public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; /** Defines the default number of destinations to ping. */ public static final int DESTINATION_COUNT_DEFAULT = 1; + /** Holds the name of the property to get the number of consumers per destination from. */ + public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; + + /** Defines the default number consumers per destination. */ + public static final int NUM_CONSUMERS_DEFAULT = 1; + /** Holds the name of the property to get the waiting timeout for response messages. */ public static final String TIMEOUT_PROPNAME = "timeout"; @@ -270,6 +284,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default message acknowledgement mode. */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + public static final String MAX_PENDING_PROPNAME = "maxPending"; public static final int MAX_PENDING_DEFAULT = 0; @@ -297,8 +314,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); @@ -311,6 +330,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); @@ -323,12 +343,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected String _destinationName; protected String _selector; protected boolean _transacted; + protected boolean _consTransacted; /** Determines whether this producer sends persistent messages. */ protected boolean _persistent; /** Holds the acknowledgement mode used for sending and receiving messages. */ - private int _ackMode; + protected int _ackMode; + + protected int _consAckMode; /** Determines what size of messages this producer sends. */ protected int _messageSize; @@ -363,7 +386,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the number of sends that should be performed in every transaction when using transactions. */ protected int _txBatchSize; + /** Holds the number of destinations to ping. */ protected int _noOfDestinations; + + /** Holds the number of consumers per destination. */ + protected int _noOfConsumers; + + /** Holds the maximum send rate in herz. */ protected int _rate; /** @@ -373,7 +402,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected int _maxPendingSize; /** - * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected * to wait until the number of unreceived message is reduced before continuing to send. */ protected Object _sendPauseMonitor = new Object(); @@ -397,10 +426,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the connection to the broker. */ protected Connection _connection; - /** Holds the session on which ping replies are received. */ - protected Session _consumerSession; + /** Holds the consumer connections. */ + protected Connection[] _consumerConnection; - /** Holds the producer session, needed to create ping messages. */ + /** Holds the controlSession on which ping replies are received. */ + protected Session[] _consumerSession; + + /** Holds the producer controlSession, needed to create ping messages. */ protected Session _producerSession; /** Holds the destination where the response messages will arrive. */ @@ -434,18 +466,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected MessageProducer _producer; /** Holds the message consumer to receive the ping replies through. */ - protected MessageConsumer _consumer; - - /** - * Holds the number of consumers that will be attached to each topic. Each pings will result in a reply from each of the - * attached clients - */ - static int _consumersPerTopic = 1; + protected MessageConsumer[] _consumer; /** The prompt to display when asking the user to kill the broker for failover testing. */ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; private String _clientID; + /** Keeps count of the total messages sent purely for debugging purposes. */ + private static AtomicInteger numSent = new AtomicInteger(); + /** * Creates a ping producer with the specified parameters, of which there are many. See the class level comments * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on @@ -457,7 +486,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public PingPongProducer(Properties overrides) throws Exception { - log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); // Create a set of parsed properties from the defaults overriden by the passed in values. ParsedProperties properties = new ParsedProperties(defaults); @@ -471,6 +500,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); _selector = properties.getProperty(SELECTOR_PROPNAME); _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); @@ -481,11 +511,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); _rate = properties.getPropertyAsInteger(RATE_PROPNAME); _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); // Check that one or more destinations were specified. @@ -516,7 +548,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void establishConnection(boolean producer, boolean consumer) throws Exception { - log.debug("public void establishConnection(): called"); + // log.debug("public void establishConnection(): called"); // Generate a unique identifying name for this client, based on it ip address and the current time. InetAddress address = InetAddress.getLocalHost(); @@ -526,11 +558,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis createConnection(_clientID); // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = (Session) getConnection().createSession(_transacted, _ackMode); - _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode); + _producerSession = (Session) _connection.createSession(_transacted, _ackMode); + + _consumerSession = new Session[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerSession[i] = (Session) _consumerConnection[i].createSession(_consTransacted, _consAckMode); + } // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession.createTemporaryQueue(); + _replyDestination = _consumerSession[0].createTemporaryQueue(); createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. @@ -557,7 +595,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ protected void createConnection(String clientID) throws AMQException, URLSyntaxException { + // log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + + // log.debug("Creating a connection for the message producer."); + _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + + // log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + + _consumerConnection = new Connection[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + } } /** @@ -570,20 +621,21 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { try { - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); // Create a ping producer overriding its defaults with all options passed on the command line. PingPongProducer pingProducer = new PingPongProducer(options); pingProducer.establishConnection(true, true); // Start the ping producers dispatch thread running. - pingProducer.getConnection().start(); + pingProducer._connection.start(); // Create a shutdown hook to terminate the ping-pong producer. Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer.getConnection().setExceptionListener(pingProducer); + pingProducer._connection.setExceptionListener(pingProducer); // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. Thread pingThread = new Thread(pingProducer); @@ -624,12 +676,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public List getReplyDestinations() { - log.debug("public List getReplyDestinations(): called"); + // log.debug("public List getReplyDestinations(): called"); List replyDestinations = new ArrayList(); replyDestinations.add(_replyDestination); - log.debug("replyDestinations = " + replyDestinations); + // log.debug("replyDestinations = " + replyDestinations); return replyDestinations; } @@ -642,12 +694,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void createProducer() throws JMSException { - log.debug("public void createProducer(): called"); + // log.debug("public void createProducer(): called"); _producer = (MessageProducer) _producerSession.createProducer(null); _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); } /** @@ -665,14 +717,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, boolean durable) throws JMSException, AMQException { - log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called"); + + durable + "): called");*/ _pingDestinations = new ArrayList(); // Create the desired number of ping destinations and consumers for them. - log.debug("Creating " + noOfDestinations + " destinations to ping."); + // log.debug("Creating " + noOfDestinations + " destinations to ping."); for (int i = 0; i < noOfDestinations; i++) { @@ -683,12 +735,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. if (unique) { - log.debug("Creating unique destinations."); + // log.debug("Creating unique destinations."); id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); } else { - log.debug("Creating shared destinations."); + // log.debug("Creating shared destinations."); id = "_" + _queueSharedID.incrementAndGet(); } @@ -698,14 +750,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (!durable) { destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id); - log.debug("Created non-durable topic " + destination); + // log.debug("Created non-durable topic " + destination); } else { destination = AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), _clientID, (AMQConnection) _connection); - log.debug("Created durable topic " + destination); + // log.debug("Created durable topic " + destination); } } // Otherwise this is a p2p pinger, in which case create queues. @@ -719,7 +771,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, ExchangeDefaults.DIRECT_EXCHANGE_NAME); - log.debug("Created queue " + destination); + // log.debug("Created queue " + destination); } // Keep the destination. @@ -737,20 +789,36 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void createReplyConsumers(Collection destinations, String selector) throws JMSException { - log.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called"); + /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations + + ", String selector = " + selector + "): called");*/ - log.debug("Creating " + destinations.size() + " reply consumers."); + // log.debug("There are " + destinations.size() + " destinations."); + // log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + // log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); for (Destination destination : destinations) { - // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer = - _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); - _consumer.setMessageListener(this); + _consumer = new MessageConsumer[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer[i] = + _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, + selector); + + final int consumerNo = i; - log.debug("Set this to listen to replies sent to destination: " + destination); + _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + onMessageWithConsumerNo(message, consumerNo); + } + }); + + // log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + } } } @@ -761,97 +829,123 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param message The received message. */ - public void onMessage(Message message) + public void onMessageWithConsumerNo(Message message, int consumerNo) { - // log.debug("public void onMessage(Message message): called"); - + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); try { + long now = System.nanoTime(); + long timestamp = getTimestamp(message); + long pingTime = now - timestamp; + + // NDC.push("cons" + consumerNo); + // Extract the messages correlation id. String correlationID = message.getJMSCorrelationID(); // log.debug("correlationID = " + correlationID); - // Countdown on the traffic light if there is one for the matching correlation id. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + int num = message.getIntProperty("MSG_NUM"); + // log.info("Message " + num + " received."); + + boolean isRedelivered = message.getJMSRedelivered(); + // log.debug("isRedelivered = " + isRedelivered); - if (perCorrelationId != null) + if (!isRedelivered) { - CountDownLatch trafficLight = perCorrelationId.trafficLight; + // Countdown on the traffic light if there is one for the matching correlation id. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - // Restart the timeout timer on every message. - perCorrelationId.timeOutStart = System.nanoTime(); + if (perCorrelationId != null) + { + CountDownLatch trafficLight = perCorrelationId.trafficLight; - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + // Restart the timeout timer on every message. + perCorrelationId.timeOutStart = System.nanoTime(); - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount = -1; - long remainingCount = -1; + // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - synchronized (trafficLight) - { - trafficLight.countDown(); + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount = -1; + long remainingCount = -1; - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; + synchronized (trafficLight) + { + trafficLight.countDown(); - // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; - // Release a waiting sender if there is one. - synchronized (_sendPauseMonitor) - { - if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) - // && (_sendPauseBarrier.getNumberWaiting() == 1)) + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + + // Release a waiting sender if there is one. + synchronized (_sendPauseMonitor) { - log.debug("unreceived size estimate under limit = " + unreceivedSize); - - // Wait on the send pause barrier for the limit to be re-established. - /*try - {*/ - // _sendPauseBarrier.await(); - _sendPauseMonitor.notify(); - /*} - catch (InterruptedException e) + if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)) + // && (_sendPauseBarrier.getNumberWaiting() == 1)) { - throw new RuntimeException(e); + // log.debug("unreceived size estimate under limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + /*try + {*/ + // _sendPauseBarrier.await(); + _sendPauseMonitor.notify(); + /*} + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + }*/ } - catch (BrokenBarrierException e) - { - throw new RuntimeException(e); - }*/ } - } - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); + // NDC.push("/rem" + remainingCount); - // Commit on transaction batch size boundaries. At this point in time the waiting producer remains - // blocked, even on the last message. - if ((remainingCount % _txBatchSize) == 0) - { - commitTx(_consumerSession); - } + // log.debug("remainingCount = " + remainingCount); + // log.debug("trueCount = " + trueCount); - // Forward the message and remaining count to any interested chained message listener. - if (_chainedMessageListener != null) - { - _chainedMessageListener.onMessage(message, (int) remainingCount); - } + // Commit on transaction batch size boundaries. At this point in time the waiting producer remains + // blocked, even on the last message. + // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on + // each batch boundary. For pub/sub each consumer gets every message so no division is done. + long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // log.debug("commitCount = " + commitCount); - // Check if this is the last message, in which case release any waiting producers. This is done - // after the transaction has been committed and any listeners notified. - if (trueCount == 1) - { - trafficLight.countDown(); + if ((commitCount % _txBatchSize) == 0) + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + } + + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } } } + else + { + log.warn("Got unexpected message with correlationId: " + correlationID); + } } else { - log.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Got redelivered message, ignoring."); } // Print out ping times for every message in verbose mode only. @@ -870,8 +964,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { log.warn("There was a JMSException: " + e.getMessage(), e); } - - // log.debug("public void onMessage(Message message): ending"); + finally + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // NDC.clear(); + } } /** @@ -893,8 +990,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { - log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ // Generate a unique correlation id to put on the messages before sending them, if one was not specified. if (messageCorrelationId == null) @@ -904,6 +1001,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis try { + // NDC.push("prod"); + // Create a count down latch to count the number of replies with. This is created before the messages are // sent so that the replies cannot be received before the count down is created. // One is added to this, so that the last reply becomes a special case. The special case is that the @@ -935,16 +1034,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis allMessagesReceived = numReplies == getExpectedNumPings(numPings); - log.debug("numReplies = " + numReplies); - log.debug("allMessagesReceived = " + allMessagesReceived); + // log.debug("numReplies = " + numReplies); + // log.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); long lastMessageReceievedAt = perCorrelationId.timeOutStart; timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - log.debug("now = " + now); - log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + // log.debug("now = " + now); + // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); } while (!timedOut && !allMessagesReceived); @@ -957,9 +1056,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis log.info("Got all replies on id, " + messageCorrelationId); } - commitTx(_consumerSession); + // commitTx(_consumerSession); - log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); return numReplies; } @@ -967,6 +1066,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // so will be a memory leak if this is not done. finally { + // NDC.pop(); perCorrelationIds.remove(messageCorrelationId); } } @@ -982,8 +1082,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { - log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ if (message == null) { @@ -1067,7 +1167,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (unreceivedSize > _maxPendingSize) { - log.debug("unreceived size estimate over limit = " + unreceivedSize); + // log.debug("unreceived size estimate over limit = " + unreceivedSize); // Wait on the send pause barrier for the limit to be re-established. try @@ -1096,11 +1196,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Send the message either to its round robin destination, or its default destination. if (destination == null) { + int num = numSent.incrementAndGet(); + message.setIntProperty("MSG_NUM", num); + setTimestamp(message); _producer.send(message); + // log.info("Message " + num + " sent."); } else { + int num = numSent.incrementAndGet(); + message.setIntProperty("MSG_NUM", num); + setTimestamp(message); _producer.send(destination, message); + // log.info("Message " + num + " sent."); } // Increase the unreceived size, this may actually happen aftern the message is recevied. @@ -1118,6 +1226,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. if (((i + 1) % _txBatchSize) == 0) { + // log.debug("Trying commit on producer session."); committed = commitTx(_producerSession); } @@ -1135,7 +1244,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Generate a sample message and time stamp it. Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - setTimestamp(msg); + // setTimestamp(msg); // Send the message and wait for a reply. pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); @@ -1143,12 +1252,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis catch (JMSException e) { _publish = false; - log.debug("There was a JMSException: " + e.getMessage(), e); + // log.debug("There was a JMSException: " + e.getMessage(), e); } catch (InterruptedException e) { _publish = false; - log.debug("There was an interruption: " + e.getMessage(), e); + // log.debug("There was an interruption: " + e.getMessage(), e); } } @@ -1186,7 +1295,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Timestamp the message in nanoseconds. - setTimestamp(msg); + // setTimestamp(msg); return msg; } @@ -1227,6 +1336,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _publish = false; } + public void start() throws JMSException + { + _connection.start(); + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i].start(); + } + } + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ public void run() { @@ -1245,7 +1364,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void onException(JMSException e) { - log.debug("public void onException(JMSException e = " + e + "): called", e); + // log.debug("public void onException(JMSException e = " + e + "): called", e); _publish = false; } @@ -1266,16 +1385,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }); } - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public Connection getConnection() - { - return _connection; - } - /** * Closes the pingers connection. * @@ -1283,14 +1392,23 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public void close() throws JMSException { - log.debug("public void close(): called"); + // log.debug("public void close(): called"); try { if (_connection != null) { _connection.close(); - log.debug("Close connection."); + // log.debug("Close connection."); + } + + for (int i = 0; i < _noOfConsumers; i++) + { + if (_consumerConnection[i] != null) + { + _consumerConnection[i].close(); + // log.debug("Closed consumer connection."); + } } } finally @@ -1298,6 +1416,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _connection = null; _producerSession = null; _consumerSession = null; + _consumerConnection = null; _producer = null; _consumer = null; _pingDestinations = null; @@ -1306,8 +1425,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * Convenience method to commit the transaction on the specified session. If the session to commit on is not a - * transactional session, this method does nothing (unless the failover after send flag is set). + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a + * transactional controlSession, this method does nothing (unless the failover after send flag is set). * *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is * applied. This flag applies whether the pinger is transactional or not. @@ -1316,9 +1435,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the * commit is applied. These flags will only apply if using a transactional pinger. * - * @param session The session to commit + * @param session The controlSession to commit * - * @return true if the session was committed, false if it was not. + * @return true if the controlSession was committed, false if it was not. * * @throws javax.jms.JMSException If the commit fails and then the rollback fails. * @@ -1347,6 +1466,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if (session.getTransacted()) { + // log.debug("Session is transacted."); + try { if (_failBeforeCommit) @@ -1360,10 +1481,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis waitForUser(KILL_BROKER_PROMPT); } - // long l = System.nanoTime(); + long start = System.nanoTime(); session.commit(); committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - l) / 1000000f) + " ms"); + // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); if (_failAfterCommit) { @@ -1376,26 +1497,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis waitForUser(KILL_BROKER_PROMPT); } - // log.trace("Session Commited."); + // log.debug("Session Commited."); } catch (JMSException e) { - log.debug("JMSException on commit:" + e.getMessage(), e); + // log.debug("JMSException on commit:" + e.getMessage(), e); // Warn that the bounce back client is not available. if (e.getLinkedException() instanceof AMQNoConsumersException) { - log.debug("No consumers on queue."); + // log.debug("No consumers on queue."); } try { session.rollback(); - log.debug("Message rolled back."); + // log.debug("Message rolled back."); } catch (JMSException jmse) { - log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); // Both commit and rollback failed. Throw the rollback exception. throw jmse; @@ -1428,23 +1549,34 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * This value will be changed by PingClient to represent the number of clients connected to each topic. + * Gets the number of consumers that are listening to each destination in the test. * * @return int The number of consumers subscribing to each topic. */ - public int getConsumersPerTopic() + public int getConsumersPerDestination() { - return _consumersPerTopic; + return _noOfConsumers; } + /** + * Calculates how many pings are expected to be received for the given number sent. + * + * @param numpings The number of pings that will be sent. + * + * @return The number that should be received, for the test to pass. + */ public int getExpectedNumPings(int numpings) { - return numpings * getConsumersPerTopic(); + // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); + + // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); + + return numpings * (_isPubSub ? getConsumersPerDestination() : 1); } /** * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link - * PingPongProducer#onMessage} method is called, the chained listener set through the {@link + * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of * messages with that correlation id. * @@ -1454,7 +1586,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ public static interface ChainedMessageListener { - public void onMessage(Message message, int remainingCount) throws JMSException; + /** + * Notifies interested listeners about message arrival and important test stats, the number of messages + * remaining in the test, and the messages send timestamp. + * + * @param message The newly arrived message. + * @param remainingCount The number of messages left to complete the test. + * @param latency The nanosecond latency of the message. + * + * @throws JMSException Any JMS exceptions is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException; } /** diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java new file mode 100644 index 0000000000..f289fe0db2 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -0,0 +1,251 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; +import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; +import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run + * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from + * a producer to a conumer, then the consumer replies to the message on a temporary queue. + * + *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number + * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled + * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more + * information on how to do this. + * + *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads + * gets its own connection/producer/consumer, this is only re-established if the connection is lost. + * + *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come + * back on the temporary queue. + * + *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + *

+ *
CRC Card
Responsibilities Collaborations + *
+ */ +public class PingPongTestPerf extends AsymptoticTestCase +{ + private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal threadSetup = new ThreadLocal(); + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. + // private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingPongTestPerf(String name) + { + super(name); + + _logger.debug(testParameters); + + // Sets up the test parameters with defaults. + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.PING_QUEUE_NAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingPongTestPerf("testPingPongOk")); + + return suite; + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingPongOk(int numPings) throws Exception + { + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the message and wait for a reply. + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != numPings) + { + Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); + + synchronized (this) + { + // Establish a bounce back client on the ping queue to bounce back the pings. + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); + + // Start the connections for client and producer running. + perThreadSetup._testPingBouncer.getConnection().start(); + + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); + perThreadSetup._testPingProducer.start(); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + // perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping-pong producer. + */ + private PingPongProducer _testPingProducer; + + /** + * Holds the test ping client. + */ + private PingPongBouncer _testPingBouncer; + } +} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java deleted file mode 100644 index 6c7f22c19a..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -/** - * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller - * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test - * that it extends because it can output timings as replies are received, rather than waiting until all expected replies - * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending - * and recieving clients working asynchronously. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Send many ping messages and output timings asynchronously on batches received. - *
- */ -public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingAsyncTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); - } - - /** - * Compile all the tests into a test suite. - * @return The test suite to run. Should only contain testAsyncPingOk method. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until - * all replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - * @throws Exception pass all errors out to the test harness - */ - public void testAsyncPingOk(int numPings) throws Exception - { - // _logger.debug("public void testAsyncPingOk(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); - // String messageCorrelationId = perThreadSetup._correlationId; - // _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); - perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < perCorrelationId._expectedCount) - { - perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); - } - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(perThreadSetup._correlationId); - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the - * pinger, in order to receive notifications about every message received and the number remaining to be - * received. Whenever the number remaining crosses a batch size boundary this results listener outputs - * a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount) throws JMSException - { - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId());*/ - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - tc.completeTest(true, receivedInBatch); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of - * the total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java deleted file mode 100644 index af612d5430..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.AMQMessage; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -/** - * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing - * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for - * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from - * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than - * waiting until all expected replies are received. - * - *

This test does not output timings for every single ping message, as when running at high volume, writing the test - * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The - * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the - * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. - * - *

The size parameter logged for each individual ping is set to the size of the batch of messages that the - * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput - * (messages / time) can be calculated in order to examine the relationship between throughput and latency. - * - *

CRC Card
Responsibilities Collaborations
Send many ping - * messages and output timings for sampled individual pings.
- */ -public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** - * Holds test specifics by correlation id. This consists of the expected number of messages and the timing - * controler. - */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingLatencyTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); - } - - /** Compile all the tests into a test suite. */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Latency Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingLatencyTestPerf("testPingLatency")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all - * replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - */ - public void testPingLatency(int numPings) throws Exception - { - _logger.debug("public void testPingLatency(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); - _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these - // messages. - pingClient.setChainedMessageListener(batchedResultsListener); - - // Generate a sample message of the specified size. - Message msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) - { - tc.completeTest(false, 0); - } - - // Remove the chained message listener from the ping producer. - pingClient.removeChainedMessageListener(); - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(messageCorrelationId); - } - - /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can - * be attached to the pinger, in order to receive notifications about every message received and the number - * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener - * outputs a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - private boolean _strictAMQP; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, - AMQSession.STRICT_AMQP_DEFAULT)); - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount) throws JMSException - { - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); - - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Extract the send time from the message and work out from the current time, what the ping latency was. - // The ping producer time stamps messages in nanoseconds. - long startTime; - - if (_strictAMQP) - { - Long value = - ((AMQMessage) message).getTimestampProperty(new AMQShortString( - PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME)); - - startTime = ((value == null) ? 0L : value); - } - else - { - startTime = message.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - } - - long now = System.nanoTime(); - long pingTime = now - startTime; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - - tc.completeTest(true, receivedInBatch, pingTime); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of the - * total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java deleted file mode 100644 index 46333db844..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.TestThreadAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times - * simultaneously to simluate many clients/producers/connections. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single - * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, - * except if the connection is lost in which case an attempt to re-establish the setup is made. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the - * temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware -{ - private static Logger _logger = Logger.getLogger(PingTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** Holds a property reader to extract the test parameters from. */ - protected ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingTestPerf(String name) - { - super(name); - - _logger.debug("testParameters = " + testParameters); - } - - /** - * Compile all the tests into a test suite. - * @return The test method testPingOk. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingTestPerf("testPingOk")); - - return suite; - } - - public void testPingOk(int numPings) throws Exception - { - if (numPings == 0) - { - Assert.fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - if (perThreadSetup == null) - { - Assert.fail("Could not get per thread test setup, it was null."); - } - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // start the test - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) - { - Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " - + numReplies); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently. - synchronized (this) - { - // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingClient = new PingClient(testParameters); - perThreadSetup._pingClient.establishConnection(true, true); - } - // Start the client connection - perThreadSetup._pingClient.getConnection().start(); - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) - { - perThreadSetup._pingClient.close(); - } - } - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - finally - { - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping client. - */ - protected PingClient _pingClient; - protected String _correlationId; - } -} diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java deleted file mode 100644 index f4a6dc6554..0000000000 --- a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import javax.jms.*; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -/** - * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run - * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from - * a producer to a conumer, then the consumer replies to the message on a temporary queue. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number - * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more - * information on how to do this. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads - * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come - * back on the temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingPongTestPerf extends AsymptoticTestCase -{ - private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - // private Properties testParameters = System.getProperties(); - private ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingPongTestPerf(String name) - { - super(name); - - _logger.debug(testParameters); - - // Sets up the test parameters with defaults. - /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, - Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); - testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, - Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, - Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, - Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, - Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, - Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingPongTestPerf("testPingPongOk")); - - return suite; - } - - private static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPingPongOk(int numPings) throws Exception - { - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the message and wait for a reply. - int numReplies = - perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != numPings) - { - Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // Extract the test set up paramaeters. - String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); - String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); - String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); - String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); - boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); - boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); - String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); - boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - - synchronized (this) - { - // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = - new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, - transacted, selector, verbose, pubsub); - - // Start the connections for client and producer running. - perThreadSetup._testPingBouncer.getConnection().start(); - - // Establish a ping-pong client on the ping queue to send the pings and receive replies with. - perThreadSetup._testPingProducer = new PingPongProducer(testParameters); - perThreadSetup._testPingProducer.establishConnection(true, true); - perThreadSetup._testPingProducer.getConnection().start(); - } - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - perThreadSetup._testPingProducer.close(); - // perThreadSetup._testPingBouncer.close(); - } - - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping-pong producer. - */ - private PingPongProducer _testPingProducer; - - /** - * Holds the test ping client. - */ - private PingPongBouncer _testPingBouncer; - } -} -- cgit v1.2.1