diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-11 18:26:00 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2008-01-11 18:26:00 +0000 |
| commit | b425b50dbcf2062b10cee8e6dd48e4f6d3240a03 (patch) | |
| tree | e831ea1a8f8aa6d4c41cc2ad6a41f77a96a32583 /java/client/example/src | |
| parent | fa9a8990ba8d84b2b01928ba82d561c0ba73d8d2 (diff) | |
| download | qpid-python-b425b50dbcf2062b10cee8e6dd48e4f6d3240a03.tar.gz | |
Added AMQP API examples. The intention is to include these in the M3 release.
Also they should interoperate with python and c++ examples.
Currently I couldn't test complete interoperability due to some c++ examples not compiling and also some python examples doesn't seem to run out of the box due to dependency issues.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@611251 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/example/src')
8 files changed, 646 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java new file mode 100755 index 0000000000..a9257ccf70 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java @@ -0,0 +1,51 @@ +package org.apache.qpid.example.amqpexample.direct; + +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; + +/** + * This creates a queue a queue and binds it to the + * amq.direct exchange + * + */ +public class DeclareQueue +{ + + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // declare and bind queue + session.queueDeclare("message_queue", null, null); + session.queueBind("message_queue", "amq.direct", "routing_key", null); + + // confirm completion + session.sync(); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java new file mode 100755 index 0000000000..0db54af3b6 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -0,0 +1,64 @@ +package org.apache.qpid.example.amqpexample.direct; + +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.transport.DeliveryProperties; + +public class DirectProducer +{ + /** + * This sends 10 messages to the + * amq.direct exchange using the + * routing key as "routing_key" + * + */ + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey("routing_key"); + + for (int i=0; i<10; i++) + { + session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(deliveryProps); + session.data("Message " + i); + session.endData(); + } + + session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(deliveryProps); + session.data("That's all, folks!"); + session.endData(); + + // confirm completion + session.sync(); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java new file mode 100755 index 0000000000..244dbdbeae --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java @@ -0,0 +1,112 @@ +package org.apache.qpid.example.amqpexample.direct; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.api.Message; +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.nclient.util.MessageListener; +import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; + +/** + * This listens to messages on a queue and terminates + * when it sees the final message + * + */ +public class Listener implements MessageListener +{ + boolean finish = false; + + public void onMessage(Message m) + { + String data = null; + + try + { + ByteBuffer buf = m.readData(); + byte[] b = new byte[buf.remaining()]; + buf.get(b); + data = new String(b); + } + catch(Exception e) + { + System.out.print("Error reading message"); + e.printStackTrace(); + } + + System.out.println("Message: " + data); + + + if (data != null && data.equals("That's all, folks!")) + { + finish = true; + } + } + + public boolean isFinished() + { + return finish; + } + + /** + * This sends 10 messages to the + * amq.direct exchange using the + * routing key as "routing_key" + * + */ + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + Listener listener = new Listener(); + + // create a subscription + session.messageSubscribe("message_queue", + "listener_destination", + Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, + Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + new MessagePartListenerAdapter(listener), null); + + + // issue credits + session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); + + // confirm completion + session.sync(); + + // check to see if we have received all the messages + while (!listener.isFinished()){} + System.out.println("Shutting down listener for listener_destination"); + session.messageCancel("listener_destination"); + + //cleanup + session.sessionClose(); + + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java new file mode 100755 index 0000000000..d9573b0425 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java @@ -0,0 +1,51 @@ +package org.apache.qpid.example.amqpexample.fannout; + +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; + +/** + * This creates a queue a queue and binds it to the + * amq.direct exchange + * + */ +public class DeclareQueue +{ + + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // declare and bind queue + session.queueDeclare("message_queue", null, null); + session.queueBind("message_queue", "amq.fanout",null, null); + + // confirm completion + session.sync(); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java new file mode 100755 index 0000000000..752d973998 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java @@ -0,0 +1,62 @@ +package org.apache.qpid.example.amqpexample.fannout; + +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.transport.DeliveryProperties; + +public class FannoutProducer +{ + /** + * This sends 10 messages to the + * amq.fannout exchange + */ + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey("routing_key"); + + for (int i=0; i<10; i++) + { + session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(deliveryProps); + session.data("Message " + i); + session.endData(); + } + + session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(deliveryProps); + session.data("That's all, folks!"); + session.endData(); + + // confirm completion + session.sync(); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java new file mode 100755 index 0000000000..3fada3422c --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java @@ -0,0 +1,110 @@ +package org.apache.qpid.example.amqpexample.fannout; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.api.Message; +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.nclient.util.MessageListener; +import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; + +/** + * This listens to messages on a queue and terminates + * when it sees the final message + * + */ +public class Listener implements MessageListener +{ + boolean finish = false; + + public void onMessage(Message m) + { + String data = null; + + try + { + ByteBuffer buf = m.readData(); + byte[] b = new byte[buf.remaining()]; + buf.get(b); + data = new String(b); + } + catch(Exception e) + { + System.out.print("Error reading message"); + e.printStackTrace(); + } + + System.out.println("Message: " + data); + + if (data != null && data.equals("That's all, folks!")) + { + finish = true; + } + } + + public boolean isFinished() + { + return finish; + } + + /** + * This sends 10 messages to the + * amq.direct exchange using the + * routing key as "routing_key" + * + */ + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + Listener listener = new Listener(); + + // create a subscription + session.messageSubscribe("message_queue", + "listener_destination", + Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, + Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + new MessagePartListenerAdapter(listener), null); + + + // issue credits + session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); + + // confirm completion + session.sync(); + + // check to see if we have received all the messages + while (!listener.isFinished()){} + System.out.println("Shutting down listener for listener_destination"); + session.messageCancel("listener_destination"); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java new file mode 100755 index 0000000000..e5c560860e --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java @@ -0,0 +1,121 @@ +package org.apache.qpid.example.amqpexample.pubsub; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.api.Message; +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.nclient.util.MessageListener; +import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; +import org.apache.qpidity.transport.Option; + + +public class TopicListener implements MessageListener +{ + boolean finish = false; + int count = 0; + + public void onMessage(Message m) + { + String data = null; + + try + { + ByteBuffer buf = m.readData(); + byte[] b = new byte[buf.remaining()]; + buf.get(b); + data = new String(b); + } + catch(Exception e) + { + System.out.print("Error reading message"); + e.printStackTrace(); + } + + System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey()); + + if (data != null && data.equals("That's all, folks!")) + { + count++; + if (count == 4){ + finish = true; + } + } + } + + public void prepareQueue(Session session,String queueName,String routingKey) + { + session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE); + session.queueBind(queueName, "amq.topic", routingKey, null); + session.queueBind(queueName, "amq.topic", "control", null); + + session.messageSubscribe(queueName,queueName, + Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, + Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, + new MessagePartListenerAdapter(this), + null, Option.NO_OPTION); + // issue credits + session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); + session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_MESSAGE, 24); + } + + public void cancelSubscription(Session session,String dest) + { + session.messageCancel(dest); + } + + public boolean isFinished() + { + return finish; + } + + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + TopicListener listener = new TopicListener(); + + listener.prepareQueue(session,"usa", "usa.#"); + listener.prepareQueue(session,"europe", "europe.#"); + listener.prepareQueue(session,"news", "#.news"); + listener.prepareQueue(session,"weather", "#.weather"); + + // confirm completion + session.sync(); + // check to see if we have received all the messages + while (!listener.isFinished()){} + System.out.println("Shutting down listener for listener_destination"); + listener.cancelSubscription(session,"usa"); + listener.cancelSubscription(session,"europe"); + listener.cancelSubscription(session,"news"); + listener.cancelSubscription(session,"weather"); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } + +} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java new file mode 100755 index 0000000000..b33a6b967b --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java @@ -0,0 +1,75 @@ +package org.apache.qpid.example.amqpexample.pubsub; + +import org.apache.qpidity.nclient.Client; +import org.apache.qpidity.nclient.Connection; +import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.transport.DeliveryProperties; + +public class TopicPublisher +{ + public void publishMessages(Session session, String routing_key) + { + // Set the routing key once, we'll use the same routing key for all + // messages. + + DeliveryProperties deliveryProps = new DeliveryProperties(); + deliveryProps.setRoutingKey(routing_key); + + for (int i=0; i<5; i++) { + session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(deliveryProps); + session.data("Message " + i); + session.endData(); + } + + } + + public void noMoreMessages(Session session) + { + session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); + session.header(new DeliveryProperties().setRoutingKey("control")); + session.data("That's all, folks!"); + session.endData(); + } + + public static void main(String[] args) + { + // Create connection + Connection con = Client.createConnection(); + try + { + con.connect("localhost", 5672, "test", "guest", "guest"); + } + catch(Exception e) + { + System.out.print("Error connecting to broker"); + e.printStackTrace(); + } + + // Create session + Session session = con.createSession(0); + + // Create an instance of the listener + TopicPublisher publisher = new TopicPublisher(); + + publisher.publishMessages(session, "usa.news"); + publisher.publishMessages(session, "usa.weather"); + publisher.publishMessages(session, "europe.news"); + publisher.publishMessages(session, "europe.weather"); + + // confirm completion + session.sync(); + + //cleanup + session.sessionClose(); + try + { + con.close(); + } + catch(Exception e) + { + System.out.print("Error closing broker connection"); + e.printStackTrace(); + } + } +} |
