diff options
8 files changed, 608 insertions, 134 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java index 58f108f1a4..7f735e0722 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java @@ -110,7 +110,7 @@ public class AMQPEncodedMapMessage extends JMSMapMessage } // for testing - Map<String,Object> getMap() + public Map<String,Object> getMap() { return _map; } diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf-report index 228f792a52..7de3f2b602 100755 --- a/java/tools/bin/perf-report +++ b/java/tools/bin/perf-report @@ -21,16 +21,16 @@ # This will run the following test cases defined below and produce # a report in tabular format. -SUB_MEM=-Xmx1024M -PUB_MEM=-Xmx1024M QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" TOPIC="amq.topic/test" DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" +COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" + waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } cleanup() -{ +{ pids=`ps aux | grep java | grep Perf | awk '{print $2}'` if [ "$pids" != "" ]; then kill -3 $pids @@ -42,30 +42,31 @@ cleanup() # $2 consumer options # $3 producer options run_testcase() -{ - sh run-sub $LOG_CONFIG $SUB_MEM $2 > sub.out & - waitfor sub.out "Warming up" - sh run-pub $LOG_CONFIG $PUB_MEM $3 > pub.out & - waitfor sub.out "Completed the test" - waitfor pub.out "Consumer has completed the test" +{ + sh run-sub $COMMON_CONFIG $2 > sub.out & + sh run-pub $COMMON_CONFIG $3 > pub.out & + waitfor pub.out "Controller: Completed the test" sleep 2 #give a grace period to shutdown - print_result $1 + print_result $1 + mv pub.out $1.pub.out + mv sub.out $1.sub.out } print_result() { - prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` - sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` - avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` - min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` - max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency echo "------------------------------------------------------------------------------------------------" } trap cleanup EXIT +rm -rf *.out #cleanup old files. echo "Test report on " `date +%F` echo "================================================================================================" diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java new file mode 100644 index 0000000000..37369959a8 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -0,0 +1,92 @@ +package org.apache.qpid.tools; + +/** + * In the future this will be replaced by a Clock abstraction + * that can utilize a realtime clock when running in RT Java. + */ + +public class Clock +{ + private static Precision precision; + private static long offset = -1; // in nano secs + + public enum Precision + { + NANO_SECS, MILI_SECS; + + static Precision getPrecision(String str) + { + if ("mili".equalsIgnoreCase(str)) + { + return MILI_SECS; + } + else + { + return NANO_SECS; + } + } + }; + + static + { + precision = Precision.getPrecision(System.getProperty("precision","mili")); + //offset = Long.getLong("offset",-1); + + System.out.println("Using precision : " + precision + " and offset " + offset); + } + + public static Precision getPrecision() + { + return precision; + } + + public static long getTime() + { + if (precision == Precision.NANO_SECS) + { + if (offset == -1) + { + return System.nanoTime(); + } + else + { + return System.nanoTime() + offset; + } + } + else + { + if (offset == -1) + { + return System.currentTimeMillis(); + } + else + { + return System.currentTimeMillis() + offset/convertToMiliSecs(); + } + } + } + + public static long convertToSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000000; + } + else + { + return 1000; + } + } + + public static long convertToMiliSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000; + } + else + { + return 1; + } + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index c78c752eca..340f11f5e4 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -21,9 +21,13 @@ package org.apache.qpid.tools; import java.text.DecimalFormat; +import java.util.UUID; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; @@ -31,12 +35,42 @@ import org.apache.qpid.client.AMQConnection; public class PerfBase { + public final static String CODE = "CODE"; + public final static String ID = "ID"; + public final static String REPLY_ADDR = "REPLY_ADDR"; + public final static String MAX_LATENCY = "MAX_LATENCY"; + public final static String MIN_LATENCY = "MIN_LATENCY"; + public final static String AVG_LATENCY = "AVG_LATENCY"; + public final static String STD_DEV = "STD_DEV"; + public final static String CONS_RATE = "CONS_RATE"; + public final static String PROD_RATE = "PROD_RATE"; + public final static String MSG_COUNT = "MSG_COUNT"; + public final static String TIMESTAMP = "Timestamp"; + + String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); + TestParams params; Connection con; Session session; + Session controllerSession; Destination dest; - Destination feedbackDest; + Destination myControlQueue; + Destination controllerQueue; DecimalFormat df = new DecimalFormat("###.##"); + String id = UUID.randomUUID().toString(); + String myControlQueueAddr = id + ";{create: always}"; + + MessageProducer sendToController; + MessageConsumer receiveFromController; + + enum OPCode { + REGISTER_CONSUMER, REGISTER_PRODUCER, + PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, + CONSUMER_READY, PRODUCER_READY, + PRODUCER_START, + RECEIVED_END_MSG, CONSUMER_STOP, + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS + }; enum MessageType { BYTES, TEXT, MAP, OBJECT; @@ -88,9 +122,41 @@ public class PerfBase session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + dest = new AMQAnyDestination(params.getAddress()); + controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + myControlQueue = session.createQueue(myControlQueueAddr); msgType = MessageType.getType(params.getMessageType()); System.out.println("Using " + msgType + " messages"); + + sendToController = controllerSession.createProducer(controllerQueue); + receiveFromController = controllerSession.createConsumer(myControlQueue); + } + + public synchronized void sendMessageToController(MapMessage m) throws Exception + { + m.setString(ID, id); + sendToController.send(m); + } + + public void receiveFromController(OPCode expected) throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + if (expected != code) + { + throw new Exception("Expected OPCode : " + expected + " but received : " + code); + } + + } + + public void tearDown() throws Exception + { + session.close(); + controllerSession.close(); + con.close(); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index 43ffce23cc..ae439b7ce0 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -23,12 +23,10 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; -import javax.jms.BytesMessage; -import javax.jms.Destination; +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.thread.Threading; @@ -99,6 +97,7 @@ public class PerfConsumer extends PerfBase implements MessageListener public PerfConsumer() { super(); + System.out.println("Consumer ID : " + id); } public void setUp() throws Exception @@ -114,68 +113,87 @@ public class PerfConsumer extends PerfBase implements MessageListener { sample = new ArrayList<Long>(params.getMsgCount()); } + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendMessageToController(m); } public void warmup()throws Exception { - System.out.println("Warming up......"); - + receiveFromController(OPCode.CONSUMER_STARTWARMUP); boolean start = false; - while (!start) + Message msg = consumer.receive(); + // This is to ensure we drain the queue before we start the actual test. + while ( msg != null) { - Message msg = consumer.receive(); - if (msg.getBooleanProperty("End")) + if (msg.getBooleanProperty("End") == true) { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); + // It's more realistic for the consumer to signal this. + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m); } + msg = consumer.receive(1000); } + + if (params.isTransacted()) + { + session.commit(); + } + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m); } public void startTest() throws Exception { - System.out.println("Starting test......"); + System.out.println("Consumer Starting test......"); consumer.setMessageListener(this); } - public void printResults() throws Exception + public void sendResults() throws Exception { - synchronized (lock) + receiveFromController(OPCode.CONSUMER_STOP); + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); + double stdDev = 0.0; + if (printStdDev) { - lock.wait(); + stdDev = calculateStdDev(avgLatency); } + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); + m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); + m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); + m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); + m.setDouble(CONS_RATE, consRate); + m.setLong(MSG_COUNT, rcvdMsgCount); + sendMessageToController(m); - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). + append(df.format(avgLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). + append(df.format(minLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). + append(df.format(maxLatency/Clock.convertToMiliSecs())). append(" ms").toString()); if (printStdDev) { System.out.println(new StringBuilder("Std Dev : "). - append(calculateStdDev(avgLatency)).toString()); + append(stdDev/Clock.convertToMiliSecs()).toString()); } - System.out.println("Completed the test......\n"); + System.out.println("Consumer has completed the test......\n"); } public double calculateStdDev(double mean) @@ -189,25 +207,6 @@ public class PerfConsumer extends PerfBase implements MessageListener return Math.round(Math.sqrt(v)); } - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) - { - session.commit(); - } - tmp.close(); - } - - public void tearDown() throws Exception - { - consumer.close(); - session.close(); - con.close(); - } - public void onMessage(Message msg) { try @@ -220,22 +219,18 @@ public class PerfConsumer extends PerfBase implements MessageListener if (msg.getBooleanProperty("End")) { - notifyCompletion(msg.getJMSReplyTo()); - - synchronized (lock) - { - lock.notifyAll(); - } + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); } else { - rcvdTime = System.currentTimeMillis(); + rcvdTime = Clock.getTime(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -243,7 +238,7 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getJMSTimestamp(); + long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; @@ -261,14 +256,14 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void test() + public void run() { try { setUp(); warmup(); startTest(); - printResults(); + sendResults(); tearDown(); } catch(Exception e) @@ -284,7 +279,7 @@ public class PerfConsumer extends PerfBase implements MessageListener { public void run() { - cons.test(); + cons.run(); } }; diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 6b8ba25d7f..4cecd6f4df 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -26,8 +26,8 @@ import java.util.Random; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.thread.Threading; @@ -67,17 +67,17 @@ public class PerfProducer extends PerfBase int msgSizeRange = 1024; boolean rateLimitProducer = false; double rateFactor = 0.4; + double rate = 0.0; public PerfProducer() { super(); + System.out.println("Producer ID : " + id); } public void setUp() throws Exception { super.setUp(); - feedbackDest = session.createTemporaryQueue(); - durable = params.isDurable(); rateLimitProducer = params.getRate() > 0 ? true : false; if (rateLimitProducer) @@ -116,6 +116,11 @@ public class PerfProducer extends PerfBase producer = session.createProducer(dest); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendMessageToController(m); } Object createPayload(int size) @@ -144,7 +149,6 @@ public class PerfProducer extends PerfBase } } - protected Message getNextMessage() throws Exception { if (cacheMsg) @@ -173,48 +177,37 @@ public class PerfProducer extends PerfBase public void warmup()throws Exception { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + System.out.println("Producer Warming up......"); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); + sendEndMessage(); if (params.isTransacted()) { session.commit(); } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); } public void startTest() throws Exception { - System.out.println("Starting test......"); + receiveFromController(OPCode.PRODUCER_START); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long limit = (long)(params.getRate() * rateFactor); - long timeLimit = (long)(SEC * rateFactor); + long limit = (long)(params.getRate() * rateFactor); // in msecs + long timeLimit = (long)(SEC * rateFactor); // in msecs - long start = System.currentTimeMillis(); + long start = Clock.getTime(); // defaults to nano secs long interval = start; for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); + msg.setLongProperty(TIMESTAMP, Clock.getTime()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { @@ -223,62 +216,53 @@ public class PerfProducer extends PerfBase if (rateLimitProducer && i%limit == 0) { - long elapsed = System.currentTimeMillis() - interval; + long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs if (elapsed < timeLimit) { Thread.sleep(elapsed); } - interval = System.currentTimeMillis(); + interval = Clock.getTime(); } } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; + sendEndMessage(); + if ( transacted) + { + session.commit(); + } + long time = Clock.getTime() - start; + rate = (double)count*Clock.convertToSecs()/(double)time; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); + + System.out.println("Producer has completed the test......"); } - public void waitForCompletion() throws Exception + public void sendEndMessage() throws Exception { - MessageConsumer tmp = session.createConsumer(feedbackDest); Message msg = session.createMessage(); msg.setBooleanProperty("End", true); - msg.setJMSReplyTo(feedbackDest); producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - System.out.println("Consumer has completed the test......"); } - public void tearDown() throws Exception + public void sendResults() throws Exception { - producer.close(); - session.close(); - con.close(); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, rate); + sendMessageToController(msg); } - public void test() + public void run() { try { setUp(); warmup(); startTest(); - waitForCompletion(); + sendResults(); tearDown(); } catch(Exception e) @@ -287,15 +271,42 @@ public class PerfProducer extends PerfBase } } + public void startControllerIfNeeded() + { + if (!params.isExternalController()) + { + final PerfTestController controller = new PerfTestController(); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); + } + } + public static void main(String[] args) { final PerfProducer prod = new PerfProducer(); + prod.startControllerIfNeeded(); Runnable r = new Runnable() { public void run() { - prod.test(); + prod.run(); } }; diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java new file mode 100644 index 0000000000..5f2c1a23dc --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java @@ -0,0 +1,296 @@ +package org.apache.qpid.tools; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; + +public class PerfTestController extends PerfBase implements MessageListener +{ + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private Map<String,MapMessage> consumers; + private Map<String,MapMessage> producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + + public PerfTestController() + { + super(); + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); + producers = new ConcurrentHashMap<String,MapMessage>(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + printStdDev = params.isPrintStdDev(); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = controllerSession.createConsumer(controllerQueue); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + System.out.println("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + System.out.println("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + System.out.println("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(totalSystemThroughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Consumer rate : "). + append(df.format(avgSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Consumer rate : "). + append(df.format(minSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Consumer rate : "). + append(df.format(maxSystemConsRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg Producer rate : "). + append(df.format(avgSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Producer rate : "). + append(df.format(minSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Producer rate : "). + append(df.format(maxSystemProdRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg System Latency : "). + append(df.format(avgSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min System Latency : "). + append(df.format(minSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Max System Latency : "). + append(df.format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev)); + } + System.out.println("Controller: Completed the test......\n"); + } + + private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception + { + System.out.println("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + System.out.println("REPLY_ADDR is null " + node); + } + else + { + System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + System.out.println("\n---------Controller Received Code : " + code); + System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + startTest(); + calcStats(); + printResults(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public static void main(String[] args) + { + PerfTestController controller = new PerfTestController(); + controller.run(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index c656ba3cfc..a563aef6cc 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -71,6 +71,8 @@ public class TestParams private long rate = -1; + private boolean externalController = false; + public TestParams() { @@ -94,6 +96,7 @@ public class TestParams msgType = System.getProperty("msg_type","bytes"); printStdDev = Boolean.getBoolean("print_std_dev"); rate = Long.getLong("rate",-1); + externalController = Boolean.getBoolean("ext_controller"); } public String getUrl() @@ -190,4 +193,14 @@ public class TestParams { return rate; } + + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } } |
