summaryrefslogtreecommitdiff
path: root/qpid/java/tools
diff options
context:
space:
mode:
authorAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
committerAlex Rudyy <orudyy@apache.org>2015-04-15 09:47:28 +0000
commit0a0baee45ebcff44635907d457c4ff6810b09c87 (patch)
tree8bfb0f9eddbc23cff88af69be80ab3ce7d47011c /qpid/java/tools
parent54aa3d7070da16ce55c28ccad3f7d0871479e461 (diff)
downloadqpid-python-0a0baee45ebcff44635907d457c4ff6810b09c87.tar.gz
QPID-6481: Move java source tree to top level
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1673693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/tools')
-rw-r--r--qpid/java/tools/README.txt153
-rwxr-xr-xqpid/java/tools/bin/Profile-run-from-source71
-rwxr-xr-xqpid/java/tools/bin/check-qpid-java-env38
-rwxr-xr-xqpid/java/tools/bin/jms-quick-perf-report137
-rwxr-xr-xqpid/java/tools/bin/mercury-controller132
-rwxr-xr-xqpid/java/tools/bin/mercury-start-consumers119
-rwxr-xr-xqpid/java/tools/bin/mercury-start-producers136
-rwxr-xr-xqpid/java/tools/bin/qpid-bench23
-rwxr-xr-xqpid/java/tools/bin/qpid-jms-benchmark316
-rwxr-xr-xqpid/java/tools/bin/qpid-jms-receive193
-rwxr-xr-xqpid/java/tools/bin/qpid-jms-send261
-rwxr-xr-xqpid/java/tools/bin/qpid-python-testkit33
-rwxr-xr-xqpid/java/tools/bin/run-pub28
-rwxr-xr-xqpid/java/tools/bin/run-sub32
-rwxr-xr-xqpid/java/tools/bin/testkit.py278
-rw-r--r--qpid/java/tools/etc/perf-report.gnu61
-rw-r--r--qpid/java/tools/etc/test.log4j28
-rw-r--r--qpid/java/tools/pom.xml88
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java154
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java27
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java216
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java197
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java370
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java123
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java329
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java201
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java450
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java190
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java231
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java210
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java450
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java64
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java904
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java205
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java303
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java667
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java446
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java134
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java113
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java167
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java40
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java145
-rw-r--r--qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java172
-rw-r--r--qpid/java/tools/src/main/resources/stress-test-client.properties3
44 files changed, 0 insertions, 8638 deletions
diff --git a/qpid/java/tools/README.txt b/qpid/java/tools/README.txt
deleted file mode 100644
index fdde734027..0000000000
--- a/qpid/java/tools/README.txt
+++ /dev/null
@@ -1,153 +0,0 @@
-Introduction
-============
-
-The Test kit for the java client consists of 2 components.
-
-1) A Simple Perf Test that can be used to,
- a) Run a predefined perf report consisting of 8 use cases (see below)
- b) Run a producer and a consumer with a number of different options
-
-2) Soak tests that can be run for longer durations (hours or days).
-
-I am planning to add some stress tests to this module as well.
-Please note this is not a replacement for the existing perf/systests etc.
-But rather a small test kit thats focused on providing a packaged set of tests that can be quickly deployed on an environment to do quick smoke testing or easily setup a soak test.
-
-Table of Contents
-=================
-1. Perf Kit
-2. Soak Kit
-3. Perf Test use cases
-4. Soak Test use cases
-5. Running the sample perf test report
-6. Running the sample soak test report
-
-1.0 Perf Kit
-------------
-1.1 The perf kit can be packaged as an RPM or a tar file and deploy on a target environment and run the perf report.
-Or else a perf report can be automated to run every day or so an record numbers to catch perf regressions.
-
-1.2 It calculates the following results in msg/sec.
-
- System throuhgput : no_of_msgs / (time_last_msg_rcvd - time_first_msg_send)
-
- Producer rate : no_of_msgs / (time_after_sending - time_before_sending)
-
- Producer rate : no_of_msgs / (time_last_msg_rcvd - time_first_msg_rcvd)
-
- Latency : time_msg_rcvd - time_msg_sent
-
-The test will print min, max and avg latency.
-
-1.3 The test assume that both producer and consumer are run on the same machine or different machines that are time synced.
-
-1.4 You can also use run_sub.sh and run_pub.sh to run different use cases with several options.
- Please look at TestParams.java for all the configurable options.
-
-1.5 You can also use the test kit to benchmark against any vendor.
-
-
-2.0 Soak tests
---------------
-2.0 This includes a set of soak tests that can be run for a longer duration.
-
-2.1 A typical test will send x-1 messages and the xth message will contain an "End" marker.
- The producer will print the timestamp as soon as it sends the xth message.
- The consumer will reply with an empty message to the replyTo destination given in the xth message.
- The consumer prints the throuhgput for the iteration and the latency for the xth message.
- A typical value for x is 100k
-
-2.2 The feedback loop prevents the producer from overrunning the consumer.
- And the printout for every xth message will let you know how many iterations been completed at any given time.
- (Ex a simple cat log | wc -l will give you the how many iterations have been completed so far).
-
-2.2 The following results can be calculated for these tests.
-
- Memory, CPU for each producer/consumer - look at testkit/bin/run_soak_client.sh for an example
-
- You can find the Avg, Min & Max for throughput, latency, CPU and memory for the entire test run.
- (look at testkit/bin/soak_report.sh) for an example).
-
- You could also graph throughput, latency, CPU and memory using the comma separated log files.
-
-2.2 If you use different machines for producer and consumer the machines have to be time synced to have meaningful latency samples.
-
-3.0 Perf Test report use cases
--------------------------------
-3.1 Please check testkit/bin/perf_report.sh for more details
-
-3.2 A typical test run will send 1000 msgs during warmup and 200k msgs for result calculation.
-
-Test 1 Trans Queue
-
-Test 2 Dura Queue
-
-Test 3 Dura Queue Sync
-
-Test 4 Topic
-
-Test 5 Durable Topic
-
-Test 6 Fanout
-
-Test 7 Small TX (about 2 msgs per tx)
-
-Test 8 Large TX (about 1000 msgs per tx)
-
-
-4.0 Soak tests use cases
--------------------------
-4.1 Following are the current tests available in the test kit.
-
-4.2 Please refer to the source to see the javadoc and options
-
-
-1. SimpleProducer/Consumer sends X messages at a time and will wait for confirmation from producer before proceeding with the next iteration. A no of options can be configured.
-
-2. MultiThreadedProducer/Consumer does the same thing as above but runs each session in a separate thread.
- It can also send messages transactionally. Again a no of options can be configured.
-
-3. ResourceLeakTest will setup consumer/producers sends x messages and then teard down everything and continue again.
-
-
-5.0 Running the sample perf test report
----------------------------------------
-The testkit/bin contains perf_report.sh.
-It runs the above 8 use cases against a broker and print the results in a tabular format.
-
-For example
-================================================================================================
-|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|
-------------------------------------------------------------------------------------------------
-|Trans_Queue | xxxxx.xx| xxxxx.xx| xxxxx.xx| xx.xx| x| xx|
-
-
-5.1 running perf_report.sh
-
-5.1.1 set JAVA_HOME to point to Java 1.5 and above
-5.1.2 set QPID_TEST_HOME to point to the testkit dir
-5.1.3 set VENDOR_LIB to point to the Qpid (or other JMS providers) jar files.
-5.1.4 start a broker
-5.1.5 update the testkit/etc/jndi.properties to point to the correct broker
-5.1.6 execute perf_report.sh
-
-
-6.0 Running the sample soak test report
----------------------------------------
-The testkit/bin contains soak_report.sh
-It runs MultiThreadedProducer/Consumer for the duration specified and prints a report for the following stats.
-Avg, Min and Max for System Throughput, letency, CPU and memory.
-
-6.1 running soak_report.sh
-
-5.1.1 set JAVA_HOME to point to Java 1.5 and above
-5.1.2 set QPID_TEST_HOME to point to the testkit dir
-5.1.3 set JAR_PATH to point to the Qpid jars
-5.1.4 start a broker
-5.1.5 execute soak_report.sh with correct params.
- Ex sh soak_report.sh 1 36000 will run for 10 hours colllecting CPU, memory every second.
-
-5.1.6 Please note the total duration for the test is log_freq * log_iterations
- So if you want to run the test for 10 hours and collect 10 second samples then do the following
- sh soak_report.sh 10 3600
-
diff --git a/qpid/java/tools/bin/Profile-run-from-source b/qpid/java/tools/bin/Profile-run-from-source
deleted file mode 100755
index 179c365450..0000000000
--- a/qpid/java/tools/bin/Profile-run-from-source
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# Sets the environment for running the scripts from a source checkout.
-txtbld=$(tput bold) # Bold
-txtrst=$(tput sgr0) # Reset
-txtred=$(tput setaf 1) # red
-txtgreen=$(tput setaf 2) # green
-
-echo "${txtbld}Setting the environment to run qpid java tools from a source checkout${txtrst}"
-
-abs_path()
-{
- D=`dirname "$1"`
- echo "`cd \"$D\" 2>/dev/null && pwd`"
-}
-
-export QPID_CHECKOUT=`abs_path "../../../../"`
-echo "${txtgreen}Using source checkout at $QPID_CHECKOUT${txtrst}"
-
-export PATH=$QPID_CHECKOUT/java/tools/bin:$PATH
-
-if [ "$JAVA" = "" ] ; then
- export JAVA=$(which java)
-fi
-
-#------------- Required for perf_report, qpid-bench & qpid-python-testkit ----------------
-
-export VENDOR_LIB=$QPID_CHECKOUT/java/build/lib
-export CLASSPATH=`find $VENDOR_LIB -name '*.jar' | tr '\n' ':'`
-export LOG_CONFIG="-Dlog4j.configuration=file:///$QPID_CHECKOUT/java/tools/etc/test.log4j"
-
-
-#------------- Required for qpid-python-testkit -----------------------------------------
-
-PYTHONPATH=$QPID_CHECKOUT/python:$QPID_CHECKOUT/cpp/src/test/brokertest.py:$PYTHONPATH
-export PATH=$QPID_CHECKOUT/python:$PATH
-
-if [ -x $QPID_CHECKOUT/cpp/src/qpidd ]; then
- QPIDD_EXEC=$QPID_CHECKOUT/cpp/src/qpidd
-else
- echo "${txtred}WARNING: Qpid CPP broker executable not found. You will not be able to run qpid-python-testkit${txtrst}"
-fi
-
-if [ -x $QPID_CHECKOUT/cpp/src/.libs/cluster.so ]; then
- CLUSTER_LIB=$QPID_CHECKOUT/cpp/src/.libs/cluster.so
-else
- echo "${txtred}WARNING: Qpid cluster.so not found.You will not be able to run qpid-python-testkit${txtrst}"
-fi
-
-if [ "$STORE_LIB" = "" ] ; then
- echo "${txtred}WARNING: Please point the STORE_LIB variable to the message store module. If not persistence tests will not write messages to disk.${txtrst}"
-fi
-
-export PYTHONPATH QPIDD_EXEC CLUSTER_LIB
diff --git a/qpid/java/tools/bin/check-qpid-java-env b/qpid/java/tools/bin/check-qpid-java-env
deleted file mode 100755
index da67e50a90..0000000000
--- a/qpid/java/tools/bin/check-qpid-java-env
+++ /dev/null
@@ -1,38 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-if [ -z "$LOG_CONFIG" ]; then
- echo "Please set the appropriate parameters for logging as it may affect performance. Ex log4j defaults to DEBUG if not configured properly"
- exit -1
-fi
-
-if [ -z "$JAVA_MEM" ]; then
- JAVA_MEM=-Xmx1024m
-fi
-
-if [ -z "$JAVA" ]; then
- echo "Please set the path to the correct java executable to JAVA"
- exit -1
-fi
-
-if [ -z "$CLASSPATH" ]; then
- echo "Please set the $CLASSPATH variable to point to the jar/class files"
- exit -1
-fi
diff --git a/qpid/java/tools/bin/jms-quick-perf-report b/qpid/java/tools/bin/jms-quick-perf-report
deleted file mode 100755
index 7de3f2b602..0000000000
--- a/qpid/java/tools/bin/jms-quick-perf-report
+++ /dev/null
@@ -1,137 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This will run the following test cases defined below and produce
-# a report in tabular format.
-
-QUEUE="queue;{create:always,node:{x-declare:{auto-delete:true}}}"
-DURA_QUEUE="dqueue;{create:always,node:{durable:true,x-declare:{auto-delete:true}}}"
-TOPIC="amq.topic/test"
-DURA_TOPIC="amq.topic/test;{create:always,link:{durable:true}}"
-
-COMMON_CONFIG="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
-
-waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
-cleanup()
-{
- pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
- if [ "$pids" != "" ]; then
- kill -3 $pids
- kill -9 $pids >/dev/null 2>&1
- fi
-}
-
-# $1 test name
-# $2 consumer options
-# $3 producer options
-run_testcase()
-{
- sh run-sub $COMMON_CONFIG $2 > sub.out &
- sh run-pub $COMMON_CONFIG $3 > pub.out &
- waitfor pub.out "Controller: Completed the test"
- sleep 2 #give a grace period to shutdown
- print_result $1
- mv pub.out $1.pub.out
- mv sub.out $1.sub.out
-}
-
-print_result()
-{
- prod_rate=`cat pub.out | grep "Avg Producer rate" | awk '{print $5}'`
- sys_rate=`cat pub.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat pub.out | grep "Avg Consumer rate" | awk '{print $5}'`
- avg_latency=`cat pub.out | grep "Avg System Latency" | awk '{print $5}'`
- min_latency=`cat pub.out | grep "Min System Latency" | awk '{print $5}'`
- max_latency=`cat pub.out | grep "Max System Latency" | awk '{print $5}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
- echo "------------------------------------------------------------------------------------------------"
-}
-
-trap cleanup EXIT
-rm -rf *.out #cleanup old files.
-
-echo "Test report on " `date +%F`
-echo "================================================================================================"
-echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
-echo "------------------------------------------------------------------------------------------------"
-
-# The message counts and warmup counts are set to very low values for quick testing of the script.
-# For a real performance run I recommend setting warmup count to 10k and message count in excess of 100k
-# However for transactions, sync_publish and especially small durable transactions (which is quite slow) I recommend
-# setting very low values to start with and experiment while increasing them slowly.
-
-# Test 1 Trans Queue
-run_testcase "Trans_Queue" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 2 Dura Queue
-run_testcase "Dura_Queue" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 3 Dura Queue Sync
-run_testcase "Dura_Queue_Sync" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
-
-# Test 4 Dura Queue Sync Publish and Ack
-run_testcase "Dura_SyncPubAck" "-Daddress=$DURA_QUEUE -Ddurable=true -Dsync_ack=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_publish=persistent"
-
-# Test 5 Topic
-run_testcase "Topic" "-Daddress=$TOPIC" "-Daddress=$TOPIC -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 6 Durable Topic
-run_testcase "Dura_Topic" "-Daddress=$DURA_TOPIC -Ddurable=true" "-Daddress=$DURA_TOPIC -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 7 Fanout
-run_testcase "Fanout" "-Daddress=amq.fanout" "-Daddress=amq.fanout -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 8 Small TX
-run_testcase "Small_Txs_2" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=1" \
- "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
-
-# Test 9 Large TX
-run_testcase "Large_Txs_1000" "-Daddress=$DURA_QUEUE -Ddurable=true -Dtransacted=true -Dtrans_size=10" \
- "-Daddress=$DURA_QUEUE -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
-
-# Test 10 256 MSG
-run_testcase "Msg_256b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=256 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 11 512 MSG
-run_testcase "Msg_512b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=512 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 12 2048 MSG
-run_testcase "Msg_2048b" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Dmsg_size=2048 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 13 Random size MSG
-run_testcase "Random_Msg_Size" "-Daddress=$QUEUE" "-Daddress=$QUEUE -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 14 Random size MSG Durable
-run_testcase "Rand_Msg_Dura" "-Daddress=$DURA_QUEUE -Ddurable=true" "-Daddress=$DURA_QUEUE -Ddurable=true -Drandom_msg_size=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 15 64K MSG
-run_testcase "Msg_64K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 16 Durable 64K MSG
-run_testcase "Msg_Durable_64K" "-Daddress=$DURA_QUEUE -Ddurable=true -Damqj.tcpNoDelay=true" \
- "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=64000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 17 500K MSG
-run_testcase "Msg_500K" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true" "-Daddress=$QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Dwarmup_count=1 -Dmsg_count=10"
-
-# Test 18 Durable 500K MSG
-run_testcase "Msg_Dura_500K" "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Ddurable=true" \
- "-Daddress=$DURA_QUEUE -Damqj.tcpNoDelay=true -Dmsg_size=500000 -Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
diff --git a/qpid/java/tools/bin/mercury-controller b/qpid/java/tools/bin/mercury-controller
deleted file mode 100755
index fab8614039..0000000000
--- a/qpid/java/tools/bin/mercury-controller
+++ /dev/null
@@ -1,132 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME=controller
-CONSUMER_COUNT=1
-PRODUCER_COUNT=1
-DURATION=-1
-TEST_NAME="TEST_NAME"
-EXTRA_JVM_ARGS=""
-
-TEMP=$(getopt -n $PROGRAM_NAME -o c:p:d:n:a:h --long consumers:,producers:,jvm-args:help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: controller [option].."
-
- printf "\n%31s\n%52s\n" "-c, --consumer-count=count" "No of consumers participating in the test"
-
- printf "\n%31s\n%52s\n" "-p, --producer-count=count" "No of producers participating in the test"
-
- printf "\n%24s\n%94s\n" "-d, --duration=mins" "The duration of the test in mins. If not specified, it will just run one iteration."
-
- printf "\n%27s\n%32s\n" "-n, --name=<test-name>" "The name of the test."
-
- printf "\n%19s\n%50s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -c|--consumer-count)
- CONSUMER_COUNT="$2"; shift; shift; continue
- ;;
- -p|--producer-count)
- PRODUCER_COUNT="$2"; shift; shift; continue
- ;;
- -d|--duration)
- DURATION="$2"; shift; shift; continue
- ;;
- -n|--name)
- TEST_NAME="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-CONTROLLER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dprod_count=$PRODUCER_COUNT -Dcons_count=$CONSUMER_COUNT -Dprint_std_dev=true -Dduration=${DURATION}"
-
-
-waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
-cleanup()
-{
- pids=`ps aux | grep java | grep PerfTestController | awk '{print $2}'`
- if [ "$pids" != "" ]; then
- kill -3 $pids
- kill -9 $pids >/dev/null 2>&1
- fi
-}
-
-run_controller()
-{
- TEST_ARGS="$LOG_CONFIG $JAVA_MEM $CONTROLLER_ARGS $EXTRA_JVM_ARGS"
- echo "Running controller with : $TEST_ARGS" > test.out
- $JAVA -cp $CLASSPATH $TEST_ARGS org.apache.qpid.tools.PerfTestController >> test.out &
- waitfor test.out "Controller: Completed the test"
- sleep 2 #give a grace period to shutdown
- print_result $TEST_NAME
-}
-
-print_result()
-{
- prod_rate=`cat test.out | grep "Avg Producer rate" | awk '{print $5}'`
- sys_rate=`cat test.out | grep "System Throughput" | awk '{print $4}'`
- cons_rate=`cat test.out | grep "Avg Consumer rate" | awk '{print $5}'`
- avg_latency=`cat test.out | grep "Avg System Latency" | awk '{print $5}'`
- min_latency=`cat test.out | grep "Min System Latency" | awk '{print $5}'`
- max_latency=`cat test.out | grep "Max System Latency" | awk '{print $5}'`
- std_dev=`cat test.out | grep "Avg System Std Dev" | awk '{print $6}'`
-
- printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11.2f|%11.2f|%7.2f|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency $std_dev
- echo "--------------------------------------------------------------------------------------------------------"
-}
-
-trap cleanup EXIT
-
-rm -rf *.out
-
-if [ "$DURATION" = -1 ]; then
- echo "Test report on " `date +%F`
- echo "========================================================================================================"
- echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|Std Dev|"
- echo "--------------------------------------------------------------------------------------------------------"
-else
- echo "Test in progress....Tail stats-csv.log to see results being printed for each iteration."
-fi
-
-run_controller
diff --git a/qpid/java/tools/bin/mercury-start-consumers b/qpid/java/tools/bin/mercury-start-consumers
deleted file mode 100755
index c71fc0c21f..0000000000
--- a/qpid/java/tools/bin/mercury-start-consumers
+++ /dev/null
@@ -1,119 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="start-consumers"
-PROCESS_COUNT=1
-CON_COUNT=1
-MSG_COUNT=10000
-ADDRESS="queue;{create:always}"
-UNIQUE_DEST="false"
-
-EXTRA_JVM_ARGS=" -Dmax_prefetch=500 "
-
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\
- --long connection-count:,process-count:,create-unique-queues-topics,\
-jvm-args:,queue:,topic:,address:,\
-msg-count:,help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: start-producers [option].."
-
- printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test"
-
- printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test"
-
- printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic"
-
- printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue"
-
- printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic"
-
- printf "\n%13s\n%44s\n" "--address" "The address you want to publish to"
-
- printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)"
-
- printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -C|--connection-count)
- CON_COUNT="$2"; shift; shift; continue
- ;;
- -P|--process-count)
- PROCESS_COUNT="$2"; shift; shift; continue
- ;;
- -u|--create-unique-queues-topics)
- UNIQUE_DEST="true"; shift; continue
- ;;
- --queue)
- ADDRESS="$2;{create: always}"; shift; shift; continue
- ;;
- --topic)
- ADDRESS="amq.topic/$2"; shift; shift; continue
- ;;
- --address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -c|--msg-count)
- MSG_COUNT="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-CONSUMER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dprecision=mili -Dcon_count=$CON_COUNT -Dprint_std_dev=true"
-
-start_consumers()
-{
- for ((i=0; i<$PROCESS_COUNT; i++))
- do
- if [ "$UNIQUE_DEST" = "true" ]; then
- sh run-sub "$CONSUMER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.sub.out 2>&1 &
- else
- sh run-sub "$CONSUMER_ARGS $@" > ${TEST_ID}_$i.sub.out 2>&1 &
- fi
- done
-}
-
-start_consumers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS"
-
diff --git a/qpid/java/tools/bin/mercury-start-producers b/qpid/java/tools/bin/mercury-start-producers
deleted file mode 100755
index 7ba0286f7c..0000000000
--- a/qpid/java/tools/bin/mercury-start-producers
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="start-producers"
-PROCESS_COUNT=1
-CON_COUNT=1
-MSG_TYPE="bytes"
-WARMUP_MSG_COUNT=1000
-MSG_COUNT=10000
-MSG_SIZE=1024
-ADDRESS="queue;{create:always}"
-UNIQUE_DEST="false"
-
-EXTRA_JVM_ARGS=""
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o C:P:uc:p:a:s:t:w:h\
- --long connection-count:,process-count:,create-unique-queues-topics,\
-jvm-args:,queue:,topic:,address:,\
-msg-count:,msg-size:msg-type:,warmup-msg-count,help -- "$@")
-
-usage()
-{
- printf "\n%s\n" "Usage: start-producers [option].."
-
- printf "\n%32s\n%51s\n" "-C, --connection-count=count" "No of consumers participating in the test"
-
- printf "\n%29s\n%51s\n" "-P, --process-count=count" "No of producers participating in the test"
-
- printf "\n%37s\n%105s\n" "-u, --create-unique-queues-topics" "This will create unique queue names and topics based on what you specify for --queue or --topic"
-
- printf "\n%11s\n%55s\n" "--queue" "The Queue you want to publish to. Ex my-queue"
-
- printf "\n%11s\n%84s\n" "--topic" "The Topic you want to publish to in amq.topic exchange. Ex amq.topic/topic"
-
- printf "\n%13s\n%44s\n" "--address" "The address you want to publish to"
-
- printf "\n%23s\n%37s\n" "-s, --msg-size=size" "message size (default 1024)"
-
- printf "\n%25s\n%50s\n" "-c, --msg-count=count" "message count per test (default 500,000)"
-
- printf "\n%18s\n%38s\n" "-t, --msg-type" "{bytes|text} (default bytes)"
-
- printf "\n%26s\n%49s\n" "-w, --warmup-msg-count" "warm up message count (default 100,000)"
-
- printf "\n%18s\n%49s\n" "-a, --jvm-args" "Extra jvm arguments you want to specify"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -C|--connection-count)
- CON_COUNT="$2"; shift; shift; continue
- ;;
- -P|--process-count)
- PROCESS_COUNT="$2"; shift; shift; continue
- ;;
- -u|--create-unique-queues-topics)
- UNIQUE_DEST="true"; shift; continue
- ;;
- --queue)
- ADDRESS="$2;{create: always}"; shift; shift; continue
- ;;
- --topic)
- ADDRESS="amq.topic/$2"; shift; shift; continue
- ;;
- --address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -s|--msg-size)
- MSG_SIZE="$2"; shift; shift; continue
- ;;
- -c|--msg-count)
- MSG_COUNT="$2"; shift; shift; continue
- ;;
- -t|--msg_type)
- MSG_TYPE="$2"; shift; shift; continue
- ;;
- -w|--warmup-msg-count)
- WARMUP_MSG_COUNT="$2"; shift; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-PRODUCER_ARGS="-server -Durl=amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672' -Dext_controller=true -Dprecision=mili -Dcon_count=$CON_COUNT"
-
-start_producers()
-{
- for ((i=0; i<$PROCESS_COUNT; i++))
- do
- if [ "$UNIQUE_DEST" = "true" ]; then
- sh run-pub "$PRODUCER_ARGS $@" "${TEST_ID}_$i" > ${TEST_ID}_$i.pub.out 2>&1 &
- else
- sh run-pub "$PRODUCER_ARGS $@" > ${TEST_ID}_$i.pub.out 2>&1 &
- fi
- done
-}
-
-start_producers "-Daddress=$ADDRESS -Duse_unique_dest=$UNIQUE_DEST -Dmsg_count=$MSG_COUNT -Dmsg_size=$MSG_SIZE -Dwarmup_count=$WARMUP_MSG_COUNT -Dmsg_type=$MSG_TYPE -Dcon_count=$CON_COUNT $EXTRA_JVM_ARGS"
-
diff --git a/qpid/java/tools/bin/qpid-bench b/qpid/java/tools/bin/qpid-bench
deleted file mode 100755
index 4773320b9e..0000000000
--- a/qpid/java/tools/bin/qpid-bench
+++ /dev/null
@@ -1,23 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. check-qpid-java-env
-
-$JAVA -cp $CLASSPATH -server $JAVA_MEM $LOG_CONFIG org.apache.qpid.tools.QpidBench "$@"
diff --git a/qpid/java/tools/bin/qpid-jms-benchmark b/qpid/java/tools/bin/qpid-jms-benchmark
deleted file mode 100755
index 3d712a27dc..0000000000
--- a/qpid/java/tools/bin/qpid-jms-benchmark
+++ /dev/null
@@ -1,316 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import optparse, time, qpid.messaging, re
-from threading import Thread
-from subprocess import Popen, PIPE, STDOUT
-
-op = optparse.OptionParser(usage="usage: %prog [options]",
- description="simple performance benchmarks")
-op.add_option("-b", "--broker", default=[], action="append", type="str",
- help="url of broker(s) to connect to, round robin on multiple brokers")
-op.add_option("-c", "--client-host", default=[], action="append", type="str",
- help="host(s) to run clients on via ssh, round robin on mulple hosts")
-op.add_option("-q", "--queues", default=1, type="int", metavar="N",
- help="create N queues (default %default)")
-op.add_option("-s", "--senders", default=1, type="int", metavar="N",
- help="start N senders per queue (default %default)")
-op.add_option("-r", "--receivers", default=1, type="int", metavar="N",
- help="start N receivers per queue (default %default)")
-op.add_option("-m", "--messages", default=100000, type="int", metavar="N",
- help="send N messages per sender (default %default)")
-op.add_option("--queue-name", default="benchmark", metavar="NAME",
- help="base name for queues (default %default)")
-op.add_option("--send-rate", default=0, metavar="N",
- help="send rate limited to N messages/second, 0 means no limit (default %default)")
-op.add_option("--receive-rate", default=0, metavar="N",
- help="receive rate limited to N messages/second, 0 means no limit (default %default)")
-op.add_option("--content-size", default=1024, type="int", metavar="BYTES",
- help="message size in bytes (default %default)")
-op.add_option("--ack-frequency", default=100, metavar="N", type="int",
- help="receiver ack's every N messages, 0 means unconfirmed (default %default)")
-op.add_option("--no-report-header", dest="report_header", default=True,
- action="store_false", help="don't print header on report")
-op.add_option("--summarize", default=False, action="store_true",
- help="print summary statistics for multiple senders/receivers: total throughput, average latency")
-op.add_option("--repeat", default=1, metavar="N", help="repeat N times", type="int")
-op.add_option("--send-option", default=[], action="append", type="str",
- help="Additional option for sending addresses")
-op.add_option("--receive-option", default=[], action="append", type="str",
- help="Additional option for receiving addresses")
-op.add_option("--create-option", default=[], action="append", type="str",
- help="Additional option for creating addresses")
-op.add_option("--send-arg", default=[], action="append", type="str",
- help="Additional argument for qpid-send")
-op.add_option("--receive-arg", default=[], action="append", type="str",
- help="Additional argument for qpid-receive")
-op.add_option("--no-timestamp", dest="timestamp", default=True,
- action="store_false", help="don't add a timestamp, no latency results")
-op.add_option("--sequence", dest="sequence", default=False,
- action="store_true", help="add a sequence number to each message")
-op.add_option("--connection-options", type="str",
- help="Connection options for senders & receivers")
-op.add_option("--durable", default=False, action="store_true",
- help="Use durable queues and messages")
-op.add_option("--save-received", default=False, action="store_true",
- help="Save received message content to files <queuename>-receiver-<n>.msg")
-op.add_option("--verbose", default=False, action="store_true",
- help="Show commands executed")
-op.add_option("--no-delete", default=False, action="store_true",
- help="Don't delete the test queues.")
-op.add_option("--fill-drain", default=False, action="store_true",
- help="First fill the queues, then drain them")
-
-single_quote_re = re.compile("'")
-def posix_quote(string):
- """ Quote a string for use as an argument in a posix shell"""
- return "'" + single_quote_re.sub("\\'", string) + "'";
-
-def ssh_command(host, command):
- """ Convert command into an ssh command on host with quoting"""
- return ["ssh", host] + [posix_quote(arg) for arg in command]
-
-class Clients:
- def __init__(self): self.clients=[]
-
- def add(self, client):
- self.clients.append(client)
- return client
-
- def kill(self):
- for c in self.clients:
- try: c.kill()
- except: pass
-
-class PopenCommand(Popen):
- """Like Popen but you can query for the command"""
- def __init__(self, command, *args, **kwargs):
- self.command = command
- Popen.__init__(self, command, *args, **kwargs)
-
-clients = Clients()
-
-def start_receive(queue, index, opts, ready_queue, broker, host):
- address_opts=opts.receive_option
- if opts.durable: address_opts += ["node:{durable:true}"]
- address="%s;{%s}"%(queue,",".join(address_opts))
- msg_total=opts.senders*opts.messages
- messages = msg_total/opts.receivers;
- if (index < msg_total%opts.receivers): messages += 1
- if (messages == 0): return None
- command = ["qpid-jms-receive",
- #"-b", broker,
- "--ready-address", "benchmark-ready;{create:always}",
- "-a", address,
- "-m", str(messages),
- "--forever",
- "--print-content=no",
- # "--receive-rate", str(opts.receive_rate),
- "--report-total",
- "--ack-frequency", str(opts.ack_frequency),
- # "--ready-address", "%s;{create:always}"%ready_queue,
- "--report-header=no -v"
- ]
- if opts.save_received:
- command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
- command += opts.receive_arg
- if opts.connection_options:
- command += ["--connection-options",opts.connection_options]
- if host: command = ssh_command(host, command)
- if opts.verbose: print "Receiver: ", command
- return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
-
-def start_send(queue, opts, broker, host):
- address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
- command = ["qpid-jms-send",
- #"-b", broker,
- "-a", address,
- "--messages", str(opts.messages),
- "--content-size", str(opts.content_size),
- "--send-rate", str(opts.send_rate),
- "--report-total",
- "--report-header=no",
- "--timestamp=%s"%(opts.timestamp and "yes" or "no"),
- "--sequence=%s"%(opts.sequence and "yes" or "no"),
- "--durable", str(opts.durable)
- ]
- command += opts.send_arg
- if opts.connection_options:
- command += ["--connection-options",opts.connection_options]
- if host: command = ssh_command(host, command)
- if opts.verbose: print "Sender: ", command
- return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
-
-def error_msg(out, err):
- return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
-
-def first_line(p):
- out,err=p.communicate()
- if p.returncode != 0:
- raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
-
- print str(out)
- print str(err)
- return out.split("\n")[0]
-
-def recreate_queues(queues, brokers, no_delete, opts):
- c = qpid.messaging.Connection(brokers[0])
- c.open()
- s = c.session()
- for q in queues:
- if not no_delete:
- try: s.sender("%s;{delete:always}"%(q)).close()
- except qpid.messaging.exceptions.NotFound: pass
- address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"]))
- if opts.verbose: print "Creating", address
- s.sender(address)
- c.close()
-
-def print_header(timestamp):
- if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
- else: latency_header=""
- print "send-tp\trecv-tp%s"%latency_header
-
-def parse(parser, lines): # Parse sender/receiver output
- return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
-
-def parse_senders(senders):
- return parse([int],[first_line(p) for p in senders])
-
-def parse_receivers(receivers):
- return parse([int,float,float,float],[first_line(p) for p in receivers if p])
-
-def print_data(send_stats, recv_stats, total_tp):
- for send,recv in map(None, send_stats, recv_stats):
- line=""
- if send: line += "%d"%send[0]
- if recv:
- line += "\t%d"%recv[0]
- if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
- if total_tp is not None:
- line += "\t%d"%total_tp
- total_tp = None
- print line
-
-def print_summary(send_stats, recv_stats, total_tp):
- def avg(s): sum(s) / len(s)
- send_tp = sum([l[0] for l in send_stats])
- recv_tp = sum([l[0] for l in recv_stats])
- summary = "%d\t%d"%(send_tp, recv_tp)
- if recv_stats and len(recv_stats[0]) == 4:
- l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
- l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
- l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
- summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
- summary += "\t%d"%total_tp
- print summary
-
-
-class ReadyReceiver:
- """A receiver for ready messages"""
- def __init__(self, queue, broker):
- self.connection = qpid.messaging.Connection(broker)
- self.connection.open()
- self.receiver = self.connection.session().receiver(
- "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
- self.receiver.session.sync()
- self.timeout=10
-
- def wait(self, receivers):
- try:
- for i in receivers: self.receiver.fetch(self.timeout)
- self.connection.close()
- except qpid.messaging.Empty:
- for r in receivers:
- if (r.poll() is not None):
- out,err=r.communicate()
- raise Exception("Receiver error: %s\n%s" %
- (" ".join(r.command), error_msg(out,err)))
- raise Exception("Timed out waiting for receivers to be ready")
-
-def flatten(l):
- return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
-
-class RoundRobin:
- def __init__(self,items):
- self.items = items
- self.index = 0
-
- def next(self):
- if not self.items: return None
- ret = self.items[self.index]
- self.index = (self.index+1)%len(self.items)
- return ret
-
-def main():
- opts, args = op.parse_args()
- opts.client_host = flatten(opts.client_host)
- if not opts.broker:
- if opts.client_host:
- raise Exception("--broker must be specified if --client_host is.")
- opts.broker = ["127.0.0.1"] # Deafult to local broker
- opts.broker = flatten(opts.broker)
- brokers = RoundRobin(opts.broker)
- client_hosts = RoundRobin(opts.client_host)
- send_out = ""
- receive_out = ""
- ready_queue="%s-ready"%(opts.queue_name)
- queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
- try:
- for i in xrange(opts.repeat):
- recreate_queues(queues, opts.broker, opts.no_delete, opts)
- ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-
- def start_receivers():
- return [ start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.receivers) ]
-
-
- def start_senders():
- return [ start_send(q, opts,brokers.next(), client_hosts.next())
- for q in queues for j in xrange(opts.senders) ]
-
- if opts.report_header and i == 0: print_header(opts.timestamp)
-
- if opts.fill_drain:
- # First fill the queues, then drain them
- start = time.time()
- senders = start_senders()
- for p in senders: p.wait()
- receivers = start_receivers()
- for p in receivers: p.wait()
- else:
- # Run senders and receivers in parallel
- receivers = start_receivers()
- ready_receiver.wait(filter(None, receivers)) # Wait for receivers ready
- start = time.time()
- senders = start_senders()
- for p in senders + receivers: p.wait()
-
- total_sent = opts.queues * opts.senders * opts.messages
- total_tp = total_sent / (time.time()-start)
- #send_stats=parse_senders(senders)
- recv_stats=parse_receivers(receivers)
- #if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
- #else: print_data(send_stats, recv_stats, total_tp)
- finally: clients.kill() # No strays
-
-if __name__ == "__main__": main()
-
diff --git a/qpid/java/tools/bin/qpid-jms-receive b/qpid/java/tools/bin/qpid-jms-receive
deleted file mode 100755
index 57abe874ff..0000000000
--- a/qpid/java/tools/bin/qpid-jms-receive
+++ /dev/null
@@ -1,193 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="qpid-jms-receive"
-URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
-ADDRESS="queue;{create:always}"
-TIMEOUT="0"
-FOREVER="false"
-MESSAGES="1"
-IGNORE_DUPLICATES="false"
-CHECK_REDELIVERED="false"
-CAPACITY="1000"
-ACK_FREQUENCY="100"
-TX="0"
-ROLLBACL_FREQUENCY="0"
-PRINT_CONTENT="false"
-PRINT_HEADERS="false"
-REPORT_TOTAL="false"
-REPORT_EVERY="0"
-REPORT_HEADER="true"
-READY_ADDRES="''"
-EXTRA_JVM_ARGS=""
-VERBOSE="0"
-
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o b:a:f:m:vh\
- --long broker:,address:,timeout:,forever\
-,messages:,ignore-duplicates,check-redelivered\
-,capacity:,ack-frequency:,tx:,rollback-frequency:\
-,print-content:,print-headers:,report-total\
-,report-every:,report-header:,ready-address:\
-,jvm-args:,verbose,help -- "$@")
-
-# padding the option string with 4 spaces
-# padding the desc string with 30 spaces
-usage()
-{
- printf "\n%s\n" "Usage: $PROGRAM_NAME [option].."
-
- printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to"
-
- printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from"
-
- printf "\n%25s\n%71s\n" "--timeout TIMEOUT (0)" "timeout in seconds to wait before exiting"
-
- printf "\n%17s\n%61s\n" "-f, --forever" "ignore timeout and wait forever"
-
- printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely"
-
- printf "\n%23s\n%84s\n" "--ignore-duplicates" "Detect and ignore duplicates (by checking 'sn' header)"
-
- printf "\n%23s\n%82s\n%92s\n" "--check-redelivered" "Fails with exception if a duplicate is not marked as" " redelivered (only relevant when ignore-duplicates is selected)"
-
- printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)"
-
- printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)"
-
- printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)"
-
- printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)"
-
- printf "\n%30s\n%55s\n" "--print-content yes|no (0)" "print out message content"
-
- printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers"
-
- printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics"
-
- printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages"
-
- printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report"
-
- printf "\n%27s\n%82s\n" "--ready-address ADDRESS" "send a message to this address when ready to receive"
-
- printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify"
-
- printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -b|--broker)
- URL="$2"; shift; shift; continue
- ;;
- -a|--address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- --timeout)
- TIMEOUT="$2"; shift; shift; continue
- ;;
- -f|--forever)
- FOREVER="$2"; shift; shift; continue
- ;;
- -m|--messages)
- MESSAGES="$2"; shift; shift; continue
- ;;
- --ignore-duplicates)
- IGNORE_DUPLICATES="true"; shift; continue
- ;;
- --check-redelivered)
- CHECK_REDELIVERED="true"; shift; continue
- ;;
- --capacity)
- CAPACITY="$2"; shift; shift; continue
- ;;
- --ack-frequency)
- ACK_FREQUENCY="$2"; shift; shift; continue
- ;;
- --tx)
- TX="$2"; shift; shift; continue
- ;;
- --rollback-frequency)
- ROLLBACK_FREQUENCY="$2"; shift; shift; continue
- ;;
- --print-content)
- if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue
- ;;
- --print-headers)
- if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue
- ;;
- --report-total)
- REPORT_TOTAL="true"; shift; continue
- ;;
- --report-every)
- REPORT_EVERY="$2"; shift; shift; continue
- ;;
- --report-header)
- if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue
- ;;
- --ready-address)
- READY_ADDRESS="$2"; shift; shift; continue
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -v|--verbose)
- VERBOSE="1"; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-RECEIVER_ARGS="-server -Durl=$URL \
--Daddress=$ADDRESS \
--Dtimeout=$TIMEOUT \
--Dmsg-count=$MESSAGES \
--Dack-frequency=$ACK_FREQUENCY \
--Dtx=$TX \
--Drollback-frequnecy=$ROLLBACL_FREQUENCY \
--Dprint-content=$PRINT_CONTENT \
--Dprint-headers=$PRINT_HEADERS \
--Dreport-total=$REPORT_TOTAL \
--Dreport-every=$REPORT_EVERY \
--Dreport-header=$REPORT_HEADER \
--Dmax_prefetch=$CAPACITY "
-
-if [ "x$READY_ADDRESS" != "x" ]; then RECEIVER_ARGS="$RECEIVER_ARGS -Dready-address=$READY_ADDRESS"; fi
-if [ "$VERBOSE" == "1" ]; then echo $RECEIVER_ARGS; fi
-echo $RECEIVER_ARGS
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $RECEIVER_ARGS org.apache.qpid.tools.QpidReceive
diff --git a/qpid/java/tools/bin/qpid-jms-send b/qpid/java/tools/bin/qpid-jms-send
deleted file mode 100755
index d7695924f0..0000000000
--- a/qpid/java/tools/bin/qpid-jms-send
+++ /dev/null
@@ -1,261 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This starts the controller for coordinating perf tests/
-
-. check-qpid-java-env
-
-PROGRAM_NAME="qpid-jms-send"
-URL="amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'"
-ADDRESS="queue;{create:always}"
-MESSAGES="1"
-ID=""
-REPLY_TO=""
-SEND_EOS="1"
-DURABLE="false"
-TTL="0"
-PRIORITY="0"
-PROPERTY=""
-CORRELATION_ID=""
-USER_ID=""
-CONTENT_STRING=""
-CONTENT_SIZE="1024"
-CONTENT_MAP=""
-CAPACITY="1000"
-ACK_FREQUENCY="100"
-TX="0"
-ROLLBACL_FREQUENCY="0"
-PRINT_CONTENT="true"
-PRINT_HEADERS="false"
-REPORT_TOTAL="false"
-REPORT_EVERY="0"
-REPORT_HEADER="true"
-SEND_RATE="-1"
-SEQUNCE="1"
-DISABLE_TIMESTAMP="false"
-EXTRA_JVM_ARGS=""
-VERBOSE="0"
-
-TEST_ID=`echo ${HOSTNAME} | awk -F . '{print $1}'`
-
-TEMP=$(getopt -n $PROGRAM_NAME -o b:a:m:i:P:M:vh\
- --long broker:,address:,messages:,id:,reply-to:\
-,send-eos:,durable:,ttl:,property:,correlational-id:\
-,user-id:,content-string:,content-size:,content-map:\
-,capacity:,ack-frequency:,tx:,rollback-frequency:\
-,print-content:,print-headers:,report-total\
-,report-every:,report-header:,send-rate:,sequence:,timestamp:\
-,jvm-args:,verbose,help -- "$@")
-
-# padding the option string with 4 spaces
-# padding the desc string with 30 spaces
-usage()
-{
- printf "\n%s\n" "Usage: $PROGRAM_NAME [option].."
-
- printf "\n%20s\n%57s\n" "-b, --broker URL" "url of broker to connect to"
-
- printf "\n%24s\n%53s\n" "-a,--address ADDRESS" "address to receive from"
-
- printf "\n%24s\n%89s\n" "-m, --messages N (0)" "Number of messages to receive; 0 means receive indefinitely"
-
- printf "\n%15s\n%75s\n" "-i, --id ID" "use the supplied id instead of generating one"
-
- printf "\n%23s\n%54s\n" "--reply-to REPLY-TO" "specify reply-to address"
-
- printf "\n%20s\n%70s\n" "--send-eos N (0)" "send N EOS messages to mark end of input"
-
- printf "\n%24s\n%54s\n" "--durable yes|no (0)" "mark messages as durable"
-
- printf "\n%19s\n%72s\n" "--ttl msecs (0)" "time-to-live for messages, in milliseconds"
-
- printf "\n%27s\n%72s\n" "--priority PRIORITY (0)" "time-to-live for messages, in milliseconds"
-
- printf "\n%29s\n%54s\n" "-P, --property NAME=VALUE" "specify message property"
-
- printf "\n%23s\n%57s\n" "--correlation-id ID" "correlation-id for message"
-
- printf "\n%20s\n%48s\n" "--user-id USERID" "userid for message"
-
- printf "\n%28s\n%60s\n" "--content-string CONTENT" "use CONTENT as message content"
-
- printf "\n%24s\n%62s\n" "--content-size N (0)" "create an N-byte message content"
-
- printf "\n%32s\n%59s\n" "-M, --content-map NAME=VALUE" "specify entry for map content"
-
- printf "\n%23s\n%71s\n" "--capacity N (1000)" "Pre-fetch window (0 implies no pre-fetch)"
-
- printf "\n%27s\n%94s\n" "--ack-frequency N (100)" "Ack frequency (0 implies none of the messages will get accepted)"
-
- printf "\n%14s\n%94s\n" "--tx N (0)" "batch size for transactions (0 implies transaction are not used)"
-
- printf "\n%30s\n%94s\n" "--rollback-frequency N (0)" "rollback frequency (0 implies no transaction will be rolledback)"
-
- printf "\n%30s\n%55s\n" "--print-content yes|no (1)" "print out message content"
-
- printf "\n%30s\n%55s\n" "--print-headers yes|no (0)" "print out message headers"
-
- printf "\n%18s\n%76s\n" "--report-total" "Report total throughput and latency statistics"
-
- printf "\n%24s\n%87s\n" "--report-every N (0)" "Report throughput and latency statistics every N messages"
-
- printf "\n%30s\n%47s\n" "--report-header yes|no (1)" "Headers on report"
-
- printf "\n%21s\n%64s\n%62s\n" "--send-rate N (0)" "Send at rate of N messages/second." "0 means send as fast as possible"
-
- printf "\n%25s\n%69s\n%77s\n" "--sequence yes|no (1)" "Add a sequence number messages property" "(required for duplicate/lost message detection)"
-
- printf "\n%26s\n%64s\n%77s\n" "--timestamp yes|no (1)" "Add a time stamp messages property" "(required for duplicate/lost message detection)"
-
- printf "\n%14s\n%69s\n" "--jvm-args" "Extra jvm arguments you want to specify"
-
- printf "\n%17s\n%69s\n\n" "-v, --verbose" "Print debug information for this script"
-}
-
-eval set -- "$TEMP"
-while true; do
- case $1 in
- -b|--broker)
- URL="$2"; shift; shift; continue
- ;;
- -a|--address)
- ADDRESS="$2"; shift; shift; continue
- ;;
- -m|--messages)
- MESSAGES="$2"; shift; shift; continue
- ;;
- -i|--id)
- ID="$2"; shift; shift; continue
- ;;
- --reply-to)
- REPLY_TO="$2"; shift; shift; continue
- ;;
- --send-eos)
- SEND_EOS="$2"; shift; shift; continue
- ;;
- --durable)
- if [ "$2" == "1" ]; then DURABLE="true"; else DURABLE="false"; fi; shift; shift; continue
- ;;
- --ttl)
- TTL="$2"; shift; shift; continue
- ;;
- --priority)
- PRIORITY="$2"; shift; shift; continue
- ;;
- -P|--property)
- PROPERTY="$2,$PROPERTY"; shift; shift; continue
- ;;
- --correlation-id)
- CORRELATION_ID="$2"; shift; shift; continue
- ;;
- --user-id)
- USER_ID="$2"; shift; shift; continue
- ;;
- --content-string)
- CONTENT_STRING="$2"; shift; shift; continue
- ;;
- --content-size)
- CONTENT_SIZE="$2"; shift; shift; continue
- ;;
- -M|--content-map)
- CONTENT_MAP="$2,$CONTENT_MAP"; shift; shift; continue
- ;;
- --capacity)
- CAPACITY="$2"; shift; shift; continue
- ;;
- --ack-frequency)
- ACK_FREQUENCY="$2"; shift; shift; continue
- ;;
- --tx)
- TX="$2"; shift; shift; continue
- ;;
- --rollback-frequency)
- ROLLBACK_FREQUENCY="$2"; shift; shift; continue
- ;;
- --print-content)
- if [ "$2" == "yes" ]; then PRINT_CONTENT="true"; else PRINT_CONTENT="false"; fi; shift; shift; continue
- ;;
- --print-headers)
- if [ "$2" == "yes" ]; then PRINT_HEADERS="true"; else PRINT_HEADERS="false"; fi; shift; shift; continue
- ;;
- --report-total)
- REPORT_TOTAL="true"; shift; continue
- ;;
- --report-every)
- REPORT_EVERY="$2"; shift; shift; continue
- ;;
- --report-header)
- if [ "$2" == "yes" ]; then REPORT_HEADER="true"; else REPORT_HEADER="false"; fi; shift; shift; continue
- ;;
- --send-rate)
- SEND_RATE="$2"; shift; shift; continue
- ;;
- --sequence)
- if [ "$2" == "yes" ]; then SEQUENCE="true"; else SEQUENCE="false"; fi; shift; shift; continue
- ;;
- --timestamp)
- if [ "$2" == "yes" ]; then DISABLE_TIMESTAMP="false"; else DISABLE_TIMESTAMP="true"; fi; shift; shift; continue
- ;;
- -a|--jvm-args)
- EXTRA_JVM_ARGS="$2"; shift; shift; continue
- ;;
- -h|--help)
- usage
- exit 0
- ;;
- -v|--verbose)
- VERBOSE="1"; shift; continue
- ;;
- --)
- # no more arguments to parse
- break
- ;;
- *)
- # no more arguments to parse
- break
- ;;
- esac
-done
-
-SENDER_ARGS="-server -Durl=$URL \
--Daddress=$ADDRESS \
--Dmsg-count=$MESSAGES \
--Dsend-eos=$SEND_EOS \
--Ddurable=$DURABLE \
--Dmsg_size=$CONTENT_SIZE \
--Dsend-rate=$SEND_RATE \
--Ddisable-timestamp=$DISABLE_TIMESTAMP \
--Dttl=$TTL \
--Dpriority=$PRIORITY \
--Dtx=$TX \
--Drollback-frequnecy=$ROLLBACK_FREQUENCY \
--Dprint-content=$PRINT_CONTENT \
--Dprint-headers=$PRINT_HEADERS \
--Dreport-total=$REPORT_TOTAL \
--Dreport-every=$REPORT_EVERY \
--Dreport-header=$REPORT_HEADER \
--Dmax_prefetch=$CAPACITY "
-
-if [ "x$ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Did=$ID"; fi
-if [ "x$USER_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Duser_id=$USER_ID"; fi
-if [ "x$CORRELATION_ID" != "x" ]; then SENDER_ARGS="$SENDER_ARGS -Dcorrelation_id=$CORRELATION_ID"; fi
-
-if [ "$VERBOSE" == "1" ]; then echo $SENDER_ARGS; fi
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $SENDER_ARGS org.apache.qpid.tools.QpidSend
diff --git a/qpid/java/tools/bin/qpid-python-testkit b/qpid/java/tools/bin/qpid-python-testkit
deleted file mode 100755
index 20f72aca53..0000000000
--- a/qpid/java/tools/bin/qpid-python-testkit
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/usr/bin/env bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-# This is wrapper script to run the tests defined in testkit.py
-# via the python test runner. The defaults are set for a running
-# from an svn checkout
-
-. check-qpid-java-env
-
-export PYTHONPATH=./:$PYTHONPATH
-echo $PYTHONPATH
-if [ "$OUTDIR" = "" ] ; then
- OUTDIR=$PWD
-fi
-testdir=$OUTDIR/testkit-out-`date +%F-%H-%M-%S`
-qpid-python-test -m testkit -DOUTDIR=$testdir"$@"
diff --git a/qpid/java/tools/bin/run-pub b/qpid/java/tools/bin/run-pub
deleted file mode 100755
index 9efe58c4b8..0000000000
--- a/qpid/java/tools/bin/run-pub
+++ /dev/null
@@ -1,28 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. check-qpid-java-env
-
-JVM_ARGS="$1"
-PROGRAM_ARGS="$2"
-
-echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
-echo "PROGRAM ARGS : $PROGRAM_ARGS"
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfProducer $PROGRAM_ARGS
diff --git a/qpid/java/tools/bin/run-sub b/qpid/java/tools/bin/run-sub
deleted file mode 100755
index 8449563f7f..0000000000
--- a/qpid/java/tools/bin/run-sub
+++ /dev/null
@@ -1,32 +0,0 @@
-#!/bin/sh
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-. check-qpid-java-env
-
-echo "All args $@"
-
-JVM_ARGS="$1"
-PROGRAM_ARGS="$2"
-
-echo "JVM ARGS : $JAVA_MEM $JVM_ARGS"
-echo "PROGRAM ARGS : $PROGRAM_ARGS"
-
-$JAVA -cp $CLASSPATH $LOG_CONFIG $JAVA_MEM $JVM_ARGS org.apache.qpid.tools.PerfConsumer $PROGRAM_ARGS
-
diff --git a/qpid/java/tools/bin/testkit.py b/qpid/java/tools/bin/testkit.py
deleted file mode 100755
index 1c2ad598b8..0000000000
--- a/qpid/java/tools/bin/testkit.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#!/usr/bin/env python
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import time, string, traceback
-from brokertest import *
-from qpid.messaging import *
-
-
-try:
- import java.lang.System
- _cp = java.lang.System.getProperty("java.class.path");
-except ImportError:
- _cp = checkenv("QP_CP")
-
-class Formatter:
-
- def __init__(self, message):
- self.message = message
- self.environ = {"M": self.message,
- "P": self.message.properties,
- "C": self.message.content}
-
- def __getitem__(self, st):
- return eval(st, self.environ)
-
-# The base test case has support for launching the generic
-# receiver and sender through the TestLauncher with all the options.
-#
-class JavaClientTest(BrokerTest):
- """Base Case for Java Test cases"""
-
- client_class = "org.apache.qpid.testkit.TestLauncher"
-
- # currently there is no transparent reconnection.
- # temp hack: just creating the queue here and closing it.
- def start_error_watcher(self,broker=None):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- ssn.close()
-
- def store_module_args(self):
- if BrokerTest.store_lib:
- return ["--load-module", BrokerTest.store_lib]
- else:
- print "Store module not present."
- return [""]
-
- def client(self,**options):
- cmd = ["java","-cp",_cp]
-
- cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
- cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
- cmd += ["-Dport=" + str(options.get("port",5672))]
- cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
- cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))]
- cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))]
- cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))]
- cmd += ["-Ddurable=" + str(options.get("durable",False))]
- cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
- cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
- cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
- cmd += ["-Dsender=" + str(options.get("sender",False))]
- cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
- cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
- cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))]
- cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))]
- cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))]
- cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))]
- cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
- cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
- cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))]
- cmd += ["-Dlog.level=" + options.get("log.level", "warn")]
- cmd += [self.client_class]
- cmd += [options.get("address", "my_queue; {create: always}")]
-
- #print str(options.get("port",5672))
- return cmd
-
- # currently there is no transparent reconnection.
- # temp hack: just creating a receiver and closing session soon after.
- def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
- ssn = broker.connect().session()
- err_watcher = ssn.receiver("control; {create:always}", capacity=1)
- i = run_time/error_ck_freq
- is_error = False
- for j in range(i):
- not_empty = True
- while not_empty:
- try:
- m = err_watcher.fetch(timeout=error_ck_freq)
- ssn.acknowledge()
- print "Java process notified of an error"
- self.print_error(m)
- is_error = True
- except messaging.Empty, e:
- not_empty = False
-
- ssn.close()
- return is_error
-
- def print_error(self,msg):
- print msg.properties.get("exception-trace")
-
- def verify(self, receiver,sender):
- sender_running = receiver.is_running()
- receiver_running = sender.is_running()
-
- self.assertTrue(receiver_running,"Receiver has exited prematually")
- self.assertTrue(sender_running,"Sender has exited prematually")
-
- def start_sender_and_receiver(self,**options):
-
- receiver_opts = options
- receiver_opts["receiver"]=True
- receiver = self.popen(self.client(**receiver_opts),
- expect=EXPECT_RUNNING)
-
- sender_opts = options
- sender_opts["sender"]=True
- sender = self.popen(self.client(**sender_opts),
- expect=EXPECT_RUNNING)
-
- return receiver, sender
-
- def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options):
- if options.get("durable",False)==True:
- cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args())
- else:
- cluster = Cluster(self, count=count)
- return cluster
-
-class ConcurrencyTest(JavaClientTest):
- """A concurrency test suite for the JMS client"""
- skip = False
-
- def base_case(self,**options):
- if self.skip :
- print "Skipping test"
- return
-
- cluster = self.start_cluster(count=2,**options)
- self.start_error_watcher(broker=cluster[0])
- options["port"] = port=cluster[0].port()
-
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver, sender = self.start_sender_and_receiver(**options)
- self.monitor_clients(broker=cluster[0],run_time=180)
- self.verify(receiver,sender)
-
- def test_multiplexing_con(self):
- """Tests multiple sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,test_name=self.id())
-
- def test_multiplexing_con_with_tx(self):
- """Tests multiple transacted sessions on a single connection"""
-
- self.base_case(ssn_per_con=25,transacted=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_rcv(self):
- """Tests multiple sessions with sync receive"""
-
- self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id())
-
- def test_multiplexing_con_with_durable_sub(self):
- """Tests multiple sessions with durable subs"""
-
- self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_ack(self):
- """Tests multiple sessions with sync ack"""
-
- self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id())
-
- def test_multiplexing_con_with_sync_pub(self):
- """Tests multiple sessions with sync pub"""
-
- self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id())
-
- def test_multiple_cons_and_ssns(self):
- """Tests multiple connections and sessions"""
-
- self.base_case(con_count=10,ssn_per_con=25,test_name=self.id())
-
-
-class SoakTest(JavaClientTest):
- """A soak test suite for the JMS client"""
-
- def base_case(self,**options):
- cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options)
- options["port"] = port=cluster[0].port()
- self.start_error_watcher(broker=cluster[0])
- options["use_unique_dests"]=True
- options["address"]="amq.topic"
- receiver,sender = self.start_sender_and_receiver(**options)
- is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30)
-
- if (is_error):
- print "The sender or receiver didn't start properly. Exiting test."
- return
- else:
- "Print no error !"
-
- # grace period for java clients to get the failover properly setup.
- time.sleep(30)
- error_msg= None
- # Kill original brokers, start new ones.
- try:
- for i in range(8):
- cluster[i].kill()
- b=cluster.start()
- self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
- print "iteration : " + str(i)
- except ConnectError, e1:
- error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1)
-
- except SessionError, e2:
- error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2)
-
- self.verify(receiver,sender)
- if error_msg:
- raise Exception(error_msg)
-
-
- def test_failover(self) :
- """Test basic failover"""
-
- self.base_case(test_name=self.id())
-
-
- def test_failover_with_durablesub(self):
- """Test failover with durable subscriber"""
-
- self.base_case(durable=True,jms_durable_sub=True,test_name=self.id())
-
-
- def test_failover_with_sync_rcv(self):
- """Test failover with sync receive"""
-
- self.base_case(sync_rcv=True,test_name=self.id())
-
-
- def test_failover_with_sync_ack(self):
- """Test failover with sync ack"""
-
- self.base_case(sync_ack=True,test_name=self.id())
-
-
- def test_failover_with_noprefetch(self):
- """Test failover with no prefetch"""
-
- self.base_case(max_prefetch=1,test_name=self.id())
-
-
- def test_failover_with_multiple_cons_and_ssns(self):
- """Test failover with multiple connections and sessions"""
-
- self.base_case(use_unique_dests=True,address="amq.topic",
- con_count=10,ssn_per_con=25,test_name=self.id())
diff --git a/qpid/java/tools/etc/perf-report.gnu b/qpid/java/tools/etc/perf-report.gnu
deleted file mode 100644
index b7662b0bfe..0000000000
--- a/qpid/java/tools/etc/perf-report.gnu
+++ /dev/null
@@ -1,61 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-set terminal png
-set datafile separator ","
-
-set title "Variation of avg latency between iterations"
-set yrange [10:20]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "avg_latency.png"
-plot "stats-csv.log" using 9 title "avg latency" with lines, 14 title "target latency" with lines
-
-
-set title "Variation of max latency between iterations"
-set yrange [0:1000]
-set xlabel "Iterations"
-set ylabel "Latency (ms)"
-set output "max_latency.png"
-plot "stats-csv.log" using 11 title "max latency" with lines,14 title "target latency" with lines,100 title "100 ms" with lines
-
-
-set title "Variation of standard deviation of latency between iterations"
-set yrange [0:20]
-set xlabel "Iterations"
-set ylabel "Standard Deviation"
-set output "std_dev_latency.png"
-plot "stats-csv.log" using 12 title "standard deviation" with lines
-
-
-set title "Variation of system throughput between iterations"
-set yrange [400000:450000]
-set xlabel "Iterations"
-set ylabel "System Throuhgput (msg/sec)"
-set output "system_rate.png"
-plot "stats-csv.log" using 2 title "system throughput" with lines
-
-
-set title "Variation of avg producer & consumer rates between iterations"
-set yrange [6500:7500]
-set xlabel "Iterations"
-set ylabel "Avg Rates (msg/sec)"
-set output "prod_cons_rate.png"
-plot "stats-csv.log" using 6 title "producer rate" with lines,"stats-csv.log" using 3 title "consumer rate" with lines
-
diff --git a/qpid/java/tools/etc/test.log4j b/qpid/java/tools/etc/test.log4j
deleted file mode 100644
index b574a7b5b7..0000000000
--- a/qpid/java/tools/etc/test.log4j
+++ /dev/null
@@ -1,28 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-log4j.rootLogger=${root.logging.level}
-
-log4j.logger.org.apache.qpid=ERROR, console
-log4j.additivity.org.apache.qpid=false
-
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.Threshold=all
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n
-
diff --git a/qpid/java/tools/pom.xml b/qpid/java/tools/pom.xml
deleted file mode 100644
index ac90c98ee7..0000000000
--- a/qpid/java/tools/pom.xml
+++ /dev/null
@@ -1,88 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-java-build</artifactId>
- <version>0.32-SNAPSHOT</version>
- </parent>
-
- <artifactId>qpid-tools</artifactId>
- <name>Qpid Tools</name>
- <description>Tools</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.qpid</groupId>
- <artifactId>qpid-client</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>${log4j-version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j-version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <version>${geronimo-jms-1-1-version}</version>
- </dependency>
-
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>${commons-codec-version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>${jackson-version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson-version}</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <!--version specified in parent pluginManagement -->
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
deleted file mode 100644
index b10129d855..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Client.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Session;
-
-public abstract class Client implements ExceptionListener
-{
- private Connection con;
- private Session ssn;
- private boolean durable = false;
- private boolean transacted = false;
- private int txSize = 10;
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
- private String contentType = "application/octet-stream";
-
- private long reportFrequency = 60000; // every min
-
- private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- private NumberFormat nf = new DecimalFormat("##.00");
-
- private long startTime = System.currentTimeMillis();
- private ErrorHandler errorHandler = null;
-
- public Client(Connection con) throws Exception
- {
- this.con = con;
- this.con.setExceptionListener(this);
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- txSize = Integer.getInteger("tx_size",10);
- contentType = System.getProperty("content_type","application/octet-stream");
- reportFrequency = Long.getLong("report_frequency", 60000);
- }
-
- public void close()
- {
- try
- {
- con.close();
- }
- catch (Exception e)
- {
- handleError("Error closing connection",e);
- }
- }
-
- public void onException(JMSException e)
- {
- handleError("Connection error",e);
- }
-
- public void setErrorHandler(ErrorHandler h)
- {
- this.errorHandler = h;
- }
-
- public void handleError(String msg,Exception e)
- {
- if (errorHandler != null)
- {
- errorHandler.handleError(msg, e);
- }
- else
- {
- System.err.println(msg);
- e.printStackTrace();
- }
- }
-
- protected Session getSsn()
- {
- return ssn;
- }
-
- protected void setSsn(Session ssn)
- {
- this.ssn = ssn;
- }
-
- protected boolean isDurable()
- {
- return durable;
- }
-
- protected boolean isTransacted()
- {
- return transacted;
- }
-
- protected int getTxSize()
- {
- return txSize;
- }
-
- protected int getAck_mode()
- {
- return ack_mode;
- }
-
- protected String getContentType()
- {
- return contentType;
- }
-
- protected long getReportFrequency()
- {
- return reportFrequency;
- }
-
- protected long getStartTime()
- {
- return startTime;
- }
-
- protected void setStartTime(long startTime)
- {
- this.startTime = startTime;
- }
-
- public DateFormat getDf()
- {
- return df;
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
deleted file mode 100644
index de7748acd6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-public interface ErrorHandler {
-
- public void handleError(String msg,Exception e);
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
deleted file mode 100644
index 8dcf59e9c1..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Receiver.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-
-/**
- * A generic receiver which consumes messages
- * from a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It participates in a feedback loop to ensure the producer
- * doesn't fill up the queue. If it receives an "End" msg
- * it sends a reply to the replyTo address in that msg.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * sync_rcv - Whether to consume sync (instead of using a listener).
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs. *
- * check_for_dups - check for duplicate messages and out of order messages.
- * jms_durable_sub - create a durable subscription instead of a regular subscription.
- */
-public class Receiver extends Client implements MessageListener
-{
- private long msg_count = 0;
- private int sequence = 0;
- private boolean syncRcv = Boolean.getBoolean("sync_rcv");
- private boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub");
- private boolean checkForDups = Boolean.getBoolean("check_for_dups");
- private MessageConsumer consumer;
- private List<Integer> duplicateMessages = new ArrayList<Integer>();
-
- public Receiver(Connection con,String addr) throws Exception
- {
- super(con);
- setSsn(con.createSession(isTransacted(), getAck_mode()));
- consumer = getSsn().createConsumer(new AMQAnyDestination(addr));
- if (!syncRcv)
- {
- consumer.setMessageListener(this);
- }
-
- System.out.println("Receiving messages from : " + addr);
- }
-
- public void onMessage(Message msg)
- {
- handleMessage(msg);
- }
-
- public void run() throws Exception
- {
- long sleepTime = getReportFrequency();
- while(true)
- {
- if(syncRcv)
- {
- long t = sleepTime;
- while (t > 0)
- {
- long start = System.currentTimeMillis();
- Message msg = consumer.receive(t);
- t = t - (System.currentTimeMillis() - start);
- handleMessage(msg);
- }
- }
- Thread.sleep(sleepTime);
- System.out.println(getDf().format(System.currentTimeMillis())
- + " - messages received : " + msg_count);
- }
- }
-
- private void handleMessage(Message m)
- {
- if (m == null) { return; }
-
- try
- {
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo());
- Message controlMsg = getSsn().createTextMessage();
- temp.send(controlMsg);
- if (isTransacted())
- {
- getSsn().commit();
- }
- temp.close();
- }
- else
- {
-
- int seq = m.getIntProperty("sequence");
- if (checkForDups)
- {
- if (seq == 0)
- {
- sequence = 0; // wrap around for each iteration
- System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration");
- duplicateMessages.clear();
- }
-
- if (seq < sequence)
- {
- duplicateMessages.add(seq);
- }
- else if (seq == sequence)
- {
- sequence++;
- msg_count ++;
- }
- else
- {
- // Multiple publishers are not allowed in this test case.
- // So out of order messages are not allowed.
- throw new Exception(": Received an out of order message (expected="
- + sequence + ",received=" + seq + ")" );
- }
- }
- else
- {
- msg_count ++;
- }
-
- // Please note that this test case doesn't expect duplicates
- // When testing for transactions.
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- }
- catch (Exception e)
- {
- e.printStackTrace();
- handleError("Exception receiving messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Receiver rcv = new Receiver(con,addr);
- rcv.run();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
deleted file mode 100644
index 14b9b7302f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/Sender.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Random;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.tools.MessageFactory;
-
-/**
- * A generic sender which sends a stream of messages
- * to a given address in a broker (host/port)
- * until told to stop by killing it.
- *
- * It has a feedback loop to ensure it doesn't fill
- * up queues due to a slow consumer.
- *
- * It doesn't check for correctness or measure anything
- * leaving those concerns to another entity.
- * However it prints a timestamp every x secs(-Dreport_frequency)
- * as checkpoint to figure out how far the test has progressed if
- * a failure occurred.
- *
- * It also takes in an optional Error handler to
- * pass out any error in addition to writing them to std err.
- *
- * This is intended more as building block to create
- * more complex test cases. However there is a main method
- * provided to use this standalone.
- *
- * The following options are available and configurable
- * via jvm args.
- *
- * msg_size (256)
- * msg_count (10) - # messages before waiting for feedback
- * sleep_time (1000 ms) - sleep time btw each iteration
- * report_frequency - how often a timestamp is printed
- * durable
- * transacted
- * tx_size - size of transaction batch in # msgs.
- */
-public class Sender extends Client
-{
- protected int msg_size = 256;
- protected int msg_count = 10;
- protected int iterations = -1;
- protected long sleep_time = 1000;
-
- protected Destination dest = null;
- protected Destination replyTo = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- protected MessageProducer producer;
- Random gen = new Random(19770905);
-
- public Sender(Connection con,String addr) throws Exception
- {
- super(con);
- this.msg_size = Integer.getInteger("msg_size", 100);
- this.msg_count = Integer.getInteger("msg_count", 10);
- this.iterations = Integer.getInteger("iterations", -1);
- this.sleep_time = Long.getLong("sleep_time", 1000);
- this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE));
- this.dest = new AMQAnyDestination(addr);
- this.producer = getSsn().createProducer(dest);
- this.replyTo = getSsn().createTemporaryQueue();
-
- System.out.println("Sending messages to : " + addr);
- }
-
- /*
- * If msg_size not specified it generates a message
- * between 500-1500 bytes.
- */
- protected Message getNextMessage() throws Exception
- {
- int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
- Message msg = (getContentType().equals("text/plain")) ?
- MessageFactory.createTextMessage(getSsn(), s):
- MessageFactory.createBytesMessage(getSsn(), s);
-
- msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT
- : DeliveryMode.NON_PERSISTENT);
- return msg;
- }
-
- public void run()
- {
- try
- {
- boolean infinite = (iterations == -1);
- for (int x=0; infinite || x < iterations; x++)
- {
- long now = System.currentTimeMillis();
- if (now - getStartTime() >= getReportFrequency())
- {
- System.out.println(df.format(now) + " - iterations : " + x);
- setStartTime(now);
- }
-
- for (int i = 0; i < msg_count; i++)
- {
- Message msg = getNextMessage();
- msg.setIntProperty("sequence",i);
- producer.send(msg);
- if (isTransacted() && msg_count % getTxSize() == 0)
- {
- getSsn().commit();
- }
- }
- TextMessage m = getSsn().createTextMessage("End");
- m.setJMSReplyTo(replyTo);
- producer.send(m);
-
- if (isTransacted())
- {
- getSsn().commit();
- }
-
- MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo);
- feedbackConsumer.receive();
- feedbackConsumer.close();
- if (isTransacted())
- {
- getSsn().commit();
- }
- Thread.sleep(sleep_time);
- }
- }
- catch (Exception e)
- {
- handleError("Exception sending messages",e);
- }
- }
-
- // Receiver host port address
- public static void main(String[] args) throws Exception
- {
- String host = "127.0.0.1";
- int port = 5672;
- String addr = "message_queue";
-
- if (args.length > 0)
- {
- host = args[0];
- }
- if (args.length > 1)
- {
- port = Integer.parseInt(args[1]);
- }
- if (args.length > 2)
- {
- addr = args[2];
- }
-
- AMQConnection con = new AMQConnection(
- "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "'");
-
- Sender sender = new Sender(con,addr);
- sender.run();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
deleted file mode 100644
index 0c94030ec6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/testkit/TestLauncher.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit;
-
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
-import org.apache.qpid.client.AMQAnyDestination;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.thread.Threading;
-
-/**
- * A basic test case class that could launch a Sender/Receiver
- * or both, each on it's own separate thread.
- *
- * If con_count == ssn_count, then each entity created will have
- * it's own Connection. Else if con_count {@literal <} ssn_count, then
- * a connection will be shared by ssn_count/con_count # of entities.
- *
- * The if both sender and receiver options are set, it will
- * share a connection.
- *
- * The following options are available as jvm args
- * host, port
- * con_count,ssn_count
- * con_idle_time - which determines heartbeat
- * sender, receiver - booleans which indicate which entity to create.
- * Setting them both is also a valid option.
- */
-public class TestLauncher implements ErrorHandler
-{
- protected String host = "127.0.0.1";
- protected int port = 5672;
- protected int sessions_per_con = 1;
- protected int connection_count = 1;
- protected long heartbeat = 5000;
- protected boolean sender = false;
- protected boolean receiver = false;
- protected boolean useUniqueDests = false;
- protected String url;
-
- protected String address = "my_queue; {create: always}";
- protected boolean durable = false;
- protected String failover = "";
- protected AMQConnection controlCon;
- protected Destination controlDest = null;
- protected Session controlSession = null;
- protected MessageProducer statusSender;
- protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
- protected String testName;
-
- public TestLauncher()
- {
- testName = System.getProperty("test_name","UNKNOWN");
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- sessions_per_con = Integer.getInteger("ssn_per_con", 1);
- connection_count = Integer.getInteger("con_count", 1);
- heartbeat = Long.getLong("heartbeat", 5);
- sender = Boolean.getBoolean("sender");
- receiver = Boolean.getBoolean("receiver");
- useUniqueDests = Boolean.getBoolean("use_unique_dests");
-
- failover = System.getProperty("failover", "");
- durable = Boolean.getBoolean("durable");
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
- + host + ":" + port + "?heartbeat='" + heartbeat+ "''";
-
- if (failover.equalsIgnoreCase("failover_exchange"))
- {
- url += "&failover='failover_exchange'";
-
- System.out.println("Failover exchange " + url );
- }
-
- configureLogging();
- }
-
- protected void configureLogging()
- {
- PatternLayout layout = new PatternLayout();
- layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
- BasicConfigurator.configure(new ConsoleAppender(layout));
-
- String logLevel = System.getProperty("log.level","warn");
- String logComponent = System.getProperty("log.comp","org.apache.qpid");
-
- Logger logger = Logger.getLogger(logComponent);
- logger.setLevel(Level.toLevel(logLevel, Level.WARN));
-
- System.out.println("Level " + logger.getLevel());
-
- }
-
- public void setUpControlChannel()
- {
- try
- {
- controlCon = new AMQConnection(url);
- controlCon.start();
-
- controlDest = new AMQAnyDestination("control; {create: always}"); // durable
-
- // Create the session to setup the messages
- controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
- statusSender = controlSession.createProducer(controlDest);
-
- }
- catch (Exception e)
- {
- handleError("Error while setting up the test",e);
- }
- }
-
- public void cleanup()
- {
- try
- {
- controlSession.close();
- controlCon.close();
- for (AMQConnection con : clients)
- {
- con.close();
- }
- }
- catch (Exception e)
- {
- handleError("Error while tearing down the test",e);
- }
- }
-
- public void start(String addr)
- {
- try
- {
- if (addr == null)
- {
- addr = address;
- }
-
- int ssn_per_con = sessions_per_con;
- String addrTemp = addr;
- for (int i = 0; i< connection_count; i++)
- {
- AMQConnection con = new AMQConnection(url);
- con.start();
- clients.add(con);
- for (int j = 0; j< ssn_per_con; j++)
- {
- String index = createPrefix(i,j);
- if (useUniqueDests)
- {
- addrTemp = modifySubject(index,addr);
- }
-
- if (sender)
- {
- createSender(index,con,addrTemp,this);
- }
-
- if (receiver)
- {
- System.out.println("########## Creating receiver ##################");
-
- createReceiver(index,con,addrTemp,this);
- }
- }
- }
- }
- catch (Exception e)
- {
- handleError("Exception while setting up the test",e);
- }
-
- }
-
- protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Receiver rcv = new Receiver(con,addr);
- rcv.setErrorHandler(h);
- rcv.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Receiver", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Receive thread",e);
- }
-
- t.setName("ReceiverThread-" + index);
- t.start();
- }
-
- protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h)
- {
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- Sender sender = new Sender(con, addr);
- sender.setErrorHandler(h);
- sender.run();
- }
- catch (Exception e)
- {
- h.handleError("Error Starting Sender", e);
- }
- }
- };
-
- Thread t = null;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- handleError("Error creating Sender thread",e);
- }
-
- t.setName("SenderThread-" + index);
- t.start();
- }
-
- public synchronized void handleError(String msg,Exception e)
- {
- // In case sending the message fails
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
-
- try
- {
- TextMessage errorMsg = controlSession.createTextMessage();
- errorMsg.setStringProperty("status", "error");
- errorMsg.setStringProperty("desc", msg);
- errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
- errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
-
- System.out.println("Msg " + errorMsg);
-
- statusSender.send(errorMsg);
- }
- catch (JMSException e1)
- {
- e1.printStackTrace();
- }
- }
-
- private String serializeStackTrace(Exception e)
- {
- ByteArrayOutputStream bOut = new ByteArrayOutputStream();
- PrintStream printStream = new PrintStream(bOut);
- e.printStackTrace(printStream);
- printStream.close();
- return bOut.toString();
- }
-
- private String createPrefix(int i, int j)
- {
- return String.valueOf(i).concat(String.valueOf(j));
- }
-
- /**
- * A basic helper function to modify the subjects by
- * appending an index.
- */
- private String modifySubject(String index,String addr)
- {
- if (addr.indexOf("/") > 0)
- {
- addr = addr.substring(0,addr.indexOf("/")+1) +
- index +
- addr.substring(addr.indexOf("/")+1,addr.length());
- }
- else if (addr.indexOf(";") > 0)
- {
- addr = addr.substring(0,addr.indexOf(";")) +
- "/" + index +
- addr.substring(addr.indexOf(";"),addr.length());
- }
- else
- {
- addr = addr + "/" + index;
- }
-
- return addr;
- }
-
- public static void main(String[] args)
- {
- final TestLauncher test = new TestLauncher();
- test.setUpControlChannel();
- System.out.println("args.length " + args.length);
- System.out.println("args [0] " + args [0]);
- test.start(args.length > 0 ? args [0] : null);
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() { test.cleanup(); }
- });
-
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
deleted file mode 100644
index 7eb83a520b..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/Clock.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * In the future this will be replaced by a Clock abstraction
- * that can utilize a realtime clock when running in RT Java.
- */
-
-public class Clock
-{
- private static final Logger _logger = LoggerFactory.getLogger(Clock.class);
-
- public final static long SEC = 60000;
-
- private static Precision precision;
- private static long offset = -1; // in nano secs
-
- public enum Precision
- {
- NANO_SECS, MILI_SECS;
-
- static Precision getPrecision(String str)
- {
- if ("mili".equalsIgnoreCase(str))
- {
- return MILI_SECS;
- }
- else
- {
- return NANO_SECS;
- }
- }
- };
-
- static
- {
- precision = Precision.getPrecision(System.getProperty("precision","mili"));
- //offset = Long.getLong("offset",-1);
-
- if (_logger.isDebugEnabled())
- {
- System.out.println("Using precision : " + precision );
- //+ " and offset " + offset);
- }
- }
-
- public static Precision getPrecision()
- {
- return precision;
- }
-
- public static long getTime()
- {
- if (precision == Precision.NANO_SECS)
- {
- if (offset == -1)
- {
- return System.nanoTime();
- }
- else
- {
- return System.nanoTime() + offset;
- }
- }
- else
- {
- if (offset == -1)
- {
- return System.currentTimeMillis();
- }
- else
- {
- return System.currentTimeMillis() + offset/convertToMiliSecs();
- }
- }
- }
-
- public static long convertToSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000000;
- }
- else
- {
- return 1000;
- }
- }
-
- public static long convertToMiliSecs()
- {
- if (precision == Precision.NANO_SECS)
- {
- return 1000000;
- }
- else
- {
- return 1;
- }
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java
deleted file mode 100644
index 1b3c961660..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JMXStressTestClient.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.qpid.tools.util.ArgumentsParser;
-
-public class JMXStressTestClient
-{
-
- public static void main(String[] args) throws Exception
- {
- ArgumentsParser parser = new ArgumentsParser();
- Arguments arguments;
- try
- {
- arguments = parser.parse(args, Arguments.class);
- arguments.validate();
- }
- catch(IllegalArgumentException e)
- {
- System.out.println("Invalid argument:" + e.getMessage());
- parser.usage(Arguments.class, Arguments.REQUIRED);
- System.out.println("\nRun example:");
- System.out.println(" java -cp qpid-tools.jar org.apache.qpid.tools.JMXStressTestClient \\");
- System.out.println(" repetitions=10 host=localhost port=8999 username=admin password=admin \\");
- System.out.println(" virtualHost=default createQueue=true bindQueue=true deleteQueue=true \\");
- System.out.println(" uniqueQueues=true queueName=boo exchangeName=amq.fanout");
- return;
- }
-
- JMXStressTestClient client = new JMXStressTestClient();
- client.run(arguments);
- }
-
- public void run(Arguments arguments) throws IOException,MalformedObjectNameException
- {
- log(arguments.toString());
- for (int i = 0; i < arguments.getRepetitions(); i++)
- {
- try(JMXConnector connector = createConnector(arguments.getHost(), arguments.getPort(), arguments.getUsername(), arguments.getPassword()))
- {
- runIteration(arguments, connector, i);
- }
- }
- }
-
- private void runIteration(Arguments arguments, JMXConnector connector, int iteration) throws IOException, MalformedObjectNameException
- {
- log("Iteration " + iteration);
- MBeanServerConnection connection = connector.getMBeanServerConnection();
- String virtualHost = arguments.getVirtualHost();
- if (virtualHost != null)
- {
- ObjectName virtualHostMBeanName = new ObjectName("org.apache.qpid:type=VirtualHost.VirtualHostManager,VirtualHost="
- + ObjectName.quote(virtualHost));
-
- Set<ObjectName> virtualHostMBeans = connection.queryNames(virtualHostMBeanName, null);
- if(virtualHostMBeans.size() == 0)
- {
- throw new IllegalArgumentException("VirtualHost MBean was not found for virtual host " + virtualHost);
- }
-
- createAndBindQueueIfRequired(arguments, iteration, connection, virtualHostMBeanName);
- }
- }
-
- private void log(String logMessage)
- {
- System.out.println(logMessage);
- }
-
- private void createAndBindQueueIfRequired(Arguments arguments, int iteration, MBeanServerConnection connection,
- ObjectName virtualHostMBeanName) throws MalformedObjectNameException, IOException
- {
- if (arguments.isCreateQueue())
- {
- String queueName = arguments.getQueueName();
-
- if (queueName == null)
- {
- queueName = "temp-queue-" + System.nanoTime();
- }
- else if (arguments.isUniqueQueues())
- {
- queueName = queueName + "-" + iteration;
- }
-
- createQueue(connection, virtualHostMBeanName, queueName);
-
- if (arguments.isBindQueue())
- {
- bindQueue(connection, arguments.getVirtualHost(), queueName, arguments.getExchangeName());
- }
-
- if (arguments.isDeleteQueue())
- {
- deleteQueue(connection, virtualHostMBeanName, queueName);
- }
- }
- }
-
- private void deleteQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName)
- {
- log(" Delete queue " + queueName);
- try
- {
- connection.invoke(virtualHostMBeanName, "deleteQueue", new Object[]{queueName}, new String[]{String.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot delete queue " + queueName, e);
- }
- }
-
- private void createQueue(MBeanServerConnection connection, ObjectName virtualHostMBeanName, String queueName)
- {
- log(" Create queue " + queueName);
- try
- {
- connection.invoke(virtualHostMBeanName, "createNewQueue", new Object[]{queueName, null, true},
- new String[]{String.class.getName(), String.class.getName(), boolean.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot create queue " + queueName, e);
- }
- }
-
- private void bindQueue(MBeanServerConnection connection, String virtualHost, String queueName, String exchangeName)
- throws MalformedObjectNameException, IOException
- {
- if (exchangeName == null)
- {
- exchangeName = "amq.direct";
- }
-
- log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName);
-
- ObjectName exchangeObjectName = new ObjectName("org.apache.qpid:type=VirtualHost.Exchange,VirtualHost="
- + ObjectName.quote(virtualHost) + ","
- + "name=" + ObjectName.quote(exchangeName) + ",ExchangeType=*");
-
- Set<ObjectName> exchanges = connection.queryNames(exchangeObjectName, null);
-
- if(exchanges.size() == 0)
- {
- throw new IllegalArgumentException("Cannot find exchange MBean for exchange " + exchangeName);
- }
-
- try
- {
- connection.invoke(exchanges.iterator().next(), "createNewBinding", new Object[]{queueName, queueName},
- new String[]{String.class.getName(), String.class.getName()});
- }
- catch (Exception e)
- {
- throw new RuntimeException("Cannot delete queue " + queueName, e);
- }
- }
-
- JMXConnector createConnector(String host, int port, String username, String password) throws IOException
- {
- Map<String, Object> env = new HashMap<>();
- JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
- env.put(JMXConnector.CREDENTIALS, new String[] {username,password});
-
- return JMXConnectorFactory.connect(jmxUrl, env);
- }
-
- public static class Arguments
- {
- private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("host", "port", "username", "password"));
-
- private String host = null;
- private int port = -1;
- private String username = null;
- private String password = null;
-
- private String virtualHost = null;
- private String queueName = null;
- private String exchangeName = null;
-
- private int repetitions = 1;
-
- private boolean createQueue = false;
- private boolean deleteQueue = false;
- private boolean uniqueQueues = false;
- private boolean bindQueue = false;
-
- public Arguments()
- {
- }
-
- public void validate()
- {
- if (host == null || host.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'host' is not specified");
- }
-
- if (port == -1)
- {
- throw new IllegalArgumentException("Mandatory argument 'port' is not specified");
- }
-
- if (username == null || username.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'username' is not specified");
- }
-
- if (password == null || password.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'password' is not specified");
- }
- }
-
- public int getRepetitions()
- {
- return repetitions;
- }
-
- public String getHost()
- {
- return host;
- }
-
- public int getPort()
- {
- return port;
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public boolean isCreateQueue()
- {
- return createQueue;
- }
-
- public boolean isDeleteQueue()
- {
- return deleteQueue;
- }
-
- public boolean isUniqueQueues()
- {
- return uniqueQueues;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public boolean isBindQueue()
- {
- return bindQueue;
- }
-
- public String getExchangeName()
- {
- return exchangeName;
- }
-
- @Override
- public String toString()
- {
- return "Arguments{" +
- "host='" + host + '\'' +
- ", port=" + port +
- ", username='" + username + '\'' +
- ", password='" + password + '\'' +
- ", virtualHost='" + virtualHost + '\'' +
- ", queueName='" + queueName + '\'' +
- ", exchangeName='" + exchangeName + '\'' +
- ", repetitions=" + repetitions +
- ", createQueue=" + createQueue +
- ", deleteQueue=" + deleteQueue +
- ", uniqueQueues=" + uniqueQueues +
- ", bindQueue=" + bindQueue +
- '}';
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
deleted file mode 100644
index bd6cfd4363..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JNDICheck.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Hashtable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-
-import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jms.FailoverPolicy;
-
-public class JNDICheck
-{
- private static final String QUEUE = "queue.";
- private static final String TOPIC = "topic.";
- private static final String DESTINATION = "destination.";
- private static final String CONNECTION_FACTORY = "connectionfactory.";
-
- public static void main(String[] args)
- {
-
- if (args.length != 1)
- {
- usage();
- }
-
- String propertyFile = args[0];
-
- new JNDICheck(propertyFile);
- }
-
- private static void usage()
- {
- exit("Usage: JNDICheck <JNDI Config file>", 0);
- }
-
- private static void exit(String message, int exitCode)
- {
- System.err.println(message);
- System.exit(exitCode);
- }
-
- private static String JAVA_NAMING = "java.naming.factory.initial";
-
- private Context _context = null;
- private Hashtable _environment = null;
-
- public JNDICheck(String propertyFile)
- {
-
- // Load JNDI properties
- Properties properties = new Properties();
-
- try(FileInputStream propertiesStream = new FileInputStream(new File(propertyFile)))
- {
- properties.load(propertiesStream);
- }
- catch (IOException e)
- {
- exit("Unable to open property file:" + propertyFile + ". Due to:" + e.getMessage(), 1);
- }
-
- //Create the initial context
- try
- {
-
- System.setProperty(JAVA_NAMING, properties.getProperty(JAVA_NAMING));
-
- _context = new InitialContext(properties);
-
- _environment = _context.getEnvironment();
-
- Enumeration keys = _environment.keys();
-
- List<String> queues = new LinkedList<String>();
- List<String> topics = new LinkedList<String>();
- List<String> destinations = new LinkedList<String>();
- List<String> connectionFactories = new LinkedList<String>();
-
- while (keys.hasMoreElements())
- {
- String key = keys.nextElement().toString();
-
- if (key.startsWith(QUEUE))
- {
- queues.add(key);
- }
- else if (key.startsWith(TOPIC))
- {
- topics.add(key);
- }
- else if (key.startsWith(DESTINATION))
- {
- destinations.add(key);
- }
- else if (key.startsWith(CONNECTION_FACTORY))
- {
- connectionFactories.add(key);
- }
- }
-
- printHeader(propertyFile);
- printEntries(QUEUE, queues);
- printEntries(TOPIC, topics);
- printEntries(DESTINATION, destinations);
- printEntries(CONNECTION_FACTORY, connectionFactories);
-
- }
- catch (NamingException e)
- {
- exit("Unable to load JNDI Context due to:" + e.getMessage(), 1);
- }
-
- }
-
- private void printHeader(String file)
- {
- print("JNDI file :" + file);
- }
-
- private void printEntries(String type, List<String> list)
- {
- if (list.size() > 0)
- {
- String name = type.substring(0, 1).toUpperCase() + type.substring(1, type.length() - 1);
- print(name + " elements in file:");
- printList(list);
- print("");
- }
- }
-
- private void printList(List<String> list)
- {
- for (String item : list)
- {
- String key = item.substring(item.indexOf('.') + 1);
-
- try
- {
- print(key, _context.lookup(key));
- }
- catch (NamingException e)
- {
- exit("Error: item " + key + " no longer in context.", 1);
- }
- }
- }
-
- private void print(String key, Object object)
- {
- if (object instanceof AMQDestination)
- {
- print(key + ":" + object);
- }
- else if (object instanceof AMQConnectionFactory)
- {
- AMQConnectionFactory factory = (AMQConnectionFactory) object;
- print(key + ":Connection");
- print("ConnectionURL:");
- print(factory.getConnectionURL().toString());
- print("FailoverPolicy");
- print(new FailoverPolicy(factory.getConnectionURL(),null).toString());
- print("");
- }
- }
-
- private void print(String msg)
- {
- System.out.println(msg);
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
deleted file mode 100644
index e0e48519f3..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/JVMArgConfiguration.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQConnection;
-
-public class JVMArgConfiguration implements TestConfiguration
-{
- /*
- * By default the connection URL is used.
- * This allows a user to easily specify a fully fledged URL any given property.
- * Ex. SSL parameters
- *
- * By providing a host & port allows a user to simply override the URL.
- * This allows to create multiple clients in test scripts easily,
- * without having to deal with the long URL format.
- */
- private String url = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
- private String host = "";
-
- private int port = -1;
-
- private String address = "queue; {create : always}";
-
- private long timeout = 0;
-
- private int msg_size = 1024;
-
- private int random_msg_size_start_from = 1;
-
- private boolean cacheMessage = false;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private int transaction_size = 0;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- private boolean random_msg_size = false;
-
- private String msgType = "bytes";
-
- private boolean printStdDev = false;
-
- private int sendRate = 0;
-
- private boolean externalController = false;
-
- private boolean useUniqueDest = false; // useful when using multiple connections.
-
- private int ackFrequency = 100;
-
- private DecimalFormat df = new DecimalFormat("###.##");
-
- private int reportEvery = 0;
-
- private boolean isReportTotal = false;
-
- private boolean isReportHeader = true;
-
- private int sendEOS = 0;
-
- private int connectionCount = 1;
-
- private int rollbackFrequency = 0;
-
- private boolean printHeaders;
-
- private boolean printContent;
-
- private long ttl;
-
- private int priority;
-
- private String readyAddress;
-
- public JVMArgConfiguration()
- {
-
- url = System.getProperty("url",url);
- host = System.getProperty("host","");
- port = Integer.getInteger("port", -1);
- address = System.getProperty("address",address);
-
- timeout = Long.getLong("timeout",0);
- msg_size = Integer.getInteger("msg-size", 0);
- cacheMessage = true; //Boolean.getBoolean("cache-msg");
- disableMessageID = Boolean.getBoolean("disable-message-id");
- disableTimestamp = Boolean.getBoolean("disable-timestamp");
- durable = Boolean.getBoolean("durable");
- transaction_size = Integer.getInteger("tx",1000);
- ack_mode = Integer.getInteger("ack-mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg-count",msg_count);
- warmup_count = Integer.getInteger("warmup-count",warmup_count);
- random_msg_size = Boolean.getBoolean("random-msg-size");
- msgType = System.getProperty("msg-type","bytes");
- printStdDev = Boolean.getBoolean("print-std-dev");
- sendRate = Integer.getInteger("rate",0);
- externalController = Boolean.getBoolean("ext-controller");
- useUniqueDest = Boolean.getBoolean("use-unique-dest");
- random_msg_size_start_from = Integer.getInteger("random-msg-size-start-from", 1);
- reportEvery = Integer.getInteger("report-every",0);
- isReportTotal = Boolean.getBoolean("report-total");
- isReportHeader = (System.getProperty("report-header") == null) ? true : Boolean.getBoolean("report-header");
- sendEOS = Integer.getInteger("send-eos",1);
- connectionCount = Integer.getInteger("con_count",1);
- ackFrequency = Integer.getInteger("ack-frequency",100);
- rollbackFrequency = Integer.getInteger("rollback-frequency",0);
- printHeaders = Boolean.getBoolean("print-headers");
- printContent = Boolean.getBoolean("print-content");
- ttl = Long.getLong("ttl", 0);
- priority = Integer.getInteger("priority", 0);
- readyAddress = System.getProperty("ready-address");
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getUrl()
- */
- @Override
- public String getUrl()
- {
- return url;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getHost()
- */
- @Override
- public String getHost()
- {
- return host;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getPort()
- */
- @Override
- public int getPort()
- {
- return port;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAddress()
- */
- @Override
- public String getAddress()
- {
- return address;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getTimeout()
- */
- @Override
- public long getTimeout()
- {
- return timeout;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckMode()
- */
- @Override
- public int getAckMode()
- {
- return ack_mode;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgCount()
- */
- @Override
- public int getMsgCount()
- {
- return msg_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMsgSize()
- */
- @Override
- public int getMsgSize()
- {
- return msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getRandomMsgSizeStartFrom()
- */
- @Override
- public int getRandomMsgSizeStartFrom()
- {
- return random_msg_size_start_from;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDurable()
- */
- @Override
- public boolean isDurable()
- {
- return durable;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isTransacted()
- */
- @Override
- public boolean isTransacted()
- {
- return transaction_size > 0;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getTransactionSize()
- */
- @Override
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getWarmupCount()
- */
- @Override
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isCacheMessage()
- */
- @Override
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableMessageID()
- */
- @Override
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isDisableTimestamp()
- */
- @Override
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isRandomMsgSize()
- */
- @Override
- public boolean isRandomMsgSize()
- {
- return random_msg_size;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getMessageType()
- */
- @Override
- public String getMessageType()
- {
- return msgType;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isPrintStdDev()
- */
- @Override
- public boolean isPrintStdDev()
- {
- return printStdDev;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getSendRate()
- */
- @Override
- public int getSendRate()
- {
- return sendRate;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isExternalController()
- */
- @Override
- public boolean isExternalController()
- {
- return externalController;
- }
-
- public void setAddress(String addr)
- {
- address = addr;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#isUseUniqueDests()
- */
- @Override
- public boolean isUseUniqueDests()
- {
- return useUniqueDest;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getAckFrequency()
- */
- @Override
- public int getAckFrequency()
- {
- return ackFrequency;
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#createConnection()
- */
- @Override
- public Connection createConnection() throws Exception
- {
- if (getHost().equals("") || getPort() == -1)
- {
- return new AMQConnection(getUrl());
- }
- else
- {
- return new AMQConnection(getHost(),getPort(),"guest","guest","test","test");
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.TestConfiguration#getDecimalFormat()
- */
- @Override
- public DecimalFormat getDecimalFormat()
- {
- return df;
- }
-
- @Override
- public int reportEvery()
- {
- return reportEvery;
- }
-
- @Override
- public boolean isReportTotal()
- {
- return isReportTotal;
- }
-
- @Override
- public boolean isReportHeader()
- {
- return isReportHeader;
- }
-
- @Override
- public int getSendEOS()
- {
- return sendEOS;
- }
-
- @Override
- public int getConnectionCount()
- {
- return connectionCount;
- }
-
- @Override
- public int getRollbackFrequency()
- {
- return rollbackFrequency;
- }
-
- @Override
- public boolean isPrintHeaders()
- {
- return printHeaders;
- }
-
- @Override
- public boolean isPrintContent()
- {
- return printContent;
- }
-
- @Override
- public long getTTL()
- {
- return ttl;
- }
-
- @Override
- public int getPriority()
- {
- return priority;
- }
-
- @Override
- public String getReadyAddress()
- {
- return readyAddress;
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
deleted file mode 100644
index 7ceef47573..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryBase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.net.InetAddress;
-import java.util.UUID;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession_0_10;
-import org.apache.qpid.messaging.Address;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryBase.class);
-
- public final static String CODE = "CODE";
- public final static String ID = "ID";
- public final static String REPLY_ADDR = "REPLY_ADDR";
- public final static String MAX_LATENCY = "MAX_LATENCY";
- public final static String MIN_LATENCY = "MIN_LATENCY";
- public final static String AVG_LATENCY = "AVG_LATENCY";
- public final static String STD_DEV = "STD_DEV";
- public final static String CONS_RATE = "CONS_RATE";
- public final static String PROD_RATE = "PROD_RATE";
- public final static String MSG_COUNT = "MSG_COUNT";
- public final static String TIMESTAMP = "Timestamp";
-
- String CONTROLLER_ADDR = System.getProperty("CONT_ADDR","CONTROLLER;{create: always, node:{x-declare:{auto-delete:true}}}");
-
- TestConfiguration config;
- Connection con;
- Session session;
- Session controllerSession;
- Destination dest;
- Destination myControlQueue;
- Destination controllerQueue;
- String id;
- String myControlQueueAddr;
-
- MessageProducer sendToController;
- MessageConsumer receiveFromController;
- String prefix = "";
-
- enum OPCode
- {
- REGISTER_CONSUMER, REGISTER_PRODUCER,
- PRODUCER_STARTWARMUP, CONSUMER_STARTWARMUP,
- CONSUMER_READY, PRODUCER_READY,
- PRODUCER_START,
- RECEIVED_END_MSG, CONSUMER_STOP,
- RECEIVED_PRODUCER_STATS, RECEIVED_CONSUMER_STATS,
- CONTINUE_TEST, STOP_TEST
- };
-
- MessageType msgType = MessageType.BYTES;
-
- public MercuryBase(TestConfiguration config,String prefix)
- {
- this.config = config;
- String host = "";
- try
- {
- host = InetAddress.getLocalHost().getHostName();
- }
- catch (Exception e)
- {
- }
- id = host + "-" + UUID.randomUUID().toString();
- this.prefix = prefix;
- this.myControlQueueAddr = id + ";{create: always}";
- }
-
- public void setUp() throws Exception
- {
- con = config.createConnection();
- con.start();
-
- controllerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- dest = createDestination();
- controllerQueue = AMQDestination.createDestination(CONTROLLER_ADDR, false);
- myControlQueue = session.createQueue(myControlQueueAddr);
- msgType = MessageType.getType(config.getMessageType());
- _logger.debug("Using " + msgType + " messages");
-
- sendToController = controllerSession.createProducer(controllerQueue);
- receiveFromController = controllerSession.createConsumer(myControlQueue);
- }
-
- private Destination createDestination() throws Exception
- {
- if (config.isUseUniqueDests())
- {
- _logger.debug("Prefix : " + prefix);
- Address addr = Address.parse(config.getAddress());
- AMQDestination temp = (AMQDestination) AMQDestination.createDestination(config.getAddress(), false);
- int type = ((AMQSession_0_10)session).resolveAddressType(temp);
-
- if ( type == AMQDestination.TOPIC_TYPE)
- {
- addr = new Address(addr.getName(),addr.getSubject() + "." + prefix,addr.getOptions());
- System.out.println("Setting subject : " + addr);
- }
- else
- {
- addr = new Address(addr.getName() + "_" + prefix,addr.getSubject(),addr.getOptions());
- System.out.println("Setting name : " + addr);
- }
-
- return AMQDestination.createDestination(addr.toString(), false);
- }
- else
- {
- return AMQDestination.createDestination(config.getAddress(), false);
- }
- }
-
- public synchronized void sendMessageToController(MapMessage m) throws Exception
- {
- m.setString(ID, id);
- m.setString(REPLY_ADDR,myControlQueueAddr);
- sendToController.send(m);
- }
-
- public void receiveFromController(OPCode expected) throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- if (expected != code)
- {
- throw new Exception("Expected OPCode : " + expected + " but received : " + code);
- }
-
- }
-
- public boolean continueTest() throws Exception
- {
- MapMessage m = (MapMessage)receiveFromController.receive();
- OPCode code = OPCode.values()[m.getInt(CODE)];
- _logger.debug("Received Code : " + code);
- return (code == OPCode.CONTINUE_TEST);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- controllerSession.close();
- con.close();
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
deleted file mode 100644
index b35adc45d6..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryConsumerController.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * when the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- *
- * Throughput
- * ===========
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class MercuryConsumerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryConsumerController.class);
- MercuryReporter reporter;
- TestConfiguration config;
- QpidReceive receiver;
-
- public MercuryConsumerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer ID : " + id);
- }
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- receiver = new QpidReceive(reporter,config, con,dest);
- receiver.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_CONSUMER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STARTWARMUP);
- receiver.waitforCompletion(config.getWarmupCount());
-
- // It's more realistic for the consumer to signal this.
- MapMessage m1 = controllerSession.createMapMessage();
- m1.setInt(CODE, OPCode.PRODUCER_READY.ordinal());
- sendMessageToController(m1);
-
- MapMessage m2 = controllerSession.createMapMessage();
- m2.setInt(CODE, OPCode.CONSUMER_READY.ordinal());
- sendMessageToController(m2);
- }
-
- public void runReceiver() throws Exception
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Consumer: " + id + " Starting iteration......" + "\n");
- }
- resetCounters();
- receiver.waitforCompletion(config.getMsgCount());
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_END_MSG.ordinal());
- sendMessageToController(m);
- }
-
- public void resetCounters()
- {
- reporter.clear();
- }
-
- public void sendResults() throws Exception
- {
- receiveFromController(OPCode.CONSUMER_STOP);
- reporter.report();
-
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.RECEIVED_CONSUMER_STATS.ordinal());
- m.setDouble(AVG_LATENCY, reporter.getAvgLatency());
- m.setDouble(MIN_LATENCY, reporter.getMinLatency());
- m.setDouble(MAX_LATENCY, reporter.getMaxLatency());
- m.setDouble(STD_DEV, reporter.getStdDev());
- m.setDouble(CONS_RATE, reporter.getRate());
- m.setLong(MSG_COUNT, reporter.getSampleSize());
- sendMessageToController(m);
-
- reporter.log(new StringBuilder("Total Msgs Received : ").append(reporter.getSampleSize()).toString());
- reporter.log(new StringBuilder("Consumer rate : ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").toString());
- reporter.log(new StringBuilder("Avg Latency : ").
- append(config.getDecimalFormat().format(reporter.getAvgLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Min Latency : ").
- append(config.getDecimalFormat().format(reporter.getMinLatency())).
- append(" ms").toString());
- reporter.log(new StringBuilder("Max Latency : ").
- append(config.getDecimalFormat().format(reporter.getMaxLatency())).
- append(" ms").toString());
- if (config.isPrintStdDev())
- {
- reporter.log(new StringBuilder("Std Dev : ").
- append(reporter.getStdDev()).toString());
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- System.out.println("=========================================================\n");
- System.out.println("Consumer: " + id + " starting a new iteration ......\n");
- runReceiver();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughputAndLatency.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryConsumerController cons = new MercuryConsumerController(config, reporter, scriptId + i);
- Runnable r = new Runnable()
- {
- public void run()
- {
- cons.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Consumers have completed the test......\n");
- }
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
deleted file mode 100644
index 02377bb853..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryProducerController.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.tools.report.MercuryReporter;
-import org.apache.qpid.tools.report.MercuryReporter.MercuryThroughput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- * Answer by rajith : I agree about in memory buffering affecting rates. But Based on test runs
- * I have done so far, it seems quite useful to compute the producer rate as it gives an
- * indication of how the system behaves. For ex if there is a gap between producer and consumer rates
- * you could clearly see the higher latencies and when producer and consumer rates are very close,
- * latency is good.
- *
- */
-public class MercuryProducerController extends MercuryBase
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
- MercuryReporter reporter;
- QpidSend sender;
-
- public MercuryProducerController(TestConfiguration config, MercuryReporter reporter, String prefix)
- {
- super(config,prefix);
- this.reporter = reporter;
- System.out.println("Producer ID : " + id);
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- sender = new QpidSend(reporter,config, con,dest);
- sender.setUp();
- MapMessage m = controllerSession.createMapMessage();
- m.setInt(CODE, OPCode.REGISTER_PRODUCER.ordinal());
- sendMessageToController(m);
- }
-
- public void warmup()throws Exception
- {
- receiveFromController(OPCode.PRODUCER_STARTWARMUP);
- if (_logger.isInfoEnabled())
- {
- _logger.info("Producer: " + id + " Warming up......");
- }
- sender.send(config.getWarmupCount());
- sender.sendEndMessage();
- }
-
- public void runSender() throws Exception
- {
- resetCounters();
- receiveFromController(OPCode.PRODUCER_START);
- sender.send(config.getMsgCount());
- }
-
- public void resetCounters()
- {
- sender.resetCounters();
- }
-
- public void sendResults() throws Exception
- {
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, OPCode.RECEIVED_PRODUCER_STATS.ordinal());
- msg.setDouble(PROD_RATE, reporter.getRate());
- sendMessageToController(msg);
- reporter.log(new StringBuilder("Producer rate: ").
- append(config.getDecimalFormat().format(reporter.getRate())).
- append(" msg/sec").
- toString());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- sender.tearDown();
- super.tearDown();
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- boolean nextIteration = true;
- while (nextIteration)
- {
- if(_logger.isInfoEnabled())
- {
- _logger.info("=========================================================\n");
- _logger.info("Producer: " + id + " starting a new iteration ......\n");
- }
- runSender();
- sendResults();
- nextIteration = continueTest();
- }
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public void startControllerIfNeeded()
- {
- if (!config.isExternalController())
- {
- final MercuryTestController controller = new MercuryTestController(config);
- Runnable r = new Runnable()
- {
- public void run()
- {
- controller.run();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating controller thread",e);
- }
- t.start();
- }
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryReporter reporter= new MercuryReporter(MercuryThroughput.class,System.out,10,true);
- String scriptId = (args.length == 1) ? args[0] : "";
- int conCount = config.getConnectionCount();
- final CountDownLatch testCompleted = new CountDownLatch(conCount);
- for (int i=0; i < conCount; i++)
- {
- final MercuryProducerController prod = new MercuryProducerController(config, reporter, scriptId + i);
- prod.startControllerIfNeeded();
- Runnable r = new Runnable()
- {
- public void run()
- {
- prod.run();
- testCompleted.countDown();
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating producer thread",e);
- }
- t.start();
- }
- testCompleted.await();
- reporter.log("Producers have completed the test......");
- }
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
deleted file mode 100644
index 8c66a1e44d..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MercuryTestController.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.FileWriter;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.client.message.AMQPEncodedMapMessage;
-import org.apache.qpid.tools.report.Reporter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The Controller coordinates a test run between a number
- * of producers and consumers, configured via -Dprod_count and -Dcons_count.
- *
- * It waits till all the producers and consumers have registered and then
- * conducts a warmup run. Once all consumers and producers have completed
- * the warmup run and is ready, it will conduct the actual test run and
- * collect all stats from the participants and calculates the system
- * throughput, the avg/min/max for producer rates, consumer rates and latency.
- *
- * These stats are then printed to std out.
- * The Controller also prints events to std out to give a running account
- * of the test run in progress. Ex registering of participants, starting warmup ..etc.
- * This allows a scripting tool to monitor the progress.
- *
- * The Controller can be run in two modes.
- * 1. A single test run (default) where it just runs until the message count specified
- * for the producers via -Dmsg_count is sent and received.
- *
- * 2. Time based, configured via -Dduration=x, where x is in mins.
- * In this mode, the Controller repeatedly cycles through the tests (after an initial
- * warmup run) until the desired time is reached. If a test run is in progress
- * and the time is up, it will allow the run the complete.
- *
- * After each iteration, the stats will be printed out in csv format to a separate log file.
- * System throughput is calculated as follows
- * totalMsgCount/(totalTestTime)
- */
-public class MercuryTestController extends MercuryBase implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(MercuryProducerController.class);
-
- enum TestMode { SINGLE_RUN, TIME_BASED };
-
- TestMode testMode = TestMode.SINGLE_RUN;
-
- long totalTestTime;
-
- private double avgSystemLatency = 0.0;
- private double minSystemLatency = Double.MAX_VALUE;
- private double maxSystemLatency = 0;
- private double avgSystemLatencyStdDev = 0.0;
-
- private double avgSystemConsRate = 0.0;
- private double maxSystemConsRate = 0.0;
- private double minSystemConsRate = Double.MAX_VALUE;
-
- private double avgSystemProdRate = 0.0;
- private double maxSystemProdRate = 0.0;
- private double minSystemProdRate = Double.MAX_VALUE;
-
- private long totalMsgCount = 0;
- private double totalSystemThroughput = 0.0;
-
- private int consumerCount = Integer.getInteger("cons_count", 1);
- private int producerCount = Integer.getInteger("prod_count", 1);
- private int duration = Integer.getInteger("duration", -1); // in mins
- private Map<String,MapMessage> consumers;
- private Map<String,MapMessage> producers;
-
- private CountDownLatch consRegistered;
- private CountDownLatch prodRegistered;
- private CountDownLatch consReady;
- private CountDownLatch prodReady;
- private CountDownLatch receivedEndMsg;
- private CountDownLatch receivedConsStats;
- private CountDownLatch receivedProdStats;
-
- private MessageConsumer consumer;
- private boolean printStdDev = false;
- private FileWriter writer;
- private Reporter report;
-
- public MercuryTestController(TestConfiguration config)
- {
- super(config,"");
-
- consumers = new ConcurrentHashMap<String,MapMessage>(consumerCount);
- producers = new ConcurrentHashMap<String,MapMessage>(producerCount);
-
- consRegistered = new CountDownLatch(consumerCount);
- prodRegistered = new CountDownLatch(producerCount);
- consReady = new CountDownLatch(consumerCount);
- prodReady = new CountDownLatch(producerCount);
- printStdDev = config.isPrintStdDev();
- testMode = (duration == -1) ? TestMode.SINGLE_RUN : TestMode.TIME_BASED;
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- if (testMode == TestMode.TIME_BASED)
- {
- writer = new FileWriter("stats-csv.log");
- }
- consumer = controllerSession.createConsumer(controllerQueue);
- report.log("\nController: " + producerCount + " producers are expected");
- report.log("Controller: " + consumerCount + " consumers are expected \n");
- consumer.setMessageListener(this);
- consRegistered.await();
- prodRegistered.await();
- report.log("\nController: All producers and consumers have registered......\n");
- }
-
- public void warmup() throws Exception
- {
- report.log("Controller initiating warm up sequence......");
- sendMessageToNodes(OPCode.CONSUMER_STARTWARMUP,consumers.values());
- sendMessageToNodes(OPCode.PRODUCER_STARTWARMUP,producers.values());
- prodReady.await();
- consReady.await();
- report.log("\nController : All producers and consumers are ready to start the test......\n");
- }
-
- public void startTest() throws Exception
- {
- resetCounters();
- report.log("\nController Starting test......");
- long start = Clock.getTime();
- sendMessageToNodes(OPCode.PRODUCER_START,producers.values());
- receivedEndMsg.await();
- totalTestTime = Clock.getTime() - start;
- sendMessageToNodes(OPCode.CONSUMER_STOP,consumers.values());
- receivedProdStats.await();
- receivedConsStats.await();
- }
-
- public void resetCounters()
- {
- minSystemLatency = Double.MAX_VALUE;
- maxSystemLatency = 0;
- maxSystemConsRate = 0.0;
- minSystemConsRate = Double.MAX_VALUE;
- maxSystemProdRate = 0.0;
- minSystemProdRate = Double.MAX_VALUE;
-
- totalMsgCount = 0;
-
- receivedConsStats = new CountDownLatch(consumerCount);
- receivedProdStats = new CountDownLatch(producerCount);
- receivedEndMsg = new CountDownLatch(producerCount);
- }
-
- public void calcStats() throws Exception
- {
- double totLatency = 0.0;
- double totStdDev = 0.0;
- double totalConsRate = 0.0;
- double totalProdRate = 0.0;
-
- MapMessage conStat = null; // for error handling
- try
- {
- for (MapMessage m: consumers.values())
- {
- conStat = m;
- minSystemLatency = Math.min(minSystemLatency,m.getDouble(MIN_LATENCY));
- maxSystemLatency = Math.max(maxSystemLatency,m.getDouble(MAX_LATENCY));
- totLatency = totLatency + m.getDouble(AVG_LATENCY);
- totStdDev = totStdDev + m.getDouble(STD_DEV);
-
- minSystemConsRate = Math.min(minSystemConsRate,m.getDouble(CONS_RATE));
- maxSystemConsRate = Math.max(maxSystemConsRate,m.getDouble(CONS_RATE));
- totalConsRate = totalConsRate + m.getDouble(CONS_RATE);
-
- totalMsgCount = totalMsgCount + m.getLong(MSG_COUNT);
- }
- }
- catch(Exception e)
- {
- System.err.println("Error calculating stats from Consumer : " + conStat);
- }
-
-
- MapMessage prodStat = null; // for error handling
- try
- {
- for (MapMessage m: producers.values())
- {
- prodStat = m;
- minSystemProdRate = Math.min(minSystemProdRate,m.getDouble(PROD_RATE));
- maxSystemProdRate = Math.max(maxSystemProdRate,m.getDouble(PROD_RATE));
- totalProdRate = totalProdRate + m.getDouble(PROD_RATE);
- }
- }
- catch(Exception e)
- {
- System.err.println("Error calculating stats from Producer : " + conStat);
- }
-
- avgSystemLatency = totLatency/consumers.size();
- avgSystemLatencyStdDev = totStdDev/consumers.size();
- avgSystemConsRate = totalConsRate/consumers.size();
- avgSystemProdRate = totalProdRate/producers.size();
-
- report.log("Total test time : " + totalTestTime + " in " + Clock.getPrecision());
-
- totalSystemThroughput = (totalMsgCount*Clock.convertToSecs()/totalTestTime);
- }
-
- public void printResults() throws Exception
- {
- report.log(new StringBuilder("Total Msgs Received : ").append(totalMsgCount).toString());
- report.log(new StringBuilder("System Throughput : ").
- append(config.getDecimalFormat().format(totalSystemThroughput)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Avg Consumer rate : ").
- append(config.getDecimalFormat().format(avgSystemConsRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Min Consumer rate : ").
- append(config.getDecimalFormat().format(minSystemConsRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Max Consumer rate : ").
- append(config.getDecimalFormat().format(maxSystemConsRate)).
- append(" msg/sec").toString());
-
- report.log(new StringBuilder("Avg Producer rate : ").
- append(config.getDecimalFormat().format(avgSystemProdRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Min Producer rate : ").
- append(config.getDecimalFormat().format(minSystemProdRate)).
- append(" msg/sec").toString());
- report.log(new StringBuilder("Max Producer rate : ").
- append(config.getDecimalFormat().format(maxSystemProdRate)).
- append(" msg/sec").toString());
-
- report.log(new StringBuilder("Avg System Latency : ").
- append(config.getDecimalFormat().format(avgSystemLatency)).
- append(" ms").toString());
- report.log(new StringBuilder("Min System Latency : ").
- append(config.getDecimalFormat().format(minSystemLatency)).
- append(" ms").toString());
- report.log(new StringBuilder("Max System Latency : ").
- append(config.getDecimalFormat().format(maxSystemLatency)).
- append(" ms").toString());
- if (printStdDev)
- {
- report.log(new StringBuilder("Avg System Std Dev : ").
- append(avgSystemLatencyStdDev).toString());
- }
- }
-
- private synchronized void sendMessageToNodes(OPCode code,Collection<MapMessage> nodes) throws Exception
- {
- report.log("\nController: Sending code " + code);
- MessageProducer tmpProd = controllerSession.createProducer(null);
- MapMessage msg = controllerSession.createMapMessage();
- msg.setInt(CODE, code.ordinal());
- for (MapMessage node : nodes)
- {
- if (node.getString(REPLY_ADDR) == null)
- {
- report.log("REPLY_ADDR is null " + node);
- }
- else
- {
- report.log("Controller: Sending " + code + " to " + node.getString(REPLY_ADDR));
- }
- tmpProd.send(controllerSession.createQueue(node.getString(REPLY_ADDR)), msg);
- }
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- MapMessage m = (MapMessage)msg;
- OPCode code = OPCode.values()[m.getInt(CODE)];
-
- report.log("\n---------Controller Received Code : " + code);
- report.log("---------Data : " + ((AMQPEncodedMapMessage)m).getMap());
-
- switch (code)
- {
- case REGISTER_CONSUMER :
- if (consRegistered.getCount() == 0)
- {
- report.log("Warning : Expected number of consumers have already registered," +
- "ignoring extra consumer");
- break;
- }
- consumers.put(m.getString(ID),m);
- consRegistered.countDown();
- break;
-
- case REGISTER_PRODUCER :
- if (prodRegistered.getCount() == 0)
- {
- report.log("Warning : Expected number of producers have already registered," +
- "ignoring extra producer");
- break;
- }
- producers.put(m.getString(ID),m);
- prodRegistered.countDown();
- break;
-
- case CONSUMER_READY :
- consReady.countDown();
- break;
-
- case PRODUCER_READY :
- prodReady.countDown();
- break;
-
- case RECEIVED_END_MSG :
- receivedEndMsg.countDown();
- break;
-
- case RECEIVED_CONSUMER_STATS :
- consumers.put(m.getString(ID),m);
- receivedConsStats.countDown();
- break;
-
- case RECEIVED_PRODUCER_STATS :
- producers.put(m.getString(ID),m);
- receivedProdStats.countDown();
- break;
-
- default:
- throw new Exception("Invalid OPCode " + code);
- }
- }
- catch (Exception e)
- {
- handleError(e,"Error when receiving messages " + msg);
- }
- }
-
- public void run()
- {
- try
- {
- setUp();
- warmup();
- if (testMode == TestMode.SINGLE_RUN)
- {
- startTest();
- calcStats();
- printResults();
- }
- else
- {
- long startTime = Clock.getTime();
- long timeLimit = duration * 60 * 1000; // duration is in mins.
- boolean nextIteration = true;
- while (nextIteration)
- {
- startTest();
- calcStats();
- writeStatsToFile();
- if (Clock.getTime() - startTime < timeLimit)
- {
- sendMessageToNodes(OPCode.CONTINUE_TEST,consumers.values());
- sendMessageToNodes(OPCode.CONTINUE_TEST,producers.values());
- nextIteration = true;
- }
- else
- {
- nextIteration = false;
- }
- }
- }
- tearDown();
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- @Override
- public void tearDown() throws Exception {
- report.log("Controller: Completed the test......\n");
- if (testMode == TestMode.TIME_BASED)
- {
- writer.close();
- }
- sendMessageToNodes(OPCode.STOP_TEST,consumers.values());
- sendMessageToNodes(OPCode.STOP_TEST,producers.values());
- super.tearDown();
- }
-
- public void writeStatsToFile() throws Exception
- {
- writer.append(String.valueOf(totalMsgCount)).append(",");
- writer.append(config.getDecimalFormat().format(totalSystemThroughput)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemConsRate)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemProdRate)).append(",");
- writer.append(config.getDecimalFormat().format(avgSystemLatency)).append(",");
- writer.append(config.getDecimalFormat().format(minSystemLatency)).append(",");
- writer.append(config.getDecimalFormat().format(maxSystemLatency));
- if (printStdDev)
- {
- writer.append(",").append(String.valueOf(avgSystemLatencyStdDev));
- }
- writer.append("\n");
- writer.flush();
- }
-
- public static void main(String[] args)
- {
- TestConfiguration config = new JVMArgConfiguration();
- MercuryTestController controller = new MercuryTestController(config);
- controller.run();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
deleted file mode 100644
index a0ba928e1f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/MessageFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-public class MessageFactory
-{
- public static Message createBytesMessage(Session ssn, int size) throws JMSException
- {
- BytesMessage msg = ssn.createBytesMessage();
- msg.writeBytes(createMessagePayload(size).getBytes());
- return msg;
- }
-
- public static Message createTextMessage(Session ssn, int size) throws JMSException
- {
- TextMessage msg = ssn.createTextMessage();
- msg.setText(createMessagePayload(size));
- return msg;
- }
-
- public static String createMessagePayload(int size)
- {
- String msgData = "Qpid Test Message";
-
- StringBuffer buf = new StringBuffer(size);
- int count = 0;
- while (count <= (size - msgData.length()))
- {
- buf.append(msgData);
- count += msgData.length();
- }
- if (count < size)
- {
- buf.append(msgData, 0, size - count);
- }
-
- return buf.toString();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
deleted file mode 100644
index e2d179965b..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java
+++ /dev/null
@@ -1,904 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import static org.apache.qpid.tools.QpidBench.Mode.BOTH;
-import static org.apache.qpid.tools.QpidBench.Mode.CONSUME;
-import static org.apache.qpid.tools.QpidBench.Mode.PUBLISH;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.thread.Threading;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.ExchangeBind;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageSubscribe;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.QueueDeclare;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-import org.apache.qpid.util.UUIDGen;
-import org.apache.qpid.util.UUIDs;
-
-/**
- * QpidBench
- *
- */
-
-public class QpidBench
-{
-
- static enum Mode
- {
- PUBLISH, CONSUME, BOTH
- }
-
- private static class Options
- {
- private StringBuilder usage = new StringBuilder("qpid-bench <options>");
-
- void usage(String name, String description, Object def)
- {
- String defval = "";
- if (def != null)
- {
- defval = String.format(" (%s)", def);
- }
- usage.append(String.format("\n %-15s%-14s %s", name, defval, description));
- }
-
- public String broker = "localhost";
- public int port = 5672;
- public long count = 1000000;
- public long window = 100000;
- public long sample = window;
- public int size = 1024;
- public Mode mode = BOTH;
- public boolean timestamp = false;
- public boolean message_id = false;
- public boolean message_cache = false;
- public boolean persistent = false;
- public boolean jms_publish = false;
- public boolean jms_consume = false;
- public boolean help = false;
-
- {
- usage("-b, --broker", "the broker hostname", broker);
- }
-
- public void parse__broker(String b)
- {
- this.broker = b;
- }
-
- public void parse_b(String b)
- {
- parse__broker(b);
- }
-
- {
- usage("-p, --port", "the broker port", port);
- }
-
- public void parse__port(String p)
- {
- this.port = Integer.parseInt(p);
- }
-
- public void parse_p(String p)
- {
- parse__port(p);
- }
-
- {
- usage("-c, --count", "the number of messages to send/receive, 0 means no limit", count);
- }
-
- public void parse__count(String c)
- {
- this.count = Long.parseLong(c);
- }
-
- public void parse_c(String c)
- {
- parse__count(c);
- }
-
- {
- usage("-w, --window", "the number of messages to send before blocking, 0 disables", window);
- }
-
- public void parse__window(String w)
- {
- this.window = Long.parseLong(w);
- }
-
- public void parse_w(String w)
- {
- parse__window(w);
- }
-
- {
- usage("--sample", "print stats after this many messages, 0 disables", sample);
- }
-
- public void parse__sample(String s)
- {
- this.sample = Long.parseLong(s);
- }
-
- {
- usage("-i, --interval", "sets both --window and --sample", window);
- }
-
- public void parse__interval(String i)
- {
- this.window = Long.parseLong(i);
- this.sample = window;
- }
-
- public void parse_i(String i)
- {
- parse__interval(i);
- }
-
- {
- usage("-s, --size", "the message size", size);
- }
-
- public void parse__size(String s)
- {
- this.size = Integer.parseInt(s);
- }
-
- public void parse_s(String s)
- {
- parse__size(s);
- }
-
- {
- usage("-m, --mode", "one of publish, consume, or both", mode);
- }
-
- public void parse__mode(String m)
- {
- if (m.equalsIgnoreCase("publish"))
- {
- this.mode = PUBLISH;
- }
- else if (m.equalsIgnoreCase("consume"))
- {
- this.mode = CONSUME;
- }
- else if (m.equalsIgnoreCase("both"))
- {
- this.mode = BOTH;
- }
- else
- {
- throw new IllegalArgumentException
- ("must be one of 'publish', 'consume', or 'both'");
- }
- }
-
- public void parse_m(String m)
- {
- parse__mode(m);
- }
-
- {
- usage("--timestamp", "set timestamps on each message if true", timestamp);
- }
-
- public void parse__timestamp(String t)
- {
- this.timestamp = Boolean.parseBoolean(t);
- }
-
- {
- usage("--mesage-id", "set the message-id on each message if true", message_id);
- }
-
- public void parse__message_id(String m)
- {
- this.message_id = Boolean.parseBoolean(m);
- }
-
- {
- usage("--message-cache", "reuse the same message for each send if true", message_cache);
- }
-
- public void parse__message_cache(String c)
- {
- this.message_cache = Boolean.parseBoolean(c);
- }
-
- {
- usage("--persistent", "set the delivery-mode to persistent if true", persistent);
- }
-
- public void parse__persistent(String p)
- {
- this.persistent = Boolean.parseBoolean(p);
- }
-
- {
- usage("--jms-publish", "use the jms client for publish", jms_publish);
- }
-
- public void parse__jms_publish(String jp)
- {
- this.jms_publish = Boolean.parseBoolean(jp);
- }
-
- {
- usage("--jms-consume", "use the jms client for consume", jms_consume);
- }
-
- public void parse__jms_consume(String jc)
- {
- this.jms_consume = Boolean.parseBoolean(jc);
- }
-
- {
- usage("--jms", "sets both --jms-publish and --jms-consume", false);
- }
-
- public void parse__jms(String j)
- {
- this.jms_publish = this.jms_consume = Boolean.parseBoolean(j);
- }
-
- {
- usage("-h, --help", "prints this message", null);
- }
-
- public void parse__help()
- {
- this.help = true;
- }
-
- public void parse_h()
- {
- parse__help();
- }
-
- public String parse(String ... args)
- {
- Class klass = getClass();
- List<String> arguments = new ArrayList<String>();
- for (int i = 0; i < args.length; i++)
- {
- String option = args[i];
-
- if (!option.startsWith("-"))
- {
- arguments.add(option);
- continue;
- }
-
- String method = "parse" + option.replace('-', '_');
- try
- {
- try
- {
- Method parser = klass.getMethod(method);
- parser.invoke(this);
- }
- catch (NoSuchMethodException e)
- {
- try
- {
- Method parser = klass.getMethod(method, String.class);
-
- String value = null;
- if (i + 1 < args.length)
- {
- value = args[i+1];
- i++;
- }
- else
- {
- return option + " requires a value";
- }
-
- parser.invoke(this, value);
- }
- catch (NoSuchMethodException e2)
- {
- return "no such option: " + option;
- }
- }
- }
- catch (InvocationTargetException e)
- {
- Throwable t = e.getCause();
- return String.format
- ("error parsing %s: %s: %s", option, t.getClass().getName(),
- t.getMessage());
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access parse method: " + option, e);
- }
- }
-
- return parseArguments(arguments);
- }
-
- public String parseArguments(List<String> arguments)
- {
- if (arguments.size() > 0)
- {
- String args = arguments.toString();
- return "unrecognized arguments: " + args.substring(1, args.length() - 1);
- }
- else
- {
- return null;
- }
- }
-
- public String toString()
- {
- Class klass = getClass();
- Field[] fields = klass.getFields();
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < fields.length; i++)
- {
- if (i > 0)
- {
- str.append("\n");
- }
-
- String name = fields[i].getName();
- str.append(name);
- str.append(" = ");
- Object value;
- try
- {
- value = fields[i].get(this);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException
- ("unable to access field: " + name, e);
- }
- str.append(value);
- }
-
- return str.toString();
- }
- }
-
- public static final void main(String[] args) throws Exception
- {
- final Options opts = new Options();
- String error = opts.parse(args);
- if (error != null)
- {
- System.err.println(error);
- System.exit(-1);
- return;
- }
-
- if (opts.help)
- {
- System.out.println(opts.usage);
- return;
- }
-
- System.out.println(opts);
-
- switch (opts.mode)
- {
- case CONSUME:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_consume)
- {
- jms_consumer(opts);
- }
- else
- {
- native_consumer(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Consumer Completed");
- }
- };
-
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating consumer thread",e);
- }
- t.start();
- break;
- }
-
- switch (opts.mode)
- {
- case PUBLISH:
- case BOTH:
- Runnable r = new Runnable()
- {
- public void run()
- {
- try
- {
- if (opts.jms_publish)
- {
- jms_publisher(opts);
- }
- else
- {
- native_publisher(opts);
- }
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- System.out.println("Producer Completed");
- }
- };
- Thread t;
- try
- {
- t = Threading.getThreadFactory().createThread(r);
- }
- catch(Exception e)
- {
- throw new Error("Error creating publisher thread",e);
- }
- t.start();
- break;
- }
- }
-
- private static enum Column
- {
- LEFT, RIGHT
- }
-
- private static final void sample(Options opts, Column col, String name, long count,
- long start, long time, long lastTime)
- {
- String pfx = "";
- String sfx = "";
- if (opts.mode == BOTH)
- {
- if (col == Column.RIGHT)
- {
- pfx = " -- ";
- }
- else
- {
- sfx = " --";
- }
- }
-
- if (count == 0)
- {
- String stats = String.format("%s: %tc", name, start);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- return;
- }
-
- double cumulative = 1000 * (double) count / (double) (time - start);
- double interval = 1000 * ((double) opts.sample / (double) (time - lastTime));
-
- String stats = String.format
- ("%s: %d %.2f %.2f", name, count, cumulative, interval);
- System.out.println(String.format("%s%-36s%s", pfx, stats, sfx));
- }
-
- private static final javax.jms.Connection getJMSConnection(Options opts) throws Exception
- {
- String url = String.format
- ("amqp://guest:guest@clientid/test?brokerlist='tcp://%s:%d'",
- opts.broker, opts.port);
- return new AMQConnection(url);
- }
-
- private static final void jms_publisher(Options opts) throws Exception
- {
- javax.jms.Connection conn = getJMSConnection(opts);
-
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageProducer prod = ssn.createProducer(dest);
- MessageConsumer cons = ssn.createConsumer(echo_dest);
- prod.setDisableMessageID(!opts.message_id);
- prod.setDisableMessageTimestamp(!opts.timestamp);
- prod.setDeliveryMode(opts.persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- StringBuilder str = new StringBuilder();
- for (int i = 0; i < opts.size; i++)
- {
- str.append((char) (i % 128));
- }
-
- String body = str.toString();
-
- TextMessage cached = ssn.createTextMessage();
- cached.setText(body);
-
- conn.start();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- Message echo = cons.receive();
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "JP", count, start, time, lastTime);
- lastTime = time;
- }
-
- TextMessage m;
- if (opts.message_cache)
- {
- m = cached;
- }
- else
- {
- m = ssn.createTextMessage();
- m.setText(body);
- }
-
- prod.send(m);
- count++;
- }
-
- conn.close();
- }
-
- private static final void jms_consumer(final Options opts) throws Exception
- {
- final javax.jms.Connection conn = getJMSConnection(opts);
- javax.jms.Session ssn = conn.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
- Destination dest = ssn.createQueue("test-queue");
- Destination echo_dest = ssn.createQueue("echo-queue");
- MessageConsumer cons = ssn.createConsumer(dest);
- final MessageProducer prod = ssn.createProducer(echo_dest);
- prod.setDisableMessageID(true);
- prod.setDisableMessageTimestamp(true);
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- final TextMessage echo = ssn.createTextMessage();
- echo.setText("ECHO");
-
- final Object done = new Object();
- cons.setMessageListener(new MessageListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void onMessage(Message m)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- try
- {
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- prod.send(echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "JC", count, start, time, lastTime);
- lastTime = time;
- }
- }
- catch (JMSException e)
- {
- throw new RuntimeException(e);
- }
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- conn.start();
- synchronized (done)
- {
- done.wait();
- }
- conn.close();
- }
-
- private static final org.apache.qpid.transport.Connection getConnection
- (Options opts)
- {
- org.apache.qpid.transport.Connection conn =
- new org.apache.qpid.transport.Connection();
- conn.connect(opts.broker, opts.port, null, "guest", "guest", false, null);
- return conn;
- }
-
- private static abstract class NativeListener implements SessionListener
- {
-
- public void opened(org.apache.qpid.transport.Session ssn) {}
-
- public void resumed(org.apache.qpid.transport.Session ssn) {}
-
- public void exception(org.apache.qpid.transport.Session ssn,
- SessionException exc)
- {
- exc.printStackTrace();
- }
-
- public void closed(org.apache.qpid.transport.Session ssn) {}
-
- }
-
- private static final void native_publisher(Options opts) throws Exception
- {
- final long[] echos = { 0 };
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- synchronized (echos)
- {
- echos[0]++;
- echos.notify();
- }
- ssn.processed(xfr);
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- MessageProperties cached_mp = new MessageProperties();
- DeliveryProperties cached_dp = new DeliveryProperties();
- cached_dp.setRoutingKey("test-queue");
- cached_dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- int size = opts.size;
- ByteBuffer body = ByteBuffer.allocate(size);
- for (int i = 0; i < size; i++)
- {
- body.put((byte) i);
- }
- body.flip();
-
- ssn.invoke(new MessageSubscribe()
- .queue("echo-queue")
- .destination("echo-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("echo-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("echo-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("echo-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- UUIDGen gen = UUIDs.newGenerator();
-
- long count = 0;
- long lastTime = 0;
- long start = System.currentTimeMillis();
- while (opts.count == 0 || count < opts.count)
- {
- if (opts.window > 0 && (count % opts.window) == 0 && count > 0)
- {
- synchronized (echos)
- {
- while (echos[0] < (count/opts.window))
- {
- echos.wait();
- }
- }
- }
-
- if (opts.sample > 0 && (count % opts.sample) == 0)
- {
- long time = System.currentTimeMillis();
- sample(opts, Column.LEFT, "NP", count, start, time, lastTime);
- lastTime = time;
- }
-
- MessageProperties mp;
- DeliveryProperties dp;
- if (opts.message_cache)
- {
- mp = cached_mp;
- dp = cached_dp;
- }
- else
- {
- mp = new MessageProperties();
- dp = new DeliveryProperties();
- dp.setRoutingKey("test-queue");
- dp.setDeliveryMode
- (opts.persistent ? MessageDeliveryMode.PERSISTENT : MessageDeliveryMode.NON_PERSISTENT);
-
- }
-
- if (opts.message_id)
- {
- mp.setMessageId(gen.generate());
- }
-
- if (opts.timestamp)
- {
- dp.setTimestamp(System.currentTimeMillis());
- }
-
- ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp), body.slice());
- count++;
- }
-
- ssn.messageCancel("echo-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
- private static final void native_consumer(final Options opts) throws Exception
- {
- final DeliveryProperties dp = new DeliveryProperties();
- final byte[] echo = new byte[0];
- dp.setRoutingKey("echo-queue");
- dp.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
- final MessageProperties mp = new MessageProperties();
- final Object done = new Object();
- org.apache.qpid.transport.Connection conn = getConnection(opts);
- org.apache.qpid.transport.Session ssn = conn.createSession();
- ssn.setSessionListener(new NativeListener()
- {
- private long count = 0;
- private long lastTime = 0;
- private long start;
-
- public void message(org.apache.qpid.transport.Session ssn,
- MessageTransfer xfr)
- {
- if (count == 0)
- {
- start = System.currentTimeMillis();
- }
-
- boolean sample = opts.sample > 0 && (count % opts.sample) == 0;
- long time = sample ? System.currentTimeMillis() : 0;
-
- if (opts.window > 0 && (count % opts.window) == 0)
- {
- ssn.messageTransfer("amq.direct",
- MessageAcceptMode.NONE,
- MessageAcquireMode.PRE_ACQUIRED,
- new Header(dp, mp),
- echo);
- }
-
- if (sample)
- {
- sample(opts, Column.RIGHT, "NC", count, start, time, lastTime);
- lastTime = time;
- }
- ssn.processed(xfr);
- count++;
-
- if (opts.count > 0 && count >= opts.count)
- {
- synchronized (done)
- {
- done.notify();
- }
- }
- }
- });
-
- ssn.invoke(new QueueDeclare().queue("test-queue").durable(false));
- ssn.invoke(new QueueDeclare().queue("echo-queue").durable(false));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("test-queue").bindingKey("test-queue"));
- ssn.invoke(new ExchangeBind().exchange("amq.direct").queue("echo-queue").bindingKey("echo-queue"));
-
- ssn.invoke(new MessageSubscribe()
- .queue("test-queue")
- .destination("test-queue")
- .acceptMode(MessageAcceptMode.NONE)
- .acquireMode(MessageAcquireMode.PRE_ACQUIRED));
- ssn.messageSetFlowMode("test-queue", MessageFlowMode.WINDOW);
- ssn.messageFlow("test-queue", MessageCreditUnit.MESSAGE, 0xFFFFFFFF);
- ssn.messageFlow("test-queue", MessageCreditUnit.BYTE, 0xFFFFFFFF);
-
- synchronized (done)
- {
- done.wait();
- }
-
- ssn.messageCancel("test-queue");
-
- ssn.sync();
- ssn.close();
- conn.close();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
deleted file mode 100644
index 4092f0d59d..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidReceive.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QpidReceive implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(QpidReceive.class);
- private final CountDownLatch testCompleted = new CountDownLatch(1);
-
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageConsumer consumer;
- private boolean transacted = false;
- private boolean isRollback = false;
- private int txSize = 0;
- private int rollbackFrequency = 0;
- private int ackFrequency = 0;
- private int expected = 0;
- private int received = 0;
- private Reporter report;
- private TestConfiguration config;
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidReceive(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else if (config.getAckFrequency() > 0)
- {
- session = con.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- consumer = session.createConsumer(dest);
- consumer.setMessageListener(this);
- if (_logger.isDebugEnabled())
- {
- System.out.println("Consumer: " + /*id +*/ " Receiving messages from : " + ((AMQDestination)dest).getAddressName() + "\n");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
- isRollback = config.getRollbackFrequency() > 0;
- rollbackFrequency = config.getRollbackFrequency();
- ackFrequency = config.getAckFrequency();
-
- _logger.debug("Ready address : " + config.getReadyAddress());
- if (config.getReadyAddress() != null)
- {
- MessageProducer prod = session.createProducer(AMQDestination
- .createDestination(config.getReadyAddress(), false));
- prod.send(session.createMessage());
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending message to ready address " + prod.getDestination());
- }
- }
- }
-
- public void resetCounters()
- {
- received = 0;
- expected = 0;
- report.clear();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage &&
- TestConfiguration.EOS.equals(((TextMessage)msg).getText()))
- {
- testCompleted.countDown();
- return;
- }
-
- received++;
- report.message(msg);
-
- if (config.isPrintHeaders())
- {
- System.out.println(((AbstractJMSMessage)msg).toHeaderString());
- }
-
- if (config.isPrintContent())
- {
- System.out.println(((AbstractJMSMessage)msg).toBodyString());
- }
-
- if (transacted && (received % txSize == 0))
- {
- if (isRollback && (received % rollbackFrequency == 0))
- {
- session.rollback();
- }
- else
- {
- session.commit();
- }
- }
- else if (ackFrequency > 0)
- {
- msg.acknowledge();
- }
-
- if (received >= expected)
- {
- testCompleted.countDown();
- }
-
- }
- catch(Exception e)
- {
- _logger.error("Error when receiving messages",e);
- }
- }
-
- public void waitforCompletion(int expected) throws Exception
- {
- this.expected = expected;
- testCompleted.await();
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(ThroughputAndLatency.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader());
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidReceive receiver = new QpidReceive(reporter,config, config.createConnection(),dest);
- receiver.setUp();
- receiver.waitforCompletion(config.getMsgCount() + config.getSendEOS());
- if (config.isReportTotal())
- {
- reporter.report();
- }
- receiver.tearDown();
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
deleted file mode 100644
index 58a643726c..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidSend.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.tools.TestConfiguration.MessageType;
-import org.apache.qpid.tools.report.BasicReporter;
-import org.apache.qpid.tools.report.Reporter;
-import org.apache.qpid.tools.report.Statistics.Throughput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class QpidSend
-{
- private Connection con;
- private Session session;
- private Destination dest;
- private MessageProducer producer;
- private MessageType msgType;
- private Message msg;
- private Object payload;
- private List<Object> payloads;
- private boolean cacheMsg = false;
- private boolean randomMsgSize = false;
- private boolean durable = false;
- private Random random;
- private int msgSizeRange = 1024;
- private int totalMsgCount = 0;
- private boolean rateLimitProducer = false;
- private boolean transacted = false;
- private int txSize = 0;
-
- private static final Logger _logger = LoggerFactory.getLogger(QpidSend.class);
- Reporter report;
- TestConfiguration config;
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest)
- {
- this(report,config, con, dest, UUID.randomUUID().toString());
- }
-
- public QpidSend(Reporter report, TestConfiguration config, Connection con, Destination dest, String prefix)
- {
- //System.out.println("Producer ID : " + id);
- this.report = report;
- this.config = config;
- this.con = con;
- this.dest = dest;
- }
-
- public void setUp() throws Exception
- {
- con.start();
- if (config.isTransacted())
- {
- session = con.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- durable = config.isDurable();
- rateLimitProducer = config.getSendRate() > 0 ? true : false;
- if (_logger.isDebugEnabled() && rateLimitProducer)
- {
- _logger.debug("The test will attempt to limit the producer to " + config.getSendRate() + " msg/sec");
- }
-
- transacted = config.isTransacted();
- txSize = config.getTransactionSize();
-
- msgType = MessageType.getType(config.getMessageType());
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (config.isCacheMessage())
- {
- cacheMsg = true;
- msg = createMessage(createPayload(config.getMsgSize()));
- msg.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else if (config.isRandomMsgSize())
- {
- random = new Random(20080921);
- randomMsgSize = true;
- msgSizeRange = config.getMsgSize();
- payloads = new ArrayList<Object>(msgSizeRange);
-
- for (int i=0; i < msgSizeRange; i++)
- {
- payloads.add(createPayload(i));
- }
- }
- else
- {
- payload = createPayload(config.getMsgSize());
- }
-
- producer = session.createProducer(dest);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Producer: " + /*id +*/ " Sending messages to: " + ((AMQDestination)dest).getAddressName());
- }
- producer.setDisableMessageID(config.isDisableMessageID());
- //we add a separate timestamp to allow interoperability with other clients.
- producer.setDisableMessageTimestamp(true);
- if (config.getTTL() > 0)
- {
- producer.setTimeToLive(config.getTTL());
- }
- if (config.getPriority() > 0)
- {
- producer.setPriority(config.getPriority());
- }
- }
-
- Object createPayload(int size)
- {
- if (msgType == MessageType.TEXT)
- {
- return MessageFactory.createMessagePayload(size);
- }
- else
- {
- return MessageFactory.createMessagePayload(size).getBytes();
- }
- }
-
- Message createMessage(Object payload) throws Exception
- {
- if (msgType == MessageType.TEXT)
- {
- return session.createTextMessage((String)payload);
- }
- else
- {
- BytesMessage m = session.createBytesMessage();
- m.writeBytes((byte[])payload);
- return m;
- }
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (cacheMsg)
- {
- return msg;
- }
- else
- {
- Message m;
-
- if (!randomMsgSize)
- {
- m = createMessage(payload);
- }
- else
- {
- m = createMessage(payloads.get(random.nextInt(msgSizeRange)));
- }
- m.setJMSDeliveryMode(durable?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- return m;
- }
- }
-
- public void commit() throws Exception
- {
- session.commit();
- }
-
- public void send() throws Exception
- {
- send(config.getMsgCount());
- }
-
- public void send(int count) throws Exception
- {
- int sendRate = config.getSendRate();
- if (rateLimitProducer)
- {
- int iterations = count/sendRate;
- int remainder = count%sendRate;
- for (int i=0; i < iterations; i++)
- {
- long iterationStart = System.currentTimeMillis();
- sendMessages(sendRate);
- long elapsed = System.currentTimeMillis() - iterationStart;
- long diff = Clock.SEC - elapsed;
- if (diff > 0)
- {
- // We have sent more messages in a sec than specified by the rate.
- Thread.sleep(diff);
- }
- }
- sendMessages(remainder);
- }
- else
- {
- sendMessages(count);
- }
- }
-
- private void sendMessages(int count) throws Exception
- {
- boolean isTimestamp = !config.isDisableTimestamp();
- long s = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- if (isTimestamp)
- {
- msg.setLongProperty(TestConfiguration.TIMESTAMP, System.currentTimeMillis());
- }
- producer.send(msg);
- //report.message(msg);
- totalMsgCount++;
-
- if ( transacted && ((totalMsgCount) % txSize == 0))
- {
- session.commit();
- }
- }
- long e = System.currentTimeMillis() - s;
- //System.out.println("Rate : " + totalMsgCount/e);
- }
-
- public void resetCounters()
- {
- totalMsgCount = 0;
- report.clear();
- }
-
- public void sendEndMessage() throws Exception
- {
- Message msg = session.createTextMessage(TestConfiguration.EOS);
- producer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- session.close();
- }
-
- public static void main(String[] args) throws Exception
- {
- TestConfiguration config = new JVMArgConfiguration();
- Reporter reporter = new BasicReporter(Throughput.class,
- System.out,
- config.reportEvery(),
- config.isReportHeader()
- );
- Destination dest = AMQDestination.createDestination(config.getAddress(), false);
- QpidSend sender = new QpidSend(reporter,config, config.createConnection(),dest);
- sender.setUp();
- sender.send();
- if (config.getSendEOS() > 0)
- {
- sender.sendEndMessage();
- }
- if (config.isReportTotal())
- {
- reporter.report();
- }
- sender.tearDown();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
deleted file mode 100644
index 790ed80e5f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/RestStressTestClient.java
+++ /dev/null
@@ -1,667 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.qpid.tools.util.ArgumentsParser;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-public class RestStressTestClient
-{
-
- public static void main(String[] args) throws Exception
- {
- ArgumentsParser parser = new ArgumentsParser();
- Arguments arguments;
- try
- {
- arguments = parser.parse(args, Arguments.class);
- arguments.validate();
- }
- catch(IllegalArgumentException e)
- {
- System.out.println("Invalid argument:" + e.getMessage());
- parser.usage(Arguments.class, Arguments.REQUIRED);
- System.out.println("\nRun examples:" );
- System.out.println(" Using Basic authentication:" );
- System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" );
- System.out.println(" -Djavax.net.ssl.trustStore=java_client_truststore.jks \\");
- System.out.println(" -Djavax.net.ssl.trustStorePassword=password \\");
- System.out.println(" org.apache.qpid.tools.RestStressTestClient \\");
- System.out.println(" repetitions=10 brokerUrl=https://localhost:8081 username=admin password=admin \\");
- System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\");
- System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" );
- System.out.println(" Using CRAM-MD5 SASL authentication:" );
- System.out.println(" java -cp qpid-tools.jar:commons-codec.jar:jackson-core.jar:jackson-mapper.jar \\" );
- System.out.println(" org.apache.qpid.tools.RestStressTestClient saslMechanism=CRAM-MD5 \\");
- System.out.println(" repetitions=10 brokerUrl=http://localhost:8080 username=admin password=admin \\");
- System.out.println(" virtualHost=default virtualHostNode=default createQueue=true bindQueue=true \\");
- System.out.println(" deleteQueue=true uniqueQueues=true queueName=boo exchangeName=amq.fanout" );
- return;
- }
-
- RestStressTestClient client = new RestStressTestClient();
- client.run(arguments);
- }
-
- public void run(Arguments arguments) throws IOException
- {
- log(arguments.toString());
- for (int i = 0; i < arguments.getRepetitions(); i++)
- {
- runIteration(arguments, i);
- }
- }
-
- private void runIteration(Arguments arguments, int iteration) throws IOException
- {
- log("Iteration " + iteration);
-
- RestClient client = new RestClient(arguments.getBrokerUrl(), arguments.getUsername(), arguments.getPassword(), arguments.getSaslMechanism());
- client.authenticateIfSaslAuthenticationRequested();
- try
- {
- List<Map<String, Object>> brokerData = client.get("/api/latest/broker?depth=0");
- log(" Connected to broker " + brokerData.get(0).get("name"));
- createAndBindQueueIfRequired(arguments, client, iteration);
- }
- finally
- {
- if (arguments.isLogout())
- {
- client.logout();
- }
- }
- }
-
- private void log(String logMessage)
- {
- System.out.println(logMessage);
- }
-
- private void createAndBindQueueIfRequired(Arguments arguments, RestClient client, int iteration) throws IOException
- {
- if (arguments.isCreateQueue())
- {
- String virtualHostNode = arguments.getVirtualHostNode();
- String virtualHost = arguments.getVirtualHost();
- String queueName = arguments.getQueueName();
-
- if (queueName == null)
- {
- queueName = "temp-queue-" + System.nanoTime();
- }
- else if (arguments.isUniqueQueues())
- {
- queueName = queueName + "-" + iteration;
- }
-
- createQueue(client, virtualHostNode, virtualHost, queueName);
-
- if (arguments.isBindQueue())
- {
- bindQueue(client, virtualHostNode, virtualHost, queueName, arguments.getExchangeName());
- }
-
- if (arguments.isDeleteQueue())
- {
- deleteQueue(client, virtualHostNode, virtualHost, queueName);
- }
- }
- }
-
- private void createQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException
- {
- log(" Create queue " + queueName);
-
- String queueUrl = getQueueServiceUrl(virtualHostNode, virtualHost, queueName);
- Map<String, Object> queueData = new HashMap<>();
- queueData.put("name", queueName);
- queueData.put("durable", true);
-
- int result = client.put(queueUrl, queueData);
-
- if (result != RestClient.RESPONSE_PUT_CREATE_OK)
- {
- throw new RuntimeException("Failure to create queue " + queueName);
- }
- }
-
- private String getQueueServiceUrl(String virtualHostNode, String virtualHost, String queueName)
- {
- return "/api/latest/queue/" + virtualHostNode + "/" + virtualHost + "/" + queueName;
- }
-
- private void deleteQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName) throws IOException
- {
- log(" Delete queue " + queueName);
- int result = client.delete(getQueueServiceUrl(virtualHostNode, virtualHost, queueName));
- if (result != RestClient.RESPONSE_PUT_UPDATE_OK)
- {
- throw new RuntimeException("Failure to delete queue " + queueName);
- }
- }
-
- private void bindQueue(RestClient client, String virtualHostNode, String virtualHost, String queueName, String exchangeName)
- throws IOException
- {
- if (exchangeName == null)
- {
- exchangeName = "amq.direct";
- }
-
- log(" Bind queue " + queueName + " to " + exchangeName + " using binding key " + queueName);
-
- String bindingUrl = "/api/latest/binding/" + virtualHostNode + "/" + virtualHost + "/" + exchangeName + "/" + queueName + "/" + queueName;
-
- Map<String, Object> bindingData = new HashMap<>();
- bindingData.put("name", queueName);
- bindingData.put("queue", queueName);
- bindingData.put("exchange", exchangeName);
-
- int result = client.put(bindingUrl, bindingData);
-
- if (result != RestClient.RESPONSE_PUT_CREATE_OK)
- {
- throw new RuntimeException("Failure to bind queue " + queueName + " to " + exchangeName);
- }
- }
-
- public static class RestClient
- {
- private static final TypeReference<List<LinkedHashMap<String, Object>>> TYPE_LIST_OF_LINKED_HASH_MAPS = new TypeReference<List<LinkedHashMap<String, Object>>>()
- {
- };
-
- private static final TypeReference<LinkedHashMap<String, Object>> TYPE_HASH_MAP = new TypeReference<LinkedHashMap<String, Object>>()
- {
- };
-
- public static final int RESPONSE_PUT_CREATE_OK = 201;
- public static final int RESPONSE_PUT_UPDATE_OK = 200;
- public static final int RESPONSE_OK = 200;
- public static final int RESPONSE_AUTHENTICATION_REQUIRED = 401;
-
- private final ObjectMapper _mapper;
- private final String _brokerUrl;
- private final String _username;
- private final String _password;
- private final String _saslMechanism;
- private final String _authorizationHeader;
-
- private List<String> _cookies;
-
- public RestClient(String brokerUrl, String username, String password, String saslMechanism)
- {
- _mapper = new ObjectMapper();
- _brokerUrl = brokerUrl;
- _username = username;
- _password = password;
- _saslMechanism = saslMechanism;
-
- if (saslMechanism == null)
- {
- _authorizationHeader = "Basic " + new String(new Base64().encode((_username + ":" + _password).getBytes()));
- }
- else
- {
- _authorizationHeader = null;
- }
- }
-
- public List<Map<String, Object>> get(String restServiceUrl) throws IOException
- {
- HttpURLConnection connection = createConnection("GET", restServiceUrl, _cookies);
- try
- {
- connection.connect();
- byte[] data = readConnectionInputStream(connection);
- checkResponseCode(connection);
- return _mapper.readValue(new ByteArrayInputStream(data), TYPE_LIST_OF_LINKED_HASH_MAPS);
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int put(String restServiceUrl, Map<String, Object> attributes) throws IOException
- {
- HttpURLConnection connection = createConnection("PUT", restServiceUrl, _cookies);
- try
- {
- connection.connect();
- if (attributes != null)
- {
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(connection.getOutputStream(), attributes);
- }
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int delete(String restServiceUrl) throws IOException
- {
- HttpURLConnection connection = createConnection("DELETE", restServiceUrl, _cookies);
- try
- {
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- public int post(String restServiceUrl, Map<String, String> postData) throws IOException
- {
- HttpURLConnection connection = createConnectionAndPostData(restServiceUrl, postData, _cookies);
- try
- {
- checkResponseCode(connection);
- return connection.getResponseCode();
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- private HttpURLConnection createConnectionAndPostData(String restServiceUrl, Map<String, String> postData, List<String> cookies) throws IOException
- {
- String postParameters = getPostDataString(postData);
- HttpURLConnection connection = createConnection("POST", restServiceUrl, cookies);
- try
- {
- OutputStream os = connection.getOutputStream();
- os.write(postParameters.getBytes());
- os.flush();
- }
- catch (IOException e)
- {
- connection.disconnect();
- throw e;
- }
- return connection;
- }
-
- private void checkResponseCode(HttpURLConnection connection) throws IOException
- {
- if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED)
- {
- _cookies = null;
- throw new IllegalArgumentException("Authentication is required");
- }
- }
-
- private String getPostDataString(Map<String, String> postData)
- {
- StringBuilder sb = new StringBuilder();
- if (postData != null)
- {
- Iterator<String> iterator = postData.keySet().iterator();
- while (iterator.hasNext())
- {
- String key = iterator.next();
- sb.append(key + "=" + postData.get(key));
- if (iterator.hasNext())
- {
- sb.append("&");
- }
- }
- }
- return sb.toString();
- }
-
- private HttpURLConnection createConnection(String method, String restServiceUrl, List<String> cookies) throws IOException
- {
- HttpURLConnection httpConnection = (HttpURLConnection) new URL(_brokerUrl + restServiceUrl).openConnection();
- if (cookies != null)
- {
- for (String cookie : cookies)
- {
- httpConnection.addRequestProperty("Cookie", cookie.split(";", 2)[0]);
- }
- }
- if (_saslMechanism == null)
- {
- httpConnection.setRequestProperty("Authorization", _authorizationHeader);
- }
-
- httpConnection.setDoOutput(true);
- httpConnection.setRequestMethod(method);
- return httpConnection;
- }
-
- public void authenticateIfSaslAuthenticationRequested() throws IOException
- {
- if (_saslMechanism == null)
- {
- // basic authentication will be used with each request
- }
- else if ("CRAM-MD5".equals(_saslMechanism))
- {
- _cookies = performCramMD5Authentication();
- }
- else
- {
- throw new IllegalArgumentException("Unsupported SASL mechanism :" + _saslMechanism);
- }
- }
-
-
- public void logout() throws IOException
- {
- if (_cookies != null)
- {
- HttpURLConnection connection = createConnection("GET", "/service/logout", _cookies);
- try
- {
- connection.connect();
- _cookies = null;
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- //TODO: we need to track sessions for basic auth in order to logout those
- }
-
- private List<String> performCramMD5Authentication() throws IOException
- {
- // request the challenge for CRAM-MD5
- HttpURLConnection connection = createConnectionAndPostData("/service/sasl", Collections.singletonMap("mechanism", "CRAM-MD5"), null);
- try
- {
- List<String> cookies = connection.getHeaderFields().get("Set-Cookie");
-
- // get response
- byte[] data = readConnectionInputStream(connection);
- Map<String, Object> response = _mapper.readValue(new ByteArrayInputStream(data), TYPE_HASH_MAP);
- String challenge = (String) response.get("challenge");
-
- // generate the authentication response for the received challenge
- String responseData = generateResponseForChallengeAndCredentials(challenge, _username, _password);
-
- Map<String, String> saslResponse = new HashMap<>();
- saslResponse.put("id", (String)response.get("id"));
- saslResponse.put("response", responseData);
-
- HttpURLConnection authenticateConnection = createConnectionAndPostData("/service/sasl", saslResponse, cookies);
- try
- {
- int code = authenticateConnection.getResponseCode();
- if (code != RESPONSE_OK)
- {
- throw new RuntimeException("Authentication failed");
- }
- else
- {
- return cookies;
- }
- }
- finally
- {
- authenticateConnection.disconnect();
- }
- }
- finally
- {
- connection.disconnect();
- }
- }
-
- private String generateResponseForChallengeAndCredentials(String challenge, String username, String password)
- {
- try
- {
- byte[] challengeBytes = Base64.decodeBase64(challenge);
-
- String macAlgorithm = "HmacMD5";
- Mac mac = Mac.getInstance(macAlgorithm);
- mac.init(new SecretKeySpec(password.getBytes("UTF-8"), macAlgorithm));
- final byte[] messageAuthenticationCode = mac.doFinal(challengeBytes);
- String responseAsString = username + " " + toHex(messageAuthenticationCode);
- byte[] responseBytes = responseAsString.getBytes();
- return Base64.encodeBase64String(responseBytes);
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Unexpected exception", e);
- }
- }
-
- private String toHex(byte[] data)
- {
- StringBuffer hash = new StringBuffer();
- for (int i = 0; i < data.length; i++)
- {
- String hex = Integer.toHexString(0xFF & data[i]);
- if (hex.length() == 1)
- {
- hash.append('0');
- }
- hash.append(hex);
- }
- return hash.toString();
- }
-
- private byte[] readConnectionInputStream(HttpURLConnection connection) throws IOException
- {
- if (connection.getResponseCode() == RESPONSE_AUTHENTICATION_REQUIRED)
- {
- _cookies = null;
- }
- InputStream is = connection.getInputStream();
- try(ByteArrayOutputStream baos = new ByteArrayOutputStream())
- {
- byte[] buffer = new byte[1024];
- int len;
- while ((len = is.read(buffer)) != -1)
- {
- baos.write(buffer, 0, len);
- }
- return baos.toByteArray();
- }
- }
-
- }
-
- public static class Arguments
- {
- private static final Set<String> REQUIRED = new HashSet<>(Arrays.asList("brokerUrl", "username", "password"));
-
- private String brokerUrl = null;
- private String username = null;
- private String password = null;
- private String saslMechanism = null;
-
- private String virtualHostNode = null;
- private String virtualHost = null;
- private String queueName = null;
- private String exchangeName = null;
-
- private int repetitions = 1;
-
- private boolean createQueue = false;
- private boolean deleteQueue = false;
- private boolean uniqueQueues = false;
- private boolean bindQueue = false;
-
- private boolean logout = true;
-
- public Arguments()
- {
- }
-
- public void validate()
- {
- if (brokerUrl == null || brokerUrl.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'brokerUrl' is not specified");
- }
-
- if (username == null || username.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'username' is not specified");
- }
-
- if (password == null || password.equals(""))
- {
- throw new IllegalArgumentException("Mandatory argument 'password' is not specified");
- }
-
- if (createQueue)
- {
- if (virtualHostNode == null || virtualHostNode.equals(""))
- {
- throw new IllegalArgumentException("Virtual host node name needs to be specified for queue creation");
- }
-
- if (virtualHost == null || virtualHost.equals(""))
- {
- throw new IllegalArgumentException("Virtual host name needs to be specified for queue creation");
- }
- }
- }
-
- public String getUsername()
- {
- return username;
- }
-
- public String getPassword()
- {
- return password;
- }
-
- public String getVirtualHost()
- {
- return virtualHost;
- }
-
- public boolean isCreateQueue()
- {
- return createQueue;
- }
-
- public boolean isDeleteQueue()
- {
- return deleteQueue;
- }
-
- public boolean isUniqueQueues()
- {
- return uniqueQueues;
- }
-
- public String getQueueName()
- {
- return queueName;
- }
-
- public boolean isBindQueue()
- {
- return bindQueue;
- }
-
- public String getExchangeName()
- {
- return exchangeName;
- }
-
- public String getVirtualHostNode()
- {
- return virtualHostNode;
- }
-
-
- public int getRepetitions()
- {
- return repetitions;
- }
-
- public String getBrokerUrl()
- {
- return brokerUrl;
- }
-
- public String getSaslMechanism()
- {
- return saslMechanism;
- }
-
- public boolean isLogout()
- {
- return logout;
- }
-
- @Override
- public String toString()
- {
- return "Arguments{" +
- "brokerUrl='" + brokerUrl + '\'' +
- ", username='" + username + '\'' +
- ", password='" + password + '\'' +
- ", saslMechanism='" + saslMechanism + '\'' +
- ", virtualHostNode='" + virtualHostNode + '\'' +
- ", virtualHost='" + virtualHost + '\'' +
- ", queueName='" + queueName + '\'' +
- ", exchangeName='" + exchangeName + '\'' +
- ", repetitions=" + repetitions +
- ", createQueue=" + createQueue +
- ", deleteQueue=" + deleteQueue +
- ", uniqueQueues=" + uniqueQueues +
- ", bindQueue=" + bindQueue +
- ", logout=" + logout +
- '}';
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
deleted file mode 100644
index 6494a2e800..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/StressTestClient.java
+++ /dev/null
@@ -1,446 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-public class StressTestClient
-{
- private static final String QUEUE_NAME_PREFIX = "BURL:direct://amq.direct//stress-test-queue";
- private static final String DURABLE_SUFFIX = "?durable='true'";
-
- public static final String CONNECTIONS_ARG = "connections";
- public static final String SESSIONS_ARG = "sessions";
- public static final String CONSUME_IMMEDIATELY_ARG = "consumeImmediately";
- public static final String CONSUMERS_ARG = "consumers";
- public static final String CLOSE_CONSUMERS_ARG = "closeconsumers";
- public static final String PRODUCERS_ARG = "producers";
- public static final String MESSAGE_COUNT_ARG = "messagecount";
- public static final String MESSAGE_SIZE_ARG = "size";
- public static final String SUFFIX_ARG = "suffix";
- public static final String REPETITIONS_ARG = "repetitions";
- public static final String PERSISTENT_ARG = "persistent";
- public static final String RANDOM_ARG = "random";
- public static final String TIMEOUT_ARG = "timeout";
- public static final String DELAYCLOSE_ARG = "delayclose";
- public static final String REPORT_MOD_ARG = "reportmod";
- public static final String LOW_PREFETCH_ARG = "lowprefetch";
- public static final String TRANSACTED_ARG = "transacted";
- public static final String TX_BATCH_ARG = "txbatch";
-
- public static final String CONNECTIONS_DEFAULT = "1";
- public static final String SESSIONS_DEFAULT = "1";
- public static final String CONSUME_IMMEDIATELY_DEFAULT = "true";
- public static final String CLOSE_CONSUMERS_DEFAULT = "true";
- public static final String PRODUCERS_DEFAULT = "1";
- public static final String CONSUMERS_DEFAULT = "1";
- public static final String MESSAGE_COUNT_DEFAULT = "1";
- public static final String MESSAGE_SIZE_DEFAULT = "256";
- public static final String SUFFIX_DEFAULT = "";
- public static final String REPETITIONS_DEFAULT = "1";
- public static final String PERSISTENT_DEFAULT = "false";
- public static final String RANDOM_DEFAULT = "true";
- public static final String TIMEOUT_DEFAULT = "30000";
- public static final String DELAYCLOSE_DEFAULT = "0";
- public static final String REPORT_MOD_DEFAULT = "1";
- public static final String LOW_PREFETCH_DEFAULT = "false";
- public static final String TRANSACTED_DEFAULT = "false";
- public static final String TX_BATCH_DEFAULT = "1";
-
- private static final String CLASS = "StressTestClient";
-
- public static void main(String[] args)
- {
- Map<String,String> options = new HashMap<>();
- options.put(CONNECTIONS_ARG, CONNECTIONS_DEFAULT);
- options.put(SESSIONS_ARG, SESSIONS_DEFAULT);
- options.put(CONSUME_IMMEDIATELY_ARG, CONSUME_IMMEDIATELY_DEFAULT);
- options.put(PRODUCERS_ARG, PRODUCERS_DEFAULT);
- options.put(CONSUMERS_ARG, CONSUMERS_DEFAULT);
- options.put(CLOSE_CONSUMERS_ARG, CLOSE_CONSUMERS_DEFAULT);
- options.put(MESSAGE_COUNT_ARG, MESSAGE_COUNT_DEFAULT);
- options.put(MESSAGE_SIZE_ARG, MESSAGE_SIZE_DEFAULT);
- options.put(SUFFIX_ARG, SUFFIX_DEFAULT);
- options.put(REPETITIONS_ARG, REPETITIONS_DEFAULT);
- options.put(PERSISTENT_ARG, PERSISTENT_DEFAULT);
- options.put(RANDOM_ARG, RANDOM_DEFAULT);
- options.put(TIMEOUT_ARG, TIMEOUT_DEFAULT);
- options.put(DELAYCLOSE_ARG, DELAYCLOSE_DEFAULT);
- options.put(REPORT_MOD_ARG, REPORT_MOD_DEFAULT);
- options.put(LOW_PREFETCH_ARG, LOW_PREFETCH_DEFAULT);
- options.put(TRANSACTED_ARG, TRANSACTED_DEFAULT);
- options.put(TX_BATCH_ARG, TX_BATCH_DEFAULT);
-
- if(args.length == 1 &&
- (args[0].equals("-h") || args[0].equals("--help") || args[0].equals("help")))
- {
- System.out.println("arg=value options: \n" + options.keySet());
- return;
- }
-
- parseArgumentsIntoConfig(options, args);
-
- StressTestClient testClient = new StressTestClient();
- testClient.runTest(options);
- }
-
- public static void parseArgumentsIntoConfig(Map<String, String> initialValues, String[] args)
- {
- for(String arg: args)
- {
- String[] splitArg = arg.split("=");
- if(splitArg.length != 2)
- {
- throw new IllegalArgumentException("arguments must have format <name>=<value>: " + arg);
- }
-
- if(initialValues.put(splitArg[0], splitArg[1]) == null)
- {
- throw new IllegalArgumentException("not a valid configuration property: " + arg);
- }
- }
- }
-
-
- private void runTest(Map<String,String> options)
- {
- int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG));
- int numSessions = Integer.parseInt(options.get(SESSIONS_ARG));
- int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG));
- int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG));
- boolean closeConsumers = Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG));
- boolean consumeImmediately = Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG));
- int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG));
- int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG));
- int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG));
- String queueString = QUEUE_NAME_PREFIX + options.get(SUFFIX_ARG) + DURABLE_SUFFIX;
- int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
- boolean random = Boolean.valueOf(options.get(RANDOM_ARG));
- long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG));
- long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG));
- int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG));
- boolean lowPrefetch = Boolean.valueOf(options.get(LOW_PREFETCH_ARG));
- boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG));
- int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG));
-
- System.out.println(CLASS + ": Using options: " + options);
-
- System.out.println(CLASS + ": Creating message payload of " + messageSize + " (bytes)");
- byte[] sentBytes = generateMessage(random, messageSize);
-
- try
- {
- // Load JNDI properties
- Properties properties = new Properties();
- try(InputStream is = this.getClass().getClassLoader().getResourceAsStream("stress-test-client.properties"))
- {
- properties.load(is);
- }
- Context ctx = new InitialContext(properties);
-
- ConnectionFactory conFac;
- if(lowPrefetch)
- {
- System.out.println(CLASS + ": Using lowprefetch connection factory");
- conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactoryLowPrefetch");
- }
- else
- {
- conFac = (ConnectionFactory)ctx.lookup("qpidConnectionfactory");
- }
-
- //ensure the queue to be used exists and is bound
- System.out.println(CLASS + ": Creating queue: " + queueString);
- Connection startupConn = conFac.createConnection();
- Session startupSess = startupConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination startupDestination = startupSess.createQueue(queueString);
- MessageConsumer startupConsumer = startupSess.createConsumer(startupDestination);
- startupConsumer.close();
- startupSess.close();
- startupConn.close();
-
- for(int rep = 1 ; rep <= repetitions; rep++)
- {
- ArrayList<Connection> connectionList = new ArrayList<>();
-
- for (int co= 1; co<= numConnections ; co++)
- {
- if( co % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating connection " + co);
- }
- Connection conn = conFac.createConnection();
- conn.setExceptionListener(new ExceptionListener()
- {
- public void onException(JMSException jmse)
- {
- System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
- jmse.printStackTrace();
- System.exit(0);
- }
- });
-
- connectionList.add(conn);
- conn.start();
- for (int se= 1; se<= numSessions ; se++)
- {
- if( se % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Session " + se);
- }
- try
- {
- Session sess;
- if(transacted)
- {
- sess = conn.createSession(true, Session.SESSION_TRANSACTED);
- }
- else
- {
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- BytesMessage message = sess.createBytesMessage();
-
- message.writeBytes(sentBytes);
-
- if(!random && numMessage == 1 && numSessions == 1 && numConnections == 1 && repetitions == 1)
- {
- //null the array to save memory
- sentBytes = null;
- }
-
- Destination destination = sess.createQueue(queueString);
-
- MessageConsumer consumer = null;
- for(int cns = 1 ; cns <= numConsumers ; cns++)
- {
- if( cns % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Consumer " + cns);
- }
- consumer = sess.createConsumer(destination);
- }
-
- for(int pr = 1 ; pr <= numProducers ; pr++)
- {
- if( pr % reportingMod == 0)
- {
- System.out.println(CLASS + ": Creating Producer " + pr);
- }
- MessageProducer prod = sess.createProducer(destination);
- for(int me = 1; me <= numMessage ; me++)
- {
- if( me % reportingMod == 0)
- {
- System.out.println(CLASS + ": Sending Message " + me);
- }
- prod.send(message, deliveryMode,
- Message.DEFAULT_PRIORITY,
- Message.DEFAULT_TIME_TO_LIVE);
- if(transacted && me % txBatch == 0)
- {
- sess.commit();
- }
- }
- }
-
- if(numConsumers > 0 && consumeImmediately)
- {
- for(int cs = 1 ; cs <= numMessage ; cs++)
- {
- if( cs % reportingMod == 0)
- {
- System.out.println(CLASS + ": Consuming Message " + cs);
- }
- BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
-
- if(transacted && cs % txBatch == 0)
- {
- sess.commit();
- }
-
- if(msg == null)
- {
- throw new RuntimeException("Expected message not received in allowed time: " + recieveTimeout);
- }
-
- validateReceivedMessageContent(sentBytes, msg, random, messageSize);
- }
-
- if(closeConsumers)
- {
- sess.close();
- }
- }
-
- }
- catch (Exception exp)
- {
- System.err.println(CLASS + ": Caught an Exception: " + exp);
- exp.printStackTrace();
- }
-
- }
- }
-
- if(numConsumers == -1 && !consumeImmediately)
- {
- System.out.println(CLASS + ": Consuming left over messages, using recieve timeout:" + recieveTimeout);
-
- Connection conn = conFac.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination destination = sess.createQueue(queueString);
- MessageConsumer consumer = sess.createConsumer(destination);
- conn.start();
-
- int count = 0;
- while(true)
- {
- BytesMessage msg = (BytesMessage) consumer.receive(recieveTimeout);
-
- if(msg == null)
- {
- System.out.println(CLASS + ": Received " + count + " messages");
- break;
- }
- else
- {
- count++;
- }
-
- validateReceivedMessageContent(sentBytes, msg, random, messageSize);
- }
-
- consumer.close();
- sess.close();
- conn.close();
- }
-
- if(delayClose > 0)
- {
- System.out.println(CLASS + ": Delaying closing connections: " + delayClose);
- Thread.sleep(delayClose);
- }
-
- // Close the connections to the server
- System.out.println(CLASS + ": Closing connections");
-
- for(int connection = 0 ; connection < connectionList.size() ; connection++)
- {
- if( (connection+1) % reportingMod == 0)
- {
- System.out.println(CLASS + ": Closing connection " + (connection+1));
- }
- Connection c = connectionList.get(connection);
- c.close();
- }
-
- // Close the JNDI reference
- System.out.println(CLASS + ": Closing JNDI context");
- ctx.close();
- }
- }
- catch (Exception exp)
- {
- System.err.println(CLASS + ": Caught an Exception: " + exp);
- exp.printStackTrace();
- }
- }
-
-
- private byte[] generateMessage(boolean random, int messageSize)
- {
- byte[] sentBytes = new byte[messageSize];
- if(random)
- {
- //fill the array with numbers from 0-9
- Random rand = new Random(System.currentTimeMillis());
- for(int r = 0 ; r < messageSize ; r++)
- {
- sentBytes[r] = (byte) (48 + rand.nextInt(10));
- }
- }
- else
- {
- //use sequential numbers from 0-9
- for(int r = 0 ; r < messageSize ; r++)
- {
- sentBytes[r] = (byte) (48 + (r % 10));
- }
- }
- return sentBytes;
- }
-
-
- private void validateReceivedMessageContent(byte[] sentBytes,
- BytesMessage msg, boolean random, int messageSize) throws JMSException
- {
- Long length = msg.getBodyLength();
-
- if(length != messageSize)
- {
- throw new RuntimeException("Incorrect number of bytes received");
- }
-
- byte[] recievedBytes = new byte[length.intValue()];
- msg.readBytes(recievedBytes);
-
- if(random)
- {
- if(!Arrays.equals(sentBytes, recievedBytes))
- {
- throw new RuntimeException("Incorrect value of bytes received");
- }
- }
- else
- {
- for(int r = 0 ; r < messageSize ; r++)
- {
- if(! (recievedBytes[r] == (byte) (48 + (r % 10))))
- {
- throw new RuntimeException("Incorrect value of bytes received");
- }
- }
- }
- }
-}
-
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
deleted file mode 100644
index 18870bac59..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/TestConfiguration.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools;
-
-import java.text.DecimalFormat;
-
-import javax.jms.Connection;
-
-public interface TestConfiguration
-{
- enum MessageType {
- BYTES, TEXT, MAP, OBJECT;
-
- public static MessageType getType(String s) throws Exception
- {
- if ("text".equalsIgnoreCase(s))
- {
- return TEXT;
- }
- else if ("bytes".equalsIgnoreCase(s))
- {
- return BYTES;
- }
- /*else if ("map".equalsIgnoreCase(s))
- {
- return MAP;
- }
- else if ("object".equalsIgnoreCase(s))
- {
- return OBJECT;
- }*/
- else
- {
- throw new Exception("Unsupported message type");
- }
- }
- };
-
- public final static String TIMESTAMP = "ts";
-
- public final static String EOS = "eos";
-
- public final static String SEQUENCE_NUMBER = "sn";
-
- public String getUrl();
-
- public String getHost();
-
- public int getPort();
-
- public String getAddress();
-
- public long getTimeout();
-
- public int getAckMode();
-
- public int getMsgCount();
-
- public int getMsgSize();
-
- public int getRandomMsgSizeStartFrom();
-
- public boolean isDurable();
-
- public boolean isTransacted();
-
- public int getTransactionSize();
-
- public int getWarmupCount();
-
- public boolean isCacheMessage();
-
- public boolean isDisableMessageID();
-
- public boolean isDisableTimestamp();
-
- public boolean isRandomMsgSize();
-
- public String getMessageType();
-
- public boolean isPrintStdDev();
-
- public int getSendRate();
-
- public boolean isExternalController();
-
- public boolean isUseUniqueDests();
-
- public int getAckFrequency();
-
- public Connection createConnection() throws Exception;
-
- public DecimalFormat getDecimalFormat();
-
- public int reportEvery();
-
- public boolean isReportTotal();
-
- public boolean isReportHeader();
-
- public int getSendEOS();
-
- public int getConnectionCount();
-
- public int getRollbackFrequency();
-
- public boolean isPrintHeaders();
-
- public boolean isPrintContent();
-
- public long getTTL();
-
- public int getPriority();
-
- public String getReadyAddress();
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
deleted file mode 100644
index a9896c1d4e..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/BasicReporter.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-import java.lang.reflect.Constructor;
-
-import javax.jms.Message;
-
-public class BasicReporter implements Reporter
-{
- PrintStream out;
- int batchSize = 0;
- int batchCount = 0;
- boolean headerPrinted = false;
- protected Statistics overall;
- Statistics batch;
- Constructor<? extends Statistics> statCtor;
-
- public BasicReporter(Class<? extends Statistics> clazz, PrintStream out,
- int batchSize, boolean printHeader) throws Exception
- {
- this.out = out;
- this.batchSize = batchSize;
- this.headerPrinted = !printHeader;
- statCtor = clazz.getConstructor();
- overall = statCtor.newInstance();
- if (batchSize > 0)
- {
- batch = statCtor.newInstance();
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#message(javax.jms.Message)
- */
- @Override
- public void message(Message msg)
- {
- overall.message(msg);
- if (batchSize > 0)
- {
- batch.message(msg);
- if (++batchCount == batchSize)
- {
- if (!headerPrinted)
- {
- header();
- }
- batch.report(out);
- batch.clear();
- batchCount = 0;
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#report()
- */
- @Override
- public void report()
- {
- if (!headerPrinted)
- {
- header();
- }
- overall.report(out);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#header()
- */
- @Override
- public void header()
- {
- headerPrinted = true;
- overall.header(out);
- }
-
- /* (non-Javadoc)
- * @see org.apache.qpid.tools.report.Reporter#log()
- */
- @Override
- public void log(String s)
- {
- // NOOP
- }
-
- @Override
- public void clear()
- {
- batch.clear();
- overall.clear();
- }
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
deleted file mode 100644
index e9bf7100c1..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/MercuryReporter.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-
-import org.apache.qpid.tools.report.Statistics.Throughput;
-import org.apache.qpid.tools.report.Statistics.ThroughputAndLatency;
-
-public class MercuryReporter extends BasicReporter
-{
- MercuryStatistics stats;
-
- public MercuryReporter(Class<? extends MercuryStatistics> clazz, PrintStream out,
- int batchSize, boolean printHeader) throws Exception
- {
- super(clazz, out, batchSize, printHeader);
- stats = (MercuryStatistics)overall;
- }
-
- public double getRate()
- {
- return stats.getRate();
- }
-
- public double getAvgLatency()
- {
- return stats.getAvgLatency();
- }
-
- public double getStdDev()
- {
- return stats.getStdDev();
- }
-
- public double getMinLatency()
- {
- return stats.getMinLatency();
- }
-
- public double getMaxLatency()
- {
- return stats.getMaxLatency();
- }
-
- public int getSampleSize()
- {
- return stats.getSampleSize();
- }
-
- public interface MercuryStatistics extends Statistics
- {
- public double getRate();
- public long getMinLatency();
- public long getMaxLatency();
- public double getAvgLatency();
- public double getStdDev();
- public int getSampleSize();
- }
-
- public static class MercuryThroughput extends Throughput implements MercuryStatistics
- {
- double rate = 0;
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- rate = (double)messages/(double)elapsed;
- }
-
- @Override
- public void clear()
- {
- super.clear();
- rate = 0;
- }
-
- public double getRate()
- {
- return rate;
- }
-
- public int getSampleSize()
- {
- return messages;
- }
-
- public long getMinLatency() { return 0; }
- public long getMaxLatency() { return 0; }
- public double getAvgLatency(){ return 0; }
- public double getStdDev(){ return 0; }
-
- }
-
- public static class MercuryThroughputAndLatency extends ThroughputAndLatency implements MercuryStatistics
- {
- double rate = 0;
- double avgLatency = 0;
- double stdDev;
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- rate = (double)messages/(double)elapsed;
- avgLatency = totalLatency/(double)sampleCount;
- }
-
- @Override
- public void clear()
- {
- super.clear();
- rate = 0;
- avgLatency = 0;
- }
-
- public double getRate()
- {
- return rate;
- }
-
- public long getMinLatency()
- {
- return minLatency;
- }
-
- public long getMaxLatency()
- {
- return maxLatency;
- }
-
- public double getAvgLatency()
- {
- return avgLatency;
- }
-
- public double getStdDev()
- {
- return stdDev;
- }
-
- public int getSampleSize()
- {
- return messages;
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
deleted file mode 100644
index 5e481458be..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Reporter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import javax.jms.Message;
-
-public interface Reporter
-{
-
- public void message(Message msg);
-
- public void report();
-
- public void header();
-
- // Will be used by some reporters to print statements which are greped by
- // scripts. Example see java/tools/bin/perf-report
- public void log(String s);
-
- public void clear();
-
-} \ No newline at end of file
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
deleted file mode 100644
index db8b4ddcee..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/report/Statistics.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.report;
-
-import java.io.PrintStream;
-import java.text.DecimalFormat;
-
-import javax.jms.Message;
-
-import org.apache.qpid.tools.TestConfiguration;
-
-public interface Statistics
-{
- public void message(Message msg);
- public void report(PrintStream out);
- public void header(PrintStream out);
- public void clear();
-
- static class Throughput implements Statistics
- {
- DecimalFormat df = new DecimalFormat("###");
- int messages = 0;
- long start = 0;
- boolean started = false;
-
- @Override
- public void message(Message msg)
- {
- ++messages;
- if (!started)
- {
- start = System.currentTimeMillis();
- started = true;
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- out.println(df.format((double)messages/(double)elapsed));
- }
-
- @Override
- public void header(PrintStream out)
- {
- out.println("tp(m/s)");
- }
-
- public void clear()
- {
- messages = 0;
- start = 0;
- started = false;
- }
- }
-
- static class ThroughputAndLatency extends Throughput
- {
- long minLatency = Long.MAX_VALUE;
- long maxLatency = Long.MIN_VALUE;
- double totalLatency = 0;
- int sampleCount = 0;
-
- @Override
- public void message(Message msg)
- {
- super.message(msg);
- try
- {
- long ts = msg.getLongProperty(TestConfiguration.TIMESTAMP);
- long latency = System.currentTimeMillis() - ts;
- minLatency = Math.min(latency, minLatency);
- maxLatency = Math.max(latency, maxLatency);
- totalLatency = totalLatency + latency;
- sampleCount++;
- }
- catch(Exception e)
- {
- System.out.println("Error calculating latency " + e);
- }
- }
-
- @Override
- public void report(PrintStream out)
- {
- long elapsed = System.currentTimeMillis() - start;
- double rate = (double)messages/(double)elapsed;
- double avgLatency = totalLatency/(double)sampleCount;
- out.append("\n")
- .append(df.format(rate))
- .append('\t')
- .append(String.valueOf(minLatency))
- .append('\t')
- .append(String.valueOf(maxLatency))
- .append('\t')
- .append(df.format(avgLatency))
- .append("\n");
-
- out.flush();
- }
-
- @Override
- public void header(PrintStream out)
- {
- out.append("tp(m/s)")
- .append('\t')
- .append("l-min")
- .append('\t')
- .append("l-max")
- .append('\t')
- .append("l-avg");
-
- out.flush();
- }
-
- public void clear()
- {
- super.clear();
- minLatency = 0;
- maxLatency = 0;
- totalLatency = 0;
- sampleCount = 0;
- }
- }
-
-}
diff --git a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java b/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java
deleted file mode 100644
index a71b466a0f..0000000000
--- a/qpid/java/tools/src/main/java/org/apache/qpid/tools/util/ArgumentsParser.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tools.util;
-
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Set;
-
-public class ArgumentsParser
-{
- public ArgumentsParser()
- {
- }
-
- public <T> T parse(String[] args, Class<T> pojoClass)
- {
- T object;
- try
- {
- object = pojoClass.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Cannot instantiate object of class " + pojoClass, e);
- }
-
- for (String arg: args)
- {
- int pos = arg.indexOf('=');
- if (pos == -1)
- {
- throw new IllegalArgumentException("Invalid argument '" + arg + "' Argument should be specified in format <name>=<value>");
- }
- String name = arg.substring(0, pos);
- String value = arg.substring(pos + 1);
-
- Field field = findField(pojoClass, name);
- if (field != null)
- {
- setField(object, field, value);
- }
- }
- return object;
- }
-
- private Field findField(Class<?> objectClass, String name)
- {
- Field[] fields = objectClass.getDeclaredFields();
-
- Field field = null;
- for (int i = 0 ; i< fields.length ; i++)
- {
- if (fields[i].getName().equals(name) && !Modifier.isFinal(fields[i].getModifiers()))
- {
- field = fields[i];
- break;
- }
- }
- return field;
- }
-
- private void setField(Object object, Field field, String value)
- {
- Object convertedValue = convertStringToType(value, field.getType(), field.getName());
-
- field.setAccessible(true);
-
- try
- {
- field.set(object, convertedValue);
- }
- catch (IllegalAccessException e)
- {
- throw new RuntimeException("Cannot access field " + field.getName());
- }
- }
-
- private Object convertStringToType(String value, Class<?> fieldType, String fieldName)
- {
- Object o;
- if (fieldType == String.class)
- {
- o = value;
- }
- else if (fieldType == boolean.class)
- {
- try
- {
- o = Boolean.parseBoolean(value);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Cannot convert to boolean argument " + fieldName);
- }
- }
- else if (fieldType == int.class)
- {
- try
- {
- o = Integer.parseInt(value);
- }
- catch(Exception e)
- {
- throw new RuntimeException("Cannot convert to int argument " + fieldName);
- }
- }
- else
- {
- throw new RuntimeException("Unsupported tye " + fieldType + " in " + fieldName);
- }
- return o;
- }
-
- public void usage(Class<?> objectClass, Set<String> requiredFields)
- {
- System.out.println("Supported arguments:");
- Field[] fields = objectClass.getDeclaredFields();
-
- Object object = null;
- try
- {
- object = objectClass.newInstance();
- }
- catch(Exception e)
- {
- // ignore any
- }
-
- for (int i = 0 ; i< fields.length ; i++)
- {
- Field field = fields[i];
- if (!Modifier.isFinal(field.getModifiers()))
- {
- Object defaultValue = null;
- try
- {
- field.setAccessible(true);
- defaultValue = field.get(object);
- }
- catch(Exception e)
- {
- // ignore any
- }
-
- System.out.println(" " + field.getName() + " ( type: "
- + field.getType().getSimpleName().toLowerCase()
- + (object != null ? ", default: " + defaultValue : "")
- + (requiredFields != null && requiredFields.contains(field.getName()) ? ", mandatory" : "")
- + ")");
- }
- }
- }
-}
diff --git a/qpid/java/tools/src/main/resources/stress-test-client.properties b/qpid/java/tools/src/main/resources/stress-test-client.properties
deleted file mode 100644
index 2ef8c258b4..0000000000
--- a/qpid/java/tools/src/main/resources/stress-test-client.properties
+++ /dev/null
@@ -1,3 +0,0 @@
-java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
-connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672'
-connectionfactory.qpidConnectionfactoryLowPrefetch=amqp://guest:guest@clientid/?brokerlist='tcp://localhost:5672?maxprefetch='10''