From 8e3751d921d2915b4edb57beb7a4db66c02963bd Mon Sep 17 00:00:00 2001 From: Rajith Muditha Attapattu Date: Wed, 15 Aug 2007 13:56:45 +0000 Subject: Added clearData() and getTransferId() to the Message interface Added Java doc git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566155 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpidity/client/ClientSessionDelegate.java | 1 + .../apache/qpidity/client/MessagePartListener.java | 7 ++++++ .../qpidity/client/util/ByteBufferMessage.java | 27 ++++++++++++++++++---- .../apache/qpidity/client/util/FileMessage.java | 11 +++++++++ .../client/util/MessagePartListenerAdapter.java | 8 +++++-- .../qpidity/client/util/ReadOnlyMessage.java | 8 +++++-- .../qpidity/client/util/StreamingMessage.java | 13 ++++++++++- 7 files changed, 66 insertions(+), 9 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java index cd7b66111d..769fc3aeaa 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java +++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java @@ -52,6 +52,7 @@ public class ClientSessionDelegate extends SessionDelegate { _currentTransfer = currentTransfer; _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination()); + _currentMessageListener.messageTransfer(currentTransfer.getId()); //a better way is to tell the broker to stop the transfer if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1) diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java index def7b5ca41..273b9b899a 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java @@ -34,6 +34,13 @@ import org.apache.qpidity.Struct; */ public interface MessagePartListener { + /** + * Indicates the Message transfer has started. + * + * @param transferId + */ + public void messageTransfer(long transferId); + /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received. diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java index d67b32d019..e06d4d3a1e 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java @@ -23,20 +23,39 @@ public class ByteBufferMessage implements Message { private Queue _data = new LinkedList(); private ByteBuffer _readBuffer; - private int dataSize; + private int _dataSize; private DeliveryProperties _currentDeliveryProps; private MessageProperties _currentMessageProps; + private long _transferId; + public ByteBufferMessage(long transferId) + { + _transferId = transferId; + } + + public long getMessageTransferId() + { + return _transferId; + } + public void clearData() + { + _data = new LinkedList(); + _readBuffer = null; + } + public void appendData(byte[] src) throws IOException { appendData(ByteBuffer.wrap(src)); } + /** + * write the data from the current position up to the buffer limit + */ public void appendData(ByteBuffer src) throws IOException { _data.offer(src); - dataSize += src.remaining(); + _dataSize += src.remaining(); } public DeliveryProperties getDeliveryProperties() @@ -88,7 +107,7 @@ public class ByteBufferMessage implements Message } else { - _readBuffer = ByteBuffer.allocate(dataSize); + _readBuffer = ByteBuffer.allocate(_dataSize); for(ByteBuffer buf:_data) { _readBuffer.put(buf); @@ -104,7 +123,7 @@ public class ByteBufferMessage implements Message buildReadBuffer(); } ByteBuffer temp = _readBuffer.duplicate(); - byte[] b = new byte[temp.limit()]; + byte[] b = new byte[temp.remaining()]; temp.get(b); return new String(b); } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java index 8f62334fcd..73e71ed84d 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java @@ -73,4 +73,15 @@ public class FileMessage extends ReadOnlyMessage implements Message return bb; } + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public long getMessageTransferId() + { + throw new UnsupportedOperationException(); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java index 88f950eb5d..9e4cf00c87 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java @@ -24,8 +24,12 @@ public class MessagePartListenerAdapter implements MessagePartListener public MessagePartListenerAdapter(MessageListener listener) { - _adaptee = listener; - _currentMsg = new ByteBufferMessage(); + _adaptee = listener; + } + + public void messageTransfer(long transferId) + { + _currentMsg = new ByteBufferMessage(transferId); } public void addData(ByteBuffer src) diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java index d6b4b65942..2527856798 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java @@ -29,6 +29,10 @@ public abstract class ReadOnlyMessage implements Message public MessageProperties getMessageProperties() { return _messageProperties; - } - + } + + public void clearData() + { + throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data"); + } } diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java index 9dfab40721..23089c7931 100644 --- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java @@ -44,5 +44,16 @@ public class StreamingMessage extends ReadOnlyMessage implements Message return _readBuf.duplicate(); } - + + /** + * This message is used by an application user to + * provide data to the client library using pull style + * semantics. Since the message is not transfered yet, it + * does not have a transfer id. Hence this method is not + * applicable to this implementation. + */ + public long getMessageTransferId() + { + throw new UnsupportedOperationException(); + } } -- cgit v1.2.1