diff options
Diffstat (limited to 'java')
3 files changed, 47 insertions, 106 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java index efa9f03527..f6a52ec14a 100644 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java @@ -1,63 +1,22 @@ package org.apache.qpid.nclient.impl; -import org.apache.qpid.nclient.api.Message; -import org.apache.qpid.nclient.api.MessageReceiver; -import org.apache.qpidity.Header; +import java.util.Map; + +import org.apache.qpid.nclient.api.MessageListener; import org.apache.qpidity.Option; import org.apache.qpidity.QpidException; import org.apache.qpidity.Session; public class ClientSession extends Session implements org.apache.qpid.nclient.api.Session { - /** - * --------------------------------------------------- - * Message methods - * --------------------------------------------------- - */ - /*public MessageSender createSender(String queueName) throws QpidException - { - return null; - }*/ - - public MessageReceiver createReceiver(String queueName, Option... options) throws QpidException - { - // TODO Auto-generated method stub - return null; - } - - public void setTransacted() throws QpidException, IllegalStateException - { - // TODO Auto-generated method stub - - } - - public void messageBody(byte[] src) throws QpidException + public void addMessageListener(String destination,MessageListener listener) { - // TODO Auto-generated method stub - + super.addMessageListener(destination, new StreamingListenerAdapter(listener)); } - - public void messageClose() throws QpidException + + //temproary until rafi updates the xml when the new message stuff is voted in. + public void messageSubscribe(String queue, String destination, Map<String, ?> filter, Option... _options) throws QpidException { - // TODO Auto-generated method stub - + // TODO Auto-generated method stub } - - public void messageHeaders(Header... headers) throws QpidException - { - // TODO Auto-generated method stub - - } - - public void messageTransfer(String destination, Message msg) throws QpidException - { - // TODO Auto-generated method stub - - } - - public void messageTransfer(Option... options) throws QpidException - { - // TODO Auto-generated method stub - - } } diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java deleted file mode 100644 index ad4f6ce117..0000000000 --- a/java/client/src/main/java/org/apache/qpid/nclient/impl/SessionDelegate.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.qpid.nclient.impl; - -import org.apache.qpidity.CommonSessionDelegate; -import org.apache.qpidity.Delegate; -import org.apache.qpidity.ExchangeDeclare; -import org.apache.qpidity.ExchangeDelete; -import org.apache.qpidity.ExchangeQuery; -import org.apache.qpidity.ExchangeQueryOk; -import org.apache.qpidity.QueueBind; -import org.apache.qpidity.QueueDeclare; -import org.apache.qpidity.QueueDeclareOk; -import org.apache.qpidity.QueueDelete; -import org.apache.qpidity.QueueDeleteOk; -import org.apache.qpidity.QueuePurge; -import org.apache.qpidity.QueuePurgeOk; -import org.apache.qpidity.QueueUnbind; -import org.apache.qpidity.Session; - - -public class SessionDelegate extends CommonSessionDelegate -{ - - /** - * -------------------------------------------- - * Exchange related functionality - * -------------------------------------------- - */ - public void exchangeDeclare(Session session, ExchangeDeclare struct) {} - - public void exchangeDelete(Session session, ExchangeDelete struct) {} - - public void exchangeQuery(Session session, ExchangeQuery struct) {} - - public void exchangeQueryOk(Session session, ExchangeQueryOk struct) {} - - /** - * -------------------------------------------- - * Queue related functionality - * -------------------------------------------- - */ - public void queueDeclare(Session session, QueueDeclare struct) {} - - public void queueDeclareOk(Session session, QueueDeclareOk struct) {} - - public void queueBind(Session session, QueueBind struct) {} - - public void queueUnbind(Session session, QueueUnbind struct) {} - - public void queuePurge(Session session, QueuePurge struct) {} - - public void queuePurgeOk(Session session, QueuePurgeOk struct) {} - - public void queueDelete(Session session, QueueDelete struct) {} - - public void queueDeleteOk(Session session, QueueDeleteOk struct) {} -} diff --git a/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java new file mode 100644 index 0000000000..b8f04ca0df --- /dev/null +++ b/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java @@ -0,0 +1,38 @@ +package org.apache.qpid.nclient.impl; + +import org.apache.qpid.nclient.api.MessageListener; +import org.apache.qpidity.Header; +import org.apache.qpidity.Option; +import org.apache.qpidity.api.Message; +import org.apache.qpidity.api.StreamingMessageListener; + +public class StreamingListenerAdapter implements StreamingMessageListener +{ + MessageListener _adaptee; + Message _currentMsg; + + public StreamingListenerAdapter(MessageListener l) + { + _adaptee = l; + } + + public void data(byte[] src) + { + _currentMsg.appendData(src); + } + + public void endData() + { + _adaptee.onMessage(_currentMsg); + } + + public void messageHeaders(Header... headers) + { + //_currentMsg add the headers + } + + public void messageTransfer(String destination, Option... options) + { + // _currentMsg create message from factory + } +} |
