diff options
| author | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-05 15:54:44 +0000 |
|---|---|---|
| committer | Bhupendra Bhusman Bhardwaj <bhupendrab@apache.org> | 2007-03-05 15:54:44 +0000 |
| commit | d8af3d27e9ad089c9fa52d0943c82da856102bb2 (patch) | |
| tree | 3ab516ad66f6f9aa2891565985e5566f11492bbb /java/perftests/src/main | |
| parent | ef2e26031e8de47461a8653ecb764019fe4b603e (diff) | |
| download | qpid-python-d8af3d27e9ad089c9fa52d0943c82da856102bb2.tar.gz | |
QPID-388 : hand merged the changes done in perftesting branch
QPID-395 : hand merged the changes done in perftesting branch
QPID-375 : default queue config properties should now be under <queues> tag
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@514703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src/main')
| -rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java | 6 | ||||
| -rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java | 64 |
2 files changed, 52 insertions, 18 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index 1a37f47b35..013bda5927 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -73,11 +73,12 @@ public class PingClient extends PingPongProducer public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique) throws Exception
+ int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
+ int ackMode, long pausetime) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique);
+ pubsub, unique, ackMode, pausetime);
_pingClientCount++;
}
@@ -104,5 +105,4 @@ public class PingClient extends PingPongProducer return _pingClientCount;
}
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index 57d5c37fc6..f2fbd29314 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -163,6 +163,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final String UNIQUE_PROPNAME = "uniqueDests";
+ public static final String ACK_MODE_PROPNAME = "ackMode";
+
+ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
/**
* Used to set up a default message size.
*/
@@ -285,6 +289,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis public static final boolean DEFAULT_UNIQUE = true;
+ public static final int DEFAULT_ACK_MODE = Session.NO_ACKNOWLEDGE;
+
/**
* Holds the name of the property to store nanosecond timestamps in ping messages with.
*/
@@ -325,6 +331,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected boolean _persistent;
+ private int _ackMode = Session.NO_ACKNOWLEDGE;
+
/**
* Determines what size of messages this producer sends.
*/
@@ -421,6 +429,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis */
protected int _txBatchSize = 1;
+ private static long _pausetimeAfterEachBatch = 0;
+
/**
* Holds the number of consumers that will be attached to each topic.
* Each pings will result in a reply from each of the attached clients
@@ -460,7 +470,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique) throws Exception
+ boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
{
_logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -470,7 +480,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis + afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
+ ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
+ txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+ + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique
+ + ", ackMode = " + ackMode + "): called");
// Keep all the relevant options.
_persistent = persistent;
@@ -484,7 +495,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _txBatchSize = txBatchSize;
_isPubSub = pubsub;
_isUnique = unique;
-
+ _pausetimeAfterEachBatch = pause;
+ if (ackMode != 0)
+ {
+ _ackMode = ackMode;
+ }
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -498,8 +514,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
// Set up a throttle to control the send rate, if a rate > 0 is specified.
if (rate > 0)
@@ -537,7 +553,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
+ String virtualpath = DEFAULT_VIRTUAL_PATH;
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -597,7 +613,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
pingProducer.getConnection().start();
@@ -687,31 +703,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
AMQDestination destination;
- int id;
+ String id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
_logger.debug("Creating unique destinations.");
- id = _queueJVMSequenceID.incrementAndGet();
+ id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
_logger.debug("Creating shared destinations.");
- id = _queueSharedId.incrementAndGet();
+ id = "_" + _queueSharedId.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- _logger.debug("Creating topics.");
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- _logger.debug("Creating queues.");
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created queue " + destination);
}
// Keep the destination.
@@ -834,6 +850,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
+ public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ }
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -936,6 +958,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ if (message == null)
+ {
+ message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+ }
+
message.setJMSCorrelationID(messageCorrelationId);
// Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -967,6 +994,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis {
commitTx(_producerSession);
committed = true;
+ /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+ Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+ the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+ */
+ pause(_pausetimeAfterEachBatch);
}
// Spew out per message timings on every message sonly in verbose mode.
@@ -1013,10 +1045,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis }
}
- /*public Destination getReplyDestination()
+ public Destination getReplyDestination()
{
- return _replyDestination;
- }*/
+ return getReplyDestinations().get(0);
+ }
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1203,7 +1235,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis doFailover();
}
+ long l = System.currentTimeMillis();
session.commit();
+ _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
if (_failAfterCommit)
{
|
