summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rwxr-xr-xjava/perftests/bin/testPingClient.sh2
-rwxr-xr-xjava/perftests/bin/testPingProducer.sh2
-rw-r--r--java/perftests/pom.xml85
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java70
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java193
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java166
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java334
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java175
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java301
-rw-r--r--java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java245
-rw-r--r--java/pom.xml11
11 files changed, 1321 insertions, 263 deletions
diff --git a/java/perftests/bin/testPingClient.sh b/java/perftests/bin/testPingClient.sh
index d819584999..4eca4a7999 100755
--- a/java/perftests/bin/testPingClient.sh
+++ b/java/perftests/bin/testPingClient.sh
@@ -30,4 +30,4 @@ echo $thehosts
# XXX -Xms1024m -XX:NewSize=300m
. ./setupclasspath.sh
echo $CP
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingClient $thehosts guest guest /test "$@"
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingClient $thehosts guest guest /test "$@"
diff --git a/java/perftests/bin/testPingProducer.sh b/java/perftests/bin/testPingProducer.sh
index 72c573780a..39ab487b60 100755
--- a/java/perftests/bin/testPingProducer.sh
+++ b/java/perftests/bin/testPingProducer.sh
@@ -30,4 +30,4 @@ echo $thehosts
# XXX -Xms1024m -XX:NewSize=300m
. ./setupclasspath.sh
echo $CP
-$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.pingpong.TestPingProducer $thehosts /test
+$JAVA_HOME/bin/java -cp $CP -Damqj.logging.level="warn" -Damqj.test.logging.level="info" -Dlog4j.configuration=src/perftests.log4j org.apache.qpid.ping.TestPingProducer $thehosts /test
diff --git a/java/perftests/pom.xml b/java/perftests/pom.xml
index bfa24d99a3..874cbd3e52 100644
--- a/java/perftests/pom.xml
+++ b/java/perftests/pom.xml
@@ -49,20 +49,16 @@
<artifactId>log4j</artifactId>
</dependency>
- <!-- Here JUnit is a deliberate 'compile' scope dependency, it will be packaged with the perf testing tools. -->
+ <!-- Test dependencies. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <scope>compile</scope>
</dependency>
- <!-- Will be added to maven repo soon. JUnit test runner that can add repeats/concurrenct/timing etc to tests.
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
- <scope>compile</scope>
</dependency>
- -->
</dependencies>
@@ -80,6 +76,85 @@
</configuration>
</plugin>
+ <!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. It will take a day or two from 16/1/2007.
+
+ Configures the toolkit test runner for performance testing. These can be run from within maven, or by using the generated
+ scripts.
+
+ To run from maven:
+
+ mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:tktest
+
+ To run from the command line (after doing assembly:assembly goal):
+
+ java -cp target/test_jar-jar-with-dependencies.jar uk.co.thebadgerset.junit.extensions.OldTKTestRunner -s 1 -r 100000 -o target org.apache.qpid.requestreply.PingPongTestPerf
+
+ To generate the scripts do:
+
+ mvn uk.co.thebadgerset:junit-toolkit-maven-plugin:scripts
+
+ Then to run the scripts do (after doing assembly:assembly goal):
+
+ ./bin/script_name or ./bin/script_name.bat
+
+ These scripts can find everything in the 'all test dependencies' jar.
+ -->
+ <!--
+ <plugin>
+ <groupId>uk.co.thebadgerset</groupId>
+ <artifactId>junit-toolkit-maven-plugin</artifactId>
+ <version>0.3</version>
+
+ <configuration>
+ <commands>
+ <!## Run the ping pong test once. This is just to check toolkit test runner is working. Real tests to follow. ##>
+ <param>-s 1 -r 1 -o target org.apache.qpid.requestreply.PingPongTestPerf</param>
+ <!## Add more here... <param> ... </param> ##>
+
+ <!## A little bit of work needs to be done on TKTestRunner before the following syntax works. See Javadoc for details. ##>
+ <!## Thread ramp up: <param>-c [1,5,10,50,100] -r 1000 -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##>
+ <!## Run for a length of time: <param>-d1H -o target org.apache.qpid.requestreply.PingPongTestPerf</param> ##>
+ <!## Configure test parameters: <param>pingQueue=myping persistent=true transacted=true messageSize=1000 -d10M ... </param> ##>
+ <!## And so on. ##>
+ </commands>
+ </configuration>
+
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <!##<goals>
+ <goal>tktest</goal>
+ </goals>##>
+ </execution>
+ </executions>
+ </plugin>
+ -->
+
+ <!-- Bundles all the dependencies, fully expanded into a single jar, required to run the tests.
+
+ Usefull when bundling system, integration or performance tests into a convenient
+ package to hand over to testers. To use it run:
+
+ java -cp target/your_app_name-all-test-deps.jar path.to.your.Class
+
+ or often:
+
+ java -cp target/your_app_name-all-test-deps.jar junit.framework.textui.TestRunner path.to.your.test.Class
+
+ or other JUnit test runner invocations.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>jar-with-dependencies.xml</descriptor>
+ </descriptors>
+ <outputDirectory>target</outputDirectory>
+ <workDirectory>target/assembly/work</workDirectory>
+ </configuration>
+ </plugin>
+
</plugins>
<!-- Include source files in built jar -->
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
new file mode 100644
index 0000000000..3c1a476d51
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
@@ -0,0 +1,70 @@
+package org.apache.qpid.ping;
+
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.jms.Session;
+
+/**
+ * Provides functionality common to all ping clients. Provides the ability to manage a session and a convenience method
+ * to commit on the current transaction.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Commit the current transcation.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public abstract class AbstractPingClient
+{
+ private static final Logger _logger = Logger.getLogger(TestPingClient.class);
+
+ /** Used to keep a handle on the JMS session to send replies using. */
+ protected Session _session;
+
+ /**
+ * Creates an abstract ping client to manage the specified transcation.
+ *
+ * @param session The session.
+ */
+ public AbstractPingClient(Session session)
+ {
+ _session = session;
+ }
+
+ /**
+ * Convenience method to commit the transaction on the session associated with this bounce back client.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx() throws JMSException
+ {
+ if (_session.getTransacted())
+ {
+ try
+ {
+ _session.commit();
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ try
+ {
+ _session.rollback();
+ _logger.debug("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
new file mode 100644
index 0000000000..e2c2d5b440
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
@@ -0,0 +1,193 @@
+package org.apache.qpid.ping;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.jms.Session;
+
+/**
+ * This abstract class captures functionality that is common to all ping producers. It provides functionality to
+ * manage a session, and a convenience method to commit a transaction on the session. It also provides a framework
+ * for running a ping loop, and terminating that loop on exceptions or a shutdown handler.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Manage session.
+ * <tr><td> Provide clean shutdown on exception or shutdown hook.
+ * <tr><td> Provide useable shutdown hook implementation.
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public abstract class AbstractPingProducer implements Runnable, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
+
+ /** Holds the current Qpid session to send and receive pings on. */
+ protected Session _session;
+
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
+
+ /**
+ * Creates an AbstractPingProducer on a session.
+ */
+ public AbstractPingProducer(Session session)
+ {
+ _session = session;
+ }
+
+ /**
+ * Generates a test message of the specified size.
+ *
+ * @param session The Qpid session under which to generate the message.
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param currentTime The timestamp to add to the message as a "timestamp" property.
+ *
+ * @return A freshly generated test message.
+ *
+ * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+ */
+ public static ObjectMessage getTestMessage(Session session, Queue replyQueue, int messageSize, long currentTime,
+ boolean persistent) throws JMSException
+ {
+ ObjectMessage msg;
+
+ if (messageSize != 0)
+ {
+ msg = TestMessageFactory.newObjectMessage(session, messageSize);
+ }
+ else
+ {
+ msg = session.createObjectMessage();
+ }
+
+ // Set the messages persistent delivery flag.
+ msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ // Timestamp the message.
+ msg.setLongProperty("timestamp", currentTime);
+
+ // Ensure that the temporary reply queue is set as the reply to destination for the message.
+ if (replyQueue != null)
+ {
+ msg.setJMSReplyTo(replyQueue);
+ }
+
+ return msg;
+ }
+
+ /**
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
+ */
+ public static void pause(long sleepTime)
+ {
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
+ }
+
+ public abstract void pingLoop();
+
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
+ {
+ _publish = false;
+ }
+
+ /**
+ * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+ */
+ public void run()
+ {
+ // Keep running until the publish flag is cleared.
+ while (_publish)
+ {
+ pingLoop();
+ }
+ }
+
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
+ }
+
+ /**
+ * Convenience method to commit the transaction on the session associated with this pinger.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx() throws JMSException
+ {
+ if (_session.getTransacted())
+ {
+ try
+ {
+ _session.commit();
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ _session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
index c96f6bd61d..3063e83127 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,112 +18,104 @@
* under the License.
*
*/
-package org.apache.qpid.pingpong;
+package org.apache.qpid.ping;
+
+import java.net.InetAddress;
+
+import javax.jms.*;
import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.jms.Session;
-import javax.jms.*;
-import java.net.InetAddress;
-
-public class TestPingClient
+/**
+ * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took,
+ * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm)
+ * as the ping producer.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
+ * </table>
+ */
+class TestPingClient extends AbstractPingClient implements MessageListener
{
private static final Logger _logger = Logger.getLogger(TestPingClient.class);
- private static class TestPingMessageListener implements MessageListener
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /**
+ * Creates a PingPongClient on the specified session.
+ *
+ * @param session The JMS Session for the ping pon client to run on.
+ * @param consumer The message consumer to receive the messages with.
+ * @param verbose If set to <tt>true</tt> will output timing information on every message.
+ */
+ public TestPingClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
{
- public TestPingMessageListener()
- {
- }
-
- long _lastTimestamp = 0L;
- long _lastTimestampString = 0L;
-
- public void onMessage(javax.jms.Message message)
- {
- if (_logger.isInfoEnabled())
- {
- long timestamp = 0L;
- long timestampString = 0L;
-
- try
- {
- timestamp = message.getLongProperty("timestamp");
- timestampString = Long.parseLong(message.getStringProperty("timestampString"));
-
- if (timestampString != timestamp)
- {
- _logger.info("Timetamps differ!:\n" +
- "timestamp:" + timestamp + "\n" +
- "timestampString:" + timestampString);
- }
-
- }
- catch (JMSException jmse)
- {
- //ignore
- }
+ // Hang on to the session for the replies.
+ super(session);
- long diff = timestamp - _lastTimestamp;
- _lastTimestamp = timestamp;
-
- long stringDiff = timestampString - _lastTimestampString;
-
- _lastTimestampString = timestampString;
-
- _logger.info("Ping: T:" + diff + "ms, TS:" + stringDiff);
-
- // _logger.info(_name + " got message '" + message + "\n");
- }
- }
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
}
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
public static void main(String[] args)
{
- _logger.setLevel(Level.INFO);
-
_logger.info("Starting...");
+ // Display help on the command line.
if (args.length < 4)
{
- System.out.println("Usage: brokerdetails username password virtual-path [selector] ");
+ System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
System.exit(1);
}
+
+ // Extract all comman line parameters.
+ String brokerDetails = args[0];
+ String username = args[1];
+ String password = args[2];
+ String virtualpath = args[3];
+ boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+ String selector = (args.length == 6) ? args[5] : null;
+
try
{
InetAddress address = InetAddress.getLocalHost();
- AMQConnection con1 = new AMQConnection(args[0], args[1], args[2],
- address.getHostName(), args[3]);
+ AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
_logger.info("Connected with URL:" + con1.toURL());
-
- final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session)
- con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- String selector = null;
+ // Create a transactional or non-transactional session depending on the command line parameter.
+ Session session = null;
- if (args.length == 5)
+ if (transacted)
{
- selector = args[4];
- _logger.info("Message selector is <" + selector + ">...");
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
}
- else
+ else if (!transacted)
{
- _logger.info("Not using message selector");
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
-
Queue q = new AMQQueue("ping");
- MessageConsumer consumer1 = session1.createConsumer(q,
- 1, false, false, selector);
+ MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
+ new TestPingClient(session, consumer, true);
- consumer1.setMessageListener(new TestPingMessageListener());
con1.start();
}
catch (Throwable t)
@@ -134,5 +126,35 @@ public class TestPingClient
System.out.println("Waiting...");
}
-}
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ try
+ {
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Ping time: " + diff);
+ }
+ }
+
+ // Commit the transaction if running in transactional mode.
+ commitTx();
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
index bb9e17615e..d47650d049 100644
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,226 +18,196 @@
* under the License.
*
*/
-package org.apache.qpid.pingpong;
+package org.apache.qpid.ping;
+
+import java.net.InetAddress;
+
+import javax.jms.*;
import org.apache.log4j.Logger;
+
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import javax.jms.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
/**
- * A client that behaves as follows:
- * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
- * <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
- * </ul>
+ * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line
+ * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and
+ * configured message producer.
+ *
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping loop cleanly.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
*/
-public class TestPingProducer implements ExceptionListener
+class TestPingProducer extends AbstractPingProducer
{
- private static final Logger _log = Logger.getLogger(TestPingProducer.class);
-
- private AMQConnection _connection;
-
- private static int _messageSize = 0;
- private boolean _publish;
-
- private long SLEEP_TIME = 250L;
-
-// private class CallbackHandler implements MessageListener
-// {
-//
-// private int _actualMessageCount;
-//
-//
-// public void onMessage(Message m)
-// {
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Message received: " + m);
-// }
-// _actualMessageCount++;
-// if (_actualMessageCount % 1000 == 0)
-// {
-// _log.info("Received message count: " + _actualMessageCount);
-// }
-// }
-// }
-
- public TestPingProducer(boolean TRANSACTED, String brokerDetails, String clientID,
- String virtualpath) throws AMQException, URLSyntaxException
- {
- try
- {
- createConnection(brokerDetails, clientID, virtualpath);
+ private static final Logger _logger = Logger.getLogger(TestPingProducer.class);
- Session session;
+ /** Used to set up a default message size. */
+ private static final int DEFAULT_MESSAGE_SIZE = 0;
- if (TRANSACTED)
- {
- session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
+ /** Used to define how long to wait between pings. */
+ private static final long SLEEP_TIME = 250;
- String queue = "ping";
- AMQQueue destination = new AMQQueue(queue);
- MessageProducer producer = (MessageProducer) session.createProducer(destination);
+ /** Used to define how long to wait before assuming that a ping has timed out. */
+ private static final long TIMEOUT = 3000;
- _connection.setExceptionListener(this);
+ /** Holds the name of the queue to send pings on. */
+ private static final String PING_QUEUE_NAME = "ping";
+ private static TestPingProducer _pingProducer;
- _connection.start();
+ /** Holds the message producer to send the pings through. */
+ private MessageProducer _producer;
- while (_publish)
- {
-/*
- TextMessage msg = session.createTextMessage(
- "Presented to in conjunction with Mahnah Mahnah and the Snowths: " + ++messageNumber);
-*/
- ObjectMessage msg = null;
- if (_messageSize != 0)
- {
- msg = TestMessageFactory.newObjectMessage(session, _messageSize);
- }
- else
- {
- msg = session.createObjectMessage();
- }
-
- msg.setStringProperty("timestampString", Long.toString(System.currentTimeMillis()));
- msg.setLongProperty("timestamp", System.currentTimeMillis());
-
- ((BasicMessageProducer) producer).send(msg, DeliveryMode.NON_PERSISTENT, true);
- _log.info("Message Sent.");
- _log.debug(msg);
-
-
- if (TRANSACTED)
- {
- try
- {
- session.commit();
- _log.debug("Session Commited.");
- }
- catch (JMSException e)
- {
- _log.trace("JMSException on commit:" + e);
- try
- {
- session.rollback();
- _log.debug("Message rolled back.");
- }
- catch (JMSException jsme)
- {
- _log.trace("JMSE on rollback:" + jsme);
- }
-
-
- if (e.getLinkedException() instanceof AMQNoConsumersException)
- {
- _log.info("No Consumers on queue:'" + queue + "'");
- continue;
- }
- }
- }
-
-
- if (SLEEP_TIME > 0)
- {
- try
- {
- Thread.sleep(SLEEP_TIME);
- }
- catch (InterruptedException ie)
- {
- //do nothing
- }
- }
-
-
- }
+ /** Determines whether this producer sends persistent messages from the run method. */
+ private boolean _persistent;
- }
- catch (JMSException e)
- {
- _publish = false;
- e.printStackTrace();
- }
+ /** Holds the message size to send, from the run method. */
+ private int _messageSize;
+
+ public TestPingProducer(Session session, MessageProducer producer) throws JMSException
+ {
+ super(session);
+ _producer = producer;
}
- private void createConnection(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException
+ public TestPingProducer(Session session, MessageProducer producer, boolean persistent, int messageSize)
+ throws JMSException
{
- _publish = true;
- _connection = new AMQConnection(brokerDetails, "guest", "guest",
- clientID, virtualpath);
- _log.info("Connected with URL:" + _connection.toURL());
+ this(session, producer);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
}
/**
- * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
- * means the server will allocate a name.
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+ * to be started to bounce the pings back again.
+ *
+ * <p/>The command line takes from 2 to 4 arguments:
+ * <p/><table>
+ * <tr><td>brokerDetails <td> The broker connection string.
+ * <tr><td>virtualPath <td> The virtual path.
+ * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
+ * <tr><td>size <td> The size of ping messages to use, in bytes.
+ * </table>
+ *
+ * @param args The command line arguments as defined above.
*/
- public static void main(String[] args)
+ public static void main(String[] args) throws Exception
{
+ // Extract the command line.
if (args.length < 2)
{
- System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [message size in bytes]");
+ System.err.println(
+ "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
System.exit(0);
}
- try
- {
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
- boolean transacted = false;
- if (args.length == 3 )
- {
- transacted = Boolean.parseBoolean(args[2]);
- }
- else if (args.length > 3 )
- {
- transacted = Boolean.parseBoolean(args[2]);
- _messageSize = Integer.parseInt(args[3]);
- }
-
- new TestPingProducer(transacted, args[0], clientID, args[1]);
- }
- catch (UnknownHostException e)
- {
- e.printStackTrace();
- }
- catch (AMQException e)
+
+ String brokerDetails = args[0];
+ String virtualpath = args[1];
+ boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
+ boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+ int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
+
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
+
+ // Create a transactional or non-transactional session, based on the command line arguments.
+ Session session;
+
+ if (transacted)
{
- System.err.println("Error in client: " + e);
- e.printStackTrace();
+ session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
}
- catch (URLSyntaxException e)
+ else
{
- System.err.println("Error in connection arguments : " + e);
+ session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
- //System.exit(0);
+ // Create a queue to send the pings on.
+ Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
+ MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
+
+ // Create a ping producer to handle the request/wait/reply cycle.
+ _pingProducer = new TestPingProducer(session, producer, persistent, messageSize);
+
+ // Start the message consumers running.
+ _connection.start();
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+
+ // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
+ _connection.setExceptionListener(_pingProducer);
+ Thread pingThread = new Thread(_pingProducer);
+ pingThread.run();
+
+ // Run until the ping loop is terminated.
+ pingThread.join();
}
/**
- * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ * Sends the specified ping message.
+ *
+ * @param message The message to send.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
- public void onException(JMSException e)
+ public void ping(Message message) throws JMSException
{
- System.err.println(e.getMessage());
+ _producer.send(message);
+
+ // Keep the messageId to correlate with the reply.
+ String messageId = message.getJMSMessageID();
+
+ // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
+ // this method, as the message will not be sent until the transaction is committed.
+ commitTx();
+ }
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
+ {
_publish = false;
- e.printStackTrace(System.err);
+ }
+
+ /**
+ * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
+ * waits for short pauses in between each.
+ */
+ public void pingLoop()
+ {
+ try
+ {
+ // Generate a sample message and time stamp it.
+ ObjectMessage msg = getTestMessage(_session, null, _messageSize, System.currentTimeMillis(), _persistent);
+ msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+ // Send the message.
+ ping(msg);
+
+ // Introduce a short pause if desired.
+ pause(SLEEP_TIME);
+ }
+ catch (JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
new file mode 100644
index 0000000000..bee75bb1eb
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongClient.java
@@ -0,0 +1,175 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingClient;
+
+/**
+ * PingPongClient is a message listener the bounces back messages to their reply to destination. This is used to return
+ * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes too.
+ *
+ * <p/>The message id from the received message is extracted, and placed into the reply as the correlation id. Messages
+ * are bounced back to the reply-to destination. The original sender of the message has the option to use either a unique
+ * temporary queue or the correlation id to correlate the original message to the reply.
+ *
+ * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
+ * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
+ * be disabled for real timing tests as writing to the console will slow things down.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Bounce back messages to their reply to destination.
+ * <tr><td> Provide command line invocation to start the bounce back on a configurable broker url.
+ * </table>
+ *
+ * @todo Rename this to BounceBackClient or something similar.
+ */
+public class PingPongClient extends AbstractPingClient implements MessageListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongClient.class);
+
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
+ private boolean _verbose = false;
+
+ /**
+ * Creates a PingPongClient on the specified session.
+ *
+ * @param session The JMS Session for the ping pon client to run on.
+ * @param consumer The message consumer to receive the messages with.
+ * @param verbose If set to <tt>true</tt> will output timing information on every message.
+ */
+ public PingPongClient(Session session, MessageConsumer consumer, boolean verbose) throws JMSException
+ {
+ // Hang on to the session for the replies.
+ super(session);
+
+ // Set this up to listen for messages on the queue.
+ consumer.setMessageListener(this);
+ }
+
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args)
+ {
+ _logger.info("Starting...");
+
+ // Display help on the command line.
+ if (args.length < 4)
+ {
+ System.out.println("Usage: brokerdetails username password virtual-path [transacted] [selector]");
+ System.exit(1);
+ }
+
+ // Extract all comman line parameters.
+ String brokerDetails = args[0];
+ String username = args[1];
+ String password = args[2];
+ String virtualpath = args[3];
+ boolean transacted = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
+ String selector = (args.length == 6) ? args[5] : null;
+
+ try
+ {
+ InetAddress address = InetAddress.getLocalHost();
+
+ AMQConnection con1 = new AMQConnection(brokerDetails, username, password, address.getHostName(), virtualpath);
+
+ _logger.info("Connected with URL:" + con1.toURL());
+
+ // Create a transactional or non-transactional session depending on the command line parameter.
+ Session session = null;
+
+ if (transacted)
+ {
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.SESSION_TRANSACTED);
+ }
+ else if (!transacted)
+ {
+ session = (org.apache.qpid.jms.Session) con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ Queue q = new AMQQueue("ping");
+
+ MessageConsumer consumer = session.createConsumer(q, 1, false, false, selector);
+ new PingPongClient(session, consumer, true);
+
+ con1.start();
+ }
+ catch (Throwable t)
+ {
+ System.err.println("Fatal error: " + t);
+ t.printStackTrace();
+ }
+
+ System.out.println("Waiting...");
+ }
+
+ /**
+ * This is a callback method that is notified of all messages for which this has been registered as a message
+ * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
+ * destination of the message.
+ *
+ * @param message The message that triggered this callback.
+ */
+ public void onMessage(javax.jms.Message message)
+ {
+ try
+ {
+ // Spew out some timing information if verbose mode is on.
+ if (_verbose)
+ {
+ Long timestamp = message.getLongProperty("timestamp");
+
+ if (timestamp != null)
+ {
+ long diff = System.currentTimeMillis() - timestamp;
+ _logger.info("Ping time: " + diff);
+ }
+ }
+
+ // Correlate the reply to the original.
+ message.setJMSCorrelationID(message.getJMSMessageID());
+
+ // Send the receieved message as the pong reply.
+ MessageProducer producer = _session.createProducer(message.getJMSReplyTo());
+ producer.send(message);
+
+ // Commit the transaction if running in transactional mode.
+ commitTx();
+ }
+ catch (JMSException e)
+ {
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
new file mode 100644
index 0000000000..8bb2da8b6f
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -0,0 +1,301 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQNoConsumersException;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.ping.AbstractPingProducer;
+import org.apache.qpid.util.concurrent.BooleanLatch;
+
+/**
+ * PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
+ * client (see {@link org.apache.qpid.requestreply.PingPongClient} for the bounce back client). It is designed to be run from the command line
+ * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
+ * message producer and message consumer to run the ping-pong cycle on.
+ *
+ * <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
+ * This means that this class has to do some work to correlate pings with pongs; it expectes the original message
+ * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
+ * this correlation would not need to be done.
+ *
+ * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
+ * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
+ * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
+ * also registered to terminate the ping-pong loop cleanly.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
+ * </table>
+ *
+ * @todo Make temp queue per ping a command line option.
+ *
+ * @todo Make the queue name a command line option.
+ */
+public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+{
+ private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
+
+ /** Used to set up a default message size. */
+ private static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /** Used to define how long to wait between pings. */
+ private static final long SLEEP_TIME = 250;
+
+ /** Used to define how long to wait before assuming that a ping has timed out. */
+ private static final long TIMEOUT = 3000;
+
+ /** Holds the name of the queue to send pings on. */
+ private static final String PING_QUEUE_NAME = "ping";
+
+ /** Keeps track of the ping producer instance used in the run loop. */
+ private static PingPongProducer _pingProducer;
+
+ /** Holds the message producer to send the pings through. */
+ private MessageProducer _producer;
+
+ /** Holds the queue to send the ping replies to. */
+ private Queue _replyQueue;
+
+ /** Determines whether this producer sends persistent messages from the run method. */
+ private boolean _persistent;
+
+ /** Holds the message size to send, from the run method. */
+ private int _messageSize;
+
+ /** Holds a map from message ids to latches on which threads wait for replies. */
+ private Map<String, BooleanLatch> trafficLights = new HashMap<String, BooleanLatch>();
+
+ /** Holds a map from message ids to correlated replies. */
+ private Map<String, Message> replies = new HashMap<String, Message>();
+
+ public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer)
+ throws JMSException
+ {
+ super(session);
+ _producer = producer;
+ _replyQueue = replyQueue;
+
+ consumer.setMessageListener(this);
+ }
+
+ public PingPongProducer(Session session, Queue replyQueue, MessageProducer producer, MessageConsumer consumer,
+ boolean persistent, int messageSize) throws JMSException
+ {
+ this(session, replyQueue, producer, consumer);
+
+ _persistent = persistent;
+ _messageSize = messageSize;
+ }
+
+ /**
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongClient} also needs
+ * to be started to bounce the pings back again.
+ *
+ * <p/>The command line takes from 2 to 4 arguments:
+ * <p/><table>
+ * <tr><td>brokerDetails <td> The broker connection string.
+ * <tr><td>virtualPath <td> The virtual path.
+ * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
+ * <tr><td>size <td> The size of ping messages to use, in bytes.
+ * </table>
+ *
+ * @param args The command line arguments as defined above.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // Extract the command line.
+ if (args.length < 2)
+ {
+ System.err.println(
+ "Usage: TestPingPublisher <brokerDetails> <virtual path> [transacted] [persistent] [message size in bytes]");
+ System.exit(0);
+ }
+
+ String brokerDetails = args[0];
+ String virtualpath = args[1];
+ boolean transacted = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : false;
+ boolean persistent = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
+ int messageSize = (args.length >= 5) ? Integer.parseInt(args[4]) : DEFAULT_MESSAGE_SIZE;
+
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
+
+ Connection _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
+
+ // Create a transactional or non-transactional session, based on the command line arguments.
+ Session session;
+
+ if (transacted)
+ {
+ session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ // Create a queue to send the pings on.
+ Queue pingQueue = new AMQQueue(PING_QUEUE_NAME);
+ MessageProducer producer = (MessageProducer) session.createProducer(pingQueue);
+
+ // Create a temporary queue to reply with the pongs on.
+ Queue replyQueue = session.createTemporaryQueue();
+
+ // Create a message consumer to get the replies with.
+ MessageConsumer consumer = session.createConsumer(replyQueue);
+
+ // Create a ping producer to handle the request/wait/reply cycle.
+ _pingProducer = new PingPongProducer(session, replyQueue, producer, consumer, persistent, messageSize);
+
+ // Start the message consumers running.
+ _connection.start();
+
+ // Create a shutdown hook to terminate the ping-pong producer.
+ Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
+
+ // Start the ping loop running, ensuring that it is registered to listen for exceptions on the connection too.
+ _connection.setExceptionListener(_pingProducer);
+ Thread pingThread = new Thread(_pingProducer);
+ pingThread.run();
+
+ // Run until the ping loop is terminated.
+ pingThread.join();
+ }
+
+ /**
+ * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+ * correlating reply may be waiting on.
+ *
+ * @param message The received message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Store the reply.
+ String correlationID = message.getJMSCorrelationID();
+ replies.put(correlationID, message);
+
+ // Turn the traffic light to green.
+ BooleanLatch trafficLight = trafficLights.get(correlationID);
+
+ if (trafficLight != null)
+ {
+ trafficLight.signal();
+ }
+ else
+ {
+ _logger.debug("There was no thread waiting for reply: " + correlationID);
+ }
+ }
+ catch (JMSException e)
+ {
+ _logger.warn("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Sends the specified ping message and then waits for a correlating reply. If the wait times out before a reply
+ * arrives, then a null reply is returned from this method.
+ *
+ * @param message The message to send.
+ * @param timeout The timeout in milliseconds.
+ *
+ * @return The reply, or null if no reply arrives before the timeout.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public Message pingAndWaitForReply(Message message, long timeout) throws JMSException
+ {
+ _producer.send(message);
+
+ // Keep the messageId to correlate with the reply.
+ String messageId = message.getJMSMessageID();
+
+ // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
+ // this method, as the message will not be sent until the transaction is committed.
+ commitTx();
+
+ // Block the current thread until a reply to the message is received, or it times out.
+ BooleanLatch trafficLight = new BooleanLatch();
+ trafficLights.put(messageId, trafficLight);
+
+ // Note that this call expects a timeout in nanoseconds, millisecond timeout is multiplied up.
+ trafficLight.await(timeout * 1000);
+
+ // Check the replies to see if one was generated, if not then the reply timed out.
+ Message result = replies.get(messageId);
+
+ return result;
+ }
+
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ /**
+ * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
+ * waits for replies and inserts short pauses in between each.
+ */
+ public void pingLoop()
+ {
+ try
+ {
+ // Generate a sample message and time stamp it.
+ ObjectMessage msg = getTestMessage(_session, _replyQueue, _messageSize, System.currentTimeMillis(), _persistent);
+ msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+ // Send the message and wait for a reply.
+ pingAndWaitForReply(msg, TIMEOUT);
+
+ // Introduce a short pause if desired.
+ pause(SLEEP_TIME);
+ }
+ catch (JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
new file mode 100644
index 0000000000..74f1a899cf
--- /dev/null
+++ b/java/perftests/src/test/java/org/apache/qpid/requestreply/PingPongTestPerf.java
@@ -0,0 +1,245 @@
+package org.apache.qpid.requestreply;
+
+import java.net.InetAddress;
+import java.util.Properties;
+
+import javax.jms.*;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.MessageProducer;
+import org.apache.qpid.jms.Session;
+
+/**
+ * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run
+ * many times simultaneously. A full round trip ping sends a message from a producer to a conumer, then the consumer
+ * replies to the message on a temporary queue.
+ *
+ * <p/>A single run of the test using the default JUnit test runner will result in the sending and timing of a single
+ * full round trip ping. This test may be scaled up using a suitable JUnit test runner. See {@link TKTestRunner} or
+ * {@link PPTestRunner} for more information on how to do this.
+ *
+ * <p/>The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a
+ * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run,
+ * except if the connection is lost in which case an attempt to re-establish the setup is made.
+ *
+ * <p/>The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that
+ * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the
+ * temporary queue.
+ *
+ * <p/>Configurable test properties: message size, transacted or not, persistent or not. Broker connection details.
+ *
+ * <p><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * </table>
+ *
+ * @author Rupert Smith
+ */
+public class PingPongTestPerf extends TestCase implements ExceptionListener //, TimingControllerAware
+{
+ private static Logger _logger = Logger.getLogger(PingPongTestPerf.class);
+
+ /** Holds the name of the property to get the test message size from. */
+ private static final String MESSAGE_SIZE_PROPNAME = "messageSize";
+
+ /** Holds the name of the property to get the ping queue name from. */
+ private static final String PING_QUEUE_NAME_PROPNAME = "pingQueue";
+
+ /** Holds the name of the property to get the test delivery mode from. */
+ private static final String PERSISTENT_MODE_PROPNAME = "persistent";
+
+ /** Holds the name of the property to get the test transactional mode from. */
+ private static final String TRANSACTED_PROPNAME = "transacted";
+
+ /** Holds the name of the property to get the test broker url from. */
+ private static final String BROKER_PROPNAME = "broker";
+
+ /** Holds the name of the property to get the test broker virtual path. */
+ private static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
+
+ /** Holds the size of message body to attach to the ping messages. */
+ private static final int MESSAGE_SIZE_DEFAULT = 0;
+
+ /** Holds the name of the queue to which pings are sent. */
+ private static final String PING_QUEUE_NAME_DEFAULT = "ping";
+
+ /** Holds the message delivery mode to use for the test. */
+ private static final boolean PERSISTENT_MODE_DEFAULT = false;
+
+ /** Holds the transactional mode to use for the test. */
+ private static final boolean TRANSACTED_DEFAULT = false;
+
+ /** Holds the default broker url for the test. */
+ private static final String BROKER_DEFAULT = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ private static final String VIRTUAL_PATH_DEFAULT = "/test";
+
+ /** Sets a default ping timeout. */
+ private static final long TIMEOUT = 3000;
+
+ // Sets up the test parameters with defaults.
+ static
+ {
+ setSystemPropertyIfNull(MESSAGE_SIZE_PROPNAME, Integer.toString(MESSAGE_SIZE_DEFAULT));
+ setSystemPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
+ setSystemPropertyIfNull(PERSISTENT_MODE_PROPNAME, Boolean.toString(PERSISTENT_MODE_DEFAULT));
+ setSystemPropertyIfNull(TRANSACTED_PROPNAME, Boolean.toString(TRANSACTED_DEFAULT));
+ setSystemPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
+ setSystemPropertyIfNull(VIRTUAL_PATH_PROPNAME, VIRTUAL_PATH_DEFAULT);
+ }
+
+ /** Holds the test ping-pong producer. */
+ private PingPongProducer _testPingProducer;
+
+ // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in
+ // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner
+ // of the test parameters to log with the results.
+ private Properties testParameters = System.getProperties();
+ //private Properties testParameters = new ContextualProperties(System.getProperties());
+
+ /** Holds the connection to the broker. */
+ private Connection _connection = null;
+
+ /** Holds the current session to the broker. */
+ private Session _session;
+
+ /** Holds the destination to send the ping messages to. */
+ private Queue _pingQueue;
+
+ /** Holds the destination to send replies to. */
+ private Queue _replyQueue;
+
+ /** Holds a message producer, set up on the ping destination, to send messages through. */
+ private MessageProducer _producer;
+
+ /** Holds a message consumer, set up on the ping destination, to receive pings through. */
+ private MessageConsumer _pingConsumer;
+
+ /** Holds a message consumer, set up on the pong destination, to receive replies through. */
+ private MessageConsumer _pongConsumer;
+
+ /** Holds a failure flag, which gets set if the connection to the broker goes down. */
+ private boolean _failure;
+
+ public PingPongTestPerf(String name)
+ {
+ super(name);
+ }
+
+ private static void setSystemPropertyIfNull(String propName, String propValue)
+ {
+ if (System.getProperty(propName) == null)
+ {
+ System.setProperty(propName, propValue);
+ }
+ }
+
+ public void testPingPongOk() throws Exception
+ {
+ // Generate a sample message. This message is already time stamped and has its reply-to destination set.
+ ObjectMessage msg =
+ PingPongProducer.getTestMessage(_session, _replyQueue,
+ Integer.parseInt(testParameters.getProperty(MESSAGE_SIZE_PROPNAME)),
+ System.currentTimeMillis(),
+ Boolean.parseBoolean(testParameters.getProperty(PERSISTENT_MODE_PROPNAME)));
+
+ // Use the test timing controller to reset the test timer now and obtain the current time.
+ // This can be used to remove the message creation time from the test.
+ //TestTimingController timingUtils = getTimingController();
+ //long startTime = timingUtils.restart();
+
+ // Send the message and wait for a reply.
+ Message reply = _testPingProducer.pingAndWaitForReply(msg, TIMEOUT);
+
+ // Fail the test if the timeout was exceeded.
+ if (reply == null)
+ {
+ Assert.fail("The ping timed out for message id: " + msg.getJMSMessageID());
+ }
+ }
+
+ /**
+ * This is a callback method that is registered to receive any JMSExceptions that occurr on the connection to
+ * the broker. It sets a failure flag to indicate that there is an error condition.
+ *
+ * @param e The JMSException that triggered this callback method.
+ *
+ * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
+ */
+ public void onException(JMSException e)
+ {
+ // Set the failure flag.
+ _failure = true;
+
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
+
+ protected void setUp() throws Exception
+ {
+ // Log4j will propagate the test name as a thread local in all log output.
+ NDC.push(getName());
+
+ // Ensure that the connection, session and ping queue are established, if they have not already been.
+ if (_connection == null)
+ {
+ // Create a client id that identifies the client machine.
+ String clientID = InetAddress.getLocalHost().getHostName() + System.currentTimeMillis();
+
+ // Connect to the broker.
+ _connection = new AMQConnection(testParameters.getProperty(BROKER_PROPNAME), "guest", "guest", clientID,
+ testParameters.getProperty(VIRTUAL_PATH_PROPNAME));
+ _connection.setExceptionListener(this);
+
+ // Create a transactional or non-transactional session, based on the test properties, if a session has not
+ // already been created.
+ if (Boolean.parseBoolean(testParameters.getProperty(TRANSACTED_PROPNAME)))
+ {
+ _session = (Session) _connection.createSession(true, Session.SESSION_TRANSACTED);
+ }
+ else
+ {
+ _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ // Create a queue to send the pings on.
+ _pingQueue = new AMQQueue(testParameters.getProperty(PING_QUEUE_NAME_PROPNAME));
+ _producer = (MessageProducer) _session.createProducer(_pingQueue);
+
+ // Create a temporary queue to reply with the pongs on.
+ _replyQueue = _session.createTemporaryQueue();
+
+ // Create the ping and pong consumers on their respective destinations.
+ _pingConsumer = _session.createConsumer(_pingQueue);
+ _pongConsumer = _session.createConsumer(_replyQueue);
+
+ // Establish a bounce back client on the ping queue to bounce back the pings.
+ new org.apache.qpid.requestreply.PingPongClient(_session, _pingConsumer, false);
+
+ // Establish a ping-pong client on the ping queue to send pings and wait for replies.
+ _testPingProducer = new org.apache.qpid.requestreply.PingPongProducer(_session, _replyQueue, _producer,
+ _pongConsumer);
+
+ _connection.start();
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ _connection.close();
+ }
+ finally
+ {
+ NDC.pop();
+ }
+ }
+}
diff --git a/java/pom.xml b/java/pom.xml
index b00c36c002..fdaba94ce6 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -337,6 +337,15 @@
</execution>
</executions>
</plugin>
+
+ <!-- The JUnit Toolkit maven2 plugin is in the process of being added to the maven repository. It will take a day or two from 16/1/2007.
+ <plugin>
+ <groupId>uk.co.thebadgerset</groupId>
+ <artifactId>junit-toolkit-maven-plugin</artifactId>
+ <version>0.3</version>
+ </plugin>
+ -->
+
</plugins>
</pluginManagement>
@@ -437,14 +446,12 @@
<scope>test</scope>
</dependency>
- <!-- Will be added to maven repo soon. JUnit test runner that can add repeats/concurrenct/timing etc to tests.
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
<version>0.3</version>
<scope>test</scope>
</dependency>
- -->
<!-- Qpid Version Dependencies -->
<dependency>