From 014bfd8e935b09f42440fd5427cdf9db25dfe865 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 30 Jun 2008 16:35:41 +0000 Subject: This commit is related to QPID-1161. Please refer to the JIRA for complete details. In Summary this contains a simple test kit comprising of perf and soak tests. The focus is on producing a packaged set of tests that can be easily deployed on target environment. For Quick perf report for a particular release, please run perf_report.sh which will show results for 8 common use cases in a tabular format. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@672810 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/MessageFactory.java | 43 ++++ .../org/apache/qpid/testkit/perf/PerfBase.java | 102 +++++++++ .../org/apache/qpid/testkit/perf/PerfConsumer.java | 248 +++++++++++++++++++++ .../org/apache/qpid/testkit/perf/PerfProducer.java | 207 +++++++++++++++++ .../org/apache/qpid/testkit/perf/TestParams.java | 160 +++++++++++++ .../org/apache/qpid/testkit/soak/BaseTest.java | 149 +++++++++++++ .../qpid/testkit/soak/MultiThreadedConsumer.java | 129 +++++++++++ .../qpid/testkit/soak/MultiThreadedProducer.java | 167 ++++++++++++++ .../apache/qpid/testkit/soak/ResourceLeakTest.java | 137 ++++++++++++ .../apache/qpid/testkit/soak/SimpleConsumer.java | 109 +++++++++ .../apache/qpid/testkit/soak/SimpleProducer.java | 147 ++++++++++++ 11 files changed, 1598 insertions(+) create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java new file mode 100644 index 0000000000..f2784ef499 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java @@ -0,0 +1,43 @@ +package org.apache.qpid.testkit; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +public class MessageFactory +{ + public static Message createBytesMessage(Session ssn, int size) throws JMSException + { + BytesMessage msg = ssn.createBytesMessage(); + msg.writeBytes(createMessagePayload(size).getBytes()); + return msg; + } + + public static Message createTextMessage(Session ssn, int size) throws JMSException + { + TextMessage msg = ssn.createTextMessage(); + msg.setText(createMessagePayload(size)); + return msg; + } + + public static String createMessagePayload(int size) + { + String msgData = "Qpid Test Message"; + + StringBuffer buf = new StringBuffer(size); + int count = 0; + while (count <= (size - msgData.length())) + { + buf.append(msgData); + count += msgData.length(); + } + if (count < size) + { + buf.append(msgData, 0, size - count); + } + + return buf.toString(); + } +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java new file mode 100644 index 0000000000..95670f0507 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.perf; + +import java.text.DecimalFormat; +import java.util.Hashtable; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; + +public class PerfBase +{ + TestParams params; + Connection con; + Session session; + Destination dest; + Destination feedbackDest; + DecimalFormat df = new DecimalFormat("###.##"); + + public PerfBase() + { + params = new TestParams(); + } + + public void setUp() throws Exception + { + Hashtable env = new Hashtable(); + env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); + env.put(Context.PROVIDER_URL, params.getProviderURL()); + + Context ctx = null; + try + { + ctx = new InitialContext(env); + } + catch(Exception e) + { + throw new Exception("Error initializing JNDI",e); + + } + + ConnectionFactory conFac = null; + try + { + conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); + } + catch(Exception e) + { + throw new Exception("Error looking up connection factory",e); + } + + con = conFac.createConnection(); + con.start(); + session = con.createSession(params.isTransacted(), + params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); + + try + { + dest = (Destination)ctx.lookup( params.isDurable()? + params.getDurableDestination(): + params.getTransientDestination() + ); + } + catch(Exception e) + { + throw new Exception("Error looking up destination",e); + } + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} + diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java new file mode 100644 index 0000000000..cd12c7010d --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -0,0 +1,248 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.perf; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +/** + * PerfConsumer will receive x no of messages in warmup mode. + * Once it receives the Start message it will then signal the PerfProducer. + * It will start recording stats from the first message it receives after + * the warmup mode is done. + * + * The following calculations are done. + * The important numbers to look at is + * a) Avg Latency + * b) System throughput. + * + * Latency. + * ========= + * Currently this test is written with the assumption that either + * a) The Perf Producer and Consumer are on the same machine + * b) They are on separate machines that have their time synced via a Time Server + * + * In order to calculate latency the producer inserts a timestamp + * hen the message is sent. The consumer will note the current time the message is + * received and will calculate the latency as follows + * latency = rcvdTime - msg.getJMSTimestamp() + * + * Through out the test it will keep track of the max and min latency to show the + * variance in latencies. + * + * Avg latency is measured by adding all latencies and dividing by the total msgs. + * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount + * + * Throughput + * =========== + * System throughput is calculated as follows + * rcvdMsgCount/(rcvdTime - testStartTime) + * + * Consumer rate is calculated as + * rcvdMsgCount/(rcvdTime - startTime) + * + * Note that the testStartTime referes to when the producer sent the first message + * and startTime is when the consumer first received a message. + * + * rcvdTime keeps track of when the last message is received. + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + */ + +public class PerfConsumer extends PerfBase implements MessageListener +{ + MessageConsumer consumer; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + long testStartTime = 0; // to measure system throughput + long startTime = 0; // to measure consumer throughput + long rcvdTime = 0; + boolean transacted = false; + int transSize = 0; + + final Object lock = new Object(); + + public PerfConsumer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + + boolean start = false; + while (!start) + { + Message msg = consumer.receive(); + if (msg instanceof TextMessage) + { + if (((TextMessage)msg).getText().equals("End")) + { + start = true; + MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); + temp.send(session.createMessage()); + if (params.isTransacted()) + { + session.commit(); + } + temp.close(); + } + } + } + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + consumer.setMessageListener(this); + } + + public void printResults() throws Exception + { + synchronized (lock) + { + lock.wait(); + } + + double avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; + double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Consumer rate : "). + append(df.format(consRate)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("System Throughput : "). + append(df.format(throughput)). + append(" msg/sec").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void notifyCompletion(Destination replyTo) throws Exception + { + MessageProducer tmp = session.createProducer(replyTo); + Message endMsg = session.createMessage(); + tmp.send(endMsg); + if (params.isTransacted()) + { + session.commit(); + } + tmp.close(); + } + + public void tearDown() throws Exception + { + consumer.close(); + session.close(); + con.close(); + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + notifyCompletion(msg.getJMSReplyTo()); + + synchronized (lock) + { + lock.notifyAll(); + } + } + else + { + rcvdTime = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (rcvdMsgCount == 1) + { + startTime = rcvdTime; + testStartTime = msg.getJMSTimestamp(); + } + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = rcvdTime - msg.getJMSTimestamp(); + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + printResults(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + public static void main(String[] args) + { + PerfConsumer cons = new PerfConsumer(); + cons.test(); + } +} \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java new file mode 100644 index 0000000000..757b1bfcda --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -0,0 +1,207 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.perf; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; + +import org.apache.qpid.testkit.MessageFactory; + +/** + * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y no of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * This is done with the assumption that both consumer and producer are running on + * the same machine or different machines which have time synced using a time server. + * + * This test also calculates the producer rate as follows. + * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) + * + * All throughput rates are given as msg/sec so the rates are multiplied by 1000. + * + * Rajith - Producer rate is not an accurate perf metric IMO. + * It is heavily inlfuenced by any in memory buffering. + * System throughput and latencies calculated by the PerfConsumer are more realistic + * numbers. + * + */ +public class PerfProducer extends PerfBase +{ + MessageProducer producer; + Message msg; + byte[] payload; + + public PerfProducer() + { + super(); + } + + public void setUp() throws Exception + { + super.setUp(); + feedbackDest = session.createTemporaryQueue(); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(params.isDurable()? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (params.isCacheMessage()) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + ((BytesMessage)msg).writeBytes(payload); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + MessageConsumer tmp = session.createConsumer(feedbackDest); + + for (int i=0; i < params.getWarmupCount() -1; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + boolean transacted = params.isTransacted(); + int tranSize = params.getTransactionSize(); + + long start = System.currentTimeMillis(); + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % tranSize == 0)) + { + session.commit(); + } + } + long time = System.currentTimeMillis() - start; + double rate = ((double)count/(double)time)*1000; + System.out.println(new StringBuilder("Producer rate: "). + append(df.format(rate)). + append(" msg/sec"). + toString()); + } + + public void waitForCompletion() throws Exception + { + MessageConsumer tmp = session.createConsumer(feedbackDest); + Message msg = session.createTextMessage("End"); + msg.setJMSReplyTo(feedbackDest); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.receive(); + + if (params.isTransacted()) + { + session.commit(); + } + + tmp.close(); + System.out.println("Consumer has completed the test......"); + } + + public void tearDown() throws Exception + { + producer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + waitForCompletion(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + PerfProducer prod = new PerfProducer(); + prod.test(); + } +} \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java new file mode 100644 index 0000000000..15142cfced --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java @@ -0,0 +1,160 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.perf; + +import javax.jms.Session; + +public class TestParams +{ + private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; + + private String connectionFactory = "connectionFactory"; + + private String transientDest = "transientQueue"; + + private String durableDest = "durableQueue"; + + private int msg_size = 512; + + private int msg_type = 1; // not used yet + + private boolean cacheMessage = true; + + private boolean disableMessageID = false; + + private boolean disableTimestamp = false; + + private boolean durable = false; + + private boolean transacted = false; + + private int transaction_size = 1000; + + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + + private int msg_count = 10; + + private int warmup_count = 1; + + public TestParams() + { + initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); + providerURL = System.getProperty("java.naming.provider.url",providerURL); + + transientDest = System.getProperty("transDest",transientDest); + durableDest = System.getProperty("durableDest",durableDest); + + msg_size = Integer.getInteger("msg_size", 512); + msg_type = Integer.getInteger("msg_type",1); + cacheMessage = Boolean.getBoolean("cache_msg"); + disableMessageID = Boolean.getBoolean("disableMessageID"); + disableTimestamp = Boolean.getBoolean("disableTimestamp"); + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + transaction_size = Integer.getInteger("trans_size",1000); + ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); + msg_count = Integer.getInteger("msg_count",msg_count); + warmup_count = Integer.getInteger("warmup_count",warmup_count); + } + + public int getAckMode() + { + return ack_mode; + } + + public String getConnectionFactory() + { + return connectionFactory; + } + + public String getTransientDestination() + { + return transientDest; + } + + public String getDurableDestination() + { + return durableDest; + } + + public String getInitialContextFactory() + { + return initialContextFactory; + } + + public int getMsgCount() + { + return msg_count; + } + + public int getMsgSize() + { + return msg_size; + } + + public int getMsgType() + { + return msg_type; + } + + public boolean isDurable() + { + return durable; + } + + public String getProviderURL() + { + return providerURL; + } + + public boolean isTransacted() + { + return transacted; + } + + public int getTransactionSize() + { + return transaction_size; + } + + public int getWarmupCount() + { + return warmup_count; + } + + public boolean isCacheMessage() + { + return cacheMessage; + } + + public boolean isDisableMessageID() + { + return disableMessageID; + } + + public boolean isDisableTimestamp() + { + return disableTimestamp; + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java new file mode 100644 index 0000000000..be8c4bbc75 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testkit.MessageFactory; + +public class BaseTest +{ + protected String host = "127.0.0.1"; + protected int msg_size = 100; + protected int msg_count = 10; + protected int session_count = 1; + protected boolean durable = false; + protected String queue_name = "message_queue"; + protected String exchange_name = "amq.direct"; + protected String routing_key = "routing_key"; + protected String contentType = "application/octet-stream"; + protected int port = 5672; + protected String url; + protected Message[] msgArray; + + protected AMQConnection con; + protected Destination dest = null; + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + + public BaseTest() + { + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + msg_size = Integer.getInteger("msg_size", 100); + msg_count = Integer.getInteger("msg_count", 10); + session_count = Integer.getInteger("session_count", 1); + durable = Boolean.getBoolean("durable"); + queue_name = System.getProperty("queue_name", "message_queue"); + exchange_name = System.getProperty("exchange_name", "amq.direct"); + routing_key = System.getProperty("routing_key", "routing_key"); + contentType = System.getProperty("content_type","application/octet-stream"); + + + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"; + } + + public void setUp() + { + try + { + con = new AMQConnection(url); + con.start(); + + + if (exchange_name.equals("amq.topic")) + { + dest = new AMQTopic(new AMQShortString(exchange_name), + new AMQShortString(routing_key), + false, //auto-delete + null, //queue name + durable); + } + else + { + dest = new AMQQueue(new AMQShortString(exchange_name), + new AMQShortString(routing_key), + new AMQShortString(queue_name), + false, //exclusive + false, //auto-delete + durable); + } + + // Create the session to setup the messages + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (msg_size == -1) + { + // This creates an array of 1000 messages from 500-1500 bytes + // During the tests a message will be picked randomly + msgArray = new Message[1000]; + for (int i = 0; i < 1000; i++) + { + Message msg = (contentType.equals("text/plain")) ? + MessageFactory.createTextMessage(session,500 + i) : + MessageFactory.createBytesMessage(session, 500 + i); + msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + msgArray[i] = msg; + } + } + else + { + Message msg = (contentType.equals("text/plain")) ? + MessageFactory.createTextMessage(session, msg_size): + MessageFactory.createBytesMessage(session, msg_size); + msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); + msgArray = new Message[] + { msg }; + } + + session.close(); + + } + catch (Exception e) + { + handleError(e,"Error while setting up the test"); + } + } + + public void handleError(Exception e,String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + } +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java new file mode 100644 index 0000000000..3117d268a3 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Test Description + * ================ + * The difference between this test and the + * LongDurationConsumer is that each Session runs + * in it's own Thread and the ability to receive + * messages transactionally. + * + * All consumers will still share the same destination. + * + */ +public class MultiThreadedConsumer extends BaseTest +{ + protected final boolean transacted; + + public MultiThreadedConsumer() + { + super(); + transacted = Boolean.getBoolean("transacted"); + } + + /** + * Creates a Session and a consumer that runs in its + * own thread. + * It can also consume transactionally. + * + */ + public void test() + { + try + { + for (int i = 0; i < session_count; i++) + { + + final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + MessageConsumer consumer = session.createConsumer(dest); + + consumer.setMessageListener(new MessageListener() + { + + public void onMessage(Message m) + { + try + { + String payload = ((TextMessage) m).getText(); + if (payload.equals("End")) + { + System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + MessageProducer temp = session.createProducer(m.getJMSReplyTo()); + Message controlMsg = session.createTextMessage(); + temp.send(controlMsg); + if (transacted) + { + session.commit(); + } + temp.close(); + } + } + catch (JMSException e) + { + handleError(e,"Exception receiving messages"); + } + } + }); + } + catch (Exception e) + { + handleError(e,"Exception creating a consumer"); + } + + } + + }); + t.setName("session-" + i); + t.start(); + } // for loop + } + catch (Exception e) + { + handleError(e,"Exception while setting up the test"); + } + + } + + public static void main(String[] args) + { + MultiThreadedConsumer test = new MultiThreadedConsumer(); + test.setUp(); + test.test(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java new file mode 100644 index 0000000000..886c64bb81 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -0,0 +1,167 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import java.util.Random; +import java.util.UUID; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; + +/** + * Test Description + * ================ + * + * This test creats x number of sessions, where each session + * runs in it's own thread. Each session creates a producer + * and it's own feedback queue. + * + * A producer will send n-1 messages, followed by the n-th + * message which contains "End" in it's payload to signal + * that this is the last message message in the sequence. + * The end message has the feedback queue as it's replyTo. + * It will then listen on the feedback queue waiting for the + * confirmation and then sleeps for 1000 ms before proceeding + * with the next n messages. + * + * This hand shaking mechanism ensures that all of the + * messages sent are consumed by some consumer. This prevents + * the producers from saturating the broker especially when + * the consumers are slow. + * + * All producers send to a single destination + * If using transactions it's best to use smaller message count + * as the test only commits after sending all messages in a batch. + * + */ + +public class MultiThreadedProducer extends SimpleProducer +{ + protected final boolean transacted; + + public MultiThreadedProducer() + { + super(); + transacted = Boolean.getBoolean("transacted"); + } + + public void test() + { + try + { + final int msg_count_per_session = msg_count/session_count; + + for (int i = 0; i < session_count; i++) + { + final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + Thread t = new Thread(new Runnable() + { + private Random gen = new Random(); + + private Message getNextMessage() + { + if (msg_size == -1) + { + int index = gen.nextInt(1000); + return msgArray[index]; + } + else + { + return msgArray[0]; + } + } + + public void run() + { + try + { + MessageProducer prod = session.createProducer(dest); + // this will ensure that the producer will not overun the consumer. + feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID + .randomUUID().toString()), new AMQShortString("control")); + + MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue); + + while (true) + { + for (int i = 0; i < msg_count_per_session; i++) + { + Message msg = getNextMessage(); + msg.setJMSMessageID("ID:" + UUID.randomUUID()); + prod.send(msg); + } + + TextMessage m = session.createTextMessage("End"); + m.setJMSMessageID("ID:" + UUID.randomUUID()); + m.setJMSReplyTo(feedbackQueue); + prod.send(m); + + if (transacted) + { + session.commit(); + } + + System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + feedbackConsumer.receive(); + if (transacted) + { + session.commit(); + } + Thread.sleep(1000); + } + + } + catch (Exception e) + { + handleError(e,"Exception in producing message"); + } + + } + + }); + t.setName("session-" + i); + t.start(); + + } + + } + catch (Exception e) + { + handleError(e,"Exception while setting up the test"); + } + + } + + public static void main(String[] args) + { + MultiThreadedProducer test = new MultiThreadedProducer(); + test.setUp(); + test.test(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java new file mode 100644 index 0000000000..1faa9be864 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -0,0 +1,137 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; + +/** + * Test Description + * ================ + * This test will open x number of connections where each + * connection will create a session and a producer/consumer pair, + * and then send configurable no of messages. + * It will them sleep for configurable time interval and + * tear down the connections/sessions/consumers. + * It will then repeat the process again until the test is stopped. + * + * Purpose of the test + * =================== + * To find if the broker has leaks when cleaning resources. + * To find if the client has leaks with resources. + */ +public class ResourceLeakTest extends BaseTest +{ + protected int connection_count = 10; + protected long connection_idle_time = 5000; + + public ResourceLeakTest() + { + super(); + connection_count = Integer.getInteger("con_count",10); + connection_idle_time = Long.getLong("con_idle_time", 5000); + } + + public void test() + { + try + { + + AMQConnection[] cons = new AMQConnection[connection_count]; + Session[] sessions = new Session[connection_count]; + MessageConsumer[] msgCons = new MessageConsumer[connection_count]; + MessageProducer [] msgProds = new MessageProducer[connection_count]; + Destination dest = new AMQQueue(new AMQShortString(exchange_name), + new AMQShortString(routing_key), + new AMQShortString(queue_name), + true, //exclusive + true // auto delete + ); + + while (true) + { + for (int i = 0; i < connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + cons[i] = con; + Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + sessions[i] = ssn; + MessageConsumer msgCon = ssn.createConsumer(dest); + msgCons[i] = msgCon; + MessageProducer msgProd = ssn.createProducer(dest); + msgProds[i] = msgProd; + + BytesMessage msg = ssn.createBytesMessage(); + msg.writeBytes("Test Msg".getBytes()); + + for (int j = 0; j < msg_count;j++) + { + msgProd.send(msg); + } + + int j = 0; + while (j < msg_count) + { + msgCon.receive(); + j++; + } + } + Thread.sleep(connection_idle_time); + + try + { + for (int i = 0; i < connection_count; i++) + { + msgCons[i].close(); + msgProds[i].close(); + sessions[i].close(); + cons[i].close(); + } + } + catch (Exception e) + { + handleError(e,"Exception closing resources"); + } + } + } + catch (Exception e) + { + handleError(e,"Exception in setting up the test"); + } + + } + + public static void main(String[] args) + { + ResourceLeakTest test = new ResourceLeakTest(); + test.test(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java new file mode 100644 index 0000000000..5ef72d7538 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * Test Description + * ================ + * This test will create x number of sessions. + * Each session will have it's own consumer. + * Once a consumer receives the "End" message it + * will send a message to the destination indicated + * by the replyTo field in the End message. + * This will signal the producer that all the previous + * messages have been consumed. The producer will + * then start sending messages again. + * + * This prevents the producer from overruning the + * consumer. + * * + * All consumers share a single destination + * + */ + +public class SimpleConsumer extends BaseTest +{ + public SimpleConsumer() + { + super(); + } + + public void test() + { + try + { + final Session[] sessions = new Session[session_count]; + MessageConsumer[] cons = new MessageConsumer[session_count]; + + for (int i = 0; i < session_count; i++) + { + sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + cons[i] = sessions[i].createConsumer(dest); + cons[i].setMessageListener(new MessageListener() + { + + public void onMessage(Message m) + { + try + { + String payload = ((TextMessage) m).getText(); + if (payload.equals("End")) + { + System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo()); + Message controlMsg = sessions[0].createTextMessage(); + temp.send(controlMsg); + temp.close(); + } + } + catch (JMSException e) + { + handleError(e,"Exception when receiving the message"); + } + } + }); + } + + } + catch (Exception e) + { + handleError(e,"Exception when setting up the consumers"); + } + + } + + public static void main(String[] args) + { + SimpleConsumer test = new SimpleConsumer(); + test.setUp(); + test.test(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java new file mode 100644 index 0000000000..bdae79fd41 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -0,0 +1,147 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.soak; + + +import java.util.Random; +import java.util.UUID; + +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.framing.AMQShortString; + +/** + * Test Description + * ================ + * This test will send n-1 messages, followed by the n-th + * message which contains "End" in it's payload to signal + * that this is the last message message in the sequence. + * The end message has the feedback queue as it's replyTo. + * It will then listen on the feedback queue waiting for the + * confirmation and then sleeps for 1000 ms before proceeding + * with the next n messages. + * + * This hand shaking mechanism ensures that all of the + * messages sent are consumed by some consumer. This prevents + * the producers from saturating the broker especially when + * the consumers are slow. + * + * It creates a producer per session. + * If session_count is > 1 it will round robin the messages + * btw the producers. + * + * All producers send to a single destination + * + */ + +public class SimpleProducer extends BaseTest +{ + protected Destination feedbackQueue; + Random gen = new Random(); + + public SimpleProducer() + { + super(); + } + + protected Message getNextMessage() + { + if (msg_size == -1) + { + int index = gen.nextInt(1000); + return msgArray[index]; + } + else + { + return msgArray[0]; + } + } + + public void test() + { + try + { + Session[] sessions = new Session[session_count]; + MessageProducer[] prods = new MessageProducer[session_count]; + + for (int i = 0; i < session_count; i++) + { + sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + prods[i] = sessions[i].createProducer(dest); + } + + // this will ensure that the producer will not overun the consumer. + feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), + new AMQShortString(UUID.randomUUID().toString()), + new AMQShortString("control")); + + MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue); + + int prod_pointer = 0; + boolean multi_session = session_count > 1 ? true : false; + + while (true) + { + for (int i = 0; i < msg_count - 1; i++) + { + Message msg = getNextMessage(); + msg.setJMSMessageID("ID:" + UUID.randomUUID()); + prods[prod_pointer].send(msg); + if (multi_session) + { + prod_pointer++; + if (prod_pointer == session_count) + { + prod_pointer = 0; + } + } + } + + TextMessage m = sessions[0].createTextMessage("End"); + m.setJMSMessageID("ID:" + UUID.randomUUID()); + m.setJMSReplyTo(feedbackQueue); + prods[prod_pointer].send(m); + System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + feedbackConsumer.receive(); + Thread.sleep(1000); + } + } + catch (Exception e) + { + handleError(e,"Exception while setting up the producer"); + } + + } + + public static void main(String[] args) + { + SimpleProducer test = new SimpleProducer(); + test.setUp(); + test.test(); + } + +} -- cgit v1.2.1 From 9018a2fd309658a4b2fa10078a9efb463060da1b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 7 Jul 2008 18:01:37 +0000 Subject: This is related to QPId-1161. Modified the soak tests to print latency samples and throughput rates for every iteration. Added run_soak_client.sh soak_report.sh as an example of how to use soak test and produce a report. Modified other scripts to add comments. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674569 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/soak/BaseTest.java | 3 +++ .../qpid/testkit/soak/MultiThreadedConsumer.java | 28 ++++++++++++++++++--- .../qpid/testkit/soak/MultiThreadedProducer.java | 3 +-- .../apache/qpid/testkit/soak/ResourceLeakTest.java | 1 + .../apache/qpid/testkit/soak/SimpleConsumer.java | 29 +++++++++++++++++++--- .../apache/qpid/testkit/soak/SimpleProducer.java | 5 ++-- 6 files changed, 58 insertions(+), 11 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java index be8c4bbc75..0c3a17b3d8 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java @@ -22,6 +22,8 @@ package org.apache.qpid.testkit.soak; import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; import java.text.SimpleDateFormat; import java.util.Date; @@ -54,6 +56,7 @@ public class BaseTest protected AMQConnection con; protected Destination dest = null; protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); public BaseTest() { diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index 3117d268a3..a44760be46 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -48,6 +48,9 @@ public class MultiThreadedConsumer extends BaseTest { super(); transacted = Boolean.getBoolean("transacted"); + // needed only to calculate throughput. + // If msg_count is different set it via -Dmsg_count + msg_count = 10; } /** @@ -75,14 +78,33 @@ public class MultiThreadedConsumer extends BaseTest consumer.setMessageListener(new MessageListener() { + private boolean startIteration = true; + private long startTime = 0; + public void onMessage(Message m) { try { - String payload = ((TextMessage) m).getText(); - if (payload.equals("End")) + long now = System.currentTimeMillis(); + if (startIteration) + { + startTime = m.getJMSTimestamp(); + startIteration = false; + } + + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) { - System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + startIteration = true; + long totalIterationTime = now - startTime; + double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; + long latencySample = now - m.getJMSTimestamp(); + + StringBuilder sb = new StringBuilder(); + sb.append(m.getJMSMessageID()).append(","). + append(nf.format(throughput)).append(",").append(latencySample); + + System.out.println(sb.toString()); + MessageProducer temp = session.createProducer(m.getJMSReplyTo()); Message controlMsg = session.createTextMessage(); temp.send(controlMsg); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 886c64bb81..279e5ea0bf 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -117,7 +117,6 @@ public class MultiThreadedProducer extends SimpleProducer } TextMessage m = session.createTextMessage("End"); - m.setJMSMessageID("ID:" + UUID.randomUUID()); m.setJMSReplyTo(feedbackQueue); prod.send(m); @@ -126,7 +125,7 @@ public class MultiThreadedProducer extends SimpleProducer session.commit(); } - System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + System.out.println(df.format(System.currentTimeMillis())); feedbackConsumer.receive(); if (transacted) { diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index 1faa9be864..c33f9ffbf2 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -103,6 +103,7 @@ public class ResourceLeakTest extends BaseTest j++; } } + System.out.println(df.format(System.currentTimeMillis())); Thread.sleep(connection_idle_time); try diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index 5ef72d7538..d353e44816 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -53,6 +53,9 @@ public class SimpleConsumer extends BaseTest public SimpleConsumer() { super(); + //needed only to calculate throughput. + // If msg_count is different set it via -Dmsg_count + msg_count = 10; } public void test() @@ -69,14 +72,34 @@ public class SimpleConsumer extends BaseTest cons[i].setMessageListener(new MessageListener() { + private boolean startIteration = true; + private long startTime = 0; + public void onMessage(Message m) { try { - String payload = ((TextMessage) m).getText(); - if (payload.equals("End")) + long now = System.currentTimeMillis(); + if (startIteration) + { + startTime = m.getJMSTimestamp(); + startIteration = false; + } + + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) { - System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + + long totalIterationTime = now - startTime; + startIteration = true; + double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; + long latencySample = now - m.getJMSTimestamp(); + + StringBuilder sb = new StringBuilder(); + sb.append(m.getJMSMessageID()).append(","). + append(nf.format(throughput)).append(",").append(latencySample); + + System.out.println(sb.toString()); + MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo()); Message controlMsg = sessions[0].createTextMessage(); temp.send(controlMsg); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index bdae79fd41..1080092536 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -109,7 +109,7 @@ public class SimpleProducer extends BaseTest for (int i = 0; i < msg_count - 1; i++) { Message msg = getNextMessage(); - msg.setJMSMessageID("ID:" + UUID.randomUUID()); + msg.setJMSTimestamp(System.currentTimeMillis()); prods[prod_pointer].send(msg); if (multi_session) { @@ -122,10 +122,9 @@ public class SimpleProducer extends BaseTest } TextMessage m = sessions[0].createTextMessage("End"); - m.setJMSMessageID("ID:" + UUID.randomUUID()); m.setJMSReplyTo(feedbackQueue); prods[prod_pointer].send(m); - System.out.println(m.getJMSMessageID() + "," + System.currentTimeMillis()); + System.out.println(df.format(System.currentTimeMillis())); feedbackConsumer.receive(); Thread.sleep(1000); } -- cgit v1.2.1 From 6e92b10d74de7c8acd9803e6488c13673ceadd40 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 7 Jul 2008 20:30:40 +0000 Subject: This is related to QPID-1162 Added a README file to describe what the tests are and how they can be run. Modified to consumers to print the iteration number instead of the message id. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@674622 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java | 4 +++- .../src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index a44760be46..a91d9e7e85 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -80,6 +80,7 @@ public class MultiThreadedConsumer extends BaseTest private boolean startIteration = true; private long startTime = 0; + private long iterations = 0; public void onMessage(Message m) { @@ -98,9 +99,10 @@ public class MultiThreadedConsumer extends BaseTest long totalIterationTime = now - startTime; double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; long latencySample = now - m.getJMSTimestamp(); + iterations++; StringBuilder sb = new StringBuilder(); - sb.append(m.getJMSMessageID()).append(","). + sb.append(iterations).append(","). append(nf.format(throughput)).append(",").append(latencySample); System.out.println(sb.toString()); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index d353e44816..b3eb97dafe 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -74,6 +74,7 @@ public class SimpleConsumer extends BaseTest private boolean startIteration = true; private long startTime = 0; + private long iterations = 0; public void onMessage(Message m) { @@ -93,9 +94,10 @@ public class SimpleConsumer extends BaseTest startIteration = true; double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; long latencySample = now - m.getJMSTimestamp(); + iterations++; StringBuilder sb = new StringBuilder(); - sb.append(m.getJMSMessageID()).append(","). + sb.append(iterations).append(","). append(nf.format(throughput)).append(",").append(latencySample); System.out.println(sb.toString()); -- cgit v1.2.1 From 720bc5b1e76bc150e30a41789ae5ca529a03725e Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Thu, 28 Aug 2008 19:26:51 +0000 Subject: Add ASL to everywhere, to everything. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@689937 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/MessageFactory.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java index f2784ef499..8b7b7fa434 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java @@ -1,4 +1,25 @@ package org.apache.qpid.testkit; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + import javax.jms.BytesMessage; import javax.jms.JMSException; -- cgit v1.2.1 From 386fc2f8dc103ae078c98e3fe5bcdfb7842f27de Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 23 Oct 2008 00:06:43 +0000 Subject: This is related to QPID-1161 The objective of the latency test is to get latency sample after the system achives steady state. It sends X messages in warmup mode (and waits for confirmation) before sending Y messages which it uses to takes measurements. It measures std dev, min, max and avg latency. The value for X should be sufficiently large to ensure that the system is in steady state to get a more acurate representation. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@707232 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/LatencyTest.java | 332 +++++++++++++++++++++ 1 file changed, 332 insertions(+) create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java new file mode 100644 index 0000000000..35a2374fbc --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -0,0 +1,332 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit.perf; + +import java.io.FileOutputStream; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.testkit.MessageFactory; + +/** + * Latency test sends an x number of messages in warmup mode and wait for a confirmation + * from the consumer that it has successfully consumed them and ready to start the + * test. It will start sending y number of messages and each message will contain a time + * stamp. This will be used at the receiving end to measure the latency. + * + * It is important to have a sufficiently large number for the warmup count to + * ensure the system is in steady state before the test is started. + * + * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) + * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 + * + * The idea is to get a latency sample for the system once it achieves steady state. + * + */ + +public class LatencyTest extends PerfBase implements MessageListener +{ + MessageProducer producer; + MessageConsumer consumer; + Message msg; + byte[] payload; + long maxLatency = 0; + long minLatency = Long.MAX_VALUE; + long totalLatency = 0; // to calculate avg latency. + int rcvdMsgCount = 0; + double stdDev = 0; + double avgLatency = 0; + boolean warmup_mode = true; + boolean transacted = false; + int transSize = 0; + + final List latencies; + final Lock lock = new ReentrantLock(); + final Condition warmedUp; + final Condition testCompleted; + + public LatencyTest() + { + super(); + warmedUp = lock.newCondition(); + testCompleted = lock.newCondition(); + // Storing the following two for efficiency + transacted = params.isTransacted(); + transSize = params.getTransactionSize(); + latencies = new ArrayList (params.getMsgCount()); + } + + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(dest); + consumer.setMessageListener(this); + + // if message caching is enabled we pre create the message + // else we pre create the payload + if (params.isCacheMessage()) + { + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); + msg.setJMSDeliveryMode(params.isDurable()? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); + } + else + { + payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); + } + + producer = session.createProducer(dest); + producer.setDisableMessageID(params.isDisableMessageID()); + producer.setDisableMessageTimestamp(params.isDisableTimestamp()); + } + + protected Message getNextMessage() throws Exception + { + if (params.isCacheMessage()) + { + return msg; + } + else + { + msg = session.createBytesMessage(); + ((BytesMessage)msg).writeBytes(payload); + return msg; + } + } + + public void warmup()throws Exception + { + System.out.println("Warming up......"); + int count = params.getWarmupCount(); + for (int i=0; i < count; i++) + { + producer.send(getNextMessage()); + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + + if (params.isTransacted()) + { + session.commit(); + } + + try + { + lock.lock(); + warmedUp.await(); + } + finally + { + lock.unlock(); + } + } + + public void onMessage(Message msg) + { + try + { + if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) + { + if (warmup_mode) + { + warmup_mode = false; + try + { + lock.lock(); + warmedUp.signal(); + } + finally + { + lock.unlock(); + } + } + else + { + computeStats(); + } + } + else if (!warmup_mode) + { + long time = System.currentTimeMillis(); + rcvdMsgCount ++; + + if (transacted && (rcvdMsgCount % transSize == 0)) + { + session.commit(); + } + + long latency = time - msg.getJMSTimestamp(); + latencies.add(latency); + totalLatency = totalLatency + latency; + } + + } + catch(Exception e) + { + handleError(e,"Error when receiving messages"); + } + + } + + private void computeStats() + { + avgLatency = (double)totalLatency/(double)rcvdMsgCount; + double sigma = 0; + + for (long latency: latencies) + { + maxLatency = Math.max(maxLatency, latency); + minLatency = Math.min(minLatency, latency); + sigma = sigma + Math.pow(latency - avgLatency,2); + } + + stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); + + try + { + lock.lock(); + testCompleted.signal(); + } + finally + { + lock.unlock(); + } + } + + public void writeToFile() throws Exception + { + String fileName = System.getProperty("file"); + PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); + for (long latency: latencies) + { + writer.println(String.valueOf(latency)); + } + writer.flush(); + writer.close(); + } + + public void printToConsole() + { + System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); + System.out.println(new StringBuilder("Standard Deviation : "). + append(df.format(stdDev)). + append(" ms").toString()); + System.out.println(new StringBuilder("Avg Latency : "). + append(df.format(avgLatency)). + append(" ms").toString()); + System.out.println(new StringBuilder("Min Latency : "). + append(minLatency). + append(" ms").toString()); + System.out.println(new StringBuilder("Max Latency : "). + append(maxLatency). + append(" ms").toString()); + System.out.println("Completed the test......\n"); + } + + public void startTest() throws Exception + { + System.out.println("Starting test......"); + int count = params.getMsgCount(); + + for(int i=0; i < count; i++ ) + { + Message msg = getNextMessage(); + msg.setJMSTimestamp(System.currentTimeMillis()); + producer.send(msg); + if ( transacted && ((i+1) % transSize == 0)) + { + session.commit(); + } + } + Message msg = session.createTextMessage("End"); + producer.send(msg); + if (params.isTransacted()) + { + session.commit(); + } + } + + public void tearDown() throws Exception + { + try + { + lock.lock(); + testCompleted.await(); + } + finally + { + lock.unlock(); + } + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + + public void test() + { + try + { + setUp(); + warmup(); + startTest(); + tearDown(); + } + catch(Exception e) + { + handleError(e,"Error when running test"); + } + } + + + public static void main(String[] args) + { + LatencyTest latencyTest = new LatencyTest(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + } +} \ No newline at end of file -- cgit v1.2.1 From fbeb3752a902f5cbf225dd9fa4c6f00dbcbc3a68 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 21 Nov 2008 16:32:53 +0000 Subject: This is related to QPID-1479. For starters I have changed the IoSender.java IoReceiver.java and AMQSession.java#Dispatcher to use the Thread factory to create the threads they require. The ThreadFactory has two implimentations, the default being the java.lang.Threads. The other is the RealtimeThreadFactory which uses reflection to create threads with a specific priority. -Dqpid.thread_factory= will decide which thread factory should be loaded. -Dqpid.rt_thread_priority= specifies the gloabl real time thread priority and defaults to 20. You could also set individual thread priorities by adding the nessacery config+code changes. I have also changed the Testkit and QpidBench to use the Thread factory so you could use them for testing/benchmarking work on RT JVMs. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719628 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/LatencyTest.java | 38 ++++++++++++++++------ .../org/apache/qpid/testkit/perf/PerfConsumer.java | 23 +++++++++++-- .../org/apache/qpid/testkit/perf/PerfProducer.java | 22 +++++++++++-- .../qpid/testkit/soak/MultiThreadedConsumer.java | 17 ++++++++-- .../qpid/testkit/soak/MultiThreadedProducer.java | 14 ++++++-- .../apache/qpid/testkit/soak/ResourceLeakTest.java | 20 ++++++++++-- .../apache/qpid/testkit/soak/SimpleConsumer.java | 23 +++++++++++-- .../apache/qpid/testkit/soak/SimpleProducer.java | 22 +++++++++++-- 8 files changed, 153 insertions(+), 26 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 35a2374fbc..4a4f3d124b 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - LatencyTest latencyTest = new LatencyTest(); - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() { - try + public void run() { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index cd12c7010d..9781a7e839 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,6 +27,8 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - PerfConsumer cons = new PerfConsumer(); - cons.test(); + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index 757b1bfcda..e9421d7f22 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,6 +27,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - PerfProducer prod = new PerfProducer(); - prod.test(); + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index a91d9e7e85..d5514873e6 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { public void run() { @@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest } - }); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 279e5ea0bf..1cf4ee28ca 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { private Random gen = new Random(); @@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer } - }); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index c33f9ffbf2..1ae2c35970 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - ResourceLeakTest test = new ResourceLeakTest(); - test.test(); + final ResourceLeakTest test = new ResourceLeakTest(); + Runnable r = new Runnable(){ + public void run() + { + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating test thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index b3eb97dafe..cd6d9013f8 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - SimpleConsumer test = new SimpleConsumer(); - test.setUp(); - test.test(); + final SimpleConsumer test = new SimpleConsumer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 1080092536..805ce7ac29 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,6 +33,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - SimpleProducer test = new SimpleProducer(); - test.setUp(); - test.test(); + final SimpleProducer test = new SimpleProducer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } } } -- cgit v1.2.1 From 8417094a3f0c28fef298d57db5616854458b7a8b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 21 Nov 2008 17:57:16 +0000 Subject: Appologies for the sudden checkin without notice, close to the release cycle. Reverting the changes back. Will attach a patch and commit after the release. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@719657 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/LatencyTest.java | 38 ++++++---------------- .../org/apache/qpid/testkit/perf/PerfConsumer.java | 23 ++----------- .../org/apache/qpid/testkit/perf/PerfProducer.java | 22 ++----------- .../qpid/testkit/soak/MultiThreadedConsumer.java | 17 ++-------- .../qpid/testkit/soak/MultiThreadedProducer.java | 14 ++------ .../apache/qpid/testkit/soak/ResourceLeakTest.java | 20 ++---------- .../apache/qpid/testkit/soak/SimpleConsumer.java | 23 ++----------- .../apache/qpid/testkit/soak/SimpleProducer.java | 22 ++----------- 8 files changed, 26 insertions(+), 153 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 4a4f3d124b..35a2374fbc 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,7 +37,6 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -315,36 +314,19 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() + LatencyTest latencyTest = new LatencyTest(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) { - public void run() + try { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); } - t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index 9781a7e839..cd12c7010d 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,8 +27,6 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -244,24 +242,7 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() - { - public void run() - { - cons.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); + PerfConsumer cons = new PerfConsumer(); + cons.test(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index e9421d7f22..757b1bfcda 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,7 +27,6 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -202,24 +201,7 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() - { - public void run() - { - prod.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); + PerfProducer prod = new PerfProducer(); + prod.test(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index d5514873e6..a91d9e7e85 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,8 +29,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * Test Description * ================ @@ -69,7 +67,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() + Thread t = new Thread(new Runnable() { public void run() { @@ -133,18 +131,7 @@ public class MultiThreadedConsumer extends BaseTest } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - + }); t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 1cf4ee28ca..279e5ea0bf 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,7 +32,6 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -80,7 +79,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() + Thread t = new Thread(new Runnable() { private Random gen = new Random(); @@ -143,16 +142,7 @@ public class MultiThreadedProducer extends SimpleProducer } - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } + }); t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index 1ae2c35970..c33f9ffbf2 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,7 +30,6 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -132,23 +131,8 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - final ResourceLeakTest test = new ResourceLeakTest(); - Runnable r = new Runnable(){ - public void run() - { - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating test thread",e); - } + ResourceLeakTest test = new ResourceLeakTest(); + test.test(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index cd6d9013f8..b3eb97dafe 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,8 +29,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.thread.Threading; - /** * Test Description * ================ @@ -128,24 +126,9 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - final SimpleConsumer test = new SimpleConsumer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } + SimpleConsumer test = new SimpleConsumer(); + test.setUp(); + test.test(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 805ce7ac29..1080092536 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,7 +33,6 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; /** * Test Description @@ -139,24 +138,9 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - final SimpleProducer test = new SimpleProducer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } + SimpleProducer test = new SimpleProducer(); + test.setUp(); + test.test(); } } -- cgit v1.2.1 From fd19e27856105247b669d52a492bfc0ab2beec28 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Tue, 13 Jan 2009 18:29:41 +0000 Subject: This is related to QPID-1479 This commit contains themodifications done to the perf test classes to use the thread abstraction patch. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@734212 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/LatencyTest.java | 38 ++++++++++++++++------ .../org/apache/qpid/testkit/perf/PerfConsumer.java | 23 +++++++++++-- .../org/apache/qpid/testkit/perf/PerfProducer.java | 22 +++++++++++-- .../qpid/testkit/soak/MultiThreadedConsumer.java | 17 ++++++++-- .../qpid/testkit/soak/MultiThreadedProducer.java | 14 ++++++-- .../apache/qpid/testkit/soak/ResourceLeakTest.java | 20 ++++++++++-- .../apache/qpid/testkit/soak/SimpleConsumer.java | 23 +++++++++++-- .../apache/qpid/testkit/soak/SimpleProducer.java | 22 +++++++++++-- 8 files changed, 153 insertions(+), 26 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java index 35a2374fbc..4a4f3d124b 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java @@ -37,6 +37,7 @@ import javax.jms.MessageProducer; import javax.jms.TextMessage; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * Latency test sends an x number of messages in warmup mode and wait for a confirmation @@ -314,19 +315,36 @@ public class LatencyTest extends PerfBase implements MessageListener public static void main(String[] args) { - LatencyTest latencyTest = new LatencyTest(); - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) + final LatencyTest latencyTest = new LatencyTest(); + Runnable r = new Runnable() { - try + public void run() { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); + latencyTest.test(); + latencyTest.printToConsole(); + if (System.getProperty("file") != null) + { + try + { + latencyTest.writeToFile(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating latency test thread",e); } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java index cd12c7010d..9781a7e839 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java @@ -27,6 +27,8 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * PerfConsumer will receive x no of messages in warmup mode. * Once it receives the Start message it will then signal the PerfProducer. @@ -242,7 +244,24 @@ public class PerfConsumer extends PerfBase implements MessageListener public static void main(String[] args) { - PerfConsumer cons = new PerfConsumer(); - cons.test(); + final PerfConsumer cons = new PerfConsumer(); + Runnable r = new Runnable() + { + public void run() + { + cons.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index 757b1bfcda..e9421d7f22 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -27,6 +27,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import org.apache.qpid.testkit.MessageFactory; +import org.apache.qpid.thread.Threading; /** * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation @@ -201,7 +202,24 @@ public class PerfProducer extends PerfBase public static void main(String[] args) { - PerfProducer prod = new PerfProducer(); - prod.test(); + final PerfProducer prod = new PerfProducer(); + Runnable r = new Runnable() + { + public void run() + { + prod.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } + t.start(); } } \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java index a91d9e7e85..d5514873e6 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -67,7 +69,7 @@ public class MultiThreadedConsumer extends BaseTest { final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { public void run() { @@ -131,7 +133,18 @@ public class MultiThreadedConsumer extends BaseTest } - }); + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } + t.setName("session-" + i); t.start(); } // for loop diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java index 279e5ea0bf..1cf4ee28ca 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java @@ -32,6 +32,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -79,7 +80,7 @@ public class MultiThreadedProducer extends SimpleProducer for (int i = 0; i < session_count; i++) { final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(new Runnable() + Runnable r = new Runnable() { private Random gen = new Random(); @@ -142,7 +143,16 @@ public class MultiThreadedProducer extends SimpleProducer } - }); + }; + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } t.setName("session-" + i); t.start(); diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index c33f9ffbf2..1ae2c35970 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -131,8 +132,23 @@ public class ResourceLeakTest extends BaseTest public static void main(String[] args) { - ResourceLeakTest test = new ResourceLeakTest(); - test.test(); + final ResourceLeakTest test = new ResourceLeakTest(); + Runnable r = new Runnable(){ + public void run() + { + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating test thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java index b3eb97dafe..cd6d9013f8 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java @@ -29,6 +29,8 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.thread.Threading; + /** * Test Description * ================ @@ -126,9 +128,24 @@ public class SimpleConsumer extends BaseTest public static void main(String[] args) { - SimpleConsumer test = new SimpleConsumer(); - test.setUp(); - test.test(); + final SimpleConsumer test = new SimpleConsumer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating consumer thread",e); + } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java index 1080092536..805ce7ac29 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java @@ -33,6 +33,7 @@ import javax.jms.TextMessage; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; /** * Test Description @@ -138,9 +139,24 @@ public class SimpleProducer extends BaseTest public static void main(String[] args) { - SimpleProducer test = new SimpleProducer(); - test.setUp(); - test.test(); + final SimpleProducer test = new SimpleProducer(); + Runnable r = new Runnable(){ + public void run() + { + test.setUp(); + test.test(); + } + }; + + Thread t; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + throw new Error("Error creating producer thread",e); + } } } -- cgit v1.2.1 From 79c769a7d87655ac6663b89efe3bcb243facdd71 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 7 May 2009 03:16:03 +0000 Subject: Modified the default message size to 1024 bytes git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@772491 13f79535-47bb-0310-9956-ffa450edef68 --- .../testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java index 15142cfced..c5082282c6 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java @@ -34,7 +34,7 @@ public class TestParams private String durableDest = "durableQueue"; - private int msg_size = 512; + private int msg_size = 1024; private int msg_type = 1; // not used yet -- cgit v1.2.1 From 5e863a609ccacb8d36af160417ae4bad5b491acb Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 7 May 2009 03:19:10 +0000 Subject: Modified the default message size to 1024 - I missed the default value for the system property in the previous commit git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@772492 13f79535-47bb-0310-9956-ffa450edef68 --- .../testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java index c5082282c6..924fa24fa5 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java @@ -64,7 +64,7 @@ public class TestParams transientDest = System.getProperty("transDest",transientDest); durableDest = System.getProperty("durableDest",durableDest); - msg_size = Integer.getInteger("msg_size", 512); + msg_size = Integer.getInteger("msg_size", 1024); msg_type = Integer.getInteger("msg_type",1); cacheMessage = Boolean.getBoolean("cache_msg"); disableMessageID = Boolean.getBoolean("disableMessageID"); -- cgit v1.2.1 From c4e2827dfacd28e79a5c14d011ce49cb3a3757b4 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Sat, 23 May 2009 13:35:38 +0000 Subject: The default value for message cache is set to false. Added the functionality to support variable message sizes using -Drandom_msg_size=true. This will create an array of payloads from 1 byte to x bytes where x is configured by -Dmsg_size=x (default is 1024). The Random number generator works of the same seed so different test runs can be compared properly. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@777922 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/PerfProducer.java | 48 +++++++++++++++++++--- .../org/apache/qpid/testkit/perf/TestParams.java | 10 ++++- 2 files changed, 52 insertions(+), 6 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index e9421d7f22..30885ab461 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.testkit.perf; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Message; @@ -54,7 +58,13 @@ public class PerfProducer extends PerfBase MessageProducer producer; Message msg; byte[] payload; - + List payloads; + boolean cacheMsg = false; + boolean randomMsgSize = false; + boolean durable = false; + Random random; + int msgSizeRange = 1024; + public PerfProducer() { super(); @@ -65,16 +75,32 @@ public class PerfProducer extends PerfBase super.setUp(); feedbackDest = session.createTemporaryQueue(); + durable = params.isDurable(); + // if message caching is enabled we pre create the message // else we pre create the payload if (params.isCacheMessage()) { + cacheMsg = true; + msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? + msg.setJMSDeliveryMode(durable? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT ); } + else if (params.isRandomMsgSize()) + { + random = new Random(20080921); + randomMsgSize = true; + msgSizeRange = params.getMsgSize(); + payloads = new ArrayList(msgSizeRange); + + for (int i=1; i < msgSizeRange; i++) + { + payloads.add(MessageFactory.createMessagePayload(i).getBytes()); + } + } else { payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); @@ -87,14 +113,26 @@ public class PerfProducer extends PerfBase protected Message getNextMessage() throws Exception { - if (params.isCacheMessage()) + if (cacheMsg) { return msg; } else - { + { msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); + + if (!randomMsgSize) + { + ((BytesMessage)msg).writeBytes(payload); + } + else + { + ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); + } + msg.setJMSDeliveryMode(durable? + DeliveryMode.PERSISTENT : + DeliveryMode.NON_PERSISTENT + ); return msg; } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java index 924fa24fa5..2612af36e1 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java @@ -38,7 +38,7 @@ public class TestParams private int msg_type = 1; // not used yet - private boolean cacheMessage = true; + private boolean cacheMessage = false; private boolean disableMessageID = false; @@ -55,6 +55,8 @@ public class TestParams private int msg_count = 10; private int warmup_count = 1; + + private boolean random_msg_size = false; public TestParams() { @@ -75,6 +77,7 @@ public class TestParams ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); msg_count = Integer.getInteger("msg_count",msg_count); warmup_count = Integer.getInteger("warmup_count",warmup_count); + random_msg_size = Boolean.getBoolean("random_msg_size"); } public int getAckMode() @@ -156,5 +159,10 @@ public class TestParams { return disableTimestamp; } + + public boolean isRandomMsgSize() + { + return random_msg_size; + } } -- cgit v1.2.1 From 88136d8696689d7478a5daa7d1c0b2b89c78d93b Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Sun, 24 May 2009 13:57:20 +0000 Subject: Fixed a minor bug in pre-creating the payload array used in random msg size test. It now creates an array of payloads starting from zero bytes (empty payload) to msg_size-1 bytes. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@778150 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index 30885ab461..d84c2a43e5 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -96,10 +96,14 @@ public class PerfProducer extends PerfBase msgSizeRange = params.getMsgSize(); payloads = new ArrayList(msgSizeRange); - for (int i=1; i < msgSizeRange; i++) + for (int i=0; i < msgSizeRange; i++) { payloads.add(MessageFactory.createMessagePayload(i).getBytes()); } + + System.out.println("Payload size " + payloads.size()); + System.out.println("Payload min size " + payloads.get(0).length); + System.out.println("Payload max size " + payloads.get(1023).length); } else { -- cgit v1.2.1 From 293f80d50fc7624beb50303120fcdedf1ae5ba0a Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 27 May 2009 15:30:20 +0000 Subject: Removed extra system out statements from PerfProducer.java Added more test cases to the perf report Uncommented and added clientid and subscription props to the testTopicD destination. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@779212 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java | 4 ---- 1 file changed, 4 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java index d84c2a43e5..62392e0e83 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java @@ -100,10 +100,6 @@ public class PerfProducer extends PerfBase { payloads.add(MessageFactory.createMessagePayload(i).getBytes()); } - - System.out.println("Payload size " + payloads.size()); - System.out.println("Payload min size " + payloads.get(0).length); - System.out.println("Payload max size " + payloads.get(1023).length); } else { -- cgit v1.2.1 From b81852ece48447a8b00e3fd2bafe66b044f2bf80 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 11 Nov 2009 00:13:13 +0000 Subject: Removed the following files as they will be replaced by a generic Sender and Receiver. The bin files and the files under o/a/qpid/teskit/perf are moved under tools. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@834721 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/testkit/perf/LatencyTest.java | 350 --------------------- .../org/apache/qpid/testkit/perf/PerfBase.java | 102 ------ .../org/apache/qpid/testkit/perf/PerfConsumer.java | 267 ---------------- .../org/apache/qpid/testkit/perf/PerfProducer.java | 263 ---------------- .../org/apache/qpid/testkit/perf/TestParams.java | 168 ---------- .../org/apache/qpid/testkit/soak/BaseTest.java | 152 --------- .../qpid/testkit/soak/MultiThreadedConsumer.java | 166 ---------- .../qpid/testkit/soak/MultiThreadedProducer.java | 176 ----------- .../apache/qpid/testkit/soak/ResourceLeakTest.java | 76 +++-- .../apache/qpid/testkit/soak/SimpleConsumer.java | 151 --------- .../apache/qpid/testkit/soak/SimpleProducer.java | 162 ---------- 11 files changed, 51 insertions(+), 1982 deletions(-) delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java deleted file mode 100644 index 4a4f3d124b..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/LatencyTest.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.io.FileOutputStream; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; - -/** - * Latency test sends an x number of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y number of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * It is important to have a sufficiently large number for the warmup count to - * ensure the system is in steady state before the test is started. - * - * If you plan to plot the latencies then msg_count should be a smaller number (ex 500 or 1000) - * You also need to specify a file name using -Dfile=/home/rajith/latency.log.1 - * - * The idea is to get a latency sample for the system once it achieves steady state. - * - */ - -public class LatencyTest extends PerfBase implements MessageListener -{ - MessageProducer producer; - MessageConsumer consumer; - Message msg; - byte[] payload; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - double stdDev = 0; - double avgLatency = 0; - boolean warmup_mode = true; - boolean transacted = false; - int transSize = 0; - - final List latencies; - final Lock lock = new ReentrantLock(); - final Condition warmedUp; - final Condition testCompleted; - - public LatencyTest() - { - super(); - warmedUp = lock.newCondition(); - testCompleted = lock.newCondition(); - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - latencies = new ArrayList (params.getMsgCount()); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - consumer.setMessageListener(this); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(params.isDurable()? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (params.isCacheMessage()) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - ((BytesMessage)msg).writeBytes(payload); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - int count = params.getWarmupCount(); - for (int i=0; i < count; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - try - { - lock.lock(); - warmedUp.await(); - } - finally - { - lock.unlock(); - } - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - if (warmup_mode) - { - warmup_mode = false; - try - { - lock.lock(); - warmedUp.signal(); - } - finally - { - lock.unlock(); - } - } - else - { - computeStats(); - } - } - else if (!warmup_mode) - { - long time = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = time - msg.getJMSTimestamp(); - latencies.add(latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - private void computeStats() - { - avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double sigma = 0; - - for (long latency: latencies) - { - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - sigma = sigma + Math.pow(latency - avgLatency,2); - } - - stdDev = Math.sqrt(sigma/(rcvdMsgCount -1)); - - try - { - lock.lock(); - testCompleted.signal(); - } - finally - { - lock.unlock(); - } - } - - public void writeToFile() throws Exception - { - String fileName = System.getProperty("file"); - PrintWriter writer = new PrintWriter(new FileOutputStream(fileName)); - for (long latency: latencies) - { - writer.println(String.valueOf(latency)); - } - writer.flush(); - writer.close(); - } - - public void printToConsole() - { - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Standard Deviation : "). - append(df.format(stdDev)). - append(" ms").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % transSize == 0)) - { - session.commit(); - } - } - Message msg = session.createTextMessage("End"); - producer.send(msg); - if (params.isTransacted()) - { - session.commit(); - } - } - - public void tearDown() throws Exception - { - try - { - lock.lock(); - testCompleted.await(); - } - finally - { - lock.unlock(); - } - - producer.close(); - consumer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final LatencyTest latencyTest = new LatencyTest(); - Runnable r = new Runnable() - { - public void run() - { - latencyTest.test(); - latencyTest.printToConsole(); - if (System.getProperty("file") != null) - { - try - { - latencyTest.writeToFile(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating latency test thread",e); - } - t.start(); - } -} \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java deleted file mode 100644 index 95670f0507..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfBase.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.text.DecimalFormat; -import java.util.Hashtable; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Session; -import javax.naming.Context; -import javax.naming.InitialContext; - -public class PerfBase -{ - TestParams params; - Connection con; - Session session; - Destination dest; - Destination feedbackDest; - DecimalFormat df = new DecimalFormat("###.##"); - - public PerfBase() - { - params = new TestParams(); - } - - public void setUp() throws Exception - { - Hashtable env = new Hashtable(); - env.put(Context.INITIAL_CONTEXT_FACTORY, params.getInitialContextFactory()); - env.put(Context.PROVIDER_URL, params.getProviderURL()); - - Context ctx = null; - try - { - ctx = new InitialContext(env); - } - catch(Exception e) - { - throw new Exception("Error initializing JNDI",e); - - } - - ConnectionFactory conFac = null; - try - { - conFac = (ConnectionFactory)ctx.lookup(params.getConnectionFactory()); - } - catch(Exception e) - { - throw new Exception("Error looking up connection factory",e); - } - - con = conFac.createConnection(); - con.start(); - session = con.createSession(params.isTransacted(), - params.isTransacted()? Session.SESSION_TRANSACTED:params.getAckMode()); - - try - { - dest = (Destination)ctx.lookup( params.isDurable()? - params.getDurableDestination(): - params.getTransientDestination() - ); - } - catch(Exception e) - { - throw new Exception("Error looking up destination",e); - } - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} - diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java deleted file mode 100644 index 9781a7e839..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfConsumer.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * PerfConsumer will receive x no of messages in warmup mode. - * Once it receives the Start message it will then signal the PerfProducer. - * It will start recording stats from the first message it receives after - * the warmup mode is done. - * - * The following calculations are done. - * The important numbers to look at is - * a) Avg Latency - * b) System throughput. - * - * Latency. - * ========= - * Currently this test is written with the assumption that either - * a) The Perf Producer and Consumer are on the same machine - * b) They are on separate machines that have their time synced via a Time Server - * - * In order to calculate latency the producer inserts a timestamp - * hen the message is sent. The consumer will note the current time the message is - * received and will calculate the latency as follows - * latency = rcvdTime - msg.getJMSTimestamp() - * - * Through out the test it will keep track of the max and min latency to show the - * variance in latencies. - * - * Avg latency is measured by adding all latencies and dividing by the total msgs. - * You can also compute this by (rcvdTime - testStartTime)/rcvdMsgCount - * - * Throughput - * =========== - * System throughput is calculated as follows - * rcvdMsgCount/(rcvdTime - testStartTime) - * - * Consumer rate is calculated as - * rcvdMsgCount/(rcvdTime - startTime) - * - * Note that the testStartTime referes to when the producer sent the first message - * and startTime is when the consumer first received a message. - * - * rcvdTime keeps track of when the last message is received. - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - */ - -public class PerfConsumer extends PerfBase implements MessageListener -{ - MessageConsumer consumer; - long maxLatency = 0; - long minLatency = Long.MAX_VALUE; - long totalLatency = 0; // to calculate avg latency. - int rcvdMsgCount = 0; - long testStartTime = 0; // to measure system throughput - long startTime = 0; // to measure consumer throughput - long rcvdTime = 0; - boolean transacted = false; - int transSize = 0; - - final Object lock = new Object(); - - public PerfConsumer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - consumer = session.createConsumer(dest); - - // Storing the following two for efficiency - transacted = params.isTransacted(); - transSize = params.getTransactionSize(); - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - - boolean start = false; - while (!start) - { - Message msg = consumer.receive(); - if (msg instanceof TextMessage) - { - if (((TextMessage)msg).getText().equals("End")) - { - start = true; - MessageProducer temp = session.createProducer(msg.getJMSReplyTo()); - temp.send(session.createMessage()); - if (params.isTransacted()) - { - session.commit(); - } - temp.close(); - } - } - } - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - consumer.setMessageListener(this); - } - - public void printResults() throws Exception - { - synchronized (lock) - { - lock.wait(); - } - - double avgLatency = (double)totalLatency/(double)rcvdMsgCount; - double throughput = ((double)rcvdMsgCount/(double)(rcvdTime - testStartTime))*1000; - double consRate = ((double)rcvdMsgCount/(double)(rcvdTime - startTime))*1000; - System.out.println(new StringBuilder("Total Msgs Received : ").append(rcvdMsgCount).toString()); - System.out.println(new StringBuilder("Consumer rate : "). - append(df.format(consRate)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("System Throughput : "). - append(df.format(throughput)). - append(" msg/sec").toString()); - System.out.println(new StringBuilder("Avg Latency : "). - append(df.format(avgLatency)). - append(" ms").toString()); - System.out.println(new StringBuilder("Min Latency : "). - append(minLatency). - append(" ms").toString()); - System.out.println(new StringBuilder("Max Latency : "). - append(maxLatency). - append(" ms").toString()); - System.out.println("Completed the test......\n"); - } - - public void notifyCompletion(Destination replyTo) throws Exception - { - MessageProducer tmp = session.createProducer(replyTo); - Message endMsg = session.createMessage(); - tmp.send(endMsg); - if (params.isTransacted()) - { - session.commit(); - } - tmp.close(); - } - - public void tearDown() throws Exception - { - consumer.close(); - session.close(); - con.close(); - } - - public void onMessage(Message msg) - { - try - { - if (msg instanceof TextMessage && ((TextMessage)msg).getText().equals("End")) - { - notifyCompletion(msg.getJMSReplyTo()); - - synchronized (lock) - { - lock.notifyAll(); - } - } - else - { - rcvdTime = System.currentTimeMillis(); - rcvdMsgCount ++; - - if (rcvdMsgCount == 1) - { - startTime = rcvdTime; - testStartTime = msg.getJMSTimestamp(); - } - - if (transacted && (rcvdMsgCount % transSize == 0)) - { - session.commit(); - } - - long latency = rcvdTime - msg.getJMSTimestamp(); - maxLatency = Math.max(maxLatency, latency); - minLatency = Math.min(minLatency, latency); - totalLatency = totalLatency + latency; - } - - } - catch(Exception e) - { - handleError(e,"Error when receiving messages"); - } - - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - printResults(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - public static void main(String[] args) - { - final PerfConsumer cons = new PerfConsumer(); - Runnable r = new Runnable() - { - public void run() - { - cons.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - t.start(); - } -} \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java deleted file mode 100644 index 62392e0e83..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/PerfProducer.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import javax.jms.BytesMessage; -import javax.jms.DeliveryMode; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; - -import org.apache.qpid.testkit.MessageFactory; -import org.apache.qpid.thread.Threading; - -/** - * PerfProducer sends an x no of messages in warmup mode and wait for a confirmation - * from the consumer that it has successfully consumed them and ready to start the - * test. It will start sending y no of messages and each message will contain a time - * stamp. This will be used at the receiving end to measure the latency. - * - * This is done with the assumption that both consumer and producer are running on - * the same machine or different machines which have time synced using a time server. - * - * This test also calculates the producer rate as follows. - * rate = msg_count/(time_before_sending_msgs - time_after_sending_msgs) - * - * All throughput rates are given as msg/sec so the rates are multiplied by 1000. - * - * Rajith - Producer rate is not an accurate perf metric IMO. - * It is heavily inlfuenced by any in memory buffering. - * System throughput and latencies calculated by the PerfConsumer are more realistic - * numbers. - * - */ -public class PerfProducer extends PerfBase -{ - MessageProducer producer; - Message msg; - byte[] payload; - List payloads; - boolean cacheMsg = false; - boolean randomMsgSize = false; - boolean durable = false; - Random random; - int msgSizeRange = 1024; - - public PerfProducer() - { - super(); - } - - public void setUp() throws Exception - { - super.setUp(); - feedbackDest = session.createTemporaryQueue(); - - durable = params.isDurable(); - - // if message caching is enabled we pre create the message - // else we pre create the payload - if (params.isCacheMessage()) - { - cacheMsg = true; - - msg = MessageFactory.createBytesMessage(session, params.getMsgSize()); - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - } - else if (params.isRandomMsgSize()) - { - random = new Random(20080921); - randomMsgSize = true; - msgSizeRange = params.getMsgSize(); - payloads = new ArrayList(msgSizeRange); - - for (int i=0; i < msgSizeRange; i++) - { - payloads.add(MessageFactory.createMessagePayload(i).getBytes()); - } - } - else - { - payload = MessageFactory.createMessagePayload(params.getMsgSize()).getBytes(); - } - - producer = session.createProducer(dest); - producer.setDisableMessageID(params.isDisableMessageID()); - producer.setDisableMessageTimestamp(params.isDisableTimestamp()); - } - - protected Message getNextMessage() throws Exception - { - if (cacheMsg) - { - return msg; - } - else - { - msg = session.createBytesMessage(); - - if (!randomMsgSize) - { - ((BytesMessage)msg).writeBytes(payload); - } - else - { - ((BytesMessage)msg).writeBytes(payloads.get(random.nextInt(msgSizeRange))); - } - msg.setJMSDeliveryMode(durable? - DeliveryMode.PERSISTENT : - DeliveryMode.NON_PERSISTENT - ); - return msg; - } - } - - public void warmup()throws Exception - { - System.out.println("Warming up......"); - MessageConsumer tmp = session.createConsumer(feedbackDest); - - for (int i=0; i < params.getWarmupCount() -1; i++) - { - producer.send(getNextMessage()); - } - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - } - - public void startTest() throws Exception - { - System.out.println("Starting test......"); - int count = params.getMsgCount(); - boolean transacted = params.isTransacted(); - int tranSize = params.getTransactionSize(); - - long start = System.currentTimeMillis(); - for(int i=0; i < count; i++ ) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - producer.send(msg); - if ( transacted && ((i+1) % tranSize == 0)) - { - session.commit(); - } - } - long time = System.currentTimeMillis() - start; - double rate = ((double)count/(double)time)*1000; - System.out.println(new StringBuilder("Producer rate: "). - append(df.format(rate)). - append(" msg/sec"). - toString()); - } - - public void waitForCompletion() throws Exception - { - MessageConsumer tmp = session.createConsumer(feedbackDest); - Message msg = session.createTextMessage("End"); - msg.setJMSReplyTo(feedbackDest); - producer.send(msg); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.receive(); - - if (params.isTransacted()) - { - session.commit(); - } - - tmp.close(); - System.out.println("Consumer has completed the test......"); - } - - public void tearDown() throws Exception - { - producer.close(); - session.close(); - con.close(); - } - - public void test() - { - try - { - setUp(); - warmup(); - startTest(); - waitForCompletion(); - tearDown(); - } - catch(Exception e) - { - handleError(e,"Error when running test"); - } - } - - - public static void main(String[] args) - { - final PerfProducer prod = new PerfProducer(); - Runnable r = new Runnable() - { - public void run() - { - prod.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.start(); - } -} \ No newline at end of file diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java deleted file mode 100644 index 2612af36e1..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/perf/TestParams.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.perf; - -import javax.jms.Session; - -public class TestParams -{ - private String initialContextFactory = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; - - private String providerURL = System.getenv("QPID_TEST_HOME") + "/etc/jndi.properties"; - - private String connectionFactory = "connectionFactory"; - - private String transientDest = "transientQueue"; - - private String durableDest = "durableQueue"; - - private int msg_size = 1024; - - private int msg_type = 1; // not used yet - - private boolean cacheMessage = false; - - private boolean disableMessageID = false; - - private boolean disableTimestamp = false; - - private boolean durable = false; - - private boolean transacted = false; - - private int transaction_size = 1000; - - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - - private int msg_count = 10; - - private int warmup_count = 1; - - private boolean random_msg_size = false; - - public TestParams() - { - initialContextFactory = System.getProperty("java.naming.factory.initial",initialContextFactory); - providerURL = System.getProperty("java.naming.provider.url",providerURL); - - transientDest = System.getProperty("transDest",transientDest); - durableDest = System.getProperty("durableDest",durableDest); - - msg_size = Integer.getInteger("msg_size", 1024); - msg_type = Integer.getInteger("msg_type",1); - cacheMessage = Boolean.getBoolean("cache_msg"); - disableMessageID = Boolean.getBoolean("disableMessageID"); - disableTimestamp = Boolean.getBoolean("disableTimestamp"); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - transaction_size = Integer.getInteger("trans_size",1000); - ack_mode = Integer.getInteger("ack_mode",Session.AUTO_ACKNOWLEDGE); - msg_count = Integer.getInteger("msg_count",msg_count); - warmup_count = Integer.getInteger("warmup_count",warmup_count); - random_msg_size = Boolean.getBoolean("random_msg_size"); - } - - public int getAckMode() - { - return ack_mode; - } - - public String getConnectionFactory() - { - return connectionFactory; - } - - public String getTransientDestination() - { - return transientDest; - } - - public String getDurableDestination() - { - return durableDest; - } - - public String getInitialContextFactory() - { - return initialContextFactory; - } - - public int getMsgCount() - { - return msg_count; - } - - public int getMsgSize() - { - return msg_size; - } - - public int getMsgType() - { - return msg_type; - } - - public boolean isDurable() - { - return durable; - } - - public String getProviderURL() - { - return providerURL; - } - - public boolean isTransacted() - { - return transacted; - } - - public int getTransactionSize() - { - return transaction_size; - } - - public int getWarmupCount() - { - return warmup_count; - } - - public boolean isCacheMessage() - { - return cacheMessage; - } - - public boolean isDisableMessageID() - { - return disableMessageID; - } - - public boolean isDisableTimestamp() - { - return disableTimestamp; - } - - public boolean isRandomMsgSize() - { - return random_msg_size; - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java deleted file mode 100644 index 0c3a17b3d8..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/BaseTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.testkit.MessageFactory; - -public class BaseTest -{ - protected String host = "127.0.0.1"; - protected int msg_size = 100; - protected int msg_count = 10; - protected int session_count = 1; - protected boolean durable = false; - protected String queue_name = "message_queue"; - protected String exchange_name = "amq.direct"; - protected String routing_key = "routing_key"; - protected String contentType = "application/octet-stream"; - protected int port = 5672; - protected String url; - protected Message[] msgArray; - - protected AMQConnection con; - protected Destination dest = null; - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - public BaseTest() - { - host = System.getProperty("host", "127.0.0.1"); - port = Integer.getInteger("port", 5672); - msg_size = Integer.getInteger("msg_size", 100); - msg_count = Integer.getInteger("msg_count", 10); - session_count = Integer.getInteger("session_count", 1); - durable = Boolean.getBoolean("durable"); - queue_name = System.getProperty("queue_name", "message_queue"); - exchange_name = System.getProperty("exchange_name", "amq.direct"); - routing_key = System.getProperty("routing_key", "routing_key"); - contentType = System.getProperty("content_type","application/octet-stream"); - - - - url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"; - } - - public void setUp() - { - try - { - con = new AMQConnection(url); - con.start(); - - - if (exchange_name.equals("amq.topic")) - { - dest = new AMQTopic(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - false, //auto-delete - null, //queue name - durable); - } - else - { - dest = new AMQQueue(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - new AMQShortString(queue_name), - false, //exclusive - false, //auto-delete - durable); - } - - // Create the session to setup the messages - Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - - if (msg_size == -1) - { - // This creates an array of 1000 messages from 500-1500 bytes - // During the tests a message will be picked randomly - msgArray = new Message[1000]; - for (int i = 0; i < 1000; i++) - { - Message msg = (contentType.equals("text/plain")) ? - MessageFactory.createTextMessage(session,500 + i) : - MessageFactory.createBytesMessage(session, 500 + i); - msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - msgArray[i] = msg; - } - } - else - { - Message msg = (contentType.equals("text/plain")) ? - MessageFactory.createTextMessage(session, msg_size): - MessageFactory.createBytesMessage(session, msg_size); - msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - msgArray = new Message[] - { msg }; - } - - session.close(); - - } - catch (Exception e) - { - handleError(e,"Error while setting up the test"); - } - } - - public void handleError(Exception e,String msg) - { - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" @ "); - sb.append(df.format(new Date(System.currentTimeMillis()))); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - } -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java deleted file mode 100644 index d5514873e6..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedConsumer.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * The difference between this test and the - * LongDurationConsumer is that each Session runs - * in it's own Thread and the ability to receive - * messages transactionally. - * - * All consumers will still share the same destination. - * - */ -public class MultiThreadedConsumer extends BaseTest -{ - protected final boolean transacted; - - public MultiThreadedConsumer() - { - super(); - transacted = Boolean.getBoolean("transacted"); - // needed only to calculate throughput. - // If msg_count is different set it via -Dmsg_count - msg_count = 10; - } - - /** - * Creates a Session and a consumer that runs in its - * own thread. - * It can also consume transactionally. - * - */ - public void test() - { - try - { - for (int i = 0; i < session_count; i++) - { - - final Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() - { - public void run() - { - try - { - MessageConsumer consumer = session.createConsumer(dest); - - consumer.setMessageListener(new MessageListener() - { - - private boolean startIteration = true; - private long startTime = 0; - private long iterations = 0; - - public void onMessage(Message m) - { - try - { - long now = System.currentTimeMillis(); - if (startIteration) - { - startTime = m.getJMSTimestamp(); - startIteration = false; - } - - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - startIteration = true; - long totalIterationTime = now - startTime; - double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; - long latencySample = now - m.getJMSTimestamp(); - iterations++; - - StringBuilder sb = new StringBuilder(); - sb.append(iterations).append(","). - append(nf.format(throughput)).append(",").append(latencySample); - - System.out.println(sb.toString()); - - MessageProducer temp = session.createProducer(m.getJMSReplyTo()); - Message controlMsg = session.createTextMessage(); - temp.send(controlMsg); - if (transacted) - { - session.commit(); - } - temp.close(); - } - } - catch (JMSException e) - { - handleError(e,"Exception receiving messages"); - } - } - }); - } - catch (Exception e) - { - handleError(e,"Exception creating a consumer"); - } - - } - - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - - t.setName("session-" + i); - t.start(); - } // for loop - } - catch (Exception e) - { - handleError(e,"Exception while setting up the test"); - } - - } - - public static void main(String[] args) - { - MultiThreadedConsumer test = new MultiThreadedConsumer(); - test.setUp(); - test.test(); - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java deleted file mode 100644 index 1cf4ee28ca..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/MultiThreadedProducer.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.util.Random; -import java.util.UUID; - -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * - * This test creats x number of sessions, where each session - * runs in it's own thread. Each session creates a producer - * and it's own feedback queue. - * - * A producer will send n-1 messages, followed by the n-th - * message which contains "End" in it's payload to signal - * that this is the last message message in the sequence. - * The end message has the feedback queue as it's replyTo. - * It will then listen on the feedback queue waiting for the - * confirmation and then sleeps for 1000 ms before proceeding - * with the next n messages. - * - * This hand shaking mechanism ensures that all of the - * messages sent are consumed by some consumer. This prevents - * the producers from saturating the broker especially when - * the consumers are slow. - * - * All producers send to a single destination - * If using transactions it's best to use smaller message count - * as the test only commits after sending all messages in a batch. - * - */ - -public class MultiThreadedProducer extends SimpleProducer -{ - protected final boolean transacted; - - public MultiThreadedProducer() - { - super(); - transacted = Boolean.getBoolean("transacted"); - } - - public void test() - { - try - { - final int msg_count_per_session = msg_count/session_count; - - for (int i = 0; i < session_count; i++) - { - final Session session = con.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - Runnable r = new Runnable() - { - private Random gen = new Random(); - - private Message getNextMessage() - { - if (msg_size == -1) - { - int index = gen.nextInt(1000); - return msgArray[index]; - } - else - { - return msgArray[0]; - } - } - - public void run() - { - try - { - MessageProducer prod = session.createProducer(dest); - // this will ensure that the producer will not overun the consumer. - feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), new AMQShortString(UUID - .randomUUID().toString()), new AMQShortString("control")); - - MessageConsumer feedbackConsumer = session.createConsumer(feedbackQueue); - - while (true) - { - for (int i = 0; i < msg_count_per_session; i++) - { - Message msg = getNextMessage(); - msg.setJMSMessageID("ID:" + UUID.randomUUID()); - prod.send(msg); - } - - TextMessage m = session.createTextMessage("End"); - m.setJMSReplyTo(feedbackQueue); - prod.send(m); - - if (transacted) - { - session.commit(); - } - - System.out.println(df.format(System.currentTimeMillis())); - feedbackConsumer.receive(); - if (transacted) - { - session.commit(); - } - Thread.sleep(1000); - } - - } - catch (Exception e) - { - handleError(e,"Exception in producing message"); - } - - } - - }; - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - t.setName("session-" + i); - t.start(); - - } - - } - catch (Exception e) - { - handleError(e,"Exception while setting up the test"); - } - - } - - public static void main(String[] args) - { - MultiThreadedProducer test = new MultiThreadedProducer(); - test.setUp(); - test.test(); - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java index 1ae2c35970..c240ecdf2e 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/ResourceLeakTest.java @@ -21,6 +21,8 @@ package org.apache.qpid.testkit.soak; +import java.util.Random; + import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.MessageConsumer; @@ -29,7 +31,11 @@ import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.BasicMessageConsumer; +import org.apache.qpid.client.BasicMessageProducer; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.testkit.TestLauncher; import org.apache.qpid.thread.Threading; /** @@ -37,8 +43,9 @@ import org.apache.qpid.thread.Threading; * ================ * This test will open x number of connections where each * connection will create a session and a producer/consumer pair, - * and then send configurable no of messages. - * It will them sleep for configurable time interval and + * and then a randomly selected set of connections (about 1/4th) + * will send a configurable no of messages and try to receive them. + * It will then sleep for configurable time interval and * tear down the connections/sessions/consumers. * It will then repeat the process again until the test is stopped. * @@ -47,16 +54,14 @@ import org.apache.qpid.thread.Threading; * To find if the broker has leaks when cleaning resources. * To find if the client has leaks with resources. */ -public class ResourceLeakTest extends BaseTest +public class ResourceLeakTest extends TestLauncher { - protected int connection_count = 10; - protected long connection_idle_time = 5000; - + /* protected long connection_idle_time = 5000; + protected Random rand = new Random(); + public ResourceLeakTest() { - super(); - connection_count = Integer.getInteger("con_count",10); - connection_idle_time = Long.getLong("con_idle_time", 5000); + super(); } public void test() @@ -68,13 +73,7 @@ public class ResourceLeakTest extends BaseTest Session[] sessions = new Session[connection_count]; MessageConsumer[] msgCons = new MessageConsumer[connection_count]; MessageProducer [] msgProds = new MessageProducer[connection_count]; - Destination dest = new AMQQueue(new AMQShortString(exchange_name), - new AMQShortString(routing_key), - new AMQShortString(queue_name), - true, //exclusive - true // auto delete - ); - + while (true) { for (int i = 0; i < connection_count; i++) @@ -84,23 +83,36 @@ public class ResourceLeakTest extends BaseTest cons[i] = con; Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); sessions[i] = ssn; + Destination dest = new AMQQueue(new AMQShortString(exchange_name), + new AMQShortString(routing_key + i), + new AMQShortString(queue_name + i), + true, //exclusive + true // auto delete + ); MessageConsumer msgCon = ssn.createConsumer(dest); msgCons[i] = msgCon; MessageProducer msgProd = ssn.createProducer(dest); msgProds[i] = msgProd; - - BytesMessage msg = ssn.createBytesMessage(); + } + + // Select some connections randomly and send/recv messages + // Exercise around quarter of the connections + for (int i=0; i < connection_count/4; i++) + { + int k = rand.nextInt(connection_count); + + BytesMessage msg = sessions[k].createBytesMessage(); msg.writeBytes("Test Msg".getBytes()); for (int j = 0; j < msg_count;j++) { - msgProd.send(msg); + msgProds[k].send(msg); } int j = 0; while (j < msg_count) { - msgCon.receive(); + msgCons[k].receive(); j++; } } @@ -111,10 +123,24 @@ public class ResourceLeakTest extends BaseTest { for (int i = 0; i < connection_count; i++) { - msgCons[i].close(); - msgProds[i].close(); - sessions[i].close(); - cons[i].close(); + if (!((BasicMessageConsumer)msgCons[i]).isClosed()) + { + msgCons[i].close(); + } + + if (!((BasicMessageProducer)msgProds[i]).isClosed()) + { + msgProds[i].close(); + } + + if (!((AMQSession)sessions[i]).isClosed()) + { + sessions[i].close(); + } + if (!((AMQConnection)cons[i]).isClosed()) + { + cons[i].close(); + } } } catch (Exception e) @@ -149,6 +175,6 @@ public class ResourceLeakTest extends BaseTest { throw new Error("Error creating test thread",e); } - } + }*/ } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java deleted file mode 100644 index cd6d9013f8..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleConsumer.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * This test will create x number of sessions. - * Each session will have it's own consumer. - * Once a consumer receives the "End" message it - * will send a message to the destination indicated - * by the replyTo field in the End message. - * This will signal the producer that all the previous - * messages have been consumed. The producer will - * then start sending messages again. - * - * This prevents the producer from overruning the - * consumer. - * * - * All consumers share a single destination - * - */ - -public class SimpleConsumer extends BaseTest -{ - public SimpleConsumer() - { - super(); - //needed only to calculate throughput. - // If msg_count is different set it via -Dmsg_count - msg_count = 10; - } - - public void test() - { - try - { - final Session[] sessions = new Session[session_count]; - MessageConsumer[] cons = new MessageConsumer[session_count]; - - for (int i = 0; i < session_count; i++) - { - sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - cons[i] = sessions[i].createConsumer(dest); - cons[i].setMessageListener(new MessageListener() - { - - private boolean startIteration = true; - private long startTime = 0; - private long iterations = 0; - - public void onMessage(Message m) - { - try - { - long now = System.currentTimeMillis(); - if (startIteration) - { - startTime = m.getJMSTimestamp(); - startIteration = false; - } - - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - - long totalIterationTime = now - startTime; - startIteration = true; - double throughput = ((double)msg_count/(double)totalIterationTime) * 1000; - long latencySample = now - m.getJMSTimestamp(); - iterations++; - - StringBuilder sb = new StringBuilder(); - sb.append(iterations).append(","). - append(nf.format(throughput)).append(",").append(latencySample); - - System.out.println(sb.toString()); - - MessageProducer temp = sessions[0].createProducer(m.getJMSReplyTo()); - Message controlMsg = sessions[0].createTextMessage(); - temp.send(controlMsg); - temp.close(); - } - } - catch (JMSException e) - { - handleError(e,"Exception when receiving the message"); - } - } - }); - } - - } - catch (Exception e) - { - handleError(e,"Exception when setting up the consumers"); - } - - } - - public static void main(String[] args) - { - final SimpleConsumer test = new SimpleConsumer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating consumer thread",e); - } - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java deleted file mode 100644 index 805ce7ac29..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/soak/SimpleProducer.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit.soak; - - -import java.util.Random; -import java.util.UUID; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * Test Description - * ================ - * This test will send n-1 messages, followed by the n-th - * message which contains "End" in it's payload to signal - * that this is the last message message in the sequence. - * The end message has the feedback queue as it's replyTo. - * It will then listen on the feedback queue waiting for the - * confirmation and then sleeps for 1000 ms before proceeding - * with the next n messages. - * - * This hand shaking mechanism ensures that all of the - * messages sent are consumed by some consumer. This prevents - * the producers from saturating the broker especially when - * the consumers are slow. - * - * It creates a producer per session. - * If session_count is > 1 it will round robin the messages - * btw the producers. - * - * All producers send to a single destination - * - */ - -public class SimpleProducer extends BaseTest -{ - protected Destination feedbackQueue; - Random gen = new Random(); - - public SimpleProducer() - { - super(); - } - - protected Message getNextMessage() - { - if (msg_size == -1) - { - int index = gen.nextInt(1000); - return msgArray[index]; - } - else - { - return msgArray[0]; - } - } - - public void test() - { - try - { - Session[] sessions = new Session[session_count]; - MessageProducer[] prods = new MessageProducer[session_count]; - - for (int i = 0; i < session_count; i++) - { - sessions[i] = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - prods[i] = sessions[i].createProducer(dest); - } - - // this will ensure that the producer will not overun the consumer. - feedbackQueue = new AMQQueue(new AMQShortString("amq.direct"), - new AMQShortString(UUID.randomUUID().toString()), - new AMQShortString("control")); - - MessageConsumer feedbackConsumer = sessions[0].createConsumer(feedbackQueue); - - int prod_pointer = 0; - boolean multi_session = session_count > 1 ? true : false; - - while (true) - { - for (int i = 0; i < msg_count - 1; i++) - { - Message msg = getNextMessage(); - msg.setJMSTimestamp(System.currentTimeMillis()); - prods[prod_pointer].send(msg); - if (multi_session) - { - prod_pointer++; - if (prod_pointer == session_count) - { - prod_pointer = 0; - } - } - } - - TextMessage m = sessions[0].createTextMessage("End"); - m.setJMSReplyTo(feedbackQueue); - prods[prod_pointer].send(m); - System.out.println(df.format(System.currentTimeMillis())); - feedbackConsumer.receive(); - Thread.sleep(1000); - } - } - catch (Exception e) - { - handleError(e,"Exception while setting up the producer"); - } - - } - - public static void main(String[] args) - { - final SimpleProducer test = new SimpleProducer(); - Runnable r = new Runnable(){ - public void run() - { - test.setUp(); - test.test(); - } - }; - - Thread t; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - throw new Error("Error creating producer thread",e); - } - } - -} -- cgit v1.2.1 From 34f7b03fcbce2e80dc71640783dfbff772878060 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 11 Nov 2009 00:21:27 +0000 Subject: Moved MessageFactory to the tools module. Added a Generic Sender and a Receiver. They can be run standalone or used as a building block to create more complex tests. TestLauncher is a utility to start a sender or receiver in multiple threads with some added plumbing. Please refer to each class to see the full set of options available. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@834724 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/testkit/Client.java | 90 +++++ .../java/org/apache/qpid/testkit/ErrorHandler.java | 6 + .../org/apache/qpid/testkit/MessageFactory.java | 64 ---- .../java/org/apache/qpid/testkit/Receiver.java | 225 +++++++++++++ .../main/java/org/apache/qpid/testkit/Sender.java | 195 +++++++++++ .../java/org/apache/qpid/testkit/TestLauncher.java | 361 +++++++++++++++++++++ 6 files changed, 877 insertions(+), 64 deletions(-) create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java create mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java new file mode 100644 index 0000000000..88d78ee78c --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Session; + +public abstract class Client +{ + protected Connection con; + protected Session ssn; + protected boolean durable = false; + protected boolean transacted = false; + protected int txSize = 10; + protected int ack_mode = Session.AUTO_ACKNOWLEDGE; + protected String contentType = "application/octet-stream"; + protected Destination dest = null; + + protected long reportFrequency = 60000; // every min + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + + protected long startTime = System.currentTimeMillis(); + protected ErrorHandler errorHandler = null; + + public Client(Connection con) throws Exception + { + this.con = con; + durable = Boolean.getBoolean("durable"); + transacted = Boolean.getBoolean("transacted"); + txSize = Integer.getInteger("tx_size",10); + contentType = System.getProperty("content_type","application/octet-stream"); + reportFrequency = Long.getLong("report_frequency", 60000); + } + + public void close() + { + try + { + con.close(); + } + catch (Exception e) + { + handleError("Error closing connection",e); + } + } + + public void setErrorHandler(ErrorHandler h) + { + this.errorHandler = h; + } + + public void handleError(String msg,Exception e) + { + if (errorHandler != null) + { + errorHandler.handleError(msg, e); + } + else + { + System.err.println(msg); + e.printStackTrace(); + } + } +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java new file mode 100644 index 0000000000..a1add8e03f --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -0,0 +1,6 @@ +package org.apache.qpid.testkit; + +public interface ErrorHandler { + + public void handleError(String msg,Exception e); +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java deleted file mode 100644 index 8b7b7fa434..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/MessageFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -package org.apache.qpid.testkit; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -import javax.jms.BytesMessage; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.Session; -import javax.jms.TextMessage; - -public class MessageFactory -{ - public static Message createBytesMessage(Session ssn, int size) throws JMSException - { - BytesMessage msg = ssn.createBytesMessage(); - msg.writeBytes(createMessagePayload(size).getBytes()); - return msg; - } - - public static Message createTextMessage(Session ssn, int size) throws JMSException - { - TextMessage msg = ssn.createTextMessage(); - msg.setText(createMessagePayload(size)); - return msg; - } - - public static String createMessagePayload(int size) - { - String msgData = "Qpid Test Message"; - - StringBuffer buf = new StringBuffer(size); - int count = 0; - while (count <= (size - msgData.length())) - { - buf.append(msgData); - count += msgData.length(); - } - if (count < size) - { - buf.append(msgData, 0, size - count); - } - - return buf.toString(); - } -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java new file mode 100644 index 0000000000..19ae325d4b --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -0,0 +1,225 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; + +/** + * A generic receiver which consumers a stream of messages + * from a given address in a broker (host/port) + * until told to stop by killing it. + * + * It participates in a feedback loop to ensure the producer + * doesn't fill up the queue. If it receives an "End" msg + * it sends a reply to the replyTo address in that msg. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * sync_rcv - Whether to consume sync (instead of using a listener). + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. + */ +public class Receiver extends Client implements MessageListener +{ + // Until addressing is properly supported. + protected enum Reliability { + AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE; + + Reliability getReliability(String s) + { + if (s.equalsIgnoreCase("at_most_once")) + { + return AT_MOST_ONCE; + } + else if (s.equalsIgnoreCase("at_least_once")) + { + return AT_LEAST_ONCE; + } + else + { + return EXACTLY_ONCE; + } + } + }; + + long msg_count = 0; + int sequence = 0; + boolean sync_rcv = Boolean.getBoolean("sync_rcv"); + boolean uniqueDests = Boolean.getBoolean("unique_dests"); + Reliability reliability = Reliability.EXACTLY_ONCE; + MessageConsumer consumer; + List duplicateMessages = new ArrayList(); + + public Receiver(Connection con,Destination dest) throws Exception + { + super(con); + reliability = reliability.getReliability(System.getProperty("reliability","exactly_once")); + ssn = con.createSession(transacted,ack_mode); + consumer = ssn.createConsumer(dest); + if (!sync_rcv) + { + consumer.setMessageListener(this); + } + + System.out.println("Operating in mode : " + reliability); + System.out.println("Receiving messages from : " + dest); + } + + public void onMessage(Message msg) + { + handleMessage(msg); + } + + public void run() throws Exception + { + while(true) + { + if(sync_rcv) + { + Message msg = consumer.receive(); + handleMessage(msg); + } + Thread.sleep(reportFrequency); + System.out.println(df.format(System.currentTimeMillis()) + + " - messages received : " + msg_count); + } + } + + private void handleMessage(Message m) + { + try + { + if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) + { + MessageProducer temp = ssn.createProducer(m.getJMSReplyTo()); + Message controlMsg = ssn.createTextMessage(); + temp.send(controlMsg); + if (transacted) + { + ssn.commit(); + } + temp.close(); + } + else + { + + int seq = m.getIntProperty("sequence"); + if (uniqueDests) + { + if (seq == 0) + { + sequence = 0; // wrap around for each iteration + } + + if (seq < sequence) + { + duplicateMessages.add(seq); + if (reliability == Reliability.EXACTLY_ONCE) + { + throw new Exception(": Received a duplicate message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + else if (seq == sequence) + { + sequence++; + msg_count ++; + } + else + { + // Multiple publishers are not allowed in this test case. + // So out of order messages are not allowed. + throw new Exception(": Received an out of order message (expected=" + + sequence + ",received=" + seq + ")" ); + } + } + // Please note that this test case doesn't expect duplicates + // When testing for transactions. + if (transacted && msg_count % txSize == 0) + { + ssn.commit(); + } + } + } + catch (Exception e) + { + e.printStackTrace(); + handleError("Exception receiving messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + // #3rd argument should be an address + // Any other properties is best configured via jvm args + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + // FIXME Need to add support for the new address format + // Then it's trivial to add destination for that. + Receiver rcv = new Receiver(con,null); + rcv.run(); + } + +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java new file mode 100644 index 0000000000..4dbe278e33 --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.Random; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.tools.MessageFactory; + +/** + * A generic sender which sends a stream of messages + * to a given address in a broker (host/port) + * until told to stop by killing it. + * + * It has a feedback loop to ensure it doesn't fill + * up queues due to a slow consumer. + * + * It doesn't check for correctness or measure anything + * leaving those concerns to another entity. + * However it prints a timestamp every x secs(-Dreport_frequency) + * as checkpoint to figure out how far the test has progressed if + * a failure occurred. + * + * It also takes in an optional Error handler to + * pass out any error in addition to writing them to std err. + * + * This is intended more as building block to create + * more complex test cases. However there is a main method + * provided to use this standalone. + * + * The following options are available and configurable + * via jvm args. + * + * msg_size (256) + * msg_count (10) - # messages before waiting for feedback + * sleep_time (1000 ms) - sleep time btw each iteration + * report_frequency - how often a timestamp is printed + * durable + * transacted + * tx_size - size of transaction batch in # msgs. + */ +public class Sender extends Client +{ + protected int msg_size = 256; + protected int msg_count = 10; + protected int iterations = -1; + protected long sleep_time = 1000; + + protected Destination dest = null; + protected Destination replyTo = null; + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + + protected MessageProducer producer; + Random gen = new Random(19770905); + + public Sender(Connection con,Destination dest) throws Exception + { + super(con); + this.msg_size = Integer.getInteger("msg_size", 100); + this.msg_count = Integer.getInteger("msg_count", 10); + this.iterations = Integer.getInteger("iterations", -1); + this.sleep_time = Long.getLong("sleep_time", 1000); + this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE); + this.dest = dest; + this.producer = ssn.createProducer(dest); + this.replyTo = ssn.createTemporaryQueue(); + + System.out.println("Sending messages to : " + dest); + } + + /* + * If msg_size not specified it generates a message + * between 500-1500 bytes. + */ + protected Message getNextMessage() throws Exception + { + int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; + Message msg = (contentType.equals("text/plain")) ? + MessageFactory.createTextMessage(ssn, s): + MessageFactory.createBytesMessage(ssn, s); + + msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT + : DeliveryMode.NON_PERSISTENT); + return msg; + } + + public void run() + { + try + { + boolean infinite = (iterations == -1); + for (int x=0; infinite || x < iterations; x++) + { + long now = System.currentTimeMillis(); + if (now - startTime >= reportFrequency) + { + System.out.println(df.format(now) + " - iterations : " + x); + startTime = now; + } + + for (int i = 0; i < msg_count; i++) + { + Message msg = getNextMessage(); + msg.setIntProperty("sequence",i); + producer.send(msg); + if (transacted && msg_count % txSize == 0) + { + ssn.commit(); + } + } + TextMessage m = ssn.createTextMessage("End"); + m.setJMSReplyTo(replyTo); + producer.send(m); + + if (transacted) + { + ssn.commit(); + } + + MessageConsumer feedbackConsumer = ssn.createConsumer(replyTo); + feedbackConsumer.receive(); + feedbackConsumer.close(); + if (transacted) + { + ssn.commit(); + } + Thread.sleep(sleep_time); + } + } + catch (Exception e) + { + handleError("Exception sending messages",e); + } + } + + // Receiver host port address + public static void main(String[] args) throws Exception + { + String host = "127.0.0.1"; + int port = 5672; + + if (args.length > 0) + { + host = args[0]; + } + if (args.length > 1) + { + port = Integer.parseInt(args[1]); + } + // #3rd argument should be an address + // Any other properties is best configured via jvm args + + AMQConnection con = new AMQConnection( + "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "'"); + + // FIXME Need to add support for the new address format + // Then it's trivial to add destination for that. + Sender sender = new Sender(con,null); + sender.run(); + } +} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java new file mode 100644 index 0000000000..f13ee3f75c --- /dev/null +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.testkit; + + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.text.DateFormat; +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.thread.Threading; + +/** + * A basic test case class that could launch a Sender/Receiver + * or both, each on it's own separate thread. + * + * If con_count == ssn_count, then each entity created will have + * it's own Connection. Else if con_count < ssn_count, then + * a connection will be shared by ssn_count/con_count # of entities. + * + * The if both sender and receiver options are set, it will + * share a connection. + * + * The following options are available as jvm args + * host, port + * con_count,ssn_count + * con_idle_time - which determines heartbeat + * sender, receiver - booleans which indicate which entity to create. + * Setting them both is also a valid option. + */ +public class TestLauncher implements ErrorHandler +{ + protected String host = "127.0.0.1"; + protected int port = 5672; + protected int session_count = 1; + protected int connection_count = 1; + protected long connection_idle_time = 5000; + protected boolean sender = false; + protected boolean receiver = false; + protected String url; + + protected String queue_name = "message_queue"; + protected String exchange_name = "amq.direct"; + protected String routing_key = "routing_key"; + protected boolean uniqueDests = false; + protected boolean durable = false; + protected String failover = ""; + protected AMQConnection controlCon; + protected Destination controlDest = null; + protected Session controlSession = null; + protected MessageProducer statusSender; + protected List clients = new ArrayList(); + protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + protected NumberFormat nf = new DecimalFormat("##.00"); + protected String testName; + + public TestLauncher() + { + testName = System.getProperty("test_name","UNKNOWN"); + host = System.getProperty("host", "127.0.0.1"); + port = Integer.getInteger("port", 5672); + session_count = Integer.getInteger("ssn_count", 1); + connection_count = Integer.getInteger("con_count", 1); + connection_idle_time = Long.getLong("con_idle_time", 5000); + sender = Boolean.getBoolean("sender"); + receiver = Boolean.getBoolean("receiver"); + + queue_name = System.getProperty("queue_name", "message_queue"); + exchange_name = System.getProperty("exchange_name", "amq.direct"); + routing_key = System.getProperty("routing_key", "routing_key"); + failover = System.getProperty("failover", ""); + uniqueDests = Boolean.getBoolean("unique_dests"); + durable = Boolean.getBoolean("durable"); + + url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" + + host + ":" + port + "?idle_timeout=" + connection_idle_time + + "'"; + + if (failover.equalsIgnoreCase("failover_exchange")) + { + url += "&failover='failover_exchange'"; + + System.out.println("Failover exchange " + url ); + } + } + + public void setUpControlChannel() + { + try + { + controlCon = new AMQConnection(url); + controlCon.start(); + + controlDest = new AMQQueue(new AMQShortString(""), + new AMQShortString("control"), + new AMQShortString("control"), + false, //exclusive + false, //auto-delete + false); // durable + + // Create the session to setup the messages + controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); + statusSender = controlSession.createProducer(controlDest); + + } + catch (Exception e) + { + handleError("Error while setting up the test",e); + } + } + + public void cleanup() + { + try + { + controlSession.close(); + controlCon.close(); + for (AMQConnection con : clients) + { + con.close(); + } + } + catch (Exception e) + { + handleError("Error while tearing down the test",e); + } + } + + public void start() + { + try + { + + int ssn_per_con = session_count; + if (connection_count < session_count) + { + ssn_per_con = session_count/connection_count; + } + + for (int i = 0; i< connection_count; i++) + { + AMQConnection con = new AMQConnection(url); + con.start(); + clients.add(con); + for (int j = 0; j< ssn_per_con; j++) + { + String prefix = createPrefix(i,j); + Destination dest = createDest(prefix); + if (sender) + { + createSender(prefix,con,dest,this); + } + + if (receiver) + { + createReceiver(prefix,con,dest,this); + } + } + } + } + catch (Exception e) + { + handleError("Exception while setting up the test",e); + } + + } + + protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Receiver rcv = new Receiver(con,dest); + rcv.setErrorHandler(h); + rcv.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Receiver", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Receive thread",e); + } + + t.setName("ReceiverThread-" + index); + t.start(); + } + + protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + { + Runnable r = new Runnable() + { + public void run() + { + try + { + Sender sender = new Sender(con, dest); + sender.setErrorHandler(h); + sender.run(); + } + catch (Exception e) + { + h.handleError("Error Starting Sender", e); + } + } + }; + + Thread t = null; + try + { + t = Threading.getThreadFactory().createThread(r); + } + catch(Exception e) + { + handleError("Error creating Sender thread",e); + } + + t.setName("SenderThread-" + index); + t.start(); + } + + public void handleError(String msg,Exception e) + { + // In case sending the message fails + StringBuilder sb = new StringBuilder(); + sb.append(msg); + sb.append(" @ "); + sb.append(df.format(new Date(System.currentTimeMillis()))); + sb.append(" "); + sb.append(e.getMessage()); + System.err.println(sb.toString()); + e.printStackTrace(); + + try + { + TextMessage errorMsg = controlSession.createTextMessage(); + errorMsg.setStringProperty("status", "error"); + errorMsg.setStringProperty("desc", msg); + errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); + errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); + synchronized (this) + { + statusSender.send(errorMsg); + } + } catch (JMSException e1) { + e1.printStackTrace(); + } + } + + private String serializeStackTrace(Exception e) + { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + PrintStream printStream = new PrintStream(bOut); + e.printStackTrace(printStream); + printStream.close(); + return bOut.toString(); + } + + private String createPrefix(int i, int j) + { + return String.valueOf(i).concat(String.valueOf(j)); + } + + /** + * The following are supported. + * + * 1. A producer/consumer pair on a topic or a queue + * 2. A single producer with multiple consumers on topic/queue + * + * Multiple consumers on a topic will result in a private queue + * for each consumers. + * + * We want to avoid multiple producers on the same topic/queue + * as the queues will fill up in no time. + */ + private Destination createDest(String prefix) + { + Destination dest = null; + if (exchange_name.equals("amq.topic")) + { + dest = new AMQTopic( + new AMQShortString(exchange_name), + new AMQShortString(uniqueDests ? prefix + routing_key : + routing_key), + false, //auto-delete + null, //queue name + durable); + } + else + { + dest = new AMQQueue( + new AMQShortString(exchange_name), + new AMQShortString(uniqueDests ? prefix + routing_key : + routing_key), + new AMQShortString(uniqueDests ? prefix + queue_name : + queue_name), + false, //exclusive + false, //auto-delete + durable); + } + return dest; + } + + public static void main(String[] args) + { + final TestLauncher test = new TestLauncher(); + test.setUpControlChannel(); + test.start(); + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { test.cleanup(); } + }); + + } +} -- cgit v1.2.1 From 2bddecb9a34dd9f541ac9f7fc8e09c0210b2592f Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 18 Nov 2009 19:36:38 +0000 Subject: Added log4j config to the test launcher Added shell script to run testkit.py Removed brokertest.py, instead using the version checked in under python/qpid folder by Alan. The shell scripts and the setup is work in progress, checking in now to help Alan reproduce an issue. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@881896 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/testkit/TestLauncher.java | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java index f13ee3f75c..b55afa7066 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -39,6 +39,11 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -117,8 +122,26 @@ public class TestLauncher implements ErrorHandler System.out.println("Failover exchange " + url ); } + + configureLogging(); } + protected void configureLogging() + { + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); + BasicConfigurator.configure(new ConsoleAppender(layout)); + + String logLevel = System.getProperty("log.level","warn"); + String logComponent = System.getProperty("log.comp","org.apache.qpid"); + + Logger logger = Logger.getLogger(logComponent); + logger.setLevel(Level.toLevel(logLevel, Level.WARN)); + + System.out.println("Level " + logger.getLevel()); + + } + public void setUpControlChannel() { try -- cgit v1.2.1 From 8618b7e45ba6b0f74b27c68d2bcf94cd57f15758 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Fri, 12 Feb 2010 22:30:38 +0000 Subject: I have added the license header to the files included in this commit. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@909641 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/testkit/ErrorHandler.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java index a1add8e03f..dbc73c404f 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java @@ -1,4 +1,25 @@ package org.apache.qpid.testkit; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + public interface ErrorHandler { -- cgit v1.2.1 From 8c8130d0288446f88deef393823cccffcaed474d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 12 Sep 2010 22:40:40 +0000 Subject: QPID-2857 : Address issues found by running FindBugs against the Java codebase git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@996393 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/testkit/Client.java | 86 ++++++++++++++++++---- .../java/org/apache/qpid/testkit/Receiver.java | 21 +++--- .../main/java/org/apache/qpid/testkit/Sender.java | 34 ++++----- 3 files changed, 98 insertions(+), 43 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java index 88d78ee78c..34818fcbea 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java @@ -32,21 +32,21 @@ import javax.jms.Session; public abstract class Client { - protected Connection con; - protected Session ssn; - protected boolean durable = false; - protected boolean transacted = false; - protected int txSize = 10; - protected int ack_mode = Session.AUTO_ACKNOWLEDGE; - protected String contentType = "application/octet-stream"; - protected Destination dest = null; - - protected long reportFrequency = 60000; // every min - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - protected long startTime = System.currentTimeMillis(); - protected ErrorHandler errorHandler = null; + private Connection con; + private Session ssn; + private boolean durable = false; + private boolean transacted = false; + private int txSize = 10; + private int ack_mode = Session.AUTO_ACKNOWLEDGE; + private String contentType = "application/octet-stream"; + + private long reportFrequency = 60000; // every min + + private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); + private NumberFormat nf = new DecimalFormat("##.00"); + + private long startTime = System.currentTimeMillis(); + private ErrorHandler errorHandler = null; public Client(Connection con) throws Exception { @@ -87,4 +87,60 @@ public abstract class Client e.printStackTrace(); } } + + protected Session getSsn() + { + return ssn; + } + + protected void setSsn(Session ssn) + { + this.ssn = ssn; + } + + protected boolean isDurable() + { + return durable; + } + + protected boolean isTransacted() + { + return transacted; + } + + protected int getTxSize() + { + return txSize; + } + + protected int getAck_mode() + { + return ack_mode; + } + + protected String getContentType() + { + return contentType; + } + + protected long getReportFrequency() + { + return reportFrequency; + } + + protected long getStartTime() + { + return startTime; + } + + protected void setStartTime(long startTime) + { + this.startTime = startTime; + } + + public DateFormat getDf() + { + return df; + } + } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java index 19ae325d4b..6d33a5f788 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -26,7 +26,6 @@ import java.util.List; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -101,8 +100,8 @@ public class Receiver extends Client implements MessageListener { super(con); reliability = reliability.getReliability(System.getProperty("reliability","exactly_once")); - ssn = con.createSession(transacted,ack_mode); - consumer = ssn.createConsumer(dest); + setSsn(con.createSession(isTransacted(), getAck_mode())); + consumer = getSsn().createConsumer(dest); if (!sync_rcv) { consumer.setMessageListener(this); @@ -126,8 +125,8 @@ public class Receiver extends Client implements MessageListener Message msg = consumer.receive(); handleMessage(msg); } - Thread.sleep(reportFrequency); - System.out.println(df.format(System.currentTimeMillis()) + Thread.sleep(getReportFrequency()); + System.out.println(getDf().format(System.currentTimeMillis()) + " - messages received : " + msg_count); } } @@ -138,12 +137,12 @@ public class Receiver extends Client implements MessageListener { if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) { - MessageProducer temp = ssn.createProducer(m.getJMSReplyTo()); - Message controlMsg = ssn.createTextMessage(); + MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); + Message controlMsg = getSsn().createTextMessage(); temp.send(controlMsg); - if (transacted) + if (isTransacted()) { - ssn.commit(); + getSsn().commit(); } temp.close(); } @@ -182,9 +181,9 @@ public class Receiver extends Client implements MessageListener } // Please note that this test case doesn't expect duplicates // When testing for transactions. - if (transacted && msg_count % txSize == 0) + if (isTransacted() && msg_count % getTxSize() == 0) { - ssn.commit(); + getSsn().commit(); } } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java index 4dbe278e33..de50894491 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java @@ -93,10 +93,10 @@ public class Sender extends Client this.msg_count = Integer.getInteger("msg_count", 10); this.iterations = Integer.getInteger("iterations", -1); this.sleep_time = Long.getLong("sleep_time", 1000); - this.ssn = con.createSession(transacted,Session.AUTO_ACKNOWLEDGE); + this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); this.dest = dest; - this.producer = ssn.createProducer(dest); - this.replyTo = ssn.createTemporaryQueue(); + this.producer = getSsn().createProducer(dest); + this.replyTo = getSsn().createTemporaryQueue(); System.out.println("Sending messages to : " + dest); } @@ -108,11 +108,11 @@ public class Sender extends Client protected Message getNextMessage() throws Exception { int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; - Message msg = (contentType.equals("text/plain")) ? - MessageFactory.createTextMessage(ssn, s): - MessageFactory.createBytesMessage(ssn, s); + Message msg = (getContentType().equals("text/plain")) ? + MessageFactory.createTextMessage(getSsn(), s): + MessageFactory.createBytesMessage(getSsn(), s); - msg.setJMSDeliveryMode((durable) ? DeliveryMode.PERSISTENT + msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); return msg; } @@ -125,10 +125,10 @@ public class Sender extends Client for (int x=0; infinite || x < iterations; x++) { long now = System.currentTimeMillis(); - if (now - startTime >= reportFrequency) + if (now - getStartTime() >= getReportFrequency()) { System.out.println(df.format(now) + " - iterations : " + x); - startTime = now; + setStartTime(now); } for (int i = 0; i < msg_count; i++) @@ -136,26 +136,26 @@ public class Sender extends Client Message msg = getNextMessage(); msg.setIntProperty("sequence",i); producer.send(msg); - if (transacted && msg_count % txSize == 0) + if (isTransacted() && msg_count % getTxSize() == 0) { - ssn.commit(); + getSsn().commit(); } } - TextMessage m = ssn.createTextMessage("End"); + TextMessage m = getSsn().createTextMessage("End"); m.setJMSReplyTo(replyTo); producer.send(m); - if (transacted) + if (isTransacted()) { - ssn.commit(); + getSsn().commit(); } - MessageConsumer feedbackConsumer = ssn.createConsumer(replyTo); + MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); feedbackConsumer.receive(); feedbackConsumer.close(); - if (transacted) + if (isTransacted()) { - ssn.commit(); + getSsn().commit(); } Thread.sleep(sleep_time); } -- cgit v1.2.1 From a14a3a1c1a73c58f4d4682bdf72be3db40ce1eb4 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 29 Sep 2010 01:55:36 +0000 Subject: Modified the Sender and Receiver to work with the new addressing strings. You could now invoke the sender or receiver by passing an addressing string as a program argument. The sender and receiver could now be utilised more easily as a building block for scripting test cases. Modified the TestLauncher to also work with addressing strings. The Receiver was also modified to work as a durable subscriber if specified via a JVM arg. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1002446 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/testkit/Receiver.java | 87 ++++++-------- .../main/java/org/apache/qpid/testkit/Sender.java | 18 +-- .../java/org/apache/qpid/testkit/TestLauncher.java | 128 +++++++++------------ 3 files changed, 103 insertions(+), 130 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java index 6d33a5f788..b6b1bd29a0 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -32,10 +32,11 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; /** - * A generic receiver which consumers a stream of messages + * A generic receiver which consumes messages * from a given address in a broker (host/port) * until told to stop by killing it. * @@ -63,52 +64,31 @@ import org.apache.qpid.client.AMQConnection; * report_frequency - how often a timestamp is printed * durable * transacted - * tx_size - size of transaction batch in # msgs. + * tx_size - size of transaction batch in # msgs. * + * check_for_dups - check for duplicate messages and out of order messages. + * jms_durable_sub - create a durable subscription instead of a regular subscription. */ public class Receiver extends Client implements MessageListener { - // Until addressing is properly supported. - protected enum Reliability { - AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE; - - Reliability getReliability(String s) - { - if (s.equalsIgnoreCase("at_most_once")) - { - return AT_MOST_ONCE; - } - else if (s.equalsIgnoreCase("at_least_once")) - { - return AT_LEAST_ONCE; - } - else - { - return EXACTLY_ONCE; - } - } - }; - long msg_count = 0; int sequence = 0; - boolean sync_rcv = Boolean.getBoolean("sync_rcv"); - boolean uniqueDests = Boolean.getBoolean("unique_dests"); - Reliability reliability = Reliability.EXACTLY_ONCE; + boolean syncRcv = Boolean.getBoolean("sync_rcv"); + boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); + boolean checkForDups = Boolean.getBoolean("check_for_dups"); MessageConsumer consumer; List duplicateMessages = new ArrayList(); - public Receiver(Connection con,Destination dest) throws Exception + public Receiver(Connection con,String addr) throws Exception { super(con); - reliability = reliability.getReliability(System.getProperty("reliability","exactly_once")); setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(dest); - if (!sync_rcv) + consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); + if (!syncRcv) { consumer.setMessageListener(this); } - System.out.println("Operating in mode : " + reliability); - System.out.println("Receiving messages from : " + dest); + System.out.println("Receiving messages from : " + addr); } public void onMessage(Message msg) @@ -118,21 +98,30 @@ public class Receiver extends Client implements MessageListener public void run() throws Exception { + long sleepTime = getReportFrequency(); while(true) { - if(sync_rcv) - { - Message msg = consumer.receive(); - handleMessage(msg); + if(syncRcv) + { + long t = sleepTime; + while (t > 0) + { + long start = System.currentTimeMillis(); + Message msg = consumer.receive(t); + t = t - (System.currentTimeMillis() - start); + handleMessage(msg); + } } - Thread.sleep(getReportFrequency()); - System.out.println(getDf().format(System.currentTimeMillis()) - + " - messages received : " + msg_count); + Thread.sleep(sleepTime); + System.out.println(getDf().format(System.currentTimeMillis()) + + " - messages received : " + msg_count); } } private void handleMessage(Message m) { + if (m == null) { return; } + try { if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) @@ -150,21 +139,18 @@ public class Receiver extends Client implements MessageListener { int seq = m.getIntProperty("sequence"); - if (uniqueDests) + if (checkForDups) { if (seq == 0) { sequence = 0; // wrap around for each iteration + System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); + duplicateMessages.clear(); } if (seq < sequence) { duplicateMessages.add(seq); - if (reliability == Reliability.EXACTLY_ONCE) - { - throw new Exception(": Received a duplicate message (expected=" - + sequence + ",received=" + seq + ")" ); - } } else if (seq == sequence) { @@ -199,6 +185,7 @@ public class Receiver extends Client implements MessageListener { String host = "127.0.0.1"; int port = 5672; + String addr = "message_queue"; if (args.length > 0) { @@ -208,16 +195,16 @@ public class Receiver extends Client implements MessageListener { port = Integer.parseInt(args[1]); } - // #3rd argument should be an address - // Any other properties is best configured via jvm args + if (args.length > 2) + { + addr = args[2]; + } AMQConnection con = new AMQConnection( "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"); - // FIXME Need to add support for the new address format - // Then it's trivial to add destination for that. - Receiver rcv = new Receiver(con,null); + Receiver rcv = new Receiver(con,addr); rcv.run(); } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java index de50894491..14b9b7302f 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java @@ -36,6 +36,7 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.tools.MessageFactory; @@ -86,7 +87,7 @@ public class Sender extends Client protected MessageProducer producer; Random gen = new Random(19770905); - public Sender(Connection con,Destination dest) throws Exception + public Sender(Connection con,String addr) throws Exception { super(con); this.msg_size = Integer.getInteger("msg_size", 100); @@ -94,11 +95,11 @@ public class Sender extends Client this.iterations = Integer.getInteger("iterations", -1); this.sleep_time = Long.getLong("sleep_time", 1000); this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = dest; + this.dest = new AMQAnyDestination(addr); this.producer = getSsn().createProducer(dest); this.replyTo = getSsn().createTemporaryQueue(); - System.out.println("Sending messages to : " + dest); + System.out.println("Sending messages to : " + addr); } /* @@ -171,6 +172,7 @@ public class Sender extends Client { String host = "127.0.0.1"; int port = 5672; + String addr = "message_queue"; if (args.length > 0) { @@ -180,16 +182,16 @@ public class Sender extends Client { port = Integer.parseInt(args[1]); } - // #3rd argument should be an address - // Any other properties is best configured via jvm args + if (args.length > 2) + { + addr = args[2]; + } AMQConnection con = new AMQConnection( "amqp://username:password@topicClientid/test?brokerlist='tcp://" + host + ":" + port + "'"); - // FIXME Need to add support for the new address format - // Then it's trivial to add destination for that. - Sender sender = new Sender(con,null); + Sender sender = new Sender(con,addr); sender.run(); } } diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java index b55afa7066..560ada244d 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java @@ -44,6 +44,7 @@ import org.apache.log4j.ConsoleAppender; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import org.apache.qpid.client.AMQAnyDestination; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; @@ -72,17 +73,15 @@ public class TestLauncher implements ErrorHandler { protected String host = "127.0.0.1"; protected int port = 5672; - protected int session_count = 1; + protected int sessions_per_con = 1; protected int connection_count = 1; - protected long connection_idle_time = 5000; + protected long heartbeat = 5000; protected boolean sender = false; protected boolean receiver = false; + protected boolean useUniqueDests = false; protected String url; - protected String queue_name = "message_queue"; - protected String exchange_name = "amq.direct"; - protected String routing_key = "routing_key"; - protected boolean uniqueDests = false; + protected String address = "my_queue; {create: always}"; protected boolean durable = false; protected String failover = ""; protected AMQConnection controlCon; @@ -99,22 +98,18 @@ public class TestLauncher implements ErrorHandler testName = System.getProperty("test_name","UNKNOWN"); host = System.getProperty("host", "127.0.0.1"); port = Integer.getInteger("port", 5672); - session_count = Integer.getInteger("ssn_count", 1); + sessions_per_con = Integer.getInteger("ssn_per_con", 1); connection_count = Integer.getInteger("con_count", 1); - connection_idle_time = Long.getLong("con_idle_time", 5000); + heartbeat = Long.getLong("heartbeat", 5); sender = Boolean.getBoolean("sender"); receiver = Boolean.getBoolean("receiver"); + useUniqueDests = Boolean.getBoolean("use_unique_dests"); - queue_name = System.getProperty("queue_name", "message_queue"); - exchange_name = System.getProperty("exchange_name", "amq.direct"); - routing_key = System.getProperty("routing_key", "routing_key"); failover = System.getProperty("failover", ""); - uniqueDests = Boolean.getBoolean("unique_dests"); durable = Boolean.getBoolean("durable"); url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "?idle_timeout=" + connection_idle_time - + "'"; + + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; if (failover.equalsIgnoreCase("failover_exchange")) { @@ -149,12 +144,7 @@ public class TestLauncher implements ErrorHandler controlCon = new AMQConnection(url); controlCon.start(); - controlDest = new AMQQueue(new AMQShortString(""), - new AMQShortString("control"), - new AMQShortString("control"), - false, //exclusive - false, //auto-delete - false); // durable + controlDest = new AMQAnyDestination("control; {create: always}"); // durable // Create the session to setup the messages controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -184,17 +174,17 @@ public class TestLauncher implements ErrorHandler } } - public void start() + public void start(String addr) { try { + if (addr == null) + { + addr = address; + } - int ssn_per_con = session_count; - if (connection_count < session_count) - { - ssn_per_con = session_count/connection_count; - } - + int ssn_per_con = sessions_per_con; + String addrTemp = addr; for (int i = 0; i< connection_count; i++) { AMQConnection con = new AMQConnection(url); @@ -202,16 +192,20 @@ public class TestLauncher implements ErrorHandler clients.add(con); for (int j = 0; j< ssn_per_con; j++) { - String prefix = createPrefix(i,j); - Destination dest = createDest(prefix); + String index = createPrefix(i,j); + if (useUniqueDests) + { + addrTemp = modifySubject(index,addr); + } + if (sender) { - createSender(prefix,con,dest,this); + createSender(index,con,addrTemp,this); } if (receiver) { - createReceiver(prefix,con,dest,this); + createReceiver(index,con,addrTemp,this); } } } @@ -223,7 +217,7 @@ public class TestLauncher implements ErrorHandler } - protected void createReceiver(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) { Runnable r = new Runnable() { @@ -231,7 +225,7 @@ public class TestLauncher implements ErrorHandler { try { - Receiver rcv = new Receiver(con,dest); + Receiver rcv = new Receiver(con,addr); rcv.setErrorHandler(h); rcv.run(); } @@ -256,7 +250,7 @@ public class TestLauncher implements ErrorHandler t.start(); } - protected void createSender(String index,final AMQConnection con, final Destination dest, final ErrorHandler h) + protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) { Runnable r = new Runnable() { @@ -264,7 +258,7 @@ public class TestLauncher implements ErrorHandler { try { - Sender sender = new Sender(con, dest); + Sender sender = new Sender(con, addr); sender.setErrorHandler(h); sender.run(); } @@ -289,7 +283,7 @@ public class TestLauncher implements ErrorHandler t.start(); } - public void handleError(String msg,Exception e) + public synchronized void handleError(String msg,Exception e) { // In case sending the message fails StringBuilder sb = new StringBuilder(); @@ -308,11 +302,13 @@ public class TestLauncher implements ErrorHandler errorMsg.setStringProperty("desc", msg); errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); - synchronized (this) - { - statusSender.send(errorMsg); - } - } catch (JMSException e1) { + + System.out.println("Msg " + errorMsg); + + statusSender.send(errorMsg); + } + catch (JMSException e1) + { e1.printStackTrace(); } } @@ -332,50 +328,38 @@ public class TestLauncher implements ErrorHandler } /** - * The following are supported. - * - * 1. A producer/consumer pair on a topic or a queue - * 2. A single producer with multiple consumers on topic/queue - * - * Multiple consumers on a topic will result in a private queue - * for each consumers. - * - * We want to avoid multiple producers on the same topic/queue - * as the queues will fill up in no time. + * A basic helper function to modify the subjects by + * appending an index. */ - private Destination createDest(String prefix) + private String modifySubject(String index,String addr) { - Destination dest = null; - if (exchange_name.equals("amq.topic")) + if (addr.indexOf("/") > 0) { - dest = new AMQTopic( - new AMQShortString(exchange_name), - new AMQShortString(uniqueDests ? prefix + routing_key : - routing_key), - false, //auto-delete - null, //queue name - durable); + addr = addr.substring(0,addr.indexOf("/")+1) + + index + + addr.substring(addr.indexOf("/")+1,addr.length()); + } + else if (addr.indexOf(";") > 0) + { + addr = addr.substring(0,addr.indexOf(";")) + + "/" + index + + addr.substring(addr.indexOf(";"),addr.length()); } else { - dest = new AMQQueue( - new AMQShortString(exchange_name), - new AMQShortString(uniqueDests ? prefix + routing_key : - routing_key), - new AMQShortString(uniqueDests ? prefix + queue_name : - queue_name), - false, //exclusive - false, //auto-delete - durable); + addr = addr + "/" + index; } - return dest; + + return addr; } public static void main(String[] args) { final TestLauncher test = new TestLauncher(); test.setUpControlChannel(); - test.start(); + System.out.println("args.length " + args.length); + System.out.println("args [0] " + args [0]); + test.start(args.length > 0 ? args [0] : null); Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { test.cleanup(); } }); -- cgit v1.2.1 From 8c32d038ac6790d437f075285ae811e21d5149d3 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Thu, 30 Sep 2010 01:55:15 +0000 Subject: A connection listener is set to all connections, to catch any connection level exceptions and report them via the error handler. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1002927 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/testkit/Client.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java index 34818fcbea..b10129d855 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java @@ -28,9 +28,11 @@ import java.text.SimpleDateFormat; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; import javax.jms.Session; -public abstract class Client +public abstract class Client implements ExceptionListener { private Connection con; private Session ssn; @@ -50,7 +52,8 @@ public abstract class Client public Client(Connection con) throws Exception { - this.con = con; + this.con = con; + this.con.setExceptionListener(this); durable = Boolean.getBoolean("durable"); transacted = Boolean.getBoolean("transacted"); txSize = Integer.getInteger("tx_size",10); @@ -70,6 +73,11 @@ public abstract class Client } } + public void onException(JMSException e) + { + handleError("Connection error",e); + } + public void setErrorHandler(ErrorHandler h) { this.errorHandler = h; -- cgit v1.2.1 From 5fc3df4477567b000c6f3639be6dea3cf6e9c74a Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 6 Oct 2010 13:22:05 +0000 Subject: The receiver was only counting messages if dups check was on. Therefore added logic to count messages even when we are not checking for dups. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1005024 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java index b6b1bd29a0..b4294ee4cc 100644 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java +++ b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java @@ -165,6 +165,11 @@ public class Receiver extends Client implements MessageListener + sequence + ",received=" + seq + ")" ); } } + else + { + msg_count ++; + } + // Please note that this test case doesn't expect duplicates // When testing for transactions. if (isTransacted() && msg_count % getTxSize() == 0) -- cgit v1.2.1 From 8e77acf02345b5c16ba14c3de1b9bce4444e600d Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 8 Nov 2010 21:35:19 +0000 Subject: This is related to rev 1032640 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1032733 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/testkit/Client.java | 154 --------- .../java/org/apache/qpid/testkit/ErrorHandler.java | 27 -- .../java/org/apache/qpid/testkit/Receiver.java | 216 ------------ .../main/java/org/apache/qpid/testkit/Sender.java | 197 ----------- .../java/org/apache/qpid/testkit/TestLauncher.java | 368 --------------------- 5 files changed, 962 deletions(-) delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java delete mode 100644 qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java (limited to 'qpid/java/testkit/src') diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java deleted file mode 100644 index b10129d855..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Client.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Session; - -public abstract class Client implements ExceptionListener -{ - private Connection con; - private Session ssn; - private boolean durable = false; - private boolean transacted = false; - private int txSize = 10; - private int ack_mode = Session.AUTO_ACKNOWLEDGE; - private String contentType = "application/octet-stream"; - - private long reportFrequency = 60000; // every min - - private DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - private NumberFormat nf = new DecimalFormat("##.00"); - - private long startTime = System.currentTimeMillis(); - private ErrorHandler errorHandler = null; - - public Client(Connection con) throws Exception - { - this.con = con; - this.con.setExceptionListener(this); - durable = Boolean.getBoolean("durable"); - transacted = Boolean.getBoolean("transacted"); - txSize = Integer.getInteger("tx_size",10); - contentType = System.getProperty("content_type","application/octet-stream"); - reportFrequency = Long.getLong("report_frequency", 60000); - } - - public void close() - { - try - { - con.close(); - } - catch (Exception e) - { - handleError("Error closing connection",e); - } - } - - public void onException(JMSException e) - { - handleError("Connection error",e); - } - - public void setErrorHandler(ErrorHandler h) - { - this.errorHandler = h; - } - - public void handleError(String msg,Exception e) - { - if (errorHandler != null) - { - errorHandler.handleError(msg, e); - } - else - { - System.err.println(msg); - e.printStackTrace(); - } - } - - protected Session getSsn() - { - return ssn; - } - - protected void setSsn(Session ssn) - { - this.ssn = ssn; - } - - protected boolean isDurable() - { - return durable; - } - - protected boolean isTransacted() - { - return transacted; - } - - protected int getTxSize() - { - return txSize; - } - - protected int getAck_mode() - { - return ack_mode; - } - - protected String getContentType() - { - return contentType; - } - - protected long getReportFrequency() - { - return reportFrequency; - } - - protected long getStartTime() - { - return startTime; - } - - protected void setStartTime(long startTime) - { - this.startTime = startTime; - } - - public DateFormat getDf() - { - return df; - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java deleted file mode 100644 index dbc73c404f..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/ErrorHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.qpid.testkit; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -public interface ErrorHandler { - - public void handleError(String msg,Exception e); -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java deleted file mode 100644 index b4294ee4cc..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Receiver.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; - -/** - * A generic receiver which consumes messages - * from a given address in a broker (host/port) - * until told to stop by killing it. - * - * It participates in a feedback loop to ensure the producer - * doesn't fill up the queue. If it receives an "End" msg - * it sends a reply to the replyTo address in that msg. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * sync_rcv - Whether to consume sync (instead of using a listener). - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. * - * check_for_dups - check for duplicate messages and out of order messages. - * jms_durable_sub - create a durable subscription instead of a regular subscription. - */ -public class Receiver extends Client implements MessageListener -{ - long msg_count = 0; - int sequence = 0; - boolean syncRcv = Boolean.getBoolean("sync_rcv"); - boolean jmsDurableSub = Boolean.getBoolean("jms_durable_sub"); - boolean checkForDups = Boolean.getBoolean("check_for_dups"); - MessageConsumer consumer; - List duplicateMessages = new ArrayList(); - - public Receiver(Connection con,String addr) throws Exception - { - super(con); - setSsn(con.createSession(isTransacted(), getAck_mode())); - consumer = getSsn().createConsumer(new AMQAnyDestination(addr)); - if (!syncRcv) - { - consumer.setMessageListener(this); - } - - System.out.println("Receiving messages from : " + addr); - } - - public void onMessage(Message msg) - { - handleMessage(msg); - } - - public void run() throws Exception - { - long sleepTime = getReportFrequency(); - while(true) - { - if(syncRcv) - { - long t = sleepTime; - while (t > 0) - { - long start = System.currentTimeMillis(); - Message msg = consumer.receive(t); - t = t - (System.currentTimeMillis() - start); - handleMessage(msg); - } - } - Thread.sleep(sleepTime); - System.out.println(getDf().format(System.currentTimeMillis()) - + " - messages received : " + msg_count); - } - } - - private void handleMessage(Message m) - { - if (m == null) { return; } - - try - { - if (m instanceof TextMessage && ((TextMessage) m).getText().equals("End")) - { - MessageProducer temp = getSsn().createProducer(m.getJMSReplyTo()); - Message controlMsg = getSsn().createTextMessage(); - temp.send(controlMsg); - if (isTransacted()) - { - getSsn().commit(); - } - temp.close(); - } - else - { - - int seq = m.getIntProperty("sequence"); - if (checkForDups) - { - if (seq == 0) - { - sequence = 0; // wrap around for each iteration - System.out.println("Received " + duplicateMessages.size() + " duplicate messages during the iteration"); - duplicateMessages.clear(); - } - - if (seq < sequence) - { - duplicateMessages.add(seq); - } - else if (seq == sequence) - { - sequence++; - msg_count ++; - } - else - { - // Multiple publishers are not allowed in this test case. - // So out of order messages are not allowed. - throw new Exception(": Received an out of order message (expected=" - + sequence + ",received=" + seq + ")" ); - } - } - else - { - msg_count ++; - } - - // Please note that this test case doesn't expect duplicates - // When testing for transactions. - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - } - catch (Exception e) - { - e.printStackTrace(); - handleError("Exception receiving messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Receiver rcv = new Receiver(con,addr); - rcv.run(); - } - -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java deleted file mode 100644 index 14b9b7302f..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/Sender.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.Random; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.tools.MessageFactory; - -/** - * A generic sender which sends a stream of messages - * to a given address in a broker (host/port) - * until told to stop by killing it. - * - * It has a feedback loop to ensure it doesn't fill - * up queues due to a slow consumer. - * - * It doesn't check for correctness or measure anything - * leaving those concerns to another entity. - * However it prints a timestamp every x secs(-Dreport_frequency) - * as checkpoint to figure out how far the test has progressed if - * a failure occurred. - * - * It also takes in an optional Error handler to - * pass out any error in addition to writing them to std err. - * - * This is intended more as building block to create - * more complex test cases. However there is a main method - * provided to use this standalone. - * - * The following options are available and configurable - * via jvm args. - * - * msg_size (256) - * msg_count (10) - # messages before waiting for feedback - * sleep_time (1000 ms) - sleep time btw each iteration - * report_frequency - how often a timestamp is printed - * durable - * transacted - * tx_size - size of transaction batch in # msgs. - */ -public class Sender extends Client -{ - protected int msg_size = 256; - protected int msg_count = 10; - protected int iterations = -1; - protected long sleep_time = 1000; - - protected Destination dest = null; - protected Destination replyTo = null; - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - - protected MessageProducer producer; - Random gen = new Random(19770905); - - public Sender(Connection con,String addr) throws Exception - { - super(con); - this.msg_size = Integer.getInteger("msg_size", 100); - this.msg_count = Integer.getInteger("msg_count", 10); - this.iterations = Integer.getInteger("iterations", -1); - this.sleep_time = Long.getLong("sleep_time", 1000); - this.setSsn(con.createSession(isTransacted(),Session.AUTO_ACKNOWLEDGE)); - this.dest = new AMQAnyDestination(addr); - this.producer = getSsn().createProducer(dest); - this.replyTo = getSsn().createTemporaryQueue(); - - System.out.println("Sending messages to : " + addr); - } - - /* - * If msg_size not specified it generates a message - * between 500-1500 bytes. - */ - protected Message getNextMessage() throws Exception - { - int s = msg_size == -1 ? 500 + gen.nextInt(1000) : msg_size; - Message msg = (getContentType().equals("text/plain")) ? - MessageFactory.createTextMessage(getSsn(), s): - MessageFactory.createBytesMessage(getSsn(), s); - - msg.setJMSDeliveryMode((isDurable()) ? DeliveryMode.PERSISTENT - : DeliveryMode.NON_PERSISTENT); - return msg; - } - - public void run() - { - try - { - boolean infinite = (iterations == -1); - for (int x=0; infinite || x < iterations; x++) - { - long now = System.currentTimeMillis(); - if (now - getStartTime() >= getReportFrequency()) - { - System.out.println(df.format(now) + " - iterations : " + x); - setStartTime(now); - } - - for (int i = 0; i < msg_count; i++) - { - Message msg = getNextMessage(); - msg.setIntProperty("sequence",i); - producer.send(msg); - if (isTransacted() && msg_count % getTxSize() == 0) - { - getSsn().commit(); - } - } - TextMessage m = getSsn().createTextMessage("End"); - m.setJMSReplyTo(replyTo); - producer.send(m); - - if (isTransacted()) - { - getSsn().commit(); - } - - MessageConsumer feedbackConsumer = getSsn().createConsumer(replyTo); - feedbackConsumer.receive(); - feedbackConsumer.close(); - if (isTransacted()) - { - getSsn().commit(); - } - Thread.sleep(sleep_time); - } - } - catch (Exception e) - { - handleError("Exception sending messages",e); - } - } - - // Receiver host port address - public static void main(String[] args) throws Exception - { - String host = "127.0.0.1"; - int port = 5672; - String addr = "message_queue"; - - if (args.length > 0) - { - host = args[0]; - } - if (args.length > 1) - { - port = Integer.parseInt(args[1]); - } - if (args.length > 2) - { - addr = args[2]; - } - - AMQConnection con = new AMQConnection( - "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "'"); - - Sender sender = new Sender(con,addr); - sender.run(); - } -} diff --git a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java b/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java deleted file mode 100644 index 560ada244d..0000000000 --- a/qpid/java/testkit/src/main/java/org/apache/qpid/testkit/TestLauncher.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.testkit; - - -import java.io.ByteArrayOutputStream; -import java.io.PrintStream; -import java.text.DateFormat; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.log4j.PatternLayout; -import org.apache.qpid.client.AMQAnyDestination; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.thread.Threading; - -/** - * A basic test case class that could launch a Sender/Receiver - * or both, each on it's own separate thread. - * - * If con_count == ssn_count, then each entity created will have - * it's own Connection. Else if con_count < ssn_count, then - * a connection will be shared by ssn_count/con_count # of entities. - * - * The if both sender and receiver options are set, it will - * share a connection. - * - * The following options are available as jvm args - * host, port - * con_count,ssn_count - * con_idle_time - which determines heartbeat - * sender, receiver - booleans which indicate which entity to create. - * Setting them both is also a valid option. - */ -public class TestLauncher implements ErrorHandler -{ - protected String host = "127.0.0.1"; - protected int port = 5672; - protected int sessions_per_con = 1; - protected int connection_count = 1; - protected long heartbeat = 5000; - protected boolean sender = false; - protected boolean receiver = false; - protected boolean useUniqueDests = false; - protected String url; - - protected String address = "my_queue; {create: always}"; - protected boolean durable = false; - protected String failover = ""; - protected AMQConnection controlCon; - protected Destination controlDest = null; - protected Session controlSession = null; - protected MessageProducer statusSender; - protected List clients = new ArrayList(); - protected DateFormat df = new SimpleDateFormat("yyyy.MM.dd 'at' HH:mm:ss"); - protected NumberFormat nf = new DecimalFormat("##.00"); - protected String testName; - - public TestLauncher() - { - testName = System.getProperty("test_name","UNKNOWN"); - host = System.getProperty("host", "127.0.0.1"); - port = Integer.getInteger("port", 5672); - sessions_per_con = Integer.getInteger("ssn_per_con", 1); - connection_count = Integer.getInteger("con_count", 1); - heartbeat = Long.getLong("heartbeat", 5); - sender = Boolean.getBoolean("sender"); - receiver = Boolean.getBoolean("receiver"); - useUniqueDests = Boolean.getBoolean("use_unique_dests"); - - failover = System.getProperty("failover", ""); - durable = Boolean.getBoolean("durable"); - - url = "amqp://username:password@topicClientid/test?brokerlist='tcp://" - + host + ":" + port + "?heartbeat='" + heartbeat+ "''"; - - if (failover.equalsIgnoreCase("failover_exchange")) - { - url += "&failover='failover_exchange'"; - - System.out.println("Failover exchange " + url ); - } - - configureLogging(); - } - - protected void configureLogging() - { - PatternLayout layout = new PatternLayout(); - layout.setConversionPattern("%t %d %p [%c{4}] %m%n"); - BasicConfigurator.configure(new ConsoleAppender(layout)); - - String logLevel = System.getProperty("log.level","warn"); - String logComponent = System.getProperty("log.comp","org.apache.qpid"); - - Logger logger = Logger.getLogger(logComponent); - logger.setLevel(Level.toLevel(logLevel, Level.WARN)); - - System.out.println("Level " + logger.getLevel()); - - } - - public void setUpControlChannel() - { - try - { - controlCon = new AMQConnection(url); - controlCon.start(); - - controlDest = new AMQAnyDestination("control; {create: always}"); // durable - - // Create the session to setup the messages - controlSession = controlCon.createSession(false, Session.AUTO_ACKNOWLEDGE); - statusSender = controlSession.createProducer(controlDest); - - } - catch (Exception e) - { - handleError("Error while setting up the test",e); - } - } - - public void cleanup() - { - try - { - controlSession.close(); - controlCon.close(); - for (AMQConnection con : clients) - { - con.close(); - } - } - catch (Exception e) - { - handleError("Error while tearing down the test",e); - } - } - - public void start(String addr) - { - try - { - if (addr == null) - { - addr = address; - } - - int ssn_per_con = sessions_per_con; - String addrTemp = addr; - for (int i = 0; i< connection_count; i++) - { - AMQConnection con = new AMQConnection(url); - con.start(); - clients.add(con); - for (int j = 0; j< ssn_per_con; j++) - { - String index = createPrefix(i,j); - if (useUniqueDests) - { - addrTemp = modifySubject(index,addr); - } - - if (sender) - { - createSender(index,con,addrTemp,this); - } - - if (receiver) - { - createReceiver(index,con,addrTemp,this); - } - } - } - } - catch (Exception e) - { - handleError("Exception while setting up the test",e); - } - - } - - protected void createReceiver(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Receiver rcv = new Receiver(con,addr); - rcv.setErrorHandler(h); - rcv.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Receiver", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Receive thread",e); - } - - t.setName("ReceiverThread-" + index); - t.start(); - } - - protected void createSender(String index,final AMQConnection con, final String addr, final ErrorHandler h) - { - Runnable r = new Runnable() - { - public void run() - { - try - { - Sender sender = new Sender(con, addr); - sender.setErrorHandler(h); - sender.run(); - } - catch (Exception e) - { - h.handleError("Error Starting Sender", e); - } - } - }; - - Thread t = null; - try - { - t = Threading.getThreadFactory().createThread(r); - } - catch(Exception e) - { - handleError("Error creating Sender thread",e); - } - - t.setName("SenderThread-" + index); - t.start(); - } - - public synchronized void handleError(String msg,Exception e) - { - // In case sending the message fails - StringBuilder sb = new StringBuilder(); - sb.append(msg); - sb.append(" @ "); - sb.append(df.format(new Date(System.currentTimeMillis()))); - sb.append(" "); - sb.append(e.getMessage()); - System.err.println(sb.toString()); - e.printStackTrace(); - - try - { - TextMessage errorMsg = controlSession.createTextMessage(); - errorMsg.setStringProperty("status", "error"); - errorMsg.setStringProperty("desc", msg); - errorMsg.setStringProperty("time", df.format(new Date(System.currentTimeMillis()))); - errorMsg.setStringProperty("exception-trace", serializeStackTrace(e)); - - System.out.println("Msg " + errorMsg); - - statusSender.send(errorMsg); - } - catch (JMSException e1) - { - e1.printStackTrace(); - } - } - - private String serializeStackTrace(Exception e) - { - ByteArrayOutputStream bOut = new ByteArrayOutputStream(); - PrintStream printStream = new PrintStream(bOut); - e.printStackTrace(printStream); - printStream.close(); - return bOut.toString(); - } - - private String createPrefix(int i, int j) - { - return String.valueOf(i).concat(String.valueOf(j)); - } - - /** - * A basic helper function to modify the subjects by - * appending an index. - */ - private String modifySubject(String index,String addr) - { - if (addr.indexOf("/") > 0) - { - addr = addr.substring(0,addr.indexOf("/")+1) + - index + - addr.substring(addr.indexOf("/")+1,addr.length()); - } - else if (addr.indexOf(";") > 0) - { - addr = addr.substring(0,addr.indexOf(";")) + - "/" + index + - addr.substring(addr.indexOf(";"),addr.length()); - } - else - { - addr = addr + "/" + index; - } - - return addr; - } - - public static void main(String[] args) - { - final TestLauncher test = new TestLauncher(); - test.setUpControlChannel(); - System.out.println("args.length " + args.length); - System.out.println("args [0] " + args [0]); - test.start(args.length > 0 ? args [0] : null); - Runtime.getRuntime().addShutdownHook(new Thread() { - public void run() { test.cleanup(); } - }); - - } -} -- cgit v1.2.1