From 29bb07b8dcfc3fdcdbf581205d146e4a0f294e92 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Fri, 17 Aug 2007 10:19:54 +0000 Subject: Tests enhanced for fanout style tests with many consumers per destination. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@567003 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/ping/PingDurableClient.java | 17 ++-- .../java/org/apache/qpid/ping/PingTestPerf.java | 2 +- .../apache/qpid/requestreply/PingPongProducer.java | 113 +++++++++++++++------ .../apache/qpid/requestreply/PingPongTestPerf.java | 6 +- 4 files changed, 95 insertions(+), 43 deletions(-) (limited to 'java/perftests/src') diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index c5f71b4774..fddb72a582 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -28,10 +28,7 @@ import org.apache.qpid.util.CommandLineParser; import uk.co.thebadgerset.junit.extensions.util.MathUtils; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; +import javax.jms.*; import java.io.BufferedReader; import java.io.IOException; @@ -220,7 +217,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList // Establish the connection and the message producer. establishConnection(true, false); - getConnection().start(); + _connection.start(); Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); @@ -330,8 +327,8 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList _queueSharedID = new AtomicInteger(); establishConnection(false, true); - _consumer.setMessageListener(null); - _connection.start(); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); // Try to receive all of the pings that were successfully sent. int messagesReceived = 0; @@ -340,7 +337,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList while (!endCondition) { // Message received = _consumer.receiveNoWait(); - Message received = _consumer.receive(TIME_OUT); + Message received = _consumer[0].receive(TIME_OUT); log.debug("received = " + received); if (received != null) @@ -367,7 +364,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList { try { - _consumerSession.commit(); + _consumerSession[0].commit(); System.out.println("Committed for all messages received."); } catch (JMSException e) @@ -376,7 +373,7 @@ public class PingDurableClient extends PingPongProducer implements ExceptionList System.out.println("Error during commit."); try { - _consumerSession.rollback(); + _consumerSession[0].rollback(); System.out.println("Rolled back on all messages received."); } catch (JMSException e2) diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index 46333db844..0c2aa80a09 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -142,7 +142,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient.establishConnection(true, true); } // Start the client connection - perThreadSetup._pingClient.getConnection().start(); + perThreadSetup._pingClient.start(); // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 1a731b2b7e..660c0b6523 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -84,7 +84,8 @@ import java.util.concurrent.atomic.AtomicLong; * username guest The username to access the broker with. * password guest The password to access the broker with. * selector null Not used. Defines a message selector to filter pings with. - * destinationCount 1 The number of receivers listening to the pings. + * destinationCount 1 The number of destinations to send pings to. + * numConsumers 1 The number of consumers on each destination. * timeout 30000 In milliseconds. The timeout to stop waiting for replies. * commitBatchSize 1 The number of messages per transaction in transactional mode. * uniqueDests true Whether each receivers only listens to one ping destination or all. @@ -129,7 +130,7 @@ import java.util.concurrent.atomic.AtomicLong; * Instead make mina use a bounded blocking buffer, or other form of back pressure, to stop data being written * faster than it can be sent. */ -public class PingPongProducer implements Runnable, MessageListener, ExceptionListener +public class PingPongProducer implements Runnable /*, MessageListener*/, ExceptionListener { private static final Logger log = Logger.getLogger(PingPongProducer.class); @@ -241,6 +242,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default number of destinations to ping. */ public static final int DESTINATION_COUNT_DEFAULT = 1; + /** Holds the name of the property to get the number of consumers per destination from. */ + public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; + + /** Defines the default number consumers per destination. */ + public static final int NUM_CONSUMERS_DEFAULT = 1; + /** Holds the name of the property to get the waiting timeout for response messages. */ public static final String TIMEOUT_PROPNAME = "timeout"; @@ -309,6 +316,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); @@ -361,7 +369,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the number of sends that should be performed in every transaction when using transactions. */ protected int _txBatchSize; + /** Holds the number of destinations to ping. */ protected int _noOfDestinations; + + /** Holds the number of consumers per destination. */ + protected int _noOfConsumers; + + /** Holds the maximum send rate in herz. */ protected int _rate; /** @@ -395,8 +409,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Holds the connection to the broker. */ protected Connection _connection; + /** Holds the consumer connections. */ + protected Connection[] _consumerConnection; + /** Holds the controlSession on which ping replies are received. */ - protected Session _consumerSession; + protected Session[] _consumerSession; /** Holds the producer controlSession, needed to create ping messages. */ protected Session _producerSession; @@ -432,7 +449,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected MessageProducer _producer; /** Holds the message consumer to receive the ping replies through. */ - protected MessageConsumer _consumer; + protected MessageConsumer[] _consumer; /** * Holds the number of consumers that will be attached to each destination in the test. Each pings will result in @@ -479,6 +496,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); _rate = properties.getPropertyAsInteger(RATE_PROPNAME); _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); @@ -524,11 +542,17 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis createConnection(_clientID); // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = (Session) getConnection().createSession(_transacted, _ackMode); - _consumerSession = (Session) getConnection().createSession(_transacted, _ackMode); + _producerSession = (Session) _connection.createSession(_transacted, _ackMode); + + _consumerSession = new Session[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerSession[i] = (Session) _consumerConnection[i].createSession(_transacted, _ackMode); + } // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession.createTemporaryQueue(); + _replyDestination = _consumerSession[0].createTemporaryQueue(); createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. @@ -556,6 +580,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected void createConnection(String clientID) throws AMQException, URLSyntaxException { _connection = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + + _consumerConnection = new Connection[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i] = new AMQConnection(_brokerDetails, _username, _password, clientID, _virtualpath); + } } /** @@ -576,13 +607,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis pingProducer.establishConnection(true, true); // Start the ping producers dispatch thread running. - pingProducer.getConnection().start(); + pingProducer._connection.start(); // Create a shutdown hook to terminate the ping-pong producer. Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer.getConnection().setExceptionListener(pingProducer); + pingProducer._connection.setExceptionListener(pingProducer); // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. Thread pingThread = new Thread(pingProducer); @@ -743,13 +774,27 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis for (Destination destination : destinations) { - // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer = - _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); - _consumer.setMessageListener(this); + _consumer = new MessageConsumer[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer[i] = + _consumerSession[i].createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, + selector); + + final int consumerNo = i; + + _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + onMessageWithConsumerNo(message, consumerNo); + } + }); - log.debug("Set this to listen to replies sent to destination: " + destination); + log.debug("Set this to listen to replies sent to destination: " + destination); + } } } @@ -760,7 +805,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param message The received message. */ - public void onMessage(Message message) + public void onMessageWithConsumerNo(Message message, int consumerNo) { // log.debug("public void onMessage(Message message): called"); @@ -831,7 +876,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // blocked, even on the last message. if ((remainingCount % _txBatchSize) == 0) { - commitTx(_consumerSession); + commitTx(_consumerSession[consumerNo]); } // Forward the message and remaining count to any interested chained message listener. @@ -956,7 +1001,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis log.info("Got all replies on id, " + messageCorrelationId); } - commitTx(_consumerSession); + // commitTx(_consumerSession); log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); @@ -1226,6 +1271,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _publish = false; } + public void start() throws JMSException + { + _connection.start(); + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i].start(); + } + } + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ public void run() { @@ -1265,16 +1320,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }); } - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public Connection getConnection() - { - return _connection; - } - /** * Closes the pingers connection. * @@ -1291,12 +1336,22 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _connection.close(); log.debug("Close connection."); } + + for (int i = 0; i < _noOfConsumers; i++) + { + if (_consumerConnection[i] != null) + { + _consumerConnection[i].close(); + log.debug("Closed consumer connection."); + } + } } finally { _connection = null; _producerSession = null; _consumerSession = null; + _consumerConnection = null; _producer = null; _consumer = null; _pingDestinations = null; @@ -1452,7 +1507,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link - * PingPongProducer#onMessage} method is called, the chained listener set through the {@link + * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of * messages with that correlation id. * diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java index f4a6dc6554..e52deeecf1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.requestreply; -import javax.jms.*; - import junit.framework.Assert; import junit.framework.Test; import junit.framework.TestSuite; @@ -32,6 +30,8 @@ import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; +import javax.jms.*; + /** * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from @@ -196,7 +196,7 @@ public class PingPongTestPerf extends AsymptoticTestCase // Establish a ping-pong client on the ping queue to send the pings and receive replies with. perThreadSetup._testPingProducer = new PingPongProducer(testParameters); perThreadSetup._testPingProducer.establishConnection(true, true); - perThreadSetup._testPingProducer.getConnection().start(); + perThreadSetup._testPingProducer.start(); } // Attach the per-thread set to the thread. -- cgit v1.2.1