summaryrefslogtreecommitdiff
path: root/java/perftests/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/perftests/src/main')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java204
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java524
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java192
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java223
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java249
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java197
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java134
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java67
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java206
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java1137
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java235
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java428
12 files changed, 906 insertions, 2890 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
deleted file mode 100644
index 97b411323e..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingClient.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-
-import javax.jms.JMSException;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.jms.Session;
-
-/**
- * Provides common functionality that ping clients (the recipients of ping messages) can use. This base class keeps
- * track of the connection used to send pings, provides a convenience method to commit a transaction only when a session
- * to commit on is transactional, keeps track of whether the ping client is pinging to a queue or a topic, provides
- * prompts to the console to terminate brokers before and after commits, in order to test failover functionality, and
- * provides a convience formatter for outputing readable timestamps for pings.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Commit the current transcation on a session.
- * <tr><td> Generate failover promts.
- * <tr><td> Keep track the connection.
- * <tr><td> Keep track of p2p or topic ping type.
- * </table>
- *
- * @todo This base class does not seem particularly usefull and all functionality is duplicated in {@link AbstractPingProducer}.
- * Merge it into that class.
- */
-public abstract class AbstractPingClient
-{
- private static final Logger _logger = Logger.getLogger(TestPingClient.class);
-
- /** A convenient formatter to use when time stamping output. */
- protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
-
- /** Holds the connection to the broker. */
- private AMQConnection _connection;
-
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- private boolean _isPubSub = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
- * failover, immediately before committing a transaction.
- */
- protected boolean _failBeforeCommit = false;
-
- /**
- * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
- * failover, immediate after committing a transaction.
- */
- protected boolean _failAfterCommit = false;
-
- /**
- * Convenience method to commit the transaction on the specified session. If the session to commit on is not
- * a transactional session, this method does nothing.
- *
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
- * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
- * after the commit is applied.
- *
- * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- */
- protected void commitTx(Session session) throws JMSException
- {
- if (session.getTransacted())
- {
- try
- {
- if (_failBeforeCommit)
- {
- _logger.trace("Failing Before Commit");
- doFailover();
- }
-
- session.commit();
-
- if (_failAfterCommit)
- {
- _logger.trace("Failing After Commit");
- doFailover();
- }
-
- _logger.trace("Session Commited.");
- }
- catch (JMSException e)
- {
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
-
- try
- {
- session.rollback();
- _logger.debug("Message rolled back.");
- }
- catch (JMSException jmse)
- {
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
-
- // Both commit and rollback failed. Throw the rollback exception.
- throw jmse;
- }
- }
- }
- }
-
- /**
- * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- *
- * @param broker The name of the broker to terminate.
- */
- protected void doFailover(String broker)
- {
- System.out.println("Kill Broker " + broker + " now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
- }
-
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now.");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
-
- }
-
- /**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public AMQConnection getConnection()
- {
- return _connection;
- }
-
- /**
- * Sets the connection that this ping client is using.
- *
- * @param connection The ping connection.
- */
- public void setConnection(AMQConnection connection)
- {
- this._connection = connection;
- }
-
- /**
- * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
- *
- * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public void setPubSub(boolean pubsub)
- {
- _isPubSub = pubsub;
- }
-
- /**
- * Checks whether this client is a p2p or pub/sub ping client.
- *
- * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public boolean isPubSub()
- {
- return _isPubSub;
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
deleted file mode 100644
index 091a865473..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/AbstractPingProducer.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.*;
-import javax.jms.Connection;
-import javax.jms.Message;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.Session;
-
-/**
- * Provides common functionality that ping producers (the senders of ping messages) can use. This base class keeps
- * track of the connection used to send pings; provides a convenience method to commit a transaction only when a session
- * to commit on is transactional; keeps track of whether the ping client is pinging to a queue or a topic; provides
- * prompts to the console to terminate brokers before and after commits, in order to test failover functionality;
- * requires sub-classes to implement a ping loop, that this provides a run loop to repeatedly call; provides a
- * default shutdown hook to cleanly terminate the run loop; keeps track of the destinations to send pings to;
- * provides a convenience method to generate short pauses; and provides a convience formatter for outputing readable
- * timestamps for pings.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Commit the current transcation on a session.
- * <tr><td> Generate failover promts.
- * <tr><td> Keep track the connection.
- * <tr><td> Keep track of p2p or topic ping type.
- * <tr><td> Call ping loop to repeatedly send pings.
- * <tr><td> Provide a shutdown hook.
- * <tr><td> Generate short pauses.
- * </table>
- *
- * @todo Destination count versus list of desintations is redundant. Use _destinions.size() to get the count and
- * use a list of 1 destination when only 1 is needed. It is only important to distinguish when 1 destination
- * is shared between multiple ping producers on the same JVM or if each ping producer has its own single
- * destination.
- *
- * @todo Timestamp messages in nanos, not millis. Millis seems to have bad resolution, at least on windows.
- */
-public abstract class AbstractPingProducer implements Runnable, ExceptionListener
-{
- private static final Logger _logger = Logger.getLogger(AbstractPingProducer.class);
-
- /** Flag used to indicate if this is a point to point or pub/sub ping client. */
- private boolean _isPubSub = false;
-
- /** A convenient formatter to use when time stamping output. */
- protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
-
- /**
- * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
- * creating multiple ping producers in the same JVM.
- */
- private static AtomicInteger _queueSequenceID = new AtomicInteger();
-
- /** Used to tell the ping loop when to terminate, it only runs while this is true. */
- protected boolean _publish = true;
-
- /** Holds the connection to the broker. */
- private Connection _connection;
-
- /** Holds the producer session, needed to create ping messages. */
- private Session _producerSession;
-
- /** Holds the number of destinations that this ping producer will send pings to, defaulting to a single destination. */
- protected int _destinationCount = 1;
-
- /** Holds the set of destiniations that this ping producer pings. */
- private List<Destination> _destinations = new ArrayList<Destination>();
-
- /** Holds the message producer to send the pings through. */
- protected org.apache.qpid.jms.MessageProducer _producer;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
- protected boolean _failBeforeCommit = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
- protected boolean _failAfterCommit = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
- protected boolean _failBeforeSend = false;
-
- /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
- protected boolean _failAfterSend = false;
-
- /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
- protected boolean _failOnce = true;
-
- /** Holds the number of sends that should be performed in every transaction when using transactions. */
- protected int _txBatchSize = 1;
-
- /**
- * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
- *
- * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public void setPubSub(boolean pubsub)
- {
- _isPubSub = pubsub;
- }
-
- /**
- * Checks whether this client is a p2p or pub/sub ping client.
- *
- * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
- */
- public boolean isPubSub()
- {
- return _isPubSub;
- }
-
- /**
- * Convenience method for a short pause.
- *
- * @param sleepTime The time in milliseconds to pause for.
- */
- public static void pause(long sleepTime)
- {
- if (sleepTime > 0)
- {
- try
- {
- Thread.sleep(sleepTime);
- }
- catch (InterruptedException ie)
- { }
- }
- }
-
- /**
- * Implementations should provide this method to perform a single ping cycle (which may send many messages). The
- * run loop will repeatedly call this method until the publish flag is set to false.
- */
- public abstract void pingLoop();
-
- /**
- * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
- *
- * @param replyQueue The reply-to destination for the message.
- * @param messageSize The desired size of the message in bytes.
- * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
- *
- * @return A freshly generated test message.
- *
- * @throws JMSException All underlying JMSException are allowed to fall through.
- */
- public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
- {
- ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
- // Timestamp the message.
- msg.setLongProperty("timestamp", System.currentTimeMillis());
-
- return msg;
- }
-
- /**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
- * flag has been cleared.
- */
- public void stop()
- {
- _publish = false;
- }
-
- /**
- * Implements a ping loop that repeatedly pings until the publish flag becomes false.
- */
- public void run()
- {
- // Keep running until the publish flag is cleared.
- while (_publish)
- {
- pingLoop();
- }
- }
-
- /**
- * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
- * connection, this clears the publish flag which in turn will halt the ping loop.
- *
- * @param e The exception that triggered this callback method.
- */
- public void onException(JMSException e)
- {
- _publish = false;
- _logger.debug("There was a JMSException: " + e.getMessage(), e);
- }
-
- /**
- * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
- * with the runtime system as a shutdown hook.
- *
- * @return A shutdown hook for the ping loop.
- */
- public Thread getShutdownHook()
- {
- return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- }
- });
- }
-
- /**
- * Gets the underlying connection that this ping client is running on.
- *
- * @return The underlying connection that this ping client is running on.
- */
- public Connection getConnection()
- {
- return _connection;
- }
-
- /**
- * Sets the connection that this ping client is using.
- *
- * @param connection The ping connection.
- */
- public void setConnection(Connection connection)
- {
- this._connection = connection;
- }
-
- /**
- * Gets the producer session that the ping client is using to send pings on.
- *
- * @return The producer session that the ping client is using to send pings on.
- */
- public Session getProducerSession()
- {
- return _producerSession;
- }
-
- /**
- * Keeps track of the producer session that the ping client is using to send pings on.
- *
- * @param session The producer session that the ping client is using to send pings on.
- */
- public void setProducerSession(Session session)
- {
- this._producerSession = session;
- }
-
- /**
- * Gets the number of destinations that this ping client is sending to.
- *
- * @return The number of destinations that this ping client is sending to.
- */
- public int getDestinationsCount()
- {
- return _destinationCount;
- }
-
- /**
- * Sets the number of destination that this ping client should send to.
- *
- * @param count The number of destination that this ping client should send to.
- *
- * @deprectaed Use _destinations.size() instead.
- */
- public void setDestinationsCount(int count)
- {
- this._destinationCount = count;
- }
-
- /**
- * Commits the transaction on the producer session.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- *
- * @deprecated Use the commitTx(Session session) method instead, to explicitly specify which session is being
- * committed. This makes it more obvious what is going on.
- */
- protected void commitTx() throws JMSException
- {
- commitTx(getProducerSession());
- }
-
- /**
- * Creates the specified number of destinations to send pings to. Topics or Queues will be created depending on
- * the value of the {@link #_isPubSub} flag.
- *
- * @param count The number of ping destinations to create.
- */
- protected void createDestinations(int count)
- {
- // Create the desired number of ping destinations.
- for (int i = 0; i < count; i++)
- {
- AMQDestination destination = null;
-
- // Check if this is a pub/sub pinger, in which case create topics.
- if (isPubSub())
- {
- AMQShortString name =
- new AMQShortString("AMQTopic_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
- destination = new AMQTopic(name);
- }
- // Otherwise this is a p2p pinger, in which case create queues.
- else
- {
- AMQShortString name =
- new AMQShortString("AMQQueue_" + _queueSequenceID.incrementAndGet() + "_" + System.currentTimeMillis());
- destination = new AMQQueue(name, name, false, false, false);
- }
-
- _destinations.add(destination);
- }
- }
-
- /**
- * Returns the destination from the destinations list with the given index.
- *
- * @param index The index of the destination to get.
- *
- * @return Destination with the given index.
- */
- protected Destination getDestination(int index)
- {
- return _destinations.get(index);
- }
-
- /**
- * Convenience method to commit the transaction on the specified session. If the session to commit on is not
- * a transactional session, this method does nothing (unless the failover after send flag is set).
- *
- * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
- * is applied. This flag applies whether the pinger is transactional or not.
- *
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
- * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
- * after the commit is applied. These flags will only apply if using a transactional pinger.
- *
- * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
- *
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional
- * and non-transactional alike.
- */
- protected void commitTx(Session session) throws JMSException
- {
- _logger.trace("Batch time reached");
- if (_failAfterSend)
- {
- _logger.trace("Batch size reached");
- if (_failOnce)
- {
- _failAfterSend = false;
- }
-
- _logger.trace("Failing After Send");
- doFailover();
- }
-
- if (session.getTransacted())
- {
- try
- {
- if (_failBeforeCommit)
- {
- if (_failOnce)
- {
- _failBeforeCommit = false;
- }
-
- _logger.trace("Failing Before Commit");
- doFailover();
- }
-
- session.commit();
-
- if (_failAfterCommit)
- {
- if (_failOnce)
- {
- _failAfterCommit = false;
- }
-
- _logger.trace("Failing After Commit");
- doFailover();
- }
-
- _logger.trace("Session Commited.");
- }
- catch (JMSException e)
- {
- _logger.trace("JMSException on commit:" + e.getMessage(), e);
-
- // Warn that the bounce back client is not available.
- if (e.getLinkedException() instanceof AMQNoConsumersException)
- {
- _logger.debug("No consumers on queue.");
- }
-
- try
- {
- session.rollback();
- _logger.trace("Message rolled back.");
- }
- catch (JMSException jmse)
- {
- _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
-
- // Both commit and rollback failed. Throw the rollback exception.
- throw jmse;
- }
- }
- }
- }
-
- /**
- * Sends the specified message to the default destination of the ping producer.
- *
- * @param message The message to send.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- protected void sendMessage(Message message) throws JMSException
- {
- sendMessage(null, message);
- }
-
- /**
- * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
- * of the ping producer. If an explicit destination is set, this overrides the default.
- *
- * @param destination The destination to send to.
- * @param message The message to send.
- *
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- protected void sendMessage(Destination destination, Message message) throws JMSException
- {
- if (_failBeforeSend)
- {
- if (_failOnce)
- {
- _failBeforeSend = false;
- }
-
- _logger.trace("Failing Before Send");
- doFailover();
- }
-
- if (destination == null)
- {
- _producer.send(message);
- }
- else
- {
- _producer.send(destination, message);
- }
- }
-
- /**
- * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- *
- * @param broker The name of the broker to terminate.
- */
- protected void doFailover(String broker)
- {
- System.out.println("Kill Broker " + broker + " now then press return");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
- }
-
- /**
- * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
- * until the user supplied some input on the terminal.
- */
- protected void doFailover()
- {
- System.out.println("Kill Broker now then press return");
- try
- {
- System.in.read();
- }
- catch (IOException e)
- { }
-
- System.out.println("Continuing.");
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
deleted file mode 100644
index 949ace20e1..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingClient.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import java.net.InetAddress;
-
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.Session;
-
-/**
- * PingClient is a message listener that received time stamped ping messages. It can work out how long a ping took,
- * provided that its clokc is synchronized to that of the ping producer, or by running it on the same machine (or jvm)
- * as the ping producer.
- * <p/>
- * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
- * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
- * be disabled for real timing tests as writing to the console will slow things down.
- * <p/>
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide command line invocation to start the ping consumer on a configurable broker url.
- * </table>
- *
- * @todo Add a better command line interpreter to the main method. The command line is not very nice...
- */
-class TestPingClient extends AbstractPingClient implements MessageListener
-{
- private static final Logger _logger = Logger.getLogger(TestPingClient.class);
-
- /**
- * Used to indicate that the reply generator should log timing info to the console (logger info level).
- */
- private boolean _verbose = false;
-
- /**
- * The producer session.
- */
- private Session _consumerSession;
-
- /**
- * Creates a TestPingClient on the specified session.
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param queueName
- * @param virtualpath
- * @param transacted
- * @param selector
- * @param verbose
- * @param afterCommit
- *@param beforeCommit @throws Exception All underlying exceptions allowed to fall through. This is only test code...
- */
- public TestPingClient(String brokerDetails, String username, String password, String queueName, String virtualpath,
- boolean transacted, String selector, boolean verbose, boolean afterCommit, boolean beforeCommit) throws Exception
- {
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
-
- setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
-
- // Create a transactional or non-transactional session depending on the command line parameter.
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
-
- // Connect a consumer to the ping queue and register this to be called back by it.
- Queue q = new AMQQueue(queueName);
- MessageConsumer consumer = _consumerSession.createConsumer(q, 1, false, false, selector);
-
- consumer.setMessageListener(this);
-
- // Hang on to the verbose flag setting.
- _verbose = verbose;
-
- // Set failover interrupts
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- }
-
- /**
- * Starts a stand alone ping-pong client running in verbose mode.
- *
- * @param args
- */
- public static void main(String[] args) throws Exception
- {
- _logger.info("Starting...");
-
- // Display help on the command line.
- if (args.length < 4)
- {
- System.out.println(
- "Usage: brokerdetails username password virtual-path [queueName] [verbose] [transacted] [selector] [failover:<before|after>:commit]");
- System.exit(1);
- }
-
- // Extract all command line parameters.
- String brokerDetails = args[0];
- String username = args[1];
- String password = args[2];
- String virtualpath = args[3];
- String queueName = (args.length >= 5) ? args[4] : "ping";
- boolean verbose = (args.length >= 6) ? Boolean.parseBoolean(args[5]) : true;
- boolean transacted = (args.length >= 7) ? Boolean.parseBoolean(args[6]) : false;
- String selector = (args.length == 8) ? args[7] : null;
-
- boolean afterCommit = false;
- boolean beforeCommit = false;
-
- for (String arg : args)
- {
- if (arg.startsWith("failover:"))
- {
- //failover:<before|after>:<send:commit>
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
- }
-
- // Create the test ping client and set it running.
- TestPingClient pingClient =
- new TestPingClient(brokerDetails, username, password, queueName, virtualpath, transacted, selector, verbose, afterCommit, beforeCommit);
-
- pingClient.getConnection().start();
-
- System.out.println("Waiting...");
- }
- /**
- * This is a callback method that is notified of all messages for which this has been registered as a message
- * listener on a message consumer.
- *
- * @param message The message that triggered this callback.
- */
- public void onMessage(javax.jms.Message message)
- {
- try
- {
- // Spew out some timing information if verbose mode is on.
- if (_verbose)
- {
- Long timestamp = message.getLongProperty("timestamp");
-
- if (timestamp != null)
- {
- long diff = System.currentTimeMillis() - timestamp;
- System.out.println("Ping time: " + diff);
- }
- }
-
- // Commit the transaction if running in transactional mode.
- commitTx(_consumerSession);
- }
- catch (JMSException e)
- {
- _logger.error("There was a JMSException: " + e.getMessage(), e);
- }
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
deleted file mode 100644
index acb0135b86..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingItself.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.ObjectMessage;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.requestreply.PingPongProducer;
-import org.apache.qpid.topic.Config;
-
-/**
- * This class is used to test sending and receiving messages to (pingQueue) and from a queue (replyQueue).
- * The producer and consumer created by this test send and receive messages to and from the same Queue. ie.
- * pingQueue and replyQueue are same.
- * This class extends @see org.apache.qpid.requestreply.PingPongProducer which different ping and reply Queues
- */
-public class TestPingItself extends PingPongProducer
-{
- private static final Logger _logger = Logger.getLogger(TestPingItself.class);
-
- /**
- * If noOfDestinations is <= 1 : There will be one Queue and one consumer instance for the test
- * If noOfDestinations is > 1 : This creats a client for tests with multiple queues. Creates as many consumer instances
- * as there are queues, each listening to a Queue. A producer is created which picks up a queue from
- * the list of queues to send message
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param virtualpath
- * @param queueName
- * @param selector
- * @param transacted
- * @param persistent
- * @param messageSize
- * @param verbose
- * @param afterCommit
- * @param beforeCommit
- * @param afterSend
- * @param beforeSend
- * @param failOnce
- * @param batchSize
- * @param noOfDestinations
- * @throws Exception
- */
- public TestPingItself(String brokerDetails, String username, String password, String virtualpath, String queueName,
- String selector, boolean transacted, boolean persistent, int messageSize, boolean verbose,
- boolean afterCommit, boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce,
- int batchSize, int noOfDestinations, int rate, boolean pubsub) throws Exception
- {
- super(brokerDetails, username, password, virtualpath, queueName, selector, transacted, persistent,
- messageSize, verbose, afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- noOfDestinations, rate, pubsub);
-
- if (noOfDestinations > DEFAULT_DESTINATION_COUNT)
- {
- createDestinations(noOfDestinations);
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
-
- createConsumers(selector);
- createProducer();
- }
- }
-
- /**
- * Sets the replyQueue to be the same as ping queue.
- */
- @Override
- public void createConsumer(String selector) throws JMSException
- {
- // Create a message consumer to get the replies with and register this to be called back by it.
- setReplyDestination(getPingDestination());
- MessageConsumer consumer =
- getConsumerSession().createConsumer(getReplyDestination(), PREFETCH, false, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
- }
-
- /**
- * Starts a ping-pong loop running from the command line.
- *
- * @param args The command line arguments as defined above.
- */
- public static void main(String[] args) throws Exception
- {
- // Extract the command line.
- Config config = new Config();
- config.setOptions(args);
- if (args.length == 0)
- {
- _logger.info("Running test with default values...");
- }
-
- String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "/test";
- boolean verbose = true;
- boolean transacted = config.isTransacted();
- boolean persistent = config.usePersistentMessages();
- int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
- int messageCount = config.getMessages();
- int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
- int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
- boolean pubsub = config.isPubSub();
-
- String destName = config.getDestination();
- if (destName == null)
- {
- destName = PING_DESTINATION_NAME;
- }
-
- boolean afterCommit = false;
- boolean beforeCommit = false;
- boolean afterSend = false;
- boolean beforeSend = false;
- boolean failOnce = false;
-
- for (String arg : args)
- {
- if (arg.startsWith("failover:"))
- {
- //failover:<before|after>:<send:commit>
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
-
- if (parts[2].equals("send"))
- {
- afterSend = parts[1].equals("after");
- beforeSend = parts[1].equals("before");
- }
-
- if (parts[1].equals("once"))
- {
- failOnce = true;
- }
-
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
- }
-
- // Create a ping producer to handle the request/wait/reply cycle.
- TestPingItself pingItself = new TestPingItself(brokerDetails, "guest", "guest", virtualpath, destName, null,
- transacted, persistent, messageSize, verbose, afterCommit,
- beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- destCount, rate, pubsub);
-
- pingItself.getConnection().start();
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(pingItself.getShutdownHook());
-
- // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
- pingItself.getConnection().setExceptionListener(pingItself);
-
- if ((destCount > DEFAULT_DESTINATION_COUNT) || (messageCount > 0))
- {
- _logger.info("Destinations Count:" + destCount + ", Transacted:" + transacted + ", persistent:" +
- persistent + ",Message Size:" + messageSize + " bytes, pubsub:" + pubsub);
- pingItself.pingLoop();
- }
- else
- {
- _logger.info("Destination:" + destName + ", Transacted:" + transacted + ", persistent:" +
- persistent + ",Message Size:" + messageSize + " bytes, pubsub:" + pubsub);
- // set the message count to 0 to run this loop
- // Run a few priming pings to remove warm up time from test results.
- pingItself.prime(PRIMING_LOOPS);
-
- _logger.info("Running the infinite loop and pinging the broker...");
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingItself);
- pingThread.run();
- pingThread.join();
- }
-
- pingItself.getConnection().close();
- }
-
- private static void usage()
- {
- System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
- "-destinationname : queue/topic name\n" +
- "-transacted : (true/false). Default is false\n" +
- "-persistent : (true/false). Default is false\n" +
- "-pubsub : (true/false). Default is false\n" +
- "-selector : selector string\n" +
- "-payload : paylaod size. Default is 0\n" +
- "-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
- "-destinationscount : no of destinations for multi-destinations test\n" +
- "-batchsize : batch size\n" +
- "-rate : thruput rate\n");
- System.exit(0);
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
deleted file mode 100644
index d9e81d39de..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingProducer.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.ping;
-
-import java.net.InetAddress;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.jms.*;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.MessageProducer;
-import org.apache.qpid.jms.Session;
-
-/**
- * PingProducer is a client that sends timestamped pings to a queue. It is designed to be run from the command line
- * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session and
- * configured message producer.
- * <p/>
- * <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
- * does all its work through helper methods, so that code wishing to run a ping cycle is not forced to do so
- * by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
- * also registered to terminate the ping loop cleanly.
- * <p/>
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a ping cycle.
- * <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
- * </table>
- */
-class TestPingProducer extends AbstractPingProducer
-{
- private static final Logger _logger = Logger.getLogger(TestPingProducer.class);
-
- /**
- * Used to set up a default message size.
- */
- private static final int DEFAULT_MESSAGE_SIZE = 0;
-
- /**
- * Used to define how long to wait between pings.
- */
- private static final long SLEEP_TIME = 250;
-
- /**
- * Holds the name of the queue to send pings on.
- */
- private static final String PING_QUEUE_NAME = "ping";
-
- private static TestPingProducer _pingProducer;
-
- /**
- * Determines whether this producer sends persistent messages from the run method.
- */
- private boolean _persistent = false;
-
- /**
- * Holds the message size to send, from the run method.
- */
- private int _messageSize = DEFAULT_MESSAGE_SIZE;
-
- /**
- * Used to indicate that the ping loop should print out whenever it pings.
- */
- private boolean _verbose = false;
-
- public TestPingProducer(String brokerDetails, String username, String password, String virtualpath, String queueName,
- boolean transacted, boolean persistent, int messageSize, boolean verbose, boolean afterCommit,
- boolean beforeCommit, boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize)
- throws Exception
- {
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
-
- setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
-
- // Create a transactional or non-transactional session, based on the command line arguments.
- setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
-
- // Create a queue to send the pings on.
- Queue pingQueue = new AMQQueue(queueName);
- _producer = (MessageProducer) getProducerSession().createProducer(pingQueue);
-
- _persistent = persistent;
- _messageSize = messageSize;
-
- _verbose = verbose;
-
- // Set failover interrupts
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _txBatchSize = batchSize;
- _failOnce = failOnce;
- }
-
- /**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link TestPingClient} also needs
- * to be started to bounce the pings back again.
- *
- * @param args The command line arguments as defined above.
- */
- public static void main(String[] args) throws Exception
- {
- // Extract the command line.
- if (args.length < 2)
- {
- System.err.println(
- "Usage: TestPingPublisher <brokerDetails> <virtual path> "
- + "[<verbose(true|false)> <transacted(true|false))> <persistent(true|false)> <message size in bytes> <batchsize>");
- System.exit(0);
- }
-
- String brokerDetails = args[0];
- String virtualpath = args[1];
- boolean verbose = (args.length >= 3) ? Boolean.parseBoolean(args[2]) : true;
- boolean transacted = (args.length >= 4) ? Boolean.parseBoolean(args[3]) : false;
- boolean persistent = (args.length >= 5) ? Boolean.parseBoolean(args[4]) : false;
- int messageSize = (args.length >= 6) ? Integer.parseInt(args[5]) : DEFAULT_MESSAGE_SIZE;
- int batchSize = (args.length >= 7) ? Integer.parseInt(args[6]) : 1;
-
- boolean afterCommit = false;
- boolean beforeCommit = false;
- boolean afterSend = false;
- boolean beforeSend = false;
- boolean failOnce = false;
-
- for (String arg : args)
- {
- if (arg.startsWith("failover:"))
- {
- //failover:<before|after>:<send:commit>
- String[] parts = arg.split(":");
- if (parts.length == 3)
- {
- if (parts[2].equals("commit"))
- {
- afterCommit = parts[1].equals("after");
- beforeCommit = parts[1].equals("before");
- }
-
- if (parts[2].equals("send"))
- {
- afterSend = parts[1].equals("after");
- beforeSend = parts[1].equals("before");
- }
-
- if (parts[1].equals("once"))
- {
- failOnce = true;
- }
- }
- else
- {
- System.out.println("Unrecognized failover request:" + arg);
- }
- }
- }
-
- // Create a ping producer to generate the pings.
- _pingProducer = new TestPingProducer(brokerDetails, "guest", "guest", virtualpath, PING_QUEUE_NAME, transacted,
- persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
- beforeSend, failOnce, batchSize);
-
- // Start the connection running.
- _pingProducer.getConnection().start();
-
- // Create a shutdown hook to terminate the ping-pong producer.
- Runtime.getRuntime().addShutdownHook(_pingProducer.getShutdownHook());
-
- // Ensure the ping loop execption listener is registered on the connection to terminate it on error.
- _pingProducer.getConnection().setExceptionListener(_pingProducer);
-
- // Start the ping loop running until it is interrupted.
- Thread pingThread = new Thread(_pingProducer);
- pingThread.run();
- pingThread.join();
- }
-
- /**
- * Sends the specified ping message.
- *
- * @param message The message to send.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void ping(Message message) throws JMSException
- {
- sendMessage(message);
-
- // Keep the messageId to correlate with the reply.
- String messageId = message.getJMSMessageID();
-
- // Commit the transaction if running in transactional mode. This must happen now, rather than at the end of
- // this method, as the message will not be sent until the transaction is committed.
- commitTx();
- }
-
- /**
- * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
- * waits for short pauses in between each.
- */
- public void pingLoop()
- {
- try
- {
- // Generate a sample message and time stamp it.
- ObjectMessage msg = getTestMessage(null, _messageSize, _persistent);
- msg.setLongProperty("timestamp", System.currentTimeMillis());
-
- // Send the message.
- ping(msg);
-
- if (_verbose)
- {
- System.out.println("Pinged at: " + timestampFormatter.format(new Date())); //" + " with id: " + msg.getJMSMessageID());
- }
- // Introduce a short pause if desired.
- pause(SLEEP_TIME);
- }
- catch (JMSException e)
- {
- _publish = false;
- _logger.error("There was a JMSException: " + e.getMessage(), e);
- }
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java
deleted file mode 100644
index 3b2dcc4d36..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingPublisher.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pingpong;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.jms.MessageProducer;
-import org.apache.qpid.jms.Session;
-
-import javax.jms.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * A client that behaves as follows:
- * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
- * <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
- * </ul>
- */
-public class TestPingPublisher implements ExceptionListener
-{
- private static final Logger _log = Logger.getLogger(TestPingPublisher.class);
-
- private AMQConnection _connection;
-
- private boolean _publish;
- private static int _messageSize = 0;
- private long SLEEP_TIME = 0L;
-
-// private class CallbackHandler implements MessageListener
-// {
-//
-// private int _actualMessageCount;
-//
-//
-// public void onMessage(Message m)
-// {
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Message received: " + m);
-// }
-// _actualMessageCount++;
-// if (_actualMessageCount % 1000 == 0)
-// {
-// _log.info("Received message count: " + _actualMessageCount);
-// }
-// }
-// }
-
- public TestPingPublisher(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException
- {
- try
- {
- createConnection(brokerDetails, clientID, virtualpath);
-
- Session session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- //AMQQueue destination = new AMQQueue("ping");
- AMQTopic destination = new AMQTopic("ping");
- MessageProducer producer = (MessageProducer) session.createProducer(destination);
-
- _connection.setExceptionListener(this);
-
- _connection.start();
-
- int msgCount = 0;
- while (_publish)
- {
-/*
- TextMessage msg = session.createTextMessage(
- "Presented to in conjunction with Mahnah Mahnah and the Snowths: " + ++messageNumber);
-*/
- ObjectMessage msg = null;
- if (_messageSize != 0)
- {
- msg = TestMessageFactory.newObjectMessage(session, _messageSize);
- }
- else
- {
- msg = session.createObjectMessage();
- }
-
- Long time = System.nanoTime();
- msg.setStringProperty("timestampString", Long.toString(time));
- msg.setLongProperty("timestamp", time);
-
- ((BasicMessageProducer) producer).send(msg, DeliveryMode.PERSISTENT, true);
-
- _log.info("Message Sent:" + msgCount++);
- _log.debug(msg);
-
- if (msgCount == Integer.MAX_VALUE)
- {
- _publish = false;
- }
-
- if (SLEEP_TIME > 0)
- {
- try
- {
- Thread.sleep(SLEEP_TIME);
- }
- catch (InterruptedException ie)
- {
- //do nothing
- }
- }
- }
-
- }
- catch (JMSException e)
- {
- e.printStackTrace();
- }
- }
-
- private void createConnection(String brokerDetails, String clientID, String virtualpath) throws AMQException, URLSyntaxException
- {
- _publish = true;
- _connection = new AMQConnection(brokerDetails, "guest", "guest", clientID, virtualpath);
- _log.info("Connected with URL:" + _connection.toURL());
- }
-
- /**
- * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
- * means the server will allocate a name.
- */
- public static void main(String[] args)
- {
- if (args.length < 2)
- {
- System.err.println("Usage: TestPingPublisher <brokerDetails> <virtual path> [message size in bytes]");
- System.exit(0);
- }
- try
- {
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
- if (args.length > 2 )
- {
- _messageSize = Integer.parseInt(args[2]);
- }
- new TestPingPublisher(args[0], clientID, args[1]);
- }
- catch (UnknownHostException e)
- {
- e.printStackTrace();
- }
- catch (AMQException e)
- {
- System.err.println("Error in client: " + e);
- e.printStackTrace();
- }
- catch (URLSyntaxException e)
- {
- System.err.println("Error in connection arguments : " + e);
- }
-
- //System.exit(0);
- }
-
- /**
- * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
- */
- public void onException(JMSException e)
- {
- System.err.println(e.getMessage());
-
- _publish = false;
- e.printStackTrace(System.err);
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java b/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
deleted file mode 100644
index b43319744a..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/TestPingSubscriber.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.pingpong;
-
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.Session;
-
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Topic;
-import javax.jms.JMSException;
-import java.net.InetAddress;
-
-public class TestPingSubscriber
-{
- private static final Logger _logger = Logger.getLogger(TestPingSubscriber.class);
-
- private static class TestPingMessageListener implements MessageListener
- {
- public TestPingMessageListener()
- {
- }
-
- long _lastTimestamp = 0L;
- long _lastTimestampString = 0L;
-
- public void onMessage(javax.jms.Message message)
- {
- Long time = System.nanoTime();
-
- if (_logger.isInfoEnabled())
- {
- long timestampString = 0L;
-
- try
- {
- long timestamp = message.getLongProperty("timestamp");
- timestampString = Long.parseLong(message.getStringProperty("timestampString"));
-
- if (timestampString != timestamp)
- {
- _logger.info("Timetamps differ!:\n" +
- "timestamp:" + timestamp + "\n" +
- "timestampString:" + timestampString);
- }
-
- }
- catch (JMSException jmse)
- {
- _logger.error("JMSException caught:" + jmse.getMessage(), jmse);
- }
-
-
- long stringDiff = time - timestampString;
-
- _logger.info("Ping: TS:" + stringDiff / 1000 + "us");
-
- // _logger.info(_name + " got message '" + message + "\n");
- }
- }
- }
-
- public static void main(String[] args)
- {
- _logger.info("Starting...");
-
- if (args.length < 4)
- {
- System.out.println("Usage: brokerdetails username password virtual-path [selector] ");
- System.exit(1);
- }
- try
- {
- InetAddress address = InetAddress.getLocalHost();
- AMQConnection con1 = new AMQConnection(args[0], args[1], args[2],
- address.getHostName(), args[3]);
-
- _logger.info("Connected with URL:" + con1.toURL());
-
- final org.apache.qpid.jms.Session session1 = (org.apache.qpid.jms.Session)
- con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- String selector = null;
-
- if (args.length == 5)
- {
- selector = args[4];
- _logger.info("Message selector is <" + selector + ">...");
- }
- else
- {
- _logger.info("Not using message selector ");
- }
-
- Topic t = new AMQTopic("ping");
-
- MessageConsumer consumer1 = session1.createConsumer(t,
- 1, false, false, selector);
-
- consumer1.setMessageListener(new TestPingMessageListener());
- con1.start();
- }
- catch (Throwable t)
- {
- System.err.println("Fatal error: " + t);
- t.printStackTrace();
- }
-
- System.out.println("Waiting...");
- }
-}
-
diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java b/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
deleted file mode 100644
index 1e98e45bba..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/ping/Throttle.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.apache.qpid.ping;
-
-/**
- * Throttle is a helper class used in situations where a controlled rate of processing is desired. It allows a certain
- * number of operations-per-second to be defined and supplies a {@link #throttle} method that can only be called at
- * most at that rate. The first call to the throttle method will return immediately, subsequent calls will introduce
- * a short pause to fill out the remainder of the current cycle to attain the desired rate. If there is no remainder
- * left then it will return immediately.
- *
- * <p><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * </table>
- *
- * @author Rupert Smith
- */
-public class Throttle
-{
- /** Holds the length of the cycle in nano seconds. */
- long cycleLengthNanos = 0L;
-
- /** Records the nano time of the last call to the throttle method. */
- long lastTimeNanos = 0L;
-
- /**
- * Sets up the desired rate of operation per second that the throttle method should restrict to.
- *
- * @param opsPerSecond The maximum number of calls per second that the throttle method will take.
- */
- public void setRate(int opsPerSecond)
- {
- // Calculate the length of a cycle.
- cycleLengthNanos = 1000000000 / opsPerSecond;
- }
-
- /**
- * Introduces a short pause to fill out any time left in the cycle since this method was last called, of length
- * defined by a call to the {@link #setRate} method.
- */
- public void throttle()
- {
- // Record the time now.
- long currentTimeNanos = System.nanoTime();
-
- // Check if there is any time remaining in the current cycle and introduce a short wait to fill out the
- // remainder of the cycle if needed.
- long remainingTimeNanos = cycleLengthNanos - (currentTimeNanos - lastTimeNanos);
-
- if (remainingTimeNanos > 0)
- {
- long milliWait = remainingTimeNanos / 1000000;
- int nanoWait = (int) (remainingTimeNanos % 1000000);
-
- try
- {
- Thread.currentThread().sleep(milliWait, nanoWait);
- }
- catch (InterruptedException e)
- {
- // Just ignore this?
- }
- }
-
- // Keep the time of the last call to this method to calculate the next cycle.
- //lastTimeNanos = currentTimeNanos;
- lastTimeNanos = System.nanoTime();
- }
-}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
index bae6aa0dc2..87edd31575 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.requestreply;
+import java.io.IOException;
import java.net.InetAddress;
+import java.text.SimpleDateFormat;
import java.util.Date;
import javax.jms.*;
@@ -32,7 +34,6 @@ import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.ping.AbstractPingClient;
import org.apache.qpid.topic.Config;
/**
@@ -58,7 +59,7 @@ import org.apache.qpid.topic.Config;
*
* @todo Make verbose accept a number of messages, only prints to console every X messages.
*/
-public class PingPongBouncer extends AbstractPingClient implements MessageListener
+public class PingPongBouncer implements MessageListener
{
private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
@@ -73,6 +74,9 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
/** The default exclusive flag for the message consumer. */
private static final boolean EXCLUSIVE = false;
+ /** A convenient formatter to use when time stamping output. */
+ protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
+
/** Used to indicate that the reply generator should log timing info to the console (logger info level). */
private boolean _verbose = false;
@@ -93,6 +97,24 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
/** The producer session. */
private Session _producerSession;
+ /** Holds the connection to the broker. */
+ private AMQConnection _connection;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ private boolean _isPubSub = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to kill a broker, in order to test
+ * failover, immediately before committing a transaction.
+ */
+ protected boolean _failBeforeCommit = false;
+
+ /**
+ * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test
+ * failover, immediate after committing a transaction.
+ */
+ protected boolean _failAfterCommit = false;
+
/**
* Creates a PingPongBouncer on the specified producer and consumer sessions.
*
@@ -110,8 +132,8 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
* @throws Exception All underlying exceptions allowed to fall through. This is only test code...
*/
public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, boolean persistent, boolean transacted, String selector,
- boolean verbose, boolean pubsub) throws Exception
+ String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
+ boolean pubsub) throws Exception
{
// Create a client id to uniquely identify this client.
InetAddress address = InetAddress.getLocalHost();
@@ -133,7 +155,8 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
// Create the queue to listen for message on.
createConsumerDestination(destinationName);
- MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
// Create a producer for the replies, without a default destination.
_replyProducer = _producerSession.createProducer(null);
@@ -144,18 +167,6 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
consumer.setMessageListener(this);
}
- private void createConsumerDestination(String name)
- {
- if (isPubSub())
- {
- _consumerDestination = new AMQTopic(name);
- }
- else
- {
- _consumerDestination = new AMQQueue(name);
- }
- }
-
/**
* Starts a stand alone ping-pong client running in verbose mode.
*
@@ -177,12 +188,13 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
Config config = new Config();
config.setOptions(args);
String brokerDetails = config.getHost() + ":" + config.getPort();
- String virtualpath = "/test";
+ String virtualpath = "/test";
String destinationName = config.getDestination();
if (destinationName == null)
{
destinationName = DEFAULT_DESTINATION_NAME;
}
+
String selector = config.getSelector();
boolean transacted = config.isTransacted();
boolean persistent = config.usePersistentMessages();
@@ -192,13 +204,22 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
//String selector = null;
// Instantiate the ping pong client with the command line options and start it running.
- PingPongBouncer pingBouncer = new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath,
- destinationName, persistent, transacted, selector, verbose, pubsub);
+ PingPongBouncer pingBouncer =
+ new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
+ selector, verbose, pubsub);
pingBouncer.getConnection().start();
System.out.println("Waiting...");
}
+ private static void usage()
+ {
+ System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
+ + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
+ + "-persistent : (true/false). Default is false\n"
+ + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
+ }
+
/**
* This is a callback method that is notified of all messages for which this has been registered as a message
* listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to
@@ -260,14 +281,145 @@ public class PingPongBouncer extends AbstractPingClient implements MessageListen
}
}
- private static void usage()
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public AMQConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Sets the connection that this ping client is using.
+ *
+ * @param connection The ping connection.
+ */
+ public void setConnection(AMQConnection connection)
+ {
+ this._connection = connection;
+ }
+
+ /**
+ * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic.
+ *
+ * @param pubsub <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
+ */
+ public void setPubSub(boolean pubsub)
+ {
+ _isPubSub = pubsub;
+ }
+
+ /**
+ * Checks whether this client is a p2p or pub/sub ping client.
+ *
+ * @return <tt>true</tt> if this client is pinging a topic, <tt>false</tt> if it is pinging a queue.
+ */
+ public boolean isPubSub()
+ {
+ return _isPubSub;
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ if (session.getTransacted())
+ {
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ try
+ {
+ session.rollback();
+ _logger.debug("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
+ }
+ }
+
+ /**
+ * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ *
+ * @param broker The name of the broker to terminate.
+ */
+ protected void doFailover(String broker)
+ {
+ System.out.println("Kill Broker " + broker + " now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
+ protected void doFailover()
{
- System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" +
- "-destinationname : queue/topic name\n" +
- "-transacted : (true/false). Default is false\n" +
- "-persistent : (true/false). Default is false\n" +
- "-pubsub : (true/false). Default is false\n" +
- "-selector : selector string\n");
+ System.out.println("Kill Broker now.");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+
+ }
+
+ private void createConsumerDestination(String name)
+ {
+ if (isPubSub())
+ {
+ _consumerDestination = new AMQTopic(name);
+ }
+ else
+ {
+ _consumerDestination = new AMQQueue(name);
+ }
}
/**
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 263e62cf04..310ec5f5e3 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -20,344 +20,353 @@
*/
package org.apache.qpid.requestreply;
+import java.io.IOException;
import java.net.InetAddress;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.*;
import org.apache.log4j.Logger;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.client.*;
+import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
-import org.apache.qpid.ping.AbstractPingProducer;
-import org.apache.qpid.ping.Throttle;
import org.apache.qpid.topic.Config;
+import uk.co.thebadgerset.junit.extensions.BatchedThrottle;
+import uk.co.thebadgerset.junit.extensions.Throttle;
+
/**
* PingPongProducer is a client that sends pings to a queue and waits for pongs to be bounced back by a bounce back
- * client (see {@link PingPongBouncer} for the bounce back client). It is designed to be run from the command line
- * as a stand alone test tool, but it may also be fairly easily instantiated by other code by supplying a session,
- * message producer and message consumer to run the ping-pong cycle on.
- * <p/>
+ * client (see {@link PingPongBouncer} for the bounce back client).
+ *
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings.
* This means that this class has to do some work to correlate pings with pongs; it expectes the original message
- * id in the ping to be bounced back in the correlation id. If a new temporary queue per ping were used, then
- * this correlation would not need to be done.
- * <p/>
+ * correlation id in the ping to be bounced back in the reply correlation id.
+ *
+ * <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor.
+ * It can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings
+ * within transactions; control the number of pings to send in each transaction; limit its sending rate; and perform
+ * failover testing.
+ *
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so
* by starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is
* also registered to terminate the ping-pong loop cleanly.
- * <p/>
+ *
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Provide a ping and wait for response cycle.
+ * <tr><td> Provide a ping and wait for all responses cycle.
* <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
* </table>
*
- * @todo Make temp queue per ping a command line option.
- * @todo Make the queue name a command line option.
+ * @todo The use of a ping rate {@link #DEFAULT_RATE} and waits between pings {@link #DEFAULT_SLEEP_TIME} are overlapping.
+ * Use the rate and throttling only.
+ *
+ * @todo Make shared or unique destinations a configurable option, hard coded to false.
*/
-public class PingPongProducer extends AbstractPingProducer implements Runnable, MessageListener, ExceptionListener
+public class PingPongProducer implements Runnable, MessageListener, ExceptionListener
{
private static final Logger _logger = Logger.getLogger(PingPongProducer.class);
- /**
- * Used to set up a default message size.
- */
- protected static final int DEFAULT_MESSAGE_SIZE = 0;
+ /** Holds the name of the property to get the test message size from. */
+ public static final String MESSAGE_SIZE_PROPNAME = "messagesize";
- /**
- * This is set and used when the test is for multiple-destinations
- */
- protected static final int DEFAULT_DESTINATION_COUNT = 0;
+ /** Holds the name of the property to get the ping queue name from. */
+ public static final String PING_QUEUE_NAME_PROPNAME = "destinationname";
- protected static final int DEFAULT_RATE = 0;
+ /** Holds the name of the property to get the test delivery mode from. */
+ public static final String PERSISTENT_MODE_PROPNAME = "persistent";
- /**
- * Used to define how long to wait between pings.
- */
- protected static final long SLEEP_TIME = 250;
+ /** Holds the name of the property to get the test transactional mode from. */
+ public static final String TRANSACTED_PROPNAME = "transacted";
- /**
- * Used to define how long to wait before assuming that a ping has timed out.
- */
- protected static final long TIMEOUT = 9000;
+ /** Holds the name of the property to get the test broker url from. */
+ public static final String BROKER_PROPNAME = "broker";
- /**
- * Holds the name of the destination to send pings on.
- */
- protected static final String PING_DESTINATION_NAME = "ping";
+ /** Holds the name of the property to get the test broker virtual path. */
+ public static final String VIRTUAL_PATH_PROPNAME = "virtualPath";
- /**
- * The batch size.
- */
- protected static final int DEFAULT_BATCH_SIZE = 100;
+ /** Holds the name of the property to get the message rate from. */
+ public static final String RATE_PROPNAME = "rate";
- protected static final int PREFETCH = 100;
- protected static final boolean NO_LOCAL = true;
- protected static final boolean EXCLUSIVE = false;
+ public static final String VERBOSE_OUTPUT_PROPNAME = "verbose";
- /**
- * The number of priming loops to run.
- */
- protected static final int PRIMING_LOOPS = 3;
+ /** Holds the true or false depending on wether it is P2P test or PubSub */
+ public static final String IS_PUBSUB_PROPNAME = "pubsub";
- /**
- * A source for providing sequential unique correlation ids.
- */
+ public static final String FAIL_AFTER_COMMIT_PROPNAME = "FailAfterCommit";
+
+ public static final String FAIL_BEFORE_COMMIT_PROPNAME = "FailBeforeCommit";
+
+ public static final String FAIL_AFTER_SEND_PROPNAME = "FailAfterSend";
+
+ public static final String FAIL_BEFORE_SEND_PROPNAME = "FailBeforeSend";
+
+ public static final String FAIL_ONCE_PROPNAME = "FailOnce";
+
+ public static final String USERNAME_PROPNAME = "username";
+
+ public static final String PASSWORD_PROPNAME = "password";
+
+ public static final String SELECTOR_PROPNAME = "selector";
+
+ public static final String PING_DESTINATION_COUNT_PROPNAME = "destinationscount";
+
+ /** Holds the name of the property to get the waiting timeout for response messages. */
+ public static final String TIMEOUT_PROPNAME = "timeout";
+
+ public static final String COMMIT_BATCH_SIZE_PROPNAME = "CommitBatchSize";
+
+ /** Used to set up a default message size. */
+ public static final int DEFAULT_MESSAGE_SIZE = 0;
+
+ /** Holds the name of the default destination to send pings on. */
+ public static final String DEFAULT_PING_DESTINATION_NAME = "ping";
+
+ /** Defines the default number of destinations to ping. */
+ public static final int DEFAULT_DESTINATION_COUNT = 1;
+
+ /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
+ public static final int DEFAULT_RATE = 0;
+
+ /** Defines the default wait between pings. */
+ public static final long DEFAULT_SLEEP_TIME = 250;
+
+ /** Default time to wait before assuming that a ping has timed out. */
+ public static final long DEFAULT_TIMEOUT = 9000;
+
+ /** Defines the default number of pings to send in each transaction when running transactionally. */
+ public static final int DEFAULT_TX_BATCH_SIZE = 100;
+
+ /** Defines the default prefetch size to use when consuming messages. */
+ public static final int DEFAULT_PREFETCH = 100;
+
+ /** Defines the default value of the no local flag to use when consuming messages. */
+ public static final boolean DEFAULT_NO_LOCAL = false;
+
+ /** Defines the default value of the exclusive flag to use when consuming messages. */
+ public static final boolean DEFAULT_EXCLUSIVE = false;
+
+ /** Holds the message delivery mode to use for the test. */
+ public static final boolean DEFAULT_PERSISTENT_MODE = false;
+
+ /** Holds the transactional mode to use for the test. */
+ public static final boolean DEFAULT_TRANSACTED = false;
+
+ /** Holds the default broker url for the test. */
+ public static final String DEFAULT_BROKER = "tcp://localhost:5672";
+
+ /** Holds the default virtual path for the test. */
+ public static final String DEFAULT_VIRTUAL_PATH = "test";
+
+ /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
+ public static final boolean DEFAULT_PUBSUB = false;
+
+ /** Holds the default broker log on username. */
+ public static final String DEFAULT_USERNAME = "guest";
+
+ /** Holds the default broker log on password. */
+ public static final String DEFAULT_PASSWORD = "guest";
+
+ /** Holds the default message selector. */
+ public static final String DEFAULT_SELECTOR = null;
+
+ /** Holds the default failover after commit test flag. */
+ public static final String DEFAULT_FAIL_AFTER_COMMIT = "false";
+
+ /** Holds the default failover before commit test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_COMMIT = "false";
+
+ /** Holds the default failover after send test flag. */
+ public static final String DEFAULT_FAIL_AFTER_SEND = "false";
+
+ /** Holds the default failover before send test flag. */
+ public static final String DEFAULT_FAIL_BEFORE_SEND = "false";
+
+ /** Holds the default failover only once flag, true means only do one failover, false means failover on every commit cycle. */
+ public static final String DEFAULT_FAIL_ONCE = "true";
+
+ /** Holds the default verbose mode. */
+ public static final boolean DEFAULT_VERBOSE = false;
+
+ /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
private static AtomicLong idGenerator = new AtomicLong(0L);
/**
- * Holds a map from message ids to latches on which threads wait for replies.
+ * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross
+ * multiple ping producers on the same JVM.
*/
- private static Map<String, CountDownLatch> trafficLights = new HashMap<String, CountDownLatch>();
+ private static Map<String, CountDownLatch> trafficLights =
+ Collections.synchronizedMap(new HashMap<String, CountDownLatch>());
+
+ /** A convenient formatter to use when time stamping output. */
+ protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
/**
- * Destination where the responses messages will arrive
+ * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
+ * creating multiple ping producers in the same JVM.
*/
+ protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();
+
+ /** Destination where the response messages will arrive. */
private Destination _replyDestination;
- /**
- * Destination where the producer will be sending message to
- */
- private Destination _pingDestination;
+ /** Destination where the producer will be sending message to. */
+ //private Destination _pingDestination;
- /**
- * Determines whether this producer sends persistent messages from the run method.
- */
+ /** Determines whether this producer sends persistent messages. */
protected boolean _persistent;
- /**
- * Holds the message size to send, from the run method.
- */
+ /** Determines what size of messages this producer sends. */
protected int _messageSize;
- /**
- * Used to indicate that the ping loop should print out whenever it pings.
- */
+ /** Used to indicate that the ping loop should print out whenever it pings. */
protected boolean _verbose = false;
+ /** Holds the session on which ping replies are received. */
protected Session _consumerSession;
- /**
- * Used to restrict the sending rate to a specified limit.
- */
- private Throttle rateLimiter = null;
+ /** Used to restrict the sending rate to a specified limit. */
+ private Throttle _rateLimiter = null;
+
+ /** Holds a message listener that this message listener chains all its messages to. */
+ private ChainedMessageListener _chainedMessageListener = null;
+
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
+ protected boolean _isPubSub = false;
/**
- * The throttler can only reliably restrict to a few hundred cycles per second, so a throttling batch size is used
- * to group sends together into batches large enough that the throttler runs slower than that.
+ * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
+ * on the same JVM using this id generator will allow them to ping on the same queues.
*/
- int _throttleBatchSize;
+ protected AtomicInteger _queueSharedId = new AtomicInteger();
- private MessageListener _messageListener = null;
+ /** Used to tell the ping loop when to terminate, it only runs while this is true. */
+ protected boolean _publish = true;
- private PingPongProducer(String brokerDetails, String username, String password, String virtualpath, boolean transacted,
- boolean persistent, int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
- boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize, int rate)
- throws Exception
- {
- // Create a connection to the broker.
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
+ /** Holds the connection to the broker. */
+ private Connection _connection;
- setConnection(new AMQConnection(brokerDetails, username, password, clientID, virtualpath));
+ /** Holds the producer session, needed to create ping messages. */
+ private Session _producerSession;
- // Create transactional or non-transactional sessions, based on the command line arguments.
- setProducerSession((Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE));
- _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ /** Holds the set of destiniations that this ping producer pings. */
+ protected List<Destination> _pingDestinations = new ArrayList<Destination>();
- _persistent = persistent;
- _messageSize = messageSize;
- _verbose = verbose;
+ /** Holds the message producer to send the pings through. */
+ protected MessageProducer _producer;
- // Set failover interrupts
- _failAfterCommit = afterCommit;
- _failBeforeCommit = beforeCommit;
- _failAfterSend = afterSend;
- _failBeforeSend = beforeSend;
- _failOnce = failOnce;
- _txBatchSize = batchSize;
-
- // Calculate a throttling batch size and rate such that the throttle runs slower than 100 cycles per second
- // and batched sends within each cycle multiply up to give the desired rate.
- //
- // total rate = throttle rate * batch size.
- // 1 < throttle rate < 100
- // 1 < total rate < 20000
- if (rate > DEFAULT_RATE)
- {
- // Log base 10 over 2 is used here to get a feel for what power of 100 the total rate is.
- // As the total rate goes up the powers of 100 the batch size goes up by powers of 100 to keep the
- // throttle rate back into the range 1 to 100.
- int x = (int) (Math.log10(rate) / 2);
- _throttleBatchSize = (int) Math.pow(100, x);
- int throttleRate = rate / _throttleBatchSize;
-
- _logger.debug("rate = " + rate);
- _logger.debug("x = " + x);
- _logger.debug("_throttleBatchSize = " + _throttleBatchSize);
- _logger.debug("throttleRate = " + throttleRate);
-
- rateLimiter = new Throttle();
- rateLimiter.setRate(throttleRate);
- }
- }
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
+ protected boolean _failBeforeCommit = false;
- /**
- * Creates a ping pong producer with the specified connection details and type.
- *
- * @param brokerDetails
- * @param username
- * @param password
- * @param virtualpath
- * @param transacted
- * @throws Exception All allowed to fall through. This is only test code...
- */
- public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
- String destinationName, String selector, boolean transacted, boolean persistent,
- int messageSize, boolean verbose, boolean afterCommit, boolean beforeCommit,
- boolean afterSend, boolean beforeSend, boolean failOnce, int batchSize,
- int noOfDestinations, int rate, boolean pubsub) throws Exception
- {
- this(brokerDetails, username, password, virtualpath, transacted, persistent, messageSize, verbose, afterCommit,
- beforeCommit, afterSend, beforeSend, failOnce, batchSize, rate);
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
+ protected boolean _failAfterCommit = false;
- _destinationCount = noOfDestinations;
- setPubSub(pubsub);
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
+ protected boolean _failBeforeSend = false;
- if (noOfDestinations == DEFAULT_DESTINATION_COUNT)
- {
- if (destinationName != null)
- {
- createPingDestination(destinationName);
- // Create producer and the consumer
- createProducer();
- createConsumer(selector);
- }
- else
- {
- _logger.error("Destination is not specified");
- throw new IllegalArgumentException("Destination is not specified");
- }
- }
- }
+ /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
+ protected boolean _failAfterSend = false;
- private void createPingDestination(String name)
- {
- if (isPubSub())
- {
- _pingDestination = new AMQTopic(name);
- }
- else
- {
- _pingDestination = new AMQQueue(name);
- }
- }
+ /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
+ protected boolean _failOnce = true;
+
+ /** Holds the number of sends that should be performed in every transaction when using transactions. */
+ protected int _txBatchSize = 1;
/**
- * Creates the producer to send the pings on. If the tests are with nultiple-destinations, then producer
- * is created with null destination, so that any destination can be specified while sending
+ * Creates a ping producer with the specified parameters, of which there are many. See their individual comments
+ * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on it,
+ * to send and recieve its pings and replies on. The other options are kept, and control how this pinger behaves.
*
- * @throws JMSException
+ * @param brokerDetails The URL of the broker to send pings to.
+ * @param username The username to log onto the broker with.
+ * @param password The password to log onto the broker with.
+ * @param virtualpath The virtual host name to use on the broker.
+ * @param destinationName The name (or root where multiple destinations are used) of the desitination to send
+ * pings to.
+ * @param selector The selector to filter replies with.
+ * @param transacted Indicates whether or not pings are sent and received in transactions.
+ * @param persistent Indicates whether pings are sent using peristent delivery.
+ * @param messageSize Specifies the size of ping messages to send.
+ * @param verbose Indicates that information should be printed to the console on every ping.
+ * @param afterCommit Indicates that the user should be promted to terminate a broker after commits to test failover.
+ * @param beforeCommit Indicates that the user should be promted to terminate a broker before commits to test failover.
+ * @param afterSend Indicates that the user should be promted to terminate a broker after sends to test failover.
+ * @param beforeSend Indicates that the user should be promted to terminate a broker before sends to test failover.
+ * @param failOnce Indicates that the failover testing behaviour should only happen on the first commit, not all.
+ * @param txBatchSize Specifies the number of pings to send in each transaction.
+ * @param noOfDestinations The number of destinations to ping. Must be 1 or more.
+ * @param rate Specified the number of pings per second to send. Setting this to 0 means send as fast as
+ * possible, with no rate restriction.
+ * @param pubsub
+ *
+ * @throws Exception Any exceptions are allowed to fall through.
*/
- public void createProducer() throws JMSException
+ public PingPongProducer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, String selector, boolean transacted, boolean persistent, int messageSize,
+ boolean verbose, boolean afterCommit, boolean beforeCommit, boolean afterSend,
+ boolean beforeSend, boolean failOnce, int txBatchSize, int noOfDestinations, int rate,
+ boolean pubsub) throws Exception
{
- if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
- {
- // create producer with initial destination as null for test with multiple-destinations
- // In this case, a different destination will be used while sending the message
- _producer = (MessageProducer) getProducerSession().createProducer(null);
- }
- else
+ // Check that one or more destinations were specified.
+ if (noOfDestinations < 1)
{
- // Create a producer with known destination to send the pings on.
- _producer = (MessageProducer) getProducerSession().createProducer(_pingDestination);
-
+ throw new IllegalArgumentException("There must be at least one destination.");
}
- _producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- }
+ // Create a connection to the broker.
+ InetAddress address = InetAddress.getLocalHost();
+ String clientID = address.getHostName() + System.currentTimeMillis();
- /**
- * Creates the temporary destination to listen to the responses
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumer(String selector) throws JMSException
- {
- // Create a temporary destination to get the pongs on.
- if (isPubSub())
- {
- _replyDestination = _consumerSession.createTemporaryTopic();
- }
- else
- {
- _replyDestination = _consumerSession.createTemporaryQueue();
- }
+ _connection = new AMQConnection(brokerDetails, username, password, clientID, virtualpath);
- // Create a message consumer to get the replies with and register this to be called back by it.
- MessageConsumer consumer = _consumerSession.createConsumer(_replyDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
- }
+ // Create transactional or non-transactional sessions, based on the command line arguments.
+ _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- /**
- * Creates consumer instances for each destination. This is used when test is being done with multiple destinations.
- *
- * @param selector
- * @throws JMSException
- */
- public void createConsumers(String selector) throws JMSException
- {
- for (int i = 0; i < getDestinationsCount(); i++)
+ // Set up a throttle to control the send rate, if a rate > 0 is specified.
+ if (rate > 0)
{
- MessageConsumer consumer =
- getConsumerSession().createConsumer(getDestination(i), PREFETCH, false, EXCLUSIVE, selector);
- consumer.setMessageListener(this);
+ _rateLimiter = new BatchedThrottle();
+ _rateLimiter.setRate(rate);
}
- }
-
- public Session getConsumerSession()
- {
- return _consumerSession;
- }
+ // Create the temporary queue for replies.
+ _replyDestination = _consumerSession.createTemporaryQueue();
- public Destination getPingDestination()
- {
- return _pingDestination;
- }
+ // Create the producer and the consumers for all reply destinations.
+ createProducer();
+ createPingDestinations(noOfDestinations, selector, destinationName, true);
+ createReplyConsumers(getReplyDestinations(), selector);
- protected void setPingDestination(Destination destination)
- {
- _pingDestination = destination;
+ // Keep all the remaining options.
+ _persistent = persistent;
+ _messageSize = messageSize;
+ _verbose = verbose;
+ _failAfterCommit = afterCommit;
+ _failBeforeCommit = beforeCommit;
+ _failAfterSend = afterSend;
+ _failBeforeSend = beforeSend;
+ _failOnce = failOnce;
+ _txBatchSize = txBatchSize;
+ _isPubSub = pubsub;
}
/**
- * Starts a ping-pong loop running from the command line. The bounce back client {@link org.apache.qpid.requestreply.PingPongBouncer} also needs
+ * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
* to be started to bounce the pings back again.
- * <p/>
- * <p/>The command line takes from 2 to 4 arguments:
- * <p/><table>
- * <tr><td>brokerDetails <td> The broker connection string.
- * <tr><td>virtualPath <td> The virtual path.
- * <tr><td>transacted <td> A boolean flag, telling this client whether or not to use transactions.
- * <tr><td>size <td> The size of ping messages to use, in bytes.
- * </table>
- *
- * @param args The command line arguments as defined above.
+ *
+ * @param args The command line arguments.
*/
public static void main(String[] args) throws Exception
{
@@ -373,21 +382,21 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
String brokerDetails = config.getHost() + ":" + config.getPort();
String virtualpath = "/test";
- String selector = config.getSelector();
+ String selector = (config.getSelector() == null) ? DEFAULT_SELECTOR : config.getSelector();
boolean verbose = true;
boolean transacted = config.isTransacted();
boolean persistent = config.usePersistentMessages();
int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
//int messageCount = config.getMessages();
int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
- int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_BATCH_SIZE;
+ int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE;
int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
boolean pubsub = config.isPubSub();
String destName = config.getDestination();
if (destName == null)
{
- destName = PING_DESTINATION_NAME;
+ destName = DEFAULT_PING_DESTINATION_NAME;
}
boolean afterCommit = false;
@@ -429,15 +438,13 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
// Create a ping producer to handle the request/wait/reply cycle.
- PingPongProducer pingProducer = new PingPongProducer(brokerDetails, "guest", "guest", virtualpath,
- destName, selector, transacted, persistent, messageSize, verbose,
- afterCommit, beforeCommit, afterSend, beforeSend, failOnce, batchSize,
- destCount, rate, pubsub);
+ PingPongProducer pingProducer =
+ new PingPongProducer(brokerDetails, DEFAULT_USERNAME, DEFAULT_PASSWORD, virtualpath, destName, selector,
+ transacted, persistent, messageSize, verbose, afterCommit, beforeCommit, afterSend,
+ beforeSend, failOnce, batchSize, destCount, rate, pubsub);
pingProducer.getConnection().start();
- // Run a few priming pings to remove warm up time from test results.
- //pingProducer.prime(PRIMING_LOOPS);
// Create a shutdown hook to terminate the ping-pong producer.
Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());
@@ -450,50 +457,107 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
pingThread.join();
}
- private static void usage()
+ /**
+ * Convenience method for a short pause.
+ *
+ * @param sleepTime The time in milliseconds to pause for.
+ */
+ public static void pause(long sleepTime)
{
- System.err.println("Usage: TestPingPublisher \n" + "-host : broker host" + "-port : broker port" +
- "-destinationname : queue/topic name\n" +
- "-transacted : (true/false). Default is false\n" +
- "-persistent : (true/false). Default is false\n" +
- "-pubsub : (true/false). Default is false\n" +
- "-selector : selector string\n" +
- "-payload : paylaod size. Default is 0\n" +
- //"-messages : no of messages to be sent (if 0, the ping loop will run indefinitely)\n" +
- "-destinationscount : no of destinations for multi-destinations test\n" +
- "-batchsize : batch size\n" +
- "-rate : thruput rate\n");
+ if (sleepTime > 0)
+ {
+ try
+ {
+ Thread.sleep(sleepTime);
+ }
+ catch (InterruptedException ie)
+ { }
+ }
}
/**
- * Primes the test loop by sending a few messages, then introduces a short wait. This allows the bounce back client
- * on the other end a chance to configure its reply producer on the reply to destination. It is also worth calling
- * this a few times, in order to prime the JVMs JIT compilation.
+ * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply
+ * to destination of this pinger.
*
- * @param x The number of priming loops to run.
- * @throws JMSException All underlying exceptions are allowed to fall through.
+ * @return The single reply to destination of this pinger, wrapped in a list.
*/
- public void prime(int x) throws JMSException
+ public List<Destination> getReplyDestinations()
{
- for (int i = 0; i < x; i++)
+ _logger.debug("public List<Destination> getReplyDestinations(): called");
+
+ List<Destination> replyDestinations = new ArrayList<Destination>();
+ replyDestinations.add(_replyDestination);
+
+ return replyDestinations;
+ }
+
+ /**
+ * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
+ * flag is set accoring the ping producer creation options.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createProducer() throws JMSException
+ {
+ _logger.debug("public void createProducer(): called");
+
+ _producer = (MessageProducer) _producerSession.createProducer(null);
+ //_producer.setDisableMessageTimestamp(true);
+ _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ }
+
+ /**
+ * Creates consumers for the specified number of destinations. The destinations themselves are also created by
+ * this method.
+ *
+ * @param noOfDestinations The number of destinations to create consumers for.
+ * @param selector The message selector to filter the consumers with.
+ * @param rootName The root of the name, or actual name if only one is being created.
+ * @param unique <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share
+ * the numbering with all pingers on the same JVM.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique)
+ throws JMSException
+ {
+ _logger.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations
+ + ", String selector = " + selector + ", String rootName = " + rootName + ", boolean unique = "
+ + unique + "): called");
+
+ // Create the desired number of ping destinations and consumers for them.
+ for (int i = 0; i < noOfDestinations; i++)
{
- // Create and send a small message.
- Message first = getTestMessage(_replyDestination, 0, false);
- sendMessage(first);
+ AMQDestination destination = null;
- commitTx();
+ int id;
- try
+ // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
+ if (unique)
{
- Thread.sleep(100);
+ id = _queueJVMSequenceID.incrementAndGet();
}
- catch (InterruptedException ignore)
+ else
{
-
+ id = _queueSharedId.incrementAndGet();
}
- }
+ // Check if this is a pub/sub pinger, in which case create topics.
+ if (_isPubSub)
+ {
+ AMQShortString name = new AMQShortString(rootName + id);
+ destination = new AMQTopic(name);
+ }
+ // Otherwise this is a p2p pinger, in which case create queues.
+ else
+ {
+ AMQShortString name = new AMQShortString(rootName + id);
+ destination = new AMQQueue(name, name, false, false, false);
+ }
+ // Keep the destination.
+ _pingDestinations.add(destination);
+ }
}
/**
@@ -505,52 +569,64 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
*/
public void onMessage(Message message)
{
+ _logger.debug("public void onMessage(Message message): called");
try
{
-
- // Store the reply, if it has a correlation id that is expected.
+ // Extract the messages correlation id.
String correlationID = message.getJMSCorrelationID();
+ _logger.debug("correlationID = " + correlationID);
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Got reply with correlation id, " + correlationID);
- //_logger.debug("Received from : " + message.getJMSDestination());
- }
-
- // Turn the traffic light to green.
+ // Countdown on the traffic light if there is one for the matching correlation id.
CountDownLatch trafficLight = trafficLights.get(correlationID);
if (trafficLight != null)
{
- if (_messageListener != null)
- {
- synchronized (trafficLight)
- {
- _messageListener.onMessage(message);
- trafficLight.countDown();
- }
- }
- else
+ _logger.debug("Reply was expected, decrementing the latch for the id, " + correlationID);
+
+ // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
+ // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block
+ // ensures that each thread will get a unique value for the remaining messages.
+ long trueCount = -1;
+ long remainingCount = -1;
+
+ synchronized (trafficLight)
{
trafficLight.countDown();
- }
- _logger.trace("Reply was expected, decrementing the latch for the id.");
+ trueCount = trafficLight.getCount();
+ remainingCount = trueCount - 1;
- long remainingCount = trafficLight.getCount();
+ _logger.debug("remainingCount = " + remainingCount);
+ _logger.debug("trueCount = " + trueCount);
- if ((remainingCount % _txBatchSize) == 0)
- {
- commitTx(getConsumerSession());
- }
+ // Commit on transaction batch size boundaries. At this point in time the waiting producer remains
+ // blocked, even on the last message.
+ if ((remainingCount % _txBatchSize) == 0)
+ {
+ commitTx(_consumerSession);
+ }
+ // Forward the message and remaining count to any interested chained message listener.
+ if (_chainedMessageListener != null)
+ {
+ _chainedMessageListener.onMessage(message, (int) remainingCount);
+ }
+
+ // Check if this is the last message, in which case release any waiting producers. This is done
+ // after the transaction has been committed and any listeners notified.
+ if (trueCount == 1)
+ {
+ trafficLight.countDown();
+ }
+ }
}
else
{
- _logger.trace("There was no thread waiting for reply: " + correlationID);
+ _logger.debug("There was no thread waiting for reply: " + correlationID);
}
+ // Print out ping times for every message in verbose mode only.
if (_verbose)
{
Long timestamp = message.getLongProperty("timestamp");
@@ -566,32 +642,70 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
{
_logger.warn("There was a JMSException: " + e.getMessage(), e);
}
+
+ _logger.debug("public void onMessage(Message message): ending");
}
/**
* Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
- * before a reply arrives, then a null reply is returned from this method.
+ * before a reply arrives, then a null reply is returned from this method. This method generates a new unqiue
+ * correlation id for the messages.
*
* @param message The message to send.
* @param numPings The number of ping messages to send.
* @param timeout The timeout in milliseconds.
+ *
* @return The number of replies received. This may be less than the number sent if the timeout terminated the
* wait for all prematurely.
+ *
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout) throws JMSException, InterruptedException
{
- String messageCorrelationId = null;
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + "): called");
+
+ // Create a unique correlation id to put on the messages before sending them.
+ String messageCorrelationId = Long.toString(idGenerator.incrementAndGet());
+
+ return pingAndWaitForReply(message, numPings, timeout, messageCorrelationId);
+ }
+
+ /**
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
+ * the correlation id.
+ *
+ * @param message The message to send.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ * @param messageCorrelationId The message correlation id.
+ *
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the
+ * wait for all prematurely.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
+ throws JMSException, InterruptedException
+ {
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
try
{
- // Put a unique correlation id on the message before sending it.
- messageCorrelationId = Long.toString(getNewID());
-
+ // Create a count down latch to count the number of replies with. This is created before the messages are
+ // sent so that the replies cannot be received before the count down is created.
+ // One is added to this, so that the last reply becomes a special case. The special case is that the
+ // chained message listener must be called before this sender can be unblocked, but that decrementing the
+ // countdown needs to be done before the chained listener can be called.
+ CountDownLatch trafficLight = new CountDownLatch(numPings + 1);
+ trafficLights.put(messageCorrelationId, trafficLight);
+
+ // Send the specifed number of messages.
pingNoWaitForReply(message, numPings, messageCorrelationId);
- CountDownLatch trafficLight = trafficLights.get(messageCorrelationId);
- // Block the current thread until a reply to the message is received, or it times out.
+ // Block the current thread until replies to all the message are received, or it times out.
trafficLight.await(timeout, TimeUnit.MILLISECONDS);
// Work out how many replies were receieved.
@@ -606,45 +720,37 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
_logger.info("Got all replies on id, " + messageCorrelationId);
}
- commitTx(getConsumerSession());
+ commitTx(_consumerSession);
+
+ _logger.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");
return numReplies;
}
+ // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
+ // so will be a memory leak if this is not done.
finally
{
- removeLock(messageCorrelationId);
+ trafficLights.remove(messageCorrelationId);
}
}
- public long getNewID()
- {
- return idGenerator.incrementAndGet();
- }
-
- public CountDownLatch removeLock(String correlationID)
- {
- return trafficLights.remove(correlationID);
- }
-
-
- /*
- * Sends the specified ping message but does not wait for a correlating reply.
- *
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @return The reply, or null if no reply arrives before the timeout.
- * @throws JMSException All underlying JMSExceptions are allowed to fall through.
- */
- public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException, InterruptedException
+ /**
+ * Sends the specified number of ping messages and does not wait for correlating replies.
+ *
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @param messageCorrelationId A correlation id to place on all messages sent.
+ *
+ * @throws JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
- // Create a count down latch to count the number of replies with. This is created before the message is sent
- // so that the message is not received before the count down is created.
- CountDownLatch trafficLight = new CountDownLatch(numPings);
- trafficLights.put(messageCorrelationId, trafficLight);
+ _logger.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
message.setJMSCorrelationID(messageCorrelationId);
- // Set up a committed flag to detect uncommitted message at the end of the send loop. This may occurr if the
+ // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
// transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
// needed.
boolean committed = false;
@@ -652,55 +758,46 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
// Send all of the ping messages.
for (int i = 0; i < numPings; i++)
{
- // Reset the committed flag to indicate that there are uncommitted message.
+ // Reset the committed flag to indicate that there are uncommitted messages.
committed = false;
// Re-timestamp the message.
message.setLongProperty("timestamp", System.currentTimeMillis());
- // Check if the test is with multiple-destinations, in which case round robin the destinations
- // as the messages are sent.
- if (getDestinationsCount() > DEFAULT_DESTINATION_COUNT)
- {
- sendMessage(getDestination(i % getDestinationsCount()), message);
- }
- else
- {
- sendMessage(message);
- }
+ // Round robin the destinations as the messages are sent.
+ //return _destinationCount;
+ sendMessage(_pingDestinations.get(i % _pingDestinations.size()), message);
- // Apply message rate throttling if a rate limit has been set up and the throttling batch limit has been
- // reached. See the comment on the throttle batch size for information about the use of batches here.
- if ((rateLimiter != null) && ((i % _throttleBatchSize) == 0))
+ // Apply message rate throttling if a rate limit has been set up.
+ if (_rateLimiter != null)
{
- rateLimiter.throttle();
+ _rateLimiter.throttle();
}
// Call commit every time the commit batch size is reached.
if ((i % _txBatchSize) == 0)
{
- commitTx();
+ commitTx(_producerSession);
committed = true;
}
+
+ // Spew out per message timings on every message sonly in verbose mode.
+ if (_verbose)
+ {
+ _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, "
+ + messageCorrelationId);
+ }
}
// Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
if (!committed)
{
- commitTx();
- }
-
- // Spew out per message timings only in verbose mode.
- if (_verbose)
- {
- _logger.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
+ commitTx(_producerSession);
}
-
}
/**
- * The ping loop implementation. This send out pings of the configured size, persistence and transactionality, and
- * waits for replies and inserts short pauses in between each.
+ * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
*/
public void pingLoop()
{
@@ -711,10 +808,10 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
msg.setLongProperty("timestamp", System.currentTimeMillis());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, DEFAULT_BATCH_SIZE, TIMEOUT);
+ pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
// Introduce a short pause if desired.
- pause(SLEEP_TIME);
+ pause(DEFAULT_SLEEP_TIME);
}
catch (JMSException e)
{
@@ -728,79 +825,299 @@ public class PingPongProducer extends AbstractPingProducer implements Runnable,
}
}
- public Destination getReplyDestination()
+ /*public Destination getReplyDestination()
{
return _replyDestination;
+ }*/
+
+ /**
+ * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
+ * here.
+ *
+ * @param messageListener The chained message listener.
+ */
+ public void setChainedMessageListener(ChainedMessageListener messageListener)
+ {
+ _chainedMessageListener = messageListener;
}
- protected void setReplyDestination(Destination destination)
+ /**
+ * Removes any chained message listeners from this pinger.
+ */
+ public void removeChainedMessageListener()
{
- _replyDestination = destination;
+ _chainedMessageListener = null;
}
- public void setMessageListener(MessageListener messageListener)
+ /**
+ * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
+ *
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+ *
+ * @return A freshly generated test message.
+ *
+ * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through.
+ */
+ public ObjectMessage getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
{
- _messageListener = messageListener;
+ ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
+ // Timestamp the message.
+ //msg.setLongProperty("timestamp", System.currentTimeMillis());
+
+ return msg;
}
- public CountDownLatch getEndLock(String correlationID)
+ /**
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
+ * flag has been cleared.
+ */
+ public void stop()
{
- return trafficLights.get(correlationID);
+ _publish = false;
}
- /*
- * When the test is being performed with multiple queues, then this method will be used, which has a loop to
- * pick up the next queue from the queues list and sends message to it.
- *
- * @param message
- * @param numPings
- * @throws JMSException
- */
- /*private void pingMultipleQueues(Message message, int numPings) throws JMSException
+ /**
+ * Implements a ping loop that repeatedly pings until the publish flag becomes false.
+ */
+ public void run()
{
- int queueIndex = 0;
- for (int i = 0; i < numPings; i++)
+ // Keep running until the publish flag is cleared.
+ while (_publish)
{
- // Re-timestamp the message.
- message.setLongProperty("timestamp", System.currentTimeMillis());
+ pingLoop();
+ }
+ }
- sendMessage(getDestination(queueIndex++), message);
+ /**
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
+ *
+ * @param e The exception that triggered this callback method.
+ */
+ public void onException(JMSException e)
+ {
+ _publish = false;
+ _logger.debug("There was a JMSException: " + e.getMessage(), e);
+ }
- // reset the counter to get the first queue
- if (queueIndex == (getDestinationsCount() - 1))
+ /**
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
+ *
+ * @return A shutdown hook for the ping loop.
+ */
+ public Thread getShutdownHook()
+ {
+ return new Thread(new Runnable()
{
- queueIndex = 0;
- }
+ public void run()
+ {
+ stop();
+ }
+ });
+ }
+
+ /**
+ * Gets the underlying connection that this ping client is running on.
+ *
+ * @return The underlying connection that this ping client is running on.
+ */
+ public Connection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
+ *
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
+ *
+ * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
+ */
+ public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
+ {
+ _logger.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
+ + ", String selector = " + selector + "): called");
+
+ for (Destination destination : destinations)
+ {
+ // Create a consumer for the destination and set this pinger to listen to its messages.
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(destination, DEFAULT_PREFETCH, DEFAULT_NO_LOCAL, DEFAULT_EXCLUSIVE,
+ selector);
+ consumer.setMessageListener(this);
}
- }*/
+ }
/**
- * A connection listener that logs out any failover complete events. Could do more interesting things with this
- * at some point...
+ * Closes the pingers connection.
+ *
+ * @throws JMSException All JMSException are allowed to fall through.
*/
- public static class FailoverNotifier implements ConnectionListener
+ public void close() throws JMSException
{
- public void bytesSent(long count)
+ _logger.debug("public void close(): called");
+
+ if (_connection != null)
{
+ _connection.close();
}
+ }
+
+ /**
+ * Convenience method to commit the transaction on the specified session. If the session to commit on is not
+ * a transactional session, this method does nothing (unless the failover after send flag is set).
+ *
+ * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit
+ * is applied. This flag applies whether the pinger is transactional or not.
+ *
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
+ * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
+ * after the commit is applied. These flags will only apply if using a transactional pinger.
+ *
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ *
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional
+ * and non-transactional alike.
+ */
+ protected void commitTx(Session session) throws JMSException
+ {
+ _logger.debug("protected void commitTx(Session session): called");
- public void bytesReceived(long count)
+ _logger.trace("Batch time reached");
+ if (_failAfterSend)
{
+ _logger.trace("Batch size reached");
+ if (_failOnce)
+ {
+ _failAfterSend = false;
+ }
+
+ _logger.trace("Failing After Send");
+ doFailover();
}
- public boolean preFailover(boolean redirect)
+ if (session.getTransacted())
{
- return true; //Allow failover
+ try
+ {
+ if (_failBeforeCommit)
+ {
+ if (_failOnce)
+ {
+ _failBeforeCommit = false;
+ }
+
+ _logger.trace("Failing Before Commit");
+ doFailover();
+ }
+
+ session.commit();
+
+ if (_failAfterCommit)
+ {
+ if (_failOnce)
+ {
+ _failAfterCommit = false;
+ }
+
+ _logger.trace("Failing After Commit");
+ doFailover();
+ }
+
+ _logger.trace("Session Commited.");
+ }
+ catch (JMSException e)
+ {
+ _logger.trace("JMSException on commit:" + e.getMessage(), e);
+
+ // Warn that the bounce back client is not available.
+ if (e.getLinkedException() instanceof AMQNoConsumersException)
+ {
+ _logger.debug("No consumers on queue.");
+ }
+
+ try
+ {
+ session.rollback();
+ _logger.trace("Message rolled back.");
+ }
+ catch (JMSException jmse)
+ {
+ _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse);
+
+ // Both commit and rollback failed. Throw the rollback exception.
+ throw jmse;
+ }
+ }
}
+ }
- public boolean preResubscribe()
+ /**
+ * Sends the message to the specified destination. If the destination is null, it gets sent to the default destination
+ * of the ping producer. If an explicit destination is set, this overrides the default.
+ *
+ * @param destination The destination to send to.
+ * @param message The message to send.
+ *
+ * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
+ */
+ protected void sendMessage(Destination destination, Message message) throws JMSException
+ {
+ if (_failBeforeSend)
{
- return true; // Allow resubscription
+ if (_failOnce)
+ {
+ _failBeforeSend = false;
+ }
+
+ _logger.trace("Failing Before Send");
+ doFailover();
}
- public void failoverComplete()
+ if (destination == null)
+ {
+ _producer.send(message);
+ }
+ else
{
- _logger.info("App got failover complete callback.");
+ _producer.send(destination, message);
}
}
+
+ /**
+ * Prompts the user to terminate the broker, in order to test failover functionality. This method will block
+ * until the user supplied some input on the terminal.
+ */
+ protected void doFailover()
+ {
+ System.out.println("Kill Broker now then press return");
+ try
+ {
+ System.in.read();
+ }
+ catch (IOException e)
+ { }
+
+ System.out.println("Continuing.");
+ }
+
+ /**
+ * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's
+ * {@link PingPongProducer#onMessage} method is called, the chained listener set through the
+ * {@link PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected
+ * count of messages with that correlation id.
+ *
+ * Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+ * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+ * still blocked.
+ */
+ public static interface ChainedMessageListener
+ {
+ public void onMessage(Message message, int remainingCount) throws JMSException;
+ }
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
deleted file mode 100644
index bab732e2a6..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceProvidingClient.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-public class ServiceProvidingClient
-{
- private static final Logger _logger = Logger.getLogger(ServiceProvidingClient.class);
-
- private MessageProducer _destinationProducer;
-
- private Destination _responseDest;
-
- private AMQConnection _connection;
-
- private Session _session;
- private Session _producerSession;
-
- private boolean _isTransactional;
-
- public ServiceProvidingClient(String brokerDetails, String username, String password,
- String clientName, String virtualPath, String serviceName,
- final int deliveryMode, boolean transactedMode, String selector)
- throws AMQException, JMSException, URLSyntaxException
- {
- _isTransactional = transactedMode;
-
- _logger.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent")
- + "\t isTransactional: " + _isTransactional);
-
- _connection = new AMQConnection(brokerDetails, username, password, clientName, virtualPath);
- _connection.setConnectionListener(new ConnectionListener()
- {
-
- public void bytesSent(long count)
- {
- }
-
- public void bytesReceived(long count)
- {
- }
-
- public boolean preFailover(boolean redirect)
- {
- return true;
- }
-
- public boolean preResubscribe()
- {
- return true;
- }
-
- public void failoverComplete()
- {
- _logger.info("App got failover complete callback");
- }
- });
- _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
- _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
-
- _logger.info("Service (queue) name is '" + serviceName + "'...");
-
- AMQQueue destination = new AMQQueue(serviceName);
-
- MessageConsumer consumer = _session.createConsumer(destination,
- 100, true, false, selector);
-
- consumer.setMessageListener(new MessageListener()
- {
- private int _messageCount;
-
- public void onMessage(Message message)
- {
- //_logger.info("Got message '" + message + "'");
- TextMessage tm = (TextMessage) message;
- try
- {
- Destination responseDest = tm.getJMSReplyTo();
- if (responseDest == null)
- {
- _logger.info("Producer not created because the response destination is null.");
- return;
- }
-
- if (!responseDest.equals(_responseDest))
- {
- _responseDest = responseDest;
-
- _logger.info("About to create a producer");
- _destinationProducer = _producerSession.createProducer(responseDest);
- _destinationProducer.setDisableMessageTimestamp(true);
- _destinationProducer.setDeliveryMode(deliveryMode);
- _logger.info("After create a producer");
- }
- }
- catch (JMSException e)
- {
- _logger.error("Error creating destination");
- }
- _messageCount++;
- if (_messageCount % 1000 == 0)
- {
- _logger.info("Received message total: " + _messageCount);
- _logger.info("Sending response to '" + _responseDest + "'");
- }
-
- try
- {
- String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
- TextMessage msg = _producerSession.createTextMessage(payload);
- if (tm.propertyExists("timeSent"))
- {
- _logger.info("timeSent property set on message");
- long timesent = tm.getLongProperty("timeSent");
- _logger.info("timeSent value is: " + timesent);
- msg.setLongProperty("timeSent", timesent);
- }
-
- _destinationProducer.send(msg);
-
- if (_isTransactional)
- {
- _producerSession.commit();
- }
- if (_isTransactional)
- {
- _session.commit();
- }
- if (_messageCount % 1000 == 0)
- {
- _logger.info("Sent response to '" + _responseDest + "'");
- }
- }
- catch (JMSException e)
- {
- _logger.error("Error sending message: " + e, e);
- }
- }
- });
- }
-
- public void run() throws JMSException
- {
- _connection.start();
- _logger.info("Waiting...");
- }
-
- public static void main(String[] args)
- {
- _logger.info("Starting...");
-
- if (args.length < 5)
- {
- System.out.println("Usage: serviceProvidingClient <brokerDetails> <username> <password> <virtual-path> <serviceQueue> [<P[ersistent]|N[onPersistent]> <T[ransacted]|N[onTransacted]>] [selector]");
- System.exit(1);
- }
- String clientId = null;
- try
- {
- InetAddress address = InetAddress.getLocalHost();
- clientId = address.getHostName() + System.currentTimeMillis();
- }
- catch (UnknownHostException e)
- {
- _logger.error("Error: " + e, e);
- }
-
- int deliveryMode = DeliveryMode.NON_PERSISTENT;
- boolean transactedMode = false;
-
- if (args.length > 7)
- {
- deliveryMode = args[args.length - 2].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT;
-
- transactedMode = args[args.length - 1].toUpperCase().charAt(0) == 'T' ? true : false;
- }
-
- String selector = null;
- if ((args.length == 8) || (args.length == 7))
- {
- selector = args[args.length - 1];
- }
-
- try
- {
- ServiceProvidingClient client = new ServiceProvidingClient(args[0], args[1], args[2],
- clientId, args[3], args[4],
- deliveryMode, transactedMode, selector);
- client.run();
- }
- catch (JMSException e)
- {
- _logger.error("Error: " + e, e);
- }
- catch (AMQException e)
- {
- _logger.error("Error: " + e, e);
- }
- catch (URLSyntaxException e)
- {
- _logger.error("Error: " + e, e);
- }
- }
-}
-
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
deleted file mode 100644
index 57512929c1..0000000000
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/ServiceRequestingClient.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.requestreply;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.message.TestMessageFactory;
-import org.apache.qpid.client.message.JMSTextMessage;
-import org.apache.qpid.jms.MessageConsumer;
-import org.apache.qpid.jms.MessageProducer;
-import org.apache.qpid.jms.Session;
-import org.apache.qpid.url.URLSyntaxException;
-
-import javax.jms.*;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * A client that behaves as follows:
- * <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
- * <li>Creates a temporary queue</li>
- * <li>Creates messages containing a property(reply-to) that is the name of the temporary queue</li>
- * <li>Fires off a message on the original queue and registers the callbackHandler to listen to the response on the temporary queue</li>
- * <li>Start the loop to send all messages</li>
- * <li>CallbackHandler keeps listening to the responses and exits if all the messages have been received back or
- * if the waiting time for next message is elapsed</li>
- * </ul>
- */
-public class ServiceRequestingClient implements ExceptionListener
-{
- private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);
-
- private long _messageIdentifier = 0;
-
- // time for which callbackHandler should wait for a message before exiting. Default time= 60 secs
- private static long _callbackHandlerWaitingTime = 60000;
-
- private String MESSAGE_DATA;
-
- private AMQConnection _connection;
-
- private Session _session;
- private Session _producerSession;
-
- private long _averageLatency;
-
- private int _messageCount;
- private boolean _isTransactional;
-
- private volatile boolean _completed;
-
- private AMQDestination _tempDestination;
-
- private MessageProducer _producer;
-
- private Object _waiter;
-
- private class CallbackHandler implements MessageListener
- {
- private int _actualMessageCount;
-
- private long _startTime;
- // The time when the last message was received by the callbackHandler
- private long _messageReceivedTime = 0;
- private Object _timerCallbackHandler = new Object();
-
- public CallbackHandler(long startTime)
- {
- _startTime = startTime;
- // Start the timer thread, which will keep checking if test should exit because the waiting time has elapsed
- (new Thread(new TimerThread())).start();
- }
-
- public void onMessage(Message m)
- {
- _messageReceivedTime = System.currentTimeMillis();
- if (_log.isDebugEnabled())
- {
- _log.debug("Message received: " + m);
- }
- try
- {
- m.getPropertyNames();
- if (m.propertyExists("timeSent"))
- {
- long timeSent = m.getLongProperty("timeSent");
- if (_averageLatency == 0)
- {
- _averageLatency = _messageReceivedTime - timeSent;
- _log.info("Latency " + _averageLatency);
- }
- else
- {
- _log.info("Individual latency: " + (_messageReceivedTime - timeSent));
- _averageLatency = (_averageLatency + (_messageReceivedTime - timeSent)) / 2;
- _log.info("Average latency now: " + _averageLatency);
- }
- }
- if(_isTransactional)
- {
- _session.commit();
- }
- }
- catch (JMSException e)
- {
- _log.error("Error getting latency data: " + e, e);
- }
- _actualMessageCount++;
- if (_actualMessageCount % 1000 == 0)
- {
- _log.info("Received message count: " + _actualMessageCount);
- }
-
- checkForMessageID(m);
-
- if (_actualMessageCount == _messageCount)
- {
- finishTesting(_actualMessageCount);
- }
- }
-
- /**
- * sets completed flag to true, closes the callbackHandler connection and notifies the waiter thread,
- * so that the callbackHandler can finish listening for messages. This causes the test to finish.
- * @param receivedMessageCount
- */
- private void finishTesting(int receivedMessageCount)
- {
- _completed = true;
- notifyWaiter();
- notifyTimerThread();
-
- long timeTaken = System.currentTimeMillis() - _startTime;
- _log.info("***** Result *****");
- _log.info("Total messages received = " + receivedMessageCount);
- _log.info("Total time taken to receive " + receivedMessageCount + " messages was " +
- timeTaken + "ms, equivalent to " +
- (receivedMessageCount / (timeTaken / 1000.0)) + " messages per second");
-
- try
- {
- _connection.close();
- _log.info("Connection closed");
- }
- catch (JMSException e)
- {
- _log.error("Error closing connection");
- }
- }
-
- private void notifyTimerThread()
- {
- if (_timerCallbackHandler != null)
- {
- synchronized (_timerCallbackHandler)
- {
- _timerCallbackHandler.notify();
- }
- }
- }
-
- /**
- * Thread class implementing the timer for callbackHandler. The thread will exit the test if the waiting time
- * has elapsed before next message is received.
- */
- private class TimerThread implements Runnable
- {
- public void run()
- {
- do
- {
- try
- {
- synchronized(_timerCallbackHandler)
- {
- _timerCallbackHandler.wait(_callbackHandlerWaitingTime);
- }
- }
- catch (InterruptedException ignore)
- {
-
- }
-
- // exit if callbackHandler has received all messages
- if (_completed)
- {
- return;
- }
- }
- while ((System.currentTimeMillis() - _messageReceivedTime) < _callbackHandlerWaitingTime);
-
- // waiting time has elapsed, so exit the test
- _log.info("");
- _log.info("Exited after waiting for " + _callbackHandlerWaitingTime/1000 + " secs");
- finishTesting(_actualMessageCount);
- }
- }
- } // end of CallbackHandler class
-
- /**
- * Checks if the received AMQ Message ID(delivery tag) is in sequence, by comparing it with the AMQ MessageID
- * of previous message.
- * @param receivedMsg
- */
- private void checkForMessageID(Message receivedMsg)
- {
- try
- {
- JMSTextMessage msg = (JMSTextMessage)receivedMsg;
- if (! (msg.getDeliveryTag() == _messageIdentifier + 1))
- {
- _log.info("Out of sequence message received. Previous AMQ MessageID= " + _messageIdentifier +
- ", Received AMQ messageID= " + receivedMsg.getJMSMessageID());
- }
- _messageIdentifier = msg.getDeliveryTag();
- }
- catch (Exception ex)
- {
- _log.error("Error in checking messageID ", ex);
- }
-
- }
-
- private void notifyWaiter()
- {
- if (_waiter != null)
- {
- synchronized (_waiter)
- {
- _waiter.notify();
- }
- }
- }
-
- public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
- String vpath, String commandQueueName,
- int deliveryMode, boolean transactedMode,
- final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
- {
- _isTransactional = transactedMode;
-
- _log.info("Delivery Mode: " + (deliveryMode == DeliveryMode.NON_PERSISTENT ? "Non Persistent" : "Persistent"));
- _log.info("isTransactional: " + _isTransactional);
-
- _messageCount = messageCount;
- MESSAGE_DATA = TestMessageFactory.createMessagePayload(messageDataLength);
- try
- {
- createConnection(brokerHosts, clientID, username, password, vpath);
- _session = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
- _producerSession = (Session) _connection.createSession(_isTransactional, Session.AUTO_ACKNOWLEDGE);
-
- _connection.setExceptionListener(this);
-
- AMQQueue destination = new AMQQueue(commandQueueName);
- _producer = (MessageProducer) _producerSession.createProducer(destination);
- _producer.setDisableMessageTimestamp(true);
- _producer.setDeliveryMode(deliveryMode);
-
- _tempDestination = new AMQQueue("TempResponse" +
- Long.toString(System.currentTimeMillis()), true);
- MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
- true, null);
-
- //Send first message, then wait a bit to allow the provider to get initialised
- TextMessage first = _session.createTextMessage(MESSAGE_DATA);
- first.setJMSReplyTo(_tempDestination);
- _producer.send(first);
- if (_isTransactional)
- {
- _producerSession.commit();
- }
- try
- {
- Thread.sleep(1000);
- }
- catch (InterruptedException ignore)
- {
- }
-
- //now start the clock and the test...
- final long startTime = System.currentTimeMillis();
-
- messageConsumer.setMessageListener(new CallbackHandler(startTime));
- }
- catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
-
- /**
- * Run the test and notify an object upon receipt of all responses.
- *
- * @param waiter the object that will be notified
- * @throws JMSException
- */
- public void run(Object waiter) throws JMSException
- {
- _waiter = waiter;
- _connection.start();
- for (int i = 1; i < _messageCount; i++)
- {
- TextMessage msg = _producerSession.createTextMessage(MESSAGE_DATA + i);
- msg.setJMSReplyTo(_tempDestination);
- if (i % 1000 == 0)
- {
- long timeNow = System.currentTimeMillis();
- msg.setLongProperty("timeSent", timeNow);
- }
- _producer.send(msg);
- if (_isTransactional)
- {
- _producerSession.commit();
- }
-
- }
- _log.info("Finished sending " + _messageCount + " messages");
- }
-
- public boolean isCompleted()
- {
- return _completed;
- }
-
- private void createConnection(String brokerHosts, String clientID, String username, String password,
- String vpath) throws AMQException, URLSyntaxException
- {
- _connection = new AMQConnection(brokerHosts, username, password, clientID, vpath);
- }
-
- /**
- * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
- * means the server will allocate a name.
- */
- public static void main(String[] args)
- {
- if ((args.length < 6) || (args.length == 8))
- {
- System.err.println("Usage: ServiceRequestingClient <brokerDetails> <username> <password> <vpath> " +
- "<command queue name> <number of messages> [<message size>] " +
- "[<P[ersistent]|N[onPersistent] (Default N)> <T[ransacted]|N[onTransacted] (Default N)>] " +
- "[<waiting time for response in sec (default 60 sec)>]");
- System.exit(1);
- }
- try
- {
- int messageSize = 4096;
- boolean transactedMode = false;
- int deliveryMode = DeliveryMode.NON_PERSISTENT;
-
- if (args.length > 6)
- {
- messageSize = Integer.parseInt(args[6]);
- }
- if (args.length > 7)
- {
- deliveryMode = args[7].toUpperCase().charAt(0) == 'P' ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT;
-
- transactedMode = args[8].toUpperCase().charAt(0) == 'T' ? true : false;
- }
-
- if (args.length > 9)
- {
- _callbackHandlerWaitingTime = Long.parseLong(args[9]) * 1000;
- }
-
- _log.info("Each message size = " + messageSize + " bytes");
-
- InetAddress address = InetAddress.getLocalHost();
- String clientID = address.getHostName() + System.currentTimeMillis();
- ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
- args[4], deliveryMode, transactedMode, Integer.parseInt(args[5]),
- messageSize);
- Object waiter = new Object();
- client.run(waiter);
-
- // Start a thread to
- synchronized (waiter)
- {
- while (!client.isCompleted())
- {
- waiter.wait();
- }
- }
- }
- catch (UnknownHostException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- catch (Exception e)
- {
- System.err.println("Error in client: " + e);
- e.printStackTrace();
- }
- }
-
- /**
- * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
- */
- public void onException(JMSException e)
- {
- System.err.println(e.getMessage());
- e.printStackTrace(System.err);
- }
-}