diff options
Diffstat (limited to 'java/client/example/src')
5 files changed, 69 insertions, 230 deletions
diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java index 0db54af3b6..7c27051fb2 100755 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java +++ b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java @@ -1,18 +1,49 @@ package org.apache.qpid.example.amqpexample.direct; +import java.nio.ByteBuffer; + +import org.apache.qpidity.api.Message; import org.apache.qpidity.nclient.Client; import org.apache.qpidity.nclient.Connection; import org.apache.qpidity.nclient.Session; +import org.apache.qpidity.nclient.util.MessageListener; import org.apache.qpidity.transport.DeliveryProperties; -public class DirectProducer +public class DirectProducer implements MessageListener { - /** - * This sends 10 messages to the - * amq.direct exchange using the - * routing key as "routing_key" - * - */ + boolean finish = false; + + public void onMessage(Message m) + { + String data = null; + + try + { + ByteBuffer buf = m.readData(); + byte[] b = new byte[buf.remaining()]; + buf.get(b); + data = new String(b); + } + catch(Exception e) + { + System.out.print("Error reading message"); + e.printStackTrace(); + } + + System.out.println("Message: " + data); + + + if (data != null && data.equals("That's all, folks!")) + { + finish = true; + } + } + + public boolean isFinished() + { + return finish; + } + public static void main(String[] args) { // Create connection diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java deleted file mode 100755 index d9573b0425..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java +++ /dev/null @@ -1,51 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; - -/** - * This creates a queue a queue and binds it to the - * amq.direct exchange - * - */ -public class DeclareQueue -{ - - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - - // declare and bind queue - session.queueDeclare("message_queue", null, null); - session.queueBind("message_queue", "amq.fanout",null, null); - - // confirm completion - session.sync(); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java deleted file mode 100755 index 752d973998..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java +++ /dev/null @@ -1,62 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.transport.DeliveryProperties; - -public class FannoutProducer -{ - /** - * This sends 10 messages to the - * amq.fannout exchange - */ - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - DeliveryProperties deliveryProps = new DeliveryProperties(); - deliveryProps.setRoutingKey("routing_key"); - - for (int i=0; i<10; i++) - { - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); - session.header(deliveryProps); - session.data("Message " + i); - session.endData(); - } - - session.messageTransfer("amq.fanout", Session.TRANSFER_CONFIRM_MODE_REQUIRED, Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); - session.header(deliveryProps); - session.data("That's all, folks!"); - session.endData(); - - // confirm completion - session.sync(); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } - -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java b/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java deleted file mode 100755 index 3fada3422c..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java +++ /dev/null @@ -1,110 +0,0 @@ -package org.apache.qpid.example.amqpexample.fannout; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.api.Message; -import org.apache.qpidity.nclient.Client; -import org.apache.qpidity.nclient.Connection; -import org.apache.qpidity.nclient.Session; -import org.apache.qpidity.nclient.util.MessageListener; -import org.apache.qpidity.nclient.util.MessagePartListenerAdapter; - -/** - * This listens to messages on a queue and terminates - * when it sees the final message - * - */ -public class Listener implements MessageListener -{ - boolean finish = false; - - public void onMessage(Message m) - { - String data = null; - - try - { - ByteBuffer buf = m.readData(); - byte[] b = new byte[buf.remaining()]; - buf.get(b); - data = new String(b); - } - catch(Exception e) - { - System.out.print("Error reading message"); - e.printStackTrace(); - } - - System.out.println("Message: " + data); - - if (data != null && data.equals("That's all, folks!")) - { - finish = true; - } - } - - public boolean isFinished() - { - return finish; - } - - /** - * This sends 10 messages to the - * amq.direct exchange using the - * routing key as "routing_key" - * - */ - public static void main(String[] args) - { - // Create connection - Connection con = Client.createConnection(); - try - { - con.connect("localhost", 5672, "test", "guest", "guest"); - } - catch(Exception e) - { - System.out.print("Error connecting to broker"); - e.printStackTrace(); - } - - // Create session - Session session = con.createSession(0); - - // Create an instance of the listener - Listener listener = new Listener(); - - // create a subscription - session.messageSubscribe("message_queue", - "listener_destination", - Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, - Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE, - new MessagePartListenerAdapter(listener), null); - - - // issue credits - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES); - session.messageFlow("listener_destination", Session.MESSAGE_FLOW_UNIT_MESSAGE, 11); - - // confirm completion - session.sync(); - - // check to see if we have received all the messages - while (!listener.isFinished()){} - System.out.println("Shutting down listener for listener_destination"); - session.messageCancel("listener_destination"); - - //cleanup - session.sessionClose(); - try - { - con.close(); - } - catch(Exception e) - { - System.out.print("Error closing broker connection"); - e.printStackTrace(); - } - } - -} diff --git a/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties new file mode 100644 index 0000000000..1d428d26d5 --- /dev/null +++ b/java/client/example/src/main/java/org/apache/qpid/example/jmsexample/direct/direct.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory + +# use the following property to configure the default connector +#java.naming.provider.url - ignored. + +# register some connection factories +# connectionfactory.[jndiname] = [ConnectionURL] +connectionfactory.local = qpid:password=pass;username=name@tcp:localhost:5672 + +# Register an AMQP destination in JNDI +# NOTE: Qpid currently only supports direct,topics and headers +# destination.[jniName] = [BindingURL] +destination.directQueue = direct://amq.direct//message_queue?routingkey='routing_key'
\ No newline at end of file |
