summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-05-22 21:43:13 +0000
committerRafael H. Schloming <rhs@apache.org>2008-05-22 21:43:13 +0000
commit7db4a1119ade3a1e882602c2fc2b689da2497ae7 (patch)
treeb64bb049e7daec15c86867cbbe087f3992907340 /qpid/java/client
parent411d8ab04176a9608548244cb86894eb2760ba4c (diff)
downloadqpid-python-7db4a1119ade3a1e882602c2fc2b689da2497ae7.tar.gz
Made Range, RangeSet, and Session all use proper RFC1982 comparisons per QPID-861. Also switched command ids from long -> int, and added a mutex to channel to prevent multi-frame commands from interleaving when invoked from separate threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@659271 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java6
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java2
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java2
-rw-r--r--qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java2
8 files changed, 16 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d071bcf0c2..202bd90991 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -173,7 +173,7 @@ public class AMQSession_0_10 extends AMQSession
{
if( messageTag <= deliveryTag )
{
- ranges.add(messageTag);
+ ranges.add((int) (long) messageTag);
_unacknowledgedMessageTags.remove(messageTag);
}
}
@@ -182,7 +182,7 @@ public class AMQSession_0_10 extends AMQSession
}
else
{
- ranges.add(deliveryTag);
+ ranges.add((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
getQpidSession().messageAcknowledge(ranges);
@@ -287,7 +287,7 @@ public class AMQSession_0_10 extends AMQSession
{
break;
}
- ranges.add(tag);
+ ranges.add((int) (long) tag);
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
@@ -311,7 +311,7 @@ public class AMQSession_0_10 extends AMQSession
break;
}
- ranges.add(tag);
+ ranges.add((int) (long) tag);
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
}
@@ -326,7 +326,7 @@ public class AMQSession_0_10 extends AMQSession
{
// The value of requeue is always true
RangeSet ranges = new RangeSet();
- ranges.add(deliveryTag);
+ ranges.add((int) deliveryTag);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
//I don't think we need to sync
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index fcc21428e9..f050cbe455 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -358,7 +358,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageAcknowledge(ranges);
_0_10session.getCurrentException();
}
@@ -375,7 +375,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
_0_10session.getCurrentException();
}
@@ -394,7 +394,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
index ecdd2d7952..19e12adc3c 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
@@ -39,7 +39,7 @@ public interface MessagePartListener
*
* @param transferId
*/
- public void messageTransfer(long transferId);
+ public void messageTransfer(int transferId);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
index 56443e2aeb..833f905b58 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
@@ -27,7 +27,7 @@ public class ByteBufferMessage implements Message
private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
- private long _transferId;
+ private int _transferId;
private Header _header;
public void setHeader(Header header) {
@@ -44,12 +44,12 @@ public class ByteBufferMessage implements Message
_currentMessageProps = new MessageProperties();
}
- public ByteBufferMessage(long transferId)
+ public ByteBufferMessage(int transferId)
{
_transferId = transferId;
}
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
return _transferId;
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
index 308a16ce36..289d03574d 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
@@ -89,7 +89,7 @@ public class FileMessage extends ReadOnlyMessage implements Message
* does not have a transfer id. Hence this method is not
* applicable to this implementation.
*/
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
index 885841bc2a..757d44fbbb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
@@ -26,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
_adaptee = listener;
}
- public void messageTransfer(long transferId)
+ public void messageTransfer(int transferId)
{
_currentMsg = new ByteBufferMessage(transferId);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
index fd3e812cbc..6c7f9e9db7 100644
--- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
+++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
@@ -61,7 +61,7 @@ public class StreamingMessage extends ReadOnlyMessage implements Message
* does not have a transfer id. Hence this method is not
* applicable to this implementation.
*/
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
throw new UnsupportedOperationException();
}
diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index a49765baaf..74c0098d72 100644
--- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -199,7 +199,7 @@ public class ChannelCloseOkTest extends QpidTestCase
private void waitFor(List<Message> received, int count) throws InterruptedException
{
- long timeout = 6000;
+ long timeout = 20000;
synchronized (received)
{