summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java2
-rwxr-xr-xjava/tools/bin/perf-report37
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/Clock.java92
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java68
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java119
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java115
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java296
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java13
8 files changed, 608 insertions, 134 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
index 58f108f1a4..7f735e0722 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
@@ -110,7 +110,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage
}
// for testing
- Map<String,Object> getMap()
+ public Map<String,Object> getMap()
{
return _map;
}
diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf-report
index 228f792a52..7de3f2b602 100755
--- a/java/tools/bin/perf-report
+++ b/java/tools/bin/perf-report
@@ -21,16 +21,16 @@
# This will run the following test cases defined below and produce
# a report in tabular format.
-SUB_MEM=-Xmx1024M
-PUB_MEM=-Xmx1024M
QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
TOPIC="amq.topic/test"
DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
+COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+
waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
cleanup()
-{
+{
pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
if [ "$pids" != "" ]; then
kill -3 $pids
@@ -42,30 +42,31 @@ cleanup()
# $2 consumer options
# $3 producer options
run_testcase()
-{
- sh run-sub $LOG_CONFIG $SUB_MEM $2 > sub.out &
- waitfor sub.out "Warming up"
- sh run-pub $LOG_CONFIG $PUB_MEM $3 > pub.out &
- waitfor sub.out "Completed the test"
- waitfor pub.out "Consumer has completed the test"
+{
+ sh run-sub $COMMON_CONFIG $2 > sub.out &
+ sh run-pub $COMMON_CONFIG $3 > pub.out &
+ waitfor pub.out "Controller: Completed the test"
sleep 2 #give a grace period to shutdown
- print_result $1
+ print_result $1
+ mv pub.out $1.pub.out
+ mv sub.out $1.sub.out
}
print_result()
{
- prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
- sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
- avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
- min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
- max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'`
+ sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'`
+ avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'`
+ min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'`
+ max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
echo "------------------------------------------------------------------------------------------------"
}
trap cleanup EXIT
+rm -rf *.out #cleanup old files.
echo "Test report on " `date +%F`
echo "================================================================================================"
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
new file mode 100644
index 0000000000..37369959a8
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
@@ -0,0 +1,92 @@
+package org.apache.qpid.tools;
+
+/**
+ * In the future this will be replaced by a Clock abstraction
+ * that can utilize a realtime clock when running in RT Java.
+ */
+
+public class Clock
+{
+ private static Precision precision;
+ private static long offset = -1; // in nano secs
+
+ public enum Precision
+ {
+ NANO_SECS, MILI_SECS;
+
+ static Precision getPrecision(String str)
+ {
+ if ("mili".equalsIgnoreCase(str))
+ {
+ return MILI_SECS;
+ }
+ else
+ {
+ return NANO_SECS;
+ }
+ }
+ };
+
+ static
+ {
+ precision = Precision.getPrecision(System.getProperty("precision","mili"));
+ //offset = Long.getLong("offset",-1);
+
+ System.out.println("Using precision : " + precision + " and offset " + offset);
+ }
+
+ public static Precision getPrecision()
+ {
+ return precision;
+ }
+
+ public static long getTime()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ if (offset == -1)
+ {
+ return System.nanoTime();
+ }
+ else
+ {
+ return System.nanoTime() + offset;
+ }
+ }
+ else
+ {
+ if (offset == -1)
+ {
+ return System.currentTimeMillis();
+ }
+ else
+ {
+ return System.currentTimeMillis() + offset/convertToMiliSecs();
+ }
+ }
+ }
+
+ public static long convertToSecs()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ return 1000000000;
+ }
+ else
+ {
+ return 1000;
+ }
+ }
+
+ public static long convertToMiliSecs()
+ {
+ if (precision == Precision.NANO_SECS)
+ {
+ return 1000000;
+ }
+ else
+ {
+ return 1;
+ }
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index c78c752eca..340f11f5e4 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -21,9 +21,13 @@
package org.apache.qpid.tools;
import java.text.DecimalFormat;
+import java.util.UUID;
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
@@ -31,12 +35,42 @@ import org.apache.qpid.client.AMQConnection;
public class PerfBase
{
+ public final static String CODE = "CODE";
+ public final static String ID = "ID";
+ public final static String REPLY_ADDR = "REPLY_ADDR";
+ public final static String MAX_LATENCY = "MAX_LATENCY";
+ public final static String MIN_LATENCY = "MIN_LATENCY";
+ public final static String AVG_LATENCY = "AVG_LATENCY";
+ public final static String STD_DEV = "STD_DEV";
+ public final static String CONS_RATE = "CONS_RATE";
+ public final static String PROD_RATE = "PROD_RATE";
+ public final static String MSG_COUNT = "MSG_COUNT";
+ public final static String TIMESTAMP = "Timestamp";
+
+ String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
+
TestParams params;
Connection con;
Session session;
+ Session controllerSession;
Destination dest;
- Destination feedbackDest;
+ Destination myControlQueue;
+ Destination controllerQueue;
DecimalFormat df = new DecimalFormat("###.##");
+ String id = UUID.randomUUID().toString();
+ String myControlQueueAddr = id + ";{create: always}";
+
+ MessageProducer sendToController;
+ MessageConsumer receiveFromController;
+
+ enum OPCode {
+ REGISTER_CONSUMER, REGISTER_PRODUCER,
+ PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
+ CONSUMER_READY, PRODUCER_READY,
+ PRODUCER_START,
+ RECEIVED_END_MSG, CONSUMER_STOP,
+ RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS
+ };
enum MessageType {
BYTES, TEXT, MAP, OBJECT;
@@ -88,9 +122,41 @@ public class PerfBase
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+ controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
dest = new AMQAnyDestination(params.getAddress());
+ controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
+ myControlQueue = session.createQueue(myControlQueueAddr);
msgType = MessageType.getType(params.getMessageType());
System.out.println("Using " + msgType + " messages");
+
+ sendToController = controllerSession.createProducer(controllerQueue);
+ receiveFromController = controllerSession.createConsumer(myControlQueue);
+ }
+
+ public synchronized void sendMessageToController(MapMessage m) throws Exception
+ {
+ m.setString(ID, id);
+ sendToController.send(m);
+ }
+
+ public void receiveFromController(OPCode expected) throws Exception
+ {
+ MapMessage m = (MapMessage)receiveFromController.receive();
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+ System.out.println("Received Code : " + code);
+ if (expected != code)
+ {
+ throw new Exception("Expected OPCode : " + expected + " but received : " + code);
+ }
+
+ }
+
+ public void tearDown() throws Exception
+ {
+ session.close();
+ controllerSession.close();
+ con.close();
}
public void handleError(Exception e,String msg)
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index 43ffce23cc..ae439b7ce0 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -23,12 +23,10 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
+import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
import javax.jms.TextMessage;
import org.apache.qpid.thread.Threading;
@@ -99,6 +97,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
public PerfConsumer()
{
super();
+ System.out.println("Consumer ID : " + id);
}
public void setUp() throws Exception
@@ -114,68 +113,87 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
sample = new ArrayList<Long>(params.getMsgCount());
}
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendMessageToController(m);
}
public void warmup()throws Exception
{
- System.out.println("Warming up......");
-
+ receiveFromController(OPCode.CONSUMER_STARTWARMUP);
boolean start = false;
- while (!start)
+ Message msg = consumer.receive();
+ // This is to ensure we drain the queue before we start the actual test.
+ while ( msg != null)
{
- Message msg = consumer.receive();
- if (msg.getBooleanProperty("End"))
+ if (msg.getBooleanProperty("End") == true)
{
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
+ // It's more realistic for the consumer to signal this.
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
+ sendMessageToController(m);
}
+ msg = consumer.receive(1000);
}
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
+ sendMessageToController(m);
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ System.out.println("Consumer Starting test......");
consumer.setMessageListener(this);
}
- public void printResults() throws Exception
+ public void sendResults() throws Exception
{
- synchronized (lock)
+ receiveFromController(OPCode.CONSUMER_STOP);
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
+ double stdDev = 0.0;
+ if (printStdDev)
{
- lock.wait();
+ stdDev = calculateStdDev(avgLatency);
}
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
+ m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
+ m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
+ m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
+ m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
+ m.setDouble(CONS_RATE, consRate);
+ m.setLong(MSG_COUNT, rcvdMsgCount);
+ sendMessageToController(m);
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
+ append(df.format(avgLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
+ append(df.format(minLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
+ append(df.format(maxLatency/Clock.convertToMiliSecs())).
append(" ms").toString());
if (printStdDev)
{
System.out.println(new StringBuilder("Std Dev : ").
- append(calculateStdDev(avgLatency)).toString());
+ append(stdDev/Clock.convertToMiliSecs()).toString());
}
- System.out.println("Completed the test......\n");
+ System.out.println("Consumer has completed the test......\n");
}
public double calculateStdDev(double mean)
@@ -189,25 +207,6 @@ public class PerfConsumer extends PerfBase implements MessageListener
return Math.round(Math.sqrt(v));
}
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.close();
- }
-
- public void tearDown() throws Exception
- {
- consumer.close();
- session.close();
- con.close();
- }
-
public void onMessage(Message msg)
{
try
@@ -220,22 +219,18 @@ public class PerfConsumer extends PerfBase implements MessageListener
if (msg.getBooleanProperty("End"))
{
- notifyCompletion(msg.getJMSReplyTo());
-
- synchronized (lock)
- {
- lock.notifyAll();
- }
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
+ sendMessageToController(m);
}
else
{
- rcvdTime = System.currentTimeMillis();
+ rcvdTime = Clock.getTime();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -243,7 +238,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
session.commit();
}
- long latency = rcvdTime - msg.getJMSTimestamp();
+ long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
@@ -261,14 +256,14 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
startTest();
- printResults();
+ sendResults();
tearDown();
}
catch(Exception e)
@@ -284,7 +279,7 @@ public class PerfConsumer extends PerfBase implements MessageListener
{
public void run()
{
- cons.test();
+ cons.run();
}
};
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index 6b8ba25d7f..4cecd6f4df 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -26,8 +26,8 @@ import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
+import javax.jms.MapMessage;
import javax.jms.Message;
-import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import org.apache.qpid.thread.Threading;
@@ -67,17 +67,17 @@ public class PerfProducer extends PerfBase
int msgSizeRange = 1024;
boolean rateLimitProducer = false;
double rateFactor = 0.4;
+ double rate = 0.0;
public PerfProducer()
{
super();
+ System.out.println("Producer ID : " + id);
}
public void setUp() throws Exception
{
super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
durable = params.isDurable();
rateLimitProducer = params.getRate() > 0 ? true : false;
if (rateLimitProducer)
@@ -116,6 +116,11 @@ public class PerfProducer extends PerfBase
producer = session.createProducer(dest);
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+
+ MapMessage m = controllerSession.createMapMessage();
+ m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
+ m.setString(REPLY_ADDR,myControlQueueAddr);
+ sendMessageToController(m);
}
Object createPayload(int size)
@@ -144,7 +149,6 @@ public class PerfProducer extends PerfBase
}
}
-
protected Message getNextMessage() throws Exception
{
if (cacheMsg)
@@ -173,48 +177,37 @@ public class PerfProducer extends PerfBase
public void warmup()throws Exception
{
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
+ receiveFromController(OPCode.PRODUCER_STARTWARMUP);
+ System.out.println("Producer Warming up......");
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
+ sendEndMessage();
if (params.isTransacted())
{
session.commit();
}
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
}
public void startTest() throws Exception
{
- System.out.println("Starting test......");
+ receiveFromController(OPCode.PRODUCER_START);
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long limit = (long)(params.getRate() * rateFactor);
- long timeLimit = (long)(SEC * rateFactor);
+ long limit = (long)(params.getRate() * rateFactor); // in msecs
+ long timeLimit = (long)(SEC * rateFactor); // in msecs
- long start = System.currentTimeMillis();
+ long start = Clock.getTime(); // defaults to nano secs
long interval = start;
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
+ msg.setLongProperty(TIMESTAMP, Clock.getTime());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
@@ -223,62 +216,53 @@ public class PerfProducer extends PerfBase
if (rateLimitProducer && i%limit == 0)
{
- long elapsed = System.currentTimeMillis() - interval;
+ long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
if (elapsed < timeLimit)
{
Thread.sleep(elapsed);
}
- interval = System.currentTimeMillis();
+ interval = Clock.getTime();
}
}
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
+ sendEndMessage();
+ if ( transacted)
+ {
+ session.commit();
+ }
+ long time = Clock.getTime() - start;
+ rate = (double)count*Clock.convertToSecs()/(double)time;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
+
+ System.out.println("Producer has completed the test......");
}
- public void waitForCompletion() throws Exception
+ public void sendEndMessage() throws Exception
{
- MessageConsumer tmp = session.createConsumer(feedbackDest);
Message msg = session.createMessage();
msg.setBooleanProperty("End", true);
- msg.setJMSReplyTo(feedbackDest);
producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
}
- public void tearDown() throws Exception
+ public void sendResults() throws Exception
{
- producer.close();
- session.close();
- con.close();
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
+ msg.setDouble(PROD_RATE, rate);
+ sendMessageToController(msg);
}
- public void test()
+ public void run()
{
try
{
setUp();
warmup();
startTest();
- waitForCompletion();
+ sendResults();
tearDown();
}
catch(Exception e)
@@ -287,15 +271,42 @@ public class PerfProducer extends PerfBase
}
}
+ public void startControllerIfNeeded()
+ {
+ if (!params.isExternalController())
+ {
+ final PerfTestController controller = new PerfTestController();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ controller.run();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating controller thread",e);
+ }
+ t.start();
+ }
+ }
+
public static void main(String[] args)
{
final PerfProducer prod = new PerfProducer();
+ prod.startControllerIfNeeded();
Runnable r = new Runnable()
{
public void run()
{
- prod.test();
+ prod.run();
}
};
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
new file mode 100644
index 0000000000..5f2c1a23dc
--- /dev/null
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
@@ -0,0 +1,296 @@
+package org.apache.qpid.tools;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+
+public class PerfTestController extends PerfBase implements MessageListener
+{
+ long totalTestTime;
+
+ private double avgSystemLatency = 0.0;
+ private double minSystemLatency = Double.MAX_VALUE;
+ private double maxSystemLatency = 0;
+ private double avgSystemLatencyStdDev = 0.0;
+
+ private double avgSystemConsRate = 0.0;
+ private double maxSystemConsRate = 0.0;
+ private double minSystemConsRate = Double.MAX_VALUE;
+
+ private double avgSystemProdRate = 0.0;
+ private double maxSystemProdRate = 0.0;
+ private double minSystemProdRate = Double.MAX_VALUE;
+
+ private long totalMsgCount = 0;
+ private double totalSystemThroughput = 0.0;
+
+ private int consumerCount = Integer.getInteger("cons_count", 1);
+ private int producerCount = Integer.getInteger("prod_count", 1);
+ private Map<String,MapMessage> consumers;
+ private Map<String,MapMessage> producers;
+
+ private CountDownLatch consRegistered;
+ private CountDownLatch prodRegistered;
+ private CountDownLatch consReady;
+ private CountDownLatch prodReady;
+ private CountDownLatch receivedEndMsg;
+ private CountDownLatch receivedConsStats;
+ private CountDownLatch receivedProdStats;
+
+ private MessageConsumer consumer;
+ private boolean printStdDev = false;
+
+ public PerfTestController()
+ {
+ super();
+ consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
+ producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
+
+ consRegistered = new CountDownLatch(consumerCount);
+ prodRegistered = new CountDownLatch(producerCount);
+ consReady = new CountDownLatch(consumerCount);
+ prodReady = new CountDownLatch(producerCount);
+ receivedConsStats = new CountDownLatch(consumerCount);
+ receivedProdStats = new CountDownLatch(producerCount);
+ receivedEndMsg = new CountDownLatch(producerCount);
+ printStdDev = params.isPrintStdDev();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = controllerSession.createConsumer(controllerQueue);
+ consumer.setMessageListener(this);
+ consRegistered.await();
+ prodRegistered.await();
+ System.out.println("\nController: All producers and consumers have registered......\n");
+ }
+
+ public void warmup() throws Exception
+ {
+ System.out.println("Controller initiating warm up sequence......");
+ sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
+ sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
+ prodReady.await();
+ consReady.await();
+ System.out.println("\nController : All producers and consumers are ready to start the test......\n");
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("\nController Starting test......");
+ long start = Clock.getTime();
+ sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
+ receivedEndMsg.await();
+ totalTestTime = Clock.getTime() - start;
+ sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
+ receivedProdStats.await();
+ receivedConsStats.await();
+ }
+
+ public void calcStats() throws Exception
+ {
+ double totLatency = 0.0;
+ double totStdDev = 0.0;
+ double totalConsRate = 0.0;
+ double totalProdRate = 0.0;
+
+ MapMessage conStat = null; // for error handling
+ try
+ {
+ for (MapMessage m: consumers.values())
+ {
+ conStat = m;
+ minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
+ maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
+ totLatency = totLatency + m.getDouble(AVG_LATENCY);
+ totStdDev = totStdDev + m.getDouble(STD_DEV);
+
+ minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
+ maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
+ totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
+
+ totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
+ }
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating stats from Consumer : " + conStat);
+ }
+
+
+ MapMessage prodStat = null; // for error handling
+ try
+ {
+ for (MapMessage m: producers.values())
+ {
+ prodStat = m;
+ minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
+ maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
+ totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
+ }
+ }
+ catch(Exception e)
+ {
+ System.out.println("Error calculating stats from Producer : " + conStat);
+ }
+
+ avgSystemLatency = totLatency/consumers.size();
+ avgSystemLatencyStdDev = totStdDev/consumers.size();
+ avgSystemConsRate = totalConsRate/consumers.size();
+ avgSystemProdRate = totalProdRate/producers.size();
+
+ System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
+
+ totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
+ }
+
+ public void printResults() throws Exception
+ {
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(totalSystemThroughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Consumer rate : ").
+ append(df.format(avgSystemConsRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Min Consumer rate : ").
+ append(df.format(minSystemConsRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Max Consumer rate : ").
+ append(df.format(maxSystemConsRate)).
+ append(" msg/sec").toString());
+
+ System.out.println(new StringBuilder("Avg Producer rate : ").
+ append(df.format(avgSystemProdRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Min Producer rate : ").
+ append(df.format(minSystemProdRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Max Producer rate : ").
+ append(df.format(maxSystemProdRate)).
+ append(" msg/sec").toString());
+
+ System.out.println(new StringBuilder("Avg System Latency : ").
+ append(df.format(avgSystemLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min System Latency : ").
+ append(df.format(minSystemLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max System Latency : ").
+ append(df.format(maxSystemLatency)).
+ append(" ms").toString());
+ if (printStdDev)
+ {
+ System.out.println(new StringBuilder("Avg System Std Dev : ").
+ append(avgSystemLatencyStdDev));
+ }
+ System.out.println("Controller: Completed the test......\n");
+ }
+
+ private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
+ {
+ System.out.println("\nController: Sending code " + code);
+ MessageProducer tmpProd = controllerSession.createProducer(null);
+ MapMessage msg = controllerSession.createMapMessage();
+ msg.setInt(CODE, code.ordinal());
+ for (MapMessage node : nodes)
+ {
+ if (node.getString(REPLY_ADDR) == null)
+ {
+ System.out.println("REPLY_ADDR is null " + node);
+ }
+ else
+ {
+ System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
+ }
+ tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ MapMessage m = (MapMessage)msg;
+ OPCode code = OPCode.values()[m.getInt(CODE)];
+
+ System.out.println("\n---------Controller Received Code : " + code);
+ System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
+
+ switch (code)
+ {
+ case REGISTER_CONSUMER :
+ consumers.put(m.getString(ID),m);
+ consRegistered.countDown();
+ break;
+
+ case REGISTER_PRODUCER :
+ producers.put(m.getString(ID),m);
+ prodRegistered.countDown();
+ break;
+
+ case CONSUMER_READY :
+ consReady.countDown();
+ break;
+
+ case PRODUCER_READY :
+ prodReady.countDown();
+ break;
+
+ case RECEIVED_END_MSG :
+ receivedEndMsg.countDown();
+ break;
+
+ case RECEIVED_CONSUMER_STATS :
+ consumers.put(m.getString(ID),m);
+ receivedConsStats.countDown();
+ break;
+
+ case RECEIVED_PRODUCER_STATS :
+ producers.put(m.getString(ID),m);
+ receivedProdStats.countDown();
+ break;
+
+ default:
+ throw new Exception("Invalid OPCode " + code);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Error when receiving messages " + msg);
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ calcStats();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ PerfTestController controller = new PerfTestController();
+ controller.run();
+ }
+}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index c656ba3cfc..a563aef6cc 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -71,6 +71,8 @@ public class TestParams
private long rate = -1;
+ private boolean externalController = false;
+
public TestParams()
{
@@ -94,6 +96,7 @@ public class TestParams
msgType = System.getProperty("msg_type","bytes");
printStdDev = Boolean.getBoolean("print_std_dev");
rate = Long.getLong("rate",-1);
+ externalController = Boolean.getBoolean("ext_controller");
}
public String getUrl()
@@ -190,4 +193,14 @@ public class TestParams
{
return rate;
}
+
+ public boolean isExternalController()
+ {
+ return externalController;
+ }
+
+ public void setAddress(String addr)
+ {
+ address = addr;
+ }
}