diff options
Diffstat (limited to 'qpid/java/tools')
4 files changed, 187 insertions, 71 deletions
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index b88b242e6d..90ee7e28ae 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(); + super(""); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -}
\ No newline at end of file +} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 340f11f5e4..121e94cea1 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.tools; +import java.net.InetAddress; import java.text.DecimalFormat; import java.util.UUID; @@ -32,6 +33,10 @@ import javax.jms.Session; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; public class PerfBase { @@ -57,11 +62,12 @@ public class PerfBase Destination myControlQueue; Destination controllerQueue; DecimalFormat df = new DecimalFormat("###.##"); - String id = UUID.randomUUID().toString(); - String myControlQueueAddr = id + ";{create: always}"; + String id; + String myControlQueueAddr; MessageProducer sendToController; MessageConsumer receiveFromController; + String prefix = ""; enum OPCode { REGISTER_CONSUMER, REGISTER_PRODUCER, @@ -69,7 +75,8 @@ public class PerfBase CONSUMER_READY, PRODUCER_READY, PRODUCER_START, RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST }; enum MessageType { @@ -102,14 +109,24 @@ public class PerfBase MessageType msgType = MessageType.BYTES; - public PerfBase() + public PerfBase(String prefix) { params = new TestParams(); + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception { - if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -124,7 +141,7 @@ public class PerfBase controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - dest = new AMQAnyDestination(params.getAddress()); + dest = createDestination(); controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); myControlQueue = session.createQueue(myControlQueueAddr); msgType = MessageType.getType(params.getMessageType()); @@ -134,9 +151,38 @@ public class PerfBase receiveFromController = controllerSession.createConsumer(myControlQueue); } + private Destination createDestination() throws Exception + { + if (params.isUseUniqueDests()) + { + System.out.println("Prefix : " + prefix); + Address addr = Address.parse(params.getAddress()); + AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return new AMQAnyDestination(addr); + } + else + { + return new AMQAnyDestination(params.getAddress()); + } + } + public synchronized void sendMessageToController(MapMessage m) throws Exception { m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); sendToController.send(m); } @@ -152,6 +198,14 @@ public class PerfBase } + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + public void tearDown() throws Exception { session.close(); diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index ae439b7ce0..b63892bb51 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -22,6 +22,7 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import javax.jms.MapMessage; import javax.jms.Message; @@ -29,6 +30,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -49,7 +51,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is + * when the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -57,13 +59,9 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -83,7 +81,6 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; @@ -94,9 +91,9 @@ public class PerfConsumer extends PerfBase implements MessageListener final Object lock = new Object(); - public PerfConsumer() + public PerfConsumer(String prefix) { - super(); + super(prefix); System.out.println("Consumer ID : " + id); } @@ -104,26 +101,20 @@ public class PerfConsumer extends PerfBase implements MessageListener { super.setUp(); consumer = session.createConsumer(dest); + System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); printStdDev = params.isPrintStdDev(); - if (printStdDev) - { - sample = new ArrayList<Long>(params.getMsgCount()); - } - MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - m.setString(REPLY_ADDR,myControlQueueAddr); sendMessageToController(m); } public void warmup()throws Exception { receiveFromController(OPCode.CONSUMER_STARTWARMUP); - boolean start = false; Message msg = consumer.receive(); // This is to ensure we drain the queue before we start the actual test. while ( msg != null) @@ -146,12 +137,26 @@ public class PerfConsumer extends PerfBase implements MessageListener MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); sendMessageToController(m); + consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Consumer Starting test......"); - consumer.setMessageListener(this); + System.out.println("Consumer: " + id + " Starting test......" + "\n"); + resetCounters(); + } + + public void resetCounters() + { + rcvdMsgCount = 0; + maxLatency = 0; + minLatency = Long.MAX_VALUE; + totalLatency = 0; + if (printStdDev) + { + sample = null; + sample = new ArrayList<Long>(params.getMsgCount()); + } } public void sendResults() throws Exception @@ -193,7 +198,6 @@ public class PerfConsumer extends PerfBase implements MessageListener System.out.println(new StringBuilder("Std Dev : "). append(stdDev/Clock.convertToMiliSecs()).toString()); } - System.out.println("Consumer has completed the test......\n"); } public double calculateStdDev(double mean) @@ -262,8 +266,15 @@ public class PerfConsumer extends PerfBase implements MessageListener { setUp(); warmup(); - startTest(); - sendResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -272,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - public static void main(String[] args) + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws InterruptedException { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + + final PerfConsumer cons = new PerfConsumer(scriptId + i); + Runnable r = new Runnable() + { + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) { - cons.run(); + throw new Error("Error creating consumer thread",e); } - }; + t.start(); - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); } - t.start(); + testCompleted.await(); + System.out.println("Consumers have completed the test......\n"); } }
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 4cecd6f4df..ac6129ab68 100644 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,6 +23,7 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; @@ -30,6 +31,7 @@ import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageProducer; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,6 +53,12 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * */ public class PerfProducer extends PerfBase { @@ -69,9 +77,9 @@ public class PerfProducer extends PerfBase double rateFactor = 0.4; double rate = 0.0; - public PerfProducer() + public PerfProducer(String prefix) { - super(); + super(prefix); System.out.println("Producer ID : " + id); } @@ -114,12 +122,12 @@ public class PerfProducer extends PerfBase } producer = session.createProducer(dest); + System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); MapMessage m = controllerSession.createMapMessage(); m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - m.setString(REPLY_ADDR,myControlQueueAddr); sendMessageToController(m); } @@ -178,7 +186,7 @@ public class PerfProducer extends PerfBase public void warmup()throws Exception { receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer Warming up......"); + System.out.println("Producer: " + id + " Warming up......"); for (int i=0; i < params.getWarmupCount() -1; i++) { @@ -194,6 +202,7 @@ public class PerfProducer extends PerfBase public void startTest() throws Exception { + resetCounters(); receiveFromController(OPCode.PRODUCER_START); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); @@ -236,8 +245,11 @@ public class PerfProducer extends PerfBase append(df.format(rate)). append(" msg/sec"). toString()); + } + + public void resetCounters() + { - System.out.println("Producer has completed the test......"); } public void sendEndMessage() throws Exception @@ -255,14 +267,27 @@ public class PerfProducer extends PerfBase sendMessageToController(msg); } + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + public void run() { try { setUp(); warmup(); - startTest(); - sendResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Producer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -298,27 +323,36 @@ public class PerfProducer extends PerfBase } - public static void main(String[] args) + public static void main(String[] args) throws InterruptedException { - final PerfProducer prod = new PerfProducer(); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + final PerfProducer prod = new PerfProducer(scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() { - prod.run(); - } - }; + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } - t.start(); + testCompleted.await(); + System.out.println("Producers have completed the test......"); } }
\ No newline at end of file |
