summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-07-30 02:22:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-07-30 02:22:23 +0000
commit8523ced829d64862cfb8eede5db2172ddef243ff (patch)
tree9c33deb7125488a16599d247d2ab558381057f2e
parent635d538c62c49e1d92a490b571266314c33e66a9 (diff)
downloadqpid-python-8523ced829d64862cfb8eede5db2172ddef243ff.tar.gz
QPID-3358 Modified the controller to allow multiple iterations in order to support long durations tests.
You could now specify -Duration=<secs> to ask the controller to run the test for atleast that duration. If the system is in the middle of running an iteration when the time is up, it will complete the iteration before ending the test. If a duration is specified the Controller will output the results of each iteration into a CSV file. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1152411 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java142
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java16
2 files changed, 146 insertions, 12 deletions
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
index 5f2c1a23dc..5c98c645f4 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
@@ -1,5 +1,6 @@
package org.apache.qpid.tools;
+import java.io.FileWriter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -13,8 +14,40 @@ import javax.jms.MessageProducer;
import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+/**
+ * The Controller coordinates a test run between a number
+ * of producers and consumers, configured via -Dprod_count and -Dcons_count.
+ *
+ * It waits till all the producers and consumers have registered and then
+ * conducts a warmup run. Once all consumers and producers have completed
+ * the warmup run and is ready, it will conduct the actual test run and
+ * collect all stats from the participants and calculates the system
+ * throughput, the avg/min/max for producer rates, consumer rates and latency.
+ *
+ * These stats are then printed to std out.
+ * The Controller also prints events to std out to give a running account
+ * of the test run in progress. Ex registering of participants, starting warmup ..etc.
+ * This allows a scripting tool to monitor the progress.
+ *
+ * The Controller can be run in two modes.
+ * 1. A single test run (default) where it just runs until the message count specified
+ * for the producers via -Dmsg_count is sent and received.
+ *
+ * 2. Time based, configured via -Dduration=x, where x is in mins.
+ * In this mode, the Controller repeatedly cycles through the tests (after an initial
+ * warmup run) until the desired time is reached. If a test run is in progress
+ * and the time is up, it will allow the run the complete.
+ *
+ * After each iteration, the stats will be printed out in csv format to a separate log file.
+ * System throughput is calculated as follows
+ * totalMsgCount/(totalTestTime)
+ */
public class PerfTestController extends PerfBase implements MessageListener
{
+ enum TestMode { SINGLE_RUN, TIME_BASED };
+
+ TestMode testMode = TestMode.SINGLE_RUN;
+
long totalTestTime;
private double avgSystemLatency = 0.0;
@@ -35,6 +68,7 @@ public class PerfTestController extends PerfBase implements MessageListener
private int consumerCount = Integer.getInteger("cons_count", 1);
private int producerCount = Integer.getInteger("prod_count", 1);
+ private int duration = Integer.getInteger("duration", -1); // in mins
private Map<String,MapMessage> consumers;
private Map<String,MapMessage> producers;
@@ -48,10 +82,11 @@ public class PerfTestController extends PerfBase implements MessageListener
private MessageConsumer consumer;
private boolean printStdDev = false;
+ FileWriter writer;
public PerfTestController()
{
- super();
+ super("");
consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
@@ -59,16 +94,20 @@ public class PerfTestController extends PerfBase implements MessageListener
prodRegistered = new CountDownLatch(producerCount);
consReady = new CountDownLatch(consumerCount);
prodReady = new CountDownLatch(producerCount);
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
printStdDev = params.isPrintStdDev();
+ testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
}
public void setUp() throws Exception
{
super.setUp();
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer = new FileWriter("stats-csv.log");
+ }
consumer = controllerSession.createConsumer(controllerQueue);
+ System.out.println("\nController: " + producerCount + " producers are expected");
+ System.out.println("Controller: " + consumerCount + " consumers are expected \n");
consumer.setMessageListener(this);
consRegistered.await();
prodRegistered.await();
@@ -87,6 +126,7 @@ public class PerfTestController extends PerfBase implements MessageListener
public void startTest() throws Exception
{
+ resetCounters();
System.out.println("\nController Starting test......");
long start = Clock.getTime();
sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
@@ -97,6 +137,22 @@ public class PerfTestController extends PerfBase implements MessageListener
receivedConsStats.await();
}
+ public void resetCounters()
+ {
+ minSystemLatency = Double.MAX_VALUE;
+ maxSystemLatency = 0;
+ maxSystemConsRate = 0.0;
+ minSystemConsRate = Double.MAX_VALUE;
+ maxSystemProdRate = 0.0;
+ minSystemProdRate = Double.MAX_VALUE;
+
+ totalMsgCount = 0;
+
+ receivedConsStats = new CountDownLatch(consumerCount);
+ receivedProdStats = new CountDownLatch(producerCount);
+ receivedEndMsg = new CountDownLatch(producerCount);
+ }
+
public void calcStats() throws Exception
{
double totLatency = 0.0;
@@ -194,7 +250,6 @@ public class PerfTestController extends PerfBase implements MessageListener
System.out.println(new StringBuilder("Avg System Std Dev : ").
append(avgSystemLatencyStdDev));
}
- System.out.println("Controller: Completed the test......\n");
}
private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
@@ -230,11 +285,23 @@ public class PerfTestController extends PerfBase implements MessageListener
switch (code)
{
case REGISTER_CONSUMER :
+ if (consRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of consumers have already registered," +
+ "ignoring extra consumer");
+ break;
+ }
consumers.put(m.getString(ID),m);
consRegistered.countDown();
break;
case REGISTER_PRODUCER :
+ if (prodRegistered.getCount() == 0)
+ {
+ System.out.println("Warning : Expected number of producers have already registered," +
+ "ignoring extra producer");
+ break;
+ }
producers.put(m.getString(ID),m);
prodRegistered.countDown();
break;
@@ -277,10 +344,36 @@ public class PerfTestController extends PerfBase implements MessageListener
{
setUp();
warmup();
- startTest();
- calcStats();
- printResults();
+ if (testMode == TestMode.SINGLE_RUN)
+ {
+ startTest();
+ calcStats();
+ printResults();
+ }
+ else
+ {
+ long startTime = Clock.getTime();
+ long timeLimit = duration * 60 * 1000; // duration is in mins.
+ boolean nextIteration = true;
+ while (nextIteration)
+ {
+ startTest();
+ calcStats();
+ writeStatsToFile();
+ if (Clock.getTime() - startTime < timeLimit)
+ {
+ sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
+ sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
+ nextIteration = true;
+ }
+ else
+ {
+ nextIteration = false;
+ }
+ }
+ }
tearDown();
+
}
catch(Exception e)
{
@@ -288,6 +381,39 @@ public class PerfTestController extends PerfBase implements MessageListener
}
}
+ @Override
+ public void tearDown() throws Exception {
+ System.out.println("Controller: Completed the test......\n");
+ if (testMode == TestMode.TIME_BASED)
+ {
+ writer.close();
+ }
+ sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
+ sendMessageToNodes(OPCode.STOP_TEST,producers.values());
+ super.tearDown();
+ }
+
+ public void writeStatsToFile() throws Exception
+ {
+ writer.append(String.valueOf(totalMsgCount)).append(",");
+ writer.append(df.format(totalSystemThroughput)).append(",");
+ writer.append(df.format(avgSystemConsRate)).append(",");
+ writer.append(df.format(minSystemConsRate)).append(",");
+ writer.append(df.format(maxSystemConsRate)).append(",");
+ writer.append(df.format(avgSystemProdRate)).append(",");
+ writer.append(df.format(minSystemProdRate)).append(",");
+ writer.append(df.format(maxSystemProdRate)).append(",");
+ writer.append(df.format(avgSystemLatency)).append(",");
+ writer.append(df.format(minSystemLatency)).append(",");
+ writer.append(df.format(maxSystemLatency));
+ if (printStdDev)
+ {
+ writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
+ }
+ writer.append("\n");
+ writer.flush();
+ }
+
public static void main(String[] args)
{
PerfTestController controller = new PerfTestController();
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index a563aef6cc..d73be0181b 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -43,7 +43,7 @@ public class TestParams
private int msg_size = 1024;
- private int msg_type = 1; // not used yet
+ private int random_msg_size_start_from = 1;
private boolean cacheMessage = false;
@@ -73,6 +73,8 @@ public class TestParams
private boolean externalController = false;
+ private boolean useUniqueDest = false; // useful when using multiple connections.
+
public TestParams()
{
@@ -82,7 +84,6 @@ public class TestParams
address = System.getProperty("address",address);
msg_size = Integer.getInteger("msg_size", 1024);
- msg_type = Integer.getInteger("msg_type",1);
cacheMessage = Boolean.getBoolean("cache_msg");
disableMessageID = Boolean.getBoolean("disableMessageID");
disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -97,6 +98,8 @@ public class TestParams
printStdDev = Boolean.getBoolean("print_std_dev");
rate = Long.getLong("rate",-1);
externalController = Boolean.getBoolean("ext_controller");
+ useUniqueDest = Boolean.getBoolean("use_unique_dest");
+ random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
}
public String getUrl()
@@ -134,9 +137,9 @@ public class TestParams
return msg_size;
}
- public int getMsgType()
+ public int getRandomMsgSizeStartFrom()
{
- return msg_type;
+ return random_msg_size_start_from;
}
public boolean isDurable()
@@ -203,4 +206,9 @@ public class TestParams
{
address = addr;
}
+
+ public boolean isUseUniqueDests()
+ {
+ return useUniqueDest;
+ }
}