diff options
| author | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
|---|---|---|
| committer | Stephen D. Huston <shuston@apache.org> | 2011-10-21 01:19:00 +0000 |
| commit | ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5 (patch) | |
| tree | dcfb94e75656c6c239fc3dcb754cd2015126424d /java/tools/src | |
| parent | 5eb354b338bb8d8fcd35b6ac3fb33f8103e757c3 (diff) | |
| download | qpid-python-ebfd9ff053b04ab379acfc0fefedee5a31b6d8a5.tar.gz | |
Undo bad merge from trunk - merged at wrong level.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-2519@1187150 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/tools/src')
7 files changed, 191 insertions, 1051 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java deleted file mode 100644 index 37369959a8..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ /dev/null @@ -1,92 +0,0 @@ -package org.apache.qpid.tools; - -/** - * In the future this will be replaced by a Clock abstraction - * that can utilize a realtime clock when running in RT Java. - */ - -public class Clock -{ - private static Precision precision; - private static long offset = -1; // in nano secs - - public enum Precision - { - NANO_SECS, MILI_SECS; - - static Precision getPrecision(String str) - { - if ("mili".equalsIgnoreCase(str)) - { - return MILI_SECS; - } - else - { - return NANO_SECS; - } - } - }; - - static - { - precision = Precision.getPrecision(System.getProperty("precision","mili")); - //offset = Long.getLong("offset",-1); - - System.out.println("Using precision : " + precision + " and offset " + offset); - } - - public static Precision getPrecision() - { - return precision; - } - - public static long getTime() - { - if (precision == Precision.NANO_SECS) - { - if (offset == -1) - { - return System.nanoTime(); - } - else - { - return System.nanoTime() + offset; - } - } - else - { - if (offset == -1) - { - return System.currentTimeMillis(); - } - else - { - return System.currentTimeMillis() + offset/convertToMiliSecs(); - } - } - } - - public static long convertToSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000000; - } - else - { - return 1000; - } - } - - public static long convertToMiliSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000; - } - else - { - return 1; - } - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index 90ee7e28ae..b88b242e6d 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(""); + super(); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -} +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 121e94cea1..ac597d17de 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,113 +20,36 @@ */ package org.apache.qpid.tools; -import java.net.InetAddress; import java.text.DecimalFormat; -import java.util.UUID; +import java.util.Hashtable; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; public class PerfBase { - public final static String CODE = "CODE"; - public final static String ID = "ID"; - public final static String REPLY_ADDR = "REPLY_ADDR"; - public final static String MAX_LATENCY = "MAX_LATENCY"; - public final static String MIN_LATENCY = "MIN_LATENCY"; - public final static String AVG_LATENCY = "AVG_LATENCY"; - public final static String STD_DEV = "STD_DEV"; - public final static String CONS_RATE = "CONS_RATE"; - public final static String PROD_RATE = "PROD_RATE"; - public final static String MSG_COUNT = "MSG_COUNT"; - public final static String TIMESTAMP = "Timestamp"; - - String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - TestParams params; Connection con; Session session; - Session controllerSession; Destination dest; - Destination myControlQueue; - Destination controllerQueue; + Destination feedbackDest; DecimalFormat df = new DecimalFormat("###.##"); - String id; - String myControlQueueAddr; - - MessageProducer sendToController; - MessageConsumer receiveFromController; - String prefix = ""; - enum OPCode { - REGISTER_CONSUMER, REGISTER_PRODUCER, - PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, - CONSUMER_READY, PRODUCER_READY, - PRODUCER_START, - RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, - CONTINUE_TEST, STOP_TEST - }; - - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - MessageType msgType = MessageType.BYTES; - - public PerfBase(String prefix) + public PerfBase() { params = new TestParams(); - String host = ""; - try - { - host = InetAddress.getLocalHost().getHostName(); - } - catch (Exception e) - { - } - id = host + "-" + UUID.randomUUID().toString(); - this.prefix = prefix; - this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception - { + { + if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -139,78 +62,7 @@ public class PerfBase session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); - myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); - - sendToController = controllerSession.createProducer(controllerQueue); - receiveFromController = controllerSession.createConsumer(myControlQueue); - } - - private Destination createDestination() throws Exception - { - if (params.isUseUniqueDests()) - { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); - - if ( type == AMQDestination.TOPIC_TYPE) - { - addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); - System.out.println("Setting subject : " + addr); - } - else - { - addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); - System.out.println("Setting name : " + addr); - } - - return new AMQAnyDestination(addr); - } - else - { - return new AMQAnyDestination(params.getAddress()); - } - } - - public synchronized void sendMessageToController(MapMessage m) throws Exception - { - m.setString(ID, id); - m.setString(REPLY_ADDR,myControlQueueAddr); - sendToController.send(m); - } - - public void receiveFromController(OPCode expected) throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - if (expected != code) - { - throw new Exception("Expected OPCode : " + expected + " but received : " + code); - } - - } - - public boolean continueTest() throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - return (code == OPCode.CONTINUE_TEST); - } - - public void tearDown() throws Exception - { - session.close(); - controllerSession.close(); - con.close(); + dest = new AMQAnyDestination(params.getAddress()); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index b63892bb51..0ef0455a64 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,17 +20,13 @@ */ package org.apache.qpid.tools; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is + * hen the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; int transSize = 0; - boolean printStdDev = false; - List<Long> sample; - final Object lock = new Object(); - public PerfConsumer(String prefix) + public PerfConsumer() { - super(prefix); - System.out.println("Consumer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); } public void warmup()throws Exception { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) + System.out.println("Warming up......"); + + boolean start = false; + while (!start) { - if (msg.getBooleanProperty("End") == true) + Message msg = consumer.receive(); + if (msg instanceof TextMessage) { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); + System.out.println("Starting test......"); + consumer.setMessageListener(this); } - public void resetCounters() + public void printResults() throws Exception { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) + synchronized (lock) { - sample = null; - sample = new ArrayList<Long>(params.getMsgCount()); + lock.wait(); } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). + append(df.format(avgLatency)). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). + append(minLatency). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). + append(maxLatency). append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } + System.out.println("Completed the test......\n"); } - public double calculateStdDev(double mean) + public void notifyCompletion(Destination replyTo) throws Exception { - double v = 0; - for (double latency: sample) + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) { - v = v + Math.pow((latency-mean), 2); + session.commit(); } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); } public void onMessage(Message msg) { try { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) { - ((TextMessage)msg).getText(); - } + notifyCompletion(msg.getJMSReplyTo()); - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); + synchronized (lock) + { + lock.notifyAll(); + } } else { - rcvdTime = Clock.getTime(); + rcvdTime = System.currentTimeMillis(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); + long latency = rcvdTime - msg.getJMSTimestamp(); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } } } @@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + printResults(); tearDown(); } catch(Exception e) @@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException + public static void main(String[] args) { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) + public void run() { - throw new Error("Error creating consumer thread",e); + cons.test(); } - t.start(); - + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); + t.start(); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index ac6129ab68..015d1e6205 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,15 +23,13 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; -import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * */ public class PerfProducer extends PerfBase { - private static long SEC = 60000; - MessageProducer producer; Message msg; - Object payload; - List<Object> payloads; + byte[] payload; + List<byte[]> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) + + public PerfProducer() { - super(prefix); - System.out.println("Producer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } + feedbackDest = session.createTemporaryQueue(); + durable = params.isDurable(); + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -109,52 +93,21 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - + payloads = new ArrayList<byte[]>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(createPayload(i)); + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); } - } + } else { - payload = createPayload(params.getMsgSize()); + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); } producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } } protected Message getNextMessage() throws Exception @@ -164,130 +117,117 @@ public class PerfProducer extends PerfBase return msg; } else - { - Message m; - + { + msg = session.createBytesMessage(); + if (!randomMsgSize) { - m = createMessage(payload); + ((BytesMessage)msg).writeBytes(payload); } else { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); } - m.setJMSDeliveryMode(durable? + msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return m; + return msg; } } public void warmup()throws Exception { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - sendEndMessage(); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); if (params.isTransacted()) { session.commit(); } + + tmp.close(); } public void startTest() throws Exception { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); + System.out.println("Starting test......"); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; + long start = System.currentTimeMillis(); for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); + msg.setJMSTimestamp(System.currentTimeMillis()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); } - public void resetCounters() + public void waitForCompletion() throws Exception { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); - } + if (params.isTransacted()) + { + session.commit(); + } - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } + tmp.receive(); - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); } - @Override public void tearDown() throws Exception { - super.tearDown(); + producer.close(); + session.close(); + con.close(); } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + waitForCompletion(); tearDown(); } catch(Exception e) @@ -296,63 +236,27 @@ public class PerfProducer extends PerfBase } } - public void startControllerIfNeeded() + + public static void main(String[] args) { - if (!params.isExternalController()) + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try + public void run() { - t = Threading.getThreadFactory().createThread(r); + prod.test(); } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + catch(Exception e) { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); + throw new Error("Error creating producer thread",e); } - testCompleted.await(); - System.out.println("Producers have completed the test......"); + t.start(); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java deleted file mode 100644 index 5c98c645f4..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ /dev/null @@ -1,422 +0,0 @@ -package org.apache.qpid.tools; - -import java.io.FileWriter; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.message.AMQPEncodedMapMessage; - -/** - * The Controller coordinates a test run between a number - * of producers and consumers, configured via -Dprod_count and -Dcons_count. - * - * It waits till all the producers and consumers have registered and then - * conducts a warmup run. Once all consumers and producers have completed - * the warmup run and is ready, it will conduct the actual test run and - * collect all stats from the participants and calculates the system - * throughput, the avg/min/max for producer rates, consumer rates and latency. - * - * These stats are then printed to std out. - * The Controller also prints events to std out to give a running account - * of the test run in progress. Ex registering of participants, starting warmup ..etc. - * This allows a scripting tool to monitor the progress. - * - * The Controller can be run in two modes. - * 1. A single test run (default) where it just runs until the message count specified - * for the producers via -Dmsg_count is sent and received. - * - * 2. Time based, configured via -Dduration=x, where x is in mins. - * In this mode, the Controller repeatedly cycles through the tests (after an initial - * warmup run) until the desired time is reached. If a test run is in progress - * and the time is up, it will allow the run the complete. - * - * After each iteration, the stats will be printed out in csv format to a separate log file. - * System throughput is calculated as follows - * totalMsgCount/(totalTestTime) - */ -public class PerfTestController extends PerfBase implements MessageListener -{ - enum TestMode { SINGLE_RUN, TIME_BASED }; - - TestMode testMode = TestMode.SINGLE_RUN; - - long totalTestTime; - - private double avgSystemLatency = 0.0; - private double minSystemLatency = Double.MAX_VALUE; - private double maxSystemLatency = 0; - private double avgSystemLatencyStdDev = 0.0; - - private double avgSystemConsRate = 0.0; - private double maxSystemConsRate = 0.0; - private double minSystemConsRate = Double.MAX_VALUE; - - private double avgSystemProdRate = 0.0; - private double maxSystemProdRate = 0.0; - private double minSystemProdRate = Double.MAX_VALUE; - - private long totalMsgCount = 0; - private double totalSystemThroughput = 0.0; - - private int consumerCount = Integer.getInteger("cons_count", 1); - private int producerCount = Integer.getInteger("prod_count", 1); - private int duration = Integer.getInteger("duration", -1); // in mins - private Map<String,MapMessage> consumers; - private Map<String,MapMessage> producers; - - private CountDownLatch consRegistered; - private CountDownLatch prodRegistered; - private CountDownLatch consReady; - private CountDownLatch prodReady; - private CountDownLatch receivedEndMsg; - private CountDownLatch receivedConsStats; - private CountDownLatch receivedProdStats; - - private MessageConsumer consumer; - private boolean printStdDev = false; - FileWriter writer; - - public PerfTestController() - { - super(""); - consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); - producers = new ConcurrentHashMap<String,MapMessage>(producerCount); - - consRegistered = new CountDownLatch(consumerCount); - prodRegistered = new CountDownLatch(producerCount); - consReady = new CountDownLatch(consumerCount); - prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); - testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; - } - - public void setUp() throws Exception - { - super.setUp(); - if (testMode == TestMode.TIME_BASED) - { - writer = new FileWriter("stats-csv.log"); - } - consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); - consumer.setMessageListener(this); - consRegistered.await(); - prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); - } - - public void warmup() throws Exception - { - System.out.println("Controller initiating warm up sequence......"); - sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); - sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); - prodReady.await(); - consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); - } - - public void startTest() throws Exception - { - resetCounters(); - System.out.println("\nController Starting test......"); - long start = Clock.getTime(); - sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); - receivedEndMsg.await(); - totalTestTime = Clock.getTime() - start; - sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); - receivedProdStats.await(); - receivedConsStats.await(); - } - - public void resetCounters() - { - minSystemLatency = Double.MAX_VALUE; - maxSystemLatency = 0; - maxSystemConsRate = 0.0; - minSystemConsRate = Double.MAX_VALUE; - maxSystemProdRate = 0.0; - minSystemProdRate = Double.MAX_VALUE; - - totalMsgCount = 0; - - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); - } - - public void calcStats() throws Exception - { - double totLatency = 0.0; - double totStdDev = 0.0; - double totalConsRate = 0.0; - double totalProdRate = 0.0; - - MapMessage conStat = null; // for error handling - try - { - for (MapMessage m: consumers.values()) - { - conStat = m; - minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); - maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); - totLatency = totLatency + m.getDouble(AVG_LATENCY); - totStdDev = totStdDev + m.getDouble(STD_DEV); - - minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); - maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); - totalConsRate = totalConsRate + m.getDouble(CONS_RATE); - - totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Consumer : " + conStat); - } - - - MapMessage prodStat = null; // for error handling - try - { - for (MapMessage m: producers.values()) - { - prodStat = m; - minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); - maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); - totalProdRate = totalProdRate + m.getDouble(PROD_RATE); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Producer : " + conStat); - } - - avgSystemLatency = totLatency/consumers.size(); - avgSystemLatencyStdDev = totStdDev/consumers.size(); - avgSystemConsRate = totalConsRate/consumers.size(); - avgSystemProdRate = totalProdRate/producers.size(); - - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); - - totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); - } - - public void printResults() throws Exception - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); - } - } - - private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception - { - System.out.println("\nController: Sending code " + code); - MessageProducer tmpProd = controllerSession.createProducer(null); - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, code.ordinal()); - for (MapMessage node : nodes) - { - if (node.getString(REPLY_ADDR) == null) - { - System.out.println("REPLY_ADDR is null " + node); - } - else - { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); - } - tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); - } - } - - public void onMessage(Message msg) - { - try - { - MapMessage m = (MapMessage)msg; - OPCode code = OPCode.values()[m.getInt(CODE)]; - - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); - - switch (code) - { - case REGISTER_CONSUMER : - if (consRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); - break; - } - consumers.put(m.getString(ID),m); - consRegistered.countDown(); - break; - - case REGISTER_PRODUCER : - if (prodRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); - break; - } - producers.put(m.getString(ID),m); - prodRegistered.countDown(); - break; - - case CONSUMER_READY : - consReady.countDown(); - break; - - case PRODUCER_READY : - prodReady.countDown(); - break; - - case RECEIVED_END_MSG : - receivedEndMsg.countDown(); - break; - - case RECEIVED_CONSUMER_STATS : - consumers.put(m.getString(ID),m); - receivedConsStats.countDown(); - break; - - case RECEIVED_PRODUCER_STATS : - producers.put(m.getString(ID),m); - receivedProdStats.countDown(); - break; - - default: - throw new Exception("Invalid OPCode " + code); - } - } - catch (Exception e) - { - handleError(e,"Error when receiving messages " + msg); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - if (testMode == TestMode.SINGLE_RUN) - { - startTest(); - calcStats(); - printResults(); - } - else - { - long startTime = Clock.getTime(); - long timeLimit = duration * 60 * 1000; // duration is in mins. - boolean nextIteration = true; - while (nextIteration) - { - startTest(); - calcStats(); - writeStatsToFile(); - if (Clock.getTime() - startTime < timeLimit) - { - sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); - sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); - nextIteration = true; - } - else - { - nextIteration = false; - } - } - } - tearDown(); - - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); - if (testMode == TestMode.TIME_BASED) - { - writer.close(); - } - sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); - sendMessageToNodes(OPCode.STOP_TEST,producers.values()); - super.tearDown(); - } - - public void writeStatsToFile() throws Exception - { - writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); - if (printStdDev) - { - writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); - } - writer.append("\n"); - writer.flush(); - } - - public static void main(String[] args) - { - PerfTestController controller = new PerfTestController(); - controller.run(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index d73be0181b..89d6462a39 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -25,25 +25,25 @@ import javax.jms.Session; public class TestParams { /* - * By default the connection URL is used. + * By default the connection URL is used. * This allows a user to easily specify a fully fledged URL any given property. * Ex. SSL parameters - * + * * By providing a host & port allows a user to simply override the URL. * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. + * without having to deal with the long URL format. */ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - + private String host = ""; - + private int port = -1; private String address = "queue; {create : always}"; private int msg_size = 1024; - private int random_msg_size_start_from = 1; + private int msg_type = 1; // not used yet private boolean cacheMessage = false; @@ -62,28 +62,19 @@ public class TestParams private int msg_count = 10; private int warmup_count = 1; - + private boolean random_msg_size = false; - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private long rate = -1; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - public TestParams() { - + url = System.getProperty("url",url); host = System.getProperty("host",""); port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); + address = System.getProperty("address","queue"); msg_size = Integer.getInteger("msg_size", 1024); + msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -94,12 +85,6 @@ public class TestParams msg_count = Integer.getInteger("msg_count",msg_count); warmup_count = Integer.getInteger("warmup_count",warmup_count); random_msg_size = Boolean.getBoolean("random_msg_size"); - msgType = System.getProperty("msg_type","bytes"); - printStdDev = Boolean.getBoolean("print_std_dev"); - rate = Long.getLong("rate",-1); - externalController = Boolean.getBoolean("ext_controller"); - useUniqueDest = Boolean.getBoolean("use_unique_dest"); - random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -137,9 +122,9 @@ public class TestParams return msg_size; } - public int getRandomMsgSizeStartFrom() + public int getMsgType() { - return random_msg_size_start_from; + return msg_type; } public boolean isDurable() @@ -176,39 +161,10 @@ public class TestParams { return disableTimestamp; } - + public boolean isRandomMsgSize() { return random_msg_size; } - public String getMessageType() - { - return msgType; - } - - public boolean isPrintStdDev() - { - return printStdDev; - } - - public long getRate() - { - return rate; - } - - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - public boolean isUseUniqueDests() - { - return useUniqueDest; - } } |
