diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-30 02:22:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-07-30 02:22:23 +0000 |
commit | 8523ced829d64862cfb8eede5db2172ddef243ff (patch) | |
tree | 9c33deb7125488a16599d247d2ab558381057f2e | |
parent | 635d538c62c49e1d92a490b571266314c33e66a9 (diff) | |
download | qpid-python-8523ced829d64862cfb8eede5db2172ddef243ff.tar.gz |
QPID-3358 Modified the controller to allow multiple iterations in order to support long durations tests.
You could now specify -Duration=<secs> to ask the controller to run the test for atleast that duration.
If the system is in the middle of running an iteration when the time is up, it will complete the iteration before ending the test.
If a duration is specified the Controller will output the results of
each iteration into a CSV file.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1152411 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java | 142 | ||||
-rw-r--r-- | java/tools/src/main/java/org/apache/qpid/tools/TestParams.java | 16 |
2 files changed, 146 insertions, 12 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java index 5f2c1a23dc..5c98c645f4 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java @@ -1,5 +1,6 @@ package org.apache.qpid.tools; +import java.io.FileWriter; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -13,8 +14,40 @@ import javax.jms.MessageProducer; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ public class PerfTestController extends PerfBase implements MessageListener { + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + long totalTestTime; private double avgSystemLatency = 0.0; @@ -35,6 +68,7 @@ public class PerfTestController extends PerfBase implements MessageListener private int consumerCount = Integer.getInteger("cons_count", 1); private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins private Map<String,MapMessage> consumers; private Map<String,MapMessage> producers; @@ -48,10 +82,11 @@ public class PerfTestController extends PerfBase implements MessageListener private MessageConsumer consumer; private boolean printStdDev = false; + FileWriter writer; public PerfTestController() { - super(); + super(""); consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); producers = new ConcurrentHashMap<String,MapMessage>(producerCount); @@ -59,16 +94,20 @@ public class PerfTestController extends PerfBase implements MessageListener prodRegistered = new CountDownLatch(producerCount); consReady = new CountDownLatch(consumerCount); prodReady = new CountDownLatch(producerCount); - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); printStdDev = params.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; } public void setUp() throws Exception { super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } consumer = controllerSession.createConsumer(controllerQueue); + System.out.println("\nController: " + producerCount + " producers are expected"); + System.out.println("Controller: " + consumerCount + " consumers are expected \n"); consumer.setMessageListener(this); consRegistered.await(); prodRegistered.await(); @@ -87,6 +126,7 @@ public class PerfTestController extends PerfBase implements MessageListener public void startTest() throws Exception { + resetCounters(); System.out.println("\nController Starting test......"); long start = Clock.getTime(); sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); @@ -97,6 +137,22 @@ public class PerfTestController extends PerfBase implements MessageListener receivedConsStats.await(); } + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + public void calcStats() throws Exception { double totLatency = 0.0; @@ -194,7 +250,6 @@ public class PerfTestController extends PerfBase implements MessageListener System.out.println(new StringBuilder("Avg System Std Dev : "). append(avgSystemLatencyStdDev)); } - System.out.println("Controller: Completed the test......\n"); } private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception @@ -230,11 +285,23 @@ public class PerfTestController extends PerfBase implements MessageListener switch (code) { case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } consumers.put(m.getString(ID),m); consRegistered.countDown(); break; case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } producers.put(m.getString(ID),m); prodRegistered.countDown(); break; @@ -277,10 +344,36 @@ public class PerfTestController extends PerfBase implements MessageListener { setUp(); warmup(); - startTest(); - calcStats(); - printResults(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } tearDown(); + } catch(Exception e) { @@ -288,6 +381,39 @@ public class PerfTestController extends PerfBase implements MessageListener } } + @Override + public void tearDown() throws Exception { + System.out.println("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(df.format(totalSystemThroughput)).append(","); + writer.append(df.format(avgSystemConsRate)).append(","); + writer.append(df.format(minSystemConsRate)).append(","); + writer.append(df.format(maxSystemConsRate)).append(","); + writer.append(df.format(avgSystemProdRate)).append(","); + writer.append(df.format(minSystemProdRate)).append(","); + writer.append(df.format(maxSystemProdRate)).append(","); + writer.append(df.format(avgSystemLatency)).append(","); + writer.append(df.format(minSystemLatency)).append(","); + writer.append(df.format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + public static void main(String[] args) { PerfTestController controller = new PerfTestController(); diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index a563aef6cc..d73be0181b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -43,7 +43,7 @@ public class TestParams private int msg_size = 1024; - private int msg_type = 1; // not used yet + private int random_msg_size_start_from = 1; private boolean cacheMessage = false; @@ -73,6 +73,8 @@ public class TestParams private boolean externalController = false; + private boolean useUniqueDest = false; // useful when using multiple connections. + public TestParams() { @@ -82,7 +84,6 @@ public class TestParams address = System.getProperty("address",address); msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -97,6 +98,8 @@ public class TestParams printStdDev = Boolean.getBoolean("print_std_dev"); rate = Long.getLong("rate",-1); externalController = Boolean.getBoolean("ext_controller"); + useUniqueDest = Boolean.getBoolean("use_unique_dest"); + random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -134,9 +137,9 @@ public class TestParams return msg_size; } - public int getMsgType() + public int getRandomMsgSizeStartFrom() { - return msg_type; + return random_msg_size_start_from; } public boolean isDurable() @@ -203,4 +206,9 @@ public class TestParams { address = addr; } + + public boolean isUseUniqueDests() + { + return useUniqueDest; + } } |