diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 09:49:59 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-05 09:49:59 +0000 |
| commit | 68f53d8e26783fa6833de5000951d27776c10467 (patch) | |
| tree | 83134bb0fe536f40b0ed483c49221e874ff94912 /java/perftests | |
| parent | 8949f938b39c2a2235f31bb2035174eedc9ba7b7 (diff) | |
| download | qpid-python-68f53d8e26783fa6833de5000951d27776c10467.tar.gz | |
Update to performance testing to allow the use of shared destinations. This allows topics to have multiple consumers and the total message counts updated correctly.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@503609 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests')
4 files changed, 290 insertions, 136 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 0e3d753fea..1a37f47b35 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -38,6 +38,8 @@ import org.apache.qpid.requestreply.PingPongProducer; */
public class PingClient extends PingPongProducer
{
+ private static int _pingClientCount;
+
/**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates ping pong producer but de-registers its reply-to destination message
@@ -76,6 +78,8 @@ public class PingClient extends PingPongProducer super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
pubsub, unique);
+
+ _pingClientCount++;
}
/**
@@ -88,4 +92,17 @@ public class PingClient extends PingPongProducer {
return _pingDestinations;
}
+
+ public int getConsumersPerTopic()
+ {
+ if (_isUnique)
+ {
+ return 1;
+ }
+ else
+ {
+ return _pingClientCount;
+ }
+ }
+
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index c2962e48c9..8def95f7b1 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -50,21 +50,21 @@ import uk.co.thebadgerset.junit.extensions.Throttle; /**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
* client (see {@link PingPongBouncer} for the bounce back client).
- *
+ * <p/>
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expectes the original message
* correlation id in the ping to be bounced back in the reply correlation id.
- *
+ * <p/>
* <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
* It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
* within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
* failover testing.
- *
+ * <p/>
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- *
+ * <p/>
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for all responses cycle.
@@ -72,55 +72,67 @@ import uk.co.thebadgerset.junit.extensions.Throttle; * </table>
*
* @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
- * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
- * be created and configured by the test runner from the -f command line option and made available through
- * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
- * tests.
- *
+ * Use the rate and throttling only. Ideally, optionally pass the rate throttle into the ping method, throttle to
+ * be created and configured by the test runner from the -f command line option and made available through
+ * the timing controller on timing aware tests or by throttling rate of calling tests methods on non-timing aware
+ * tests.
* @todo Make acknowledege mode a test option.
- *
* @todo Make the message listener a static for all replies to be sent to? It won't be any more of a bottle neck than
- * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
- * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
- * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
- * be picked up by the PPP that it is atteched to.
- *
+ * having one per PingPongProducer, as will synchronize on message correlation id, allowing threads to process
+ * messages concurrently for different ids. Needs to be static so that when using a chained message listener and
+ * shared destinations between multiple PPPs, it gets notified about all replies, not just those that happen to
+ * be picked up by the PPP that it is atteched to.
* @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock
- * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
- * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
- * that the last message waits until all other messages have been handled before releasing producers but allows
- * messages to be processed concurrently, unlike the current synchronized block.
- *
+ * pair. Obtian read lock on all messages, before decrementing the message count. At the end of the on message
+ * method add a block that obtains the write lock for the very last message, releases any waiting producer. Means
+ * that the last message waits until all other messages have been handled before releasing producers but allows
+ * messages to be processed concurrently, unlike the current synchronized block.
* @todo Need to multiply up the number of expected messages for pubsub tests as each can be received by many consumers?
*/
public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
- /** Holds the name of the property to get the test message size from. */
+ /**
+ * Holds the name of the property to get the test message size from.
+ */
public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
- /** Holds the name of the property to get the ping queue name from. */
+ /**
+ * Holds the name of the property to get the ping queue name from.
+ */
public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
- /** Holds the name of the property to get the test delivery mode from. */
+ /**
+ * Holds the name of the property to get the test delivery mode from.
+ */
public static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /** Holds the name of the property to get the test transactional mode from. */
+ /**
+ * Holds the name of the property to get the test transactional mode from.
+ */
public static final String TRANSACTED_PROPNAME = "transacted";
- /** Holds the name of the property to get the test broker url from. */
+ /**
+ * Holds the name of the property to get the test broker url from.
+ */
public static final String BROKER_PROPNAME = "broker";
- /** Holds the name of the property to get the test broker virtual path. */
+ /**
+ * Holds the name of the property to get the test broker virtual path.
+ */
public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
- /** Holds the name of the property to get the message rate from. */
+ /**
+ * Holds the name of the property to get the message rate from.
+ */
public static final String RATE_PROPNAME = "rate";
public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
- /** Holds the true or false depending on wether it is P2P test or PubSub */
+ /**
+ * Holds the true or false depending on wether it is P2P test or PubSub
+ */
public static final String IS_PUBSUB_PROPNAME = "pubsub";
public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
@@ -141,96 +153,147 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
- /** Holds the name of the property to get the waiting timeout for response messages. */
+ /**
+ * Holds the name of the property to get the waiting timeout for response messages.
+ */
public static final String TIMEOUT_PROPNAME = "timeout";
public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
public static final String UNIQUE_PROPNAME = "uniqueDests";
- /** Used to set up a default message size. */
+ /**
+ * Used to set up a default message size.
+ */
public static final int DEFAULT_MESSAGE_SIZE = 0;
- /** Holds the name of the default destination to send pings on. */
+ /**
+ * Holds the name of the default destination to send pings on.
+ */
public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
- /** Defines the default number of destinations to ping. */
+ /**
+ * Defines the default number of destinations to ping.
+ */
public static final int DEFAULT_DESTINATION_COUNT = 1;
- /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ /**
+ * Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction.
+ */
public static final int DEFAULT_RATE = 0;
- /** Defines the default wait between pings. */
+ /**
+ * Defines the default wait between pings.
+ */
public static final long DEFAULT_SLEEP_TIME = 250;
- /** Default time to wait before assuming that a ping has timed out. */
+ /**
+ * Default time to wait before assuming that a ping has timed out.
+ */
public static final long DEFAULT_TIMEOUT = 30000;
- /** Defines the default number of pings to send in each transaction when running transactionally. */
+ /**
+ * Defines the default number of pings to send in each transaction when running transactionally.
+ */
public static final int DEFAULT_TX_BATCH_SIZE = 100;
- /** Defines the default prefetch size to use when consuming messages. */
+ /**
+ * Defines the default prefetch size to use when consuming messages.
+ */
public static final int DEFAULT_PREFETCH = 100;
- /** Defines the default value of the no local flag to use when consuming messages. */
+ /**
+ * Defines the default value of the no local flag to use when consuming messages.
+ */
public static final boolean DEFAULT_NO_LOCAL = false;
- /** Defines the default value of the exclusive flag to use when consuming messages. */
+ /**
+ * Defines the default value of the exclusive flag to use when consuming messages.
+ */
public static final boolean DEFAULT_EXCLUSIVE = false;
- /** Holds the message delivery mode to use for the test. */
+ /**
+ * Holds the message delivery mode to use for the test.
+ */
public static final boolean DEFAULT_PERSISTENT_MODE = false;
- /** Holds the transactional mode to use for the test. */
+ /**
+ * Holds the transactional mode to use for the test.
+ */
public static final boolean DEFAULT_TRANSACTED = false;
- /** Holds the default broker url for the test. */
+ /**
+ * Holds the default broker url for the test.
+ */
public static final String DEFAULT_BROKER = "tcp://localhost:5672";
- /** Holds the default virtual path for the test. */
+ /**
+ * Holds the default virtual path for the test.
+ */
public static final String DEFAULT_VIRTUAL_PATH = "test";
- /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ /**
+ * Holds the pub/sub mode default, true means ping a topic, false means ping a queue.
+ */
public static final boolean DEFAULT_PUBSUB = false;
- /** Holds the default broker log on username. */
+ /**
+ * Holds the default broker log on username.
+ */
public static final String DEFAULT_USERNAME = "guest";
- /** Holds the default broker log on password. */
+ /**
+ * Holds the default broker log on password.
+ */
public static final String DEFAULT_PASSWORD = "guest";
- /** Holds the default message selector. */
+ /**
+ * Holds the default message selector.
+ */
public static final String DEFAULT_SELECTOR = null;
- /** Holds the default failover after commit test flag. */
+ /**
+ * Holds the default failover after commit test flag.
+ */
public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
- /** Holds the default failover before commit test flag. */
+ /**
+ * Holds the default failover before commit test flag.
+ */
public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
- /** Holds the default failover after send test flag. */
+ /**
+ * Holds the default failover after send test flag.
+ */
public static final String DEFAULT_FAIL_AFTER_SEND = "false";
- /** Holds the default failover before send test flag. */
+ /**
+ * Holds the default failover before send test flag.
+ */
public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
- /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */
+ /**
+ * Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle.
+ */
public static final String DEFAULT_FAIL_ONCE = "true";
- /** Holds the default verbose mode. */
+ /**
+ * Holds the default verbose mode.
+ */
public static final boolean DEFAULT_VERBOSE = false;
public static final boolean DEFAULT_UNIQUE = true;
- /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
+ /**
+ * Holds the name of the property to store nanosecond timestamps in ping messages with.
+ */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
- /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
+ /**
+ * A source for providing sequential unique correlation ids. These will be unique within the same JVM.
+ */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
- /** A source for providing unique ids to PingPongProducer. */
- private static AtomicInteger _pingProducerIdGenerator;
-
/**
* Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
* multiple ping producers on the same JVM.
@@ -238,9 +301,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /*private static Map<String, CountDownLatch> trafficLights =
Collections.synchronizedMap(new HashMap<String, CountDownLatch>());*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
- /** A convenient formatter to use when time stamping output. */
+ /**
+ * A convenient formatter to use when time stamping output.
+ */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/**
@@ -249,70 +314,119 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
- /** Destination where the response messages will arrive. */
+ /**
+ * Destination where the response messages will arrive.
+ */
private Destination _replyDestination;
- /** Determines whether this producer sends persistent messages. */
+ /**
+ * Determines whether this producer sends persistent messages.
+ */
protected boolean _persistent;
- /** Determines what size of messages this producer sends. */
+ /**
+ * Determines what size of messages this producer sends.
+ */
protected int _messageSize;
- /** Used to indicate that the ping loop should print out whenever it pings. */
+ /**
+ * Used to indicate that the ping loop should print out whenever it pings.
+ */
protected boolean _verbose = false;
- /** Holds the session on which ping replies are received. */
+ /**
+ * Holds the session on which ping replies are received.
+ */
protected Session _consumerSession;
- /** Used to restrict the sending rate to a specified limit. */
+ /**
+ * Used to restrict the sending rate to a specified limit.
+ */
private Throttle _rateLimiter = null;
- /** Holds a message listener that this message listener chains all its messages to. */
+ /**
+ * Holds a message listener that this message listener chains all its messages to.
+ */
private ChainedMessageListener _chainedMessageListener = null;
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ /**
+ * Flag used to indicate if this is a point to point or pub/sub ping client.
+ */
protected boolean _isPubSub = false;
/**
+ * Flag used to indicate if the destinations should be unique client.
+ */
+ protected static boolean _isUnique = false;
+
+ /**
* This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
* on the same JVM using this id generator will allow them to ping on the same queues.
*/
protected AtomicInteger _queueSharedId = new AtomicInteger();
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ /**
+ * Used to tell the ping loop when to terminate, it only runs while this is true.
+ */
protected boolean _publish = true;
- /** Holds the connection to the broker. */
+ /**
+ * Holds the connection to the broker.
+ */
private Connection _connection;
- /** Holds the producer session, needed to create ping messages. */
+ /**
+ * Holds the producer session, needed to create ping messages.
+ */
private Session _producerSession;
- /** Holds the set of destiniations that this ping producer pings. */
+ /**
+ * Holds the set of destiniations that this ping producer pings.
+ */
protected List<Destination> _pingDestinations = new ArrayList<Destination>();
- /** Holds the message producer to send the pings through. */
+ /**
+ * Holds the message producer to send the pings through.
+ */
protected MessageProducer _producer;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ /**
+ * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit.
+ */
protected boolean _failBeforeCommit = false;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ /**
+ * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit.
+ */
protected boolean _failAfterCommit = false;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ /**
+ * Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send.
+ */
protected boolean _failBeforeSend = false;
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ /**
+ * Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send.
+ */
protected boolean _failAfterSend = false;
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ /**
+ * Flag used to indicate that failover prompting should only be done on the first commit, not on every commit.
+ */
protected boolean _failOnce = true;
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ /**
+ * Holds the number of sends that should be performed in every transaction when using transactions.
+ */
protected int _txBatchSize = 1;
/**
+ * Holds the number of consumers that will be attached to each topic.
+ * Each pings will result in a reply from each of the attached clients
+ */
+ static int _consumersPerTopic = 1;
+
+ /**
* Creates a ping producer with the specified parameters, of which there are many. See their individual comments
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
* to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
@@ -339,7 +453,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * possible, with no rate restriction.
* @param pubsub True to ping topics, false to ping queues.
* @param unique True to use unique destinations for each ping pong producer, false to share.
- *
* @throws Exception Any exceptions are allowed to fall through.
*/
public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
@@ -369,6 +482,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _failOnce = failOnce;
_txBatchSize = txBatchSize;
_isPubSub = pubsub;
+ _isUnique = unique;
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
@@ -407,6 +521,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * to be started to bounce the pings back again.
*
* @param args The command line arguments.
+ * @throws Exception When something went wrong with the test
*/
public static void main(String[] args) throws Exception
{
@@ -479,9 +594,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create a ping producer to handle the request/wait/reply cycle.
PingPongProducer pingProducer =
- new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
- transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
pingProducer.getConnection().start();
@@ -511,7 +626,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- { }
+ {
+ //ignore
+ }
}
}
@@ -555,11 +672,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param rootName The root of the name, or actual name if only one is being created.
* @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
* the numbering with all pingers on the same JVM.
- *
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
- throws JMSException
+ throws JMSException
{
_logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
@@ -568,7 +684,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Create the desired number of ping destinations and consumers for them.
for (int i = 0; i < noOfDestinations; i++)
{
- AMQDestination destination = null;
+ AMQDestination destination;
int id;
@@ -701,11 +817,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException When interrupted by a timeout.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
@@ -727,14 +842,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id.
- *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ * @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
_logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
@@ -747,7 +861,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // chained message listener must be called before this sender can be unblocked, but that decrementing the
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(numPings + 1);
+
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
@@ -767,11 +882,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = numPings - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies >= numPings;
+ numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+
+ allMessagesReceived = numReplies == getExpectedNumPings(numPings);
- _logger.debug("numReplies = "+ numReplies);
- _logger.debug("allMessagesReceived = "+ allMessagesReceived);
+ _logger.debug("numReplies = " + numReplies);
+ _logger.debug("allMessagesReceived = " + allMessagesReceived);
// Recheck the timeout condition.
long now = System.nanoTime();
@@ -783,7 +899,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
while (!timedOut && !allMessagesReceived);
- if ((numReplies < numPings) && _verbose)
+ if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
{
_logger.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -812,7 +928,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param message The message to send.
* @param numPings The number of pings to send.
* @param messageCorrelationId A correlation id to place on all messages sent.
- *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
@@ -927,9 +1042,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @param replyQueue The reply-to destination for the message.
* @param messageSize The desired size of the message in bytes.
* @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
- *
* @return A freshly generated test message.
- *
* @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
*/
public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
@@ -984,12 +1097,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook()
{
return new Thread(new Runnable()
+ {
+ public void run()
{
- public void run()
- {
- stop();
- }
- });
+ stop();
+ }
+ });
}
/**
@@ -1007,7 +1120,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *
* @param destinations The destinations to listen to.
* @param selector A selector to filter the messages with.
- *
* @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
@@ -1019,8 +1131,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
// Create a consumer for the destination and set this pinger to listen to its messages.
MessageConsumer consumer =
- _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
- selector);
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
consumer.setMessageListener(this);
}
}
@@ -1043,19 +1155,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /**
* Convenience method to commit the transaction on the specified session. If the session to commit on is not
* a transactional session, this method does nothing (unless the failover after send flag is set).
- *
+ * <p/>
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
* is applied. This flag applies whether the pinger is transactional or not.
- *
+ * <p/>
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
* commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
* after the commit is applied. These flags will only apply if using a transactional pinger.
*
+ * @param session The session to commit
* @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- *
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional
- * and non-transactional alike.
+ * <p/>
+ * //todo @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
*/
protected void commitTx(Session session) throws JMSException
{
@@ -1136,7 +1249,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis *
* @param destination The destination to send to.
* @param message The message to send.
- *
* @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
*/
protected void sendMessage(Destination destination, Message message) throws JMSException
@@ -1174,17 +1286,35 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis System.in.read();
}
catch (IOException e)
- { }
+ {
+ //ignore
+ }
System.out.println("Continuing.");
}
/**
+ * This value will be changed by PingClient to represent the number of clients connected to each topic.
+ *
+ * @return int The number of consumers subscribing to each topic.
+ */
+ public int getConsumersPerTopic()
+ {
+ return _consumersPerTopic;
+ }
+
+ public int getExpectedNumPings(int numpings)
+ {
+ return numpings * getConsumersPerTopic();
+ }
+
+
+ /**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
* {@link PingPongProducer#onMessage} method is called, the chained listener set through the
* {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
* count of messages with that correlation id.
- *
+ * <p/>
* Provided only one pinger is producing messages with that correlation id, the chained listener will always be
* given unique message counts. It will always be called while the producer waiting for all messages to arrive is
* still blocked.
@@ -1200,10 +1330,14 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected static class PerCorrelationId
{
- /** Holds a countdown on number of expected messages. */
+ /**
+ * Holds a countdown on number of expected messages.
+ */
CountDownLatch trafficLight;
- /** Holds the last timestamp that the timeout was reset to. */
+ /**
+ * Holds the last timestamp that the timeout was reset to.
+ */
Long timeOutStart;
}
}
diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java index c01987cfc0..347031ff51 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -70,7 +70,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ private Map<String, PerCorrelationId> perCorrelationIds = - Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); /** Holds the batched results listener, that does logging on batch boundaries. */ private BatchedResultsListener batchedResultsListener = null; @@ -91,6 +91,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA /** * Compile all the tests into a test suite. + * @return The test suite to run. Should only contain testAsyncPingOk method. */ public static Test suite() { @@ -128,6 +129,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA * all replies have been received or a time out occurs before exiting this method. * * @param numPings The number of pings to send. + * @throws Exception pass all errors out to the test harness */ public void testAsyncPingOk(int numPings) throws Exception { @@ -151,7 +153,7 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these @@ -160,18 +162,18 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // Generate a sample message of the specified size. ObjectMessage msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, messageCorrelationId); // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) + if (numReplies < perCorrelationId._expectedCount) { - tc.completeTest(false, numPings - numReplies); + tc.completeTest(false, numPings - perCorrelationId._expectedCount); } // Remove the chained message listener from the ping producer. diff --git a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java index 0d0df0128e..07d7fad471 100644 --- a/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java +++ b/java/perftests/src/test/java/org/apache/qpid/ping/PingTestPerf.java @@ -110,6 +110,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware /**
* Compile all the tests into a test suite.
+ * @return The test method testPingOk.
*/
public static Test suite()
{
@@ -139,18 +140,18 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Generate a sample message. This message is already time stamped and has its reply-to destination set.
ObjectMessage msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(
- PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(
- PingPongProducer.PERSISTENT_MODE_PROPNAME));
+ perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
+ testParameters.getPropertyAsInteger(
+ PingPongProducer.MESSAGE_SIZE_PROPNAME),
+ testParameters.getPropertyAsBoolean(
+ PingPongProducer.PERSISTENT_MODE_PROPNAME));
// start the test
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout);
// Fail the test if the timeout was exceeded.
- if (numReplies != numPings)
+ if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
{
Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
+ numReplies);
@@ -191,7 +192,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware // Extract the test set up paramaeters.
int destinationscount =
- Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
+ Integer.parseInt(testParameters.getProperty(PingPongProducer.PING_DESTINATION_COUNT_PROPNAME));
// This is synchronized because there is a race condition, which causes one connection to sleep if
// all threads try to create connection concurrently.
|
