diff options
Diffstat (limited to 'java/tools')
20 files changed, 376 insertions, 1636 deletions
diff --git a/java/tools/bin/Profile-run-from-source b/java/tools/bin/Profile-run-from-source deleted file mode 100755 index f8ec45ccff..0000000000 --- a/java/tools/bin/Profile-run-from-source +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Sets the environment for running the scripts from a source checkout. -txtbld=$(tput bold) # Bold -txtrst=$(tput sgr0) # Reset -txtred=$(tput setaf 1) # red -txtgreen=$(tput setaf 2) # green - -echo "${txtbld}Setting the environment to run qpid java tools from a source checkout${txtrst}" - -abs_path() -{ - D=`dirname "$1"` - echo "`cd \"$D\" 2>/dev/null && pwd`" -} - -export QPID_CHECKOUT=`abs_path "../../../../"` -echo "${txtgreen}Using source checkout at $QPID_CHECKOUT${txtrst}" - -export PATH=$QPID_CHECKOUT/java/tools/bin:$PATH - -if [ "$JAVA" = "" ] ; then - export JAVA=$(which java) -fi - -#------------- Required for perf_report, qpid-bench & qpid-python-testkit ---------------- - -export VENDOR_LIB=$QPID_CHECKOUT/java/build/lib -export CLASSPATH=`find $VENDOR_LIB -name '*.jar' | tr '\n' ':'` -export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/test.log4j" - - -#------------- Required for qpid-python-testkit ----------------------------------------- - -PYTHONPATH=$QPID_CHECKOUT/python/qpid:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH -export PATH=$QPID_CHECKOUT/python:$PATH - -if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then - QPIDD_EXEC=$QPID_CHECKOUT/cpp/src/qpidd -else - echo "${txtred}WARNING: Qpid CPP broker executable not found. You will not be able to run qpid-python-testkit${txtrst}" -fi - -if [ -x $QPID_CHECKOUT/cpp/src/.libs/cluster.so ]; then - CLUSTER_LIB=$QPID_CHECKOUT/cpp/src/.libs/cluster.so -else - echo "${txtred}WARNING: Qpid cluster.so not found.You will not be able to run qpid-python-testkit${txtrst}" -fi - -if [ "$STORE_LIB" = "" ] ; then - echo "${txtred}WARNING: Please point the STORE_LIB variable to the message store module. If not persistence tests will not write messages to disk.${txtrst}" -fi - -export PYTHONPATH QPIDD_EXEC CLUSTER_LIB diff --git a/java/tools/bin/check-qpid-java-env b/java/tools/bin/check-qpid-java-env deleted file mode 100755 index dedd6e06ea..0000000000 --- a/java/tools/bin/check-qpid-java-env +++ /dev/null @@ -1,38 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$LOG_CONFIG" ]; then - echo "Please set the appropriate parameters for logging as it may affect performance. Ex log4j defaults to DEBUG if not configured properly" - exit -1 -fi - -if [ -z "$JAVA_MEM" ]; then - JAVA_MEM=-Xmx1024m -fi - -if [ -z "$JAVA" ]; then - echo "Please set the path to the correct java executable to JAVA" - exit -1 -fi - -if [ -z "$CLASSPATH" ]; then - echo "Please set the $CLASSPATH variable to point to the jar/class files" - exit -1 -fi diff --git a/java/tools/bin/controller b/java/tools/bin/controller deleted file mode 100644 index fab8614039..0000000000 --- a/java/tools/bin/controller +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME=controller -CONSUMER_COUNT=1 -PRODUCER_COUNT=1 -DURATION=-1 -TEST_NAME="TEST_NAME" -EXTRA_JVM_ARGS="" - -TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: controller [option].." - - printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" - - printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" - - printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." - - printf "\n%27s\n%32s\n" "-n, --name=<test-name>" "The name of the test." - - printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -c|--consumer-count) - CONSUMER_COUNT="$2"; shift; shift; continue - ;; - -p|--producer-count) - PRODUCER_COUNT="$2"; shift; shift; continue - ;; - -d|--duration) - DURATION="$2"; shift; shift; continue - ;; - -n|--name) - TEST_NAME="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" - - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -run_controller() -{ - TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" - echo "Running controller with : $TEST_ARGS" > test.out - $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & - waitfor test.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $TEST_NAME -} - -print_result() -{ - prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` - std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev - echo "--------------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -rm -rf *.out - -if [ "$DURATION" = -1 ]; then - echo "Test report on " `date +%F` - echo "========================================================================================================" - echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" - echo "--------------------------------------------------------------------------------------------------------" -else - echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." -fi - -run_controller diff --git a/java/tools/bin/perf-report b/java/tools/bin/perf_report.sh index 7de3f2b602..e6b4c987e5 100755 --- a/java/tools/bin/perf-report +++ b/java/tools/bin/perf_report.sh @@ -18,19 +18,23 @@ # under the License. # -# This will run the following test cases defined below and produce -# a report in tabular format. +# This will run the 8 use cases defined below and produce +# a report in tabular format. Refer to the documentation +# for more details. +SUB_MEM=-Xmx1024M +PUB_MEM=-Xmx1024M +LOG_CONFIG="-Damqj.logging.level=WARN" QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" TOPIC="amq.topic/test" DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" -COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" +. setenv.sh waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } cleanup() -{ +{ pids=`ps aux | grep java | grep Perf | awk '{print $2}'` if [ "$pids" != "" ]; then kill -3 $pids @@ -42,31 +46,30 @@ cleanup() # $2 consumer options # $3 producer options run_testcase() -{ - sh run-sub $COMMON_CONFIG $2 > sub.out & - sh run-pub $COMMON_CONFIG $3 > pub.out & - waitfor pub.out "Controller: Completed the test" +{ + sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & + waitfor sub.out "Warming up" + sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & + waitfor sub.out "Completed the test" + waitfor pub.out "Consumer has completed the test" sleep 2 #give a grace period to shutdown - print_result $1 - mv pub.out $1.pub.out - mv sub.out $1.sub.out + print_result $1 } print_result() { - prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` + sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` + avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` + min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` + max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency echo "------------------------------------------------------------------------------------------------" } trap cleanup EXIT -rm -rf *.out #cleanup old files. echo "Test report on " `date +%F` echo "================================================================================================" @@ -79,7 +82,7 @@ echo "-------------------------------------------------------------------------- # setting very low values to start with and experiment while increasing them slowly. # Test 1 Trans Queue -run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" +#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" # Test 2 Dura Queue run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/bin/qpid-bench b/java/tools/bin/qpid-bench index cd894b607f..c982e64efd 100755..100644 --- a/java/tools/bin/qpid-bench +++ b/java/tools/bin/qpid-bench @@ -18,6 +18,18 @@ # under the License. # -. check-qpid-java-env +if [ -z "$QPID_HOME" ]; then + export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) + export PATH=${PATH}:${QPID_HOME}/bin +fi -$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@" +# Set classpath to include Qpid jar with all required jars in manifest +QPID_LIBS=$QPID_HOME/lib/qpid-all.jar + +# Set other variables used by the qpid-run script before calling +export JAVA=java \ + JAVA_VM=-server \ + JAVA_MEM=-Xmx1024m \ + QPID_CLASSPATH=$QPID_LIBS + +. qpid-run org.apache.qpid.tools.QpidBench "$@" diff --git a/java/tools/bin/qpid-python-testkit b/java/tools/bin/qpid-python-testkit index 7233d0d075..cbe7972421 100755 --- a/java/tools/bin/qpid-python-testkit +++ b/java/tools/bin/qpid-python-testkit @@ -22,12 +22,9 @@ # via the python test runner. The defaults are set for a running # from an svn checkout -. check-qpid-java-env +. ./set-testkit-env.sh export PYTHONPATH=./:$PYTHONPATH -echo $PYTHONPATH -if [ "$OUTDIR" = "" ] ; then - OUTDIR=$PWD -fi -testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S` -qpid-python-test -m testkit -DOUTDIR=$testdir"$@" +rm -rf $OUTDIR +qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" + diff --git a/java/tools/bin/run-sub b/java/tools/bin/run_pub.sh index 8449563f7f..91b9287dea 100755..100644 --- a/java/tools/bin/run-sub +++ b/java/tools/bin/run_pub.sh @@ -18,15 +18,7 @@ # under the License. # -. check-qpid-java-env - -echo "All args $@" - -JVM_ARGS="$1" -PROGRAM_ARGS="$2" - -echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" -echo "PROGRAM ARGS : $PROGRAM_ARGS" - -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS +. $QPID_TEST_HOME/bin/setenv.sh +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer diff --git a/java/tools/bin/run-pub b/java/tools/bin/run_sub.sh index 9efe58c4b8..c9ad2fed74 100755..100644 --- a/java/tools/bin/run-pub +++ b/java/tools/bin/run_sub.sh @@ -18,11 +18,8 @@ # under the License. # -. check-qpid-java-env +. $QPID_TEST_HOME/bin/setenv.sh -JVM_ARGS="$1" -PROGRAM_ARGS="$2" +echo "$@" +$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer -echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" -echo "PROGRAM ARGS : $PROGRAM_ARGS" -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS diff --git a/java/tools/bin/set-testkit-env.sh b/java/tools/bin/set-testkit-env.sh new file mode 100644 index 0000000000..051dad8179 --- /dev/null +++ b/java/tools/bin/set-testkit-env.sh @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# If QPIDD_EXEC ..etc is not set, it will first check to see +# if this is run from a qpid svn check out, if not it will look +# for installed rpms. + +abs_path() +{ + D=`dirname "$1"` + B=`basename "$1"` + echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B" +} + +# Environment for python tests + +if [ -d ../../../python ] ; then + PYTHON_DIR=../../../python + PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid +elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then + echo "WARNING: skipping test, no qpid python scripts found ."; exit 0; +fi + + +if [ "$QPIDD_EXEC" = "" ] ; then + if [ -x ../../../cpp/src/qpidd ]; then + QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"` + elif [ -n "$(which qpidd)" ] ; then + QPIDD_EXEC=$(which qpidd) + else + echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0; + fi +fi + +if [ "$CLUSTER_LIB" = "" ] ; then + if [ -x ../../../cpp/src/.libs/cluster.so ]; then + CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"` + elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so" + elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then + CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so" + else + echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0; + fi +fi + +if [ "$STORE_LIB" = "" ] ; then + if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" + elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then + STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" + #else + # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; + fi +fi + +if [ "$QP_CP" = "" ] ; then + if [ -d ../../build/lib/ ]; then + QP_JAR_PATH=`abs_path "../../build/lib/"` + elif [ -d /usr/share/java/qpid-deps ]; then + QP_JAR_PATH=`abs_path "/usr/share/java"` + else + "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0; + fi + QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'` +fi + +if [ "$OUTDIR" = "" ] ; then + OUTDIR=`abs_path "./output"` +fi + +export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/java/tools/bin/setenv.sh b/java/tools/bin/setenv.sh new file mode 100644 index 0000000000..24135e711b --- /dev/null +++ b/java/tools/bin/setenv.sh @@ -0,0 +1,49 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Compiles the test classes and sets the CLASSPATH + +# check for QPID_TEST_HOME +if [ "$QPID_TEST_HOME" = "" ] ; then + echo "ERROR: Please set QPID_TEST_HOME ...." + exit 1 +fi + +# check for JAVA_HOME +if [ "$JAVA_HOME" = "" ] ; then + echo "ERROR: Please set JAVA_HOME ...." + exit 1 +fi + +# VENDOR_LIB path needs to be set +# for Qpid set this to {qpid_checkout}/java/build/lib +if [ "$VENDOR_LIB" = "" ] ; then + echo "ERROR: Please set VENDOR_LIB path in the script ...." + exit 1 +fi + + +[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes + +CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` +$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` + +export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH + diff --git a/java/tools/bin/start-consumers b/java/tools/bin/start-consumers deleted file mode 100644 index c71fc0c21f..0000000000 --- a/java/tools/bin/start-consumers +++ /dev/null @@ -1,119 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-consumers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_COUNT=10000 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" - -start_consumers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & - else - sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & - fi - done -} - -start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/java/tools/bin/start-producers b/java/tools/bin/start-producers deleted file mode 100644 index 7ba0286f7c..0000000000 --- a/java/tools/bin/start-producers +++ /dev/null @@ -1,136 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-producers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_TYPE="bytes" -WARMUP_MSG_COUNT=1000 -MSG_COUNT=10000 -MSG_SIZE=1024 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS="" -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" - - printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -s|--msg-size) - MSG_SIZE="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - -t|--msg_type) - MSG_TYPE="$2"; shift; shift; continue - ;; - -w|--warmup-msg-count) - WARMUP_MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" - -start_producers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & - else - sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & - fi - done -} - -start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/java/tools/etc/perf-report.gnu b/java/tools/etc/perf-report.gnu deleted file mode 100644 index 6d5020efb5..0000000000 --- a/java/tools/etc/perf-report.gnu +++ /dev/null @@ -1,42 +0,0 @@ -set terminal png -set datafile separator "," - -set title "Variation of avg latency between iterations" -set yrange [10:20] -set xlabel "Iterations" -set ylabel "Latency (ms)" -set output "avg_latency.png" -plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines - - -set title "Variation of max latency between iterations" -set yrange [0:1000] -set xlabel "Iterations" -set ylabel "Latency (ms)" -set output "max_latency.png" -plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines - - -set title "Variation of standard deviation of latency between iterations" -set yrange [0:20] -set xlabel "Iterations" -set ylabel "Standard Deviation" -set output "std_dev_latency.png" -plot "stats-csv.log" using 12 title "standard deviation" with lines - - -set title "Variation of system throughput between iterations" -set yrange [400000:450000] -set xlabel "Iterations" -set ylabel "System Throuhgput (msg/sec)" -set output "system_rate.png" -plot "stats-csv.log" using 2 title "system throughput" with lines - - -set title "Variation of avg producer & consumer rates between iterations" -set yrange [6500:7500] -set xlabel "Iterations" -set ylabel "Avg Rates (msg/sec)" -set output "prod_cons_rate.png" -plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines - diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java deleted file mode 100644 index 37369959a8..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ /dev/null @@ -1,92 +0,0 @@ -package org.apache.qpid.tools; - -/** - * In the future this will be replaced by a Clock abstraction - * that can utilize a realtime clock when running in RT Java. - */ - -public class Clock -{ - private static Precision precision; - private static long offset = -1; // in nano secs - - public enum Precision - { - NANO_SECS, MILI_SECS; - - static Precision getPrecision(String str) - { - if ("mili".equalsIgnoreCase(str)) - { - return MILI_SECS; - } - else - { - return NANO_SECS; - } - } - }; - - static - { - precision = Precision.getPrecision(System.getProperty("precision","mili")); - //offset = Long.getLong("offset",-1); - - System.out.println("Using precision : " + precision + " and offset " + offset); - } - - public static Precision getPrecision() - { - return precision; - } - - public static long getTime() - { - if (precision == Precision.NANO_SECS) - { - if (offset == -1) - { - return System.nanoTime(); - } - else - { - return System.nanoTime() + offset; - } - } - else - { - if (offset == -1) - { - return System.currentTimeMillis(); - } - else - { - return System.currentTimeMillis() + offset/convertToMiliSecs(); - } - } - } - - public static long convertToSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000000; - } - else - { - return 1000; - } - } - - public static long convertToMiliSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000; - } - else - { - return 1; - } - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index 90ee7e28ae..b88b242e6d 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(""); + super(); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -} +}
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index 121e94cea1..ac597d17de 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,113 +20,36 @@ */ package org.apache.qpid.tools; -import java.net.InetAddress; import java.text.DecimalFormat; -import java.util.UUID; +import java.util.Hashtable; import javax.jms.Connection; +import javax.jms.ConnectionFactory; import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.messaging.Address; public class PerfBase { - public final static String CODE = "CODE"; - public final static String ID = "ID"; - public final static String REPLY_ADDR = "REPLY_ADDR"; - public final static String MAX_LATENCY = "MAX_LATENCY"; - public final static String MIN_LATENCY = "MIN_LATENCY"; - public final static String AVG_LATENCY = "AVG_LATENCY"; - public final static String STD_DEV = "STD_DEV"; - public final static String CONS_RATE = "CONS_RATE"; - public final static String PROD_RATE = "PROD_RATE"; - public final static String MSG_COUNT = "MSG_COUNT"; - public final static String TIMESTAMP = "Timestamp"; - - String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - TestParams params; Connection con; Session session; - Session controllerSession; Destination dest; - Destination myControlQueue; - Destination controllerQueue; + Destination feedbackDest; DecimalFormat df = new DecimalFormat("###.##"); - String id; - String myControlQueueAddr; - - MessageProducer sendToController; - MessageConsumer receiveFromController; - String prefix = ""; - enum OPCode { - REGISTER_CONSUMER, REGISTER_PRODUCER, - PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, - CONSUMER_READY, PRODUCER_READY, - PRODUCER_START, - RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, - CONTINUE_TEST, STOP_TEST - }; - - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - MessageType msgType = MessageType.BYTES; - - public PerfBase(String prefix) + public PerfBase() { params = new TestParams(); - String host = ""; - try - { - host = InetAddress.getLocalHost().getHostName(); - } - catch (Exception e) - { - } - id = host + "-" + UUID.randomUUID().toString(); - this.prefix = prefix; - this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception - { + { + if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -139,78 +62,7 @@ public class PerfBase session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - dest = createDestination(); - controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); - myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(params.getMessageType()); - System.out.println("Using " + msgType + " messages"); - - sendToController = controllerSession.createProducer(controllerQueue); - receiveFromController = controllerSession.createConsumer(myControlQueue); - } - - private Destination createDestination() throws Exception - { - if (params.isUseUniqueDests()) - { - System.out.println("Prefix : " + prefix); - Address addr = Address.parse(params.getAddress()); - AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); - - if ( type == AMQDestination.TOPIC_TYPE) - { - addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); - System.out.println("Setting subject : " + addr); - } - else - { - addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); - System.out.println("Setting name : " + addr); - } - - return new AMQAnyDestination(addr); - } - else - { - return new AMQAnyDestination(params.getAddress()); - } - } - - public synchronized void sendMessageToController(MapMessage m) throws Exception - { - m.setString(ID, id); - m.setString(REPLY_ADDR,myControlQueueAddr); - sendToController.send(m); - } - - public void receiveFromController(OPCode expected) throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - if (expected != code) - { - throw new Exception("Expected OPCode : " + expected + " but received : " + code); - } - - } - - public boolean continueTest() throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - System.out.println("Received Code : " + code); - return (code == OPCode.CONTINUE_TEST); - } - - public void tearDown() throws Exception - { - session.close(); - controllerSession.close(); - con.close(); + dest = new AMQAnyDestination(params.getAddress()); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index b63892bb51..0ef0455a64 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,17 +20,13 @@ */ package org.apache.qpid.tools; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,7 +47,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is + * hen the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -59,9 +55,13 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -81,160 +81,130 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; int transSize = 0; - boolean printStdDev = false; - List<Long> sample; - final Object lock = new Object(); - public PerfConsumer(String prefix) + public PerfConsumer() { - super(prefix); - System.out.println("Consumer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); consumer = session.createConsumer(dest); - System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); - printStdDev = params.isPrintStdDev(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); } public void warmup()throws Exception { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - Message msg = consumer.receive(); - // This is to ensure we drain the queue before we start the actual test. - while ( msg != null) + System.out.println("Warming up......"); + + boolean start = false; + while (!start) { - if (msg.getBooleanProperty("End") == true) + Message msg = consumer.receive(); + if (msg instanceof TextMessage) { - // It's more realistic for the consumer to signal this. - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m); + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } } - msg = consumer.receive(1000); - } - - if (params.isTransacted()) - { - session.commit(); } - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m); - consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Consumer: " + id + " Starting test......" + "\n"); - resetCounters(); + System.out.println("Starting test......"); + consumer.setMessageListener(this); } - public void resetCounters() + public void printResults() throws Exception { - rcvdMsgCount = 0; - maxLatency = 0; - minLatency = Long.MAX_VALUE; - totalLatency = 0; - if (printStdDev) + synchronized (lock) { - sample = null; - sample = new ArrayList<Long>(params.getMsgCount()); + lock.wait(); } - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); - double stdDev = 0.0; - if (printStdDev) - { - stdDev = calculateStdDev(avgLatency); - } - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); - m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); - m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); - m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); - m.setDouble(CONS_RATE, consRate); - m.setLong(MSG_COUNT, rcvdMsgCount); - sendMessageToController(m); - + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency/Clock.convertToMiliSecs())). + append(df.format(avgLatency)). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(df.format(minLatency/Clock.convertToMiliSecs())). + append(minLatency). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(df.format(maxLatency/Clock.convertToMiliSecs())). + append(maxLatency). append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Std Dev : "). - append(stdDev/Clock.convertToMiliSecs()).toString()); - } + System.out.println("Completed the test......\n"); } - public double calculateStdDev(double mean) + public void notifyCompletion(Destination replyTo) throws Exception { - double v = 0; - for (double latency: sample) + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) { - v = v + Math.pow((latency-mean), 2); + session.commit(); } - v = v/sample.size(); - return Math.round(Math.sqrt(v)); + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); } public void onMessage(Message msg) { try { - // To figure out the decoding overhead of text - if (msgType == MessageType.TEXT) + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) { - ((TextMessage)msg).getText(); - } + notifyCompletion(msg.getJMSReplyTo()); - if (msg.getBooleanProperty("End")) - { - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); + synchronized (lock) + { + lock.notifyAll(); + } } else { - rcvdTime = Clock.getTime(); + rcvdTime = System.currentTimeMillis(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -242,14 +212,10 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); + long latency = rcvdTime - msg.getJMSTimestamp(); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; - if (printStdDev) - { - sample.add(latency); - } } } @@ -260,21 +226,14 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + printResults(); tearDown(); } catch(Exception e) @@ -283,43 +242,26 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws InterruptedException + public static void main(String[] args) { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() { - - final PerfConsumer cons = new PerfConsumer(scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) + public void run() { - throw new Error("Error creating consumer thread",e); + cons.test(); } - t.start(); - + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); } - testCompleted.await(); - System.out.println("Consumers have completed the test......\n"); + t.start(); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index ac6129ab68..015d1e6205 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,15 +23,13 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; -import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.MessageConsumer; import javax.jms.MessageProducer; -import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -53,52 +51,38 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * */ public class PerfProducer extends PerfBase { - private static long SEC = 60000; - MessageProducer producer; Message msg; - Object payload; - List<Object> payloads; + byte[] payload; + List<byte[]> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - boolean rateLimitProducer = false; - double rateFactor = 0.4; - double rate = 0.0; - - public PerfProducer(String prefix) + + public PerfProducer() { - super(prefix); - System.out.println("Producer ID : " + id); + super(); } public void setUp() throws Exception { super.setUp(); - durable = params.isDurable(); - rateLimitProducer = params.getRate() > 0 ? true : false; - if (rateLimitProducer) - { - System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); - } + feedbackDest = session.createTemporaryQueue(); + durable = params.isDurable(); + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - msg = createMessage(createPayload(params.getMsgSize())); + + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -109,52 +93,21 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - + payloads = new ArrayList<byte[]>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(createPayload(i)); + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); } - } + } else { - payload = createPayload(params.getMsgSize()); + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); } producer = session.createProducer(dest); - System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } } protected Message getNextMessage() throws Exception @@ -164,130 +117,117 @@ public class PerfProducer extends PerfBase return msg; } else - { - Message m; - + { + msg = session.createBytesMessage(); + if (!randomMsgSize) { - m = createMessage(payload); + ((BytesMessage)msg).writeBytes(payload); } else { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); } - m.setJMSDeliveryMode(durable? + msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return m; + return msg; } } public void warmup()throws Exception { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - System.out.println("Producer: " + id + " Warming up......"); + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - sendEndMessage(); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); if (params.isTransacted()) { session.commit(); } + + tmp.close(); } public void startTest() throws Exception { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); + System.out.println("Starting test......"); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long limit = (long)(params.getRate() * rateFactor); // in msecs - long timeLimit = (long)(SEC * rateFactor); // in msecs - - long start = Clock.getTime(); // defaults to nano secs - long interval = start; + long start = System.currentTimeMillis(); for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setLongProperty(TIMESTAMP, Clock.getTime()); + msg.setJMSTimestamp(System.currentTimeMillis()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } - - if (rateLimitProducer && i%limit == 0) - { - long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs - if (elapsed < timeLimit) - { - Thread.sleep(elapsed); - } - interval = Clock.getTime(); - - } - } - sendEndMessage(); - if ( transacted) - { - session.commit(); } - long time = Clock.getTime() - start; - rate = (double)count*Clock.convertToSecs()/(double)time; + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); } - public void resetCounters() + public void waitForCompletion() throws Exception { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); - } + if (params.isTransacted()) + { + session.commit(); + } - public void sendEndMessage() throws Exception - { - Message msg = session.createMessage(); - msg.setBooleanProperty("End", true); - producer.send(msg); - } + tmp.receive(); - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, rate); - sendMessageToController(msg); + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); } - @Override public void tearDown() throws Exception { - super.tearDown(); + producer.close(); + session.close(); + con.close(); } - public void run() + public void test() { try { setUp(); warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Producer: " + id + " starting a new iteration ......\n"); - startTest(); - sendResults(); - nextIteration = continueTest(); - } + startTest(); + waitForCompletion(); tearDown(); } catch(Exception e) @@ -296,63 +236,27 @@ public class PerfProducer extends PerfBase } } - public void startControllerIfNeeded() + + public static void main(String[] args) { - if (!params.isExternalController()) + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() { - final PerfTestController controller = new PerfTestController(); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try + public void run() { - t = Threading.getThreadFactory().createThread(r); + prod.test(); } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); } - } - - - public static void main(String[] args) throws InterruptedException - { - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = Integer.getInteger("con_count",1); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) + catch(Exception e) { - final PerfProducer prod = new PerfProducer(scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); + throw new Error("Error creating producer thread",e); } - testCompleted.await(); - System.out.println("Producers have completed the test......"); + t.start(); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java deleted file mode 100644 index 5c98c645f4..0000000000 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java +++ /dev/null @@ -1,422 +0,0 @@ -package org.apache.qpid.tools; - -import java.io.FileWriter; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.message.AMQPEncodedMapMessage; - -/** - * The Controller coordinates a test run between a number - * of producers and consumers, configured via -Dprod_count and -Dcons_count. - * - * It waits till all the producers and consumers have registered and then - * conducts a warmup run. Once all consumers and producers have completed - * the warmup run and is ready, it will conduct the actual test run and - * collect all stats from the participants and calculates the system - * throughput, the avg/min/max for producer rates, consumer rates and latency. - * - * These stats are then printed to std out. - * The Controller also prints events to std out to give a running account - * of the test run in progress. Ex registering of participants, starting warmup ..etc. - * This allows a scripting tool to monitor the progress. - * - * The Controller can be run in two modes. - * 1. A single test run (default) where it just runs until the message count specified - * for the producers via -Dmsg_count is sent and received. - * - * 2. Time based, configured via -Dduration=x, where x is in mins. - * In this mode, the Controller repeatedly cycles through the tests (after an initial - * warmup run) until the desired time is reached. If a test run is in progress - * and the time is up, it will allow the run the complete. - * - * After each iteration, the stats will be printed out in csv format to a separate log file. - * System throughput is calculated as follows - * totalMsgCount/(totalTestTime) - */ -public class PerfTestController extends PerfBase implements MessageListener -{ - enum TestMode { SINGLE_RUN, TIME_BASED }; - - TestMode testMode = TestMode.SINGLE_RUN; - - long totalTestTime; - - private double avgSystemLatency = 0.0; - private double minSystemLatency = Double.MAX_VALUE; - private double maxSystemLatency = 0; - private double avgSystemLatencyStdDev = 0.0; - - private double avgSystemConsRate = 0.0; - private double maxSystemConsRate = 0.0; - private double minSystemConsRate = Double.MAX_VALUE; - - private double avgSystemProdRate = 0.0; - private double maxSystemProdRate = 0.0; - private double minSystemProdRate = Double.MAX_VALUE; - - private long totalMsgCount = 0; - private double totalSystemThroughput = 0.0; - - private int consumerCount = Integer.getInteger("cons_count", 1); - private int producerCount = Integer.getInteger("prod_count", 1); - private int duration = Integer.getInteger("duration", -1); // in mins - private Map<String,MapMessage> consumers; - private Map<String,MapMessage> producers; - - private CountDownLatch consRegistered; - private CountDownLatch prodRegistered; - private CountDownLatch consReady; - private CountDownLatch prodReady; - private CountDownLatch receivedEndMsg; - private CountDownLatch receivedConsStats; - private CountDownLatch receivedProdStats; - - private MessageConsumer consumer; - private boolean printStdDev = false; - FileWriter writer; - - public PerfTestController() - { - super(""); - consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); - producers = new ConcurrentHashMap<String,MapMessage>(producerCount); - - consRegistered = new CountDownLatch(consumerCount); - prodRegistered = new CountDownLatch(producerCount); - consReady = new CountDownLatch(consumerCount); - prodReady = new CountDownLatch(producerCount); - printStdDev = params.isPrintStdDev(); - testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; - } - - public void setUp() throws Exception - { - super.setUp(); - if (testMode == TestMode.TIME_BASED) - { - writer = new FileWriter("stats-csv.log"); - } - consumer = controllerSession.createConsumer(controllerQueue); - System.out.println("\nController: " + producerCount + " producers are expected"); - System.out.println("Controller: " + consumerCount + " consumers are expected \n"); - consumer.setMessageListener(this); - consRegistered.await(); - prodRegistered.await(); - System.out.println("\nController: All producers and consumers have registered......\n"); - } - - public void warmup() throws Exception - { - System.out.println("Controller initiating warm up sequence......"); - sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); - sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); - prodReady.await(); - consReady.await(); - System.out.println("\nController : All producers and consumers are ready to start the test......\n"); - } - - public void startTest() throws Exception - { - resetCounters(); - System.out.println("\nController Starting test......"); - long start = Clock.getTime(); - sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); - receivedEndMsg.await(); - totalTestTime = Clock.getTime() - start; - sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); - receivedProdStats.await(); - receivedConsStats.await(); - } - - public void resetCounters() - { - minSystemLatency = Double.MAX_VALUE; - maxSystemLatency = 0; - maxSystemConsRate = 0.0; - minSystemConsRate = Double.MAX_VALUE; - maxSystemProdRate = 0.0; - minSystemProdRate = Double.MAX_VALUE; - - totalMsgCount = 0; - - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); - } - - public void calcStats() throws Exception - { - double totLatency = 0.0; - double totStdDev = 0.0; - double totalConsRate = 0.0; - double totalProdRate = 0.0; - - MapMessage conStat = null; // for error handling - try - { - for (MapMessage m: consumers.values()) - { - conStat = m; - minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); - maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); - totLatency = totLatency + m.getDouble(AVG_LATENCY); - totStdDev = totStdDev + m.getDouble(STD_DEV); - - minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); - maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); - totalConsRate = totalConsRate + m.getDouble(CONS_RATE); - - totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Consumer : " + conStat); - } - - - MapMessage prodStat = null; // for error handling - try - { - for (MapMessage m: producers.values()) - { - prodStat = m; - minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); - maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); - totalProdRate = totalProdRate + m.getDouble(PROD_RATE); - } - } - catch(Exception e) - { - System.out.println("Error calculating stats from Producer : " + conStat); - } - - avgSystemLatency = totLatency/consumers.size(); - avgSystemLatencyStdDev = totStdDev/consumers.size(); - avgSystemConsRate = totalConsRate/consumers.size(); - avgSystemProdRate = totalProdRate/producers.size(); - - System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); - - totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); - } - - public void printResults() throws Exception - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(totalSystemThroughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Consumer rate : "). - append(df.format(avgSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Consumer rate : "). - append(df.format(minSystemConsRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Consumer rate : "). - append(df.format(maxSystemConsRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg Producer rate : "). - append(df.format(avgSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Min Producer rate : "). - append(df.format(minSystemProdRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Max Producer rate : "). - append(df.format(maxSystemProdRate)). - append(" msg/sec").toString()); - - System.out.println(new StringBuilder("Avg System Latency : "). - append(df.format(avgSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min System Latency : "). - append(df.format(minSystemLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Max System Latency : "). - append(df.format(maxSystemLatency)). - append(" ms").toString()); - if (printStdDev) - { - System.out.println(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev)); - } - } - - private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception - { - System.out.println("\nController: Sending code " + code); - MessageProducer tmpProd = controllerSession.createProducer(null); - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, code.ordinal()); - for (MapMessage node : nodes) - { - if (node.getString(REPLY_ADDR) == null) - { - System.out.println("REPLY_ADDR is null " + node); - } - else - { - System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); - } - tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); - } - } - - public void onMessage(Message msg) - { - try - { - MapMessage m = (MapMessage)msg; - OPCode code = OPCode.values()[m.getInt(CODE)]; - - System.out.println("\n---------Controller Received Code : " + code); - System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); - - switch (code) - { - case REGISTER_CONSUMER : - if (consRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); - break; - } - consumers.put(m.getString(ID),m); - consRegistered.countDown(); - break; - - case REGISTER_PRODUCER : - if (prodRegistered.getCount() == 0) - { - System.out.println("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); - break; - } - producers.put(m.getString(ID),m); - prodRegistered.countDown(); - break; - - case CONSUMER_READY : - consReady.countDown(); - break; - - case PRODUCER_READY : - prodReady.countDown(); - break; - - case RECEIVED_END_MSG : - receivedEndMsg.countDown(); - break; - - case RECEIVED_CONSUMER_STATS : - consumers.put(m.getString(ID),m); - receivedConsStats.countDown(); - break; - - case RECEIVED_PRODUCER_STATS : - producers.put(m.getString(ID),m); - receivedProdStats.countDown(); - break; - - default: - throw new Exception("Invalid OPCode " + code); - } - } - catch (Exception e) - { - handleError(e,"Error when receiving messages " + msg); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - if (testMode == TestMode.SINGLE_RUN) - { - startTest(); - calcStats(); - printResults(); - } - else - { - long startTime = Clock.getTime(); - long timeLimit = duration * 60 * 1000; // duration is in mins. - boolean nextIteration = true; - while (nextIteration) - { - startTest(); - calcStats(); - writeStatsToFile(); - if (Clock.getTime() - startTime < timeLimit) - { - sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); - sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); - nextIteration = true; - } - else - { - nextIteration = false; - } - } - } - tearDown(); - - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception { - System.out.println("Controller: Completed the test......\n"); - if (testMode == TestMode.TIME_BASED) - { - writer.close(); - } - sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); - sendMessageToNodes(OPCode.STOP_TEST,producers.values()); - super.tearDown(); - } - - public void writeStatsToFile() throws Exception - { - writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(df.format(totalSystemThroughput)).append(","); - writer.append(df.format(avgSystemConsRate)).append(","); - writer.append(df.format(minSystemConsRate)).append(","); - writer.append(df.format(maxSystemConsRate)).append(","); - writer.append(df.format(avgSystemProdRate)).append(","); - writer.append(df.format(minSystemProdRate)).append(","); - writer.append(df.format(maxSystemProdRate)).append(","); - writer.append(df.format(avgSystemLatency)).append(","); - writer.append(df.format(minSystemLatency)).append(","); - writer.append(df.format(maxSystemLatency)); - if (printStdDev) - { - writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); - } - writer.append("\n"); - writer.flush(); - } - - public static void main(String[] args) - { - PerfTestController controller = new PerfTestController(); - controller.run(); - } -} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index d73be0181b..89d6462a39 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -25,25 +25,25 @@ import javax.jms.Session; public class TestParams { /* - * By default the connection URL is used. + * By default the connection URL is used. * This allows a user to easily specify a fully fledged URL any given property. * Ex. SSL parameters - * + * * By providing a host & port allows a user to simply override the URL. * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. + * without having to deal with the long URL format. */ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - + private String host = ""; - + private int port = -1; private String address = "queue; {create : always}"; private int msg_size = 1024; - private int random_msg_size_start_from = 1; + private int msg_type = 1; // not used yet private boolean cacheMessage = false; @@ -62,28 +62,19 @@ public class TestParams private int msg_count = 10; private int warmup_count = 1; - + private boolean random_msg_size = false; - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private long rate = -1; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - public TestParams() { - + url = System.getProperty("url",url); host = System.getProperty("host",""); port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); + address = System.getProperty("address","queue"); msg_size = Integer.getInteger("msg_size", 1024); + msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -94,12 +85,6 @@ public class TestParams msg_count = Integer.getInteger("msg_count",msg_count); warmup_count = Integer.getInteger("warmup_count",warmup_count); random_msg_size = Boolean.getBoolean("random_msg_size"); - msgType = System.getProperty("msg_type","bytes"); - printStdDev = Boolean.getBoolean("print_std_dev"); - rate = Long.getLong("rate",-1); - externalController = Boolean.getBoolean("ext_controller"); - useUniqueDest = Boolean.getBoolean("use_unique_dest"); - random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -137,9 +122,9 @@ public class TestParams return msg_size; } - public int getRandomMsgSizeStartFrom() + public int getMsgType() { - return random_msg_size_start_from; + return msg_type; } public boolean isDurable() @@ -176,39 +161,10 @@ public class TestParams { return disableTimestamp; } - + public boolean isRandomMsgSize() { return random_msg_size; } - public String getMessageType() - { - return msgType; - } - - public boolean isPrintStdDev() - { - return printStdDev; - } - - public long getRate() - { - return rate; - } - - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - public boolean isUseUniqueDests() - { - return useUniqueDest; - } } |
