summaryrefslogtreecommitdiff
path: root/java/client/example
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-20 10:59:04 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-20 10:59:04 +0000
commit19861e2b6110e60132d8b19635cd9b470cd9d2b9 (patch)
tree1e140dd0cbb09f012e059fb627b65b05d95d84dd /java/client/example
parent73eee018d301031a212fe3c8a8127b84c2b580ac (diff)
downloadqpid-python-19861e2b6110e60132d8b19635cd9b470cd9d2b9.tar.gz
Added new JMS examples
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@596610 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example')
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java176
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java201
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java122
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java176
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java94
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java214
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java148
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java166
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java147
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java139
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java244
11 files changed, 1827 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java
new file mode 100644
index 0000000000..dac116f91d
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/BaseExample.java
@@ -0,0 +1,176 @@
+package org.apache.qpid.example.jmsexample;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import java.util.Hashtable;
+import java.util.Properties;
+
+import org.apache.qpid.example.jmsexample.common.ArgProcessor;
+
+/**
+ * Abstract base class for providing common argument parsing features.
+ * <p/>
+ * <p>Classes that extend BaseExample support the following command-line arguments:</p>
+ * <table>
+ * <tr><td>-factoryName</td> <td>ConnectionFactory name</td></tr>
+ * <tr><td>-delMode</td> <td>Delivery mode [persistent | non-persistent]</td></tr>
+ * <tr><td>-numMessages</td> <td>Number of messages to process</td></tr>
+ * </table>
+ */
+
+abstract public class BaseExample
+{
+ /* The AMQP INITIAL_CONTEXT_FACTORY */
+ private static final String INITIAL_CONTEXT_FACTORY_NAME =
+ "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+
+ /* Default connection factory name. */
+ private static final String DEFAULT_CONNECTION_FACTORY_NAME = "ConnectionFactory";
+
+ /* Default number of messages to process. */
+ private static final int DEFAULT_NUMBER_MESSAGES = 1;
+
+ /* JNDI provider URL. */
+ private String _providerURL;
+
+ /* Number of messages to process. */
+ private int _numberMessages;
+
+ /* The delivery Mode */
+ private int _deliveryMode;
+
+ /* The argument processor */
+ protected ArgProcessor _argProcessor;
+
+ /* The supported properties */
+ protected static Properties _options = new Properties();
+
+ /* The properties default values */
+ protected static Properties _defaults = new Properties();
+
+ /* The broker communication objects */
+ private InitialContext _initialContext;
+ private ConnectionFactory _connectionFactory;
+
+ /**
+ * Protected constructor to create a example client.
+ *
+ * @param Id Identity string used in log messages, for example, the name of the example program.
+ * @param args String array of arguments.
+ */
+ protected BaseExample(String Id, String[] args)
+ {
+ _options.put("-factoryName", "ConnectionFactory name");
+ _defaults.put("-factoryName", DEFAULT_CONNECTION_FACTORY_NAME);
+ _options.put("-providerURL", "JNDI Provider URL");
+ _options.put("-deliveryMode", "Delivery mode [persistent | non-persistent]");
+ _defaults.put("-deliveryMode", "non-persistent");
+ _options.put("-numMessages", "Number of messages to process");
+ _defaults.put("-numMessages", String.valueOf(DEFAULT_NUMBER_MESSAGES));
+
+ _argProcessor = new ArgProcessor(Id, args, _options, _defaults);
+ _argProcessor.display();
+ //Set the initial context factory
+ _providerURL = _argProcessor.getStringArgument("-providerURL");
+ // Set the number of messages
+ _numberMessages = _argProcessor.getIntegerArgument("-numMessages");
+ // Set the delivery mode
+ _deliveryMode = _argProcessor.getStringArgument("-deliveryMode")
+ .equals("persistent") ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+ }
+
+ /**
+ * Get the DeliveryMode to use when publishing messages.
+ *
+ * @return The delivery mode, either javax.jms.DeliveryMode.NON_PERSISTENT
+ * or javax.jms.DeliveryMode.PERSISTENT.
+ */
+ protected int getDeliveryMode()
+ {
+ return _deliveryMode;
+ }
+
+ /**
+ * Get the number of messages to be used.
+ *
+ * @return the number of messages to be used.
+ */
+ protected int getNumberMessages()
+ {
+ return _numberMessages;
+ }
+
+
+ /**
+ * Get the JNDI provider URL.
+ *
+ * @return the JNDI provider URL.
+ */
+ private String getProviderURL()
+ {
+ return _providerURL;
+ }
+
+ /**
+ * we assume that the environment is correctly set
+ * i.e. -Djava.naming.provider.url="..//example.properties"
+ *
+ * @return an initial context
+ * @throws Exception if there is an error getting the context
+ */
+ public InitialContext getInitialContext() throws Exception
+ {
+ if (_initialContext == null)
+ {
+ Hashtable<String, String> jndiEnvironment = new Hashtable<String, String>();
+ jndiEnvironment.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY_NAME);
+ if (getProviderURL() != null)
+ {
+ jndiEnvironment.put(Context.PROVIDER_URL, getProviderURL());
+ }
+ else
+ {
+ jndiEnvironment.put("connectionfactory.ConnectionFactory",
+ "qpid:password=guest;username=guest;client_id=clientid;virtualhost=test@tcp:127.0.0.1:5672");
+ jndiEnvironment.put("queue.message_queue", "message_queue");
+ jndiEnvironment.put("topic.usa.news", "usa.news");
+ jndiEnvironment.put("topic.usa.weather", "usa.weather");
+ jndiEnvironment.put("topic.usa", "usa.#");
+ jndiEnvironment.put("topic.europe.weather", "europe.weather");
+ jndiEnvironment.put("topic.europe.news", "europe.news");
+ jndiEnvironment.put("topic.europe", "europe.#");
+ jndiEnvironment.put("topic.news", "#.news");
+ jndiEnvironment.put("topic.weather", "#.weather");
+ }
+ _initialContext = new InitialContext(jndiEnvironment);
+ }
+ return _initialContext;
+ }
+
+ /**
+ * Get a connection factory for the currently used broker
+ *
+ * @return A conection factory
+ * @throws Exception if there is an error getting the tactory
+ */
+ public ConnectionFactory getConnectionFactory() throws Exception
+ {
+ if (_connectionFactory == null)
+ {
+ _connectionFactory = (ConnectionFactory) getInitialContext().lookup(DEFAULT_CONNECTION_FACTORY_NAME);
+ }
+ return _connectionFactory;
+ }
+
+ /**
+ * Get a connection (remote or in-VM)
+ *
+ * @return a newly created connection
+ * @throws Exception if there is an error getting the connection
+ */
+ public Connection getConnection() throws Exception
+ {
+ return getConnectionFactory().createConnection("guest", "guest");
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java
new file mode 100644
index 0000000000..d551685230
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/common/ArgProcessor.java
@@ -0,0 +1,201 @@
+package org.apache.qpid.example.jmsexample.common;
+
+import java.util.Enumeration;
+import java.util.Properties;
+
+/**
+ * Command-line argument processor.
+ */
+public class ArgProcessor
+{
+ /** Textual representation of program using parser. */
+ private String _id;
+
+ /** Command line arguments to parse. */
+ private String[] _argv;
+
+ /** Properties table mapping argument name to description */
+ private Properties _options;
+
+ /** Properties table mapping argument name to default value */
+ private Properties _defaults;
+
+ /** Properties table containing parsed arguments */
+ private Properties _parsed;
+
+ /**
+ * Create an argument parser and parse the supplied arguments. If the arguments
+ * cannot be parsed (or the argument <code>-help</code> is supplied) then
+ * exit
+ *
+ * @param id textual representation of program identity using parser.
+ * @param argv the argument array.
+ * @param options list of allowable options stored in the keys.
+ * @param defaults list of option default values keyed on option name.
+ */
+ public ArgProcessor(String id, String[] argv, Properties options, Properties defaults)
+ {
+ _id = id;
+ _argv = argv;
+ _options = options;
+ _defaults = defaults;
+ // Try to parse. If we can't then exit
+ if (!parse())
+ {
+ System.exit(0);
+ }
+ }
+
+ /**
+ * Get the processed arguments.
+ * @return Properties table mapping argument name to current value, eg, ["-foo", "MyFoo"].
+ */
+ public Properties getProcessedArgs()
+ {
+ return _parsed;
+ }
+
+ /**
+ * Display the current property settings on the supplied stream.
+ * Output sent to <code>System.out</code>.
+ */
+ public void display()
+ {
+ System.out.println(_id + " current settings:");
+ Enumeration optionEnumeration = _options.keys();
+ while (optionEnumeration.hasMoreElements())
+ {
+ String option = (String) optionEnumeration.nextElement();
+ String description = (String) _options.get(option);
+ String currentValue = (String) _parsed.get(option);
+ if (currentValue != null)
+ {
+ System.out.println("\t" + description + " = " + currentValue);
+ }
+ }
+ System.out.println();
+ }
+
+ /**
+ * Get the value of the specified option as a String.
+ * @param option the option to query.
+ * @return the value of the option.
+ */
+ public String getStringArgument(String option)
+ {
+ return _parsed.getProperty(option);
+ }
+
+ /**
+ * Get the value of the specified option as an integer.
+ * @param option the option to query.
+ * @return the value of the option.
+ */
+ public int getIntegerArgument(String option)
+ {
+ String value = _parsed.getProperty(option);
+ return Integer.parseInt(value);
+ }
+
+ /**
+ * Get the value of the specified option as a boolean.
+ * @param option the option to query.
+ * @return the value of the option.
+ */
+ public boolean getBooleanArgument(String option)
+ {
+ String value = _parsed.getProperty(option);
+ return Boolean.valueOf(value);
+ }
+
+ /**
+ * Parse the arguments.
+ * @return true if parsed.
+ */
+ private boolean parse()
+ {
+ boolean parsed = false;
+ _parsed = new Properties();
+ if ((_argv.length == 1) && (_argv[0].equalsIgnoreCase("-help")))
+ {
+ displayHelp();
+ }
+ else
+ {
+ // Parse argv looking for options putting the results in results
+ for (int i = 0; i < _argv.length; i++)
+ {
+ String arg = _argv[i];
+ if (arg.equals("-help"))
+ {
+ continue;
+ }
+ if (!arg.startsWith("-"))
+ {
+ System.err.println(_id + ": unexpected argument: " + arg);
+ }
+ else
+ {
+ if (_options.containsKey(arg))
+ {
+ if (i == _argv.length - 1 || _argv[i + 1].startsWith("-"))
+ {
+ System.err.println(_id + ": missing value argument for: " + arg);
+ }
+ else
+ {
+ _parsed.put(arg, _argv[++i]);
+ }
+ }
+ else
+ {
+ System.err.println(_id + ": unrecognised option: " + arg);
+ }
+ }
+ }
+
+ // Now add the default values if none have been specified in aggv
+ Enumeration optionsEnum = _options.keys();
+ while (optionsEnum.hasMoreElements())
+ {
+ String option = (String) optionsEnum.nextElement();
+
+ if (_parsed.get(option) == null)
+ {
+ String defaultValue = (String) _defaults.get(option);
+
+ if (defaultValue != null)
+ {
+ _parsed.put(option, defaultValue);
+ }
+ }
+ }
+ parsed = true;
+ }
+ return parsed;
+ }
+
+ /**
+ * Display all options with descriptions and default values (if specified).
+ * Output is sent to <code>System.out</code>.
+ */
+ private void displayHelp()
+ {
+ System.out.println(_id + " available options:");
+ Enumeration optionEnumeration = _options.keys();
+ while (optionEnumeration.hasMoreElements())
+ {
+ String option = (String) optionEnumeration.nextElement();
+ String value = (String) _options.get(option);
+ String defaultValue = (String) _defaults.get(option);
+ if (defaultValue != null)
+ {
+ System.out.println("\t" + option + " <" + value + "> [" + defaultValue + "]");
+ }
+ else
+ {
+ System.out.println("\t" + option + " <" + value + ">");
+ }
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java
new file mode 100644
index 0000000000..9ea6d715e1
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Consumer.java
@@ -0,0 +1,122 @@
+package org.apache.qpid.example.jmsexample.direct;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Queue which is used to synchronously consume messages.
+ */
+public class Consumer extends BaseExample
+{
+ /**
+ * Used in log output.
+ */
+ private static final String CLASS = "Consumer";
+
+ /* The queue name */
+ private String _queueName;
+
+ /**
+ * Create a Consumer client.
+ *
+ * @param args Command line arguments.
+ */
+ public Consumer(String[] args)
+ {
+ super(CLASS, args);
+ _queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-queueName", "Queue name");
+ _defaults.put("-queueName", "message_queue");
+ Consumer syncConsumer = new Consumer(args);
+ syncConsumer.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // lookup the queue
+ Queue destination = (Queue) getInitialContext().lookup(_queueName);
+
+ // Declare the connection
+ Connection connection = getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a MessageConsumer
+ System.out.println(CLASS + ": Creating a MessageConsumer");
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Cycle round until all the messages are consumed.
+ Message message;
+ boolean end = false;
+ while (!end)
+ {
+ System.out.println(CLASS + ": Receiving a message");
+ message = messageConsumer.receive();
+ if (message instanceof TextMessage)
+ {
+ System.out.println(" - contents = " + ((TextMessage) message).getText());
+ if (((TextMessage) message).getText().equals("That's all, folks!"))
+ {
+ System.out.println("Received final message for " + _queueName);
+ end = true;
+ }
+ }
+ else
+ {
+ System.out.println(" not text message");
+ }
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java
new file mode 100644
index 0000000000..f595cc0abc
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Listener.java
@@ -0,0 +1,176 @@
+package org.apache.qpid.example.jmsexample.direct;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Queue and uses a MessageListener with this MessageConsumer
+ * in order to enable asynchronous delivery.
+ */
+public class Listener extends BaseExample implements MessageListener
+{
+ /* Used in log output. */
+ private static final String CLASS = "Listener";
+
+ /* The queue name */
+ private String _queueName;
+
+ /**
+ * An object to synchronize on.
+ */
+ private final static Object _lock = new Object();
+
+ /**
+ * A boolean to indicate a clean finish.
+ */
+ private static boolean _finished = false;
+
+ /**
+ * A boolean to indicate an unsuccesful finish.
+ */
+ private static boolean _failed = false;
+
+ /**
+ * Create an Listener client.
+ *
+ * @param args Command line arguments.
+ */
+ public Listener(String[] args)
+ {
+ super(CLASS, args);
+ _queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-queueName", "Queue name");
+ _defaults.put("-queueName", "message_queue");
+ Listener listener = new Listener(args);
+ listener.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // lookup the queue
+ Queue destination = (Queue) getInitialContext().lookup(_queueName);
+
+ // Declare the connection
+ Connection connection = getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses
+ // the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a MessageConsumer
+ System.out.println(CLASS + ": Creating a MessageConsumer");
+
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ // Set a message listener on the messageConsumer
+ messageConsumer.setMessageListener(this);
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Wait for the messageConsumer to have received all the messages it needs
+ synchronized (_lock)
+ {
+ while (!_finished && !_failed)
+ {
+ _lock.wait();
+ }
+ }
+
+ // If the MessageListener abruptly failed (probably due to receiving a non-text message)
+ if (_failed)
+ {
+ System.out.println(CLASS + ": This sample failed as it received unexpected messages");
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+
+ /**
+ * This method is required by the <CODE>MessageListener</CODE> interface. It
+ * will be invoked when messages are available.
+ * After receiving the finish message (That's all, folks!) it releases a lock so that the
+ * main program may continue.
+ *
+ * @param message The message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message instanceof TextMessage)
+ {
+ System.out.println(" - contents = " + ((TextMessage) message).getText());
+ if (((TextMessage) message).getText().equals("That's all, folks!"))
+ {
+ System.out.println("Shutting down listener for " + _queueName);
+ synchronized (_lock)
+ {
+ _finished = true;
+ _lock.notifyAll();
+ }
+ }
+ }
+ else
+ {
+ System.out.println(" [not text message]");
+ }
+ }
+ catch (JMSException exp)
+ {
+ System.out.println(CLASS + ": Caught an exception handling a received message");
+ exp.printStackTrace();
+ synchronized (_lock)
+ {
+ _failed = true;
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
new file mode 100644
index 0000000000..2d80799bed
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/Producer.java
@@ -0,0 +1,94 @@
+package org.apache.qpid.example.jmsexample.direct;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * Message producer example, sends message to a queue.
+ */
+public class Producer extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "Producer";
+
+ /* The queue name */
+ private String _queueName;
+
+ /**
+ * Create a Producer client.
+ * @param args Command line arguments.
+ */
+ public Producer (String[] args)
+ {
+ super(CLASS, args);
+ _queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message producer example.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-queueName", "Queue name");
+ _defaults.put("-queueName", "message_queue");
+ Producer producer = new Producer(args);
+ producer.runTest();
+ }
+
+ private void runTest()
+ {
+ try
+ {
+ // lookup the queue
+ Queue destination = (Queue) getInitialContext().lookup(_queueName);
+
+ // Declare the connection
+ Connection connection = getConnection();
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a Message producer
+ System.out.println(CLASS + ": Creating a Message PRoducer");
+ MessageProducer messageProducer = session.createProducer(destination);
+
+ // Create a Message
+ TextMessage message;
+ System.out.println(CLASS + ": Creating a TestMessage to send to the destination");
+ message = session.createTextMessage();
+
+ // Set a property for illustrative purposes
+ //message.setDoubleProperty("Amount", 10.1);
+
+ // Loop to publish the requested number of messages.
+ for (int i = 1; i < getNumberMessages() + 1; i++)
+ {
+ // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages,
+ // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING.
+ message.setText("Message " + i);
+ System.out.println(CLASS + ": Sending message: " + i);
+ messageProducer.send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ // And send a final message to indicate termination.
+ message.setText("That's all, folks!");
+ messageProducer.send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Close the connection to the broker
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java
new file mode 100644
index 0000000000..6b37e2d47b
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Listener.java
@@ -0,0 +1,214 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.example.jmsexample.pubsub;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Topic and uses a MessageListener with this MessageConsumer
+ * in order to enable asynchronous delivery.
+ */
+public class Listener extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "Listener";
+
+ /* An object to synchronize on. */
+ private final static Object _lock = new Object();
+
+ /* A boolean to indicate a clean finish. */
+ private static int _finished = 0;
+
+ /* A boolean to indicate an unsuccesful finish. */
+ private static boolean _failed = false;
+
+ /**
+ * Create an Listener client.
+ *
+ * @param args Command line arguments.
+ */
+ public Listener(String[] args)
+ {
+ super(CLASS, args);
+ }
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ Listener listener = new Listener(args);
+ listener.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // Declare the connection
+ TopicConnection connection = (TopicConnection) getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses
+ // the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // lookup the topics usa
+ Topic topic = (Topic) getInitialContext().lookup("usa");
+ // Create a Message Subscriber
+ System.out.println(CLASS + ": Creating a Message Subscriber");
+ TopicSubscriber messageSubscriber = session.createSubscriber(topic);
+ // Set a message listener on the messageConsumer
+ messageSubscriber.setMessageListener(new MyMessageListener("usa"));
+
+ // lookup the topics world.usa.news
+ topic = (Topic) getInitialContext().lookup("europe");
+ // Create a Message Subscriber
+ System.out.println(CLASS + ": Creating a Message Subscriber");
+ messageSubscriber = session.createSubscriber(topic);
+ // Set a message listener on the messageConsumer
+ messageSubscriber.setMessageListener(new MyMessageListener("europe"));
+
+ // lookup the topics world.europw
+ topic = (Topic) getInitialContext().lookup("news");
+ // Create a Message Subscriber
+ System.out.println(CLASS + ": Creating a Message Subscriber");
+ messageSubscriber = session.createSubscriber(topic);
+ // Set a message listener on the messageConsumer
+ messageSubscriber.setMessageListener(new MyMessageListener("news"));
+
+ // lookup the topics world.europw
+ topic = (Topic) getInitialContext().lookup("weather");
+ // Create a Message Subscriber
+ System.out.println(CLASS + ": Creating a Message Subscriber");
+ messageSubscriber = session.createSubscriber(topic);
+ // Set a message listener on the messageConsumer
+ messageSubscriber.setMessageListener(new MyMessageListener("weather"));
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Wait for the messageConsumer to have received all the messages it needs
+ synchronized (_lock)
+ {
+ while (_finished < 3 && !_failed)
+ {
+ _lock.wait();
+ }
+ }
+
+ // If the MessageListener abruptly failed (probably due to receiving a non-text message)
+ if (_failed)
+ {
+ System.out.println(CLASS + ": This sample failed as it received unexpected messages");
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+
+ private class MyMessageListener implements MessageListener
+ {
+ /* The number of messages processed. */
+ private int _messageCount = 0;
+
+ /* The topic this subscriber is subscribing to */
+ private String _topicName;
+
+ public MyMessageListener(String topicName)
+ {
+ _topicName = topicName;
+ }
+
+ /**
+ * This method is required by the <CODE>MessageListener</CODE> interface. It
+ * will be invoked when messages are available.
+ * After receiving the final message it releases a lock so that the
+ * main program may continue.
+ *
+ * @param message The message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ // Increment the number of messages that have been received
+ _messageCount = _messageCount + 1;
+ // Print out the details of the just received message
+ System.out
+ .print(_topicName + ": message received: " + _messageCount + " " + message.getJMSMessageID());
+ System.out.println(" - contents = " + ((TextMessage) message).getText());
+ // If this is the total number of messages required
+ if (((TextMessage) message).getText().equals("That's all, folks!"))
+ {
+ System.out.println("Shutting down listener for " + _topicName);
+ synchronized (_lock)
+ {
+ _finished++;
+ _lock.notifyAll();
+ }
+ }
+ }
+ catch (JMSException exp)
+ {
+ System.out.println(CLASS + ": Caught an exception handling a received message");
+ exp.printStackTrace();
+ synchronized (_lock)
+ {
+ _failed = true;
+ _lock.notifyAll();
+ }
+ }
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java
new file mode 100644
index 0000000000..8594993e86
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/pubsub/Publisher.java
@@ -0,0 +1,148 @@
+package org.apache.qpid.example.jmsexample.pubsub;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * Publish messages to topics
+ * <p/>
+ * <p>Run with <code>-help</code> argument for a description of command line arguments.
+ */
+public class Publisher extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "Publisher";
+
+ /**
+ * Create a Publisher client.
+ *
+ * @param args Command line arguments.
+ */
+ public Publisher(String[] args)
+ {
+ super(CLASS, args);
+ }
+
+ /**
+ * Run the message publisher example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ Publisher publisher = new Publisher(args);
+ publisher.runTest();
+ }
+
+ private void runTest()
+ {
+ try
+ {
+ // Declare the connection
+ TopicConnection connection = (TopicConnection) getConnection();
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a Message
+ TextMessage message;
+ System.out.println(CLASS + ": Creating a TestMessage to send to the topics");
+ message = session.createTextMessage();
+
+ // lookup the topics .usa.weather
+ Topic topic = (Topic) getInitialContext().lookup("usa.weather");
+ message.setStringProperty("topicName", "usa.weather");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ TopicPublisher messagePublisher = session.createPublisher(topic);
+ publishMessages(message, messagePublisher);
+
+ // lookup the topics usa.news
+ topic = (Topic) getInitialContext().lookup("usa.news");
+ message.setStringProperty("topicName", "usa.news");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ publishMessages(message, messagePublisher);
+
+ // lookup the topics europe.weather
+ topic = (Topic) getInitialContext().lookup("europe.weather");
+ message.setStringProperty("topicName", "europe.weather");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ publishMessages(message, messagePublisher);
+
+ // lookup the topics europe.news
+ topic = (Topic) getInitialContext().lookup("europe.news");
+ message.setStringProperty("topicName", "europe.news");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ publishMessages(message, messagePublisher);
+
+ // send the final message
+ message.setText("That's all, folks!");
+ topic = (Topic) getInitialContext().lookup("news");
+ message.setStringProperty("topicName", "news");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ messagePublisher
+ .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ topic = (Topic) getInitialContext().lookup("weather");
+ message.setStringProperty("topicName", "weather");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ messagePublisher
+ .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ topic = (Topic) getInitialContext().lookup("europe");
+ message.setStringProperty("topicName", "europe");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ messagePublisher
+ .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ topic = (Topic) getInitialContext().lookup("usa");
+ message.setStringProperty("topicName", "usa");
+ // Create a Message Publisher
+ System.out.println(CLASS + ": Creating a Message Publisherr");
+ messagePublisher = session.createPublisher(topic);
+ messagePublisher
+ .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Close the connection to the broker
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+
+ private void publishMessages(TextMessage message, TopicPublisher messagePublisher) throws JMSException
+ {
+ // Loop to publish the requested number of messages.
+ for (int i = 1; i < getNumberMessages() + 1; i++)
+ {
+ // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages,
+ // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING.
+ message.setText("Message " + i);
+ System.out.println(CLASS + ": Sending message: " + i);
+ messagePublisher
+ .send(message, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java
new file mode 100644
index 0000000000..7b2bf4c780
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/MessageMirror.java
@@ -0,0 +1,166 @@
+package org.apache.qpid.example.jmsexample.requestResponse;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Destination which is used to synchronously consume messages. If a
+ * received message has a ReplyTo header then a new response message is sent
+ * to that specified destination.
+ *
+ */
+public class MessageMirror extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "MessageMirror";
+
+ /* The destination type */
+ private String _destinationType;
+
+ /* The destination Name */
+ private String _destinationName;
+
+ /**
+ * Create a MessageMirror client.
+ * @param args Command line arguments.
+ */
+ public MessageMirror(String[] args)
+ {
+ super(CLASS, args);
+ _destinationType = _argProcessor.getStringArgument("-destinationType");
+ _destinationName = _argProcessor.getStringArgument("-destinationName");
+ }
+
+ /**
+ * Run the message mirror example.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-destinationType", "Destination Type: queue/topic");
+ _defaults.put("-destinationType", "queue");
+ _options.put("-destinationName", "Destination Name");
+ _defaults.put("-destinationName", "message_queue");
+ MessageMirror messageMirror = new MessageMirror(args);
+ messageMirror.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ Destination destination;
+
+ if (_destinationType.equals("queue"))
+ {
+ // Lookup the queue
+ System.out.println(CLASS + ": Looking up queue with name: " + _destinationName);
+ destination = (Queue) getInitialContext().lookup(_destinationName);
+ }
+ else
+ {
+ // Lookup the topic
+ System.out.println(CLASS + ": Looking up topic with name: " + _destinationName);
+ destination = (Topic) getInitialContext().lookup(_destinationName);
+ }
+
+ // Declare the connection
+ Connection connection = getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses
+ // the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a MessageConsumer
+ System.out.println(CLASS + ": Creating a MessageConsumer");
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ /**
+ * Create a MessageProducer - note that although we create the
+ */
+ System.out.println(CLASS + ": Creating a MessageProducer");
+ MessageProducer messageProducer;
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Cycle round until all the messages are consumed.
+ Message requestMessage;
+ TextMessage responseMessage;
+ boolean end = false;
+ while (!end)
+ {
+ System.out.println(CLASS + ": Receiving the message");
+
+ requestMessage = messageConsumer.receive();
+
+ // Print out the details of the just received message
+ System.out.println(CLASS + ": Message received:");
+ System.out.println("\tID=" + requestMessage.getJMSMessageID());
+ System.out.println("\tCorrelationID=" + requestMessage.getJMSCorrelationID());
+
+ if (requestMessage instanceof TextMessage)
+ {
+ if (((TextMessage) requestMessage).getText().equals("That's all, folks!"))
+ {
+ System.out.println("Received final message for " + destination);
+ end = true;
+ }
+ System.out.println("\tContents = " + ((TextMessage)requestMessage).getText());
+ }
+
+ // Now bounce the message if a ReplyTo header was set.
+ if (requestMessage.getJMSReplyTo() != null)
+ {
+ System.out.println("Activating response queue listener for: " + destination);
+ responseMessage = session.createTextMessage("Activating response queue listener for: " + destination);
+ String correlationID = requestMessage.getJMSCorrelationID();
+ if (correlationID != null)
+ {
+ responseMessage.setJMSCorrelationID(correlationID);
+ }
+ messageProducer = session.createProducer(requestMessage.getJMSReplyTo()) ;
+ messageProducer.send(responseMessage);
+ }
+ System.out.println();
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ exp.printStackTrace();
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java
new file mode 100644
index 0000000000..6331491d2e
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/P2PRequestor.java
@@ -0,0 +1,147 @@
+package org.apache.qpid.example.jmsexample.requestResponse;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * This example illustrates the use of the JMS utility class <code>QueueRequestor</code>
+ * which provides a synchronous RPC-like abstraction using temporary destinations
+ * to deliver responses back to the client.
+ *
+ * <p>Run with <code>-help</code> argument for a description of command line arguments.
+ *
+ */
+public class P2PRequestor extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "P2PRequestor";
+
+ /* The queue name */
+ private String _queueName;
+
+ /**
+ * Create a P2PRequestor client.
+ * @param args Command line arguments.
+ */
+ public P2PRequestor(String[] args)
+ {
+ super(CLASS, args);
+ _queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message requestor example.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-queueName", "The queue name");
+ _defaults.put("-queueName", "message_queue");
+ P2PRequestor requestor = new P2PRequestor(args);
+ requestor.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // Declare the connection
+ QueueConnection connection = (QueueConnection) getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Lookup the destination
+ System.out.println(CLASS + ": Looking up queue with name: " + _queueName);
+ Queue destination = (Queue) getInitialContext().lookup(_queueName);
+
+ // Create a QueueRequestor
+ System.out.println(CLASS + ": Creating a QueueRequestor");
+
+ QueueRequestor requestor = new QueueRequestor(session, destination);
+
+ // Now start the connection
+ System.out.println(CLASS + ": Starting connection");
+ connection.start();
+
+ // Create a message to send as a request for service
+ TextMessage request;
+
+ request = session.createTextMessage("\"Twas brillig, and the slithy toves\",\n" +
+ "\t\t\"Did gire and gymble in the wabe.\",\n" +
+ "\t\t\"All mimsy were the borogroves,\",\n" +
+ "\t\t\"And the mome raths outgrabe.\"");
+
+ // Declare a message to be used for receiving any response
+ Message response;
+
+ // Get the number of times that this sample should request service
+ for (int i = 0; i < getNumberMessages(); i++)
+ {
+ /**
+ * Set a message correlation value. This is not strictly required it is
+ * just an example of how messages requests can be tied together.
+ */
+ request.setJMSCorrelationID("Message " + i);
+ System.out.println(CLASS + ": Sending request " + i);
+
+ response = requestor.request(request);
+
+ // Print out the details of the message sent
+ System.out.println(CLASS + ": Message details of request");
+ System.out.println("\tID = " + request.getJMSMessageID());
+ System.out.println("\tCorrelationID = " + request.getJMSCorrelationID());
+ System.out.println("\tContents= " + ((TextMessage)request).getText());
+
+ // Print out the details of the response received
+ System.out.println(CLASS + ": Message details of response");
+ System.out.println("\tID = " + response.getJMSMessageID());
+ System.out.println("\tCorrelationID = " + response.getJMSCorrelationID());
+ if (response instanceof TextMessage)
+ {
+ System.out.println("\tContents= " + ((TextMessage) response).getText());
+ }
+
+ System.out.println();
+ }
+
+ //send the final message for ending the mirror
+ // And send a final message to indicate termination.
+ request.setText("That's all, folks!");
+ MessageProducer messageProducer = session.createProducer(destination);
+ messageProducer.send(request, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java
new file mode 100644
index 0000000000..cb91500aed
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/requestResponse/PubSubRequestor.java
@@ -0,0 +1,139 @@
+package org.apache.qpid.example.jmsexample.requestResponse;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * This example illustrates the use of the JMS utility class <code>TopicRequestor</code>
+ * which provides a synchronous RPC-like abstraction using temporary destinations
+ * to deliver responses back to the client.
+ */
+public class PubSubRequestor extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "PubSubRequestor";
+
+ /* The topic name */
+ private String _topicName;
+
+ /**
+ * Create a PubSubRequestor client.
+ *
+ * @param args Command line arguments.
+ */
+ public PubSubRequestor(String[] args)
+ {
+ super(CLASS, args);
+ _topicName = _argProcessor.getStringArgument("-topicName");
+ }
+
+ /**
+ * Run the message requestor example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-topicName", "The topic name");
+ _defaults.put("-topicName", "world");
+ PubSubRequestor requestor = new PubSubRequestor(args);
+ requestor.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // Declare the connection
+ TopicConnection connection = (TopicConnection) getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Lookup the destination
+ System.out.println(CLASS + ": Looking up topic with name: " + _topicName);
+ Topic destination = (Topic) getInitialContext().lookup(_topicName);
+
+ // Create a TopicRequestor
+ System.out.println(CLASS + ": Creating a TopicRequestor");
+ TopicRequestor requestor = new TopicRequestor(session, destination);
+
+ // Now start the connection
+ System.out.println(CLASS + ": Starting connection");
+ connection.start();
+
+ // Create a message to send as a request for service
+ TextMessage request;
+ request = session.createTextMessage(
+ "\"Twas brillig, and the slithy toves\",\n" + "\t\t\"Did gire and gymble in the wabe.\",\n" + "\t\t\"All mimsy were the borogroves,\",\n" + "\t\t\"And the mome raths outgrabe.\"");
+
+ // Declare a message to be used for receiving any response
+ Message response;
+
+ // Get the number of times that this sample should request service
+ for (int i = 0; i < getNumberMessages(); i++)
+ {
+ /**
+ * Set a message correlation value. This is not strictly required it is
+ * just an example of how messages requests can be tied together.
+ */
+ request.setJMSCorrelationID("Message " + i);
+ System.out.println(CLASS + ": Sending request " + i);
+
+ response = requestor.request(request);
+
+ // Print out the details of the message sent
+ System.out.println(CLASS + ": Message details of request");
+ System.out.println("\tID = " + request.getJMSMessageID());
+ System.out.println("\tCorrelationID = " + request.getJMSCorrelationID());
+ System.out.println("\tContents= " + ((TextMessage) request).getText());
+
+ // Print out the details of the response received
+ System.out.println(CLASS + ": Message details of response");
+ System.out.println("\tID = " + response.getJMSMessageID());
+ System.out.println("\tCorrelationID = " + response.getJMSCorrelationID());
+ if (response instanceof TextMessage)
+ {
+ System.out.println("\tContents= " + ((TextMessage) response).getText());
+ }
+ }
+ // And send a final message to indicate termination.
+ request.setText("That's all, folks!");
+ MessageProducer messageProducer = session.createProducer(destination);
+ messageProducer.send(request, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java
new file mode 100644
index 0000000000..b7df3db345
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/transacted/QueueToTopic.java
@@ -0,0 +1,244 @@
+package org.apache.qpid.example.jmsexample.transacted;
+
+import org.redhat.mrg.messaging.examples.BaseExample;
+
+import javax.jms.*;
+
+/**
+ * Transactional message example sends a number of messages to a Queue
+ * and then uses a transacted session to move them from the Queue to a Topic.
+ * <p/>
+ * <p>The program completes the following steps:
+ * <ul>
+ * <li>Publish the specified number of messages to the queue.</li>
+ * <li>Within a transacted session consume all messages from the queue
+ * and publish them to the topic.</li>
+ * <li>By default commit the transacted session, unless the "<code>-rollback true</code>"
+ * option is specified in which case roll it back.</li>
+ * <li>Check for outstanding messages on the queue.</li>
+ * <li>Check for outstanding messages on the topic.</li>
+ * </ul>
+ * <p/>
+ */
+public class QueueToTopic extends BaseExample
+{
+ /* Used in log output. */
+ private static final String CLASS = "QueueToTopic";
+
+ /* The queue name */
+ private String _queueName;
+
+ /* The topic name */
+ private String _topicName;
+
+ /* Specify if the transaction is committed */
+ private boolean _commit;
+
+ /**
+ * Create a QueueToTopic client.
+ *
+ * @param args Command line arguments.
+ */
+ public QueueToTopic(String[] args)
+ {
+ super(CLASS, args);
+ _queueName = _argProcessor.getStringArgument("-queueName");
+ _topicName = _argProcessor.getStringArgument("-topicName");
+ _commit = _argProcessor.getBooleanArgument("-commit");
+ }
+
+ /**
+ * Run the message mover example.
+ *
+ * @param args Command line arguments.
+ * @see BaseExample
+ */
+ public static void main(String[] args)
+ {
+ _options.put("-topicName", "The topic name");
+ _defaults.put("-topicName", "world");
+ _options.put("-queueName", "The queue name");
+ _defaults.put("-queueName", "message_queue");
+ _options.put("-commit", "Commit or rollback the transaction (true|false)");
+ _defaults.put("-commit", "true");
+ QueueToTopic mover = new QueueToTopic(args);
+ mover.runTest();
+ }
+
+ private void runTest()
+ {
+ try
+ {
+
+ // Lookup the queue
+ System.out.println(CLASS + ": Looking up queue with name: " + _queueName);
+ Queue queue = (Queue) getInitialContext().lookup(_queueName);
+
+ // Lookup the topic
+ System.out.println(CLASS + ": Looking up topic with name: " + _topicName);
+ Topic topic = (Topic) getInitialContext().lookup(_topicName);
+
+ // Declare the connection
+ Connection connection = getConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.err.println(
+ CLASS + ": If this was a real application it should now go through reconnect code");
+ System.err.println();
+ System.err.println("Exception: " + jmse);
+ System.err.println();
+ System.err.println("Now exiting.");
+ System.exit(0);
+ }
+ });
+
+ // Start the connection
+ connection.start();
+
+ /**
+ * Create nonTransactedSession. This non-transacted auto-ack session is used to create the MessageProducer
+ * that is used to populate the queue and the MessageConsumer that is used to consume the messages
+ * from the topic.
+ */
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ Session nonTransactedSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Make sure that the queue is empty
+ System.out.print(CLASS + ": Purging messages from queue...");
+ MessageConsumer queueMessageConsumer = nonTransactedSession.createConsumer(queue);
+ Message purgedMessage;
+ int numberPurged = -1;
+ do
+ {
+ purgedMessage = queueMessageConsumer.receiveNoWait();
+ numberPurged++;
+ }
+ while (purgedMessage != null);
+ System.out.println(numberPurged + " message(s) purged.");
+
+ // Create the MessageProducer for the queue
+ System.out.println(CLASS + ": Creating a MessageProducer for the queue");
+ MessageProducer messageProducer = nonTransactedSession.createProducer(queue);
+
+ // Now create the MessageConsumer for the topic
+ System.out.println(CLASS + ": Creating a MessageConsumer for the topic");
+ MessageConsumer topicMessageConsumer = nonTransactedSession.createConsumer(topic);
+
+ // Create a textMessage. We're using a TextMessage for this example.
+ System.out.println(CLASS + ": Creating a TestMessage to send to the destination");
+ TextMessage textMessage = nonTransactedSession.createTextMessage("Sample text message");
+
+ // Loop to publish the requested number of messages to the queue.
+ for (int i = 1; i < getNumberMessages() + 1; i++)
+ {
+ messageProducer
+ .send(textMessage, getDeliveryMode(), Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Print out details of textMessage just sent
+ System.out.println(CLASS + ": Message sent: " + i + " " + textMessage.getJMSMessageID());
+ }
+
+ // Create a new transacted Session to move the messages from the queue to the topic
+ Session transactedSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ // Create a new message consumer from the queue
+ MessageConsumer transactedConsumer = transactedSession.createConsumer(queue);
+
+ // Create a new message producer for the topic
+ MessageProducer transactedProducer = transactedSession.createProducer(topic);
+
+ // Loop to consume the messages from the queue and publish them to the topic
+ Message receivedMessage;
+ for (int i = 1; i < getNumberMessages() + 1; i++)
+ {
+ // Receive a message
+ receivedMessage = transactedConsumer.receive();
+ System.out.println(CLASS + ": Moving message: " + i + " " + receivedMessage.getJMSMessageID());
+ // Publish it to the topic
+ transactedProducer.send(receivedMessage);
+ }
+
+ // Either commit or rollback the transacted session based on the command line args.
+ if (_commit)
+ {
+ System.out.println(CLASS + ": Committing transacted session.");
+ transactedSession.commit();
+ }
+ else
+ {
+ System.out.println(CLASS + ": Rolling back transacted session.");
+ transactedSession.rollback();
+ }
+
+ // Now consume any outstanding messages on the queue
+ System.out.print(CLASS + ": Mopping up messages from queue");
+ if (_commit)
+ {
+ System.out.print(" (expecting none)...");
+ }
+ else
+ {
+ System.out.print(" (expecting " + getNumberMessages() + ")...");
+ }
+
+ Message moppedMessage;
+ int numberMopped = 0;
+ do
+ {
+ moppedMessage = queueMessageConsumer.receiveNoWait();
+ if( moppedMessage != null)
+ {
+ numberMopped++;
+ }
+ }
+ while (moppedMessage != null);
+ System.out.println(numberMopped + " message(s) mopped.");
+
+ // Now consume any outstanding messages for the topic subscriber
+ System.out.print(CLASS + ": Mopping up messages from topic");
+
+ if (_commit)
+ {
+ System.out.print(" (expecting " + getNumberMessages() + ")...");
+ }
+ else
+ {
+ System.out.print(" (expecting none)...");
+ }
+
+ numberMopped = 0;
+ do
+ {
+ moppedMessage = topicMessageConsumer.receiveNoWait();
+ if( moppedMessage != null)
+ {
+ numberMopped++;
+ }
+ }
+ while (moppedMessage != null);
+ System.out.println(numberMopped + " message(s) mopped.");
+
+ // Close the QueueConnection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ getInitialContext().close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}