summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-01 22:22:54 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-01 22:22:54 +0000
commit41ef35032bfd2f0631a82a687fc96038f77449aa (patch)
treea6598f8b0c3c3734c13721375e08f590320fc162 /java
parentf52d071f19a662bfd06adeed4abde98a4b43b131 (diff)
downloadqpid-python-41ef35032bfd2f0631a82a687fc96038f77449aa.tar.gz
StreamingListenerAdapter - was added to adapt the StreamingMessageListener into a MessageListener by assembling the message parts underneath and then when the message is complete it will call onMessage() in the MessageListener
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@561976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java59
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java56
-rw-r--r--java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java38
3 files changed, 47 insertions, 106 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
index efa9f03527..f6a52ec14a 100644
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
@@ -1,63 +1,22 @@
package org.apache.qpid.nclient.impl;
-import org.apache.qpid.nclient.api.Message;
-import org.apache.qpid.nclient.api.MessageReceiver;
-import org.apache.qpidity.Header;
+import java.util.Map;
+
+import org.apache.qpid.nclient.api.MessageListener;
import org.apache.qpidity.Option;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Session;
public class ClientSession extends Session implements org.apache.qpid.nclient.api.Session
{
- /**
- * ---------------------------------------------------
- * Message methods
- * ---------------------------------------------------
- */
- /*public MessageSender createSender(String queueName) throws QpidException
- {
- return null;
- }*/
-
- public MessageReceiver createReceiver(String queueName, Option... options) throws QpidException
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- public void setTransacted() throws QpidException, IllegalStateException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageBody(byte[] src) throws QpidException
+ public void addMessageListener(String destination,MessageListener listener)
{
- // TODO Auto-generated method stub
-
+ super.addMessageListener(destination, new StreamingListenerAdapter(listener));
}
-
- public void messageClose() throws QpidException
+
+ //temproary until rafi updates the xml when the new message stuff is voted in.
+ public void messageSubscribe(String queue, String destination, Map<String, ?> filter, Option... _options) throws QpidException
{
- // TODO Auto-generated method stub
-
+ // TODO Auto-generated method stub
}
-
- public void messageHeaders(Header... headers) throws QpidException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageTransfer(String destination, Message msg) throws QpidException
- {
- // TODO Auto-generated method stub
-
- }
-
- public void messageTransfer(Option... options) throws QpidException
- {
- // TODO Auto-generated method stub
-
- }
}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java
deleted file mode 100644
index ad4f6ce117..0000000000
--- a/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.qpid.nclient.impl;
-
-import org.apache.qpidity.CommonSessionDelegate;
-import org.apache.qpidity.Delegate;
-import org.apache.qpidity.ExchangeDeclare;
-import org.apache.qpidity.ExchangeDelete;
-import org.apache.qpidity.ExchangeQuery;
-import org.apache.qpidity.ExchangeQueryOk;
-import org.apache.qpidity.QueueBind;
-import org.apache.qpidity.QueueDeclare;
-import org.apache.qpidity.QueueDeclareOk;
-import org.apache.qpidity.QueueDelete;
-import org.apache.qpidity.QueueDeleteOk;
-import org.apache.qpidity.QueuePurge;
-import org.apache.qpidity.QueuePurgeOk;
-import org.apache.qpidity.QueueUnbind;
-import org.apache.qpidity.Session;
-
-
-public class SessionDelegate extends CommonSessionDelegate
-{
-
- /**
- * --------------------------------------------
- * Exchange related functionality
- * --------------------------------------------
- */
- public void exchangeDeclare(Session session, ExchangeDeclare struct) {}
-
- public void exchangeDelete(Session session, ExchangeDelete struct) {}
-
- public void exchangeQuery(Session session, ExchangeQuery struct) {}
-
- public void exchangeQueryOk(Session session, ExchangeQueryOk struct) {}
-
- /**
- * --------------------------------------------
- * Queue related functionality
- * --------------------------------------------
- */
- public void queueDeclare(Session session, QueueDeclare struct) {}
-
- public void queueDeclareOk(Session session, QueueDeclareOk struct) {}
-
- public void queueBind(Session session, QueueBind struct) {}
-
- public void queueUnbind(Session session, QueueUnbind struct) {}
-
- public void queuePurge(Session session, QueuePurge struct) {}
-
- public void queuePurgeOk(Session session, QueuePurgeOk struct) {}
-
- public void queueDelete(Session session, QueueDelete struct) {}
-
- public void queueDeleteOk(Session session, QueueDeleteOk struct) {}
-}
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
new file mode 100644
index 0000000000..b8f04ca0df
--- /dev/null
+++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
@@ -0,0 +1,38 @@
+package org.apache.qpid.nclient.impl;
+
+import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpidity.Header;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.api.StreamingMessageListener;
+
+public class StreamingListenerAdapter implements StreamingMessageListener
+{
+ MessageListener _adaptee;
+ Message _currentMsg;
+
+ public StreamingListenerAdapter(MessageListener l)
+ {
+ _adaptee = l;
+ }
+
+ public void data(byte[] src)
+ {
+ _currentMsg.appendData(src);
+ }
+
+ public void endData()
+ {
+ _adaptee.onMessage(_currentMsg);
+ }
+
+ public void messageHeaders(Header... headers)
+ {
+ //_currentMsg add the headers
+ }
+
+ public void messageTransfer(String destination, Option... options)
+ {
+ // _currentMsg create message from factory
+ }
+}