diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-14 09:38:05 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-14 09:38:05 +0000 |
| commit | 03fd65578602ff436f4f2938528b47aa35f074e0 (patch) | |
| tree | 50d0116605bde038a5d57df79327625ed96c2d52 /java/perftests/src | |
| parent | 5447b80198e5ddac4a10678e1dd5fe76cd435d2b (diff) | |
| download | qpid-python-03fd65578602ff436f4f2938528b47aa35f074e0.tar.gz | |
Added distinction between sync and async consumers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594814 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
5 files changed, 262 insertions, 133 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java new file mode 100644 index 0000000000..111b43dfb9 --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java @@ -0,0 +1,91 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.client.perf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * + * + */ +public class JMSAsyncConsumer implements MessageListener, JMSConsumer +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class); + + private String _id; + private Connection _connection; + private Session _session; + private MessageConsumer _consumer; + private Destination _destination; + private boolean _transacted; + private int _ackMode = Session.AUTO_ACKNOWLEDGE; + private AtomicBoolean _run = new AtomicBoolean(true); + private long _currentMsgCount; + + /* Not implementing transactions for first phase */ + public JMSAsyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception + { + _id = id; + _connection = connection; + _destination = destination; + _transacted = transacted; + _ackMode = ackMode; + _session = _connection.createSession(_transacted, _ackMode); + _consumer = _session.createConsumer(_destination); + _consumer.setMessageListener(this); + } + + + + public void onMessage(Message message) + { + _currentMsgCount ++; + } + + public void stopConsuming() + { + System.out.println("Producer received notification to stop"); + try + { + _session.close(); + _connection.close(); + } + catch(Exception e) + { + _logger.error("Error Closing JMSSyncConsumer:"+ _id, e); + } + } + + public String getId() + { + return _id; + } + + /* Not worried about synchronizing as accuracy here is not that important. + * So if this method is invoked the count maybe off by a few digits. + * But when the test stops, this will always return the proper count. + */ + public long getCurrentMessageCount() + { + return _currentMsgCount; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java index 635fcaa248..4c41461cd4 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java @@ -1,102 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.qpid.client.perf; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSConsumer implements Runnable +/** + * + * + */ +public interface JMSConsumer { - private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class); - - private String _id; - private Connection _connection; - private Session _session; - private MessageConsumer _consumer; - private Destination _destination; - private boolean _transacted; - private int _ackMode = Session.AUTO_ACKNOWLEDGE; - private AtomicBoolean _run = new AtomicBoolean(true); - private long _currentMsgCount; - - /* Not implementing transactions for first phase */ - public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception - { - _id = id; - _connection = connection; - _destination = destination; - _transacted = transacted; - _ackMode = ackMode; - } - - public void run() - { - _run.set(true); - - try - { - _session = _connection.createSession(_transacted, _ackMode); - _consumer = _session.createConsumer(_destination); - } - catch(Exception e) - { - _logger.error("Error Setting up JMSProducer:"+ _id, e); - } - - while (_run.get()) - { - try - { - BytesMessage msg = (BytesMessage)_consumer.receive(); - if (msg != null) - { - long msgId = Integer.parseInt(msg.getJMSCorrelationID()); - /*if (_currentMsgCount+1 != msgId) - { - _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); - }*/ - _currentMsgCount ++; - } - } - catch(Exception e) - { - _logger.error("Error Receiving message from JMSConsumer:" + _id, e); - } - } - try - { - _session.close(); - _connection.close(); - } - catch(Exception e) - { - _logger.error("Error Closing JMSConsumer:"+ _id, e); - } - } - - public void stopConsuming() - { - _run.set(false); - System.out.println("Producer received notification to stop"); - } - - public String getId() - { - return _id; - } - - /* Not worried about synchronizing as accuracy here is not that important. - * So if this method is invoked the count maybe off by a few digits. - * But when the test stops, this will always return the proper count. - */ - public long getCurrentMessageCount() - { - return _currentMsgCount; - } + public String getId(); + public void stopConsuming(); + public long getCurrentMessageCount(); } diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java new file mode 100644 index 0000000000..5fd103746b --- /dev/null +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java @@ -0,0 +1,102 @@ +package org.apache.qpid.client.perf; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSSyncConsumer implements Runnable, JMSConsumer +{ + private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class); + + private String _id; + private Connection _connection; + private Session _session; + private MessageConsumer _consumer; + private Destination _destination; + private boolean _transacted; + private int _ackMode = Session.AUTO_ACKNOWLEDGE; + private AtomicBoolean _run = new AtomicBoolean(true); + private long _currentMsgCount; + + /* Not implementing transactions for first phase */ + public JMSSyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception + { + _id = id; + _connection = connection; + _destination = destination; + _transacted = transacted; + _ackMode = ackMode; + } + + public void run() + { + _run.set(true); + + try + { + _session = _connection.createSession(_transacted, _ackMode); + _consumer = _session.createConsumer(_destination); + } + catch(Exception e) + { + _logger.error("Error Setting up JMSProducer:"+ _id, e); + } + + while (_run.get()) + { + try + { + BytesMessage msg = (BytesMessage)_consumer.receive(); + if (msg != null) + { + // long msgId = Integer.parseInt(msg.getJMSCorrelationID()); + /*if (_currentMsgCount+1 != msgId) + { + _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); + }*/ + _currentMsgCount ++; + } + } + catch(Exception e) + { + _logger.error("Error Receiving message from JMSSyncConsumer:" + _id, e); + } + } + try + { + _session.close(); + _connection.close(); + } + catch(Exception e) + { + _logger.error("Error Closing JMSSyncConsumer:"+ _id, e); + } + } + + public void stopConsuming() + { + _run.set(false); + System.out.println("Producer received notification to stop"); + } + + public String getId() + { + return _id; + } + + /* Not worried about synchronizing as accuracy here is not that important. + * So if this method is invoked the count maybe off by a few digits. + * But when the test stops, this will always return the proper count. + */ + public long getCurrentMessageCount() + { + return _currentMsgCount; + } +} diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java index 99659fed44..52356ae243 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java @@ -1,13 +1,11 @@ package org.apache.qpid.client.perf; import java.io.FileWriter; -import java.io.RandomAccessFile; import java.sql.Date; import java.text.SimpleDateFormat; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Session; @@ -21,7 +19,7 @@ public class MessageConsumerTest extends Options implements Runnable private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class); private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); - private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>(); + private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>(); private int _count; String _logFileName; private long _gracePeriod = 5 * 60 * 1000; @@ -30,30 +28,38 @@ public class MessageConsumerTest extends Options implements Runnable public void start() throws Exception { - this.parseOptions(); - boolean useSameDest = true; - _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); + this.parseOptions(); + boolean useSameDest = true; + _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); - // use each destination with a different producer - if (_producerCount == destArray.length) - { - useSameDest = false; - } - for (;_count < _producerCount;_count++) - { - createAndStartProducer(useSameDest?destArray[0]:destArray[_count]); - } + // use each destination with a different producer + if (_producerCount == destArray.length) + { + useSameDest = false; + } + for (; _count < _producerCount; _count++) + { + createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]); + } } - private void createAndStartProducer(String routingKey)throws Exception + private void createAndStartConsumer(String routingKey) throws Exception { AMQConnection con = ConnectionUtility.getInstance().getConnection(); con.start(); - Destination dest = new AMQTopic(con,routingKey); - JMSConsumer prod = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread(prod); - t.setName("JMSConsumer-"+_count); - t.start(); + Destination dest = new AMQTopic(con, routingKey); + JMSConsumer prod; + if (_synchronous) + { + prod = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE); + Thread t = new Thread((JMSSyncConsumer) prod); + t.setName("JMSSyncConsumer-" + _count); + t.start(); + } + else + { + prod = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE); + } _consumers.put(_count, prod); } @@ -71,12 +77,12 @@ public class MessageConsumerTest extends Options implements Runnable runReaper(false); try { - while(run) + while (run) { Thread.sleep(_logDuration); runReaper(false); - if(System.currentTimeMillis() + _gracePeriod - _startTime > _expiry ) + if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry) { // time to stop the test. for (Integer id : _consumers.keySet()) @@ -91,7 +97,7 @@ public class MessageConsumerTest extends Options implements Runnable } catch (InterruptedException e) { - _logger.error("The timer thread exited",e); + _logger.error("The timer thread exited", e); } } @@ -99,11 +105,11 @@ public class MessageConsumerTest extends Options implements Runnable { try { - FileWriter _logFile = new FileWriter(_logFileName + ".csv",true); + FileWriter _logFile = new FileWriter(_logFileName + ".csv", true); for (Integer id : _consumers.keySet()) { JMSConsumer prod = _consumers.get(id); - StringBuffer buf = new StringBuffer("JMSConsumer(").append(prod.getId()).append("),"); + StringBuffer buf = new StringBuffer("JMSSyncConsumer(").append(prod.getId()).append("),"); Date d = new Date(System.currentTimeMillis()); buf.append(df.format(d)).append(","); buf.append(d.getTime()).append(","); @@ -113,21 +119,21 @@ public class MessageConsumerTest extends Options implements Runnable } _logFile.close(); - FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true); - StringBuffer buf = new StringBuffer("JMSConsumer,"); + FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv", true); + StringBuffer buf = new StringBuffer("JMSSyncConsumer,"); Date d = new Date(System.currentTimeMillis()); buf.append(df.format(d)).append(","); buf.append(d.getTime()).append(","); buf.append(totalMsgCount).append(","); - buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); + buf.append(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()).append("\n"); _memoryLog.write(buf.toString()); _memoryLog.close(); if (printSummary) { double totaltime = d.getTime() - _startTime; double dCount = totalMsgCount; - double ratio = (dCount/totaltime)*1000; - FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); + double ratio = (dCount / totaltime) * 1000; + FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary", true); buf = new StringBuffer("MessageConsumerTest \n Test started at : "); buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); d = new Date(System.currentTimeMillis()); @@ -140,9 +146,9 @@ public class MessageConsumerTest extends Options implements Runnable _summaryLog.close(); } } - catch(Exception e) + catch (Exception e) { - _logger.error("Error printing info to the log file",e); + _logger.error("Error printing info to the log file", e); } } @@ -154,7 +160,7 @@ public class MessageConsumerTest extends Options implements Runnable test.start(); test.startTimerThread(); } - catch(Exception e) + catch (Exception e) { e.printStackTrace(); } diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java index ee22b6804b..0ff420bf75 100644 --- a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java +++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java @@ -4,6 +4,7 @@ public class Options { int _messageSize; boolean _transacted; + boolean _synchronous; String[] destArray; int _producerCount; int _consumerCount; @@ -25,6 +26,7 @@ public class Options public void parseOptions() { _messageSize = Integer.parseInt(System.getProperty("messageSize","100")); + _synchronous =Boolean.parseBoolean( System.getProperty("synchronous", "false")); _transacted = false; String destinations = System.getProperty("destinations"); destArray = destinations.split(","); @@ -37,7 +39,7 @@ public class Options System.out.println("============= Test Data ==================="); System.out.println("Total no of producers : " + _producerCount); - System.out.println("Total no of consumer : " + _consumerCount); + System.out.println(_synchronous? "Total no of synchronous consumers : " : "Total no of asynchronous consumers :" + _consumerCount); System.out.println("Log Frequency in mins : " + _logDuration/(1000*60)); System.out.println("Log file path : " + _logFilePath); System.out.println("Test Duration : " + printTestDuration()); |
