summaryrefslogtreecommitdiff
path: root/java/tools
diff options
context:
space:
mode:
Diffstat (limited to 'java/tools')
-rwxr-xr-xjava/tools/bin/Profile-run-from-source71
-rwxr-xr-xjava/tools/bin/check-qpid-java-env38
-rw-r--r--java/tools/bin/controller132
-rwxr-xr-xjava/tools/bin/perf_report.sh (renamed from java/tools/bin/perf-report)45
-rw-r--r--[-rwxr-xr-x]java/tools/bin/qpid-bench16
-rwxr-xr-xjava/tools/bin/qpid-python-testkit11
-rw-r--r--[-rwxr-xr-x]java/tools/bin/run_pub.sh (renamed from java/tools/bin/run-sub)14
-rw-r--r--[-rwxr-xr-x]java/tools/bin/run_sub.sh (renamed from java/tools/bin/run-pub)9
-rw-r--r--java/tools/bin/set-testkit-env.sh88
-rw-r--r--java/tools/bin/setenv.sh49
-rw-r--r--java/tools/bin/start-consumers119
-rw-r--r--java/tools/bin/start-producers136
-rw-r--r--java/tools/etc/perf-report.gnu42
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/Clock.java92
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java12
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java166
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java224
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java256
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java422
-rw-r--r--java/tools/src/main/java/org/apache/qpid/tools/TestParams.java70
20 files changed, 376 insertions, 1636 deletions
diff --git a/java/tools/bin/Profile-run-from-source b/java/tools/bin/Profile-run-from-source
deleted file mode 100755
index f8ec45ccff..0000000000
--- a/java/tools/bin/Profile-run-from-source
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Sets the environment for running the scripts from a source checkout.
-txtbld=$(tput bold) # Bold
-txtrst=$(tput sgr0) # Reset
-txtred=$(tput setaf 1) # red
-txtgreen=$(tput setaf 2) # green
-
-echo "${txtbld}Setting the environment to run qpid java tools from a source checkout${txtrst}"
-
-abs_path()
-{
- D=`dirname "$1"`
- echo "`cd \"$D\" 2>/dev/null && pwd`"
-}
-
-export QPID_CHECKOUT=`abs_path "../../../../"`
-echo "${txtgreen}Using source checkout at $QPID_CHECKOUT${txtrst}"
-
-export PATH=$QPID_CHECKOUT/java/tools/bin:$PATH
-
-if [ "$JAVA" = "" ] ; then
- export JAVA=$(which java)
-fi
-
-#------------- Required for perf_report, qpid-bench & qpid-python-testkit ----------------
-
-export VENDOR_LIB=$QPID_CHECKOUT/java/build/lib
-export CLASSPATH=`find $VENDOR_LIB -name '*.jar' | tr '\n' ':'`
-export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/test.log4j"
-
-
-#------------- Required for qpid-python-testkit -----------------------------------------
-
-PYTHONPATH=$QPID_CHECKOUT/python/qpid:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH
-export PATH=$QPID_CHECKOUT/python:$PATH
-
-if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then
- QPIDD_EXEC=$QPID_CHECKOUT/cpp/src/qpidd
-else
- echo "${txtred}WARNING: Qpid CPP broker executable not found. You will not be able to run qpid-python-testkit${txtrst}"
-fi
-
-if [ -x $QPID_CHECKOUT/cpp/src/.libs/cluster.so ]; then
- CLUSTER_LIB=$QPID_CHECKOUT/cpp/src/.libs/cluster.so
-else
- echo "${txtred}WARNING: Qpid cluster.so not found.You will not be able to run qpid-python-testkit${txtrst}"
-fi
-
-if [ "$STORE_LIB" = "" ] ; then
- echo "${txtred}WARNING: Please point the STORE_LIB variable to the message store module. If not persistence tests will not write messages to disk.${txtrst}"
-fi
-
-export PYTHONPATH QPIDD_EXEC CLUSTER_LIB
diff --git a/java/tools/bin/check-qpid-java-env b/java/tools/bin/check-qpid-java-env
deleted file mode 100755
index dedd6e06ea..0000000000
--- a/java/tools/bin/check-qpid-java-env
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$LOG_CONFIG" ]; then
- echo "Please set the appropriate parameters for logging as it may affect performance. Ex log4j defaults to DEBUG if not configured properly"
- exit -1
-fi
-
-if [ -z "$JAVA_MEM" ]; then
- JAVA_MEM=-Xmx1024m
-fi
-
-if [ -z "$JAVA" ]; then
- echo "Please set the path to the correct java executable to JAVA"
- exit -1
-fi
-
-if [ -z "$CLASSPATH" ]; then
- echo "Please set the $CLASSPATH variable to point to the jar/class files"
- exit -1
-fi
diff --git a/java/tools/bin/controller b/java/tools/bin/controller
deleted file mode 100644
index fab8614039..0000000000
--- a/java/tools/bin/controller
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME=controller
-CONSUMER_COUNT=1
-PRODUCER_COUNT=1
-DURATION=-1
-TEST_NAME="TEST_NAME"
-EXTRA_JVM_ARGS=""
-
-TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: controller [option].."
-
- printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test"
-
- printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test"
-
- printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration."
-
- printf "\n%27s\n%32s\n" "-n, --name=<test-name>" "The name of the test."
-
- printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -c|--consumer-count)
- CONSUMER_COUNT="$2"; shift; shift; continue
- ;;
- -p|--producer-count)
- PRODUCER_COUNT="$2"; shift; shift; continue
- ;;
- -d|--duration)
- DURATION="$2"; shift; shift; continue
- ;;
- -n|--name)
- TEST_NAME="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}"
-
-
-waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
-cleanup()
-{
- pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'`
- if [ "$pids" != "" ]; then
- kill -3 $pids
- kill -9 $pids >/dev/null 2>&1
- fi
-}
-
-run_controller()
-{
- TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS"
- echo "Running controller with : $TEST_ARGS" > test.out
- $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out &
- waitfor test.out "Controller: Completed the test"
- sleep 2 #give a grace period to shutdown
- print_result $TEST_NAME
-}
-
-print_result()
-{
- prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'`
- sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'`
- avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'`
- min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'`
- max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'`
- std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev
- echo "--------------------------------------------------------------------------------------------------------"
-}
-
-trap cleanup EXIT
-
-rm -rf *.out
-
-if [ "$DURATION" = -1 ]; then
- echo "Test report on " `date +%F`
- echo "========================================================================================================"
- echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|"
- echo "--------------------------------------------------------------------------------------------------------"
-else
- echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration."
-fi
-
-run_controller
diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf_report.sh
index 7de3f2b602..e6b4c987e5 100755
--- a/java/tools/bin/perf-report
+++ b/java/tools/bin/perf_report.sh
@@ -18,19 +18,23 @@
# under the License.
#
-# This will run the following test cases defined below and produce
-# a report in tabular format.
+# This will run the 8 use cases defined below and produce
+# a report in tabular format. Refer to the documentation
+# for more details.
+SUB_MEM=-Xmx1024M
+PUB_MEM=-Xmx1024M
+LOG_CONFIG="-Damqj.logging.level=WARN"
QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
TOPIC="amq.topic/test"
DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
-COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
+. setenv.sh
waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
cleanup()
-{
+{
pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
if [ "$pids" != "" ]; then
kill -3 $pids
@@ -42,31 +46,30 @@ cleanup()
# $2 consumer options
# $3 producer options
run_testcase()
-{
- sh run-sub $COMMON_CONFIG $2 > sub.out &
- sh run-pub $COMMON_CONFIG $3 > pub.out &
- waitfor pub.out "Controller: Completed the test"
+{
+ sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out &
+ waitfor sub.out "Warming up"
+ sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out &
+ waitfor sub.out "Completed the test"
+ waitfor pub.out "Consumer has completed the test"
sleep 2 #give a grace period to shutdown
- print_result $1
- mv pub.out $1.pub.out
- mv sub.out $1.sub.out
+ print_result $1
}
print_result()
{
- prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'`
- sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'`
- avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'`
- min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'`
- max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+ sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
+ avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+ min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+ max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
echo "------------------------------------------------------------------------------------------------"
}
trap cleanup EXIT
-rm -rf *.out #cleanup old files.
echo "Test report on " `date +%F`
echo "================================================================================================"
@@ -79,7 +82,7 @@ echo "--------------------------------------------------------------------------
# setting very low values to start with and experiment while increasing them slowly.
# Test 1 Trans Queue
-run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
+#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
# Test 2 Dura Queue
run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/java/tools/bin/qpid-bench b/java/tools/bin/qpid-bench
index cd894b607f..c982e64efd 100755..100644
--- a/java/tools/bin/qpid-bench
+++ b/java/tools/bin/qpid-bench
@@ -18,6 +18,18 @@
# under the License.
#
-. check-qpid-java-env
+if [ -z "$QPID_HOME" ]; then
+ export QPID_HOME=$(dirname $(dirname $(readlink -f $0)))
+ export PATH=${PATH}:${QPID_HOME}/bin
+fi
-$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@"
+# Set classpath to include Qpid jar with all required jars in manifest
+QPID_LIBS=$QPID_HOME/lib/qpid-all.jar
+
+# Set other variables used by the qpid-run script before calling
+export JAVA=java \
+ JAVA_VM=-server \
+ JAVA_MEM=-Xmx1024m \
+ QPID_CLASSPATH=$QPID_LIBS
+
+. qpid-run org.apache.qpid.tools.QpidBench "$@"
diff --git a/java/tools/bin/qpid-python-testkit b/java/tools/bin/qpid-python-testkit
index 7233d0d075..cbe7972421 100755
--- a/java/tools/bin/qpid-python-testkit
+++ b/java/tools/bin/qpid-python-testkit
@@ -22,12 +22,9 @@
# via the python test runner. The defaults are set for a running
# from an svn checkout
-. check-qpid-java-env
+. ./set-testkit-env.sh
export PYTHONPATH=./:$PYTHONPATH
-echo $PYTHONPATH
-if [ "$OUTDIR" = "" ] ; then
- OUTDIR=$PWD
-fi
-testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S`
-qpid-python-test -m testkit -DOUTDIR=$testdir"$@"
+rm -rf $OUTDIR
+qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
+
diff --git a/java/tools/bin/run-sub b/java/tools/bin/run_pub.sh
index 8449563f7f..91b9287dea 100755..100644
--- a/java/tools/bin/run-sub
+++ b/java/tools/bin/run_pub.sh
@@ -18,15 +18,7 @@
# under the License.
#
-. check-qpid-java-env
-
-echo "All args $@"
-
-JVM_ARGS="$1"
-PROGRAM_ARGS="$2"
-
-echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
-echo "PROGRAM ARGS : $PROGRAM_ARGS"
-
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS
+. $QPID_TEST_HOME/bin/setenv.sh
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer
diff --git a/java/tools/bin/run-pub b/java/tools/bin/run_sub.sh
index 9efe58c4b8..c9ad2fed74 100755..100644
--- a/java/tools/bin/run-pub
+++ b/java/tools/bin/run_sub.sh
@@ -18,11 +18,8 @@
# under the License.
#
-. check-qpid-java-env
+. $QPID_TEST_HOME/bin/setenv.sh
-JVM_ARGS="$1"
-PROGRAM_ARGS="$2"
+echo "$@"
+$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer
-echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
-echo "PROGRAM ARGS : $PROGRAM_ARGS"
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS
diff --git a/java/tools/bin/set-testkit-env.sh b/java/tools/bin/set-testkit-env.sh
new file mode 100644
index 0000000000..051dad8179
--- /dev/null
+++ b/java/tools/bin/set-testkit-env.sh
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# If QPIDD_EXEC ..etc is not set, it will first check to see
+# if this is run from a qpid svn check out, if not it will look
+# for installed rpms.
+
+abs_path()
+{
+ D=`dirname "$1"`
+ B=`basename "$1"`
+ echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B"
+}
+
+# Environment for python tests
+
+if [ -d ../../../python ] ; then
+ PYTHON_DIR=../../../python
+ PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid
+elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then
+ echo "WARNING: skipping test, no qpid python scripts found ."; exit 0;
+fi
+
+
+if [ "$QPIDD_EXEC" = "" ] ; then
+ if [ -x ../../../cpp/src/qpidd ]; then
+ QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"`
+ elif [ -n "$(which qpidd)" ] ; then
+ QPIDD_EXEC=$(which qpidd)
+ else
+ echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0;
+ fi
+fi
+
+if [ "$CLUSTER_LIB" = "" ] ; then
+ if [ -x ../../../cpp/src/.libs/cluster.so ]; then
+ CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"`
+ elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then
+ CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so"
+ elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then
+ CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so"
+ else
+ echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0;
+ fi
+fi
+
+if [ "$STORE_LIB" = "" ] ; then
+ if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then
+ STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so"
+ elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then
+ STORE_LIB="/usr/lib/qpid/daemon/msgstore.so"
+ #else
+ # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0;
+ fi
+fi
+
+if [ "$QP_CP" = "" ] ; then
+ if [ -d ../../build/lib/ ]; then
+ QP_JAR_PATH=`abs_path "../../build/lib/"`
+ elif [ -d /usr/share/java/qpid-deps ]; then
+ QP_JAR_PATH=`abs_path "/usr/share/java"`
+ else
+ "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0;
+ fi
+ QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'`
+fi
+
+if [ "$OUTDIR" = "" ] ; then
+ OUTDIR=`abs_path "./output"`
+fi
+
+export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR
diff --git a/java/tools/bin/setenv.sh b/java/tools/bin/setenv.sh
new file mode 100644
index 0000000000..24135e711b
--- /dev/null
+++ b/java/tools/bin/setenv.sh
@@ -0,0 +1,49 @@
+#!/bin/sh
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Compiles the test classes and sets the CLASSPATH
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+ echo "ERROR: Please set JAVA_HOME ...."
+ exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+if [ "$VENDOR_LIB" = "" ] ; then
+ echo "ERROR: Please set VENDOR_LIB path in the script ...."
+ exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+
diff --git a/java/tools/bin/start-consumers b/java/tools/bin/start-consumers
deleted file mode 100644
index c71fc0c21f..0000000000
--- a/java/tools/bin/start-consumers
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="start-consumers"
-PROCESS_COUNT=1
-CON_COUNT=1
-MSG_COUNT=10000
-ADDRESS="queue;{create:always}"
-UNIQUE_DEST="false"
-
-EXTRA_JVM_ARGS=" -Dmax_prefetch=500 "
-
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\
- --long connection-count:,process-count:,create-unique-queues-topics,\
-jvm-args:,queue:,topic:,address:,\
-msg-count:,help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: start-producers [option].."
-
- printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test"
-
- printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test"
-
- printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic"
-
- printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue"
-
- printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic"
-
- printf "\n%13s\n%44s\n" "--address" "The address you want to publish to"
-
- printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)"
-
- printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -C|--connection-count)
- CON_COUNT="$2"; shift; shift; continue
- ;;
- -P|--process-count)
- PROCESS_COUNT="$2"; shift; shift; continue
- ;;
- -u|--create-unique-queues-topics)
- UNIQUE_DEST="true"; shift; continue
- ;;
- --queue)
- ADDRESS="$2;{create: always}"; shift; shift; continue
- ;;
- --topic)
- ADDRESS="amq.topic/$2"; shift; shift; continue
- ;;
- --address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -c|--msg-count)
- MSG_COUNT="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true"
-
-start_consumers()
-{
- for ((i=0; i<$PROCESS_COUNT; i++))
- do
- if [ "$UNIQUE_DEST" = "true" ]; then
- sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 &
- else
- sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 &
- fi
- done
-}
-
-start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS"
-
diff --git a/java/tools/bin/start-producers b/java/tools/bin/start-producers
deleted file mode 100644
index 7ba0286f7c..0000000000
--- a/java/tools/bin/start-producers
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="start-producers"
-PROCESS_COUNT=1
-CON_COUNT=1
-MSG_TYPE="bytes"
-WARMUP_MSG_COUNT=1000
-MSG_COUNT=10000
-MSG_SIZE=1024
-ADDRESS="queue;{create:always}"
-UNIQUE_DEST="false"
-
-EXTRA_JVM_ARGS=""
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\
- --long connection-count:,process-count:,create-unique-queues-topics,\
-jvm-args:,queue:,topic:,address:,\
-msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: start-producers [option].."
-
- printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test"
-
- printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test"
-
- printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic"
-
- printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue"
-
- printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic"
-
- printf "\n%13s\n%44s\n" "--address" "The address you want to publish to"
-
- printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)"
-
- printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)"
-
- printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)"
-
- printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)"
-
- printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -C|--connection-count)
- CON_COUNT="$2"; shift; shift; continue
- ;;
- -P|--process-count)
- PROCESS_COUNT="$2"; shift; shift; continue
- ;;
- -u|--create-unique-queues-topics)
- UNIQUE_DEST="true"; shift; continue
- ;;
- --queue)
- ADDRESS="$2;{create: always}"; shift; shift; continue
- ;;
- --topic)
- ADDRESS="amq.topic/$2"; shift; shift; continue
- ;;
- --address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -s|--msg-size)
- MSG_SIZE="$2"; shift; shift; continue
- ;;
- -c|--msg-count)
- MSG_COUNT="$2"; shift; shift; continue
- ;;
- -t|--msg_type)
- MSG_TYPE="$2"; shift; shift; continue
- ;;
- -w|--warmup-msg-count)
- WARMUP_MSG_COUNT="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT"
-
-start_producers()
-{
- for ((i=0; i<$PROCESS_COUNT; i++))
- do
- if [ "$UNIQUE_DEST" = "true" ]; then
- sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 &
- else
- sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 &
- fi
- done
-}
-
-start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS"
-
diff --git a/java/tools/etc/perf-report.gnu b/java/tools/etc/perf-report.gnu
deleted file mode 100644
index 6d5020efb5..0000000000
--- a/java/tools/etc/perf-report.gnu
+++ /dev/null
@@ -1,42 +0,0 @@
-set terminal png
-set datafile separator ","
-
-set title "Variation of avg latency between iterations"
-set yrange [10:20]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "avg_latency.png"
-plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines
-
-
-set title "Variation of max latency between iterations"
-set yrange [0:1000]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "max_latency.png"
-plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines
-
-
-set title "Variation of standard deviation of latency between iterations"
-set yrange [0:20]
-set xlabel "Iterations"
-set ylabel "Standard Deviation"
-set output "std_dev_latency.png"
-plot "stats-csv.log" using 12 title "standard deviation" with lines
-
-
-set title "Variation of system throughput between iterations"
-set yrange [400000:450000]
-set xlabel "Iterations"
-set ylabel "System Throuhgput (msg/sec)"
-set output "system_rate.png"
-plot "stats-csv.log" using 2 title "system throughput" with lines
-
-
-set title "Variation of avg producer & consumer rates between iterations"
-set yrange [6500:7500]
-set xlabel "Iterations"
-set ylabel "Avg Rates (msg/sec)"
-set output "prod_cons_rate.png"
-plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines
-
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
deleted file mode 100644
index 37369959a8..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package org.apache.qpid.tools;
-
-/**
- * In the future this will be replaced by a Clock abstraction
- * that can utilize a realtime clock when running in RT Java.
- */
-
-public class Clock
-{
- private static Precision precision;
- private static long offset = -1; // in nano secs
-
- public enum Precision
- {
- NANO_SECS, MILI_SECS;
-
- static Precision getPrecision(String str)
- {
- if ("mili".equalsIgnoreCase(str))
- {
- return MILI_SECS;
- }
- else
- {
- return NANO_SECS;
- }
- }
- };
-
- static
- {
- precision = Precision.getPrecision(System.getProperty("precision","mili"));
- //offset = Long.getLong("offset",-1);
-
- System.out.println("Using precision : " + precision + " and offset " + offset);
- }
-
- public static Precision getPrecision()
- {
- return precision;
- }
-
- public static long getTime()
- {
- if (precision == Precision.NANO_SECS)
- {
- if (offset == -1)
- {
- return System.nanoTime();
- }
- else
- {
- return System.nanoTime() + offset;
- }
- }
- else
- {
- if (offset == -1)
- {
- return System.currentTimeMillis();
- }
- else
- {
- return System.currentTimeMillis() + offset/convertToMiliSecs();
- }
- }
- }
-
- public static long convertToSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000000;
- }
- else
- {
- return 1000;
- }
- }
-
- public static long convertToMiliSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000;
- }
- else
- {
- return 1;
- }
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
index 90ee7e28ae..b88b242e6d 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java
@@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public LatencyTest()
{
- super("");
+ super();
warmedUp = lock.newCondition();
testCompleted = lock.newCondition();
// Storing the following two for efficiency
@@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener
public static void main(String[] args)
{
- final LatencyTest latencyTest = new LatencyTest();
+ final LatencyTest latencyTest = new LatencyTest();
Runnable r = new Runnable()
{
public void run()
@@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener
}
}
};
-
+
Thread t;
try
{
- t = Threading.getThreadFactory().createThread(r);
+ t = Threading.getThreadFactory().createThread(r);
}
catch(Exception e)
{
throw new Error("Error creating latency test thread",e);
}
- t.start();
+ t.start();
}
-}
+} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
index 121e94cea1..ac597d17de 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java
@@ -20,113 +20,36 @@
*/
package org.apache.qpid.tools;
-import java.net.InetAddress;
import java.text.DecimalFormat;
-import java.util.UUID;
+import java.util.Hashtable;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.messaging.Address;
public class PerfBase
{
- public final static String CODE = "CODE";
- public final static String ID = "ID";
- public final static String REPLY_ADDR = "REPLY_ADDR";
- public final static String MAX_LATENCY = "MAX_LATENCY";
- public final static String MIN_LATENCY = "MIN_LATENCY";
- public final static String AVG_LATENCY = "AVG_LATENCY";
- public final static String STD_DEV = "STD_DEV";
- public final static String CONS_RATE = "CONS_RATE";
- public final static String PROD_RATE = "PROD_RATE";
- public final static String MSG_COUNT = "MSG_COUNT";
- public final static String TIMESTAMP = "Timestamp";
-
- String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
-
TestParams params;
Connection con;
Session session;
- Session controllerSession;
Destination dest;
- Destination myControlQueue;
- Destination controllerQueue;
+ Destination feedbackDest;
DecimalFormat df = new DecimalFormat("###.##");
- String id;
- String myControlQueueAddr;
-
- MessageProducer sendToController;
- MessageConsumer receiveFromController;
- String prefix = "";
- enum OPCode {
- REGISTER_CONSUMER, REGISTER_PRODUCER,
- PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
- CONSUMER_READY, PRODUCER_READY,
- PRODUCER_START,
- RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
- CONTINUE_TEST, STOP_TEST
- };
-
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
- MessageType msgType = MessageType.BYTES;
-
- public PerfBase(String prefix)
+ public PerfBase()
{
params = new TestParams();
- String host = "";
- try
- {
- host = InetAddress.getLocalHost().getHostName();
- }
- catch (Exception e)
- {
- }
- id = host + "-" + UUID.randomUUID().toString();
- this.prefix = prefix;
- this.myControlQueueAddr = id + ";{create: always}";
}
public void setUp() throws Exception
- {
+ {
+
if (params.getHost().equals("") || params.getPort() == -1)
{
con = new AMQConnection(params.getUrl());
@@ -139,78 +62,7 @@ public class PerfBase
session = con.createSession(params.isTransacted(),
params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
- controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- dest = createDestination();
- controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR);
- myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(params.getMessageType());
- System.out.println("Using " + msgType + " messages");
-
- sendToController = controllerSession.createProducer(controllerQueue);
- receiveFromController = controllerSession.createConsumer(myControlQueue);
- }
-
- private Destination createDestination() throws Exception
- {
- if (params.isUseUniqueDests())
- {
- System.out.println("Prefix : " + prefix);
- Address addr = Address.parse(params.getAddress());
- AMQAnyDestination temp = new AMQAnyDestination(params.getAddress());
- int type = ((AMQSession_0_10)session).resolveAddressType(temp);
-
- if ( type == AMQDestination.TOPIC_TYPE)
- {
- addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
- System.out.println("Setting subject : " + addr);
- }
- else
- {
- addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
- System.out.println("Setting name : " + addr);
- }
-
- return new AMQAnyDestination(addr);
- }
- else
- {
- return new AMQAnyDestination(params.getAddress());
- }
- }
-
- public synchronized void sendMessageToController(MapMessage m) throws Exception
- {
- m.setString(ID, id);
- m.setString(REPLY_ADDR,myControlQueueAddr);
- sendToController.send(m);
- }
-
- public void receiveFromController(OPCode expected) throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
- if (expected != code)
- {
- throw new Exception("Expected OPCode : " + expected + " but received : " + code);
- }
-
- }
-
- public boolean continueTest() throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- System.out.println("Received Code : " + code);
- return (code == OPCode.CONTINUE_TEST);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- controllerSession.close();
- con.close();
+ dest = new AMQAnyDestination(params.getAddress());
}
public void handleError(Exception e,String msg)
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
index b63892bb51..0ef0455a64 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java
@@ -20,17 +20,13 @@
*/
package org.apache.qpid.tools;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
+import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
import javax.jms.TextMessage;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading;
* b) They are on separate machines that have their time synced via a Time Server
*
* In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
+ * hen the message is sent. The consumer will note the current time the message is
* received and will calculate the latency as follows
* latency = rcvdTime - msg.getJMSTimestamp()
*
@@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading;
* variance in latencies.
*
* Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
*
* Throughput
* ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
* Consumer rate is calculated as
* rcvdMsgCount/(rcvdTime - startTime)
*
@@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBase implements MessageListener
long minLatency = Long.MAX_VALUE;
long totalLatency = 0; // to calculate avg latency.
int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
long startTime = 0; // to measure consumer throughput
long rcvdTime = 0;
boolean transacted = false;
int transSize = 0;
- boolean printStdDev = false;
- List<Long> sample;
-
final Object lock = new Object();
- public PerfConsumer(String prefix)
+ public PerfConsumer()
{
- super(prefix);
- System.out.println("Consumer ID : " + id);
+ super();
}
public void setUp() throws Exception
{
super.setUp();
consumer = session.createConsumer(dest);
- System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n");
// Storing the following two for efficiency
transacted = params.isTransacted();
transSize = params.getTransactionSize();
- printStdDev = params.isPrintStdDev();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
}
public void warmup()throws Exception
{
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- Message msg = consumer.receive();
- // This is to ensure we drain the queue before we start the actual test.
- while ( msg != null)
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
{
- if (msg.getBooleanProperty("End") == true)
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
{
- // It's more realistic for the consumer to signal this.
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m);
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
}
- msg = consumer.receive(1000);
- }
-
- if (params.isTransacted())
- {
- session.commit();
}
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m);
- consumer.setMessageListener(this);
}
public void startTest() throws Exception
{
- System.out.println("Consumer: " + id + " Starting test......" + "\n");
- resetCounters();
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
}
- public void resetCounters()
+ public void printResults() throws Exception
{
- rcvdMsgCount = 0;
- maxLatency = 0;
- minLatency = Long.MAX_VALUE;
- totalLatency = 0;
- if (printStdDev)
+ synchronized (lock)
{
- sample = null;
- sample = new ArrayList<Long>(params.getMsgCount());
+ lock.wait();
}
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime);
- double stdDev = 0.0;
- if (printStdDev)
- {
- stdDev = calculateStdDev(avgLatency);
- }
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs());
- m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs());
- m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs());
- m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs());
- m.setDouble(CONS_RATE, consRate);
- m.setLong(MSG_COUNT, rcvdMsgCount);
- sendMessageToController(m);
-
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
System.out.println(new StringBuilder("Consumer rate : ").
append(df.format(consRate)).
append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency/Clock.convertToMiliSecs())).
+ append(df.format(avgLatency)).
append(" ms").toString());
System.out.println(new StringBuilder("Min Latency : ").
- append(df.format(minLatency/Clock.convertToMiliSecs())).
+ append(minLatency).
append(" ms").toString());
System.out.println(new StringBuilder("Max Latency : ").
- append(df.format(maxLatency/Clock.convertToMiliSecs())).
+ append(maxLatency).
append(" ms").toString());
- if (printStdDev)
- {
- System.out.println(new StringBuilder("Std Dev : ").
- append(stdDev/Clock.convertToMiliSecs()).toString());
- }
+ System.out.println("Completed the test......\n");
}
- public double calculateStdDev(double mean)
+ public void notifyCompletion(Destination replyTo) throws Exception
{
- double v = 0;
- for (double latency: sample)
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
{
- v = v + Math.pow((latency-mean), 2);
+ session.commit();
}
- v = v/sample.size();
- return Math.round(Math.sqrt(v));
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
}
public void onMessage(Message msg)
{
try
{
- // To figure out the decoding overhead of text
- if (msgType == MessageType.TEXT)
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
{
- ((TextMessage)msg).getText();
- }
+ notifyCompletion(msg.getJMSReplyTo());
- if (msg.getBooleanProperty("End"))
- {
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
}
else
{
- rcvdTime = Clock.getTime();
+ rcvdTime = System.currentTimeMillis();
rcvdMsgCount ++;
if (rcvdMsgCount == 1)
{
startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
}
if (transacted && (rcvdMsgCount % transSize == 0))
@@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBase implements MessageListener
session.commit();
}
- long latency = rcvdTime - msg.getLongProperty(TIMESTAMP);
+ long latency = rcvdTime - msg.getJMSTimestamp();
maxLatency = Math.max(maxLatency, latency);
minLatency = Math.min(minLatency, latency);
totalLatency = totalLatency + latency;
- if (printStdDev)
- {
- sample.add(latency);
- }
}
}
@@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
- public void run()
+ public void test()
{
try
{
setUp();
warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
+ startTest();
+ printResults();
tearDown();
}
catch(Exception e)
@@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBase implements MessageListener
}
}
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws InterruptedException
+ public static void main(String[] args)
{
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
+ final PerfConsumer cons = new PerfConsumer();
+ Runnable r = new Runnable()
{
-
- final PerfConsumer cons = new PerfConsumer(scriptId + i);
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
+ public void run()
{
- throw new Error("Error creating consumer thread",e);
+ cons.test();
}
- t.start();
-
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating consumer thread",e);
}
- testCompleted.await();
- System.out.println("Consumers have completed the test......\n");
+ t.start();
}
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
index ac6129ab68..015d1e6205 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java
@@ -23,15 +23,13 @@ package org.apache.qpid.tools;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
-import javax.jms.MapMessage;
import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.thread.Threading;
/**
@@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading;
* System throughput and latencies calculated by the PerfConsumer are more realistic
* numbers.
*
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
*/
public class PerfProducer extends PerfBase
{
- private static long SEC = 60000;
-
MessageProducer producer;
Message msg;
- Object payload;
- List<Object> payloads;
+ byte[] payload;
+ List<byte[]> payloads;
boolean cacheMsg = false;
boolean randomMsgSize = false;
boolean durable = false;
Random random;
int msgSizeRange = 1024;
- boolean rateLimitProducer = false;
- double rateFactor = 0.4;
- double rate = 0.0;
-
- public PerfProducer(String prefix)
+
+ public PerfProducer()
{
- super(prefix);
- System.out.println("Producer ID : " + id);
+ super();
}
public void setUp() throws Exception
{
super.setUp();
- durable = params.isDurable();
- rateLimitProducer = params.getRate() > 0 ? true : false;
- if (rateLimitProducer)
- {
- System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec");
- }
+ feedbackDest = session.createTemporaryQueue();
+ durable = params.isDurable();
+
// if message caching is enabled we pre create the message
// else we pre create the payload
if (params.isCacheMessage())
{
cacheMsg = true;
- msg = createMessage(createPayload(params.getMsgSize()));
+
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
@@ -109,52 +93,21 @@ public class PerfProducer extends PerfBase
random = new Random(20080921);
randomMsgSize = true;
msgSizeRange = params.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
+ payloads = new ArrayList<byte[]>(msgSizeRange);
+
for (int i=0; i < msgSizeRange; i++)
{
- payloads.add(createPayload(i));
+ payloads.add(MessageFactory.createMessagePayload(i).getBytes());
}
- }
+ }
else
{
- payload = createPayload(params.getMsgSize());
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
}
producer = session.createProducer(dest);
- System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName());
producer.setDisableMessageID(params.isDisableMessageID());
producer.setDisableMessageTimestamp(params.isDisableTimestamp());
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
}
protected Message getNextMessage() throws Exception
@@ -164,130 +117,117 @@ public class PerfProducer extends PerfBase
return msg;
}
else
- {
- Message m;
-
+ {
+ msg = session.createBytesMessage();
+
if (!randomMsgSize)
{
- m = createMessage(payload);
+ ((BytesMessage)msg).writeBytes(payload);
}
else
{
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
+ ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange)));
}
- m.setJMSDeliveryMode(durable?
+ msg.setJMSDeliveryMode(durable?
DeliveryMode.PERSISTENT :
DeliveryMode.NON_PERSISTENT
);
- return m;
+ return msg;
}
}
public void warmup()throws Exception
{
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- System.out.println("Producer: " + id + " Warming up......");
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
for (int i=0; i < params.getWarmupCount() -1; i++)
{
producer.send(getNextMessage());
}
- sendEndMessage();
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
if (params.isTransacted())
{
session.commit();
}
+
+ tmp.close();
}
public void startTest() throws Exception
{
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
+ System.out.println("Starting test......");
int count = params.getMsgCount();
boolean transacted = params.isTransacted();
int tranSize = params.getTransactionSize();
- long limit = (long)(params.getRate() * rateFactor); // in msecs
- long timeLimit = (long)(SEC * rateFactor); // in msecs
-
- long start = Clock.getTime(); // defaults to nano secs
- long interval = start;
+ long start = System.currentTimeMillis();
for(int i=0; i < count; i++ )
{
Message msg = getNextMessage();
- msg.setLongProperty(TIMESTAMP, Clock.getTime());
+ msg.setJMSTimestamp(System.currentTimeMillis());
producer.send(msg);
if ( transacted && ((i+1) % tranSize == 0))
{
session.commit();
}
-
- if (rateLimitProducer && i%limit == 0)
- {
- long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs
- if (elapsed < timeLimit)
- {
- Thread.sleep(elapsed);
- }
- interval = Clock.getTime();
-
- }
- }
- sendEndMessage();
- if ( transacted)
- {
- session.commit();
}
- long time = Clock.getTime() - start;
- rate = (double)count*Clock.convertToSecs()/(double)time;
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
System.out.println(new StringBuilder("Producer rate: ").
append(df.format(rate)).
append(" msg/sec").
toString());
}
- public void resetCounters()
+ public void waitForCompletion() throws Exception
{
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
- }
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createMessage();
- msg.setBooleanProperty("End", true);
- producer.send(msg);
- }
+ tmp.receive();
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, rate);
- sendMessageToController(msg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
}
- @Override
public void tearDown() throws Exception
{
- super.tearDown();
+ producer.close();
+ session.close();
+ con.close();
}
- public void run()
+ public void test()
{
try
{
setUp();
warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Producer: " + id + " starting a new iteration ......\n");
- startTest();
- sendResults();
- nextIteration = continueTest();
- }
+ startTest();
+ waitForCompletion();
tearDown();
}
catch(Exception e)
@@ -296,63 +236,27 @@ public class PerfProducer extends PerfBase
}
}
- public void startControllerIfNeeded()
+
+ public static void main(String[] args)
{
- if (!params.isExternalController())
+ final PerfProducer prod = new PerfProducer();
+ Runnable r = new Runnable()
{
- final PerfTestController controller = new PerfTestController();
- Runnable r = new Runnable()
- {
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
+ public void run()
{
- t = Threading.getThreadFactory().createThread(r);
+ prod.test();
}
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
}
- }
-
-
- public static void main(String[] args) throws InterruptedException
- {
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = Integer.getInteger("con_count",1);
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
+ catch(Exception e)
{
- final PerfProducer prod = new PerfProducer(scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
+ throw new Error("Error creating producer thread",e);
}
- testCompleted.await();
- System.out.println("Producers have completed the test......");
+ t.start();
}
} \ No newline at end of file
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
deleted file mode 100644
index 5c98c645f4..0000000000
--- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java
+++ /dev/null
@@ -1,422 +0,0 @@
-package org.apache.qpid.tools;
-
-import java.io.FileWriter;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.client.message.AMQPEncodedMapMessage;
-
-/**
- * The Controller coordinates a test run between a number
- * of producers and consumers, configured via -Dprod_count and -Dcons_count.
- *
- * It waits till all the producers and consumers have registered and then
- * conducts a warmup run. Once all consumers and producers have completed
- * the warmup run and is ready, it will conduct the actual test run and
- * collect all stats from the participants and calculates the system
- * throughput, the avg/min/max for producer rates, consumer rates and latency.
- *
- * These stats are then printed to std out.
- * The Controller also prints events to std out to give a running account
- * of the test run in progress. Ex registering of participants, starting warmup ..etc.
- * This allows a scripting tool to monitor the progress.
- *
- * The Controller can be run in two modes.
- * 1. A single test run (default) where it just runs until the message count specified
- * for the producers via -Dmsg_count is sent and received.
- *
- * 2. Time based, configured via -Dduration=x, where x is in mins.
- * In this mode, the Controller repeatedly cycles through the tests (after an initial
- * warmup run) until the desired time is reached. If a test run is in progress
- * and the time is up, it will allow the run the complete.
- *
- * After each iteration, the stats will be printed out in csv format to a separate log file.
- * System throughput is calculated as follows
- * totalMsgCount/(totalTestTime)
- */
-public class PerfTestController extends PerfBase implements MessageListener
-{
- enum TestMode { SINGLE_RUN, TIME_BASED };
-
- TestMode testMode = TestMode.SINGLE_RUN;
-
- long totalTestTime;
-
- private double avgSystemLatency = 0.0;
- private double minSystemLatency = Double.MAX_VALUE;
- private double maxSystemLatency = 0;
- private double avgSystemLatencyStdDev = 0.0;
-
- private double avgSystemConsRate = 0.0;
- private double maxSystemConsRate = 0.0;
- private double minSystemConsRate = Double.MAX_VALUE;
-
- private double avgSystemProdRate = 0.0;
- private double maxSystemProdRate = 0.0;
- private double minSystemProdRate = Double.MAX_VALUE;
-
- private long totalMsgCount = 0;
- private double totalSystemThroughput = 0.0;
-
- private int consumerCount = Integer.getInteger("cons_count", 1);
- private int producerCount = Integer.getInteger("prod_count", 1);
- private int duration = Integer.getInteger("duration", -1); // in mins
- private Map<String,MapMessage> consumers;
- private Map<String,MapMessage> producers;
-
- private CountDownLatch consRegistered;
- private CountDownLatch prodRegistered;
- private CountDownLatch consReady;
- private CountDownLatch prodReady;
- private CountDownLatch receivedEndMsg;
- private CountDownLatch receivedConsStats;
- private CountDownLatch receivedProdStats;
-
- private MessageConsumer consumer;
- private boolean printStdDev = false;
- FileWriter writer;
-
- public PerfTestController()
- {
- super("");
- consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
- producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
-
- consRegistered = new CountDownLatch(consumerCount);
- prodRegistered = new CountDownLatch(producerCount);
- consReady = new CountDownLatch(consumerCount);
- prodReady = new CountDownLatch(producerCount);
- printStdDev = params.isPrintStdDev();
- testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- if (testMode == TestMode.TIME_BASED)
- {
- writer = new FileWriter("stats-csv.log");
- }
- consumer = controllerSession.createConsumer(controllerQueue);
- System.out.println("\nController: " + producerCount + " producers are expected");
- System.out.println("Controller: " + consumerCount + " consumers are expected \n");
- consumer.setMessageListener(this);
- consRegistered.await();
- prodRegistered.await();
- System.out.println("\nController: All producers and consumers have registered......\n");
- }
-
- public void warmup() throws Exception
- {
- System.out.println("Controller initiating warm up sequence......");
- sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
- sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
- prodReady.await();
- consReady.await();
- System.out.println("\nController : All producers and consumers are ready to start the test......\n");
- }
-
- public void startTest() throws Exception
- {
- resetCounters();
- System.out.println("\nController Starting test......");
- long start = Clock.getTime();
- sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
- receivedEndMsg.await();
- totalTestTime = Clock.getTime() - start;
- sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
- receivedProdStats.await();
- receivedConsStats.await();
- }
-
- public void resetCounters()
- {
- minSystemLatency = Double.MAX_VALUE;
- maxSystemLatency = 0;
- maxSystemConsRate = 0.0;
- minSystemConsRate = Double.MAX_VALUE;
- maxSystemProdRate = 0.0;
- minSystemProdRate = Double.MAX_VALUE;
-
- totalMsgCount = 0;
-
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
- }
-
- public void calcStats() throws Exception
- {
- double totLatency = 0.0;
- double totStdDev = 0.0;
- double totalConsRate = 0.0;
- double totalProdRate = 0.0;
-
- MapMessage conStat = null; // for error handling
- try
- {
- for (MapMessage m: consumers.values())
- {
- conStat = m;
- minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
- maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
- totLatency = totLatency + m.getDouble(AVG_LATENCY);
- totStdDev = totStdDev + m.getDouble(STD_DEV);
-
- minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
- maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
- totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
-
- totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
- }
- }
- catch(Exception e)
- {
- System.out.println("Error calculating stats from Consumer : " + conStat);
- }
-
-
- MapMessage prodStat = null; // for error handling
- try
- {
- for (MapMessage m: producers.values())
- {
- prodStat = m;
- minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
- maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
- totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
- }
- }
- catch(Exception e)
- {
- System.out.println("Error calculating stats from Producer : " + conStat);
- }
-
- avgSystemLatency = totLatency/consumers.size();
- avgSystemLatencyStdDev = totStdDev/consumers.size();
- avgSystemConsRate = totalConsRate/consumers.size();
- avgSystemProdRate = totalProdRate/producers.size();
-
- System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
-
- totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
- }
-
- public void printResults() throws Exception
- {
- System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(totalSystemThroughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Consumer rate : ").
- append(df.format(avgSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Consumer rate : ").
- append(df.format(minSystemConsRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Consumer rate : ").
- append(df.format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg Producer rate : ").
- append(df.format(avgSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Min Producer rate : ").
- append(df.format(minSystemProdRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Max Producer rate : ").
- append(df.format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- System.out.println(new StringBuilder("Avg System Latency : ").
- append(df.format(avgSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min System Latency : ").
- append(df.format(minSystemLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max System Latency : ").
- append(df.format(maxSystemLatency)).
- append(" ms").toString());
- if (printStdDev)
- {
- System.out.println(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev));
- }
- }
-
- private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
- {
- System.out.println("\nController: Sending code " + code);
- MessageProducer tmpProd = controllerSession.createProducer(null);
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, code.ordinal());
- for (MapMessage node : nodes)
- {
- if (node.getString(REPLY_ADDR) == null)
- {
- System.out.println("REPLY_ADDR is null " + node);
- }
- else
- {
- System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
- }
- tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- MapMessage m = (MapMessage)msg;
- OPCode code = OPCode.values()[m.getInt(CODE)];
-
- System.out.println("\n---------Controller Received Code : " + code);
- System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
-
- switch (code)
- {
- case REGISTER_CONSUMER :
- if (consRegistered.getCount() == 0)
- {
- System.out.println("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
- break;
- }
- consumers.put(m.getString(ID),m);
- consRegistered.countDown();
- break;
-
- case REGISTER_PRODUCER :
- if (prodRegistered.getCount() == 0)
- {
- System.out.println("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
- break;
- }
- producers.put(m.getString(ID),m);
- prodRegistered.countDown();
- break;
-
- case CONSUMER_READY :
- consReady.countDown();
- break;
-
- case PRODUCER_READY :
- prodReady.countDown();
- break;
-
- case RECEIVED_END_MSG :
- receivedEndMsg.countDown();
- break;
-
- case RECEIVED_CONSUMER_STATS :
- consumers.put(m.getString(ID),m);
- receivedConsStats.countDown();
- break;
-
- case RECEIVED_PRODUCER_STATS :
- producers.put(m.getString(ID),m);
- receivedProdStats.countDown();
- break;
-
- default:
- throw new Exception("Invalid OPCode " + code);
- }
- }
- catch (Exception e)
- {
- handleError(e,"Error when receiving messages " + msg);
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- if (testMode == TestMode.SINGLE_RUN)
- {
- startTest();
- calcStats();
- printResults();
- }
- else
- {
- long startTime = Clock.getTime();
- long timeLimit = duration * 60 * 1000; // duration is in mins.
- boolean nextIteration = true;
- while (nextIteration)
- {
- startTest();
- calcStats();
- writeStatsToFile();
- if (Clock.getTime() - startTime < timeLimit)
- {
- sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
- sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
- nextIteration = true;
- }
- else
- {
- nextIteration = false;
- }
- }
- }
- tearDown();
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception {
- System.out.println("Controller: Completed the test......\n");
- if (testMode == TestMode.TIME_BASED)
- {
- writer.close();
- }
- sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
- sendMessageToNodes(OPCode.STOP_TEST,producers.values());
- super.tearDown();
- }
-
- public void writeStatsToFile() throws Exception
- {
- writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(df.format(totalSystemThroughput)).append(",");
- writer.append(df.format(avgSystemConsRate)).append(",");
- writer.append(df.format(minSystemConsRate)).append(",");
- writer.append(df.format(maxSystemConsRate)).append(",");
- writer.append(df.format(avgSystemProdRate)).append(",");
- writer.append(df.format(minSystemProdRate)).append(",");
- writer.append(df.format(maxSystemProdRate)).append(",");
- writer.append(df.format(avgSystemLatency)).append(",");
- writer.append(df.format(minSystemLatency)).append(",");
- writer.append(df.format(maxSystemLatency));
- if (printStdDev)
- {
- writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
- }
- writer.append("\n");
- writer.flush();
- }
-
- public static void main(String[] args)
- {
- PerfTestController controller = new PerfTestController();
- controller.run();
- }
-}
diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
index d73be0181b..89d6462a39 100644
--- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
+++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java
@@ -25,25 +25,25 @@ import javax.jms.Session;
public class TestParams
{
/*
- * By default the connection URL is used.
+ * By default the connection URL is used.
* This allows a user to easily specify a fully fledged URL any given property.
* Ex. SSL parameters
- *
+ *
* By providing a host & port allows a user to simply override the URL.
* This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
+ * without having to deal with the long URL format.
*/
private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
+
private String host = "";
-
+
private int port = -1;
private String address = "queue; {create : always}";
private int msg_size = 1024;
- private int random_msg_size_start_from = 1;
+ private int msg_type = 1; // not used yet
private boolean cacheMessage = false;
@@ -62,28 +62,19 @@ public class TestParams
private int msg_count = 10;
private int warmup_count = 1;
-
+
private boolean random_msg_size = false;
- private String msgType = "bytes";
-
- private boolean printStdDev = false;
-
- private long rate = -1;
-
- private boolean externalController = false;
-
- private boolean useUniqueDest = false; // useful when using multiple connections.
-
public TestParams()
{
-
+
url = System.getProperty("url",url);
host = System.getProperty("host","");
port = Integer.getInteger("port", -1);
- address = System.getProperty("address",address);
+ address = System.getProperty("address","queue");
msg_size = Integer.getInteger("msg_size", 1024);
+ msg_type = Integer.getInteger("msg_type",1);
cacheMessage = Boolean.getBoolean("cache_msg");
disableMessageID = Boolean.getBoolean("disableMessageID");
disableTimestamp = Boolean.getBoolean("disableTimestamp");
@@ -94,12 +85,6 @@ public class TestParams
msg_count = Integer.getInteger("msg_count",msg_count);
warmup_count = Integer.getInteger("warmup_count",warmup_count);
random_msg_size = Boolean.getBoolean("random_msg_size");
- msgType = System.getProperty("msg_type","bytes");
- printStdDev = Boolean.getBoolean("print_std_dev");
- rate = Long.getLong("rate",-1);
- externalController = Boolean.getBoolean("ext_controller");
- useUniqueDest = Boolean.getBoolean("use_unique_dest");
- random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1);
}
public String getUrl()
@@ -137,9 +122,9 @@ public class TestParams
return msg_size;
}
- public int getRandomMsgSizeStartFrom()
+ public int getMsgType()
{
- return random_msg_size_start_from;
+ return msg_type;
}
public boolean isDurable()
@@ -176,39 +161,10 @@ public class TestParams
{
return disableTimestamp;
}
-
+
public boolean isRandomMsgSize()
{
return random_msg_size;
}
- public String getMessageType()
- {
- return msgType;
- }
-
- public boolean isPrintStdDev()
- {
- return printStdDev;
- }
-
- public long getRate()
- {
- return rate;
- }
-
- public boolean isExternalController()
- {
- return externalController;
- }
-
- public void setAddress(String addr)
- {
- address = addr;
- }
-
- public boolean isUseUniqueDests()
- {
- return useUniqueDest;
- }
}