summaryrefslogtreecommitdiff
path: root/java/client/example/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/client/example/src')
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java51
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java64
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java112
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java51
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java62
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java110
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java121
-rwxr-xr-xjava/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java75
8 files changed, 646 insertions, 0 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
new file mode 100755
index 0000000000..a9257ccf70
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+
+/**
+ * This creates a queue a queue and binds it to the
+ * amq.direct exchange
+ *
+ */
+public class DeclareQueue
+{
+
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // declare and bind queue
+ session.queueDeclare("message_queue", null, null);
+ session.queueBind("message_queue", "amq.direct", "routing_key", null);
+
+ // confirm completion
+ session.sync();
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
new file mode 100755
index 0000000000..0db54af3b6
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
@@ -0,0 +1,64 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+public class DirectProducer
+{
+ /**
+ * This sends 10 messages to the
+ * amq.direct exchange using the
+ * routing key as "routing_key"
+ *
+ */
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRoutingKey("routing_key");
+
+ for (int i=0; i<10; i++)
+ {
+ session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(deliveryProps);
+ session.data("Message " + i);
+ session.endData();
+ }
+
+ session.messageTransfer("amq.direct", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(deliveryProps);
+ session.data("That's all, folks!");
+ session.endData();
+
+ // confirm completion
+ session.sync();
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
new file mode 100755
index 0000000000..244dbdbeae
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
@@ -0,0 +1,112 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+
+/**
+ * This listens to messages on a queue and terminates
+ * when it sees the final message
+ *
+ */
+public class Listener implements MessageListener
+{
+ boolean finish = false;
+
+ public void onMessage(Message m)
+ {
+ String data = null;
+
+ try
+ {
+ ByteBuffer buf = m.readData();
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ data = new String(b);
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error reading message");
+ e.printStackTrace();
+ }
+
+ System.out.println("Message: " + data);
+
+
+ if (data != null && data.equals("That's all, folks!"))
+ {
+ finish = true;
+ }
+ }
+
+ public boolean isFinished()
+ {
+ return finish;
+ }
+
+ /**
+ * This sends 10 messages to the
+ * amq.direct exchange using the
+ * routing key as "routing_key"
+ *
+ */
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // Create an instance of the listener
+ Listener listener = new Listener();
+
+ // create a subscription
+ session.messageSubscribe("message_queue",
+ "listener_destination",
+ Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ new MessagePartListenerAdapter(listener), null);
+
+
+ // issue credits
+ session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11);
+
+ // confirm completion
+ session.sync();
+
+ // check to see if we have received all the messages
+ while (!listener.isFinished()){}
+ System.out.println("Shutting down listener for listener_destination");
+ session.messageCancel("listener_destination");
+
+ //cleanup
+ session.sessionClose();
+
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
new file mode 100755
index 0000000000..d9573b0425
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
@@ -0,0 +1,51 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+
+/**
+ * This creates a queue a queue and binds it to the
+ * amq.direct exchange
+ *
+ */
+public class DeclareQueue
+{
+
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // declare and bind queue
+ session.queueDeclare("message_queue", null, null);
+ session.queueBind("message_queue", "amq.fanout",null, null);
+
+ // confirm completion
+ session.sync();
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
new file mode 100755
index 0000000000..752d973998
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
@@ -0,0 +1,62 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+public class FannoutProducer
+{
+ /**
+ * This sends 10 messages to the
+ * amq.fannout exchange
+ */
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRoutingKey("routing_key");
+
+ for (int i=0; i<10; i++)
+ {
+ session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(deliveryProps);
+ session.data("Message " + i);
+ session.endData();
+ }
+
+ session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(deliveryProps);
+ session.data("That's all, folks!");
+ session.endData();
+
+ // confirm completion
+ session.sync();
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
new file mode 100755
index 0000000000..3fada3422c
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
@@ -0,0 +1,110 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+
+/**
+ * This listens to messages on a queue and terminates
+ * when it sees the final message
+ *
+ */
+public class Listener implements MessageListener
+{
+ boolean finish = false;
+
+ public void onMessage(Message m)
+ {
+ String data = null;
+
+ try
+ {
+ ByteBuffer buf = m.readData();
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ data = new String(b);
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error reading message");
+ e.printStackTrace();
+ }
+
+ System.out.println("Message: " + data);
+
+ if (data != null && data.equals("That's all, folks!"))
+ {
+ finish = true;
+ }
+ }
+
+ public boolean isFinished()
+ {
+ return finish;
+ }
+
+ /**
+ * This sends 10 messages to the
+ * amq.direct exchange using the
+ * routing key as "routing_key"
+ *
+ */
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // Create an instance of the listener
+ Listener listener = new Listener();
+
+ // create a subscription
+ session.messageSubscribe("message_queue",
+ "listener_destination",
+ Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ new MessagePartListenerAdapter(listener), null);
+
+
+ // issue credits
+ session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11);
+
+ // confirm completion
+ session.sync();
+
+ // check to see if we have received all the messages
+ while (!listener.isFinished()){}
+ System.out.println("Shutting down listener for listener_destination");
+ session.messageCancel("listener_destination");
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
new file mode 100755
index 0000000000..e5c560860e
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
@@ -0,0 +1,121 @@
+package org.apache.qpid.example.amqpexample.pubsub;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.Option;
+
+
+public class TopicListener implements MessageListener
+{
+ boolean finish = false;
+ int count = 0;
+
+ public void onMessage(Message m)
+ {
+ String data = null;
+
+ try
+ {
+ ByteBuffer buf = m.readData();
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ data = new String(b);
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error reading message");
+ e.printStackTrace();
+ }
+
+ System.out.println("Message: " + data + " with routing_key " + m.getDeliveryProperties().getRoutingKey());
+
+ if (data != null && data.equals("That's all, folks!"))
+ {
+ count++;
+ if (count == 4){
+ finish = true;
+ }
+ }
+ }
+
+ public void prepareQueue(Session session,String queueName,String routingKey)
+ {
+ session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
+ session.queueBind(queueName, "amq.topic", routingKey, null);
+ session.queueBind(queueName, "amq.topic", "control", null);
+
+ session.messageSubscribe(queueName,queueName,
+ Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+ Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+ new MessagePartListenerAdapter(this),
+ null, Option.NO_OPTION);
+ // issue credits
+ session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+ session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_MESSAGE, 24);
+ }
+
+ public void cancelSubscription(Session session,String dest)
+ {
+ session.messageCancel(dest);
+ }
+
+ public boolean isFinished()
+ {
+ return finish;
+ }
+
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // Create an instance of the listener
+ TopicListener listener = new TopicListener();
+
+ listener.prepareQueue(session,"usa", "usa.#");
+ listener.prepareQueue(session,"europe", "europe.#");
+ listener.prepareQueue(session,"news", "#.news");
+ listener.prepareQueue(session,"weather", "#.weather");
+
+ // confirm completion
+ session.sync();
+ // check to see if we have received all the messages
+ while (!listener.isFinished()){}
+ System.out.println("Shutting down listener for listener_destination");
+ listener.cancelSubscription(session,"usa");
+ listener.cancelSubscription(session,"europe");
+ listener.cancelSubscription(session,"news");
+ listener.cancelSubscription(session,"weather");
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+
+}
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
new file mode 100755
index 0000000000..b33a6b967b
--- /dev/null
+++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
@@ -0,0 +1,75 @@
+package org.apache.qpid.example.amqpexample.pubsub;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+public class TopicPublisher
+{
+ public void publishMessages(Session session, String routing_key)
+ {
+ // Set the routing key once, we'll use the same routing key for all
+ // messages.
+
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ deliveryProps.setRoutingKey(routing_key);
+
+ for (int i=0; i<5; i++) {
+ session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(deliveryProps);
+ session.data("Message " + i);
+ session.endData();
+ }
+
+ }
+
+ public void noMoreMessages(Session session)
+ {
+ session.messageTransfer("amq.topic", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+ session.header(new DeliveryProperties().setRoutingKey("control"));
+ session.data("That's all, folks!");
+ session.endData();
+ }
+
+ public static void main(String[] args)
+ {
+ // Create connection
+ Connection con = Client.createConnection();
+ try
+ {
+ con.connect("localhost", 5672, "test", "guest", "guest");
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error connecting to broker");
+ e.printStackTrace();
+ }
+
+ // Create session
+ Session session = con.createSession(0);
+
+ // Create an instance of the listener
+ TopicPublisher publisher = new TopicPublisher();
+
+ publisher.publishMessages(session, "usa.news");
+ publisher.publishMessages(session, "usa.weather");
+ publisher.publishMessages(session, "europe.news");
+ publisher.publishMessages(session, "europe.weather");
+
+ // confirm completion
+ session.sync();
+
+ //cleanup
+ session.sessionClose();
+ try
+ {
+ con.close();
+ }
+ catch(Exception e)
+ {
+ System.out.print("Error closing broker connection");
+ e.printStackTrace();
+ }
+ }
+}