diff options
author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 14:42:12 +0000 |
commit | f83677056891e436bf5ba99e79240df2a44528cd (patch) | |
tree | 625bfd644b948e89105630759cf6decb0435354d /java/tools/src | |
parent | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (diff) | |
download | qpid-python-QPID-2519.tar.gz |
Merged out from trunkQPID-2519
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187375 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
7 files changed, 1051 insertions, 191 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java new file mode 100644 index 0000000000..37369959a8 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -0,0 +1,92 @@ +package org.apache.qpid.tools; + +/** + * In the future this will be replaced by a Clock abstraction + * that can utilize a realtime clock when running in RT Java. + */ + +public class Clock +{ + private static Precision precision; + private static long offset = -1; // in nano secs + + public enum Precision + { + NANO_SECS, MILI_SECS; + + static Precision getPrecision(String str) + { + if ("mili".equalsIgnoreCase(str)) + { + return MILI_SECS; + } + else + { + return NANO_SECS; + } + } + }; + + static + { + precision = Precision.getPrecision(System.getProperty("precision","mili")); + //offset = Long.getLong("offset",-1); + + System.out.println("Using precision : " + precision + " and offset " + offset); + } + + public static Precision getPrecision() + { + return precision; + } + + public static long getTime() + { + if (precision == Precision.NANO_SECS) + { + if (offset == -1) + { + return System.nanoTime(); + } + else + { + return System.nanoTime() + offset; + } + } + else + { + if (offset == -1) + { + return System.currentTimeMillis(); + } + else + { + return System.currentTimeMillis() + offset/convertToMiliSecs(); + } + } + } + + public static long convertToSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000000; + } + else + { + return 1000; + } + } + + public static long convertToMiliSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000; + } + else + { + return 1; + } + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index b88b242e6d..90ee7e28ae 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(); + super(""); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -}
\ No newline at end of file +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index ac597d17de..121e94cea1 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,36 +20,113 @@ */ package org.apache.qpid.tools; +import java.net.InetAddress; import java.text.DecimalFormat; -import java.util.Hashtable; +import java.util.UUID; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; public class PerfBase { + public final static String CODE = "CODE"; + public final static String ID = "ID"; + public final static String REPLY_ADDR = "REPLY_ADDR"; + public final static String MAX_LATENCY = "MAX_LATENCY"; + public final static String MIN_LATENCY = "MIN_LATENCY"; + public final static String AVG_LATENCY = "AVG_LATENCY"; + public final static String STD_DEV = "STD_DEV"; + public final static String CONS_RATE = "CONS_RATE"; + public final static String PROD_RATE = "PROD_RATE"; + public final static String MSG_COUNT = "MSG_COUNT"; + public final static String TIMESTAMP = "Timestamp"; + + String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); + TestParams params; Connection con; Session session; + Session controllerSession; Destination dest; - Destination feedbackDest; + Destination myControlQueue; + Destination controllerQueue; DecimalFormat df = new DecimalFormat("###.##"); + String id; + String myControlQueueAddr; + + MessageProducer sendToController; + MessageConsumer receiveFromController; + String prefix = ""; - public PerfBase() + enum OPCode { + REGISTER_CONSUMER, REGISTER_PRODUCER, + PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, + CONSUMER_READY, PRODUCER_READY, + PRODUCER_START, + RECEIVED_END_MSG, CONSUMER_STOP, + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST + }; + + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + MessageType msgType = MessageType.BYTES; + + public PerfBase(String prefix) { params = new TestParams(); + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception - { - + { if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -62,7 +139,78 @@ public class PerfBase session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - dest = new AMQAnyDestination(params.getAddress()); + controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + dest = createDestination(); + controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + myControlQueue = session.createQueue(myControlQueueAddr); + msgType = MessageType.getType(params.getMessageType()); + System.out.println("Using " + msgType + " messages"); + + sendToController = controllerSession.createProducer(controllerQueue); + receiveFromController = controllerSession.createConsumer(myControlQueue); + } + + private Destination createDestination() throws Exception + { + if (params.isUseUniqueDests()) + { + System.out.println("Prefix : " + prefix); + Address addr = Address.parse(params.getAddress()); + AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return new AMQAnyDestination(addr); + } + else + { + return new AMQAnyDestination(params.getAddress()); + } + } + + public synchronized void sendMessageToController(MapMessage m) throws Exception + { + m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendToController.send(m); + } + + public void receiveFromController(OPCode expected) throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + if (expected != code) + { + throw new Exception("Expected OPCode : " + expected + " but received : " + code); + } + + } + + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + + public void tearDown() throws Exception + { + session.close(); + controllerSession.close(); + con.close(); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index 0ef0455a64..b63892bb51 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,13 +20,17 @@ */ package org.apache.qpid.tools; -import javax.jms.Destination; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -47,7 +51,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is + * when the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -55,13 +59,9 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -81,130 +81,160 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; int transSize = 0; + boolean printStdDev = false; + List<Long> sample; + final Object lock = new Object(); - public PerfConsumer() + public PerfConsumer(String prefix) { - super(); + super(prefix); + System.out.println("Consumer ID : " + id); } public void setUp() throws Exception { super.setUp(); consumer = session.createConsumer(dest); + System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); + printStdDev = params.isPrintStdDev(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + sendMessageToController(m); } public void warmup()throws Exception { - System.out.println("Warming up......"); - - boolean start = false; - while (!start) + receiveFromController(OPCode.CONSUMER_STARTWARMUP); + Message msg = consumer.receive(); + // This is to ensure we drain the queue before we start the actual test. + while ( msg != null) { - Message msg = consumer.receive(); - if (msg instanceof TextMessage) + if (msg.getBooleanProperty("End") == true) { - if (((TextMessage)msg).getText().equals("End")) - { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); - } + // It's more realistic for the consumer to signal this. + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m); } + msg = consumer.receive(1000); + } + + if (params.isTransacted()) + { + session.commit(); } + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m); + consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Starting test......"); - consumer.setMessageListener(this); + System.out.println("Consumer: " + id + " Starting test......" + "\n"); + resetCounters(); } - public void printResults() throws Exception + public void resetCounters() { - synchronized (lock) + rcvdMsgCount = 0; + maxLatency = 0; + minLatency = Long.MAX_VALUE; + totalLatency = 0; + if (printStdDev) { - lock.wait(); + sample = null; + sample = new ArrayList<Long>(params.getMsgCount()); } + } + + public void sendResults() throws Exception + { + receiveFromController(OPCode.CONSUMER_STOP); double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); + double stdDev = 0.0; + if (printStdDev) + { + stdDev = calculateStdDev(avgLatency); + } + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); + m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); + m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); + m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); + m.setDouble(CONS_RATE, consRate); + m.setLong(MSG_COUNT, rcvdMsgCount); + sendMessageToController(m); + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). + append(df.format(avgLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). + append(df.format(minLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). + append(df.format(maxLatency/Clock.convertToMiliSecs())). append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) + if (printStdDev) { - session.commit(); + System.out.println(new StringBuilder("Std Dev : "). + append(stdDev/Clock.convertToMiliSecs()).toString()); } - tmp.close(); } - public void tearDown() throws Exception + public double calculateStdDev(double mean) { - consumer.close(); - session.close(); - con.close(); + double v = 0; + for (double latency: sample) + { + v = v + Math.pow((latency-mean), 2); + } + v = v/sample.size(); + return Math.round(Math.sqrt(v)); } public void onMessage(Message msg) { try { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + // To figure out the decoding overhead of text + if (msgType == MessageType.TEXT) { - notifyCompletion(msg.getJMSReplyTo()); + ((TextMessage)msg).getText(); + } - synchronized (lock) - { - lock.notifyAll(); - } + if (msg.getBooleanProperty("End")) + { + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); } else { - rcvdTime = System.currentTimeMillis(); + rcvdTime = Clock.getTime(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -212,10 +242,14 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getJMSTimestamp(); + long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; + if (printStdDev) + { + sample.add(latency); + } } } @@ -226,14 +260,21 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void test() + public void run() { try { setUp(); warmup(); - startTest(); - printResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -242,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - public static void main(String[] args) + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws InterruptedException { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + + final PerfConsumer cons = new PerfConsumer(scriptId + i); + Runnable r = new Runnable() { - cons.test(); + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } - t.start(); + testCompleted.await(); + System.out.println("Consumers have completed the test......\n"); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 015d1e6205..ac6129ab68 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,13 +23,15 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,38 +53,52 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * */ public class PerfProducer extends PerfBase { + private static long SEC = 60000; + MessageProducer producer; Message msg; - byte[] payload; - List<byte[]> payloads; + Object payload; + List<Object> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - - public PerfProducer() + boolean rateLimitProducer = false; + double rateFactor = 0.4; + double rate = 0.0; + + public PerfProducer(String prefix) { - super(); + super(prefix); + System.out.println("Producer ID : " + id); } public void setUp() throws Exception { super.setUp(); - feedbackDest = session.createTemporaryQueue(); - durable = params.isDurable(); - + rateLimitProducer = params.getRate() > 0 ? true : false; + if (rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); + } + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg = createMessage(createPayload(params.getMsgSize())); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -93,21 +109,52 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<byte[]>(msgSizeRange); - + payloads = new ArrayList<Object>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + payloads.add(createPayload(i)); } - } + } else { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + payload = createPayload(params.getMsgSize()); } producer = session.createProducer(dest); + System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + sendMessageToController(m); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } } protected Message getNextMessage() throws Exception @@ -117,117 +164,130 @@ public class PerfProducer extends PerfBase return msg; } else - { - msg = session.createBytesMessage(); - + { + Message m; + if (!randomMsgSize) { - ((BytesMessage)msg).writeBytes(payload); + m = createMessage(payload); } else { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); } - msg.setJMSDeliveryMode(durable? + m.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return msg; + return m; } } public void warmup()throws Exception { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + System.out.println("Producer: " + id + " Warming up......"); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); + sendEndMessage(); if (params.isTransacted()) { session.commit(); } - - tmp.close(); } public void startTest() throws Exception { - System.out.println("Starting test......"); + resetCounters(); + receiveFromController(OPCode.PRODUCER_START); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long start = System.currentTimeMillis(); + long limit = (long)(params.getRate() * rateFactor); // in msecs + long timeLimit = (long)(SEC * rateFactor); // in msecs + + long start = Clock.getTime(); // defaults to nano secs + long interval = start; for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); + msg.setLongProperty(TIMESTAMP, Clock.getTime()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } + + if (rateLimitProducer && i%limit == 0) + { + long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs + if (elapsed < timeLimit) + { + Thread.sleep(elapsed); + } + interval = Clock.getTime(); + + } + } + sendEndMessage(); + if ( transacted) + { + session.commit(); } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; + long time = Clock.getTime() - start; + rate = (double)count*Clock.convertToSecs()/(double)time; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); } - public void waitForCompletion() throws Exception + public void resetCounters() { - MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - tmp.receive(); + } - if (params.isTransacted()) - { - session.commit(); - } + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty("End", true); + producer.send(msg); + } - tmp.close(); - System.out.println("Consumer has completed the test......"); + public void sendResults() throws Exception + { + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, rate); + sendMessageToController(msg); } + @Override public void tearDown() throws Exception { - producer.close(); - session.close(); - con.close(); + super.tearDown(); } - public void test() + public void run() { try { setUp(); warmup(); - startTest(); - waitForCompletion(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Producer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -236,27 +296,63 @@ public class PerfProducer extends PerfBase } } - - public static void main(String[] args) + public void startControllerIfNeeded() { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() + if (!params.isExternalController()) { - public void run() + final PerfTestController controller = new PerfTestController(); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try { - prod.test(); + t = Threading.getThreadFactory().createThread(r); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); } - catch(Exception e) + } + + + public static void main(String[] args) throws InterruptedException + { + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - throw new Error("Error creating producer thread",e); + final PerfProducer prod = new PerfProducer(scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() + { + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } - t.start(); + testCompleted.await(); + System.out.println("Producers have completed the test......"); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java new file mode 100644 index 0000000000..5c98c645f4 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java @@ -0,0 +1,422 @@ +package org.apache.qpid.tools; + +import java.io.FileWriter; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; + +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ +public class PerfTestController extends PerfBase implements MessageListener +{ + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins + private Map<String,MapMessage> consumers; + private Map<String,MapMessage> producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + FileWriter writer; + + public PerfTestController() + { + super(""); + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); + producers = new ConcurrentHashMap<String,MapMessage>(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + printStdDev = params.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; + } + + public void setUp() throws Exception + { + super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } + consumer = controllerSession.createConsumer(controllerQueue); + System.out.println("\nController: " + producerCount + " producers are expected"); + System.out.println("Controller: " + consumerCount + " consumers are expected \n"); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + System.out.println("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + System.out.println("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + resetCounters(); + System.out.println("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(totalSystemThroughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Consumer rate : "). + append(df.format(avgSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Consumer rate : "). + append(df.format(minSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Consumer rate : "). + append(df.format(maxSystemConsRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg Producer rate : "). + append(df.format(avgSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Producer rate : "). + append(df.format(minSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Producer rate : "). + append(df.format(maxSystemProdRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg System Latency : "). + append(df.format(avgSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min System Latency : "). + append(df.format(minSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Max System Latency : "). + append(df.format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev)); + } + } + + private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception + { + System.out.println("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + System.out.println("REPLY_ADDR is null " + node); + } + else + { + System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + System.out.println("\n---------Controller Received Code : " + code); + System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } + tearDown(); + + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception { + System.out.println("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(df.format(totalSystemThroughput)).append(","); + writer.append(df.format(avgSystemConsRate)).append(","); + writer.append(df.format(minSystemConsRate)).append(","); + writer.append(df.format(maxSystemConsRate)).append(","); + writer.append(df.format(avgSystemProdRate)).append(","); + writer.append(df.format(minSystemProdRate)).append(","); + writer.append(df.format(maxSystemProdRate)).append(","); + writer.append(df.format(avgSystemLatency)).append(","); + writer.append(df.format(minSystemLatency)).append(","); + writer.append(df.format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + + public static void main(String[] args) + { + PerfTestController controller = new PerfTestController(); + controller.run(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index 89d6462a39..d73be0181b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -25,25 +25,25 @@ import javax.jms.Session; public class TestParams { /* - * By default the connection URL is used. + * By default the connection URL is used. * This allows a user to easily specify a fully fledged URL any given property. * Ex. SSL parameters - * + * * By providing a host & port allows a user to simply override the URL. * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. + * without having to deal with the long URL format. */ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - + private String host = ""; - + private int port = -1; private String address = "queue; {create : always}"; private int msg_size = 1024; - private int msg_type = 1; // not used yet + private int random_msg_size_start_from = 1; private boolean cacheMessage = false; @@ -62,19 +62,28 @@ public class TestParams private int msg_count = 10; private int warmup_count = 1; - + private boolean random_msg_size = false; + private String msgType = "bytes"; + + private boolean printStdDev = false; + + private long rate = -1; + + private boolean externalController = false; + + private boolean useUniqueDest = false; // useful when using multiple connections. + public TestParams() { - + url = System.getProperty("url",url); host = System.getProperty("host",""); port = Integer.getInteger("port", -1); - address = System.getProperty("address","queue"); + address = System.getProperty("address",address); msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -85,6 +94,12 @@ public class TestParams msg_count = Integer.getInteger("msg_count",msg_count); warmup_count = Integer.getInteger("warmup_count",warmup_count); random_msg_size = Boolean.getBoolean("random_msg_size"); + msgType = System.getProperty("msg_type","bytes"); + printStdDev = Boolean.getBoolean("print_std_dev"); + rate = Long.getLong("rate",-1); + externalController = Boolean.getBoolean("ext_controller"); + useUniqueDest = Boolean.getBoolean("use_unique_dest"); + random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -122,9 +137,9 @@ public class TestParams return msg_size; } - public int getMsgType() + public int getRandomMsgSizeStartFrom() { - return msg_type; + return random_msg_size_start_from; } public boolean isDurable() @@ -161,10 +176,39 @@ public class TestParams { return disableTimestamp; } - + public boolean isRandomMsgSize() { return random_msg_size; } + public String getMessageType() + { + return msgType; + } + + public boolean isPrintStdDev() + { + return printStdDev; + } + + public long getRate() + { + return rate; + } + + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + public boolean isUseUniqueDests() + { + return useUniqueDest; + } } |