summaryrefslogtreecommitdiff
path: root/qpid/java/tools
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools')
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java12
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java66
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java98
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java82
4 files changed, 187 insertions, 71 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
index b88b242e6d..90ee7e28ae 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
@@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public LatencyTest()
{
- super();
+ super("");
warmedUp = lock.newCondition();
testCompleted = lock.newCondition();
// Storing the following two for efficiency
@@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public static void main(String[] args)
{
- final LatencyTest latencyTest = new LatencyTest();
+ final LatencyTest latencyTest = new LatencyTest();
Runnable r = new Runnable()
{
public void run()
@@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener
}
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating latency test thread",e);
}
- t.start();
+ t.start();
}
-} \ No newline at end of file
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index 340f11f5e4..121e94cea1 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.tools;
+import java.net.InetAddress;
import java.text.DecimalFormat;
import java.util.UUID;
@@ -32,6 +33,10 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.messaging.Address;
public class PerfBase
{
@@ -57,11 +62,12 @@ public class PerfBase
Destination myControlQueue;
Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
- String id = UUID.randomUUID().toString();
- String myControlQueueAddr = id + ";{create: always}";
+ String id;
+ String myControlQueueAddr;
MessageProducer sendToController;
MessageConsumer receiveFromController;
+ String prefix = "";
enum OPCode {
REGISTER_CONSUMER, REGISTER_PRODUCER,
@@ -69,7 +75,8 @@ public class PerfBase
CONSUMER_READY, PRODUCER_READY,
PRODUCER_START,
RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
+ CONTINUE_TEST, STOP_TEST
};
enum MessageType {
@@ -102,14 +109,24 @@ public class PerfBase
MessageType msgType = MessageType.BYTES;
- public PerfBase()
+ public PerfBase(String prefix)
{
params = new TestParams();
+ String host = "";
+ try
+ {
+ host = InetAddress.getLocalHost().getHostName();
+ }
+ catch (Exception e)
+ {
+ }
+ id = host + "-" + UUID.randomUUID().toString();
+ this.prefix = prefix;
+ this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
{
-
if (params.getHost().equals("") || params.getPort() == -1)
{
con = new AMQConnection(params.getUrl());
@@ -124,7 +141,7 @@ public class PerfBase
controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- dest = new AMQAnyDestination(params.getAddress());
+ dest = createDestination();
controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(params.getMessageType());
@@ -134,9 +151,38 @@ public class PerfBase
receiveFromController = controllerSession.createConsumer(myControlQueue);
}
+ private Destination createDestination() throws Exception
+ {
+ if (params.isUseUniqueDests())
+ {
+ System.out.println("Prefix : " + prefix);
+ Address addr = Address.parse(params.getAddress());
+ AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
+ int type = ((AMQSession_0_10)session).resolveAddressType(temp);
+
+ if ( type == AMQDestination.TOPIC_TYPE)
+ {
+ addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
+ System.out.println("Setting subject : " + addr);
+ }
+ else
+ {
+ addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
+ System.out.println("Setting name : " + addr);
+ }
+
+ return new AMQAnyDestination(addr);
+ }
+ else
+ {
+ return new AMQAnyDestination(params.getAddress());
+ }
+ }
+
public synchronized void sendMessageToController(MapMessage m) throws Exception
{
m.setString(ID, id);
+ m.setString(REPLY_ADDR,myControlQueueAddr);
sendToController.send(m);
}
@@ -152,6 +198,14 @@ public class PerfBase
}
+ public boolean continueTest() throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ return (code == OPCode.CONTINUE_TEST);
+ }
+
public void tearDown() throws Exception
{
session.close();
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index ae439b7ce0..b63892bb51 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -22,6 +22,7 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import javax.jms.MapMessage;
import javax.jms.Message;
@@ -29,6 +30,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -49,7 +51,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
+ * when the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -57,13 +59,9 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -83,7 +81,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
@@ -94,9 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener
final Object lock = new Object();
- public PerfConsumer()
+ public PerfConsumer(String prefix)
{
- super();
+ super(prefix);
System.out.println("Consumer ID : " + id);
}
@@ -104,26 +101,20 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
super.setUp();
consumer = session.createConsumer(dest);
+ System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
printStdDev = params.isPrintStdDev();
- if (printStdDev)
- {
- sample = new ArrayList<Long>(params.getMsgCount());
- }
-
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- m.setString(REPLY_ADDR,myControlQueueAddr);
sendMessageToController(m);
}
public void warmup()throws Exception
{
receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- boolean start = false;
Message msg = consumer.receive();
// This is to ensure we drain the queue before we start the actual test.
while ( msg != null)
@@ -146,12 +137,26 @@ public class PerfConsumer extends PerfBase implements MessageListener
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
sendMessageToController(m);
+ consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Consumer Starting test......");
- consumer.setMessageListener(this);
+ System.out.println("Consumer: " + id + " Starting test......" + "\n");
+ resetCounters();
+ }
+
+ public void resetCounters()
+ {
+ rcvdMsgCount = 0;
+ maxLatency = 0;
+ minLatency = Long.MAX_VALUE;
+ totalLatency = 0;
+ if (printStdDev)
+ {
+ sample = null;
+ sample = new ArrayList<Long>(params.getMsgCount());
+ }
}
public void sendResults() throws Exception
@@ -193,7 +198,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
System.out.println(new StringBuilder("Std Dev : ").
append(stdDev/Clock.convertToMiliSecs()).toString());
}
- System.out.println("Consumer has completed the test......\n");
}
public double calculateStdDev(double mean)
@@ -262,8 +266,15 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
setUp();
warmup();
- startTest();
- sendResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Consumer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -272,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- public static void main(String[] args)
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ public static void main(String[] args) throws InterruptedException
{
- final PerfConsumer cons = new PerfConsumer();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+
+ final PerfConsumer cons = new PerfConsumer(scriptId + i);
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.run();
+ testCompleted.countDown();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
{
- cons.run();
+ throw new Error("Error creating consumer thread",e);
}
- };
+ t.start();
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
}
- t.start();
+ testCompleted.await();
+ System.out.println("Consumers have completed the test......\n");
}
} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 4cecd6f4df..ac6129ab68 100644
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -23,6 +23,7 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
@@ -30,6 +31,7 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,6 +53,12 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
+ * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
+ * I have done so far, it seems quite useful to compute the producer rate as it gives an
+ * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
+ * you could clearly see the higher latencies and when producer and consumer rates are very close,
+ * latency is good.
+ *
*/
public class PerfProducer extends PerfBase
{
@@ -69,9 +77,9 @@ public class PerfProducer extends PerfBase
double rateFactor = 0.4;
double rate = 0.0;
- public PerfProducer()
+ public PerfProducer(String prefix)
{
- super();
+ super(prefix);
System.out.println("Producer ID : " + id);
}
@@ -114,12 +122,12 @@ public class PerfProducer extends PerfBase
}
producer = session.createProducer(dest);
+ System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
MapMessage m = controllerSession.createMapMessage();
m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- m.setString(REPLY_ADDR,myControlQueueAddr);
sendMessageToController(m);
}
@@ -178,7 +186,7 @@ public class PerfProducer extends PerfBase
public void warmup()throws Exception
{
receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer Warming up......");
+ System.out.println("Producer: " + id + " Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
@@ -194,6 +202,7 @@ public class PerfProducer extends PerfBase
public void startTest() throws Exception
{
+ resetCounters();
receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
@@ -236,8 +245,11 @@ public class PerfProducer extends PerfBase
append(df.format(rate)).
append(" msg/sec").
toString());
+ }
+
+ public void resetCounters()
+ {
- System.out.println("Producer has completed the test......");
}
public void sendEndMessage() throws Exception
@@ -255,14 +267,27 @@ public class PerfProducer extends PerfBase
sendMessageToController(msg);
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
public void run()
{
try
{
setUp();
warmup();
- startTest();
- sendResults();
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ System.out.println("=========================================================\n");
+ System.out.println("Producer: " + id + " starting a new iteration ......\n");
+ startTest();
+ sendResults();
+ nextIteration = continueTest();
+ }
tearDown();
}
catch(Exception e)
@@ -298,27 +323,36 @@ public class PerfProducer extends PerfBase
}
- public static void main(String[] args)
+ public static void main(String[] args) throws InterruptedException
{
- final PerfProducer prod = new PerfProducer();
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
+ String scriptId = (args.length == 1) ? args[0] : "";
+ int conCount = Integer.getInteger("con_count",1);
+ final CountDownLatch testCompleted = new CountDownLatch(conCount);
+ for (int i=0; i < conCount; i++)
{
- public void run()
+ final PerfProducer prod = new PerfProducer(scriptId + i);
+ prod.startControllerIfNeeded();
+ Runnable r = new Runnable()
{
- prod.run();
- }
- };
+ public void run()
+ {
+ prod.run();
+ testCompleted.countDown();
+ }
+ };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
}
- t.start();
+ testCompleted.await();
+ System.out.println("Producers have completed the test......");
}
} \ No newline at end of file