summaryrefslogtreecommitdiff
path: root/java/perftests/src/main
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-05 15:54:44 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-05 15:54:44 +0000
commitd8af3d27e9ad089c9fa52d0943c82da856102bb2 (patch)
tree3ab516ad66f6f9aa2891565985e5566f11492bbb /java/perftests/src/main
parentef2e26031e8de47461a8653ecb764019fe4b603e (diff)
downloadqpid-python-d8af3d27e9ad089c9fa52d0943c82da856102bb2.tar.gz
QPID-388 : hand merged the changes done in perftesting branch
QPID-395 : hand merged the changes done in perftesting branch QPID-375 : default queue config properties should now be under <queues> tag git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@514703 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src/main')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java6
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java64
2 files changed, 52 insertions, 18 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
index 1a37f47b35..013bda5927 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
@@ -73,11 +73,12 @@ public class PingClient extends PingPongProducer
public PingClient(String brokerDetails, String username, String password, String virtualpath, String destinationName,
String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique) throws Exception
+ int txBatchSize, int noOfDestinations, int rate, boolean pubsub, boolean unique,
+ int ackMode, long pausetime) throws Exception
{
super(brokerDetails, username, password, virtualpath, destinationName, selector, transacted, persistent, messageSize,
verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, txBatchSize, noOfDestinations, rate,
- pubsub, unique);
+ pubsub, unique, ackMode, pausetime);
_pingClientCount++;
}
@@ -104,5 +105,4 @@ public class PingClient extends PingPongProducer
return _pingClientCount;
}
}
-
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 57d5c37fc6..f2fbd29314 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -163,6 +163,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final String UNIQUE_PROPNAME = "uniqueDests";
+ public static final String ACK_MODE_PROPNAME = "ackMode";
+
+ public static final String PAUSE_AFTER_BATCH_PROPNAME = "pausetimeAfterEachBatch";
+
/**
* Used to set up a default message size.
*/
@@ -285,6 +289,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
public static final boolean DEFAULT_UNIQUE = true;
+ public static final int DEFAULT_ACK_MODE = Session.NO_ACKNOWLEDGE;
+
/**
* Holds the name of the property to store nanosecond timestamps in ping messages with.
*/
@@ -325,6 +331,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
protected boolean _persistent;
+ private int _ackMode = Session.NO_ACKNOWLEDGE;
+
/**
* Determines what size of messages this producer sends.
*/
@@ -421,6 +429,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
*/
protected int _txBatchSize = 1;
+ private static long _pausetimeAfterEachBatch = 0;
+
/**
* Holds the number of consumers that will be attached to each topic.
* Each pings will result in a reply from each of the attached clients
@@ -460,7 +470,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
- boolean pubsub, boolean unique) throws Exception
+ boolean pubsub, boolean unique, int ackMode, long pause) throws Exception
{
_logger.debug("public PingPongProducer(String brokerDetails = " + brokerDetails + ", String username = " + username
+ ", String password = " + password + ", String virtualpath = " + virtualpath
@@ -470,7 +480,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
+ afterCommit + ", boolean beforeCommit = " + beforeCommit + ", boolean afterSend = " + afterSend
+ ", boolean beforeSend = " + beforeSend + ", boolean failOnce = " + failOnce + ", int txBatchSize = "
+ txBatchSize + ", int noOfDestinations = " + noOfDestinations + ", int rate = " + rate
- + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique + "): called");
+ + ", boolean pubsub = " + pubsub + ", boolean unique = " + unique
+ + ", ackMode = " + ackMode + "): called");
// Keep all the relevant options.
_persistent = persistent;
@@ -484,7 +495,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_txBatchSize = txBatchSize;
_isPubSub = pubsub;
_isUnique = unique;
-
+ _pausetimeAfterEachBatch = pause;
+ if (ackMode != 0)
+ {
+ _ackMode = ackMode;
+ }
+
// Check that one or more destinations were specified.
if (noOfDestinations < 1)
{
@@ -498,8 +514,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
// Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _producerSession = (Session) getConnection().createSession(transacted, _ackMode);
+ _consumerSession = (Session) getConnection().createSession(transacted, _ackMode);
// Set up a throttle to control the send rate, if a rate > 0 is specified.
if (rate > 0)
@@ -537,7 +553,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
+ String virtualpath = DEFAULT_VIRTUAL_PATH;
String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
@@ -597,7 +613,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
PingPongProducer pingProducer =
new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize, destCount, rate, pubsub, false);
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub, false, 0, 0);
pingProducer.getConnection().start();
@@ -687,31 +703,31 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
AMQDestination destination;
- int id;
+ String id;
// Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
if (unique)
{
_logger.debug("Creating unique destinations.");
- id = _queueJVMSequenceID.incrementAndGet();
+ id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
}
else
{
_logger.debug("Creating shared destinations.");
- id = _queueSharedId.incrementAndGet();
+ id = "_" + _queueSharedId.incrementAndGet();
}
// Check if this is a pub/sub pinger, in which case create topics.
if (_isPubSub)
{
- _logger.debug("Creating topics.");
destination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created topic " + destination);
}
// Otherwise this is a p2p pinger, in which case create queues.
else
{
- _logger.debug("Creating queues.");
destination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, rootName + id);
+ _logger.debug("Created queue " + destination);
}
// Keep the destination.
@@ -834,6 +850,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
}
+ public int pingAndWaitForReply(int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ return pingAndWaitForReply(null, numPings, timeout, messageCorrelationId);
+ }
+
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
* before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
@@ -936,6 +958,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ if (message == null)
+ {
+ message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
+ }
+
message.setJMSCorrelationID(messageCorrelationId);
// Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
@@ -967,6 +994,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
commitTx(_producerSession);
committed = true;
+ /* This pause is required for some cases. eg in load testing when sessions are non-transacted the
+ Mina IO layer can't clear the cache in time. So this pause gives enough time for mina to clear
+ the cache (without this mina throws OutOfMemoryError). pause() will check if time is != 0
+ */
+ pause(_pausetimeAfterEachBatch);
}
// Spew out per message timings on every message sonly in verbose mode.
@@ -1013,10 +1045,10 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
}
- /*public Destination getReplyDestination()
+ public Destination getReplyDestination()
{
- return _replyDestination;
- }*/
+ return getReplyDestinations().get(0);
+ }
/**
* Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
@@ -1203,7 +1235,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
doFailover();
}
+ long l = System.currentTimeMillis();
session.commit();
+ _logger.debug("Time taken to commit :" + (System.currentTimeMillis() - l) + " ms" );
if (_failAfterCommit)
{