summaryrefslogtreecommitdiff
path: root/java/perftests/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-02-12 13:25:36 +0000
committerRobert Greig <rgreig@apache.org>2007-02-12 13:25:36 +0000
commitb53c13e9d33aa35ed38c647bfa29fab0bbe58915 (patch)
treea3f9401cd095238ca4f4f5d6b04582f6a33adc56 /java/perftests/src
parentcd8ccb1a691ef5eb260b165f08fd9a07d1e5867d (diff)
downloadqpid-python-b53c13e9d33aa35ed38c647bfa29fab0bbe58915.tar.gz
(Patch submitted by Rupert Smith) Qpid-360 fixes.
Message type defaults to ByteMessage when not specified. Unknown destination type is used as default when not specified. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@506439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests/src')
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/Listener.java279
-rw-r--r--java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java47
2 files changed, 242 insertions, 84 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
index 47c608cfe4..76a0690b8c 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,122 +20,277 @@
*/
package org.apache.qpid.topic;
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
+import java.util.Random;
+
+import javax.jms.*;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.NDC;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
+
+/**
+ * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for
+ * cross testing the java and cpp clients.
+ *
+ * <p/>How the cpp topic_publisher operates:
+ * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for
+ * the specified number of test messages to be sent.
+ * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST",
+ * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The
+ * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message
+ * about the number of messages received and how long it took, although the publisher never looks at the message content.
+ * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST",
+ * which the listener should close its connection and terminate upon receipt of.
+ *
+ * @deprecated Use PingPongBouncer instead once the below todo is completed.
+ *
+ * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to
+ * PingListener and make its bouncing functionality optional, either through a switch or as an extending class
+ * called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code
+ * accross p2p and topic style tests in almost all cases.
+ */
public class Listener implements MessageListener
{
+ private static Logger log = Logger.getLogger(Listener.class);
+
+ private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ public static final String CONTROL_TOPIC = "topic_control";
+ public static final String RESPONSE_QUEUE = "response";
+
+ private final Topic _topic;
+ //private final Topic _control;
+
+ private final Queue _response;
+
+ private final byte[] _payload;
+
+ /** Holds the connection to listen on. */
private final Connection _connection;
+
+ /** Holds the producer to send control messages on. */
private final MessageProducer _controller;
+
+ /** Holds the JMS session. */
private final javax.jms.Session _session;
- private final MessageFactory _factory;
+
+ /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */
private boolean init;
+
+ /** Holds the count of messages received by this listener. */
private int count;
- private long start;
- Listener(Connection connection, int ackMode) throws Exception
- {
- this(connection, ackMode, null);
- }
+ /** Used to hold the start time of the first message. */
+ private long start;
+ private static String clientId;
Listener(Connection connection, int ackMode, String name) throws Exception
{
+ log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name
+ + "): called");
+
_connection = connection;
_session = connection.createSession(false, ackMode);
- _factory = new MessageFactory(_session);
- //register for events
- if(name == null)
+ if (_session instanceof AMQSession)
{
- _factory.createTopicConsumer().setMessageListener(this);
+ _topic = new AMQTopic(CONTROL_TOPIC);
+ //_control = new AMQTopic(CONTROL_TOPIC);
+ _response = new AMQQueue(RESPONSE_QUEUE);
}
else
{
- _factory.createDurableTopicConsumer(name).setMessageListener(this);
+ _topic = _session.createTopic(CONTROL_TOPIC);
+ //_control = _session.createTopic(CONTROL_TOPIC);
+ _response = _session.createQueue(RESPONSE_QUEUE);
}
- _connection.start();
+ int size = 256;
- _controller = _factory.createControlPublisher();
- System.out.println("Waiting for messages " +
- Config.getAckModeDescription(ackMode)
- + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")")
- + "...");
+ _payload = new byte[size];
- }
+ for (int i = 0; i < size; i++)
+ {
+ _payload[i] = (byte) DATA[i % DATA.length];
+ }
- private void shutdown()
- {
- try
+ //register for events
+ if (name == null)
{
- _session.close();
- _connection.stop();
- _connection.close();
+ log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)");
+ createTopicConsumer().setMessageListener(this);
}
- catch(Exception e)
+ else
{
- e.printStackTrace(System.out);
+ log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)");
+ createDurableTopicConsumer(name).setMessageListener(this);
}
+
+ _connection.start();
+
+ _controller = createControlPublisher();
+ System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode)
+ +
+ ((name == null)
+ ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")"))
+ + "...");
}
- private void report()
+ public static void main(String[] argv) throws Exception
{
- try
- {
- String msg = getReport();
- _controller.send(_factory.createReportResponseMessage(msg));
- System.out.println("Sent report: " + msg);
- }
- catch(Exception e)
+ clientId = "Listener-" + System.currentTimeMillis();
+
+ NDC.push(clientId);
+
+ Config config = new Config();
+ config.setOptions(argv);
+
+ //Connection con = config.createConnection();
+ Connection con =
+ new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort()
+ + "'");
+
+ if (config.getClientId() != null)
{
- e.printStackTrace(System.out);
+ con.setClientID(config.getClientId());
}
+
+ new Listener(con, config.getAckMode(), config.getSubscriptionId());
+
+ NDC.pop();
+ NDC.remove();
}
- private String getReport()
+ /**
+ * Checks whether or not a text field on a message has the specified value.
+ *
+ * @param m The message to check.
+ * @param fieldName The name of the field to check.
+ * @param value The expected value of the field to compare with.
+ *
+ * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise.
+ *
+ * @throws JMSException Any JMSExceptions are allowed to fall through.
+ */
+ private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException
{
- long time = (System.currentTimeMillis() - start);
- return "Received " + count + " in " + time + "ms";
+ log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName
+ + ", String value = " + value + "): called");
+
+ String comp = m.getStringProperty(fieldName);
+
+ return (comp != null) && comp.equals(value);
}
public void onMessage(Message message)
{
- if(!init)
+ NDC.push(clientId);
+
+ log.debug("public void onMessage(Message message): called");
+
+ if (!init)
{
- start = System.currentTimeMillis();
+ start = System.nanoTime() / 1000000;
count = 0;
init = true;
}
- if(_factory.isShutdown(message))
+ try
{
- shutdown();
+ if (isShutdown(message))
+ {
+ shutdown();
+ }
+ else if (isReport(message))
+ {
+ //send a report:
+ report();
+ init = false;
+ }
}
- else if(_factory.isReport(message))
+ catch (JMSException e)
{
- //send a report:
- report();
- init = false;
+ log.warn("There was a JMSException during onMessage.", e);
}
- else if (++count % 100 == 0)
+ finally
{
- System.out.println("Received " + count + " messages.");
+ NDC.pop();
}
}
- public static void main(String[] argv) throws Exception
+ Message createReportResponseMessage(String msg) throws JMSException
{
- Config config = new Config();
- config.setOptions(argv);
+ return _session.createTextMessage(msg);
+ }
+
+ boolean isShutdown(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST");
+
+ log.debug("isShutdown = " + result);
+
+ return result;
+ }
+
+ boolean isReport(Message m) throws JMSException
+ {
+ boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST");
+
+ log.debug("isReport = " + result);
+
+ return result;
+ }
+
+ MessageConsumer createTopicConsumer() throws Exception
+ {
+ return _session.createConsumer(_topic);
+ }
+
+ MessageConsumer createDurableTopicConsumer(String name) throws Exception
+ {
+ return _session.createDurableSubscriber(_topic, name);
+ }
+
+ MessageProducer createControlPublisher() throws Exception
+ {
+ return _session.createProducer(_response);
+ }
- Connection con = config.createConnection();
- if(config.getClientId() != null)
+ private void shutdown()
+ {
+ try
{
- con.setClientID(config.getClientId());
+ _session.close();
+ _connection.stop();
+ _connection.close();
}
- new Listener(con, config.getAckMode(), config.getSubscriptionId());
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private void report()
+ {
+ try
+ {
+ String msg = getReport();
+ _controller.send(createReportResponseMessage(msg));
+ System.out.println("Sent report: " + msg);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ private String getReport()
+ {
+ long time = ((System.nanoTime() / 1000000) - start);
+
+ return "Received " + count + " in " + time + "ms";
}
}
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
index 1520f18408..8b87f76c3e 100644
--- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
+++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.topic;
+import javax.jms.*;
+
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import javax.jms.*;
-
/**
*/
class MessageFactory
@@ -36,7 +36,6 @@ class MessageFactory
private final Topic _control;
private final byte[] _payload;
-
MessageFactory(Session session) throws JMSException
{
this(session, 256);
@@ -45,24 +44,39 @@ class MessageFactory
MessageFactory(Session session, int size) throws JMSException
{
_session = session;
- if(session instanceof AMQSession)
+ if (session instanceof AMQSession)
{
- _topic = new AMQTopic("topictest.messages");
+ _topic = new AMQTopic("topic_control");
_control = new AMQTopic("topictest.control");
}
else
{
- _topic = session.createTopic("topictest.messages");
+ _topic = session.createTopic("topic_control");
_control = session.createTopic("topictest.control");
}
+
_payload = new byte[size];
- for(int i = 0; i < size; i++)
+ for (int i = 0; i < size; i++)
{
_payload[i] = (byte) DATA[i % DATA.length];
}
}
+ private static boolean checkText(Message m, String s)
+ {
+ try
+ {
+ return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace(System.out);
+
+ return false;
+ }
+ }
+
Topic getTopic()
{
return _topic;
@@ -72,6 +86,7 @@ class MessageFactory
{
BytesMessage msg = _session.createBytesMessage();
msg.writeBytes(_payload);
+
return msg;
}
@@ -109,6 +124,7 @@ class MessageFactory
catch (JMSException e)
{
e.printStackTrace(System.out);
+
return e.toString();
}
}
@@ -137,17 +153,4 @@ class MessageFactory
{
return _session.createProducer(_control);
}
-
- private static boolean checkText(Message m, String s)
- {
- try
- {
- return m instanceof TextMessage && ((TextMessage) m).getText().equals(s);
- }
- catch (JMSException e)
- {
- e.printStackTrace(System.out);
- return false;
- }
- }
}