diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:08:56 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-05-09 15:08:56 +0000 |
| commit | b9f5458e54ca28db50c0629e8022cbfeb587d42f (patch) | |
| tree | 3d1627fbb71fabb402a574dd5e02861a3e381ce5 /java | |
| parent | 0ffd731bce5980d517c2c42a10c0a3d3f22b9636 (diff) | |
| download | qpid-python-b9f5458e54ca28db50c0629e8022cbfeb587d42f.tar.gz | |
Merged revisions 536141-536162,536165-536243 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2
........
r536141 | ritchiem | 2007-05-08 10:54:30 +0100 (Tue, 08 May 2007) | 1 line
Added default password file for use with Base64MD5PassswordFilePrincipalDatabase
........
r536243 | rgreig | 2007-05-08 17:31:27 +0100 (Tue, 08 May 2007) | 1 line
Some robustness added to tests by limiting buffered messages.
........
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@536559 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/perftests/pom.xml | 4 | ||||
| -rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java | 134 |
2 files changed, 99 insertions, 39 deletions
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index 3cae761554..8dc7fab1d0 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -137,7 +137,7 @@ </property> <property> <name>-Xmx</name> - <value>3072m</value> + <value>256m</value> </property> <property> <name>log4j.configuration</name> @@ -161,7 +161,7 @@ <!-- Single pings. These can be scaled up by overriding the parameters when calling the test script. --> <Ping-Once>-n Ping-Once -s [1] -r 1 -t testPingOk -o . org.apache.qpid.ping.PingTestPerf</Ping-Once> <Ping-Once-Async>-n Ping-Once-Async -s [1] -r 1 -t testAsyncPingOk -o . org.apache.qpid.ping.PingAsyncTestPerf</Ping-Once-Async> - <Ping-Latency>-n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf</Ping-Latency> + <Ping-Latency>-n Ping-Latency -s [1000] -d 10S -t testPingLatency -o . org.apache.qpid.ping.PingLatencyTestPerf rate=100</Ping-Latency> <!-- More example Tests. These are examples to exercise all the features of the test harness. Can scale up with option overrides. --> <Ping-Tx>-n Ping-Tx -s [100] -o . -t testAsyncPingOk org.apache.qpid.ping.PingAsyncTestPerf transacted=true</Ping-Tx> diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 642f3077fd..1b4fa6b779 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -25,7 +25,9 @@ import java.net.InetAddress; import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -36,8 +38,8 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
-import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.client.message.AMQMessage;
+import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
@@ -96,7 +98,10 @@ import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; * 3 - DUPS_OK_ACKNOWLEDGE
* 257 - NO_ACKNOWLEDGE
* 258 - PRE_ACKNOWLEDGE
- * <tr><td> pauseBatch <td> 0 <td> In milliseconds. A pause to insert between transaction batches.
+ * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages send but not yet received.
+ * Limits the volume of messages currently buffered on the client
+ * or broker. Can help scale test clients by limiting amount of buffered
+ * data to avoid out of memory errors.
* </table>
*
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
@@ -265,11 +270,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis /** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
- /** Holds the name of the property to get the pause between batches property from. */
- public static final String PAUSE_AFTER_BATCH_PROPNAME = "pauseBatch";
-
- /** Defines the default time in milliseconds to wait between commit batches. */
- public static final long PAUSE_AFTER_BATCH_DEFAULT = 0L;
+ public static final String MAX_PENDING_PROPNAME = "maxPending";
+ public static final int MAX_PENDING_DEFAULT = 0;
/** Defines the default prefetch size to use when consuming messages. */
public static final int PREFETCH_DEFAULT = 100;
@@ -310,8 +312,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
- defaults.setPropertyIfNull(PAUSE_AFTER_BATCH_PROPNAME, PAUSE_AFTER_BATCH_DEFAULT);
defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
+ defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
}
protected String _brokerDetails;
@@ -364,8 +366,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis protected int _noOfDestinations;
protected int _rate;
- /** Holds the wait time to insert between every batch of messages committed. */
- private long _pauseBatch;
+ /**
+ * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended
+ * if this limit is breached.
+ */
+ protected int _maxPendingSize;
+
+ /**
+ * Holds a cyclic barrier which is used to synchronize sender and receiver threads, where the sender has elected
+ * to wait until the number of unreceived message is reduced before continuing to send.
+ */
+ protected CyclicBarrier _sendPauseBarrier = new CyclicBarrier(2);
+
+ /** Keeps a count of the number of message currently sent but not received. */
+ protected AtomicInteger _unreceived = new AtomicInteger(0);
/** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
@@ -375,7 +389,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * ping producers on the same JVM.
*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -472,7 +486,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
_ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
- _pauseBatch = properties.getPropertyAsLong(PAUSE_AFTER_BATCH_PROPNAME);
+ _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
if (_noOfDestinations < 1)
@@ -556,7 +570,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
try
{
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}));
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -598,8 +612,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- {
- }
+ { }
}
}
@@ -650,11 +663,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
- boolean durable) throws JMSException, AMQException
+ boolean durable) throws JMSException, AMQException
{
log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
- + durable + "): called");
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
@@ -690,8 +703,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis else
{
destination =
- AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
- _clientID, (AMQConnection) _connection);
+ AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+ _clientID, (AMQConnection) _connection);
log.debug("Created durable topic " + destination);
}
}
@@ -700,11 +713,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
AMQShortString destinationName = new AMQShortString(rootName + id);
destination =
- new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
- _isDurable);
+ new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+ _isDurable);
((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
- ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME);
log.debug("Created queue " + destination);
}
@@ -725,7 +738,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
{
log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
+ + ", String selector = " + selector + "): called");
log.debug("Creating " + destinations.size() + " reply consumers.");
@@ -733,8 +746,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
// Create a consumer for the destination and set this pinger to listen to its messages.
_consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
+ _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
_consumer.setMessageListener(this);
log.debug("Set this to listen to replies sent to destination: " + destination);
@@ -783,6 +796,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis trueCount = trafficLight.getCount();
remainingCount = trueCount - 1;
+ // Decrement the count of sent but not yet received messages.
+ int unreceived = _unreceived.decrementAndGet();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+
+ // Release a waiting sender if there is one.
+ if ((_maxPendingSize > 0) && (unreceivedSize < _maxPendingSize)
+ && (_sendPauseBarrier.getNumberWaiting() == 1))
+ {
+ log.debug("unreceived size estimate under limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ try
+ {
+ _sendPauseBarrier.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
@@ -849,10 +887,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis * @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
if (messageCorrelationId == null)
@@ -941,7 +979,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
if (message == null)
{
@@ -1014,6 +1052,29 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis waitForUser(KILL_BROKER_PROMPT);
}
+ // Increase the count of sent but not yet received messages.
+ int unreceived = _unreceived.getAndIncrement();
+ int unreceivedSize = (unreceived * ((_messageSize == 0) ? 1 : _messageSize));
+
+ if ((_maxPendingSize > 0) && (unreceivedSize > _maxPendingSize))
+ {
+ log.debug("unreceived size estimate over limit = " + unreceivedSize);
+
+ // Wait on the send pause barrier for the limit to be re-established.
+ try
+ {
+ _sendPauseBarrier.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (BrokenBarrierException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
// Send the message either to its round robin destination, or its default destination.
if (destination == null)
{
@@ -1128,7 +1189,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
- return value == null ? 0L : value;
+ return (value == null) ? 0L : value;
}
else
{
@@ -1136,7 +1197,6 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
}
-
/**
* Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
* has been cleared.
@@ -1177,12 +1237,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- }
- });
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
|
