diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-05-22 21:43:13 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-22 21:43:13 +0000 |
| commit | 7db4a1119ade3a1e882602c2fc2b689da2497ae7 (patch) | |
| tree | b64bb049e7daec15c86867cbbe087f3992907340 /qpid/java/client | |
| parent | 411d8ab04176a9608548244cb86894eb2760ba4c (diff) | |
| download | qpid-python-7db4a1119ade3a1e882602c2fc2b689da2497ae7.tar.gz | |
Made Range, RangeSet, and Session all use proper RFC1982 comparisons per QPID-861. Also switched command ids from long -> int, and added a mutex to channel to prevent multi-frame commands from interleaving when invoked from separate threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@659271 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
8 files changed, 16 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d071bcf0c2..202bd90991 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -173,7 +173,7 @@ public class AMQSession_0_10 extends AMQSession { if( messageTag <= deliveryTag ) { - ranges.add(messageTag); + ranges.add((int) (long) messageTag); _unacknowledgedMessageTags.remove(messageTag); } } @@ -182,7 +182,7 @@ public class AMQSession_0_10 extends AMQSession } else { - ranges.add(deliveryTag); + ranges.add((int) deliveryTag); _unacknowledgedMessageTags.remove(deliveryTag); } getQpidSession().messageAcknowledge(ranges); @@ -287,7 +287,7 @@ public class AMQSession_0_10 extends AMQSession { break; } - ranges.add(tag); + ranges.add((int) (long) tag); } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. @@ -311,7 +311,7 @@ public class AMQSession_0_10 extends AMQSession break; } - ranges.add(tag); + ranges.add((int) (long) tag); } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); } @@ -326,7 +326,7 @@ public class AMQSession_0_10 extends AMQSession { // The value of requeue is always true RangeSet ranges = new RangeSet(); - ranges.add(deliveryTag); + ranges.add((int) deliveryTag); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index fcc21428e9..f050cbe455 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -358,7 +358,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageAcknowledge(ranges); _0_10session.getCurrentException(); } @@ -375,7 +375,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); _0_10session.getCurrentException(); } @@ -394,7 +394,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java index ecdd2d7952..19e12adc3c 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java @@ -39,7 +39,7 @@ public interface MessagePartListener * * @param transferId */ - public void messageTransfer(long transferId); + public void messageTransfer(int transferId); /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java index 56443e2aeb..833f905b58 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java @@ -27,7 +27,7 @@ public class ByteBufferMessage implements Message private int _dataSize; private DeliveryProperties _currentDeliveryProps; private MessageProperties _currentMessageProps; - private long _transferId; + private int _transferId; private Header _header; public void setHeader(Header header) { @@ -44,12 +44,12 @@ public class ByteBufferMessage implements Message _currentMessageProps = new MessageProperties(); } - public ByteBufferMessage(long transferId) + public ByteBufferMessage(int transferId) { _transferId = transferId; } - public long getMessageTransferId() + public int getMessageTransferId() { return _transferId; } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java index 308a16ce36..289d03574d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java @@ -89,7 +89,7 @@ public class FileMessage extends ReadOnlyMessage implements Message * does not have a transfer id. Hence this method is not * applicable to this implementation. */ - public long getMessageTransferId() + public int getMessageTransferId() { throw new UnsupportedOperationException(); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java index 885841bc2a..757d44fbbb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java @@ -26,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener _adaptee = listener; } - public void messageTransfer(long transferId) + public void messageTransfer(int transferId) { _currentMsg = new ByteBufferMessage(transferId); } diff --git a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java index fd3e812cbc..6c7f9e9db7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java @@ -61,7 +61,7 @@ public class StreamingMessage extends ReadOnlyMessage implements Message * does not have a transfer id. Hence this method is not * applicable to this implementation. */ - public long getMessageTransferId() + public int getMessageTransferId() { throw new UnsupportedOperationException(); } diff --git a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index a49765baaf..74c0098d72 100644 --- a/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -199,7 +199,7 @@ public class ChannelCloseOkTest extends QpidTestCase private void waitFor(List<Message> received, int count) throws InterruptedException { - long timeout = 6000; + long timeout = 20000; synchronized (received) { |
