From 874ac4d63c3fab96e73ca9327617c26d51681c73 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 15 Aug 2007 23:29:13 +0000 Subject: Added initial ConnectionFactory support to JMS Rearranged package structure for qpid client Addded javadoc support for qpid client git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566403 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpidity/client/Client.java | 6 + .../org/apache/qpidity/client/ClientSession.java | 122 -------------------- .../qpidity/client/ClientSessionDelegate.java | 85 -------------- .../java/org/apache/qpidity/client/DemoClient.java | 84 -------------- .../apache/qpidity/client/MessagePartListener.java | 2 +- .../java/org/apache/qpidity/client/Session.java | 7 +- .../apache/qpidity/client/impl/ClientSession.java | 125 +++++++++++++++++++++ .../qpidity/client/impl/ClientSessionDelegate.java | 86 ++++++++++++++ .../org/apache/qpidity/client/impl/DemoClient.java | 89 +++++++++++++++ .../qpidity/client/impl/LargeMsgDemoClient.java | 74 ++++++++++++ .../client/util/MessagePartListenerAdapter.java | 2 +- .../apache/qpidity/jms/ConnectionFactoryImpl.java | 100 +++++++++++++++++ .../org/apache/qpidity/jms/ConnectionImpl.java | 41 +++++-- .../java/org/apache/qpidity/jms/QueueImpl.java | 2 +- .../org/apache/qpidity/jms/XAConnectionImpl.java | 5 + 15 files changed, 528 insertions(+), 302 deletions(-) delete mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSession.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java delete mode 100644 java/client/src/main/java/org/apache/qpidity/client/DemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java create mode 100644 java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpidity/client/Client.java b/java/client/src/main/java/org/apache/qpidity/client/Client.java index b440561b66..25bebb4ae5 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Client.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Client.java @@ -14,6 +14,8 @@ import org.apache.qpidity.ErrorCode; import org.apache.qpidity.MinaHandler; import org.apache.qpidity.QpidException; import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.client.impl.ClientSession; +import org.apache.qpidity.client.impl.ClientSessionDelegate; public class Client implements org.apache.qpidity.client.Connection @@ -23,6 +25,10 @@ public class Client implements org.apache.qpidity.client.Connection private ExceptionListener _exceptionListner; private final Lock _lock = new ReentrantLock(); + /** + * + * @return returns a new connection to the broker. + */ public static org.apache.qpidity.client.Connection createConnection() { return new Client(); diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java deleted file mode 100644 index 13f3eeb1b6..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java +++ /dev/null @@ -1,122 +0,0 @@ -package org.apache.qpidity.client; - -import java.io.EOFException; -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.qpidity.Option; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.api.Message; - -/** - * Implements a Qpid Sesion. - */ -public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session -{ - private Map _messageListeners = new HashMap(); - private ExceptionListener _exceptionListner; - private RangeSet _acquiredMessages; - private RangeSet _rejectedMessages; - - @Override public void sessionClose() - { - super.sessionClose(); - } - - public void messageAcknowledge(RangeSet ranges) - { - for (Range range : ranges) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("Acknowleding message for : " + super.getCommand((int) l)); - super.processed(l); - } - } - } - - public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) - { - setMessageListener(destination,listener); - super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); - } - - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException - { - // The javadoc clearly says that this method is suitable for small messages - // therefore reading the content in one shot. - super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - super.data(msg.readData()); - super.endData(); - } - - public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException - { - super.messageTransfer(destination, confirmMode, acquireMode); - super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - boolean b = true; - int count = 0; - while(b) - { - try - { - System.out.println("count : " + count++); - super.data(msg.readData()); - } - catch(EOFException e) - { - b = false; - } - } - - super.endData(); - } - - public RangeSet getAccquiredMessages() - { - return _acquiredMessages; - } - - public RangeSet getRejectedMessages() - { - return _rejectedMessages; - } - - public void setMessageListener(String destination, MessagePartListener listener) - { - _messageListeners.put(destination, listener); - } - - public void setExceptionListener(ExceptionListener exceptionListner) - { - _exceptionListner = exceptionListner; - } - - // ugly but nessacery - - void setAccquiredMessages(RangeSet acquiredMessages) - { - _acquiredMessages = acquiredMessages; - } - - void setRejectedMessages(RangeSet rejectedMessages) - { - _rejectedMessages = rejectedMessages; - } - - void notifyException(QpidException ex) - { - _exceptionListner.onException(ex); - } - - Map getMessageListerners() - { - return _messageListeners; - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java deleted file mode 100644 index 769fc3aeaa..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.apache.qpidity.client; - -import java.nio.ByteBuffer; - -import org.apache.qpidity.ErrorCode; -import org.apache.qpidity.Frame; -import org.apache.qpidity.MessageAcquired; -import org.apache.qpidity.MessageReject; -import org.apache.qpidity.MessageTransfer; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.Range; -import org.apache.qpidity.RangeSet; -import org.apache.qpidity.Session; -import org.apache.qpidity.SessionClosed; -import org.apache.qpidity.SessionDelegate; -import org.apache.qpidity.Struct; - - -public class ClientSessionDelegate extends SessionDelegate -{ - private MessageTransfer _currentTransfer; - private MessagePartListener _currentMessageListener; - - @Override public void sessionClosed(Session ssn,SessionClosed sessionClosed) - { - ((ClientSession)ssn).notifyException(new QpidException(sessionClosed.getReplyText(),ErrorCode.get(sessionClosed.getReplyCode()),null)); - } - - // -------------------------------------------- - // Message methods - // -------------------------------------------- - @Override public void data(Session ssn, Frame frame) - { - for (ByteBuffer b : frame) - { - _currentMessageListener.addData(b); - } - if (frame.isLastSegment() && frame.isLastFrame()) - { - _currentMessageListener.messageReceived(); - } - - } - - @Override public void headers(Session ssn, Struct... headers) - { - _currentMessageListener.messageHeaders(headers); - } - - - @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) - { - _currentTransfer = currentTransfer; - _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); - _currentMessageListener.messageTransfer(currentTransfer.getId()); - - //a better way is to tell the broker to stop the transfer - if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) - { - RangeSet transfers = new RangeSet(); - transfers.add(_currentTransfer.getId()); - session.messageRelease(transfers); - } - } - - - @Override public void messageReject(Session session, MessageReject struct) - { - for (Range range : struct.getTransfers()) - { - for (long l = range.getLower(); l <= range.getUpper(); l++) - { - System.out.println("message rejected: " + - session.getCommand((int) l)); - } - } - ((ClientSession)session).setRejectedMessages(struct.getTransfers()); - ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); - } - - @Override public void messageAcquired(Session session, MessageAcquired struct) - { - ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); - } -} diff --git a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java deleted file mode 100644 index e46065e0a0..0000000000 --- a/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java +++ /dev/null @@ -1,84 +0,0 @@ -package org.apache.qpidity.client; - -import org.apache.qpidity.DeliveryProperties; -import org.apache.qpidity.MessageProperties; -import org.apache.qpidity.QpidException; -import org.apache.qpidity.api.Message; -import org.apache.qpidity.client.util.MessagePartListenerAdapter; - -public class DemoClient -{ - public static MessagePartListenerAdapter createAdapter() - { - return new MessagePartListenerAdapter(new MessageListener() - { - public void onMessage(Message m) - { - System.out.println("\n================== Received Msg =================="); - System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); - System.out.println(m.toString()); - System.out.println("================== End Msg ==================\n"); - } - - }); - } - - public static final void main(String[] args) - { - Connection conn = Client.createConnection(); - try{ - conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); - }catch(Exception e){ - e.printStackTrace(); - } - - Session ssn = conn.createSession(50000); - ssn.setExceptionListener(new ExceptionListener() - { - public void onException(QpidException e) - { - System.out.println(e); - } - }); - ssn.queueDeclare("queue1", null, null); - ssn.queueBind("queue1", "amq.direct", "queue1",null); - ssn.sync(); - - ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); - - // queue - ssn.messageTransfer("amq.direct", (short) 0, (short) 1); - ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); - ssn.data("this is the data"); - ssn.endData(); - - //reject - ssn.messageTransfer("amq.direct", (short) 0, (short) 1); - ssn.data("this should be rejected"); - ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); - ssn.endData(); - ssn.sync(); - - // topic subs - ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); - ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); - ssn.sync(); - - ssn.queueDeclare("topic1", null, null); - ssn.queueBind("topic1", "amq.topic", "stock.*",null); - ssn.queueDeclare("topic2", null, null); - ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); - ssn.queueDeclare("topic3", null, null); - ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); - ssn.sync(); - - // topic - ssn.messageTransfer("amq.topic", (short) 0, (short) 1); - ssn.data("Topic message"); - ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); - ssn.endData(); - ssn.sync(); - } - -} diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index 273b9b899a..4ccef6df55 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -54,7 +54,7 @@ public interface MessagePartListener * * @param data Data to be added or streamed. */ - public void addData(ByteBuffer src); + public void data(ByteBuffer src); /** * Indicates that the message has been fully received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index e4f2ae217c..dea6a01da6 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -336,6 +336,9 @@ public interface Session */ public void messageReject(RangeSet ranges, int code, String text); + /** + * @return the rejected message ranges + */ public RangeSet getRejectedMessages(); /** @@ -350,7 +353,9 @@ public interface Session */ public void messageAcquire(RangeSet ranges, short mode); - + /** + * @return returns the message ranges marked by the broker as acquired. + */ public RangeSet getAccquiredMessages(); /** diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java new file mode 100644 index 0000000000..8059633cab --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java @@ -0,0 +1,125 @@ +package org.apache.qpidity.client.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.qpidity.Option; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessagePartListener; +import org.apache.qpidity.client.Session; + +/** + * Implements a Qpid Sesion. + */ +public class ClientSession extends org.apache.qpidity.Session implements org.apache.qpidity.client.Session +{ + private Map _messageListeners = new HashMap(); + private ExceptionListener _exceptionListner; + private RangeSet _acquiredMessages; + private RangeSet _rejectedMessages; + + @Override public void sessionClose() + { + super.sessionClose(); + } + + public void messageAcknowledge(RangeSet ranges) + { + for (Range range : ranges) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("Acknowleding message for : " + super.getCommand((int) l)); + super.processed(l); + } + } + } + + public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map filter, Option... options) + { + setMessageListener(destination,listener); + super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); + } + + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException + { + // The javadoc clearly says that this method is suitable for small messages + // therefore reading the content in one shot. + super.messageTransfer(destination, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + super.data(msg.readData()); + super.endData(); + } + + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException + { + super.messageTransfer(destination, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + boolean b = true; + int count = 0; + while(b) + { + try + { + System.out.println("count : " + count++); + super.data(msg.readData()); + } + catch(EOFException e) + { + b = false; + } + } + + super.endData(); + } + + public RangeSet getAccquiredMessages() + { + return _acquiredMessages; + } + + public RangeSet getRejectedMessages() + { + return _rejectedMessages; + } + + public void setMessageListener(String destination, MessagePartListener listener) + { + _messageListeners.put(destination, listener); + } + + public void setExceptionListener(ExceptionListener exceptionListner) + { + _exceptionListner = exceptionListner; + } + + // ugly but nessacery + + void setAccquiredMessages(RangeSet acquiredMessages) + { + _acquiredMessages = acquiredMessages; + } + + void setRejectedMessages(RangeSet rejectedMessages) + { + _rejectedMessages = rejectedMessages; + } + + void notifyException(QpidException ex) + { + _exceptionListner.onException(ex); + } + + Map getMessageListerners() + { + return _messageListeners; + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java new file mode 100644 index 0000000000..dc72f1f975 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java @@ -0,0 +1,86 @@ +package org.apache.qpidity.client.impl; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.ErrorCode; +import org.apache.qpidity.Frame; +import org.apache.qpidity.MessageAcquired; +import org.apache.qpidity.MessageReject; +import org.apache.qpidity.MessageTransfer; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.Range; +import org.apache.qpidity.RangeSet; +import org.apache.qpidity.Session; +import org.apache.qpidity.SessionClosed; +import org.apache.qpidity.SessionDelegate; +import org.apache.qpidity.Struct; +import org.apache.qpidity.client.MessagePartListener; + + +public class ClientSessionDelegate extends SessionDelegate +{ + private MessageTransfer _currentTransfer; + private MessagePartListener _currentMessageListener; + + @Override public void sessionClosed(Session ssn,SessionClosed sessionClosed) + { + ((ClientSession)ssn).notifyException(new QpidException(sessionClosed.getReplyText(),ErrorCode.get(sessionClosed.getReplyCode()),null)); + } + + // -------------------------------------------- + // Message methods + // -------------------------------------------- + @Override public void data(Session ssn, Frame frame) + { + for (ByteBuffer b : frame) + { + _currentMessageListener.data(b); + } + if (frame.isLastSegment() && frame.isLastFrame()) + { + _currentMessageListener.messageReceived(); + } + + } + + @Override public void headers(Session ssn, Struct... headers) + { + _currentMessageListener.messageHeaders(headers); + } + + + @Override public void messageTransfer(Session session, MessageTransfer currentTransfer) + { + _currentTransfer = currentTransfer; + _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + _currentMessageListener.messageTransfer(currentTransfer.getId()); + + //a better way is to tell the broker to stop the transfer + if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) + { + RangeSet transfers = new RangeSet(); + transfers.add(_currentTransfer.getId()); + session.messageRelease(transfers); + } + } + + + @Override public void messageReject(Session session, MessageReject struct) + { + for (Range range : struct.getTransfers()) + { + for (long l = range.getLower(); l <= range.getUpper(); l++) + { + System.out.println("message rejected: " + + session.getCommand((int) l)); + } + } + ((ClientSession)session).setRejectedMessages(struct.getTransfers()); + ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null)); + } + + @Override public void messageAcquired(Session session, MessageAcquired struct) + { + ((ClientSession)session).setAccquiredMessages(struct.getTransfers()); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java new file mode 100644 index 0000000000..e2962b4f22 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java @@ -0,0 +1,89 @@ +package org.apache.qpidity.client.impl; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.Client; +import org.apache.qpidity.client.Connection; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class DemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + // queue + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123")); + ssn.data("this is the data"); + ssn.endData(); + + //reject + ssn.messageTransfer("amq.direct", (short) 0, (short) 1); + ssn.data("this should be rejected"); + ssn.headers(new DeliveryProperties().setRoutingKey("stocks")); + ssn.endData(); + ssn.sync(); + + // topic subs + ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null); + ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null); + ssn.sync(); + + ssn.queueDeclare("topic1", null, null); + ssn.queueBind("topic1", "amq.topic", "stock.*",null); + ssn.queueDeclare("topic2", null, null); + ssn.queueBind("topic2", "amq.topic", "stock.us.*",null); + ssn.queueDeclare("topic3", null, null); + ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null); + ssn.sync(); + + // topic + ssn.messageTransfer("amq.topic", (short) 0, (short) 1); + ssn.data("Topic message"); + ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456")); + ssn.endData(); + ssn.sync(); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java new file mode 100644 index 0000000000..38a7b36403 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java @@ -0,0 +1,74 @@ +package org.apache.qpidity.client.impl; + +import java.io.FileInputStream; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.QpidException; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.client.Client; +import org.apache.qpidity.client.Connection; +import org.apache.qpidity.client.ExceptionListener; +import org.apache.qpidity.client.MessageListener; +import org.apache.qpidity.client.Session; +import org.apache.qpidity.client.util.FileMessage; +import org.apache.qpidity.client.util.MessagePartListenerAdapter; + +public class LargeMsgDemoClient +{ + public static MessagePartListenerAdapter createAdapter() + { + return new MessagePartListenerAdapter(new MessageListener() + { + public void onMessage(Message m) + { + System.out.println("\n================== Received Msg =================="); + System.out.println("Message Id : " + m.getMessageProperties().getMessageId()); + System.out.println(m.toString()); + System.out.println("================== End Msg ==================\n"); + } + + }); + } + + public static final void main(String[] args) + { + Connection conn = Client.createConnection(); + try{ + conn.connect("0.0.0.0", 5672, "test", "guest", "guest"); + }catch(Exception e){ + e.printStackTrace(); + } + + Session ssn = conn.createSession(50000); + ssn.setExceptionListener(new ExceptionListener() + { + public void onException(QpidException e) + { + System.out.println(e); + } + }); + ssn.queueDeclare("queue1", null, null); + ssn.queueBind("queue1", "amq.direct", "queue1",null); + ssn.sync(); + + ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null); + + try + { + FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"), + 1024, + new DeliveryProperties().setRoutingKey("queue1"), + new MessageProperties().setMessageId("123")); + + // queue + ssn.messageStream("amq.direct",msg, (short) 0, (short) 1); + ssn.sync(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 9e4cf00c87..c4b8ae3f8b 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -32,7 +32,7 @@ public class MessagePartListenerAdapter implements MessagePartListener _currentMsg = new ByteBufferMessage(transferId); } - public void addData(ByteBuffer src) + public void data(ByteBuffer src) { try { diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java new file mode 100644 index 0000000000..6a786e3edb --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java @@ -0,0 +1,100 @@ +package org.apache.qpidity.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.TopicConnection; +import javax.jms.TopicConnectionFactory; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.qpidity.QpidException; + +public class ConnectionFactoryImpl implements ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable +{ + private String _host; + private int _port; + private String _defaultUsername; + private String _defaultPassword; + private String _virtualPath; + private String _url; + + // Undefined at the moment + public ConnectionFactoryImpl(String url) + { + _url = url; + } + + public ConnectionFactoryImpl(String host,int port,String virtualHost,String defaultUsername,String defaultPassword) + { + _host = host; + _port = port; + _defaultUsername = defaultUsername; + _defaultPassword = defaultPassword; + _virtualPath = virtualHost; + } + + public Connection createConnection() throws JMSException + { + try + { + return new ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword); + } + catch(QpidException e) + { + // need to convert the qpid exception into jms exception + throw new JMSException("",""); + } + } + + public Connection createConnection(String username, String password) throws JMSException + { + try + { + return new ConnectionImpl(_host,_port,_virtualPath,username,password); + } + catch(QpidException e) + { + // need to convert the qpid exception into jms exception + throw new JMSException("",""); + } + } + + // ---------------------------------------- + // Support for JMS 1.0 classes + // ---------------------------------------- + public QueueConnection createQueueConnection() throws JMSException + { + return (QueueConnection) createConnection(); + } + + public QueueConnection createQueueConnection(String username, String password) throws JMSException + { + return (QueueConnection) createConnection(username, password); + } + + public TopicConnection createTopicConnection() throws JMSException + { + return (TopicConnection) createConnection(); + } + + public TopicConnection createTopicConnection(String username, String password) throws JMSException + { + return (TopicConnection) createConnection(username, password); + } + + + // ---------------------------------------- + // Support for JNDI + // ---------------------------------------- + public Reference getReference() throws NamingException + { + return new Reference( ConnectionFactoryImpl.class.getName(), + new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url)); + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java index 3b6153c487..656a8cddd3 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java @@ -17,19 +17,38 @@ */ package org.apache.qpidity.jms; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.qpidity.QpidException; +import java.util.Vector; -import javax.jms.*; +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; -import java.util.Vector; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicConnection; +import javax.jms.TopicSession; +import javax.naming.NamingException; +import javax.naming.Reference; +import javax.naming.Referenceable; +import javax.naming.StringRefAddr; + +import org.apache.qpidity.QpidException; +import org.apache.qpidity.client.Client; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection */ -public class ConnectionImpl implements Connection, QueueConnection, TopicConnection +public class ConnectionImpl implements Connection, QueueConnection, TopicConnection, Referenceable { /** * This class's logger @@ -95,8 +114,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect /** * TODO define the parameters */ - public ConnectionImpl() + public ConnectionImpl(String host,int port,String virtualHost,String username,String password) throws QpidException { + _qpidConnection = Client.createConnection(); + _qpidConnection.connect(host, port, virtualHost, username, password); } //---- Interface javax.jms.Connection ---// @@ -478,4 +499,10 @@ public class ConnectionImpl implements Connection, QueueConnection, TopicConnect { return _qpidConnection; } + + public Reference getReference() throws NamingException + { + return new Reference( ConnectionImpl.class.getName(), + new StringRefAddr(ConnectionImpl.class.getName(),"")); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java index b95790486a..ce1d11c7e5 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java @@ -43,7 +43,7 @@ public class QueueImpl extends DestinationImpl implements Queue { super(session, name); _exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME; - _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS; + _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS; _queueName = name; // check that this queue exist on the server // As pasive is set the server will not create the queue. diff --git a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java index e76e566efd..3863adb07a 100644 --- a/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java +++ b/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java @@ -28,6 +28,11 @@ import javax.jms.XASession; */ public class XAConnectionImpl extends ConnectionImpl implements XAConnection { + public XAConnectionImpl(String host, int port, String virtualHost, String username, String password) throws QpidException + { + super(host, port, virtualHost, username, password); + } + /** * Creates an XASession. * -- cgit v1.2.1