diff options
| author | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
|---|---|---|
| committer | Alex Rudyy <orudyy@apache.org> | 2015-04-15 09:47:28 +0000 |
| commit | 0a0baee45ebcff44635907d457c4ff6810b09c87 (patch) | |
| tree | 8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/tools | |
| parent | 54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff) | |
| download | qpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz | |
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/tools')
44 files changed, 0 insertions, 8638 deletions
diff --git a/qpid/java/tools/README.txt b/qpid/java/tools/README.txt deleted file mode 100644 index fdde734027..0000000000 --- a/qpid/java/tools/README.txt +++ /dev/null @@ -1,153 +0,0 @@ -Introduction -============ - -The Test kit for the java client consists of 2 components. - -1) A Simple Perf Test that can be used to, - a) Run a predefined perf report consisting of 8 use cases (see below) - b) Run a producer and a consumer with a number of different options - -2) Soak tests that can be run for longer durations (hours or days). - -I am planning to add some stress tests to this module as well. -Please note this is not a replacement for the existing perf/systests etc. -But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test. - -Table of Contents -================= -1. Perf Kit -2. Soak Kit -3. Perf Test use cases -4. Soak Test use cases -5. Running the sample perf test report -6. Running the sample soak test report - -1.0 Perf Kit ------------- -1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report. -Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions. - -1.2 It calculates the following results in msg/sec. - - System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send) - - Producer rate : no_of_msgs / (time_after_sending - time_before_sending) - - Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd) - - Latency : time_msg_rcvd - time_msg_sent - -The test will print min, max and avg latency. - -1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced. - -1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options. - Please look at TestParams.java for all the configurable options. - -1.5 You can also use the test kit to benchmark against any vendor. - - -2.0 Soak tests --------------- -2.0 This includes a set of soak tests that can be run for a longer duration. - -2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker. - The producer will print the timestamp as soon as it sends the xth message. - The consumer will reply with an empty message to the replyTo destination given in the xth message. - The consumer prints the throuhgput for the iteration and the latency for the xth message. - A typical value for x is 100k - -2.2 The feedback loop prevents the producer from overrunning the consumer. - And the printout for every xth message will let you know how many iterations been completed at any given time. - (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far). - -2.2 The following results can be calculated for these tests. - - Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example - - You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run. - (look at testkit/bin/soak_report.sh) for an example). - - You could also graph throughput, latency, CPU and memory using the comma separated log files. - -2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples. - -3.0 Perf Test report use cases -------------------------------- -3.1 Please check testkit/bin/perf_report.sh for more details - -3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation. - -Test 1 Trans Queue - -Test 2 Dura Queue - -Test 3 Dura Queue Sync - -Test 4 Topic - -Test 5 Durable Topic - -Test 6 Fanout - -Test 7 Small TX (about 2 msgs per tx) - -Test 8 Large TX (about 1000 msgs per tx) - - -4.0 Soak tests use cases -------------------------- -4.1 Following are the current tests available in the test kit. - -4.2 Please refer to the source to see the javadoc and options - - -1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured. - -2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread. - It can also send messages transactionally. Again a no of options can be configured. - -3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again. - - -5.0 Running the sample perf test report ---------------------------------------- -The testkit/bin contains perf_report.sh. -It runs the above 8 use cases against a broker and print the results in a tabular format. - -For example -================================================================================================ -|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency| ------------------------------------------------------------------------------------------------- -|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx| - - -5.1 running perf_report.sh - -5.1.1 set JAVA_HOME to point to Java 1.5 and above -5.1.2 set QPID_TEST_HOME to point to the testkit dir -5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files. -5.1.4 start a broker -5.1.5 update the testkit/etc/jndi.properties to point to the correct broker -5.1.6 execute perf_report.sh - - -6.0 Running the sample soak test report ---------------------------------------- -The testkit/bin contains soak_report.sh -It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats. -Avg, Min and Max for System Throughput, letency, CPU and memory. - -6.1 running soak_report.sh - -5.1.1 set JAVA_HOME to point to Java 1.5 and above -5.1.2 set QPID_TEST_HOME to point to the testkit dir -5.1.3 set JAR_PATH to point to the Qpid jars -5.1.4 start a broker -5.1.5 execute soak_report.sh with correct params. - Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second. - -5.1.6 Please note the total duration for the test is log_freq * log_iterations - So if you want to run the test for 10 hours and collect 10 second samples then do the following - sh soak_report.sh 10 3600 - diff --git a/qpid/java/tools/bin/Profile-run-from-source b/qpid/java/tools/bin/Profile-run-from-source deleted file mode 100755 index 179c365450..0000000000 --- a/qpid/java/tools/bin/Profile-run-from-source +++ /dev/null @@ -1,71 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# Sets the environment for running the scripts from a source checkout. -txtbld=$(tput bold) # Bold -txtrst=$(tput sgr0) # Reset -txtred=$(tput setaf 1) # red -txtgreen=$(tput setaf 2) # green - -echo "${txtbld}Setting the environment to run qpid java tools from a source checkout${txtrst}" - -abs_path() -{ - D=`dirname "$1"` - echo "`cd \"$D\" 2>/dev/null && pwd`" -} - -export QPID_CHECKOUT=`abs_path "../../../../"` -echo "${txtgreen}Using source checkout at $QPID_CHECKOUT${txtrst}" - -export PATH=$QPID_CHECKOUT/java/tools/bin:$PATH - -if [ "$JAVA" = "" ] ; then - export JAVA=$(which java) -fi - -#------------- Required for perf_report, qpid-bench & qpid-python-testkit ---------------- - -export VENDOR_LIB=$QPID_CHECKOUT/java/build/lib -export CLASSPATH=`find $VENDOR_LIB -name '*.jar' | tr '\n' ':'` -export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/test.log4j" - - -#------------- Required for qpid-python-testkit ----------------------------------------- - -PYTHONPATH=$QPID_CHECKOUT/python:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH -export PATH=$QPID_CHECKOUT/python:$PATH - -if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then - QPIDD_EXEC=$QPID_CHECKOUT/cpp/src/qpidd -else - echo "${txtred}WARNING: Qpid CPP broker executable not found. You will not be able to run qpid-python-testkit${txtrst}" -fi - -if [ -x $QPID_CHECKOUT/cpp/src/.libs/cluster.so ]; then - CLUSTER_LIB=$QPID_CHECKOUT/cpp/src/.libs/cluster.so -else - echo "${txtred}WARNING: Qpid cluster.so not found.You will not be able to run qpid-python-testkit${txtrst}" -fi - -if [ "$STORE_LIB" = "" ] ; then - echo "${txtred}WARNING: Please point the STORE_LIB variable to the message store module. If not persistence tests will not write messages to disk.${txtrst}" -fi - -export PYTHONPATH QPIDD_EXEC CLUSTER_LIB diff --git a/qpid/java/tools/bin/check-qpid-java-env b/qpid/java/tools/bin/check-qpid-java-env deleted file mode 100755 index da67e50a90..0000000000 --- a/qpid/java/tools/bin/check-qpid-java-env +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -if [ -z "$LOG_CONFIG" ]; then - echo "Please set the appropriate parameters for logging as it may affect performance. Ex log4j defaults to DEBUG if not configured properly" - exit -1 -fi - -if [ -z "$JAVA_MEM" ]; then - JAVA_MEM=-Xmx1024m -fi - -if [ -z "$JAVA" ]; then - echo "Please set the path to the correct java executable to JAVA" - exit -1 -fi - -if [ -z "$CLASSPATH" ]; then - echo "Please set the $CLASSPATH variable to point to the jar/class files" - exit -1 -fi diff --git a/qpid/java/tools/bin/jms-quick-perf-report b/qpid/java/tools/bin/jms-quick-perf-report deleted file mode 100755 index 7de3f2b602..0000000000 --- a/qpid/java/tools/bin/jms-quick-perf-report +++ /dev/null @@ -1,137 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This will run the following test cases defined below and produce -# a report in tabular format. - -QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}" -DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}" -TOPIC="amq.topic/test" -DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}" - -COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep Perf | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -# $1 test name -# $2 consumer options -# $3 producer options -run_testcase() -{ - sh run-sub $COMMON_CONFIG $2 > sub.out & - sh run-pub $COMMON_CONFIG $3 > pub.out & - waitfor pub.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $1 - mv pub.out $1.pub.out - mv sub.out $1.sub.out -} - -print_result() -{ - prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency - echo "------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT -rm -rf *.out #cleanup old files. - -echo "Test report on " `date +%F` -echo "================================================================================================" -echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|" -echo "------------------------------------------------------------------------------------------------" - -# The message counts and warmup counts are set to very low values for quick testing of the script. -# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k -# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend -# setting very low values to start with and experiment while increasing them slowly. - -# Test 1 Trans Queue -run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10" - -# Test 2 Dura Queue -run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 3 Dura Queue Sync -run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 4 Dura Queue Sync Publish and Ack -run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent" - -# Test 5 Topic -run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10" - -# Test 6 Durable Topic -run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 7 Fanout -run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10" - -# Test 8 Small TX -run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1" - -# Test 9 Large TX -run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \ - "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10" - -# Test 10 256 MSG -run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 11 512 MSG -run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 12 2048 MSG -run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 13 Random size MSG -run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 14 Random size MSG Durable -run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 15 64K MSG -run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 16 Durable 64K MSG -run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" - -# Test 17 500K MSG -run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10" - -# Test 18 Durable 500K MSG -run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \ - "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10" diff --git a/qpid/java/tools/bin/mercury-controller b/qpid/java/tools/bin/mercury-controller deleted file mode 100755 index fab8614039..0000000000 --- a/qpid/java/tools/bin/mercury-controller +++ /dev/null @@ -1,132 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME=controller -CONSUMER_COUNT=1 -PRODUCER_COUNT=1 -DURATION=-1 -TEST_NAME="TEST_NAME" -EXTRA_JVM_ARGS="" - -TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: controller [option].." - - printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test" - - printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test" - - printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration." - - printf "\n%27s\n%32s\n" "-n, --name=<test-name>" "The name of the test." - - printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -c|--consumer-count) - CONSUMER_COUNT="$2"; shift; shift; continue - ;; - -p|--producer-count) - PRODUCER_COUNT="$2"; shift; shift; continue - ;; - -d|--duration) - DURATION="$2"; shift; shift; continue - ;; - -n|--name) - TEST_NAME="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}" - - -waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } -cleanup() -{ - pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'` - if [ "$pids" != "" ]; then - kill -3 $pids - kill -9 $pids >/dev/null 2>&1 - fi -} - -run_controller() -{ - TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS" - echo "Running controller with : $TEST_ARGS" > test.out - $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out & - waitfor test.out "Controller: Completed the test" - sleep 2 #give a grace period to shutdown - print_result $TEST_NAME -} - -print_result() -{ - prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'` - sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'` - cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'` - avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'` - min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'` - max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'` - std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'` - - printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev - echo "--------------------------------------------------------------------------------------------------------" -} - -trap cleanup EXIT - -rm -rf *.out - -if [ "$DURATION" = -1 ]; then - echo "Test report on " `date +%F` - echo "========================================================================================================" - echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|" - echo "--------------------------------------------------------------------------------------------------------" -else - echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration." -fi - -run_controller diff --git a/qpid/java/tools/bin/mercury-start-consumers b/qpid/java/tools/bin/mercury-start-consumers deleted file mode 100755 index c71fc0c21f..0000000000 --- a/qpid/java/tools/bin/mercury-start-consumers +++ /dev/null @@ -1,119 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-consumers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_COUNT=10000 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS=" -Dmax_prefetch=500 " - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true" - -start_consumers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 & - else - sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 & - fi - done -} - -start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/qpid/java/tools/bin/mercury-start-producers b/qpid/java/tools/bin/mercury-start-producers deleted file mode 100755 index 7ba0286f7c..0000000000 --- a/qpid/java/tools/bin/mercury-start-producers +++ /dev/null @@ -1,136 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="start-producers" -PROCESS_COUNT=1 -CON_COUNT=1 -MSG_TYPE="bytes" -WARMUP_MSG_COUNT=1000 -MSG_COUNT=10000 -MSG_SIZE=1024 -ADDRESS="queue;{create:always}" -UNIQUE_DEST="false" - -EXTRA_JVM_ARGS="" -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\ - --long connection-count:,process-count:,create-unique-queues-topics,\ -jvm-args:,queue:,topic:,address:,\ -msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@") - -usage() -{ - printf "\n%s\n" "Usage: start-producers [option].." - - printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test" - - printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test" - - printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic" - - printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue" - - printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic" - - printf "\n%13s\n%44s\n" "--address" "The address you want to publish to" - - printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)" - - printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)" - - printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)" - - printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)" - - printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -C|--connection-count) - CON_COUNT="$2"; shift; shift; continue - ;; - -P|--process-count) - PROCESS_COUNT="$2"; shift; shift; continue - ;; - -u|--create-unique-queues-topics) - UNIQUE_DEST="true"; shift; continue - ;; - --queue) - ADDRESS="$2;{create: always}"; shift; shift; continue - ;; - --topic) - ADDRESS="amq.topic/$2"; shift; shift; continue - ;; - --address) - ADDRESS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -s|--msg-size) - MSG_SIZE="$2"; shift; shift; continue - ;; - -c|--msg-count) - MSG_COUNT="$2"; shift; shift; continue - ;; - -t|--msg_type) - MSG_TYPE="$2"; shift; shift; continue - ;; - -w|--warmup-msg-count) - WARMUP_MSG_COUNT="$2"; shift; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT" - -start_producers() -{ - for ((i=0; i<$PROCESS_COUNT; i++)) - do - if [ "$UNIQUE_DEST" = "true" ]; then - sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 & - else - sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 & - fi - done -} - -start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS" - diff --git a/qpid/java/tools/bin/qpid-bench b/qpid/java/tools/bin/qpid-bench deleted file mode 100755 index 4773320b9e..0000000000 --- a/qpid/java/tools/bin/qpid-bench +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. check-qpid-java-env - -$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@" diff --git a/qpid/java/tools/bin/qpid-jms-benchmark b/qpid/java/tools/bin/qpid-jms-benchmark deleted file mode 100755 index 3d712a27dc..0000000000 --- a/qpid/java/tools/bin/qpid-jms-benchmark +++ /dev/null @@ -1,316 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, time, qpid.messaging, re -from threading import Thread -from subprocess import Popen, PIPE, STDOUT - -op = optparse.OptionParser(usage="usage: %prog [options]", - description="simple performance benchmarks") -op.add_option("-b", "--broker", default=[], action="append", type="str", - help="url of broker(s) to connect to, round robin on multiple brokers") -op.add_option("-c", "--client-host", default=[], action="append", type="str", - help="host(s) to run clients on via ssh, round robin on mulple hosts") -op.add_option("-q", "--queues", default=1, type="int", metavar="N", - help="create N queues (default %default)") -op.add_option("-s", "--senders", default=1, type="int", metavar="N", - help="start N senders per queue (default %default)") -op.add_option("-r", "--receivers", default=1, type="int", metavar="N", - help="start N receivers per queue (default %default)") -op.add_option("-m", "--messages", default=100000, type="int", metavar="N", - help="send N messages per sender (default %default)") -op.add_option("--queue-name", default="benchmark", metavar="NAME", - help="base name for queues (default %default)") -op.add_option("--send-rate", default=0, metavar="N", - help="send rate limited to N messages/second, 0 means no limit (default %default)") -op.add_option("--receive-rate", default=0, metavar="N", - help="receive rate limited to N messages/second, 0 means no limit (default %default)") -op.add_option("--content-size", default=1024, type="int", metavar="BYTES", - help="message size in bytes (default %default)") -op.add_option("--ack-frequency", default=100, metavar="N", type="int", - help="receiver ack's every N messages, 0 means unconfirmed (default %default)") -op.add_option("--no-report-header", dest="report_header", default=True, - action="store_false", help="don't print header on report") -op.add_option("--summarize", default=False, action="store_true", - help="print summary statistics for multiple senders/receivers: total throughput, average latency") -op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int") -op.add_option("--send-option", default=[], action="append", type="str", - help="Additional option for sending addresses") -op.add_option("--receive-option", default=[], action="append", type="str", - help="Additional option for receiving addresses") -op.add_option("--create-option", default=[], action="append", type="str", - help="Additional option for creating addresses") -op.add_option("--send-arg", default=[], action="append", type="str", - help="Additional argument for qpid-send") -op.add_option("--receive-arg", default=[], action="append", type="str", - help="Additional argument for qpid-receive") -op.add_option("--no-timestamp", dest="timestamp", default=True, - action="store_false", help="don't add a timestamp, no latency results") -op.add_option("--sequence", dest="sequence", default=False, - action="store_true", help="add a sequence number to each message") -op.add_option("--connection-options", type="str", - help="Connection options for senders & receivers") -op.add_option("--durable", default=False, action="store_true", - help="Use durable queues and messages") -op.add_option("--save-received", default=False, action="store_true", - help="Save received message content to files <queuename>-receiver-<n>.msg") -op.add_option("--verbose", default=False, action="store_true", - help="Show commands executed") -op.add_option("--no-delete", default=False, action="store_true", - help="Don't delete the test queues.") -op.add_option("--fill-drain", default=False, action="store_true", - help="First fill the queues, then drain them") - -single_quote_re = re.compile("'") -def posix_quote(string): - """ Quote a string for use as an argument in a posix shell""" - return "'" + single_quote_re.sub("\\'", string) + "'"; - -def ssh_command(host, command): - """ Convert command into an ssh command on host with quoting""" - return ["ssh", host] + [posix_quote(arg) for arg in command] - -class Clients: - def __init__(self): self.clients=[] - - def add(self, client): - self.clients.append(client) - return client - - def kill(self): - for c in self.clients: - try: c.kill() - except: pass - -class PopenCommand(Popen): - """Like Popen but you can query for the command""" - def __init__(self, command, *args, **kwargs): - self.command = command - Popen.__init__(self, command, *args, **kwargs) - -clients = Clients() - -def start_receive(queue, index, opts, ready_queue, broker, host): - address_opts=opts.receive_option - if opts.durable: address_opts += ["node:{durable:true}"] - address="%s;{%s}"%(queue,",".join(address_opts)) - msg_total=opts.senders*opts.messages - messages = msg_total/opts.receivers; - if (index < msg_total%opts.receivers): messages += 1 - if (messages == 0): return None - command = ["qpid-jms-receive", - #"-b", broker, - "--ready-address", "benchmark-ready;{create:always}", - "-a", address, - "-m", str(messages), - "--forever", - "--print-content=no", - # "--receive-rate", str(opts.receive_rate), - "--report-total", - "--ack-frequency", str(opts.ack_frequency), - # "--ready-address", "%s;{create:always}"%ready_queue, - "--report-header=no -v" - ] - if opts.save_received: - command += ["--save-content=%s-receiver-%s.msg"%(queue,index)] - command += opts.receive_arg - if opts.connection_options: - command += ["--connection-options",opts.connection_options] - if host: command = ssh_command(host, command) - if opts.verbose: print "Receiver: ", command - return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) - -def start_send(queue, opts, broker, host): - address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"])) - command = ["qpid-jms-send", - #"-b", broker, - "-a", address, - "--messages", str(opts.messages), - "--content-size", str(opts.content_size), - "--send-rate", str(opts.send_rate), - "--report-total", - "--report-header=no", - "--timestamp=%s"%(opts.timestamp and "yes" or "no"), - "--sequence=%s"%(opts.sequence and "yes" or "no"), - "--durable", str(opts.durable) - ] - command += opts.send_arg - if opts.connection_options: - command += ["--connection-options",opts.connection_options] - if host: command = ssh_command(host, command) - if opts.verbose: print "Sender: ", command - return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE)) - -def error_msg(out, err): - return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err)) - -def first_line(p): - out,err=p.communicate() - if p.returncode != 0: - raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err))) - - print str(out) - print str(err) - return out.split("\n")[0] - -def recreate_queues(queues, brokers, no_delete, opts): - c = qpid.messaging.Connection(brokers[0]) - c.open() - s = c.session() - for q in queues: - if not no_delete: - try: s.sender("%s;{delete:always}"%(q)).close() - except qpid.messaging.exceptions.NotFound: pass - address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) - if opts.verbose: print "Creating", address - s.sender(address) - c.close() - -def print_header(timestamp): - if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" - else: latency_header="" - print "send-tp\trecv-tp%s"%latency_header - -def parse(parser, lines): # Parse sender/receiver output - return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] - -def parse_senders(senders): - return parse([int],[first_line(p) for p in senders]) - -def parse_receivers(receivers): - return parse([int,float,float,float],[first_line(p) for p in receivers if p]) - -def print_data(send_stats, recv_stats, total_tp): - for send,recv in map(None, send_stats, recv_stats): - line="" - if send: line += "%d"%send[0] - if recv: - line += "\t%d"%recv[0] - if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) - if total_tp is not None: - line += "\t%d"%total_tp - total_tp = None - print line - -def print_summary(send_stats, recv_stats, total_tp): - def avg(s): sum(s) / len(s) - send_tp = sum([l[0] for l in send_stats]) - recv_tp = sum([l[0] for l in recv_stats]) - summary = "%d\t%d"%(send_tp, recv_tp) - if recv_stats and len(recv_stats[0]) == 4: - l_min = sum(l[1] for l in recv_stats)/len(recv_stats) - l_max = sum(l[2] for l in recv_stats)/len(recv_stats) - l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) - summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) - summary += "\t%d"%total_tp - print summary - - -class ReadyReceiver: - """A receiver for ready messages""" - def __init__(self, queue, broker): - self.connection = qpid.messaging.Connection(broker) - self.connection.open() - self.receiver = self.connection.session().receiver( - "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue)) - self.receiver.session.sync() - self.timeout=10 - - def wait(self, receivers): - try: - for i in receivers: self.receiver.fetch(self.timeout) - self.connection.close() - except qpid.messaging.Empty: - for r in receivers: - if (r.poll() is not None): - out,err=r.communicate() - raise Exception("Receiver error: %s\n%s" % - (" ".join(r.command), error_msg(out,err))) - raise Exception("Timed out waiting for receivers to be ready") - -def flatten(l): - return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), []) - -class RoundRobin: - def __init__(self,items): - self.items = items - self.index = 0 - - def next(self): - if not self.items: return None - ret = self.items[self.index] - self.index = (self.index+1)%len(self.items) - return ret - -def main(): - opts, args = op.parse_args() - opts.client_host = flatten(opts.client_host) - if not opts.broker: - if opts.client_host: - raise Exception("--broker must be specified if --client_host is.") - opts.broker = ["127.0.0.1"] # Deafult to local broker - opts.broker = flatten(opts.broker) - brokers = RoundRobin(opts.broker) - client_hosts = RoundRobin(opts.client_host) - send_out = "" - receive_out = "" - ready_queue="%s-ready"%(opts.queue_name) - queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] - try: - for i in xrange(opts.repeat): - recreate_queues(queues, opts.broker, opts.no_delete, opts) - ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) - - def start_receivers(): - return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.receivers) ] - - - def start_senders(): - return [ start_send(q, opts,brokers.next(), client_hosts.next()) - for q in queues for j in xrange(opts.senders) ] - - if opts.report_header and i == 0: print_header(opts.timestamp) - - if opts.fill_drain: - # First fill the queues, then drain them - start = time.time() - senders = start_senders() - for p in senders: p.wait() - receivers = start_receivers() - for p in receivers: p.wait() - else: - # Run senders and receivers in parallel - receivers = start_receivers() - ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready - start = time.time() - senders = start_senders() - for p in senders + receivers: p.wait() - - total_sent = opts.queues * opts.senders * opts.messages - total_tp = total_sent / (time.time()-start) - #send_stats=parse_senders(senders) - recv_stats=parse_receivers(receivers) - #if opts.summarize: print_summary(send_stats, recv_stats, total_tp) - #else: print_data(send_stats, recv_stats, total_tp) - finally: clients.kill() # No strays - -if __name__ == "__main__": main() - diff --git a/qpid/java/tools/bin/qpid-jms-receive b/qpid/java/tools/bin/qpid-jms-receive deleted file mode 100755 index 57abe874ff..0000000000 --- a/qpid/java/tools/bin/qpid-jms-receive +++ /dev/null @@ -1,193 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="qpid-jms-receive" -URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" -ADDRESS="queue;{create:always}" -TIMEOUT="0" -FOREVER="false" -MESSAGES="1" -IGNORE_DUPLICATES="false" -CHECK_REDELIVERED="false" -CAPACITY="1000" -ACK_FREQUENCY="100" -TX="0" -ROLLBACL_FREQUENCY="0" -PRINT_CONTENT="false" -PRINT_HEADERS="false" -REPORT_TOTAL="false" -REPORT_EVERY="0" -REPORT_HEADER="true" -READY_ADDRES="''" -EXTRA_JVM_ARGS="" -VERBOSE="0" - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o b:a:f:m:vh\ - --long broker:,address:,timeout:,forever\ -,messages:,ignore-duplicates,check-redelivered\ -,capacity:,ack-frequency:,tx:,rollback-frequency:\ -,print-content:,print-headers:,report-total\ -,report-every:,report-header:,ready-address:\ -,jvm-args:,verbose,help -- "$@") - -# padding the option string with 4 spaces -# padding the desc string with 30 spaces -usage() -{ - printf "\n%s\n" "Usage: $PROGRAM_NAME [option].." - - printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to" - - printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from" - - printf "\n%25s\n%71s\n" "--timeout TIMEOUT (0)" "timeout in seconds to wait before exiting" - - printf "\n%17s\n%61s\n" "-f, --forever" "ignore timeout and wait forever" - - printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely" - - printf "\n%23s\n%84s\n" "--ignore-duplicates" "Detect and ignore duplicates (by checking 'sn' header)" - - printf "\n%23s\n%82s\n%92s\n" "--check-redelivered" "Fails with exception if a duplicate is not marked as" " redelivered (only relevant when ignore-duplicates is selected)" - - printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)" - - printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)" - - printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)" - - printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)" - - printf "\n%30s\n%55s\n" "--print-content yes|no (0)" "print out message content" - - printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers" - - printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics" - - printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages" - - printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report" - - printf "\n%27s\n%82s\n" "--ready-address ADDRESS" "send a message to this address when ready to receive" - - printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify" - - printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -b|--broker) - URL="$2"; shift; shift; continue - ;; - -a|--address) - ADDRESS="$2"; shift; shift; continue - ;; - --timeout) - TIMEOUT="$2"; shift; shift; continue - ;; - -f|--forever) - FOREVER="$2"; shift; shift; continue - ;; - -m|--messages) - MESSAGES="$2"; shift; shift; continue - ;; - --ignore-duplicates) - IGNORE_DUPLICATES="true"; shift; continue - ;; - --check-redelivered) - CHECK_REDELIVERED="true"; shift; continue - ;; - --capacity) - CAPACITY="$2"; shift; shift; continue - ;; - --ack-frequency) - ACK_FREQUENCY="$2"; shift; shift; continue - ;; - --tx) - TX="$2"; shift; shift; continue - ;; - --rollback-frequency) - ROLLBACK_FREQUENCY="$2"; shift; shift; continue - ;; - --print-content) - if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue - ;; - --print-headers) - if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue - ;; - --report-total) - REPORT_TOTAL="true"; shift; continue - ;; - --report-every) - REPORT_EVERY="$2"; shift; shift; continue - ;; - --report-header) - if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue - ;; - --ready-address) - READY_ADDRESS="$2"; shift; shift; continue - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -v|--verbose) - VERBOSE="1"; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -RECEIVER_ARGS="-server -Durl=$URL \ --Daddress=$ADDRESS \ --Dtimeout=$TIMEOUT \ --Dmsg-count=$MESSAGES \ --Dack-frequency=$ACK_FREQUENCY \ --Dtx=$TX \ --Drollback-frequnecy=$ROLLBACL_FREQUENCY \ --Dprint-content=$PRINT_CONTENT \ --Dprint-headers=$PRINT_HEADERS \ --Dreport-total=$REPORT_TOTAL \ --Dreport-every=$REPORT_EVERY \ --Dreport-header=$REPORT_HEADER \ --Dmax_prefetch=$CAPACITY " - -if [ "x$READY_ADDRESS" != "x" ]; then RECEIVER_ARGS="$RECEIVER_ARGS -Dready-address=$READY_ADDRESS"; fi -if [ "$VERBOSE" == "1" ]; then echo $RECEIVER_ARGS; fi -echo $RECEIVER_ARGS -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $RECEIVER_ARGS org.apache.qpid.tools.QpidReceive diff --git a/qpid/java/tools/bin/qpid-jms-send b/qpid/java/tools/bin/qpid-jms-send deleted file mode 100755 index d7695924f0..0000000000 --- a/qpid/java/tools/bin/qpid-jms-send +++ /dev/null @@ -1,261 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This starts the controller for coordinating perf tests/ - -. check-qpid-java-env - -PROGRAM_NAME="qpid-jms-send" -URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'" -ADDRESS="queue;{create:always}" -MESSAGES="1" -ID="" -REPLY_TO="" -SEND_EOS="1" -DURABLE="false" -TTL="0" -PRIORITY="0" -PROPERTY="" -CORRELATION_ID="" -USER_ID="" -CONTENT_STRING="" -CONTENT_SIZE="1024" -CONTENT_MAP="" -CAPACITY="1000" -ACK_FREQUENCY="100" -TX="0" -ROLLBACL_FREQUENCY="0" -PRINT_CONTENT="true" -PRINT_HEADERS="false" -REPORT_TOTAL="false" -REPORT_EVERY="0" -REPORT_HEADER="true" -SEND_RATE="-1" -SEQUNCE="1" -DISABLE_TIMESTAMP="false" -EXTRA_JVM_ARGS="" -VERBOSE="0" - -TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'` - -TEMP=$(getopt -n $PROGRAM_NAME -o b:a:m:i:P:M:vh\ - --long broker:,address:,messages:,id:,reply-to:\ -,send-eos:,durable:,ttl:,property:,correlational-id:\ -,user-id:,content-string:,content-size:,content-map:\ -,capacity:,ack-frequency:,tx:,rollback-frequency:\ -,print-content:,print-headers:,report-total\ -,report-every:,report-header:,send-rate:,sequence:,timestamp:\ -,jvm-args:,verbose,help -- "$@") - -# padding the option string with 4 spaces -# padding the desc string with 30 spaces -usage() -{ - printf "\n%s\n" "Usage: $PROGRAM_NAME [option].." - - printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to" - - printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from" - - printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely" - - printf "\n%15s\n%75s\n" "-i, --id ID" "use the supplied id instead of generating one" - - printf "\n%23s\n%54s\n" "--reply-to REPLY-TO" "specify reply-to address" - - printf "\n%20s\n%70s\n" "--send-eos N (0)" "send N EOS messages to mark end of input" - - printf "\n%24s\n%54s\n" "--durable yes|no (0)" "mark messages as durable" - - printf "\n%19s\n%72s\n" "--ttl msecs (0)" "time-to-live for messages, in milliseconds" - - printf "\n%27s\n%72s\n" "--priority PRIORITY (0)" "time-to-live for messages, in milliseconds" - - printf "\n%29s\n%54s\n" "-P, --property NAME=VALUE" "specify message property" - - printf "\n%23s\n%57s\n" "--correlation-id ID" "correlation-id for message" - - printf "\n%20s\n%48s\n" "--user-id USERID" "userid for message" - - printf "\n%28s\n%60s\n" "--content-string CONTENT" "use CONTENT as message content" - - printf "\n%24s\n%62s\n" "--content-size N (0)" "create an N-byte message content" - - printf "\n%32s\n%59s\n" "-M, --content-map NAME=VALUE" "specify entry for map content" - - printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)" - - printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)" - - printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)" - - printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)" - - printf "\n%30s\n%55s\n" "--print-content yes|no (1)" "print out message content" - - printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers" - - printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics" - - printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages" - - printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report" - - printf "\n%21s\n%64s\n%62s\n" "--send-rate N (0)" "Send at rate of N messages/second." "0 means send as fast as possible" - - printf "\n%25s\n%69s\n%77s\n" "--sequence yes|no (1)" "Add a sequence number messages property" "(required for duplicate/lost message detection)" - - printf "\n%26s\n%64s\n%77s\n" "--timestamp yes|no (1)" "Add a time stamp messages property" "(required for duplicate/lost message detection)" - - printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify" - - printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script" -} - -eval set -- "$TEMP" -while true; do - case $1 in - -b|--broker) - URL="$2"; shift; shift; continue - ;; - -a|--address) - ADDRESS="$2"; shift; shift; continue - ;; - -m|--messages) - MESSAGES="$2"; shift; shift; continue - ;; - -i|--id) - ID="$2"; shift; shift; continue - ;; - --reply-to) - REPLY_TO="$2"; shift; shift; continue - ;; - --send-eos) - SEND_EOS="$2"; shift; shift; continue - ;; - --durable) - if [ "$2" == "1" ]; then DURABLE="true"; else DURABLE="false"; fi; shift; shift; continue - ;; - --ttl) - TTL="$2"; shift; shift; continue - ;; - --priority) - PRIORITY="$2"; shift; shift; continue - ;; - -P|--property) - PROPERTY="$2,$PROPERTY"; shift; shift; continue - ;; - --correlation-id) - CORRELATION_ID="$2"; shift; shift; continue - ;; - --user-id) - USER_ID="$2"; shift; shift; continue - ;; - --content-string) - CONTENT_STRING="$2"; shift; shift; continue - ;; - --content-size) - CONTENT_SIZE="$2"; shift; shift; continue - ;; - -M|--content-map) - CONTENT_MAP="$2,$CONTENT_MAP"; shift; shift; continue - ;; - --capacity) - CAPACITY="$2"; shift; shift; continue - ;; - --ack-frequency) - ACK_FREQUENCY="$2"; shift; shift; continue - ;; - --tx) - TX="$2"; shift; shift; continue - ;; - --rollback-frequency) - ROLLBACK_FREQUENCY="$2"; shift; shift; continue - ;; - --print-content) - if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue - ;; - --print-headers) - if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue - ;; - --report-total) - REPORT_TOTAL="true"; shift; continue - ;; - --report-every) - REPORT_EVERY="$2"; shift; shift; continue - ;; - --report-header) - if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue - ;; - --send-rate) - SEND_RATE="$2"; shift; shift; continue - ;; - --sequence) - if [ "$2" == "yes" ]; then SEQUENCE="true"; else SEQUENCE="false"; fi; shift; shift; continue - ;; - --timestamp) - if [ "$2" == "yes" ]; then DISABLE_TIMESTAMP="false"; else DISABLE_TIMESTAMP="true"; fi; shift; shift; continue - ;; - -a|--jvm-args) - EXTRA_JVM_ARGS="$2"; shift; shift; continue - ;; - -h|--help) - usage - exit 0 - ;; - -v|--verbose) - VERBOSE="1"; shift; continue - ;; - --) - # no more arguments to parse - break - ;; - *) - # no more arguments to parse - break - ;; - esac -done - -SENDER_ARGS="-server -Durl=$URL \ --Daddress=$ADDRESS \ --Dmsg-count=$MESSAGES \ --Dsend-eos=$SEND_EOS \ --Ddurable=$DURABLE \ --Dmsg_size=$CONTENT_SIZE \ --Dsend-rate=$SEND_RATE \ --Ddisable-timestamp=$DISABLE_TIMESTAMP \ --Dttl=$TTL \ --Dpriority=$PRIORITY \ --Dtx=$TX \ --Drollback-frequnecy=$ROLLBACK_FREQUENCY \ --Dprint-content=$PRINT_CONTENT \ --Dprint-headers=$PRINT_HEADERS \ --Dreport-total=$REPORT_TOTAL \ --Dreport-every=$REPORT_EVERY \ --Dreport-header=$REPORT_HEADER \ --Dmax_prefetch=$CAPACITY " - -if [ "x$ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Did=$ID"; fi -if [ "x$USER_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Duser_id=$USER_ID"; fi -if [ "x$CORRELATION_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Dcorrelation_id=$CORRELATION_ID"; fi - -if [ "$VERBOSE" == "1" ]; then echo $SENDER_ARGS; fi -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $SENDER_ARGS org.apache.qpid.tools.QpidSend diff --git a/qpid/java/tools/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit deleted file mode 100755 index 20f72aca53..0000000000 --- a/qpid/java/tools/bin/qpid-python-testkit +++ /dev/null @@ -1,33 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# This is wrapper script to run the tests defined in testkit.py -# via the python test runner. The defaults are set for a running -# from an svn checkout - -. check-qpid-java-env - -export PYTHONPATH=./:$PYTHONPATH -echo $PYTHONPATH -if [ "$OUTDIR" = "" ] ; then - OUTDIR=$PWD -fi -testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S` -qpid-python-test -m testkit -DOUTDIR=$testdir"$@" diff --git a/qpid/java/tools/bin/run-pub b/qpid/java/tools/bin/run-pub deleted file mode 100755 index 9efe58c4b8..0000000000 --- a/qpid/java/tools/bin/run-pub +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. check-qpid-java-env - -JVM_ARGS="$1" -PROGRAM_ARGS="$2" - -echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" -echo "PROGRAM ARGS : $PROGRAM_ARGS" -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS diff --git a/qpid/java/tools/bin/run-sub b/qpid/java/tools/bin/run-sub deleted file mode 100755 index 8449563f7f..0000000000 --- a/qpid/java/tools/bin/run-sub +++ /dev/null @@ -1,32 +0,0 @@ -#!/bin/sh -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -. check-qpid-java-env - -echo "All args $@" - -JVM_ARGS="$1" -PROGRAM_ARGS="$2" - -echo "JVM ARGS : $JAVA_MEM $JVM_ARGS" -echo "PROGRAM ARGS : $PROGRAM_ARGS" - -$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS - diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py deleted file mode 100755 index 1c2ad598b8..0000000000 --- a/qpid/java/tools/bin/testkit.py +++ /dev/null @@ -1,278 +0,0 @@ -#!/usr/bin/env python - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time, string, traceback -from brokertest import * -from qpid.messaging import * - - -try: - import java.lang.System - _cp = java.lang.System.getProperty("java.class.path"); -except ImportError: - _cp = checkenv("QP_CP") - -class Formatter: - - def __init__(self, message): - self.message = message - self.environ = {"M": self.message, - "P": self.message.properties, - "C": self.message.content} - - def __getitem__(self, st): - return eval(st, self.environ) - -# The base test case has support for launching the generic -# receiver and sender through the TestLauncher with all the options. -# -class JavaClientTest(BrokerTest): - """Base Case for Java Test cases""" - - client_class = "org.apache.qpid.testkit.TestLauncher" - - # currently there is no transparent reconnection. - # temp hack: just creating the queue here and closing it. - def start_error_watcher(self,broker=None): - ssn = broker.connect().session() - err_watcher = ssn.receiver("control; {create:always}", capacity=1) - ssn.close() - - def store_module_args(self): - if BrokerTest.store_lib: - return ["--load-module", BrokerTest.store_lib] - else: - print "Store module not present." - return [""] - - def client(self,**options): - cmd = ["java","-cp",_cp] - - cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] - cmd += ["-Dhost=" + options.get("host","127.0.0.1")] - cmd += ["-Dport=" + str(options.get("port",5672))] - cmd += ["-Dcon_count=" + str(options.get("con_count",1))] - cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] - cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] - cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] - cmd += ["-Ddurable=" + str(options.get("durable",False))] - cmd += ["-Dtransacted=" + str(options.get("transacted",False))] - cmd += ["-Dreceiver=" + str(options.get("receiver",False))] - cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] - cmd += ["-Dsender=" + str(options.get("sender",False))] - cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] - cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] - cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] - cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] - cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] - cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] - cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] - cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] - cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] - cmd += ["-Dlog.level=" + options.get("log.level", "warn")] - cmd += [self.client_class] - cmd += [options.get("address", "my_queue; {create: always}")] - - #print str(options.get("port",5672)) - return cmd - - # currently there is no transparent reconnection. - # temp hack: just creating a receiver and closing session soon after. - def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): - ssn = broker.connect().session() - err_watcher = ssn.receiver("control; {create:always}", capacity=1) - i = run_time/error_ck_freq - is_error = False - for j in range(i): - not_empty = True - while not_empty: - try: - m = err_watcher.fetch(timeout=error_ck_freq) - ssn.acknowledge() - print "Java process notified of an error" - self.print_error(m) - is_error = True - except messaging.Empty, e: - not_empty = False - - ssn.close() - return is_error - - def print_error(self,msg): - print msg.properties.get("exception-trace") - - def verify(self, receiver,sender): - sender_running = receiver.is_running() - receiver_running = sender.is_running() - - self.assertTrue(receiver_running,"Receiver has exited prematually") - self.assertTrue(sender_running,"Sender has exited prematually") - - def start_sender_and_receiver(self,**options): - - receiver_opts = options - receiver_opts["receiver"]=True - receiver = self.popen(self.client(**receiver_opts), - expect=EXPECT_RUNNING) - - sender_opts = options - sender_opts["sender"]=True - sender = self.popen(self.client(**sender_opts), - expect=EXPECT_RUNNING) - - return receiver, sender - - def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): - if options.get("durable",False)==True: - cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) - else: - cluster = Cluster(self, count=count) - return cluster - -class ConcurrencyTest(JavaClientTest): - """A concurrency test suite for the JMS client""" - skip = False - - def base_case(self,**options): - if self.skip : - print "Skipping test" - return - - cluster = self.start_cluster(count=2,**options) - self.start_error_watcher(broker=cluster[0]) - options["port"] = port=cluster[0].port() - - options["use_unique_dests"]=True - options["address"]="amq.topic" - receiver, sender = self.start_sender_and_receiver(**options) - self.monitor_clients(broker=cluster[0],run_time=180) - self.verify(receiver,sender) - - def test_multiplexing_con(self): - """Tests multiple sessions on a single connection""" - - self.base_case(ssn_per_con=25,test_name=self.id()) - - def test_multiplexing_con_with_tx(self): - """Tests multiple transacted sessions on a single connection""" - - self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_rcv(self): - """Tests multiple sessions with sync receive""" - - self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) - - def test_multiplexing_con_with_durable_sub(self): - """Tests multiple sessions with durable subs""" - - self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_ack(self): - """Tests multiple sessions with sync ack""" - - self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) - - def test_multiplexing_con_with_sync_pub(self): - """Tests multiple sessions with sync pub""" - - self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) - - def test_multiple_cons_and_ssns(self): - """Tests multiple connections and sessions""" - - self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) - - -class SoakTest(JavaClientTest): - """A soak test suite for the JMS client""" - - def base_case(self,**options): - cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) - options["port"] = port=cluster[0].port() - self.start_error_watcher(broker=cluster[0]) - options["use_unique_dests"]=True - options["address"]="amq.topic" - receiver,sender = self.start_sender_and_receiver(**options) - is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) - - if (is_error): - print "The sender or receiver didn't start properly. Exiting test." - return - else: - "Print no error !" - - # grace period for java clients to get the failover properly setup. - time.sleep(30) - error_msg= None - # Kill original brokers, start new ones. - try: - for i in range(8): - cluster[i].kill() - b=cluster.start() - self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) - print "iteration : " + str(i) - except ConnectError, e1: - error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) - - except SessionError, e2: - error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) - - self.verify(receiver,sender) - if error_msg: - raise Exception(error_msg) - - - def test_failover(self) : - """Test basic failover""" - - self.base_case(test_name=self.id()) - - - def test_failover_with_durablesub(self): - """Test failover with durable subscriber""" - - self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) - - - def test_failover_with_sync_rcv(self): - """Test failover with sync receive""" - - self.base_case(sync_rcv=True,test_name=self.id()) - - - def test_failover_with_sync_ack(self): - """Test failover with sync ack""" - - self.base_case(sync_ack=True,test_name=self.id()) - - - def test_failover_with_noprefetch(self): - """Test failover with no prefetch""" - - self.base_case(max_prefetch=1,test_name=self.id()) - - - def test_failover_with_multiple_cons_and_ssns(self): - """Test failover with multiple connections and sessions""" - - self.base_case(use_unique_dests=True,address="amq.topic", - con_count=10,ssn_per_con=25,test_name=self.id()) diff --git a/qpid/java/tools/etc/perf-report.gnu b/qpid/java/tools/etc/perf-report.gnu deleted file mode 100644 index b7662b0bfe..0000000000 --- a/qpid/java/tools/etc/perf-report.gnu +++ /dev/null @@ -1,61 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -set terminal png -set datafile separator "," - -set title "Variation of avg latency between iterations" -set yrange [10:20] -set xlabel "Iterations" -set ylabel "Latency (ms)" -set output "avg_latency.png" -plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines - - -set title "Variation of max latency between iterations" -set yrange [0:1000] -set xlabel "Iterations" -set ylabel "Latency (ms)" -set output "max_latency.png" -plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines - - -set title "Variation of standard deviation of latency between iterations" -set yrange [0:20] -set xlabel "Iterations" -set ylabel "Standard Deviation" -set output "std_dev_latency.png" -plot "stats-csv.log" using 12 title "standard deviation" with lines - - -set title "Variation of system throughput between iterations" -set yrange [400000:450000] -set xlabel "Iterations" -set ylabel "System Throuhgput (msg/sec)" -set output "system_rate.png" -plot "stats-csv.log" using 2 title "system throughput" with lines - - -set title "Variation of avg producer & consumer rates between iterations" -set yrange [6500:7500] -set xlabel "Iterations" -set ylabel "Avg Rates (msg/sec)" -set output "prod_cons_rate.png" -plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines - diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j deleted file mode 100644 index b574a7b5b7..0000000000 --- a/qpid/java/tools/etc/test.log4j +++ /dev/null @@ -1,28 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -log4j.rootLogger=${root.logging.level} - -log4j.logger.org.apache.qpid=ERROR, console -log4j.additivity.org.apache.qpid=false - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=all -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n - diff --git a/qpid/java/tools/pom.xml b/qpid/java/tools/pom.xml deleted file mode 100644 index ac90c98ee7..0000000000 --- a/qpid/java/tools/pom.xml +++ /dev/null @@ -1,88 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-java-build</artifactId> - <version>0.32-SNAPSHOT</version> - </parent> - - <artifactId>qpid-tools</artifactId> - <name>Qpid Tools</name> - <description>Tools</description> - - <dependencies> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>qpid-client</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j-version}</version> - </dependency> - - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - <version>${slf4j-version}</version> - </dependency> - - <dependency> - <groupId>org.apache.geronimo.specs</groupId> - <artifactId>geronimo-jms_1.1_spec</artifactId> - <version>${geronimo-jms-1-1-version}</version> - </dependency> - - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>${commons-codec-version}</version> - </dependency> - - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - <version>${jackson-version}</version> - </dependency> - - <dependency> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - <version>${jackson-version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <!--version specified in parent pluginManagement --> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> - -</project> diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java deleted file mode 100644 index b10129d855..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Session; - -public abstract class Client implements ExceptionListener -{ - private Connection con; - private Session ssn; - private boolean durable = false; - private boolean transacted = false; - private int txSize = 10; - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - private String contentType = "application/octet-stream"; - - private long reportFrequency = 60000; // every min - - private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - private NumberFormat nf = new DecimalFormat("##.00"); - - private long startTime = System.currentTimeMillis(); - private ErrorHandler errorHandler = null; - - public Client(Connection con) throws Exception - { - this.con = con; - this.con.setExceptionListener(this); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - txSize = Integer.getInteger("tx_size",10); - contentType = System.getProperty("content_type","application/octet-stream"); - reportFrequency = Long.getLong("report_frequency", 60000); - } - - public void close() - { - try - { - con.close(); - } - catch (Exception e) - { - handleError("Error closing connection",e); - } - } - - public void onException(JMSException e) - { - handleError("Connection error",e); - } - - public void setErrorHandler(ErrorHandler h) - { - this.errorHandler = h; - } - - public void handleError(String msg,Exception e) - { - if (errorHandler != null) - { - errorHandler.handleError(msg, e); - } - else - { - System.err.println(msg); - e.printStackTrace(); - } - } - - protected Session getSsn() - { - return ssn; - } - - protected void setSsn(Session ssn) - { - this.ssn = ssn; - } - - protected boolean isDurable() - { - return durable; - } - - protected boolean isTransacted() - { - return transacted; - } - - protected int getTxSize() - { - return txSize; - } - - protected int getAck_mode() - { - return ack_mode; - } - - protected String getContentType() - { - return contentType; - } - - protected long getReportFrequency() - { - return reportFrequency; - } - - protected long getStartTime() - { - return startTime; - } - - protected void setStartTime(long startTime) - { - this.startTime = startTime; - } - - public DateFormat getDf() - { - return df; - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java deleted file mode 100644 index de7748acd6..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -public interface ErrorHandler { - - public void handleError(String msg,Exception e); -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java deleted file mode 100644 index 8dcf59e9c1..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; - -/** - * A generic receiver which consumes messages - * from a given address in a broker (host/port) - * until told to stop by killing it. - * - * It participates in a feedback loop to ensure the producer - * doesn't fill up the queue. If it receives an "End" msg - * it sends a reply to the replyTo address in that msg. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * sync_rcv - Whether to consume sync (instead of using a listener). - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. * - * check_for_dups - check for duplicate messages and out of order messages. - * jms_durable_sub - create a durable subscription instead of a regular subscription. - */ -public class Receiver extends Client implements MessageListener -{ - private long msg_count = 0; - private int sequence = 0; - private boolean syncRcv = Boolean.getBoolean("sync_rcv"); - private boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); - private boolean checkForDups = Boolean.getBoolean("check_for_dups"); - private MessageConsumer consumer; - private List<Integer> duplicateMessages = new ArrayList<Integer>(); - - public Receiver(Connection con,String addr) throws Exception - { - super(con); - setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); - if (!syncRcv) - { - consumer.setMessageListener(this); - } - - System.out.println("Receiving messages from : " + addr); - } - - public void onMessage(Message msg) - { - handleMessage(msg); - } - - public void run() throws Exception - { - long sleepTime = getReportFrequency(); - while(true) - { - if(syncRcv) - { - long t = sleepTime; - while (t > 0) - { - long start = System.currentTimeMillis(); - Message msg = consumer.receive(t); - t = t - (System.currentTimeMillis() - start); - handleMessage(msg); - } - } - Thread.sleep(sleepTime); - System.out.println(getDf().format(System.currentTimeMillis()) - + " - messages received : " + msg_count); - } - } - - private void handleMessage(Message m) - { - if (m == null) { return; } - - try - { - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); - Message controlMsg = getSsn().createTextMessage(); - temp.send(controlMsg); - if (isTransacted()) - { - getSsn().commit(); - } - temp.close(); - } - else - { - - int seq = m.getIntProperty("sequence"); - if (checkForDups) - { - if (seq == 0) - { - sequence = 0; // wrap around for each iteration - System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); - duplicateMessages.clear(); - } - - if (seq < sequence) - { - duplicateMessages.add(seq); - } - else if (seq == sequence) - { - sequence++; - msg_count ++; - } - else - { - // Multiple publishers are not allowed in this test case. - // So out of order messages are not allowed. - throw new Exception(": Received an out of order message (expected=" - + sequence + ",received=" + seq + ")" ); - } - } - else - { - msg_count ++; - } - - // Please note that this test case doesn't expect duplicates - // When testing for transactions. - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - handleError("Exception receiving messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Receiver rcv = new Receiver(con,addr); - rcv.run(); - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java deleted file mode 100644 index 14b9b7302f..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Random; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.tools.MessageFactory; - -/** - * A generic sender which sends a stream of messages - * to a given address in a broker (host/port) - * until told to stop by killing it. - * - * It has a feedback loop to ensure it doesn't fill - * up queues due to a slow consumer. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * msg_size (256) - * msg_count (10) - # messages before waiting for feedback - * sleep_time (1000 ms) - sleep time btw each iteration - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. - */ -public class Sender extends Client -{ - protected int msg_size = 256; - protected int msg_count = 10; - protected int iterations = -1; - protected long sleep_time = 1000; - - protected Destination dest = null; - protected Destination replyTo = null; - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - protected MessageProducer producer; - Random gen = new Random(19770905); - - public Sender(Connection con,String addr) throws Exception - { - super(con); - this.msg_size = Integer.getInteger("msg_size", 100); - this.msg_count = Integer.getInteger("msg_count", 10); - this.iterations = Integer.getInteger("iterations", -1); - this.sleep_time = Long.getLong("sleep_time", 1000); - this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = new AMQAnyDestination(addr); - this.producer = getSsn().createProducer(dest); - this.replyTo = getSsn().createTemporaryQueue(); - - System.out.println("Sending messages to : " + addr); - } - - /* - * If msg_size not specified it generates a message - * between 500-1500 bytes. - */ - protected Message getNextMessage() throws Exception - { - int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; - Message msg = (getContentType().equals("text/plain")) ? - MessageFactory.createTextMessage(getSsn(), s): - MessageFactory.createBytesMessage(getSsn(), s); - - msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT - : DeliveryMode.NON_PERSISTENT); - return msg; - } - - public void run() - { - try - { - boolean infinite = (iterations == -1); - for (int x=0; infinite || x < iterations; x++) - { - long now = System.currentTimeMillis(); - if (now - getStartTime() >= getReportFrequency()) - { - System.out.println(df.format(now) + " - iterations : " + x); - setStartTime(now); - } - - for (int i = 0; i < msg_count; i++) - { - Message msg = getNextMessage(); - msg.setIntProperty("sequence",i); - producer.send(msg); - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - TextMessage m = getSsn().createTextMessage("End"); - m.setJMSReplyTo(replyTo); - producer.send(m); - - if (isTransacted()) - { - getSsn().commit(); - } - - MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); - feedbackConsumer.receive(); - feedbackConsumer.close(); - if (isTransacted()) - { - getSsn().commit(); - } - Thread.sleep(sleep_time); - } - } - catch (Exception e) - { - handleError("Exception sending messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Sender sender = new Sender(con,addr); - sender.run(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java deleted file mode 100644 index 0c94030ec6..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * A basic test case class that could launch a Sender/Receiver - * or both, each on it's own separate thread. - * - * If con_count == ssn_count, then each entity created will have - * it's own Connection. Else if con_count {@literal <} ssn_count, then - * a connection will be shared by ssn_count/con_count # of entities. - * - * The if both sender and receiver options are set, it will - * share a connection. - * - * The following options are available as jvm args - * host, port - * con_count,ssn_count - * con_idle_time - which determines heartbeat - * sender, receiver - booleans which indicate which entity to create. - * Setting them both is also a valid option. - */ -public class TestLauncher implements ErrorHandler -{ - protected String host = "127.0.0.1"; - protected int port = 5672; - protected int sessions_per_con = 1; - protected int connection_count = 1; - protected long heartbeat = 5000; - protected boolean sender = false; - protected boolean receiver = false; - protected boolean useUniqueDests = false; - protected String url; - - protected String address = "my_queue; {create: always}"; - protected boolean durable = false; - protected String failover = ""; - protected AMQConnection controlCon; - protected Destination controlDest = null; - protected Session controlSession = null; - protected MessageProducer statusSender; - protected List<AMQConnection> clients = new ArrayList<AMQConnection>(); - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - protected String testName; - - public TestLauncher() - { - testName = System.getProperty("test_name","UNKNOWN"); - host = System.getProperty("host", "127.0.0.1"); - port = Integer.getInteger("port", 5672); - sessions_per_con = Integer.getInteger("ssn_per_con", 1); - connection_count = Integer.getInteger("con_count", 1); - heartbeat = Long.getLong("heartbeat", 5); - sender = Boolean.getBoolean("sender"); - receiver = Boolean.getBoolean("receiver"); - useUniqueDests = Boolean.getBoolean("use_unique_dests"); - - failover = System.getProperty("failover", ""); - durable = Boolean.getBoolean("durable"); - - url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; - - if (failover.equalsIgnoreCase("failover_exchange")) - { - url += "&failover='failover_exchange'"; - - System.out.println("Failover exchange " + url ); - } - - configureLogging(); - } - - protected void configureLogging() - { - PatternLayout layout = new PatternLayout(); - layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); - BasicConfigurator.configure(new ConsoleAppender(layout)); - - String logLevel = System.getProperty("log.level","warn"); - String logComponent = System.getProperty("log.comp","org.apache.qpid"); - - Logger logger = Logger.getLogger(logComponent); - logger.setLevel(Level.toLevel(logLevel, Level.WARN)); - - System.out.println("Level " + logger.getLevel()); - - } - - public void setUpControlChannel() - { - try - { - controlCon = new AMQConnection(url); - controlCon.start(); - - controlDest = new AMQAnyDestination("control; {create: always}"); // durable - - // Create the session to setup the messages - controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); - statusSender = controlSession.createProducer(controlDest); - - } - catch (Exception e) - { - handleError("Error while setting up the test",e); - } - } - - public void cleanup() - { - try - { - controlSession.close(); - controlCon.close(); - for (AMQConnection con : clients) - { - con.close(); - } - } - catch (Exception e) - { - handleError("Error while tearing down the test",e); - } - } - - public void start(String addr) - { - try - { - if (addr == null) - { - addr = address; - } - - int ssn_per_con = sessions_per_con; - String addrTemp = addr; - for (int i = 0; i< connection_count; i++) - { - AMQConnection con = new AMQConnection(url); - con.start(); - clients.add(con); - for (int j = 0; j< ssn_per_con; j++) - { - String index = createPrefix(i,j); - if (useUniqueDests) - { - addrTemp = modifySubject(index,addr); - } - - if (sender) - { - createSender(index,con,addrTemp,this); - } - - if (receiver) - { - System.out.println("########## Creating receiver ##################"); - - createReceiver(index,con,addrTemp,this); - } - } - } - } - catch (Exception e) - { - handleError("Exception while setting up the test",e); - } - - } - - protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Receiver rcv = new Receiver(con,addr); - rcv.setErrorHandler(h); - rcv.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Receiver", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Receive thread",e); - } - - t.setName("ReceiverThread-" + index); - t.start(); - } - - protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Sender sender = new Sender(con, addr); - sender.setErrorHandler(h); - sender.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Sender", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Sender thread",e); - } - - t.setName("SenderThread-" + index); - t.start(); - } - - public synchronized void handleError(String msg,Exception e) - { - // In case sending the message fails - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" @ "); - sb.append(df.format(new Date(System.currentTimeMillis()))); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - - try - { - TextMessage errorMsg = controlSession.createTextMessage(); - errorMsg.setStringProperty("status", "error"); - errorMsg.setStringProperty("desc", msg); - errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); - errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); - - System.out.println("Msg " + errorMsg); - - statusSender.send(errorMsg); - } - catch (JMSException e1) - { - e1.printStackTrace(); - } - } - - private String serializeStackTrace(Exception e) - { - ByteArrayOutputStream bOut = new ByteArrayOutputStream(); - PrintStream printStream = new PrintStream(bOut); - e.printStackTrace(printStream); - printStream.close(); - return bOut.toString(); - } - - private String createPrefix(int i, int j) - { - return String.valueOf(i).concat(String.valueOf(j)); - } - - /** - * A basic helper function to modify the subjects by - * appending an index. - */ - private String modifySubject(String index,String addr) - { - if (addr.indexOf("/") > 0) - { - addr = addr.substring(0,addr.indexOf("/")+1) + - index + - addr.substring(addr.indexOf("/")+1,addr.length()); - } - else if (addr.indexOf(";") > 0) - { - addr = addr.substring(0,addr.indexOf(";")) + - "/" + index + - addr.substring(addr.indexOf(";"),addr.length()); - } - else - { - addr = addr + "/" + index; - } - - return addr; - } - - public static void main(String[] args) - { - final TestLauncher test = new TestLauncher(); - test.setUpControlChannel(); - System.out.println("args.length " + args.length); - System.out.println("args [0] " + args [0]); - test.start(args.length > 0 ? args [0] : null); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { test.cleanup(); } - }); - - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java deleted file mode 100644 index 7eb83a520b..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * In the future this will be replaced by a Clock abstraction - * that can utilize a realtime clock when running in RT Java. - */ - -public class Clock -{ - private static final Logger _logger = LoggerFactory.getLogger(Clock.class); - - public final static long SEC = 60000; - - private static Precision precision; - private static long offset = -1; // in nano secs - - public enum Precision - { - NANO_SECS, MILI_SECS; - - static Precision getPrecision(String str) - { - if ("mili".equalsIgnoreCase(str)) - { - return MILI_SECS; - } - else - { - return NANO_SECS; - } - } - }; - - static - { - precision = Precision.getPrecision(System.getProperty("precision","mili")); - //offset = Long.getLong("offset",-1); - - if (_logger.isDebugEnabled()) - { - System.out.println("Using precision : " + precision ); - //+ " and offset " + offset); - } - } - - public static Precision getPrecision() - { - return precision; - } - - public static long getTime() - { - if (precision == Precision.NANO_SECS) - { - if (offset == -1) - { - return System.nanoTime(); - } - else - { - return System.nanoTime() + offset; - } - } - else - { - if (offset == -1) - { - return System.currentTimeMillis(); - } - else - { - return System.currentTimeMillis() + offset/convertToMiliSecs(); - } - } - } - - public static long convertToSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000000; - } - else - { - return 1000; - } - } - - public static long convertToMiliSecs() - { - if (precision == Precision.NANO_SECS) - { - return 1000000; - } - else - { - return 1; - } - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java deleted file mode 100644 index 1b3c961660..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java +++ /dev/null @@ -1,329 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - - -import javax.management.MBeanServerConnection; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXConnectorFactory; -import javax.management.remote.JMXServiceURL; -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.qpid.tools.util.ArgumentsParser; - -public class JMXStressTestClient -{ - - public static void main(String[] args) throws Exception - { - ArgumentsParser parser = new ArgumentsParser(); - Arguments arguments; - try - { - arguments = parser.parse(args, Arguments.class); - arguments.validate(); - } - catch(IllegalArgumentException e) - { - System.out.println("Invalid argument:" + e.getMessage()); - parser.usage(Arguments.class, Arguments.REQUIRED); - System.out.println("\nRun example:"); - System.out.println(" java -cp qpid-tools.jar org.apache.qpid.tools.JMXStressTestClient \\"); - System.out.println(" repetitions=10 host=localhost port=8999 username=admin password=admin \\"); - System.out.println(" virtualHost=default createQueue=true bindQueue=true deleteQueue=true \\"); - System.out.println(" uniqueQueues=true queueName=boo exchangeName=amq.fanout"); - return; - } - - JMXStressTestClient client = new JMXStressTestClient(); - client.run(arguments); - } - - public void run(Arguments arguments) throws IOException,MalformedObjectNameException - { - log(arguments.toString()); - for (int i = 0; i < arguments.getRepetitions(); i++) - { - try(JMXConnector connector = createConnector(arguments.getHost(), arguments.getPort(), arguments.getUsername(), arguments.getPassword())) - { - runIteration(arguments, connector, i); - } - } - } - - private void runIteration(Arguments arguments, JMXConnector connector, int iteration) throws IOException, MalformedObjectNameException - { - log("Iteration " + iteration); - MBeanServerConnection connection = connector.getMBeanServerConnection(); - String virtualHost = arguments.getVirtualHost(); - if (virtualHost != null) - { - ObjectName virtualHostMBeanName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost=" - + ObjectName.quote(virtualHost)); - - Set<ObjectName> virtualHostMBeans = connection.queryNames(virtualHostMBeanName, null); - if(virtualHostMBeans.size() == 0) - { - throw new IllegalArgumentException("VirtualHost MBean was not found for virtual host " + virtualHost); - } - - createAndBindQueueIfRequired(arguments, iteration, connection, virtualHostMBeanName); - } - } - - private void log(String logMessage) - { - System.out.println(logMessage); - } - - private void createAndBindQueueIfRequired(Arguments arguments, int iteration, MBeanServerConnection connection, - ObjectName virtualHostMBeanName) throws MalformedObjectNameException, IOException - { - if (arguments.isCreateQueue()) - { - String queueName = arguments.getQueueName(); - - if (queueName == null) - { - queueName = "temp-queue-" + System.nanoTime(); - } - else if (arguments.isUniqueQueues()) - { - queueName = queueName + "-" + iteration; - } - - createQueue(connection, virtualHostMBeanName, queueName); - - if (arguments.isBindQueue()) - { - bindQueue(connection, arguments.getVirtualHost(), queueName, arguments.getExchangeName()); - } - - if (arguments.isDeleteQueue()) - { - deleteQueue(connection, virtualHostMBeanName, queueName); - } - } - } - - private void deleteQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName) - { - log(" Delete queue " + queueName); - try - { - connection.invoke(virtualHostMBeanName, "deleteQueue", new Object[]{queueName}, new String[]{String.class.getName()}); - } - catch (Exception e) - { - throw new RuntimeException("Cannot delete queue " + queueName, e); - } - } - - private void createQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName) - { - log(" Create queue " + queueName); - try - { - connection.invoke(virtualHostMBeanName, "createNewQueue", new Object[]{queueName, null, true}, - new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()}); - } - catch (Exception e) - { - throw new RuntimeException("Cannot create queue " + queueName, e); - } - } - - private void bindQueue(MBeanServerConnection connection, String virtualHost, String queueName, String exchangeName) - throws MalformedObjectNameException, IOException - { - if (exchangeName == null) - { - exchangeName = "amq.direct"; - } - - log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName); - - ObjectName exchangeObjectName = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost=" - + ObjectName.quote(virtualHost) + "," - + "name=" + ObjectName.quote(exchangeName) + ",ExchangeType=*"); - - Set<ObjectName> exchanges = connection.queryNames(exchangeObjectName, null); - - if(exchanges.size() == 0) - { - throw new IllegalArgumentException("Cannot find exchange MBean for exchange " + exchangeName); - } - - try - { - connection.invoke(exchanges.iterator().next(), "createNewBinding", new Object[]{queueName, queueName}, - new String[]{String.class.getName(), String.class.getName()}); - } - catch (Exception e) - { - throw new RuntimeException("Cannot delete queue " + queueName, e); - } - } - - JMXConnector createConnector(String host, int port, String username, String password) throws IOException - { - Map<String, Object> env = new HashMap<>(); - JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi"); - env.put(JMXConnector.CREDENTIALS, new String[] {username,password}); - - return JMXConnectorFactory.connect(jmxUrl, env); - } - - public static class Arguments - { - private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("host", "port", "username", "password")); - - private String host = null; - private int port = -1; - private String username = null; - private String password = null; - - private String virtualHost = null; - private String queueName = null; - private String exchangeName = null; - - private int repetitions = 1; - - private boolean createQueue = false; - private boolean deleteQueue = false; - private boolean uniqueQueues = false; - private boolean bindQueue = false; - - public Arguments() - { - } - - public void validate() - { - if (host == null || host.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'host' is not specified"); - } - - if (port == -1) - { - throw new IllegalArgumentException("Mandatory argument 'port' is not specified"); - } - - if (username == null || username.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'username' is not specified"); - } - - if (password == null || password.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'password' is not specified"); - } - } - - public int getRepetitions() - { - return repetitions; - } - - public String getHost() - { - return host; - } - - public int getPort() - { - return port; - } - - public String getUsername() - { - return username; - } - - public String getPassword() - { - return password; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public boolean isCreateQueue() - { - return createQueue; - } - - public boolean isDeleteQueue() - { - return deleteQueue; - } - - public boolean isUniqueQueues() - { - return uniqueQueues; - } - - public String getQueueName() - { - return queueName; - } - - public boolean isBindQueue() - { - return bindQueue; - } - - public String getExchangeName() - { - return exchangeName; - } - - @Override - public String toString() - { - return "Arguments{" + - "host='" + host + '\'' + - ", port=" + port + - ", username='" + username + '\'' + - ", password='" + password + '\'' + - ", virtualHost='" + virtualHost + '\'' + - ", queueName='" + queueName + '\'' + - ", exchangeName='" + exchangeName + '\'' + - ", repetitions=" + repetitions + - ", createQueue=" + createQueue + - ", deleteQueue=" + deleteQueue + - ", uniqueQueues=" + uniqueQueues + - ", bindQueue=" + bindQueue + - '}'; - } - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java deleted file mode 100644 index bd6cfd4363..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.tools; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Enumeration; -import java.util.Hashtable; -import java.util.LinkedList; -import java.util.List; -import java.util.Properties; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import org.apache.qpid.client.AMQConnectionFactory; -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.jms.FailoverPolicy; - -public class JNDICheck -{ - private static final String QUEUE = "queue."; - private static final String TOPIC = "topic."; - private static final String DESTINATION = "destination."; - private static final String CONNECTION_FACTORY = "connectionfactory."; - - public static void main(String[] args) - { - - if (args.length != 1) - { - usage(); - } - - String propertyFile = args[0]; - - new JNDICheck(propertyFile); - } - - private static void usage() - { - exit("Usage: JNDICheck <JNDI Config file>", 0); - } - - private static void exit(String message, int exitCode) - { - System.err.println(message); - System.exit(exitCode); - } - - private static String JAVA_NAMING = "java.naming.factory.initial"; - - private Context _context = null; - private Hashtable _environment = null; - - public JNDICheck(String propertyFile) - { - - // Load JNDI properties - Properties properties = new Properties(); - - try(FileInputStream propertiesStream = new FileInputStream(new File(propertyFile))) - { - properties.load(propertiesStream); - } - catch (IOException e) - { - exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1); - } - - //Create the initial context - try - { - - System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING)); - - _context = new InitialContext(properties); - - _environment = _context.getEnvironment(); - - Enumeration keys = _environment.keys(); - - List<String> queues = new LinkedList<String>(); - List<String> topics = new LinkedList<String>(); - List<String> destinations = new LinkedList<String>(); - List<String> connectionFactories = new LinkedList<String>(); - - while (keys.hasMoreElements()) - { - String key = keys.nextElement().toString(); - - if (key.startsWith(QUEUE)) - { - queues.add(key); - } - else if (key.startsWith(TOPIC)) - { - topics.add(key); - } - else if (key.startsWith(DESTINATION)) - { - destinations.add(key); - } - else if (key.startsWith(CONNECTION_FACTORY)) - { - connectionFactories.add(key); - } - } - - printHeader(propertyFile); - printEntries(QUEUE, queues); - printEntries(TOPIC, topics); - printEntries(DESTINATION, destinations); - printEntries(CONNECTION_FACTORY, connectionFactories); - - } - catch (NamingException e) - { - exit("Unable to load JNDI Context due to:" + e.getMessage(), 1); - } - - } - - private void printHeader(String file) - { - print("JNDI file :" + file); - } - - private void printEntries(String type, List<String> list) - { - if (list.size() > 0) - { - String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1); - print(name + " elements in file:"); - printList(list); - print(""); - } - } - - private void printList(List<String> list) - { - for (String item : list) - { - String key = item.substring(item.indexOf('.') + 1); - - try - { - print(key, _context.lookup(key)); - } - catch (NamingException e) - { - exit("Error: item " + key + " no longer in context.", 1); - } - } - } - - private void print(String key, Object object) - { - if (object instanceof AMQDestination) - { - print(key + ":" + object); - } - else if (object instanceof AMQConnectionFactory) - { - AMQConnectionFactory factory = (AMQConnectionFactory) object; - print(key + ":Connection"); - print("ConnectionURL:"); - print(factory.getConnectionURL().toString()); - print("FailoverPolicy"); - print(new FailoverPolicy(factory.getConnectionURL(),null).toString()); - print(""); - } - } - - private void print(String msg) - { - System.out.println(msg); - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java deleted file mode 100644 index e0e48519f3..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.text.DecimalFormat; - -import javax.jms.Connection; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; - -public class JVMArgConfiguration implements TestConfiguration -{ - /* - * By default the connection URL is used. - * This allows a user to easily specify a fully fledged URL any given property. - * Ex. SSL parameters - * - * By providing a host & port allows a user to simply override the URL. - * This allows to create multiple clients in test scripts easily, - * without having to deal with the long URL format. - */ - private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"; - - private String host = ""; - - private int port = -1; - - private String address = "queue; {create : always}"; - - private long timeout = 0; - - private int msg_size = 1024; - - private int random_msg_size_start_from = 1; - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private int transaction_size = 0; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - private String msgType = "bytes"; - - private boolean printStdDev = false; - - private int sendRate = 0; - - private boolean externalController = false; - - private boolean useUniqueDest = false; // useful when using multiple connections. - - private int ackFrequency = 100; - - private DecimalFormat df = new DecimalFormat("###.##"); - - private int reportEvery = 0; - - private boolean isReportTotal = false; - - private boolean isReportHeader = true; - - private int sendEOS = 0; - - private int connectionCount = 1; - - private int rollbackFrequency = 0; - - private boolean printHeaders; - - private boolean printContent; - - private long ttl; - - private int priority; - - private String readyAddress; - - public JVMArgConfiguration() - { - - url = System.getProperty("url",url); - host = System.getProperty("host",""); - port = Integer.getInteger("port", -1); - address = System.getProperty("address",address); - - timeout = Long.getLong("timeout",0); - msg_size = Integer.getInteger("msg-size", 0); - cacheMessage = true; //Boolean.getBoolean("cache-msg"); - disableMessageID = Boolean.getBoolean("disable-message-id"); - disableTimestamp = Boolean.getBoolean("disable-timestamp"); - durable = Boolean.getBoolean("durable"); - transaction_size = Integer.getInteger("tx",1000); - ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg-count",msg_count); - warmup_count = Integer.getInteger("warmup-count",warmup_count); - random_msg_size = Boolean.getBoolean("random-msg-size"); - msgType = System.getProperty("msg-type","bytes"); - printStdDev = Boolean.getBoolean("print-std-dev"); - sendRate = Integer.getInteger("rate",0); - externalController = Boolean.getBoolean("ext-controller"); - useUniqueDest = Boolean.getBoolean("use-unique-dest"); - random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1); - reportEvery = Integer.getInteger("report-every",0); - isReportTotal = Boolean.getBoolean("report-total"); - isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header"); - sendEOS = Integer.getInteger("send-eos",1); - connectionCount = Integer.getInteger("con_count",1); - ackFrequency = Integer.getInteger("ack-frequency",100); - rollbackFrequency = Integer.getInteger("rollback-frequency",0); - printHeaders = Boolean.getBoolean("print-headers"); - printContent = Boolean.getBoolean("print-content"); - ttl = Long.getLong("ttl", 0); - priority = Integer.getInteger("priority", 0); - readyAddress = System.getProperty("ready-address"); - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getUrl() - */ - @Override - public String getUrl() - { - return url; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getHost() - */ - @Override - public String getHost() - { - return host; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getPort() - */ - @Override - public int getPort() - { - return port; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAddress() - */ - @Override - public String getAddress() - { - return address; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getTimeout() - */ - @Override - public long getTimeout() - { - return timeout; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckMode() - */ - @Override - public int getAckMode() - { - return ack_mode; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgCount() - */ - @Override - public int getMsgCount() - { - return msg_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMsgSize() - */ - @Override - public int getMsgSize() - { - return msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom() - */ - @Override - public int getRandomMsgSizeStartFrom() - { - return random_msg_size_start_from; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDurable() - */ - @Override - public boolean isDurable() - { - return durable; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isTransacted() - */ - @Override - public boolean isTransacted() - { - return transaction_size > 0; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize() - */ - @Override - public int getTransactionSize() - { - return transaction_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount() - */ - @Override - public int getWarmupCount() - { - return warmup_count; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage() - */ - @Override - public boolean isCacheMessage() - { - return cacheMessage; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID() - */ - @Override - public boolean isDisableMessageID() - { - return disableMessageID; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp() - */ - @Override - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize() - */ - @Override - public boolean isRandomMsgSize() - { - return random_msg_size; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getMessageType() - */ - @Override - public String getMessageType() - { - return msgType; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev() - */ - @Override - public boolean isPrintStdDev() - { - return printStdDev; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getSendRate() - */ - @Override - public int getSendRate() - { - return sendRate; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isExternalController() - */ - @Override - public boolean isExternalController() - { - return externalController; - } - - public void setAddress(String addr) - { - address = addr; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests() - */ - @Override - public boolean isUseUniqueDests() - { - return useUniqueDest; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency() - */ - @Override - public int getAckFrequency() - { - return ackFrequency; - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#createConnection() - */ - @Override - public Connection createConnection() throws Exception - { - if (getHost().equals("") || getPort() == -1) - { - return new AMQConnection(getUrl()); - } - else - { - return new AMQConnection(getHost(),getPort(),"guest","guest","test","test"); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat() - */ - @Override - public DecimalFormat getDecimalFormat() - { - return df; - } - - @Override - public int reportEvery() - { - return reportEvery; - } - - @Override - public boolean isReportTotal() - { - return isReportTotal; - } - - @Override - public boolean isReportHeader() - { - return isReportHeader; - } - - @Override - public int getSendEOS() - { - return sendEOS; - } - - @Override - public int getConnectionCount() - { - return connectionCount; - } - - @Override - public int getRollbackFrequency() - { - return rollbackFrequency; - } - - @Override - public boolean isPrintHeaders() - { - return printHeaders; - } - - @Override - public boolean isPrintContent() - { - return printContent; - } - - @Override - public long getTTL() - { - return ttl; - } - - @Override - public int getPriority() - { - return priority; - } - - @Override - public String getReadyAddress() - { - return readyAddress; - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java deleted file mode 100644 index 7ceef47573..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.net.InetAddress; -import java.util.UUID; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MapMessage; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.AMQSession_0_10; -import org.apache.qpid.messaging.Address; -import org.apache.qpid.tools.TestConfiguration.MessageType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MercuryBase -{ - private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class); - - public final static String CODE = "CODE"; - public final static String ID = "ID"; - public final static String REPLY_ADDR = "REPLY_ADDR"; - public final static String MAX_LATENCY = "MAX_LATENCY"; - public final static String MIN_LATENCY = "MIN_LATENCY"; - public final static String AVG_LATENCY = "AVG_LATENCY"; - public final static String STD_DEV = "STD_DEV"; - public final static String CONS_RATE = "CONS_RATE"; - public final static String PROD_RATE = "PROD_RATE"; - public final static String MSG_COUNT = "MSG_COUNT"; - public final static String TIMESTAMP = "Timestamp"; - - String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}"); - - TestConfiguration config; - Connection con; - Session session; - Session controllerSession; - Destination dest; - Destination myControlQueue; - Destination controllerQueue; - String id; - String myControlQueueAddr; - - MessageProducer sendToController; - MessageConsumer receiveFromController; - String prefix = ""; - - enum OPCode - { - REGISTER_CONSUMER, REGISTER_PRODUCER, - PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP, - CONSUMER_READY, PRODUCER_READY, - PRODUCER_START, - RECEIVED_END_MSG, CONSUMER_STOP, - RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS, - CONTINUE_TEST, STOP_TEST - }; - - MessageType msgType = MessageType.BYTES; - - public MercuryBase(TestConfiguration config,String prefix) - { - this.config = config; - String host = ""; - try - { - host = InetAddress.getLocalHost().getHostName(); - } - catch (Exception e) - { - } - id = host + "-" + UUID.randomUUID().toString(); - this.prefix = prefix; - this.myControlQueueAddr = id + ";{create: always}"; - } - - public void setUp() throws Exception - { - con = config.createConnection(); - con.start(); - - controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - dest = createDestination(); - controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false); - myControlQueue = session.createQueue(myControlQueueAddr); - msgType = MessageType.getType(config.getMessageType()); - _logger.debug("Using " + msgType + " messages"); - - sendToController = controllerSession.createProducer(controllerQueue); - receiveFromController = controllerSession.createConsumer(myControlQueue); - } - - private Destination createDestination() throws Exception - { - if (config.isUseUniqueDests()) - { - _logger.debug("Prefix : " + prefix); - Address addr = Address.parse(config.getAddress()); - AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false); - int type = ((AMQSession_0_10)session).resolveAddressType(temp); - - if ( type == AMQDestination.TOPIC_TYPE) - { - addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions()); - System.out.println("Setting subject : " + addr); - } - else - { - addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions()); - System.out.println("Setting name : " + addr); - } - - return AMQDestination.createDestination(addr.toString(), false); - } - else - { - return AMQDestination.createDestination(config.getAddress(), false); - } - } - - public synchronized void sendMessageToController(MapMessage m) throws Exception - { - m.setString(ID, id); - m.setString(REPLY_ADDR,myControlQueueAddr); - sendToController.send(m); - } - - public void receiveFromController(OPCode expected) throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - _logger.debug("Received Code : " + code); - if (expected != code) - { - throw new Exception("Expected OPCode : " + expected + " but received : " + code); - } - - } - - public boolean continueTest() throws Exception - { - MapMessage m = (MapMessage)receiveFromController.receive(); - OPCode code = OPCode.values()[m.getInt(CODE)]; - _logger.debug("Received Code : " + code); - return (code == OPCode.CONTINUE_TEST); - } - - public void tearDown() throws Exception - { - session.close(); - controllerSession.close(); - con.close(); - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java deleted file mode 100644 index b35adc45d6..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; - -import org.apache.qpid.thread.Threading; -import org.apache.qpid.tools.report.MercuryReporter; -import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * when the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * - * Throughput - * =========== - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class MercuryConsumerController extends MercuryBase -{ - private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class); - MercuryReporter reporter; - TestConfiguration config; - QpidReceive receiver; - - public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix) - { - super(config,prefix); - this.reporter = reporter; - if (_logger.isInfoEnabled()) - { - _logger.info("Consumer ID : " + id); - } - } - - public void setUp() throws Exception - { - super.setUp(); - receiver = new QpidReceive(reporter,config, con,dest); - receiver.setUp(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal()); - sendMessageToController(m); - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.CONSUMER_STARTWARMUP); - receiver.waitforCompletion(config.getWarmupCount()); - - // It's more realistic for the consumer to signal this. - MapMessage m1 = controllerSession.createMapMessage(); - m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal()); - sendMessageToController(m1); - - MapMessage m2 = controllerSession.createMapMessage(); - m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal()); - sendMessageToController(m2); - } - - public void runReceiver() throws Exception - { - if (_logger.isInfoEnabled()) - { - _logger.info("Consumer: " + id + " Starting iteration......" + "\n"); - } - resetCounters(); - receiver.waitforCompletion(config.getMsgCount()); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal()); - sendMessageToController(m); - } - - public void resetCounters() - { - reporter.clear(); - } - - public void sendResults() throws Exception - { - receiveFromController(OPCode.CONSUMER_STOP); - reporter.report(); - - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal()); - m.setDouble(AVG_LATENCY, reporter.getAvgLatency()); - m.setDouble(MIN_LATENCY, reporter.getMinLatency()); - m.setDouble(MAX_LATENCY, reporter.getMaxLatency()); - m.setDouble(STD_DEV, reporter.getStdDev()); - m.setDouble(CONS_RATE, reporter.getRate()); - m.setLong(MSG_COUNT, reporter.getSampleSize()); - sendMessageToController(m); - - reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString()); - reporter.log(new StringBuilder("Consumer rate : "). - append(config.getDecimalFormat().format(reporter.getRate())). - append(" msg/sec").toString()); - reporter.log(new StringBuilder("Avg Latency : "). - append(config.getDecimalFormat().format(reporter.getAvgLatency())). - append(" ms").toString()); - reporter.log(new StringBuilder("Min Latency : "). - append(config.getDecimalFormat().format(reporter.getMinLatency())). - append(" ms").toString()); - reporter.log(new StringBuilder("Max Latency : "). - append(config.getDecimalFormat().format(reporter.getMaxLatency())). - append(" ms").toString()); - if (config.isPrintStdDev()) - { - reporter.log(new StringBuilder("Std Dev : "). - append(reporter.getStdDev()).toString()); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - System.out.println("=========================================================\n"); - System.out.println("Consumer: " + id + " starting a new iteration ......\n"); - runReceiver(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception - { - super.tearDown(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true); - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = config.getConnectionCount(); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i); - Runnable r = new Runnable() - { - public void run() - { - cons.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - } - testCompleted.await(); - reporter.log("Consumers have completed the test......\n"); - } -}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java deleted file mode 100644 index 02377bb853..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; - -import org.apache.qpid.thread.Threading; -import org.apache.qpid.tools.report.MercuryReporter; -import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs - * I have done so far, it seems quite useful to compute the producer rate as it gives an - * indication of how the system behaves. For ex if there is a gap between producer and consumer rates - * you could clearly see the higher latencies and when producer and consumer rates are very close, - * latency is good. - * - */ -public class MercuryProducerController extends MercuryBase -{ - private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); - MercuryReporter reporter; - QpidSend sender; - - public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix) - { - super(config,prefix); - this.reporter = reporter; - System.out.println("Producer ID : " + id); - } - - public void setUp() throws Exception - { - super.setUp(); - sender = new QpidSend(reporter,config, con,dest); - sender.setUp(); - MapMessage m = controllerSession.createMapMessage(); - m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal()); - sendMessageToController(m); - } - - public void warmup()throws Exception - { - receiveFromController(OPCode.PRODUCER_STARTWARMUP); - if (_logger.isInfoEnabled()) - { - _logger.info("Producer: " + id + " Warming up......"); - } - sender.send(config.getWarmupCount()); - sender.sendEndMessage(); - } - - public void runSender() throws Exception - { - resetCounters(); - receiveFromController(OPCode.PRODUCER_START); - sender.send(config.getMsgCount()); - } - - public void resetCounters() - { - sender.resetCounters(); - } - - public void sendResults() throws Exception - { - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal()); - msg.setDouble(PROD_RATE, reporter.getRate()); - sendMessageToController(msg); - reporter.log(new StringBuilder("Producer rate: "). - append(config.getDecimalFormat().format(reporter.getRate())). - append(" msg/sec"). - toString()); - } - - @Override - public void tearDown() throws Exception - { - sender.tearDown(); - super.tearDown(); - } - - public void run() - { - try - { - setUp(); - warmup(); - boolean nextIteration = true; - while (nextIteration) - { - if(_logger.isInfoEnabled()) - { - _logger.info("=========================================================\n"); - _logger.info("Producer: " + id + " starting a new iteration ......\n"); - } - runSender(); - sendResults(); - nextIteration = continueTest(); - } - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public void startControllerIfNeeded() - { - if (!config.isExternalController()) - { - final MercuryTestController controller = new MercuryTestController(config); - Runnable r = new Runnable() - { - public void run() - { - controller.run(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating controller thread",e); - } - t.start(); - } - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true); - String scriptId = (args.length == 1) ? args[0] : ""; - int conCount = config.getConnectionCount(); - final CountDownLatch testCompleted = new CountDownLatch(conCount); - for (int i=0; i < conCount; i++) - { - final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i); - prod.startControllerIfNeeded(); - Runnable r = new Runnable() - { - public void run() - { - prod.run(); - testCompleted.countDown(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } - testCompleted.await(); - reporter.log("Producers have completed the test......"); - } -}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java deleted file mode 100644 index 8c66a1e44d..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.FileWriter; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; - -import javax.jms.MapMessage; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; - -import org.apache.qpid.client.message.AMQPEncodedMapMessage; -import org.apache.qpid.tools.report.Reporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The Controller coordinates a test run between a number - * of producers and consumers, configured via -Dprod_count and -Dcons_count. - * - * It waits till all the producers and consumers have registered and then - * conducts a warmup run. Once all consumers and producers have completed - * the warmup run and is ready, it will conduct the actual test run and - * collect all stats from the participants and calculates the system - * throughput, the avg/min/max for producer rates, consumer rates and latency. - * - * These stats are then printed to std out. - * The Controller also prints events to std out to give a running account - * of the test run in progress. Ex registering of participants, starting warmup ..etc. - * This allows a scripting tool to monitor the progress. - * - * The Controller can be run in two modes. - * 1. A single test run (default) where it just runs until the message count specified - * for the producers via -Dmsg_count is sent and received. - * - * 2. Time based, configured via -Dduration=x, where x is in mins. - * In this mode, the Controller repeatedly cycles through the tests (after an initial - * warmup run) until the desired time is reached. If a test run is in progress - * and the time is up, it will allow the run the complete. - * - * After each iteration, the stats will be printed out in csv format to a separate log file. - * System throughput is calculated as follows - * totalMsgCount/(totalTestTime) - */ -public class MercuryTestController extends MercuryBase implements MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class); - - enum TestMode { SINGLE_RUN, TIME_BASED }; - - TestMode testMode = TestMode.SINGLE_RUN; - - long totalTestTime; - - private double avgSystemLatency = 0.0; - private double minSystemLatency = Double.MAX_VALUE; - private double maxSystemLatency = 0; - private double avgSystemLatencyStdDev = 0.0; - - private double avgSystemConsRate = 0.0; - private double maxSystemConsRate = 0.0; - private double minSystemConsRate = Double.MAX_VALUE; - - private double avgSystemProdRate = 0.0; - private double maxSystemProdRate = 0.0; - private double minSystemProdRate = Double.MAX_VALUE; - - private long totalMsgCount = 0; - private double totalSystemThroughput = 0.0; - - private int consumerCount = Integer.getInteger("cons_count", 1); - private int producerCount = Integer.getInteger("prod_count", 1); - private int duration = Integer.getInteger("duration", -1); // in mins - private Map<String,MapMessage> consumers; - private Map<String,MapMessage> producers; - - private CountDownLatch consRegistered; - private CountDownLatch prodRegistered; - private CountDownLatch consReady; - private CountDownLatch prodReady; - private CountDownLatch receivedEndMsg; - private CountDownLatch receivedConsStats; - private CountDownLatch receivedProdStats; - - private MessageConsumer consumer; - private boolean printStdDev = false; - private FileWriter writer; - private Reporter report; - - public MercuryTestController(TestConfiguration config) - { - super(config,""); - - consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount); - producers = new ConcurrentHashMap<String,MapMessage>(producerCount); - - consRegistered = new CountDownLatch(consumerCount); - prodRegistered = new CountDownLatch(producerCount); - consReady = new CountDownLatch(consumerCount); - prodReady = new CountDownLatch(producerCount); - printStdDev = config.isPrintStdDev(); - testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED; - } - - public void setUp() throws Exception - { - super.setUp(); - if (testMode == TestMode.TIME_BASED) - { - writer = new FileWriter("stats-csv.log"); - } - consumer = controllerSession.createConsumer(controllerQueue); - report.log("\nController: " + producerCount + " producers are expected"); - report.log("Controller: " + consumerCount + " consumers are expected \n"); - consumer.setMessageListener(this); - consRegistered.await(); - prodRegistered.await(); - report.log("\nController: All producers and consumers have registered......\n"); - } - - public void warmup() throws Exception - { - report.log("Controller initiating warm up sequence......"); - sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values()); - sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values()); - prodReady.await(); - consReady.await(); - report.log("\nController : All producers and consumers are ready to start the test......\n"); - } - - public void startTest() throws Exception - { - resetCounters(); - report.log("\nController Starting test......"); - long start = Clock.getTime(); - sendMessageToNodes(OPCode.PRODUCER_START,producers.values()); - receivedEndMsg.await(); - totalTestTime = Clock.getTime() - start; - sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values()); - receivedProdStats.await(); - receivedConsStats.await(); - } - - public void resetCounters() - { - minSystemLatency = Double.MAX_VALUE; - maxSystemLatency = 0; - maxSystemConsRate = 0.0; - minSystemConsRate = Double.MAX_VALUE; - maxSystemProdRate = 0.0; - minSystemProdRate = Double.MAX_VALUE; - - totalMsgCount = 0; - - receivedConsStats = new CountDownLatch(consumerCount); - receivedProdStats = new CountDownLatch(producerCount); - receivedEndMsg = new CountDownLatch(producerCount); - } - - public void calcStats() throws Exception - { - double totLatency = 0.0; - double totStdDev = 0.0; - double totalConsRate = 0.0; - double totalProdRate = 0.0; - - MapMessage conStat = null; // for error handling - try - { - for (MapMessage m: consumers.values()) - { - conStat = m; - minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY)); - maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY)); - totLatency = totLatency + m.getDouble(AVG_LATENCY); - totStdDev = totStdDev + m.getDouble(STD_DEV); - - minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE)); - maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE)); - totalConsRate = totalConsRate + m.getDouble(CONS_RATE); - - totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT); - } - } - catch(Exception e) - { - System.err.println("Error calculating stats from Consumer : " + conStat); - } - - - MapMessage prodStat = null; // for error handling - try - { - for (MapMessage m: producers.values()) - { - prodStat = m; - minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE)); - maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE)); - totalProdRate = totalProdRate + m.getDouble(PROD_RATE); - } - } - catch(Exception e) - { - System.err.println("Error calculating stats from Producer : " + conStat); - } - - avgSystemLatency = totLatency/consumers.size(); - avgSystemLatencyStdDev = totStdDev/consumers.size(); - avgSystemConsRate = totalConsRate/consumers.size(); - avgSystemProdRate = totalProdRate/producers.size(); - - report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision()); - - totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime); - } - - public void printResults() throws Exception - { - report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString()); - report.log(new StringBuilder("System Throughput : "). - append(config.getDecimalFormat().format(totalSystemThroughput)). - append(" msg/sec").toString()); - report.log(new StringBuilder("Avg Consumer rate : "). - append(config.getDecimalFormat().format(avgSystemConsRate)). - append(" msg/sec").toString()); - report.log(new StringBuilder("Min Consumer rate : "). - append(config.getDecimalFormat().format(minSystemConsRate)). - append(" msg/sec").toString()); - report.log(new StringBuilder("Max Consumer rate : "). - append(config.getDecimalFormat().format(maxSystemConsRate)). - append(" msg/sec").toString()); - - report.log(new StringBuilder("Avg Producer rate : "). - append(config.getDecimalFormat().format(avgSystemProdRate)). - append(" msg/sec").toString()); - report.log(new StringBuilder("Min Producer rate : "). - append(config.getDecimalFormat().format(minSystemProdRate)). - append(" msg/sec").toString()); - report.log(new StringBuilder("Max Producer rate : "). - append(config.getDecimalFormat().format(maxSystemProdRate)). - append(" msg/sec").toString()); - - report.log(new StringBuilder("Avg System Latency : "). - append(config.getDecimalFormat().format(avgSystemLatency)). - append(" ms").toString()); - report.log(new StringBuilder("Min System Latency : "). - append(config.getDecimalFormat().format(minSystemLatency)). - append(" ms").toString()); - report.log(new StringBuilder("Max System Latency : "). - append(config.getDecimalFormat().format(maxSystemLatency)). - append(" ms").toString()); - if (printStdDev) - { - report.log(new StringBuilder("Avg System Std Dev : "). - append(avgSystemLatencyStdDev).toString()); - } - } - - private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception - { - report.log("\nController: Sending code " + code); - MessageProducer tmpProd = controllerSession.createProducer(null); - MapMessage msg = controllerSession.createMapMessage(); - msg.setInt(CODE, code.ordinal()); - for (MapMessage node : nodes) - { - if (node.getString(REPLY_ADDR) == null) - { - report.log("REPLY_ADDR is null " + node); - } - else - { - report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR)); - } - tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg); - } - } - - public void onMessage(Message msg) - { - try - { - MapMessage m = (MapMessage)msg; - OPCode code = OPCode.values()[m.getInt(CODE)]; - - report.log("\n---------Controller Received Code : " + code); - report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap()); - - switch (code) - { - case REGISTER_CONSUMER : - if (consRegistered.getCount() == 0) - { - report.log("Warning : Expected number of consumers have already registered," + - "ignoring extra consumer"); - break; - } - consumers.put(m.getString(ID),m); - consRegistered.countDown(); - break; - - case REGISTER_PRODUCER : - if (prodRegistered.getCount() == 0) - { - report.log("Warning : Expected number of producers have already registered," + - "ignoring extra producer"); - break; - } - producers.put(m.getString(ID),m); - prodRegistered.countDown(); - break; - - case CONSUMER_READY : - consReady.countDown(); - break; - - case PRODUCER_READY : - prodReady.countDown(); - break; - - case RECEIVED_END_MSG : - receivedEndMsg.countDown(); - break; - - case RECEIVED_CONSUMER_STATS : - consumers.put(m.getString(ID),m); - receivedConsStats.countDown(); - break; - - case RECEIVED_PRODUCER_STATS : - producers.put(m.getString(ID),m); - receivedProdStats.countDown(); - break; - - default: - throw new Exception("Invalid OPCode " + code); - } - } - catch (Exception e) - { - handleError(e,"Error when receiving messages " + msg); - } - } - - public void run() - { - try - { - setUp(); - warmup(); - if (testMode == TestMode.SINGLE_RUN) - { - startTest(); - calcStats(); - printResults(); - } - else - { - long startTime = Clock.getTime(); - long timeLimit = duration * 60 * 1000; // duration is in mins. - boolean nextIteration = true; - while (nextIteration) - { - startTest(); - calcStats(); - writeStatsToFile(); - if (Clock.getTime() - startTime < timeLimit) - { - sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values()); - sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values()); - nextIteration = true; - } - else - { - nextIteration = false; - } - } - } - tearDown(); - - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - @Override - public void tearDown() throws Exception { - report.log("Controller: Completed the test......\n"); - if (testMode == TestMode.TIME_BASED) - { - writer.close(); - } - sendMessageToNodes(OPCode.STOP_TEST,consumers.values()); - sendMessageToNodes(OPCode.STOP_TEST,producers.values()); - super.tearDown(); - } - - public void writeStatsToFile() throws Exception - { - writer.append(String.valueOf(totalMsgCount)).append(","); - writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(","); - writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(","); - writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(","); - writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(","); - writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(","); - writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(","); - writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(","); - writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(","); - writer.append(config.getDecimalFormat().format(minSystemLatency)).append(","); - writer.append(config.getDecimalFormat().format(maxSystemLatency)); - if (printStdDev) - { - writer.append(",").append(String.valueOf(avgSystemLatencyStdDev)); - } - writer.append("\n"); - writer.flush(); - } - - public static void main(String[] args) - { - TestConfiguration config = new JVMArgConfiguration(); - MercuryTestController controller = new MercuryTestController(config); - controller.run(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java deleted file mode 100644 index a0ba928e1f..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -public class MessageFactory -{ - public static Message createBytesMessage(Session ssn, int size) throws JMSException - { - BytesMessage msg = ssn.createBytesMessage(); - msg.writeBytes(createMessagePayload(size).getBytes()); - return msg; - } - - public static Message createTextMessage(Session ssn, int size) throws JMSException - { - TextMessage msg = ssn.createTextMessage(); - msg.setText(createMessagePayload(size)); - return msg; - } - - public static String createMessagePayload(int size) - { - String msgData = "Qpid Test Message"; - - StringBuffer buf = new StringBuffer(size); - int count = 0; - while (count <= (size - msgData.length())) - { - buf.append(msgData); - count += msgData.length(); - } - if (count < size) - { - buf.append(msgData, 0, size - count); - } - - return buf.toString(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java deleted file mode 100644 index e2d179965b..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java +++ /dev/null @@ -1,904 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import static org.apache.qpid.tools.QpidBench.Mode.BOTH; -import static org.apache.qpid.tools.QpidBench.Mode.CONSUME; -import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH; - -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.ExchangeBind; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryMode; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageSubscribe; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.QueueDeclare; -import org.apache.qpid.transport.SessionException; -import org.apache.qpid.transport.SessionListener; -import org.apache.qpid.util.UUIDGen; -import org.apache.qpid.util.UUIDs; - -/** - * QpidBench - * - */ - -public class QpidBench -{ - - static enum Mode - { - PUBLISH, CONSUME, BOTH - } - - private static class Options - { - private StringBuilder usage = new StringBuilder("qpid-bench <options>"); - - void usage(String name, String description, Object def) - { - String defval = ""; - if (def != null) - { - defval = String.format(" (%s)", def); - } - usage.append(String.format("\n %-15s%-14s %s", name, defval, description)); - } - - public String broker = "localhost"; - public int port = 5672; - public long count = 1000000; - public long window = 100000; - public long sample = window; - public int size = 1024; - public Mode mode = BOTH; - public boolean timestamp = false; - public boolean message_id = false; - public boolean message_cache = false; - public boolean persistent = false; - public boolean jms_publish = false; - public boolean jms_consume = false; - public boolean help = false; - - { - usage("-b, --broker", "the broker hostname", broker); - } - - public void parse__broker(String b) - { - this.broker = b; - } - - public void parse_b(String b) - { - parse__broker(b); - } - - { - usage("-p, --port", "the broker port", port); - } - - public void parse__port(String p) - { - this.port = Integer.parseInt(p); - } - - public void parse_p(String p) - { - parse__port(p); - } - - { - usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count); - } - - public void parse__count(String c) - { - this.count = Long.parseLong(c); - } - - public void parse_c(String c) - { - parse__count(c); - } - - { - usage("-w, --window", "the number of messages to send before blocking, 0 disables", window); - } - - public void parse__window(String w) - { - this.window = Long.parseLong(w); - } - - public void parse_w(String w) - { - parse__window(w); - } - - { - usage("--sample", "print stats after this many messages, 0 disables", sample); - } - - public void parse__sample(String s) - { - this.sample = Long.parseLong(s); - } - - { - usage("-i, --interval", "sets both --window and --sample", window); - } - - public void parse__interval(String i) - { - this.window = Long.parseLong(i); - this.sample = window; - } - - public void parse_i(String i) - { - parse__interval(i); - } - - { - usage("-s, --size", "the message size", size); - } - - public void parse__size(String s) - { - this.size = Integer.parseInt(s); - } - - public void parse_s(String s) - { - parse__size(s); - } - - { - usage("-m, --mode", "one of publish, consume, or both", mode); - } - - public void parse__mode(String m) - { - if (m.equalsIgnoreCase("publish")) - { - this.mode = PUBLISH; - } - else if (m.equalsIgnoreCase("consume")) - { - this.mode = CONSUME; - } - else if (m.equalsIgnoreCase("both")) - { - this.mode = BOTH; - } - else - { - throw new IllegalArgumentException - ("must be one of 'publish', 'consume', or 'both'"); - } - } - - public void parse_m(String m) - { - parse__mode(m); - } - - { - usage("--timestamp", "set timestamps on each message if true", timestamp); - } - - public void parse__timestamp(String t) - { - this.timestamp = Boolean.parseBoolean(t); - } - - { - usage("--mesage-id", "set the message-id on each message if true", message_id); - } - - public void parse__message_id(String m) - { - this.message_id = Boolean.parseBoolean(m); - } - - { - usage("--message-cache", "reuse the same message for each send if true", message_cache); - } - - public void parse__message_cache(String c) - { - this.message_cache = Boolean.parseBoolean(c); - } - - { - usage("--persistent", "set the delivery-mode to persistent if true", persistent); - } - - public void parse__persistent(String p) - { - this.persistent = Boolean.parseBoolean(p); - } - - { - usage("--jms-publish", "use the jms client for publish", jms_publish); - } - - public void parse__jms_publish(String jp) - { - this.jms_publish = Boolean.parseBoolean(jp); - } - - { - usage("--jms-consume", "use the jms client for consume", jms_consume); - } - - public void parse__jms_consume(String jc) - { - this.jms_consume = Boolean.parseBoolean(jc); - } - - { - usage("--jms", "sets both --jms-publish and --jms-consume", false); - } - - public void parse__jms(String j) - { - this.jms_publish = this.jms_consume = Boolean.parseBoolean(j); - } - - { - usage("-h, --help", "prints this message", null); - } - - public void parse__help() - { - this.help = true; - } - - public void parse_h() - { - parse__help(); - } - - public String parse(String ... args) - { - Class klass = getClass(); - List<String> arguments = new ArrayList<String>(); - for (int i = 0; i < args.length; i++) - { - String option = args[i]; - - if (!option.startsWith("-")) - { - arguments.add(option); - continue; - } - - String method = "parse" + option.replace('-', '_'); - try - { - try - { - Method parser = klass.getMethod(method); - parser.invoke(this); - } - catch (NoSuchMethodException e) - { - try - { - Method parser = klass.getMethod(method, String.class); - - String value = null; - if (i + 1 < args.length) - { - value = args[i+1]; - i++; - } - else - { - return option + " requires a value"; - } - - parser.invoke(this, value); - } - catch (NoSuchMethodException e2) - { - return "no such option: " + option; - } - } - } - catch (InvocationTargetException e) - { - Throwable t = e.getCause(); - return String.format - ("error parsing %s: %s: %s", option, t.getClass().getName(), - t.getMessage()); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access parse method: " + option, e); - } - } - - return parseArguments(arguments); - } - - public String parseArguments(List<String> arguments) - { - if (arguments.size() > 0) - { - String args = arguments.toString(); - return "unrecognized arguments: " + args.substring(1, args.length() - 1); - } - else - { - return null; - } - } - - public String toString() - { - Class klass = getClass(); - Field[] fields = klass.getFields(); - StringBuilder str = new StringBuilder(); - for (int i = 0; i < fields.length; i++) - { - if (i > 0) - { - str.append("\n"); - } - - String name = fields[i].getName(); - str.append(name); - str.append(" = "); - Object value; - try - { - value = fields[i].get(this); - } - catch (IllegalAccessException e) - { - throw new RuntimeException - ("unable to access field: " + name, e); - } - str.append(value); - } - - return str.toString(); - } - } - - public static final void main(String[] args) throws Exception - { - final Options opts = new Options(); - String error = opts.parse(args); - if (error != null) - { - System.err.println(error); - System.exit(-1); - return; - } - - if (opts.help) - { - System.out.println(opts.usage); - return; - } - - System.out.println(opts); - - switch (opts.mode) - { - case CONSUME: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_consume) - { - jms_consumer(opts); - } - else - { - native_consumer(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Consumer Completed"); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - break; - } - - switch (opts.mode) - { - case PUBLISH: - case BOTH: - Runnable r = new Runnable() - { - public void run() - { - try - { - if (opts.jms_publish) - { - jms_publisher(opts); - } - else - { - native_publisher(opts); - } - } - catch (Exception e) - { - throw new RuntimeException(e); - } - System.out.println("Producer Completed"); - } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating publisher thread",e); - } - t.start(); - break; - } - } - - private static enum Column - { - LEFT, RIGHT - } - - private static final void sample(Options opts, Column col, String name, long count, - long start, long time, long lastTime) - { - String pfx = ""; - String sfx = ""; - if (opts.mode == BOTH) - { - if (col == Column.RIGHT) - { - pfx = " -- "; - } - else - { - sfx = " --"; - } - } - - if (count == 0) - { - String stats = String.format("%s: %tc", name, start); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - return; - } - - double cumulative = 1000 * (double) count / (double) (time - start); - double interval = 1000 * ((double) opts.sample / (double) (time - lastTime)); - - String stats = String.format - ("%s: %d %.2f %.2f", name, count, cumulative, interval); - System.out.println(String.format("%s%-36s%s", pfx, stats, sfx)); - } - - private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception - { - String url = String.format - ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'", - opts.broker, opts.port); - return new AMQConnection(url); - } - - private static final void jms_publisher(Options opts) throws Exception - { - javax.jms.Connection conn = getJMSConnection(opts); - - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageProducer prod = ssn.createProducer(dest); - MessageConsumer cons = ssn.createConsumer(echo_dest); - prod.setDisableMessageID(!opts.message_id); - prod.setDisableMessageTimestamp(!opts.timestamp); - prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - StringBuilder str = new StringBuilder(); - for (int i = 0; i < opts.size; i++) - { - str.append((char) (i % 128)); - } - - String body = str.toString(); - - TextMessage cached = ssn.createTextMessage(); - cached.setText(body); - - conn.start(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - Message echo = cons.receive(); - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "JP", count, start, time, lastTime); - lastTime = time; - } - - TextMessage m; - if (opts.message_cache) - { - m = cached; - } - else - { - m = ssn.createTextMessage(); - m.setText(body); - } - - prod.send(m); - count++; - } - - conn.close(); - } - - private static final void jms_consumer(final Options opts) throws Exception - { - final javax.jms.Connection conn = getJMSConnection(opts); - javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); - Destination dest = ssn.createQueue("test-queue"); - Destination echo_dest = ssn.createQueue("echo-queue"); - MessageConsumer cons = ssn.createConsumer(dest); - final MessageProducer prod = ssn.createProducer(echo_dest); - prod.setDisableMessageID(true); - prod.setDisableMessageTimestamp(true); - prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - final TextMessage echo = ssn.createTextMessage(); - echo.setText("ECHO"); - - final Object done = new Object(); - cons.setMessageListener(new MessageListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void onMessage(Message m) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - try - { - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - prod.send(echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "JC", count, start, time, lastTime); - lastTime = time; - } - } - catch (JMSException e) - { - throw new RuntimeException(e); - } - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - conn.start(); - synchronized (done) - { - done.wait(); - } - conn.close(); - } - - private static final org.apache.qpid.transport.Connection getConnection - (Options opts) - { - org.apache.qpid.transport.Connection conn = - new org.apache.qpid.transport.Connection(); - conn.connect(opts.broker, opts.port, null, "guest", "guest", false, null); - return conn; - } - - private static abstract class NativeListener implements SessionListener - { - - public void opened(org.apache.qpid.transport.Session ssn) {} - - public void resumed(org.apache.qpid.transport.Session ssn) {} - - public void exception(org.apache.qpid.transport.Session ssn, - SessionException exc) - { - exc.printStackTrace(); - } - - public void closed(org.apache.qpid.transport.Session ssn) {} - - } - - private static final void native_publisher(Options opts) throws Exception - { - final long[] echos = { 0 }; - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - synchronized (echos) - { - echos[0]++; - echos.notify(); - } - ssn.processed(xfr); - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - MessageProperties cached_mp = new MessageProperties(); - DeliveryProperties cached_dp = new DeliveryProperties(); - cached_dp.setRoutingKey("test-queue"); - cached_dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - int size = opts.size; - ByteBuffer body = ByteBuffer.allocate(size); - for (int i = 0; i < size; i++) - { - body.put((byte) i); - } - body.flip(); - - ssn.invoke(new MessageSubscribe() - .queue("echo-queue") - .destination("echo-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - UUIDGen gen = UUIDs.newGenerator(); - - long count = 0; - long lastTime = 0; - long start = System.currentTimeMillis(); - while (opts.count == 0 || count < opts.count) - { - if (opts.window > 0 && (count % opts.window) == 0 && count > 0) - { - synchronized (echos) - { - while (echos[0] < (count/opts.window)) - { - echos.wait(); - } - } - } - - if (opts.sample > 0 && (count % opts.sample) == 0) - { - long time = System.currentTimeMillis(); - sample(opts, Column.LEFT, "NP", count, start, time, lastTime); - lastTime = time; - } - - MessageProperties mp; - DeliveryProperties dp; - if (opts.message_cache) - { - mp = cached_mp; - dp = cached_dp; - } - else - { - mp = new MessageProperties(); - dp = new DeliveryProperties(); - dp.setRoutingKey("test-queue"); - dp.setDeliveryMode - (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT); - - } - - if (opts.message_id) - { - mp.setMessageId(gen.generate()); - } - - if (opts.timestamp) - { - dp.setTimestamp(System.currentTimeMillis()); - } - - ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), body.slice()); - count++; - } - - ssn.messageCancel("echo-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - - private static final void native_consumer(final Options opts) throws Exception - { - final DeliveryProperties dp = new DeliveryProperties(); - final byte[] echo = new byte[0]; - dp.setRoutingKey("echo-queue"); - dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); - final MessageProperties mp = new MessageProperties(); - final Object done = new Object(); - org.apache.qpid.transport.Connection conn = getConnection(opts); - org.apache.qpid.transport.Session ssn = conn.createSession(); - ssn.setSessionListener(new NativeListener() - { - private long count = 0; - private long lastTime = 0; - private long start; - - public void message(org.apache.qpid.transport.Session ssn, - MessageTransfer xfr) - { - if (count == 0) - { - start = System.currentTimeMillis(); - } - - boolean sample = opts.sample > 0 && (count % opts.sample) == 0; - long time = sample ? System.currentTimeMillis() : 0; - - if (opts.window > 0 && (count % opts.window) == 0) - { - ssn.messageTransfer("amq.direct", - MessageAcceptMode.NONE, - MessageAcquireMode.PRE_ACQUIRED, - new Header(dp, mp), - echo); - } - - if (sample) - { - sample(opts, Column.RIGHT, "NC", count, start, time, lastTime); - lastTime = time; - } - ssn.processed(xfr); - count++; - - if (opts.count > 0 && count >= opts.count) - { - synchronized (done) - { - done.notify(); - } - } - } - }); - - ssn.invoke(new QueueDeclare().queue("test-queue").durable(false)); - ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false)); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue")); - ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue")); - - ssn.invoke(new MessageSubscribe() - .queue("test-queue") - .destination("test-queue") - .acceptMode(MessageAcceptMode.NONE) - .acquireMode(MessageAcquireMode.PRE_ACQUIRED)); - ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW); - ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF); - ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF); - - synchronized (done) - { - done.wait(); - } - - ssn.messageCancel("test-queue"); - - ssn.sync(); - ssn.close(); - conn.close(); - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java deleted file mode 100644 index 4092f0d59d..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.UUID; -import java.util.concurrent.CountDownLatch; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.client.message.AbstractJMSMessage; -import org.apache.qpid.tools.report.BasicReporter; -import org.apache.qpid.tools.report.Reporter; -import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QpidReceive implements MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class); - private final CountDownLatch testCompleted = new CountDownLatch(1); - - private Connection con; - private Session session; - private Destination dest; - private MessageConsumer consumer; - private boolean transacted = false; - private boolean isRollback = false; - private int txSize = 0; - private int rollbackFrequency = 0; - private int ackFrequency = 0; - private int expected = 0; - private int received = 0; - private Reporter report; - private TestConfiguration config; - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } - - public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } - - public void setUp() throws Exception - { - con.start(); - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else if (config.getAckFrequency() > 0) - { - session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - if (_logger.isDebugEnabled()) - { - System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n"); - } - - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); - isRollback = config.getRollbackFrequency() > 0; - rollbackFrequency = config.getRollbackFrequency(); - ackFrequency = config.getAckFrequency(); - - _logger.debug("Ready address : " + config.getReadyAddress()); - if (config.getReadyAddress() != null) - { - MessageProducer prod = session.createProducer(AMQDestination - .createDestination(config.getReadyAddress(), false)); - prod.send(session.createMessage()); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending message to ready address " + prod.getDestination()); - } - } - } - - public void resetCounters() - { - received = 0; - expected = 0; - report.clear(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && - TestConfiguration.EOS.equals(((TextMessage)msg).getText())) - { - testCompleted.countDown(); - return; - } - - received++; - report.message(msg); - - if (config.isPrintHeaders()) - { - System.out.println(((AbstractJMSMessage)msg).toHeaderString()); - } - - if (config.isPrintContent()) - { - System.out.println(((AbstractJMSMessage)msg).toBodyString()); - } - - if (transacted && (received % txSize == 0)) - { - if (isRollback && (received % rollbackFrequency == 0)) - { - session.rollback(); - } - else - { - session.commit(); - } - } - else if (ackFrequency > 0) - { - msg.acknowledge(); - } - - if (received >= expected) - { - testCompleted.countDown(); - } - - } - catch(Exception e) - { - _logger.error("Error when receiving messages",e); - } - } - - public void waitforCompletion(int expected) throws Exception - { - this.expected = expected; - testCompleted.await(); - } - - public void tearDown() throws Exception - { - session.close(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(ThroughputAndLatency.class, - System.out, - config.reportEvery(), - config.isReportHeader()); - Destination dest = AMQDestination.createDestination(config.getAddress(), false); - QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest); - receiver.setUp(); - receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS()); - if (config.isReportTotal()) - { - reporter.report(); - } - receiver.tearDown(); - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java deleted file mode 100644 index 58a643726c..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.UUID; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQDestination; -import org.apache.qpid.tools.TestConfiguration.MessageType; -import org.apache.qpid.tools.report.BasicReporter; -import org.apache.qpid.tools.report.Reporter; -import org.apache.qpid.tools.report.Statistics.Throughput; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class QpidSend -{ - private Connection con; - private Session session; - private Destination dest; - private MessageProducer producer; - private MessageType msgType; - private Message msg; - private Object payload; - private List<Object> payloads; - private boolean cacheMsg = false; - private boolean randomMsgSize = false; - private boolean durable = false; - private Random random; - private int msgSizeRange = 1024; - private int totalMsgCount = 0; - private boolean rateLimitProducer = false; - private boolean transacted = false; - private int txSize = 0; - - private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class); - Reporter report; - TestConfiguration config; - - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest) - { - this(report,config, con, dest, UUID.randomUUID().toString()); - } - - public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix) - { - //System.out.println("Producer ID : " + id); - this.report = report; - this.config = config; - this.con = con; - this.dest = dest; - } - - public void setUp() throws Exception - { - con.start(); - if (config.isTransacted()) - { - session = con.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - durable = config.isDurable(); - rateLimitProducer = config.getSendRate() > 0 ? true : false; - if (_logger.isDebugEnabled() && rateLimitProducer) - { - _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec"); - } - - transacted = config.isTransacted(); - txSize = config.getTransactionSize(); - - msgType = MessageType.getType(config.getMessageType()); - // if message caching is enabled we pre create the message - // else we pre create the payload - if (config.isCacheMessage()) - { - cacheMsg = true; - msg = createMessage(createPayload(config.getMsgSize())); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (config.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = config.getMsgSize(); - payloads = new ArrayList<Object>(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(createPayload(i)); - } - } - else - { - payload = createPayload(config.getMsgSize()); - } - - producer = session.createProducer(dest); - if (_logger.isDebugEnabled()) - { - _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName()); - } - producer.setDisableMessageID(config.isDisableMessageID()); - //we add a separate timestamp to allow interoperability with other clients. - producer.setDisableMessageTimestamp(true); - if (config.getTTL() > 0) - { - producer.setTimeToLive(config.getTTL()); - } - if (config.getPriority() > 0) - { - producer.setPriority(config.getPriority()); - } - } - - Object createPayload(int size) - { - if (msgType == MessageType.TEXT) - { - return MessageFactory.createMessagePayload(size); - } - else - { - return MessageFactory.createMessagePayload(size).getBytes(); - } - } - - Message createMessage(Object payload) throws Exception - { - if (msgType == MessageType.TEXT) - { - return session.createTextMessage((String)payload); - } - else - { - BytesMessage m = session.createBytesMessage(); - m.writeBytes((byte[])payload); - return m; - } - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - Message m; - - if (!randomMsgSize) - { - m = createMessage(payload); - } - else - { - m = createMessage(payloads.get(random.nextInt(msgSizeRange))); - } - m.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return m; - } - } - - public void commit() throws Exception - { - session.commit(); - } - - public void send() throws Exception - { - send(config.getMsgCount()); - } - - public void send(int count) throws Exception - { - int sendRate = config.getSendRate(); - if (rateLimitProducer) - { - int iterations = count/sendRate; - int remainder = count%sendRate; - for (int i=0; i < iterations; i++) - { - long iterationStart = System.currentTimeMillis(); - sendMessages(sendRate); - long elapsed = System.currentTimeMillis() - iterationStart; - long diff = Clock.SEC - elapsed; - if (diff > 0) - { - // We have sent more messages in a sec than specified by the rate. - Thread.sleep(diff); - } - } - sendMessages(remainder); - } - else - { - sendMessages(count); - } - } - - private void sendMessages(int count) throws Exception - { - boolean isTimestamp = !config.isDisableTimestamp(); - long s = System.currentTimeMillis(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - if (isTimestamp) - { - msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis()); - } - producer.send(msg); - //report.message(msg); - totalMsgCount++; - - if ( transacted && ((totalMsgCount) % txSize == 0)) - { - session.commit(); - } - } - long e = System.currentTimeMillis() - s; - //System.out.println("Rate : " + totalMsgCount/e); - } - - public void resetCounters() - { - totalMsgCount = 0; - report.clear(); - } - - public void sendEndMessage() throws Exception - { - Message msg = session.createTextMessage(TestConfiguration.EOS); - producer.send(msg); - } - - public void tearDown() throws Exception - { - session.close(); - } - - public static void main(String[] args) throws Exception - { - TestConfiguration config = new JVMArgConfiguration(); - Reporter reporter = new BasicReporter(Throughput.class, - System.out, - config.reportEvery(), - config.isReportHeader() - ); - Destination dest = AMQDestination.createDestination(config.getAddress(), false); - QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest); - sender.setUp(); - sender.send(); - if (config.getSendEOS() > 0) - { - sender.sendEndMessage(); - } - if (config.isReportTotal()) - { - reporter.report(); - } - sender.tearDown(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java deleted file mode 100644 index 790ed80e5f..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java +++ /dev/null @@ -1,667 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.codec.binary.Base64; -import org.apache.qpid.tools.util.ArgumentsParser; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.type.TypeReference; - -public class RestStressTestClient -{ - - public static void main(String[] args) throws Exception - { - ArgumentsParser parser = new ArgumentsParser(); - Arguments arguments; - try - { - arguments = parser.parse(args, Arguments.class); - arguments.validate(); - } - catch(IllegalArgumentException e) - { - System.out.println("Invalid argument:" + e.getMessage()); - parser.usage(Arguments.class, Arguments.REQUIRED); - System.out.println("\nRun examples:" ); - System.out.println(" Using Basic authentication:" ); - System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" ); - System.out.println(" -Djavax.net.ssl.trustStore=java_client_truststore.jks \\"); - System.out.println(" -Djavax.net.ssl.trustStorePassword=password \\"); - System.out.println(" org.apache.qpid.tools.RestStressTestClient \\"); - System.out.println(" repetitions=10 brokerUrl=https://localhost:8081 username=admin password=admin \\"); - System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\"); - System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" ); - System.out.println(" Using CRAM-MD5 SASL authentication:" ); - System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" ); - System.out.println(" org.apache.qpid.tools.RestStressTestClient saslMechanism=CRAM-MD5 \\"); - System.out.println(" repetitions=10 brokerUrl=http://localhost:8080 username=admin password=admin \\"); - System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\"); - System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" ); - return; - } - - RestStressTestClient client = new RestStressTestClient(); - client.run(arguments); - } - - public void run(Arguments arguments) throws IOException - { - log(arguments.toString()); - for (int i = 0; i < arguments.getRepetitions(); i++) - { - runIteration(arguments, i); - } - } - - private void runIteration(Arguments arguments, int iteration) throws IOException - { - log("Iteration " + iteration); - - RestClient client = new RestClient(arguments.getBrokerUrl(), arguments.getUsername(), arguments.getPassword(), arguments.getSaslMechanism()); - client.authenticateIfSaslAuthenticationRequested(); - try - { - List<Map<String, Object>> brokerData = client.get("/api/latest/broker?depth=0"); - log(" Connected to broker " + brokerData.get(0).get("name")); - createAndBindQueueIfRequired(arguments, client, iteration); - } - finally - { - if (arguments.isLogout()) - { - client.logout(); - } - } - } - - private void log(String logMessage) - { - System.out.println(logMessage); - } - - private void createAndBindQueueIfRequired(Arguments arguments, RestClient client, int iteration) throws IOException - { - if (arguments.isCreateQueue()) - { - String virtualHostNode = arguments.getVirtualHostNode(); - String virtualHost = arguments.getVirtualHost(); - String queueName = arguments.getQueueName(); - - if (queueName == null) - { - queueName = "temp-queue-" + System.nanoTime(); - } - else if (arguments.isUniqueQueues()) - { - queueName = queueName + "-" + iteration; - } - - createQueue(client, virtualHostNode, virtualHost, queueName); - - if (arguments.isBindQueue()) - { - bindQueue(client, virtualHostNode, virtualHost, queueName, arguments.getExchangeName()); - } - - if (arguments.isDeleteQueue()) - { - deleteQueue(client, virtualHostNode, virtualHost, queueName); - } - } - } - - private void createQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException - { - log(" Create queue " + queueName); - - String queueUrl = getQueueServiceUrl(virtualHostNode, virtualHost, queueName); - Map<String, Object> queueData = new HashMap<>(); - queueData.put("name", queueName); - queueData.put("durable", true); - - int result = client.put(queueUrl, queueData); - - if (result != RestClient.RESPONSE_PUT_CREATE_OK) - { - throw new RuntimeException("Failure to create queue " + queueName); - } - } - - private String getQueueServiceUrl(String virtualHostNode, String virtualHost, String queueName) - { - return "/api/latest/queue/" + virtualHostNode + "/" + virtualHost + "/" + queueName; - } - - private void deleteQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException - { - log(" Delete queue " + queueName); - int result = client.delete(getQueueServiceUrl(virtualHostNode, virtualHost, queueName)); - if (result != RestClient.RESPONSE_PUT_UPDATE_OK) - { - throw new RuntimeException("Failure to delete queue " + queueName); - } - } - - private void bindQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName, String exchangeName) - throws IOException - { - if (exchangeName == null) - { - exchangeName = "amq.direct"; - } - - log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName); - - String bindingUrl = "/api/latest/binding/" + virtualHostNode + "/" + virtualHost + "/" + exchangeName + "/" + queueName + "/" + queueName; - - Map<String, Object> bindingData = new HashMap<>(); - bindingData.put("name", queueName); - bindingData.put("queue", queueName); - bindingData.put("exchange", exchangeName); - - int result = client.put(bindingUrl, bindingData); - - if (result != RestClient.RESPONSE_PUT_CREATE_OK) - { - throw new RuntimeException("Failure to bind queue " + queueName + " to " + exchangeName); - } - } - - public static class RestClient - { - private static final TypeReference<List<LinkedHashMap<String, Object>>> TYPE_LIST_OF_LINKED_HASH_MAPS = new TypeReference<List<LinkedHashMap<String, Object>>>() - { - }; - - private static final TypeReference<LinkedHashMap<String, Object>> TYPE_HASH_MAP = new TypeReference<LinkedHashMap<String, Object>>() - { - }; - - public static final int RESPONSE_PUT_CREATE_OK = 201; - public static final int RESPONSE_PUT_UPDATE_OK = 200; - public static final int RESPONSE_OK = 200; - public static final int RESPONSE_AUTHENTICATION_REQUIRED = 401; - - private final ObjectMapper _mapper; - private final String _brokerUrl; - private final String _username; - private final String _password; - private final String _saslMechanism; - private final String _authorizationHeader; - - private List<String> _cookies; - - public RestClient(String brokerUrl, String username, String password, String saslMechanism) - { - _mapper = new ObjectMapper(); - _brokerUrl = brokerUrl; - _username = username; - _password = password; - _saslMechanism = saslMechanism; - - if (saslMechanism == null) - { - _authorizationHeader = "Basic " + new String(new Base64().encode((_username + ":" + _password).getBytes())); - } - else - { - _authorizationHeader = null; - } - } - - public List<Map<String, Object>> get(String restServiceUrl) throws IOException - { - HttpURLConnection connection = createConnection("GET", restServiceUrl, _cookies); - try - { - connection.connect(); - byte[] data = readConnectionInputStream(connection); - checkResponseCode(connection); - return _mapper.readValue(new ByteArrayInputStream(data), TYPE_LIST_OF_LINKED_HASH_MAPS); - } - finally - { - connection.disconnect(); - } - } - - public int put(String restServiceUrl, Map<String, Object> attributes) throws IOException - { - HttpURLConnection connection = createConnection("PUT", restServiceUrl, _cookies); - try - { - connection.connect(); - if (attributes != null) - { - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(connection.getOutputStream(), attributes); - } - checkResponseCode(connection); - return connection.getResponseCode(); - } - finally - { - connection.disconnect(); - } - } - - public int delete(String restServiceUrl) throws IOException - { - HttpURLConnection connection = createConnection("DELETE", restServiceUrl, _cookies); - try - { - checkResponseCode(connection); - return connection.getResponseCode(); - } - finally - { - connection.disconnect(); - } - } - - public int post(String restServiceUrl, Map<String, String> postData) throws IOException - { - HttpURLConnection connection = createConnectionAndPostData(restServiceUrl, postData, _cookies); - try - { - checkResponseCode(connection); - return connection.getResponseCode(); - } - finally - { - connection.disconnect(); - } - } - - private HttpURLConnection createConnectionAndPostData(String restServiceUrl, Map<String, String> postData, List<String> cookies) throws IOException - { - String postParameters = getPostDataString(postData); - HttpURLConnection connection = createConnection("POST", restServiceUrl, cookies); - try - { - OutputStream os = connection.getOutputStream(); - os.write(postParameters.getBytes()); - os.flush(); - } - catch (IOException e) - { - connection.disconnect(); - throw e; - } - return connection; - } - - private void checkResponseCode(HttpURLConnection connection) throws IOException - { - if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED) - { - _cookies = null; - throw new IllegalArgumentException("Authentication is required"); - } - } - - private String getPostDataString(Map<String, String> postData) - { - StringBuilder sb = new StringBuilder(); - if (postData != null) - { - Iterator<String> iterator = postData.keySet().iterator(); - while (iterator.hasNext()) - { - String key = iterator.next(); - sb.append(key + "=" + postData.get(key)); - if (iterator.hasNext()) - { - sb.append("&"); - } - } - } - return sb.toString(); - } - - private HttpURLConnection createConnection(String method, String restServiceUrl, List<String> cookies) throws IOException - { - HttpURLConnection httpConnection = (HttpURLConnection) new URL(_brokerUrl + restServiceUrl).openConnection(); - if (cookies != null) - { - for (String cookie : cookies) - { - httpConnection.addRequestProperty("Cookie", cookie.split(";", 2)[0]); - } - } - if (_saslMechanism == null) - { - httpConnection.setRequestProperty("Authorization", _authorizationHeader); - } - - httpConnection.setDoOutput(true); - httpConnection.setRequestMethod(method); - return httpConnection; - } - - public void authenticateIfSaslAuthenticationRequested() throws IOException - { - if (_saslMechanism == null) - { - // basic authentication will be used with each request - } - else if ("CRAM-MD5".equals(_saslMechanism)) - { - _cookies = performCramMD5Authentication(); - } - else - { - throw new IllegalArgumentException("Unsupported SASL mechanism :" + _saslMechanism); - } - } - - - public void logout() throws IOException - { - if (_cookies != null) - { - HttpURLConnection connection = createConnection("GET", "/service/logout", _cookies); - try - { - connection.connect(); - _cookies = null; - } - finally - { - connection.disconnect(); - } - } - - //TODO: we need to track sessions for basic auth in order to logout those - } - - private List<String> performCramMD5Authentication() throws IOException - { - // request the challenge for CRAM-MD5 - HttpURLConnection connection = createConnectionAndPostData("/service/sasl", Collections.singletonMap("mechanism", "CRAM-MD5"), null); - try - { - List<String> cookies = connection.getHeaderFields().get("Set-Cookie"); - - // get response - byte[] data = readConnectionInputStream(connection); - Map<String, Object> response = _mapper.readValue(new ByteArrayInputStream(data), TYPE_HASH_MAP); - String challenge = (String) response.get("challenge"); - - // generate the authentication response for the received challenge - String responseData = generateResponseForChallengeAndCredentials(challenge, _username, _password); - - Map<String, String> saslResponse = new HashMap<>(); - saslResponse.put("id", (String)response.get("id")); - saslResponse.put("response", responseData); - - HttpURLConnection authenticateConnection = createConnectionAndPostData("/service/sasl", saslResponse, cookies); - try - { - int code = authenticateConnection.getResponseCode(); - if (code != RESPONSE_OK) - { - throw new RuntimeException("Authentication failed"); - } - else - { - return cookies; - } - } - finally - { - authenticateConnection.disconnect(); - } - } - finally - { - connection.disconnect(); - } - } - - private String generateResponseForChallengeAndCredentials(String challenge, String username, String password) - { - try - { - byte[] challengeBytes = Base64.decodeBase64(challenge); - - String macAlgorithm = "HmacMD5"; - Mac mac = Mac.getInstance(macAlgorithm); - mac.init(new SecretKeySpec(password.getBytes("UTF-8"), macAlgorithm)); - final byte[] messageAuthenticationCode = mac.doFinal(challengeBytes); - String responseAsString = username + " " + toHex(messageAuthenticationCode); - byte[] responseBytes = responseAsString.getBytes(); - return Base64.encodeBase64String(responseBytes); - } - catch (Exception e) - { - throw new IllegalArgumentException("Unexpected exception", e); - } - } - - private String toHex(byte[] data) - { - StringBuffer hash = new StringBuffer(); - for (int i = 0; i < data.length; i++) - { - String hex = Integer.toHexString(0xFF & data[i]); - if (hex.length() == 1) - { - hash.append('0'); - } - hash.append(hex); - } - return hash.toString(); - } - - private byte[] readConnectionInputStream(HttpURLConnection connection) throws IOException - { - if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED) - { - _cookies = null; - } - InputStream is = connection.getInputStream(); - try(ByteArrayOutputStream baos = new ByteArrayOutputStream()) - { - byte[] buffer = new byte[1024]; - int len; - while ((len = is.read(buffer)) != -1) - { - baos.write(buffer, 0, len); - } - return baos.toByteArray(); - } - } - - } - - public static class Arguments - { - private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("brokerUrl", "username", "password")); - - private String brokerUrl = null; - private String username = null; - private String password = null; - private String saslMechanism = null; - - private String virtualHostNode = null; - private String virtualHost = null; - private String queueName = null; - private String exchangeName = null; - - private int repetitions = 1; - - private boolean createQueue = false; - private boolean deleteQueue = false; - private boolean uniqueQueues = false; - private boolean bindQueue = false; - - private boolean logout = true; - - public Arguments() - { - } - - public void validate() - { - if (brokerUrl == null || brokerUrl.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'brokerUrl' is not specified"); - } - - if (username == null || username.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'username' is not specified"); - } - - if (password == null || password.equals("")) - { - throw new IllegalArgumentException("Mandatory argument 'password' is not specified"); - } - - if (createQueue) - { - if (virtualHostNode == null || virtualHostNode.equals("")) - { - throw new IllegalArgumentException("Virtual host node name needs to be specified for queue creation"); - } - - if (virtualHost == null || virtualHost.equals("")) - { - throw new IllegalArgumentException("Virtual host name needs to be specified for queue creation"); - } - } - } - - public String getUsername() - { - return username; - } - - public String getPassword() - { - return password; - } - - public String getVirtualHost() - { - return virtualHost; - } - - public boolean isCreateQueue() - { - return createQueue; - } - - public boolean isDeleteQueue() - { - return deleteQueue; - } - - public boolean isUniqueQueues() - { - return uniqueQueues; - } - - public String getQueueName() - { - return queueName; - } - - public boolean isBindQueue() - { - return bindQueue; - } - - public String getExchangeName() - { - return exchangeName; - } - - public String getVirtualHostNode() - { - return virtualHostNode; - } - - - public int getRepetitions() - { - return repetitions; - } - - public String getBrokerUrl() - { - return brokerUrl; - } - - public String getSaslMechanism() - { - return saslMechanism; - } - - public boolean isLogout() - { - return logout; - } - - @Override - public String toString() - { - return "Arguments{" + - "brokerUrl='" + brokerUrl + '\'' + - ", username='" + username + '\'' + - ", password='" + password + '\'' + - ", saslMechanism='" + saslMechanism + '\'' + - ", virtualHostNode='" + virtualHostNode + '\'' + - ", virtualHost='" + virtualHost + '\'' + - ", queueName='" + queueName + '\'' + - ", exchangeName='" + exchangeName + '\'' + - ", repetitions=" + repetitions + - ", createQueue=" + createQueue + - ", deleteQueue=" + deleteQueue + - ", uniqueQueues=" + uniqueQueues + - ", bindQueue=" + bindQueue + - ", logout=" + logout + - '}'; - } - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java deleted file mode 100644 index 6494a2e800..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java +++ /dev/null @@ -1,446 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import java.util.Random; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; - -public class StressTestClient -{ - private static final String QUEUE_NAME_PREFIX = "BURL:direct://amq.direct//stress-test-queue"; - private static final String DURABLE_SUFFIX = "?durable='true'"; - - public static final String CONNECTIONS_ARG = "connections"; - public static final String SESSIONS_ARG = "sessions"; - public static final String CONSUME_IMMEDIATELY_ARG = "consumeImmediately"; - public static final String CONSUMERS_ARG = "consumers"; - public static final String CLOSE_CONSUMERS_ARG = "closeconsumers"; - public static final String PRODUCERS_ARG = "producers"; - public static final String MESSAGE_COUNT_ARG = "messagecount"; - public static final String MESSAGE_SIZE_ARG = "size"; - public static final String SUFFIX_ARG = "suffix"; - public static final String REPETITIONS_ARG = "repetitions"; - public static final String PERSISTENT_ARG = "persistent"; - public static final String RANDOM_ARG = "random"; - public static final String TIMEOUT_ARG = "timeout"; - public static final String DELAYCLOSE_ARG = "delayclose"; - public static final String REPORT_MOD_ARG = "reportmod"; - public static final String LOW_PREFETCH_ARG = "lowprefetch"; - public static final String TRANSACTED_ARG = "transacted"; - public static final String TX_BATCH_ARG = "txbatch"; - - public static final String CONNECTIONS_DEFAULT = "1"; - public static final String SESSIONS_DEFAULT = "1"; - public static final String CONSUME_IMMEDIATELY_DEFAULT = "true"; - public static final String CLOSE_CONSUMERS_DEFAULT = "true"; - public static final String PRODUCERS_DEFAULT = "1"; - public static final String CONSUMERS_DEFAULT = "1"; - public static final String MESSAGE_COUNT_DEFAULT = "1"; - public static final String MESSAGE_SIZE_DEFAULT = "256"; - public static final String SUFFIX_DEFAULT = ""; - public static final String REPETITIONS_DEFAULT = "1"; - public static final String PERSISTENT_DEFAULT = "false"; - public static final String RANDOM_DEFAULT = "true"; - public static final String TIMEOUT_DEFAULT = "30000"; - public static final String DELAYCLOSE_DEFAULT = "0"; - public static final String REPORT_MOD_DEFAULT = "1"; - public static final String LOW_PREFETCH_DEFAULT = "false"; - public static final String TRANSACTED_DEFAULT = "false"; - public static final String TX_BATCH_DEFAULT = "1"; - - private static final String CLASS = "StressTestClient"; - - public static void main(String[] args) - { - Map<String,String> options = new HashMap<>(); - options.put(CONNECTIONS_ARG, CONNECTIONS_DEFAULT); - options.put(SESSIONS_ARG, SESSIONS_DEFAULT); - options.put(CONSUME_IMMEDIATELY_ARG, CONSUME_IMMEDIATELY_DEFAULT); - options.put(PRODUCERS_ARG, PRODUCERS_DEFAULT); - options.put(CONSUMERS_ARG, CONSUMERS_DEFAULT); - options.put(CLOSE_CONSUMERS_ARG, CLOSE_CONSUMERS_DEFAULT); - options.put(MESSAGE_COUNT_ARG, MESSAGE_COUNT_DEFAULT); - options.put(MESSAGE_SIZE_ARG, MESSAGE_SIZE_DEFAULT); - options.put(SUFFIX_ARG, SUFFIX_DEFAULT); - options.put(REPETITIONS_ARG, REPETITIONS_DEFAULT); - options.put(PERSISTENT_ARG, PERSISTENT_DEFAULT); - options.put(RANDOM_ARG, RANDOM_DEFAULT); - options.put(TIMEOUT_ARG, TIMEOUT_DEFAULT); - options.put(DELAYCLOSE_ARG, DELAYCLOSE_DEFAULT); - options.put(REPORT_MOD_ARG, REPORT_MOD_DEFAULT); - options.put(LOW_PREFETCH_ARG, LOW_PREFETCH_DEFAULT); - options.put(TRANSACTED_ARG, TRANSACTED_DEFAULT); - options.put(TX_BATCH_ARG, TX_BATCH_DEFAULT); - - if(args.length == 1 && - (args[0].equals("-h") || args[0].equals("--help") || args[0].equals("help"))) - { - System.out.println("arg=value options: \n" + options.keySet()); - return; - } - - parseArgumentsIntoConfig(options, args); - - StressTestClient testClient = new StressTestClient(); - testClient.runTest(options); - } - - public static void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args) - { - for(String arg: args) - { - String[] splitArg = arg.split("="); - if(splitArg.length != 2) - { - throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg); - } - - if(initialValues.put(splitArg[0], splitArg[1]) == null) - { - throw new IllegalArgumentException("not a valid configuration property: " + arg); - } - } - } - - - private void runTest(Map<String,String> options) - { - int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG)); - int numSessions = Integer.parseInt(options.get(SESSIONS_ARG)); - int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG)); - int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG)); - boolean closeConsumers = Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG)); - boolean consumeImmediately = Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG)); - int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG)); - int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG)); - int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG)); - String queueString = QUEUE_NAME_PREFIX + options.get(SUFFIX_ARG) + DURABLE_SUFFIX; - int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; - boolean random = Boolean.valueOf(options.get(RANDOM_ARG)); - long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG)); - long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG)); - int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG)); - boolean lowPrefetch = Boolean.valueOf(options.get(LOW_PREFETCH_ARG)); - boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG)); - int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG)); - - System.out.println(CLASS + ": Using options: " + options); - - System.out.println(CLASS + ": Creating message payload of " + messageSize + " (bytes)"); - byte[] sentBytes = generateMessage(random, messageSize); - - try - { - // Load JNDI properties - Properties properties = new Properties(); - try(InputStream is = this.getClass().getClassLoader().getResourceAsStream("stress-test-client.properties")) - { - properties.load(is); - } - Context ctx = new InitialContext(properties); - - ConnectionFactory conFac; - if(lowPrefetch) - { - System.out.println(CLASS + ": Using lowprefetch connection factory"); - conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactoryLowPrefetch"); - } - else - { - conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory"); - } - - //ensure the queue to be used exists and is bound - System.out.println(CLASS + ": Creating queue: " + queueString); - Connection startupConn = conFac.createConnection(); - Session startupSess = startupConn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination startupDestination = startupSess.createQueue(queueString); - MessageConsumer startupConsumer = startupSess.createConsumer(startupDestination); - startupConsumer.close(); - startupSess.close(); - startupConn.close(); - - for(int rep = 1 ; rep <= repetitions; rep++) - { - ArrayList<Connection> connectionList = new ArrayList<>(); - - for (int co= 1; co<= numConnections ; co++) - { - if( co % reportingMod == 0) - { - System.out.println(CLASS + ": Creating connection " + co); - } - Connection conn = conFac.createConnection(); - conn.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException jmse) - { - System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); - jmse.printStackTrace(); - System.exit(0); - } - }); - - connectionList.add(conn); - conn.start(); - for (int se= 1; se<= numSessions ; se++) - { - if( se % reportingMod == 0) - { - System.out.println(CLASS + ": Creating Session " + se); - } - try - { - Session sess; - if(transacted) - { - sess = conn.createSession(true, Session.SESSION_TRANSACTED); - } - else - { - sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - } - - BytesMessage message = sess.createBytesMessage(); - - message.writeBytes(sentBytes); - - if(!random && numMessage == 1 && numSessions == 1 && numConnections == 1 && repetitions == 1) - { - //null the array to save memory - sentBytes = null; - } - - Destination destination = sess.createQueue(queueString); - - MessageConsumer consumer = null; - for(int cns = 1 ; cns <= numConsumers ; cns++) - { - if( cns % reportingMod == 0) - { - System.out.println(CLASS + ": Creating Consumer " + cns); - } - consumer = sess.createConsumer(destination); - } - - for(int pr = 1 ; pr <= numProducers ; pr++) - { - if( pr % reportingMod == 0) - { - System.out.println(CLASS + ": Creating Producer " + pr); - } - MessageProducer prod = sess.createProducer(destination); - for(int me = 1; me <= numMessage ; me++) - { - if( me % reportingMod == 0) - { - System.out.println(CLASS + ": Sending Message " + me); - } - prod.send(message, deliveryMode, - Message.DEFAULT_PRIORITY, - Message.DEFAULT_TIME_TO_LIVE); - if(transacted && me % txBatch == 0) - { - sess.commit(); - } - } - } - - if(numConsumers > 0 && consumeImmediately) - { - for(int cs = 1 ; cs <= numMessage ; cs++) - { - if( cs % reportingMod == 0) - { - System.out.println(CLASS + ": Consuming Message " + cs); - } - BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout); - - if(transacted && cs % txBatch == 0) - { - sess.commit(); - } - - if(msg == null) - { - throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout); - } - - validateReceivedMessageContent(sentBytes, msg, random, messageSize); - } - - if(closeConsumers) - { - sess.close(); - } - } - - } - catch (Exception exp) - { - System.err.println(CLASS + ": Caught an Exception: " + exp); - exp.printStackTrace(); - } - - } - } - - if(numConsumers == -1 && !consumeImmediately) - { - System.out.println(CLASS + ": Consuming left over messages, using recieve timeout:" + recieveTimeout); - - Connection conn = conFac.createConnection(); - Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = sess.createQueue(queueString); - MessageConsumer consumer = sess.createConsumer(destination); - conn.start(); - - int count = 0; - while(true) - { - BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout); - - if(msg == null) - { - System.out.println(CLASS + ": Received " + count + " messages"); - break; - } - else - { - count++; - } - - validateReceivedMessageContent(sentBytes, msg, random, messageSize); - } - - consumer.close(); - sess.close(); - conn.close(); - } - - if(delayClose > 0) - { - System.out.println(CLASS + ": Delaying closing connections: " + delayClose); - Thread.sleep(delayClose); - } - - // Close the connections to the server - System.out.println(CLASS + ": Closing connections"); - - for(int connection = 0 ; connection < connectionList.size() ; connection++) - { - if( (connection+1) % reportingMod == 0) - { - System.out.println(CLASS + ": Closing connection " + (connection+1)); - } - Connection c = connectionList.get(connection); - c.close(); - } - - // Close the JNDI reference - System.out.println(CLASS + ": Closing JNDI context"); - ctx.close(); - } - } - catch (Exception exp) - { - System.err.println(CLASS + ": Caught an Exception: " + exp); - exp.printStackTrace(); - } - } - - - private byte[] generateMessage(boolean random, int messageSize) - { - byte[] sentBytes = new byte[messageSize]; - if(random) - { - //fill the array with numbers from 0-9 - Random rand = new Random(System.currentTimeMillis()); - for(int r = 0 ; r < messageSize ; r++) - { - sentBytes[r] = (byte) (48 + rand.nextInt(10)); - } - } - else - { - //use sequential numbers from 0-9 - for(int r = 0 ; r < messageSize ; r++) - { - sentBytes[r] = (byte) (48 + (r % 10)); - } - } - return sentBytes; - } - - - private void validateReceivedMessageContent(byte[] sentBytes, - BytesMessage msg, boolean random, int messageSize) throws JMSException - { - Long length = msg.getBodyLength(); - - if(length != messageSize) - { - throw new RuntimeException("Incorrect number of bytes received"); - } - - byte[] recievedBytes = new byte[length.intValue()]; - msg.readBytes(recievedBytes); - - if(random) - { - if(!Arrays.equals(sentBytes, recievedBytes)) - { - throw new RuntimeException("Incorrect value of bytes received"); - } - } - else - { - for(int r = 0 ; r < messageSize ; r++) - { - if(! (recievedBytes[r] == (byte) (48 + (r % 10)))) - { - throw new RuntimeException("Incorrect value of bytes received"); - } - } - } - } -} - diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java deleted file mode 100644 index 18870bac59..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools; - -import java.text.DecimalFormat; - -import javax.jms.Connection; - -public interface TestConfiguration -{ - enum MessageType { - BYTES, TEXT, MAP, OBJECT; - - public static MessageType getType(String s) throws Exception - { - if ("text".equalsIgnoreCase(s)) - { - return TEXT; - } - else if ("bytes".equalsIgnoreCase(s)) - { - return BYTES; - } - /*else if ("map".equalsIgnoreCase(s)) - { - return MAP; - } - else if ("object".equalsIgnoreCase(s)) - { - return OBJECT; - }*/ - else - { - throw new Exception("Unsupported message type"); - } - } - }; - - public final static String TIMESTAMP = "ts"; - - public final static String EOS = "eos"; - - public final static String SEQUENCE_NUMBER = "sn"; - - public String getUrl(); - - public String getHost(); - - public int getPort(); - - public String getAddress(); - - public long getTimeout(); - - public int getAckMode(); - - public int getMsgCount(); - - public int getMsgSize(); - - public int getRandomMsgSizeStartFrom(); - - public boolean isDurable(); - - public boolean isTransacted(); - - public int getTransactionSize(); - - public int getWarmupCount(); - - public boolean isCacheMessage(); - - public boolean isDisableMessageID(); - - public boolean isDisableTimestamp(); - - public boolean isRandomMsgSize(); - - public String getMessageType(); - - public boolean isPrintStdDev(); - - public int getSendRate(); - - public boolean isExternalController(); - - public boolean isUseUniqueDests(); - - public int getAckFrequency(); - - public Connection createConnection() throws Exception; - - public DecimalFormat getDecimalFormat(); - - public int reportEvery(); - - public boolean isReportTotal(); - - public boolean isReportHeader(); - - public int getSendEOS(); - - public int getConnectionCount(); - - public int getRollbackFrequency(); - - public boolean isPrintHeaders(); - - public boolean isPrintContent(); - - public long getTTL(); - - public int getPriority(); - - public String getReadyAddress(); -}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java deleted file mode 100644 index a9896c1d4e..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools.report; - -import java.io.PrintStream; -import java.lang.reflect.Constructor; - -import javax.jms.Message; - -public class BasicReporter implements Reporter -{ - PrintStream out; - int batchSize = 0; - int batchCount = 0; - boolean headerPrinted = false; - protected Statistics overall; - Statistics batch; - Constructor<? extends Statistics> statCtor; - - public BasicReporter(Class<? extends Statistics> clazz, PrintStream out, - int batchSize, boolean printHeader) throws Exception - { - this.out = out; - this.batchSize = batchSize; - this.headerPrinted = !printHeader; - statCtor = clazz.getConstructor(); - overall = statCtor.newInstance(); - if (batchSize > 0) - { - batch = statCtor.newInstance(); - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message) - */ - @Override - public void message(Message msg) - { - overall.message(msg); - if (batchSize > 0) - { - batch.message(msg); - if (++batchCount == batchSize) - { - if (!headerPrinted) - { - header(); - } - batch.report(out); - batch.clear(); - batchCount = 0; - } - } - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.report.Reporter#report() - */ - @Override - public void report() - { - if (!headerPrinted) - { - header(); - } - overall.report(out); - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.report.Reporter#header() - */ - @Override - public void header() - { - headerPrinted = true; - overall.header(out); - } - - /* (non-Javadoc) - * @see org.apache.qpid.tools.report.Reporter#log() - */ - @Override - public void log(String s) - { - // NOOP - } - - @Override - public void clear() - { - batch.clear(); - overall.clear(); - } -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java deleted file mode 100644 index e9bf7100c1..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools.report; - -import java.io.PrintStream; - -import org.apache.qpid.tools.report.Statistics.Throughput; -import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency; - -public class MercuryReporter extends BasicReporter -{ - MercuryStatistics stats; - - public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out, - int batchSize, boolean printHeader) throws Exception - { - super(clazz, out, batchSize, printHeader); - stats = (MercuryStatistics)overall; - } - - public double getRate() - { - return stats.getRate(); - } - - public double getAvgLatency() - { - return stats.getAvgLatency(); - } - - public double getStdDev() - { - return stats.getStdDev(); - } - - public double getMinLatency() - { - return stats.getMinLatency(); - } - - public double getMaxLatency() - { - return stats.getMaxLatency(); - } - - public int getSampleSize() - { - return stats.getSampleSize(); - } - - public interface MercuryStatistics extends Statistics - { - public double getRate(); - public long getMinLatency(); - public long getMaxLatency(); - public double getAvgLatency(); - public double getStdDev(); - public int getSampleSize(); - } - - public static class MercuryThroughput extends Throughput implements MercuryStatistics - { - double rate = 0; - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - rate = (double)messages/(double)elapsed; - } - - @Override - public void clear() - { - super.clear(); - rate = 0; - } - - public double getRate() - { - return rate; - } - - public int getSampleSize() - { - return messages; - } - - public long getMinLatency() { return 0; } - public long getMaxLatency() { return 0; } - public double getAvgLatency(){ return 0; } - public double getStdDev(){ return 0; } - - } - - public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics - { - double rate = 0; - double avgLatency = 0; - double stdDev; - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - rate = (double)messages/(double)elapsed; - avgLatency = totalLatency/(double)sampleCount; - } - - @Override - public void clear() - { - super.clear(); - rate = 0; - avgLatency = 0; - } - - public double getRate() - { - return rate; - } - - public long getMinLatency() - { - return minLatency; - } - - public long getMaxLatency() - { - return maxLatency; - } - - public double getAvgLatency() - { - return avgLatency; - } - - public double getStdDev() - { - return stdDev; - } - - public int getSampleSize() - { - return messages; - } - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java deleted file mode 100644 index 5e481458be..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools.report; - -import javax.jms.Message; - -public interface Reporter -{ - - public void message(Message msg); - - public void report(); - - public void header(); - - // Will be used by some reporters to print statements which are greped by - // scripts. Example see java/tools/bin/perf-report - public void log(String s); - - public void clear(); - -}
\ No newline at end of file diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java deleted file mode 100644 index db8b4ddcee..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools.report; - -import java.io.PrintStream; -import java.text.DecimalFormat; - -import javax.jms.Message; - -import org.apache.qpid.tools.TestConfiguration; - -public interface Statistics -{ - public void message(Message msg); - public void report(PrintStream out); - public void header(PrintStream out); - public void clear(); - - static class Throughput implements Statistics - { - DecimalFormat df = new DecimalFormat("###"); - int messages = 0; - long start = 0; - boolean started = false; - - @Override - public void message(Message msg) - { - ++messages; - if (!started) - { - start = System.currentTimeMillis(); - started = true; - } - } - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - out.println(df.format((double)messages/(double)elapsed)); - } - - @Override - public void header(PrintStream out) - { - out.println("tp(m/s)"); - } - - public void clear() - { - messages = 0; - start = 0; - started = false; - } - } - - static class ThroughputAndLatency extends Throughput - { - long minLatency = Long.MAX_VALUE; - long maxLatency = Long.MIN_VALUE; - double totalLatency = 0; - int sampleCount = 0; - - @Override - public void message(Message msg) - { - super.message(msg); - try - { - long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP); - long latency = System.currentTimeMillis() - ts; - minLatency = Math.min(latency, minLatency); - maxLatency = Math.max(latency, maxLatency); - totalLatency = totalLatency + latency; - sampleCount++; - } - catch(Exception e) - { - System.out.println("Error calculating latency " + e); - } - } - - @Override - public void report(PrintStream out) - { - long elapsed = System.currentTimeMillis() - start; - double rate = (double)messages/(double)elapsed; - double avgLatency = totalLatency/(double)sampleCount; - out.append("\n") - .append(df.format(rate)) - .append('\t') - .append(String.valueOf(minLatency)) - .append('\t') - .append(String.valueOf(maxLatency)) - .append('\t') - .append(df.format(avgLatency)) - .append("\n"); - - out.flush(); - } - - @Override - public void header(PrintStream out) - { - out.append("tp(m/s)") - .append('\t') - .append("l-min") - .append('\t') - .append("l-max") - .append('\t') - .append("l-avg"); - - out.flush(); - } - - public void clear() - { - super.clear(); - minLatency = 0; - maxLatency = 0; - totalLatency = 0; - sampleCount = 0; - } - } - -} diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java deleted file mode 100644 index a71b466a0f..0000000000 --- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.tools.util; - - -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Set; - -public class ArgumentsParser -{ - public ArgumentsParser() - { - } - - public <T> T parse(String[] args, Class<T> pojoClass) - { - T object; - try - { - object = pojoClass.newInstance(); - } - catch (Exception e) - { - throw new IllegalArgumentException("Cannot instantiate object of class " + pojoClass, e); - } - - for (String arg: args) - { - int pos = arg.indexOf('='); - if (pos == -1) - { - throw new IllegalArgumentException("Invalid argument '" + arg + "' Argument should be specified in format <name>=<value>"); - } - String name = arg.substring(0, pos); - String value = arg.substring(pos + 1); - - Field field = findField(pojoClass, name); - if (field != null) - { - setField(object, field, value); - } - } - return object; - } - - private Field findField(Class<?> objectClass, String name) - { - Field[] fields = objectClass.getDeclaredFields(); - - Field field = null; - for (int i = 0 ; i< fields.length ; i++) - { - if (fields[i].getName().equals(name) && !Modifier.isFinal(fields[i].getModifiers())) - { - field = fields[i]; - break; - } - } - return field; - } - - private void setField(Object object, Field field, String value) - { - Object convertedValue = convertStringToType(value, field.getType(), field.getName()); - - field.setAccessible(true); - - try - { - field.set(object, convertedValue); - } - catch (IllegalAccessException e) - { - throw new RuntimeException("Cannot access field " + field.getName()); - } - } - - private Object convertStringToType(String value, Class<?> fieldType, String fieldName) - { - Object o; - if (fieldType == String.class) - { - o = value; - } - else if (fieldType == boolean.class) - { - try - { - o = Boolean.parseBoolean(value); - } - catch(Exception e) - { - throw new RuntimeException("Cannot convert to boolean argument " + fieldName); - } - } - else if (fieldType == int.class) - { - try - { - o = Integer.parseInt(value); - } - catch(Exception e) - { - throw new RuntimeException("Cannot convert to int argument " + fieldName); - } - } - else - { - throw new RuntimeException("Unsupported tye " + fieldType + " in " + fieldName); - } - return o; - } - - public void usage(Class<?> objectClass, Set<String> requiredFields) - { - System.out.println("Supported arguments:"); - Field[] fields = objectClass.getDeclaredFields(); - - Object object = null; - try - { - object = objectClass.newInstance(); - } - catch(Exception e) - { - // ignore any - } - - for (int i = 0 ; i< fields.length ; i++) - { - Field field = fields[i]; - if (!Modifier.isFinal(field.getModifiers())) - { - Object defaultValue = null; - try - { - field.setAccessible(true); - defaultValue = field.get(object); - } - catch(Exception e) - { - // ignore any - } - - System.out.println(" " + field.getName() + " ( type: " - + field.getType().getSimpleName().toLowerCase() - + (object != null ? ", default: " + defaultValue : "") - + (requiredFields != null && requiredFields.contains(field.getName()) ? ", mandatory" : "") - + ")"); - } - } - } -} diff --git a/qpid/java/tools/src/main/resources/stress-test-client.properties b/qpid/java/tools/src/main/resources/stress-test-client.properties deleted file mode 100644 index 2ef8c258b4..0000000000 --- a/qpid/java/tools/src/main/resources/stress-test-client.properties +++ /dev/null @@ -1,3 +0,0 @@ -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
-connectionfactory.qpidConnectionfactoryLowPrefetch=amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='10''
|
