summaryrefslogtreecommitdiff
path: root/qpid/java/tools
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/tools')
-rwxr-xr-xqpid/java/tools/bin/perf_report.sh131
-rw-r--r--qpid/java/tools/bin/run_pub.sh24
-rw-r--r--qpid/java/tools/bin/run_sub.sh25
-rw-r--r--qpid/java/tools/bin/setenv.sh49
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java349
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java64
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java102
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java267
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java262
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java168
10 files changed, 1441 insertions, 0 deletions
diff --git a/qpid/java/tools/bin/perf_report.sh b/qpid/java/tools/bin/perf_report.sh
new file mode 100755
index 0000000000..22c839e08c
--- /dev/null
+++ b/qpid/java/tools/bin/perf_report.sh
@@ -0,0 +1,131 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This will run the 8 use cases defined below and produce
+# a report in tabular format. Refer to the documentation
+# for more details.
+
+SUB_MEM=-Xmx1024M
+PUB_MEM=-Xmx1024M
+LOG_CONFIG="-Damqj.logging.level=WARN"
+
+. setenv.sh
+
+waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
+cleanup()
+{
+ pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
+ if [ "$pids" != "" ]; then
+ kill -3 $pids
+ kill -9 $pids >/dev/null 2>&1
+ fi
+}
+
+# $1 test name
+# $2 consumer options
+# $3 producer options
+run_testcase()
+{
+ sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
+ waitfor sub.out "Warming up"
+ sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
+ waitfor sub.out "Completed the test"
+ waitfor pub.out "Consumer has completed the test"
+ sleep 2 #give a grace period to shutdown
+ print_result $1
+}
+
+print_result()
+{
+ prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+ sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
+ avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+ min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+ max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ echo "------------------------------------------------------------------------------------------------"
+}
+
+trap cleanup EXIT
+
+echo "Test report on " `date +%F`
+echo "================================================================================================"
+echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
+echo "------------------------------------------------------------------------------------------------"
+
+# Test 1 Trans Queue
+run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 2 Dura Queue
+run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 3 Dura Queue Sync
+run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
+
+# Test 4 Dura Queue Sync Publish and Ack
+run_testcase "Dura_SyncPubAck" "-Ddurable=true -Dsync_ack=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
+
+# Test 5 Topic
+run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 6 Durable Topic
+run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 7 Fanout
+run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 8 Small TX
+run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+
+# Test 9 Large TX
+run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+
+# Test 10 256 MSG
+run_testcase "Msg_256b" "" "-Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 11 512 MSG
+run_testcase "Msg_512b" "" "-Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 12 2048 MSG
+run_testcase "Msg_2048b" "" "-Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 13 Random size MSG
+run_testcase "Random_Msg_Size" "" "-Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 14 Random size MSG Durable
+run_testcase "Rand_Msg_Dura" "-Ddurable=true" "-Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 15 64K MSG
+run_testcase "Msg_64K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 16 Durable 64K MSG
+run_testcase "Msg_Durable_64K" "-Ddurable=true -Damqj.tcpNoDelay=true" \
+ "-Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 17 500K MSG
+run_testcase "Msg_500K" "-Damqj.tcpNoDelay=true" "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 18 Durable 500K MSG
+run_testcase "Msg_Dura_500K" "-Damqj.tcpNoDelay=true -Ddurable=true" \
+ "-Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/qpid/java/tools/bin/run_pub.sh b/qpid/java/tools/bin/run_pub.sh
new file mode 100644
index 0000000000..0702a55de9
--- /dev/null
+++ b/qpid/java/tools/bin/run_pub.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfProducer
diff --git a/qpid/java/tools/bin/run_sub.sh b/qpid/java/tools/bin/run_sub.sh
new file mode 100644
index 0000000000..f7e687de38
--- /dev/null
+++ b/qpid/java/tools/bin/run_sub.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfConsumer
+
diff --git a/qpid/java/tools/bin/setenv.sh b/qpid/java/tools/bin/setenv.sh
new file mode 100644
index 0000000000..24135e711b
--- /dev/null
+++ b/qpid/java/tools/bin/setenv.sh
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compiles the test classes and sets the CLASSPATH
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+ echo "ERROR: Please set JAVA_HOME ...."
+ exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+if [ "$VENDOR_LIB" = "" ] ; then
+ echo "ERROR: Please set VENDOR_LIB path in the script ...."
+ exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
new file mode 100644
index 0000000000..b88b242e6d
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * Latency test sends an x number of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y number of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * It is important to have a sufficiently large number for the warmup count to
+ * ensure the system is in steady state before the test is started.
+ *
+ * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
+ * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
+ *
+ * The idea is to get a latency sample for the system once it achieves steady state.
+ *
+ */
+
+public class LatencyTest extends PerfBase implements MessageListener
+{
+ MessageProducer producer;
+ MessageConsumer consumer;
+ Message msg;
+ byte[] payload;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ double stdDev = 0;
+ double avgLatency = 0;
+ boolean warmup_mode = true;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final List<Long> latencies;
+ final Lock lock = new ReentrantLock();
+ final Condition warmedUp;
+ final Condition testCompleted;
+
+ public LatencyTest()
+ {
+ super();
+ warmedUp = lock.newCondition();
+ testCompleted = lock.newCondition();
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ latencies = new ArrayList <Long>(params.getMsgCount());
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ int count = params.getWarmupCount();
+ for (int i=0; i < count; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ try
+ {
+ lock.lock();
+ warmedUp.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ if (warmup_mode)
+ {
+ warmup_mode = false;
+ try
+ {
+ lock.lock();
+ warmedUp.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else
+ {
+ computeStats();
+ }
+ }
+ else if (!warmup_mode)
+ {
+ long time = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = time - msg.getJMSTimestamp();
+ latencies.add(latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ private void computeStats()
+ {
+ avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double sigma = 0;
+
+ for (long latency: latencies)
+ {
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ sigma = sigma + Math.pow(latency - avgLatency,2);
+ }
+
+ stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
+
+ try
+ {
+ lock.lock();
+ testCompleted.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void writeToFile() throws Exception
+ {
+ String fileName = System.getProperty("file");
+ PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
+ for (long latency: latencies)
+ {
+ writer.println(String.valueOf(latency));
+ }
+ writer.flush();
+ writer.close();
+ }
+
+ public void printToConsole()
+ {
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Standard Deviation : ").
+ append(df.format(stdDev)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % transSize == 0))
+ {
+ session.commit();
+ }
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ lock.lock();
+ testCompleted.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ producer.close();
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ final LatencyTest latencyTest = new LatencyTest();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating latency test thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
new file mode 100644
index 0000000000..8ab1379fce
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.tools;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MessageFactory
+{
+ public static Message createBytesMessage(Session ssn, int size) throws JMSException
+ {
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes(createMessagePayload(size).getBytes());
+ return msg;
+ }
+
+ public static Message createTextMessage(Session ssn, int size) throws JMSException
+ {
+ TextMessage msg = ssn.createTextMessage();
+ msg.setText(createMessagePayload(size));
+ return msg;
+ }
+
+ public static String createMessagePayload(int size)
+ {
+ String msgData = "Qpid Test Message";
+
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count <= (size - msgData.length()))
+ {
+ buf.append(msgData);
+ count += msgData.length();
+ }
+ if (count < size)
+ {
+ buf.append(msgData, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
new file mode 100644
index 0000000000..88e75fb6a9
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.text.DecimalFormat;
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class PerfBase
+{
+ TestParams params;
+ Connection con;
+ Session session;
+ Destination dest;
+ Destination feedbackDest;
+ DecimalFormat df = new DecimalFormat("###.##");
+
+ public PerfBase()
+ {
+ params = new TestParams();
+ }
+
+ public void setUp() throws Exception
+ {
+ Hashtable<String,String> env = new Hashtable<String,String>();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
+ env.put(Context.PROVIDER_URL, params.getProviderURL());
+
+ Context ctx = null;
+ try
+ {
+ ctx = new InitialContext(env);
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error initializing JNDI",e);
+
+ }
+
+ ConnectionFactory conFac = null;
+ try
+ {
+ conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up connection factory",e);
+ }
+
+ con = conFac.createConnection();
+ con.start();
+ session = con.createSession(params.isTransacted(),
+ params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+
+ try
+ {
+ dest = (Destination)ctx.lookup( params.isDurable()?
+ params.getDurableDestination():
+ params.getTransientDestination()
+ );
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up destination",e);
+ }
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
+
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
new file mode 100644
index 0000000000..0ef0455a64
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -0,0 +1,267 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * hen the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
+ *
+ * Throughput
+ * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class PerfConsumer extends PerfBase implements MessageListener
+{
+ MessageConsumer consumer;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
+ long startTime = 0; // to measure consumer throughput
+ long rcvdTime = 0;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final Object lock = new Object();
+
+ public PerfConsumer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
+ {
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
+ {
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ }
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
+ }
+
+ public void printResults() throws Exception
+ {
+ synchronized (lock)
+ {
+ lock.wait();
+ }
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Consumer rate : ").
+ append(df.format(consRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void notifyCompletion(Destination replyTo) throws Exception
+ {
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ notifyCompletion(msg.getJMSReplyTo());
+
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ else
+ {
+ rcvdTime = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (rcvdMsgCount == 1)
+ {
+ startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
+ }
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = rcvdTime - msg.getJMSTimestamp();
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ cons.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
new file mode 100644
index 0000000000..015d1e6205
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -0,0 +1,262 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.thread.Threading;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+ MessageProducer producer;
+ Message msg;
+ byte[] payload;
+ List<byte[]> payloads;
+ boolean cacheMsg = false;
+ boolean randomMsgSize = false;
+ boolean durable = false;
+ Random random;
+ int msgSizeRange = 1024;
+
+ public PerfProducer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ feedbackDest = session.createTemporaryQueue();
+
+ durable = params.isDurable();
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ cacheMsg = true;
+
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else if (params.isRandomMsgSize())
+ {
+ random = new Random(20080921);
+ randomMsgSize = true;
+ msgSizeRange = params.getMsgSize();
+ payloads = new ArrayList<byte[]>(msgSizeRange);
+
+ for (int i=0; i < msgSizeRange; i++)
+ {
+ payloads.add(MessageFactory.createMessagePayload(i).getBytes());
+ }
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (cacheMsg)
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+
+ if (!randomMsgSize)
+ {
+ ((BytesMessage)msg).writeBytes(payload);
+ }
+ else
+ {
+ ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
+ }
+ msg.setJMSDeliveryMode(durable?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+ for (int i=0; i < params.getWarmupCount() -1; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+ boolean transacted = params.isTransacted();
+ int tranSize = params.getTransactionSize();
+
+ long start = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % tranSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
+ System.out.println(new StringBuilder("Producer rate: ").
+ append(df.format(rate)).
+ append(" msg/sec").
+ toString());
+ }
+
+ public void waitForCompletion() throws Exception
+ {
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
+ }
+
+ public void tearDown() throws Exception
+ {
+ producer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ waitForCompletion();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ prod.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating producer thread",e);
+ }
+ t.start();
+ }
+} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
new file mode 100644
index 0000000000..f1b682ff32
--- /dev/null
+++ b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tools;
+
+import javax.jms.Session;
+
+public class TestParams
+{
+ private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
+
+ private String connectionFactory = "connectionFactory";
+
+ private String transientDest = "transientQueue";
+
+ private String durableDest = "durableQueue";
+
+ private int msg_size = 1024;
+
+ private int msg_type = 1; // not used yet
+
+ private boolean cacheMessage = false;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableTimestamp = false;
+
+ private boolean durable = false;
+
+ private boolean transacted = false;
+
+ private int transaction_size = 1000;
+
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+ private int msg_count = 10;
+
+ private int warmup_count = 1;
+
+ private boolean random_msg_size = false;
+
+ public TestParams()
+ {
+ initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
+ providerURL = System.getProperty("java.naming.provider.url",providerURL);
+
+ transientDest = System.getProperty("transDest",transientDest);
+ durableDest = System.getProperty("durableDest",durableDest);
+
+ msg_size = Integer.getInteger("msg_size", 1024);
+ msg_type = Integer.getInteger("msg_type",1);
+ cacheMessage = Boolean.getBoolean("cache_msg");
+ disableMessageID = Boolean.getBoolean("disableMessageID");
+ disableTimestamp = Boolean.getBoolean("disableTimestamp");
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ transaction_size = Integer.getInteger("trans_size",1000);
+ ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg_count",msg_count);
+ warmup_count = Integer.getInteger("warmup_count",warmup_count);
+ random_msg_size = Boolean.getBoolean("random_msg_size");
+ }
+
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ public String getConnectionFactory()
+ {
+ return connectionFactory;
+ }
+
+ public String getTransientDestination()
+ {
+ return transientDest;
+ }
+
+ public String getDurableDestination()
+ {
+ return durableDest;
+ }
+
+ public String getInitialContextFactory()
+ {
+ return initialContextFactory;
+ }
+
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ public int getMsgType()
+ {
+ return msg_type;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getProviderURL()
+ {
+ return providerURL;
+ }
+
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+ public boolean isRandomMsgSize()
+ {
+ return random_msg_size;
+ }
+
+}