summaryrefslogtreecommitdiff
path: root/java/testkit/src
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
committerRafael H. Schloming <rhs@apache.org>2009-12-26 12:42:57 +0000
commit248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch)
treed5d0959a70218946ff72e107a6c106e32479a398 /java/testkit/src
parent3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff)
downloadqpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/testkit/src')
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Client.java90
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java6
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java43
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java225
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java195
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java384
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java102
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java248
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java207
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java160
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java152
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java153
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java166
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java96
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java134
-rw-r--r--java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java146
16 files changed, 969 insertions, 1538 deletions
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
new file mode 100644
index 0000000000..88d78ee78c
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Session;
+
+public abstract class Client
+{
+ protected Connection con;
+ protected Session ssn;
+ protected boolean durable = false;
+ protected boolean transacted = false;
+ protected int txSize = 10;
+ protected int ack_mode = Session.AUTO_ACKNOWLEDGE;
+ protected String contentType = "application/octet-stream";
+ protected Destination dest = null;
+
+ protected long reportFrequency = 60000; // every min
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected long startTime = System.currentTimeMillis();
+ protected ErrorHandler errorHandler = null;
+
+ public Client(Connection con) throws Exception
+ {
+ this.con = con;
+ durable = Boolean.getBoolean("durable");
+ transacted = Boolean.getBoolean("transacted");
+ txSize = Integer.getInteger("tx_size",10);
+ contentType = System.getProperty("content_type","application/octet-stream");
+ reportFrequency = Long.getLong("report_frequency", 60000);
+ }
+
+ public void close()
+ {
+ try
+ {
+ con.close();
+ }
+ catch (Exception e)
+ {
+ handleError("Error closing connection",e);
+ }
+ }
+
+ public void setErrorHandler(ErrorHandler h)
+ {
+ this.errorHandler = h;
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ if (errorHandler != null)
+ {
+ errorHandler.handleError(msg, e);
+ }
+ else
+ {
+ System.err.println(msg);
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
new file mode 100644
index 0000000000..a1add8e03f
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java
@@ -0,0 +1,6 @@
+package org.apache.qpid.testkit;
+
+public interface ErrorHandler {
+
+ public void handleError(String msg,Exception e);
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
deleted file mode 100644
index f2784ef499..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package org.apache.qpid.testkit;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-public class MessageFactory
-{
- public static Message createBytesMessage(Session ssn, int size) throws JMSException
- {
- BytesMessage msg = ssn.createBytesMessage();
- msg.writeBytes(createMessagePayload(size).getBytes());
- return msg;
- }
-
- public static Message createTextMessage(Session ssn, int size) throws JMSException
- {
- TextMessage msg = ssn.createTextMessage();
- msg.setText(createMessagePayload(size));
- return msg;
- }
-
- public static String createMessagePayload(int size)
- {
- String msgData = "Qpid Test Message";
-
- StringBuffer buf = new StringBuffer(size);
- int count = 0;
- while (count <= (size - msgData.length()))
- {
- buf.append(msgData);
- count += msgData.length();
- }
- if (count < size)
- {
- buf.append(msgData, 0, size - count);
- }
-
- return buf.toString();
- }
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
new file mode 100644
index 0000000000..19ae325d4b
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+
+/**
+ * A generic receiver which consumers a stream of messages
+ * from a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It participates in a feedback loop to ensure the producer
+ * doesn't fill up the queue. If it receives an "End" msg
+ * it sends a reply to the replyTo address in that msg.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * sync_rcv - Whether to consume sync (instead of using a listener).
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Receiver extends Client implements MessageListener
+{
+ // Until addressing is properly supported.
+ protected enum Reliability {
+ AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE;
+
+ Reliability getReliability(String s)
+ {
+ if (s.equalsIgnoreCase("at_most_once"))
+ {
+ return AT_MOST_ONCE;
+ }
+ else if (s.equalsIgnoreCase("at_least_once"))
+ {
+ return AT_LEAST_ONCE;
+ }
+ else
+ {
+ return EXACTLY_ONCE;
+ }
+ }
+ };
+
+ long msg_count = 0;
+ int sequence = 0;
+ boolean sync_rcv = Boolean.getBoolean("sync_rcv");
+ boolean uniqueDests = Boolean.getBoolean("unique_dests");
+ Reliability reliability = Reliability.EXACTLY_ONCE;
+ MessageConsumer consumer;
+ List<Integer> duplicateMessages = new ArrayList<Integer>();
+
+ public Receiver(Connection con,Destination dest) throws Exception
+ {
+ super(con);
+ reliability = reliability.getReliability(System.getProperty("reliability","exactly_once"));
+ ssn = con.createSession(transacted,ack_mode);
+ consumer = ssn.createConsumer(dest);
+ if (!sync_rcv)
+ {
+ consumer.setMessageListener(this);
+ }
+
+ System.out.println("Operating in mode : " + reliability);
+ System.out.println("Receiving messages from : " + dest);
+ }
+
+ public void onMessage(Message msg)
+ {
+ handleMessage(msg);
+ }
+
+ public void run() throws Exception
+ {
+ while(true)
+ {
+ if(sync_rcv)
+ {
+ Message msg = consumer.receive();
+ handleMessage(msg);
+ }
+ Thread.sleep(reportFrequency);
+ System.out.println(df.format(System.currentTimeMillis())
+ + " - messages received : " + msg_count);
+ }
+ }
+
+ private void handleMessage(Message m)
+ {
+ try
+ {
+ if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
+ {
+ MessageProducer temp = ssn.createProducer(m.getJMSReplyTo());
+ Message controlMsg = ssn.createTextMessage();
+ temp.send(controlMsg);
+ if (transacted)
+ {
+ ssn.commit();
+ }
+ temp.close();
+ }
+ else
+ {
+
+ int seq = m.getIntProperty("sequence");
+ if (uniqueDests)
+ {
+ if (seq == 0)
+ {
+ sequence = 0; // wrap around for each iteration
+ }
+
+ if (seq < sequence)
+ {
+ duplicateMessages.add(seq);
+ if (reliability == Reliability.EXACTLY_ONCE)
+ {
+ throw new Exception(": Received a duplicate message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ else if (seq == sequence)
+ {
+ sequence++;
+ msg_count ++;
+ }
+ else
+ {
+ // Multiple publishers are not allowed in this test case.
+ // So out of order messages are not allowed.
+ throw new Exception(": Received an out of order message (expected="
+ + sequence + ",received=" + seq + ")" );
+ }
+ }
+ // Please note that this test case doesn't expect duplicates
+ // When testing for transactions.
+ if (transacted && msg_count % txSize == 0)
+ {
+ ssn.commit();
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ handleError("Exception receiving messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ // #3rd argument should be an address
+ // Any other properties is best configured via jvm args
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ // FIXME Need to add support for the new address format
+ // Then it's trivial to add destination for that.
+ Receiver rcv = new Receiver(con,null);
+ rcv.run();
+ }
+
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
new file mode 100644
index 0000000000..4dbe278e33
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.tools.MessageFactory;
+
+/**
+ * A generic sender which sends a stream of messages
+ * to a given address in a broker (host/port)
+ * until told to stop by killing it.
+ *
+ * It has a feedback loop to ensure it doesn't fill
+ * up queues due to a slow consumer.
+ *
+ * It doesn't check for correctness or measure anything
+ * leaving those concerns to another entity.
+ * However it prints a timestamp every x secs(-Dreport_frequency)
+ * as checkpoint to figure out how far the test has progressed if
+ * a failure occurred.
+ *
+ * It also takes in an optional Error handler to
+ * pass out any error in addition to writing them to std err.
+ *
+ * This is intended more as building block to create
+ * more complex test cases. However there is a main method
+ * provided to use this standalone.
+ *
+ * The following options are available and configurable
+ * via jvm args.
+ *
+ * msg_size (256)
+ * msg_count (10) - # messages before waiting for feedback
+ * sleep_time (1000 ms) - sleep time btw each iteration
+ * report_frequency - how often a timestamp is printed
+ * durable
+ * transacted
+ * tx_size - size of transaction batch in # msgs.
+ */
+public class Sender extends Client
+{
+ protected int msg_size = 256;
+ protected int msg_count = 10;
+ protected int iterations = -1;
+ protected long sleep_time = 1000;
+
+ protected Destination dest = null;
+ protected Destination replyTo = null;
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+
+ protected MessageProducer producer;
+ Random gen = new Random(19770905);
+
+ public Sender(Connection con,Destination dest) throws Exception
+ {
+ super(con);
+ this.msg_size = Integer.getInteger("msg_size", 100);
+ this.msg_count = Integer.getInteger("msg_count", 10);
+ this.iterations = Integer.getInteger("iterations", -1);
+ this.sleep_time = Long.getLong("sleep_time", 1000);
+ this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE);
+ this.dest = dest;
+ this.producer = ssn.createProducer(dest);
+ this.replyTo = ssn.createTemporaryQueue();
+
+ System.out.println("Sending messages to : " + dest);
+ }
+
+ /*
+ * If msg_size not specified it generates a message
+ * between 500-1500 bytes.
+ */
+ protected Message getNextMessage() throws Exception
+ {
+ int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size;
+ Message msg = (contentType.equals("text/plain")) ?
+ MessageFactory.createTextMessage(ssn, s):
+ MessageFactory.createBytesMessage(ssn, s);
+
+ msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT
+ : DeliveryMode.NON_PERSISTENT);
+ return msg;
+ }
+
+ public void run()
+ {
+ try
+ {
+ boolean infinite = (iterations == -1);
+ for (int x=0; infinite || x < iterations; x++)
+ {
+ long now = System.currentTimeMillis();
+ if (now - startTime >= reportFrequency)
+ {
+ System.out.println(df.format(now) + " - iterations : " + x);
+ startTime = now;
+ }
+
+ for (int i = 0; i < msg_count; i++)
+ {
+ Message msg = getNextMessage();
+ msg.setIntProperty("sequence",i);
+ producer.send(msg);
+ if (transacted && msg_count % txSize == 0)
+ {
+ ssn.commit();
+ }
+ }
+ TextMessage m = ssn.createTextMessage("End");
+ m.setJMSReplyTo(replyTo);
+ producer.send(m);
+
+ if (transacted)
+ {
+ ssn.commit();
+ }
+
+ MessageConsumer feedbackConsumer = ssn.createConsumer(replyTo);
+ feedbackConsumer.receive();
+ feedbackConsumer.close();
+ if (transacted)
+ {
+ ssn.commit();
+ }
+ Thread.sleep(sleep_time);
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception sending messages",e);
+ }
+ }
+
+ // Receiver host port address
+ public static void main(String[] args) throws Exception
+ {
+ String host = "127.0.0.1";
+ int port = 5672;
+
+ if (args.length > 0)
+ {
+ host = args[0];
+ }
+ if (args.length > 1)
+ {
+ port = Integer.parseInt(args[1]);
+ }
+ // #3rd argument should be an address
+ // Any other properties is best configured via jvm args
+
+ AMQConnection con = new AMQConnection(
+ "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "'");
+
+ // FIXME Need to add support for the new address format
+ // Then it's trivial to add destination for that.
+ Sender sender = new Sender(con,null);
+ sender.run();
+ }
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
new file mode 100644
index 0000000000..b55afa7066
--- /dev/null
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java
@@ -0,0 +1,384 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.testkit;
+
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.text.DateFormat;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.thread.Threading;
+
+/**
+ * A basic test case class that could launch a Sender/Receiver
+ * or both, each on it's own separate thread.
+ *
+ * If con_count == ssn_count, then each entity created will have
+ * it's own Connection. Else if con_count < ssn_count, then
+ * a connection will be shared by ssn_count/con_count # of entities.
+ *
+ * The if both sender and receiver options are set, it will
+ * share a connection.
+ *
+ * The following options are available as jvm args
+ * host, port
+ * con_count,ssn_count
+ * con_idle_time - which determines heartbeat
+ * sender, receiver - booleans which indicate which entity to create.
+ * Setting them both is also a valid option.
+ */
+public class TestLauncher implements ErrorHandler
+{
+ protected String host = "127.0.0.1";
+ protected int port = 5672;
+ protected int session_count = 1;
+ protected int connection_count = 1;
+ protected long connection_idle_time = 5000;
+ protected boolean sender = false;
+ protected boolean receiver = false;
+ protected String url;
+
+ protected String queue_name = "message_queue";
+ protected String exchange_name = "amq.direct";
+ protected String routing_key = "routing_key";
+ protected boolean uniqueDests = false;
+ protected boolean durable = false;
+ protected String failover = "";
+ protected AMQConnection controlCon;
+ protected Destination controlDest = null;
+ protected Session controlSession = null;
+ protected MessageProducer statusSender;
+ protected List<AMQConnection> clients = new ArrayList<AMQConnection>();
+ protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
+ protected NumberFormat nf = new DecimalFormat("##.00");
+ protected String testName;
+
+ public TestLauncher()
+ {
+ testName = System.getProperty("test_name","UNKNOWN");
+ host = System.getProperty("host", "127.0.0.1");
+ port = Integer.getInteger("port", 5672);
+ session_count = Integer.getInteger("ssn_count", 1);
+ connection_count = Integer.getInteger("con_count", 1);
+ connection_idle_time = Long.getLong("con_idle_time", 5000);
+ sender = Boolean.getBoolean("sender");
+ receiver = Boolean.getBoolean("receiver");
+
+ queue_name = System.getProperty("queue_name", "message_queue");
+ exchange_name = System.getProperty("exchange_name", "amq.direct");
+ routing_key = System.getProperty("routing_key", "routing_key");
+ failover = System.getProperty("failover", "");
+ uniqueDests = Boolean.getBoolean("unique_dests");
+ durable = Boolean.getBoolean("durable");
+
+ url = "amqp://username:password@topicClientid/test?brokerlist='tcp://"
+ + host + ":" + port + "?idle_timeout=" + connection_idle_time
+ + "'";
+
+ if (failover.equalsIgnoreCase("failover_exchange"))
+ {
+ url += "&failover='failover_exchange'";
+
+ System.out.println("Failover exchange " + url );
+ }
+
+ configureLogging();
+ }
+
+ protected void configureLogging()
+ {
+ PatternLayout layout = new PatternLayout();
+ layout.setConversionPattern("%t %d %p [%c{4}] %m%n");
+ BasicConfigurator.configure(new ConsoleAppender(layout));
+
+ String logLevel = System.getProperty("log.level","warn");
+ String logComponent = System.getProperty("log.comp","org.apache.qpid");
+
+ Logger logger = Logger.getLogger(logComponent);
+ logger.setLevel(Level.toLevel(logLevel, Level.WARN));
+
+ System.out.println("Level " + logger.getLevel());
+
+ }
+
+ public void setUpControlChannel()
+ {
+ try
+ {
+ controlCon = new AMQConnection(url);
+ controlCon.start();
+
+ controlDest = new AMQQueue(new AMQShortString(""),
+ new AMQShortString("control"),
+ new AMQShortString("control"),
+ false, //exclusive
+ false, //auto-delete
+ false); // durable
+
+ // Create the session to setup the messages
+ controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ statusSender = controlSession.createProducer(controlDest);
+
+ }
+ catch (Exception e)
+ {
+ handleError("Error while setting up the test",e);
+ }
+ }
+
+ public void cleanup()
+ {
+ try
+ {
+ controlSession.close();
+ controlCon.close();
+ for (AMQConnection con : clients)
+ {
+ con.close();
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Error while tearing down the test",e);
+ }
+ }
+
+ public void start()
+ {
+ try
+ {
+
+ int ssn_per_con = session_count;
+ if (connection_count < session_count)
+ {
+ ssn_per_con = session_count/connection_count;
+ }
+
+ for (int i = 0; i< connection_count; i++)
+ {
+ AMQConnection con = new AMQConnection(url);
+ con.start();
+ clients.add(con);
+ for (int j = 0; j< ssn_per_con; j++)
+ {
+ String prefix = createPrefix(i,j);
+ Destination dest = createDest(prefix);
+ if (sender)
+ {
+ createSender(prefix,con,dest,this);
+ }
+
+ if (receiver)
+ {
+ createReceiver(prefix,con,dest,this);
+ }
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ handleError("Exception while setting up the test",e);
+ }
+
+ }
+
+ protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Receiver rcv = new Receiver(con,dest);
+ rcv.setErrorHandler(h);
+ rcv.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Receiver", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Receive thread",e);
+ }
+
+ t.setName("ReceiverThread-" + index);
+ t.start();
+ }
+
+ protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h)
+ {
+ Runnable r = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ Sender sender = new Sender(con, dest);
+ sender.setErrorHandler(h);
+ sender.run();
+ }
+ catch (Exception e)
+ {
+ h.handleError("Error Starting Sender", e);
+ }
+ }
+ };
+
+ Thread t = null;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ handleError("Error creating Sender thread",e);
+ }
+
+ t.setName("SenderThread-" + index);
+ t.start();
+ }
+
+ public void handleError(String msg,Exception e)
+ {
+ // In case sending the message fails
+ StringBuilder sb = new StringBuilder();
+ sb.append(msg);
+ sb.append(" @ ");
+ sb.append(df.format(new Date(System.currentTimeMillis())));
+ sb.append(" ");
+ sb.append(e.getMessage());
+ System.err.println(sb.toString());
+ e.printStackTrace();
+
+ try
+ {
+ TextMessage errorMsg = controlSession.createTextMessage();
+ errorMsg.setStringProperty("status", "error");
+ errorMsg.setStringProperty("desc", msg);
+ errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis())));
+ errorMsg.setStringProperty("exception-trace", serializeStackTrace(e));
+ synchronized (this)
+ {
+ statusSender.send(errorMsg);
+ }
+ } catch (JMSException e1) {
+ e1.printStackTrace();
+ }
+ }
+
+ private String serializeStackTrace(Exception e)
+ {
+ ByteArrayOutputStream bOut = new ByteArrayOutputStream();
+ PrintStream printStream = new PrintStream(bOut);
+ e.printStackTrace(printStream);
+ printStream.close();
+ return bOut.toString();
+ }
+
+ private String createPrefix(int i, int j)
+ {
+ return String.valueOf(i).concat(String.valueOf(j));
+ }
+
+ /**
+ * The following are supported.
+ *
+ * 1. A producer/consumer pair on a topic or a queue
+ * 2. A single producer with multiple consumers on topic/queue
+ *
+ * Multiple consumers on a topic will result in a private queue
+ * for each consumers.
+ *
+ * We want to avoid multiple producers on the same topic/queue
+ * as the queues will fill up in no time.
+ */
+ private Destination createDest(String prefix)
+ {
+ Destination dest = null;
+ if (exchange_name.equals("amq.topic"))
+ {
+ dest = new AMQTopic(
+ new AMQShortString(exchange_name),
+ new AMQShortString(uniqueDests ? prefix + routing_key :
+ routing_key),
+ false, //auto-delete
+ null, //queue name
+ durable);
+ }
+ else
+ {
+ dest = new AMQQueue(
+ new AMQShortString(exchange_name),
+ new AMQShortString(uniqueDests ? prefix + routing_key :
+ routing_key),
+ new AMQShortString(uniqueDests ? prefix + queue_name :
+ queue_name),
+ false, //exclusive
+ false, //auto-delete
+ durable);
+ }
+ return dest;
+ }
+
+ public static void main(String[] args)
+ {
+ final TestLauncher test = new TestLauncher();
+ test.setUpControlChannel();
+ test.start();
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() { test.cleanup(); }
+ });
+
+ }
+}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
deleted file mode 100644
index 95670f0507..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import java.text.DecimalFormat;
-import java.util.Hashtable;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Session;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-
-public class PerfBase
-{
- TestParams params;
- Connection con;
- Session session;
- Destination dest;
- Destination feedbackDest;
- DecimalFormat df = new DecimalFormat("###.##");
-
- public PerfBase()
- {
- params = new TestParams();
- }
-
- public void setUp() throws Exception
- {
- Hashtable<String,String> env = new Hashtable<String,String>();
- env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory());
- env.put(Context.PROVIDER_URL, params.getProviderURL());
-
- Context ctx = null;
- try
- {
- ctx = new InitialContext(env);
- }
- catch(Exception e)
- {
- throw new Exception("Error initializing JNDI",e);
-
- }
-
- ConnectionFactory conFac = null;
- try
- {
- conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory());
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up connection factory",e);
- }
-
- con = conFac.createConnection();
- con.start();
- session = con.createSession(params.isTransacted(),
- params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode());
-
- try
- {
- dest = (Destination)ctx.lookup( params.isDurable()?
- params.getDurableDestination():
- params.getTransientDestination()
- );
- }
- catch(Exception e)
- {
- throw new Exception("Error looking up destination",e);
- }
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
-
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
deleted file mode 100644
index cd12c7010d..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
-/**
- * PerfConsumer will receive x no of messages in warmup mode.
- * Once it receives the Start message it will then signal the PerfProducer.
- * It will start recording stats from the first message it receives after
- * the warmup mode is done.
- *
- * The following calculations are done.
- * The important numbers to look at is
- * a) Avg Latency
- * b) System throughput.
- *
- * Latency.
- * =========
- * Currently this test is written with the assumption that either
- * a) The Perf Producer and Consumer are on the same machine
- * b) They are on separate machines that have their time synced via a Time Server
- *
- * In order to calculate latency the producer inserts a timestamp
- * hen the message is sent. The consumer will note the current time the message is
- * received and will calculate the latency as follows
- * latency = rcvdTime - msg.getJMSTimestamp()
- *
- * Through out the test it will keep track of the max and min latency to show the
- * variance in latencies.
- *
- * Avg latency is measured by adding all latencies and dividing by the total msgs.
- * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount
- *
- * Throughput
- * ===========
- * System throughput is calculated as follows
- * rcvdMsgCount/(rcvdTime - testStartTime)
- *
- * Consumer rate is calculated as
- * rcvdMsgCount/(rcvdTime - startTime)
- *
- * Note that the testStartTime referes to when the producer sent the first message
- * and startTime is when the consumer first received a message.
- *
- * rcvdTime keeps track of when the last message is received.
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- */
-
-public class PerfConsumer extends PerfBase implements MessageListener
-{
- MessageConsumer consumer;
- long maxLatency = 0;
- long minLatency = Long.MAX_VALUE;
- long totalLatency = 0; // to calculate avg latency.
- int rcvdMsgCount = 0;
- long testStartTime = 0; // to measure system throughput
- long startTime = 0; // to measure consumer throughput
- long rcvdTime = 0;
- boolean transacted = false;
- int transSize = 0;
-
- final Object lock = new Object();
-
- public PerfConsumer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- consumer = session.createConsumer(dest);
-
- // Storing the following two for efficiency
- transacted = params.isTransacted();
- transSize = params.getTransactionSize();
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
-
- boolean start = false;
- while (!start)
- {
- Message msg = consumer.receive();
- if (msg instanceof TextMessage)
- {
- if (((TextMessage)msg).getText().equals("End"))
- {
- start = true;
- MessageProducer temp = session.createProducer(msg.getJMSReplyTo());
- temp.send(session.createMessage());
- if (params.isTransacted())
- {
- session.commit();
- }
- temp.close();
- }
- }
- }
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- consumer.setMessageListener(this);
- }
-
- public void printResults() throws Exception
- {
- synchronized (lock)
- {
- lock.wait();
- }
-
- double avgLatency = (double)totalLatency/(double)rcvdMsgCount;
- double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000;
- double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000;
- System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString());
- System.out.println(new StringBuilder("Consumer rate : ").
- append(df.format(consRate)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("System Throughput : ").
- append(df.format(throughput)).
- append(" msg/sec").toString());
- System.out.println(new StringBuilder("Avg Latency : ").
- append(df.format(avgLatency)).
- append(" ms").toString());
- System.out.println(new StringBuilder("Min Latency : ").
- append(minLatency).
- append(" ms").toString());
- System.out.println(new StringBuilder("Max Latency : ").
- append(maxLatency).
- append(" ms").toString());
- System.out.println("Completed the test......\n");
- }
-
- public void notifyCompletion(Destination replyTo) throws Exception
- {
- MessageProducer tmp = session.createProducer(replyTo);
- Message endMsg = session.createMessage();
- tmp.send(endMsg);
- if (params.isTransacted())
- {
- session.commit();
- }
- tmp.close();
- }
-
- public void tearDown() throws Exception
- {
- consumer.close();
- session.close();
- con.close();
- }
-
- public void onMessage(Message msg)
- {
- try
- {
- if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End"))
- {
- notifyCompletion(msg.getJMSReplyTo());
-
- synchronized (lock)
- {
- lock.notifyAll();
- }
- }
- else
- {
- rcvdTime = System.currentTimeMillis();
- rcvdMsgCount ++;
-
- if (rcvdMsgCount == 1)
- {
- startTime = rcvdTime;
- testStartTime = msg.getJMSTimestamp();
- }
-
- if (transacted && (rcvdMsgCount % transSize == 0))
- {
- session.commit();
- }
-
- long latency = rcvdTime - msg.getJMSTimestamp();
- maxLatency = Math.max(maxLatency, latency);
- minLatency = Math.min(minLatency, latency);
- totalLatency = totalLatency + latency;
- }
-
- }
- catch(Exception e)
- {
- handleError(e,"Error when receiving messages");
- }
-
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- printResults();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
- public static void main(String[] args)
- {
- PerfConsumer cons = new PerfConsumer();
- cons.test();
- }
-} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
deleted file mode 100644
index 757b1bfcda..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import javax.jms.BytesMessage;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-
-import org.apache.qpid.testkit.MessageFactory;
-
-/**
- * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation
- * from the consumer that it has successfully consumed them and ready to start the
- * test. It will start sending y no of messages and each message will contain a time
- * stamp. This will be used at the receiving end to measure the latency.
- *
- * This is done with the assumption that both consumer and producer are running on
- * the same machine or different machines which have time synced using a time server.
- *
- * This test also calculates the producer rate as follows.
- * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs)
- *
- * All throughput rates are given as msg/sec so the rates are multiplied by 1000.
- *
- * Rajith - Producer rate is not an accurate perf metric IMO.
- * It is heavily inlfuenced by any in memory buffering.
- * System throughput and latencies calculated by the PerfConsumer are more realistic
- * numbers.
- *
- */
-public class PerfProducer extends PerfBase
-{
- MessageProducer producer;
- Message msg;
- byte[] payload;
-
- public PerfProducer()
- {
- super();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
- feedbackDest = session.createTemporaryQueue();
-
- // if message caching is enabled we pre create the message
- // else we pre create the payload
- if (params.isCacheMessage())
- {
- msg = MessageFactory.createBytesMessage(session, params.getMsgSize());
- msg.setJMSDeliveryMode(params.isDurable()?
- DeliveryMode.PERSISTENT :
- DeliveryMode.NON_PERSISTENT
- );
- }
- else
- {
- payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes();
- }
-
- producer = session.createProducer(dest);
- producer.setDisableMessageID(params.isDisableMessageID());
- producer.setDisableMessageTimestamp(params.isDisableTimestamp());
- }
-
- protected Message getNextMessage() throws Exception
- {
- if (params.isCacheMessage())
- {
- return msg;
- }
- else
- {
- msg = session.createBytesMessage();
- ((BytesMessage)msg).writeBytes(payload);
- return msg;
- }
- }
-
- public void warmup()throws Exception
- {
- System.out.println("Warming up......");
- MessageConsumer tmp = session.createConsumer(feedbackDest);
-
- for (int i=0; i < params.getWarmupCount() -1; i++)
- {
- producer.send(getNextMessage());
- }
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- }
-
- public void startTest() throws Exception
- {
- System.out.println("Starting test......");
- int count = params.getMsgCount();
- boolean transacted = params.isTransacted();
- int tranSize = params.getTransactionSize();
-
- long start = System.currentTimeMillis();
- for(int i=0; i < count; i++ )
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- producer.send(msg);
- if ( transacted && ((i+1) % tranSize == 0))
- {
- session.commit();
- }
- }
- long time = System.currentTimeMillis() - start;
- double rate = ((double)count/(double)time)*1000;
- System.out.println(new StringBuilder("Producer rate: ").
- append(df.format(rate)).
- append(" msg/sec").
- toString());
- }
-
- public void waitForCompletion() throws Exception
- {
- MessageConsumer tmp = session.createConsumer(feedbackDest);
- Message msg = session.createTextMessage("End");
- msg.setJMSReplyTo(feedbackDest);
- producer.send(msg);
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.receive();
-
- if (params.isTransacted())
- {
- session.commit();
- }
-
- tmp.close();
- System.out.println("Consumer has completed the test......");
- }
-
- public void tearDown() throws Exception
- {
- producer.close();
- session.close();
- con.close();
- }
-
- public void test()
- {
- try
- {
- setUp();
- warmup();
- startTest();
- waitForCompletion();
- tearDown();
- }
- catch(Exception e)
- {
- handleError(e,"Error when running test");
- }
- }
-
-
- public static void main(String[] args)
- {
- PerfProducer prod = new PerfProducer();
- prod.test();
- }
-} \ No newline at end of file
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
deleted file mode 100644
index 15142cfced..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.perf;
-
-import javax.jms.Session;
-
-public class TestParams
-{
- private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
-
- private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties";
-
- private String connectionFactory = "connectionFactory";
-
- private String transientDest = "transientQueue";
-
- private String durableDest = "durableQueue";
-
- private int msg_size = 512;
-
- private int msg_type = 1; // not used yet
-
- private boolean cacheMessage = true;
-
- private boolean disableMessageID = false;
-
- private boolean disableTimestamp = false;
-
- private boolean durable = false;
-
- private boolean transacted = false;
-
- private int transaction_size = 1000;
-
- private int ack_mode = Session.AUTO_ACKNOWLEDGE;
-
- private int msg_count = 10;
-
- private int warmup_count = 1;
-
- public TestParams()
- {
- initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory);
- providerURL = System.getProperty("java.naming.provider.url",providerURL);
-
- transientDest = System.getProperty("transDest",transientDest);
- durableDest = System.getProperty("durableDest",durableDest);
-
- msg_size = Integer.getInteger("msg_size", 512);
- msg_type = Integer.getInteger("msg_type",1);
- cacheMessage = Boolean.getBoolean("cache_msg");
- disableMessageID = Boolean.getBoolean("disableMessageID");
- disableTimestamp = Boolean.getBoolean("disableTimestamp");
- durable = Boolean.getBoolean("durable");
- transacted = Boolean.getBoolean("transacted");
- transaction_size = Integer.getInteger("trans_size",1000);
- ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE);
- msg_count = Integer.getInteger("msg_count",msg_count);
- warmup_count = Integer.getInteger("warmup_count",warmup_count);
- }
-
- public int getAckMode()
- {
- return ack_mode;
- }
-
- public String getConnectionFactory()
- {
- return connectionFactory;
- }
-
- public String getTransientDestination()
- {
- return transientDest;
- }
-
- public String getDurableDestination()
- {
- return durableDest;
- }
-
- public String getInitialContextFactory()
- {
- return initialContextFactory;
- }
-
- public int getMsgCount()
- {
- return msg_count;
- }
-
- public int getMsgSize()
- {
- return msg_size;
- }
-
- public int getMsgType()
- {
- return msg_type;
- }
-
- public boolean isDurable()
- {
- return durable;
- }
-
- public String getProviderURL()
- {
- return providerURL;
- }
-
- public boolean isTransacted()
- {
- return transacted;
- }
-
- public int getTransactionSize()
- {
- return transaction_size;
- }
-
- public int getWarmupCount()
- {
- return warmup_count;
- }
-
- public boolean isCacheMessage()
- {
- return cacheMessage;
- }
-
- public boolean isDisableMessageID()
- {
- return disableMessageID;
- }
-
- public boolean isDisableTimestamp()
- {
- return disableTimestamp;
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
deleted file mode 100644
index 0c3a17b3d8..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.text.DateFormat;
-import java.text.DecimalFormat;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.Session;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.testkit.MessageFactory;
-
-public class BaseTest
-{
- protected String host = "127.0.0.1";
- protected int msg_size = 100;
- protected int msg_count = 10;
- protected int session_count = 1;
- protected boolean durable = false;
- protected String queue_name = "message_queue";
- protected String exchange_name = "amq.direct";
- protected String routing_key = "routing_key";
- protected String contentType = "application/octet-stream";
- protected int port = 5672;
- protected String url;
- protected Message[] msgArray;
-
- protected AMQConnection con;
- protected Destination dest = null;
- protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss");
- protected NumberFormat nf = new DecimalFormat("##.00");
-
- public BaseTest()
- {
- host = System.getProperty("host", "127.0.0.1");
- port = Integer.getInteger("port", 5672);
- msg_size = Integer.getInteger("msg_size", 100);
- msg_count = Integer.getInteger("msg_count", 10);
- session_count = Integer.getInteger("session_count", 1);
- durable = Boolean.getBoolean("durable");
- queue_name = System.getProperty("queue_name", "message_queue");
- exchange_name = System.getProperty("exchange_name", "amq.direct");
- routing_key = System.getProperty("routing_key", "routing_key");
- contentType = System.getProperty("content_type","application/octet-stream");
-
-
-
- url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'";
- }
-
- public void setUp()
- {
- try
- {
- con = new AMQConnection(url);
- con.start();
-
-
- if (exchange_name.equals("amq.topic"))
- {
- dest = new AMQTopic(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- false, //auto-delete
- null, //queue name
- durable);
- }
- else
- {
- dest = new AMQQueue(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- new AMQShortString(queue_name),
- false, //exclusive
- false, //auto-delete
- durable);
- }
-
- // Create the session to setup the messages
- Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- if (msg_size == -1)
- {
- // This creates an array of 1000 messages from 500-1500 bytes
- // During the tests a message will be picked randomly
- msgArray = new Message[1000];
- for (int i = 0; i < 1000; i++)
- {
- Message msg = (contentType.equals("text/plain")) ?
- MessageFactory.createTextMessage(session,500 + i) :
- MessageFactory.createBytesMessage(session, 500 + i);
- msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- msgArray[i] = msg;
- }
- }
- else
- {
- Message msg = (contentType.equals("text/plain")) ?
- MessageFactory.createTextMessage(session, msg_size):
- MessageFactory.createBytesMessage(session, msg_size);
- msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- msgArray = new Message[]
- { msg };
- }
-
- session.close();
-
- }
- catch (Exception e)
- {
- handleError(e,"Error while setting up the test");
- }
- }
-
- public void handleError(Exception e,String msg)
- {
- StringBuilder sb = new StringBuilder();
- sb.append(msg);
- sb.append(" @ ");
- sb.append(df.format(new Date(System.currentTimeMillis())));
- sb.append(" ");
- sb.append(e.getMessage());
- System.err.println(sb.toString());
- e.printStackTrace();
- }
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
deleted file mode 100644
index a91d9e7e85..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * Test Description
- * ================
- * The difference between this test and the
- * LongDurationConsumer is that each Session runs
- * in it's own Thread and the ability to receive
- * messages transactionally.
- *
- * All consumers will still share the same destination.
- *
- */
-public class MultiThreadedConsumer extends BaseTest
-{
- protected final boolean transacted;
-
- public MultiThreadedConsumer()
- {
- super();
- transacted = Boolean.getBoolean("transacted");
- // needed only to calculate throughput.
- // If msg_count is different set it via -Dmsg_count
- msg_count = 10;
- }
-
- /**
- * Creates a Session and a consumer that runs in its
- * own thread.
- * It can also consume transactionally.
- *
- */
- public void test()
- {
- try
- {
- for (int i = 0; i < session_count; i++)
- {
-
- final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- MessageConsumer consumer = session.createConsumer(dest);
-
- consumer.setMessageListener(new MessageListener()
- {
-
- private boolean startIteration = true;
- private long startTime = 0;
- private long iterations = 0;
-
- public void onMessage(Message m)
- {
- try
- {
- long now = System.currentTimeMillis();
- if (startIteration)
- {
- startTime = m.getJMSTimestamp();
- startIteration = false;
- }
-
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
- startIteration = true;
- long totalIterationTime = now - startTime;
- double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
- long latencySample = now - m.getJMSTimestamp();
- iterations++;
-
- StringBuilder sb = new StringBuilder();
- sb.append(iterations).append(",").
- append(nf.format(throughput)).append(",").append(latencySample);
-
- System.out.println(sb.toString());
-
- MessageProducer temp = session.createProducer(m.getJMSReplyTo());
- Message controlMsg = session.createTextMessage();
- temp.send(controlMsg);
- if (transacted)
- {
- session.commit();
- }
- temp.close();
- }
- }
- catch (JMSException e)
- {
- handleError(e,"Exception receiving messages");
- }
- }
- });
- }
- catch (Exception e)
- {
- handleError(e,"Exception creating a consumer");
- }
-
- }
-
- });
- t.setName("session-" + i);
- t.start();
- } // for loop
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the test");
- }
-
- }
-
- public static void main(String[] args)
- {
- MultiThreadedConsumer test = new MultiThreadedConsumer();
- test.setUp();
- test.test();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
deleted file mode 100644
index 279e5ea0bf..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * Test Description
- * ================
- *
- * This test creats x number of sessions, where each session
- * runs in it's own thread. Each session creates a producer
- * and it's own feedback queue.
- *
- * A producer will send n-1 messages, followed by the n-th
- * message which contains "End" in it's payload to signal
- * that this is the last message message in the sequence.
- * The end message has the feedback queue as it's replyTo.
- * It will then listen on the feedback queue waiting for the
- * confirmation and then sleeps for 1000 ms before proceeding
- * with the next n messages.
- *
- * This hand shaking mechanism ensures that all of the
- * messages sent are consumed by some consumer. This prevents
- * the producers from saturating the broker especially when
- * the consumers are slow.
- *
- * All producers send to a single destination
- * If using transactions it's best to use smaller message count
- * as the test only commits after sending all messages in a batch.
- *
- */
-
-public class MultiThreadedProducer extends SimpleProducer
-{
- protected final boolean transacted;
-
- public MultiThreadedProducer()
- {
- super();
- transacted = Boolean.getBoolean("transacted");
- }
-
- public void test()
- {
- try
- {
- final int msg_count_per_session = msg_count/session_count;
-
- for (int i = 0; i < session_count; i++)
- {
- final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(new Runnable()
- {
- private Random gen = new Random();
-
- private Message getNextMessage()
- {
- if (msg_size == -1)
- {
- int index = gen.nextInt(1000);
- return msgArray[index];
- }
- else
- {
- return msgArray[0];
- }
- }
-
- public void run()
- {
- try
- {
- MessageProducer prod = session.createProducer(dest);
- // this will ensure that the producer will not overun the consumer.
- feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID
- .randomUUID().toString()), new AMQShortString("control"));
-
- MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue);
-
- while (true)
- {
- for (int i = 0; i < msg_count_per_session; i++)
- {
- Message msg = getNextMessage();
- msg.setJMSMessageID("ID:" + UUID.randomUUID());
- prod.send(msg);
- }
-
- TextMessage m = session.createTextMessage("End");
- m.setJMSReplyTo(feedbackQueue);
- prod.send(m);
-
- if (transacted)
- {
- session.commit();
- }
-
- System.out.println(df.format(System.currentTimeMillis()));
- feedbackConsumer.receive();
- if (transacted)
- {
- session.commit();
- }
- Thread.sleep(1000);
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception in producing message");
- }
-
- }
-
- });
- t.setName("session-" + i);
- t.start();
-
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the test");
- }
-
- }
-
- public static void main(String[] args)
- {
- MultiThreadedProducer test = new MultiThreadedProducer();
- test.setUp();
- test.test();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
index c33f9ffbf2..c240ecdf2e 100644
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
+++ b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java
@@ -21,6 +21,8 @@
package org.apache.qpid.testkit.soak;
+import java.util.Random;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
@@ -29,15 +31,21 @@ import javax.jms.Session;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer;
+import org.apache.qpid.client.BasicMessageProducer;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.testkit.TestLauncher;
+import org.apache.qpid.thread.Threading;
/**
* Test Description
* ================
* This test will open x number of connections where each
* connection will create a session and a producer/consumer pair,
- * and then send configurable no of messages.
- * It will them sleep for configurable time interval and
+ * and then a randomly selected set of connections (about 1/4th)
+ * will send a configurable no of messages and try to receive them.
+ * It will then sleep for configurable time interval and
* tear down the connections/sessions/consumers.
* It will then repeat the process again until the test is stopped.
*
@@ -46,16 +54,14 @@ import org.apache.qpid.framing.AMQShortString;
* To find if the broker has leaks when cleaning resources.
* To find if the client has leaks with resources.
*/
-public class ResourceLeakTest extends BaseTest
+public class ResourceLeakTest extends TestLauncher
{
- protected int connection_count = 10;
- protected long connection_idle_time = 5000;
-
+ /* protected long connection_idle_time = 5000;
+ protected Random rand = new Random();
+
public ResourceLeakTest()
{
- super();
- connection_count = Integer.getInteger("con_count",10);
- connection_idle_time = Long.getLong("con_idle_time", 5000);
+ super();
}
public void test()
@@ -67,13 +73,7 @@ public class ResourceLeakTest extends BaseTest
Session[] sessions = new Session[connection_count];
MessageConsumer[] msgCons = new MessageConsumer[connection_count];
MessageProducer [] msgProds = new MessageProducer[connection_count];
- Destination dest = new AMQQueue(new AMQShortString(exchange_name),
- new AMQShortString(routing_key),
- new AMQShortString(queue_name),
- true, //exclusive
- true // auto delete
- );
-
+
while (true)
{
for (int i = 0; i < connection_count; i++)
@@ -83,23 +83,36 @@ public class ResourceLeakTest extends BaseTest
cons[i] = con;
Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
sessions[i] = ssn;
+ Destination dest = new AMQQueue(new AMQShortString(exchange_name),
+ new AMQShortString(routing_key + i),
+ new AMQShortString(queue_name + i),
+ true, //exclusive
+ true // auto delete
+ );
MessageConsumer msgCon = ssn.createConsumer(dest);
msgCons[i] = msgCon;
MessageProducer msgProd = ssn.createProducer(dest);
msgProds[i] = msgProd;
-
- BytesMessage msg = ssn.createBytesMessage();
+ }
+
+ // Select some connections randomly and send/recv messages
+ // Exercise around quarter of the connections
+ for (int i=0; i < connection_count/4; i++)
+ {
+ int k = rand.nextInt(connection_count);
+
+ BytesMessage msg = sessions[k].createBytesMessage();
msg.writeBytes("Test Msg".getBytes());
for (int j = 0; j < msg_count;j++)
{
- msgProd.send(msg);
+ msgProds[k].send(msg);
}
int j = 0;
while (j < msg_count)
{
- msgCon.receive();
+ msgCons[k].receive();
j++;
}
}
@@ -110,10 +123,24 @@ public class ResourceLeakTest extends BaseTest
{
for (int i = 0; i < connection_count; i++)
{
- msgCons[i].close();
- msgProds[i].close();
- sessions[i].close();
- cons[i].close();
+ if (!((BasicMessageConsumer)msgCons[i]).isClosed())
+ {
+ msgCons[i].close();
+ }
+
+ if (!((BasicMessageProducer)msgProds[i]).isClosed())
+ {
+ msgProds[i].close();
+ }
+
+ if (!((AMQSession)sessions[i]).isClosed())
+ {
+ sessions[i].close();
+ }
+ if (!((AMQConnection)cons[i]).isClosed())
+ {
+ cons[i].close();
+ }
}
}
catch (Exception e)
@@ -131,8 +158,23 @@ public class ResourceLeakTest extends BaseTest
public static void main(String[] args)
{
- ResourceLeakTest test = new ResourceLeakTest();
- test.test();
- }
+ final ResourceLeakTest test = new ResourceLeakTest();
+ Runnable r = new Runnable(){
+ public void run()
+ {
+ test.test();
+ }
+ };
+
+ Thread t;
+ try
+ {
+ t = Threading.getThreadFactory().createThread(r);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating test thread",e);
+ }
+ }*/
}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
deleted file mode 100644
index b3eb97dafe..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-/**
- * Test Description
- * ================
- * This test will create x number of sessions.
- * Each session will have it's own consumer.
- * Once a consumer receives the "End" message it
- * will send a message to the destination indicated
- * by the replyTo field in the End message.
- * This will signal the producer that all the previous
- * messages have been consumed. The producer will
- * then start sending messages again.
- *
- * This prevents the producer from overruning the
- * consumer.
- * *
- * All consumers share a single destination
- *
- */
-
-public class SimpleConsumer extends BaseTest
-{
- public SimpleConsumer()
- {
- super();
- //needed only to calculate throughput.
- // If msg_count is different set it via -Dmsg_count
- msg_count = 10;
- }
-
- public void test()
- {
- try
- {
- final Session[] sessions = new Session[session_count];
- MessageConsumer[] cons = new MessageConsumer[session_count];
-
- for (int i = 0; i < session_count; i++)
- {
- sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- cons[i] = sessions[i].createConsumer(dest);
- cons[i].setMessageListener(new MessageListener()
- {
-
- private boolean startIteration = true;
- private long startTime = 0;
- private long iterations = 0;
-
- public void onMessage(Message m)
- {
- try
- {
- long now = System.currentTimeMillis();
- if (startIteration)
- {
- startTime = m.getJMSTimestamp();
- startIteration = false;
- }
-
- if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End"))
- {
-
- long totalIterationTime = now - startTime;
- startIteration = true;
- double throughput = ((double)msg_count/(double)totalIterationTime) * 1000;
- long latencySample = now - m.getJMSTimestamp();
- iterations++;
-
- StringBuilder sb = new StringBuilder();
- sb.append(iterations).append(",").
- append(nf.format(throughput)).append(",").append(latencySample);
-
- System.out.println(sb.toString());
-
- MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo());
- Message controlMsg = sessions[0].createTextMessage();
- temp.send(controlMsg);
- temp.close();
- }
- }
- catch (JMSException e)
- {
- handleError(e,"Exception when receiving the message");
- }
- }
- });
- }
-
- }
- catch (Exception e)
- {
- handleError(e,"Exception when setting up the consumers");
- }
-
- }
-
- public static void main(String[] args)
- {
- SimpleConsumer test = new SimpleConsumer();
- test.setUp();
- test.test();
- }
-
-}
diff --git a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
deleted file mode 100644
index 1080092536..0000000000
--- a/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.testkit.soak;
-
-
-import java.util.Random;
-import java.util.UUID;
-
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.framing.AMQShortString;
-
-/**
- * Test Description
- * ================
- * This test will send n-1 messages, followed by the n-th
- * message which contains "End" in it's payload to signal
- * that this is the last message message in the sequence.
- * The end message has the feedback queue as it's replyTo.
- * It will then listen on the feedback queue waiting for the
- * confirmation and then sleeps for 1000 ms before proceeding
- * with the next n messages.
- *
- * This hand shaking mechanism ensures that all of the
- * messages sent are consumed by some consumer. This prevents
- * the producers from saturating the broker especially when
- * the consumers are slow.
- *
- * It creates a producer per session.
- * If session_count is > 1 it will round robin the messages
- * btw the producers.
- *
- * All producers send to a single destination
- *
- */
-
-public class SimpleProducer extends BaseTest
-{
- protected Destination feedbackQueue;
- Random gen = new Random();
-
- public SimpleProducer()
- {
- super();
- }
-
- protected Message getNextMessage()
- {
- if (msg_size == -1)
- {
- int index = gen.nextInt(1000);
- return msgArray[index];
- }
- else
- {
- return msgArray[0];
- }
- }
-
- public void test()
- {
- try
- {
- Session[] sessions = new Session[session_count];
- MessageProducer[] prods = new MessageProducer[session_count];
-
- for (int i = 0; i < session_count; i++)
- {
- sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- prods[i] = sessions[i].createProducer(dest);
- }
-
- // this will ensure that the producer will not overun the consumer.
- feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"),
- new AMQShortString(UUID.randomUUID().toString()),
- new AMQShortString("control"));
-
- MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue);
-
- int prod_pointer = 0;
- boolean multi_session = session_count > 1 ? true : false;
-
- while (true)
- {
- for (int i = 0; i < msg_count - 1; i++)
- {
- Message msg = getNextMessage();
- msg.setJMSTimestamp(System.currentTimeMillis());
- prods[prod_pointer].send(msg);
- if (multi_session)
- {
- prod_pointer++;
- if (prod_pointer == session_count)
- {
- prod_pointer = 0;
- }
- }
- }
-
- TextMessage m = sessions[0].createTextMessage("End");
- m.setJMSReplyTo(feedbackQueue);
- prods[prod_pointer].send(m);
- System.out.println(df.format(System.currentTimeMillis()));
- feedbackConsumer.receive();
- Thread.sleep(1000);
- }
- }
- catch (Exception e)
- {
- handleError(e,"Exception while setting up the producer");
- }
-
- }
-
- public static void main(String[] args)
- {
- SimpleProducer test = new SimpleProducer();
- test.setUp();
- test.test();
- }
-
-}