diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2008-04-24 17:49:03 +0000 |
| commit | e32debe1df7d0a837e30cd937fb7a18fc5cfa203 (patch) | |
| tree | eda0dabab6fa1855d05df1ce8cab3f823f3e56ed /qpid/java/perftests | |
| parent | 1a18bfd552d2065df2da734b42258249b01c4271 (diff) | |
| download | qpid-python-e32debe1df7d0a837e30cd937fb7a18fc5cfa203.tar.gz | |
QPID-832 : Fix eol-style
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@651325 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests')
9 files changed, 3782 insertions, 3782 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java index b9632eee4c..0afec83b19 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java @@ -1,107 +1,107 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-
-import javax.jms.Destination;
-
-import java.util.List;
-import java.util.Properties;
-
-/**
- * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer}
- * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues.
- * It is an all in one ping client, that produces and consumes its own pings.
- *
- * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many
- * are created they will all be run in parallel and be active in sending and consuming pings at the same time.
- * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear
- * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of
- * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these
- * conditions.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer}
- * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings.
- * </table>
- */
-public class PingClient extends PingPongProducer
-{
- /** Used for debugging. */
- private final Logger log = Logger.getLogger(PingClient.class);
-
- /** Used to count the number of ping clients created. */
- private static int _pingClientCount;
-
- /**
- * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
- * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates
- * producer and consumer sessions on it, to send and recieve its pings and replies on.
- *
- * @param overrides Properties containing any desired overrides to the defaults.
- *
- * @throws Exception Any exceptions are allowed to fall through.
- */
- public PingClient(Properties overrides) throws Exception
- {
- super(overrides);
-
- _pingClientCount++;
- }
-
- /**
- * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
- * effect of making this pinger listen to its own pings.
- *
- * @return The ping destinations.
- */
- public List<Destination> getReplyDestinations()
- {
- return _pingDestinations;
- }
-
- /**
- * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging.
- *
- * @return The scaling up of the number of expected pub/sub pings.
- */
- public int getConsumersPerDestination()
- {
- log.debug("public int getConsumersPerDestination(): called");
-
- if (_isUnique)
- {
- log.debug(_noOfConsumers + " consumer per destination.");
-
- return _noOfConsumers;
- }
- else
- {
- log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination.");
-
- return _pingClientCount * _noOfConsumers;
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import javax.jms.Destination; + +import java.util.List; +import java.util.Properties; + +/** + * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} + * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. + * It is an all in one ping client, that produces and consumes its own pings. + * + * <p/>The constructor increments a count of the number of ping clients created. It is assumed that where many + * are created they will all be run in parallel and be active in sending and consuming pings at the same time. + * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear + * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of + * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these + * conditions. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Create a ping producer that listens to its own pings <td> {@link PingPongProducer} + * <tr><td> Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. + * </table> + */ +public class PingClient extends PingPongProducer +{ + /** Used for debugging. */ + private final Logger log = Logger.getLogger(PingClient.class); + + /** Used to count the number of ping clients created. */ + private static int _pingClientCount; + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates + * producer and consumer sessions on it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingClient(Properties overrides) throws Exception + { + super(overrides); + + _pingClientCount++; + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List<Destination> getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. + * + * @return The scaling up of the number of expected pub/sub pings. + */ + public int getConsumersPerDestination() + { + log.debug("public int getConsumersPerDestination(): called"); + + if (_isUnique) + { + log.debug(_noOfConsumers + " consumer per destination."); + + return _noOfConsumers; + } + else + { + log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination."); + + return _pingClientCount * _noOfConsumers; + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java index 0c8c19243a..a15897c82b 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java @@ -1,452 +1,452 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-import org.apache.qpid.util.CommandLineParser;
-
-import org.apache.qpid.junit.extensions.util.MathUtils;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-
-import javax.jms.*;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and
- * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop
- * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the
- * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with
- * failure conditions when using durable messaging.
- *
- * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to
- * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases
- * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings
- * with.
- *
- * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console.
- *
- * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and
- * additionally accepts the following parameters:
- *
- * <p/><table><caption>Parameters</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> numMessages <td> 100 <td> The total number of messages to send.
- * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'.
- * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours,
- * m minutes and s seconds).
- * </table>
- *
- * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up
- * when no parameters are specified.
- *
- * <p/><table><caption>Parameters</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped.
- * <tr><td> transacted <td> true <td> Only makes sense to test with transactions.
- * <tr><td> persistent <td> true <td> Only makes sense to test persistent.
- * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages.
- * <tr><td> commitBatchSize <td> 10
- * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds.
- * </table>
- *
- * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits
- * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will
- * wait for the second signal before receiving its pings.
- *
- * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages
- * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method,
- * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide
- * custom behaviour with alternative implementations of this method (for example taking a backup).
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Send and receive pings.
- * <tr><td> Accept user input to signal stop sending.
- * <tr><td> Accept user input to signal start receiving.
- * <tr><td> Provide feedback on pings sent versus pings received.
- * <tr><td> Provide extension point for arbitrary action on a particular message count.
- * </table>
- */
-public class PingDurableClient extends PingPongProducer implements ExceptionListener
-{
- private static final Logger log = Logger.getLogger(PingDurableClient.class);
-
- public static final String NUM_MESSAGES_PROPNAME = "numMessages";
- public static final String NUM_MESSAGES_DEFAULT = "100";
- public static final String DURATION_PROPNAME = "duration";
- public static final String DURATION_DEFAULT = "30S";
- public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction";
- public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1";
-
- /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */
- private static final long TIME_OUT = 3000;
-
- static
- {
- defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT);
- defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT);
- defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false");
- defaults.setProperty(TRANSACTED_PROPNAME, "true");
- defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
- defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
- defaults.setProperty(RATE_PROPNAME, "20");
- defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
- defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
- }
-
- /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */
- private int numMessages;
-
- /** Holds the number of messages to send before taking triggering the action. */
- private int numMessagesToAction;
-
- /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */
- private long duration;
-
- /** Used to indciate that this application should terminate. Set by the shutdown hook. */
- private boolean terminate = false;
-
- /**
- * @throws Exception Any exceptions are allowed to fall through.
- */
- public PingDurableClient(Properties overrides) throws Exception
- {
- super(overrides);
- log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called");
-
- // Extract the additional configuration parameters.
- ParsedProperties properties = new ParsedProperties(defaults);
- properties.putAll(overrides);
-
- numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME);
- String durationSpec = properties.getProperty(DURATION_PROPNAME);
- numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME);
-
- if (durationSpec != null)
- {
- duration = MathUtils.parseDuration(durationSpec) * 1000000;
- }
- }
-
- /**
- * Starts the ping/wait/receive process.
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- // Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
- PingDurableClient pingProducer = new PingDurableClient(options);
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- // pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Run the test procedure.
- int sent = pingProducer.send();
- pingProducer.closeConnection();
- pingProducer.waitForUser("Press return to begin receiving the pings.");
- pingProducer.receive(sent);
-
- System.exit(0);
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- /**
- * Performs the main test procedure implemented by this ping client. See the class level comment for details.
- */
- protected int send() throws Exception
- {
- log.debug("public void sendWaitReceive(): called");
-
- log.debug("duration = " + duration);
- log.debug("numMessages = " + numMessages);
-
- if (duration > 0)
- {
- System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds.");
- }
-
- if (_rate > 0)
- {
- System.out.println("Sending at " + _rate + " messages per second.");
- }
-
- if (numMessages > 0)
- {
- System.out.println("Sending up to " + numMessages + " messages.");
- }
-
- // Establish the connection and the message producer.
- establishConnection(true, false);
- _connection.start();
-
- Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
-
- // Send pings until a terminating condition is received.
- boolean endCondition = false;
- int messagesSent = 0;
- int messagesCommitted = 0;
- int messagesNotCommitted = 0;
- long start = System.nanoTime();
-
- // Clear console in.
- clearConsole();
-
- while (!endCondition)
- {
- boolean committed = false;
-
- try
- {
- committed = sendMessage(messagesSent, message) && _transacted;
-
- messagesSent++;
- messagesNotCommitted++;
-
- // Keep count of the number of messsages currently committed and pending commit.
- if (committed)
- {
- log.debug("Adding " + messagesNotCommitted + " messages to the committed count.");
- messagesCommitted += messagesNotCommitted;
- messagesNotCommitted = 0;
-
- System.out.println("Commited: " + messagesCommitted);
- }
- }
- catch (JMSException e)
- {
- log.debug("Got JMSException whilst sending.");
- _publish = false;
- }
-
- // Perform the arbitrary action if the number of messages sent has reached the right number.
- if (messagesSent == numMessagesToAction)
- {
- System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = "
- + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted);
- takeAction();
- }
-
- // Determine if the end condition has been met, based on the number of messages, time passed, errors on
- // the connection or user input.
- long now = System.nanoTime();
-
- if ((duration != 0) && ((now - start) > duration))
- {
- System.out.println("Send halted because duration expired.");
- endCondition = true;
- }
- else if ((numMessages != 0) && (messagesSent >= numMessages))
- {
- System.out.println("Send halted because # messages completed.");
- endCondition = true;
- }
- else if (System.in.available() > 0)
- {
- System.out.println("Send halted by user input.");
- endCondition = true;
-
- clearConsole();
- }
- else if (!_publish)
- {
- System.out.println("Send halted by error on the connection.");
- endCondition = true;
- }
- }
-
- log.debug("messagesSent = " + messagesSent);
- log.debug("messagesCommitted = " + messagesCommitted);
- log.debug("messagesNotCommitted = " + messagesNotCommitted);
-
- System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted
- + ", Messages not Committed = " + messagesNotCommitted);
-
- return messagesSent;
- }
-
- protected void closeConnection()
- {
- // Clean up the connection.
- try
- {
- close();
- }
- catch (JMSException e)
- {
- log.debug("There was an error whilst closing the connection: " + e, e);
- System.out.println("There was an error whilst closing the connection.");
-
- // Ignore as did best could manage to clean up.
- }
- }
-
- protected void receive(int messagesSent) throws Exception
- {
- // Re-establish the connection and the message consumer.
- _queueJVMSequenceID = new AtomicInteger();
- _queueSharedID = new AtomicInteger();
-
- establishConnection(false, true);
- _consumer[0].setMessageListener(null);
- _consumerConnection[0].start();
-
- // Try to receive all of the pings that were successfully sent.
- int messagesReceived = 0;
- boolean endCondition = false;
-
- while (!endCondition)
- {
- // Message received = _consumer.receiveNoWait();
- Message received = _consumer[0].receive(TIME_OUT);
- log.debug("received = " + received);
-
- if (received != null)
- {
- messagesReceived++;
- }
-
- // Determine if the end condition has been met, based on the number of messages and time passed since last
- // receiving a message.
- if (received == null)
- {
- System.out.println("Timed out.");
- endCondition = true;
- }
- else if (messagesReceived >= messagesSent)
- {
- System.out.println("Got all messages.");
- endCondition = true;
- }
- }
-
- // Ensure messages received are committed.
- if (_consTransacted)
- {
- try
- {
- _consumerSession[0].commit();
- System.out.println("Committed for all messages received.");
- }
- catch (JMSException e)
- {
- log.debug("Error during commit: " + e, e);
- System.out.println("Error during commit.");
- try
- {
- _consumerSession[0].rollback();
- System.out.println("Rolled back on all messages received.");
- }
- catch (JMSException e2)
- {
- log.debug("Error during rollback: " + e, e);
- System.out.println("Error on roll back of all messages received.");
- }
-
- }
- }
-
- log.debug("messagesReceived = " + messagesReceived);
-
- System.out.println("Messages received: " + messagesReceived);
-
- // Clean up the connection.
- close();
- }
-
- /**
- * Clears any pending input from the console.
- */
- private void clearConsole()
- {
- try
- {
- BufferedReader bis = new BufferedReader(new InputStreamReader(System.in));
-
- // System.in.skip(System.in.available());
- while (bis.ready())
- {
- bis.readLine();
- }
- }
- catch (IOException e)
- { }
- }
-
- /**
- * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the
- * effect of making this pinger listen to its own pings.
- *
- * @return The ping destinations.
- */
- public List<Destination> getReplyDestinations()
- {
- return _pingDestinations;
- }
-
- /**
- * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
- * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the
- * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving
- * message should stop, not that the application should termiante.
- *
- * @return A shutdown hook for the ping loop.
- */
- public Thread getShutdownHook()
- {
- return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- terminate = true;
- }
- });
- }
-
- /**
- * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default
- * implementation does nothing.
- */
- public void takeAction()
- { }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; +import org.apache.qpid.util.CommandLineParser; + +import org.apache.qpid.junit.extensions.util.MathUtils; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and + * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop + * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the + * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with + * failure conditions when using durable messaging. + * + * <p/>The events that can stop it from sending are input from the user on the console, failure of its connection to + * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases + * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings + * with. + * + * <p/>The event to re-connect and attempt to recieve the pings is input from the user on the console. + * + * <p/>This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and + * additionally accepts the following parameters: + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> numMessages <td> 100 <td> The total number of messages to send. + * <tr><td> numMessagesToAction <td> -1 <td> The number of messages to send before taking a custom 'action'. + * <tr><td> duration <td> 30S <td> The length of time to ping for. (Format dDhHmMsS, for d days, h hours, + * m minutes and s seconds). + * </table> + * + * <p/>This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up + * when no parameters are specified. + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> uniqueDests <td> false <td> Prevents destination names being timestamped. + * <tr><td> transacted <td> true <td> Only makes sense to test with transactions. + * <tr><td> persistent <td> true <td> Only makes sense to test persistent. + * <tr><td> durableDests <td> true <td> Should use durable queues with persistent messages. + * <tr><td> commitBatchSize <td> 10 + * <tr><td> rate <td> 20 <td> Total default test time is 5 seconds. + * </table> + * + * <p/>When a number of messages or duration is specified, this ping client will ping until the first of those limits + * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will + * wait for the second signal before receiving its pings. + * + * <p/>This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages + * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, + * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide + * custom behaviour with alternative implementations of this method (for example taking a backup). + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Send and receive pings. + * <tr><td> Accept user input to signal stop sending. + * <tr><td> Accept user input to signal start receiving. + * <tr><td> Provide feedback on pings sent versus pings received. + * <tr><td> Provide extension point for arbitrary action on a particular message count. + * </table> + */ +public class PingDurableClient extends PingPongProducer implements ExceptionListener +{ + private static final Logger log = Logger.getLogger(PingDurableClient.class); + + public static final String NUM_MESSAGES_PROPNAME = "numMessages"; + public static final String NUM_MESSAGES_DEFAULT = "100"; + public static final String DURATION_PROPNAME = "duration"; + public static final String DURATION_DEFAULT = "30S"; + public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; + public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; + + /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ + private static final long TIME_OUT = 3000; + + static + { + defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); + defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); + defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); + defaults.setProperty(TRANSACTED_PROPNAME, "true"); + defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); + defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); + defaults.setProperty(RATE_PROPNAME, "20"); + defaults.setProperty(DURABLE_DESTS_PROPNAME, "true"); + defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); + } + + /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ + private int numMessages; + + /** Holds the number of messages to send before taking triggering the action. */ + private int numMessagesToAction; + + /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ + private long duration; + + /** Used to indciate that this application should terminate. Set by the shutdown hook. */ + private boolean terminate = false; + + /** + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingDurableClient(Properties overrides) throws Exception + { + super(overrides); + log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); + + // Extract the additional configuration parameters. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); + String durationSpec = properties.getProperty(DURATION_PROPNAME); + numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); + + if (durationSpec != null) + { + duration = MathUtils.parseDuration(durationSpec) * 1000000; + } + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingDurableClient pingProducer = new PingDurableClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.closeConnection(); + pingProducer.waitForUser("Press return to begin receiving the pings."); + pingProducer.receive(sent); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Performs the main test procedure implemented by this ping client. See the class level comment for details. + */ + protected int send() throws Exception + { + log.debug("public void sendWaitReceive(): called"); + + log.debug("duration = " + duration); + log.debug("numMessages = " + numMessages); + + if (duration > 0) + { + System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); + } + + if (_rate > 0) + { + System.out.println("Sending at " + _rate + " messages per second."); + } + + if (numMessages > 0) + { + System.out.println("Sending up to " + numMessages + " messages."); + } + + // Establish the connection and the message producer. + establishConnection(true, false); + _connection.start(); + + Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + + // Send pings until a terminating condition is received. + boolean endCondition = false; + int messagesSent = 0; + int messagesCommitted = 0; + int messagesNotCommitted = 0; + long start = System.nanoTime(); + + // Clear console in. + clearConsole(); + + while (!endCondition) + { + boolean committed = false; + + try + { + committed = sendMessage(messagesSent, message) && _transacted; + + messagesSent++; + messagesNotCommitted++; + + // Keep count of the number of messsages currently committed and pending commit. + if (committed) + { + log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); + messagesCommitted += messagesNotCommitted; + messagesNotCommitted = 0; + + System.out.println("Commited: " + messagesCommitted); + } + } + catch (JMSException e) + { + log.debug("Got JMSException whilst sending."); + _publish = false; + } + + // Perform the arbitrary action if the number of messages sent has reached the right number. + if (messagesSent == numMessagesToAction) + { + System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " + + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); + takeAction(); + } + + // Determine if the end condition has been met, based on the number of messages, time passed, errors on + // the connection or user input. + long now = System.nanoTime(); + + if ((duration != 0) && ((now - start) > duration)) + { + System.out.println("Send halted because duration expired."); + endCondition = true; + } + else if ((numMessages != 0) && (messagesSent >= numMessages)) + { + System.out.println("Send halted because # messages completed."); + endCondition = true; + } + else if (System.in.available() > 0) + { + System.out.println("Send halted by user input."); + endCondition = true; + + clearConsole(); + } + else if (!_publish) + { + System.out.println("Send halted by error on the connection."); + endCondition = true; + } + } + + log.debug("messagesSent = " + messagesSent); + log.debug("messagesCommitted = " + messagesCommitted); + log.debug("messagesNotCommitted = " + messagesNotCommitted); + + System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted + + ", Messages not Committed = " + messagesNotCommitted); + + return messagesSent; + } + + protected void closeConnection() + { + // Clean up the connection. + try + { + close(); + } + catch (JMSException e) + { + log.debug("There was an error whilst closing the connection: " + e, e); + System.out.println("There was an error whilst closing the connection."); + + // Ignore as did best could manage to clean up. + } + } + + protected void receive(int messagesSent) throws Exception + { + // Re-establish the connection and the message consumer. + _queueJVMSequenceID = new AtomicInteger(); + _queueSharedID = new AtomicInteger(); + + establishConnection(false, true); + _consumer[0].setMessageListener(null); + _consumerConnection[0].start(); + + // Try to receive all of the pings that were successfully sent. + int messagesReceived = 0; + boolean endCondition = false; + + while (!endCondition) + { + // Message received = _consumer.receiveNoWait(); + Message received = _consumer[0].receive(TIME_OUT); + log.debug("received = " + received); + + if (received != null) + { + messagesReceived++; + } + + // Determine if the end condition has been met, based on the number of messages and time passed since last + // receiving a message. + if (received == null) + { + System.out.println("Timed out."); + endCondition = true; + } + else if (messagesReceived >= messagesSent) + { + System.out.println("Got all messages."); + endCondition = true; + } + } + + // Ensure messages received are committed. + if (_consTransacted) + { + try + { + _consumerSession[0].commit(); + System.out.println("Committed for all messages received."); + } + catch (JMSException e) + { + log.debug("Error during commit: " + e, e); + System.out.println("Error during commit."); + try + { + _consumerSession[0].rollback(); + System.out.println("Rolled back on all messages received."); + } + catch (JMSException e2) + { + log.debug("Error during rollback: " + e, e); + System.out.println("Error on roll back of all messages received."); + } + + } + } + + log.debug("messagesReceived = " + messagesReceived); + + System.out.println("Messages received: " + messagesReceived); + + // Clean up the connection. + close(); + } + + /** + * Clears any pending input from the console. + */ + private void clearConsole() + { + try + { + BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); + + // System.in.skip(System.in.available()); + while (bis.ready()) + { + bis.readLine(); + } + } + catch (IOException e) + { } + } + + /** + * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the + * effect of making this pinger listen to its own pings. + * + * @return The ping destinations. + */ + public List<Destination> getReplyDestinations() + { + return _pingDestinations; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with + * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the + * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving + * message should stop, not that the application should termiante. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + terminate = true; + } + }); + } + + /** + * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default + * implementation does nothing. + */ + public void takeAction() + { } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java index 215dbcefa3..5ba4004c56 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java @@ -1,311 +1,311 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.requestreply.PingPongProducer;
-
-import org.apache.qpid.junit.extensions.TimingController;
-import org.apache.qpid.junit.extensions.TimingControllerAware;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing
- * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for
- * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from
- * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than
- * waiting until all expected replies are received.
- *
- * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test
- * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The
- * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the
- * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}.
- *
- * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the
- * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput
- * (messages / time) can be calculated in order to examine the relationship between throughput and latency.
- *
- * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping
- * messages and output timings for sampled individual pings. </table>
- */
-public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware
-{
- private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class);
-
- /** Holds the name of the property to get the test results logging batch size. */
- public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize";
-
- /** Holds the default test results logging batch size. */
- public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000;
-
- /** Used to hold the timing controller passed from the test runner. */
- private TimingController _timingController;
-
- /** Used to generate unique correlation ids for each test run. */
- private AtomicLong corellationIdGenerator = new AtomicLong();
-
- /**
- * Holds test specifics by correlation id. This consists of the expected number of messages and the timing
- * controler.
- */
- private Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
-
- /** Holds the batched results listener, that does logging on batch boundaries. */
- private BatchedResultsListener batchedResultsListener = null;
-
- /**
- * Creates a new asynchronous ping performance test with the specified name.
- *
- * @param name The test name.
- */
- public PingLatencyTestPerf(String name)
- {
- super(name);
-
- // Sets up the test parameters with defaults.
- ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME,
- Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE));
- }
-
- /** Compile all the tests into a test suite. */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping Latency Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingLatencyTestPerf("testPingLatency"));
-
- return suite;
- }
-
- /**
- * Accepts a timing controller from the test runner.
- *
- * @param timingController The timing controller to register mutliple timings with.
- */
- public void setTimingController(TimingController timingController)
- {
- _timingController = timingController;
- }
-
- /**
- * Gets the timing controller passed in by the test runner.
- *
- * @return The timing controller passed in by the test runner.
- */
- public TimingController getTimingController()
- {
- return _timingController;
- }
-
- /**
- * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all
- * replies have been received or a time out occurs before exiting this method.
- *
- * @param numPings The number of pings to send.
- */
- public void testPingLatency(int numPings) throws Exception
- {
- _logger.debug("public void testPingLatency(int numPings): called");
-
- // Ensure that at least one ping was requeusted.
- if (numPings == 0)
- {
- _logger.error("Number of pings requested was zero.");
- }
-
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
- PingClient pingClient = perThreadSetup._pingClient;
-
- // Advance the correlation id of messages to send, to make it unique for this run.
- String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet());
- _logger.debug("messageCorrelationId = " + messageCorrelationId);
-
- // Initialize the count and timing controller for the new correlation id.
- PerCorrelationId perCorrelationId = new PerCorrelationId();
- TimingController tc = getTimingController().getControllerForCurrentThread();
- perCorrelationId._tc = tc;
- perCorrelationId._expectedCount = numPings;
- perCorrelationIds.put(messageCorrelationId, perCorrelationId);
-
- // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these
- // messages.
- pingClient.setChainedMessageListener(batchedResultsListener);
-
- // Generate a sample message of the specified size.
- Message msg =
- pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // Send the requested number of messages, and wait until they have all been received.
- long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
-
- // Check that all the replies were received and log a fail if they were not.
- if (numReplies < numPings)
- {
- tc.completeTest(false, 0);
- }
-
- // Remove the chained message listener from the ping producer.
- pingClient.removeChainedMessageListener();
-
- // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up.
- perCorrelationIds.remove(messageCorrelationId);
- }
-
- /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */
- public void threadSetUp()
- {
- _logger.debug("public void threadSetUp(): called");
-
- try
- {
- // Call the set up method in the super class. This creates a PingClient pinger.
- super.threadSetUp();
-
- // Create the chained message listener, only if it has not already been created. This is set up with the
- // batch size property, to tell it what batch size to output results on. A synchronized block is used to
- // ensure that only one thread creates this.
- synchronized (this)
- {
- if (batchedResultsListener == null)
- {
- int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME));
- batchedResultsListener = new BatchedResultsListener(batchSize);
- }
- }
-
- // Get the set up that the super class created.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Register the chained message listener on the pinger to do its asynchronous test timings from.
- perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can
- * be attached to the pinger, in order to receive notifications about every message received and the number
- * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener
- * outputs a test timing for the actual number of messages received in the current batch.
- */
- private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener
- {
- /** The test results logging batch size. */
- int _batchSize;
- private boolean _strictAMQP;
-
- /**
- * Creates a results listener on the specified batch size.
- *
- * @param batchSize The batch size to use.
- */
- public BatchedResultsListener(int batchSize)
- {
- _batchSize = batchSize;
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP,
- AMQSession.STRICT_AMQP_DEFAULT));
- }
-
- /**
- * This callback method is called from all of the pingers that this test creates. It uses the correlation id
- * from the message to identify the timing controller for the test thread that was responsible for sending those
- * messages.
- *
- * @param message The message.
- * @param remainingCount The count of messages remaining to be received with a particular correlation id.
- *
- * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through.
- */
- public void onMessage(Message message, int remainingCount, long latency) throws JMSException
- {
- _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called");
-
- // Check if a batch boundary has been crossed.
- if ((remainingCount % _batchSize) == 0)
- {
- // Extract the correlation id from the message.
- String correlationId = message.getJMSCorrelationID();
-
- // Get the details for the correlation id and check that they are not null. They can become null
- // if a test times out.
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId);
- if (perCorrelationId != null)
- {
- // Get the timing controller and expected count for this correlation id.
- TimingController tc = perCorrelationId._tc;
- int expected = perCorrelationId._expectedCount;
-
- // Calculate how many messages were actually received in the last batch. This will be the batch size
- // except where the number expected is not a multiple of the batch size and this is the first remaining
- // count to cross a batch size boundary, in which case it will be the number expected modulo the batch
- // size.
- int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize;
-
- // Register a test result for the correlation id.
- try
- {
- tc.completeTest(true, receivedInBatch, latency);
- }
- catch (InterruptedException e)
- {
- // Ignore this. It means the test runner wants to stop as soon as possible.
- _logger.warn("Got InterruptedException.", e);
- }
- }
- // Else ignore, test timed out. Should log a fail here?
- }
- }
- }
-
- /**
- * Holds state specific to each correlation id, needed to output test results. This consists of the count of the
- * total expected number of messages, and the timing controller for the thread sending those message ids.
- */
- private static class PerCorrelationId
- {
- public int _expectedCount;
- public TimingController _tc;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.JMSException; +import javax.jms.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing + * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for + * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from + * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than + * waiting until all expected replies are received. + * + * <p/>This test does not output timings for every single ping message, as when running at high volume, writing the test + * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The + * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the + * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. + * + * <p/>The size parameter logged for each individual ping is set to the size of the batch of messages that the + * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput + * (messages / time) can be calculated in order to examine the relationship between throughput and latency. + * + * <p/><table id="crc"><caption>CRC Card</caption> <tr><td> Responsibilities <th> Collaborations <tr><td> Send many ping + * messages and output timings for sampled individual pings. </table> + */ +public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware +{ + private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); + + /** Holds the name of the property to get the test results logging batch size. */ + public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; + + /** Holds the default test results logging batch size. */ + public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; + + /** Used to hold the timing controller passed from the test runner. */ + private TimingController _timingController; + + /** Used to generate unique correlation ids for each test run. */ + private AtomicLong corellationIdGenerator = new AtomicLong(); + + /** + * Holds test specifics by correlation id. This consists of the expected number of messages and the timing + * controler. + */ + private Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** Holds the batched results listener, that does logging on batch boundaries. */ + private BatchedResultsListener batchedResultsListener = null; + + /** + * Creates a new asynchronous ping performance test with the specified name. + * + * @param name The test name. + */ + public PingLatencyTestPerf(String name) + { + super(name); + + // Sets up the test parameters with defaults. + ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, + Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); + } + + /** Compile all the tests into a test suite. */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Latency Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingLatencyTestPerf("testPingLatency")); + + return suite; + } + + /** + * Accepts a timing controller from the test runner. + * + * @param timingController The timing controller to register mutliple timings with. + */ + public void setTimingController(TimingController timingController) + { + _timingController = timingController; + } + + /** + * Gets the timing controller passed in by the test runner. + * + * @return The timing controller passed in by the test runner. + */ + public TimingController getTimingController() + { + return _timingController; + } + + /** + * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all + * replies have been received or a time out occurs before exiting this method. + * + * @param numPings The number of pings to send. + */ + public void testPingLatency(int numPings) throws Exception + { + _logger.debug("public void testPingLatency(int numPings): called"); + + // Ensure that at least one ping was requeusted. + if (numPings == 0) + { + _logger.error("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + PingClient pingClient = perThreadSetup._pingClient; + + // Advance the correlation id of messages to send, to make it unique for this run. + String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); + _logger.debug("messageCorrelationId = " + messageCorrelationId); + + // Initialize the count and timing controller for the new correlation id. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + TimingController tc = getTimingController().getControllerForCurrentThread(); + perCorrelationId._tc = tc; + perCorrelationId._expectedCount = numPings; + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these + // messages. + pingClient.setChainedMessageListener(batchedResultsListener); + + // Generate a sample message of the specified size. + Message msg = + pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the requested number of messages, and wait until they have all been received. + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Check that all the replies were received and log a fail if they were not. + if (numReplies < numPings) + { + tc.completeTest(false, 0); + } + + // Remove the chained message listener from the ping producer. + pingClient.removeChainedMessageListener(); + + // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. + perCorrelationIds.remove(messageCorrelationId); + } + + /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + // Call the set up method in the super class. This creates a PingClient pinger. + super.threadSetUp(); + + // Create the chained message listener, only if it has not already been created. This is set up with the + // batch size property, to tell it what batch size to output results on. A synchronized block is used to + // ensure that only one thread creates this. + synchronized (this) + { + if (batchedResultsListener == null) + { + int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); + batchedResultsListener = new BatchedResultsListener(batchSize); + } + } + + // Get the set up that the super class created. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Register the chained message listener on the pinger to do its asynchronous test timings from. + perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can + * be attached to the pinger, in order to receive notifications about every message received and the number + * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener + * outputs a test timing for the actual number of messages received in the current batch. + */ + private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener + { + /** The test results logging batch size. */ + int _batchSize; + private boolean _strictAMQP; + + /** + * Creates a results listener on the specified batch size. + * + * @param batchSize The batch size to use. + */ + public BatchedResultsListener(int batchSize) + { + _batchSize = batchSize; + _strictAMQP = + Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, + AMQSession.STRICT_AMQP_DEFAULT)); + } + + /** + * This callback method is called from all of the pingers that this test creates. It uses the correlation id + * from the message to identify the timing controller for the test thread that was responsible for sending those + * messages. + * + * @param message The message. + * @param remainingCount The count of messages remaining to be received with a particular correlation id. + * + * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException + { + _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); + + // Check if a batch boundary has been crossed. + if ((remainingCount % _batchSize) == 0) + { + // Extract the correlation id from the message. + String correlationId = message.getJMSCorrelationID(); + + // Get the details for the correlation id and check that they are not null. They can become null + // if a test times out. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); + if (perCorrelationId != null) + { + // Get the timing controller and expected count for this correlation id. + TimingController tc = perCorrelationId._tc; + int expected = perCorrelationId._expectedCount; + + // Calculate how many messages were actually received in the last batch. This will be the batch size + // except where the number expected is not a multiple of the batch size and this is the first remaining + // count to cross a batch size boundary, in which case it will be the number expected modulo the batch + // size. + int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; + + // Register a test result for the correlation id. + try + { + tc.completeTest(true, receivedInBatch, latency); + } + catch (InterruptedException e) + { + // Ignore this. It means the test runner wants to stop as soon as possible. + _logger.warn("Got InterruptedException.", e); + } + } + // Else ignore, test timed out. Should log a fail here? + } + } + } + + /** + * Holds state specific to each correlation id, needed to output test results. This consists of the count of the + * total expected number of messages, and the timing controller for the thread sending those message ids. + */ + private static class PerCorrelationId + { + public int _expectedCount; + public TimingController _tc; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java index 2879f0c322..04e723069a 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java @@ -1,93 +1,93 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.ping;
-
-import java.util.Properties;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.util.CommandLineParser;
-
-/**
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public class PingSendOnlyClient extends PingDurableClient
-{
- private static final Logger log = Logger.getLogger(PingSendOnlyClient.class);
-
- public PingSendOnlyClient(Properties overrides) throws Exception
- {
- super(overrides);
- }
-
- /**
- * Starts the ping/wait/receive process.
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- // Create a ping producer overriding its defaults with all options passed on the command line.
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
- PingSendOnlyClient pingProducer = new PingSendOnlyClient(options);
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- // pingProducer.getConnection().setExceptionListener(pingProducer);
-
- // Run the test procedure.
- int sent = pingProducer.send();
- pingProducer.waitForUser("Press return to close connection and quit.");
- pingProducer.closeConnection();
-
- System.exit(0);
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
- {
- Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize);
-
- // Timestamp the message in nanoseconds.
- msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
-
- return msg;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.ping; + +import java.util.Properties; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.ObjectMessage; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.message.TestMessageFactory; +import org.apache.qpid.util.CommandLineParser; + +/** + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingSendOnlyClient extends PingDurableClient +{ + private static final Logger log = Logger.getLogger(PingSendOnlyClient.class); + + public PingSendOnlyClient(Properties overrides) throws Exception + { + super(overrides); + } + + /** + * Starts the ping/wait/receive process. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + // Create a ping producer overriding its defaults with all options passed on the command line. + Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + // pingProducer.getConnection().setExceptionListener(pingProducer); + + // Run the test procedure. + int sent = pingProducer.send(); + pingProducer.waitForUser("Press return to close connection and quit."); + pingProducer.closeConnection(); + + System.exit(0); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize); + + // Timestamp the message in nanoseconds. + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + return msg; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java index fb071a87fe..94b8ea662e 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java @@ -1,196 +1,196 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-
-import org.apache.qpid.junit.extensions.AsymptoticTestCase;
-import org.apache.qpid.junit.extensions.TestThreadAware;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-import org.apache.qpid.junit.extensions.util.TestContextProperties;
-
-import javax.jms.*;
-
-/**
- * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times
- * simultaneously to simluate many clients/producers/connections.
- *
- * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
- * full round trip ping. This test may be scaled up using a suitable JUnit test runner.
- *
- * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
- * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
- * except if the connection is lost in which case an attempt to re-establish the setup is made.
- *
- * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
- * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
- * temporary queue.
- *
- * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware
-{
- private static Logger _logger = Logger.getLogger(PingTestPerf.class);
-
- /** Thread local to hold the per-thread test setup fields. */
- ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-
- /** Holds a property reader to extract the test parameters from. */
- protected ParsedProperties testParameters =
- TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
-
- public PingTestPerf(String name)
- {
- super(name);
-
- _logger.debug("testParameters = " + testParameters);
- }
-
- /**
- * Compile all the tests into a test suite.
- * @return The test method testPingOk.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingTestPerf("testPingOk"));
-
- return suite;
- }
-
- public void testPingOk(int numPings) throws Exception
- {
- if (numPings == 0)
- {
- Assert.fail("Number of pings requested was zero.");
- }
-
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- if (perThreadSetup == null)
- {
- Assert.fail("Could not get per thread test setup, it was null.");
- }
-
- // Generate a sample message. This message is already time stamped and has its reply-to destination set.
- Message msg =
- perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // start the test
- long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME));
- int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null);
-
- // Fail the test if the timeout was exceeded.
- if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings))
- {
- Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = "
- + numReplies);
- }
- }
-
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
- public void threadSetUp()
- {
- _logger.debug("public void threadSetUp(): called");
-
- try
- {
- PerThreadSetup perThreadSetup = new PerThreadSetup();
-
- // This is synchronized because there is a race condition, which causes one connection to sleep if
- // all threads try to create connection concurrently.
- synchronized (this)
- {
- // Establish a client to ping a Destination and listen the reply back from same Destination
- perThreadSetup._pingClient = new PingClient(testParameters);
- perThreadSetup._pingClient.establishConnection(true, true);
- }
- // Start the client connection
- perThreadSetup._pingClient.start();
-
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * Performs test fixture clean
- */
- public void threadTearDown()
- {
- _logger.debug("public void threadTearDown(): called");
-
- try
- {
- // Get the per thread test fixture.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Close the pingers so that it cleans up its connection cleanly.
- synchronized (this)
- {
- if ((perThreadSetup != null) && (perThreadSetup._pingClient != null))
- {
- perThreadSetup._pingClient.close();
- }
- }
- }
- catch (JMSException e)
- {
- _logger.warn("There was an exception during per thread tear down.");
- }
- finally
- {
- // Ensure the per thread fixture is reclaimed.
- threadSetup.remove();
- }
- }
-
- protected static class PerThreadSetup
- {
- /**
- * Holds the test ping client.
- */
- protected PingClient _pingClient;
- protected String _correlationId;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.ping; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.requestreply.PingPongProducer; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times + * simultaneously to simluate many clients/producers/connections. + * + * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single + * full round trip ping. This test may be scaled up using a suitable JUnit test runner. + * + * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, + * except if the connection is lost in which case an attempt to re-establish the setup is made. + * + * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the + * temporary queue. + * + * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware +{ + private static Logger _logger = Logger.getLogger(PingTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + /** Holds a property reader to extract the test parameters from. */ + protected ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingTestPerf(String name) + { + super(name); + + _logger.debug("testParameters = " + testParameters); + } + + /** + * Compile all the tests into a test suite. + * @return The test method testPingOk. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingTestPerf("testPingOk")); + + return suite; + } + + public void testPingOk(int numPings) throws Exception + { + if (numPings == 0) + { + Assert.fail("Number of pings requested was zero."); + } + + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + if (perThreadSetup == null) + { + Assert.fail("Could not get per thread test setup, it was null."); + } + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // start the test + long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); + int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) + { + Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " + + numReplies); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + _logger.debug("public void threadSetUp(): called"); + + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // This is synchronized because there is a race condition, which causes one connection to sleep if + // all threads try to create connection concurrently. + synchronized (this) + { + // Establish a client to ping a Destination and listen the reply back from same Destination + perThreadSetup._pingClient = new PingClient(testParameters); + perThreadSetup._pingClient.establishConnection(true, true); + } + // Start the client connection + perThreadSetup._pingClient.start(); + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) + { + perThreadSetup._pingClient.close(); + } + } + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + finally + { + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping client. + */ + protected PingClient _pingClient; + protected String _correlationId; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java index 0712557383..8e010ccf07 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java @@ -1,453 +1,453 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.topic.Config;
-import org.apache.qpid.exchange.ExchangeDefaults;
-
-/**
- * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
- * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
- * too.
- *
- * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
- * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
- * temporary queue or the correlation id to correlate the original message to the reply.
- *
- * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
- * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
- * be disabled for real timing tests as writing to the console will slow things down.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Bounce back messages to their reply to destination.
- * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
- * </table>
- *
- * @todo Replace the command line parsing with a neater tool.
- *
- * @todo Make verbose accept a number of messages, only prints to console every X messages.
- */
-public class PingPongBouncer implements MessageListener
-{
- private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
-
- /** The default prefetch size for the message consumer. */
- private static final int PREFETCH = 1;
-
- /** The default no local flag for the message consumer. */
- private static final boolean NO_LOCAL = true;
-
- private static final String DEFAULT_DESTINATION_NAME = "ping";
-
- /** The default exclusive flag for the message consumer. */
- private static final boolean EXCLUSIVE = false;
-
- /** A convenient formatter to use when time stamping output. */
- protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
-
- /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
- private boolean _verbose = false;
-
- /** Determines whether this bounce back client bounces back messages persistently. */
- private boolean _persistent = false;
-
- private Destination _consumerDestination;
-
- /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
- private Destination _lastResponseDest;
-
- /** The producer for sending replies with. */
- private MessageProducer _replyProducer;
-
- /** The consumer controlSession. */
- private Session _consumerSession;
-
- /** The producer controlSession. */
- private Session _producerSession;
-
- /** Holds the connection to the broker. */
- private AMQConnection _connection;
-
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- private boolean _isPubSub = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
- * failover, immediately before committing a transaction.
- */
- protected boolean _failBeforeCommit = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
- * failover, immediate after committing a transaction.
- */
- protected boolean _failAfterCommit = false;
-
- /**
- * Creates a PingPongBouncer on the specified producer and consumer sessions.
- *
- * @param brokerDetails The addresses of the brokers to connect to.
- * @param username The broker username.
- * @param password The broker password.
- * @param virtualpath The virtual host name within the broker.
- * @param destinationName The name of the queue to receive pings on
- * (or root of the queue name where many queues are generated).
- * @param persistent A flag to indicate that persistent message should be used.
- * @param transacted A flag to indicate that pings should be sent within transactions.
- * @param selector A message selector to filter received pings with.
- * @param verbose A flag to indicate that message timings should be sent to the console.
- *
- * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
- */
- public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
- boolean pubsub) throws Exception
- {
- // Create a client id to uniquely identify this client.
- InetAddress address = InetAddress.getLocalHost();
- String clientId = address.getHostName() + System.currentTimeMillis();
- _verbose = verbose;
- _persistent = persistent;
- setPubSub(pubsub);
- // Connect to the broker.
- setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
- _logger.info("Connected with URL:" + getConnection().toURL());
-
- // Set up the failover notifier.
- getConnection().setConnectionListener(new FailoverNotifier());
-
- // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
- // command line option.
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
- // Create the queue to listen for message on.
- createConsumerDestination(destinationName);
- MessageConsumer consumer =
- _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
-
- // Create a producer for the replies, without a default destination.
- _replyProducer = _producerSession.createProducer(null);
- _replyProducer.setDisableMessageTimestamp(true);
- _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- // Set this up to listen for messages on the queue.
- consumer.setMessageListener(this);
- }
-
- /**
- * Starts a stand alone ping-pong client running in verbose mode.
- *
- * @param args
- */
- public static void main(String[] args) throws Exception
- {
- System.out.println("Starting...");
-
- // Display help on the command line.
- if (args.length == 0)
- {
- _logger.info("Running test with default values...");
- //usage();
- //System.exit(0);
- }
-
- // Extract all command line parameters.
- Config config = new Config();
- config.setOptions(args);
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "test";
- String destinationName = config.getDestination();
- if (destinationName == null)
- {
- destinationName = DEFAULT_DESTINATION_NAME;
- }
-
- String selector = config.getSelector();
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- boolean pubsub = config.isPubSub();
- boolean verbose = true;
-
- //String selector = null;
-
- // Instantiate the ping pong client with the command line options and start it running.
- PingPongBouncer pingBouncer =
- new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
- selector, verbose, pubsub);
- pingBouncer.getConnection().start();
-
- System.out.println("Waiting...");
- }
-
- private static void usage()
- {
- System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
- + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
- + "-persistent : (true/false). Default is false\n"
- + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
- }
-
- /**
- * This is a callback method that is notified of all messages for which this has been registered as a message
- * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
- * destination of the message.
- *
- * @param message The message that triggered this callback.
- */
- public void onMessage(Message message)
- {
- try
- {
- String messageCorrelationId = message.getJMSCorrelationID();
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
- + messageCorrelationId);
- }
-
- // Get the reply to destination from the message and check it is set.
- Destination responseDest = message.getJMSReplyTo();
-
- if (responseDest == null)
- {
- _logger.debug("Cannot send reply because reply-to destination is null.");
-
- return;
- }
-
- // Spew out some timing information if verbose mode is on.
- if (_verbose)
- {
- Long timestamp = message.getLongProperty("timestamp");
-
- if (timestamp != null)
- {
- long diff = System.currentTimeMillis() - timestamp;
- _logger.info("Time to bounce point: " + diff);
- }
- }
-
- // Correlate the reply to the original.
- message.setJMSCorrelationID(messageCorrelationId);
-
- // Send the receieved message as the pong reply.
- _replyProducer.send(responseDest, message);
-
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
- + messageCorrelationId);
- }
-
- // Commit the transaction if running in transactional mode.
- commitTx(_producerSession);
- }
- catch (JMSException e)
- {
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
- }
- }
-
- /**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public AMQConnection getConnection()
- {
- return _connection;
- }
-
- /**
- * Sets the connection that this ping client is using.
- *
- * @param connection The ping connection.
- */
- public void setConnection(AMQConnection connection)
- {
- this._connection = connection;
- }
-
- /**
- * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
- *
- * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public void setPubSub(boolean pubsub)
- {
- _isPubSub = pubsub;
- }
-
- /**
- * Checks whether this client is a p2p or pub/sub ping client.
- *
- * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public boolean isPubSub()
- {
- return _isPubSub;
- }
-
- /**
- * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
- * a transactional controlSession, this method does nothing.
- *
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
- * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
- * after the commit is applied.
- *
- * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- */
- protected void commitTx(Session session) throws JMSException
- {
- if (session.getTransacted())
- {
- try
- {
- if (_failBeforeCommit)
- {
- _logger.debug("Failing Before Commit");
- doFailover();
- }
-
- session.commit();
-
- if (_failAfterCommit)
- {
- _logger.debug("Failing After Commit");
- doFailover();
- }
-
- _logger.debug("Session Commited.");
- }
- catch (JMSException e)
- {
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
-
- try
- {
- session.rollback();
- _logger.debug("Message rolled back.");
- }
- catch (JMSException jmse)
- {
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
-
- // Both commit and rollback failed. Throw the rollback exception.
- throw jmse;
- }
- }
- }
- }
-
- /**
- * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- *
- * @param broker The name of the broker to terminate.
- */
- protected void doFailover(String broker)
- {
- System.out.println("Kill Broker " + broker + " now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
- }
-
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
-
- }
-
- private void createConsumerDestination(String name)
- {
- if (isPubSub())
- {
- _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
- }
- else
- {
- _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
- }
- }
-
- /**
- * A connection listener that logs out any failover complete events. Could do more interesting things with this
- * at some point...
- */
- public static class FailoverNotifier implements ConnectionListener
- {
- public void bytesSent(long count)
- { }
-
- public void bytesReceived(long count)
- { }
-
- public boolean preFailover(boolean redirect)
- {
- return true;
- }
-
- public boolean preResubscribe()
- {
- return true;
- }
-
- public void failoverComplete()
- {
- _logger.info("App got failover complete callback.");
- }
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import java.io.IOException; +import java.net.InetAddress; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.jms.*; + +import org.apache.log4j.Logger; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.jms.ConnectionListener; +import org.apache.qpid.jms.Session; +import org.apache.qpid.topic.Config; +import org.apache.qpid.exchange.ExchangeDefaults; + +/** + * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return + * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes + * too. + * + * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages + * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique + * temporary queue or the correlation id to correlate the original message to the reply. + * + * <p/>There is a verbose mode flag which causes information about each ping to be output to the console + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should + * be disabled for real timing tests as writing to the console will slow things down. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Bounce back messages to their reply to destination. + * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url. + * </table> + * + * @todo Replace the command line parsing with a neater tool. + * + * @todo Make verbose accept a number of messages, only prints to console every X messages. + */ +public class PingPongBouncer implements MessageListener +{ + private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); + + /** The default prefetch size for the message consumer. */ + private static final int PREFETCH = 1; + + /** The default no local flag for the message consumer. */ + private static final boolean NO_LOCAL = true; + + private static final String DEFAULT_DESTINATION_NAME = "ping"; + + /** The default exclusive flag for the message consumer. */ + private static final boolean EXCLUSIVE = false; + + /** A convenient formatter to use when time stamping output. */ + protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + private boolean _verbose = false; + + /** Determines whether this bounce back client bounces back messages persistently. */ + private boolean _persistent = false; + + private Destination _consumerDestination; + + /** Keeps track of the response destination of the previous message for the last reply to producer cache. */ + private Destination _lastResponseDest; + + /** The producer for sending replies with. */ + private MessageProducer _replyProducer; + + /** The consumer controlSession. */ + private Session _consumerSession; + + /** The producer controlSession. */ + private Session _producerSession; + + /** Holds the connection to the broker. */ + private AMQConnection _connection; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + private boolean _isPubSub = false; + + /** + * This flag is used to indicate that the user should be prompted to kill a broker, in order to test + * failover, immediately before committing a transaction. + */ + protected boolean _failBeforeCommit = false; + + /** + * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test + * failover, immediate after committing a transaction. + */ + protected boolean _failAfterCommit = false; + + /** + * Creates a PingPongBouncer on the specified producer and consumer sessions. + * + * @param brokerDetails The addresses of the brokers to connect to. + * @param username The broker username. + * @param password The broker password. + * @param virtualpath The virtual host name within the broker. + * @param destinationName The name of the queue to receive pings on + * (or root of the queue name where many queues are generated). + * @param persistent A flag to indicate that persistent message should be used. + * @param transacted A flag to indicate that pings should be sent within transactions. + * @param selector A message selector to filter received pings with. + * @param verbose A flag to indicate that message timings should be sent to the console. + * + * @throws Exception All underlying exceptions allowed to fall through. This is only test code... + */ + public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath, + String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose, + boolean pubsub) throws Exception + { + // Create a client id to uniquely identify this client. + InetAddress address = InetAddress.getLocalHost(); + String clientId = address.getHostName() + System.currentTimeMillis(); + _verbose = verbose; + _persistent = persistent; + setPubSub(pubsub); + // Connect to the broker. + setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath)); + _logger.info("Connected with URL:" + getConnection().toURL()); + + // Set up the failover notifier. + getConnection().setConnectionListener(new FailoverNotifier()); + + // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the + // command line option. + _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + // Create the queue to listen for message on. + createConsumerDestination(destinationName); + MessageConsumer consumer = + _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector); + + // Create a producer for the replies, without a default destination. + _replyProducer = _producerSession.createProducer(null); + _replyProducer.setDisableMessageTimestamp(true); + _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // Set this up to listen for messages on the queue. + consumer.setMessageListener(this); + } + + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ + public static void main(String[] args) throws Exception + { + System.out.println("Starting..."); + + // Display help on the command line. + if (args.length == 0) + { + _logger.info("Running test with default values..."); + //usage(); + //System.exit(0); + } + + // Extract all command line parameters. + Config config = new Config(); + config.setOptions(args); + String brokerDetails = config.getHost() + ":" + config.getPort(); + String virtualpath = "test"; + String destinationName = config.getDestination(); + if (destinationName == null) + { + destinationName = DEFAULT_DESTINATION_NAME; + } + + String selector = config.getSelector(); + boolean transacted = config.isTransacted(); + boolean persistent = config.usePersistentMessages(); + boolean pubsub = config.isPubSub(); + boolean verbose = true; + + //String selector = null; + + // Instantiate the ping pong client with the command line options and start it running. + PingPongBouncer pingBouncer = + new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted, + selector, verbose, pubsub); + pingBouncer.getConnection().start(); + + System.out.println("Waiting..."); + } + + private static void usage() + { + System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + + "-persistent : (true/false). Default is false\n" + + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); + } + + /** + * This is a callback method that is notified of all messages for which this has been registered as a message + * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to + * destination of the message. + * + * @param message The message that triggered this callback. + */ + public void onMessage(Message message) + { + try + { + String messageCorrelationId = message.getJMSCorrelationID(); + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, " + + messageCorrelationId); + } + + // Get the reply to destination from the message and check it is set. + Destination responseDest = message.getJMSReplyTo(); + + if (responseDest == null) + { + _logger.debug("Cannot send reply because reply-to destination is null."); + + return; + } + + // Spew out some timing information if verbose mode is on. + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Time to bounce point: " + diff); + } + } + + // Correlate the reply to the original. + message.setJMSCorrelationID(messageCorrelationId); + + // Send the receieved message as the pong reply. + _replyProducer.send(responseDest, message); + + if (_verbose) + { + _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, " + + messageCorrelationId); + } + + // Commit the transaction if running in transactional mode. + commitTx(_producerSession); + } + catch (JMSException e) + { + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } + } + + /** + * Gets the underlying connection that this ping client is running on. + * + * @return The underlying connection that this ping client is running on. + */ + public AMQConnection getConnection() + { + return _connection; + } + + /** + * Sets the connection that this ping client is using. + * + * @param connection The ping connection. + */ + public void setConnection(AMQConnection connection) + { + this._connection = connection; + } + + /** + * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. + * + * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. + */ + public void setPubSub(boolean pubsub) + { + _isPubSub = pubsub; + } + + /** + * Checks whether this client is a p2p or pub/sub ping client. + * + * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue. + */ + public boolean isPubSub() + { + return _isPubSub; + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not + * a transactional controlSession, this method does nothing. + * + * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the + * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker + * after the commit is applied. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + */ + protected void commitTx(Session session) throws JMSException + { + if (session.getTransacted()) + { + try + { + if (_failBeforeCommit) + { + _logger.debug("Failing Before Commit"); + doFailover(); + } + + session.commit(); + + if (_failAfterCommit) + { + _logger.debug("Failing After Commit"); + doFailover(); + } + + _logger.debug("Session Commited."); + } + catch (JMSException e) + { + _logger.trace("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + _logger.debug("Message rolled back."); + } + catch (JMSException jmse) + { + _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + } + + /** + * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + * + * @param broker The name of the broker to terminate. + */ + protected void doFailover(String broker) + { + System.out.println("Kill Broker " + broker + " now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + } + + /** + * Prompts the user to terminate the broker, in order to test failover functionality. This method will block + * until the user supplied some input on the terminal. + */ + protected void doFailover() + { + System.out.println("Kill Broker now."); + try + { + System.in.read(); + } + catch (IOException e) + { } + + System.out.println("Continuing."); + + } + + private void createConsumerDestination(String name) + { + if (isPubSub()) + { + _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name); + } + else + { + _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name); + } + } + + /** + * A connection listener that logs out any failover complete events. Could do more interesting things with this + * at some point... + */ + public static class FailoverNotifier implements ConnectionListener + { + public void bytesSent(long count) + { } + + public void bytesReceived(long count) + { } + + public boolean preFailover(boolean redirect) + { + return true; + } + + public boolean preResubscribe() + { + return true; + } + + public void failoverComplete() + { + _logger.info("App got failover complete callback."); + } + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f328675488..4d8a736ec8 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -1,1720 +1,1720 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
-
-import org.apache.qpid.test.framework.TestUtils;
-
-import org.apache.qpid.junit.extensions.BatchedThrottle;
-import org.apache.qpid.junit.extensions.Throttle;
-import org.apache.qpid.junit.extensions.util.CommandLineParser;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-
-import javax.jms.*;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
- * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens
- * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping
- * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour
- * configurable.
- *
- * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This
- * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation
- * id in the ping to be bounced back in the reply correlation id.
- *
- * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It
- * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within
- * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
- * testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
- *
- * <p/><table><caption>Parameters</caption>
- * <tr><th> Parameter <th> Default <th> Comments
- * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers.
- * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping.
- * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used.
- * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions.
- * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to.
- * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over.
- * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
- * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message.
- * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default.
- * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch.
- * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch.
- * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send.
- * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send.
- * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once.
- * <tr><td> username <td> guest <td> The username to access the broker with.
- * <tr><td> password <td> guest <td> The password to access the broker with.
- * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with.
- * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to.
- * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination.
- * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies.
- * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode.
- * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all.
- * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used.
- * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
- * 0 - SESSION_TRANSACTED
- * 1 - AUTO_ACKNOWLEDGE
- * 2 - CLIENT_ACKNOWLEDGE
- * 3 - DUPS_OK_ACKNOWLEDGE
- * 257 - NO_ACKNOWLEDGE
- * 258 - PRE_ACKNOWLEDGE
- * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value
- * as the 'transacted' option if not seperately defined.
- * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
- * value as 'ackMode' if not seperately defined.
- * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received.
- * Limits the volume of messages currently buffered on the client
- * or broker. Can help scale test clients by limiting amount of buffered
- * data to avoid out of memory errors.
- * </table>
- *
- * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
- * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by
- * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also
- * registered to terminate the ping-pong loop cleanly.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a ping and wait for all responses cycle.
- * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
- * </table>
- *
- * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair.
- * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
- * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
- * message waits until all other messages have been handled before releasing producers but allows messages to be
- * processed concurrently, unlike the current synchronized block.
- */
-public class PingPongProducer implements Runnable, ExceptionListener
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(PingPongProducer.class);
-
- /** Holds the name of the property to determine whether of not client id is overridden at connection time. */
- public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";
-
- /** Holds the default value of the override client id flag. */
- public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";
-
- /** Holds the name of the property to define the JNDI factory name with. */
- public static final String FACTORY_NAME_PROPNAME = "factoryName";
-
- /** Holds the default JNDI name of the connection factory. */
- public static final String FACTORY_NAME_DEAFULT = "local";
-
- /** Holds the name of the property to set the JNDI initial context properties with. */
- public static final String FILE_PROPERTIES_PROPNAME = "properties";
-
- /** Holds the default file name of the JNDI initial context properties. */
- public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";
-
- /** Holds the name of the property to get the test message size from. */
- public static final String MESSAGE_SIZE_PROPNAME = "messageSize";
-
- /** Used to set up a default message size. */
- public static final int MESSAGE_SIZE_DEAFULT = 0;
-
- /** Holds the name of the property to get the ping queue name from. */
- public static final String PING_QUEUE_NAME_PROPNAME = "destinationName";
-
- /** Holds the name of the default destination to send pings on. */
- public static final String PING_QUEUE_NAME_DEFAULT = "ping";
-
- /** Holds the name of the property to get the queue name postfix from. */
- public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";
-
- /** Holds the default queue name postfix value. */
- public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";
-
- /** Holds the name of the property to get the test delivery mode from. */
- public static final String PERSISTENT_MODE_PROPNAME = "persistent";
-
- /** Holds the message delivery mode to use for the test. */
- public static final boolean PERSISTENT_MODE_DEFAULT = false;
-
- /** Holds the name of the property to get the test transactional mode from. */
- public static final String TRANSACTED_PROPNAME = "transacted";
-
- /** Holds the transactional mode to use for the test. */
- public static final boolean TRANSACTED_DEFAULT = false;
-
- /** Holds the name of the property to get the test consumer transacted mode from. */
- public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";
-
- /** Holds the consumer transactional mode default setting. */
- public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;
-
- /** Holds the name of the property to get the test broker url from. */
- public static final String BROKER_PROPNAME = "broker";
-
- /** Holds the default broker url for the test. */
- public static final String BROKER_DEFAULT = "tcp://localhost:5672";
-
- /** Holds the name of the property to get the test broker virtual path. */
- public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";
-
- /** Holds the default virtual path for the test. */
- public static final String VIRTUAL_HOST_DEFAULT = "";
-
- /** Holds the name of the property to get the message rate from. */
- public static final String RATE_PROPNAME = "rate";
-
- /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
- public static final int RATE_DEFAULT = 0;
-
- /** Holds the name of the property to get the verbose mode proeprty from. */
- public static final String VERBOSE_PROPNAME = "verbose";
-
- /** Holds the default verbose mode. */
- public static final boolean VERBOSE_DEFAULT = false;
-
- /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
- public static final String PUBSUB_PROPNAME = "pubsub";
-
- /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
- public static final boolean PUBSUB_DEFAULT = false;
-
- /** Holds the name of the property to get the fail after commit flag from. */
- public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";
-
- /** Holds the default failover after commit test flag. */
- public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;
-
- /** Holds the name of the proeprty to get the fail before commit flag from. */
- public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";
-
- /** Holds the default failover before commit test flag. */
- public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;
-
- /** Holds the name of the proeprty to get the fail after send flag from. */
- public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";
-
- /** Holds the default failover after send test flag. */
- public static final boolean FAIL_AFTER_SEND_DEFAULT = false;
-
- /** Holds the name of the property to get the fail before send flag from. */
- public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";
-
- /** Holds the default failover before send test flag. */
- public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;
-
- /** Holds the name of the property to get the fail once flag from. */
- public static final String FAIL_ONCE_PROPNAME = "failOnce";
-
- /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
- public static final boolean FAIL_ONCE_DEFAULT = true;
-
- /** Holds the name of the property to get the broker access username from. */
- public static final String USERNAME_PROPNAME = "username";
-
- /** Holds the default broker log on username. */
- public static final String USERNAME_DEFAULT = "guest";
-
- /** Holds the name of the property to get the broker access password from. */
- public static final String PASSWORD_PROPNAME = "password";
-
- /** Holds the default broker log on password. */
- public static final String PASSWORD_DEFAULT = "guest";
-
- /** Holds the name of the proeprty to get the. */
- public static final String SELECTOR_PROPNAME = "selector";
-
- /** Holds the default message selector. */
- public static final String SELECTOR_DEFAULT = "";
-
- /** Holds the name of the property to get the destination count from. */
- public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";
-
- /** Defines the default number of destinations to ping. */
- public static final int DESTINATION_COUNT_DEFAULT = 1;
-
- /** Holds the name of the property to get the number of consumers per destination from. */
- public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";
-
- /** Defines the default number consumers per destination. */
- public static final int NUM_CONSUMERS_DEFAULT = 1;
-
- /** Holds the name of the property to get the waiting timeout for response messages. */
- public static final String TIMEOUT_PROPNAME = "timeout";
-
- /** Default time to wait before assuming that a ping has timed out. */
- public static final long TIMEOUT_DEFAULT = 30000;
-
- /** Holds the name of the property to get the commit batch size from. */
- public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";
-
- /** Defines the default number of pings to send in each transaction when running transactionally. */
- public static final int TX_BATCH_SIZE_DEFAULT = 1;
-
- /** Holds the name of the property to get the unique destinations flag from. */
- public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests";
-
- /** Defines the default value for the unique destinations property. */
- public static final boolean UNIQUE_DESTS_DEFAULT = true;
-
- /** Holds the name of the property to get the durable destinations flag from. */
- public static final String DURABLE_DESTS_PROPNAME = "durableDests";
-
- /** Defines the default value of the durable destinations flag. */
- public static final boolean DURABLE_DESTS_DEFAULT = false;
-
- /** Holds the name of the proeprty to get the message acknowledgement mode from. */
- public static final String ACK_MODE_PROPNAME = "ackMode";
-
- /** Defines the default message acknowledgement mode. */
- public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
-
- /** Holds the name of the property to get the consumers message acknowledgement mode from. */
- public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";
-
- /** Defines the default consumers message acknowledgement mode. */
- public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;
-
- /** Holds the name of the property to get the maximum pending message size setting from. */
- public static final String MAX_PENDING_PROPNAME = "maxPending";
-
- /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
- public static final int MAX_PENDING_DEFAULT = 0;
-
- /** Defines the default prefetch size to use when consuming messages. */
- public static final int PREFETCH_DEFAULT = 100;
-
- /** Defines the default value of the no local flag to use when consuming messages. */
- public static final boolean NO_LOCAL_DEFAULT = false;
-
- /** Defines the default value of the exclusive flag to use when consuming messages. */
- public static final boolean EXCLUSIVE_DEFAULT = false;
-
- /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
- public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";
-
- /** Holds the default configuration properties. */
- public static ParsedProperties defaults = new ParsedProperties();
-
- static
- {
- defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
- defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
- defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
- defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
- defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
- defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
- defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);
- defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
- defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
- defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);
- defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
- defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
- defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
- defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
- defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);
- defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
- defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
- defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
- defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
- defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);
- defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
- defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
- defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
- defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
- defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);
- defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
- defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
- defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
- defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
- defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
- defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);
- }
-
- /** Allows setting of client ID on the connection, rather than through the connection URL. */
- protected boolean _overrideClientId;
-
- /** Holds the JNDI name of the JMS connection factory. */
- protected String _factoryName;
-
- /** Holds the name of the properties file to configure JNDI with. */
- protected String _fileProperties;
-
- /** Holds the broker url. */
- protected String _brokerDetails;
-
- /** Holds the username to access the broker with. */
- protected String _username;
-
- /** Holds the password to access the broker with. */
- protected String _password;
-
- /** Holds the virtual host on the broker to run the tests through. */
- protected String _virtualpath;
-
- /** Holds the root name from which to generate test destination names. */
- protected String _destinationName;
-
- /** Holds the default queue name postfix value. */
- protected String _queueNamePostfix;
-
- /** Holds the message selector to filter the pings with. */
- protected String _selector;
-
- /** Holds the producers transactional mode flag. */
- protected boolean _transacted;
-
- /** Holds the consumers transactional mode flag. */
- protected boolean _consTransacted;
-
- /** Determines whether this producer sends persistent messages. */
- protected boolean _persistent;
-
- /** Holds the acknowledgement mode used for the producers. */
- protected int _ackMode;
-
- /** Holds the acknowledgement mode setting for the consumers. */
- protected int _consAckMode;
-
- /** Determines what size of messages this producer sends. */
- protected int _messageSize;
-
- /** Used to indicate that the ping loop should print out whenever it pings. */
- protected boolean _verbose;
-
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- protected boolean _isPubSub;
-
- /** Flag used to indicate if the destinations should be unique client. */
- protected boolean _isUnique;
-
- /** Flag used to indicate that durable destination should be used. */
- protected boolean _isDurable;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
- protected boolean _failBeforeCommit;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
- protected boolean _failAfterCommit;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
- protected boolean _failBeforeSend;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
- protected boolean _failAfterSend;
-
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
- protected boolean _failOnce;
-
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
- protected int _txBatchSize;
-
- /** Holds the number of destinations to ping. */
- protected int _noOfDestinations;
-
- /** Holds the number of consumers per destination. */
- protected int _noOfConsumers;
-
- /** Holds the maximum send rate in herz. */
- protected int _rate;
-
- /**
- * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended
- * if this limit is breached.
- */
- protected int _maxPendingSize;
-
- /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
- private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);
-
- /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */
- private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);
-
- /** Holds this instances unique id. */
- private int instanceId;
-
- /**
- * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple
- * ping producers on the same JVM.
- */
- private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
-
- /** A convenient formatter to use when time stamping output. */
- protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
-
- /** Holds the connection for the message producer. */
- protected Connection _connection;
-
- /** Holds the consumer connections. */
- protected Connection[] _consumerConnection;
-
- /** Holds the controlSession on which ping replies are received. */
- protected Session[] _consumerSession;
-
- /** Holds the producer controlSession, needed to create ping messages. */
- protected Session _producerSession;
-
- /** Holds the destination where the response messages will arrive. */
- protected Destination _replyDestination;
-
- /** Holds the set of destinations that this ping producer pings. */
- protected List<Destination> _pingDestinations;
-
- /** Used to restrict the sending rate to a specified limit. */
- protected Throttle _rateLimiter;
-
- /** Holds a message listener that this message listener chains all its messages to. */
- protected ChainedMessageListener _chainedMessageListener = null;
-
- /**
- * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
- * creating multiple ping producers in the same JVM.
- */
- protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
-
- /**
- * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
- * on the same JVM using this id generator will allow them to ping on the same queues.
- */
- protected AtomicInteger _queueSharedID = new AtomicInteger();
-
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
- protected boolean _publish = true;
-
- /** Holds the message producer to send the pings through. */
- protected MessageProducer _producer;
-
- /** Holds the message consumer to receive the ping replies through. */
- protected MessageConsumer[] _consumer;
-
- /** The prompt to display when asking the user to kill the broker for failover testing. */
- private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";
-
- /** Holds the name for this test client to be identified to the broker with. */
- private String _clientID;
-
- /** Keeps count of the total messages sent purely for debugging purposes. */
- private static AtomicInteger numSent = new AtomicInteger();
-
- /**
- * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected
- * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a
- * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an
- * equal chance to produce messages.
- */
- static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);
-
- /** Keeps a count of the number of message currently sent but not received. */
- static AtomicInteger _unreceived = new AtomicInteger(0);
-
- /**
- * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
- * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
- * it, to send and recieve its pings and replies on.
- *
- * @param overrides Properties containing any desired overrides to the defaults.
- *
- * @throws Exception Any exceptions are allowed to fall through.
- */
- public PingPongProducer(Properties overrides) throws Exception
- {
- // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
- instanceId = _instanceIdGenerator.getAndIncrement();
-
- // Create a set of parsed properties from the defaults overriden by the passed in values.
- ParsedProperties properties = new ParsedProperties(defaults);
- properties.putAll(overrides);
-
- // Extract the configuration properties to set the pinger up with.
- _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
- _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
- _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
- _brokerDetails = properties.getProperty(BROKER_PROPNAME);
- _username = properties.getProperty(USERNAME_PROPNAME);
- _password = properties.getProperty(PASSWORD_PROPNAME);
- _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
- _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
- _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
- _selector = properties.getProperty(SELECTOR_PROPNAME);
- _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
- _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
- _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
- _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
- _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
- _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
- _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
- _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
- _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
- _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
- _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
- _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
- _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
- _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
- _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
- _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
- _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
- _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
- _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
- _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
-
- // Check that one or more destinations were specified.
- if (_noOfDestinations < 1)
- {
- throw new IllegalArgumentException("There must be at least one destination.");
- }
-
- // Set up a throttle to control the send rate, if a rate > 0 is specified.
- if (_rate > 0)
- {
- _rateLimiter = new BatchedThrottle();
- _rateLimiter.setRate(_rate);
- }
-
- // Create the connection and message producers/consumers.
- // establishConnection(true, true);
- }
-
- /**
- * Establishes a connection to the broker and creates message consumers and producers based on the parameters
- * that this ping client was created with.
- *
- * @param producer Flag to indicate whether or not the producer should be set up.
- * @param consumer Flag to indicate whether or not the consumers should be set up.
- *
- * @throws Exception Any exceptions are allowed to fall through.
- */
- public void establishConnection(boolean producer, boolean consumer) throws Exception
- {
- // log.debug("public void establishConnection(): called");
-
- // Generate a unique identifying name for this client, based on it ip address and the current time.
- InetAddress address = InetAddress.getLocalHost();
- // _clientID = address.getHostName() + System.currentTimeMillis();
- _clientID = "perftest_" + instanceId;
-
- // Create a connection to the broker.
- createConnection(_clientID);
-
- // Create transactional or non-transactional sessions, based on the command line arguments.
- _producerSession = _connection.createSession(_transacted, _ackMode);
-
- _consumerSession = new Session[_noOfConsumers];
-
- for (int i = 0; i < _noOfConsumers; i++)
- {
- _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode);
- }
-
- // Create the destinations to send pings to and receive replies from.
- _replyDestination = _consumerSession[0].createTemporaryQueue();
- createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);
-
- // Create the message producer only if instructed to.
- if (producer)
- {
- createProducer();
- }
-
- // Create the message consumer only if instructed to.
- if (consumer)
- {
- createReplyConsumers(getReplyDestinations(), _selector);
- }
- }
-
- /**
- * Establishes a connection to the broker, based on the configuration parameters that this ping client was
- * created with.
- *
- * @param clientID The clients identifier.
- *
- * @throws JMSException Underlying exceptions allowed to fall through.
- * @throws NamingException Underlying exceptions allowed to fall through.
- * @throws IOException Underlying exceptions allowed to fall through.
- */
- protected void createConnection(String clientID) throws JMSException, NamingException, IOException
- {
- // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
-
- // _log.debug("Creating a connection for the message producer.");
- File propsFile = new File(_fileProperties);
- InputStream is = new FileInputStream(propsFile);
- Properties properties = new Properties();
- properties.load(is);
-
- Context context = new InitialContext(properties);
- ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
- _connection = factory.createConnection(_username, _password);
-
- if (_overrideClientId)
- {
- _connection.setClientID(clientID);
- }
-
- // _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");
-
- _consumerConnection = new Connection[_noOfConsumers];
-
- for (int i = 0; i < _noOfConsumers; i++)
- {
- _consumerConnection[i] = factory.createConnection(_username, _password);
- // _consumerConnection[i].setClientID(clientID);
- }
- }
-
- /**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
- * to be started to bounce the pings back again.
- *
- * @param args The command line arguments.
- */
- public static void main(String[] args)
- {
- try
- {
- Properties options =
- CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());
-
- // Create a ping producer overriding its defaults with all options passed on the command line.
- PingPongProducer pingProducer = new PingPongProducer(options);
- pingProducer.establishConnection(true, true);
-
- // Start the ping producers dispatch thread running.
- pingProducer._connection.start();
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingProducer._connection.setExceptionListener(pingProducer);
-
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
- }
- catch (Exception e)
- {
- System.err.println(e.getMessage());
- log.error("Top level handler caught execption.", e);
- System.exit(1);
- }
- }
-
- /**
- * Convenience method for a short pause.
- *
- * @param sleepTime The time in milliseconds to pause for.
- */
- public static void pause(long sleepTime)
- {
- if (sleepTime > 0)
- {
- try
- {
- Thread.sleep(sleepTime);
- }
- catch (InterruptedException ie)
- { }
- }
- }
-
- /**
- * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to
- * destination of this pinger.
- *
- * @return The single reply to destination of this pinger, wrapped in a list.
- */
- public List<Destination> getReplyDestinations()
- {
- // log.debug("public List<Destination> getReplyDestinations(): called");
-
- List<Destination> replyDestinations = new ArrayList<Destination>();
- replyDestinations.add(_replyDestination);
-
- // log.debug("replyDestinations = " + replyDestinations);
-
- return replyDestinations;
- }
-
- /**
- * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
- * flag is set accoring the ping producer creation options.
- *
- * @throws JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createProducer() throws JMSException
- {
- // log.debug("public void createProducer(): called");
-
- _producer = (MessageProducer) _producerSession.createProducer(null);
- _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
- }
-
- /**
- * Creates consumers for the specified number of destinations. The destinations themselves are also created by this
- * method.
- *
- * @param noOfDestinations The number of destinations to create consumers for.
- * @param selector The message selector to filter the consumers with.
- * @param rootName The root of the name, or actual name if only one is being created.
- * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
- * numbering with all pingers on the same JVM.
- * @param durable If the destinations are durable topics.
- *
- * @throws JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
- boolean durable) throws JMSException
- {
- /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
- + durable + "): called");*/
-
- _pingDestinations = new ArrayList<Destination>();
-
- // Create the desired number of ping destinations and consumers for them.
- // log.debug("Creating " + noOfDestinations + " destinations to ping.");
-
- for (int i = 0; i < noOfDestinations; i++)
- {
- Destination destination;
- String id;
-
- // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
- if (unique)
- {
- // log.debug("Creating unique destinations.");
- id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
- }
- else
- {
- // log.debug("Creating shared destinations.");
- id = "_" + _queueSharedID.incrementAndGet();
- }
-
- // Check if this is a pub/sub pinger, in which case create topics.
- if (_isPubSub)
- {
- destination = _producerSession.createTopic(rootName + id);
- // log.debug("Created non-durable topic " + destination);
-
- if (durable)
- {
- _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID());
- }
- }
- // Otherwise this is a p2p pinger, in which case create queues.
- else
- {
- destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
- // log.debug("Created queue " + destination);
- }
-
- // Keep the destination.
- _pingDestinations.add(destination);
- }
- }
-
- /**
- * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
- *
- * @param destinations The destinations to listen to.
- * @param selector A selector to filter the messages with.
- *
- * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
- */
- public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
- {
- /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");*/
-
- log.debug("There are " + destinations.size() + " destinations.");
- log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
- log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));
-
- for (Destination destination : destinations)
- {
- _consumer = new MessageConsumer[_noOfConsumers];
-
- for (int i = 0; i < _noOfConsumers; i++)
- {
- // Create a consumer for the destination and set this pinger to listen to its messages.
- _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT);
-
- final int consumerNo = i;
-
- _consumer[i].setMessageListener(new MessageListener()
- {
- public void onMessage(Message message)
- {
- onMessageWithConsumerNo(message, consumerNo);
- }
- });
-
- log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
- }
- }
- }
-
- /**
- * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
- * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
- * replies map.
- *
- * @param message The received message.
- * @param consumerNo The consumer number within this test pinger instance.
- */
- public void onMessageWithConsumerNo(Message message, int consumerNo)
- {
- // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
- try
- {
- long now = System.nanoTime();
- long timestamp = getTimestamp(message);
- long pingTime = now - timestamp;
-
- // NDC.push("id" + instanceId + "/cons" + consumerNo);
-
- // Extract the messages correlation id.
- String correlationID = message.getJMSCorrelationID();
- // log.debug("correlationID = " + correlationID);
-
- // int num = message.getIntProperty("MSG_NUM");
- // log.info("Message " + num + " received.");
-
- boolean isRedelivered = message.getJMSRedelivered();
- // log.debug("isRedelivered = " + isRedelivered);
-
- if (!isRedelivered)
- {
- // Countdown on the traffic light if there is one for the matching correlation id.
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
-
- if (perCorrelationId != null)
- {
- CountDownLatch trafficLight = perCorrelationId.trafficLight;
-
- // Restart the timeout timer on every message.
- perCorrelationId.timeOutStart = System.nanoTime();
-
- // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
-
- // Release waiting senders if there are some and using maxPending limit.
- if ((_maxPendingSize > 0))
- {
- // Decrement the count of sent but not yet received messages.
- int unreceived = _unreceived.decrementAndGet();
- int unreceivedSize =
- (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
- / (_isPubSub ? getConsumersPerDestination() : 1);
-
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
-
- // synchronized (_sendPauseMonitor)
- // {
- if (unreceivedSize < _maxPendingSize)
- {
- _sendPauseMonitor.poll();
- }
- // }
- }
-
- // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
- // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
- // ensures that each thread will get a unique value for the remaining messages.
- long trueCount;
- long remainingCount;
-
- synchronized (trafficLight)
- {
- trafficLight.countDown();
-
- trueCount = trafficLight.getCount();
- remainingCount = trueCount - 1;
-
- // NDC.push("/rem" + remainingCount);
-
- // log.debug("remainingCount = " + remainingCount);
- // log.debug("trueCount = " + trueCount);
-
- // Commit on transaction batch size boundaries. At this point in time the waiting producer
- // remains blocked, even on the last message.
- // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
- // each batch boundary. For pub/sub each consumer gets every message so no division is done.
- // When running in client ack mode, an ack is done instead of a commit, on the commit batch
- // size boundaries.
- long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
- // log.debug("commitCount = " + commitCount);
-
- if ((commitCount % _txBatchSize) == 0)
- {
- if (_consAckMode == 2)
- {
- // log.debug("Doing client ack for consumer " + consumerNo + ".");
- message.acknowledge();
- }
- else
- {
- // log.debug("Trying commit for consumer " + consumerNo + ".");
- commitTx(_consumerSession[consumerNo]);
- // log.info("Tx committed on consumer " + consumerNo);
- }
- }
-
- // Forward the message and remaining count to any interested chained message listener.
- if (_chainedMessageListener != null)
- {
- _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
- }
-
- // Check if this is the last message, in which case release any waiting producers. This is done
- // after the transaction has been committed and any listeners notified.
- if (trueCount == 1)
- {
- trafficLight.countDown();
- }
- }
- }
- else
- {
- log.warn("Got unexpected message with correlationId: " + correlationID);
- }
- }
- else
- {
- log.warn("Got redelivered message, ignoring.");
- }
- }
- catch (JMSException e)
- {
- log.warn("There was a JMSException: " + e.getMessage(), e);
- }
- finally
- {
- // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
- // NDC.clear();
- }
- }
-
- /**
- * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
- * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
- * the correlation id.
- *
- * @param message The message to send. If this is null, one is generated.
- * @param numPings The number of ping messages to send.
- * @param timeout The timeout in milliseconds.
- * @param messageCorrelationId The message correlation id. If this is null, one is generated.
- *
- * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
- * for all prematurely.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- * @throws InterruptedException When interrupted by a timeout
- */
- public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
- {
- /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
-
- // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
- if (messageCorrelationId == null)
- {
- messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());
- }
-
- try
- {
- // NDC.push("prod");
-
- // Create a count down latch to count the number of replies with. This is created before the messages are
- // sent so that the replies cannot be received before the count down is created.
- // One is added to this, so that the last reply becomes a special case. The special case is that the
- // chained message listener must be called before this sender can be unblocked, but that decrementing the
- // countdown needs to be done before the chained listener can be called.
- PerCorrelationId perCorrelationId = new PerCorrelationId();
-
- perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1);
- perCorrelationIds.put(messageCorrelationId, perCorrelationId);
-
- // Set up the current time as the start time for pinging on the correlation id. This is used to determine
- // timeouts.
- perCorrelationId.timeOutStart = System.nanoTime();
-
- // Send the specifed number of messages.
- pingNoWaitForReply(message, numPings, messageCorrelationId);
-
- boolean timedOut;
- boolean allMessagesReceived;
- int numReplies;
-
- do
- {
- // Block the current thread until replies to all the messages are received, or it times out.
- perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);
-
- // Work out how many replies were receieved.
- numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount();
-
- allMessagesReceived = numReplies == getExpectedNumPings(numPings);
-
- // log.debug("numReplies = " + numReplies);
- // log.debug("allMessagesReceived = " + allMessagesReceived);
-
- // Recheck the timeout condition.
- long now = System.nanoTime();
- long lastMessageReceievedAt = perCorrelationId.timeOutStart;
- timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);
-
- // log.debug("now = " + now);
- // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
- }
- while (!timedOut && !allMessagesReceived);
-
- if ((numReplies < getExpectedNumPings(numPings)) && _verbose)
- {
- log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
- }
- else if (_verbose)
- {
- log.info("Got all replies on id, " + messageCorrelationId);
- }
-
- // commitTx(_consumerSession);
-
- // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
-
- return numReplies;
- }
- // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
- // so will be a memory leak if this is not done.
- finally
- {
- // NDC.pop();
- perCorrelationIds.remove(messageCorrelationId);
- }
- }
-
- /**
- * Sends the specified number of ping messages and does not wait for correlating replies.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @param messageCorrelationId A correlation id to place on all messages sent.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
- {
- /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/
-
- if (message == null)
- {
- message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
- }
-
- message.setJMSCorrelationID(messageCorrelationId);
-
- // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
- // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
- // needed.
- boolean committed = false;
-
- // Send all of the ping messages.
- for (int i = 0; i < numPings; i++)
- {
- // Re-timestamp the message.
- // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
-
- // Send the message, passing in the message count.
- committed = sendMessage(i, message);
-
- // Spew out per message timings on every message sonly in verbose mode.
- /*if (_verbose)
- {
- log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
- }*/
- }
-
- // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
- if (!committed)
- {
- commitTx(_producerSession);
- }
- }
-
- /**
- * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
- * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
- * than one), and to determine if the transaction batch size has been reached and the sent messages should be
- * committed.
- *
- * @param i The count of messages sent so far in a loop of multiple calls to this send method.
- * @param message The message to send.
- *
- * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
- *
- * @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
- */
- protected boolean sendMessage(int i, Message message) throws JMSException
- {
- try
- {
- NDC.push("id" + instanceId + "/prod");
-
- // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
- // log.debug("_txBatchSize = " + _txBatchSize);
-
- // Round robin the destinations as the messages are sent.
- Destination destination = _pingDestinations.get(i % _pingDestinations.size());
-
- // Prompt the user to kill the broker when doing failover testing.
- _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);
-
- // Get the test setup for the correlation id.
- String correlationID = message.getJMSCorrelationID();
- PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);
-
- // If necessary, wait until the max pending message size comes within its limit.
- if (_maxPendingSize > 0)
- {
- synchronized (_sendPauseMonitor)
- {
- // Used to keep track of the number of times that send has to wait.
- int numWaits = 0;
-
- // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
- // the test timeout.
- int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);
-
- while (true)
- {
- // Get the size estimate of sent but not yet received messages.
- int unreceived = _unreceived.get();
- int unreceivedSize =
- (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
- / (_isPubSub ? getConsumersPerDestination() : 1);
-
- // log.debug("unreceived = " + unreceived);
- // log.debug("unreceivedSize = " + unreceivedSize);
- // log.debug("_maxPendingSize = " + _maxPendingSize);
-
- if (unreceivedSize > _maxPendingSize)
- {
- // log.debug("unreceived size estimate over limit = " + unreceivedSize);
-
- // Fail the test if the send has had to wait more than the maximum allowed number of times.
- if (numWaits > waitLimit)
- {
- String errorMessage =
- "Send has had to wait for the unreceivedSize (" + unreceivedSize
- + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
- + " times.";
- log.warn(errorMessage);
- throw new RuntimeException(errorMessage);
- }
-
- // Wait on the send pause barrier for the limit to be re-established.
- try
- {
- long start = System.nanoTime();
- // _sendPauseMonitor.wait(10000);
- _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
- long end = System.nanoTime();
-
- // Count the wait only if it was for > 99% of the requested wait time.
- if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
- {
- numWaits++;
- }
- }
- catch (InterruptedException e)
- {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- }
- else
- {
- break;
- }
- }
- }
- }
-
- // Send the message either to its round robin destination, or its default destination.
- // int num = numSent.incrementAndGet();
- // message.setIntProperty("MSG_NUM", num);
- setTimestamp(message);
-
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
-
- // Increase the unreceived size, this may actually happen after the message is received.
- // The unreceived size is incremented by the number of consumers that will get a copy of the message,
- // in pub/sub mode.
- if (_maxPendingSize > 0)
- {
- int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
- // log.debug("newUnreceivedCount = " + newUnreceivedCount);
- }
-
- // Apply message rate throttling if a rate limit has been set up.
- if (_rateLimiter != null)
- {
- _rateLimiter.throttle();
- }
-
- // Call commit every time the commit batch size is reached.
- boolean committed = false;
-
- // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
- if (((i + 1) % _txBatchSize) == 0)
- {
- // log.debug("Trying commit on producer session.");
- committed = commitTx(_producerSession);
- }
-
- return committed;
- }
- finally
- {
- NDC.clear();
- }
- }
-
- /**
- * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
- * test that the failure has occurred, before the method returns.
- *
- * @param failFlag The fail flag to test.
- *
- * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
- * used once, then reset.
- */
- private boolean waitForUserToPromptOnFailure(boolean failFlag)
- {
- if (failFlag)
- {
- if (_failOnce)
- {
- failFlag = false;
- }
-
- // log.debug("Failing Before Send");
- waitForUser(KILL_BROKER_PROMPT);
- }
-
- return failFlag;
- }
-
- /**
- * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
- * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
- * terminate the pinger.
- */
- public void pingLoop()
- {
- try
- {
- // Generate a sample message and time stamp it.
- Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
- // setTimestamp(msg);
-
- // Send the message and wait for a reply.
- pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
- }
- catch (JMSException e)
- {
- _publish = false;
- // log.debug("There was a JMSException: " + e.getMessage(), e);
- }
- catch (InterruptedException e)
- {
- _publish = false;
- // log.debug("There was an interruption: " + e.getMessage(), e);
- }
- }
-
- /**
- * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
- * here.
- *
- * @param messageListener The chained message listener.
- */
- public void setChainedMessageListener(ChainedMessageListener messageListener)
- {
- _chainedMessageListener = messageListener;
- }
-
- /** Removes any chained message listeners from this pinger. */
- public void removeChainedMessageListener()
- {
- _chainedMessageListener = null;
- }
-
- /**
- * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
- *
- * @param replyQueue The reply-to destination for the message.
- * @param messageSize The desired size of the message in bytes.
- * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
- *
- * @return A freshly generated test message.
- *
- * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
- */
- public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
- {
- // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
- return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
- }
-
- /**
- * Sets the current time in nanoseconds as the timestamp on the message.
- *
- * @param msg The message to timestamp.
- *
- * @throws JMSException Any JMSExceptions are allowed to fall through.
- */
- protected void setTimestamp(Message msg) throws JMSException
- {
- /*if (((AMQSession)_producerSession).isStrictAMQP())
- {
- ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
- }
- else
- {*/
- msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
- // }
- }
-
- /**
- * Extracts the nanosecond timestamp from a message.
- *
- * @param msg The message to extract the time stamp from.
- *
- * @return The timestamp in nanos.
- *
- * @throws JMSException Any JMSExceptions are allowed to fall through.
- */
- protected long getTimestamp(Message msg) throws JMSException
- {
- /*if (((AMQSession)_producerSession).isStrictAMQP())
- {
- Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
-
- return (value == null) ? 0L : value;
- }
- else
- {*/
- return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
- // }
- }
-
- /**
- * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
- * has been cleared.
- */
- public void stop()
- {
- _publish = false;
- }
-
- /**
- * Starts the producer and consumer connections.
- *
- * @throws JMSException Any JMSExceptions are allowed to fall through.
- */
- public void start() throws JMSException
- {
- // log.debug("public void start(): called");
-
- _connection.start();
- // log.debug("Producer started.");
-
- for (int i = 0; i < _noOfConsumers; i++)
- {
- _consumerConnection[i].start();
- // log.debug("Consumer " + i + " started.");
- }
- }
-
- /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
- public void run()
- {
- // Keep running until the publish flag is cleared.
- while (_publish)
- {
- pingLoop();
- }
- }
-
- /**
- * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
- * connection, this clears the publish flag which in turn will halt the ping loop.
- *
- * @param e The exception that triggered this callback method.
- */
- public void onException(JMSException e)
- {
- // log.debug("public void onException(JMSException e = " + e + "): called", e);
- _publish = false;
- }
-
- /**
- * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
- * with the runtime system as a shutdown hook.
- *
- * @return A shutdown hook for the ping loop.
- */
- public Thread getShutdownHook()
- {
- return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- }
- });
- }
-
- /**
- * Closes all of the producer and consumer connections.
- *
- * @throws JMSException All JMSException are allowed to fall through.
- */
- public void close() throws JMSException
- {
- // log.debug("public void close(): called");
-
- try
- {
- if (_connection != null)
- {
- // log.debug("Before close producer connection.");
- _connection.close();
- // log.debug("Closed producer connection.");
- }
-
- for (int i = 0; i < _noOfConsumers; i++)
- {
- if (_consumerConnection[i] != null)
- {
- // log.debug("Before close consumer connection " + i + ".");
- _consumerConnection[i].close();
- // log.debug("Closed consumer connection " + i + ".");
- }
- }
- }
- finally
- {
- _connection = null;
- _producerSession = null;
- _consumerSession = null;
- _consumerConnection = null;
- _producer = null;
- _consumer = null;
- _pingDestinations = null;
- _replyDestination = null;
- }
- }
-
- /**
- * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a
- * transactional controlSession, this method does nothing (unless the failover after send flag is set).
- *
- * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
- * applied. This flag applies whether the pinger is transactional or not.
- *
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
- * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
- * commit is applied. These flags will only apply if using a transactional pinger.
- *
- * @param session The controlSession to commit
- *
- * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not.
- *
- * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- *
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
- * non-transactional alike.
- */
- protected boolean commitTx(Session session) throws JMSException
- {
- // log.debug("protected void commitTx(Session session): called");
-
- boolean committed = false;
-
- _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);
-
- if (session.getTransacted())
- {
- // log.debug("Session is transacted.");
-
- try
- {
- _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);
-
- long start = System.nanoTime();
- session.commit();
- committed = true;
- // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");
-
- _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);
-
- // log.debug("Session Commited.");
- }
- catch (JMSException e)
- {
- // log.debug("JMSException on commit:" + e.getMessage(), e);
-
- try
- {
- session.rollback();
- // log.debug("Message rolled back.");
- }
- catch (JMSException jmse)
- {
- // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);
-
- // Both commit and rollback failed. Throw the rollback exception.
- throw jmse;
- }
- }
- }
-
- return committed;
- }
-
- /**
- * Outputs a prompt to the console and waits for the user to press return.
- *
- * @param prompt The prompt to display on the console.
- */
- public void waitForUser(String prompt)
- {
- System.out.println(prompt);
-
- try
- {
- System.in.read();
- }
- catch (IOException e)
- {
- // Ignored.
- }
-
- System.out.println("Continuing.");
- }
-
- /**
- * Gets the number of consumers that are listening to each destination in the test.
- *
- * @return int The number of consumers subscribing to each topic.
- */
- public int getConsumersPerDestination()
- {
- return _noOfConsumers;
- }
-
- /**
- * Calculates how many pings are expected to be received for the given number sent.
- *
- * @param numpings The number of pings that will be sent.
- *
- * @return The number that should be received, for the test to pass.
- */
- public int getExpectedNumPings(int numpings)
- {
- // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called");
-
- // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers.");
-
- return numpings * (_isPubSub ? getConsumersPerDestination() : 1);
- }
-
- /**
- * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
- * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
- * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
- * messages with that correlation id.
- *
- * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
- * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
- * still blocked.
- */
- public static interface ChainedMessageListener
- {
- /**
- * Notifies interested listeners about message arrival and important test stats, the number of messages
- * remaining in the test, and the messages send timestamp.
- *
- * @param message The newly arrived message.
- * @param remainingCount The number of messages left to complete the test.
- * @param latency The nanosecond latency of the message.
- *
- * @throws JMSException Any JMS exceptions is allowed to fall through.
- */
- public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
- }
-
- /**
- * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
- * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
- */
- protected static class PerCorrelationId
- {
- /** Holds a countdown on number of expected messages. */
- CountDownLatch trafficLight;
-
- /** Holds the last timestamp that the timeout was reset to. */
- Long timeOutStart;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.TestUtils; + +import org.apache.qpid.junit.extensions.BatchedThrottle; +import org.apache.qpid.junit.extensions.Throttle; +import org.apache.qpid.junit.extensions.util.CommandLineParser; +import org.apache.qpid.junit.extensions.util.ParsedProperties; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; + +import java.io.*; +import java.net.InetAddress; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may + * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens + * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping + * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour + * configurable. + * + * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This + * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation + * id in the ping to be bounced back in the reply correlation id. + * + * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It + * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within + * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover + * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: + * + * <p/><table><caption>Parameters</caption> + * <tr><th> Parameter <th> Default <th> Comments + * <tr><td> messageSize <td> 0 <td> Message size in bytes. Not including any headers. + * <tr><td> destinationName <td> ping <td> The root name to use to generate destination names to ping. + * <tr><td> persistent <td> false <td> Determines whether peristent delivery is used. + * <tr><td> transacted <td> false <td> Determines whether messages are sent/received in transactions. + * <tr><td> broker <td> tcp://localhost:5672 <td> Determines the broker to connect to. + * <tr><td> virtualHost <td> test <td> Determines the virtual host to send all ping over. + * <tr><td> rate <td> 0 <td> The maximum rate (in hertz) to send messages at. 0 means no limit. + * <tr><td> verbose <td> false <td> The verbose flag for debugging. Prints to console on every message. + * <tr><td> pubsub <td> false <td> Whether to ping topics or queues. Uses p2p by default. + * <tr><td> failAfterCommit <td> false <td> Whether to prompt user to kill broker after a commit batch. + * <tr><td> failBeforeCommit <td> false <td> Whether to prompt user to kill broker before a commit batch. + * <tr><td> failAfterSend <td> false <td> Whether to prompt user to kill broker after a send. + * <tr><td> failBeforeSend <td> false <td> Whether to prompt user to kill broker before a send. + * <tr><td> failOnce <td> true <td> Whether to prompt for failover only once. + * <tr><td> username <td> guest <td> The username to access the broker with. + * <tr><td> password <td> guest <td> The password to access the broker with. + * <tr><td> selector <td> null <td> Not used. Defines a message selector to filter pings with. + * <tr><td> destinationCount <td> 1 <td> The number of destinations to send pings to. + * <tr><td> numConsumers <td> 1 <td> The number of consumers on each destination. + * <tr><td> timeout <td> 30000 <td> In milliseconds. The timeout to stop waiting for replies. + * <tr><td> commitBatchSize <td> 1 <td> The number of messages per transaction in transactional mode. + * <tr><td> uniqueDests <td> true <td> Whether each receivers only listens to one ping destination or all. + * <tr><td> durableDests <td> false <td> Whether or not durable destinations are used. + * <tr><td> ackMode <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are: + * 0 - SESSION_TRANSACTED + * 1 - AUTO_ACKNOWLEDGE + * 2 - CLIENT_ACKNOWLEDGE + * 3 - DUPS_OK_ACKNOWLEDGE + * 257 - NO_ACKNOWLEDGE + * 258 - PRE_ACKNOWLEDGE + * <tr><td> consTransacted <td> false <td> Whether or not consumers use transactions. Defaults to the same value + * as the 'transacted' option if not seperately defined. + * <tr><td> consAckMode <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same + * value as 'ackMode' if not seperately defined. + * <tr><td> maxPending <td> 0 <td> The maximum size in bytes, of messages sent but not yet received. + * Limits the volume of messages currently buffered on the client + * or broker. Can help scale test clients by limiting amount of buffered + * data to avoid out of memory errors. + * </table> + * + * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop + * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by + * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also + * registered to terminate the ping-pong loop cleanly. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a ping and wait for all responses cycle. + * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url. + * </table> + * + * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. + * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a + * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last + * message waits until all other messages have been handled before releasing producers but allows messages to be + * processed concurrently, unlike the current synchronized block. + */ +public class PingPongProducer implements Runnable, ExceptionListener +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(PingPongProducer.class); + + /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ + public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; + + /** Holds the default value of the override client id flag. */ + public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; + + /** Holds the name of the property to define the JNDI factory name with. */ + public static final String FACTORY_NAME_PROPNAME = "factoryName"; + + /** Holds the default JNDI name of the connection factory. */ + public static final String FACTORY_NAME_DEAFULT = "local"; + + /** Holds the name of the property to set the JNDI initial context properties with. */ + public static final String FILE_PROPERTIES_PROPNAME = "properties"; + + /** Holds the default file name of the JNDI initial context properties. */ + public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; + + /** Holds the name of the property to get the test message size from. */ + public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; + + /** Used to set up a default message size. */ + public static final int MESSAGE_SIZE_DEAFULT = 0; + + /** Holds the name of the property to get the ping queue name from. */ + public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; + + /** Holds the name of the default destination to send pings on. */ + public static final String PING_QUEUE_NAME_DEFAULT = "ping"; + + /** Holds the name of the property to get the queue name postfix from. */ + public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; + + /** Holds the default queue name postfix value. */ + public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; + + /** Holds the name of the property to get the test delivery mode from. */ + public static final String PERSISTENT_MODE_PROPNAME = "persistent"; + + /** Holds the message delivery mode to use for the test. */ + public static final boolean PERSISTENT_MODE_DEFAULT = false; + + /** Holds the name of the property to get the test transactional mode from. */ + public static final String TRANSACTED_PROPNAME = "transacted"; + + /** Holds the transactional mode to use for the test. */ + public static final boolean TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test consumer transacted mode from. */ + public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; + + /** Holds the consumer transactional mode default setting. */ + public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; + + /** Holds the name of the property to get the test broker url from. */ + public static final String BROKER_PROPNAME = "broker"; + + /** Holds the default broker url for the test. */ + public static final String BROKER_DEFAULT = "tcp://localhost:5672"; + + /** Holds the name of the property to get the test broker virtual path. */ + public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; + + /** Holds the default virtual path for the test. */ + public static final String VIRTUAL_HOST_DEFAULT = ""; + + /** Holds the name of the property to get the message rate from. */ + public static final String RATE_PROPNAME = "rate"; + + /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ + public static final int RATE_DEFAULT = 0; + + /** Holds the name of the property to get the verbose mode proeprty from. */ + public static final String VERBOSE_PROPNAME = "verbose"; + + /** Holds the default verbose mode. */ + public static final boolean VERBOSE_DEFAULT = false; + + /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ + public static final String PUBSUB_PROPNAME = "pubsub"; + + /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ + public static final boolean PUBSUB_DEFAULT = false; + + /** Holds the name of the property to get the fail after commit flag from. */ + public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; + + /** Holds the default failover after commit test flag. */ + public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail before commit flag from. */ + public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; + + /** Holds the default failover before commit test flag. */ + public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; + + /** Holds the name of the proeprty to get the fail after send flag from. */ + public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; + + /** Holds the default failover after send test flag. */ + public static final boolean FAIL_AFTER_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail before send flag from. */ + public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; + + /** Holds the default failover before send test flag. */ + public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; + + /** Holds the name of the property to get the fail once flag from. */ + public static final String FAIL_ONCE_PROPNAME = "failOnce"; + + /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ + public static final boolean FAIL_ONCE_DEFAULT = true; + + /** Holds the name of the property to get the broker access username from. */ + public static final String USERNAME_PROPNAME = "username"; + + /** Holds the default broker log on username. */ + public static final String USERNAME_DEFAULT = "guest"; + + /** Holds the name of the property to get the broker access password from. */ + public static final String PASSWORD_PROPNAME = "password"; + + /** Holds the default broker log on password. */ + public static final String PASSWORD_DEFAULT = "guest"; + + /** Holds the name of the proeprty to get the. */ + public static final String SELECTOR_PROPNAME = "selector"; + + /** Holds the default message selector. */ + public static final String SELECTOR_DEFAULT = ""; + + /** Holds the name of the property to get the destination count from. */ + public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; + + /** Defines the default number of destinations to ping. */ + public static final int DESTINATION_COUNT_DEFAULT = 1; + + /** Holds the name of the property to get the number of consumers per destination from. */ + public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; + + /** Defines the default number consumers per destination. */ + public static final int NUM_CONSUMERS_DEFAULT = 1; + + /** Holds the name of the property to get the waiting timeout for response messages. */ + public static final String TIMEOUT_PROPNAME = "timeout"; + + /** Default time to wait before assuming that a ping has timed out. */ + public static final long TIMEOUT_DEFAULT = 30000; + + /** Holds the name of the property to get the commit batch size from. */ + public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; + + /** Defines the default number of pings to send in each transaction when running transactionally. */ + public static final int TX_BATCH_SIZE_DEFAULT = 1; + + /** Holds the name of the property to get the unique destinations flag from. */ + public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; + + /** Defines the default value for the unique destinations property. */ + public static final boolean UNIQUE_DESTS_DEFAULT = true; + + /** Holds the name of the property to get the durable destinations flag from. */ + public static final String DURABLE_DESTS_PROPNAME = "durableDests"; + + /** Defines the default value of the durable destinations flag. */ + public static final boolean DURABLE_DESTS_DEFAULT = false; + + /** Holds the name of the proeprty to get the message acknowledgement mode from. */ + public static final String ACK_MODE_PROPNAME = "ackMode"; + + /** Defines the default message acknowledgement mode. */ + public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the consumers message acknowledgement mode from. */ + public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; + + /** Defines the default consumers message acknowledgement mode. */ + public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; + + /** Holds the name of the property to get the maximum pending message size setting from. */ + public static final String MAX_PENDING_PROPNAME = "maxPending"; + + /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ + public static final int MAX_PENDING_DEFAULT = 0; + + /** Defines the default prefetch size to use when consuming messages. */ + public static final int PREFETCH_DEFAULT = 100; + + /** Defines the default value of the no local flag to use when consuming messages. */ + public static final boolean NO_LOCAL_DEFAULT = false; + + /** Defines the default value of the exclusive flag to use when consuming messages. */ + public static final boolean EXCLUSIVE_DEFAULT = false; + + /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ + public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; + + /** Holds the default configuration properties. */ + public static ParsedProperties defaults = new ParsedProperties(); + + static + { + defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); + defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); + defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); + defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); + defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); + defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); + defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); + defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); + defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); + defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); + defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); + defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); + defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); + defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); + defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); + defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); + defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); + defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); + defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); + defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); + defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); + defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); + defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); + defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); + defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); + defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); + } + + /** Allows setting of client ID on the connection, rather than through the connection URL. */ + protected boolean _overrideClientId; + + /** Holds the JNDI name of the JMS connection factory. */ + protected String _factoryName; + + /** Holds the name of the properties file to configure JNDI with. */ + protected String _fileProperties; + + /** Holds the broker url. */ + protected String _brokerDetails; + + /** Holds the username to access the broker with. */ + protected String _username; + + /** Holds the password to access the broker with. */ + protected String _password; + + /** Holds the virtual host on the broker to run the tests through. */ + protected String _virtualpath; + + /** Holds the root name from which to generate test destination names. */ + protected String _destinationName; + + /** Holds the default queue name postfix value. */ + protected String _queueNamePostfix; + + /** Holds the message selector to filter the pings with. */ + protected String _selector; + + /** Holds the producers transactional mode flag. */ + protected boolean _transacted; + + /** Holds the consumers transactional mode flag. */ + protected boolean _consTransacted; + + /** Determines whether this producer sends persistent messages. */ + protected boolean _persistent; + + /** Holds the acknowledgement mode used for the producers. */ + protected int _ackMode; + + /** Holds the acknowledgement mode setting for the consumers. */ + protected int _consAckMode; + + /** Determines what size of messages this producer sends. */ + protected int _messageSize; + + /** Used to indicate that the ping loop should print out whenever it pings. */ + protected boolean _verbose; + + /** Flag used to indicate if this is a point to point or pub/sub ping client. */ + protected boolean _isPubSub; + + /** Flag used to indicate if the destinations should be unique client. */ + protected boolean _isUnique; + + /** Flag used to indicate that durable destination should be used. */ + protected boolean _isDurable; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ + protected boolean _failBeforeCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ + protected boolean _failAfterCommit; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ + protected boolean _failBeforeSend; + + /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ + protected boolean _failAfterSend; + + /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ + protected boolean _failOnce; + + /** Holds the number of sends that should be performed in every transaction when using transactions. */ + protected int _txBatchSize; + + /** Holds the number of destinations to ping. */ + protected int _noOfDestinations; + + /** Holds the number of consumers per destination. */ + protected int _noOfConsumers; + + /** Holds the maximum send rate in herz. */ + protected int _rate; + + /** + * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended + * if this limit is breached. + */ + protected int _maxPendingSize; + + /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ + private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); + + /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ + private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); + + /** Holds this instances unique id. */ + private int instanceId; + + /** + * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple + * ping producers on the same JVM. + */ + private static Map<String, PerCorrelationId> perCorrelationIds = + Collections.synchronizedMap(new HashMap<String, PerCorrelationId>()); + + /** A convenient formatter to use when time stamping output. */ + protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); + + /** Holds the connection for the message producer. */ + protected Connection _connection; + + /** Holds the consumer connections. */ + protected Connection[] _consumerConnection; + + /** Holds the controlSession on which ping replies are received. */ + protected Session[] _consumerSession; + + /** Holds the producer controlSession, needed to create ping messages. */ + protected Session _producerSession; + + /** Holds the destination where the response messages will arrive. */ + protected Destination _replyDestination; + + /** Holds the set of destinations that this ping producer pings. */ + protected List<Destination> _pingDestinations; + + /** Used to restrict the sending rate to a specified limit. */ + protected Throttle _rateLimiter; + + /** Holds a message listener that this message listener chains all its messages to. */ + protected ChainedMessageListener _chainedMessageListener = null; + + /** + * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when + * creating multiple ping producers in the same JVM. + */ + protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); + + /** + * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers + * on the same JVM using this id generator will allow them to ping on the same queues. + */ + protected AtomicInteger _queueSharedID = new AtomicInteger(); + + /** Used to tell the ping loop when to terminate, it only runs while this is true. */ + protected boolean _publish = true; + + /** Holds the message producer to send the pings through. */ + protected MessageProducer _producer; + + /** Holds the message consumer to receive the ping replies through. */ + protected MessageConsumer[] _consumer; + + /** The prompt to display when asking the user to kill the broker for failover testing. */ + private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; + + /** Holds the name for this test client to be identified to the broker with. */ + private String _clientID; + + /** Keeps count of the total messages sent purely for debugging purposes. */ + private static AtomicInteger numSent = new AtomicInteger(); + + /** + * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected + * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a + * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an + * equal chance to produce messages. + */ + static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); + + /** Keeps a count of the number of message currently sent but not received. */ + static AtomicInteger _unreceived = new AtomicInteger(0); + + /** + * Creates a ping producer with the specified parameters, of which there are many. See the class level comments + * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on + * it, to send and recieve its pings and replies on. + * + * @param overrides Properties containing any desired overrides to the defaults. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public PingPongProducer(Properties overrides) throws Exception + { + // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); + instanceId = _instanceIdGenerator.getAndIncrement(); + + // Create a set of parsed properties from the defaults overriden by the passed in values. + ParsedProperties properties = new ParsedProperties(defaults); + properties.putAll(overrides); + + // Extract the configuration properties to set the pinger up with. + _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); + _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); + _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); + _brokerDetails = properties.getProperty(BROKER_PROPNAME); + _username = properties.getProperty(USERNAME_PROPNAME); + _password = properties.getProperty(PASSWORD_PROPNAME); + _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); + _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); + _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); + _selector = properties.getProperty(SELECTOR_PROPNAME); + _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); + _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); + _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); + _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); + _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); + _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); + _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); + _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); + _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); + _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); + _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); + _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); + _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); + _rate = properties.getPropertyAsInteger(RATE_PROPNAME); + _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); + _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); + _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); + _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); + _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); + _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); + + // Check that one or more destinations were specified. + if (_noOfDestinations < 1) + { + throw new IllegalArgumentException("There must be at least one destination."); + } + + // Set up a throttle to control the send rate, if a rate > 0 is specified. + if (_rate > 0) + { + _rateLimiter = new BatchedThrottle(); + _rateLimiter.setRate(_rate); + } + + // Create the connection and message producers/consumers. + // establishConnection(true, true); + } + + /** + * Establishes a connection to the broker and creates message consumers and producers based on the parameters + * that this ping client was created with. + * + * @param producer Flag to indicate whether or not the producer should be set up. + * @param consumer Flag to indicate whether or not the consumers should be set up. + * + * @throws Exception Any exceptions are allowed to fall through. + */ + public void establishConnection(boolean producer, boolean consumer) throws Exception + { + // log.debug("public void establishConnection(): called"); + + // Generate a unique identifying name for this client, based on it ip address and the current time. + InetAddress address = InetAddress.getLocalHost(); + // _clientID = address.getHostName() + System.currentTimeMillis(); + _clientID = "perftest_" + instanceId; + + // Create a connection to the broker. + createConnection(_clientID); + + // Create transactional or non-transactional sessions, based on the command line arguments. + _producerSession = _connection.createSession(_transacted, _ackMode); + + _consumerSession = new Session[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); + } + + // Create the destinations to send pings to and receive replies from. + _replyDestination = _consumerSession[0].createTemporaryQueue(); + createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); + + // Create the message producer only if instructed to. + if (producer) + { + createProducer(); + } + + // Create the message consumer only if instructed to. + if (consumer) + { + createReplyConsumers(getReplyDestinations(), _selector); + } + } + + /** + * Establishes a connection to the broker, based on the configuration parameters that this ping client was + * created with. + * + * @param clientID The clients identifier. + * + * @throws JMSException Underlying exceptions allowed to fall through. + * @throws NamingException Underlying exceptions allowed to fall through. + * @throws IOException Underlying exceptions allowed to fall through. + */ + protected void createConnection(String clientID) throws JMSException, NamingException, IOException + { + // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); + + // _log.debug("Creating a connection for the message producer."); + File propsFile = new File(_fileProperties); + InputStream is = new FileInputStream(propsFile); + Properties properties = new Properties(); + properties.load(is); + + Context context = new InitialContext(properties); + ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); + _connection = factory.createConnection(_username, _password); + + if (_overrideClientId) + { + _connection.setClientID(clientID); + } + + // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); + + _consumerConnection = new Connection[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i] = factory.createConnection(_username, _password); + // _consumerConnection[i].setClientID(clientID); + } + } + + /** + * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs + * to be started to bounce the pings back again. + * + * @param args The command line arguments. + */ + public static void main(String[] args) + { + try + { + Properties options = + CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); + + // Create a ping producer overriding its defaults with all options passed on the command line. + PingPongProducer pingProducer = new PingPongProducer(options); + pingProducer.establishConnection(true, true); + + // Start the ping producers dispatch thread running. + pingProducer._connection.start(); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); + + // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. + pingProducer._connection.setExceptionListener(pingProducer); + + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.run(); + pingThread.join(); + } + catch (Exception e) + { + System.err.println(e.getMessage()); + log.error("Top level handler caught execption.", e); + System.exit(1); + } + } + + /** + * Convenience method for a short pause. + * + * @param sleepTime The time in milliseconds to pause for. + */ + public static void pause(long sleepTime) + { + if (sleepTime > 0) + { + try + { + Thread.sleep(sleepTime); + } + catch (InterruptedException ie) + { } + } + } + + /** + * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to + * destination of this pinger. + * + * @return The single reply to destination of this pinger, wrapped in a list. + */ + public List<Destination> getReplyDestinations() + { + // log.debug("public List<Destination> getReplyDestinations(): called"); + + List<Destination> replyDestinations = new ArrayList<Destination>(); + replyDestinations.add(_replyDestination); + + // log.debug("replyDestinations = " + replyDestinations); + + return replyDestinations; + } + + /** + * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery + * flag is set accoring the ping producer creation options. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createProducer() throws JMSException + { + // log.debug("public void createProducer(): called"); + + _producer = (MessageProducer) _producerSession.createProducer(null); + _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + + // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); + } + + /** + * Creates consumers for the specified number of destinations. The destinations themselves are also created by this + * method. + * + * @param noOfDestinations The number of destinations to create consumers for. + * @param selector The message selector to filter the consumers with. + * @param rootName The root of the name, or actual name if only one is being created. + * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the + * numbering with all pingers on the same JVM. + * @param durable If the destinations are durable topics. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, + boolean durable) throws JMSException + { + /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " + + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " + + durable + "): called");*/ + + _pingDestinations = new ArrayList<Destination>(); + + // Create the desired number of ping destinations and consumers for them. + // log.debug("Creating " + noOfDestinations + " destinations to ping."); + + for (int i = 0; i < noOfDestinations; i++) + { + Destination destination; + String id; + + // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. + if (unique) + { + // log.debug("Creating unique destinations."); + id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); + } + else + { + // log.debug("Creating shared destinations."); + id = "_" + _queueSharedID.incrementAndGet(); + } + + // Check if this is a pub/sub pinger, in which case create topics. + if (_isPubSub) + { + destination = _producerSession.createTopic(rootName + id); + // log.debug("Created non-durable topic " + destination); + + if (durable) + { + _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); + } + } + // Otherwise this is a p2p pinger, in which case create queues. + else + { + destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); + // log.debug("Created queue " + destination); + } + + // Keep the destination. + _pingDestinations.add(destination); + } + } + + /** + * Creates consumers for the specified destinations and registers this pinger to listen to their messages. + * + * @param destinations The destinations to listen to. + * @param selector A selector to filter the messages with. + * + * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. + */ + public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException + { + /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations + + ", String selector = " + selector + "): called");*/ + + log.debug("There are " + destinations.size() + " destinations."); + log.debug("Creating " + _noOfConsumers + " consumers on each destination."); + log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); + + for (Destination destination : destinations) + { + _consumer = new MessageConsumer[_noOfConsumers]; + + for (int i = 0; i < _noOfConsumers; i++) + { + // Create a consumer for the destination and set this pinger to listen to its messages. + _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); + + final int consumerNo = i; + + _consumer[i].setMessageListener(new MessageListener() + { + public void onMessage(Message message) + { + onMessageWithConsumerNo(message, consumerNo); + } + }); + + log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); + } + } + } + + /** + * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a + * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the + * replies map. + * + * @param message The received message. + * @param consumerNo The consumer number within this test pinger instance. + */ + public void onMessageWithConsumerNo(Message message, int consumerNo) + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); + try + { + long now = System.nanoTime(); + long timestamp = getTimestamp(message); + long pingTime = now - timestamp; + + // NDC.push("id" + instanceId + "/cons" + consumerNo); + + // Extract the messages correlation id. + String correlationID = message.getJMSCorrelationID(); + // log.debug("correlationID = " + correlationID); + + // int num = message.getIntProperty("MSG_NUM"); + // log.info("Message " + num + " received."); + + boolean isRedelivered = message.getJMSRedelivered(); + // log.debug("isRedelivered = " + isRedelivered); + + if (!isRedelivered) + { + // Countdown on the traffic light if there is one for the matching correlation id. + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + if (perCorrelationId != null) + { + CountDownLatch trafficLight = perCorrelationId.trafficLight; + + // Restart the timeout timer on every message. + perCorrelationId.timeOutStart = System.nanoTime(); + + // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); + + // Release waiting senders if there are some and using maxPending limit. + if ((_maxPendingSize > 0)) + { + // Decrement the count of sent but not yet received messages. + int unreceived = _unreceived.decrementAndGet(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + + // synchronized (_sendPauseMonitor) + // { + if (unreceivedSize < _maxPendingSize) + { + _sendPauseMonitor.poll(); + } + // } + } + + // Decrement the countdown latch. Before this point, it is possible that two threads might enter this + // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block + // ensures that each thread will get a unique value for the remaining messages. + long trueCount; + long remainingCount; + + synchronized (trafficLight) + { + trafficLight.countDown(); + + trueCount = trafficLight.getCount(); + remainingCount = trueCount - 1; + + // NDC.push("/rem" + remainingCount); + + // log.debug("remainingCount = " + remainingCount); + // log.debug("trueCount = " + trueCount); + + // Commit on transaction batch size boundaries. At this point in time the waiting producer + // remains blocked, even on the last message. + // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on + // each batch boundary. For pub/sub each consumer gets every message so no division is done. + // When running in client ack mode, an ack is done instead of a commit, on the commit batch + // size boundaries. + long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); + // log.debug("commitCount = " + commitCount); + + if ((commitCount % _txBatchSize) == 0) + { + if (_consAckMode == 2) + { + // log.debug("Doing client ack for consumer " + consumerNo + "."); + message.acknowledge(); + } + else + { + // log.debug("Trying commit for consumer " + consumerNo + "."); + commitTx(_consumerSession[consumerNo]); + // log.info("Tx committed on consumer " + consumerNo); + } + } + + // Forward the message and remaining count to any interested chained message listener. + if (_chainedMessageListener != null) + { + _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); + } + + // Check if this is the last message, in which case release any waiting producers. This is done + // after the transaction has been committed and any listeners notified. + if (trueCount == 1) + { + trafficLight.countDown(); + } + } + } + else + { + log.warn("Got unexpected message with correlationId: " + correlationID); + } + } + else + { + log.warn("Got redelivered message, ignoring."); + } + } + catch (JMSException e) + { + log.warn("There was a JMSException: " + e.getMessage(), e); + } + finally + { + // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); + // NDC.clear(); + } + } + + /** + * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out + * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify + * the correlation id. + * + * @param message The message to send. If this is null, one is generated. + * @param numPings The number of ping messages to send. + * @param timeout The timeout in milliseconds. + * @param messageCorrelationId The message correlation id. If this is null, one is generated. + * + * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait + * for all prematurely. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + * @throws InterruptedException When interrupted by a timeout + */ + public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) + throws JMSException, InterruptedException + { + /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " + + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + // Generate a unique correlation id to put on the messages before sending them, if one was not specified. + if (messageCorrelationId == null) + { + messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); + } + + try + { + // NDC.push("prod"); + + // Create a count down latch to count the number of replies with. This is created before the messages are + // sent so that the replies cannot be received before the count down is created. + // One is added to this, so that the last reply becomes a special case. The special case is that the + // chained message listener must be called before this sender can be unblocked, but that decrementing the + // countdown needs to be done before the chained listener can be called. + PerCorrelationId perCorrelationId = new PerCorrelationId(); + + perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); + perCorrelationIds.put(messageCorrelationId, perCorrelationId); + + // Set up the current time as the start time for pinging on the correlation id. This is used to determine + // timeouts. + perCorrelationId.timeOutStart = System.nanoTime(); + + // Send the specifed number of messages. + pingNoWaitForReply(message, numPings, messageCorrelationId); + + boolean timedOut; + boolean allMessagesReceived; + int numReplies; + + do + { + // Block the current thread until replies to all the messages are received, or it times out. + perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); + + // Work out how many replies were receieved. + numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); + + allMessagesReceived = numReplies == getExpectedNumPings(numPings); + + // log.debug("numReplies = " + numReplies); + // log.debug("allMessagesReceived = " + allMessagesReceived); + + // Recheck the timeout condition. + long now = System.nanoTime(); + long lastMessageReceievedAt = perCorrelationId.timeOutStart; + timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); + + // log.debug("now = " + now); + // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); + } + while (!timedOut && !allMessagesReceived); + + if ((numReplies < getExpectedNumPings(numPings)) && _verbose) + { + log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); + } + else if (_verbose) + { + log.info("Got all replies on id, " + messageCorrelationId); + } + + // commitTx(_consumerSession); + + // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); + + return numReplies; + } + // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, + // so will be a memory leak if this is not done. + finally + { + // NDC.pop(); + perCorrelationIds.remove(messageCorrelationId); + } + } + + /** + * Sends the specified number of ping messages and does not wait for correlating replies. + * + * @param message The message to send. + * @param numPings The number of pings to send. + * @param messageCorrelationId A correlation id to place on all messages sent. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. + */ + public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException + { + /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings + + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ + + if (message == null) + { + message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); + } + + message.setJMSCorrelationID(messageCorrelationId); + + // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the + // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is + // needed. + boolean committed = false; + + // Send all of the ping messages. + for (int i = 0; i < numPings; i++) + { + // Re-timestamp the message. + // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + + // Send the message, passing in the message count. + committed = sendMessage(i, message); + + // Spew out per message timings on every message sonly in verbose mode. + /*if (_verbose) + { + log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); + }*/ + } + + // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. + if (!committed) + { + commitTx(_producerSession); + } + } + + /** + * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of + * messages sent so far must be specified and is used to round robin the ping destinations (where there are more + * than one), and to determine if the transaction batch size has been reached and the sent messages should be + * committed. + * + * @param i The count of messages sent so far in a loop of multiple calls to this send method. + * @param message The message to send. + * + * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise. + * + * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. + */ + protected boolean sendMessage(int i, Message message) throws JMSException + { + try + { + NDC.push("id" + instanceId + "/prod"); + + // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); + // log.debug("_txBatchSize = " + _txBatchSize); + + // Round robin the destinations as the messages are sent. + Destination destination = _pingDestinations.get(i % _pingDestinations.size()); + + // Prompt the user to kill the broker when doing failover testing. + _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); + + // Get the test setup for the correlation id. + String correlationID = message.getJMSCorrelationID(); + PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); + + // If necessary, wait until the max pending message size comes within its limit. + if (_maxPendingSize > 0) + { + synchronized (_sendPauseMonitor) + { + // Used to keep track of the number of times that send has to wait. + int numWaits = 0; + + // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with + // the test timeout. + int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); + + while (true) + { + // Get the size estimate of sent but not yet received messages. + int unreceived = _unreceived.get(); + int unreceivedSize = + (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) + / (_isPubSub ? getConsumersPerDestination() : 1); + + // log.debug("unreceived = " + unreceived); + // log.debug("unreceivedSize = " + unreceivedSize); + // log.debug("_maxPendingSize = " + _maxPendingSize); + + if (unreceivedSize > _maxPendingSize) + { + // log.debug("unreceived size estimate over limit = " + unreceivedSize); + + // Fail the test if the send has had to wait more than the maximum allowed number of times. + if (numWaits > waitLimit) + { + String errorMessage = + "Send has had to wait for the unreceivedSize (" + unreceivedSize + + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit + + " times."; + log.warn(errorMessage); + throw new RuntimeException(errorMessage); + } + + // Wait on the send pause barrier for the limit to be re-established. + try + { + long start = System.nanoTime(); + // _sendPauseMonitor.wait(10000); + _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); + long end = System.nanoTime(); + + // Count the wait only if it was for > 99% of the requested wait time. + if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) + { + numWaits++; + } + } + catch (InterruptedException e) + { + // Restore the interrupted status + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + else + { + break; + } + } + } + } + + // Send the message either to its round robin destination, or its default destination. + // int num = numSent.incrementAndGet(); + // message.setIntProperty("MSG_NUM", num); + setTimestamp(message); + + if (destination == null) + { + _producer.send(message); + } + else + { + _producer.send(destination, message); + } + + // Increase the unreceived size, this may actually happen after the message is received. + // The unreceived size is incremented by the number of consumers that will get a copy of the message, + // in pub/sub mode. + if (_maxPendingSize > 0) + { + int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); + // log.debug("newUnreceivedCount = " + newUnreceivedCount); + } + + // Apply message rate throttling if a rate limit has been set up. + if (_rateLimiter != null) + { + _rateLimiter.throttle(); + } + + // Call commit every time the commit batch size is reached. + boolean committed = false; + + // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. + if (((i + 1) % _txBatchSize) == 0) + { + // log.debug("Trying commit on producer session."); + committed = commitTx(_producerSession); + } + + return committed; + } + finally + { + NDC.clear(); + } + } + + /** + * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the + * test that the failure has occurred, before the method returns. + * + * @param failFlag The fail flag to test. + * + * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only + * used once, then reset. + */ + private boolean waitForUserToPromptOnFailure(boolean failFlag) + { + if (failFlag) + { + if (_failOnce) + { + failFlag = false; + } + + // log.debug("Failing Before Send"); + waitForUser(KILL_BROKER_PROMPT); + } + + return failFlag; + } + + /** + * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch + * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will + * terminate the pinger. + */ + public void pingLoop() + { + try + { + // Generate a sample message and time stamp it. + Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); + // setTimestamp(msg); + + // Send the message and wait for a reply. + pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); + } + catch (JMSException e) + { + _publish = false; + // log.debug("There was a JMSException: " + e.getMessage(), e); + } + catch (InterruptedException e) + { + _publish = false; + // log.debug("There was an interruption: " + e.getMessage(), e); + } + } + + /** + * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set + * here. + * + * @param messageListener The chained message listener. + */ + public void setChainedMessageListener(ChainedMessageListener messageListener) + { + _chainedMessageListener = messageListener; + } + + /** Removes any chained message listeners from this pinger. */ + public void removeChainedMessageListener() + { + _chainedMessageListener = null; + } + + /** + * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. + * + * @param replyQueue The reply-to destination for the message. + * @param messageSize The desired size of the message in bytes. + * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise. + * + * @return A freshly generated test message. + * + * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. + */ + public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException + { + // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); + return TestUtils.createTestMessageOfSize(_producerSession, messageSize); + } + + /** + * Sets the current time in nanoseconds as the timestamp on the message. + * + * @param msg The message to timestamp. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected void setTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); + } + else + {*/ + msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); + // } + } + + /** + * Extracts the nanosecond timestamp from a message. + * + * @param msg The message to extract the time stamp from. + * + * @return The timestamp in nanos. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + protected long getTimestamp(Message msg) throws JMSException + { + /*if (((AMQSession)_producerSession).isStrictAMQP()) + { + Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); + + return (value == null) ? 0L : value; + } + else + {*/ + return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); + // } + } + + /** + * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag + * has been cleared. + */ + public void stop() + { + _publish = false; + } + + /** + * Starts the producer and consumer connections. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + public void start() throws JMSException + { + // log.debug("public void start(): called"); + + _connection.start(); + // log.debug("Producer started."); + + for (int i = 0; i < _noOfConsumers; i++) + { + _consumerConnection[i].start(); + // log.debug("Consumer " + i + " started."); + } + } + + /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ + public void run() + { + // Keep running until the publish flag is cleared. + while (_publish) + { + pingLoop(); + } + } + + /** + * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the + * connection, this clears the publish flag which in turn will halt the ping loop. + * + * @param e The exception that triggered this callback method. + */ + public void onException(JMSException e) + { + // log.debug("public void onException(JMSException e = " + e + "): called", e); + _publish = false; + } + + /** + * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered + * with the runtime system as a shutdown hook. + * + * @return A shutdown hook for the ping loop. + */ + public Thread getShutdownHook() + { + return new Thread(new Runnable() + { + public void run() + { + stop(); + } + }); + } + + /** + * Closes all of the producer and consumer connections. + * + * @throws JMSException All JMSException are allowed to fall through. + */ + public void close() throws JMSException + { + // log.debug("public void close(): called"); + + try + { + if (_connection != null) + { + // log.debug("Before close producer connection."); + _connection.close(); + // log.debug("Closed producer connection."); + } + + for (int i = 0; i < _noOfConsumers; i++) + { + if (_consumerConnection[i] != null) + { + // log.debug("Before close consumer connection " + i + "."); + _consumerConnection[i].close(); + // log.debug("Closed consumer connection " + i + "."); + } + } + } + finally + { + _connection = null; + _producerSession = null; + _consumerSession = null; + _consumerConnection = null; + _producer = null; + _consumer = null; + _pingDestinations = null; + _replyDestination = null; + } + } + + /** + * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a + * transactional controlSession, this method does nothing (unless the failover after send flag is set). + * + * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is + * applied. This flag applies whether the pinger is transactional or not. + * + * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit + * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the + * commit is applied. These flags will only apply if using a transactional pinger. + * + * @param session The controlSession to commit + * + * @return <tt>true</tt> if the controlSession was committed, <tt>false</tt> if it was not. + * + * @throws javax.jms.JMSException If the commit fails and then the rollback fails. + * + * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit + * method, because commits only apply to transactional pingers, but fail after send applied to transactional and + * non-transactional alike. + */ + protected boolean commitTx(Session session) throws JMSException + { + // log.debug("protected void commitTx(Session session): called"); + + boolean committed = false; + + _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); + + if (session.getTransacted()) + { + // log.debug("Session is transacted."); + + try + { + _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); + + long start = System.nanoTime(); + session.commit(); + committed = true; + // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); + + _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); + + // log.debug("Session Commited."); + } + catch (JMSException e) + { + // log.debug("JMSException on commit:" + e.getMessage(), e); + + try + { + session.rollback(); + // log.debug("Message rolled back."); + } + catch (JMSException jmse) + { + // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); + + // Both commit and rollback failed. Throw the rollback exception. + throw jmse; + } + } + } + + return committed; + } + + /** + * Outputs a prompt to the console and waits for the user to press return. + * + * @param prompt The prompt to display on the console. + */ + public void waitForUser(String prompt) + { + System.out.println(prompt); + + try + { + System.in.read(); + } + catch (IOException e) + { + // Ignored. + } + + System.out.println("Continuing."); + } + + /** + * Gets the number of consumers that are listening to each destination in the test. + * + * @return int The number of consumers subscribing to each topic. + */ + public int getConsumersPerDestination() + { + return _noOfConsumers; + } + + /** + * Calculates how many pings are expected to be received for the given number sent. + * + * @param numpings The number of pings that will be sent. + * + * @return The number that should be received, for the test to pass. + */ + public int getExpectedNumPings(int numpings) + { + // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); + + // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); + + return numpings * (_isPubSub ? getConsumersPerDestination() : 1); + } + + /** + * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link + * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link + * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of + * messages with that correlation id. + * + * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be + * given unique message counts. It will always be called while the producer waiting for all messages to arrive is + * still blocked. + */ + public static interface ChainedMessageListener + { + /** + * Notifies interested listeners about message arrival and important test stats, the number of messages + * remaining in the test, and the messages send timestamp. + * + * @param message The newly arrived message. + * @param remainingCount The number of messages left to complete the test. + * @param latency The nanosecond latency of the message. + * + * @throws JMSException Any JMS exceptions is allowed to fall through. + */ + public void onMessage(Message message, int remainingCount, long latency) throws JMSException; + } + + /** + * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be + * added to this: read/write lock to make onMessage more concurrent as described in class header comment. + */ + protected static class PerCorrelationId + { + /** Holds a countdown on number of expected messages. */ + CountDownLatch trafficLight; + + /** Holds the last timestamp that the timeout was reset to. */ + Long timeOutStart; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java index 2610b32220..009254c612 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -1,251 +1,251 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import junit.framework.Assert;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.junit.extensions.AsymptoticTestCase;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-import org.apache.qpid.junit.extensions.util.TestContextProperties;
-
-import javax.jms.*;
-
-/**
- * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
- * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from
- * a producer to a conumer, then the consumer replies to the message on a temporary queue.
- *
- * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number
- * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled
- * up using a suitable JUnit test runner. See {@link org.apache.qpid.junit.extensions.TKTestRunner} for more
- * information on how to do this.
- *
- * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
- * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads
- * gets its own connection/producer/consumer, this is only re-established if the connection is lost.
- *
- * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
- * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come
- * back on the temporary queue.
- *
- * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- */
-public class PingPongTestPerf extends AsymptoticTestCase
-{
- private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
-
- /** Thread local to hold the per-thread test setup fields. */
- ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-
- // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
- // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
- // of the test parameters to log with the results. It also providers some basic type parsing convenience methods.
- // private Properties testParameters = System.getProperties();
- private ParsedProperties testParameters =
- TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/);
-
- public PingPongTestPerf(String name)
- {
- super(name);
-
- _logger.debug(testParameters);
-
- // Sets up the test parameters with defaults.
- /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT));
- testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME,
- PingPongProducer.PING_QUEUE_NAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME,
- Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME,
- Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME,
- Boolean.toString(PingPongProducer.VERBOSE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME,
- Boolean.toString(PingPongProducer.PUBSUB_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME,
- Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME,
- Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME,
- PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME,
- PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME,
- PingPongProducer.FAIL_AFTER_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME,
- PingPongProducer.FAIL_BEFORE_SEND_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT);
- testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME,
- Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME,
- Integer.toString(PingPongProducer.ACK_MODE_DEFAULT));
- testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME,
- PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/
- }
-
- /**
- * Compile all the tests into a test suite.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Ping-Pong Performance Tests");
-
- // Run performance tests in read committed mode.
- suite.addTest(new PingPongTestPerf("testPingPongOk"));
-
- return suite;
- }
-
- private static void setSystemPropertyIfNull(String propName, String propValue)
- {
- if (System.getProperty(propName) == null)
- {
- System.setProperty(propName, propValue);
- }
- }
-
- public void testPingPongOk(int numPings) throws Exception
- {
- // Get the per thread test setup to run the test through.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Generate a sample message. This message is already time stamped and has its reply-to destination set.
- Message msg =
- perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0),
- testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME),
- testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME));
-
- // Send the message and wait for a reply.
- int numReplies =
- perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null);
-
- // Fail the test if the timeout was exceeded.
- if (numReplies != numPings)
- {
- Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings);
- }
- }
-
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
- public void threadSetUp()
- {
- try
- {
- PerThreadSetup perThreadSetup = new PerThreadSetup();
-
- // Extract the test set up paramaeters.
- String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
- String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
- String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
- String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
- String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
- boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
- boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
- String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME);
- boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME);
- boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME);
-
- synchronized (this)
- {
- // Establish a bounce back client on the ping queue to bounce back the pings.
- perThreadSetup._testPingBouncer =
- new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
- transacted, selector, verbose, pubsub);
-
- // Start the connections for client and producer running.
- perThreadSetup._testPingBouncer.getConnection().start();
-
- // Establish a ping-pong client on the ping queue to send the pings and receive replies with.
- perThreadSetup._testPingProducer = new PingPongProducer(testParameters);
- perThreadSetup._testPingProducer.establishConnection(true, true);
- perThreadSetup._testPingProducer.start();
- }
-
- // Attach the per-thread set to the thread.
- threadSetup.set(perThreadSetup);
- }
- catch (Exception e)
- {
- _logger.warn("There was an exception during per thread setup.", e);
- }
- }
-
- /**
- * Performs test fixture clean
- */
- public void threadTearDown()
- {
- _logger.debug("public void threadTearDown(): called");
-
- try
- {
- // Get the per thread test fixture.
- PerThreadSetup perThreadSetup = threadSetup.get();
-
- // Close the pingers so that it cleans up its connection cleanly.
- synchronized (this)
- {
- perThreadSetup._testPingProducer.close();
- // perThreadSetup._testPingBouncer.close();
- }
-
- // Ensure the per thread fixture is reclaimed.
- threadSetup.remove();
- }
- catch (JMSException e)
- {
- _logger.warn("There was an exception during per thread tear down.");
- }
- }
-
- protected static class PerThreadSetup
- {
- /**
- * Holds the test ping-pong producer.
- */
- private PingPongProducer _testPingProducer;
-
- /**
- * Holds the test ping client.
- */
- private PingPongBouncer _testPingBouncer;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.requestreply; + +import junit.framework.Assert; +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; + +import org.apache.qpid.junit.extensions.AsymptoticTestCase; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import javax.jms.*; + +/** + * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run + * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from + * a producer to a conumer, then the consumer replies to the message on a temporary queue. + * + * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of the number + * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled + * up using a suitable JUnit test runner. See {@link org.apache.qpid.junit.extensions.TKTestRunner} for more + * information on how to do this. + * + * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a + * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads + * gets its own connection/producer/consumer, this is only re-established if the connection is lost. + * + * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that + * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come + * back on the temporary queue. + * + * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * </table> + */ +public class PingPongTestPerf extends AsymptoticTestCase +{ + private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in + // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner + // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. + // private Properties testParameters = System.getProperties(); + private ParsedProperties testParameters = + TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); + + public PingPongTestPerf(String name) + { + super(name); + + _logger.debug(testParameters); + + // Sets up the test parameters with defaults. + /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, + Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); + testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, + PingPongProducer.PING_QUEUE_NAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, + Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, + Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, + Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, + Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, + Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, + Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, + PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, + PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, + PingPongProducer.FAIL_AFTER_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, + PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); + testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, + Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, + Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); + testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, + PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ + } + + /** + * Compile all the tests into a test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); + + // Run performance tests in read committed mode. + suite.addTest(new PingPongTestPerf("testPingPongOk")); + + return suite; + } + + private static void setSystemPropertyIfNull(String propName, String propValue) + { + if (System.getProperty(propName) == null) + { + System.setProperty(propName, propValue); + } + } + + public void testPingPongOk(int numPings) throws Exception + { + // Get the per thread test setup to run the test through. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Generate a sample message. This message is already time stamped and has its reply-to destination set. + Message msg = + perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), + testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), + testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); + + // Send the message and wait for a reply. + int numReplies = + perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); + + // Fail the test if the timeout was exceeded. + if (numReplies != numPings) + { + Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); + } + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + try + { + PerThreadSetup perThreadSetup = new PerThreadSetup(); + + // Extract the test set up paramaeters. + String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME); + String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); + String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); + String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME); + String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); + boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); + boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); + String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); + boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); + boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); + + synchronized (this) + { + // Establish a bounce back client on the ping queue to bounce back the pings. + perThreadSetup._testPingBouncer = + new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent, + transacted, selector, verbose, pubsub); + + // Start the connections for client and producer running. + perThreadSetup._testPingBouncer.getConnection().start(); + + // Establish a ping-pong client on the ping queue to send the pings and receive replies with. + perThreadSetup._testPingProducer = new PingPongProducer(testParameters); + perThreadSetup._testPingProducer.establishConnection(true, true); + perThreadSetup._testPingProducer.start(); + } + + // Attach the per-thread set to the thread. + threadSetup.set(perThreadSetup); + } + catch (Exception e) + { + _logger.warn("There was an exception during per thread setup.", e); + } + } + + /** + * Performs test fixture clean + */ + public void threadTearDown() + { + _logger.debug("public void threadTearDown(): called"); + + try + { + // Get the per thread test fixture. + PerThreadSetup perThreadSetup = threadSetup.get(); + + // Close the pingers so that it cleans up its connection cleanly. + synchronized (this) + { + perThreadSetup._testPingProducer.close(); + // perThreadSetup._testPingBouncer.close(); + } + + // Ensure the per thread fixture is reclaimed. + threadSetup.remove(); + } + catch (JMSException e) + { + _logger.warn("There was an exception during per thread tear down."); + } + } + + protected static class PerThreadSetup + { + /** + * Holds the test ping-pong producer. + */ + private PingPongProducer _testPingProducer; + + /** + * Holds the test ping client. + */ + private PingPongBouncer _testPingBouncer; + } +} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java index f699295b06..0fcb0a8538 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/test/testcases/MessageThroughputPerf.java @@ -1,199 +1,199 @@ -/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.test.testcases;
-
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.NDC;
-
-import org.apache.qpid.test.framework.Assertion;
-import org.apache.qpid.test.framework.Circuit;
-import org.apache.qpid.test.framework.FrameworkBaseCase;
-import org.apache.qpid.test.framework.MessagingTestConfigProperties;
-import org.apache.qpid.test.framework.sequencers.CircuitFactory;
-
-import org.apache.qpid.junit.extensions.TestThreadAware;
-import org.apache.qpid.junit.extensions.TimingController;
-import org.apache.qpid.junit.extensions.TimingControllerAware;
-import org.apache.qpid.junit.extensions.util.ParsedProperties;
-import org.apache.qpid.junit.extensions.util.TestContextProperties;
-
-import java.util.LinkedList;
-
-/**
- * MessageThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of
- * the time required to receive samples consisting of batches of messages.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Measure message throughput accross a test circuit. <td> {@link Circuit}
- * </table>
- *
- * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as
- * the publishing end.
- *
- * @todo Set this up to run with zero sized tests. Size zero means send forever. Continuous sending to be interrupted
- * by completion of the test duration, or shutdown hook when the user presses Ctrl-C.
- */
-public class MessageThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware
-{
- /** Used for debugging. */
- private static final Logger log = Logger.getLogger(MessageThroughputPerf.class);
-
- /** Holds the timing controller, used to log test timings from self-timed tests. */
- private TimingController timingController;
-
- /** Thread local to hold the per-thread test setup fields. */
- ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>();
-
- /**
- * Creates a new test case with the specified name.
- *
- * @param name The test case name.
- */
- public MessageThroughputPerf(String name)
- {
- super(name);
- }
-
- /**
- * Performs the a basic P2P test case.
- *
- * @param numMessages The number of messages to send in the test.
- */
- public void testThroughput(int numMessages)
- {
- log.debug("public void testThroughput(): called");
-
- PerThreadSetup setup = threadSetup.get();
- assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList<Assertion>()));
- }
-
- /**
- * Should provide a translation from the junit method name of a test to its test case name as known to the test
- * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test
- * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case
- * name "TC2_BasicP2P".
- *
- * @param methodName The name of the JUnit test method.
- *
- * @return The name of the corresponding interop test case.
- */
- public String getTestCaseNameForTestMethod(String methodName)
- {
- log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called");
-
- return "DEFAULT_CIRCUIT_TEST";
- }
-
- /**
- * Used by test runners that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the
- * controller on an aware test.
- *
- * @param controller The timing controller.
- */
- public void setTimingController(TimingController controller)
- {
- timingController = controller;
- }
-
- /**
- * Overrides the parent setUp method so that the in-vm broker creation is not done on a per test basis.
- *
- * @throws Exception Any exceptions allowed to fall through and fail the test.
- */
- protected void setUp() throws Exception
- {
- NDC.push(getName());
-
- testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
- }
-
- /**
- * Overrides the parent setUp method so that the in-vm broker clean-up is not done on a per test basis.
- */
- protected void tearDown()
- {
- NDC.pop();
- }
-
- /**
- * Performs test fixture creation on a per thread basis. This will only be called once for each test thread.
- */
- public void threadSetUp()
- {
- // Run the test setup tasks. This may create an in-vm broker, if a decorator has injected a task for this.
- taskHandler.runSetupTasks();
-
- // Get the test parameters, any overrides on the command line will have been applied.
- ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults);
-
- // Customize the test parameters.
- testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST");
- testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue");
-
- // Get the test circuit factory to create test circuits and run the standard test procedure through.
- CircuitFactory circuitFactory = getCircuitFactory();
-
- // Create the test circuit. This projects the circuit onto the available test nodes and connects it up.
- Circuit testCircuit = circuitFactory.createCircuit(testProps);
-
- // Store the test configuration for the thread.
- PerThreadSetup setup = new PerThreadSetup();
- setup.testCircuit = testCircuit;
- threadSetup.set(setup);
- }
-
- /**
- * Called when a test thread is destroyed.
- */
- public void threadTearDown()
- {
- // Run the test teardown tasks. This may destroy the in-vm broker, if a decorator has injected a task for this.
- taskHandler.runSetupTasks();
- }
-
- /**
- * Holds the per-thread test configurations.
- */
- protected static class PerThreadSetup
- {
- /** Holds the test circuit to run tests on. */
- Circuit testCircuit;
- }
-
- /**
- * Compiles all the tests in this class into a suite.
- *
- * @return The test suite.
- */
- public static Test suite()
- {
- // Build a new test suite
- TestSuite suite = new TestSuite("Qpid Throughput Performance Tests");
-
- suite.addTest(new MessageThroughputPerf("testThroughput"));
-
- return suite;
- }
-}
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.test.testcases; + +import junit.framework.Test; +import junit.framework.TestSuite; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; + +import org.apache.qpid.test.framework.Assertion; +import org.apache.qpid.test.framework.Circuit; +import org.apache.qpid.test.framework.FrameworkBaseCase; +import org.apache.qpid.test.framework.MessagingTestConfigProperties; +import org.apache.qpid.test.framework.sequencers.CircuitFactory; + +import org.apache.qpid.junit.extensions.TestThreadAware; +import org.apache.qpid.junit.extensions.TimingController; +import org.apache.qpid.junit.extensions.TimingControllerAware; +import org.apache.qpid.junit.extensions.util.ParsedProperties; +import org.apache.qpid.junit.extensions.util.TestContextProperties; + +import java.util.LinkedList; + +/** + * MessageThroughputPerf runs a test over a {@link Circuit} controlled by the test parameters. It logs timings of + * the time required to receive samples consisting of batches of messages. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Measure message throughput accross a test circuit. <td> {@link Circuit} + * </table> + * + * @todo Check that all of the messages were sent. Check that the receiving end got the same number of messages as + * the publishing end. + * + * @todo Set this up to run with zero sized tests. Size zero means send forever. Continuous sending to be interrupted + * by completion of the test duration, or shutdown hook when the user presses Ctrl-C. + */ +public class MessageThroughputPerf extends FrameworkBaseCase implements TimingControllerAware, TestThreadAware +{ + /** Used for debugging. */ + private static final Logger log = Logger.getLogger(MessageThroughputPerf.class); + + /** Holds the timing controller, used to log test timings from self-timed tests. */ + private TimingController timingController; + + /** Thread local to hold the per-thread test setup fields. */ + ThreadLocal<PerThreadSetup> threadSetup = new ThreadLocal<PerThreadSetup>(); + + /** + * Creates a new test case with the specified name. + * + * @param name The test case name. + */ + public MessageThroughputPerf(String name) + { + super(name); + } + + /** + * Performs the a basic P2P test case. + * + * @param numMessages The number of messages to send in the test. + */ + public void testThroughput(int numMessages) + { + log.debug("public void testThroughput(): called"); + + PerThreadSetup setup = threadSetup.get(); + assertNoFailures(setup.testCircuit.test(numMessages, new LinkedList<Assertion>())); + } + + /** + * Should provide a translation from the junit method name of a test to its test case name as known to the test + * clients that will run the test. The purpose of this is to convert the JUnit method name into the correct test + * case name to place into the test invite. For example the method "testP2P" might map onto the interop test case + * name "TC2_BasicP2P". + * + * @param methodName The name of the JUnit test method. + * + * @return The name of the corresponding interop test case. + */ + public String getTestCaseNameForTestMethod(String methodName) + { + log.debug("public String getTestCaseNameForTestMethod(String methodName = " + methodName + "): called"); + + return "DEFAULT_CIRCUIT_TEST"; + } + + /** + * Used by test runners that can supply a {@link org.apache.qpid.junit.extensions.TimingController} to set the + * controller on an aware test. + * + * @param controller The timing controller. + */ + public void setTimingController(TimingController controller) + { + timingController = controller; + } + + /** + * Overrides the parent setUp method so that the in-vm broker creation is not done on a per test basis. + * + * @throws Exception Any exceptions allowed to fall through and fail the test. + */ + protected void setUp() throws Exception + { + NDC.push(getName()); + + testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + } + + /** + * Overrides the parent setUp method so that the in-vm broker clean-up is not done on a per test basis. + */ + protected void tearDown() + { + NDC.pop(); + } + + /** + * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. + */ + public void threadSetUp() + { + // Run the test setup tasks. This may create an in-vm broker, if a decorator has injected a task for this. + taskHandler.runSetupTasks(); + + // Get the test parameters, any overrides on the command line will have been applied. + ParsedProperties testProps = TestContextProperties.getInstance(MessagingTestConfigProperties.defaults); + + // Customize the test parameters. + testProps.setProperty("TEST_NAME", "DEFAULT_CIRCUIT_TEST"); + testProps.setProperty(MessagingTestConfigProperties.SEND_DESTINATION_NAME_ROOT_PROPNAME, "testqueue"); + + // Get the test circuit factory to create test circuits and run the standard test procedure through. + CircuitFactory circuitFactory = getCircuitFactory(); + + // Create the test circuit. This projects the circuit onto the available test nodes and connects it up. + Circuit testCircuit = circuitFactory.createCircuit(testProps); + + // Store the test configuration for the thread. + PerThreadSetup setup = new PerThreadSetup(); + setup.testCircuit = testCircuit; + threadSetup.set(setup); + } + + /** + * Called when a test thread is destroyed. + */ + public void threadTearDown() + { + // Run the test teardown tasks. This may destroy the in-vm broker, if a decorator has injected a task for this. + taskHandler.runSetupTasks(); + } + + /** + * Holds the per-thread test configurations. + */ + protected static class PerThreadSetup + { + /** Holds the test circuit to run tests on. */ + Circuit testCircuit; + } + + /** + * Compiles all the tests in this class into a suite. + * + * @return The test suite. + */ + public static Test suite() + { + // Build a new test suite + TestSuite suite = new TestSuite("Qpid Throughput Performance Tests"); + + suite.addTest(new MessageThroughputPerf("testThroughput")); + + return suite; + } +} |
