From d947dab3de62830cbfb41dc989ec7a5ff6af8f55 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Wed, 23 Apr 2008 23:50:34 +0000 Subject: Delete stuff that's just going to get synced from M2.x git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@651111 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/client/latency/MessageConsumer.java | 129 -- .../qpid/client/latency/MessageProducer.java | 116 -- .../qpid/client/message/TestMessageFactory.java | 109 -- .../apache/qpid/client/perf/ConnectionUtility.java | 50 - .../qpid/client/perf/MessageConsumerTest.java | 171 -- .../qpid/client/perf/MessageProducerTest.java | 182 --- .../java/org/apache/qpid/client/perf/Options.java | 105 -- .../java/org/apache/qpid/client/topic/Client.java | 210 --- .../java/org/apache/qpid/client/topic/Server.java | 171 -- .../org/apache/qpid/client/topic/topic.properties | 24 - .../org/apache/qpid/ping/PingAsyncTestPerf.java | 292 ---- .../main/java/org/apache/qpid/ping/PingClient.java | 107 -- .../org/apache/qpid/ping/PingDurableClient.java | 451 ----- .../org/apache/qpid/ping/PingLatencyTestPerf.java | 314 ---- .../org/apache/qpid/ping/PingSendOnlyClient.java | 92 -- .../java/org/apache/qpid/ping/PingTestPerf.java | 196 --- .../qpid/requestreply/InitialContextHelper.java | 55 - .../apache/qpid/requestreply/PingPongBouncer.java | 392 ----- .../apache/qpid/requestreply/PingPongProducer.java | 1717 -------------------- .../apache/qpid/requestreply/PingPongTestPerf.java | 251 --- java/perftests/src/main/java/perftests.log4j | 51 - java/perftests/src/main/java/perftests.properties | 46 - 22 files changed, 5231 deletions(-) delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java delete mode 100644 java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java delete mode 100644 java/perftests/src/main/java/perftests.log4j delete mode 100644 java/perftests/src/main/java/perftests.properties (limited to 'java/perftests/src/main') diff --git a/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java deleted file mode 100644 index 80f965eae5..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageConsumer.java +++ /dev/null @@ -1,129 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client.latency; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.perf.Options; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.requestreply.InitialContextHelper; - -import javax.jms.*; -import javax.naming.Context; - -/** - * - * - */ -public class MessageConsumer extends Options implements MessageListener -{ - private javax.jms.MessageProducer _producer; - private AMQConnection _connection; - private final Object _lock = new Object(); - private Session _session; - private int _receivedMessages = 0; - private long _timeFirstMessage; - private long _timeLastMessage; - private void init() - { - this.parseOptions(); - try - { - Context context = InitialContextHelper.getInitialContext(""); - ConnectionFactory factory = (ConnectionFactory) context.lookup("local"); - _connection = (AMQConnection) factory.createConnection("guest","guest"); - _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); - Destination dest = Boolean.getBoolean("useQueue")? (Destination) context.lookup("testQueue") : - (Destination) context.lookup("testTopic"); - Destination syncQueue = (Destination) context.lookup("syncQueue"); - _producer = _session.createProducer(syncQueue); - // this should speedup the message producer - _producer.setDisableMessageTimestamp(true); - javax.jms.MessageConsumer consumer = _session.createConsumer(dest); - consumer.setMessageListener(this); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private void run() - { - try - { - synchronized(_lock) - { - _connection.start(); - try - { - _lock.wait(); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } - } - // send sync message; - _producer.send(_session.createMessage()); - System.out.println("Time to receive " + _logFrequency + " messages is: " + (_timeLastMessage - _timeFirstMessage) ); - double rate = _logFrequency / ((_timeLastMessage - _timeFirstMessage) *1.0) *1000 ; - System.out.println("The rate is " + rate + " msg/s" ); - double latency = ((_timeLastMessage - _timeFirstMessage) *1.0) / _logFrequency; - System.out.println("The latency is " + latency + " milli secs" ); - _connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); - } - } - - public void onMessage(Message message) - { - if( _receivedMessages == 0) - { - _timeFirstMessage = System.currentTimeMillis(); - } - _receivedMessages++; - if( _receivedMessages == _logFrequency) - { - _timeLastMessage = System.currentTimeMillis(); - synchronized(_lock) - { - _lock.notify(); - } - } - } - - public static void main(String[] args) - { - try - { - MessageConsumer test = new MessageConsumer(); - test.init(); - test.run(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - -} \ No newline at end of file diff --git a/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java b/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java deleted file mode 100644 index c084a55bcc..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/latency/MessageProducer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client.latency; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.client.perf.Options; -import org.apache.qpid.requestreply.InitialContextHelper; -import org.apache.qpidity.transport.network.nio.NioSender; - -import javax.jms.*; -import javax.naming.Context; - -/** - * - * - */ -public class MessageProducer extends Options -{ - private BytesMessage _payload; - private javax.jms.MessageProducer _producer; - private javax.jms.MessageConsumer _consumer; - private AMQConnection _connection; - private void init() - { - this.parseOptions(); - try - { - Context context = InitialContextHelper.getInitialContext(""); - ConnectionFactory factory = (ConnectionFactory) context.lookup("local"); - _connection = (AMQConnection) factory.createConnection("guest","guest"); - Destination dest = Boolean.getBoolean("useQueue")? (Destination) context.lookup("testQueue") : - (Destination) context.lookup("testTopic"); - Destination syncQueue = (Destination) context.lookup("syncQueue"); - _connection.start(); - Session session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); - _payload = TestMessageFactory.newBytesMessage(session, _messageSize); - _producer = session.createProducer(dest); - _consumer = session.createConsumer(syncQueue); - // this should speedup the message producer - _producer.setDisableMessageTimestamp(true); - System.out.println("Init end" ); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private void run() - { - try - { - System.out.println("Sending " + _logFrequency + " messages"); - - // NioSender.setStartBatching(); - long startTime = System.currentTimeMillis(); - for(int i =0; i < _logFrequency; i++ ) - { - _producer.send(_payload, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, 0); - } - long endProducing = System.currentTimeMillis(); - double throughput = (_logFrequency * 1000.0) / (endProducing - startTime); - System.out.println("The producer throughput is: " + throughput + " msg/s"); - - // startTime = System.currentTimeMillis(); - // NioSender.purge(); - // endProducing = System.currentTimeMillis(); - // throughput = (_logFrequency * 1000.0) / (endProducing - startTime); - // System.out.println("The NIO throughput is: " + throughput + " msg/s"); - - - // now wait for the sync message - _consumer.receive(); - // this is done - long endTime = System.currentTimeMillis(); - System.out.println("Time to send and receive " + _logFrequency + " messages is: " + (endTime - startTime) ); - double latency = ( (endTime - startTime) * 1.0) /_logFrequency; - System.out.println("The latency is " + latency + " milli secs" ); - _connection.close(); - } - catch (JMSException e) - { - e.printStackTrace(); - } - } - - public static void main(String[] args) - { - try - { - MessageProducer test = new MessageProducer(); - test.init(); - test.run(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java deleted file mode 100644 index 4d038db0a8..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.message; - -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.ObjectMessage; -import javax.jms.StreamMessage; -import javax.jms.BytesMessage; -import javax.jms.TextMessage; -import javax.jms.DeliveryMode; -import javax.jms.Destination; - -public class TestMessageFactory -{ - private static final String MESSAGE_DATA_BYTES = "-message payload-message paylaod-message payload-message paylaod"; - - public static TextMessage newTextMessage(Session session, int size) throws JMSException - { - return session.createTextMessage(createMessagePayload(size)); - } - - public static BytesMessage newBytesMessage(Session session, int size) throws JMSException - { - BytesMessage message = session.createBytesMessage(); - message.writeUTF(createMessagePayload(size)); - return message; - } - - public static StreamMessage newStreamMessage(Session session, int size) throws JMSException - { - StreamMessage message = session.createStreamMessage(); - message.writeString(createMessagePayload(size)); - return message; - } - - public static ObjectMessage newObjectMessage(Session session, int size) throws JMSException - { - if (size == 0) - { - return session.createObjectMessage(); - } - else - { - return session.createObjectMessage(createMessagePayload(size)); - } - } - - /** - * Creates an ObjectMessage with given size and sets the JMS properties - * (JMSReplyTo and DeliveryMode) - * @param session - * @param replyDestination - * @param size - * @param persistent - * @return the new ObjectMessage - * @throws JMSException - */ - public static ObjectMessage newObjectMessage(Session session, Destination replyDestination, int size, boolean persistent) throws JMSException - { - ObjectMessage msg = newObjectMessage(session, size); - - // Set the messages persistent delivery flag. - msg.setJMSDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // Ensure that the temporary reply queue is set as the reply to destination for the message. - if (replyDestination != null) - { - msg.setJMSReplyTo(replyDestination); - } - - return msg; - } - - public static String createMessagePayload(int size) - { - StringBuffer buf = new StringBuffer(size); - int count = 0; - while (count <= (size - MESSAGE_DATA_BYTES.length())) - { - buf.append(MESSAGE_DATA_BYTES); - count += MESSAGE_DATA_BYTES.length(); - } - if (count < size) - { - buf.append(MESSAGE_DATA_BYTES, 0, size - count); - } - - return buf.toString(); - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java deleted file mode 100644 index 133ef5f854..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/ConnectionUtility.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.apache.qpid.client.perf; - -import javax.naming.InitialContext; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQConnectionFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConnectionUtility -{ - private static final Logger _logger = LoggerFactory.getLogger(ConnectionUtility.class); - - private InitialContext _initialContext; - private AMQConnectionFactory _connectionFactory; - - private static ConnectionUtility _instance = new ConnectionUtility(); - - public static ConnectionUtility getInstance() - { - return _instance; - } - - private InitialContext getInitialContext() throws Exception - { - _logger.info("get InitialContext"); - if (_initialContext == null) - { - _initialContext = new InitialContext(); - } - return _initialContext; - } - - private AMQConnectionFactory getConnectionFactory() throws Exception - { - _logger.info("get ConnectionFactory"); - if (_connectionFactory == null) - { - _connectionFactory = (AMQConnectionFactory) getInitialContext().lookup("local"); - } - return _connectionFactory; - } - - public AMQConnection getConnection() throws Exception - { - _logger.info("get Connection"); - return (AMQConnection)getConnectionFactory().createConnection(); - } - -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java deleted file mode 100644 index 1124e070a1..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -package org.apache.qpid.client.perf; - -import java.io.FileWriter; -import java.io.IOException; -import java.sql.Date; -import java.text.SimpleDateFormat; - -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MessageConsumerTest extends Options implements MessageListener -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class); - private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); - - String _logFileName; - long _startTime; - long _intervalStartTime; - long _totalMsgCount; - long _intervalCount; - - private AMQConnection _connection; - private Session _session; - - public void init() throws Exception - { - this.parseOptions(); - _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); - _connection = ConnectionUtility.getInstance().getConnection(); - _connection.start(); - Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination); - _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); - MessageConsumer _consumer = _session.createConsumer(dest); - _consumer.setMessageListener(this); - _startTime = System.currentTimeMillis(); - if(Boolean.getBoolean("collect_stats")) - { - printHeading(); - runReaper(); - } - } - - public void onMessage(Message message) - { - try - { - /* long msgId = Integer.parseInt(message.getJMSMessageID()); - if (_verifyOrder && _totalMsgCount+1 != msgId) - { - _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); - }*/ - _totalMsgCount ++; - _intervalCount++; - if(_intervalCount >= _logFrequency) - { - _intervalCount = 0; - if (Boolean.getBoolean("collect_stats")) - { - runReaper(); - } - if (System.currentTimeMillis() - _startTime >= _expiry) - { - printSummary(); - _session.close(); - _connection.stop(); - } - } - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - public void runReaper() - { - try - { - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - StringBuffer buf = new StringBuffer(); - Date d = new Date(System.currentTimeMillis()); - long currentTime = d.getTime(); - long intervalTime = currentTime - _intervalStartTime; - long totalTime = currentTime - _startTime; - buf.append(df.format(d)).append(","); - buf.append(d.getTime()).append(","); - buf.append(" total Msg Count: ").append(_totalMsgCount).append(","); - if(totalTime > 0 ) - buf.append(" rate: ").append(_totalMsgCount * 1000 / totalTime); - buf.append(","); - buf.append(" interval Count: ").append(_intervalCount).append(","); - if(intervalTime > 0 ) - buf.append(" interval rate: ").append(_intervalCount * 1000 / intervalTime).append(","); - buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); - buf.append("\n"); - _memoryLog.write(buf.toString()); - _memoryLog.close(); - System.out.println(buf); - _intervalStartTime = d.getTime(); - } - catch (Exception e) - { - _logger.error("Error printing info to the log file", e); - } - } - - private void printHeading() - { - try - { - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory"; - _memoryLog.write(s); - _memoryLog.close(); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - - private void printSummary() - { - try - { - - long current = System.currentTimeMillis(); - double time = current - _startTime; - double ratio = _totalMsgCount*1000/time; - FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); - - StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : "); - buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); - Date d = new Date(current); - buf.append(df.format(d)).append("\n Total Time taken (ms):"); - buf.append(time).append("\n Total messages sent:"); - buf.append(_totalMsgCount).append("\n producer rate:"); - buf.append(ratio).append("\n"); - _summaryLog.write(buf.toString()); - System.out.println("---------- Test Ended -------------"); - _summaryLog.close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - public static void main(String[] args) - { - try - { - MessageConsumerTest test = new MessageConsumerTest(); - test.init(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java deleted file mode 100644 index 4398b28131..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java +++ /dev/null @@ -1,182 +0,0 @@ -package org.apache.qpid.client.perf; - -import java.io.FileWriter; -import java.io.IOException; -import java.sql.Date; -import java.text.SimpleDateFormat; - -import javax.jms.BytesMessage; -import javax.jms.Destination; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.AMQQueue; -import org.apache.qpid.client.AMQTopic; -import org.apache.qpid.client.message.TestMessageFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class MessageProducerTest extends Options -{ - private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class); - private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); - - String _logFileName; - long _startTime; - long _intervalStartTime; - long _totalMsgCount; - long _intervalCount; - - private AMQConnection _connection; - private Session _session; - private BytesMessage _payload; - private MessageProducer _producer; - - public void init() throws Exception - { - this.parseOptions(); - _logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis(); - _connection = ConnectionUtility.getInstance().getConnection(); - _connection.start(); - Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination); - _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); - _payload = TestMessageFactory.newBytesMessage(_session, _messageSize); - _producer = _session.createProducer(dest); - // this should speedup the message producer - _producer.setDisableMessageTimestamp(true); - } - - public void run() - { - _startTime = System.currentTimeMillis(); - boolean run = true; - if(Boolean.getBoolean("collect_stats")) - { - printHeading(); - runReaper(); - } - - try - { - while (run) - { - _payload.setJMSMessageID(String.valueOf(_totalMsgCount+1)); - _producer.send(_payload); - _totalMsgCount ++; - _intervalCount ++; - - // check every x messages to see if times up - if(_intervalCount >= _logFrequency) - { - if (Boolean.getBoolean("collect_stats")) - { - runReaper(); - _intervalCount = 0; - } - if (System.currentTimeMillis() - _startTime >= _expiry) - { - // time to stop the test. - _session.close(); - _connection.stop(); - run = false; - } - } - } - } - catch (Exception e) - { - _logger.error("The timer thread exited", e); - } - printSummary(); - } - - public void runReaper() - { - try - { - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - StringBuffer buf = new StringBuffer(); - Date d = new Date(System.currentTimeMillis()); - long currentTime = d.getTime(); - long intervalTime = currentTime - _intervalStartTime; - long totalTime = currentTime - _startTime; - buf.append(df.format(d)).append(","); - buf.append(d.getTime()).append(","); - buf.append(" total Msg Count: ").append(_totalMsgCount).append(","); - if(totalTime > 0 ) - buf.append(" rate: ").append(_totalMsgCount * 1000 / totalTime); - buf.append(","); - buf.append(" interval Count: ").append(_intervalCount).append(","); - if(intervalTime > 0 ) - buf.append(" interval rate: ").append(_intervalCount * 1000 / intervalTime).append(","); - buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); - buf.append("\n"); - _memoryLog.write(buf.toString()); - _memoryLog.close(); - System.out.println(buf); - _intervalStartTime = d.getTime(); - } - catch (Exception e) - { - _logger.error("Error printing info to the log file", e); - } - } - - private void printHeading() - { - try - { - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory"; - _memoryLog.write(s); - _memoryLog.close(); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - - private void printSummary() - { - try - { - - long current = System.currentTimeMillis(); - double time = current - _startTime; - double ratio = _totalMsgCount*1000/time; - FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); - - StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : "); - buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); - Date d = new Date(current); - buf.append(df.format(d)).append("\n Total Time taken (ms):"); - buf.append(time).append("\n Total messages sent:"); - buf.append(_totalMsgCount).append("\n producer rate:"); - buf.append(ratio).append("\n"); - _summaryLog.write(buf.toString()); - System.out.println("---------- Test Ended -------------"); - _summaryLog.close(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - public static void main(String[] args) - { - try - { - MessageProducerTest test = new MessageProducerTest(); - test.init(); - test.run(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java deleted file mode 100644 index c0f51738db..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java +++ /dev/null @@ -1,105 +0,0 @@ -package org.apache.qpid.client.perf; - -public class Options -{ - public int _messageSize; - public boolean _transacted; - public boolean _synchronous; - public String _destination; - public long _expiry; - public long _logFrequency; - public String _logFilePath; - - /** - * System props - * -DmessageSize - * -DuseQueue - * -Dtransacted - * -Ddestinations - * -DlogFilePath - * -Duration=1H,30M,10S - * -DlogDuration=10 in mins - */ - public void parseOptions() - { - _messageSize = Integer.parseInt(System.getProperty("messageSize","100")); - _transacted = false; - _destination = System.getProperty("destination", "foo"); - _logFrequency = Long.parseLong(System.getProperty("logFrequency","10000")); - _logFilePath = System.getProperty("logFilePath"); - _expiry = getExpiry(); - - System.out.println("============= Test Data ==================="); - System.out.println("Destination : " + _destination); - System.out.println("Collect stats : " + Boolean.getBoolean("collect_stats")); - System.out.println("Log Frequency in msgs : " + _logFrequency); - System.out.println("Log file path : " + _logFilePath); - System.out.println("Test Duration : " + printTestDuration()); - System.out.println("============= /Test Data ==================="); - } - - private String printTestDuration() - { - StringBuffer buf = new StringBuffer(); - long temp = _expiry; - int hours = (int)temp/(60*60*1000); - temp = temp -hours*60*60*1000; - - int mins = (int)(temp)/(60*1000); - temp = temp -mins*60*1000; - - int secs = (int)temp/1000; - - if (hours > 0) - { - buf.append(hours).append(" hours "); - } - if (mins > 0) - { - buf.append(mins).append(" mins "); - } - if (secs > 0) - { - buf.append(secs).append(" secs"); - } - - return buf.toString(); - } - - private long getExpiry() - { - // default is 30 mins - long time = 0; - String s = System.getProperty("duration"); - if(s != null) - { - String[] temp = s.split(","); - for (String st:temp) - { - if(st.indexOf("H")>0) - { - int hour = Integer.parseInt(st.substring(0,st.indexOf("H"))); - time = time + hour * 60 * 60 * 1000; - } - else if(st.indexOf("M")>0) - { - int min = Integer.parseInt(st.substring(0,st.indexOf("M"))); - time = time + min * 60 * 1000; - } - else if(st.indexOf("S")>0) - { - int sec = Integer.parseInt(st.substring(0,st.indexOf("S"))); - time = time + sec * 1000; - } - - } - } - if (time == 0) - { - time = 30 * 60 * 1000; - } - - return time; - } - -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java b/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java deleted file mode 100644 index 4a726c19ea..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/Client.java +++ /dev/null @@ -1,210 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client.topic; - -import org.apache.qpid.client.message.TestMessageFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.jms.*; -import java.util.Properties; -import java.util.Random; -import java.util.List; -import java.util.ArrayList; -import java.io.FileWriter; - -public class Client -{ - /** - * This class logger - */ - private static final Logger _logger=LoggerFactory.getLogger(Client.class); - - private long _messagesProduced=0; - private final Object _lock=new Object(); - private Message _message; - private List _runners=new ArrayList(); - - - /** - * Run the message consumer example. - * - * @param args Command line arguments. - */ - public static void main(String[] args) - { - Client syncConsumer=new Client(); - int firstArg=120; - if (args.length > 0) - { - try - { - firstArg=Integer.parseInt(args[0]); - } - catch (NumberFormatException e) - { - _logger.warn("Argument must be an integer, running for 2 minutes"); - } - } - syncConsumer.runClient(firstArg); - } - - - void runClient(long duration) - { - try - { - // Load JNDI properties - Properties properties=new Properties(); - properties.load(this.getClass().getResourceAsStream("topic.properties")); - - String logFilePath = System.getProperty("logFilePath", "./"); - FileWriter file = new FileWriter(logFilePath + "client-" + System.currentTimeMillis() + ".cvs",true); - - //Create the initial context - Context ctx=new InitialContext(properties); - - // Lookup the connection factory - ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); - // create the connection - Connection connection=conFac.createConnection(); - - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException jmse) - { - // The connection may have broken invoke reconnect code if available. - // The connection may have broken invoke reconnect code if available. - System.err.println("Received an exception through the ExceptionListener"); - System.exit(0); - } - }); - - // Now the messageConsumer is set up we can start the connection - connection.start(); - - // Create a session on the connection - // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. - Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue queueCompleted = session.createQueue("completed"); - Queue queueStarted = session.createQueue("started"); - MessageProducer prod = session.createProducer(queueCompleted); - MessageConsumer cons = session.createConsumer(queueStarted); - cons.receive(); - _logger.info("Starting producing messages"); - - _message=TestMessageFactory.newBytesMessage(session, 1024); - - Random random=new Random(); - long testDuration=0; - long totalMessagesProduced; - long messagesProducedLastInterval=0; - long intervalThroughput; - long totalThroughput; - long numProducers=1; - String info; - startNewProducer(session, random); - while (testDuration < duration) - { - // every 5 second creates a thread an print the throughput - synchronized (_lock) - { - _lock.wait(5000); - totalMessagesProduced=_messagesProduced; - } - testDuration=testDuration + 5; - intervalThroughput=(totalMessagesProduced - messagesProducedLastInterval) / 5; - totalThroughput=totalMessagesProduced / testDuration; - messagesProducedLastInterval=totalMessagesProduced; - info = "Number of producers " + numProducers + " | This interval throughput = " + - intervalThroughput + " | Total throughput = " + totalThroughput; - _logger.info(info); - file.write(info + "\n"); - startNewProducer(session, random); - numProducers++; - } - file.close(); - // stop all the producers - for (Runner runner : _runners) - { - runner.stop(); - } - _logger.info("Stopping server"); - prod.send(session.createTextMessage("stop")); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private void startNewProducer(Session session, Random random) - throws JMSException - { - // select a random topic - int topicNumber=random.nextInt(50); - _logger.info("creating producer for topic: topic- " + topicNumber); - Topic topic=session.createTopic("topic-" + topicNumber); - MessageProducer prod=session.createProducer(topic); - Runner runner=new Runner(prod); - _runners.add(runner); - Thread thread=new Thread(runner); - thread.setDaemon(true); - thread.start(); - } - - private class Runner implements Runnable - { - MessageProducer _prod; - boolean _produce=true; - private Runner(MessageProducer prod) - { - _prod=prod; - } - - public void run() - { - while (_produce) - { - try - { - _prod.send(_message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, - Message.DEFAULT_TIME_TO_LIVE); - synchronized (_lock) - { - _messagesProduced++; - } - } - catch (Exception e) - { - e.printStackTrace(); - _produce=false; - } - } - } - - public void stop() - { - _produce=false; - } - } - -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java b/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java deleted file mode 100644 index 883a7465a1..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/Server.java +++ /dev/null @@ -1,171 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.qpid.client.topic; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.naming.Context; -import javax.naming.InitialContext; - -import javax.jms.*; -import java.util.Properties; -import java.io.FileWriter; - - -public class Server -{ - /** - * This class logger - */ - private static final Logger _logger=LoggerFactory.getLogger(Server.class); - - - private final Object _lock=new Object(); - private long _numMessages=0; - public FileWriter _file; - public boolean _running=true; - - public static void main(String[] args) - { - (new Server()).runServer(); - } - - void runServer() - { - try - { - // Load JNDI properties - Properties properties=new Properties(); - properties.load(this.getClass().getResourceAsStream("topic.properties")); - - String logFilePath=System.getProperty("logFilePath", "./"); - _file=new FileWriter(logFilePath + "server-" + System.currentTimeMillis() + ".cvs", true); - - //Create the initial context - Context ctx=new InitialContext(properties); - - // Lookup the connection factory - ConnectionFactory conFac=(ConnectionFactory) ctx.lookup("qpidConnectionfactory"); - // create the connection - Connection connection=conFac.createConnection(); - - connection.setExceptionListener(new ExceptionListener() - { - public void onException(JMSException jmse) - { - // The connection may have broken invoke reconnect code if available. - // The connection may have broken invoke reconnect code if available. - _logger.warn("Received an exception through the ExceptionListener"); - System.exit(0); - } - }); - - // Create a session on the connection - // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. - // Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - for (int i=0; i < 50; i++) - { - Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic=session.createTopic("topic-" + i); - TopicSubscriber dursub=session.createDurableSubscriber(topic, "durable-" + i); - dursub.setMessageListener(new MyListener()); - } - - // Now the messageConsumer is set up we can start the connection - connection.start(); - _logger.info("Ready to consume messages"); - // listen for the termination message - Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queueCompleted=session.createQueue("completed"); - Queue queueStarted=session.createQueue("started"); - MessageProducer prod=session.createProducer(queueStarted); - - Thread logger=new Thread(new MyLogger()); - logger.setDaemon(true); - logger.start(); - - prod.send(session.createTextMessage("start")); - long startTime=System.currentTimeMillis(); - MessageConsumer cons=session.createConsumer(queueCompleted); - cons.receive(); - - _running=false; - - long endTime=System.currentTimeMillis(); - session.close(); - _logger.info("Received " + _numMessages); - _file.write("Received " + _numMessages + "\n"); - _logger.info("Throughput " + _numMessages / (endTime - startTime) * 1000 + "msg/s"); - _file.write("Throughput " + _numMessages / (endTime - startTime) * 1000 + "msg/s"); - _file.close(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - private class MyListener implements MessageListener - { - public void onMessage(Message message) - { - synchronized (_lock) - { - _numMessages++; - /*if(_numMessages % 1000 == 0) - { - _logger.info("received: " + _numMessages); - } */ - } - } - } - - private class MyLogger implements Runnable - { - public void run() - { - long endTime=0; - while (_running) - { - synchronized (_lock) - { - try - { - _lock.wait(5000); - if (_running) - { - endTime=endTime + 5; - String s="Throughput " + _numMessages / endTime + " msg/s"; - _logger.info(s); - _file.write(s + "\n"); - } - - } - catch (Exception e) - { - e.printStackTrace(); - } - - } - } - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties b/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties deleted file mode 100644 index cff5275e36..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/client/topic/topic.properties +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' - -# A 0.10 connection factory -#connectionfactory.qpidConnectionfactory = qpid:password=pass;username=name@tcp:localhost:5672 diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java deleted file mode 100644 index 06081e6ebf..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingAsyncTestPerf.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; - -import javax.jms.JMSException; -import javax.jms.Message; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingAsyncTestPerf is a performance test that outputs multiple timings from its test method, using the timing controller - * interface supplied by the test runner from a seperate listener thread. It differs from the {@link PingTestPerf} test - * that it extends because it can output timings as replies are received, rather than waiting until all expected replies - * are received. This is less 'blocky' than the tests in {@link PingTestPerf}, and provides a truer simulation of sending - * and recieving clients working asynchronously. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Send many ping messages and output timings asynchronously on batches received. - *
- */ -public class PingAsyncTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingAsyncTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int TEST_RESULTS_BATCH_SIZE_DEFAULT = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** Holds test specifics by correlation id. This consists of the expected number of messages and the timing controler. */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingAsyncTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - testParameters.setPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(TEST_RESULTS_BATCH_SIZE_DEFAULT)); - } - - /** - * Compile all the tests into a test suite. - * @return The test suite to run. Should only contain testAsyncPingOk method. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingAsyncTestPerf("testAsyncPingOk")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until - * all replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - * @throws Exception pass all errors out to the test harness - */ - public void testAsyncPingOk(int numPings) throws Exception - { - // _logger.debug("public void testAsyncPingOk(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - perThreadSetup._correlationId = Long.toString(corellationIdGenerator.incrementAndGet()); - // String messageCorrelationId = perThreadSetup._correlationId; - // _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = pingClient.getExpectedNumPings(numPings); - perCorrelationIds.put(perThreadSetup._correlationId, perCorrelationId); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(null, numPings, timeout, perThreadSetup._correlationId); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < perCorrelationId._expectedCount) - { - perCorrelationId._tc.completeTest(false, numPings - perCorrelationId._expectedCount); - } - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(perThreadSetup._correlationId); - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link PingPongProducer.ChainedMessageListener} that can be attached to the - * pinger, in order to receive notifications about every message received and the number remaining to be - * received. Whenever the number remaining crosses a batch size boundary this results listener outputs - * a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException - { - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - /*_logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount - + "): called on batch boundary for message id: " + correlationId + " with thread id: " - + Thread.currentThread().getId());*/ - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - tc.completeTest(true, receivedInBatch); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of - * the total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java deleted file mode 100644 index b9632eee4c..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import javax.jms.Destination; - -import java.util.List; -import java.util.Properties; - -/** - * PingClient is a {@link PingPongProducer} that does not need a {@link org.apache.qpid.requestreply.PingPongBouncer} - * to send replies to its pings. It simply listens to its own ping destinations, rather than seperate reply queues. - * It is an all in one ping client, that produces and consumes its own pings. - * - *

The constructor increments a count of the number of ping clients created. It is assumed that where many - * are created they will all be run in parallel and be active in sending and consuming pings at the same time. - * If the unique destinations flag is not set and a pub/sub ping cycle is being run, this means that they will all hear - * pings sent by each other. The expected number of pings received will therefore be multiplied up by the number of - * active ping clients. The {@link #getConsumersPerDestination()} method is used to supply this multiplier under these - * conditions. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Create a ping producer that listens to its own pings {@link PingPongProducer} - *
Count the number of ping producers and produce multiplier for scaling up messages expected over topic pings. - *
- */ -public class PingClient extends PingPongProducer -{ - /** Used for debugging. */ - private final Logger log = Logger.getLogger(PingClient.class); - - /** Used to count the number of ping clients created. */ - private static int _pingClientCount; - - /** - * Creates a ping producer with the specified parameters, of which there are many. See the class level comments - * for {@link PingPongProducer} for details. This constructor creates a connection to the broker and creates - * producer and consumer sessions on it, to send and recieve its pings and replies on. - * - * @param overrides Properties containing any desired overrides to the defaults. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingClient(Properties overrides) throws Exception - { - super(overrides); - - _pingClientCount++; - } - - /** - * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the - * effect of making this pinger listen to its own pings. - * - * @return The ping destinations. - */ - public List getReplyDestinations() - { - return _pingDestinations; - } - - /** - * Supplies the multiplier for the number of ping clients that will hear each ping when doing pub/sub pinging. - * - * @return The scaling up of the number of expected pub/sub pings. - */ - public int getConsumersPerDestination() - { - log.debug("public int getConsumersPerDestination(): called"); - - if (_isUnique) - { - log.debug(_noOfConsumers + " consumer per destination."); - - return _noOfConsumers; - } - else - { - log.debug((_pingClientCount * _noOfConsumers) + " consumers per destination."); - - return _pingClientCount * _noOfConsumers; - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java deleted file mode 100644 index 2750790354..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; -import org.apache.qpid.util.CommandLineParser; - -import uk.co.thebadgerset.junit.extensions.util.MathUtils; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -import javax.jms.*; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * PingDurableClient is a variation of the {@link PingPongProducer} ping tool. Instead of sending its pings and - * receiving replies to them at the same time, this tool sends pings until it is signalled by some 'event' to stop - * sending. It then waits for another signal before it re-opens a fresh connection and attempts to receive all of the - * pings that it has succesfully sent. It is intended to be an interactive test that lets a user experiment with - * failure conditions when using durable messaging. - * - *

The events that can stop it from sending are input from the user on the console, failure of its connection to - * the broker, completion of sending a specified number of messages, or expiry of a specified duration. In all cases - * it will do its best to clean up and close the connection before opening a fresh connection to receive the pings - * with. - * - *

The event to re-connect and attempt to recieve the pings is input from the user on the console. - * - *

This ping client inherits the configuration properties of its parent class ({@link PingPongProducer}) and - * additionally accepts the following parameters: - * - *

- *
Parameters
Parameter Default Comments - *
numMessages 100 The total number of messages to send. - *
numMessagesToAction -1 The number of messages to send before taking a custom 'action'. - *
duration 30S The length of time to ping for. (Format dDhHmMsS, for d days, h hours, - * m minutes and s seconds). - *
- * - *

This ping client also overrides some of the defaults of its parent class, to provide a reasonable set up - * when no parameters are specified. - * - *

- *
Parameters
Parameter Default Comments - *
uniqueDests false Prevents destination names being timestamped. - *
transacted true Only makes sense to test with transactions. - *
persistent true Only makes sense to test persistent. - *
durableDests true Should use durable queues with persistent messages. - *
commitBatchSize 10 - *
rate 20 Total default test time is 5 seconds. - *
- * - *

When a number of messages or duration is specified, this ping client will ping until the first of those limits - * is reached. Reaching the limit will be interpreted as the first signal to stop sending, and the ping client will - * wait for the second signal before receiving its pings. - * - *

This class provides a mechanism for extensions to add arbitrary actions, after a particular number of messages - * have been sent. When the number of messages equal the value set in the 'numMessagesToAction' property is method, - * the {@link #takeAction} method is called. By default this does nothing, but extensions of this class can provide - * custom behaviour with alternative implementations of this method (for example taking a backup). - * - *

- *
CRC Card
Responsibilities Collaborations - *
Send and receive pings. - *
Accept user input to signal stop sending. - *
Accept user input to signal start receiving. - *
Provide feedback on pings sent versus pings received. - *
Provide extension point for arbitrary action on a particular message count. - *
- */ -public class PingDurableClient extends PingPongProducer implements ExceptionListener -{ - private static final Logger log = Logger.getLogger(PingDurableClient.class); - - public static final String NUM_MESSAGES_PROPNAME = "numMessages"; - public static final String NUM_MESSAGES_DEFAULT = "100"; - public static final String DURATION_PROPNAME = "duration"; - public static final String DURATION_DEFAULT = "30S"; - public static final String NUM_MESSAGES_TO_ACTION_PROPNAME = "numMessagesToAction"; - public static final String NUM_MESSAGES_TO_ACTION_DEFAULT = "-1"; - - /** The maximum length of time to wait whilst receiving pings before assuming that no more are coming. */ - private static final long TIME_OUT = 3000; - - static - { - defaults.setProperty(NUM_MESSAGES_PROPNAME, NUM_MESSAGES_DEFAULT); - defaults.setProperty(DURATION_PROPNAME, DURATION_DEFAULT); - defaults.setProperty(UNIQUE_DESTS_PROPNAME, "false"); - defaults.setProperty(TRANSACTED_PROPNAME, "true"); - defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true"); - defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10"); - defaults.setProperty(RATE_PROPNAME, "20"); - defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT); - } - - /** Specifies the number of pings to send, if larger than 0. 0 means send until told to stop. */ - private int numMessages; - - /** Holds the number of messages to send before taking triggering the action. */ - private int numMessagesToAction; - - /** Sepcifies how long to ping for, if larger than 0. 0 means send until told to stop. */ - private long duration; - - /** Used to indciate that this application should terminate. Set by the shutdown hook. */ - private boolean terminate = false; - - /** - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingDurableClient(Properties overrides) throws Exception - { - super(overrides); - log.debug("public PingDurableClient(Properties overrides = " + overrides + "): called"); - - // Extract the additional configuration parameters. - ParsedProperties properties = new ParsedProperties(defaults); - properties.putAll(overrides); - - numMessages = properties.getPropertyAsInteger(NUM_MESSAGES_PROPNAME); - String durationSpec = properties.getProperty(DURATION_PROPNAME); - numMessagesToAction = properties.getPropertyAsInteger(NUM_MESSAGES_TO_ACTION_PROPNAME); - - if (durationSpec != null) - { - duration = MathUtils.parseDuration(durationSpec) * 1000000; - } - } - - /** - * Starts the ping/wait/receive process. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - PingDurableClient pingProducer = new PingDurableClient(options); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.closeConnection(); - pingProducer.waitForUser("Press return to begin receiving the pings."); - pingProducer.receive(sent); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Performs the main test procedure implemented by this ping client. See the class level comment for details. - */ - protected int send() throws Exception - { - log.debug("public void sendWaitReceive(): called"); - - log.debug("duration = " + duration); - log.debug("numMessages = " + numMessages); - - if (duration > 0) - { - System.out.println("Sending for up to " + (duration / 1000000000f) + " seconds."); - } - - if (_rate > 0) - { - System.out.println("Sending at " + _rate + " messages per second."); - } - - if (numMessages > 0) - { - System.out.println("Sending up to " + numMessages + " messages."); - } - - // Establish the connection and the message producer. - establishConnection(true, false); - _connection.start(); - - Message message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); - - // Send pings until a terminating condition is received. - boolean endCondition = false; - int messagesSent = 0; - int messagesCommitted = 0; - int messagesNotCommitted = 0; - long start = System.nanoTime(); - - // Clear console in. - clearConsole(); - - while (!endCondition) - { - boolean committed = false; - - try - { - committed = sendMessage(messagesSent, message) && _transacted; - - messagesSent++; - messagesNotCommitted++; - - // Keep count of the number of messsages currently committed and pending commit. - if (committed) - { - log.debug("Adding " + messagesNotCommitted + " messages to the committed count."); - messagesCommitted += messagesNotCommitted; - messagesNotCommitted = 0; - - System.out.println("Commited: " + messagesCommitted); - } - } - catch (JMSException e) - { - log.debug("Got JMSException whilst sending."); - _publish = false; - } - - // Perform the arbitrary action if the number of messages sent has reached the right number. - if (messagesSent == numMessagesToAction) - { - System.out.println("At action point, Messages sent = " + messagesSent + ", Messages Committed = " - + messagesCommitted + ", Messages not Committed = " + messagesNotCommitted); - takeAction(); - } - - // Determine if the end condition has been met, based on the number of messages, time passed, errors on - // the connection or user input. - long now = System.nanoTime(); - - if ((duration != 0) && ((now - start) > duration)) - { - System.out.println("Send halted because duration expired."); - endCondition = true; - } - else if ((numMessages != 0) && (messagesSent >= numMessages)) - { - System.out.println("Send halted because # messages completed."); - endCondition = true; - } - else if (System.in.available() > 0) - { - System.out.println("Send halted by user input."); - endCondition = true; - - clearConsole(); - } - else if (!_publish) - { - System.out.println("Send halted by error on the connection."); - endCondition = true; - } - } - - log.debug("messagesSent = " + messagesSent); - log.debug("messagesCommitted = " + messagesCommitted); - log.debug("messagesNotCommitted = " + messagesNotCommitted); - - System.out.println("Messages sent: " + messagesSent + ", Messages Committed = " + messagesCommitted - + ", Messages not Committed = " + messagesNotCommitted); - - return messagesSent; - } - - protected void closeConnection() - { - // Clean up the connection. - try - { - close(); - } - catch (JMSException e) - { - log.debug("There was an error whilst closing the connection: " + e, e); - System.out.println("There was an error whilst closing the connection."); - - // Ignore as did best could manage to clean up. - } - } - - protected void receive(int messagesSent) throws Exception - { - // Re-establish the connection and the message consumer. - _queueJVMSequenceID = new AtomicInteger(); - _queueSharedID = new AtomicInteger(); - - establishConnection(false, true); - _consumer[0].setMessageListener(null); - _consumerConnection[0].start(); - - // Try to receive all of the pings that were successfully sent. - int messagesReceived = 0; - boolean endCondition = false; - - while (!endCondition) - { - // Message received = _consumer.receiveNoWait(); - Message received = _consumer[0].receive(TIME_OUT); - log.debug("received = " + received); - - if (received != null) - { - messagesReceived++; - } - - // Determine if the end condition has been met, based on the number of messages and time passed since last - // receiving a message. - if (received == null) - { - System.out.println("Timed out."); - endCondition = true; - } - else if (messagesReceived >= messagesSent) - { - System.out.println("Got all messages."); - endCondition = true; - } - } - - // Ensure messages received are committed. - if (_consTransacted) - { - try - { - _consumerSession[0].commit(); - System.out.println("Committed for all messages received."); - } - catch (JMSException e) - { - log.debug("Error during commit: " + e, e); - System.out.println("Error during commit."); - try - { - _consumerSession[0].rollback(); - System.out.println("Rolled back on all messages received."); - } - catch (JMSException e2) - { - log.debug("Error during rollback: " + e, e); - System.out.println("Error on roll back of all messages received."); - } - - } - } - - log.debug("messagesReceived = " + messagesReceived); - - System.out.println("Messages received: " + messagesReceived); - - // Clean up the connection. - close(); - } - - /** - * Clears any pending input from the console. - */ - private void clearConsole() - { - try - { - BufferedReader bis = new BufferedReader(new InputStreamReader(System.in)); - - // System.in.skip(System.in.available()); - while (bis.ready()) - { - bis.readLine(); - } - } - catch (IOException e) - { } - } - - /** - * Returns the ping destinations themselves as the reply destinations for this pinger to listen to. This has the - * effect of making this pinger listen to its own pings. - * - * @return The ping destinations. - */ - public List getReplyDestinations() - { - return _pingDestinations; - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered with - * the runtime system as a shutdown hook. This shutdown hook sets an additional terminate flag, compared with the - * shutdown hook in {@link PingPongProducer}, because the publish flag is used to indicate that sending or receiving - * message should stop, not that the application should termiante. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - terminate = true; - } - }); - } - - /** - * Performs an aribtrary action once the 'numMesagesToAction' count is reached on sending messages. This default - * implementation does nothing. - */ - public void takeAction() - { } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java deleted file mode 100644 index 55414664da..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingLatencyTestPerf.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.AMQSession; -import org.apache.qpid.client.message.AMQMessage; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.TimingController; -import uk.co.thebadgerset.junit.extensions.TimingControllerAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.ObjectMessage; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingLatencyTestPerf is a performance test that outputs multiple timings from its test method, using the timing - * controller interface supplied by the test runner from a seperate listener thread. It outputs round trip timings for - * individual ping messages rather than for how long a complete batch of messages took to process. It also differs from - * the {@link PingTestPerf} test that it extends because it can output timings as replies are received, rather than - * waiting until all expected replies are received. - * - *

This test does not output timings for every single ping message, as when running at high volume, writing the test - * log for a vast number of messages would slow the testing down. Instead samples ping latency occasionally. The - * frequency of ping sampling is set using the {@link #TEST_RESULTS_BATCH_SIZE_PROPNAME} property, to override the - * default of every {@link #DEFAULT_TEST_RESULTS_BATCH_SIZE}. - * - *

The size parameter logged for each individual ping is set to the size of the batch of messages that the - * individual timed ping was taken from, rather than 1 for a single message. This is so that the total throughput - * (messages / time) can be calculated in order to examine the relationship between throughput and latency. - * - *

CRC Card
Responsibilities Collaborations
Send many ping - * messages and output timings for sampled individual pings.
- */ -public class PingLatencyTestPerf extends PingTestPerf implements TimingControllerAware -{ - private static Logger _logger = Logger.getLogger(PingLatencyTestPerf.class); - - /** Holds the name of the property to get the test results logging batch size. */ - public static final String TEST_RESULTS_BATCH_SIZE_PROPNAME = "batchSize"; - - /** Holds the default test results logging batch size. */ - public static final int DEFAULT_TEST_RESULTS_BATCH_SIZE = 1000; - - /** Used to hold the timing controller passed from the test runner. */ - private TimingController _timingController; - - /** Used to generate unique correlation ids for each test run. */ - private AtomicLong corellationIdGenerator = new AtomicLong(); - - /** - * Holds test specifics by correlation id. This consists of the expected number of messages and the timing - * controler. - */ - private Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** Holds the batched results listener, that does logging on batch boundaries. */ - private BatchedResultsListener batchedResultsListener = null; - - /** - * Creates a new asynchronous ping performance test with the specified name. - * - * @param name The test name. - */ - public PingLatencyTestPerf(String name) - { - super(name); - - // Sets up the test parameters with defaults. - ParsedProperties.setSysPropertyIfNull(TEST_RESULTS_BATCH_SIZE_PROPNAME, - Integer.toString(DEFAULT_TEST_RESULTS_BATCH_SIZE)); - } - - /** Compile all the tests into a test suite. */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Latency Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingLatencyTestPerf("testPingLatency")); - - return suite; - } - - /** - * Accepts a timing controller from the test runner. - * - * @param timingController The timing controller to register mutliple timings with. - */ - public void setTimingController(TimingController timingController) - { - _timingController = timingController; - } - - /** - * Gets the timing controller passed in by the test runner. - * - * @return The timing controller passed in by the test runner. - */ - public TimingController getTimingController() - { - return _timingController; - } - - /** - * Sends the specified number of pings, asynchronously outputs timings on every batch boundary, and waits until all - * replies have been received or a time out occurs before exiting this method. - * - * @param numPings The number of pings to send. - */ - public void testPingLatency(int numPings) throws Exception - { - _logger.debug("public void testPingLatency(int numPings): called"); - - // Ensure that at least one ping was requeusted. - if (numPings == 0) - { - _logger.error("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - PingClient pingClient = perThreadSetup._pingClient; - - // Advance the correlation id of messages to send, to make it unique for this run. - String messageCorrelationId = Long.toString(corellationIdGenerator.incrementAndGet()); - _logger.debug("messageCorrelationId = " + messageCorrelationId); - - // Initialize the count and timing controller for the new correlation id. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - TimingController tc = getTimingController().getControllerForCurrentThread(); - perCorrelationId._tc = tc; - perCorrelationId._expectedCount = numPings; - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Attach the chained message listener to the ping producer to listen asynchronously for the replies to these - // messages. - pingClient.setChainedMessageListener(batchedResultsListener); - - // Generate a sample message of the specified size. - Message msg = - pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the requested number of messages, and wait until they have all been received. - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Check that all the replies were received and log a fail if they were not. - if (numReplies < numPings) - { - tc.completeTest(false, 0); - } - - // Remove the chained message listener from the ping producer. - pingClient.removeChainedMessageListener(); - - // Remove the expected count and timing controller for the message correlation id, to ensure they are cleaned up. - perCorrelationIds.remove(messageCorrelationId); - } - - /** Performs test fixture creation on a per thread basis. This will only be called once for each test thread. */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - // Call the set up method in the super class. This creates a PingClient pinger. - super.threadSetUp(); - - // Create the chained message listener, only if it has not already been created. This is set up with the - // batch size property, to tell it what batch size to output results on. A synchronized block is used to - // ensure that only one thread creates this. - synchronized (this) - { - if (batchedResultsListener == null) - { - int batchSize = Integer.parseInt(testParameters.getProperty(TEST_RESULTS_BATCH_SIZE_PROPNAME)); - batchedResultsListener = new BatchedResultsListener(batchSize); - } - } - - // Get the set up that the super class created. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Register the chained message listener on the pinger to do its asynchronous test timings from. - perThreadSetup._pingClient.setChainedMessageListener(batchedResultsListener); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * BatchedResultsListener is a {@link org.apache.qpid.requestreply.PingPongProducer.ChainedMessageListener} that can - * be attached to the pinger, in order to receive notifications about every message received and the number - * remaining to be received. Whenever the number remaining crosses a batch size boundary this results listener - * outputs a test timing for the actual number of messages received in the current batch. - */ - private class BatchedResultsListener implements PingPongProducer.ChainedMessageListener - { - /** The test results logging batch size. */ - int _batchSize; - private boolean _strictAMQP; - - /** - * Creates a results listener on the specified batch size. - * - * @param batchSize The batch size to use. - */ - public BatchedResultsListener(int batchSize) - { - _batchSize = batchSize; - _strictAMQP = - Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, - AMQSession.STRICT_AMQP_DEFAULT)); - } - - /** - * This callback method is called from all of the pingers that this test creates. It uses the correlation id - * from the message to identify the timing controller for the test thread that was responsible for sending those - * messages. - * - * @param message The message. - * @param remainingCount The count of messages remaining to be received with a particular correlation id. - * - * @throws javax.jms.JMSException Any underlying JMSException is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException - { - _logger.debug("public void onMessage(Message message, int remainingCount = " + remainingCount + "): called"); - - // Check if a batch boundary has been crossed. - if ((remainingCount % _batchSize) == 0) - { - // Extract the correlation id from the message. - String correlationId = message.getJMSCorrelationID(); - - // Get the details for the correlation id and check that they are not null. They can become null - // if a test times out. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationId); - if (perCorrelationId != null) - { - // Get the timing controller and expected count for this correlation id. - TimingController tc = perCorrelationId._tc; - int expected = perCorrelationId._expectedCount; - - // Calculate how many messages were actually received in the last batch. This will be the batch size - // except where the number expected is not a multiple of the batch size and this is the first remaining - // count to cross a batch size boundary, in which case it will be the number expected modulo the batch - // size. - int receivedInBatch = ((expected - remainingCount) < _batchSize) ? (expected % _batchSize) : _batchSize; - - // Register a test result for the correlation id. - try - { - tc.completeTest(true, receivedInBatch, latency); - } - catch (InterruptedException e) - { - // Ignore this. It means the test runner wants to stop as soon as possible. - _logger.warn("Got InterruptedException.", e); - } - } - // Else ignore, test timed out. Should log a fail here? - } - } - } - - /** - * Holds state specific to each correlation id, needed to output test results. This consists of the count of the - * total expected number of messages, and the timing controller for the thread sending those message ids. - */ - private static class PerCorrelationId - { - public int _expectedCount; - public TimingController _tc; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java deleted file mode 100644 index 150f7c0d52..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.ping; - -import java.util.Properties; - -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; - -import org.apache.log4j.Logger; - -import org.apache.qpid.client.message.TestMessageFactory; -import org.apache.qpid.util.CommandLineParser; - -/** - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingSendOnlyClient extends PingDurableClient -{ - private static final Logger log = Logger.getLogger(PingSendOnlyClient.class); - - public PingSendOnlyClient(Properties overrides) throws Exception - { - super(overrides); - } - - /** - * Starts the ping/wait/receive process. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - // Create a ping producer overriding its defaults with all options passed on the command line. - Properties options = CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - PingSendOnlyClient pingProducer = new PingSendOnlyClient(options); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - // pingProducer.getConnection().setExceptionListener(pingProducer); - - // Run the test procedure. - int sent = pingProducer.send(); - pingProducer.waitForUser("Press return to close connection and quit."); - pingProducer.closeConnection(); - - System.exit(0); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - Message msg = TestMessageFactory.newTextMessage(_producerSession, messageSize); - - // Timestamp the message in nanoseconds. - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - - return msg; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java deleted file mode 100644 index 375007584b..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/ping/PingTestPerf.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.ping; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import org.apache.qpid.requestreply.PingPongProducer; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.TestThreadAware; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingTestPerf is a ping test, that has been written with the intention of being scaled up to run many times - * simultaneously to simluate many clients/producers/connections. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of a single - * full round trip ping. This test may be scaled up using a suitable JUnit test runner. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats/threads that may be run, - * except if the connection is lost in which case an attempt to re-establish the setup is made. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off a message on the original queue and waits for a response on the - * temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingTestPerf extends AsymptoticTestCase implements TestThreadAware -{ - private static Logger _logger = Logger.getLogger(PingTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - /** Holds a property reader to extract the test parameters from. */ - protected ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingTestPerf(String name) - { - super(name); - - _logger.debug("testParameters = " + testParameters); - } - - /** - * Compile all the tests into a test suite. - * @return The test method testPingOk. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingTestPerf("testPingOk")); - - return suite; - } - - public void testPingOk(int numPings) throws Exception - { - if (numPings == 0) - { - Assert.fail("Number of pings requested was zero."); - } - - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - if (perThreadSetup == null) - { - Assert.fail("Could not get per thread test setup, it was null."); - } - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._pingClient.getTestMessage(perThreadSetup._pingClient.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // start the test - long timeout = Long.parseLong(testParameters.getProperty(PingPongProducer.TIMEOUT_PROPNAME)); - int numReplies = perThreadSetup._pingClient.pingAndWaitForReply(msg, numPings, timeout, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != perThreadSetup._pingClient.getExpectedNumPings(numPings)) - { - Assert.fail("The ping timed out after " + timeout + " ms. Messages Sent = " + numPings + ", MessagesReceived = " - + numReplies); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - _logger.debug("public void threadSetUp(): called"); - - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // This is synchronized because there is a race condition, which causes one connection to sleep if - // all threads try to create connection concurrently. - synchronized (this) - { - // Establish a client to ping a Destination and listen the reply back from same Destination - perThreadSetup._pingClient = new PingClient(testParameters); - perThreadSetup._pingClient.establishConnection(true, true); - } - // Start the client connection - perThreadSetup._pingClient.start(); - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - if ((perThreadSetup != null) && (perThreadSetup._pingClient != null)) - { - perThreadSetup._pingClient.close(); - } - } - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - finally - { - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping client. - */ - protected PingClient _pingClient; - protected String _correlationId; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java deleted file mode 100644 index b684fd6b9b..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/InitialContextHelper.java +++ /dev/null @@ -1,55 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.requestreply; - -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; -import java.util.Properties; -import java.io.InputStream; -import java.io.IOException; - -/** - * - * - */ -public class InitialContextHelper -{ - - public static Context getInitialContext(String propertyFile) throws IOException, NamingException - { - if ((propertyFile == null) || (propertyFile.length() == 0)) - { - propertyFile = "/perftests.properties"; - } - - Properties fileProperties = new Properties(); - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - // NB: Need to change path to reflect package if moving classes around ! - InputStream is = cl.getResourceAsStream(propertyFile); - if( is != null ) - { - fileProperties.load(is); - return new InitialContext(fileProperties); - } - else - { - return new InitialContext(); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java deleted file mode 100644 index 8bcbdbd369..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import java.io.IOException; -import java.net.InetAddress; -import java.text.SimpleDateFormat; -import java.util.Date; - -import javax.jms.*; -import javax.naming.Context; - -import org.apache.log4j.Logger; - -/** - * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return - * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes - * too. - *

- *

The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages - * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique - * temporary queue or the correlation id to correlate the original message to the reply. - *

- *

There is a verbose mode flag which causes information about each ping to be output to the console - * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should - * be disabled for real timing tests as writing to the console will slow things down. - *

- *

- *
CRC Card
Responsibilities Collaborations - *
Bounce back messages to their reply to destination. - *
Provide command line invocation to start the bounce back on a configurable broker url. - *
- * - * @todo Replace the command line parsing with a neater tool. - * @todo Make verbose accept a number of messages, only prints to console every X messages. - */ -public class PingPongBouncer implements MessageListener -{ - private static final Logger _logger = Logger.getLogger(PingPongBouncer.class); - - /** - * The default prefetch size for the message consumer. - */ - private static final int PREFETCH = 1; - - /** - * The default no local flag for the message consumer. - */ - private static final boolean NO_LOCAL = true; - - private static final String DEFAULT_DESTINATION_NAME = "ping"; - - /** - * The default exclusive flag for the message consumer. - */ - private static final boolean EXCLUSIVE = false; - - /** - * A convenient formatter to use when time stamping output. - */ - protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** - * Used to indicate that the reply generator should log timing info to the console (logger info level). - */ - private boolean _verbose = false; - - /** - * Determines whether this bounce back client bounces back messages persistently. - */ - private boolean _persistent = false; - - private Destination _consumerDestination; - - /** - * Keeps track of the response destination of the previous message for the last reply to producer cache. - */ - private Destination _lastResponseDest; - - /** - * The producer for sending replies with. - */ - private MessageProducer _replyProducer; - - /** - * The consumer controlSession. - */ - private Session _consumerSession; - - /** - * The producer controlSession. - */ - private Session _producerSession; - - /** - * Holds the connection to the broker. - */ - private Connection _connection; - - /** - * Flag used to indicate if this is a point to point or pub/sub ping client. - */ - private boolean _isPubSub = false; - - /** - * This flag is used to indicate that the user should be prompted to kill a broker, in order to test - * failover, immediately before committing a transaction. - */ - protected boolean _failBeforeCommit = false; - - /** - * This flag is used to indicate that the user should be prompted to a kill a broker, in order to test - * failover, immediate after committing a transaction. - */ - protected boolean _failAfterCommit = false; - - /** - * Creates a PingPongBouncer on the specified producer and consumer sessions. - * - * @param fileProperties The path to the file properties - * @param factoryName The factory name - * @param username The broker username. - * @param password The broker password. - * @param destinationName The name of the queue to receive pings on - * (or root of the queue name where many queues are generated). - * @param persistent A flag to indicate that persistent message should be used. - * @param transacted A flag to indicate that pings should be sent within transactions. - * @param selector A message selector to filter received pings with. - * @param verbose A flag to indicate that message timings should be sent to the console. - * @throws Exception All underlying exceptions allowed to fall through. This is only test code... - */ - public PingPongBouncer(String fileProperties, String factoryName, String username, String password, - String destinationName, boolean persistent, boolean transacted, - String selector, boolean verbose, boolean pubsub) throws Exception - { - // Create a client id to uniquely identify this client. - InetAddress address = InetAddress.getLocalHost(); - String clientId = address.getHostName() + System.currentTimeMillis(); - _verbose = verbose; - _persistent = persistent; - setPubSub(pubsub); - // Connect to the broker. - Context context = InitialContextHelper.getInitialContext(fileProperties); - ConnectionFactory factory = (ConnectionFactory) context.lookup(factoryName); - setConnection(factory.createConnection(username, password)); - - // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the - // command line option. - _consumerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - _producerSession = (Session) getConnection().createSession(transacted, Session.AUTO_ACKNOWLEDGE); - - // Create the queue to listen for message on. - createConsumerDestination(destinationName); - MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, selector, NO_LOCAL); - - // Create a producer for the replies, without a default destination. - _replyProducer = _producerSession.createProducer(null); - _replyProducer.setDisableMessageTimestamp(true); - _replyProducer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // Set this up to listen for messages on the queue. - consumer.setMessageListener(this); - } - - private static void usage() - { - System.err.println( - "Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + "-persistent : (true/false). Default is false\n" + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n"); - } - - /** - * This is a callback method that is notified of all messages for which this has been registered as a message - * listener on a message consumer. It sends a reply (pong) to all messages it receieves on the reply to - * destination of the message. - * - * @param message The message that triggered this callback. - */ - public void onMessage(Message message) - { - try - { - String messageCorrelationId = message.getJMSCorrelationID(); - if (_verbose) - { - _logger.info(timestampFormatter - .format(new Date()) + ": Got ping with correlation id, " + messageCorrelationId); - } - - // Get the reply to destination from the message and check it is set. - Destination responseDest = message.getJMSReplyTo(); - - if (responseDest == null) - { - _logger.debug("Cannot send reply because reply-to destination is null."); - - return; - } - - // Spew out some timing information if verbose mode is on. - if (_verbose) - { - Long timestamp = message.getLongProperty("timestamp"); - - if (timestamp != null) - { - long diff = System.currentTimeMillis() - timestamp; - _logger.info("Time to bounce point: " + diff); - } - } - - // Correlate the reply to the original. - message.setJMSCorrelationID(messageCorrelationId); - - // Send the receieved message as the pong reply. - _replyProducer.send(responseDest, message); - - if (_verbose) - { - _logger.info(timestampFormatter - .format(new Date()) + ": Sent reply with correlation id, " + messageCorrelationId); - } - - // Commit the transaction if running in transactional mode. - commitTx(_producerSession); - } - catch (JMSException e) - { - _logger.debug("There was a JMSException: " + e.getMessage(), e); - } - } - - /** - * Gets the underlying connection that this ping client is running on. - * - * @return The underlying connection that this ping client is running on. - */ - public Connection getConnection() - { - return _connection; - } - - /** - * Sets the connection that this ping client is using. - * - * @param connection The ping connection. - */ - public void setConnection(Connection connection) - { - this._connection = connection; - } - - /** - * Sets or clears the pub/sub flag to indiciate whether this client is pinging a queue or a topic. - * - * @param pubsub true if this client is pinging a topic, false if it is pinging a queue. - */ - public void setPubSub(boolean pubsub) - { - _isPubSub = pubsub; - } - - /** - * Checks whether this client is a p2p or pub/sub ping client. - * - * @return true if this client is pinging a topic, false if it is pinging a queue. - */ - public boolean isPubSub() - { - return _isPubSub; - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not - * a transactional controlSession, this method does nothing. - *

- *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the - * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker - * after the commit is applied. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - */ - protected void commitTx(Session session) throws JMSException - { - if (session.getTransacted()) - { - try - { - if (_failBeforeCommit) - { - _logger.trace("Failing Before Commit"); - doFailover(); - } - - session.commit(); - - if (_failAfterCommit) - { - _logger.trace("Failing After Commit"); - doFailover(); - } - - _logger.trace("Session Commited."); - } - catch (JMSException e) - { - _logger.trace("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - _logger.debug("Message rolled back."); - } - catch (JMSException jmse) - { - _logger.trace("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - } - - /** - * Prompts the user to terminate the named broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - * - * @param broker The name of the broker to terminate. - */ - protected void doFailover(String broker) - { - System.out.println("Kill Broker " + broker + " now."); - try - { - System.in.read(); - } - catch (IOException e) - { - } - - System.out.println("Continuing."); - } - - /** - * Prompts the user to terminate the broker, in order to test failover functionality. This method will block - * until the user supplied some input on the terminal. - */ - protected void doFailover() - { - System.out.println("Kill Broker now."); - try - { - System.in.read(); - } - catch (IOException e) - { - } - - System.out.println("Continuing."); - - } - - private void createConsumerDestination(String name) throws JMSException - { - if (isPubSub()) - { - _consumerDestination = _consumerSession.createTopic(name); - } - else - { - _consumerDestination = _consumerSession.createQueue(name); - } - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java deleted file mode 100644 index da6d6eb7d0..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java +++ /dev/null @@ -1,1717 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import org.apache.log4j.Logger; -import org.apache.log4j.NDC; - -import org.apache.qpid.test.framework.TestUtils; - -import uk.co.thebadgerset.junit.extensions.BatchedThrottle; -import uk.co.thebadgerset.junit.extensions.Throttle; -import uk.co.thebadgerset.junit.extensions.util.CommandLineParser; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; - -import javax.jms.*; -import javax.naming.Context; -import javax.naming.InitialContext; -import javax.naming.NamingException; - -import java.io.*; -import java.net.InetAddress; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -/** - * PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may - * either be generated by another client (see {@link PingPongBouncer}, or an extension of it may be used that listens - * to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping - * pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour - * configurable. - * - *

The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This - * means that this class has to do some work to correlate pings with pongs; it expectes the original message correlation - * id in the ping to be bounced back in the reply correlation id. - * - *

This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It - * can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within - * transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover - * testing. A complete list of accepted parameters, default values and comments on their usage is provided here: - * - *

- *
Parameters
Parameter Default Comments - *
messageSize 0 Message size in bytes. Not including any headers. - *
destinationName ping The root name to use to generate destination names to ping. - *
persistent false Determines whether peristent delivery is used. - *
transacted false Determines whether messages are sent/received in transactions. - *
broker tcp://localhost:5672 Determines the broker to connect to. - *
virtualHost test Determines the virtual host to send all ping over. - *
rate 0 The maximum rate (in hertz) to send messages at. 0 means no limit. - *
verbose false The verbose flag for debugging. Prints to console on every message. - *
pubsub false Whether to ping topics or queues. Uses p2p by default. - *
failAfterCommit false Whether to prompt user to kill broker after a commit batch. - *
failBeforeCommit false Whether to prompt user to kill broker before a commit batch. - *
failAfterSend false Whether to prompt user to kill broker after a send. - *
failBeforeSend false Whether to prompt user to kill broker before a send. - *
failOnce true Whether to prompt for failover only once. - *
username guest The username to access the broker with. - *
password guest The password to access the broker with. - *
selector null Not used. Defines a message selector to filter pings with. - *
destinationCount 1 The number of destinations to send pings to. - *
numConsumers 1 The number of consumers on each destination. - *
timeout 30000 In milliseconds. The timeout to stop waiting for replies. - *
commitBatchSize 1 The number of messages per transaction in transactional mode. - *
uniqueDests true Whether each receivers only listens to one ping destination or all. - *
durableDests false Whether or not durable destinations are used. - *
ackMode AUTO_ACK The message acknowledgement mode. Possible values are: - * 0 - SESSION_TRANSACTED - * 1 - AUTO_ACKNOWLEDGE - * 2 - CLIENT_ACKNOWLEDGE - * 3 - DUPS_OK_ACKNOWLEDGE - * 257 - NO_ACKNOWLEDGE - * 258 - PRE_ACKNOWLEDGE - *
consTransacted false Whether or not consumers use transactions. Defaults to the same value - * as the 'transacted' option if not seperately defined. - *
consAckMode AUTO_ACK The message acknowledgement mode for consumers. Defaults to the same - * value as 'ackMode' if not seperately defined. - *
maxPending 0 The maximum size in bytes, of messages sent but not yet received. - * Limits the volume of messages currently buffered on the client - * or broker. Can help scale test clients by limiting amount of buffered - * data to avoid out of memory errors. - *
- * - *

This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop - * does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by - * starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also - * registered to terminate the ping-pong loop cleanly. - * - *

- *
CRC Card
Responsibilities Collaborations - *
Provide a ping and wait for all responses cycle. - *
Provide command line invocation to loop the ping cycle on a configurable broker url. - *
- * - * @todo Use read/write lock in the onmessage, not for reading writing but to make use of a shared and exlcusive lock pair. - * Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a - * block that obtains the write lock for the very last message, releases any waiting producer. Means that the last - * message waits until all other messages have been handled before releasing producers but allows messages to be - * processed concurrently, unlike the current synchronized block. - */ -public class PingPongProducer implements Runnable, ExceptionListener -{ - /** Used for debugging. */ - private static final Logger log = Logger.getLogger(PingPongProducer.class); - - /** Holds the name of the property to determine whether of not client id is overridden at connection time. */ - public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId"; - - /** Holds the default value of the override client id flag. */ - public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false"; - - /** Holds the name of the property to define the JNDI factory name with. */ - public static final String FACTORY_NAME_PROPNAME = "factoryName"; - - /** Holds the default JNDI name of the connection factory. */ - public static final String FACTORY_NAME_DEAFULT = "local"; - - /** Holds the name of the property to set the JNDI initial context properties with. */ - public static final String FILE_PROPERTIES_PROPNAME = "properties"; - - /** Holds the default file name of the JNDI initial context properties. */ - public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties"; - - /** Holds the name of the property to get the test message size from. */ - public static final String MESSAGE_SIZE_PROPNAME = "messageSize"; - - /** Used to set up a default message size. */ - public static final int MESSAGE_SIZE_DEAFULT = 0; - - /** Holds the name of the property to get the ping queue name from. */ - public static final String PING_QUEUE_NAME_PROPNAME = "destinationName"; - - /** Holds the name of the default destination to send pings on. */ - public static final String PING_QUEUE_NAME_DEFAULT = "ping"; - - /** Holds the name of the property to get the queue name postfix from. */ - public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix"; - - /** Holds the default queue name postfix value. */ - public static final String QUEUE_NAME_POSTFIX_DEFAULT = ""; - - /** Holds the name of the property to get the test delivery mode from. */ - public static final String PERSISTENT_MODE_PROPNAME = "persistent"; - - /** Holds the message delivery mode to use for the test. */ - public static final boolean PERSISTENT_MODE_DEFAULT = false; - - /** Holds the name of the property to get the test transactional mode from. */ - public static final String TRANSACTED_PROPNAME = "transacted"; - - /** Holds the transactional mode to use for the test. */ - public static final boolean TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test consumer transacted mode from. */ - public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted"; - - /** Holds the consumer transactional mode default setting. */ - public static final boolean CONSUMER_TRANSACTED_DEFAULT = false; - - /** Holds the name of the property to get the test broker url from. */ - public static final String BROKER_PROPNAME = "broker"; - - /** Holds the default broker url for the test. */ - public static final String BROKER_DEFAULT = "tcp://localhost:5672"; - - /** Holds the name of the property to get the test broker virtual path. */ - public static final String VIRTUAL_HOST_PROPNAME = "virtualHost"; - - /** Holds the default virtual path for the test. */ - public static final String VIRTUAL_HOST_DEFAULT = ""; - - /** Holds the name of the property to get the message rate from. */ - public static final String RATE_PROPNAME = "rate"; - - /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */ - public static final int RATE_DEFAULT = 0; - - /** Holds the name of the property to get the verbose mode proeprty from. */ - public static final String VERBOSE_PROPNAME = "verbose"; - - /** Holds the default verbose mode. */ - public static final boolean VERBOSE_DEFAULT = false; - - /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */ - public static final String PUBSUB_PROPNAME = "pubsub"; - - /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */ - public static final boolean PUBSUB_DEFAULT = false; - - /** Holds the name of the property to get the fail after commit flag from. */ - public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit"; - - /** Holds the default failover after commit test flag. */ - public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail before commit flag from. */ - public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit"; - - /** Holds the default failover before commit test flag. */ - public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false; - - /** Holds the name of the proeprty to get the fail after send flag from. */ - public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend"; - - /** Holds the default failover after send test flag. */ - public static final boolean FAIL_AFTER_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail before send flag from. */ - public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend"; - - /** Holds the default failover before send test flag. */ - public static final boolean FAIL_BEFORE_SEND_DEFAULT = false; - - /** Holds the name of the property to get the fail once flag from. */ - public static final String FAIL_ONCE_PROPNAME = "failOnce"; - - /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */ - public static final boolean FAIL_ONCE_DEFAULT = true; - - /** Holds the name of the property to get the broker access username from. */ - public static final String USERNAME_PROPNAME = "username"; - - /** Holds the default broker log on username. */ - public static final String USERNAME_DEFAULT = "guest"; - - /** Holds the name of the property to get the broker access password from. */ - public static final String PASSWORD_PROPNAME = "password"; - - /** Holds the default broker log on password. */ - public static final String PASSWORD_DEFAULT = "guest"; - - /** Holds the name of the proeprty to get the. */ - public static final String SELECTOR_PROPNAME = "selector"; - - /** Holds the default message selector. */ - public static final String SELECTOR_DEFAULT = ""; - - /** Holds the name of the property to get the destination count from. */ - public static final String DESTINATION_COUNT_PROPNAME = "destinationCount"; - - /** Defines the default number of destinations to ping. */ - public static final int DESTINATION_COUNT_DEFAULT = 1; - - /** Holds the name of the property to get the number of consumers per destination from. */ - public static final String NUM_CONSUMERS_PROPNAME = "numConsumers"; - - /** Defines the default number consumers per destination. */ - public static final int NUM_CONSUMERS_DEFAULT = 1; - - /** Holds the name of the property to get the waiting timeout for response messages. */ - public static final String TIMEOUT_PROPNAME = "timeout"; - - /** Default time to wait before assuming that a ping has timed out. */ - public static final long TIMEOUT_DEFAULT = 30000; - - /** Holds the name of the property to get the commit batch size from. */ - public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize"; - - /** Defines the default number of pings to send in each transaction when running transactionally. */ - public static final int TX_BATCH_SIZE_DEFAULT = 1; - - /** Holds the name of the property to get the unique destinations flag from. */ - public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests"; - - /** Defines the default value for the unique destinations property. */ - public static final boolean UNIQUE_DESTS_DEFAULT = true; - - /** Holds the name of the property to get the durable destinations flag from. */ - public static final String DURABLE_DESTS_PROPNAME = "durableDests"; - - /** Defines the default value of the durable destinations flag. */ - public static final boolean DURABLE_DESTS_DEFAULT = false; - - /** Holds the name of the proeprty to get the message acknowledgement mode from. */ - public static final String ACK_MODE_PROPNAME = "ackMode"; - - /** Defines the default message acknowledgement mode. */ - public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the consumers message acknowledgement mode from. */ - public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode"; - - /** Defines the default consumers message acknowledgement mode. */ - public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE; - - /** Holds the name of the property to get the maximum pending message size setting from. */ - public static final String MAX_PENDING_PROPNAME = "maxPending"; - - /** Defines the default value for the maximum pending message size setting. 0 means no limit. */ - public static final int MAX_PENDING_DEFAULT = 0; - - /** Defines the default prefetch size to use when consuming messages. */ - public static final int PREFETCH_DEFAULT = 100; - - /** Defines the default value of the no local flag to use when consuming messages. */ - public static final boolean NO_LOCAL_DEFAULT = false; - - /** Defines the default value of the exclusive flag to use when consuming messages. */ - public static final boolean EXCLUSIVE_DEFAULT = false; - - /** Holds the name of the property to store nanosecond timestamps in ping messages with. */ - public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp"; - - /** Holds the default configuration properties. */ - public static ParsedProperties defaults = new ParsedProperties(); - - static - { - defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT); - defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT); - defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT); - defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT); - defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT); - defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT); - defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT); - defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT); - defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT); - defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT); - defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT); - defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT); - defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT); - defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT); - defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT); - defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT); - defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT); - defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT); - defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT); - defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT); - defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT); - defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT); - defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT); - defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT); - defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT); - defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT); - } - - /** Allows setting of client ID on the connection, rather than through the connection URL. */ - protected boolean _overrideClientId; - - /** Holds the JNDI name of the JMS connection factory. */ - protected String _factoryName; - - /** Holds the name of the properties file to configure JNDI with. */ - protected String _fileProperties; - - /** Holds the broker url. */ - protected String _brokerDetails; - - /** Holds the username to access the broker with. */ - protected String _username; - - /** Holds the password to access the broker with. */ - protected String _password; - - /** Holds the virtual host on the broker to run the tests through. */ - protected String _virtualpath; - - /** Holds the root name from which to generate test destination names. */ - protected String _destinationName; - - /** Holds the default queue name postfix value. */ - protected String _queueNamePostfix; - - /** Holds the message selector to filter the pings with. */ - protected String _selector; - - /** Holds the producers transactional mode flag. */ - protected boolean _transacted; - - /** Holds the consumers transactional mode flag. */ - protected boolean _consTransacted; - - /** Determines whether this producer sends persistent messages. */ - protected boolean _persistent; - - /** Holds the acknowledgement mode used for the producers. */ - protected int _ackMode; - - /** Holds the acknowledgement mode setting for the consumers. */ - protected int _consAckMode; - - /** Determines what size of messages this producer sends. */ - protected int _messageSize; - - /** Used to indicate that the ping loop should print out whenever it pings. */ - protected boolean _verbose; - - /** Flag used to indicate if this is a point to point or pub/sub ping client. */ - protected boolean _isPubSub; - - /** Flag used to indicate if the destinations should be unique client. */ - protected boolean _isUnique; - - /** Flag used to indicate that durable destination should be used. */ - protected boolean _isDurable; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */ - protected boolean _failBeforeCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */ - protected boolean _failAfterCommit; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */ - protected boolean _failBeforeSend; - - /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */ - protected boolean _failAfterSend; - - /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */ - protected boolean _failOnce; - - /** Holds the number of sends that should be performed in every transaction when using transactions. */ - protected int _txBatchSize; - - /** Holds the number of destinations to ping. */ - protected int _noOfDestinations; - - /** Holds the number of consumers per destination. */ - protected int _noOfConsumers; - - /** Holds the maximum send rate in herz. */ - protected int _rate; - - /** - * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended - * if this limit is breached. - */ - protected int _maxPendingSize; - - /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */ - private static AtomicLong _correlationIdGenerator = new AtomicLong(0L); - - /** A source for providing sequential unqiue ids for instances of this class to be identifed with. */ - private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0); - - /** Holds this instances unique id. */ - private int instanceId; - - /** - * Holds a map from message ids to latches on which threads wait for replies. This map is shared accross multiple - * ping producers on the same JVM. - */ - private static Map perCorrelationIds = - Collections.synchronizedMap(new HashMap()); - - /** A convenient formatter to use when time stamping output. */ - protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS"); - - /** Holds the connection for the message producer. */ - protected Connection _connection; - - /** Holds the consumer connections. */ - protected Connection[] _consumerConnection; - - /** Holds the controlSession on which ping replies are received. */ - protected Session[] _consumerSession; - - /** Holds the producer controlSession, needed to create ping messages. */ - protected Session _producerSession; - - /** Holds the destination where the response messages will arrive. */ - protected Destination _replyDestination; - - /** Holds the set of destinations that this ping producer pings. */ - protected List _pingDestinations; - - /** Used to restrict the sending rate to a specified limit. */ - protected Throttle _rateLimiter; - - /** Holds a message listener that this message listener chains all its messages to. */ - protected ChainedMessageListener _chainedMessageListener = null; - - /** - * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when - * creating multiple ping producers in the same JVM. - */ - protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger(); - - /** - * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers - * on the same JVM using this id generator will allow them to ping on the same queues. - */ - protected AtomicInteger _queueSharedID = new AtomicInteger(); - - /** Used to tell the ping loop when to terminate, it only runs while this is true. */ - protected boolean _publish = true; - - /** Holds the message producer to send the pings through. */ - protected MessageProducer _producer; - - /** Holds the message consumer to receive the ping replies through. */ - protected MessageConsumer[] _consumer; - - /** The prompt to display when asking the user to kill the broker for failover testing. */ - private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return."; - - /** Holds the name for this test client to be identified to the broker with. */ - private String _clientID; - - /** Keeps count of the total messages sent purely for debugging purposes. */ - private static AtomicInteger numSent = new AtomicInteger(); - - /** - * Holds a monitor which is used to synchronize sender and receivers threads, where the sender has elected - * to wait until the number of unreceived message is reduced before continuing to send. This monitor is a - * fair SynchronousQueue becuase that provides fair scheduling, to ensure that all producer threads get an - * equal chance to produce messages. - */ - static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true); - - /** Keeps a count of the number of message currently sent but not received. */ - static AtomicInteger _unreceived = new AtomicInteger(0); - - /** - * Creates a ping producer with the specified parameters, of which there are many. See the class level comments - * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on - * it, to send and recieve its pings and replies on. - * - * @param overrides Properties containing any desired overrides to the defaults. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public PingPongProducer(Properties overrides) throws Exception - { - // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called"); - instanceId = _instanceIdGenerator.getAndIncrement(); - - // Create a set of parsed properties from the defaults overriden by the passed in values. - ParsedProperties properties = new ParsedProperties(defaults); - properties.putAll(overrides); - - // Extract the configuration properties to set the pinger up with. - _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME); - _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME); - _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME); - _brokerDetails = properties.getProperty(BROKER_PROPNAME); - _username = properties.getProperty(USERNAME_PROPNAME); - _password = properties.getProperty(PASSWORD_PROPNAME); - _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME); - _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME); - _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME); - _selector = properties.getProperty(SELECTOR_PROPNAME); - _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME); - _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME); - _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME); - _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME); - _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME); - _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME); - _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME); - _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME); - _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME); - _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME); - _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME); - _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME); - _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME); - _rate = properties.getPropertyAsInteger(RATE_PROPNAME); - _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME); - _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME); - _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME); - _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME); - _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME); - _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME); - - // Check that one or more destinations were specified. - if (_noOfDestinations < 1) - { - throw new IllegalArgumentException("There must be at least one destination."); - } - - // Set up a throttle to control the send rate, if a rate > 0 is specified. - if (_rate > 0) - { - _rateLimiter = new BatchedThrottle(); - _rateLimiter.setRate(_rate); - } - - // Create the connection and message producers/consumers. - // establishConnection(true, true); - } - - /** - * Establishes a connection to the broker and creates message consumers and producers based on the parameters - * that this ping client was created with. - * - * @param producer Flag to indicate whether or not the producer should be set up. - * @param consumer Flag to indicate whether or not the consumers should be set up. - * - * @throws Exception Any exceptions are allowed to fall through. - */ - public void establishConnection(boolean producer, boolean consumer) throws Exception - { - // log.debug("public void establishConnection(): called"); - - // Generate a unique identifying name for this client, based on it ip address and the current time. - InetAddress address = InetAddress.getLocalHost(); - // _clientID = address.getHostName() + System.currentTimeMillis(); - _clientID = "perftest_" + instanceId; - - // Create a connection to the broker. - createConnection(_clientID); - - // Create transactional or non-transactional sessions, based on the command line arguments. - _producerSession = _connection.createSession(_transacted, _ackMode); - - _consumerSession = new Session[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode); - } - - // Create the destinations to send pings to and receive replies from. - _replyDestination = _consumerSession[0].createTemporaryQueue(); - createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable); - - // Create the message producer only if instructed to. - if (producer) - { - createProducer(); - } - - // Create the message consumer only if instructed to. - if (consumer) - { - createReplyConsumers(getReplyDestinations(), _selector); - } - } - - /** - * Establishes a connection to the broker, based on the configuration parameters that this ping client was - * created with. - * - * @param clientID The clients identifier. - * - * @throws JMSException Underlying exceptions allowed to fall through. - * @throws NamingException Underlying exceptions allowed to fall through. - * @throws IOException Underlying exceptions allowed to fall through. - */ - protected void createConnection(String clientID) throws JMSException, NamingException, IOException - { - // _log.debug("protected void createConnection(String clientID = " + clientID + "): called"); - - // _log.debug("Creating a connection for the message producer."); - - - Context context = InitialContextHelper.getInitialContext(_fileProperties); - ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName); - _connection = factory.createConnection(_username, _password); - - if (_overrideClientId) - { - _connection.setClientID(clientID); - } - - // _log.debug("Creating " + _noOfConsumers + " connections for the consumers."); - - _consumerConnection = new Connection[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i] = factory.createConnection(_username, _password); - // _consumerConnection[i].setClientID(clientID); - } - } - - /** - * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs - * to be started to bounce the pings back again. - * - * @param args The command line arguments. - */ - public static void main(String[] args) - { - try - { - Properties options = - CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties()); - - // Create a ping producer overriding its defaults with all options passed on the command line. - PingPongProducer pingProducer = new PingPongProducer(options); - pingProducer.establishConnection(true, true); - - // Start the ping producers dispatch thread running. - pingProducer._connection.start(); - - // Create a shutdown hook to terminate the ping-pong producer. - Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook()); - - // Ensure that the ping pong producer is registered to listen for exceptions on the connection too. - pingProducer._connection.setExceptionListener(pingProducer); - - // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception. - Thread pingThread = new Thread(pingProducer); - pingThread.run(); - pingThread.join(); - } - catch (Exception e) - { - System.err.println(e.getMessage()); - log.error("Top level handler caught execption.", e); - System.exit(1); - } - } - - /** - * Convenience method for a short pause. - * - * @param sleepTime The time in milliseconds to pause for. - */ - public static void pause(long sleepTime) - { - if (sleepTime > 0) - { - try - { - Thread.sleep(sleepTime); - } - catch (InterruptedException ie) - { } - } - } - - /** - * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to - * destination of this pinger. - * - * @return The single reply to destination of this pinger, wrapped in a list. - */ - public List getReplyDestinations() - { - // log.debug("public List getReplyDestinations(): called"); - - List replyDestinations = new ArrayList(); - replyDestinations.add(_replyDestination); - - // log.debug("replyDestinations = " + replyDestinations); - - return replyDestinations; - } - - /** - * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery - * flag is set accoring the ping producer creation options. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createProducer() throws JMSException - { - // log.debug("public void createProducer(): called"); - - _producer = (MessageProducer) _producerSession.createProducer(null); - _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - - // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages."); - } - - /** - * Creates consumers for the specified number of destinations. The destinations themselves are also created by this - * method. - * - * @param noOfDestinations The number of destinations to create consumers for. - * @param selector The message selector to filter the consumers with. - * @param rootName The root of the name, or actual name if only one is being created. - * @param unique true to make the destinations unique to this pinger, false to share the - * numbering with all pingers on the same JVM. - * @param durable If the destinations are durable topics. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique, - boolean durable) throws JMSException - { - /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = " - + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = " - + durable + "): called");*/ - - _pingDestinations = new ArrayList(); - - // Create the desired number of ping destinations and consumers for them. - // log.debug("Creating " + noOfDestinations + " destinations to ping."); - - for (int i = 0; i < noOfDestinations; i++) - { - Destination destination; - String id; - - // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag. - if (unique) - { - // log.debug("Creating unique destinations."); - id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID(); - } - else - { - // log.debug("Creating shared destinations."); - id = "_" + _queueSharedID.incrementAndGet(); - } - - // Check if this is a pub/sub pinger, in which case create topics. - if (_isPubSub) - { - destination = _producerSession.createTopic(rootName + id); - // log.debug("Created non-durable topic " + destination); - - if (durable) - { - _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID()); - } - } - // Otherwise this is a p2p pinger, in which case create queues. - else - { - destination = _producerSession.createQueue(rootName + id + _queueNamePostfix); - // log.debug("Created queue " + destination); - } - - // Keep the destination. - _pingDestinations.add(destination); - } - } - - /** - * Creates consumers for the specified destinations and registers this pinger to listen to their messages. - * - * @param destinations The destinations to listen to. - * @param selector A selector to filter the messages with. - * - * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through. - */ - public void createReplyConsumers(Collection destinations, String selector) throws JMSException - { - /*log.debug("public void createReplyConsumers(Collection destinations = " + destinations - + ", String selector = " + selector + "): called");*/ - - log.debug("There are " + destinations.size() + " destinations."); - log.debug("Creating " + _noOfConsumers + " consumers on each destination."); - log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers)); - - for (Destination destination : destinations) - { - _consumer = new MessageConsumer[_noOfConsumers]; - - for (int i = 0; i < _noOfConsumers; i++) - { - // Create a consumer for the destination and set this pinger to listen to its messages. - _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT); - - final int consumerNo = i; - - _consumer[i].setMessageListener(new MessageListener() - { - public void onMessage(Message message) - { - onMessageWithConsumerNo(message, consumerNo); - } - }); - - log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination); - } - } - } - - /** - * Stores the received message in the replies map, then resets the boolean latch that a thread waiting for a - * correlating reply may be waiting on. This is only done if the reply has a correlation id that is expected in the - * replies map. - * - * @param message The received message. - * @param consumerNo The consumer number within this test pinger instance. - */ - public void onMessageWithConsumerNo(Message message, int consumerNo) - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called"); - try - { - long now = System.nanoTime(); - long timestamp = getTimestamp(message); - long pingTime = now - timestamp; - - // NDC.push("id" + instanceId + "/cons" + consumerNo); - - // Extract the messages correlation id. - String correlationID = message.getJMSCorrelationID(); - // log.debug("correlationID = " + correlationID); - - // int num = message.getIntProperty("MSG_NUM"); - // log.info("Message " + num + " received."); - - boolean isRedelivered = message.getJMSRedelivered(); - // log.debug("isRedelivered = " + isRedelivered); - - if (!isRedelivered) - { - // Countdown on the traffic light if there is one for the matching correlation id. - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - if (perCorrelationId != null) - { - CountDownLatch trafficLight = perCorrelationId.trafficLight; - - // Restart the timeout timer on every message. - perCorrelationId.timeOutStart = System.nanoTime(); - - // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID); - - // Release waiting senders if there are some and using maxPending limit. - if ((_maxPendingSize > 0)) - { - // Decrement the count of sent but not yet received messages. - int unreceived = _unreceived.decrementAndGet(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - - // synchronized (_sendPauseMonitor) - // { - if (unreceivedSize < _maxPendingSize) - { - _sendPauseMonitor.poll(); - } - // } - } - - // Decrement the countdown latch. Before this point, it is possible that two threads might enter this - // method simultanesouly with the same correlation id. Decrementing the latch in a synchronized block - // ensures that each thread will get a unique value for the remaining messages. - long trueCount; - long remainingCount; - - synchronized (trafficLight) - { - trafficLight.countDown(); - - trueCount = trafficLight.getCount(); - remainingCount = trueCount - 1; - - // NDC.push("/rem" + remainingCount); - - // log.debug("remainingCount = " + remainingCount); - // log.debug("trueCount = " + trueCount); - - // Commit on transaction batch size boundaries. At this point in time the waiting producer - // remains blocked, even on the last message. - // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on - // each batch boundary. For pub/sub each consumer gets every message so no division is done. - // When running in client ack mode, an ack is done instead of a commit, on the commit batch - // size boundaries. - long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers); - // log.debug("commitCount = " + commitCount); - - if ((commitCount % _txBatchSize) == 0) - { - if (_consAckMode == 2) - { - // log.debug("Doing client ack for consumer " + consumerNo + "."); - message.acknowledge(); - } - else - { - // log.debug("Trying commit for consumer " + consumerNo + "."); - commitTx(_consumerSession[consumerNo]); - // log.info("Tx committed on consumer " + consumerNo); - } - } - - // Forward the message and remaining count to any interested chained message listener. - if (_chainedMessageListener != null) - { - _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime); - } - - // Check if this is the last message, in which case release any waiting producers. This is done - // after the transaction has been committed and any listeners notified. - if (trueCount == 1) - { - trafficLight.countDown(); - } - } - } - else - { - log.warn("Got unexpected message with correlationId: " + correlationID); - } - } - else - { - log.warn("Got redelivered message, ignoring."); - } - } - catch (JMSException e) - { - log.warn("There was a JMSException: " + e.getMessage(), e); - } - finally - { - // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending"); - // NDC.clear(); - } - } - - /** - * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out - * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify - * the correlation id. - * - * @param message The message to send. If this is null, one is generated. - * @param numPings The number of ping messages to send. - * @param timeout The timeout in milliseconds. - * @param messageCorrelationId The message correlation id. If this is null, one is generated. - * - * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait - * for all prematurely. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - * @throws InterruptedException When interrupted by a timeout - */ - public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId) - throws JMSException, InterruptedException - { - /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = " - + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - // Generate a unique correlation id to put on the messages before sending them, if one was not specified. - if (messageCorrelationId == null) - { - messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet()); - } - - try - { - // NDC.push("prod"); - - // Create a count down latch to count the number of replies with. This is created before the messages are - // sent so that the replies cannot be received before the count down is created. - // One is added to this, so that the last reply becomes a special case. The special case is that the - // chained message listener must be called before this sender can be unblocked, but that decrementing the - // countdown needs to be done before the chained listener can be called. - PerCorrelationId perCorrelationId = new PerCorrelationId(); - - perCorrelationId.trafficLight = new CountDownLatch(getExpectedNumPings(numPings) + 1); - perCorrelationIds.put(messageCorrelationId, perCorrelationId); - - // Set up the current time as the start time for pinging on the correlation id. This is used to determine - // timeouts. - perCorrelationId.timeOutStart = System.nanoTime(); - - // Send the specifed number of messages. - pingNoWaitForReply(message, numPings, messageCorrelationId); - - boolean timedOut; - boolean allMessagesReceived; - int numReplies; - - do - { - // Block the current thread until replies to all the messages are received, or it times out. - perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS); - - // Work out how many replies were receieved. - numReplies = getExpectedNumPings(numPings) - (int) perCorrelationId.trafficLight.getCount(); - - allMessagesReceived = numReplies == getExpectedNumPings(numPings); - - // log.debug("numReplies = " + numReplies); - // log.debug("allMessagesReceived = " + allMessagesReceived); - - // Recheck the timeout condition. - long now = System.nanoTime(); - long lastMessageReceievedAt = perCorrelationId.timeOutStart; - timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000); - - // log.debug("now = " + now); - // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt); - } - while (!timedOut && !allMessagesReceived); - - if ((numReplies < getExpectedNumPings(numPings)) && _verbose) - { - log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId); - } - else if (_verbose) - { - log.info("Got all replies on id, " + messageCorrelationId); - } - - // commitTx(_consumerSession); - - // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending"); - - return numReplies; - } - // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived, - // so will be a memory leak if this is not done. - finally - { - // NDC.pop(); - perCorrelationIds.remove(messageCorrelationId); - } - } - - /** - * Sends the specified number of ping messages and does not wait for correlating replies. - * - * @param message The message to send. - * @param numPings The number of pings to send. - * @param messageCorrelationId A correlation id to place on all messages sent. - * - * @throws JMSException All underlying JMSExceptions are allowed to fall through. - */ - public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException - { - /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings - + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/ - - if (message == null) - { - message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent); - } - - message.setJMSCorrelationID(messageCorrelationId); - - // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the - // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is - // needed. - boolean committed = false; - - // Send all of the ping messages. - for (int i = 0; i < numPings; i++) - { - // Re-timestamp the message. - // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - - // Send the message, passing in the message count. - committed = sendMessage(i, message); - - // Spew out per message timings on every message sonly in verbose mode. - /*if (_verbose) - { - log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId); - }*/ - } - - // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages. - if (!committed) - { - commitTx(_producerSession); - } - } - - /** - * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of - * messages sent so far must be specified and is used to round robin the ping destinations (where there are more - * than one), and to determine if the transaction batch size has been reached and the sent messages should be - * committed. - * - * @param i The count of messages sent so far in a loop of multiple calls to this send method. - * @param message The message to send. - * - * @return true if the messages were committed, false otherwise. - * - * @throws JMSException All underlyiung JMSExceptions are allowed to fall through. - */ - protected boolean sendMessage(int i, Message message) throws JMSException - { - try - { - NDC.push("id" + instanceId + "/prod"); - - // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called"); - // log.debug("_txBatchSize = " + _txBatchSize); - - // Round robin the destinations as the messages are sent. - Destination destination = _pingDestinations.get(i % _pingDestinations.size()); - - // Prompt the user to kill the broker when doing failover testing. - _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend); - - // Get the test setup for the correlation id. - String correlationID = message.getJMSCorrelationID(); - PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID); - - // If necessary, wait until the max pending message size comes within its limit. - if (_maxPendingSize > 0) - { - synchronized (_sendPauseMonitor) - { - // Used to keep track of the number of times that send has to wait. - int numWaits = 0; - - // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with - // the test timeout. - int waitLimit = (int) (TIMEOUT_DEFAULT / 10000); - - while (true) - { - // Get the size estimate of sent but not yet received messages. - int unreceived = _unreceived.get(); - int unreceivedSize = - (unreceived * ((_messageSize == 0) ? 1 : _messageSize)) - / (_isPubSub ? getConsumersPerDestination() : 1); - - // log.debug("unreceived = " + unreceived); - // log.debug("unreceivedSize = " + unreceivedSize); - // log.debug("_maxPendingSize = " + _maxPendingSize); - - if (unreceivedSize > _maxPendingSize) - { - // log.debug("unreceived size estimate over limit = " + unreceivedSize); - - // Fail the test if the send has had to wait more than the maximum allowed number of times. - if (numWaits > waitLimit) - { - String errorMessage = - "Send has had to wait for the unreceivedSize (" + unreceivedSize - + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit - + " times."; - log.warn(errorMessage); - throw new RuntimeException(errorMessage); - } - - // Wait on the send pause barrier for the limit to be re-established. - try - { - long start = System.nanoTime(); - // _sendPauseMonitor.wait(10000); - _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS); - long end = System.nanoTime(); - - // Count the wait only if it was for > 99% of the requested wait time. - if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99) - { - numWaits++; - } - } - catch (InterruptedException e) - { - // Restore the interrupted status - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - else - { - break; - } - } - } - } - - // Send the message either to its round robin destination, or its default destination. - // int num = numSent.incrementAndGet(); - // message.setIntProperty("MSG_NUM", num); - setTimestamp(message); - - if (destination == null) - { - _producer.send(message); - } - else - { - _producer.send(destination, message); - } - - // Increase the unreceived size, this may actually happen after the message is received. - // The unreceived size is incremented by the number of consumers that will get a copy of the message, - // in pub/sub mode. - if (_maxPendingSize > 0) - { - int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1); - // log.debug("newUnreceivedCount = " + newUnreceivedCount); - } - - // Apply message rate throttling if a rate limit has been set up. - if (_rateLimiter != null) - { - _rateLimiter.throttle(); - } - - // Call commit every time the commit batch size is reached. - boolean committed = false; - - // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent. - if (((i + 1) % _txBatchSize) == 0) - { - // log.debug("Trying commit on producer session."); - committed = commitTx(_producerSession); - } - - return committed; - } - finally - { - NDC.clear(); - } - } - - /** - * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the - * test that the failure has occurred, before the method returns. - * - * @param failFlag The fail flag to test. - * - * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only - * used once, then reset. - */ - private boolean waitForUserToPromptOnFailure(boolean failFlag) - { - if (failFlag) - { - if (_failOnce) - { - failFlag = false; - } - - // log.debug("Failing Before Send"); - waitForUser(KILL_BROKER_PROMPT); - } - - return failFlag; - } - - /** - * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch - * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will - * terminate the pinger. - */ - public void pingLoop() - { - try - { - // Generate a sample message and time stamp it. - Message msg = getTestMessage(_replyDestination, _messageSize, _persistent); - // setTimestamp(msg); - - // Send the message and wait for a reply. - pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null); - } - catch (JMSException e) - { - _publish = false; - // log.debug("There was a JMSException: " + e.getMessage(), e); - } - catch (InterruptedException e) - { - _publish = false; - // log.debug("There was an interruption: " + e.getMessage(), e); - } - } - - /** - * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set - * here. - * - * @param messageListener The chained message listener. - */ - public void setChainedMessageListener(ChainedMessageListener messageListener) - { - _chainedMessageListener = messageListener; - } - - /** Removes any chained message listeners from this pinger. */ - public void removeChainedMessageListener() - { - _chainedMessageListener = null; - } - - /** - * Generates a test message of the specified size, with the specified reply-to destination and persistence flag. - * - * @param replyQueue The reply-to destination for the message. - * @param messageSize The desired size of the message in bytes. - * @param persistent true if the message should use persistent delivery, false otherwise. - * - * @return A freshly generated test message. - * - * @throws javax.jms.JMSException All underlying JMSException are allowed to fall through. - */ - public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException - { - // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent); - return TestUtils.createTestMessageOfSize(_producerSession, messageSize); - } - - /** - * Sets the current time in nanoseconds as the timestamp on the message. - * - * @param msg The message to timestamp. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected void setTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime()); - } - else - {*/ - msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime()); - // } - } - - /** - * Extracts the nanosecond timestamp from a message. - * - * @param msg The message to extract the time stamp from. - * - * @return The timestamp in nanos. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - protected long getTimestamp(Message msg) throws JMSException - { - /*if (((AMQSession)_producerSession).isStrictAMQP()) - { - Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME)); - - return (value == null) ? 0L : value; - } - else - {*/ - return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME); - // } - } - - /** - * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag - * has been cleared. - */ - public void stop() - { - _publish = false; - } - - /** - * Starts the producer and consumer connections. - * - * @throws JMSException Any JMSExceptions are allowed to fall through. - */ - public void start() throws JMSException - { - // log.debug("public void start(): called"); - - _connection.start(); - // log.debug("Producer started."); - - for (int i = 0; i < _noOfConsumers; i++) - { - _consumerConnection[i].start(); - // log.debug("Consumer " + i + " started."); - } - } - - /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */ - public void run() - { - // Keep running until the publish flag is cleared. - while (_publish) - { - pingLoop(); - } - } - - /** - * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the - * connection, this clears the publish flag which in turn will halt the ping loop. - * - * @param e The exception that triggered this callback method. - */ - public void onException(JMSException e) - { - // log.debug("public void onException(JMSException e = " + e + "): called", e); - _publish = false; - } - - /** - * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered - * with the runtime system as a shutdown hook. - * - * @return A shutdown hook for the ping loop. - */ - public Thread getShutdownHook() - { - return new Thread(new Runnable() - { - public void run() - { - stop(); - } - }); - } - - /** - * Closes all of the producer and consumer connections. - * - * @throws JMSException All JMSException are allowed to fall through. - */ - public void close() throws JMSException - { - // log.debug("public void close(): called"); - - try - { - if (_connection != null) - { - // log.debug("Before close producer connection."); - _connection.close(); - // log.debug("Closed producer connection."); - } - - for (int i = 0; i < _noOfConsumers; i++) - { - if (_consumerConnection[i] != null) - { - // log.debug("Before close consumer connection " + i + "."); - _consumerConnection[i].close(); - // log.debug("Closed consumer connection " + i + "."); - } - } - } - finally - { - _connection = null; - _producerSession = null; - _consumerSession = null; - _consumerConnection = null; - _producer = null; - _consumer = null; - _pingDestinations = null; - _replyDestination = null; - } - } - - /** - * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not a - * transactional controlSession, this method does nothing (unless the failover after send flag is set). - * - *

If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is - * applied. This flag applies whether the pinger is transactional or not. - * - *

If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit - * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the - * commit is applied. These flags will only apply if using a transactional pinger. - * - * @param session The controlSession to commit - * - * @return true if the controlSession was committed, false if it was not. - * - * @throws javax.jms.JMSException If the commit fails and then the rollback fails. - * - * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit - * method, because commits only apply to transactional pingers, but fail after send applied to transactional and - * non-transactional alike. - */ - protected boolean commitTx(Session session) throws JMSException - { - // log.debug("protected void commitTx(Session session): called"); - - boolean committed = false; - - _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend); - - if (session.getTransacted()) - { - // log.debug("Session is transacted."); - - try - { - _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit); - - long start = System.nanoTime(); - session.commit(); - committed = true; - // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms"); - - _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit); - - // log.debug("Session Commited."); - } - catch (JMSException e) - { - // log.debug("JMSException on commit:" + e.getMessage(), e); - - try - { - session.rollback(); - // log.debug("Message rolled back."); - } - catch (JMSException jmse) - { - // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse); - - // Both commit and rollback failed. Throw the rollback exception. - throw jmse; - } - } - } - - return committed; - } - - /** - * Outputs a prompt to the console and waits for the user to press return. - * - * @param prompt The prompt to display on the console. - */ - public void waitForUser(String prompt) - { - System.out.println(prompt); - - try - { - System.in.read(); - } - catch (IOException e) - { - // Ignored. - } - - System.out.println("Continuing."); - } - - /** - * Gets the number of consumers that are listening to each destination in the test. - * - * @return int The number of consumers subscribing to each topic. - */ - public int getConsumersPerDestination() - { - return _noOfConsumers; - } - - /** - * Calculates how many pings are expected to be received for the given number sent. - * - * @param numpings The number of pings that will be sent. - * - * @return The number that should be received, for the test to pass. - */ - public int getExpectedNumPings(int numpings) - { - // log.debug("public int getExpectedNumPings(int numpings = " + numpings + "): called"); - - // log.debug("Each ping will be received by " + (_isPubSub ? getConsumersPerDestination() : 1) + " consumers."); - - return numpings * (_isPubSub ? getConsumersPerDestination() : 1); - } - - /** - * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link - * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link - * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of - * messages with that correlation id. - * - *

Provided only one pinger is producing messages with that correlation id, the chained listener will always be - * given unique message counts. It will always be called while the producer waiting for all messages to arrive is - * still blocked. - */ - public static interface ChainedMessageListener - { - /** - * Notifies interested listeners about message arrival and important test stats, the number of messages - * remaining in the test, and the messages send timestamp. - * - * @param message The newly arrived message. - * @param remainingCount The number of messages left to complete the test. - * @param latency The nanosecond latency of the message. - * - * @throws JMSException Any JMS exceptions is allowed to fall through. - */ - public void onMessage(Message message, int remainingCount, long latency) throws JMSException; - } - - /** - * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be - * added to this: read/write lock to make onMessage more concurrent as described in class header comment. - */ - protected static class PerCorrelationId - { - /** Holds a countdown on number of expected messages. */ - CountDownLatch trafficLight; - - /** Holds the last timestamp that the timeout was reset to. */ - Long timeOutStart; - } -} diff --git a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java b/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java deleted file mode 100644 index 780589768f..0000000000 --- a/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.requestreply; - -import junit.framework.Assert; -import junit.framework.Test; -import junit.framework.TestSuite; - -import org.apache.log4j.Logger; - -import uk.co.thebadgerset.junit.extensions.AsymptoticTestCase; -import uk.co.thebadgerset.junit.extensions.util.ParsedProperties; -import uk.co.thebadgerset.junit.extensions.util.TestContextProperties; - -import javax.jms.*; - -/** - * PingPongTestPerf is a full round trip ping test, that has been written with the intention of being scaled up to run - * many times simultaneously to simluate many clients/producer/connections. A full round trip ping sends a message from - * a producer to a conumer, then the consumer replies to the message on a temporary queue. - * - *

A single run of the test using the default JUnit test runner will result in the sending and timing of the number - * of pings specified by the test size and time how long it takes for all of these to complete. This test may be scaled - * up using a suitable JUnit test runner. See {@link uk.co.thebadgerset.junit.extensions.TKTestRunner} for more - * information on how to do this. - * - *

The setup/teardown cycle establishes a connection to a broker and sets up a queue to send ping messages to and a - * temporary queue for replies. This setup is only established once for all the test repeats, but each test threads - * gets its own connection/producer/consumer, this is only re-established if the connection is lost. - * - *

The test cycle is: Connects to a queue, creates a temporary queue, creates messages containing a property that - * is the name of the temporary queue, fires off many messages on the original queue and waits for them all to come - * back on the temporary queue. - * - *

Configurable test properties: message size, transacted or not, persistent or not. Broker connection details. - * - *

- *
CRC Card
Responsibilities Collaborations - *
- */ -public class PingPongTestPerf extends AsymptoticTestCase -{ - private static Logger _logger = Logger.getLogger(PingPongTestPerf.class); - - /** Thread local to hold the per-thread test setup fields. */ - ThreadLocal threadSetup = new ThreadLocal(); - - // Set up a property reader to extract the test parameters from. Once ContextualProperties is available in - // the project dependencies, use it to get property overrides for configurable tests and to notify the test runner - // of the test parameters to log with the results. It also providers some basic type parsing convenience methods. - // private Properties testParameters = System.getProperties(); - private ParsedProperties testParameters = - TestContextProperties.getInstance(PingPongProducer.defaults /*System.getProperties()*/); - - public PingPongTestPerf(String name) - { - super(name); - - _logger.debug(testParameters); - - // Sets up the test parameters with defaults. - /*testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.MESSAGE_SIZE_PROPNAME, - Integer.toString(PingPongProducer.MESSAGE_SIZE_DEAFULT)); - testParameters.setPropertyIfNull(PingPongProducer.PING_QUEUE_NAME_PROPNAME, - PingPongProducer.PING_QUEUE_NAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PERSISTENT_MODE_PROPNAME, - Boolean.toString(PingPongProducer.PERSISTENT_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TRANSACTED_PROPNAME, - Boolean.toString(PingPongProducer.TRANSACTED_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.BROKER_PROPNAME, PingPongProducer.BROKER_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.USERNAME_PROPNAME, PingPongProducer.USERNAME_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.PASSWORD_PROPNAME, PingPongProducer.PASSWORD_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VIRTUAL_HOST_PROPNAME, PingPongProducer.VIRTUAL_HOST_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.VERBOSE_PROPNAME, - Boolean.toString(PingPongProducer.VERBOSE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.RATE_PROPNAME, Integer.toString(PingPongProducer.RATE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PUBSUB_PROPNAME, - Boolean.toString(PingPongProducer.PUBSUB_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TX_BATCH_SIZE_PROPNAME, - Integer.toString(PingPongProducer.TX_BATCH_SIZE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.TIMEOUT_PROPNAME, Long.toString(PingPongProducer.TIMEOUT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.DESTINATION_COUNT_PROPNAME, - Integer.toString(PingPongProducer.DESTINATION_COUNT_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_COMMIT_PROPNAME, - PingPongProducer.FAIL_AFTER_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_COMMIT_PROPNAME, - PingPongProducer.FAIL_BEFORE_COMMIT_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_AFTER_SEND_PROPNAME, - PingPongProducer.FAIL_AFTER_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_BEFORE_SEND_PROPNAME, - PingPongProducer.FAIL_BEFORE_SEND_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.FAIL_ONCE_PROPNAME, PingPongProducer.FAIL_ONCE_DEFAULT); - testParameters.setPropertyIfNull(PingPongProducer.UNIQUE_DESTS_PROPNAME, - Boolean.toString(PingPongProducer.UNIQUE_DESTS_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.ACK_MODE_PROPNAME, - Integer.toString(PingPongProducer.ACK_MODE_DEFAULT)); - testParameters.setPropertyIfNull(PingPongProducer.PAUSE_AFTER_BATCH_PROPNAME, - PingPongProducer.PAUSE_AFTER_BATCH_DEFAULT);*/ - } - - /** - * Compile all the tests into a test suite. - */ - public static Test suite() - { - // Build a new test suite - TestSuite suite = new TestSuite("Ping-Pong Performance Tests"); - - // Run performance tests in read committed mode. - suite.addTest(new PingPongTestPerf("testPingPongOk")); - - return suite; - } - - private static void setSystemPropertyIfNull(String propName, String propValue) - { - if (System.getProperty(propName) == null) - { - System.setProperty(propName, propValue); - } - } - - public void testPingPongOk(int numPings) throws Exception - { - // Get the per thread test setup to run the test through. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Generate a sample message. This message is already time stamped and has its reply-to destination set. - Message msg = - perThreadSetup._testPingProducer.getTestMessage(perThreadSetup._testPingProducer.getReplyDestinations().get(0), - testParameters.getPropertyAsInteger(PingPongProducer.MESSAGE_SIZE_PROPNAME), - testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME)); - - // Send the message and wait for a reply. - int numReplies = - perThreadSetup._testPingProducer.pingAndWaitForReply(msg, numPings, PingPongProducer.TIMEOUT_DEFAULT, null); - - // Fail the test if the timeout was exceeded. - if (numReplies != numPings) - { - Assert.fail("The ping timed out, got " + numReplies + " out of " + numPings); - } - } - - /** - * Performs test fixture creation on a per thread basis. This will only be called once for each test thread. - */ - public void threadSetUp() - { - try - { - PerThreadSetup perThreadSetup = new PerThreadSetup(); - - // Extract the test set up paramaeters. - String fileProperties = testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME); - String factoryName = testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME); - String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME); - String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME); - String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME); - boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME); - boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME); - String selector = testParameters.getProperty(PingPongProducer.SELECTOR_PROPNAME); - boolean verbose = testParameters.getPropertyAsBoolean(PingPongProducer.VERBOSE_PROPNAME); - boolean pubsub = testParameters.getPropertyAsBoolean(PingPongProducer.PUBSUB_PROPNAME); - - synchronized (this) - { - // Establish a bounce back client on the ping queue to bounce back the pings. - perThreadSetup._testPingBouncer = - new PingPongBouncer(fileProperties, factoryName, username, password, destinationName, persistent, - transacted, selector, verbose, pubsub); - - // Start the connections for client and producer running. - perThreadSetup._testPingBouncer.getConnection().start(); - - // Establish a ping-pong client on the ping queue to send the pings and receive replies with. - perThreadSetup._testPingProducer = new PingPongProducer(testParameters); - perThreadSetup._testPingProducer.establishConnection(true, true); - perThreadSetup._testPingProducer.start(); - } - - // Attach the per-thread set to the thread. - threadSetup.set(perThreadSetup); - } - catch (Exception e) - { - _logger.warn("There was an exception during per thread setup.", e); - } - } - - /** - * Performs test fixture clean - */ - public void threadTearDown() - { - _logger.debug("public void threadTearDown(): called"); - - try - { - // Get the per thread test fixture. - PerThreadSetup perThreadSetup = threadSetup.get(); - - // Close the pingers so that it cleans up its connection cleanly. - synchronized (this) - { - perThreadSetup._testPingProducer.close(); - // perThreadSetup._testPingBouncer.close(); - } - - // Ensure the per thread fixture is reclaimed. - threadSetup.remove(); - } - catch (JMSException e) - { - _logger.warn("There was an exception during per thread tear down."); - } - } - - protected static class PerThreadSetup - { - /** - * Holds the test ping-pong producer. - */ - private PingPongProducer _testPingProducer; - - /** - * Holds the test ping client. - */ - private PingPongBouncer _testPingBouncer; - } -} diff --git a/java/perftests/src/main/java/perftests.log4j b/java/perftests/src/main/java/perftests.log4j deleted file mode 100644 index badb617432..0000000000 --- a/java/perftests/src/main/java/perftests.log4j +++ /dev/null @@ -1,51 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -log4j.rootLogger=${root.logging.level} - - -log4j.logger.org.apache.qpid=${amqj.logging.level}, console -log4j.additivity.org.apache.qpid=false - -log4j.logger.org.apache.qpidity=${amqj.logging.level}, console -log4j.additivity.org.apache.qpidity=false - -log4j.logger.org.apache.mina=ERROR, console -log4j.additivity.org.apache.mina=false - -log4j.logger.org.apache.qpid.requestreply=${amqj.test.logging.level} -log4j.logger.org.apache.qpid.pingpong=${amqj.test.logging.level} -log4j.logger.org.apache.qpid.ping=${amqj.test.logging.level} -log4j.logger.org.apache.qpid.topic=${amqj.test.logging.level} - - -log4j.logger.uk.co.thebadgerset.junit.extensions=${badger.level}, console -log4j.additivity.uk.co.thebadgerset.junit.extensions=false - -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=all -log4j.appender.console.layout=org.apache.log4j.PatternLayout - -log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n -#log4j.appender.console.layout.ConversionPattern=%t %p [%c] %m%n - -log4j.appender.fileApp=org.apache.log4j.FileAppender -log4j.appender.fileApp.file=${log.dir}/perftests.volumetest.log -log4j.appender.fileApp.Threshold=info -log4j.appender.fileApp.append=false -log4j.appender.fileApp.layout=org.apache.log4j.PatternLayout diff --git a/java/perftests/src/main/java/perftests.properties b/java/perftests/src/main/java/perftests.properties deleted file mode 100644 index d97d0d3906..0000000000 --- a/java/perftests/src/main/java/perftests.properties +++ /dev/null @@ -1,46 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory - -# use the following property to configure the default connector -#java.naming.provider.url - ignored. - -# register some connection factories -# connectionfactory.[jndiname] = [ConnectionURL] -#connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://192.168.1.10:5672' -connectionfactory.local = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' - -# A 0.10 connection factory -##connectionfactory.local = qpid:password=pass;username=name@tcp:192.168.1.10:5672 - -# register some queues in JNDI using the form -# queue.[jndiName] = [physicalName] -#queue.MyQueue = example.MyQueue -queue.syncQueue = syncQueue -queue.testQueue = testQueue - -# register some topics in JNDI using the form -# topic.[jndiName] = [physicalName] -#topic.ibmStocks = stocks.nyse.ibm -topic.testTopic = testTopic - -# Register an AMQP destination in JNDI -# NOTE: Qpid currently only supports direct,topics and headers -# destination.[jniName] = [BindingURL] -destination.direct = direct://amq.direct//directQueue -- cgit v1.2.1