summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2010-02-25 15:16:51 +0000
committerMartin Ritchie <ritchiem@apache.org>2010-02-25 15:16:51 +0000
commit573f4ab8b94f50eb7f2dfd434d7301d5fbcc14ce (patch)
treec857c72f2632ffab6983ca1302f19802aebc73ba /qpid/java/perftests/src
parent0756f789b1edae00c28906717e7594cde217017f (diff)
downloadqpid-python-573f4ab8b94f50eb7f2dfd434d7301d5fbcc14ce.tar.gz
QPID-2421 : Augmented Async Performance test to take new 'preFill' value, that puts <n> messages onto the broker destination before the test begins.
When running on a non-TX'd producer session the use of the new 'delayBeforeConsume' will pause the client for <n> ms before the test starts, giving the producer session time to flush. This new functionality can be explored with the new 'testWithPreFill' script. The 'numConsumer' parameter was augmented to allow a 0 value which disables all the consumers. This can be seen with the 'fillBroker' script. To complement that a new 'consumeOnly' boolean was added to disable sending messages. This can be seen with the 'drainBroker' scripts. All scripts are located in java/perftests/etc/scripts Merged from 0.5.x-dev commit r916304 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916318 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java17
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java62
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java111
3 files changed, 177 insertions, 13 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
index 89fc805a34..d8fea85477 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java
@@ -133,8 +133,11 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
{
// _logger.debug("public void testAsyncPingOk(int numPings): called");
+ // get prefill count to update the expected count
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+
// Ensure that at least one ping was requeusted.
- if (numPings == 0)
+ if (numPings + preFill == 0)
{
_logger.error("Number of pings requested was zero.");
fail("Number of pings requested was zero.");
@@ -149,16 +152,24 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA
// String messageCorrelationId = perThreadSetup._correlationId;
// _logger.debug("messageCorrelationId = " + messageCorrelationId);
+
// Initialize the count and timing controller for the new correlation id.
PerCorrelationId perCorrelationId = new PerCorrelationId();
TimingController tc = getTimingController().getControllerForCurrentThread();
perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings);
+ perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill);
perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId);
+ // Start the client that will have been paused due to preFill requirement.
+ // or if we have not yet started client because messages are sitting on broker.
+ if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ pingClient.start();
+ }
+
// Send the requested number of messages, and wait until they have all been received.
long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId);
+ int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId);
// Check that all the replies were received and log a fail if they were not.
if (numReplies < perCorrelationId._expectedCount)
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
index 94b8ea662e..0f9603303b 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java
@@ -141,9 +141,65 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
perThreadSetup._pingClient = new PingClient(testParameters);
perThreadSetup._pingClient.establishConnection(true, true);
}
- // Start the client connection
- perThreadSetup._pingClient.start();
+ // Prefill the broker unless we are in consume only mode.
+ int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME);
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0)
+ {
+ // Manually set the correlation ID to 1. This is not ideal but it is the
+ // value that the main test loop will use.
+ perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, "1");
+
+ // Note with a large preFill and non-tx session the messages will be
+ // rapidly pushed in to the mina buffers. OOM's are a real risk here.
+ // Should perhaps consider using a TX session for the prefill.
+
+ long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME);
+
+ // Only delay if we have consumers and a delayBeforeConsume
+ if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0)
+ && delayBeforeConsume > 0)
+ {
+
+ boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
+ // Only do logging if in verbose mode.
+ if (verbose)
+ {
+ if (delayBeforeConsume > 60000)
+ {
+ long minutes = delayBeforeConsume / 60000;
+ long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000;
+ long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000);
+ _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test.");
+ }
+ else
+ {
+ _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test.");
+ }
+ }
+
+ Thread.sleep(delayBeforeConsume);
+
+ if (verbose)
+ {
+ _logger.info("Starting Test.");
+ }
+ }
+
+ // We can't start the client's here as the test client has not yet been configured to receieve messages.
+ // only when the test method is executed will the correlationID map be set up and ready to consume
+ // the messages we have sent here.
+ }
+ else //Only start the consumer if we are not preFilling.
+ {
+ // Only start the consumer if we don't have messages waiting to be received.
+ // we need to set up the correlationID mapping first
+ if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME))
+ {
+ // Start the client connection
+ perThreadSetup._pingClient.start();
+ }
+ }
// Attach the per-thread set to the thread.
threadSetup.set(perThreadSetup);
}
@@ -157,7 +213,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
* Performs test fixture clean
*/
public void threadTearDown()
- {
+ {
_logger.debug("public void threadTearDown(): called");
try
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index f994cd138e..e3769e415e 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -324,6 +324,25 @@ public class PingPongProducer implements Runnable, ExceptionListener
/** Holds the name of the property to store nanosecond timestamps in ping messages with. */
public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
+ /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */
+ public static final String PREFILL_PROPNAME = "preFill";
+
+ /** Defines the default value for the number of messages to prefill. 0,default, no messages. */
+ public static final int PREFILL_DEFAULT = 0;
+
+ /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
+ public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";
+
+ /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */
+ public static final long DELAY_BEFORE_CONSUME = 0;
+
+ /** Holds the name of the property to get when no messasges should be sent. */
+ public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";
+
+ /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
+ public static final boolean CONSUME_ONLY_DEFAULT = false;
+
+
/** Holds the default configuration properties. */
public static ParsedProperties defaults = new ParsedProperties();
@@ -360,6 +379,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
+ defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
+ defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
+ defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
}
/** Allows setting of client ID on the connection, rather than through the connection URL. */
@@ -455,6 +477,24 @@ public class PingPongProducer implements Runnable, ExceptionListener
*/
protected int _maxPendingSize;
+ /**
+ * Holds the number of messages to send during the setup phase, before the clients start consuming.
+ */
+ private Integer _preFill;
+
+ /**
+ * Holds the time in ms to wait after preFilling before starting thet test.
+ */
+ private Long _delayBeforeConsume;
+
+ /**
+ * Holds a boolean value of wither this test should just consume, i.e. skips
+ * sending messages, but still expects to receive the specified number.
+ * Use in conjuction with numConsumers=0 to fill the broker.
+ */
+ private boolean _consumeOnly;
+
+
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -588,6 +628,9 @@ public class PingPongProducer implements Runnable, ExceptionListener
_ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
_consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
+ _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
+ _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
+ _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
@@ -638,7 +681,10 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
// Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession[0].createTemporaryQueue();
+ if (_noOfConsumers > 0)
+ {
+ _replyDestination = _consumerSession[0].createTemporaryQueue();
+ }
createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
// Create the message producer only if instructed to.
@@ -871,6 +917,14 @@ public class PingPongProducer implements Runnable, ExceptionListener
{
_consumer = new MessageConsumer[_noOfConsumers];
+ // If we don't have consumers then ensure we have created the
+ // destination.
+ if (_noOfConsumers == 0)
+ {
+ _producerSession.createConsumer(destination, selector,
+ NO_LOCAL_DEFAULT).close();
+ }
+
for (int i = 0; i < _noOfConsumers; i++)
{
// Create a consumer for the destination and set this pinger to listen to its messages.
@@ -980,6 +1034,11 @@ public class PingPongProducer implements Runnable, ExceptionListener
// When running in client ack mode, an ack is done instead of a commit, on the commit batch
// size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
+ // _noOfConsumers can be set to 0 on the command line but we will not get here to
+ // divide by 0 as this is executed by the onMessage code when a message is recevied.
+ // no consumers means no message reception.
+
+
// log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
@@ -1014,6 +1073,7 @@ public class PingPongProducer implements Runnable, ExceptionListener
else
{
log.warn("Got unexpected message with correlationId: " + correlationID);
+ log.warn("Map contains:" + perCorrelationIds.entrySet());
}
}
else
@@ -1037,13 +1097,18 @@ public class PingPongProducer implements Runnable, ExceptionListener
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
* the correlation id.
*
+ * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
+ * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
+ *
+ * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
+ *
* @param message The message to send. If this is null, one is generated.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
* @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
* @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
- * for all prematurely.
+ * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
* @throws InterruptedException When interrupted by a timeout
@@ -1051,6 +1116,16 @@ public class PingPongProducer implements Runnable, ExceptionListener
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
throws JMSException, InterruptedException
{
+ return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
+ }
+
+ public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+
+ // If we are runnning a consumeOnly test then don't send any messages
+
+
/*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
@@ -1071,29 +1146,41 @@ public class PingPongProducer implements Runnable, ExceptionListener
// countdown needs to be done before the chained listener can be called.
PerCorrelationId perCorrelationId = new PerCorrelationId();
- perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
+ int totalPingsRequested = numPings + preFill;
+ perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1);
perCorrelationIds.put(messageCorrelationId, perCorrelationId);
// Set up the current time as the start time for pinging on the correlation id. This is used to determine
// timeouts.
perCorrelationId.timeOutStart = System.nanoTime();
- // Send the specifed number of messages.
+ // Send the specifed number of messages for this test
pingNoWaitForReply(message, numPings, messageCorrelationId);
boolean timedOut;
boolean allMessagesReceived;
int numReplies;
+ // We don't have a consumer so don't try and wait for the messages.
+ // this does mean that if the producerSession is !TXed then we may
+ // get to exit before all msgs have been received.
+ //
+ // Return the number of requested messages, this will let the test
+ // report a pass.
+ if (_noOfConsumers == 0)
+ {
+ return totalPingsRequested;
+ }
+
do
{
// Block the current thread until replies to all the messages are received, or it times out.
perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
- numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
+ numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();
- allMessagesReceived = numReplies == getExpectedNumPings(numPings);
+ allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);
// log.debug("numReplies = " + numReplies);
// log.debug("allMessagesReceived = " + allMessagesReceived);
@@ -1108,7 +1195,7 @@ public class PingPongProducer implements Runnable, ExceptionListener
}
while (!timedOut && !allMessagesReceived);
- if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
+ if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
{
log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
}
@@ -1146,6 +1233,12 @@ public class PingPongProducer implements Runnable, ExceptionListener
/*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
+ // If we are runnning a consumeOnly test then don't send any messages
+ if (_consumeOnly)
+ {
+ return;
+ }
+
if (message == null)
{
message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
@@ -1667,6 +1760,10 @@ public class PingPongProducer implements Runnable, ExceptionListener
/**
* Calculates how many pings are expected to be received for the given number sent.
*
+ * Note : that if you have set noConsumers to 0 then this will also return 0
+ * in the case of PubSub testing. This is correct as without consumers there
+ * will be no-one to receive the sent messages so they will be unable to respond.
+ *
* @param numpings The number of pings that will be sent.
*
* @return The number that should be received, for the test to pass.