From 573f4ab8b94f50eb7f2dfd434d7301d5fbcc14ce Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 25 Feb 2010 15:16:51 +0000 Subject: QPID-2421 : Augmented Async Performance test to take new 'preFill' value, that puts messages onto the broker destination before the test begins. When running on a non-TX'd producer session the use of the new 'delayBeforeConsume' will pause the client for ms before the test starts, giving the producer session time to flush. This new functionality can be explored with the new 'testWithPreFill' script. The 'numConsumer' parameter was augmented to allow a 0 value which disables all the consumers. This can be seen with the 'fillBroker' script. To complement that a new 'consumeOnly' boolean was added to disable sending messages. This can be seen with the 'drainBroker' scripts. All scripts are located in java/perftests/etc/scripts Merged from 0.5.x-dev commit r916304 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@916318 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/java/perftests/etc/scripts/drainBroker.sh | 41 ++++++++ qpid/java/perftests/etc/scripts/fillBroker.sh | 41 ++++++++ qpid/java/perftests/etc/scripts/testWithPreFill.sh | 41 ++++++++ .../org/apache/qpid/ping/PingAsyncTestPerf.java | 17 +++- .../java/org/apache/qpid/ping/PingTestPerf.java | 62 +++++++++++- .../apache/qpid/requestreply/PingPongProducer.java | 111 +++++++++++++++++++-- 6 files changed, 300 insertions(+), 13 deletions(-) create mode 100755 qpid/java/perftests/etc/scripts/drainBroker.sh create mode 100755 qpid/java/perftests/etc/scripts/fillBroker.sh create mode 100755 qpid/java/perftests/etc/scripts/testWithPreFill.sh (limited to 'qpid/java') diff --git a/qpid/java/perftests/etc/scripts/drainBroker.sh b/qpid/java/perftests/etc/scripts/drainBroker.sh new file mode 100755 index 0000000000..eea7209f03 --- /dev/null +++ b/qpid/java/perftests/etc/scripts/drainBroker.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 consumeOnly=true consumeOnly=true${ARGS} diff --git a/qpid/java/perftests/etc/scripts/fillBroker.sh b/qpid/java/perftests/etc/scripts/fillBroker.sh new file mode 100755 index 0000000000..5b7de6f999 --- /dev/null +++ b/qpid/java/perftests/etc/scripts/fillBroker.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=0 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 ${ARGS} diff --git a/qpid/java/perftests/etc/scripts/testWithPreFill.sh b/qpid/java/perftests/etc/scripts/testWithPreFill.sh new file mode 100755 index 0000000000..721ecf6ecc --- /dev/null +++ b/qpid/java/perftests/etc/scripts/testWithPreFill.sh @@ -0,0 +1,41 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(dirname $(readlink -f $0)))) + export PATH=${PATH}:${QPID_HOME}/bin +fi + +# Parse arguements taking all - prefixed args as JAVA_OPTS +for arg in "$@"; do + if [[ $arg == -java:* ]]; then + JAVA_OPTS="${JAVA_OPTS}-`echo $arg|cut -d ':' -f 2` " + else + ARGS="${ARGS}$arg " + fi +done + +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java JAVA_VM=-server JAVA_MEM=-Xmx1024m QPID_CLASSPATH=$QPID_LIBS + +. qpid-run -Xms256m -Dlog4j.configuration=file://${QPID_HOME}/etc/perftests.log4j -Dbadger.level=warn -Damqj.test.logging.level=info -Damqj.logging.level=warn org.apache.qpid.junit.extensions.TKTestRunner -n PQBT-TX-Qpid-01 -s[1000] -c[1] -o $QPID_WORK/results -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf persistent=true pubsub=false uniqueDests=true numConsumers=1 transacted=true consTransacted=true consAckMode=0 commitBatchSize=1 batchSize=1000 messageSize=256 destinationCount=1 rate=0 maxPending=0 preFill=1000 delayBeforeConsume=1000 ${ARGS} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java index 89fc805a34..d8fea85477 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java @@ -133,8 +133,11 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA { // _logger.debug("public void testAsyncPingOk(int numPings): called"); + // get prefill count to update the expected count + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + // Ensure that at least one ping was requeusted. - if (numPings == 0) + if (numPings + preFill == 0) { _logger.error("Number of pings requested was zero."); fail("Number of pings requested was zero."); @@ -149,16 +152,24 @@ public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerA // String messageCorrelationId = perThreadSetup._correlationId; // _logger.debug("messageCorrelationId = " + messageCorrelationId); + // Initialize the count and timing controller for the new correlation id. PerCorrelationId perCorrelationId = new PerCorrelationId(); TimingController tc = getTimingController().getControllerForCurrentThread(); perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); + perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings + preFill); perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); + // Start the client that will have been paused due to preFill requirement. + // or if we have not yet started client because messages are sitting on broker. + if (preFill > 0 || testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + pingClient.start(); + } + // Send the requested number of messages, and wait until they have all been received. long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); + int numReplies = pingClient.pingAndWaitForReply(null, numPings , preFill, timeout, perThreadSetup._correlationId); // Check that all the replies were received and log a fail if they were not. if (numReplies < perCorrelationId._expectedCount) diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index 94b8ea662e..0f9603303b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -141,9 +141,65 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware perThreadSetup._pingClient = new PingClient(testParameters); perThreadSetup._pingClient.establishConnection(true, true); } - // Start the client connection - perThreadSetup._pingClient.start(); + // Prefill the broker unless we are in consume only mode. + int preFill = testParameters.getPropertyAsInteger(PingPongProducer.PREFILL_PROPNAME); + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME) && preFill > 0) + { + // Manually set the correlation ID to 1. This is not ideal but it is the + // value that the main test loop will use. + perThreadSetup._pingClient.pingNoWaitForReply(null, preFill, "1"); + + // Note with a large preFill and non-tx session the messages will be + // rapidly pushed in to the mina buffers. OOM's are a real risk here. + // Should perhaps consider using a TX session for the prefill. + + long delayBeforeConsume = testParameters.getPropertyAsLong(PingPongProducer.DELAY_BEFORE_CONSUME_PROPNAME); + + // Only delay if we have consumers and a delayBeforeConsume + if ((testParameters.getPropertyAsInteger(PingPongProducer.NUM_CONSUMERS_PROPNAME) > 0) + && delayBeforeConsume > 0) + { + + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + // Only do logging if in verbose mode. + if (verbose) + { + if (delayBeforeConsume > 60000) + { + long minutes = delayBeforeConsume / 60000; + long seconds = (delayBeforeConsume - (minutes * 60000)) / 1000; + long ms = delayBeforeConsume - (minutes * 60000) - (seconds * 1000); + _logger.info("Delaying for " + minutes + "m " + seconds + "s " + ms + "ms before starting test."); + } + else + { + _logger.info("Delaying for " + delayBeforeConsume + "ms before starting test."); + } + } + + Thread.sleep(delayBeforeConsume); + + if (verbose) + { + _logger.info("Starting Test."); + } + } + + // We can't start the client's here as the test client has not yet been configured to receieve messages. + // only when the test method is executed will the correlationID map be set up and ready to consume + // the messages we have sent here. + } + else //Only start the consumer if we are not preFilling. + { + // Only start the consumer if we don't have messages waiting to be received. + // we need to set up the correlationID mapping first + if (!testParameters.getPropertyAsBoolean(PingPongProducer.CONSUME_ONLY_PROPNAME)) + { + // Start the client connection + perThreadSetup._pingClient.start(); + } + } // Attach the per-thread set to the thread. threadSetup.set(perThreadSetup); } @@ -157,7 +213,7 @@ public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware * Performs test fixture clean */ public void threadTearDown() - { + { _logger.debug("public void threadTearDown(): called"); try diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f994cd138e..e3769e415e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -324,6 +324,25 @@ public class PingPongProducer implements Runnable, ExceptionListener /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + /** Holds the name of the property to get the number of message to prefill the broker with before starting the main test. */ + public static final String PREFILL_PROPNAME = "preFill"; + + /** Defines the default value for the number of messages to prefill. 0,default, no messages. */ + public static final int PREFILL_DEFAULT = 0; + + /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */ + public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume"; + + /** Defines the default value for delay in ms to wait before starting thet test run. 0,default, no delay. */ + public static final long DELAY_BEFORE_CONSUME = 0; + + /** Holds the name of the property to get when no messasges should be sent. */ + public static final String CONSUME_ONLY_PROPNAME = "consumeOnly"; + + /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */ + public static final boolean CONSUME_ONLY_DEFAULT = false; + + /** Holds the default configuration properties. */ public static ParsedProperties defaults = new ParsedProperties(); @@ -360,6 +379,9 @@ public class PingPongProducer implements Runnable, ExceptionListener defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT); + defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME); + defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT); } /** Allows setting of client ID on the connection, rather than through the connection URL. */ @@ -455,6 +477,24 @@ public class PingPongProducer implements Runnable, ExceptionListener */ protected int _maxPendingSize; + /** + * Holds the number of messages to send during the setup phase, before the clients start consuming. + */ + private Integer _preFill; + + /** + * Holds the time in ms to wait after preFilling before starting thet test. + */ + private Long _delayBeforeConsume; + + /** + * Holds a boolean value of wither this test should just consume, i.e. skips + * sending messages, but still expects to receive the specified number. + * Use in conjuction with numConsumers=0 to fill the broker. + */ + private boolean _consumeOnly; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -588,6 +628,9 @@ public class PingPongProducer implements Runnable, ExceptionListener _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME); + _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME); + _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME); // Check that one or more destinations were specified. if (_noOfDestinations < 1) @@ -638,7 +681,10 @@ public class PingPongProducer implements Runnable, ExceptionListener } // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession[0].createTemporaryQueue(); + if (_noOfConsumers > 0) + { + _replyDestination = _consumerSession[0].createTemporaryQueue(); + } createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); // Create the message producer only if instructed to. @@ -871,6 +917,14 @@ public class PingPongProducer implements Runnable, ExceptionListener { _consumer = new MessageConsumer[_noOfConsumers]; + // If we don't have consumers then ensure we have created the + // destination. + if (_noOfConsumers == 0) + { + _producerSession.createConsumer(destination, selector, + NO_LOCAL_DEFAULT).close(); + } + for (int i = 0; i < _noOfConsumers; i++) { // Create a consumer for the destination and set this pinger to listen to its messages. @@ -980,6 +1034,11 @@ public class PingPongProducer implements Runnable, ExceptionListener // When running in client ack mode, an ack is done instead of a commit, on the commit batch // size boundaries. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // _noOfConsumers can be set to 0 on the command line but we will not get here to + // divide by 0 as this is executed by the onMessage code when a message is recevied. + // no consumers means no message reception. + + // log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) @@ -1014,6 +1073,7 @@ public class PingPongProducer implements Runnable, ExceptionListener else { log.warn("Got unexpected message with correlationId: " + correlationID); + log.warn("Map contains:" + perCorrelationIds.entrySet()); } } else @@ -1037,13 +1097,18 @@ public class PingPongProducer implements Runnable, ExceptionListener * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify * the correlation id. * + * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination + * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings. + * + * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur. + * * @param message The message to send. If this is null, one is generated. * @param numPings The number of ping messages to send. * @param timeout The timeout in milliseconds. * @param messageCorrelationId The message correlation id. If this is null, one is generated. * * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. + * for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent. * * @throws JMSException All underlying JMSExceptions are allowed to fall through. * @throws InterruptedException When interrupted by a timeout @@ -1051,6 +1116,16 @@ public class PingPongProducer implements Runnable, ExceptionListener public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) throws JMSException, InterruptedException { + return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId); + } + + public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + + // If we are runnning a consumeOnly test then don't send any messages + + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ @@ -1071,29 +1146,41 @@ public class PingPongProducer implements Runnable, ExceptionListener // countdown needs to be done before the chained listener can be called. PerCorrelationId perCorrelationId = new PerCorrelationId(); - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); + int totalPingsRequested = numPings + preFill; + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(totalPingsRequested) + 1); perCorrelationIds.put(messageCorrelationId, perCorrelationId); // Set up the current time as the start time for pinging on the correlation id. This is used to determine // timeouts. perCorrelationId.timeOutStart = System.nanoTime(); - // Send the specifed number of messages. + // Send the specifed number of messages for this test pingNoWaitForReply(message, numPings, messageCorrelationId); boolean timedOut; boolean allMessagesReceived; int numReplies; + // We don't have a consumer so don't try and wait for the messages. + // this does mean that if the producerSession is !TXed then we may + // get to exit before all msgs have been received. + // + // Return the number of requested messages, this will let the test + // report a pass. + if (_noOfConsumers == 0) + { + return totalPingsRequested; + } + do { // Block the current thread until replies to all the messages are received, or it times out. perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); // Work out how many replies were receieved. - numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount(); - allMessagesReceived = numReplies == getExpectedNumPings(numPings); + allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested); // log.debug("numReplies = " + numReplies); // log.debug("allMessagesReceived = " + allMessagesReceived); @@ -1108,7 +1195,7 @@ public class PingPongProducer implements Runnable, ExceptionListener } while (!timedOut && !allMessagesReceived); - if ((numReplies < getExpectedNumPings(numPings)) && _verbose) + if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose) { log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); } @@ -1146,6 +1233,12 @@ public class PingPongProducer implements Runnable, ExceptionListener /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + // If we are runnning a consumeOnly test then don't send any messages + if (_consumeOnly) + { + return; + } + if (message == null) { message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); @@ -1667,6 +1760,10 @@ public class PingPongProducer implements Runnable, ExceptionListener /** * Calculates how many pings are expected to be received for the given number sent. * + * Note : that if you have set noConsumers to 0 then this will also return 0 + * in the case of PubSub testing. This is correct as without consumers there + * will be no-one to receive the sent messages so they will be unable to respond. + * * @param numpings The number of pings that will be sent. * * @return The number that should be received, for the test to pass. -- cgit v1.2.1