diff options
Diffstat (limited to 'java/tools')
20 files changed, 1636 insertions, 376 deletions
diff --git a/java/tools/bin/Profile-run-from-source b/java/tools/bin/Profile-run-from-source new file mode 100755 index 0000000000..f8ec45ccff --- /dev/null +++ b/java/tools/bin/Profile-run-from-source @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Sets the environment for running the scripts from a source checkout. +txtbld=$(tput bold) # Bold +txtrst=$(tput sgr0) # Reset +txtred=$(tput setaf 1) # red +txtgreen=$(tput setaf 2) # green + +echo "${txtbld}Setting the environment to run qpid java tools from a source checkout${txtrst}" + +abs_path() +{ + D=`dirname "$1"` + echo "`cd \"$D\" 2>/dev/null && pwd`" +} + +export QPID_CHECKOUT=`abs_path "../../../../"` +echo "${txtgreen}Using source checkout at $QPID_CHECKOUT${txtrst}" + +export PATH=$QPID_CHECKOUT/java/tools/bin:$PATH + +if [ "$JAVA" = "" ] ; then + export JAVA=$(which java) +fi + +#------------- Required for perf_report, qpid-bench & qpid-python-testkit ---------------- + +export VENDOR_LIB=$QPID_CHECKOUT/java/build/lib +export CLASSPATH=`find $VENDOR_LIB -name '*.jar' | tr '\n' ':'` +export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/test.log4j" + + +#------------- Required for qpid-python-testkit ----------------------------------------- + +PYTHONPATH=$QPID_CHECKOUT/python/qpid:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH +export PATH=$QPID_CHECKOUT/python:$PATH + +if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then + QPIDD_EXEC=$QPID_CHECKOUT/cpp/src/qpidd +else + echo "${txtred}WARNING: Qpid CPP broker executable not found. You will not be able to run qpid-python-testkit${txtrst}" +fi + +if [ -x $QPID_CHECKOUT/cpp/src/.libs/cluster.so ]; then + CLUSTER_LIB=$QPID_CHECKOUT/cpp/src/.libs/cluster.so +else + echo "${txtred}WARNING: Qpid cluster.so not found.You will not be able to run qpid-python-testkit${txtrst}" +fi + +if [ "$STORE_LIB" = "" ] ; then + echo "${txtred}WARNING: Please point the STORE_LIB variable to the message store module. If not persistence tests will not write messages to disk.${txtrst}" +fi + +export PYTHONPATH QPIDD_EXEC CLUSTER_LIB diff --git a/java/tools/bin/check-qpid-java-env b/java/tools/bin/check-qpid-java-env new file mode 100755 index 0000000000..dedd6e06ea --- /dev/null +++ b/java/tools/bin/check-qpid-java-env @@ -0,0 +1,38 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +if [ -z "$LOG_CONFIG" ]; then + echo "Please set the appropriate parameters for logging as it may affect performance. Ex log4j defaults to DEBUG if not configured properly" + exit -1 +fi + +if [ -z "$JAVA_MEM" ]; then + JAVA_MEM=-Xmx1024m +fi + +if [ -z "$JAVA" ]; then + echo "Please set the path to the correct java executable to JAVA" + exit -1 +fi + +if [ -z "$CLASSPATH" ]; then + echo "Please set the $CLASSPATH variable to point to the jar/class files" + exit -1 +fi diff --git a/java/tools/bin/controller b/java/tools/bin/controller new file mode 100644 index 0000000000..fab8614039 --- /dev/null +++ b/java/tools/bin/controller @@ -0,0 +1,132 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME=controller +CONSUMER_COUNT=1 +PRODUCER_COUNT=1 +DURATION=-1 +TEST_NAME="TEST_NAME" +EXTRA_JVM_ARGS="" + +TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: controller [option].." + + printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" + + printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" + + printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." + + printf "\n%27s\n%32s\n" "-n, --name=<test-name>" "The name of the test." + + printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -c|--consumer-count) + CONSUMER_COUNT="$2"; shift; shift; continue + ;; + -p|--producer-count) + PRODUCER_COUNT="$2"; shift; shift; continue + ;; + -d|--duration) + DURATION="$2"; shift; shift; continue + ;; + -n|--name) + TEST_NAME="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" + + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } +cleanup() +{ + pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` + if [ "$pids" != "" ]; then + kill -3 $pids + kill -9 $pids >/dev/null 2>&1 + fi +} + +run_controller() +{ + TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" + echo "Running controller with : $TEST_ARGS" > test.out + $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & + waitfor test.out "Controller: Completed the test" + sleep 2 #give a grace period to shutdown + print_result $TEST_NAME +} + +print_result() +{ + prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` + std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev + echo "--------------------------------------------------------------------------------------------------------" +} + +trap cleanup EXIT + +rm -rf *.out + +if [ "$DURATION" = -1 ]; then + echo "Test report on " `date +%F` + echo "========================================================================================================" + echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" + echo "--------------------------------------------------------------------------------------------------------" +else + echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." +fi + +run_controller diff --git a/java/tools/bin/perf_report.sh b/java/tools/bin/perf-report index e6b4c987e5..7de3f2b602 100755 --- a/java/tools/bin/perf_report.sh +++ b/java/tools/bin/perf-report @@ -18,23 +18,19 @@ # under the License. # -# This will run the 8 use cases defined below and produce -# a report in tabular format. Refer to the documentation -# for more details. +# This will run the following test cases defined below and produce +# a report in tabular format. -SUB_MEM=-Xmx1024M -PUB_MEM=-Xmx1024M -LOG_CONFIG="-Damqj.logging.level=WARN" QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" TOPIC="amq.topic/test" DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" -. setenv.sh +COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } cleanup() -{ +{ pids=`ps aux | grep java | grep Perf | awk '{print $2}'` if [ "$pids" != "" ]; then kill -3 $pids @@ -46,30 +42,31 @@ cleanup() # $2 consumer options # $3 producer options run_testcase() -{ - sh run_sub.sh $LOG_CONFIG $SUB_MEM $2 > sub.out & - waitfor sub.out "Warming up" - sh run_pub.sh $LOG_CONFIG $PUB_MEM $3 > pub.out & - waitfor sub.out "Completed the test" - waitfor pub.out "Consumer has completed the test" +{ + sh run-sub $COMMON_CONFIG $2 > sub.out & + sh run-pub $COMMON_CONFIG $3 > pub.out & + waitfor pub.out "Controller: Completed the test" sleep 2 #give a grace period to shutdown - print_result $1 + print_result $1 + mv pub.out $1.pub.out + mv sub.out $1.sub.out } print_result() { - prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'` - sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'` - avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'` - min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'` - max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency + prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` + sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` + cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` + avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` + min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` + max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` + + printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency echo "------------------------------------------------------------------------------------------------" } trap cleanup EXIT +rm -rf *.out #cleanup old files. echo "Test report on " `date +%F` echo "================================================================================================" @@ -82,7 +79,7 @@ echo "-------------------------------------------------------------------------- # setting very low values to start with and experiment while increasing them slowly. # Test 1 Trans Queue -#run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" +run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" # Test 2 Dura Queue run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/java/tools/bin/qpid-bench b/java/tools/bin/qpid-bench index c982e64efd..cd894b607f 100644..100755 --- a/java/tools/bin/qpid-bench +++ b/java/tools/bin/qpid-bench @@ -18,18 +18,6 @@ # under the License. # -if [ -z "$QPID_HOME" ]; then - export QPID_HOME=$(dirname $(dirname $(readlink -f $0))) - export PATH=${PATH}:${QPID_HOME}/bin -fi +. check-qpid-java-env -# Set classpath to include Qpid jar with all required jars in manifest -QPID_LIBS=$QPID_HOME/lib/qpid-all.jar - -# Set other variables used by the qpid-run script before calling -export JAVA=java \ - JAVA_VM=-server \ - JAVA_MEM=-Xmx1024m \ - QPID_CLASSPATH=$QPID_LIBS - -. qpid-run org.apache.qpid.tools.QpidBench "$@" +$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@" diff --git a/java/tools/bin/qpid-python-testkit b/java/tools/bin/qpid-python-testkit index cbe7972421..7233d0d075 100755 --- a/java/tools/bin/qpid-python-testkit +++ b/java/tools/bin/qpid-python-testkit @@ -22,9 +22,12 @@ # via the python test runner. The defaults are set for a running # from an svn checkout -. ./set-testkit-env.sh +. check-qpid-java-env export PYTHONPATH=./:$PYTHONPATH -rm -rf $OUTDIR -qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" - +echo $PYTHONPATH +if [ "$OUTDIR" = "" ] ; then + OUTDIR=$PWD +fi +testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S` +qpid-python-test -m testkit -DOUTDIR=$testdir"$@" diff --git a/java/tools/bin/run_sub.sh b/java/tools/bin/run-pub index c9ad2fed74..9efe58c4b8 100644..100755 --- a/java/tools/bin/run_sub.sh +++ b/java/tools/bin/run-pub @@ -18,8 +18,11 @@ # under the License. # -. $QPID_TEST_HOME/bin/setenv.sh +. check-qpid-java-env -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfConsumer +JVM_ARGS="$1" +PROGRAM_ARGS="$2" +echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" +echo "PROGRAM ARGS : $PROGRAM_ARGS" +$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS diff --git a/java/tools/bin/run_pub.sh b/java/tools/bin/run-sub index 91b9287dea..8449563f7f 100644..100755 --- a/java/tools/bin/run_pub.sh +++ b/java/tools/bin/run-sub @@ -18,7 +18,15 @@ # under the License. # -. $QPID_TEST_HOME/bin/setenv.sh +. check-qpid-java-env + +echo "All args $@" + +JVM_ARGS="$1" +PROGRAM_ARGS="$2" + +echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" +echo "PROGRAM ARGS : $PROGRAM_ARGS" + +$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS -echo "$@" -$JAVA_HOME/bin/java -cp $CLASSPATH $@ org.apache.qpid.tools.PerfProducer diff --git a/java/tools/bin/set-testkit-env.sh b/java/tools/bin/set-testkit-env.sh deleted file mode 100644 index 051dad8179..0000000000 --- a/java/tools/bin/set-testkit-env.sh +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# If QPIDD_EXEC ..etc is not set, it will first check to see -# if this is run from a qpid svn check out, if not it will look -# for installed rpms. - -abs_path() -{ - D=`dirname "$1"` - B=`basename "$1"` - echo "`cd \"$D\" 2>/dev/null && pwd || echo \"$D\"`/$B" -} - -# Environment for python tests - -if [ -d ../../../python ] ; then - PYTHON_DIR=../../../python - PYTHONPATH=$PYTHON_DIR:$PYTHON_DIR/qpid -elif [ -z `echo $PYTHONPATH | awk '$0 ~ /qpid/'` ]; then - echo "WARNING: skipping test, no qpid python scripts found ."; exit 0; -fi - - -if [ "$QPIDD_EXEC" = "" ] ; then - if [ -x ../../../cpp/src/qpidd ]; then - QPIDD_EXEC=`abs_path "../../../cpp/src/qpidd"` - elif [ -n "$(which qpidd)" ] ; then - QPIDD_EXEC=$(which qpidd) - else - echo "WARNING: skipping test, QPIDD_EXEC not set and qpidd not found."; exit 0; - fi -fi - -if [ "$CLUSTER_LIB" = "" ] ; then - if [ -x ../../../cpp/src/.libs/cluster.so ]; then - CLUSTER_LIB=`abs_path "../../../cpp/src/.libs/cluster.so"` - elif [ -e /usr/lib64/qpid/daemon/cluster.so ] ; then - CLUSTER_LIB="/usr/lib64/qpid/daemon/cluster.so" - elif [ -e /usr/lib/qpid/daemon/cluster.so ] ; then - CLUSTER_LIB="/usr/lib/qpid/daemon/cluster.so" - else - echo "WARNING: skipping test, CLUSTER_LIB not set and cluster.so not found."; exit 0; - fi -fi - -if [ "$STORE_LIB" = "" ] ; then - if [ -e /usr/lib64/qpid/daemon/msgstore.so ] ; then - STORE_LIB="/usr/lib64/qpid/daemon/msgstore.so" - elif [ -e /usr/lib/qpid/daemon/msgstore.so ] ; then - STORE_LIB="/usr/lib/qpid/daemon/msgstore.so" - #else - # echo "WARNING: skipping test, STORE_LIB not set and msgstore.so not found."; exit 0; - fi -fi - -if [ "$QP_CP" = "" ] ; then - if [ -d ../../build/lib/ ]; then - QP_JAR_PATH=`abs_path "../../build/lib/"` - elif [ -d /usr/share/java/qpid-deps ]; then - QP_JAR_PATH=`abs_path "/usr/share/java"` - else - "WARNING: skipping test, QP_CP not set and the Qpid jars are not present."; exit 0; - fi - QP_CP=`find $QP_JAR_PATH -name '*.jar' | tr '\n' ':'` -fi - -if [ "$OUTDIR" = "" ] ; then - OUTDIR=`abs_path "./output"` -fi - -export PYTHONPATH PYTHON_DIR QPIDD_EXEC CLUSTER_LIB QP_CP OUTDIR diff --git a/java/tools/bin/setenv.sh b/java/tools/bin/setenv.sh deleted file mode 100644 index 24135e711b..0000000000 --- a/java/tools/bin/setenv.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Compiles the test classes and sets the CLASSPATH - -# check for QPID_TEST_HOME -if [ "$QPID_TEST_HOME" = "" ] ; then - echo "ERROR: Please set QPID_TEST_HOME ...." - exit 1 -fi - -# check for JAVA_HOME -if [ "$JAVA_HOME" = "" ] ; then - echo "ERROR: Please set JAVA_HOME ...." - exit 1 -fi - -# VENDOR_LIB path needs to be set -# for Qpid set this to {qpid_checkout}/java/build/lib -if [ "$VENDOR_LIB" = "" ] ; then - echo "ERROR: Please set VENDOR_LIB path in the script ...." - exit 1 -fi - - -[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes - -CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"` -$JAVA_HOME/bin/javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'` - -export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH - diff --git a/java/tools/bin/start-consumers b/java/tools/bin/start-consumers new file mode 100644 index 0000000000..c71fc0c21f --- /dev/null +++ b/java/tools/bin/start-consumers @@ -0,0 +1,119 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-consumers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_COUNT=10000 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " + +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" + +start_consumers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & + else + sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & + fi + done +} + +start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/java/tools/bin/start-producers b/java/tools/bin/start-producers new file mode 100644 index 0000000000..7ba0286f7c --- /dev/null +++ b/java/tools/bin/start-producers @@ -0,0 +1,136 @@ +#!/bin/sh +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This starts the controller for coordinating perf tests/ + +. check-qpid-java-env + +PROGRAM_NAME="start-producers" +PROCESS_COUNT=1 +CON_COUNT=1 +MSG_TYPE="bytes" +WARMUP_MSG_COUNT=1000 +MSG_COUNT=10000 +MSG_SIZE=1024 +ADDRESS="queue;{create:always}" +UNIQUE_DEST="false" + +EXTRA_JVM_ARGS="" +TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` + +TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ + --long connection-count:,process-count:,create-unique-queues-topics,\ +jvm-args:,queue:,topic:,address:,\ +msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") + +usage() +{ + printf "\n%s\n" "Usage: start-producers [option].." + + printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" + + printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" + + printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" + + printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" + + printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" + + printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" + + printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" + + printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" + + printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" + + printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" + + printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" +} + +eval set -- "$TEMP" +while true; do + case $1 in + -C|--connection-count) + CON_COUNT="$2"; shift; shift; continue + ;; + -P|--process-count) + PROCESS_COUNT="$2"; shift; shift; continue + ;; + -u|--create-unique-queues-topics) + UNIQUE_DEST="true"; shift; continue + ;; + --queue) + ADDRESS="$2;{create: always}"; shift; shift; continue + ;; + --topic) + ADDRESS="amq.topic/$2"; shift; shift; continue + ;; + --address) + ADDRESS="$2"; shift; shift; continue + ;; + -h|--help) + usage + exit 0 + ;; + -a|--jvm-args) + EXTRA_JVM_ARGS="$2"; shift; shift; continue + ;; + -s|--msg-size) + MSG_SIZE="$2"; shift; shift; continue + ;; + -c|--msg-count) + MSG_COUNT="$2"; shift; shift; continue + ;; + -t|--msg_type) + MSG_TYPE="$2"; shift; shift; continue + ;; + -w|--warmup-msg-count) + WARMUP_MSG_COUNT="$2"; shift; shift; continue + ;; + --) + # no more arguments to parse + break + ;; + *) + # no more arguments to parse + break + ;; + esac +done + +PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" + +start_producers() +{ + for ((i=0; i<$PROCESS_COUNT; i++)) + do + if [ "$UNIQUE_DEST" = "true" ]; then + sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & + else + sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & + fi + done +} + +start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" + diff --git a/java/tools/etc/perf-report.gnu b/java/tools/etc/perf-report.gnu new file mode 100644 index 0000000000..6d5020efb5 --- /dev/null +++ b/java/tools/etc/perf-report.gnu @@ -0,0 +1,42 @@ +set terminal png +set datafile separator "," + +set title "Variation of avg latency between iterations" +set yrange [10:20] +set xlabel "Iterations" +set ylabel "Latency (ms)" +set output "avg_latency.png" +plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines + + +set title "Variation of max latency between iterations" +set yrange [0:1000] +set xlabel "Iterations" +set ylabel "Latency (ms)" +set output "max_latency.png" +plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines + + +set title "Variation of standard deviation of latency between iterations" +set yrange [0:20] +set xlabel "Iterations" +set ylabel "Standard Deviation" +set output "std_dev_latency.png" +plot "stats-csv.log" using 12 title "standard deviation" with lines + + +set title "Variation of system throughput between iterations" +set yrange [400000:450000] +set xlabel "Iterations" +set ylabel "System Throuhgput (msg/sec)" +set output "system_rate.png" +plot "stats-csv.log" using 2 title "system throughput" with lines + + +set title "Variation of avg producer & consumer rates between iterations" +set yrange [6500:7500] +set xlabel "Iterations" +set ylabel "Avg Rates (msg/sec)" +set output "prod_cons_rate.png" +plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines + diff --git a/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java new file mode 100644 index 0000000000..37369959a8 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/Clock.java @@ -0,0 +1,92 @@ +package org.apache.qpid.tools; + +/** + * In the future this will be replaced by a Clock abstraction + * that can utilize a realtime clock when running in RT Java. + */ + +public class Clock +{ + private static Precision precision; + private static long offset = -1; // in nano secs + + public enum Precision + { + NANO_SECS, MILI_SECS; + + static Precision getPrecision(String str) + { + if ("mili".equalsIgnoreCase(str)) + { + return MILI_SECS; + } + else + { + return NANO_SECS; + } + } + }; + + static + { + precision = Precision.getPrecision(System.getProperty("precision","mili")); + //offset = Long.getLong("offset",-1); + + System.out.println("Using precision : " + precision + " and offset " + offset); + } + + public static Precision getPrecision() + { + return precision; + } + + public static long getTime() + { + if (precision == Precision.NANO_SECS) + { + if (offset == -1) + { + return System.nanoTime(); + } + else + { + return System.nanoTime() + offset; + } + } + else + { + if (offset == -1) + { + return System.currentTimeMillis(); + } + else + { + return System.currentTimeMillis() + offset/convertToMiliSecs(); + } + } + } + + public static long convertToSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000000; + } + else + { + return 1000; + } + } + + public static long convertToMiliSecs() + { + if (precision == Precision.NANO_SECS) + { + return 1000000; + } + else + { + return 1; + } + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java index b88b242e6d..90ee7e28ae 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/LatencyTest.java @@ -77,7 +77,7 @@ public class LatencyTest extends PerfBase implements MessageListener public LatencyTest() { - super(); + super(""); warmedUp = lock.newCondition(); testCompleted = lock.newCondition(); // Storing the following two for efficiency @@ -314,7 +314,7 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); + final LatencyTest latencyTest = new LatencyTest(); Runnable r = new Runnable() { public void run() @@ -334,16 +334,16 @@ public class LatencyTest extends PerfBase implements MessageListener } } }; - + Thread t; try { - t = Threading.getThreadFactory().createThread(r); + t = Threading.getThreadFactory().createThread(r); } catch(Exception e) { throw new Error("Error creating latency test thread",e); } - t.start(); + t.start(); } -}
\ No newline at end of file +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java index ac597d17de..121e94cea1 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfBase.java @@ -20,36 +20,113 @@ */ package org.apache.qpid.tools; +import java.net.InetAddress; import java.text.DecimalFormat; -import java.util.Hashtable; +import java.util.UUID; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQDestination; +import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.messaging.Address; public class PerfBase { + public final static String CODE = "CODE"; + public final static String ID = "ID"; + public final static String REPLY_ADDR = "REPLY_ADDR"; + public final static String MAX_LATENCY = "MAX_LATENCY"; + public final static String MIN_LATENCY = "MIN_LATENCY"; + public final static String AVG_LATENCY = "AVG_LATENCY"; + public final static String STD_DEV = "STD_DEV"; + public final static String CONS_RATE = "CONS_RATE"; + public final static String PROD_RATE = "PROD_RATE"; + public final static String MSG_COUNT = "MSG_COUNT"; + public final static String TIMESTAMP = "Timestamp"; + + String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); + TestParams params; Connection con; Session session; + Session controllerSession; Destination dest; - Destination feedbackDest; + Destination myControlQueue; + Destination controllerQueue; DecimalFormat df = new DecimalFormat("###.##"); + String id; + String myControlQueueAddr; + + MessageProducer sendToController; + MessageConsumer receiveFromController; + String prefix = ""; - public PerfBase() + enum OPCode { + REGISTER_CONSUMER, REGISTER_PRODUCER, + PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, + CONSUMER_READY, PRODUCER_READY, + PRODUCER_START, + RECEIVED_END_MSG, CONSUMER_STOP, + RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, + CONTINUE_TEST, STOP_TEST + }; + + enum MessageType { + BYTES, TEXT, MAP, OBJECT; + + public static MessageType getType(String s) throws Exception + { + if ("text".equalsIgnoreCase(s)) + { + return TEXT; + } + else if ("bytes".equalsIgnoreCase(s)) + { + return BYTES; + } + /*else if ("map".equalsIgnoreCase(s)) + { + return MAP; + } + else if ("object".equalsIgnoreCase(s)) + { + return OBJECT; + }*/ + else + { + throw new Exception("Unsupported message type"); + } + } + }; + + MessageType msgType = MessageType.BYTES; + + public PerfBase(String prefix) { params = new TestParams(); + String host = ""; + try + { + host = InetAddress.getLocalHost().getHostName(); + } + catch (Exception e) + { + } + id = host + "-" + UUID.randomUUID().toString(); + this.prefix = prefix; + this.myControlQueueAddr = id + ";{create: always}"; } public void setUp() throws Exception - { - + { if (params.getHost().equals("") || params.getPort() == -1) { con = new AMQConnection(params.getUrl()); @@ -62,7 +139,78 @@ public class PerfBase session = con.createSession(params.isTransacted(), params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - dest = new AMQAnyDestination(params.getAddress()); + controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + dest = createDestination(); + controllerQueue = new AMQAnyDestination(CONTROLLER_ADDR); + myControlQueue = session.createQueue(myControlQueueAddr); + msgType = MessageType.getType(params.getMessageType()); + System.out.println("Using " + msgType + " messages"); + + sendToController = controllerSession.createProducer(controllerQueue); + receiveFromController = controllerSession.createConsumer(myControlQueue); + } + + private Destination createDestination() throws Exception + { + if (params.isUseUniqueDests()) + { + System.out.println("Prefix : " + prefix); + Address addr = Address.parse(params.getAddress()); + AMQAnyDestination temp = new AMQAnyDestination(params.getAddress()); + int type = ((AMQSession_0_10)session).resolveAddressType(temp); + + if ( type == AMQDestination.TOPIC_TYPE) + { + addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); + System.out.println("Setting subject : " + addr); + } + else + { + addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); + System.out.println("Setting name : " + addr); + } + + return new AMQAnyDestination(addr); + } + else + { + return new AMQAnyDestination(params.getAddress()); + } + } + + public synchronized void sendMessageToController(MapMessage m) throws Exception + { + m.setString(ID, id); + m.setString(REPLY_ADDR,myControlQueueAddr); + sendToController.send(m); + } + + public void receiveFromController(OPCode expected) throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + if (expected != code) + { + throw new Exception("Expected OPCode : " + expected + " but received : " + code); + } + + } + + public boolean continueTest() throws Exception + { + MapMessage m = (MapMessage)receiveFromController.receive(); + OPCode code = OPCode.values()[m.getInt(CODE)]; + System.out.println("Received Code : " + code); + return (code == OPCode.CONTINUE_TEST); + } + + public void tearDown() throws Exception + { + session.close(); + controllerSession.close(); + con.close(); } public void handleError(Exception e,String msg) diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java index 0ef0455a64..b63892bb51 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfConsumer.java @@ -20,13 +20,17 @@ */ package org.apache.qpid.tools; -import javax.jms.Destination; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; -import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -47,7 +51,7 @@ import org.apache.qpid.thread.Threading; * b) They are on separate machines that have their time synced via a Time Server * * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is + * when the message is sent. The consumer will note the current time the message is * received and will calculate the latency as follows * latency = rcvdTime - msg.getJMSTimestamp() * @@ -55,13 +59,9 @@ import org.apache.qpid.thread.Threading; * variance in latencies. * * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount * * Throughput * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * * Consumer rate is calculated as * rcvdMsgCount/(rcvdTime - startTime) * @@ -81,130 +81,160 @@ public class PerfConsumer extends PerfBase implements MessageListener long minLatency = Long.MAX_VALUE; long totalLatency = 0; // to calculate avg latency. int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput long startTime = 0; // to measure consumer throughput long rcvdTime = 0; boolean transacted = false; int transSize = 0; + boolean printStdDev = false; + List<Long> sample; + final Object lock = new Object(); - public PerfConsumer() + public PerfConsumer(String prefix) { - super(); + super(prefix); + System.out.println("Consumer ID : " + id); } public void setUp() throws Exception { super.setUp(); consumer = session.createConsumer(dest); + System.out.println("Consumer: " + id + " Receiving messages from : " + ((AMQDestination)dest).getQueueName() + "\n"); // Storing the following two for efficiency transacted = params.isTransacted(); transSize = params.getTransactionSize(); + printStdDev = params.isPrintStdDev(); + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); + sendMessageToController(m); } public void warmup()throws Exception { - System.out.println("Warming up......"); - - boolean start = false; - while (!start) + receiveFromController(OPCode.CONSUMER_STARTWARMUP); + Message msg = consumer.receive(); + // This is to ensure we drain the queue before we start the actual test. + while ( msg != null) { - Message msg = consumer.receive(); - if (msg instanceof TextMessage) + if (msg.getBooleanProperty("End") == true) { - if (((TextMessage)msg).getText().equals("End")) - { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); - } + // It's more realistic for the consumer to signal this. + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); + sendMessageToController(m); } + msg = consumer.receive(1000); + } + + if (params.isTransacted()) + { + session.commit(); } + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); + sendMessageToController(m); + consumer.setMessageListener(this); } public void startTest() throws Exception { - System.out.println("Starting test......"); - consumer.setMessageListener(this); + System.out.println("Consumer: " + id + " Starting test......" + "\n"); + resetCounters(); } - public void printResults() throws Exception + public void resetCounters() { - synchronized (lock) + rcvdMsgCount = 0; + maxLatency = 0; + minLatency = Long.MAX_VALUE; + totalLatency = 0; + if (printStdDev) { - lock.wait(); + sample = null; + sample = new ArrayList<Long>(params.getMsgCount()); } + } + + public void sendResults() throws Exception + { + receiveFromController(OPCode.CONSUMER_STOP); double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + double consRate = (double)rcvdMsgCount*Clock.convertToSecs()/(double)(rcvdTime - startTime); + double stdDev = 0.0; + if (printStdDev) + { + stdDev = calculateStdDev(avgLatency); + } + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); + m.setDouble(AVG_LATENCY, avgLatency/Clock.convertToMiliSecs()); + m.setDouble(MIN_LATENCY,minLatency/Clock.convertToMiliSecs()); + m.setDouble(MAX_LATENCY,maxLatency/Clock.convertToMiliSecs()); + m.setDouble(STD_DEV, stdDev/Clock.convertToMiliSecs()); + m.setDouble(CONS_RATE, consRate); + m.setLong(MSG_COUNT, rcvdMsgCount); + sendMessageToController(m); + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); System.out.println(new StringBuilder("Consumer rate : "). append(df.format(consRate)). append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). + append(df.format(avgLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). + append(df.format(minLatency/Clock.convertToMiliSecs())). append(" ms").toString()); System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). + append(df.format(maxLatency/Clock.convertToMiliSecs())). append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) + if (printStdDev) { - session.commit(); + System.out.println(new StringBuilder("Std Dev : "). + append(stdDev/Clock.convertToMiliSecs()).toString()); } - tmp.close(); } - public void tearDown() throws Exception + public double calculateStdDev(double mean) { - consumer.close(); - session.close(); - con.close(); + double v = 0; + for (double latency: sample) + { + v = v + Math.pow((latency-mean), 2); + } + v = v/sample.size(); + return Math.round(Math.sqrt(v)); } public void onMessage(Message msg) { try { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + // To figure out the decoding overhead of text + if (msgType == MessageType.TEXT) { - notifyCompletion(msg.getJMSReplyTo()); + ((TextMessage)msg).getText(); + } - synchronized (lock) - { - lock.notifyAll(); - } + if (msg.getBooleanProperty("End")) + { + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); + sendMessageToController(m); } else { - rcvdTime = System.currentTimeMillis(); + rcvdTime = Clock.getTime(); rcvdMsgCount ++; if (rcvdMsgCount == 1) { startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); } if (transacted && (rcvdMsgCount % transSize == 0)) @@ -212,10 +242,14 @@ public class PerfConsumer extends PerfBase implements MessageListener session.commit(); } - long latency = rcvdTime - msg.getJMSTimestamp(); + long latency = rcvdTime - msg.getLongProperty(TIMESTAMP); maxLatency = Math.max(maxLatency, latency); minLatency = Math.min(minLatency, latency); totalLatency = totalLatency + latency; + if (printStdDev) + { + sample.add(latency); + } } } @@ -226,14 +260,21 @@ public class PerfConsumer extends PerfBase implements MessageListener } - public void test() + public void run() { try { setUp(); warmup(); - startTest(); - printResults(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Consumer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -242,26 +283,43 @@ public class PerfConsumer extends PerfBase implements MessageListener } } - public static void main(String[] args) + @Override + public void tearDown() throws Exception + { + super.tearDown(); + } + + public static void main(String[] args) throws InterruptedException { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - public void run() + + final PerfConsumer cons = new PerfConsumer(scriptId + i); + Runnable r = new Runnable() { - cons.test(); + public void run() + { + cons.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); + } - t.start(); + testCompleted.await(); + System.out.println("Consumers have completed the test......\n"); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java index 015d1e6205..ac6129ab68 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfProducer.java @@ -23,13 +23,15 @@ package org.apache.qpid.tools; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.CountDownLatch; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; +import javax.jms.MapMessage; import javax.jms.Message; -import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import org.apache.qpid.client.AMQDestination; import org.apache.qpid.thread.Threading; /** @@ -51,38 +53,52 @@ import org.apache.qpid.thread.Threading; * System throughput and latencies calculated by the PerfConsumer are more realistic * numbers. * + * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs + * I have done so far, it seems quite useful to compute the producer rate as it gives an + * indication of how the system behaves. For ex if there is a gap between producer and consumer rates + * you could clearly see the higher latencies and when producer and consumer rates are very close, + * latency is good. + * */ public class PerfProducer extends PerfBase { + private static long SEC = 60000; + MessageProducer producer; Message msg; - byte[] payload; - List<byte[]> payloads; + Object payload; + List<Object> payloads; boolean cacheMsg = false; boolean randomMsgSize = false; boolean durable = false; Random random; int msgSizeRange = 1024; - - public PerfProducer() + boolean rateLimitProducer = false; + double rateFactor = 0.4; + double rate = 0.0; + + public PerfProducer(String prefix) { - super(); + super(prefix); + System.out.println("Producer ID : " + id); } public void setUp() throws Exception { super.setUp(); - feedbackDest = session.createTemporaryQueue(); - durable = params.isDurable(); - + rateLimitProducer = params.getRate() > 0 ? true : false; + if (rateLimitProducer) + { + System.out.println("The test will attempt to limit the producer to " + params.getRate() + " msg/sec"); + } + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg = createMessage(createPayload(params.getMsgSize())); msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT @@ -93,21 +109,52 @@ public class PerfProducer extends PerfBase random = new Random(20080921); randomMsgSize = true; msgSizeRange = params.getMsgSize(); - payloads = new ArrayList<byte[]>(msgSizeRange); - + payloads = new ArrayList<Object>(msgSizeRange); + for (int i=0; i < msgSizeRange; i++) { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + payloads.add(createPayload(i)); } - } + } else { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + payload = createPayload(params.getMsgSize()); } producer = session.createProducer(dest); + System.out.println("Producer: " + id + " Sending messages to: " + ((AMQDestination)dest).getQueueName()); producer.setDisableMessageID(params.isDisableMessageID()); producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + + MapMessage m = controllerSession.createMapMessage(); + m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); + sendMessageToController(m); + } + + Object createPayload(int size) + { + if (msgType == MessageType.TEXT) + { + return MessageFactory.createMessagePayload(size); + } + else + { + return MessageFactory.createMessagePayload(size).getBytes(); + } + } + + Message createMessage(Object payload) throws Exception + { + if (msgType == MessageType.TEXT) + { + return session.createTextMessage((String)payload); + } + else + { + BytesMessage m = session.createBytesMessage(); + m.writeBytes((byte[])payload); + return m; + } } protected Message getNextMessage() throws Exception @@ -117,117 +164,130 @@ public class PerfProducer extends PerfBase return msg; } else - { - msg = session.createBytesMessage(); - + { + Message m; + if (!randomMsgSize) { - ((BytesMessage)msg).writeBytes(payload); + m = createMessage(payload); } else { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + m = createMessage(payloads.get(random.nextInt(msgSizeRange))); } - msg.setJMSDeliveryMode(durable? + m.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); - return msg; + return m; } } public void warmup()throws Exception { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); + receiveFromController(OPCode.PRODUCER_STARTWARMUP); + System.out.println("Producer: " + id + " Warming up......"); for (int i=0; i < params.getWarmupCount() -1; i++) { producer.send(getNextMessage()); } - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); + sendEndMessage(); if (params.isTransacted()) { session.commit(); } - - tmp.close(); } public void startTest() throws Exception { - System.out.println("Starting test......"); + resetCounters(); + receiveFromController(OPCode.PRODUCER_START); int count = params.getMsgCount(); boolean transacted = params.isTransacted(); int tranSize = params.getTransactionSize(); - long start = System.currentTimeMillis(); + long limit = (long)(params.getRate() * rateFactor); // in msecs + long timeLimit = (long)(SEC * rateFactor); // in msecs + + long start = Clock.getTime(); // defaults to nano secs + long interval = start; for(int i=0; i < count; i++ ) { Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); + msg.setLongProperty(TIMESTAMP, Clock.getTime()); producer.send(msg); if ( transacted && ((i+1) % tranSize == 0)) { session.commit(); } + + if (rateLimitProducer && i%limit == 0) + { + long elapsed = (Clock.getTime() - interval)*Clock.convertToMiliSecs(); // in msecs + if (elapsed < timeLimit) + { + Thread.sleep(elapsed); + } + interval = Clock.getTime(); + + } + } + sendEndMessage(); + if ( transacted) + { + session.commit(); } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; + long time = Clock.getTime() - start; + rate = (double)count*Clock.convertToSecs()/(double)time; System.out.println(new StringBuilder("Producer rate: "). append(df.format(rate)). append(" msg/sec"). toString()); } - public void waitForCompletion() throws Exception + public void resetCounters() { - MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - tmp.receive(); + } - if (params.isTransacted()) - { - session.commit(); - } + public void sendEndMessage() throws Exception + { + Message msg = session.createMessage(); + msg.setBooleanProperty("End", true); + producer.send(msg); + } - tmp.close(); - System.out.println("Consumer has completed the test......"); + public void sendResults() throws Exception + { + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); + msg.setDouble(PROD_RATE, rate); + sendMessageToController(msg); } + @Override public void tearDown() throws Exception { - producer.close(); - session.close(); - con.close(); + super.tearDown(); } - public void test() + public void run() { try { setUp(); warmup(); - startTest(); - waitForCompletion(); + boolean nextIteration = true; + while (nextIteration) + { + System.out.println("=========================================================\n"); + System.out.println("Producer: " + id + " starting a new iteration ......\n"); + startTest(); + sendResults(); + nextIteration = continueTest(); + } tearDown(); } catch(Exception e) @@ -236,27 +296,63 @@ public class PerfProducer extends PerfBase } } - - public static void main(String[] args) + public void startControllerIfNeeded() { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() + if (!params.isExternalController()) { - public void run() + final PerfTestController controller = new PerfTestController(); + Runnable r = new Runnable() + { + public void run() + { + controller.run(); + } + }; + + Thread t; + try { - prod.test(); + t = Threading.getThreadFactory().createThread(r); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); + catch(Exception e) + { + throw new Error("Error creating controller thread",e); + } + t.start(); } - catch(Exception e) + } + + + public static void main(String[] args) throws InterruptedException + { + String scriptId = (args.length == 1) ? args[0] : ""; + int conCount = Integer.getInteger("con_count",1); + final CountDownLatch testCompleted = new CountDownLatch(conCount); + for (int i=0; i < conCount; i++) { - throw new Error("Error creating producer thread",e); + final PerfProducer prod = new PerfProducer(scriptId + i); + prod.startControllerIfNeeded(); + Runnable r = new Runnable() + { + public void run() + { + prod.run(); + testCompleted.countDown(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } - t.start(); + testCompleted.await(); + System.out.println("Producers have completed the test......"); } }
\ No newline at end of file diff --git a/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java new file mode 100644 index 0000000000..5c98c645f4 --- /dev/null +++ b/java/tools/src/main/java/org/apache/qpid/tools/PerfTestController.java @@ -0,0 +1,422 @@ +package org.apache.qpid.tools; + +import java.io.FileWriter; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; + +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; + +import org.apache.qpid.client.message.AMQPEncodedMapMessage; + +/** + * The Controller coordinates a test run between a number + * of producers and consumers, configured via -Dprod_count and -Dcons_count. + * + * It waits till all the producers and consumers have registered and then + * conducts a warmup run. Once all consumers and producers have completed + * the warmup run and is ready, it will conduct the actual test run and + * collect all stats from the participants and calculates the system + * throughput, the avg/min/max for producer rates, consumer rates and latency. + * + * These stats are then printed to std out. + * The Controller also prints events to std out to give a running account + * of the test run in progress. Ex registering of participants, starting warmup ..etc. + * This allows a scripting tool to monitor the progress. + * + * The Controller can be run in two modes. + * 1. A single test run (default) where it just runs until the message count specified + * for the producers via -Dmsg_count is sent and received. + * + * 2. Time based, configured via -Dduration=x, where x is in mins. + * In this mode, the Controller repeatedly cycles through the tests (after an initial + * warmup run) until the desired time is reached. If a test run is in progress + * and the time is up, it will allow the run the complete. + * + * After each iteration, the stats will be printed out in csv format to a separate log file. + * System throughput is calculated as follows + * totalMsgCount/(totalTestTime) + */ +public class PerfTestController extends PerfBase implements MessageListener +{ + enum TestMode { SINGLE_RUN, TIME_BASED }; + + TestMode testMode = TestMode.SINGLE_RUN; + + long totalTestTime; + + private double avgSystemLatency = 0.0; + private double minSystemLatency = Double.MAX_VALUE; + private double maxSystemLatency = 0; + private double avgSystemLatencyStdDev = 0.0; + + private double avgSystemConsRate = 0.0; + private double maxSystemConsRate = 0.0; + private double minSystemConsRate = Double.MAX_VALUE; + + private double avgSystemProdRate = 0.0; + private double maxSystemProdRate = 0.0; + private double minSystemProdRate = Double.MAX_VALUE; + + private long totalMsgCount = 0; + private double totalSystemThroughput = 0.0; + + private int consumerCount = Integer.getInteger("cons_count", 1); + private int producerCount = Integer.getInteger("prod_count", 1); + private int duration = Integer.getInteger("duration", -1); // in mins + private Map<String,MapMessage> consumers; + private Map<String,MapMessage> producers; + + private CountDownLatch consRegistered; + private CountDownLatch prodRegistered; + private CountDownLatch consReady; + private CountDownLatch prodReady; + private CountDownLatch receivedEndMsg; + private CountDownLatch receivedConsStats; + private CountDownLatch receivedProdStats; + + private MessageConsumer consumer; + private boolean printStdDev = false; + FileWriter writer; + + public PerfTestController() + { + super(""); + consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); + producers = new ConcurrentHashMap<String,MapMessage>(producerCount); + + consRegistered = new CountDownLatch(consumerCount); + prodRegistered = new CountDownLatch(producerCount); + consReady = new CountDownLatch(consumerCount); + prodReady = new CountDownLatch(producerCount); + printStdDev = params.isPrintStdDev(); + testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; + } + + public void setUp() throws Exception + { + super.setUp(); + if (testMode == TestMode.TIME_BASED) + { + writer = new FileWriter("stats-csv.log"); + } + consumer = controllerSession.createConsumer(controllerQueue); + System.out.println("\nController: " + producerCount + " producers are expected"); + System.out.println("Controller: " + consumerCount + " consumers are expected \n"); + consumer.setMessageListener(this); + consRegistered.await(); + prodRegistered.await(); + System.out.println("\nController: All producers and consumers have registered......\n"); + } + + public void warmup() throws Exception + { + System.out.println("Controller initiating warm up sequence......"); + sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); + sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); + prodReady.await(); + consReady.await(); + System.out.println("\nController : All producers and consumers are ready to start the test......\n"); + } + + public void startTest() throws Exception + { + resetCounters(); + System.out.println("\nController Starting test......"); + long start = Clock.getTime(); + sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); + receivedEndMsg.await(); + totalTestTime = Clock.getTime() - start; + sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); + receivedProdStats.await(); + receivedConsStats.await(); + } + + public void resetCounters() + { + minSystemLatency = Double.MAX_VALUE; + maxSystemLatency = 0; + maxSystemConsRate = 0.0; + minSystemConsRate = Double.MAX_VALUE; + maxSystemProdRate = 0.0; + minSystemProdRate = Double.MAX_VALUE; + + totalMsgCount = 0; + + receivedConsStats = new CountDownLatch(consumerCount); + receivedProdStats = new CountDownLatch(producerCount); + receivedEndMsg = new CountDownLatch(producerCount); + } + + public void calcStats() throws Exception + { + double totLatency = 0.0; + double totStdDev = 0.0; + double totalConsRate = 0.0; + double totalProdRate = 0.0; + + MapMessage conStat = null; // for error handling + try + { + for (MapMessage m: consumers.values()) + { + conStat = m; + minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); + maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); + totLatency = totLatency + m.getDouble(AVG_LATENCY); + totStdDev = totStdDev + m.getDouble(STD_DEV); + + minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); + maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); + totalConsRate = totalConsRate + m.getDouble(CONS_RATE); + + totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Consumer : " + conStat); + } + + + MapMessage prodStat = null; // for error handling + try + { + for (MapMessage m: producers.values()) + { + prodStat = m; + minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); + maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); + totalProdRate = totalProdRate + m.getDouble(PROD_RATE); + } + } + catch(Exception e) + { + System.out.println("Error calculating stats from Producer : " + conStat); + } + + avgSystemLatency = totLatency/consumers.size(); + avgSystemLatencyStdDev = totStdDev/consumers.size(); + avgSystemConsRate = totalConsRate/consumers.size(); + avgSystemProdRate = totalProdRate/producers.size(); + + System.out.println("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); + + totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); + } + + public void printResults() throws Exception + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(totalSystemThroughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Consumer rate : "). + append(df.format(avgSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Consumer rate : "). + append(df.format(minSystemConsRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Consumer rate : "). + append(df.format(maxSystemConsRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg Producer rate : "). + append(df.format(avgSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Min Producer rate : "). + append(df.format(minSystemProdRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Max Producer rate : "). + append(df.format(maxSystemProdRate)). + append(" msg/sec").toString()); + + System.out.println(new StringBuilder("Avg System Latency : "). + append(df.format(avgSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min System Latency : "). + append(df.format(minSystemLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Max System Latency : "). + append(df.format(maxSystemLatency)). + append(" ms").toString()); + if (printStdDev) + { + System.out.println(new StringBuilder("Avg System Std Dev : "). + append(avgSystemLatencyStdDev)); + } + } + + private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception + { + System.out.println("\nController: Sending code " + code); + MessageProducer tmpProd = controllerSession.createProducer(null); + MapMessage msg = controllerSession.createMapMessage(); + msg.setInt(CODE, code.ordinal()); + for (MapMessage node : nodes) + { + if (node.getString(REPLY_ADDR) == null) + { + System.out.println("REPLY_ADDR is null " + node); + } + else + { + System.out.println("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); + } + tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); + } + } + + public void onMessage(Message msg) + { + try + { + MapMessage m = (MapMessage)msg; + OPCode code = OPCode.values()[m.getInt(CODE)]; + + System.out.println("\n---------Controller Received Code : " + code); + System.out.println("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); + + switch (code) + { + case REGISTER_CONSUMER : + if (consRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of consumers have already registered," + + "ignoring extra consumer"); + break; + } + consumers.put(m.getString(ID),m); + consRegistered.countDown(); + break; + + case REGISTER_PRODUCER : + if (prodRegistered.getCount() == 0) + { + System.out.println("Warning : Expected number of producers have already registered," + + "ignoring extra producer"); + break; + } + producers.put(m.getString(ID),m); + prodRegistered.countDown(); + break; + + case CONSUMER_READY : + consReady.countDown(); + break; + + case PRODUCER_READY : + prodReady.countDown(); + break; + + case RECEIVED_END_MSG : + receivedEndMsg.countDown(); + break; + + case RECEIVED_CONSUMER_STATS : + consumers.put(m.getString(ID),m); + receivedConsStats.countDown(); + break; + + case RECEIVED_PRODUCER_STATS : + producers.put(m.getString(ID),m); + receivedProdStats.countDown(); + break; + + default: + throw new Exception("Invalid OPCode " + code); + } + } + catch (Exception e) + { + handleError(e,"Error when receiving messages " + msg); + } + } + + public void run() + { + try + { + setUp(); + warmup(); + if (testMode == TestMode.SINGLE_RUN) + { + startTest(); + calcStats(); + printResults(); + } + else + { + long startTime = Clock.getTime(); + long timeLimit = duration * 60 * 1000; // duration is in mins. + boolean nextIteration = true; + while (nextIteration) + { + startTest(); + calcStats(); + writeStatsToFile(); + if (Clock.getTime() - startTime < timeLimit) + { + sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); + sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); + nextIteration = true; + } + else + { + nextIteration = false; + } + } + } + tearDown(); + + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + @Override + public void tearDown() throws Exception { + System.out.println("Controller: Completed the test......\n"); + if (testMode == TestMode.TIME_BASED) + { + writer.close(); + } + sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); + sendMessageToNodes(OPCode.STOP_TEST,producers.values()); + super.tearDown(); + } + + public void writeStatsToFile() throws Exception + { + writer.append(String.valueOf(totalMsgCount)).append(","); + writer.append(df.format(totalSystemThroughput)).append(","); + writer.append(df.format(avgSystemConsRate)).append(","); + writer.append(df.format(minSystemConsRate)).append(","); + writer.append(df.format(maxSystemConsRate)).append(","); + writer.append(df.format(avgSystemProdRate)).append(","); + writer.append(df.format(minSystemProdRate)).append(","); + writer.append(df.format(maxSystemProdRate)).append(","); + writer.append(df.format(avgSystemLatency)).append(","); + writer.append(df.format(minSystemLatency)).append(","); + writer.append(df.format(maxSystemLatency)); + if (printStdDev) + { + writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); + } + writer.append("\n"); + writer.flush(); + } + + public static void main(String[] args) + { + PerfTestController controller = new PerfTestController(); + controller.run(); + } +} diff --git a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java index 89d6462a39..d73be0181b 100644 --- a/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java +++ b/java/tools/src/main/java/org/apache/qpid/tools/TestParams.java @@ -25,25 +25,25 @@ import javax.jms.Session; public class TestParams { /* - * By default the connection URL is used. + * By default the connection URL is used. * This allows a user to easily specify a fully fledged URL any given property. * Ex. SSL parameters - * + * * By providing a host & port allows a user to simply override the URL. * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. + * without having to deal with the long URL format. */ private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - + private String host = ""; - + private int port = -1; private String address = "queue; {create : always}"; private int msg_size = 1024; - private int msg_type = 1; // not used yet + private int random_msg_size_start_from = 1; private boolean cacheMessage = false; @@ -62,19 +62,28 @@ public class TestParams private int msg_count = 10; private int warmup_count = 1; - + private boolean random_msg_size = false; + private String msgType = "bytes"; + + private boolean printStdDev = false; + + private long rate = -1; + + private boolean externalController = false; + + private boolean useUniqueDest = false; // useful when using multiple connections. + public TestParams() { - + url = System.getProperty("url",url); host = System.getProperty("host",""); port = Integer.getInteger("port", -1); - address = System.getProperty("address","queue"); + address = System.getProperty("address",address); msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); disableTimestamp = Boolean.getBoolean("disableTimestamp"); @@ -85,6 +94,12 @@ public class TestParams msg_count = Integer.getInteger("msg_count",msg_count); warmup_count = Integer.getInteger("warmup_count",warmup_count); random_msg_size = Boolean.getBoolean("random_msg_size"); + msgType = System.getProperty("msg_type","bytes"); + printStdDev = Boolean.getBoolean("print_std_dev"); + rate = Long.getLong("rate",-1); + externalController = Boolean.getBoolean("ext_controller"); + useUniqueDest = Boolean.getBoolean("use_unique_dest"); + random_msg_size_start_from = Integer.getInteger("random_msg_size_start_from", 1); } public String getUrl() @@ -122,9 +137,9 @@ public class TestParams return msg_size; } - public int getMsgType() + public int getRandomMsgSizeStartFrom() { - return msg_type; + return random_msg_size_start_from; } public boolean isDurable() @@ -161,10 +176,39 @@ public class TestParams { return disableTimestamp; } - + public boolean isRandomMsgSize() { return random_msg_size; } + public String getMessageType() + { + return msgType; + } + + public boolean isPrintStdDev() + { + return printStdDev; + } + + public long getRate() + { + return rate; + } + + public boolean isExternalController() + { + return externalController; + } + + public void setAddress(String addr) + { + address = addr; + } + + public boolean isUseUniqueDests() + { + return useUniqueDest; + } } |