summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-08-15 13:56:45 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-08-15 13:56:45 +0000
commit8e3751d921d2915b4edb57beb7a4db66c02963bd (patch)
treea7402fd5c38d0c6261841e837f11d0131fb09d2d /java/client/src
parentdf9e676a9c6b326aa239f5d7ee1f458db96dabad (diff)
downloadqpid-python-8e3751d921d2915b4edb57beb7a4db66c02963bd.tar.gz
Added clearData() and getTransferId() to the Message interface
Added Java doc git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566155 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java1
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java7
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java27
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java11
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java8
-rw-r--r--java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java13
7 files changed, 66 insertions, 9 deletions
diff --git a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
index cd7b66111d..769fc3aeaa 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
@@ -52,6 +52,7 @@ public class ClientSessionDelegate extends SessionDelegate
{
_currentTransfer = currentTransfer;
_currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+ _currentMessageListener.messageTransfer(currentTransfer.getId());
//a better way is to tell the broker to stop the transfer
if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
diff --git a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
index def7b5ca41..273b9b899a 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
@@ -35,6 +35,13 @@ import org.apache.qpidity.Struct;
public interface MessagePartListener
{
/**
+ * Indicates the Message transfer has started.
+ *
+ * @param transferId
+ */
+ public void messageTransfer(long transferId);
+
+ /**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
*
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
index d67b32d019..e06d4d3a1e 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
@@ -23,20 +23,39 @@ public class ByteBufferMessage implements Message
{
private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
private ByteBuffer _readBuffer;
- private int dataSize;
+ private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
+ private long _transferId;
+ public ByteBufferMessage(long transferId)
+ {
+ _transferId = transferId;
+ }
+
+ public long getMessageTransferId()
+ {
+ return _transferId;
+ }
+ public void clearData()
+ {
+ _data = new LinkedList<ByteBuffer>();
+ _readBuffer = null;
+ }
+
public void appendData(byte[] src) throws IOException
{
appendData(ByteBuffer.wrap(src));
}
+ /**
+ * write the data from the current position up to the buffer limit
+ */
public void appendData(ByteBuffer src) throws IOException
{
_data.offer(src);
- dataSize += src.remaining();
+ _dataSize += src.remaining();
}
public DeliveryProperties getDeliveryProperties()
@@ -88,7 +107,7 @@ public class ByteBufferMessage implements Message
}
else
{
- _readBuffer = ByteBuffer.allocate(dataSize);
+ _readBuffer = ByteBuffer.allocate(_dataSize);
for(ByteBuffer buf:_data)
{
_readBuffer.put(buf);
@@ -104,7 +123,7 @@ public class ByteBufferMessage implements Message
buildReadBuffer();
}
ByteBuffer temp = _readBuffer.duplicate();
- byte[] b = new byte[temp.limit()];
+ byte[] b = new byte[temp.remaining()];
temp.get(b);
return new String(b);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
index 8f62334fcd..73e71ed84d 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
@@ -73,4 +73,15 @@ public class FileMessage extends ReadOnlyMessage implements Message
return bb;
}
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transfered yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
index 88f950eb5d..9e4cf00c87 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
@@ -24,8 +24,12 @@ public class MessagePartListenerAdapter implements MessagePartListener
public MessagePartListenerAdapter(MessageListener listener)
{
- _adaptee = listener;
- _currentMsg = new ByteBufferMessage();
+ _adaptee = listener;
+ }
+
+ public void messageTransfer(long transferId)
+ {
+ _currentMsg = new ByteBufferMessage(transferId);
}
public void addData(ByteBuffer src)
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
index d6b4b65942..2527856798 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
@@ -29,6 +29,10 @@ public abstract class ReadOnlyMessage implements Message
public MessageProperties getMessageProperties()
{
return _messageProperties;
- }
-
+ }
+
+ public void clearData()
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data");
+ }
}
diff --git a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
index 9dfab40721..23089c7931 100644
--- a/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
@@ -44,5 +44,16 @@ public class StreamingMessage extends ReadOnlyMessage implements Message
return _readBuf.duplicate();
}
-
+
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transfered yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
}