From b9f5458e54ca28db50c0629e8022cbfeb587d42f Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 9 May 2007 15:08:56 +0000 Subject: Merged revisions 536141-536162,536165-536243 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r536141 | ritchiem | 2007-05-08 10:54:30 +0100 (Tue, 08 May 2007) | 1 line Added default password file for use with Base64MD5PassswordFilePrincipalDatabase ........ r536243 | rgreig | 2007-05-08 17:31:27 +0100 (Tue, 08 May 2007) | 1 line Some robustness added to tests by limiting buffered messages. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536559 13f79535-47bb-0310-9956-ffa450edef68 --- java/perftests/pom.xml | 4 +- .../apache/qpid/requestreply/PingPongProducer.java | 134 +++++++++++++++------ 2 files changed, 99 insertions(+), 39 deletions(-) (limited to 'java') diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 3cae761554..8dc7fab1d0 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -137,7 +137,7 @@ -Xmx - 3072m + 256m log4j.configuration @@ -161,7 +161,7 @@ -n Ping-Once -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf -n Ping-Once-Async -s [1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf - -n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf + -n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf rate=100 -n Ping-Tx -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 642f3077fd..1b4fa6b779 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -25,7 +25,9 @@ import java.net.InetAddress; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -36,8 +38,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.client.*; -import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.client.message.AMQMessage; +import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.MessageProducer; @@ -96,7 +98,10 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * 3 - DUPS_OK_ACKNOWLEDGE * 257 - NO_ACKNOWLEDGE * 258 - PRE_ACKNOWLEDGE - * pauseBatch 0 In milliseconds. A pause to insert between transaction batches. + * maxPending 0 The maximum size in bytes, of messages send but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. * * *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop @@ -265,11 +270,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default message acknowledgement mode. */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - /** Holds the name of the property to get the pause between batches property from. */ - public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch"; - - /** Defines the default time in milliseconds to wait between commit batches. */ - public static final long PAUSE_AFTER_BATCH_DEFAULT = 0L; + public static final String MAX_PENDING_PROPNAME = "maxPending"; + public static final int MAX_PENDING_DEFAULT = 0; /** Defines the default prefetch size to use when consuming messages. */ public static final int PREFETCH_DEFAULT = 100; @@ -310,8 +312,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); - defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT); defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); } protected String _brokerDetails; @@ -364,8 +366,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected int _noOfDestinations; protected int _rate; - /** Holds the wait time to insert between every batch of messages committed. */ - private long _pauseBatch; + /** + * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended + * if this limit is breached. + */ + protected int _maxPendingSize; + + /** + * Holds a cyclic barrier which is used to synchronize sender and receiver threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. + */ + protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2); + + /** Keeps a count of the number of message currently sent but not received. */ + protected AtomicInteger _unreceived = new AtomicInteger(0); /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); @@ -375,7 +389,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * ping producers on the same JVM. */ private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedMap(new HashMap()); /** A convenient formatter to use when time stamping output. */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); @@ -472,7 +486,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); - _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME); + _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); // Check that one or more destinations were specified. if (_noOfDestinations < 1) @@ -556,7 +570,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { try { - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{})); + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {})); // Create a ping producer overriding its defaults with all options passed on the command line. PingPongProducer pingProducer = new PingPongProducer(options); @@ -598,8 +612,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime); } catch (InterruptedException ie) - { - } + { } } } @@ -650,11 +663,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws JMSException Any JMSExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException, AMQException + boolean durable) throws JMSException, AMQException { log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called"); + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called"); _pingDestinations = new ArrayList(); @@ -690,8 +703,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis else { destination = - AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), - _clientID, (AMQConnection) _connection); + AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id), + _clientID, (AMQConnection) _connection); log.debug("Created durable topic " + destination); } } @@ -700,11 +713,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { AMQShortString destinationName = new AMQShortString(rootName + id); destination = - new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, - _isDurable); + new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false, + _isDurable); ((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false); ((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null, - ExchangeDefaults.DIRECT_EXCHANGE_NAME); + ExchangeDefaults.DIRECT_EXCHANGE_NAME); log.debug("Created queue " + destination); } @@ -725,7 +738,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public void createReplyConsumers(Collection destinations, String selector) throws JMSException { log.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called"); + + ", String selector = " + selector + "): called"); log.debug("Creating " + destinations.size() + " reply consumers."); @@ -733,8 +746,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { // Create a consumer for the destination and set this pinger to listen to its messages. _consumer = - _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, - selector); + _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT, + selector); _consumer.setMessageListener(this); log.debug("Set this to listen to replies sent to destination: " + destination); @@ -783,6 +796,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis trueCount = trafficLight.getCount(); remainingCount = trueCount - 1; + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + + // Release a waiting sender if there is one. + if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize) + && (_sendPauseBarrier.getNumberWaiting() == 1)) + { + log.debug("unreceived size estimate under limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + try + { + _sendPauseBarrier.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + } + } + // log.debug("remainingCount = " + remainingCount); // log.debug("trueCount = " + trueCount); @@ -849,10 +887,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws InterruptedException When interrupted by a timeout */ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException + throws JMSException, InterruptedException { log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called"); // Generate a unique correlation id to put on the messages before sending them, if one was not specified. if (messageCorrelationId == null) @@ -941,7 +979,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException { log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called"); + + ", String messageCorrelationId = " + messageCorrelationId + "): called"); if (message == null) { @@ -1014,6 +1052,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis waitForUser(KILL_BROKER_PROMPT); } + // Increase the count of sent but not yet received messages. + int unreceived = _unreceived.getAndIncrement(); + int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize)); + + if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize)) + { + log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Wait on the send pause barrier for the limit to be re-established. + try + { + _sendPauseBarrier.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (BrokenBarrierException e) + { + throw new RuntimeException(e); + } + } + // Send the message either to its round robin destination, or its default destination. if (destination == null) { @@ -1128,7 +1189,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis { Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); - return value == null ? 0L : value; + return (value == null) ? 0L : value; } else { @@ -1136,7 +1197,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } - /** * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag * has been cleared. @@ -1177,12 +1237,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook() { return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); + { + public void run() + { + stop(); + } + }); } /** -- cgit v1.2.1