From 016edb4f2cc39124a97b2b2434a686202cbaaa02 Mon Sep 17 00:00:00 2001 From: Bhupendra Bhusman Bhardwaj Date: Fri, 16 Mar 2007 15:07:57 +0000 Subject: Added timeout to be passed on command line. Updated PingPongProducer.java to run it either continuously or for a fixed no of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518999 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/requestreply/PingPongProducer.java | 41 ++++++++++++++++------ .../main/java/org/apache/qpid/topic/Config.java | 15 ++++++++ 2 files changed, 46 insertions(+), 10 deletions(-) (limited to 'java') diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java index f2fbd29314..16d46ce2b8 100644 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java @@ -39,6 +39,7 @@ import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQNoConsumersException; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.message.TestMessageFactory; import org.apache.qpid.jms.MessageProducer; import org.apache.qpid.jms.Session; @@ -559,11 +560,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis boolean transacted = config.isTransacted(); boolean persistent = config.usePersistentMessages(); int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE; - //int messageCount = config.getMessages(); + int messageCount = config.getMessages(); int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT; int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE; int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE; boolean pubsub = config.isPubSub(); + long timeout = (config.getTimeout() != 0) ? config.getTimeout() : DEFAULT_TIMEOUT; String destName = config.getDestination(); if (destName == null) @@ -623,10 +625,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. pingProducer.getConnection().setExceptionListener(pingProducer); - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); + // If messageount is 0, then continue sending + if (messageCount == 0) + { + // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. + Thread pingThread = new Thread(pingProducer); + pingThread.start(); + pingThread.join(); + } + else + { + pingProducer.ping(messageCount, timeout); + } + pingProducer.close(); } /** @@ -785,8 +796,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis if ((remainingCount % _txBatchSize) == 0) { commitTx(_consumerSession); + if (!_consumerSession.getTransacted() && + _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) + { + ((AMQSession)_consumerSession).acknowledge(); + } } - + // Forward the message and remaining count to any interested chained message listener. if (_chainedMessageListener != null) { @@ -1017,9 +1033,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } /** - * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each. + * The ping implementation. This sends out pings waits for replies and inserts short pauses in between each. */ - public void pingLoop() + public void ping(int pingCount, long timeout) { try { @@ -1028,7 +1044,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); // Send the message and wait for a reply. - pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT); + pingAndWaitForReply(msg, pingCount, timeout); // Introduce a short pause if desired. pause(DEFAULT_SLEEP_TIME); @@ -1045,6 +1061,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis } } + public void ping() + { + ping(DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT); + } + public Destination getReplyDestination() { return getReplyDestinations().get(0); @@ -1105,7 +1126,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis // Keep running until the publish flag is cleared. while (_publish) { - pingLoop(); + ping(); } } diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java index 60aa9f3930..342b28ca17 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java @@ -51,6 +51,7 @@ public class Config extends AbstractConfig implements ConnectorConfig private int batchSize; private int rate; private boolean ispubsub; + private long timeout; public Config() { @@ -161,6 +162,16 @@ public class Config extends AbstractConfig implements ConnectorConfig this.delay = delay; } + public long getTimeout() + { + return timeout; + } + + public void setTimeout(long time) + { + this.timeout = time; + } + public String getClientId() { return clientId; @@ -285,6 +296,10 @@ public class Config extends AbstractConfig implements ConnectorConfig { destinationName = value; } + else if("-timeout".equalsIgnoreCase(key)) + { + setTimeout(parseLong("Bad timeout data", value)); + } else { System.out.println("Ignoring unrecognised option: " + key); -- cgit v1.2.1