diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2010-02-25 15:16:51 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2010-02-25 15:16:51 +0000 |
| commit | 573f4ab8b94f50eb7f2dfd434d7301d5fbcc14ce (patch) | |
| tree | c857c72f2632ffab6983ca1302f19802aebc73ba /qpid/java/perftests/src | |
| parent | 0756f789b1edae00c28906717e7594cde217017f (diff) | |
| download | qpid-python-573f4ab8b94f50eb7f2dfd434d7301d5fbcc14ce.tar.gz | |
QPID-2421 : Augmented Async Performance test to take new 'preFill' value, that puts <n> messages onto the broker destination before the test begins.
When running on a non-TX'd producer session the use of the new 'delayBeforeConsume' will pause the client for <n> ms before the test starts, giving the producer session time to flush.
This new functionality can be explored with the new 'testWithPreFill' script.
The 'numConsumer' parameter was augmented to allow a 0 value which disables all the consumers. This can be seen with the 'fillBroker' script.
To complement that a new 'consumeOnly' boolean was added to disable sending messages. This can be seen with the 'drainBroker' scripts.
All scripts are located in java/perftests/etc/scripts
Merged from 0.5.x-dev commit r916304
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916318 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src')
3 files changed, 177 insertions, 13 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 89fc805a34..d8fea85477 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -133,8 +133,11 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA { // _logger.debug("public void testAsyncPingOk(int numPings): called"); + // get prefill count to update the expected count + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + // Ensure that at least one ping was requeusted. - if (numPings == 0) + if (numPings + preFill == 0) { _logger.error("Number of pings requested was zero."); fail("Number of pings requested was zero."); @@ -149,16 +152,24 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // String messageCorrelationId = perThreadSetup._correlationId; // _logger.debug("messageCorrelationId = " + messageCorrelationId); + // Initialize the count and timing controller for the new correlation id. PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + // Start the client that will have been paused due to preFill requirement. + // or if we have not yet started client because messages are sitting on broker. + if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + pingClient.start(); + } + // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); + int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index 94b8ea662e..0f9603303b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -141,9 +141,65 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient = new PingClient(testParameters); perThreadSetup._pingClient.establishConnection(true, true); } - // Start the client connection - perThreadSetup._pingClient.start(); + // Prefill the broker unless we are in consume only mode. + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + { + // Manually set the correlation ID to 1. This is not ideal but it is the + // value that the main test loop will use. + perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, "1"); + + // Note with a large preFill and non-tx session the messages will be + // rapidly pushed in to the mina buffers. OOM's are a real risk here. + // Should perhaps consider using a TX session for the prefill. + + long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); + + // Only delay if we have consumers and a delayBeforeConsume + if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) + && delayBeforeConsume > 0) + { + + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + // Only do logging if in verbose mode. + if (verbose) + { + if (delayBeforeConsume > 60000) + { + long minutes = delayBeforeConsume / 60000; + long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; + long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); + _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); + } + else + { + _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); + } + } + + Thread.sleep(delayBeforeConsume); + + if (verbose) + { + _logger.info("Starting Test."); + } + } + + // We can't start the client's here as the test client has not yet been configured to receieve messages. + // only when the test method is executed will the correlationID map be set up and ready to consume + // the messages we have sent here. + } + else //Only start the consumer if we are not preFilling. + { + // Only start the consumer if we don't have messages waiting to be received. + // we need to set up the correlationID mapping first + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + // Start the client connection + perThreadSetup._pingClient.start(); + } + } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } @@ -157,7 +213,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware * Performs test fixture clean */ public void threadTearDown() - { + { _logger.debug("public void threadTearDown(): called"); try diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f994cd138e..e3769e415e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -324,6 +324,25 @@ public class PingPongProducer implements Runnable, ExceptionListener /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */ + public static final String PREFILL_PROPNAME = "preFill"; + + /** Defines the default value for the number of messages to prefill. 0,default, no messages. */ + public static final int PREFILL_DEFAULT = 0; + + /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */ + public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume"; + + /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */ + public static final long DELAY_BEFORE_CONSUME = 0; + + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String CONSUME_ONLY_PROPNAME = "consumeOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean CONSUME_ONLY_DEFAULT = false; + + /** Holds the default configuration properties. */ public static ParsedProperties defaults = new ParsedProperties(); @@ -360,6 +379,9 @@ public class PingPongProducer implements Runnable, ExceptionListener defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); + defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); + defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); } /** Allows setting of client ID on the connection, rather than through the connection URL. */ @@ -455,6 +477,24 @@ public class PingPongProducer implements Runnable, ExceptionListener */ protected int _maxPendingSize; + /** + * Holds the number of messages to send during the setup phase, before the clients start consuming. + */ + private Integer _preFill; + + /** + * Holds the time in ms to wait after preFilling before starting thet test. + */ + private Long _delayBeforeConsume; + + /** + * Holds a boolean value of wither this test should just consume, i.e. skips + * sending messages, but still expects to receive the specified number. + * Use in conjuction with numConsumers=0 to fill the broker. + */ + private boolean _consumeOnly; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -588,6 +628,9 @@ public class PingPongProducer implements Runnable, ExceptionListener _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); + _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); + _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); // Check that one or more destinations were specified. if (_noOfDestinations < 1) @@ -638,7 +681,10 @@ public class PingPongProducer implements Runnable, ExceptionListener } // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession[0].createTemporaryQueue(); + if (_noOfConsumers > 0) + { + _replyDestination = _consumerSession[0].createTemporaryQueue(); + } createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. @@ -871,6 +917,14 @@ public class PingPongProducer implements Runnable, ExceptionListener { _consumer = new MessageConsumer[_noOfConsumers]; + // If we don't have consumers then ensure we have created the + // destination. + if (_noOfConsumers == 0) + { + _producerSession.createConsumer(destination, selector, + NO_LOCAL_DEFAULT).close(); + } + for (int i = 0; i < _noOfConsumers; i++) { // Create a consumer for the destination and set this pinger to listen to its messages. @@ -980,6 +1034,11 @@ public class PingPongProducer implements Runnable, ExceptionListener // When running in client ack mode, an ack is done instead of a commit, on the commit batch // size boundaries. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // _noOfConsumers can be set to 0 on the command line but we will not get here to + // divide by 0 as this is executed by the onMessage code when a message is recevied. + // no consumers means no message reception. + + // log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) @@ -1014,6 +1073,7 @@ public class PingPongProducer implements Runnable, ExceptionListener else { log.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Map contains:" + perCorrelationIds.entrySet()); } } else @@ -1037,13 +1097,18 @@ public class PingPongProducer implements Runnable, ExceptionListener * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify * the correlation id. * + * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination + * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings. + * + * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur. + * * @param message The message to send. If this is null, one is generated. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. If this is null, one is generated. * * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. + * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. * @throws InterruptedException When interrupted by a timeout @@ -1051,6 +1116,16 @@ public class PingPongProducer implements Runnable, ExceptionListener public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { + return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId); + } + + public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + + // If we are runnning a consumeOnly test then don't send any messages + + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ @@ -1071,29 +1146,41 @@ public class PingPongProducer implements Runnable, ExceptionListener // countdown needs to be done before the chained listener can be called. PerCorrelationId perCorrelationId = new PerCorrelationId(); - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); + int totalPingsRequested = numPings + preFill; + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine // timeouts. perCorrelationId.timeOutStart = System.nanoTime(); - // Send the specifed number of messages. + // Send the specifed number of messages for this test pingNoWaitForReply(message, numPings, messageCorrelationId); boolean timedOut; boolean allMessagesReceived; int numReplies; + // We don't have a consumer so don't try and wait for the messages. + // this does mean that if the producerSession is !TXed then we may + // get to exit before all msgs have been received. + // + // Return the number of requested messages, this will let the test + // report a pass. + if (_noOfConsumers == 0) + { + return totalPingsRequested; + } + do { // Block the current thread until replies to all the messages are received, or it times out. perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. - numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount(); - allMessagesReceived = numReplies == getExpectedNumPings(numPings); + allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested); // log.debug("numReplies = " + numReplies); // log.debug("allMessagesReceived = " + allMessagesReceived); @@ -1108,7 +1195,7 @@ public class PingPongProducer implements Runnable, ExceptionListener } while (!timedOut && !allMessagesReceived); - if ((numReplies < getExpectedNumPings(numPings)) && _verbose) + if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose) { log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } @@ -1146,6 +1233,12 @@ public class PingPongProducer implements Runnable, ExceptionListener /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + // If we are runnning a consumeOnly test then don't send any messages + if (_consumeOnly) + { + return; + } + if (message == null) { message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); @@ -1667,6 +1760,10 @@ public class PingPongProducer implements Runnable, ExceptionListener /** * Calculates how many pings are expected to be received for the given number sent. * + * Note : that if you have set noConsumers to 0 then this will also return 0 + * in the case of PubSub testing. This is correct as without consumers there + * will be no-one to receive the sent messages so they will be unable to respond. + * * @param numpings The number of pings that will be sent. * * @return The number that should be received, for the test to pass. |
