From 68f53d8e26783fa6833de5000951d27776c10467 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 5 Feb 2007 09:49:59 +0000 Subject: Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@503609 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/ping/PingClient.java | 17 + .../apache/qpid/requestreply/PingPongProducer.java | 378 ++++++++++++++------- .../org/apache/qpid/ping/PingAsyncTestPerf.java | 16 +- .../java/org/apache/qpid/ping/PingTestPerf.java | 15 +- 4 files changed, 290 insertions(+), 136 deletions(-) (limited to 'java') diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 0e3d753fea..1a37f47b35 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -38,6 +38,8 @@ import org.apache.qpid.requestreply.PingPongProducer; */ public class PingClient extends PingPongProducer { + private static int _pingClientCount; + /** * Creates a ping producer with the specified parameters, of which there are many. See their individual comments * for details. This constructor creates ping pong producer but de-registers its reply-to destination message @@ -76,6 +78,8 @@ public class PingClient extends PingPongProducer super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate, pubsub, unique); + + _pingClientCount++; } /** @@ -88,4 +92,17 @@ public class PingClient extends PingPongProducer { return _pingDestinations; } + + public int getConsumersPerTopic() + { + if (_isUnique) + { + return 1; + } + else + { + return _pingClientCount; + } + } + } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index c2962e48c9..8def95f7b1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -50,21 +50,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle; /** * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back * client (see {@link PingPongBouncer} for the bounce back client). - * + *

*

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. * This means that this class has to do some work to correlate pings with pongs; it expectes the original message * correlation id in the ping to be bounced back in the reply correlation id. - * + *

*

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform * failover testing. - * + *

*

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is * also registered to terminate the ping-pong loop cleanly. - * + *

*

*
CRC Card
Responsibilities Collaborations *
Provide a ping and wait for all responses cycle. @@ -72,55 +72,67 @@ import uk.co.thebadgerset.junit.extensions.Throttle; *
* * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping. - * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to - * be created and configured by the test runner from the -f command line option and made available through - * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware - * tests. - * + * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to + * be created and configured by the test runner from the -f command line option and made available through + * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware + * tests. * @todo Make acknowledege mode a test option. - * * @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than - * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process - * messages concurrently for different ids. Needs to be static so that when using a chained message listener and - * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to - * be picked up by the PPP that it is atteched to. - * + * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process + * messages concurrently for different ids. Needs to be static so that when using a chained message listener and + * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to + * be picked up by the PPP that it is atteched to. * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock - * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message - * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means - * that the last message waits until all other messages have been handled before releasing producers but allows - * messages to be processed concurrently, unlike the current synchronized block. - * + * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message + * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means + * that the last message waits until all other messages have been handled before releasing producers but allows + * messages to be processed concurrently, unlike the current synchronized block. * @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers? */ public class PingPongProducer implements Runnable, MessageListener, ExceptionListener { private static final Logger _logger = Logger.getLogger(PingPongProducer.class); - /** Holds the name of the property to get the test message size from. */ + /** + * Holds the name of the property to get the test message size from. + */ public static final String MESSAGE_SIZE_PROPNAME = "messagesize"; - /** Holds the name of the property to get the ping queue name from. */ + /** + * Holds the name of the property to get the ping queue name from. + */ public static final String PING_QUEUE_NAME_PROPNAME = "destinationname"; - /** Holds the name of the property to get the test delivery mode from. */ + /** + * Holds the name of the property to get the test delivery mode from. + */ public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - /** Holds the name of the property to get the test transactional mode from. */ + /** + * Holds the name of the property to get the test transactional mode from. + */ public static final String TRANSACTED_PROPNAME = "transacted"; - /** Holds the name of the property to get the test broker url from. */ + /** + * Holds the name of the property to get the test broker url from. + */ public static final String BROKER_PROPNAME = "broker"; - /** Holds the name of the property to get the test broker virtual path. */ + /** + * Holds the name of the property to get the test broker virtual path. + */ public static final String VIRTUAL_PATH_PROPNAME = "virtualPath"; - /** Holds the name of the property to get the message rate from. */ + /** + * Holds the name of the property to get the message rate from. + */ public static final String RATE_PROPNAME = "rate"; public static final String VERBOSE_OUTPUT_PROPNAME = "verbose"; - /** Holds the true or false depending on wether it is P2P test or PubSub */ + /** + * Holds the true or false depending on wether it is P2P test or PubSub + */ public static final String IS_PUBSUB_PROPNAME = "pubsub"; public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit"; @@ -141,96 +153,147 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount"; - /** Holds the name of the property to get the waiting timeout for response messages. */ + /** + * Holds the name of the property to get the waiting timeout for response messages. + */ public static final String TIMEOUT_PROPNAME = "timeout"; public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize"; public static final String UNIQUE_PROPNAME = "uniqueDests"; - /** Used to set up a default message size. */ + /** + * Used to set up a default message size. + */ public static final int DEFAULT_MESSAGE_SIZE = 0; - /** Holds the name of the default destination to send pings on. */ + /** + * Holds the name of the default destination to send pings on. + */ public static final String DEFAULT_PING_DESTINATION_NAME = "ping"; - /** Defines the default number of destinations to ping. */ + /** + * Defines the default number of destinations to ping. + */ public static final int DEFAULT_DESTINATION_COUNT = 1; - /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + /** + * Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. + */ public static final int DEFAULT_RATE = 0; - /** Defines the default wait between pings. */ + /** + * Defines the default wait between pings. + */ public static final long DEFAULT_SLEEP_TIME = 250; - /** Default time to wait before assuming that a ping has timed out. */ + /** + * Default time to wait before assuming that a ping has timed out. + */ public static final long DEFAULT_TIMEOUT = 30000; - /** Defines the default number of pings to send in each transaction when running transactionally. */ + /** + * Defines the default number of pings to send in each transaction when running transactionally. + */ public static final int DEFAULT_TX_BATCH_SIZE = 100; - /** Defines the default prefetch size to use when consuming messages. */ + /** + * Defines the default prefetch size to use when consuming messages. + */ public static final int DEFAULT_PREFETCH = 100; - /** Defines the default value of the no local flag to use when consuming messages. */ + /** + * Defines the default value of the no local flag to use when consuming messages. + */ public static final boolean DEFAULT_NO_LOCAL = false; - /** Defines the default value of the exclusive flag to use when consuming messages. */ + /** + * Defines the default value of the exclusive flag to use when consuming messages. + */ public static final boolean DEFAULT_EXCLUSIVE = false; - /** Holds the message delivery mode to use for the test. */ + /** + * Holds the message delivery mode to use for the test. + */ public static final boolean DEFAULT_PERSISTENT_MODE = false; - /** Holds the transactional mode to use for the test. */ + /** + * Holds the transactional mode to use for the test. + */ public static final boolean DEFAULT_TRANSACTED = false; - /** Holds the default broker url for the test. */ + /** + * Holds the default broker url for the test. + */ public static final String DEFAULT_BROKER = "tcp://localhost:5672"; - /** Holds the default virtual path for the test. */ + /** + * Holds the default virtual path for the test. + */ public static final String DEFAULT_VIRTUAL_PATH = "test"; - /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + /** + * Holds the pub/sub mode default, true means ping a topic, false means ping a queue. + */ public static final boolean DEFAULT_PUBSUB = false; - /** Holds the default broker log on username. */ + /** + * Holds the default broker log on username. + */ public static final String DEFAULT_USERNAME = "guest"; - /** Holds the default broker log on password. */ + /** + * Holds the default broker log on password. + */ public static final String DEFAULT_PASSWORD = "guest"; - /** Holds the default message selector. */ + /** + * Holds the default message selector. + */ public static final String DEFAULT_SELECTOR = null; - /** Holds the default failover after commit test flag. */ + /** + * Holds the default failover after commit test flag. + */ public static final String DEFAULT_FAIL_AFTER_COMMIT = "false"; - /** Holds the default failover before commit test flag. */ + /** + * Holds the default failover before commit test flag. + */ public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false"; - /** Holds the default failover after send test flag. */ + /** + * Holds the default failover after send test flag. + */ public static final String DEFAULT_FAIL_AFTER_SEND = "false"; - /** Holds the default failover before send test flag. */ + /** + * Holds the default failover before send test flag. + */ public static final String DEFAULT_FAIL_BEFORE_SEND = "false"; - /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */ + /** + * Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. + */ public static final String DEFAULT_FAIL_ONCE = "true"; - /** Holds the default verbose mode. */ + /** + * Holds the default verbose mode. + */ public static final boolean DEFAULT_VERBOSE = false; public static final boolean DEFAULT_UNIQUE = true; - /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + /** + * Holds the name of the property to store nanosecond timestamps in ping messages with. + */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + /** + * A source for providing sequential unique correlation ids. These will be unique within the same JVM. + */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - /** A source for providing unique ids to PingPongProducer. */ - private static AtomicInteger _pingProducerIdGenerator; - /** * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross * multiple ping producers on the same JVM. @@ -238,9 +301,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /*private static Map trafficLights = Collections.synchronizedMap(new HashMap());*/ private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); - /** A convenient formatter to use when time stamping output. */ + /** + * A convenient formatter to use when time stamping output. + */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); /** @@ -249,69 +314,118 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - /** Destination where the response messages will arrive. */ + /** + * Destination where the response messages will arrive. + */ private Destination _replyDestination; - /** Determines whether this producer sends persistent messages. */ + /** + * Determines whether this producer sends persistent messages. + */ protected boolean _persistent; - /** Determines what size of messages this producer sends. */ + /** + * Determines what size of messages this producer sends. + */ protected int _messageSize; - /** Used to indicate that the ping loop should print out whenever it pings. */ + /** + * Used to indicate that the ping loop should print out whenever it pings. + */ protected boolean _verbose = false; - /** Holds the session on which ping replies are received. */ + /** + * Holds the session on which ping replies are received. + */ protected Session _consumerSession; - /** Used to restrict the sending rate to a specified limit. */ + /** + * Used to restrict the sending rate to a specified limit. + */ private Throttle _rateLimiter = null; - /** Holds a message listener that this message listener chains all its messages to. */ + /** + * Holds a message listener that this message listener chains all its messages to. + */ private ChainedMessageListener _chainedMessageListener = null; - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + /** + * Flag used to indicate if this is a point to point or pub/sub ping client. + */ protected boolean _isPubSub = false; + /** + * Flag used to indicate if the destinations should be unique client. + */ + protected static boolean _isUnique = false; + /** * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers * on the same JVM using this id generator will allow them to ping on the same queues. */ protected AtomicInteger _queueSharedId = new AtomicInteger(); - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + /** + * Used to tell the ping loop when to terminate, it only runs while this is true. + */ protected boolean _publish = true; - /** Holds the connection to the broker. */ + /** + * Holds the connection to the broker. + */ private Connection _connection; - /** Holds the producer session, needed to create ping messages. */ + /** + * Holds the producer session, needed to create ping messages. + */ private Session _producerSession; - /** Holds the set of destiniations that this ping producer pings. */ + /** + * Holds the set of destiniations that this ping producer pings. + */ protected List _pingDestinations = new ArrayList(); - /** Holds the message producer to send the pings through. */ + /** + * Holds the message producer to send the pings through. + */ protected MessageProducer _producer; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. + */ protected boolean _failBeforeCommit = false; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. + */ protected boolean _failAfterCommit = false; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. + */ protected boolean _failBeforeSend = false; - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + /** + * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. + */ protected boolean _failAfterSend = false; - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + /** + * Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. + */ protected boolean _failOnce = true; - /** Holds the number of sends that should be performed in every transaction when using transactions. */ + /** + * Holds the number of sends that should be performed in every transaction when using transactions. + */ protected int _txBatchSize = 1; + /** + * Holds the number of consumers that will be attached to each topic. + * Each pings will result in a reply from each of the attached clients + */ + static int _consumersPerTopic = 1; + /** * Creates a ping producer with the specified parameters, of which there are many. See their individual comments * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it, @@ -339,7 +453,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * possible, with no rate restriction. * @param pubsub True to ping topics, false to ping queues. * @param unique True to use unique destinations for each ping pong producer, false to share. - * * @throws Exception Any exceptions are allowed to fall through. */ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath, @@ -369,6 +482,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failOnce = failOnce; _txBatchSize = txBatchSize; _isPubSub = pubsub; + _isUnique = unique; // Check that one or more destinations were specified. if (noOfDestinations < 1) @@ -407,6 +521,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * to be started to bounce the pings back again. * * @param args The command line arguments. + * @throws Exception When something went wrong with the test */ public static void main(String[] args) throws Exception { @@ -479,9 +594,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create a ping producer to handle the request/wait/reply cycle. PingPongProducer pingProducer = - new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector, - transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, - beforeSend, failOnce, batchSize, destCount, rate, pubsub, false); + new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector, + transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend, + beforeSend, failOnce, batchSize, destCount, rate, pubsub, false); pingProducer.getConnection().start(); @@ -511,7 +626,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime); } catch (InterruptedException ie) - { } + { + //ignore + } } } @@ -555,11 +672,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param rootName The root of the name, or actual name if only one is being created. * @param unique true to make the destinations unique to this pinger, false to share * the numbering with all pingers on the same JVM. - * * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique) - throws JMSException + throws JMSException { _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = " @@ -568,7 +684,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create the desired number of ping destinations and consumers for them. for (int i = 0; i < noOfDestinations; i++) { - AMQDestination destination = null; + AMQDestination destination; int id; @@ -701,11 +817,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout. */ public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException { @@ -727,14 +842,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. - * * @return The number of replies received. This may be less than the number sent if the timeout terminated the * wait for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); @@ -747,7 +861,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // chained message listener must be called before this sender can be unblocked, but that decrementing the // countdown needs to be done before the chained listener can be called. PerCorrelationId perCorrelationId = new PerCorrelationId(); - perCorrelationId.trafficLight = new CountDownLatch(numPings + 1); + + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine @@ -767,11 +882,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. - numReplies = numPings - (int) perCorrelationId.trafficLight.getCount(); - allMessagesReceived = numReplies >= numPings; + numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + + allMessagesReceived = numReplies == getExpectedNumPings(numPings); - _logger.debug("numReplies = "+ numReplies); - _logger.debug("allMessagesReceived = "+ allMessagesReceived); + _logger.debug("numReplies = " + numReplies); + _logger.debug("allMessagesReceived = " + allMessagesReceived); // Recheck the timeout condition. long now = System.nanoTime(); @@ -783,7 +899,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } while (!timedOut && !allMessagesReceived); - if ((numReplies < numPings) && _verbose) + if ((numReplies < getExpectedNumPings(numPings)) && _verbose) { _logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } @@ -812,7 +928,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send. * @param numPings The number of pings to send. * @param messageCorrelationId A correlation id to place on all messages sent. - * * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException @@ -927,9 +1042,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param replyQueue The reply-to destination for the message. * @param messageSize The desired size of the message in bytes. * @param persistent true if the message should use persistent delivery, false otherwise. - * * @return A freshly generated test message. - * * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. */ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException @@ -984,12 +1097,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook() { return new Thread(new Runnable() + { + public void run() { - public void run() - { - stop(); - } - }); + stop(); + } + }); } /** @@ -1007,7 +1120,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param destinations The destinations to listen to. * @param selector A selector to filter the messages with. - * * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. */ public void createReplyConsumers(Collection destinations, String selector) throws JMSException @@ -1019,8 +1131,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Create a consumer for the destination and set this pinger to listen to its messages. MessageConsumer consumer = - _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE, - selector); + _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE, + selector); consumer.setMessageListener(this); } } @@ -1043,19 +1155,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** * Convenience method to commit the transaction on the specified session. If the session to commit on is not * a transactional session, this method does nothing (unless the failover after send flag is set). - * + *

*

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit * is applied. This flag applies whether the pinger is transactional or not. - * + *

*

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker * after the commit is applied. These flags will only apply if using a transactional pinger. * + * @param session The session to commit * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional - * and non-transactional alike. + *

+ * //todo @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional + * and non-transactional alike. */ protected void commitTx(Session session) throws JMSException { @@ -1136,7 +1249,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * * @param destination The destination to send to. * @param message The message to send. - * * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through. */ protected void sendMessage(Destination destination, Message message) throws JMSException @@ -1174,17 +1286,35 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis System.in.read(); } catch (IOException e) - { } + { + //ignore + } System.out.println("Continuing."); } + /** + * This value will be changed by PingClient to represent the number of clients connected to each topic. + * + * @return int The number of consumers subscribing to each topic. + */ + public int getConsumersPerTopic() + { + return _consumersPerTopic; + } + + public int getExpectedNumPings(int numpings) + { + return numpings * getConsumersPerTopic(); + } + + /** * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's * {@link PingPongProducer#onMessage} method is called, the chained listener set through the * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected * count of messages with that correlation id. - * + *

* Provided only one pinger is producing messages with that correlation id, the chained listener will always be * given unique message counts. It will always be called while the producer waiting for all messages to arrive is * still blocked. @@ -1200,10 +1330,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */ protected static class PerCorrelationId { - /** Holds a countdown on number of expected messages. */ + /** + * Holds a countdown on number of expected messages. + */ CountDownLatch trafficLight; - /** Holds the last timestamp that the timeout was reset to. */ + /** + * Holds the last timestamp that the timeout was reset to. + */ Long timeOutStart; } } diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index c01987cfc0..347031ff51 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -70,7 +70,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; @@ -91,6 +91,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. */ public static Test suite() { @@ -128,6 +129,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA * all replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness */ public void testAsyncPingOk(int numPings) throws Exception { @@ -151,7 +153,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these @@ -160,18 +162,18 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Generate a sample message of the specified size. ObjectMessage msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId); // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) + if (numReplies < perCorrelationId._expectedCount) { - tc.completeTest(false, numPings - numReplies); + tc.completeTest(false, numPings - perCorrelationId._expectedCount); } // Remove the chained message listener from the ping producer. diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 0d0df0128e..07d7fad471 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -110,6 +110,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware /** * Compile all the tests into a test suite. + * @return The test method testPingOk. */ public static Test suite() { @@ -139,18 +140,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set. ObjectMessage msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger( - PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean( - PingPongProducer.PERSISTENT_MODE_PROPNAME)); + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger( + PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean( + PingPongProducer.PERSISTENT_MODE_PROPNAME)); // start the test long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout); // Fail the test if the timeout was exceeded. - if (numReplies != numPings) + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) { Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + numReplies); @@ -191,7 +192,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Extract the test set up paramaeters. int destinationscount = - Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); + Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME)); // This is synchronized because there is a race condition, which causes one connection to sleep if // all threads try to create connection concurrently. -- cgit v1.2.1