summaryrefslogtreecommitdiff
path: root/java/perftests/src
diff options
context:
space:
mode:
authorRupert Smith <rupertlssmith@apache.org>2007-10-03 11:23:08 +0000
committerRupert Smith <rupertlssmith@apache.org>2007-10-03 11:23:08 +0000
commit0133aa9d1f0224a7bea9a2684261d8bc09e64dad (patch)
tree7a40818c0c012bdd5b9ed4159d25c4a09d9bbe9e /java/perftests/src
parentff3aa7b61a1b7454e15958ae03a7f09e687dd754 (diff)
downloadqpid-python-0133aa9d1f0224a7bea9a2684261d8bc09e64dad.tar.gz
Added reliability tests for all ack modes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581566 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java225
1 files changed, 125 insertions, 100 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index ed77332e19..bca67bb0ce 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -154,7 +154,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Holds the transactional mode to use for the test. */
public static final boolean TRANSACTED_DEFAULT = false;
+ /** Holds the name of the property to get the test consumer transacted mode from. */
public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
+
+ /** Holds the consumer transactional mode default setting. */
public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
/** Holds the name of the property to get the test broker url from. */
@@ -265,7 +268,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Defines the default value for the unique destinations property. */
public static final boolean UNIQUE_DESTS_DEFAULT = true;
+ /** Holds the name of the property to get the durable destinations flag from. */
public static final String DURABLE_DESTS_PROPNAME = "durableDests";
+
+ /** Defines the default value of the durable destinations flag. */
public static final boolean DURABLE_DESTS_DEFAULT = false;
/** Holds the name of the proeprty to get the message acknowledgement mode from. */
@@ -274,10 +280,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** Defines the default message acknowledgement mode. */
public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the consumers message acknowledgement mode from. */
public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
+
+ /** Defines the default consumers message acknowledgement mode. */
public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
+ /** Holds the name of the property to get the maximum pending message size setting from. */
public static final String MAX_PENDING_PROPNAME = "maxPending";
+
+ /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
public static final int MAX_PENDING_DEFAULT = 0;
/** Defines the default prefetch size to use when consuming messages. */
@@ -326,21 +338,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
}
+ /** Holds the broker url. */
protected String _brokerDetails;
+
+ /** Holds the username to access the broker with. */
protected String _username;
+
+ /** Holds the password to access the broker with. */
protected String _password;
+
+ /** Holds the virtual host on the broker to run the tests through. */
protected String _virtualpath;
+
+ /** Holds the root name from which to generate test destination names. */
protected String _destinationName;
+
+ /** Holds the message selector to filter the pings with. */
protected String _selector;
+
+ /** Holds the producers transactional mode flag. */
protected boolean _transacted;
+
+ /** Holds the consumers transactional mode flag. */
protected boolean _consTransacted;
/** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
- /** Holds the acknowledgement mode used for sending and receiving messages. */
+ /** Holds the acknowledgement mode used for the producers. */
protected int _ackMode;
+ /** Holds the acknowledgement mode setting for the consumers. */
protected int _consAckMode;
/** Determines what size of messages this producer sends. */
@@ -395,7 +423,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
* to wait until the number of unreceived message is reduced before continuing to send.
*/
- protected Object _sendPauseMonitor = new Object();
+ protected static final Object _sendPauseMonitor = new Object();
/** Keeps a count of the number of message currently sent but not received. */
protected static AtomicInteger _unreceived = new AtomicInteger(0);
@@ -413,7 +441,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /** Holds the connection to the broker. */
+ /** Holds the connection for the message producer. */
protected Connection _connection;
/** Holds the consumer connections. */
@@ -460,6 +488,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
/** The prompt to display when asking the user to kill the broker for failover testing. */
private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
+
+ /** Holds the name for this test client to be identified to the broker with. */
private String _clientID;
/** Keeps count of the total messages sent purely for debugging purposes. */
@@ -470,7 +500,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
* it, to send and recieve its pings and replies on.
*
- * @param overrides Properties containing any desired overrides to the defaults.
+ * @param overrides Properties containing any desired overrides to the defaults.
*
* @throws Exception Any exceptions are allowed to fall through.
*/
@@ -506,8 +536,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
_isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
_isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
_isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
- _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
- _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
+ _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
+ _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
_maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
// Check that one or more destinations were specified.
@@ -696,13 +726,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* Creates consumers for the specified number of destinations. The destinations themselves are also created by this
* method.
*
- * @param noOfDestinations The number of destinations to create consumers for.
- * @param selector The message selector to filter the consumers with.
- * @param rootName The root of the name, or actual name if only one is being created.
- * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
- * numbering with all pingers on the same JVM.
+ * @param noOfDestinations The number of destinations to create consumers for.
+ * @param selector The message selector to filter the consumers with.
+ * @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
+ * numbering with all pingers on the same JVM.
+ * @param durable If the destinations are durable topics.
*
* @throws JMSException Any JMSExceptions are allowed to fall through.
+ * @throws AMQException Any AMQExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
boolean durable) throws JMSException, AMQException
@@ -817,7 +849,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
* correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
* replies map.
*
- * @param message The received message.
+ * @param message The received message.
+ * @param consumerNo The consumer number within this test pinger instance.
*/
public void onMessageWithConsumerNo(Message message, int consumerNo)
{
@@ -834,7 +867,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
String correlationID = message.getJMSCorrelationID();
// log.debug("correlationID = " + correlationID);
- int num = message.getIntProperty("MSG_NUM");
+ // int num = message.getIntProperty("MSG_NUM");
// log.info("Message " + num + " received.");
boolean isRedelivered = message.getJMSRedelivered();
@@ -857,8 +890,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Decrement the countdown latch. Before this point, it is possible that two threads might enter this
// method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
// ensures that each thread will get a unique value for the remaining messages.
- long trueCount = -1;
- long remainingCount = -1;
+ long trueCount;
+ long remainingCount;
synchronized (trafficLight)
{
@@ -890,18 +923,28 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// log.debug("remainingCount = " + remainingCount);
// log.debug("trueCount = " + trueCount);
- // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
- // blocked, even on the last message.
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer
+ // remains blocked, even on the last message.
// Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
// each batch boundary. For pub/sub each consumer gets every message so no division is done.
+ // When running in client ack mode, an ack is done instead of a commit, on the commit batch
+ // size boundaries.
long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
// log.debug("commitCount = " + commitCount);
if ((commitCount % _txBatchSize) == 0)
{
- // log.debug("Trying commit for consumer " + consumerNo + ".");
- commitTx(_consumerSession[consumerNo]);
- // log.info("Tx committed on consumer " + consumerNo);
+ if (_consAckMode == 2)
+ {
+ // log.debug("Doing client ack for consumer " + consumerNo + ".");
+ message.acknowledge();
+ }
+ else
+ {
+ // log.debug("Trying commit for consumer " + consumerNo + ".");
+ commitTx(_consumerSession[consumerNo]);
+ // log.info("Tx committed on consumer " + consumerNo);
+ }
}
// Forward the message and remaining count to any interested chained message listener.
@@ -927,18 +970,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
{
log.warn("Got redelivered message, ignoring.");
}
-
- // Print out ping times for every message in verbose mode only.
- /*if (_verbose)
- {
- Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME);
-
- if (timestamp != null)
- {
- long diff = System.nanoTime() - timestamp;
- //log.trace("Time for round trip (nanos): " + diff);
- }
- }*/
}
catch (JMSException e)
{
@@ -1000,9 +1031,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Send the specifed number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- boolean timedOut = false;
- boolean allMessagesReceived = false;
- int numReplies = 0;
+ boolean timedOut;
+ boolean allMessagesReceived;
+ int numReplies;
do
{
@@ -1080,9 +1111,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there may be uncommitted messages.
- committed = false;
-
// Re-timestamp the message.
// message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
@@ -1125,16 +1153,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
Destination destination = _pingDestinations.get(i % _pingDestinations.size());
// Prompt the user to kill the broker when doing failover testing.
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
-
- // log.trace("Failing Before Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
// If necessary, wait until the max pending message size comes within its limit.
synchronized (_sendPauseMonitor)
@@ -1193,28 +1212,24 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
// Send the message either to its round robin destination, or its default destination.
+ int num = numSent.incrementAndGet();
+ message.setIntProperty("MSG_NUM", num);
+ setTimestamp(message);
+
if (destination == null)
{
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
_producer.send(message);
- // log.info("Message " + num + " sent.");
}
else
{
- int num = numSent.incrementAndGet();
- message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
_producer.send(destination, message);
- // log.info("Message " + num + " sent.");
}
// Increase the unreceived size, this may actually happen after the message is received.
// The unreceived size is incremented by the number of consumers that will get a copy of the message,
// in pub/sub mode.
// _unreceived.getAndIncrement();
- int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
+ /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
// log.debug("newUnreceivedCount = " + newUnreceivedCount);
// Apply message rate throttling if a rate limit has been set up.
@@ -1237,6 +1252,31 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
/**
+ * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
+ * test that the failure has occurred, before the method returns.
+ *
+ * @param failFlag The fail flag to test.
+ *
+ * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
+ * used once, then reset.
+ */
+ private boolean waitForUserToPromptOnFailure(boolean failFlag)
+ {
+ if (failFlag)
+ {
+ if (_failOnce)
+ {
+ failFlag = false;
+ }
+
+ // log.trace("Failing Before Send");
+ waitForUser(KILL_BROKER_PROMPT);
+ }
+
+ return failFlag;
+ }
+
+ /**
* Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
* size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
* terminate the pinger.
@@ -1294,15 +1334,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
*/
public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
-
- // Timestamp the message in nanoseconds.
-
- // setTimestamp(msg);
-
- return msg;
+ return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
}
+ /**
+ * Sets the current time in nanoseconds as the timestamp on the message.
+ *
+ * @param msg The message to timestamp.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected void setTimestamp(Message msg) throws JMSException
{
if (((AMQSession) _producerSession).isStrictAMQP())
@@ -1315,9 +1356,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
}
+ /**
+ * Extracts the nanosecond timestamp from a message.
+ *
+ * @param msg The message to extract the time stamp from.
+ *
+ * @return The timestamp in nanos.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
protected long getTimestamp(Message msg) throws JMSException
{
-
if (((AMQSession) _producerSession).isStrictAMQP())
{
Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
@@ -1331,7 +1380,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
+ * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
* has been cleared.
*/
public void stop()
@@ -1339,6 +1388,11 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
_publish = false;
}
+ /**
+ * Starts the producer and consumer connections.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
public void start() throws JMSException
{
// log.debug("public void start(): called");
@@ -1393,7 +1447,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
}
/**
- * Closes the pingers connection.
+ * Closes all of the producer and consumer connections.
*
* @throws JMSException All JMSException are allowed to fall through.
*/
@@ -1460,18 +1514,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
boolean committed = false;
- // log.trace("Batch time reached");
- if (_failAfterSend)
- {
- // log.trace("Batch size reached");
- if (_failOnce)
- {
- _failAfterSend = false;
- }
-
- // log.trace("Failing After Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
if (session.getTransacted())
{
@@ -1479,32 +1522,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti
try
{
- if (_failBeforeCommit)
- {
- if (_failOnce)
- {
- _failBeforeCommit = false;
- }
-
- // log.trace("Failing Before Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
- long start = System.nanoTime();
+ // long start = System.nanoTime();
session.commit();
committed = true;
// log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
- if (_failAfterCommit)
- {
- if (_failOnce)
- {
- _failAfterCommit = false;
- }
-
- // log.trace("Failing After Commit");
- waitForUser(KILL_BROKER_PROMPT);
- }
+ _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
// log.debug("Session Commited.");
}