diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-15 13:56:45 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-08-15 13:56:45 +0000 |
| commit | 8e3751d921d2915b4edb57beb7a4db66c02963bd (patch) | |
| tree | a7402fd5c38d0c6261841e837f11d0131fb09d2d /java/client/src | |
| parent | df9e676a9c6b326aa239f5d7ee1f458db96dabad (diff) | |
| download | qpid-python-8e3751d921d2915b4edb57beb7a4db66c02963bd.tar.gz | |
Added clearData() and getTransferId() to the Message interface
Added Java doc
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
7 files changed, 66 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java index cd7b66111d..769fc3aeaa 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java @@ -52,6 +52,7 @@ public class ClientSessionDelegate extends SessionDelegate { _currentTransfer = currentTransfer; _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + _currentMessageListener.messageTransfer(currentTransfer.getId()); //a better way is to tell the broker to stop the transfer if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index def7b5ca41..273b9b899a 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -35,6 +35,13 @@ import org.apache.qpidity.Struct; public interface MessagePartListener { /** + * Indicates the Message transfer has started. + * + * @param transferId + */ + public void messageTransfer(long transferId); + + /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. * diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index d67b32d019..e06d4d3a1e 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -23,20 +23,39 @@ public class ByteBufferMessage implements Message { private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>(); private ByteBuffer _readBuffer; - private int dataSize; + private int _dataSize; private DeliveryProperties _currentDeliveryProps; private MessageProperties _currentMessageProps; + private long _transferId; + public ByteBufferMessage(long transferId) + { + _transferId = transferId; + } + + public long getMessageTransferId() + { + return _transferId; + } + public void clearData() + { + _data = new LinkedList<ByteBuffer>(); + _readBuffer = null; + } + public void appendData(byte[] src) throws IOException { appendData(ByteBuffer.wrap(src)); } + /** + * write the data from the current position up to the buffer limit + */ public void appendData(ByteBuffer src) throws IOException { _data.offer(src); - dataSize += src.remaining(); + _dataSize += src.remaining(); } public DeliveryProperties getDeliveryProperties() @@ -88,7 +107,7 @@ public class ByteBufferMessage implements Message } else { - _readBuffer = ByteBuffer.allocate(dataSize); + _readBuffer = ByteBuffer.allocate(_dataSize); for(ByteBuffer buf:_data) { _readBuffer.put(buf); @@ -104,7 +123,7 @@ public class ByteBufferMessage implements Message buildReadBuffer(); } ByteBuffer temp = _readBuffer.duplicate(); - byte[] b = new byte[temp.limit()]; + byte[] b = new byte[temp.remaining()]; temp.get(b); return new String(b); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 8f62334fcd..73e71ed84d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -73,4 +73,15 @@ public class FileMessage extends ReadOnlyMessage implements Message return bb; } + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public long getMessageTransferId() + { + throw new UnsupportedOperationException(); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 88f950eb5d..9e4cf00c87 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -24,8 +24,12 @@ public class MessagePartListenerAdapter implements MessagePartListener public MessagePartListenerAdapter(MessageListener listener) { - _adaptee = listener; - _currentMsg = new ByteBufferMessage(); + _adaptee = listener; + } + + public void messageTransfer(long transferId) + { + _currentMsg = new ByteBufferMessage(transferId); } public void addData(ByteBuffer src) diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java index d6b4b65942..2527856798 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -29,6 +29,10 @@ public abstract class ReadOnlyMessage implements Message public MessageProperties getMessageProperties() { return _messageProperties; - } - + } + + public void clearData() + { + throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data"); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java index 9dfab40721..23089c7931 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -44,5 +44,16 @@ public class StreamingMessage extends ReadOnlyMessage implements Message return _readBuf.duplicate(); } - + + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public long getMessageTransferId() + { + throw new UnsupportedOperationException(); + } } |
