summaryrefslogtreecommitdiff
path: root/java/testkit
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-06-30 16:35:41 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-06-30 16:35:41 +0000
commit826abf41305c856fe1a889dadf1d77525538d90a (patch)
tree50e2f76fdc848d341e0af04bfb1ef7d098c18b68 /java/testkit
parent9cf0d38fe457a52536ec6418823fa7f37709c767 (diff)
downloadqpid-python-826abf41305c856fe1a889dadf1d77525538d90a.tar.gz
This commit is related to QPID-1161.
Please refer to the JIRA for complete details. In Summary this contains a simple test kit comprising of perf and soak tests. The focus is on producing a packaged set of tests that can be easily deployed on target environment. For Quick perf report for a particular release, please run perf_report.sh which will show results for 8 common use cases in a tabular format. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@672810 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/testkit')
-rw-r--r--java/testkit/bin/perf_report.sh109
-rw-r--r--java/testkit/bin/run_pub.sh31
-rw-r--r--java/testkit/bin/run_sub.sh30
-rw-r--r--java/testkit/bin/setenv.sh52
-rw-r--r--java/testkit/build.xml27
-rw-r--r--java/testkit/etc/jndi.properties35
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java43
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java102
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java248
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java207
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java160
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java149
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java129
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java167
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java137
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java109
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java147
17 files changed, 1882 insertions, 0 deletions
diff --git a/java/testkit/bin/perf_report.sh b/java/testkit/bin/perf_report.sh
new file mode 100644
index 0000000000..8e25ced685
--- /dev/null
+++ b/java/testkit/bin/perf_report.sh
@@ -0,0 +1,109 @@
+#!/bin/sh -xv
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Helper script to set classpath for running Qpid example classes
+# NB: You must add the Qpid client and common jars to your CLASSPATH
+# before running this script
+
+MAX_SUB_MEM=1024M
+MAX_PUB_MEM=1024M
+
+. setenv.sh
+
+waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; }
+cleanup()
+{
+ pids=`ps aux | grep java | grep Perf | awk '{print $2}'`
+ if [ "$pids" != "" ]; then
+ kill -3 $pids
+ kill -9 $pids >/dev/null 2>&1
+ fi
+}
+
+# $1 test name
+# $2 consumer options
+# $3 producer options
+run_testcase()
+{
+ sh run_sub.sh $2 > sub.out &
+ waitfor sub.out "Warming up"
+ sh run_pub.sh $3 > pub.out &
+ waitfor sub.out "Completed the test"
+ waitfor pub.out "Consumer has completed the test"
+ sleep 2 #give a grace period to shutdown
+ print_result $1
+}
+
+run_sub()
+{
+ java -cp $CLASSPATH -Xmx$MAX_SUB_MEM $@ org.apache.qpid.testkit.perf.PerfConsumer
+}
+
+run_pub()
+{
+ java -cp $CLASSPATH -Xmx$MAX_PUB_MEM $@ org.apache.qpid.testkit.perf.PerfProducer
+}
+
+print_result()
+{
+ prod_rate=`cat pub.out | grep "Producer rate" | awk '{print $3}'`
+ sys_rate=`cat sub.out | grep "System Throughput" | awk '{print $4}'`
+ cons_rate=`cat sub.out | grep "Consumer rate" | awk '{print $4}'`
+ avg_latency=`cat sub.out | grep "Avg Latency" | awk '{print $4}'`
+ min_latency=`cat sub.out | grep "Min Latency" | awk '{print $4}'`
+ max_latency=`cat sub.out | grep "Max Latency" | awk '{print $4}'`
+
+ printf "|%-15s|%15.2f|%13.2f|%13.2f|%11.2f|%11d|%11d|\n" $1 $sys_rate $prod_rate $cons_rate $avg_latency $min_latency $max_latency
+ echo "------------------------------------------------------------------------------------------------"
+}
+
+trap cleanup EXIT
+
+echo "Test report on " `date +%F`
+echo "================================================================================================"
+echo "|Test |System throuput|Producer rate|Consumer Rate|Avg Latency|Min Latency|Max Latency|"
+echo "------------------------------------------------------------------------------------------------"
+
+# Test 1 Trans Queue
+run_testcase "Trans_Queue" "" "-Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 2 Dura Queue
+run_testcase "Dura_Queue" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 3 Dura Queue Sync
+run_testcase "Dura_Queue_Sync" "-Ddurable=true" "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dsync_persistence=true"
+
+# Test 4 Topic
+#run_testcase "Topic" "-DtransDest=transientTopic" "-DtransDest=transientTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 5 Durable Topic
+run_testcase "Dura_Topic" "-Ddurable=true -DtransDest=durableTopic" "-Ddurable=true -DtransDest=durableTopic -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 6 Fanout
+run_testcase "Fanout" "-DtransDest=fanoutQueue" "-DtransDest=fanoutQueue -Dwarmup_count=1 -Dmsg_count=10"
+
+# Test 7 Small TX
+#run_testcase "Small_Txs_2" "-Ddurable=true -Dtransacted=true -Dtrans_size=1" \
+# "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=1"
+
+# Test 8 Large TX
+#run_testcase "Large_Txs_1000" "-Ddurable=true -Dtransacted=true -Dtrans_size-10" \
+# "-Ddurable=true -Dwarmup_count=1 -Dmsg_count=10 -Dtransacted=true -Dtrans_size=10"
+
diff --git a/java/testkit/bin/run_pub.sh b/java/testkit/bin/run_pub.sh
new file mode 100644
index 0000000000..27acd52e47
--- /dev/null
+++ b/java/testkit/bin/run_pub.sh
@@ -0,0 +1,31 @@
+#!/bin/sh -xv
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Helper script to set classpath for running Qpid example classes
+# NB: You must add the Qpid client and common jars to your CLASSPATH
+# before running this script
+
+. setenv.sh
+
+MAX_MEM=1024M
+
+echo "$@"
+java -cp $CLASSPATH -Xmx$MAX_MEM $@ org.apache.qpid.testkit.perf.PerfProducer
+
diff --git a/java/testkit/bin/run_sub.sh b/java/testkit/bin/run_sub.sh
new file mode 100644
index 0000000000..35884b9a4a
--- /dev/null
+++ b/java/testkit/bin/run_sub.sh
@@ -0,0 +1,30 @@
+#!/bin/sh -xv
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Helper script to set classpath for running Qpid example classes
+# NB: You must add the Qpid client and common jars to your CLASSPATH
+# before running this script
+
+. setenv.sh
+
+MAX_MEM=1024M
+
+java -cp $CLASSPATH -Xmx$MAX_MEM $@ org.apache.qpid.testkit.perf.PerfConsumer
+
diff --git a/java/testkit/bin/setenv.sh b/java/testkit/bin/setenv.sh
new file mode 100644
index 0000000000..3dba383029
--- /dev/null
+++ b/java/testkit/bin/setenv.sh
@@ -0,0 +1,52 @@
+#!/bin/sh -xv
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Helper script to set classpath for running Qpid example classes
+# NB: You must add the Qpid client and common jars to your CLASSPATH
+# before running this script
+
+# check for QPID_TEST_HOME
+if [ "$QPID_TEST_HOME" = "" ] ; then
+ echo "ERROR: Please set QPID_TEST_HOME ...."
+ exit 1
+fi
+
+# check for JAVA_HOME
+if [ "$JAVA_HOME" = "" ] ; then
+ echo "ERROR: Please set JAVA_HOME ...."
+ exit 1
+fi
+
+# VENDOR_LIB path needs to be set
+# for Qpid set this to {qpid_checkout}/java/build/lib
+VENDOR_LIB=""
+if [ "$VENDOR_LIB" = "" ] ; then
+ echo "ERROR: Please set VENDOR_LIB path in the script ...."
+ exit 1
+fi
+
+
+[ -d $QPID_TEST_HOME/classes ] || mkdir $QPID_TEST_HOME/classes
+
+CLASSPATH=`find $VENDOR_LIB -name *.jar* | tr '\n' ":"`
+javac -cp $CLASSPATH -d $QPID_TEST_HOME/classes -sourcepath $QPID_TEST_HOME/src `find $QPID_TEST_HOME/src -name '*.java'`
+
+export CLASSPATH=$QPID_TEST_HOME/classes:$CLASSPATH
+
diff --git a/java/testkit/build.xml b/java/testkit/build.xml
new file mode 100644
index 0000000000..94b97d270d
--- /dev/null
+++ b/java/testkit/build.xml
@@ -0,0 +1,27 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Test Kit" default="build">
+
+ <property name="module.depends" value="client broker common"/>
+
+ <import file="../module.xml"/>
+
+</project>
diff --git a/java/testkit/etc/jndi.properties b/java/testkit/etc/jndi.properties
new file mode 100644
index 0000000000..f535975844
--- /dev/null
+++ b/java/testkit/etc/jndi.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.connectionFactory = amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'
+
+# Register an AMQP destination in JNDI
+destination.transientQueue = direct://amq.direct//testQueueT
+destination.durableQueue = direct://amq.direct//testQueueD?durable='true'
+
+destination.transientTopic = topic://amq.topic//testTopicT
+#destination.durableTopic = topic://amq.topic//testTopicD?durable='true'
+
+destination.fanoutQueue = fanout://amq.fanout//fanoutQueue \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
new file mode 100644
index 0000000000..f2784ef499
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
@@ -0,0 +1,43 @@
+package org.apache.qpid.testkit;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+public class MessageFactory
+{
+ public static Message createBytesMessage(Session ssn, int size) throws JMSException
+ {
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes(createMessagePayload(size).getBytes());
+ return msg;
+ }
+
+ public static Message createTextMessage(Session ssn, int size) throws JMSException
+ {
+ TextMessage msg = ssn.createTextMessage();
+ msg.setText(createMessagePayload(size));
+ return msg;
+ }
+
+ public static String createMessagePayload(int size)
+ {
+ String msgData = "Qpid Test Message";
+
+ StringBuffer buf = new StringBuffer(size);
+ int count = 0;
+ while (count <= (size - msgData.length()))
+ {
+ buf.append(msgData);
+ count += msgData.length();
+ }
+ if (count < size)
+ {
+ buf.append(msgData, 0, size - count);
+ }
+
+ return buf.toString();
+ }
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
new file mode 100644
index 0000000000..95670f0507
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import java.text.DecimalFormat;
+import java.util.Hashtable;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+public class PerfBase
+{
+ TestParams params;
+ Connection con;
+ Session session;
+ Destination dest;
+ Destination feedbackDest;
+ DecimalFormat df = new DecimalFormat("###.##");
+
+ public PerfBase()
+ {
+ params = new TestParams();
+ }
+
+ public void setUp() throws Exception
+ {
+ Hashtable<String,String> env = new Hashtable<String,String>();
+ env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
+ env.put(Context.PROVIDER_URL, params.getProviderURL());
+
+ Context ctx = null;
+ try
+ {
+ ctx = new InitialContext(env);
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error initializing JNDI",e);
+
+ }
+
+ ConnectionFactory conFac = null;
+ try
+ {
+ conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up connection factory",e);
+ }
+
+ con = conFac.createConnection();
+ con.start();
+ session = con.createSession(params.isTransacted(),
+ params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
+
+ try
+ {
+ dest = (Destination)ctx.lookup( params.isDurable()?
+ params.getDurableDestination():
+ params.getTransientDestination()
+ );
+ }
+ catch(Exception e)
+ {
+ throw new Exception("Error looking up destination",e);
+ }
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
+
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
new file mode 100644
index 0000000000..cd12c7010d
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
@@ -0,0 +1,248 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+/**
+ * PerfConsumer will receive x no of messages in warmup mode.
+ * Once it receives the Start message it will then signal the PerfProducer.
+ * It will start recording stats from the first message it receives after
+ * the warmup mode is done.
+ *
+ * The following calculations are done.
+ * The important numbers to look at is
+ * a) Avg Latency
+ * b) System throughput.
+ *
+ * Latency.
+ * =========
+ * Currently this test is written with the assumption that either
+ * a) The Perf Producer and Consumer are on the same machine
+ * b) They are on separate machines that have their time synced via a Time Server
+ *
+ * In order to calculate latency the producer inserts a timestamp
+ * hen the message is sent. The consumer will note the current time the message is
+ * received and will calculate the latency as follows
+ * latency = rcvdTime - msg.getJMSTimestamp()
+ *
+ * Through out the test it will keep track of the max and min latency to show the
+ * variance in latencies.
+ *
+ * Avg latency is measured by adding all latencies and dividing by the total msgs.
+ * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
+ *
+ * Throughput
+ * ===========
+ * System throughput is calculated as follows
+ * rcvdMsgCount/(rcvdTime - testStartTime)
+ *
+ * Consumer rate is calculated as
+ * rcvdMsgCount/(rcvdTime - startTime)
+ *
+ * Note that the testStartTime referes to when the producer sent the first message
+ * and startTime is when the consumer first received a message.
+ *
+ * rcvdTime keeps track of when the last message is received.
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ */
+
+public class PerfConsumer extends PerfBase implements MessageListener
+{
+ MessageConsumer consumer;
+ long maxLatency = 0;
+ long minLatency = Long.MAX_VALUE;
+ long totalLatency = 0; // to calculate avg latency.
+ int rcvdMsgCount = 0;
+ long testStartTime = 0; // to measure system throughput
+ long startTime = 0; // to measure consumer throughput
+ long rcvdTime = 0;
+ boolean transacted = false;
+ int transSize = 0;
+
+ final Object lock = new Object();
+
+ public PerfConsumer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(dest);
+
+ // Storing the following two for efficiency
+ transacted = params.isTransacted();
+ transSize = params.getTransactionSize();
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+
+ boolean start = false;
+ while (!start)
+ {
+ Message msg = consumer.receive();
+ if (msg instanceof TextMessage)
+ {
+ if (((TextMessage)msg).getText().equals("End"))
+ {
+ start = true;
+ MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
+ temp.send(session.createMessage());
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ }
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ consumer.setMessageListener(this);
+ }
+
+ public void printResults() throws Exception
+ {
+ synchronized (lock)
+ {
+ lock.wait();
+ }
+
+ double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
+ double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
+ double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
+ System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
+ System.out.println(new StringBuilder("Consumer rate : ").
+ append(df.format(consRate)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("System Throughput : ").
+ append(df.format(throughput)).
+ append(" msg/sec").toString());
+ System.out.println(new StringBuilder("Avg Latency : ").
+ append(df.format(avgLatency)).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Min Latency : ").
+ append(minLatency).
+ append(" ms").toString());
+ System.out.println(new StringBuilder("Max Latency : ").
+ append(maxLatency).
+ append(" ms").toString());
+ System.out.println("Completed the test......\n");
+ }
+
+ public void notifyCompletion(Destination replyTo) throws Exception
+ {
+ MessageProducer tmp = session.createProducer(replyTo);
+ Message endMsg = session.createMessage();
+ tmp.send(endMsg);
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+ tmp.close();
+ }
+
+ public void tearDown() throws Exception
+ {
+ consumer.close();
+ session.close();
+ con.close();
+ }
+
+ public void onMessage(Message msg)
+ {
+ try
+ {
+ if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
+ {
+ notifyCompletion(msg.getJMSReplyTo());
+
+ synchronized (lock)
+ {
+ lock.notifyAll();
+ }
+ }
+ else
+ {
+ rcvdTime = System.currentTimeMillis();
+ rcvdMsgCount ++;
+
+ if (rcvdMsgCount == 1)
+ {
+ startTime = rcvdTime;
+ testStartTime = msg.getJMSTimestamp();
+ }
+
+ if (transacted && (rcvdMsgCount % transSize == 0))
+ {
+ session.commit();
+ }
+
+ long latency = rcvdTime - msg.getJMSTimestamp();
+ maxLatency = Math.max(maxLatency, latency);
+ minLatency = Math.min(minLatency, latency);
+ totalLatency = totalLatency + latency;
+ }
+
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when receiving messages");
+ }
+
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ printResults();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+ public static void main(String[] args)
+ {
+ PerfConsumer cons = new PerfConsumer();
+ cons.test();
+ }
+} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
new file mode 100644
index 0000000000..757b1bfcda
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+
+import org.apache.qpid.testkit.MessageFactory;
+
+/**
+ * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
+ * from the consumer that it has successfully consumed them and ready to start the
+ * test. It will start sending y no of messages and each message will contain a time
+ * stamp. This will be used at the receiving end to measure the latency.
+ *
+ * This is done with the assumption that both consumer and producer are running on
+ * the same machine or different machines which have time synced using a time server.
+ *
+ * This test also calculates the producer rate as follows.
+ * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
+ *
+ * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
+ *
+ * Rajith - Producer rate is not an accurate perf metric IMO.
+ * It is heavily inlfuenced by any in memory buffering.
+ * System throughput and latencies calculated by the PerfConsumer are more realistic
+ * numbers.
+ *
+ */
+public class PerfProducer extends PerfBase
+{
+ MessageProducer producer;
+ Message msg;
+ byte[] payload;
+
+ public PerfProducer()
+ {
+ super();
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ feedbackDest = session.createTemporaryQueue();
+
+ // if message caching is enabled we pre create the message
+ // else we pre create the payload
+ if (params.isCacheMessage())
+ {
+ msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
+ msg.setJMSDeliveryMode(params.isDurable()?
+ DeliveryMode.PERSISTENT :
+ DeliveryMode.NON_PERSISTENT
+ );
+ }
+ else
+ {
+ payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
+ }
+
+ producer = session.createProducer(dest);
+ producer.setDisableMessageID(params.isDisableMessageID());
+ producer.setDisableMessageTimestamp(params.isDisableTimestamp());
+ }
+
+ protected Message getNextMessage() throws Exception
+ {
+ if (params.isCacheMessage())
+ {
+ return msg;
+ }
+ else
+ {
+ msg = session.createBytesMessage();
+ ((BytesMessage)msg).writeBytes(payload);
+ return msg;
+ }
+ }
+
+ public void warmup()throws Exception
+ {
+ System.out.println("Warming up......");
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+
+ for (int i=0; i < params.getWarmupCount() -1; i++)
+ {
+ producer.send(getNextMessage());
+ }
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ }
+
+ public void startTest() throws Exception
+ {
+ System.out.println("Starting test......");
+ int count = params.getMsgCount();
+ boolean transacted = params.isTransacted();
+ int tranSize = params.getTransactionSize();
+
+ long start = System.currentTimeMillis();
+ for(int i=0; i < count; i++ )
+ {
+ Message msg = getNextMessage();
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ producer.send(msg);
+ if ( transacted && ((i+1) % tranSize == 0))
+ {
+ session.commit();
+ }
+ }
+ long time = System.currentTimeMillis() - start;
+ double rate = ((double)count/(double)time)*1000;
+ System.out.println(new StringBuilder("Producer rate: ").
+ append(df.format(rate)).
+ append(" msg/sec").
+ toString());
+ }
+
+ public void waitForCompletion() throws Exception
+ {
+ MessageConsumer tmp = session.createConsumer(feedbackDest);
+ Message msg = session.createTextMessage("End");
+ msg.setJMSReplyTo(feedbackDest);
+ producer.send(msg);
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.receive();
+
+ if (params.isTransacted())
+ {
+ session.commit();
+ }
+
+ tmp.close();
+ System.out.println("Consumer has completed the test......");
+ }
+
+ public void tearDown() throws Exception
+ {
+ producer.close();
+ session.close();
+ con.close();
+ }
+
+ public void test()
+ {
+ try
+ {
+ setUp();
+ warmup();
+ startTest();
+ waitForCompletion();
+ tearDown();
+ }
+ catch(Exception e)
+ {
+ handleError(e,"Error when running test");
+ }
+ }
+
+
+ public static void main(String[] args)
+ {
+ PerfProducer prod = new PerfProducer();
+ prod.test();
+ }
+} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
new file mode 100644
index 0000000000..15142cfced
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
@@ -0,0 +1,160 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.perf;
+
+import javax.jms.Session;
+
+public class TestParams
+{
+ private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
+
+ private String connectionFactory = "connectionFactory";
+
+ private String transientDest = "transientQueue";
+
+ private String durableDest = "durableQueue";
+
+ private int msg_size = 512;
+
+ private int msg_type = 1; // not used yet
+
+ private boolean cacheMessage = true;
+
+ private boolean disableMessageID = false;
+
+ private boolean disableTimestamp = false;
+
+ private boolean durable = false;
+
+ private boolean transacted = false;
+
+ private int transaction_size = 1000;
+
+ private int ack_mode = Session.AUTO_ACKNOWLEDGE;
+
+ private int msg_count = 10;
+
+ private int warmup_count = 1;
+
+ public TestParams()
+ {
+ initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
+ providerURL = System.getProperty("java.naming.provider.url",providerURL);
+
+ transientDest = System.getProperty("transDest",transientDest);
+ durableDest = System.getProperty("durableDest",durableDest);
+
+ msg_size = Integer.getInteger("msg_size", 512);
+ msg_type = Integer.getInteger("msg_type",1);
+ cacheMessage = Boolean.getBoolean("cache_msg");
+ disableMessageID = Boolean.getBoolean("disableMessageID");
+ disableTimestamp = Boolean.getBoolean("disableTimestamp");
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ transaction_size = Integer.getInteger("trans_size",1000);
+ ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
+ msg_count = Integer.getInteger("msg_count",msg_count);
+ warmup_count = Integer.getInteger("warmup_count",warmup_count);
+ }
+
+ public int getAckMode()
+ {
+ return ack_mode;
+ }
+
+ public String getConnectionFactory()
+ {
+ return connectionFactory;
+ }
+
+ public String getTransientDestination()
+ {
+ return transientDest;
+ }
+
+ public String getDurableDestination()
+ {
+ return durableDest;
+ }
+
+ public String getInitialContextFactory()
+ {
+ return initialContextFactory;
+ }
+
+ public int getMsgCount()
+ {
+ return msg_count;
+ }
+
+ public int getMsgSize()
+ {
+ return msg_size;
+ }
+
+ public int getMsgType()
+ {
+ return msg_type;
+ }
+
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getProviderURL()
+ {
+ return providerURL;
+ }
+
+ public boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ public int getTransactionSize()
+ {
+ return transaction_size;
+ }
+
+ public int getWarmupCount()
+ {
+ return warmup_count;
+ }
+
+ public boolean isCacheMessage()
+ {
+ return cacheMessage;
+ }
+
+ public boolean isDisableMessageID()
+ {
+ return disableMessageID;
+ }
+
+ public boolean isDisableTimestamp()
+ {
+ return disableTimestamp;
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
new file mode 100644
index 0000000000..be8c4bbc75
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
@@ -0,0 +1,149 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testkit.MessageFactory;
+
+public class BaseTest
+{
+ protected String host = "127.0.0.1";
+ protected int msg_size = 100;
+ protected int msg_count = 10;
+ protected int session_count = 1;
+ protected boolean durable = false;
+ protected String queue_name = "message_queue";
+ protected String exchange_name = "amq.direct";
+ protected String routing_key = "routing_key";
+ protected String contentType = "application/octet-stream";
+ protected int port = 5672;
+ protected String url;
+ protected Message[] msgArray;
+
+ protected AMQConnection con;
+ protected Destination dest = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+
+ public BaseTest()
+ {
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ msg_size = Integer.getInteger("msg_size", 100);
+ msg_count = Integer.getInteger("msg_count", 10);
+ session_count = Integer.getInteger("session_count", 1);
+ durable = Boolean.getBoolean("durable");
+ queue_name = System.getProperty("queue_name", "message_queue");
+ exchange_name = System.getProperty("exchange_name", "amq.direct");
+ routing_key = System.getProperty("routing_key", "routing_key");
+ contentType = System.getProperty("content_type","application/octet-stream");
+
+
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'";
+ }
+
+ public void setUp()
+ {
+ try
+ {
+ con = new AMQConnection(url);
+ con.start();
+
+
+ if (exchange_name.equals("amq.topic"))
+ {
+ dest = new AMQTopic(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ false, //auto-delete
+ null, //queue name
+ durable);
+ }
+ else
+ {
+ dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ new AMQShortString(queue_name),
+ false, //exclusive
+ false, //auto-delete
+ durable);
+ }
+
+ // Create the session to setup the messages
+ Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (msg_size == -1)
+ {
+ // This creates an array of 1000 messages from 500-1500 bytes
+ // During the tests a message will be picked randomly
+ msgArray = new Message[1000];
+ for (int i = 0; i < 1000; i++)
+ {
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(session,500 + i) :
+ MessageFactory.createBytesMessage(session, 500 + i);
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ msgArray[i] = msg;
+ }
+ }
+ else
+ {
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(session, msg_size):
+ MessageFactory.createBytesMessage(session, msg_size);
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ msgArray = new Message[]
+ { msg };
+ }
+
+ session.close();
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Error while setting up the test");
+ }
+ }
+
+ public void handleError(Exception e,String msg)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+ }
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
new file mode 100644
index 0000000000..3117d268a3
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Test Description
+ * ================
+ * The difference between this test and the
+ * LongDurationConsumer is that each Session runs
+ * in it's own Thread and the ability to receive
+ * messages transactionally.
+ *
+ * All consumers will still share the same destination.
+ *
+ */
+public class MultiThreadedConsumer extends BaseTest
+{
+ protected final boolean transacted;
+
+ public MultiThreadedConsumer()
+ {
+ super();
+ transacted = Boolean.getBoolean("transacted");
+ }
+
+ /**
+ * Creates a Session and a consumer that runs in its
+ * own thread.
+ * It can also consume transactionally.
+ *
+ */
+ public void test()
+ {
+ try
+ {
+ for (int i = 0; i < session_count; i++)
+ {
+
+ final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ MessageConsumer consumer = session.createConsumer(dest);
+
+ consumer.setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ String payload = ((TextMessage) m).getText();
+ if (payload.equals("End"))
+ {
+ System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis());
+ MessageProducer temp = session.createProducer(m.getJMSReplyTo());
+ Message controlMsg = session.createTextMessage();
+ temp.send(controlMsg);
+ if (transacted)
+ {
+ session.commit();
+ }
+ temp.close();
+ }
+ }
+ catch (JMSException e)
+ {
+ handleError(e,"Exception receiving messages");
+ }
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception creating a consumer");
+ }
+
+ }
+
+ });
+ t.setName("session-" + i);
+ t.start();
+ } // for loop
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ MultiThreadedConsumer test = new MultiThreadedConsumer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
new file mode 100644
index 0000000000..886c64bb81
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ *
+ * This test creats x number of sessions, where each session
+ * runs in it's own thread. Each session creates a producer
+ * and it's own feedback queue.
+ *
+ * A producer will send n-1 messages, followed by the n-th
+ * message which contains "End" in it's payload to signal
+ * that this is the last message message in the sequence.
+ * The end message has the feedback queue as it's replyTo.
+ * It will then listen on the feedback queue waiting for the
+ * confirmation and then sleeps for 1000 ms before proceeding
+ * with the next n messages.
+ *
+ * This hand shaking mechanism ensures that all of the
+ * messages sent are consumed by some consumer. This prevents
+ * the producers from saturating the broker especially when
+ * the consumers are slow.
+ *
+ * All producers send to a single destination
+ * If using transactions it's best to use smaller message count
+ * as the test only commits after sending all messages in a batch.
+ *
+ */
+
+public class MultiThreadedProducer extends SimpleProducer
+{
+ protected final boolean transacted;
+
+ public MultiThreadedProducer()
+ {
+ super();
+ transacted = Boolean.getBoolean("transacted");
+ }
+
+ public void test()
+ {
+ try
+ {
+ final int msg_count_per_session = msg_count/session_count;
+
+ for (int i = 0; i < session_count; i++)
+ {
+ final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread(new Runnable()
+ {
+ private Random gen = new Random();
+
+ private Message getNextMessage()
+ {
+ if (msg_size == -1)
+ {
+ int index = gen.nextInt(1000);
+ return msgArray[index];
+ }
+ else
+ {
+ return msgArray[0];
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ MessageProducer prod = session.createProducer(dest);
+ // this will ensure that the producer will not overun the consumer.
+ feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID
+ .randomUUID().toString()), new AMQShortString("control"));
+
+ MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue);
+
+ while (true)
+ {
+ for (int i = 0; i < msg_count_per_session; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setJMSMessageID("ID:" + UUID.randomUUID());
+ prod.send(msg);
+ }
+
+ TextMessage m = session.createTextMessage("End");
+ m.setJMSMessageID("ID:" + UUID.randomUUID());
+ m.setJMSReplyTo(feedbackQueue);
+ prod.send(m);
+
+ if (transacted)
+ {
+ session.commit();
+ }
+
+ System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis());
+ feedbackConsumer.receive();
+ if (transacted)
+ {
+ session.commit();
+ }
+ Thread.sleep(1000);
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception in producing message");
+ }
+
+ }
+
+ });
+ t.setName("session-" + i);
+ t.start();
+
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ MultiThreadedProducer test = new MultiThreadedProducer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
new file mode 100644
index 0000000000..1faa9be864
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ * This test will open x number of connections where each
+ * connection will create a session and a producer/consumer pair,
+ * and then send configurable no of messages.
+ * It will them sleep for configurable time interval and
+ * tear down the connections/sessions/consumers.
+ * It will then repeat the process again until the test is stopped.
+ *
+ * Purpose of the test
+ * ===================
+ * To find if the broker has leaks when cleaning resources.
+ * To find if the client has leaks with resources.
+ */
+public class ResourceLeakTest extends BaseTest
+{
+ protected int connection_count = 10;
+ protected long connection_idle_time = 5000;
+
+ public ResourceLeakTest()
+ {
+ super();
+ connection_count = Integer.getInteger("con_count",10);
+ connection_idle_time = Long.getLong("con_idle_time", 5000);
+ }
+
+ public void test()
+ {
+ try
+ {
+
+ AMQConnection[] cons = new AMQConnection[connection_count];
+ Session[] sessions = new Session[connection_count];
+ MessageConsumer[] msgCons = new MessageConsumer[connection_count];
+ MessageProducer [] msgProds = new MessageProducer[connection_count];
+ Destination dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key),
+ new AMQShortString(queue_name),
+ true, //exclusive
+ true // auto delete
+ );
+
+ while (true)
+ {
+ for (int i = 0; i < connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ cons[i] = con;
+ Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ sessions[i] = ssn;
+ MessageConsumer msgCon = ssn.createConsumer(dest);
+ msgCons[i] = msgCon;
+ MessageProducer msgProd = ssn.createProducer(dest);
+ msgProds[i] = msgProd;
+
+ BytesMessage msg = ssn.createBytesMessage();
+ msg.writeBytes("Test Msg".getBytes());
+
+ for (int j = 0; j < msg_count;j++)
+ {
+ msgProd.send(msg);
+ }
+
+ int j = 0;
+ while (j < msg_count)
+ {
+ msgCon.receive();
+ j++;
+ }
+ }
+ Thread.sleep(connection_idle_time);
+
+ try
+ {
+ for (int i = 0; i < connection_count; i++)
+ {
+ msgCons[i].close();
+ msgProds[i].close();
+ sessions[i].close();
+ cons[i].close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception closing resources");
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception in setting up the test");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ ResourceLeakTest test = new ResourceLeakTest();
+ test.test();
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
new file mode 100644
index 0000000000..5ef72d7538
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Test Description
+ * ================
+ * This test will create x number of sessions.
+ * Each session will have it's own consumer.
+ * Once a consumer receives the "End" message it
+ * will send a message to the destination indicated
+ * by the replyTo field in the End message.
+ * This will signal the producer that all the previous
+ * messages have been consumed. The producer will
+ * then start sending messages again.
+ *
+ * This prevents the producer from overruning the
+ * consumer.
+ * *
+ * All consumers share a single destination
+ *
+ */
+
+public class SimpleConsumer extends BaseTest
+{
+ public SimpleConsumer()
+ {
+ super();
+ }
+
+ public void test()
+ {
+ try
+ {
+ final Session[] sessions = new Session[session_count];
+ MessageConsumer[] cons = new MessageConsumer[session_count];
+
+ for (int i = 0; i < session_count; i++)
+ {
+ sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ cons[i] = sessions[i].createConsumer(dest);
+ cons[i].setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message m)
+ {
+ try
+ {
+ String payload = ((TextMessage) m).getText();
+ if (payload.equals("End"))
+ {
+ System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis());
+ MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo());
+ Message controlMsg = sessions[0].createTextMessage();
+ temp.send(controlMsg);
+ temp.close();
+ }
+ }
+ catch (JMSException e)
+ {
+ handleError(e,"Exception when receiving the message");
+ }
+ }
+ });
+ }
+
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception when setting up the consumers");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ SimpleConsumer test = new SimpleConsumer();
+ test.setUp();
+ test.test();
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
new file mode 100644
index 0000000000..bdae79fd41
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit.soak;
+
+
+import java.util.Random;
+import java.util.UUID;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Test Description
+ * ================
+ * This test will send n-1 messages, followed by the n-th
+ * message which contains "End" in it's payload to signal
+ * that this is the last message message in the sequence.
+ * The end message has the feedback queue as it's replyTo.
+ * It will then listen on the feedback queue waiting for the
+ * confirmation and then sleeps for 1000 ms before proceeding
+ * with the next n messages.
+ *
+ * This hand shaking mechanism ensures that all of the
+ * messages sent are consumed by some consumer. This prevents
+ * the producers from saturating the broker especially when
+ * the consumers are slow.
+ *
+ * It creates a producer per session.
+ * If session_count is > 1 it will round robin the messages
+ * btw the producers.
+ *
+ * All producers send to a single destination
+ *
+ */
+
+public class SimpleProducer extends BaseTest
+{
+ protected Destination feedbackQueue;
+ Random gen = new Random();
+
+ public SimpleProducer()
+ {
+ super();
+ }
+
+ protected Message getNextMessage()
+ {
+ if (msg_size == -1)
+ {
+ int index = gen.nextInt(1000);
+ return msgArray[index];
+ }
+ else
+ {
+ return msgArray[0];
+ }
+ }
+
+ public void test()
+ {
+ try
+ {
+ Session[] sessions = new Session[session_count];
+ MessageProducer[] prods = new MessageProducer[session_count];
+
+ for (int i = 0; i < session_count; i++)
+ {
+ sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ prods[i] = sessions[i].createProducer(dest);
+ }
+
+ // this will ensure that the producer will not overun the consumer.
+ feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
+ new AMQShortString(UUID.randomUUID().toString()),
+ new AMQShortString("control"));
+
+ MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue);
+
+ int prod_pointer = 0;
+ boolean multi_session = session_count > 1 ? true : false;
+
+ while (true)
+ {
+ for (int i = 0; i < msg_count - 1; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setJMSMessageID("ID:" + UUID.randomUUID());
+ prods[prod_pointer].send(msg);
+ if (multi_session)
+ {
+ prod_pointer++;
+ if (prod_pointer == session_count)
+ {
+ prod_pointer = 0;
+ }
+ }
+ }
+
+ TextMessage m = sessions[0].createTextMessage("End");
+ m.setJMSMessageID("ID:" + UUID.randomUUID());
+ m.setJMSReplyTo(feedbackQueue);
+ prods[prod_pointer].send(m);
+ System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis());
+ feedbackConsumer.receive();
+ Thread.sleep(1000);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError(e,"Exception while setting up the producer");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ SimpleProducer test = new SimpleProducer();
+ test.setUp();
+ test.test();
+ }
+
+}