From 0133aa9d1f0224a7bea9a2684261d8bc09e64dad Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 3 Oct 2007 11:23:08 +0000 Subject: Added reliability tests for all ack modes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@581566 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 225 ++++++++++++--------- 1 file changed, 125 insertions(+), 100 deletions(-) (limited to 'java/perftests/src') diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index ed77332e19..bca67bb0ce 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -154,7 +154,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Holds the transactional mode to use for the test. */ public static final boolean TRANSACTED_DEFAULT = false; + /** Holds the name of the property to get the test consumer transacted mode from. */ public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + + /** Holds the consumer transactional mode default setting. */ public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; /** Holds the name of the property to get the test broker url from. */ @@ -265,7 +268,10 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Defines the default value for the unique destinations property. */ public static final boolean UNIQUE_DESTS_DEFAULT = true; + /** Holds the name of the property to get the durable destinations flag from. */ public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Defines the default value of the durable destinations flag. */ public static final boolean DURABLE_DESTS_DEFAULT = false; /** Holds the name of the proeprty to get the message acknowledgement mode from. */ @@ -274,10 +280,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** Defines the default message acknowledgement mode. */ public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + /** Holds the name of the property to get the consumers message acknowledgement mode from. */ public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + + /** Defines the default consumers message acknowledgement mode. */ public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + /** Holds the name of the property to get the maximum pending message size setting from. */ public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ public static final int MAX_PENDING_DEFAULT = 0; /** Defines the default prefetch size to use when consuming messages. */ @@ -326,21 +338,37 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); } + /** Holds the broker url. */ protected String _brokerDetails; + + /** Holds the username to access the broker with. */ protected String _username; + + /** Holds the password to access the broker with. */ protected String _password; + + /** Holds the virtual host on the broker to run the tests through. */ protected String _virtualpath; + + /** Holds the root name from which to generate test destination names. */ protected String _destinationName; + + /** Holds the message selector to filter the pings with. */ protected String _selector; + + /** Holds the producers transactional mode flag. */ protected boolean _transacted; + + /** Holds the consumers transactional mode flag. */ protected boolean _consTransacted; /** Determines whether this producer sends persistent messages. */ protected boolean _persistent; - /** Holds the acknowledgement mode used for sending and receiving messages. */ + /** Holds the acknowledgement mode used for the producers. */ protected int _ackMode; + /** Holds the acknowledgement mode setting for the consumers. */ protected int _consAckMode; /** Determines what size of messages this producer sends. */ @@ -395,7 +423,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected * to wait until the number of unreceived message is reduced before continuing to send. */ - protected Object _sendPauseMonitor = new Object(); + protected static final Object _sendPauseMonitor = new Object(); /** Keeps a count of the number of message currently sent but not received. */ protected static AtomicInteger _unreceived = new AtomicInteger(0); @@ -413,7 +441,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** A convenient formatter to use when time stamping output. */ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - /** Holds the connection to the broker. */ + /** Holds the connection for the message producer. */ protected Connection _connection; /** Holds the consumer connections. */ @@ -460,6 +488,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti /** The prompt to display when asking the user to kill the broker for failover testing. */ private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + + /** Holds the name for this test client to be identified to the broker with. */ private String _clientID; /** Keeps count of the total messages sent purely for debugging purposes. */ @@ -470,7 +500,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on * it, to send and recieve its pings and replies on. * - * @param overrides Properties containing any desired overrides to the defaults. + * @param overrides Properties containing any desired overrides to the defaults. * * @throws Exception Any exceptions are allowed to fall through. */ @@ -506,8 +536,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); - _ackMode = properties.getPropertyAsInteger(ACK_MODE_PROPNAME); - _consAckMode = properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); + _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); // Check that one or more destinations were specified. @@ -696,13 +726,15 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * Creates consumers for the specified number of destinations. The destinations themselves are also created by this * method. * - * @param noOfDestinations The number of destinations to create consumers for. - * @param selector The message selector to filter the consumers with. - * @param rootName The root of the name, or actual name if only one is being created. - * @param unique true to make the destinations unique to this pinger, false to share the - * numbering with all pingers on the same JVM. + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. + * @param unique true to make the destinations unique to this pinger, false to share the + * numbering with all pingers on the same JVM. + * @param durable If the destinations are durable topics. * * @throws JMSException Any JMSExceptions are allowed to fall through. + * @throws AMQException Any AMQExceptions are allowed to fall through. */ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, boolean durable) throws JMSException, AMQException @@ -817,7 +849,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the * replies map. * - * @param message The received message. + * @param message The received message. + * @param consumerNo The consumer number within this test pinger instance. */ public void onMessageWithConsumerNo(Message message, int consumerNo) { @@ -834,7 +867,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti String correlationID = message.getJMSCorrelationID(); // log.debug("correlationID = " + correlationID); - int num = message.getIntProperty("MSG_NUM"); + // int num = message.getIntProperty("MSG_NUM"); // log.info("Message " + num + " received."); boolean isRedelivered = message.getJMSRedelivered(); @@ -857,8 +890,8 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Decrement the countdown latch. Before this point, it is possible that two threads might enter this // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block // ensures that each thread will get a unique value for the remaining messages. - long trueCount = -1; - long remainingCount = -1; + long trueCount; + long remainingCount; synchronized (trafficLight) { @@ -890,18 +923,28 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // log.debug("remainingCount = " + remainingCount); // log.debug("trueCount = " + trueCount); - // Commit on transaction batch size boundaries. At this point in time the waiting producer remains - // blocked, even on the last message. + // Commit on transaction batch size boundaries. At this point in time the waiting producer + // remains blocked, even on the last message. // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on // each batch boundary. For pub/sub each consumer gets every message so no division is done. + // When running in client ack mode, an ack is done instead of a commit, on the commit batch + // size boundaries. long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); // log.debug("commitCount = " + commitCount); if ((commitCount % _txBatchSize) == 0) { - // log.debug("Trying commit for consumer " + consumerNo + "."); - commitTx(_consumerSession[consumerNo]); - // log.info("Tx committed on consumer " + consumerNo); + if (_consAckMode == 2) + { + // log.debug("Doing client ack for consumer " + consumerNo + "."); + message.acknowledge(); + } + else + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + // log.info("Tx committed on consumer " + consumerNo); + } } // Forward the message and remaining count to any interested chained message listener. @@ -927,18 +970,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti { log.warn("Got redelivered message, ignoring."); } - - // Print out ping times for every message in verbose mode only. - /*if (_verbose) - { - Long timestamp = message.getLongProperty(MESSAGE_TIMESTAMP_PROPNAME); - - if (timestamp != null) - { - long diff = System.nanoTime() - timestamp; - //log.trace("Time for round trip (nanos): " + diff); - } - }*/ } catch (JMSException e) { @@ -1000,9 +1031,9 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Send the specifed number of messages. pingNoWaitForReply(message, numPings, messageCorrelationId); - boolean timedOut = false; - boolean allMessagesReceived = false; - int numReplies = 0; + boolean timedOut; + boolean allMessagesReceived; + int numReplies; do { @@ -1080,9 +1111,6 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti // Send all of the ping messages. for (int i = 0; i < numPings; i++) { - // Reset the committed flag to indicate that there may be uncommitted messages. - committed = false; - // Re-timestamp the message. // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); @@ -1125,16 +1153,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti Destination destination = _pingDestinations.get(i % _pingDestinations.size()); // Prompt the user to kill the broker when doing failover testing. - if (_failBeforeSend) - { - if (_failOnce) - { - _failBeforeSend = false; - } - - // log.trace("Failing Before Send"); - waitForUser(KILL_BROKER_PROMPT); - } + _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); // If necessary, wait until the max pending message size comes within its limit. synchronized (_sendPauseMonitor) @@ -1193,28 +1212,24 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } // Send the message either to its round robin destination, or its default destination. + int num = numSent.incrementAndGet(); + message.setIntProperty("MSG_NUM", num); + setTimestamp(message); + if (destination == null) { - int num = numSent.incrementAndGet(); - message.setIntProperty("MSG_NUM", num); - setTimestamp(message); _producer.send(message); - // log.info("Message " + num + " sent."); } else { - int num = numSent.incrementAndGet(); - message.setIntProperty("MSG_NUM", num); - setTimestamp(message); _producer.send(destination, message); - // log.info("Message " + num + " sent."); } // Increase the unreceived size, this may actually happen after the message is received. // The unreceived size is incremented by the number of consumers that will get a copy of the message, // in pub/sub mode. // _unreceived.getAndIncrement(); - int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + /*int newUnreceivedCount =*/ _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); // log.debug("newUnreceivedCount = " + newUnreceivedCount); // Apply message rate throttling if a rate limit has been set up. @@ -1236,6 +1251,31 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti return committed; } + /** + * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the + * test that the failure has occurred, before the method returns. + * + * @param failFlag The fail flag to test. + * + * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only + * used once, then reset. + */ + private boolean waitForUserToPromptOnFailure(boolean failFlag) + { + if (failFlag) + { + if (_failOnce) + { + failFlag = false; + } + + // log.trace("Failing Before Send"); + waitForUser(KILL_BROKER_PROMPT); + } + + return failFlag; + } + /** * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will @@ -1294,15 +1334,16 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti */ public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException { - ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - - // Timestamp the message in nanoseconds. - - // setTimestamp(msg); - - return msg; + return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); } + /** + * Sets the current time in nanoseconds as the timestamp on the message. + * + * @param msg The message to timestamp. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ protected void setTimestamp(Message msg) throws JMSException { if (((AMQSession) _producerSession).isStrictAMQP()) @@ -1315,9 +1356,17 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } } + /** + * Extracts the nanosecond timestamp from a message. + * + * @param msg The message to extract the time stamp from. + * + * @return The timestamp in nanos. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ protected long getTimestamp(Message msg) throws JMSException { - if (((AMQSession) _producerSession).isStrictAMQP()) { Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); @@ -1331,7 +1380,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } /** - * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag + * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag * has been cleared. */ public void stop() @@ -1339,6 +1388,11 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti _publish = false; } + /** + * Starts the producer and consumer connections. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ public void start() throws JMSException { // log.debug("public void start(): called"); @@ -1393,7 +1447,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti } /** - * Closes the pingers connection. + * Closes all of the producer and consumer connections. * * @throws JMSException All JMSException are allowed to fall through. */ @@ -1460,18 +1514,7 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti boolean committed = false; - // log.trace("Batch time reached"); - if (_failAfterSend) - { - // log.trace("Batch size reached"); - if (_failOnce) - { - _failAfterSend = false; - } - - // log.trace("Failing After Send"); - waitForUser(KILL_BROKER_PROMPT); - } + _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); if (session.getTransacted()) { @@ -1479,32 +1522,14 @@ public class PingPongProducer implements Runnable /*, MessageListener*/, Excepti try { - if (_failBeforeCommit) - { - if (_failOnce) - { - _failBeforeCommit = false; - } - - // log.trace("Failing Before Commit"); - waitForUser(KILL_BROKER_PROMPT); - } + _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); - long start = System.nanoTime(); + // long start = System.nanoTime(); session.commit(); committed = true; // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); - if (_failAfterCommit) - { - if (_failOnce) - { - _failAfterCommit = false; - } - - // log.trace("Failing After Commit"); - waitForUser(KILL_BROKER_PROMPT); - } + _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); // log.debug("Session Commited."); } -- cgit v1.2.1