diff options
Diffstat (limited to 'java/perftests')
10 files changed, 1312 insertions, 261 deletions
diff --git a/java/perftests/bin/testPingClient.sh b/java/perftests/bin/testPingClient.sh index d819584999..4eca4a7999 100755 --- a/java/perftests/bin/testPingClient.sh +++ b/java/perftests/bin/testPingClient.sh @@ -30,4 +30,4 @@ echo $thehosts # XXX -Xms1024m -XX:NewSize=300m . ./setupclasspath.sh echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingClient $thehosts guest guest /test "$@" +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingClient $thehosts guest guest /test "$@" diff --git a/java/perftests/bin/testPingProducer.sh b/java/perftests/bin/testPingProducer.sh index 72c573780a..39ab487b60 100755 --- a/java/perftests/bin/testPingProducer.sh +++ b/java/perftests/bin/testPingProducer.sh @@ -30,4 +30,4 @@ echo $thehosts # XXX -Xms1024m -XX:NewSize=300m . ./setupclasspath.sh echo $CP -$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingProducer $thehosts /test +$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingProducer $thehosts /test diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml index bfa24d99a3..874cbd3e52 100644 --- a/java/perftests/pom.xml +++ b/java/perftests/pom.xml @@ -49,20 +49,16 @@ <artifactId>log4j</artifactId> </dependency> - <!-- Here JUnit is a deliberate 'compile' scope dependency, it will be packaged with the perf testing tools. --> + <!-- Test dependencies. --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <scope>compile</scope> </dependency> - <!-- Will be added to maven repo soon. JUnit test runner that can add repeats/concurrenct/timing etc to tests. <dependency> <groupId>uk.co.thebadgerset</groupId> <artifactId>junit-toolkit</artifactId> - <scope>compile</scope> </dependency> - --> </dependencies> @@ -80,6 +76,85 @@ </configuration> </plugin> + <!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. It will take a day or two from 16/1/2007. + + Configures the toolkit test runner for performance testing. These can be run from within maven, or by using the generated + scripts. + + To run from maven: + + mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tktest + + To run from the command line (after doing assembly:assembly goal): + + java -cp target/test_jar-jar-with-dependencies.jar uk.co.thebadgerset.junit.extensions.OldTKTestRunner -s 1 -r 100000 -o target org.apache.qpid.requestreply.PingPongTestPerf + + To generate the scripts do: + + mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:scripts + + Then to run the scripts do (after doing assembly:assembly goal): + + ./bin/script_name or ./bin/script_name.bat + + These scripts can find everything in the 'all test dependencies' jar. + --> + <!-- + <plugin> + <groupId>uk.co.thebadgerset</groupId> + <artifactId>junit-toolkit-maven-plugin</artifactId> + <version>0.3</version> + + <configuration> + <commands> + <!## Run the ping pong test once. This is just to check toolkit test runner is working. Real tests to follow. ##> + <param>-s 1 -r 1 -o target org.apache.qpid.requestreply.PingPongTestPerf</param> + <!## Add more here... <param> ... </param> ##> + + <!## A little bit of work needs to be done on TKTestRunner before the following syntax works. See Javadoc for details. ##> + <!## Thread ramp up: <param>-c [1,5,10,50,100] -r 1000 -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##> + <!## Run for a length of time: <param>-d1H -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##> + <!## Configure test parameters: <param>pingQueue=myping persistent=true transacted=true messageSize=1000 -d10M ... </param> ##> + <!## And so on. ##> + </commands> + </configuration> + + <executions> + <execution> + <phase>test</phase> + <!##<goals> + <goal>tktest</goal> + </goals>##> + </execution> + </executions> + </plugin> + --> + + <!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests. + + Usefull when bundling system, integration or performance tests into a convenient + package to hand over to testers. To use it run: + + java -cp target/your_app_name-all-test-deps.jar path.to.your.Class + + or often: + + java -cp target/your_app_name-all-test-deps.jar junit.framework.textui.TestRunner path.to.your.test.Class + + or other JUnit test runner invocations. + --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptors> + <descriptor>jar-with-dependencies.xml</descriptor> + </descriptors> + <outputDirectory>target</outputDirectory> + <workDirectory>target/assembly/work</workDirectory> + </configuration> + </plugin> + </plugins> <!-- Include source files in built jar --> diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java new file mode 100644 index 0000000000..3c1a476d51 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java @@ -0,0 +1,70 @@ +package org.apache.qpid.ping;
+
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.jms.Session;
+
+/**
+ * Provides functionality common to all ping clients. Provides the ability to manage a session and a convenience method
+ * to commit on the current transaction.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Commit the current transcation.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public abstract class AbstractPingClient
+{
+ private static final Logger _logger = Logger.getLogger(TestPingClient.class);
+
+ /** Used to keep a handle on the JMS session to send replies using. */
+ protected Session _session;
+
+ /**
+ * Creates an abstract ping client to manage the specified transcation.
+ *
+ * @param session The session.
+ */
+ public AbstractPingClient(Session session)
+ {
+ _session = session;
+ }
+
+ /**
+ * Convenience method to commit the transaction on the session associated with this bounce back client.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx() throws JMSException
+ {
+ if (_session.getTransacted())
+ {
+ try
+ {
+ _session.commit();
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ try
+ {
+ _session.rollback();
+ _logger.debug("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java new file mode 100644 index 0000000000..e2c2d5b440 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java @@ -0,0 +1,193 @@ +package org.apache.qpid.ping;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.jms.Session;
+
+/**
+ * This abstract class captures functionality that is common to all ping producers. It provides functionality to
+ * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework
+ * for running a ping loop, and terminating that loop on exceptions or a shutdown handler.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Manage session.
+ * <tr><td> Provide clean shutdown on exception or shutdown hook.
+ * <tr><td> Provide useable shutdown hook implementation.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public abstract class AbstractPingProducer implements Runnable, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
+
+ /** Holds the current Qpid session to send and receive pings on. */
+ protected Session _session;
+
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
+
+ /**
+ * Creates an AbstractPingProducer on a session.
+ */
+ public AbstractPingProducer(Session session)
+ {
+ _session = session;
+ }
+
+ /**
+ * Generates a test message of the specified size.
+ *
+ * @param session The Qpid session under which to generate the message.
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param currentTime The timestamp to add to the message as a "timestamp" property.
+ *
+ * @return A freshly generated test message.
+ *
+ * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+ */
+ public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime,
+ boolean persistent) throws JMSException
+ {
+ ObjectMessage msg;
+
+ if (messageSize != 0)
+ {
+ msg = TestMessageFactory.newObjectMessage(session, messageSize);
+ }
+ else
+ {
+ msg = session.createObjectMessage();
+ }
+
+ // Set the messages persistent delivery flag.
+ msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Timestamp the message.
+ msg.setLongProperty("timestamp", currentTime);
+
+ // Ensure that the temporary reply queue is set as the reply to destination for the message.
+ if (replyQueue != null)
+ {
+ msg.setJMSReplyTo(replyQueue);
+ }
+
+ return msg;
+ }
+
+ /**
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
+ */
+ public static void pause(long sleepTime)
+ {
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
+ }
+
+ public abstract void pingLoop();
+
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
+ {
+ _publish = false;
+ }
+
+ /**
+ * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+ */
+ public void run()
+ {
+ // Keep running until the publish flag is cleared.
+ while (_publish)
+ {
+ pingLoop();
+ }
+ }
+
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
+ }
+
+ /**
+ * Convenience method to commit the transaction on the session associated with this pinger.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx() throws JMSException
+ {
+ if (_session.getTransacted())
+ {
+ try
+ {
+ _session.commit();
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ _session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java index c96f6bd61d..3063e83127 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,112 +18,104 @@ * under the License. * */ -package org.apache.qpid.pingpong; +package org.apache.qpid.ping; + +import java.net.InetAddress; + +import javax.jms.*; import org.apache.log4j.Logger; -import org.apache.log4j.Level; + import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.jms.Session; -import javax.jms.*; -import java.net.InetAddress; - -public class TestPingClient +/** + * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took, + * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm) + * as the ping producer. + * + * <p/>There is a verbose mode flag which causes information about each ping to be output to the console + * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should + * be disabled for real timing tests as writing to the console will slow things down. + * + * <p><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url. + * </table> + */ +class TestPingClient extends AbstractPingClient implements MessageListener { private static final Logger _logger = Logger.getLogger(TestPingClient.class); - private static class TestPingMessageListener implements MessageListener + /** Used to indicate that the reply generator should log timing info to the console (logger info level). */ + private boolean _verbose = false; + + /** + * Creates a PingPongClient on the specified session. + * + * @param session The JMS Session for the ping pon client to run on. + * @param consumer The message consumer to receive the messages with. + * @param verbose If set to <tt>true</tt> will output timing information on every message. + */ + public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException { - public TestPingMessageListener() - { - } - - long _lastTimestamp = 0L; - long _lastTimestampString = 0L; - - public void onMessage(javax.jms.Message message) - { - if (_logger.isInfoEnabled()) - { - long timestamp = 0L; - long timestampString = 0L; - - try - { - timestamp = message.getLongProperty("timestamp"); - timestampString = Long.parseLong(message.getStringProperty("timestampString")); - - if (timestampString != timestamp) - { - _logger.info("Timetamps differ!:\n" + - "timestamp:" + timestamp + "\n" + - "timestampString:" + timestampString); - } - - } - catch (JMSException jmse) - { - //ignore - } + // Hang on to the session for the replies. + super(session); - long diff = timestamp - _lastTimestamp; - _lastTimestamp = timestamp; - - long stringDiff = timestampString - _lastTimestampString; - - _lastTimestampString = timestampString; - - _logger.info("Ping: T:" + diff + "ms, TS:" + stringDiff); - - // _logger.info(_name + " got message '" + message + "\n"); - } - } + // Set this up to listen for messages on the queue. + consumer.setMessageListener(this); } + /** + * Starts a stand alone ping-pong client running in verbose mode. + * + * @param args + */ public static void main(String[] args) { - _logger.setLevel(Level.INFO); - _logger.info("Starting..."); + // Display help on the command line. if (args.length < 4) { - System.out.println("Usage: brokerdetails username password virtual-path [selector] "); + System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]"); System.exit(1); } + + // Extract all comman line parameters. + String brokerDetails = args[0]; + String username = args[1]; + String password = args[2]; + String virtualpath = args[3]; + boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false; + String selector = (args.length == 6) ? args[5] : null; + try { InetAddress address = InetAddress.getLocalHost(); - AMQConnection con1 = new AMQConnection(args[0], args[1], args[2], - address.getHostName(), args[3]); + AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath); _logger.info("Connected with URL:" + con1.toURL()); - - final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session) - con1.createSession(false, Session.AUTO_ACKNOWLEDGE); - - String selector = null; + // Create a transactional or non-transactional session depending on the command line parameter. + Session session = null; - if (args.length == 5) + if (transacted) { - selector = args[4]; - _logger.info("Message selector is <" + selector + ">..."); + session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED); } - else + else if (!transacted) { - _logger.info("Not using message selector"); + session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE); } - Queue q = new AMQQueue("ping"); - MessageConsumer consumer1 = session1.createConsumer(q, - 1, false, false, selector); + MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector); + new TestPingClient(session, consumer, true); - consumer1.setMessageListener(new TestPingMessageListener()); con1.start(); } catch (Throwable t) @@ -134,5 +126,35 @@ public class TestPingClient System.out.println("Waiting..."); } -} + /** + * This is a callback method that is notified of all messages for which this has been registered as a message + * listener on a message consumer. + * + * @param message The message that triggered this callback. + */ + public void onMessage(javax.jms.Message message) + { + try + { + // Spew out some timing information if verbose mode is on. + if (_verbose) + { + Long timestamp = message.getLongProperty("timestamp"); + + if (timestamp != null) + { + long diff = System.currentTimeMillis() - timestamp; + _logger.info("Ping time: " + diff); + } + } + + // Commit the transaction if running in transactional mode. + commitTx(); + } + catch (JMSException e) + { + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java index bb9e17615e..d47650d049 100644 --- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,226 +18,196 @@ * under the License. * */ -package org.apache.qpid.pingpong; +package org.apache.qpid.ping; + +import java.net.InetAddress; + +import javax.jms.*; import org.apache.log4j.Logger; + import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.AMQException; -import org.apache.qpid.url.URLSyntaxException; -import org.apache.qpid.client.AMQNoConsumersException; -import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; -import javax.jms.*; -import java.net.InetAddress; -import java.net.UnknownHostException; - /** - * A client that behaves as follows: - * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li> - * <li>Creates a temporary queue</li> - * <li>Creates messages containing a property that is the name of the temporary queue</li> - * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li> - * </ul> + * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line + * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and + * configured message producer. + * + * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop + * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so + * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is + * also registered to terminate the ping loop cleanly. + * + * <p/><table id="crc"><caption>CRC Card</caption> + * <tr><th> Responsibilities <th> Collaborations + * <tr><td> Provide a ping cycle. + * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url. + * </table> */ -public class TestPingProducer implements ExceptionListener +class TestPingProducer extends AbstractPingProducer { - private static final Logger _log = Logger.getLogger(TestPingProducer.class); - - private AMQConnection _connection; - - private static int _messageSize = 0; - private boolean _publish; - - private long SLEEP_TIME = 250L; - -// private class CallbackHandler implements MessageListener -// { -// -// private int _actualMessageCount; -// -// -// public void onMessage(Message m) -// { -// if (_log.isDebugEnabled()) -// { -// _log.debug("Message received: " + m); -// } -// _actualMessageCount++; -// if (_actualMessageCount % 1000 == 0) -// { -// _log.info("Received message count: " + _actualMessageCount); -// } -// } -// } - - public TestPingProducer(boolean TRANSACTED, String brokerDetails, String clientID, - String virtualpath) throws AMQException, URLSyntaxException - { - try - { - createConnection(brokerDetails, clientID, virtualpath); + private static final Logger _logger = Logger.getLogger(TestPingProducer.class); - Session session; + /** Used to set up a default message size. */ + private static final int DEFAULT_MESSAGE_SIZE = 0; - if (TRANSACTED) - { - session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } + /** Used to define how long to wait between pings. */ + private static final long SLEEP_TIME = 250; - String queue = "ping"; - AMQQueue destination = new AMQQueue(queue); - MessageProducer producer = (MessageProducer) session.createProducer(destination); + /** Used to define how long to wait before assuming that a ping has timed out. */ + private static final long TIMEOUT = 3000; - _connection.setExceptionListener(this); + /** Holds the name of the queue to send pings on. */ + private static final String PING_QUEUE_NAME = "ping"; + private static TestPingProducer _pingProducer; - _connection.start(); + /** Holds the message producer to send the pings through. */ + private MessageProducer _producer; - while (_publish) - { -/* - TextMessage msg = session.createTextMessage( - "Presented to in conjunction with Mahnah Mahnah and the Snowths: " + ++messageNumber); -*/ - ObjectMessage msg = null; - if (_messageSize != 0) - { - msg = TestMessageFactory.newObjectMessage(session, _messageSize); - } - else - { - msg = session.createObjectMessage(); - } - - msg.setStringProperty("timestampString", Long.toString(System.currentTimeMillis())); - msg.setLongProperty("timestamp", System.currentTimeMillis()); - - ((BasicMessageProducer) producer).send(msg, DeliveryMode.NON_PERSISTENT, true); - _log.info("Message Sent."); - _log.debug(msg); - - - if (TRANSACTED) - { - try - { - session.commit(); - _log.debug("Session Commited."); - } - catch (JMSException e) - { - _log.trace("JMSException on commit:" + e); - try - { - session.rollback(); - _log.debug("Message rolled back."); - } - catch (JMSException jsme) - { - _log.trace("JMSE on rollback:" + jsme); - } - - - if (e.getLinkedException() instanceof AMQNoConsumersException) - { - _log.info("No Consumers on queue:'" + queue + "'"); - continue; - } - } - } - - - if (SLEEP_TIME > 0) - { - try - { - Thread.sleep(SLEEP_TIME); - } - catch (InterruptedException ie) - { - //do nothing - } - } - - - } + /** Determines whether this producer sends persistent messages from the run method. */ + private boolean _persistent; - } - catch (JMSException e) - { - _publish = false; - e.printStackTrace(); - } + /** Holds the message size to send, from the run method. */ + private int _messageSize; + + public TestPingProducer(Session session, MessageProducer producer) throws JMSException + { + super(session); + _producer = producer; } - private void createConnection(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException + public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize) + throws JMSException { - _publish = true; - _connection = new AMQConnection(brokerDetails, "guest", "guest", - clientID, virtualpath); - _log.info("Connected with URL:" + _connection.toURL()); + this(session, producer); + + _persistent = persistent; + _messageSize = messageSize; } /** - * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank - * means the server will allocate a name. + * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs + * to be started to bounce the pings back again. + * + * <p/>The command line takes from 2 to 4 arguments: + * <p/><table> + * <tr><td>brokerDetails <td> The broker connection string. + * <tr><td>virtualPath <td> The virtual path. + * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions. + * <tr><td>size <td> The size of ping messages to use, in bytes. + * </table> + * + * @param args The command line arguments as defined above. */ - public static void main(String[] args) + public static void main(String[] args) throws Exception { + // Extract the command line. if (args.length < 2) { - System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [message size in bytes]"); + System.err.println( + "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]"); System.exit(0); } - try - { - InetAddress address = InetAddress.getLocalHost(); - String clientID = address.getHostName() + System.currentTimeMillis(); - boolean transacted = false; - if (args.length == 3 ) - { - transacted = Boolean.parseBoolean(args[2]); - } - else if (args.length > 3 ) - { - transacted = Boolean.parseBoolean(args[2]); - _messageSize = Integer.parseInt(args[3]); - } - - new TestPingProducer(transacted, args[0], clientID, args[1]); - } - catch (UnknownHostException e) - { - e.printStackTrace(); - } - catch (AMQException e) + + String brokerDetails = args[0]; + String virtualpath = args[1]; + boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false; + boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false; + int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE; + + // Create a connection to the broker. + InetAddress address = InetAddress.getLocalHost(); + String clientID = address.getHostName() + System.currentTimeMillis(); + + Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath); + + // Create a transactional or non-transactional session, based on the command line arguments. + Session session; + + if (transacted) { - System.err.println("Error in client: " + e); - e.printStackTrace(); + session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED); } - catch (URLSyntaxException e) + else { - System.err.println("Error in connection arguments : " + e); + session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - //System.exit(0); + // Create a queue to send the pings on. + Queue pingQueue = new AMQQueue(PING_QUEUE_NAME); + MessageProducer producer = (MessageProducer) session.createProducer(pingQueue); + + // Create a ping producer to handle the request/wait/reply cycle. + _pingProducer = new TestPingProducer(session, producer, persistent, messageSize); + + // Start the message consumers running. + _connection.start(); + + // Create a shutdown hook to terminate the ping-pong producer. + Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook()); + + // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too. + _connection.setExceptionListener(_pingProducer); + Thread pingThread = new Thread(_pingProducer); + pingThread.run(); + + // Run until the ping loop is terminated. + pingThread.join(); } /** - * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException) + * Sends the specified ping message. + * + * @param message The message to send. + * + * @throws JMSException All underlying JMSExceptions are allowed to fall through. */ - public void onException(JMSException e) + public void ping(Message message) throws JMSException { - System.err.println(e.getMessage()); + _producer.send(message); + + // Keep the messageId to correlate with the reply. + String messageId = message.getJMSMessageID(); + + // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of + // this method, as the message will not be sent until the transaction is committed. + commitTx(); + } + /** + * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this + * flag has been cleared. + */ + public void stop() + { _publish = false; - e.printStackTrace(System.err); + } + + /** + * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and + * waits for short pauses in between each. + */ + public void pingLoop() + { + try + { + // Generate a sample message and time stamp it. + ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent); + msg.setLongProperty("timestamp", System.currentTimeMillis()); + + // Send the message. + ping(msg); + + // Introduce a short pause if desired. + pause(SLEEP_TIME); + } + catch (JMSException e) + { + _publish = false; + _logger.debug("There was a JMSException: " + e.getMessage(), e); + } } } diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java new file mode 100644 index 0000000000..bee75bb1eb --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java @@ -0,0 +1,175 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+
+/**
+ * PingPongClient is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes too.
+ *
+ * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Rename this to BounceBackClient or something similar.
+ */
+public class PingPongClient extends AbstractPingClient implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongClient.class);
+
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /**
+ * Creates a PingPongClient on the specified session.
+ *
+ * @param session The JMS Session for the ping pon client to run on.
+ * @param consumer The message consumer to receive the messages with.
+ * @param verbose If set to <tt>true</tt> will output timing information on every message.
+ */
+ public PingPongClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
+ {
+ // Hang on to the session for the replies.
+ super(session);
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ // Display help on the command line.
+ if (args.length < 4)
+ {
+ System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
+ System.exit(1);
+ }
+
+ // Extract all comman line parameters.
+ String brokerDetails = args[0];
+ String username = args[1];
+ String password = args[2];
+ String virtualpath = args[3];
+ boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+ String selector = (args.length == 6) ? args[5] : null;
+
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+
+ AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
+
+ _logger.info("Connected with URL:" + con1.toURL());
+
+ // Create a transactional or non-transactional session depending on the command line parameter.
+ Session session = null;
+
+ if (transacted)
+ {
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
+ }
+ else if (!transacted)
+ {
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ Queue q = new AMQQueue("ping");
+
+ MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
+ new PingPongClient(session, consumer, true);
+
+ con1.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ System.out.println("Waiting...");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ try
+ {
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Ping time: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(message.getJMSMessageID());
+
+ // Send the receieved message as the pong reply.
+ MessageProducer producer = _session.createProducer(message.getJMSReplyTo());
+ producer.send(message);
+
+ // Commit the transaction if running in transactional mode.
+ commitTx();
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java new file mode 100644 index 0000000000..8bb2da8b6f --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -0,0 +1,301 @@ +/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingProducer;
+import org.apache.qpid.util.concurrent.BooleanLatch;
+
+/**
+ * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
+ * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line
+ * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
+ * message producer and message consumer to run the ping-pong cycle on.
+ *
+ * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
+ * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
+ * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
+ * this correlation would not need to be done.
+ *
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping-pong loop cleanly.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
+ *
+ * @todo Make temp queue per ping a command line option.
+ *
+ * @todo Make the queue name a command line option.
+ */
+public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+
+ /** Used to set up a default message size. */
+ private static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /** Used to define how long to wait between pings. */
+ private static final long SLEEP_TIME = 250;
+
+ /** Used to define how long to wait before assuming that a ping has timed out. */
+ private static final long TIMEOUT = 3000;
+
+ /** Holds the name of the queue to send pings on. */
+ private static final String PING_QUEUE_NAME = "ping";
+
+ /** Keeps track of the ping producer instance used in the run loop. */
+ private static PingPongProducer _pingProducer;
+
+ /** Holds the message producer to send the pings through. */
+ private MessageProducer _producer;
+
+ /** Holds the queue to send the ping replies to. */
+ private Queue _replyQueue;
+
+ /** Determines whether this producer sends persistent messages from the run method. */
+ private boolean _persistent;
+
+ /** Holds the message size to send, from the run method. */
+ private int _messageSize;
+
+ /** Holds a map from message ids to latches on which threads wait for replies. */
+ private Map<String, BooleanLatch> trafficLights = new HashMap<String, BooleanLatch>();
+
+ /** Holds a map from message ids to correlated replies. */
+ private Map<String, Message> replies = new HashMap<String, Message>();
+
+ public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer)
+ throws JMSException
+ {
+ super(session);
+ _producer = producer;
+ _replyQueue = replyQueue;
+
+ consumer.setMessageListener(this);
+ }
+
+ public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer,
+ boolean persistent, int messageSize) throws JMSException
+ {
+ this(session, replyQueue, producer, consumer);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
+ }
+
+ /**
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+ * to be started to bounce the pings back again.
+ *
+ * <p/>The command line takes from 2 to 4 arguments:
+ * <p/><table>
+ * <tr><td>brokerDetails <td> The broker connection string.
+ * <tr><td>virtualPath <td> The virtual path.
+ * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
+ * <tr><td>size <td> The size of ping messages to use, in bytes.
+ * </table>
+ *
+ * @param args The command line arguments as defined above.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // Extract the command line.
+ if (args.length < 2)
+ {
+ System.err.println(
+ "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
+ System.exit(0);
+ }
+
+ String brokerDetails = args[0];
+ String virtualpath = args[1];
+ boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
+ boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+ int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
+
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
+
+ // Create a transactional or non-transactional session, based on the command line arguments.
+ Session session;
+
+ if (transacted)
+ {
+ session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ // Create a queue to send the pings on.
+ Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
+ MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
+
+ // Create a temporary queue to reply with the pongs on.
+ Queue replyQueue = session.createTemporaryQueue();
+
+ // Create a message consumer to get the replies with.
+ MessageConsumer consumer = session.createConsumer(replyQueue);
+
+ // Create a ping producer to handle the request/wait/reply cycle.
+ _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize);
+
+ // Start the message consumers running.
+ _connection.start();
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+
+ // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
+ _connection.setExceptionListener(_pingProducer);
+ Thread pingThread = new Thread(_pingProducer);
+ pingThread.run();
+
+ // Run until the ping loop is terminated.
+ pingThread.join();
+ }
+
+ /**
+ * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+ * correlating reply may be waiting on.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Store the reply.
+ String correlationID = message.getJMSCorrelationID();
+ replies.put(correlationID, message);
+
+ // Turn the traffic light to green.
+ BooleanLatch trafficLight = trafficLights.get(correlationID);
+
+ if (trafficLight != null)
+ {
+ trafficLight.signal();
+ }
+ else
+ {
+ _logger.debug("There was no thread waiting for reply: " + correlationID);
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply
+ * arrives, then a null reply is returned from this method.
+ *
+ * @param message The message to send.
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The reply, or null if no reply arrives before the timeout.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public Message pingAndWaitForReply(Message message, long timeout) throws JMSException
+ {
+ _producer.send(message);
+
+ // Keep the messageId to correlate with the reply.
+ String messageId = message.getJMSMessageID();
+
+ // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
+ // this method, as the message will not be sent until the transaction is committed.
+ commitTx();
+
+ // Block the current thread until a reply to the message is received, or it times out.
+ BooleanLatch trafficLight = new BooleanLatch();
+ trafficLights.put(messageId, trafficLight);
+
+ // Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up.
+ trafficLight.await(timeout * 1000);
+
+ // Check the replies to see if one was generated, if not then the reply timed out.
+ Message result = replies.get(messageId);
+
+ return result;
+ }
+
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ /**
+ * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
+ * waits for replies and inserts short pauses in between each.
+ */
+ public void pingLoop()
+ {
+ try
+ {
+ // Generate a sample message and time stamp it.
+ ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent);
+ msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+ // Send the message and wait for a reply.
+ pingAndWaitForReply(msg, TIMEOUT);
+
+ // Introduce a short pause if desired.
+ pause(SLEEP_TIME);
+ }
+ catch (JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java new file mode 100644 index 0000000000..74f1a899cf --- /dev/null +++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java @@ -0,0 +1,245 @@ +package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously. A full round trip ping sends a message from a producer to a conumer, then the consumer
+ * replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner. See {@link TKTestRunner} or
+ * {@link PPTestRunner} for more information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Holds the name of the property to get the test message size from. */
+ private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+ /** Holds the name of the property to get the ping queue name from. */
+ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ private static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the name of the property to get the test broker url from. */
+ private static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+
+ /** Holds the size of message body to attach to the ping messages. */
+ private static final int MESSAGE_SIZE_DEFAULT = 0;
+
+ /** Holds the name of the queue to which pings are sent. */
+ private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+
+ /** Holds the message delivery mode to use for the test. */
+ private static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+ /** Holds the transactional mode to use for the test. */
+ private static final boolean TRANSACTED_DEFAULT = false;
+
+ /** Holds the default broker url for the test. */
+ private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ private static final String VIRTUAL_PATH_DEFAULT = "/test";
+
+ /** Sets a default ping timeout. */
+ private static final long TIMEOUT = 3000;
+
+ // Sets up the test parameters with defaults.
+ static
+ {
+ setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+ setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+ setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+ }
+
+ /** Holds the test ping-pong producer. */
+ private PingPongProducer _testPingProducer;
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results.
+ private Properties testParameters = System.getProperties();
+ //private Properties testParameters = new ContextualProperties(System.getProperties());
+
+ /** Holds the connection to the broker. */
+ private Connection _connection = null;
+
+ /** Holds the current session to the broker. */
+ private Session _session;
+
+ /** Holds the destination to send the ping messages to. */
+ private Queue _pingQueue;
+
+ /** Holds the destination to send replies to. */
+ private Queue _replyQueue;
+
+ /** Holds a message producer, set up on the ping destination, to send messages through. */
+ private MessageProducer _producer;
+
+ /** Holds a message consumer, set up on the ping destination, to receive pings through. */
+ private MessageConsumer _pingConsumer;
+
+ /** Holds a message consumer, set up on the pong destination, to receive replies through. */
+ private MessageConsumer _pongConsumer;
+
+ /** Holds a failure flag, which gets set if the connection to the broker goes down. */
+ private boolean _failure;
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk() throws Exception
+ {
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ PingPongProducer.getTestMessage(_session, _replyQueue,
+ Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
+ System.currentTimeMillis(),
+ Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
+
+ // Use the test timing controller to reset the test timer now and obtain the current time.
+ // This can be used to remove the message creation time from the test.
+ //TestTimingController timingUtils = getTimingController();
+ //long startTime = timingUtils.restart();
+
+ // Send the message and wait for a reply.
+ Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT);
+
+ // Fail the test if the timeout was exceeded.
+ if (reply == null)
+ {
+ Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
+ }
+ }
+
+ /**
+ * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to
+ * the broker. It sets a failure flag to indicate that there is an error condition.
+ *
+ * @param e The JMSException that triggered this callback method.
+ *
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ // Set the failure flag.
+ _failure = true;
+
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Log4j will propagate the test name as a thread local in all log output.
+ NDC.push(getName());
+
+ // Ensure that the connection, session and ping queue are established, if they have not already been.
+ if (_connection == null)
+ {
+ // Create a client id that identifies the client machine.
+ String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
+
+ // Connect to the broker.
+ _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID,
+ testParameters.getProperty(VIRTUAL_PATH_PROPNAME));
+ _connection.setExceptionListener(this);
+
+ // Create a transactional or non-transactional session, based on the test properties, if a session has not
+ // already been created.
+ if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)))
+ {
+ _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ // Create a queue to send the pings on.
+ _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME));
+ _producer = (MessageProducer) _session.createProducer(_pingQueue);
+
+ // Create a temporary queue to reply with the pongs on.
+ _replyQueue = _session.createTemporaryQueue();
+
+ // Create the ping and pong consumers on their respective destinations.
+ _pingConsumer = _session.createConsumer(_pingQueue);
+ _pongConsumer = _session.createConsumer(_replyQueue);
+
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false);
+
+ // Establish a ping-pong client on the ping queue to send pings and wait for replies.
+ _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer,
+ _pongConsumer);
+
+ _connection.start();
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _connection.close();
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+}
|
