summaryrefslogtreecommitdiff
path: root/qpid/java/perftests/src/main
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-05-07 09:40:58 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-05-07 09:40:58 +0000
commit1d6bec9c7ea9f4ea71a919e59010fd0744581ac9 (patch)
tree23f30d77d0953588cb84178c38d73657e59f1c82 /qpid/java/perftests/src/main
parentab8d563c9ac1f7f82a4b417ca02b210d2e51245a (diff)
downloadqpid-python-1d6bec9c7ea9f4ea71a919e59010fd0744581ac9.tar.gz
Merged revisions 534897-534902,534904-535253,535255-535809 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r534897 | bhupendrab | 2007-05-03 15:53:20 +0100 (Thu, 03 May 2007) | 1 line Attribute details background made same as other displays. ........ r535309 | ritchiem | 2007-05-04 17:12:59 +0100 (Fri, 04 May 2007) | 2 lines QPID-466 Changes to FieldTable along with corresponding PropertyValueTest to limit the Java client to only AMQP 0-8 compliant values. ........ r535809 | ritchiem | 2007-05-07 10:28:15 +0100 (Mon, 07 May 2007) | 5 lines QPID-466 Updated FieldTable to ensure no Decimal value is set that is larger than can be transmitted over AMQP. That is a max scale value of Byte.MAX_VALUE and value of up to Integer.MAX_VALUE. Additional tests to ensure this is the case. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@535819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/perftests/src/main')
-rw-r--r--qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java182
1 files changed, 105 insertions, 77 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index 5dec2125ee..642f3077fd 100644
--- a/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -37,6 +37,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.*;
import org.apache.qpid.client.message.TestMessageFactory;
+import org.apache.qpid.client.message.AMQMessage;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.MessageProducer;
@@ -374,7 +375,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* ping producers on the same JVM.
*/
private static Map<String, PerCorrelationId> perCorrelationIds =
- Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
+ Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());
/** A convenient formatter to use when time stamping output. */
protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
@@ -549,13 +550,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
* to be started to bounce the pings back again.
*
- * @param args The command line arguments.
+ * @param args The command line arguments.
*/
public static void main(String[] args)
{
try
{
- Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}));
+ Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][]{}));
// Create a ping producer overriding its defaults with all options passed on the command line.
PingPongProducer pingProducer = new PingPongProducer(options);
@@ -597,7 +598,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
Thread.sleep(sleepTime);
}
catch (InterruptedException ie)
- { }
+ {
+ }
}
}
@@ -648,11 +650,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* @throws JMSException Any JMSExceptions are allowed to fall through.
*/
public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
- boolean durable) throws JMSException, AMQException
+ boolean durable) throws JMSException, AMQException
{
log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
- + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
- + durable + "): called");
+ + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
+ + durable + "): called");
_pingDestinations = new ArrayList<Destination>();
@@ -688,8 +690,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
else
{
destination =
- AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
- _clientID, (AMQConnection) _connection);
+ AMQTopic.createDurableTopic(new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, rootName + id),
+ _clientID, (AMQConnection) _connection);
log.debug("Created durable topic " + destination);
}
}
@@ -698,11 +700,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
AMQShortString destinationName = new AMQShortString(rootName + id);
destination =
- new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
- _isDurable);
+ new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, destinationName, destinationName, false, false,
+ _isDurable);
((AMQSession) _producerSession).createQueue(destinationName, false, _isDurable, false);
((AMQSession) _producerSession).bindQueue(destinationName, destinationName, null,
- ExchangeDefaults.DIRECT_EXCHANGE_NAME);
+ ExchangeDefaults.DIRECT_EXCHANGE_NAME);
log.debug("Created queue " + destination);
}
@@ -715,15 +717,15 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/**
* Creates consumers for the specified destinations and registers this pinger to listen to their messages.
*
- * @param destinations The destinations to listen to.
- * @param selector A selector to filter the messages with.
+ * @param destinations The destinations to listen to.
+ * @param selector A selector to filter the messages with.
*
* @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
*/
public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
{
log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
- + ", String selector = " + selector + "): called");
+ + ", String selector = " + selector + "): called");
log.debug("Creating " + destinations.size() + " reply consumers.");
@@ -731,8 +733,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
// Create a consumer for the destination and set this pinger to listen to its messages.
_consumer =
- _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
- selector);
+ _consumerSession.createConsumer(destination, PREFETCH_DEFAULT, NO_LOCAL_DEFAULT, EXCLUSIVE_DEFAULT,
+ selector);
_consumer.setMessageListener(this);
log.debug("Set this to listen to replies sent to destination: " + destination);
@@ -740,8 +742,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a correlating
- * reply may be waiting on. This is only done if the reply has a correlation id that is expected in the replies map.
+ * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a
+ * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the
+ * replies map.
*
* @param message The received message.
*/
@@ -830,26 +833,26 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out before a
- * reply arrives, then a null reply is returned from this method. This method allows the caller to specify the
- * correlation id.
+ * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
+ * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
+ * the correlation id.
*
- * @param message The message to send. If this is null, one is generated.
- * @param numPings The number of ping messages to send.
- * @param timeout The timeout in milliseconds.
- * @param messageCorrelationId The message correlation id. If this is null, one is generated.
+ * @param message The message to send. If this is null, one is generated.
+ * @param numPings The number of ping messages to send.
+ * @param timeout The timeout in milliseconds.
+ * @param messageCorrelationId The message correlation id. If this is null, one is generated.
*
- * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait for
- * all prematurely.
+ * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
+ * for all prematurely.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
* @throws InterruptedException When interrupted by a timeout
*/
public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
- throws JMSException, InterruptedException
+ throws JMSException, InterruptedException
{
log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
- + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");
// Generate a unique correlation id to put on the messages before sending them, if one was not specified.
if (messageCorrelationId == null)
@@ -929,16 +932,16 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/**
* Sends the specified number of ping messages and does not wait for correlating replies.
*
- * @param message The message to send.
- * @param numPings The number of pings to send.
- * @param messageCorrelationId A correlation id to place on all messages sent.
+ * @param message The message to send.
+ * @param numPings The number of pings to send.
+ * @param messageCorrelationId A correlation id to place on all messages sent.
*
* @throws JMSException All underlying JMSExceptions are allowed to fall through.
*/
public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
{
log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
- + ", String messageCorrelationId = " + messageCorrelationId + "): called");
+ + ", String messageCorrelationId = " + messageCorrelationId + "): called");
if (message == null)
{
@@ -1040,9 +1043,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction
- * batch size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared,
- * which will terminate the pinger.
+ * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
+ * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
+ * terminate the pinger.
*/
public void pingLoop()
{
@@ -1050,7 +1053,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
{
// Generate a sample message and time stamp it.
Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
- msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ setTimestamp(msg);
// Send the message and wait for a reply.
pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
@@ -1068,7 +1071,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set here.
+ * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
+ * here.
*
* @param messageListener The chained message listener.
*/
@@ -1077,9 +1081,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
_chainedMessageListener = messageListener;
}
- /**
- * Removes any chained message listeners from this pinger.
- */
+ /** Removes any chained message listeners from this pinger. */
public void removeChainedMessageListener()
{
_chainedMessageListener = null;
@@ -1088,9 +1090,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/**
* Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
*
- * @param replyQueue The reply-to destination for the message.
- * @param messageSize The desired size of the message in bytes.
- * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
+ * @param replyQueue The reply-to destination for the message.
+ * @param messageSize The desired size of the message in bytes.
+ * @param persistent <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
*
* @return A freshly generated test message.
*
@@ -1101,23 +1103,50 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
ObjectMessage msg = TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
// Timestamp the message in nanoseconds.
- msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+
+ setTimestamp(msg);
return msg;
}
+ protected void setTimestamp(Message msg) throws JMSException
+ {
+ if (((AMQSession) _producerSession).isStrictAMQP())
+ {
+ ((AMQMessage) msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
+ }
+ else
+ {
+ msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
+ }
+ }
+
+ protected long getTimestamp(Message msg) throws JMSException
+ {
+
+ if (((AMQSession) _producerSession).isStrictAMQP())
+ {
+ Long value = ((AMQMessage) msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));
+
+ return value == null ? 0L : value;
+ }
+ else
+ {
+ return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
+ }
+ }
+
+
/**
- * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this
- * flag has been cleared.
+ * Stops the ping loop by clearing the publish flag. The current loop will complete before it notices that this flag
+ * has been cleared.
*/
public void stop()
{
_publish = false;
}
- /**
- * Implements a ping loop that repeatedly pings until the publish flag becomes false.
- */
+ /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
public void run()
{
// Keep running until the publish flag is cleared.
@@ -1128,8 +1157,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the connection,
- * this clears the publish flag which in turn will halt the ping loop.
+ * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
+ * connection, this clears the publish flag which in turn will halt the ping loop.
*
* @param e The exception that triggered this callback method.
*/
@@ -1140,20 +1169,20 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with
- * the runtime system as a shutdown hook.
+ * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
+ * with the runtime system as a shutdown hook.
*
* @return A shutdown hook for the ping loop.
*/
public Thread getShutdownHook()
{
return new Thread(new Runnable()
- {
- public void run()
- {
- stop();
- }
- });
+ {
+ public void run()
+ {
+ stop();
+ }
+ });
}
/**
@@ -1202,19 +1231,18 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
* <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
* applied. This flag applies whether the pinger is transactional or not.
*
- * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit is
- * applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the commit
- * is applied. These flags will only apply if using a transactional pinger.
- *
- * @param session The session to commit
+ * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
+ * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
+ * commit is applied. These flags will only apply if using a transactional pinger.
*
- * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ * @param session The session to commit
*
* @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
*
- * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
- * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
- * non-transactional alike.
+ * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
+ * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
+ * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
+ * non-transactional alike.
*/
protected boolean commitTx(Session session) throws JMSException
{
@@ -1335,12 +1363,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
/**
* Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
* PingPongProducer#onMessage} method is called, the chained listener set through the {@link
- * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of messages
- * with that correlation id.
+ * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
+ * messages with that correlation id.
*
- * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be given
- * unique message counts. It will always be called while the producer waiting for all messages to arrive is still
- * blocked.
+ * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
+ * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
+ * still blocked.
*/
public static interface ChainedMessageListener
{
@@ -1348,8 +1376,8 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be added to
- * this: read/write lock to make onMessage more concurrent as described in class header comment.
+ * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
+ * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
*/
protected static class PerCorrelationId
{