diff options
| author | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
|---|---|---|
| committer | Robert Greig <rgreig@apache.org> | 2007-02-12 13:25:36 +0000 |
| commit | b53c13e9d33aa35ed38c647bfa29fab0bbe58915 (patch) | |
| tree | a3f9401cd095238ca4f4f5d6b04582f6a33adc56 /java/perftests | |
| parent | cd8ccb1a691ef5eb260b165f08fd9a07d1e5867d (diff) | |
| download | qpid-python-b53c13e9d33aa35ed38c647bfa29fab0bbe58915.tar.gz | |
(Patch submitted by Rupert Smith) Qpid-360 fixes.
Message type defaults to ByteMessage when not specified.
Unknown destination type is used as default when not specified.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@506439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/perftests')
| -rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/topic/Listener.java | 279 | ||||
| -rw-r--r-- | java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java | 47 |
2 files changed, 242 insertions, 84 deletions
diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java index 47c608cfe4..76a0690b8c 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/Listener.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,122 +20,277 @@ */ package org.apache.qpid.topic; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; +import java.util.Random; + +import javax.jms.*; + +import org.apache.log4j.Logger; +import org.apache.log4j.NDC; +import org.apache.qpid.client.AMQConnection; +import org.apache.qpid.client.AMQQueue; +import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQTopic; + +/** + * This class has not kept up to date with the topic_listener in the cpp tests. It should provide identical behaviour for + * cross testing the java and cpp clients. + * + * <p/>How the cpp topic_publisher operates: + * It publishes text messages to the default topic exchange, on virtual host "/test", on the topic "topic_control", for + * the specified number of test messages to be sent. + * It publishes a report request message (on same topic), with the header text field "TYPE", value "REPORT_REQUEST", + * optionally within a transaction, and waits for the specified number of consumers to reply to this request. The + * listeners should reply to this message on a queue named "response", on virtual host "/test", with some sort of message + * about the number of messages received and how long it took, although the publisher never looks at the message content. + * The publisher then send a message (on the same topic), with the header text field "TYPE", value "TERMINATION_REQUEST", + * which the listener should close its connection and terminate upon receipt of. + * + * @deprecated Use PingPongBouncer instead once the below todo is completed. + * + * @todo Make the functionality of this class available through PingPongBouncer. Rename PingPongBouncer to + * PingListener and make its bouncing functionality optional, either through a switch or as an extending class + * called PingBouncer. Want to have as few ping classes as possible with configurable behaviour, re-using code + * accross p2p and topic style tests in almost all cases. + */ public class Listener implements MessageListener { + private static Logger log = Logger.getLogger(Listener.class); + + private static final char[] DATA = "abcdefghijklmnopqrstuvwxyz".toCharArray(); + public static final String CONTROL_TOPIC = "topic_control"; + public static final String RESPONSE_QUEUE = "response"; + + private final Topic _topic; + //private final Topic _control; + + private final Queue _response; + + private final byte[] _payload; + + /** Holds the connection to listen on. */ private final Connection _connection; + + /** Holds the producer to send control messages on. */ private final MessageProducer _controller; + + /** Holds the JMS session. */ private final javax.jms.Session _session; - private final MessageFactory _factory; + + /** Holds a flag to indicate that a timer has begun on the first message. Reset when report is sent. */ private boolean init; + + /** Holds the count of messages received by this listener. */ private int count; - private long start; - Listener(Connection connection, int ackMode) throws Exception - { - this(connection, ackMode, null); - } + /** Used to hold the start time of the first message. */ + private long start; + private static String clientId; Listener(Connection connection, int ackMode, String name) throws Exception { + log.debug("Listener(Connection connection = " + connection + ", int ackMode = " + ackMode + ", String name = " + name + + "): called"); + _connection = connection; _session = connection.createSession(false, ackMode); - _factory = new MessageFactory(_session); - //register for events - if(name == null) + if (_session instanceof AMQSession) { - _factory.createTopicConsumer().setMessageListener(this); + _topic = new AMQTopic(CONTROL_TOPIC); + //_control = new AMQTopic(CONTROL_TOPIC); + _response = new AMQQueue(RESPONSE_QUEUE); } else { - _factory.createDurableTopicConsumer(name).setMessageListener(this); + _topic = _session.createTopic(CONTROL_TOPIC); + //_control = _session.createTopic(CONTROL_TOPIC); + _response = _session.createQueue(RESPONSE_QUEUE); } - _connection.start(); + int size = 256; - _controller = _factory.createControlPublisher(); - System.out.println("Waiting for messages " + - Config.getAckModeDescription(ackMode) - + (name == null ? "" : " (subscribed with name " + name + " and client id " + connection.getClientID() + ")") - + "..."); + _payload = new byte[size]; - } + for (int i = 0; i < size; i++) + { + _payload[i] = (byte) DATA[i % DATA.length]; + } - private void shutdown() - { - try + //register for events + if (name == null) { - _session.close(); - _connection.stop(); - _connection.close(); + log.debug("Calling _factory.createTopicConsumer().setMessageListener(this)"); + createTopicConsumer().setMessageListener(this); } - catch(Exception e) + else { - e.printStackTrace(System.out); + log.debug("Calling createDurableTopicConsumer(name).setMessageListener(this)"); + createDurableTopicConsumer(name).setMessageListener(this); } + + _connection.start(); + + _controller = createControlPublisher(); + System.out.println("Waiting for messages " + Config.getAckModeDescription(ackMode) + + + ((name == null) + ? "" : (" (subscribed with name " + name + " and client id " + connection.getClientID() + ")")) + + "..."); } - private void report() + public static void main(String[] argv) throws Exception { - try - { - String msg = getReport(); - _controller.send(_factory.createReportResponseMessage(msg)); - System.out.println("Sent report: " + msg); - } - catch(Exception e) + clientId = "Listener-" + System.currentTimeMillis(); + + NDC.push(clientId); + + Config config = new Config(); + config.setOptions(argv); + + //Connection con = config.createConnection(); + Connection con = + new AMQConnection("amqp://guest:guest@testid/test?brokerlist='" + config.getHost() + ":" + config.getPort() + + "'"); + + if (config.getClientId() != null) { - e.printStackTrace(System.out); + con.setClientID(config.getClientId()); } + + new Listener(con, config.getAckMode(), config.getSubscriptionId()); + + NDC.pop(); + NDC.remove(); } - private String getReport() + /** + * Checks whether or not a text field on a message has the specified value. + * + * @param m The message to check. + * @param fieldName The name of the field to check. + * @param value The expected value of the field to compare with. + * + * @return <tt>true</tt>If the specified field has the specified value, <tt>fals</tt> otherwise. + * + * @throws JMSException Any JMSExceptions are allowed to fall through. + */ + private static boolean checkTextField(Message m, String fieldName, String value) throws JMSException { - long time = (System.currentTimeMillis() - start); - return "Received " + count + " in " + time + "ms"; + log.debug("private static boolean checkTextField(Message m = " + m + ", String fieldName = " + fieldName + + ", String value = " + value + "): called"); + + String comp = m.getStringProperty(fieldName); + + return (comp != null) && comp.equals(value); } public void onMessage(Message message) { - if(!init) + NDC.push(clientId); + + log.debug("public void onMessage(Message message): called"); + + if (!init) { - start = System.currentTimeMillis(); + start = System.nanoTime() / 1000000; count = 0; init = true; } - if(_factory.isShutdown(message)) + try { - shutdown(); + if (isShutdown(message)) + { + shutdown(); + } + else if (isReport(message)) + { + //send a report: + report(); + init = false; + } } - else if(_factory.isReport(message)) + catch (JMSException e) { - //send a report: - report(); - init = false; + log.warn("There was a JMSException during onMessage.", e); } - else if (++count % 100 == 0) + finally { - System.out.println("Received " + count + " messages."); + NDC.pop(); } } - public static void main(String[] argv) throws Exception + Message createReportResponseMessage(String msg) throws JMSException { - Config config = new Config(); - config.setOptions(argv); + return _session.createTextMessage(msg); + } + + boolean isShutdown(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "TERMINATION_REQUEST"); + + log.debug("isShutdown = " + result); + + return result; + } + + boolean isReport(Message m) throws JMSException + { + boolean result = checkTextField(m, "TYPE", "REPORT_REQUEST"); + + log.debug("isReport = " + result); + + return result; + } + + MessageConsumer createTopicConsumer() throws Exception + { + return _session.createConsumer(_topic); + } + + MessageConsumer createDurableTopicConsumer(String name) throws Exception + { + return _session.createDurableSubscriber(_topic, name); + } + + MessageProducer createControlPublisher() throws Exception + { + return _session.createProducer(_response); + } - Connection con = config.createConnection(); - if(config.getClientId() != null) + private void shutdown() + { + try { - con.setClientID(config.getClientId()); + _session.close(); + _connection.stop(); + _connection.close(); } - new Listener(con, config.getAckMode(), config.getSubscriptionId()); + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private void report() + { + try + { + String msg = getReport(); + _controller.send(createReportResponseMessage(msg)); + System.out.println("Sent report: " + msg); + } + catch (Exception e) + { + e.printStackTrace(System.out); + } + } + + private String getReport() + { + long time = ((System.nanoTime() / 1000000) - start); + + return "Received " + count + " in " + time + "ms"; } } diff --git a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java index 1520f18408..8b87f76c3e 100644 --- a/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java +++ b/java/perftests/src/main/java/org/apache/qpid/topic/MessageFactory.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,11 +20,11 @@ */ package org.apache.qpid.topic; +import javax.jms.*; + import org.apache.qpid.client.AMQSession; import org.apache.qpid.client.AMQTopic; -import javax.jms.*; - /** */ class MessageFactory @@ -36,7 +36,6 @@ class MessageFactory private final Topic _control; private final byte[] _payload; - MessageFactory(Session session) throws JMSException { this(session, 256); @@ -45,24 +44,39 @@ class MessageFactory MessageFactory(Session session, int size) throws JMSException { _session = session; - if(session instanceof AMQSession) + if (session instanceof AMQSession) { - _topic = new AMQTopic("topictest.messages"); + _topic = new AMQTopic("topic_control"); _control = new AMQTopic("topictest.control"); } else { - _topic = session.createTopic("topictest.messages"); + _topic = session.createTopic("topic_control"); _control = session.createTopic("topictest.control"); } + _payload = new byte[size]; - for(int i = 0; i < size; i++) + for (int i = 0; i < size; i++) { _payload[i] = (byte) DATA[i % DATA.length]; } } + private static boolean checkText(Message m, String s) + { + try + { + return (m instanceof TextMessage) && ((TextMessage) m).getText().equals(s); + } + catch (JMSException e) + { + e.printStackTrace(System.out); + + return false; + } + } + Topic getTopic() { return _topic; @@ -72,6 +86,7 @@ class MessageFactory { BytesMessage msg = _session.createBytesMessage(); msg.writeBytes(_payload); + return msg; } @@ -109,6 +124,7 @@ class MessageFactory catch (JMSException e) { e.printStackTrace(System.out); + return e.toString(); } } @@ -137,17 +153,4 @@ class MessageFactory { return _session.createProducer(_control); } - - private static boolean checkText(Message m, String s) - { - try - { - return m instanceof TextMessage && ((TextMessage) m).getText().equals(s); - } - catch (JMSException e) - { - e.printStackTrace(System.out); - return false; - } - } } |
