summaryrefslogtreecommitdiff
path: root/java/client/example/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2008-01-16 22:31:06 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2008-01-16 22:31:06 +0000
commit9c6b48d196c9d90a1287f5abdb27781f3bd34ac7 (patch)
treee6d3b760e832d0ed007fcef350e887f1e95a3bd2 /java/client/example/src
parent4b1efc5908b5a8c5a9bc274f7a8e8ff2d5ddccab (diff)
downloadqpid-python-9c6b48d196c9d90a1287f5abdb27781f3bd34ac7.tar.gz
This example is written to interoperate with the c++ and python examples and looksup a destination defined in the properties file. The destination defines a fannout exchange and a queue
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612597 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example/src')
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java163
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java210
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java128
-rw-r--r--java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties31
4 files changed, 532 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java
new file mode 100755
index 0000000000..1cdd2d941a
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Consumer.java
@@ -0,0 +1,163 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.jmsexample.fanout;
+
+import java.util.Properties;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Queue which is used to synchronously consume messages.
+ */
+public class Consumer
+{
+ /**
+ * Used in log output.
+ */
+ private static final String CLASS = "Consumer";
+
+ /**
+ * Create a Consumer client.
+ *
+ * @param args Command line arguments.
+ */
+ public Consumer()
+ {
+ }
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ //_options.put("-queueName", "Queue name");
+ //_defaults.put("-queueName", "message_queue");
+ Consumer syncConsumer = new Consumer();
+ syncConsumer.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ // Load JNDI properties
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream("direct.properties"));
+
+ //Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ // look up destination
+ Destination destination = (Destination)ctx.lookup("directQueue");
+
+ // Lookup the connection factory
+ ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local");
+ // create the connection
+ Connection connection = conFac.createConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a MessageConsumer
+ System.out.println(CLASS + ": Creating a MessageConsumer");
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Cycle round until all the messages are consumed.
+ Message message;
+ boolean end = false;
+ while (!end)
+ {
+ message = messageConsumer.receive();
+ String text = "";
+ if (message instanceof TextMessage)
+ {
+ text = ((TextMessage) message).getText();
+ }
+ else
+ {
+ byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()];
+ ((BytesMessage) message).readBytes(body);
+ text = new String(body);
+ }
+ if (text.equals("That's all, folks!"))
+ {
+ System.out.println(CLASS + ": Received final message " + text);
+ end = true;
+ }
+ else
+ {
+ System.out.println(CLASS + ": Received message: " + text);
+ }
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ ctx.close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java
new file mode 100755
index 0000000000..d7d2956dbb
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Listener.java
@@ -0,0 +1,210 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.jmsexample.fanout;
+
+import java.util.Properties;
+
+import org.apache.qpid.example.jmsexample.common.BaseExample;
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+/**
+ * The example creates a MessageConsumer on the specified
+ * Queue and uses a MessageListener with this MessageConsumer
+ * in order to enable asynchronous delivery.
+ */
+public class Listener implements MessageListener
+{
+ /* Used in log output. */
+ private static final String CLASS = "Listener";
+
+ /**
+ * An object to synchronize on.
+ */
+ private final static Object _lock = new Object();
+
+ /**
+ * A boolean to indicate a clean finish.
+ */
+ private static boolean _finished = false;
+
+ /**
+ * A boolean to indicate an unsuccesful finish.
+ */
+ private static boolean _failed = false;
+
+ /**
+ * Create an Listener client.
+ *
+ * @param args Command line arguments.
+ */
+ public Listener()
+ {
+ //super(CLASS, args);
+ //_queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message consumer example.
+ *
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ //_options.put("-queueName", "Queue name");
+ //_defaults.put("-queueName", "message_queue");
+ Listener listener = new Listener();
+ listener.runTest();
+ }
+
+ /**
+ * Start the example.
+ */
+ private void runTest()
+ {
+ try
+ {
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream("direct.properties"));
+
+ //Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ Destination destination = (Destination)ctx.lookup("directQueue");
+
+ // Declare the connection
+ ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local");
+ Connection connection = conFac.createConnection();
+
+ // As this application is using a MessageConsumer we need to set an ExceptionListener on the connection
+ // so that errors raised within the JMS client library can be reported to the application
+ System.out.println(
+ CLASS + ": Setting an ExceptionListener on the connection as sample uses a MessageConsumer");
+
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException jmse)
+ {
+ // The connection may have broken invoke reconnect code if available.
+ System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
+ System.exit(0);
+ }
+ });
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses
+ // the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a MessageConsumer
+ System.out.println(CLASS + ": Creating a MessageConsumer");
+
+ MessageConsumer messageConsumer = session.createConsumer(destination);
+
+ // Set a message listener on the messageConsumer
+ messageConsumer.setMessageListener(this);
+
+ // Now the messageConsumer is set up we can start the connection
+ System.out.println(CLASS + ": Starting connection so MessageConsumer can receive messages");
+ connection.start();
+
+ // Wait for the messageConsumer to have received all the messages it needs
+ synchronized (_lock)
+ {
+ while (!_finished && !_failed)
+ {
+ _lock.wait();
+ }
+ }
+
+ // If the MessageListener abruptly failed (probably due to receiving a non-text message)
+ if (_failed)
+ {
+ System.out.println(CLASS + ": This sample failed as it received unexpected messages");
+ }
+
+ // Close the connection to the server
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ ctx.close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ }
+ }
+
+ /**
+ * This method is required by the <CODE>MessageListener</CODE> interface. It
+ * will be invoked when messages are available.
+ * After receiving the finish message (That's all, folks!) it releases a lock so that the
+ * main program may continue.
+ *
+ * @param message The message.
+ */
+ public void onMessage(Message message)
+ {
+ try
+ {
+ String text = "";
+ if (message instanceof TextMessage)
+ {
+ text = ((TextMessage) message).getText();
+ }
+ else
+ {
+ byte[] body = new byte[(int) ((BytesMessage) message).getBodyLength()];
+ ((BytesMessage) message).readBytes(body);
+ text = new String(body);
+ }
+ if (text.equals("That's all, folks!"))
+ {
+ System.out.println(CLASS + ": Received final message " + text);
+ synchronized (_lock)
+ {
+ _finished = true;
+ _lock.notifyAll();
+ }
+ }
+ else
+ {
+ System.out.println(CLASS + ": Received message: " + text);
+ }
+ }
+ catch (JMSException exp)
+ {
+ System.out.println(CLASS + ": Caught an exception handling a received message");
+ exp.printStackTrace();
+ synchronized (_lock)
+ {
+ _failed = true;
+ _lock.notifyAll();
+ }
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java
new file mode 100755
index 0000000000..c917f6d753
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/Producer.java
@@ -0,0 +1,128 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.jmsexample.fanout;
+
+import java.util.Properties;
+
+import org.apache.qpid.example.jmsexample.common.BaseExample;
+
+
+import javax.jms.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+
+/**
+ * Message producer example, sends message to a queue.
+ */
+public class Producer
+{
+ /* Used in log output. */
+ private static final String CLASS = "Producer";
+
+ /* The queue name */
+ private String _queueName;
+ private int numMessages = 10;
+ private short deliveryMode = 0;
+
+ /**
+ * Create a Producer client.
+ * @param args Command line arguments.
+ */
+ public Producer ()
+ {
+ // super(CLASS, args);
+ //_queueName = _argProcessor.getStringArgument("-queueName");
+ }
+
+ /**
+ * Run the message producer example.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args)
+ {
+ // _options.put("-queueName", "Queue name");
+ // _defaults.put("-queueName", "message_queue");
+ Producer producer = new Producer();
+ producer.runTest();
+ }
+
+ private void runTest()
+ {
+ try
+ {
+
+ Properties properties = new Properties();
+ properties.load(this.getClass().getResourceAsStream("direct.properties"));
+
+ //Create the initial context
+ Context ctx = new InitialContext(properties);
+
+ Destination destination = (Destination)ctx.lookup("directQueue");
+
+ // Declare the connection
+ ConnectionFactory conFac = (ConnectionFactory)ctx.lookup("local");
+ Connection connection = conFac.createConnection();
+
+ // Create a session on the connection
+ // This session is a default choice of non-transacted and uses the auto acknowledge feature of a session.
+ System.out.println(CLASS + ": Creating a non-transacted, auto-acknowledged session");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // lookup the queue
+ //Queue destination = session.createQueue(_queueName);
+
+ // Create a Message producer
+ System.out.println(CLASS + ": Creating a Message Producer");
+ MessageProducer messageProducer = session.createProducer(destination);
+
+ // Create a Message
+ TextMessage message;
+ System.out.println(CLASS + ": Creating a TestMessage to send to the destination");
+
+ // Loop to publish the requested number of messages.
+ for (int i = 1; i < numMessages + 1; i++)
+ {
+ // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING messages,
+ // this is different to the consumer end as a CONSUMERS CONNECTIONS MUST BE STARTED BEFORE RECEIVING.
+ message = session.createTextMessage("Message " + i);
+ System.out.println(CLASS + ": Sending message: " + i);
+ messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ // And send a final message to indicate termination.
+ message = session.createTextMessage("That's all, folks!");
+ messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+
+ // Close the connection to the broker
+ System.out.println(CLASS + ": Closing connection");
+ connection.close();
+
+ // Close the JNDI reference
+ System.out.println(CLASS + ": Closing JNDI context");
+ ctx.close();
+ }
+ catch (Exception exp)
+ {
+ System.err.println(CLASS + ": Caught an Exception: " + exp);
+ exp.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
new file mode 100644
index 0000000000..446327d7e1
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/fanout/fanout.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory
+
+# use the following property to configure the default connector
+#java.naming.provider.url - ignored.
+
+# register some connection factories
+# connectionfactory.[jndiname] = [ConnectionURL]
+connectionfactory.local = qpid:password=pass;username=name@tcp:localhost:5672
+
+# Register an AMQP destination in JNDI
+# NOTE: Qpid currently only supports direct,topics and headers
+# destination.[jniName] = [BindingURL]
+destination.directQueue = fanout://amq.fanout//message_queue \ No newline at end of file