diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-29 22:46:28 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-11-29 22:46:28 +0000 |
| commit | 5ae1ec0df4a3d088e413d5d1e4c534409b307155 (patch) | |
| tree | 1e16f6eeaa6e1105136b0b8379153f7c5b272497 /qpid/java | |
| parent | 660bd9a2f5885cc1a99681c54c7f51592d2808fb (diff) | |
| download | qpid-python-5ae1ec0df4a3d088e413d5d1e4c534409b307155.tar.gz | |
Removed all the fancy threading with the test cases as these interfer with the test cases
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@599611 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 154 insertions, 559 deletions
diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java deleted file mode 100644 index 3f21a3f0a6..0000000000 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java +++ /dev/null @@ -1,106 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client.perf; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.*; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * - * - */ -public class JMSAsyncConsumer implements MessageListener, JMSConsumer -{ - private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class); - - private String _id; - private Connection _connection; - private Session _session; - private MessageConsumer _consumer; - private Destination _destination; - private boolean _transacted; - private int _ackMode = Session.AUTO_ACKNOWLEDGE; - private AtomicBoolean _run = new AtomicBoolean(true); - private long _currentMsgCount; - private boolean _verifyOrder = false; - - /* Not implementing transactions for first phase */ - public JMSAsyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception - { - _id = id; - _connection = connection; - _destination = destination; - _transacted = transacted; - _ackMode = ackMode; - _session = _connection.createSession(_transacted, _ackMode); - _consumer = _session.createConsumer(_destination); - _consumer.setMessageListener(this); - _verifyOrder = Boolean.getBoolean("verifyOrder"); - } - - - - public void onMessage(Message message) - { - try - { - long msgId = Integer.parseInt(message.getJMSCorrelationID()); - if (_verifyOrder && _currentMsgCount+1 != msgId) - { - _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); - } - message = null; - _currentMsgCount ++; - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - public void stopConsuming() - { - System.out.println("Consumer received notification to stop"); - try - { - _session.close(); - _connection.close(); - } - catch(Exception e) - { - _logger.error("Error Closing JMSSyncConsumer:"+ _id, e); - } - } - - public String getId() - { - return _id; - } - - /* Not worried about synchronizing as accuracy here is not that important. - * So if this method is invoked the count maybe off by a few digits. - * But when the test stops, this will always return the proper count. - */ - public long getCurrentMessageCount() - { - return _currentMsgCount; - } -} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java deleted file mode 100644 index 4c41461cd4..0000000000 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.client.perf; - -/** - * - * - */ -public interface JMSConsumer -{ - public String getId(); - public void stopConsuming(); - public long getCurrentMessageCount(); -} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java deleted file mode 100644 index 58b17efecc..0000000000 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSProducer.java +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.qpid.client.perf; - -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import org.apache.qpid.client.message.TestMessageFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSProducer implements Runnable -{ - private static final Logger _logger = LoggerFactory.getLogger(JMSProducer.class); - - private String _id; - private int _messageSize; - private Connection _connection; - private Session _session; - private MessageProducer _producer; - private Destination _destination; - private BytesMessage _payload; - private boolean _transacted; - private int _ackMode = Session.AUTO_ACKNOWLEDGE; - private AtomicBoolean _run = new AtomicBoolean(true); - private long _currentMsgCount; - - /* Not implementing transactions for first phase */ - public JMSProducer(String id,Connection connection, Destination destination,int messageSize, boolean transacted) throws Exception - { - _id = id; - _connection = connection; - _destination = destination; - _messageSize = messageSize; - _transacted = transacted; - } - - public void run() - { - try - { - _session = _connection.createSession(_transacted, _ackMode); - _payload = TestMessageFactory.newBytesMessage(_session, _messageSize); - _producer = _session.createProducer(_destination); - // this should speedup the message producer - _producer.setDisableMessageTimestamp(true); - } - catch(Exception e) - { - _logger.error("Error Setting up JMSProducer:"+ _id, e); - } - - while (_run.get()) - { - try - { - _payload.setJMSCorrelationID(String.valueOf(_currentMsgCount+1)); - _producer.send(_payload); - _currentMsgCount ++; - } - catch(Exception e) - { - _logger.error("Error Sending message from JMSProducer:" + _id, e); - } - } - try - { - _session.close(); - _connection.close(); - } - catch(Exception e) - { - _logger.error("Error Closing JMSProducer:"+ _id, e); - } - } - - public void stopProducing() - { - _run.set(false); - System.out.println("Producer received notification to stop"); - } - - public String getId() - { - return _id; - } - - /* Not worried about synchronizing as accuracy here is not that important. - * So if this method is invoked the count maybe off by a few digits. - * But when the test stops, this will always return the proper count. - */ - public long getCurrentMessageCount() - { - return _currentMsgCount; - } -} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java deleted file mode 100644 index b320f3cdfc..0000000000 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java +++ /dev/null @@ -1,102 +0,0 @@ -package org.apache.qpid.client.perf; - -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSSyncConsumer implements Runnable, JMSConsumer -{ - private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class); - - private String _id; - private Connection _connection; - private Session _session; - private MessageConsumer _consumer; - private Destination _destination; - private boolean _transacted; - private int _ackMode = Session.AUTO_ACKNOWLEDGE; - private AtomicBoolean _run = new AtomicBoolean(true); - private long _currentMsgCount; - - /* Not implementing transactions for first phase */ - public JMSSyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception - { - _id = id; - _connection = connection; - _destination = destination; - _transacted = transacted; - _ackMode = ackMode; - } - - public void run() - { - _run.set(true); - - try - { - _session = _connection.createSession(_transacted, _ackMode); - _consumer = _session.createConsumer(_destination); - } - catch(Exception e) - { - _logger.error("Error Setting up JMSProducer:"+ _id, e); - } - - while (_run.get()) - { - try - { - BytesMessage msg = (BytesMessage)_consumer.receive(); - if (msg != null) - { - // long msgId = Integer.parseInt(msg.getJMSCorrelationID()); - /*if (_currentMsgCount+1 != msgId) - { - _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); - }*/ - _currentMsgCount ++; - } - } - catch(Exception e) - { - _logger.error("Error Receiving message from JMSSyncConsumer:" + _id, e); - } - } - try - { - _session.close(); - _connection.close(); - } - catch(Exception e) - { - _logger.error("Error Closing JMSSyncConsumer:"+ _id, e); - } - } - - public void stopConsuming() - { - _run.set(false); - System.out.println("Consumer received notification to stop"); - } - - public String getId() - { - return _id; - } - - /* Not worried about synchronizing as accuracy here is not that important. - * So if this method is invoked the count maybe off by a few digits. - * But when the test stops, this will always return the proper count. - */ - public long getCurrentMessageCount() - { - return _currentMsgCount; - } -} diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java index 831d4dd9c4..d820b22de9 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java @@ -4,114 +4,96 @@ import java.io.FileWriter; import java.io.IOException; import java.sql.Date; import java.text.SimpleDateFormat; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import javax.jms.BytesMessage; +import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.message.TestMessageFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MessageConsumerTest extends Options implements Runnable +public class MessageConsumerTest extends Options implements MessageListener { private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class); private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); - private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>(); - private int _count; String _logFileName; - private long _gracePeriod = 5 * 60 * 1000; long _startTime; long _totalMsgCount; + long _intervalCount; - public void start() throws Exception - { - this.parseOptions(); - boolean useSameDest = true; - _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); + private Connection _connection; + private Session _session; + private BytesMessage _payload; + private MessageConsumer _consumer; + private boolean _verifyOrder = false; - // use each destination with a different consumerucer - if (_consumerCount == destArray.length) - { - useSameDest = false; - } - for (; _count < _consumerCount; _count++) - { - createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]); - } - } - - private void createAndStartConsumer(String routingKey) throws Exception + public void init() throws Exception { - AMQConnection con = ConnectionUtility.getInstance().getConnection(); - con.start(); - Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey); - JMSConsumer consumer; - if (_synchronous) - { - consumer = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE); - Thread t = new Thread((JMSSyncConsumer) consumer); - t.setName("JMSSyncConsumer-" + _count); - t.start(); - } - else - { - consumer = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE); - } - _consumers.put(_count, consumer); + this.parseOptions(); + _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis(); + + AMQConnection _connection = ConnectionUtility.getInstance().getConnection(); + _connection.start(); + Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination); + _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); + _payload = TestMessageFactory.newBytesMessage(_session, _messageSize); + _consumer = _session.createConsumer(dest); + if(!_synchronous) + { + _consumer.setMessageListener(this); + } + _verifyOrder = Boolean.getBoolean("verifyOrder"); + + _startTime = System.currentTimeMillis(); + boolean run = true; + if(Boolean.getBoolean("collect_stats")) + { + printHeading(); + runReaper(); + } } - private void startTimerThread() + public void onMessage(Message message) { - _startTime = System.currentTimeMillis(); - if(Boolean.getBoolean("collect_stats")) - { - Thread t = new Thread(this); - t.setName("MessageConsumerTest-TimerThread"); - t.start(); - } try { - printSummary(); - } - catch(Exception e) - { - e.printStackTrace(); - } - } - - public void run() - { - boolean run = true; - printHeading(); - runReaper(); - try - { - while (run) + /* long msgId = Integer.parseInt(message.getJMSMessageID()); + if (_verifyOrder && _totalMsgCount+1 != msgId) { - Thread.sleep(_logDuration); - runReaper(); - - if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry) + _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1); + }*/ + message = null; + _totalMsgCount ++; + _intervalCount++; + if(_intervalCount >= _logFrequency) + { + _intervalCount = 0; + if (Boolean.getBoolean("collect_stats")) { - // time to stop the test. - for (Integer id : _consumers.keySet()) - { - JMSConsumer consumer = _consumers.get(id); - consumer.stopConsuming(); - } runReaper(); - run = false; + } + if (System.currentTimeMillis() - _startTime >= _expiry) + { + _session.close(); + _connection.stop(); + printSummary(); + return; } } } - catch (InterruptedException e) + catch(Exception e) { - _logger.error("The timer thread exited", e); + e.printStackTrace(); } } @@ -119,16 +101,6 @@ public class MessageConsumerTest extends Options implements Runnable { try { - long totalMsgCountThisInterval = 0; - - for (Integer id : _consumers.keySet()) - { - JMSConsumer consumer = _consumers.get(id); - totalMsgCountThisInterval = totalMsgCountThisInterval + consumer.getCurrentMessageCount(); - - } - _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval; - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); StringBuffer buf = new StringBuffer(); Date d = new Date(System.currentTimeMillis()); @@ -137,8 +109,6 @@ public class MessageConsumerTest extends Options implements Runnable buf.append(d.getTime()).append(","); buf.append(_totalMsgCount).append(","); buf.append(_totalMsgCount*1000 /totaltime).append(","); - buf.append(totalMsgCountThisInterval).append(","); - buf.append(totalMsgCountThisInterval*1000/_logDuration).append(","); buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); buf.append("\n"); _memoryLog.write(buf.toString()); @@ -156,7 +126,7 @@ public class MessageConsumerTest extends Options implements Runnable try { FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory"; + String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory"; _memoryLog.write(s); _memoryLog.close(); } @@ -166,33 +136,31 @@ public class MessageConsumerTest extends Options implements Runnable } } - private void printSummary() throws Exception + private void printSummary() { - if (Boolean.getBoolean("collect_stats")) + try { - for (Integer id : _consumers.keySet()) - { - JMSConsumer consumer = _consumers.get(id); - _totalMsgCount = _totalMsgCount + consumer.getCurrentMessageCount(); - } - } + long current = System.currentTimeMillis(); + double time = current - _startTime; + double ratio = _totalMsgCount*1000/time; + FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); - long current = System.currentTimeMillis(); - double time = current - _startTime; - double ratio = _totalMsgCount*1000/time; - FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); - - StringBuffer buf = new StringBuffer("MessageConsumerTest \n Test started at : "); - buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); - Date d = new Date(current); - buf.append(df.format(d)).append("\n Total Time taken (ms):"); - buf.append(time).append("\n Total messages sent:"); - buf.append(_totalMsgCount).append("\n consumer rate:"); - buf.append(ratio).append("\n"); - _summaryLog.write(buf.toString()); - System.out.println("---------- Test Ended -------------"); - _summaryLog.close(); + StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : "); + buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); + Date d = new Date(current); + buf.append(df.format(d)).append("\n Total Time taken (ms):"); + buf.append(time).append("\n Total messages sent:"); + buf.append(_totalMsgCount).append("\n producer rate:"); + buf.append(ratio).append("\n"); + _summaryLog.write(buf.toString()); + System.out.println("---------- Test Ended -------------"); + _summaryLog.close(); + } + catch(Exception e) + { + e.printStackTrace(); + } } public static void main(String[] args) @@ -200,8 +168,7 @@ public class MessageConsumerTest extends Options implements Runnable try { MessageConsumerTest test = new MessageConsumerTest(); - test.start(); - test.startTimerThread(); + test.init(); } catch (Exception e) { diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java index 67a3aaa786..084e098cc3 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageProducerTest.java @@ -7,119 +7,97 @@ import java.text.SimpleDateFormat; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; import org.apache.qpid.client.AMQConnection; import org.apache.qpid.client.AMQQueue; import org.apache.qpid.client.AMQTopic; +import org.apache.qpid.client.message.TestMessageFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MessageProducerTest extends Options implements Runnable +public class MessageProducerTest extends Options { private static final Logger _logger = LoggerFactory.getLogger(MessageProducerTest.class); private SimpleDateFormat df = new SimpleDateFormat("h:mm a"); - private Map<Integer,JMSProducer> _producers = new ConcurrentHashMap<Integer,JMSProducer>(); - private int _count; String _logFileName; long _startTime; long _totalMsgCount; + long _intervalCount; - public void start() throws Exception + private Connection _connection; + private Session _session; + private BytesMessage _payload; + private MessageProducer _producer; + + public void init() throws Exception { this.parseOptions(); - boolean useSameDest = true; _logFileName = _logFilePath + "/MessageProducerTest_" + System.currentTimeMillis(); - // use each destination with a different producer - if (_producerCount == destArray.length) - { - useSameDest = false; - } - for (;_count < _producerCount;_count++) - { - createAndStartProducer(useSameDest?destArray[0]:destArray[_count]); - } + AMQConnection _connection = ConnectionUtility.getInstance().getConnection(); + _connection.start(); + Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(_connection,_destination) : new AMQTopic(_connection,_destination); + _session = _connection.createSession(_transacted, Session.AUTO_ACKNOWLEDGE); + _payload = TestMessageFactory.newBytesMessage(_session, _messageSize); + _producer = _session.createProducer(dest); + // this should speedup the message producer + _producer.setDisableMessageTimestamp(true); } - private void createAndStartProducer(String routingKey)throws Exception - { - AMQConnection con = ConnectionUtility.getInstance().getConnection(); - con.start(); - Destination dest = Boolean.getBoolean("useQueue")? new AMQQueue(con,routingKey) : new AMQTopic(con,routingKey); - JMSProducer prod = new JMSProducer(String.valueOf(_count),(Connection)con, dest,_messageSize, _transacted); - Thread t = new Thread(prod); - t.setName("JMSProducer-"+_count); - t.start(); - _producers.put(_count, prod); - } - - private void startTimerThread() + public void run() { _startTime = System.currentTimeMillis(); + boolean run = true; if(Boolean.getBoolean("collect_stats")) { - Thread t = new Thread(this); - t.setName("MessageProducerTest-TimerThread"); - t.start(); - } - try - { - printSummary(); + printHeading(); + runReaper(); } - catch(Exception e) - { - e.printStackTrace(); - } - } - public void run() - { - boolean run = true; - printHeading(); - runReaper(); try { while (run) { - Thread.sleep(_logDuration); - runReaper(); + _payload.setJMSMessageID(String.valueOf(_totalMsgCount+1)); + _producer.send(_payload); + _totalMsgCount ++; + _intervalCount ++; - if (System.currentTimeMillis() - _startTime > _expiry) + // check every x messages to see if times up + if(_intervalCount >= _logFrequency) { - // time to stop the test. - for (Integer id : _producers.keySet()) + _intervalCount = 0; + if (Boolean.getBoolean("collect_stats")) { - JMSProducer producer = _producers.get(id); - producer.stopProducing(); + runReaper(); + } + if (System.currentTimeMillis() - _startTime >= _expiry) + { + // time to stop the test. + _session.close(); + _connection.stop(); + run = false; } - runReaper(); - run = false; } } } - catch (InterruptedException e) + catch (Exception e) { _logger.error("The timer thread exited", e); } + printSummary(); } public void runReaper() { try { - long totalMsgCountThisInterval = 0; - - for (Integer id : _producers.keySet()) - { - JMSProducer producer = _producers.get(id); - totalMsgCountThisInterval = totalMsgCountThisInterval + producer.getCurrentMessageCount(); - - } - _totalMsgCount = _totalMsgCount + totalMsgCountThisInterval; - FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); StringBuffer buf = new StringBuffer(); Date d = new Date(System.currentTimeMillis()); @@ -128,8 +106,6 @@ public class MessageProducerTest extends Options implements Runnable buf.append(d.getTime()).append(","); buf.append(_totalMsgCount).append(","); buf.append(_totalMsgCount*1000 /totaltime).append(","); - buf.append(totalMsgCountThisInterval).append(","); - buf.append(totalMsgCountThisInterval*1000/_logDuration).append(","); buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n"); buf.append("\n"); _memoryLog.write(buf.toString()); @@ -147,7 +123,7 @@ public class MessageProducerTest extends Options implements Runnable try { FileWriter _memoryLog = new FileWriter(_logFileName + ".csv",true); - String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),interval count,interval rate (msg/sec),memory"; + String s = "Date/Time,Time (ms),total msg count,total rate (msg/sec),memory"; _memoryLog.write(s); _memoryLog.close(); } @@ -157,33 +133,31 @@ public class MessageProducerTest extends Options implements Runnable } } - private void printSummary() throws Exception + private void printSummary() { - if (Boolean.getBoolean("collect_stats")) + try { - for (Integer id : _producers.keySet()) - { - JMSProducer producer = _producers.get(id); - _totalMsgCount = _totalMsgCount + producer.getCurrentMessageCount(); - } + long current = System.currentTimeMillis(); + double time = current - _startTime; + double ratio = _totalMsgCount*1000/time; + FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); + + StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : "); + buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); + Date d = new Date(current); + buf.append(df.format(d)).append("\n Total Time taken (ms):"); + buf.append(time).append("\n Total messages sent:"); + buf.append(_totalMsgCount).append("\n producer rate:"); + buf.append(ratio).append("\n"); + _summaryLog.write(buf.toString()); + System.out.println("---------- Test Ended -------------"); + _summaryLog.close(); + } + catch(Exception e) + { + e.printStackTrace(); } - - long current = System.currentTimeMillis(); - double time = current - _startTime; - double ratio = _totalMsgCount*1000/time; - FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true); - - StringBuffer buf = new StringBuffer("MessageProducerTest \n Test started at : "); - buf.append(df.format(new Date(_startTime))).append("\n Test finished at : "); - Date d = new Date(current); - buf.append(df.format(d)).append("\n Total Time taken (ms):"); - buf.append(time).append("\n Total messages sent:"); - buf.append(_totalMsgCount).append("\n producer rate:"); - buf.append(ratio).append("\n"); - _summaryLog.write(buf.toString()); - System.out.println("---------- Test Ended -------------"); - _summaryLog.close(); } public static void main(String[] args) @@ -191,8 +165,8 @@ public class MessageProducerTest extends Options implements Runnable try { MessageProducerTest test = new MessageProducerTest(); - test.start(); - test.startTimerThread(); + test.init(); + test.run(); } catch(Exception e) { diff --git a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java index 2d04426794..6f68d5fedc 100644 --- a/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java +++ b/qpid/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java @@ -5,11 +5,9 @@ public class Options int _messageSize; boolean _transacted; boolean _synchronous; - String[] destArray; - int _producerCount; - int _consumerCount; + String _destination; long _expiry; - long _logDuration; + long _logFrequency; String _logFilePath; /** @@ -17,8 +15,6 @@ public class Options * -DmessageSize * -DuseQueue * -Dtransacted - * -DproducerCount - * -DconsumerCount * -Ddestinations * -DlogFilePath * -Duration=1H,30M,10S @@ -27,21 +23,16 @@ public class Options public void parseOptions() { _messageSize = Integer.parseInt(System.getProperty("messageSize","100")); - _synchronous = Boolean.parseBoolean( System.getProperty("synchronous", "false")); _transacted = false; - String destinations = System.getProperty("destinations"); - destArray = destinations.split(","); - _producerCount = Integer.parseInt(System.getProperty("producerCount","1")); - _consumerCount = Integer.parseInt(System.getProperty("consumerCount","1")); - _logDuration = Long.parseLong(System.getProperty("logDuration","10")); - _logDuration = _logDuration*1000*60; + _destination = System.getProperty("destinations"); + _logFrequency = Long.parseLong(System.getProperty("logDuration","10")); _logFilePath = System.getProperty("logFilePath"); _expiry = getExpiry(); System.out.println("============= Test Data ==================="); - System.out.println("Total no of producers : " + _producerCount); - System.out.println(_synchronous? "Total no of synchronous consumers : " : "Total no of asynchronous consumers :" + _consumerCount); - System.out.println("Log Frequency in mins : " + _logDuration/(1000*60)); + System.out.println("Destination : " + _destination); + System.out.println("Collect stats : " + Boolean.getBoolean("collect_stats")); + System.out.println("Log Frequency in msgs : " + _logFrequency); System.out.println("Log file path : " + _logFilePath); System.out.println("Test Duration : " + printTestDuration()); System.out.println("============= /Test Data ==================="); |
