diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-20 10:59:04 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-20 10:59:04 +0000 |
| commit | 19861e2b6110e60132d8b19635cd9b470cd9d2b9 (patch) | |
| tree | 1e140dd0cbb09f012e059fb627b65b05d95d84dd /java/client/example | |
| parent | 73eee018d301031a212fe3c8a8127b84c2b580ac (diff) | |
| download | qpid-python-19861e2b6110e60132d8b19635cd9b470cd9d2b9.tar.gz | |
Added new JMS examples
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@596610 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example')
11 files changed, 1827 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java new file mode 100644 index 0000000000..dac116f91d --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java @@ -0,0 +1,176 @@ +package org.apache.qpid.example.jmsexample; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import java.util.Hashtable; +import java.util.Properties; + +import org.apache.qpid.example.jmsexample.common.ArgProcessor; + +/** + * Abstract base class for providing common argument parsing features. + * <p/> + * <p>Classes that extend BaseExample support the following command-line arguments:</p> + * <table> + * <tr><td>-factoryName</td> <td>ConnectionFactory name</td></tr> + * <tr><td>-delMode</td> <td>Delivery mode [persistent | non-persistent]</td></tr> + * <tr><td>-numMessages</td> <td>Number of messages to process</td></tr> + * </table> + */ + +abstract public class BaseExample +{ + /* The AMQP INITIAL_CONTEXT_FACTORY */ + private static final String INITIAL_CONTEXT_FACTORY_NAME = + "org.apache.qpid.jndi.PropertiesFileInitialContextFactory"; + + /* Default connection factory name. */ + private static final String DEFAULT_CONNECTION_FACTORY_NAME = "ConnectionFactory"; + + /* Default number of messages to process. */ + private static final int DEFAULT_NUMBER_MESSAGES = 1; + + /* JNDI provider URL. */ + private String _providerURL; + + /* Number of messages to process. */ + private int _numberMessages; + + /* The delivery Mode */ + private int _deliveryMode; + + /* The argument processor */ + protected ArgProcessor _argProcessor; + + /* The supported properties */ + protected static Properties _options = new Properties(); + + /* The properties default values */ + protected static Properties _defaults = new Properties(); + + /* The broker communication objects */ + private InitialContext _initialContext; + private ConnectionFactory _connectionFactory; + + /** + * Protected constructor to create a example client. + * + * @param Id Identity string used in log messages, for example, the name of the example program. + * @param args String array of arguments. + */ + protected BaseExample(String Id, String[] args) + { + _options.put("-factoryName", "ConnectionFactory name"); + _defaults.put("-factoryName", DEFAULT_CONNECTION_FACTORY_NAME); + _options.put("-providerURL", "JNDI Provider URL"); + _options.put("-deliveryMode", "Delivery mode [persistent | non-persistent]"); + _defaults.put("-deliveryMode", "non-persistent"); + _options.put("-numMessages", "Number of messages to process"); + _defaults.put("-numMessages", String.valueOf(DEFAULT_NUMBER_MESSAGES)); + + _argProcessor = new ArgProcessor(Id, args, _options, _defaults); + _argProcessor.display(); + //Set the initial context factory + _providerURL = _argProcessor.getStringArgument("-providerURL"); + // Set the number of messages + _numberMessages = _argProcessor.getIntegerArgument("-numMessages"); + // Set the delivery mode + _deliveryMode = _argProcessor.getStringArgument("-deliveryMode") + .equals("persistent") ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; + } + + /** + * Get the DeliveryMode to use when publishing messages. + * + * @return The delivery mode, either javax.jms.DeliveryMode.NON_PERSISTENT + * or javax.jms.DeliveryMode.PERSISTENT. + */ + protected int getDeliveryMode() + { + return _deliveryMode; + } + + /** + * Get the number of messages to be used. + * + * @return the number of messages to be used. + */ + protected int getNumberMessages() + { + return _numberMessages; + } + + + /** + * Get the JNDI provider URL. + * + * @return the JNDI provider URL. + */ + private String getProviderURL() + { + return _providerURL; + } + + /** + * we assume that the environment is correctly set + * i.e. -Djava.naming.provider.url="..//example.properties" + * + * @return an initial context + * @throws Exception if there is an error getting the context + */ + public InitialContext getInitialContext() throws Exception + { + if (_initialContext == null) + { + Hashtable<String, String> jndiEnvironment = new Hashtable<String, String>(); + jndiEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY_NAME); + if (getProviderURL() != null) + { + jndiEnvironment.put(Context.PROVIDER_URL, getProviderURL()); + } + else + { + jndiEnvironment.put("connectionfactory.ConnectionFactory", + "qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672"); + jndiEnvironment.put("queue.message_queue", "message_queue"); + jndiEnvironment.put("topic.usa.news", "usa.news"); + jndiEnvironment.put("topic.usa.weather", "usa.weather"); + jndiEnvironment.put("topic.usa", "usa.#"); + jndiEnvironment.put("topic.europe.weather", "europe.weather"); + jndiEnvironment.put("topic.europe.news", "europe.news"); + jndiEnvironment.put("topic.europe", "europe.#"); + jndiEnvironment.put("topic.news", "#.news"); + jndiEnvironment.put("topic.weather", "#.weather"); + } + _initialContext = new InitialContext(jndiEnvironment); + } + return _initialContext; + } + + /** + * Get a connection factory for the currently used broker + * + * @return A conection factory + * @throws Exception if there is an error getting the tactory + */ + public ConnectionFactory getConnectionFactory() throws Exception + { + if (_connectionFactory == null) + { + _connectionFactory = (ConnectionFactory) getInitialContext().lookup(DEFAULT_CONNECTION_FACTORY_NAME); + } + return _connectionFactory; + } + + /** + * Get a connection (remote or in-VM) + * + * @return a newly created connection + * @throws Exception if there is an error getting the connection + */ + public Connection getConnection() throws Exception + { + return getConnectionFactory().createConnection("guest", "guest"); + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java new file mode 100644 index 0000000000..d551685230 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java @@ -0,0 +1,201 @@ +package org.apache.qpid.example.jmsexample.common; + +import java.util.Enumeration; +import java.util.Properties; + +/** + * Command-line argument processor. + */ +public class ArgProcessor +{ + /** Textual representation of program using parser. */ + private String _id; + + /** Command line arguments to parse. */ + private String[] _argv; + + /** Properties table mapping argument name to description */ + private Properties _options; + + /** Properties table mapping argument name to default value */ + private Properties _defaults; + + /** Properties table containing parsed arguments */ + private Properties _parsed; + + /** + * Create an argument parser and parse the supplied arguments. If the arguments + * cannot be parsed (or the argument <code>-help</code> is supplied) then + * exit + * + * @param id textual representation of program identity using parser. + * @param argv the argument array. + * @param options list of allowable options stored in the keys. + * @param defaults list of option default values keyed on option name. + */ + public ArgProcessor(String id, String[] argv, Properties options, Properties defaults) + { + _id = id; + _argv = argv; + _options = options; + _defaults = defaults; + // Try to parse. If we can't then exit + if (!parse()) + { + System.exit(0); + } + } + + /** + * Get the processed arguments. + * @return Properties table mapping argument name to current value, eg, ["-foo", "MyFoo"]. + */ + public Properties getProcessedArgs() + { + return _parsed; + } + + /** + * Display the current property settings on the supplied stream. + * Output sent to <code>System.out</code>. + */ + public void display() + { + System.out.println(_id + " current settings:"); + Enumeration optionEnumeration = _options.keys(); + while (optionEnumeration.hasMoreElements()) + { + String option = (String) optionEnumeration.nextElement(); + String description = (String) _options.get(option); + String currentValue = (String) _parsed.get(option); + if (currentValue != null) + { + System.out.println("\t" + description + " = " + currentValue); + } + } + System.out.println(); + } + + /** + * Get the value of the specified option as a String. + * @param option the option to query. + * @return the value of the option. + */ + public String getStringArgument(String option) + { + return _parsed.getProperty(option); + } + + /** + * Get the value of the specified option as an integer. + * @param option the option to query. + * @return the value of the option. + */ + public int getIntegerArgument(String option) + { + String value = _parsed.getProperty(option); + return Integer.parseInt(value); + } + + /** + * Get the value of the specified option as a boolean. + * @param option the option to query. + * @return the value of the option. + */ + public boolean getBooleanArgument(String option) + { + String value = _parsed.getProperty(option); + return Boolean.valueOf(value); + } + + /** + * Parse the arguments. + * @return true if parsed. + */ + private boolean parse() + { + boolean parsed = false; + _parsed = new Properties(); + if ((_argv.length == 1) && (_argv[0].equalsIgnoreCase("-help"))) + { + displayHelp(); + } + else + { + // Parse argv looking for options putting the results in results + for (int i = 0; i < _argv.length; i++) + { + String arg = _argv[i]; + if (arg.equals("-help")) + { + continue; + } + if (!arg.startsWith("-")) + { + System.err.println(_id + ": unexpected argument: " + arg); + } + else + { + if (_options.containsKey(arg)) + { + if (i == _argv.length - 1 || _argv[i + 1].startsWith("-")) + { + System.err.println(_id + ": missing value argument for: " + arg); + } + else + { + _parsed.put(arg, _argv[++i]); + } + } + else + { + System.err.println(_id + ": unrecognised option: " + arg); + } + } + } + + // Now add the default values if none have been specified in aggv + Enumeration optionsEnum = _options.keys(); + while (optionsEnum.hasMoreElements()) + { + String option = (String) optionsEnum.nextElement(); + + if (_parsed.get(option) == null) + { + String defaultValue = (String) _defaults.get(option); + + if (defaultValue != null) + { + _parsed.put(option, defaultValue); + } + } + } + parsed = true; + } + return parsed; + } + + /** + * Display all options with descriptions and default values (if specified). + * Output is sent to <code>System.out</code>. + */ + private void displayHelp() + { + System.out.println(_id + " available options:"); + Enumeration optionEnumeration = _options.keys(); + while (optionEnumeration.hasMoreElements()) + { + String option = (String) optionEnumeration.nextElement(); + String value = (String) _options.get(option); + String defaultValue = (String) _defaults.get(option); + if (defaultValue != null) + { + System.out.println("\t" + option + " <" + value + "> [" + defaultValue + "]"); + } + else + { + System.out.println("\t" + option + " <" + value + ">"); + } + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java new file mode 100644 index 0000000000..9ea6d715e1 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java @@ -0,0 +1,122 @@ +package org.apache.qpid.example.jmsexample.direct; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * The example creates a MessageConsumer on the specified + * Queue which is used to synchronously consume messages. + */ +public class Consumer extends BaseExample +{ + /** + * Used in log output. + */ + private static final String CLASS = "Consumer"; + + /* The queue name */ + private String _queueName; + + /** + * Create a Consumer client. + * + * @param args Command line arguments. + */ + public Consumer(String[] args) + { + super(CLASS, args); + _queueName = _argProcessor.getStringArgument("-queueName"); + } + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-queueName", "Queue name"); + _defaults.put("-queueName", "message_queue"); + Consumer syncConsumer = new Consumer(args); + syncConsumer.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // lookup the queue + Queue destination = (Queue) getInitialContext().lookup(_queueName); + + // Declare the connection + Connection connection = getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Cycle round until all the messages are consumed. + Message message; + boolean end = false; + while (!end) + { + System.out.println(CLASS + ": Receiving a message"); + message = messageConsumer.receive(); + if (message instanceof TextMessage) + { + System.out.println(" - contents = " + ((TextMessage) message).getText()); + if (((TextMessage) message).getText().equals("That's all, folks!")) + { + System.out.println("Received final message for " + _queueName); + end = true; + } + } + else + { + System.out.println(" not text message"); + } + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java new file mode 100644 index 0000000000..f595cc0abc --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java @@ -0,0 +1,176 @@ +package org.apache.qpid.example.jmsexample.direct; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * The example creates a MessageConsumer on the specified + * Queue and uses a MessageListener with this MessageConsumer + * in order to enable asynchronous delivery. + */ +public class Listener extends BaseExample implements MessageListener +{ + /* Used in log output. */ + private static final String CLASS = "Listener"; + + /* The queue name */ + private String _queueName; + + /** + * An object to synchronize on. + */ + private final static Object _lock = new Object(); + + /** + * A boolean to indicate a clean finish. + */ + private static boolean _finished = false; + + /** + * A boolean to indicate an unsuccesful finish. + */ + private static boolean _failed = false; + + /** + * Create an Listener client. + * + * @param args Command line arguments. + */ + public Listener(String[] args) + { + super(CLASS, args); + _queueName = _argProcessor.getStringArgument("-queueName"); + } + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-queueName", "Queue name"); + _defaults.put("-queueName", "message_queue"); + Listener listener = new Listener(args); + listener.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // lookup the queue + Queue destination = (Queue) getInitialContext().lookup(_queueName); + + // Declare the connection + Connection connection = getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + + MessageConsumer messageConsumer = session.createConsumer(destination); + + // Set a message listener on the messageConsumer + messageConsumer.setMessageListener(this); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Wait for the messageConsumer to have received all the messages it needs + synchronized (_lock) + { + while (!_finished && !_failed) + { + _lock.wait(); + } + } + + // If the MessageListener abruptly failed (probably due to receiving a non-text message) + if (_failed) + { + System.out.println(CLASS + ": This sample failed as it received unexpected messages"); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + /** + * This method is required by the <CODE>MessageListener</CODE> interface. It + * will be invoked when messages are available. + * After receiving the finish message (That's all, folks!) it releases a lock so that the + * main program may continue. + * + * @param message The message. + */ + public void onMessage(Message message) + { + try + { + if (message instanceof TextMessage) + { + System.out.println(" - contents = " + ((TextMessage) message).getText()); + if (((TextMessage) message).getText().equals("That's all, folks!")) + { + System.out.println("Shutting down listener for " + _queueName); + synchronized (_lock) + { + _finished = true; + _lock.notifyAll(); + } + } + } + else + { + System.out.println(" [not text message]"); + } + } + catch (JMSException exp) + { + System.out.println(CLASS + ": Caught an exception handling a received message"); + exp.printStackTrace(); + synchronized (_lock) + { + _failed = true; + _lock.notifyAll(); + } + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java new file mode 100644 index 0000000000..2d80799bed --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java @@ -0,0 +1,94 @@ +package org.apache.qpid.example.jmsexample.direct; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * Message producer example, sends message to a queue. + */ +public class Producer extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "Producer"; + + /* The queue name */ + private String _queueName; + + /** + * Create a Producer client. + * @param args Command line arguments. + */ + public Producer (String[] args) + { + super(CLASS, args); + _queueName = _argProcessor.getStringArgument("-queueName"); + } + + /** + * Run the message producer example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-queueName", "Queue name"); + _defaults.put("-queueName", "message_queue"); + Producer producer = new Producer(args); + producer.runTest(); + } + + private void runTest() + { + try + { + // lookup the queue + Queue destination = (Queue) getInitialContext().lookup(_queueName); + + // Declare the connection + Connection connection = getConnection(); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a Message producer + System.out.println(CLASS + ": Creating a Message PRoducer"); + MessageProducer messageProducer = session.createProducer(destination); + + // Create a Message + TextMessage message; + System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); + message = session.createTextMessage(); + + // Set a property for illustrative purposes + //message.setDoubleProperty("Amount", 10.1); + + // Loop to publish the requested number of messages. + for (int i = 1; i < getNumberMessages() + 1; i++) + { + // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages, + // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING. + message.setText("Message " + i); + System.out.println(CLASS + ": Sending message: " + i); + messageProducer.send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + + // And send a final message to indicate termination. + message.setText("That's all, folks!"); + messageProducer.send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the broker + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java new file mode 100644 index 0000000000..6b37e2d47b --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java @@ -0,0 +1,214 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.example.jmsexample.pubsub; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * The example creates a MessageConsumer on the specified + * Topic and uses a MessageListener with this MessageConsumer + * in order to enable asynchronous delivery. + */ +public class Listener extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "Listener"; + + /* An object to synchronize on. */ + private final static Object _lock = new Object(); + + /* A boolean to indicate a clean finish. */ + private static int _finished = 0; + + /* A boolean to indicate an unsuccesful finish. */ + private static boolean _failed = false; + + /** + * Create an Listener client. + * + * @param args Command line arguments. + */ + public Listener(String[] args) + { + super(CLASS, args); + } + + /** + * Run the message consumer example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Listener listener = new Listener(args); + listener.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Declare the connection + TopicConnection connection = (TopicConnection) getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // lookup the topics usa + Topic topic = (Topic) getInitialContext().lookup("usa"); + // Create a Message Subscriber + System.out.println(CLASS + ": Creating a Message Subscriber"); + TopicSubscriber messageSubscriber = session.createSubscriber(topic); + // Set a message listener on the messageConsumer + messageSubscriber.setMessageListener(new MyMessageListener("usa")); + + // lookup the topics world.usa.news + topic = (Topic) getInitialContext().lookup("europe"); + // Create a Message Subscriber + System.out.println(CLASS + ": Creating a Message Subscriber"); + messageSubscriber = session.createSubscriber(topic); + // Set a message listener on the messageConsumer + messageSubscriber.setMessageListener(new MyMessageListener("europe")); + + // lookup the topics world.europw + topic = (Topic) getInitialContext().lookup("news"); + // Create a Message Subscriber + System.out.println(CLASS + ": Creating a Message Subscriber"); + messageSubscriber = session.createSubscriber(topic); + // Set a message listener on the messageConsumer + messageSubscriber.setMessageListener(new MyMessageListener("news")); + + // lookup the topics world.europw + topic = (Topic) getInitialContext().lookup("weather"); + // Create a Message Subscriber + System.out.println(CLASS + ": Creating a Message Subscriber"); + messageSubscriber = session.createSubscriber(topic); + // Set a message listener on the messageConsumer + messageSubscriber.setMessageListener(new MyMessageListener("weather")); + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Wait for the messageConsumer to have received all the messages it needs + synchronized (_lock) + { + while (_finished < 3 && !_failed) + { + _lock.wait(); + } + } + + // If the MessageListener abruptly failed (probably due to receiving a non-text message) + if (_failed) + { + System.out.println(CLASS + ": This sample failed as it received unexpected messages"); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + private class MyMessageListener implements MessageListener + { + /* The number of messages processed. */ + private int _messageCount = 0; + + /* The topic this subscriber is subscribing to */ + private String _topicName; + + public MyMessageListener(String topicName) + { + _topicName = topicName; + } + + /** + * This method is required by the <CODE>MessageListener</CODE> interface. It + * will be invoked when messages are available. + * After receiving the final message it releases a lock so that the + * main program may continue. + * + * @param message The message. + */ + public void onMessage(Message message) + { + try + { + // Increment the number of messages that have been received + _messageCount = _messageCount + 1; + // Print out the details of the just received message + System.out + .print(_topicName + ": message received: " + _messageCount + " " + message.getJMSMessageID()); + System.out.println(" - contents = " + ((TextMessage) message).getText()); + // If this is the total number of messages required + if (((TextMessage) message).getText().equals("That's all, folks!")) + { + System.out.println("Shutting down listener for " + _topicName); + synchronized (_lock) + { + _finished++; + _lock.notifyAll(); + } + } + } + catch (JMSException exp) + { + System.out.println(CLASS + ": Caught an exception handling a received message"); + exp.printStackTrace(); + synchronized (_lock) + { + _failed = true; + _lock.notifyAll(); + } + } + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java new file mode 100644 index 0000000000..8594993e86 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java @@ -0,0 +1,148 @@ +package org.apache.qpid.example.jmsexample.pubsub; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * Publish messages to topics + * <p/> + * <p>Run with <code>-help</code> argument for a description of command line arguments. + */ +public class Publisher extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "Publisher"; + + /** + * Create a Publisher client. + * + * @param args Command line arguments. + */ + public Publisher(String[] args) + { + super(CLASS, args); + } + + /** + * Run the message publisher example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + Publisher publisher = new Publisher(args); + publisher.runTest(); + } + + private void runTest() + { + try + { + // Declare the connection + TopicConnection connection = (TopicConnection) getConnection(); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a Message + TextMessage message; + System.out.println(CLASS + ": Creating a TestMessage to send to the topics"); + message = session.createTextMessage(); + + // lookup the topics .usa.weather + Topic topic = (Topic) getInitialContext().lookup("usa.weather"); + message.setStringProperty("topicName", "usa.weather"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + TopicPublisher messagePublisher = session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics usa.news + topic = (Topic) getInitialContext().lookup("usa.news"); + message.setStringProperty("topicName", "usa.news"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics europe.weather + topic = (Topic) getInitialContext().lookup("europe.weather"); + message.setStringProperty("topicName", "europe.weather"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // lookup the topics europe.news + topic = (Topic) getInitialContext().lookup("europe.news"); + message.setStringProperty("topicName", "europe.news"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + publishMessages(message, messagePublisher); + + // send the final message + message.setText("That's all, folks!"); + topic = (Topic) getInitialContext().lookup("news"); + message.setStringProperty("topicName", "news"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + messagePublisher + .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + topic = (Topic) getInitialContext().lookup("weather"); + message.setStringProperty("topicName", "weather"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + messagePublisher + .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + topic = (Topic) getInitialContext().lookup("europe"); + message.setStringProperty("topicName", "europe"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + messagePublisher + .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + topic = (Topic) getInitialContext().lookup("usa"); + message.setStringProperty("topicName", "usa"); + // Create a Message Publisher + System.out.println(CLASS + ": Creating a Message Publisherr"); + messagePublisher = session.createPublisher(topic); + messagePublisher + .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the broker + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } + + private void publishMessages(TextMessage message, TopicPublisher messagePublisher) throws JMSException + { + // Loop to publish the requested number of messages. + for (int i = 1; i < getNumberMessages() + 1; i++) + { + // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages, + // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING. + message.setText("Message " + i); + System.out.println(CLASS + ": Sending message: " + i); + messagePublisher + .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java new file mode 100644 index 0000000000..7b2bf4c780 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java @@ -0,0 +1,166 @@ +package org.apache.qpid.example.jmsexample.requestResponse; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * The example creates a MessageConsumer on the specified + * Destination which is used to synchronously consume messages. If a + * received message has a ReplyTo header then a new response message is sent + * to that specified destination. + * + */ +public class MessageMirror extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "MessageMirror"; + + /* The destination type */ + private String _destinationType; + + /* The destination Name */ + private String _destinationName; + + /** + * Create a MessageMirror client. + * @param args Command line arguments. + */ + public MessageMirror(String[] args) + { + super(CLASS, args); + _destinationType = _argProcessor.getStringArgument("-destinationType"); + _destinationName = _argProcessor.getStringArgument("-destinationName"); + } + + /** + * Run the message mirror example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-destinationType", "Destination Type: queue/topic"); + _defaults.put("-destinationType", "queue"); + _options.put("-destinationName", "Destination Name"); + _defaults.put("-destinationName", "message_queue"); + MessageMirror messageMirror = new MessageMirror(args); + messageMirror.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + Destination destination; + + if (_destinationType.equals("queue")) + { + // Lookup the queue + System.out.println(CLASS + ": Looking up queue with name: " + _destinationName); + destination = (Queue) getInitialContext().lookup(_destinationName); + } + else + { + // Lookup the topic + System.out.println(CLASS + ": Looking up topic with name: " + _destinationName); + destination = (Topic) getInitialContext().lookup(_destinationName); + } + + // Declare the connection + Connection connection = getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println(CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection + // This session is a default choice of non-transacted and uses + // the auto acknowledge feature of a session. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a MessageConsumer + System.out.println(CLASS + ": Creating a MessageConsumer"); + MessageConsumer messageConsumer = session.createConsumer(destination); + + /** + * Create a MessageProducer - note that although we create the + */ + System.out.println(CLASS + ": Creating a MessageProducer"); + MessageProducer messageProducer; + + // Now the messageConsumer is set up we can start the connection + System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages"); + connection.start(); + + // Cycle round until all the messages are consumed. + Message requestMessage; + TextMessage responseMessage; + boolean end = false; + while (!end) + { + System.out.println(CLASS + ": Receiving the message"); + + requestMessage = messageConsumer.receive(); + + // Print out the details of the just received message + System.out.println(CLASS + ": Message received:"); + System.out.println("\tID=" + requestMessage.getJMSMessageID()); + System.out.println("\tCorrelationID=" + requestMessage.getJMSCorrelationID()); + + if (requestMessage instanceof TextMessage) + { + if (((TextMessage) requestMessage).getText().equals("That's all, folks!")) + { + System.out.println("Received final message for " + destination); + end = true; + } + System.out.println("\tContents = " + ((TextMessage)requestMessage).getText()); + } + + // Now bounce the message if a ReplyTo header was set. + if (requestMessage.getJMSReplyTo() != null) + { + System.out.println("Activating response queue listener for: " + destination); + responseMessage = session.createTextMessage("Activating response queue listener for: " + destination); + String correlationID = requestMessage.getJMSCorrelationID(); + if (correlationID != null) + { + responseMessage.setJMSCorrelationID(correlationID); + } + messageProducer = session.createProducer(requestMessage.getJMSReplyTo()) ; + messageProducer.send(responseMessage); + } + System.out.println(); + } + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + exp.printStackTrace(); + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java new file mode 100644 index 0000000000..6331491d2e --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java @@ -0,0 +1,147 @@ +package org.apache.qpid.example.jmsexample.requestResponse; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * This example illustrates the use of the JMS utility class <code>QueueRequestor</code> + * which provides a synchronous RPC-like abstraction using temporary destinations + * to deliver responses back to the client. + * + * <p>Run with <code>-help</code> argument for a description of command line arguments. + * + */ +public class P2PRequestor extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "P2PRequestor"; + + /* The queue name */ + private String _queueName; + + /** + * Create a P2PRequestor client. + * @param args Command line arguments. + */ + public P2PRequestor(String[] args) + { + super(CLASS, args); + _queueName = _argProcessor.getStringArgument("-queueName"); + } + + /** + * Run the message requestor example. + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-queueName", "The queue name"); + _defaults.put("-queueName", "message_queue"); + P2PRequestor requestor = new P2PRequestor(args); + requestor.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Declare the connection + QueueConnection connection = (QueueConnection) getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println(CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + + // Lookup the destination + System.out.println(CLASS + ": Looking up queue with name: " + _queueName); + Queue destination = (Queue) getInitialContext().lookup(_queueName); + + // Create a QueueRequestor + System.out.println(CLASS + ": Creating a QueueRequestor"); + + QueueRequestor requestor = new QueueRequestor(session, destination); + + // Now start the connection + System.out.println(CLASS + ": Starting connection"); + connection.start(); + + // Create a message to send as a request for service + TextMessage request; + + request = session.createTextMessage("\"Twas brillig, and the slithy toves\",\n" + + "\t\t\"Did gire and gymble in the wabe.\",\n" + + "\t\t\"All mimsy were the borogroves,\",\n" + + "\t\t\"And the mome raths outgrabe.\""); + + // Declare a message to be used for receiving any response + Message response; + + // Get the number of times that this sample should request service + for (int i = 0; i < getNumberMessages(); i++) + { + /** + * Set a message correlation value. This is not strictly required it is + * just an example of how messages requests can be tied together. + */ + request.setJMSCorrelationID("Message " + i); + System.out.println(CLASS + ": Sending request " + i); + + response = requestor.request(request); + + // Print out the details of the message sent + System.out.println(CLASS + ": Message details of request"); + System.out.println("\tID = " + request.getJMSMessageID()); + System.out.println("\tCorrelationID = " + request.getJMSCorrelationID()); + System.out.println("\tContents= " + ((TextMessage)request).getText()); + + // Print out the details of the response received + System.out.println(CLASS + ": Message details of response"); + System.out.println("\tID = " + response.getJMSMessageID()); + System.out.println("\tCorrelationID = " + response.getJMSCorrelationID()); + if (response instanceof TextMessage) + { + System.out.println("\tContents= " + ((TextMessage) response).getText()); + } + + System.out.println(); + } + + //send the final message for ending the mirror + // And send a final message to indicate termination. + request.setText("That's all, folks!"); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.send(request, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java new file mode 100644 index 0000000000..cb91500aed --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java @@ -0,0 +1,139 @@ +package org.apache.qpid.example.jmsexample.requestResponse; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * This example illustrates the use of the JMS utility class <code>TopicRequestor</code> + * which provides a synchronous RPC-like abstraction using temporary destinations + * to deliver responses back to the client. + */ +public class PubSubRequestor extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "PubSubRequestor"; + + /* The topic name */ + private String _topicName; + + /** + * Create a PubSubRequestor client. + * + * @param args Command line arguments. + */ + public PubSubRequestor(String[] args) + { + super(CLASS, args); + _topicName = _argProcessor.getStringArgument("-topicName"); + } + + /** + * Run the message requestor example. + * + * @param args Command line arguments. + */ + public static void main(String[] args) + { + _options.put("-topicName", "The topic name"); + _defaults.put("-topicName", "world"); + PubSubRequestor requestor = new PubSubRequestor(args); + requestor.runTest(); + } + + /** + * Start the example. + */ + private void runTest() + { + try + { + // Declare the connection + TopicConnection connection = (TopicConnection) getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.exit(0); + } + }); + + // Create a session on the connection. + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + + // Lookup the destination + System.out.println(CLASS + ": Looking up topic with name: " + _topicName); + Topic destination = (Topic) getInitialContext().lookup(_topicName); + + // Create a TopicRequestor + System.out.println(CLASS + ": Creating a TopicRequestor"); + TopicRequestor requestor = new TopicRequestor(session, destination); + + // Now start the connection + System.out.println(CLASS + ": Starting connection"); + connection.start(); + + // Create a message to send as a request for service + TextMessage request; + request = session.createTextMessage( + "\"Twas brillig, and the slithy toves\",\n" + "\t\t\"Did gire and gymble in the wabe.\",\n" + "\t\t\"All mimsy were the borogroves,\",\n" + "\t\t\"And the mome raths outgrabe.\""); + + // Declare a message to be used for receiving any response + Message response; + + // Get the number of times that this sample should request service + for (int i = 0; i < getNumberMessages(); i++) + { + /** + * Set a message correlation value. This is not strictly required it is + * just an example of how messages requests can be tied together. + */ + request.setJMSCorrelationID("Message " + i); + System.out.println(CLASS + ": Sending request " + i); + + response = requestor.request(request); + + // Print out the details of the message sent + System.out.println(CLASS + ": Message details of request"); + System.out.println("\tID = " + request.getJMSMessageID()); + System.out.println("\tCorrelationID = " + request.getJMSCorrelationID()); + System.out.println("\tContents= " + ((TextMessage) request).getText()); + + // Print out the details of the response received + System.out.println(CLASS + ": Message details of response"); + System.out.println("\tID = " + response.getJMSMessageID()); + System.out.println("\tCorrelationID = " + response.getJMSCorrelationID()); + if (response instanceof TextMessage) + { + System.out.println("\tContents= " + ((TextMessage) response).getText()); + } + } + // And send a final message to indicate termination. + request.setText("That's all, folks!"); + MessageProducer messageProducer = session.createProducer(destination); + messageProducer.send(request, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Close the connection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java new file mode 100644 index 0000000000..b7df3db345 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java @@ -0,0 +1,244 @@ +package org.apache.qpid.example.jmsexample.transacted; + +import org.redhat.mrg.messaging.examples.BaseExample; + +import javax.jms.*; + +/** + * Transactional message example sends a number of messages to a Queue + * and then uses a transacted session to move them from the Queue to a Topic. + * <p/> + * <p>The program completes the following steps: + * <ul> + * <li>Publish the specified number of messages to the queue.</li> + * <li>Within a transacted session consume all messages from the queue + * and publish them to the topic.</li> + * <li>By default commit the transacted session, unless the "<code>-rollback true</code>" + * option is specified in which case roll it back.</li> + * <li>Check for outstanding messages on the queue.</li> + * <li>Check for outstanding messages on the topic.</li> + * </ul> + * <p/> + */ +public class QueueToTopic extends BaseExample +{ + /* Used in log output. */ + private static final String CLASS = "QueueToTopic"; + + /* The queue name */ + private String _queueName; + + /* The topic name */ + private String _topicName; + + /* Specify if the transaction is committed */ + private boolean _commit; + + /** + * Create a QueueToTopic client. + * + * @param args Command line arguments. + */ + public QueueToTopic(String[] args) + { + super(CLASS, args); + _queueName = _argProcessor.getStringArgument("-queueName"); + _topicName = _argProcessor.getStringArgument("-topicName"); + _commit = _argProcessor.getBooleanArgument("-commit"); + } + + /** + * Run the message mover example. + * + * @param args Command line arguments. + * @see BaseExample + */ + public static void main(String[] args) + { + _options.put("-topicName", "The topic name"); + _defaults.put("-topicName", "world"); + _options.put("-queueName", "The queue name"); + _defaults.put("-queueName", "message_queue"); + _options.put("-commit", "Commit or rollback the transaction (true|false)"); + _defaults.put("-commit", "true"); + QueueToTopic mover = new QueueToTopic(args); + mover.runTest(); + } + + private void runTest() + { + try + { + + // Lookup the queue + System.out.println(CLASS + ": Looking up queue with name: " + _queueName); + Queue queue = (Queue) getInitialContext().lookup(_queueName); + + // Lookup the topic + System.out.println(CLASS + ": Looking up topic with name: " + _topicName); + Topic topic = (Topic) getInitialContext().lookup(_topicName); + + // Declare the connection + Connection connection = getConnection(); + + // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection + // so that errors raised within the JMS client library can be reported to the application + System.out.println( + CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer"); + + connection.setExceptionListener(new ExceptionListener() + { + public void onException(JMSException jmse) + { + // The connection may have broken invoke reconnect code if available. + System.err.println(CLASS + ": The sample received an exception through the ExceptionListener"); + System.err.println( + CLASS + ": If this was a real application it should now go through reconnect code"); + System.err.println(); + System.err.println("Exception: " + jmse); + System.err.println(); + System.err.println("Now exiting."); + System.exit(0); + } + }); + + // Start the connection + connection.start(); + + /** + * Create nonTransactedSession. This non-transacted auto-ack session is used to create the MessageProducer + * that is used to populate the queue and the MessageConsumer that is used to consume the messages + * from the topic. + */ + System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session"); + Session nonTransactedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Make sure that the queue is empty + System.out.print(CLASS + ": Purging messages from queue..."); + MessageConsumer queueMessageConsumer = nonTransactedSession.createConsumer(queue); + Message purgedMessage; + int numberPurged = -1; + do + { + purgedMessage = queueMessageConsumer.receiveNoWait(); + numberPurged++; + } + while (purgedMessage != null); + System.out.println(numberPurged + " message(s) purged."); + + // Create the MessageProducer for the queue + System.out.println(CLASS + ": Creating a MessageProducer for the queue"); + MessageProducer messageProducer = nonTransactedSession.createProducer(queue); + + // Now create the MessageConsumer for the topic + System.out.println(CLASS + ": Creating a MessageConsumer for the topic"); + MessageConsumer topicMessageConsumer = nonTransactedSession.createConsumer(topic); + + // Create a textMessage. We're using a TextMessage for this example. + System.out.println(CLASS + ": Creating a TestMessage to send to the destination"); + TextMessage textMessage = nonTransactedSession.createTextMessage("Sample text message"); + + // Loop to publish the requested number of messages to the queue. + for (int i = 1; i < getNumberMessages() + 1; i++) + { + messageProducer + .send(textMessage, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + // Print out details of textMessage just sent + System.out.println(CLASS + ": Message sent: " + i + " " + textMessage.getJMSMessageID()); + } + + // Create a new transacted Session to move the messages from the queue to the topic + Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED); + + // Create a new message consumer from the queue + MessageConsumer transactedConsumer = transactedSession.createConsumer(queue); + + // Create a new message producer for the topic + MessageProducer transactedProducer = transactedSession.createProducer(topic); + + // Loop to consume the messages from the queue and publish them to the topic + Message receivedMessage; + for (int i = 1; i < getNumberMessages() + 1; i++) + { + // Receive a message + receivedMessage = transactedConsumer.receive(); + System.out.println(CLASS + ": Moving message: " + i + " " + receivedMessage.getJMSMessageID()); + // Publish it to the topic + transactedProducer.send(receivedMessage); + } + + // Either commit or rollback the transacted session based on the command line args. + if (_commit) + { + System.out.println(CLASS + ": Committing transacted session."); + transactedSession.commit(); + } + else + { + System.out.println(CLASS + ": Rolling back transacted session."); + transactedSession.rollback(); + } + + // Now consume any outstanding messages on the queue + System.out.print(CLASS + ": Mopping up messages from queue"); + if (_commit) + { + System.out.print(" (expecting none)..."); + } + else + { + System.out.print(" (expecting " + getNumberMessages() + ")..."); + } + + Message moppedMessage; + int numberMopped = 0; + do + { + moppedMessage = queueMessageConsumer.receiveNoWait(); + if( moppedMessage != null) + { + numberMopped++; + } + } + while (moppedMessage != null); + System.out.println(numberMopped + " message(s) mopped."); + + // Now consume any outstanding messages for the topic subscriber + System.out.print(CLASS + ": Mopping up messages from topic"); + + if (_commit) + { + System.out.print(" (expecting " + getNumberMessages() + ")..."); + } + else + { + System.out.print(" (expecting none)..."); + } + + numberMopped = 0; + do + { + moppedMessage = topicMessageConsumer.receiveNoWait(); + if( moppedMessage != null) + { + numberMopped++; + } + } + while (moppedMessage != null); + System.out.println(numberMopped + " message(s) mopped."); + + // Close the QueueConnection to the server + System.out.println(CLASS + ": Closing connection"); + connection.close(); + + // Close the JNDI reference + System.out.println(CLASS + ": Closing JNDI context"); + getInitialContext().close(); + } + catch (Exception exp) + { + System.err.println(CLASS + ": Caught an Exception: " + exp); + } + } +} |
