summaryrefslogtreecommitdiff
path: root/java/perftests/src
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-14 09:38:05 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-14 09:38:05 +0000
commit03fd65578602ff436f4f2938528b47aa35f074e0 (patch)
tree50d0116605bde038a5d57df79327625ed96c2d52 /java/perftests/src
parent5447b80198e5ddac4a10678e1dd5fe76cd435d2b (diff)
downloadqpid-python-03fd65578602ff436f4f2938528b47aa35f074e0.tar.gz
Added distinction between sync and async consumers
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@594814 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java91
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java124
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java102
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java74
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java4
5 files changed, 262 insertions, 133 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
new file mode 100644
index 0000000000..111b43dfb9
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSAsyncConsumer.java
@@ -0,0 +1,91 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.client.perf;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ *
+ *
+ */
+public class JMSAsyncConsumer implements MessageListener, JMSConsumer
+{
+ private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
+
+ private String _id;
+ private Connection _connection;
+ private Session _session;
+ private MessageConsumer _consumer;
+ private Destination _destination;
+ private boolean _transacted;
+ private int _ackMode = Session.AUTO_ACKNOWLEDGE;
+ private AtomicBoolean _run = new AtomicBoolean(true);
+ private long _currentMsgCount;
+
+ /* Not implementing transactions for first phase */
+ public JMSAsyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+ {
+ _id = id;
+ _connection = connection;
+ _destination = destination;
+ _transacted = transacted;
+ _ackMode = ackMode;
+ _session = _connection.createSession(_transacted, _ackMode);
+ _consumer = _session.createConsumer(_destination);
+ _consumer.setMessageListener(this);
+ }
+
+
+
+ public void onMessage(Message message)
+ {
+ _currentMsgCount ++;
+ }
+
+ public void stopConsuming()
+ {
+ System.out.println("Producer received notification to stop");
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
+ }
+ }
+
+ public String getId()
+ {
+ return _id;
+ }
+
+ /* Not worried about synchronizing as accuracy here is not that important.
+ * So if this method is invoked the count maybe off by a few digits.
+ * But when the test stops, this will always return the proper count.
+ */
+ public long getCurrentMessageCount()
+ {
+ return _currentMsgCount;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
index 635fcaa248..4c41461cd4 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSConsumer.java
@@ -1,102 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.qpid.client.perf;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JMSConsumer implements Runnable
+/**
+ *
+ *
+ */
+public interface JMSConsumer
{
- private static final Logger _logger = LoggerFactory.getLogger(JMSConsumer.class);
-
- private String _id;
- private Connection _connection;
- private Session _session;
- private MessageConsumer _consumer;
- private Destination _destination;
- private boolean _transacted;
- private int _ackMode = Session.AUTO_ACKNOWLEDGE;
- private AtomicBoolean _run = new AtomicBoolean(true);
- private long _currentMsgCount;
-
- /* Not implementing transactions for first phase */
- public JMSConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
- {
- _id = id;
- _connection = connection;
- _destination = destination;
- _transacted = transacted;
- _ackMode = ackMode;
- }
-
- public void run()
- {
- _run.set(true);
-
- try
- {
- _session = _connection.createSession(_transacted, _ackMode);
- _consumer = _session.createConsumer(_destination);
- }
- catch(Exception e)
- {
- _logger.error("Error Setting up JMSProducer:"+ _id, e);
- }
-
- while (_run.get())
- {
- try
- {
- BytesMessage msg = (BytesMessage)_consumer.receive();
- if (msg != null)
- {
- long msgId = Integer.parseInt(msg.getJMSCorrelationID());
- /*if (_currentMsgCount+1 != msgId)
- {
- _logger.error("Error : Message received out of order in JMSConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
- }*/
- _currentMsgCount ++;
- }
- }
- catch(Exception e)
- {
- _logger.error("Error Receiving message from JMSConsumer:" + _id, e);
- }
- }
- try
- {
- _session.close();
- _connection.close();
- }
- catch(Exception e)
- {
- _logger.error("Error Closing JMSConsumer:"+ _id, e);
- }
- }
-
- public void stopConsuming()
- {
- _run.set(false);
- System.out.println("Producer received notification to stop");
- }
-
- public String getId()
- {
- return _id;
- }
-
- /* Not worried about synchronizing as accuracy here is not that important.
- * So if this method is invoked the count maybe off by a few digits.
- * But when the test stops, this will always return the proper count.
- */
- public long getCurrentMessageCount()
- {
- return _currentMsgCount;
- }
+ public String getId();
+ public void stopConsuming();
+ public long getCurrentMessageCount();
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
new file mode 100644
index 0000000000..5fd103746b
--- /dev/null
+++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/JMSSyncConsumer.java
@@ -0,0 +1,102 @@
+package org.apache.qpid.client.perf;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSSyncConsumer implements Runnable, JMSConsumer
+{
+ private static final Logger _logger = LoggerFactory.getLogger(JMSSyncConsumer.class);
+
+ private String _id;
+ private Connection _connection;
+ private Session _session;
+ private MessageConsumer _consumer;
+ private Destination _destination;
+ private boolean _transacted;
+ private int _ackMode = Session.AUTO_ACKNOWLEDGE;
+ private AtomicBoolean _run = new AtomicBoolean(true);
+ private long _currentMsgCount;
+
+ /* Not implementing transactions for first phase */
+ public JMSSyncConsumer(String id,Connection connection, Destination destination,boolean transacted,int ackMode) throws Exception
+ {
+ _id = id;
+ _connection = connection;
+ _destination = destination;
+ _transacted = transacted;
+ _ackMode = ackMode;
+ }
+
+ public void run()
+ {
+ _run.set(true);
+
+ try
+ {
+ _session = _connection.createSession(_transacted, _ackMode);
+ _consumer = _session.createConsumer(_destination);
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error Setting up JMSProducer:"+ _id, e);
+ }
+
+ while (_run.get())
+ {
+ try
+ {
+ BytesMessage msg = (BytesMessage)_consumer.receive();
+ if (msg != null)
+ {
+ // long msgId = Integer.parseInt(msg.getJMSCorrelationID());
+ /*if (_currentMsgCount+1 != msgId)
+ {
+ _logger.error("Error : Message received out of order in JMSSyncConsumer:" + _id + " message id was " + msgId + " expected: " + _currentMsgCount+1);
+ }*/
+ _currentMsgCount ++;
+ }
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error Receiving message from JMSSyncConsumer:" + _id, e);
+ }
+ }
+ try
+ {
+ _session.close();
+ _connection.close();
+ }
+ catch(Exception e)
+ {
+ _logger.error("Error Closing JMSSyncConsumer:"+ _id, e);
+ }
+ }
+
+ public void stopConsuming()
+ {
+ _run.set(false);
+ System.out.println("Producer received notification to stop");
+ }
+
+ public String getId()
+ {
+ return _id;
+ }
+
+ /* Not worried about synchronizing as accuracy here is not that important.
+ * So if this method is invoked the count maybe off by a few digits.
+ * But when the test stops, this will always return the proper count.
+ */
+ public long getCurrentMessageCount()
+ {
+ return _currentMsgCount;
+ }
+}
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
index 99659fed44..52356ae243 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/MessageConsumerTest.java
@@ -1,13 +1,11 @@
package org.apache.qpid.client.perf;
import java.io.FileWriter;
-import java.io.RandomAccessFile;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Session;
@@ -21,7 +19,7 @@ public class MessageConsumerTest extends Options implements Runnable
private static final Logger _logger = LoggerFactory.getLogger(MessageConsumerTest.class);
private SimpleDateFormat df = new SimpleDateFormat("h:mm a");
- private Map<Integer,JMSConsumer> _consumers = new ConcurrentHashMap<Integer,JMSConsumer>();
+ private Map<Integer, JMSConsumer> _consumers = new ConcurrentHashMap<Integer, JMSConsumer>();
private int _count;
String _logFileName;
private long _gracePeriod = 5 * 60 * 1000;
@@ -30,30 +28,38 @@ public class MessageConsumerTest extends Options implements Runnable
public void start() throws Exception
{
- this.parseOptions();
- boolean useSameDest = true;
- _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
+ this.parseOptions();
+ boolean useSameDest = true;
+ _logFileName = _logFilePath + "/MessageConsumerTest_" + System.currentTimeMillis();
- // use each destination with a different producer
- if (_producerCount == destArray.length)
- {
- useSameDest = false;
- }
- for (;_count < _producerCount;_count++)
- {
- createAndStartProducer(useSameDest?destArray[0]:destArray[_count]);
- }
+ // use each destination with a different producer
+ if (_producerCount == destArray.length)
+ {
+ useSameDest = false;
+ }
+ for (; _count < _producerCount; _count++)
+ {
+ createAndStartConsumer(useSameDest ? destArray[0] : destArray[_count]);
+ }
}
- private void createAndStartProducer(String routingKey)throws Exception
+ private void createAndStartConsumer(String routingKey) throws Exception
{
AMQConnection con = ConnectionUtility.getInstance().getConnection();
con.start();
- Destination dest = new AMQTopic(con,routingKey);
- JMSConsumer prod = new JMSConsumer(String.valueOf(_count),(Connection)con, dest, _transacted,Session.AUTO_ACKNOWLEDGE);
- Thread t = new Thread(prod);
- t.setName("JMSConsumer-"+_count);
- t.start();
+ Destination dest = new AMQTopic(con, routingKey);
+ JMSConsumer prod;
+ if (_synchronous)
+ {
+ prod = new JMSSyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+ Thread t = new Thread((JMSSyncConsumer) prod);
+ t.setName("JMSSyncConsumer-" + _count);
+ t.start();
+ }
+ else
+ {
+ prod = new JMSAsyncConsumer(String.valueOf(_count), con, dest, _transacted, Session.AUTO_ACKNOWLEDGE);
+ }
_consumers.put(_count, prod);
}
@@ -71,12 +77,12 @@ public class MessageConsumerTest extends Options implements Runnable
runReaper(false);
try
{
- while(run)
+ while (run)
{
Thread.sleep(_logDuration);
runReaper(false);
- if(System.currentTimeMillis() + _gracePeriod - _startTime > _expiry )
+ if (System.currentTimeMillis() + _gracePeriod - _startTime > _expiry)
{
// time to stop the test.
for (Integer id : _consumers.keySet())
@@ -91,7 +97,7 @@ public class MessageConsumerTest extends Options implements Runnable
}
catch (InterruptedException e)
{
- _logger.error("The timer thread exited",e);
+ _logger.error("The timer thread exited", e);
}
}
@@ -99,11 +105,11 @@ public class MessageConsumerTest extends Options implements Runnable
{
try
{
- FileWriter _logFile = new FileWriter(_logFileName + ".csv",true);
+ FileWriter _logFile = new FileWriter(_logFileName + ".csv", true);
for (Integer id : _consumers.keySet())
{
JMSConsumer prod = _consumers.get(id);
- StringBuffer buf = new StringBuffer("JMSConsumer(").append(prod.getId()).append("),");
+ StringBuffer buf = new StringBuffer("JMSSyncConsumer(").append(prod.getId()).append("),");
Date d = new Date(System.currentTimeMillis());
buf.append(df.format(d)).append(",");
buf.append(d.getTime()).append(",");
@@ -113,21 +119,21 @@ public class MessageConsumerTest extends Options implements Runnable
}
_logFile.close();
- FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv",true);
- StringBuffer buf = new StringBuffer("JMSConsumer,");
+ FileWriter _memoryLog = new FileWriter(_logFileName + "_memory.csv", true);
+ StringBuffer buf = new StringBuffer("JMSSyncConsumer,");
Date d = new Date(System.currentTimeMillis());
buf.append(df.format(d)).append(",");
buf.append(d.getTime()).append(",");
buf.append(totalMsgCount).append(",");
- buf.append(Runtime.getRuntime().totalMemory() -Runtime.getRuntime().freeMemory()).append("\n");
+ buf.append(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()).append("\n");
_memoryLog.write(buf.toString());
_memoryLog.close();
if (printSummary)
{
double totaltime = d.getTime() - _startTime;
double dCount = totalMsgCount;
- double ratio = (dCount/totaltime)*1000;
- FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary",true);
+ double ratio = (dCount / totaltime) * 1000;
+ FileWriter _summaryLog = new FileWriter(_logFileName + "_Summary", true);
buf = new StringBuffer("MessageConsumerTest \n Test started at : ");
buf.append(df.format(new Date(_startTime))).append("\n Test finished at : ");
d = new Date(System.currentTimeMillis());
@@ -140,9 +146,9 @@ public class MessageConsumerTest extends Options implements Runnable
_summaryLog.close();
}
}
- catch(Exception e)
+ catch (Exception e)
{
- _logger.error("Error printing info to the log file",e);
+ _logger.error("Error printing info to the log file", e);
}
}
@@ -154,7 +160,7 @@ public class MessageConsumerTest extends Options implements Runnable
test.start();
test.startTimerThread();
}
- catch(Exception e)
+ catch (Exception e)
{
e.printStackTrace();
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java b/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
index ee22b6804b..0ff420bf75 100644
--- a/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
+++ b/java/perftests/src/main/java/org/apache/qpid/client/perf/Options.java
@@ -4,6 +4,7 @@ public class Options
{
int _messageSize;
boolean _transacted;
+ boolean _synchronous;
String[] destArray;
int _producerCount;
int _consumerCount;
@@ -25,6 +26,7 @@ public class Options
public void parseOptions()
{
_messageSize = Integer.parseInt(System.getProperty("messageSize","100"));
+ _synchronous =Boolean.parseBoolean( System.getProperty("synchronous", "false"));
_transacted = false;
String destinations = System.getProperty("destinations");
destArray = destinations.split(",");
@@ -37,7 +39,7 @@ public class Options
System.out.println("============= Test Data ===================");
System.out.println("Total no of producers : " + _producerCount);
- System.out.println("Total no of consumer : " + _consumerCount);
+ System.out.println(_synchronous? "Total no of synchronous consumers : " : "Total no of asynchronous consumers :" + _consumerCount);
System.out.println("Log Frequency in mins : " + _logDuration/(1000*60));
System.out.println("Log file path : " + _logFilePath);
System.out.println("Test Duration : " + printTestDuration());