summaryrefslogtreecommitdiff
path: root/java/perftests
diff options
context:
space:
mode:
authorBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-16 15:07:57 +0000
committerBhupendra Bhusman Bhardwaj <bhupendrab@apache.org>2007-03-16 15:07:57 +0000
commit016edb4f2cc39124a97b2b2434a686202cbaaa02 (patch)
tree1dcd2c258facb31a4230285bf78efc640cecfae6 /java/perftests
parent5413139791dc0b518ad8285d4b052d7bdb54ba48 (diff)
downloadqpid-python-016edb4f2cc39124a97b2b2434a686202cbaaa02.tar.gz
Added timeout to be passed on command line.
Updated PingPongProducer.java to run it either continuously or for a fixed no of messages git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@518999 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java41
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Config.java15
2 files changed, 46 insertions, 10 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
index f2fbd29314..16d46ce2b8 100644
--- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
@@ -39,6 +39,7 @@ import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.message.TestMessageFactory;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
@@ -559,11 +560,12 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
boolean transacted = config.isTransacted();
boolean persistent = config.usePersistentMessages();
int messageSize = (config.getPayload() != 0) ? config.getPayload() : DEFAULT_MESSAGE_SIZE;
- //int messageCount = config.getMessages();
+ int messageCount = config.getMessages();
int destCount = (config.getDestinationsCount() != 0) ? config.getDestinationsCount() : DEFAULT_DESTINATION_COUNT;
int batchSize = (config.getBatchSize() != 0) ? config.getBatchSize() : DEFAULT_TX_BATCH_SIZE;
int rate = (config.getRate() != 0) ? config.getRate() : DEFAULT_RATE;
boolean pubsub = config.isPubSub();
+ long timeout = (config.getTimeout() != 0) ? config.getTimeout() : DEFAULT_TIMEOUT;
String destName = config.getDestination();
if (destName == null)
@@ -623,10 +625,19 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
pingProducer.getConnection().setExceptionListener(pingProducer);
- // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
- Thread pingThread = new Thread(pingProducer);
- pingThread.run();
- pingThread.join();
+ // If messageount is 0, then continue sending
+ if (messageCount == 0)
+ {
+ // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
+ Thread pingThread = new Thread(pingProducer);
+ pingThread.start();
+ pingThread.join();
+ }
+ else
+ {
+ pingProducer.ping(messageCount, timeout);
+ }
+ pingProducer.close();
}
/**
@@ -785,8 +796,13 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
if ((remainingCount % _txBatchSize) == 0)
{
commitTx(_consumerSession);
+ if (!_consumerSession.getTransacted() &&
+ _consumerSession.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ ((AMQSession)_consumerSession).acknowledge();
+ }
}
-
+
// Forward the message and remaining count to any interested chained message listener.
if (_chainedMessageListener != null)
{
@@ -1017,9 +1033,9 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
/**
- * The ping loop implementation. This sends out pings waits for replies and inserts short pauses in between each.
+ * The ping implementation. This sends out pings waits for replies and inserts short pauses in between each.
*/
- public void pingLoop()
+ public void ping(int pingCount, long timeout)
{
try
{
@@ -1028,7 +1044,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
// Send the message and wait for a reply.
- pingAndWaitForReply(msg, DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
+ pingAndWaitForReply(msg, pingCount, timeout);
// Introduce a short pause if desired.
pause(DEFAULT_SLEEP_TIME);
@@ -1045,6 +1061,11 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
}
}
+ public void ping()
+ {
+ ping(DEFAULT_TX_BATCH_SIZE, DEFAULT_TIMEOUT);
+ }
+
public Destination getReplyDestination()
{
return getReplyDestinations().get(0);
@@ -1105,7 +1126,7 @@ public class PingPongProducer implements Runnable, MessageListener, ExceptionLis
// Keep running until the publish flag is cleared.
while (_publish)
{
- pingLoop();
+ ping();
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
index 60aa9f3930..342b28ca17 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Config.java
@@ -51,6 +51,7 @@ public class Config extends AbstractConfig implements ConnectorConfig
private int batchSize;
private int rate;
private boolean ispubsub;
+ private long timeout;
public Config()
{
@@ -161,6 +162,16 @@ public class Config extends AbstractConfig implements ConnectorConfig
this.delay = delay;
}
+ public long getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(long time)
+ {
+ this.timeout = time;
+ }
+
public String getClientId()
{
return clientId;
@@ -285,6 +296,10 @@ public class Config extends AbstractConfig implements ConnectorConfig
{
destinationName = value;
}
+ else if("-timeout".equalsIgnoreCase(key))
+ {
+ setTimeout(parseLong("Bad timeout data", value));
+ }
else
{
System.out.println("Ignoring unrecognised option: " + key);