summaryrefslogtreecommitdiff
path: root/RC9/qpid/java/testkit
diff options
context:
space:
mode:
Diffstat (limited to 'RC9/qpid/java/testkit')
-rw-r--r--RC9/qpid/java/testkit/README153
-rw-r--r--RC9/qpid/java/testkit/bin/perf_report.sh100
-rw-r--r--RC9/qpid/java/testkit/bin/run_pub.sh24
-rw-r--r--RC9/qpid/java/testkit/bin/run_soak_client.sh70
-rw-r--r--RC9/qpid/java/testkit/bin/run_sub.sh25
-rw-r--r--RC9/qpid/java/testkit/bin/setenv.sh49
-rw-r--r--RC9/qpid/java/testkit/bin/soak_report.sh161
-rw-r--r--RC9/qpid/java/testkit/build.xml27
-rw-r--r--RC9/qpid/java/testkit/etc/jndi.properties35
-rw-r--r--RC9/qpid/java/testkit/etc/test.log4j28
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java64
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java332
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java102
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java248
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java207
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java160
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java152
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java153
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java166
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java138
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java134
-rw-r--r--RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java146
22 files changed, 2674 insertions, 0 deletions
diff --git a/RC9/qpid/java/testkit/README b/RC9/qpid/java/testkit/README
new file mode 100644
index 0000000000..fdde734027
--- /dev/null
+++ b/RC9/qpid/java/testkit/README
@@ -0,0 +1,153 @@
+Introduction
+============
+
+The Test kit for the java client consists of 2 components.
+
+1) A Simple Perf Test that can be used to,
+ a) Run a predefined perf report consisting of 8 use cases (see below)
+ b) Run a producer and a consumer with a number of different options
+
+2) Soak tests that can be run for longer durations (hours or days).
+
+I am planning to add some stress tests to this module as well.
+Please note this is not a replacement for the existing perf/systests etc.
+But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test.
+
+Table of Contents
+=================
+1. Perf Kit
+2. Soak Kit
+3. Perf Test use cases
+4. Soak Test use cases
+5. Running the sample perf test report
+6. Running the sample soak test report
+
+1.0 Perf Kit
+------------
+1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report.
+Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions.
+
+1.2 It calculates the following results in msg/sec.
+
+ System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send)
+
+ Producer rate : no_of_msgs / (time_after_sending - time_before_sending)
+
+ Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd)
+
+ Latency : time_msg_rcvd - time_msg_sent
+
+The test will print min, max and avg latency.
+
+1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced.
+
+1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options.
+ Please look at TestParams.java for all the configurable options.
+
+1.5 You can also use the test kit to benchmark against any vendor.
+
+
+2.0 Soak tests
+--------------
+2.0 This includes a set of soak tests that can be run for a longer duration.
+
+2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker.
+ The producer will print the timestamp as soon as it sends the xth message.
+ The consumer will reply with an empty message to the replyTo destination given in the xth message.
+ The consumer prints the throuhgput for the iteration and the latency for the xth message.
+ A typical value for x is 100k
+
+2.2 The feedback loop prevents the producer from overrunning the consumer.
+ And the printout for every xth message will let you know how many iterations been completed at any given time.
+ (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far).
+
+2.2 The following results can be calculated for these tests.
+
+ Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example
+
+ You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run.
+ (look at testkit/bin/soak_report.sh) for an example).
+
+ You could also graph throughput, latency, CPU and memory using the comma separated log files.
+
+2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples.
+
+3.0 Perf Test report use cases
+-------------------------------
+3.1 Please check testkit/bin/perf_report.sh for more details
+
+3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation.
+
+Test 1 Trans Queue
+
+Test 2 Dura Queue
+
+Test 3 Dura Queue Sync
+
+Test 4 Topic
+
+Test 5 Durable Topic
+
+Test 6 Fanout
+
+Test 7 Small TX (about 2 msgs per tx)
+
+Test 8 Large TX (about 1000 msgs per tx)
+
+
+4.0 Soak tests use cases
+-------------------------
+4.1 Following are the current tests available in the test kit.
+
+4.2 Please refer to the source to see the javadoc and options
+
+
+1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured.
+
+2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread.
+ It can also send messages transactionally. Again a no of options can be configured.
+
+3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again.
+
+
+5.0 Running the sample perf test report
+---------------------------------------
+The testkit/bin contains perf_report.sh.
+It runs the above 8 use cases against a broker and print the results in a tabular format.
+
+For example
+================================================================================================
+|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|
+------------------------------------------------------------------------------------------------
+|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx|
+
+
+5.1 running perf_report.sh
+
+5.1.1 set JAVA_HOME to point to Java 1.5 and above
+5.1.2 set QPID_TEST_HOME to point to the testkit dir
+5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files.
+5.1.4 start a broker
+5.1.5 update the testkit/etc/jndi.properties to point to the correct broker
+5.1.6 execute perf_report.sh
+
+
+6.0 Running the sample soak test report
+---------------------------------------
+The testkit/bin contains soak_report.sh
+It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats.
+Avg, Min and Max for System Throughput, letency, CPU and memory.
+
+6.1 running soak_report.sh
+
+5.1.1 set JAVA_HOME to point to Java 1.5 and above
+5.1.2 set QPID_TEST_HOME to point to the testkit dir
+5.1.3 set JAR_PATH to point to the Qpid jars
+5.1.4 start a broker
+5.1.5 execute soak_report.sh with correct params.
+ Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second.
+
+5.1.6 Please note the total duration for the test is log_freq * log_iterations
+ So if you want to run the test for 10 hours and collect 10 second samples then do the following
+ sh soak_report.sh 10 3600
+
diff --git a/RC9/qpid/java/testkit/bin/perf_report.sh b/RC9/qpid/java/testkit/bin/perf_report.sh
new file mode 100644
index 0000000000..9e574cad7a
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/perf_report.sh
@@ -0,0 +1,100 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This will run the 8 use cases defined below and produce
+# a report in tabular format. Refer to the documentation
+# for more details.
+
+SUB_MEM=-Xmx1024M
+PUB_MEM=-Xmx1024M
+LOG_CONFIG=-Dlog4j.configuration="$QPID_TEST_HOME/etc/test.log4j"
+
+. setenv.sh
+
+waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
+cleanup()
+{
+ pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
+ if [ "$pids" != "" ]; then
+ kill -3 $pids
+ kill -9 $pids >/dev/null 2>&1
+ fi
+}
+
+# $1 test name
+# $2 consumer options
+# $3 producer options
+run_testcase()
+{
+ sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
+ waitfor sub.out "Warming up"
+ sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
+ waitfor sub.out "Completed the test"
+ waitfor pub.out "Consumer has completed the test"
+ sleep 2 #give a grace period to shutdown
+ print_result $1
+}
+
+print_result()
+{
+ prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+ sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
+ avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+ min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+ max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ echo "------------------------------------------------------------------------------------------------"
+}
+
+trap cleanup EXIT
+
+echo "Test report on " `date +%F`
+echo "================================================================================================"
+echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
+echo "------------------------------------------------------------------------------------------------"
+
+# Test 1 Trans Queue
+run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 2 Dura Queue
+run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 3 Dura Queue Sync
+run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
+
+# Test 4 Topic
+run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 5 Durable Topic
+#run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 6 Fanout
+run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 7 Small TX
+run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+
+# Test 8 Large TX
+run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size=10" \
+ "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+
diff --git a/RC9/qpid/java/testkit/bin/run_pub.sh b/RC9/qpid/java/testkit/bin/run_pub.sh
new file mode 100644
index 0000000000..0702a55de9
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/run_pub.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfProducer
diff --git a/RC9/qpid/java/testkit/bin/run_soak_client.sh b/RC9/qpid/java/testkit/bin/run_soak_client.sh
new file mode 100644
index 0000000000..ea1721d988
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/run_soak_client.sh
@@ -0,0 +1,70 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# This is a sample script for a soak test on
+# linux environment.
+# This will start n of Producers processors and record their CPU and memory stats.
+# Also the Producer out will be saved to a file as well.
+
+if [ "$JAR_PATH" = "" ] ; then
+ echo "ERROR: Please set JAR_PATH to point to the Qpid libraries ...."
+ exit 1
+fi
+
+#1 PID, $2 freq, $3 count
+calc_stats(){
+
+for (( i = 0 ; i <= $3; i++ ))
+ do
+ cpu=`ps auxw | grep $1 | grep -v 'grep' | awk '{print $3}'`
+ mem=`pmap $1 | grep total | grep -v 'grep' | awk '{print substr($2,0,length($2)-1)}'`
+ echo $i","$mem","$cpu
+ sleep $2
+ cpu="0.0"
+ mem="0"
+ done
+ kill -9 $1
+}
+
+# Num of producer processors to start
+num=$1
+# Log frequency in seconds
+log_freq=$2
+# Num of iterations
+log_iter=$3
+
+class_name=$4
+log_file_name=`echo $class_name | cut -d. -f6`
+
+# The total time for the test is determined by the
+# log_freq * log_iter.
+
+shift 4
+CLASSPATH=`find $JAR_PATH -name '*.jar' | tr '\n' ":"`
+
+JVM_ARGS="-Xmx1500M $@"
+echo "Starting $log_file_name with the following params $JVM_ARGS"
+
+for (( c = 1 ; c <= $num; c++ ))
+do
+ $JAVA_HOME/bin/java $JVM_ARGS -cp $CLASSPATH $class_name > ${log_file_name}_${c}.log &
+ pid=`jobs -l %% | awk '{print $2}'`
+ calc_stats $pid $log_freq $log_iter > ${log_file_name}_process_${c}.log &
+done
diff --git a/RC9/qpid/java/testkit/bin/run_sub.sh b/RC9/qpid/java/testkit/bin/run_sub.sh
new file mode 100644
index 0000000000..f7e687de38
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/run_sub.sh
@@ -0,0 +1,25 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+. $QPID_TEST_HOME/bin/setenv.sh
+
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.testkit.perf.PerfConsumer
+
diff --git a/RC9/qpid/java/testkit/bin/setenv.sh b/RC9/qpid/java/testkit/bin/setenv.sh
new file mode 100644
index 0000000000..24135e711b
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/setenv.sh
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compiles the test classes and sets the CLASSPATH
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+ echo "ERROR: Please set JAVA_HOME ...."
+ exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+if [ "$VENDOR_LIB" = "" ] ; then
+ echo "ERROR: Please set VENDOR_LIB path in the script ...."
+ exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+
diff --git a/RC9/qpid/java/testkit/bin/soak_report.sh b/RC9/qpid/java/testkit/bin/soak_report.sh
new file mode 100644
index 0000000000..9da8bfa234
--- /dev/null
+++ b/RC9/qpid/java/testkit/bin/soak_report.sh
@@ -0,0 +1,161 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Sample script to run a soak test with MultiThreadedProducer/Consumer.
+# You need to provide the log freq and no of iterations
+# Ex to run 10 hours and collect 1 second samples
+# soak_report.sh 1 36000
+
+# This script assumes that a suitable broker instance is started.
+
+log_freq=$1
+log_iter=$2
+shift 2
+JVM_ARGS=$@
+
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+print_rates()
+{
+ cat $1 | awk '{
+ FS = ",";
+ count = 0;
+ total_latency = 0;
+ min_latency = 9223372036854775807;
+ max_latency = 0;
+
+ total_tp = 0;
+ min_tp = 50000;
+ max_tp = 0;
+
+ while ((getline) == 1)
+ {
+ total_latency = total_latency + $3
+ total_tp = total_tp + $2
+
+ if ($3 > 0)
+ {
+ min_latency = (($3 < min_latency) ? $3 : min_latency);
+ max_latency = (($3 > max_latency) ? $3 : max_latency);
+ }
+ if ($2 > 0)
+ {
+ min_tp = (($2 < min_tp) ? $2 : min_tp);
+ max_tp = (($2 > max_tp) ? $2 : max_tp);
+ }
+
+ count = count + 1
+ }
+
+ print "Avg Latency (ms) : " total_latency/count
+ print "Max Latency (ms) : " max_latency
+ print "Min Latency (ms) : " min_latency
+
+ print ""
+ print "Avg Throughput (msg/sec) : " total_tp/count
+ print "Max Throughput (msg/sec) : " max_tp
+ print "Min Throughput (msg/sec) : " min_tp
+
+ print ""
+ print "Total Iteratons " count
+
+ }'
+}
+
+print_system_stats()
+{
+ cat $1 | awk '{
+ FS = ",";
+ count = 0;
+ total_memory = 0;
+ min_memory = 9223372036854775807;
+ max_memory = 0;
+
+ total_cp = 0;
+ min_cp = 50000;
+ max_cp = 0;
+
+ while ((getline) == 1)
+ {
+ total_memory = total_memory + $2
+ total_cp = total_cp + $3
+
+ if ($2 > 0)
+ {
+ min_memory = (($2 < min_memory) ? $2 : min_memory);
+ max_memory = (($2 > max_memory) ? $2 : max_memory);
+ }
+ if ($3 > 0)
+ {
+ min_cp = (($3 < min_cp) ? $3 : min_cp);
+ max_cp = (($3 > max_cp) ? $3 : max_cp);
+ }
+
+ count = count + 1
+ }
+
+ print "Avg Memory (MB) : " total_memory/(count*1024)
+ print "Max Memory (MB) : " max_memory/1024
+ print "Min Memory (MB) : " min_memory/1024
+
+ print ""
+ print "Avg CPU : " total_cp/count
+ print "Max CPU : " max_cp
+ print "Min CPU : " min_cp
+
+ print ""
+ print "Total Iteratons " count
+
+ }'
+}
+
+
+cleanup()
+{
+ kill -9 `ps aux | grep java | grep soak | awk '{ print $2 }'`
+}
+
+print_results()
+{
+ printf "\n======================================================= \n"
+ print_rates MultiThreadedConsumer_1.log
+ printf "\nConsumer process stats "
+ printf "\n----------------------- \n"
+ print_system_stats MultiThreadedConsumer_process_1.log
+ printf "\nProducer process stats "
+ printf "\n----------------------- \n"
+ print_system_stats MultiThreadedProducer_process_1.log
+ printf "\n------------------------------------------------------- \n"
+}
+
+trap cleanup EXIT
+
+# runs a single instance of the MultiThreadedConsumer and MultiThreadedProducer
+sh $QPID_TEST_HOME/bin/run_soak_client.sh 1 $log_freq $log_iter org.apache.qpid.testkit.soak.MultiThreadedConsumer $JVM_ARGS
+sh $QPID_TEST_HOME/bin/run_soak_client.sh 1 $log_freq $log_iter org.apache.qpid.testkit.soak.MultiThreadedProducer $JVM_ARGS
+
+sleep_time=$((log_freq * log_iter))
+echo "sleep time : " $sleep_time
+sleep $((log_freq * log_iter))
+
+print_results
diff --git a/RC9/qpid/java/testkit/build.xml b/RC9/qpid/java/testkit/build.xml
new file mode 100644
index 0000000000..94b97d270d
--- /dev/null
+++ b/RC9/qpid/java/testkit/build.xml
@@ -0,0 +1,27 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Test Kit" default="build">
+
+ <property name="module.depends" value="client broker common"/>
+
+ <import file="../module.xml"/>
+
+</project>
diff --git a/RC9/qpid/java/testkit/etc/jndi.properties b/RC9/qpid/java/testkit/etc/jndi.properties
new file mode 100644
index 0000000000..f535975844
--- /dev/null
+++ b/RC9/qpid/java/testkit/etc/jndi.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.connectionFactory = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'
+
+# Register an AMQP destination in JNDI
+destination.transientQueue = direct://amq.direct//testQueueT
+destination.durableQueue = direct://amq.direct//testQueueD?durable='true'
+
+destination.transientTopic = topic://amq.topic//testTopicT
+#destination.durableTopic = topic://amq.topic//testTopicD?durable='true'
+
+destination.fanoutQueue = fanout://amq.fanout//fanoutQueue \ No newline at end of file
diff --git a/RC9/qpid/java/testkit/etc/test.log4j b/RC9/qpid/java/testkit/etc/test.log4j
new file mode 100644
index 0000000000..b574a7b5b7
--- /dev/null
+++ b/RC9/qpid/java/testkit/etc/test.log4j
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+log4j.rootLogger=${root.logging.level}
+
+log4j.logger.org.apache.qpid=ERROR, console
+log4j.additivity.org.apache.qpid=false
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.Threshold=all
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
+
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
new file mode 100644
index 0000000000..8b7b7fa434
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.testkit;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MessageFactory
+{
+ public static Message createBytesMessage(Session ssn, int size) throws JMSException
+ {
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes(createMessagePayload(size).getBytes());
+ return msg;
+ }
+
+ public static Message createTextMessage(Session ssn, int size) throws JMSException
+ {
+ TextMessage msg = ssn.createTextMessage();
+ msg.setText(createMessagePayload(size));
+ return msg;
+ }
+
+ public static String createMessagePayload(int size)
+ {
+ String msgData = "Qpid Test Message";
+
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count <= (size - msgData.length()))
+ {
+ buf.append(msgData);
+ count += msgData.length();
+ }
+ if (count < size)
+ {
+ buf.append(msgData, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
new file mode 100644
index 0000000000..35a2374fbc
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java
@@ -0,0 +1,332 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.testkit.MessageFactory;
+
+/**
+ * Latency test sends an x number of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y number of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * It is important to have a sufficiently large number for the warmup count to
+ * ensure the system is in steady state before the test is started.
+ *
+ * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000)
+ * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1
+ *
+ * The idea is to get a latency sample for the system once it achieves steady state.
+ *
+ */
+
+public class LatencyTest extends PerfBase implements MessageListener
+{
+ MessageProducer producer;
+ MessageConsumer consumer;
+ Message msg;
+ byte[] payload;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ double stdDev = 0;
+ double avgLatency = 0;
+ boolean warmup_mode = true;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final List<Long> latencies;
+ final Lock lock = new ReentrantLock();
+ final Condition warmedUp;
+ final Condition testCompleted;
+
+ public LatencyTest()
+ {
+ super();
+ warmedUp = lock.newCondition();
+ testCompleted = lock.newCondition();
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ latencies = new ArrayList <Long>(params.getMsgCount());
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+ consumer.setMessageListener(this);
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ int count = params.getWarmupCount();
+ for (int i=0; i < count; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ try
+ {
+ lock.lock();
+ warmedUp.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ if (warmup_mode)
+ {
+ warmup_mode = false;
+ try
+ {
+ lock.lock();
+ warmedUp.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else
+ {
+ computeStats();
+ }
+ }
+ else if (!warmup_mode)
+ {
+ long time = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = time - msg.getJMSTimestamp();
+ latencies.add(latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ private void computeStats()
+ {
+ avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double sigma = 0;
+
+ for (long latency: latencies)
+ {
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ sigma = sigma + Math.pow(latency - avgLatency,2);
+ }
+
+ stdDev = Math.sqrt(sigma/(rcvdMsgCount -1));
+
+ try
+ {
+ lock.lock();
+ testCompleted.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ public void writeToFile() throws Exception
+ {
+ String fileName = System.getProperty("file");
+ PrintWriter writer = new PrintWriter(new FileOutputStream(fileName));
+ for (long latency: latencies)
+ {
+ writer.println(String.valueOf(latency));
+ }
+ writer.flush();
+ writer.close();
+ }
+
+ public void printToConsole()
+ {
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Standard Deviation : ").
+ append(df.format(stdDev)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % transSize == 0))
+ {
+ session.commit();
+ }
+ }
+ Message msg = session.createTextMessage("End");
+ producer.send(msg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ }
+
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ lock.lock();
+ testCompleted.await();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+
+ producer.close();
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ LatencyTest latencyTest = new LatencyTest();
+ latencyTest.test();
+ latencyTest.printToConsole();
+ if (System.getProperty("file") != null)
+ {
+ try
+ {
+ latencyTest.writeToFile();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
new file mode 100644
index 0000000000..95670f0507
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import java.text.DecimalFormat;
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class PerfBase
+{
+ TestParams params;
+ Connection con;
+ Session session;
+ Destination dest;
+ Destination feedbackDest;
+ DecimalFormat df = new DecimalFormat("###.##");
+
+ public PerfBase()
+ {
+ params = new TestParams();
+ }
+
+ public void setUp() throws Exception
+ {
+ Hashtable<String,String> env = new Hashtable<String,String>();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
+ env.put(Context.PROVIDER_URL, params.getProviderURL());
+
+ Context ctx = null;
+ try
+ {
+ ctx = new InitialContext(env);
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error initializing JNDI",e);
+
+ }
+
+ ConnectionFactory conFac = null;
+ try
+ {
+ conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up connection factory",e);
+ }
+
+ con = conFac.createConnection();
+ con.start();
+ session = con.createSession(params.isTransacted(),
+ params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+
+ try
+ {
+ dest = (Destination)ctx.lookup( params.isDurable()?
+ params.getDurableDestination():
+ params.getTransientDestination()
+ );
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up destination",e);
+ }
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
+
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
new file mode 100644
index 0000000000..cd12c7010d
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * hen the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
+ *
+ * Throughput
+ * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class PerfConsumer extends PerfBase implements MessageListener
+{
+ MessageConsumer consumer;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
+ long startTime = 0; // to measure consumer throughput
+ long rcvdTime = 0;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final Object lock = new Object();
+
+ public PerfConsumer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
+ {
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
+ {
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ }
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
+ }
+
+ public void printResults() throws Exception
+ {
+ synchronized (lock)
+ {
+ lock.wait();
+ }
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Consumer rate : ").
+ append(df.format(consRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void notifyCompletion(Destination replyTo) throws Exception
+ {
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ notifyCompletion(msg.getJMSReplyTo());
+
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ else
+ {
+ rcvdTime = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (rcvdMsgCount == 1)
+ {
+ startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
+ }
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = rcvdTime - msg.getJMSTimestamp();
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ PerfConsumer cons = new PerfConsumer();
+ cons.test();
+ }
+} \ No newline at end of file
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
new file mode 100644
index 0000000000..757b1bfcda
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.testkit.MessageFactory;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+ MessageProducer producer;
+ Message msg;
+ byte[] payload;
+
+ public PerfProducer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ feedbackDest = session.createTemporaryQueue();
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+ for (int i=0; i < params.getWarmupCount() -1; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+ boolean transacted = params.isTransacted();
+ int tranSize = params.getTransactionSize();
+
+ long start = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % tranSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
+ System.out.println(new StringBuilder("Producer rate: ").
+ append(df.format(rate)).
+ append(" msg/sec").
+ toString());
+ }
+
+ public void waitForCompletion() throws Exception
+ {
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
+ }
+
+ public void tearDown() throws Exception
+ {
+ producer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ waitForCompletion();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ PerfProducer prod = new PerfProducer();
+ prod.test();
+ }
+} \ No newline at end of file
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
new file mode 100644
index 0000000000..15142cfced
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.Session;
+
+public class TestParams
+{
+ private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
+
+ private String connectionFactory = "connectionFactory";
+
+ private String transientDest = "transientQueue";
+
+ private String durableDest = "durableQueue";
+
+ private int msg_size = 512;
+
+ private int msg_type = 1; // not used yet
+
+ private boolean cacheMessage = true;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableTimestamp = false;
+
+ private boolean durable = false;
+
+ private boolean transacted = false;
+
+ private int transaction_size = 1000;
+
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+ private int msg_count = 10;
+
+ private int warmup_count = 1;
+
+ public TestParams()
+ {
+ initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
+ providerURL = System.getProperty("java.naming.provider.url",providerURL);
+
+ transientDest = System.getProperty("transDest",transientDest);
+ durableDest = System.getProperty("durableDest",durableDest);
+
+ msg_size = Integer.getInteger("msg_size", 512);
+ msg_type = Integer.getInteger("msg_type",1);
+ cacheMessage = Boolean.getBoolean("cache_msg");
+ disableMessageID = Boolean.getBoolean("disableMessageID");
+ disableTimestamp = Boolean.getBoolean("disableTimestamp");
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ transaction_size = Integer.getInteger("trans_size",1000);
+ ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg_count",msg_count);
+ warmup_count = Integer.getInteger("warmup_count",warmup_count);
+ }
+
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ public String getConnectionFactory()
+ {
+ return connectionFactory;
+ }
+
+ public String getTransientDestination()
+ {
+ return transientDest;
+ }
+
+ public String getDurableDestination()
+ {
+ return durableDest;
+ }
+
+ public String getInitialContextFactory()
+ {
+ return initialContextFactory;
+ }
+
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ public int getMsgType()
+ {
+ return msg_type;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getProviderURL()
+ {
+ return providerURL;
+ }
+
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
new file mode 100644
index 0000000000..0c3a17b3d8
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
@@ -0,0 +1,152 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testkit.MessageFactory;
+
+public class BaseTest
+{
+ protected String host = "127.0.0.1";
+ protected int msg_size = 100;
+ protected int msg_count = 10;
+ protected int session_count = 1;
+ protected boolean durable = false;
+ protected String queue_name = "message_queue";
+ protected String exchange_name = "amq.direct";
+ protected String routing_key = "routing_key";
+ protected String contentType = "application/octet-stream";
+ protected int port = 5672;
+ protected String url;
+ protected Message[] msgArray;
+
+ protected AMQConnection con;
+ protected Destination dest = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ public BaseTest()
+ {
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ msg_size = Integer.getInteger("msg_size", 100);
+ msg_count = Integer.getInteger("msg_count", 10);
+ session_count = Integer.getInteger("session_count", 1);
+ durable = Boolean.getBoolean("durable");
+ queue_name = System.getProperty("queue_name", "message_queue");
+ exchange_name = System.getProperty("exchange_name", "amq.direct");
+ routing_key = System.getProperty("routing_key", "routing_key");
+ contentType = System.getProperty("content_type","application/octet-stream");
+
+
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'";
+ }
+
+ public void setUp()
+ {
+ try
+ {
+ con = new AMQConnection(url);
+ con.start();
+
+
+ if (exchange_name.equals("amq.topic"))
+ {
+ dest = new AMQTopic(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ false, //auto-delete
+ null, //queue name
+ durable);
+ }
+ else
+ {
+ dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ new AMQShortString(queue_name),
+ false, //exclusive
+ false, //auto-delete
+ durable);
+ }
+
+ // Create the session to setup the messages
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (msg_size == -1)
+ {
+ // This creates an array of 1000 messages from 500-1500 bytes
+ // During the tests a message will be picked randomly
+ msgArray = new Message[1000];
+ for (int i = 0; i < 1000; i++)
+ {
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(session,500 + i) :
+ MessageFactory.createBytesMessage(session, 500 + i);
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ msgArray[i] = msg;
+ }
+ }
+ else
+ {
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(session, msg_size):
+ MessageFactory.createBytesMessage(session, msg_size);
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ msgArray = new Message[]
+ { msg };
+ }
+
+ session.close();
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Error while setting up the test");
+ }
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
new file mode 100644
index 0000000000..a91d9e7e85
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Test Description
+ * ================
+ * The difference between this test and the
+ * LongDurationConsumer is that each Session runs
+ * in it's own Thread and the ability to receive
+ * messages transactionally.
+ *
+ * All consumers will still share the same destination.
+ *
+ */
+public class MultiThreadedConsumer extends BaseTest
+{
+ protected final boolean transacted;
+
+ public MultiThreadedConsumer()
+ {
+ super();
+ transacted = Boolean.getBoolean("transacted");
+ // needed only to calculate throughput.
+ // If msg_count is different set it via -Dmsg_count
+ msg_count = 10;
+ }
+
+ /**
+ * Creates a Session and a consumer that runs in its
+ * own thread.
+ * It can also consume transactionally.
+ *
+ */
+ public void test()
+ {
+ try
+ {
+ for (int i = 0; i < session_count; i++)
+ {
+
+ final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ MessageConsumer consumer = session.createConsumer(dest);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+
+ private boolean startIteration = true;
+ private long startTime = 0;
+ private long iterations = 0;
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ long now = System.currentTimeMillis();
+ if (startIteration)
+ {
+ startTime = m.getJMSTimestamp();
+ startIteration = false;
+ }
+
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ startIteration = true;
+ long totalIterationTime = now - startTime;
+ double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
+ long latencySample = now - m.getJMSTimestamp();
+ iterations++;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(iterations).append(",").
+ append(nf.format(throughput)).append(",").append(latencySample);
+
+ System.out.println(sb.toString());
+
+ MessageProducer temp = session.createProducer(m.getJMSReplyTo());
+ Message controlMsg = session.createTextMessage();
+ temp.send(controlMsg);
+ if (transacted)
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ catch (JMSException e)
+ {
+ handleError(e,"Exception receiving messages");
+ }
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception creating a consumer");
+ }
+
+ }
+
+ });
+ t.setName("session-" + i);
+ t.start();
+ } // for loop
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ MultiThreadedConsumer test = new MultiThreadedConsumer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
new file mode 100644
index 0000000000..279e5ea0bf
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
@@ -0,0 +1,166 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ *
+ * This test creats x number of sessions, where each session
+ * runs in it's own thread. Each session creates a producer
+ * and it's own feedback queue.
+ *
+ * A producer will send n-1 messages, followed by the n-th
+ * message which contains "End" in it's payload to signal
+ * that this is the last message message in the sequence.
+ * The end message has the feedback queue as it's replyTo.
+ * It will then listen on the feedback queue waiting for the
+ * confirmation and then sleeps for 1000 ms before proceeding
+ * with the next n messages.
+ *
+ * This hand shaking mechanism ensures that all of the
+ * messages sent are consumed by some consumer. This prevents
+ * the producers from saturating the broker especially when
+ * the consumers are slow.
+ *
+ * All producers send to a single destination
+ * If using transactions it's best to use smaller message count
+ * as the test only commits after sending all messages in a batch.
+ *
+ */
+
+public class MultiThreadedProducer extends SimpleProducer
+{
+ protected final boolean transacted;
+
+ public MultiThreadedProducer()
+ {
+ super();
+ transacted = Boolean.getBoolean("transacted");
+ }
+
+ public void test()
+ {
+ try
+ {
+ final int msg_count_per_session = msg_count/session_count;
+
+ for (int i = 0; i < session_count; i++)
+ {
+ final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread(new Runnable()
+ {
+ private Random gen = new Random();
+
+ private Message getNextMessage()
+ {
+ if (msg_size == -1)
+ {
+ int index = gen.nextInt(1000);
+ return msgArray[index];
+ }
+ else
+ {
+ return msgArray[0];
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ MessageProducer prod = session.createProducer(dest);
+ // this will ensure that the producer will not overun the consumer.
+ feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID
+ .randomUUID().toString()), new AMQShortString("control"));
+
+ MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue);
+
+ while (true)
+ {
+ for (int i = 0; i < msg_count_per_session; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setJMSMessageID("ID:" + UUID.randomUUID());
+ prod.send(msg);
+ }
+
+ TextMessage m = session.createTextMessage("End");
+ m.setJMSReplyTo(feedbackQueue);
+ prod.send(m);
+
+ if (transacted)
+ {
+ session.commit();
+ }
+
+ System.out.println(df.format(System.currentTimeMillis()));
+ feedbackConsumer.receive();
+ if (transacted)
+ {
+ session.commit();
+ }
+ Thread.sleep(1000);
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception in producing message");
+ }
+
+ }
+
+ });
+ t.setName("session-" + i);
+ t.start();
+
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ MultiThreadedProducer test = new MultiThreadedProducer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
new file mode 100644
index 0000000000..c33f9ffbf2
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ * This test will open x number of connections where each
+ * connection will create a session and a producer/consumer pair,
+ * and then send configurable no of messages.
+ * It will them sleep for configurable time interval and
+ * tear down the connections/sessions/consumers.
+ * It will then repeat the process again until the test is stopped.
+ *
+ * Purpose of the test
+ * ===================
+ * To find if the broker has leaks when cleaning resources.
+ * To find if the client has leaks with resources.
+ */
+public class ResourceLeakTest extends BaseTest
+{
+ protected int connection_count = 10;
+ protected long connection_idle_time = 5000;
+
+ public ResourceLeakTest()
+ {
+ super();
+ connection_count = Integer.getInteger("con_count",10);
+ connection_idle_time = Long.getLong("con_idle_time", 5000);
+ }
+
+ public void test()
+ {
+ try
+ {
+
+ AMQConnection[] cons = new AMQConnection[connection_count];
+ Session[] sessions = new Session[connection_count];
+ MessageConsumer[] msgCons = new MessageConsumer[connection_count];
+ MessageProducer [] msgProds = new MessageProducer[connection_count];
+ Destination dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ new AMQShortString(queue_name),
+ true, //exclusive
+ true // auto delete
+ );
+
+ while (true)
+ {
+ for (int i = 0; i < connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ cons[i] = con;
+ Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sessions[i] = ssn;
+ MessageConsumer msgCon = ssn.createConsumer(dest);
+ msgCons[i] = msgCon;
+ MessageProducer msgProd = ssn.createProducer(dest);
+ msgProds[i] = msgProd;
+
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes("Test Msg".getBytes());
+
+ for (int j = 0; j < msg_count;j++)
+ {
+ msgProd.send(msg);
+ }
+
+ int j = 0;
+ while (j < msg_count)
+ {
+ msgCon.receive();
+ j++;
+ }
+ }
+ System.out.println(df.format(System.currentTimeMillis()));
+ Thread.sleep(connection_idle_time);
+
+ try
+ {
+ for (int i = 0; i < connection_count; i++)
+ {
+ msgCons[i].close();
+ msgProds[i].close();
+ sessions[i].close();
+ cons[i].close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception closing resources");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception in setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ ResourceLeakTest test = new ResourceLeakTest();
+ test.test();
+ }
+
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
new file mode 100644
index 0000000000..b3eb97dafe
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Test Description
+ * ================
+ * This test will create x number of sessions.
+ * Each session will have it's own consumer.
+ * Once a consumer receives the "End" message it
+ * will send a message to the destination indicated
+ * by the replyTo field in the End message.
+ * This will signal the producer that all the previous
+ * messages have been consumed. The producer will
+ * then start sending messages again.
+ *
+ * This prevents the producer from overruning the
+ * consumer.
+ * *
+ * All consumers share a single destination
+ *
+ */
+
+public class SimpleConsumer extends BaseTest
+{
+ public SimpleConsumer()
+ {
+ super();
+ //needed only to calculate throughput.
+ // If msg_count is different set it via -Dmsg_count
+ msg_count = 10;
+ }
+
+ public void test()
+ {
+ try
+ {
+ final Session[] sessions = new Session[session_count];
+ MessageConsumer[] cons = new MessageConsumer[session_count];
+
+ for (int i = 0; i < session_count; i++)
+ {
+ sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ cons[i] = sessions[i].createConsumer(dest);
+ cons[i].setMessageListener(new MessageListener()
+ {
+
+ private boolean startIteration = true;
+ private long startTime = 0;
+ private long iterations = 0;
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ long now = System.currentTimeMillis();
+ if (startIteration)
+ {
+ startTime = m.getJMSTimestamp();
+ startIteration = false;
+ }
+
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+
+ long totalIterationTime = now - startTime;
+ startIteration = true;
+ double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
+ long latencySample = now - m.getJMSTimestamp();
+ iterations++;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(iterations).append(",").
+ append(nf.format(throughput)).append(",").append(latencySample);
+
+ System.out.println(sb.toString());
+
+ MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo());
+ Message controlMsg = sessions[0].createTextMessage();
+ temp.send(controlMsg);
+ temp.close();
+ }
+ }
+ catch (JMSException e)
+ {
+ handleError(e,"Exception when receiving the message");
+ }
+ }
+ });
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception when setting up the consumers");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ SimpleConsumer test = new SimpleConsumer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
new file mode 100644
index 0000000000..1080092536
--- /dev/null
+++ b/RC9/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ * This test will send n-1 messages, followed by the n-th
+ * message which contains "End" in it's payload to signal
+ * that this is the last message message in the sequence.
+ * The end message has the feedback queue as it's replyTo.
+ * It will then listen on the feedback queue waiting for the
+ * confirmation and then sleeps for 1000 ms before proceeding
+ * with the next n messages.
+ *
+ * This hand shaking mechanism ensures that all of the
+ * messages sent are consumed by some consumer. This prevents
+ * the producers from saturating the broker especially when
+ * the consumers are slow.
+ *
+ * It creates a producer per session.
+ * If session_count is > 1 it will round robin the messages
+ * btw the producers.
+ *
+ * All producers send to a single destination
+ *
+ */
+
+public class SimpleProducer extends BaseTest
+{
+ protected Destination feedbackQueue;
+ Random gen = new Random();
+
+ public SimpleProducer()
+ {
+ super();
+ }
+
+ protected Message getNextMessage()
+ {
+ if (msg_size == -1)
+ {
+ int index = gen.nextInt(1000);
+ return msgArray[index];
+ }
+ else
+ {
+ return msgArray[0];
+ }
+ }
+
+ public void test()
+ {
+ try
+ {
+ Session[] sessions = new Session[session_count];
+ MessageProducer[] prods = new MessageProducer[session_count];
+
+ for (int i = 0; i < session_count; i++)
+ {
+ sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ prods[i] = sessions[i].createProducer(dest);
+ }
+
+ // this will ensure that the producer will not overun the consumer.
+ feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
+ new AMQShortString(UUID.randomUUID().toString()),
+ new AMQShortString("control"));
+
+ MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue);
+
+ int prod_pointer = 0;
+ boolean multi_session = session_count > 1 ? true : false;
+
+ while (true)
+ {
+ for (int i = 0; i < msg_count - 1; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ prods[prod_pointer].send(msg);
+ if (multi_session)
+ {
+ prod_pointer++;
+ if (prod_pointer == session_count)
+ {
+ prod_pointer = 0;
+ }
+ }
+ }
+
+ TextMessage m = sessions[0].createTextMessage("End");
+ m.setJMSReplyTo(feedbackQueue);
+ prods[prod_pointer].send(m);
+ System.out.println(df.format(System.currentTimeMillis()));
+ feedbackConsumer.receive();
+ Thread.sleep(1000);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the producer");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ SimpleProducer test = new SimpleProducer();
+ test.setUp();
+ test.test();
+ }
+
+}