From dae146867b5794cc86a7b3d8c162dbabd7ab9d22 Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Mon, 13 Aug 2007 16:41:34 +0000 Subject: Added support for message handling. Sending Messages ------------------- ByteBufferMessage for small messages - data will be in memory. FileMessage and StreamingMessage for sending large messages. Receiving Messages ------------------- For small messages u can use MessageListener and receive ByteBufferMessage. You need to use the MessageListener with the MessagePartListenerAdapter. For large messages it is recomended to use MessagePartListener. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@565407 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpidity/client/ClientSession.java | 34 ++++++- .../java/org/apache/qpidity/client/Session.java | 45 ++++++++- .../qpidity/client/util/ByteBufferMessage.java | 111 +++++++++++++++++++++ .../apache/qpidity/client/util/FileMessage.java | 41 ++------ .../client/util/MessagePartListenerAdapter.java | 93 +---------------- .../qpidity/client/util/ReadOnlyMessage.java | 34 +++++++ .../qpidity/client/util/StreamingMessage.java | 48 +++++++++ 7 files changed, 277 insertions(+), 129 deletions(-) create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java create mode 100644 java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java (limited to 'java/client') diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java index fcde60fa04..b5d8add9e2 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java @@ -1,5 +1,7 @@ package org.apache.qpidity.client; +import java.io.EOFException; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,15 +47,37 @@ public class ClientSession extends org.apache.qpidity.Session implements org.apa super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options); } - public void messageTransfer(String exchange, Message msg, short confirmMode, short acquireMode) + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException { - // need to break it down into small pieces - super.messageTransfer(exchange, confirmMode, acquireMode); + // The javadoc clearly says that this method is suitable for small messages + // therefore reading the content in one shot. + super.messageTransfer(destination, confirmMode, acquireMode); super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); - // super.data(bytes); * - // super.endData() + super.data(msg.readData()); + super.endData(); } + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException + { + super.messageTransfer(destination, confirmMode, acquireMode); + super.headers(msg.getDeliveryProperties(),msg.getMessageProperties()); + boolean b = true; + int count = 0; + while(b) + { + try + { + System.out.println("count : " + count++); + super.data(msg.readData()); + } + catch(EOFException e) + { + b = false; + } + } + + super.endData(); + } public RangeSet getAccquiredMessages() { diff --git a/java/client/src/main/java/org/apache/qpidity/client/Session.java b/java/client/src/main/java/org/apache/qpidity/client/Session.java index 09595c8d0b..e4f2ae217c 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/Session.java +++ b/java/client/src/main/java/org/apache/qpidity/client/Session.java @@ -18,6 +18,7 @@ */ package org.apache.qpidity.client; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; @@ -72,7 +73,47 @@ public interface Session //------------------------------------------------------ /** * Transfer the given message to a specified exchange. + * + *

This is a convinience method for providing a complete message + * using a single method which internaly is mapped to messageTransfer(), headers() followed + * by data() and an endData(). + * This method should only be used by small messages

* + * @param destination The exchange the message is being sent. + * @param msg The Message to be sent + * @param confirmMode + * @param acquireMode + */ + public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)throws IOException; + + /** + *

This is a convinience method for streaming a message using pull semantics + * using a single method as opposed to doing a push using messageTransfer(), headers() followed + * by a series of data() and an endData().

+ *

Internally data will be pulled from Message object(which wrap's a data stream) using it's read() + * and pushed using messageTransfer(), headers() followed by a series of data() and an endData(). + *
This method should only be used by large messages
+ * There are two convinience Message classes provided to facilitate this. + *

+ * You could also implement a the Message interface to and wrap any + * data stream you want. + *

+ * + * @param destination The exchange the message is being sent. + * @param msg The Message to be sent * @param confirmMode - * @param exchange The exchange the message is being sent. - * @param msg The Message to be sent */ - public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode); + public void messageStream(String destination, Message msg, short confirmMode, short acquireMode)throws IOException; /** * Declare the beginning of a message transfer operation. This operation must diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java new file mode 100644 index 0000000000..d67b32d019 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -0,0 +1,111 @@ +package org.apache.qpidity.client.util; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.Queue; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +/** + *

A Simple implementation of the message interface + * for small messages. When the readData methods are called + * we assume the message is complete. i.e there want be any + * appendData operations after that.

+ * + *

If you need large message support please see + * FileMessage and StreamingMessage + *

+ */ +public class ByteBufferMessage implements Message +{ + private Queue _data = new LinkedList(); + private ByteBuffer _readBuffer; + private int dataSize; + private DeliveryProperties _currentDeliveryProps; + private MessageProperties _currentMessageProps; + + + public void appendData(byte[] src) throws IOException + { + appendData(ByteBuffer.wrap(src)); + } + + public void appendData(ByteBuffer src) throws IOException + { + _data.offer(src); + dataSize += src.remaining(); + } + + public DeliveryProperties getDeliveryProperties() + { + return _currentDeliveryProps; + } + + public MessageProperties getMessageProperties() + { + return _currentMessageProps; + } + + public void setDeliveryProperties(DeliveryProperties props) + { + _currentDeliveryProps = props; + } + + public void setMessageProperties(MessageProperties props) + { + _currentMessageProps = props; + } + + public void readData(byte[] target) throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + _readBuffer.get(target); + } + + public ByteBuffer readData() throws IOException + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + + return _readBuffer; + } + + private void buildReadBuffer() + { + //optimize for the simple cases + if(_data.size() == 1) + { + _readBuffer = _data.element().duplicate(); + } + else + { + _readBuffer = ByteBuffer.allocate(dataSize); + for(ByteBuffer buf:_data) + { + _readBuffer.put(buf); + } + } + } + + //hack for testing + @Override public String toString() + { + if (_data.size() >0 && _readBuffer == null) + { + buildReadBuffer(); + } + ByteBuffer temp = _readBuffer.duplicate(); + byte[] b = new byte[temp.limit()]; + temp.get(b); + return new String(b); + } +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 84f18dcca4..8f62334fcd 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -1,5 +1,6 @@ package org.apache.qpidity.client.util; +import java.io.EOFException; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -29,10 +30,8 @@ import org.apache.qpidity.api.Message; * and stream it. * */ -public class FileMessage implements Message +public class FileMessage extends ReadOnlyMessage implements Message { - private MessageProperties _messageProperties; - private DeliveryProperties _deliveryProperties; private FileChannel _fileChannel; private int _chunkSize; private long _fileSize; @@ -52,46 +51,24 @@ public class FileMessage implements Message _chunkSize = (int)_fileSize; } } - - public void appendData(byte[] src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public void appendData(ByteBuffer src) - { - throw new UnsupportedOperationException("This Message is read only after the initial source"); - } - - public DeliveryProperties getDeliveryProperties() - { - return _deliveryProperties; - } - - public MessageProperties getMessageProperties() - { - return _messageProperties; - } public void readData(byte[] target) throws IOException { - int readLen = target.length <= _chunkSize ? target.length : _chunkSize; - if (_pos + readLen > _fileSize) - { - readLen = (int)(_fileSize - _pos); - } - MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, readLen); - _pos += readLen; - bb.get(target); + throw new UnsupportedOperationException(); } public ByteBuffer readData() throws IOException { + if (_pos == _fileSize) + { + throw new EOFException(); + } + if (_pos + _chunkSize > _fileSize) { _chunkSize = (int)(_fileSize - _pos); } - MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); + MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize); _pos += _chunkSize; return bb; } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 4ff83db939..88f950eb5d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -2,13 +2,10 @@ package org.apache.qpidity.client.util; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.Queue; import org.apache.qpidity.DeliveryProperties; import org.apache.qpidity.MessageProperties; import org.apache.qpidity.Struct; -import org.apache.qpidity.api.Message; import org.apache.qpidity.client.MessageListener; import org.apache.qpidity.client.MessagePartListener; @@ -23,94 +20,12 @@ import org.apache.qpidity.client.MessagePartListener; public class MessagePartListenerAdapter implements MessagePartListener { MessageListener _adaptee; - Message _currentMsg; - DeliveryProperties _currentDeliveryProps; - MessageProperties _currentMessageProps; + ByteBufferMessage _currentMsg; public MessagePartListenerAdapter(MessageListener listener) { _adaptee = listener; - - // temp solution. - _currentMsg = new Message() - { - Queue _data = new LinkedList(); - ByteBuffer _readBuffer; - private int dataSize; - - public void appendData(byte[] src) throws IOException - { - appendData(ByteBuffer.wrap(src)); - } - - public void appendData(ByteBuffer src) throws IOException - { - _data.offer(src); - dataSize += src.remaining(); - } - - public DeliveryProperties getDeliveryProperties() - { - return _currentDeliveryProps; - } - - public MessageProperties getMessageProperties() - { - return _currentMessageProps; - } - - // since we provide the message only after completion - // we can assume that when this method is called we have - // received all data. - public void readData(byte[] target) throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - _readBuffer.get(target); - } - - public ByteBuffer readData() throws IOException - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - - return _readBuffer; - } - - private void buildReadBuffer() - { - //optimize for the simple cases - if(_data.size() == 1) - { - _readBuffer = _data.element().duplicate(); - } - else - { - _readBuffer = ByteBuffer.allocate(dataSize); - for(ByteBuffer buf:_data) - { - _readBuffer.put(buf); - } - } - } - - //hack for testing - @Override public String toString() - { - if (_data.size() >0 && _readBuffer == null) - { - buildReadBuffer(); - } - byte[] b = new byte[_readBuffer.limit()]; - _readBuffer.get(b); - return new String(b); - } - }; + _currentMsg = new ByteBufferMessage(); } public void addData(ByteBuffer src) @@ -133,11 +48,11 @@ public class MessagePartListenerAdapter implements MessagePartListener { if(struct instanceof DeliveryProperties) { - _currentDeliveryProps = (DeliveryProperties)struct; + _currentMsg.setDeliveryProperties((DeliveryProperties)struct); } else if (struct instanceof MessageProperties) { - _currentMessageProps = (MessageProperties)struct; + _currentMsg.setMessageProperties((MessageProperties)struct); } } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java new file mode 100644 index 0000000000..d6b4b65942 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -0,0 +1,34 @@ +package org.apache.qpidity.client.util; + +import java.nio.ByteBuffer; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +public abstract class ReadOnlyMessage implements Message +{ + MessageProperties _messageProperties; + DeliveryProperties _deliveryProperties; + + public void appendData(byte[] src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public void appendData(ByteBuffer src) + { + throw new UnsupportedOperationException("This Message is read only after the initial source"); + } + + public DeliveryProperties getDeliveryProperties() + { + return _deliveryProperties; + } + + public MessageProperties getMessageProperties() + { + return _messageProperties; + } + +} diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java new file mode 100644 index 0000000000..9dfab40721 --- /dev/null +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -0,0 +1,48 @@ +package org.apache.qpidity.client.util; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.qpidity.DeliveryProperties; +import org.apache.qpidity.MessageProperties; +import org.apache.qpidity.api.Message; + +public class StreamingMessage extends ReadOnlyMessage implements Message +{ + SocketChannel _socChannel; + private int _chunkSize; + private ByteBuffer _readBuf; + + public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException + { + _messageProperties = messageProperties; + _deliveryProperties = deliveryProperties; + + _socChannel = in; + _chunkSize = chunkSize; + _readBuf = ByteBuffer.allocate(_chunkSize); + } + + public void readData(byte[] target) throws IOException + { + throw new UnsupportedOperationException(); + } + + public ByteBuffer readData() throws IOException + { + if(_socChannel.isConnected() && _socChannel.isOpen()) + { + _readBuf.clear(); + _socChannel.read(_readBuf); + } + else + { + throw new EOFException("The underlying socket/channel has closed"); + } + + return _readBuf.duplicate(); + } + +} -- cgit v1.2.1