summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-05-22 21:43:13 +0000
committerRafael H. Schloming <rhs@apache.org>2008-05-22 21:43:13 +0000
commit40c23abab70a4c1a5f67e42517323a7964324917 (patch)
tree0632eb5c5cf9e8138de3f7fab2fd2cb0d7e090e3 /java
parent1829eedb1e82a0777a6ea8a180aacef4d89a59f0 (diff)
downloadqpid-python-40c23abab70a4c1a5f67e42517323a7964324917.tar.gz
Made Range, RangeSet, and Session all use proper RFC1982 comparisons per QPID-861. Also switched command ids from long -> int, and added a mutex to channel to prevent multi-frame commands from interleaving when invoked from separate threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659271 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java10
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java6
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java2
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/util/Serial.java138
-rw-r--r--java/common/src/main/java/org/apache/qpidity/api/Message.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Channel.java18
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Connection.java9
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Method.java6
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java5
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Range.java16
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java7
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/Session.java64
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java4
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java2
-rw-r--r--java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java10
-rw-r--r--java/common/src/test/java/org/apache/qpid/util/SerialTest.java124
22 files changed, 183 insertions, 256 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index d071bcf0c2..202bd90991 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -173,7 +173,7 @@ public class AMQSession_0_10 extends AMQSession
{
if( messageTag <= deliveryTag )
{
- ranges.add(messageTag);
+ ranges.add((int) (long) messageTag);
_unacknowledgedMessageTags.remove(messageTag);
}
}
@@ -182,7 +182,7 @@ public class AMQSession_0_10 extends AMQSession
}
else
{
- ranges.add(deliveryTag);
+ ranges.add((int) deliveryTag);
_unacknowledgedMessageTags.remove(deliveryTag);
}
getQpidSession().messageAcknowledge(ranges);
@@ -287,7 +287,7 @@ public class AMQSession_0_10 extends AMQSession
{
break;
}
- ranges.add(tag);
+ ranges.add((int) (long) tag);
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
// We need to sync so that we get notify of an error.
@@ -311,7 +311,7 @@ public class AMQSession_0_10 extends AMQSession
break;
}
- ranges.add(tag);
+ ranges.add((int) (long) tag);
}
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
}
@@ -326,7 +326,7 @@ public class AMQSession_0_10 extends AMQSession
{
// The value of requeue is always true
RangeSet ranges = new RangeSet();
- ranges.add(deliveryTag);
+ ranges.add((int) deliveryTag);
getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
//I don't think we need to sync
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index fcc21428e9..f050cbe455 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -358,7 +358,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageAcknowledge(ranges);
_0_10session.getCurrentException();
}
@@ -375,7 +375,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
_0_10session.getCurrentException();
}
@@ -394,7 +394,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getDeliveryTag());
+ ranges.add((int) message.getDeliveryTag());
Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get();
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
index ecdd2d7952..19e12adc3c 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
@@ -39,7 +39,7 @@ public interface MessagePartListener
*
* @param transferId
*/
- public void messageTransfer(long transferId);
+ public void messageTransfer(int transferId);
/**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
index 56443e2aeb..833f905b58 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
@@ -27,7 +27,7 @@ public class ByteBufferMessage implements Message
private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
- private long _transferId;
+ private int _transferId;
private Header _header;
public void setHeader(Header header) {
@@ -44,12 +44,12 @@ public class ByteBufferMessage implements Message
_currentMessageProps = new MessageProperties();
}
- public ByteBufferMessage(long transferId)
+ public ByteBufferMessage(int transferId)
{
_transferId = transferId;
}
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
return _transferId;
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
index 308a16ce36..289d03574d 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
@@ -89,7 +89,7 @@ public class FileMessage extends ReadOnlyMessage implements Message
* does not have a transfer id. Hence this method is not
* applicable to this implementation.
*/
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
throw new UnsupportedOperationException();
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
index 885841bc2a..757d44fbbb 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
@@ -26,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener
_adaptee = listener;
}
- public void messageTransfer(long transferId)
+ public void messageTransfer(int transferId)
{
_currentMsg = new ByteBufferMessage(transferId);
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
index fd3e812cbc..6c7f9e9db7 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
@@ -61,7 +61,7 @@ public class StreamingMessage extends ReadOnlyMessage implements Message
* does not have a transfer id. Hence this method is not
* applicable to this implementation.
*/
- public long getMessageTransferId()
+ public int getMessageTransferId()
{
throw new UnsupportedOperationException();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
index a49765baaf..74c0098d72 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
@@ -199,7 +199,7 @@ public class ChannelCloseOkTest extends QpidTestCase
private void waitFor(List<Message> received, int count) throws InterruptedException
{
- long timeout = 6000;
+ long timeout = 20000;
synchronized (received)
{
diff --git a/java/common/src/main/java/org/apache/qpid/util/Serial.java b/java/common/src/main/java/org/apache/qpid/util/Serial.java
index c828290644..cfc97c3d04 100644
--- a/java/common/src/main/java/org/apache/qpid/util/Serial.java
+++ b/java/common/src/main/java/org/apache/qpid/util/Serial.java
@@ -3,119 +3,75 @@ package org.apache.qpid.util;
import org.apache.qpid.SerialException;
/**
- * This class provides basic
- * serial number arithmetic as defined in RFC 1982.
+ * This class provides basic serial number comparisons as defined in
+ * RFC 1982.
*/
public class Serial
{
- private long _maxIncrement;
- private long _max;
- private long _maxComparison;
-
- public Serial(long serialbits)
- {
- if( serialbits < 2)
- {
- throw new IllegalArgumentException("Meaningful serial number space has SERIAL_BITS >= 2, wrong value "
- + serialbits);
- }
- _max = (long) Math.pow(2.0 , serialbits) - 1;
- _maxIncrement = (long) Math.pow(2.0, serialbits - 1) - 1;
- _maxComparison = (long) Math.pow(2.0, serialbits -1);
- }
/**
* Compares two numbers using serial arithmetic.
*
- * @param serial1 The first serial number
- * @param serial2 The second serial number
- * @return 0 if the 2 serials numbers are equal, a positive number if serial1 is greater
- * than serial2, and a negative number if serial2 is greater than serial1.
- * @throws IllegalArgumentException serial1 or serial2 is out of range
- * @throws SerialException serial1 and serial2 are not comparable.
+ * @param s1 the first serial number
+ * @param s2 the second serial number
+ *
+ * @return a negative integer, zero, or a positive integer as the
+ * first argument is less than, equal to, or greater than the
+ * second
*/
- public int compare(long serial1, long serial2) throws IllegalArgumentException, SerialException
+ public static final int compare(int s1, int s2)
{
- int result;
- if (serial1 < 0 || serial1 > _max)
- {
- throw new IllegalArgumentException(serial1 + " out of range");
- }
- if (serial2 < 0 || serial2 > _max)
- {
- throw new IllegalArgumentException(serial2 + " out of range");
- }
- double diff;
- if( serial1 < serial2 )
- {
- diff = serial2 - serial1;
- if( diff < _maxComparison )
- {
- result = -1;
- }
- else if ( diff > _maxComparison )
- {
- result = 1;
- }
- else
- {
- throw new SerialException("Cannot compare " + serial1 + " and " + serial2);
- }
- }
- else if( serial1 > serial2 )
+ return s1 - s2;
+ }
+
+ public static final boolean lt(int s1, int s2)
+ {
+ return compare(s1, s2) < 0;
+ }
+
+ public static final boolean le(int s1, int s2)
+ {
+ return compare(s1, s2) <= 0;
+ }
+
+ public static final boolean gt(int s1, int s2)
+ {
+ return compare(s1, s2) > 0;
+ }
+
+ public static final boolean ge(int s1, int s2)
+ {
+ return compare(s1, s2) >= 0;
+ }
+
+ public static final boolean eq(int s1, int s2)
+ {
+ return s1 == s2;
+ }
+
+ public static final int min(int s1, int s2)
+ {
+ if (lt(s1, s2))
{
- diff = serial1 - serial2;
- if( diff > _maxComparison )
- {
- result = -1;
- }
- else if( diff < _maxComparison )
- {
- result = 1;
- }
- else
- {
- throw new SerialException("Cannot compare " + serial1 + " and " + serial2);
- }
+ return s1;
}
else
{
- result = 0;
+ return s2;
}
- return result;
}
-
- /**
- * Increments a serial numbers by the addition of a positive integer n,
- * Serial numbers may be incremented by the addition of a positive
- * integer n, where n is taken from the range of integers
- * [0 .. (2^(SERIAL_BITS - 1) - 1)]. For a sequence number s, the
- * result of such an addition, s', is defined as
- * s' = (s + n) modulo (2 ^ SERIAL_BITS)
- * @param serial The serila number to be incremented
- * @param n The integer to be added to the serial number
- * @return The incremented serial number
- * @throws IllegalArgumentException serial number or n is out of range
- */
- public long increment(long serial, long n) throws IllegalArgumentException
+ public static final int max(int s1, int s2)
{
- if (serial < 0 || serial > _max)
- {
- throw new IllegalArgumentException("Serial number: " + serial + " is out of range");
- }
- if( n < 0 || n > _maxIncrement )
+ if (gt(s1, s2))
{
- throw new IllegalArgumentException("Increment: " + n + " is out of range");
+ return s1;
}
- long result = serial + n;
- // apply modulo (2 ^ SERIAL_BITS)
- if(result > _max)
+ else
{
- result = result - _max - 1;
+ return s2;
}
- return result;
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java
index d4b8343ba5..f5488fde52 100644
--- a/java/common/src/main/java/org/apache/qpidity/api/Message.java
+++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java
@@ -121,6 +121,6 @@ public interface Message
*
* @return the message transfer id.
*/
- public long getMessageTransferId();
+ public int getMessageTransferId();
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
index 651402bcb7..ad727676c4 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpidity.transport;
+import org.apache.qpidity.transport.network.Frame;
import org.apache.qpidity.transport.util.Logger;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.ArrayList;
-
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.qpidity.transport.network.Frame.*;
import static org.apache.qpidity.transport.util.Functions.*;
@@ -51,6 +53,7 @@ public class Channel extends Invoker
// session may be null
private Session session;
+ private Lock commandLock = new ReentrantLock();
private boolean first = true;
private ByteBuffer data = null;
@@ -156,7 +159,17 @@ public class Channel extends Invoker
public void method(Method m)
{
+ if (m.getEncodedTrack() == Frame.L4)
+ {
+ commandLock.lock();
+ }
+
emit(m);
+
+ if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
+ {
+ commandLock.unlock();
+ }
}
public void header(Header header)
@@ -190,6 +203,7 @@ public class Channel extends Invoker
emit(new Data(data, first, true));
first = true;
data = null;
+ commandLock.unlock();
}
protected void invoke(Method m)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
index 56b927f802..7d707ce17b 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
@@ -77,14 +77,14 @@ public class Connection
public void received(ConnectionEvent event)
{
- log.debug("RECV: %s", event);
+ log.debug("RECV: [%s] %s", this, event);
Channel channel = getChannel(event.getChannel());
channel.received(event.getProtocolEvent());
}
public void send(ConnectionEvent event)
{
- log.debug("SEND: %s", event);
+ log.debug("SEND: [%s] %s", this, event);
sender.send(event);
}
@@ -135,4 +135,9 @@ public class Connection
sender.close();
}
+ public String toString()
+ {
+ return String.format("conn:%x", System.identityHashCode(this));
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
index bc4ec6289d..f2063fe964 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java
@@ -39,15 +39,15 @@ public abstract class Method extends Struct implements ProtocolEvent
}
// XXX: command subclass?
- private long id;
+ private int id;
private boolean sync = false;
- public final long getId()
+ public final int getId()
{
return id;
}
- void setId(long id)
+ void setId(int id)
{
this.id = id;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
index 4d1b83d43e..59fc3d2552 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java
@@ -64,4 +64,9 @@ public class ProtocolError implements NetworkEvent, ProtocolEvent
delegate.error(this);
}
+ public String toString()
+ {
+ return String.format("protocol error: %s", getMessage());
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
index ed745bf5ec..827eade9f8 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpidity.transport;
-import static java.lang.Math.*;
+import static org.apache.qpid.util.Serial.*;
/**
@@ -31,28 +31,28 @@ import static java.lang.Math.*;
public class Range
{
- private final long lower;
- private final long upper;
+ private final int lower;
+ private final int upper;
- public Range(long lower, long upper)
+ public Range(int lower, int upper)
{
this.lower = lower;
this.upper = upper;
}
- public long getLower()
+ public int getLower()
{
return lower;
}
- public long getUpper()
+ public int getUpper()
{
return upper;
}
- public boolean includes(long value)
+ public boolean includes(int value)
{
- return lower <= value && value <= upper;
+ return le(lower, value) && le(value, upper);
}
public boolean includes(Range range)
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
index ccbcfa99d0..5097c849ee 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.ListIterator;
import java.util.LinkedList;
+import static org.apache.qpid.util.Serial.*;
/**
* RangeSet
@@ -72,7 +73,7 @@ public class RangeSet implements Iterable<Range>
it.remove();
range = range.span(next);
}
- else if (range.getUpper() < next.getLower())
+ else if (lt(range.getUpper(), next.getLower()))
{
it.previous();
it.add(range);
@@ -83,12 +84,12 @@ public class RangeSet implements Iterable<Range>
it.add(range);
}
- public void add(long lower, long upper)
+ public void add(int lower, int upper)
{
add(new Range(lower, upper));
}
- public void add(long value)
+ public void add(int value)
{
add(value, value);
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
index f6f4062c6d..a61f5c2aa3 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java
@@ -33,6 +33,8 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.qpidity.transport.Option.*;
+import static org.apache.qpidity.transport.util.Functions.*;
+import static org.apache.qpid.util.Serial.*;
/**
* Session
@@ -42,6 +44,11 @@ import static org.apache.qpidity.transport.Option.*;
public class Session extends Invoker
{
+
+ private static final Logger log = Logger.get(Session.class);
+
+ private static boolean ENABLE_REPLAY = false;
+
static
{
String enableReplay = "enable_command_replay";
@@ -54,8 +61,6 @@ public class Session extends Invoker
ENABLE_REPLAY = false;
}
}
- private static boolean ENABLE_REPLAY = false;
- private static final Logger log = Logger.get(Session.class);
private byte[] name;
private long timeout = 60000;
@@ -64,15 +69,15 @@ public class Session extends Invoker
Channel channel;
// incoming command count
- long commandsIn = 0;
+ int commandsIn = 0;
// completed incoming commands
private final RangeSet processed = new RangeSet();
private Range syncPoint = null;
// outgoing command count
- private long commandsOut = 0;
- private Map<Long,Method> commands = new HashMap<Long,Method>();
- private long maxComplete = -1;
+ private int commandsOut = 0;
+ private Map<Integer,Method> commands = new HashMap<Integer,Method>();
+ private int maxComplete = commandsOut - 1;
private AtomicBoolean closed = new AtomicBoolean(false);
@@ -86,22 +91,22 @@ public class Session extends Invoker
return name;
}
- public Map<Long,Method> getOutstandingCommands()
+ public Map<Integer,Method> getOutstandingCommands()
{
return commands;
}
- public long getCommandsOut()
+ public int getCommandsOut()
{
return commandsOut;
}
- public long getCommandsIn()
+ public int getCommandsIn()
{
return commandsIn;
}
- public long nextCommandId()
+ public int nextCommandId()
{
return commandsIn++;
}
@@ -116,12 +121,12 @@ public class Session extends Invoker
processed(command.getId());
}
- public void processed(long command)
+ public void processed(int command)
{
processed(new Range(command, command));
}
- public void processed(long lower, long upper)
+ public void processed(int lower, int upper)
{
processed(new Range(lower, upper));
@@ -155,7 +160,7 @@ public class Session extends Invoker
void syncPoint()
{
- long id = getCommandsIn() - 1;
+ int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
Range range = new Range(0, id - 1);
boolean flush;
@@ -179,7 +184,7 @@ public class Session extends Invoker
channel.setSession(this);
}
- public Method getCommand(long id)
+ public Method getCommand(int id)
{
synchronized (commands)
{
@@ -187,18 +192,18 @@ public class Session extends Invoker
}
}
- void complete(long lower, long upper)
+ void complete(int lower, int upper)
{
log.debug("%s complete(%d, %d)", this, lower, upper);
synchronized (commands)
{
- for (long id = maxComplete; id <= upper; id++)
+ for (int id = max(maxComplete, lower); le(id, upper); id++)
{
commands.remove(id);
}
- if (lower <= maxComplete + 1)
+ if (le(lower, maxComplete + 1))
{
- maxComplete = Math.max(maxComplete, upper);
+ maxComplete = max(maxComplete, upper);
}
commands.notifyAll();
log.debug("%s commands remaining: %s", this, commands);
@@ -211,7 +216,7 @@ public class Session extends Invoker
{
synchronized (commands)
{
- long next = commandsOut++;
+ int next = commandsOut++;
if (next == 0)
{
sessionCommandPoint(0, 0);
@@ -276,9 +281,9 @@ public class Session extends Invoker
log.debug("%s sync()", this);
synchronized (commands)
{
- long point = commandsOut - 1;
+ int point = commandsOut - 1;
- if (maxComplete < point)
+ if (lt(maxComplete, point))
{
ExecutionSync sync = new ExecutionSync();
sync.setSync(true);
@@ -287,7 +292,7 @@ public class Session extends Invoker
long start = System.currentTimeMillis();
long elapsed = 0;
- while (!closed.get() && elapsed < timeout && maxComplete < point)
+ while (!closed.get() && elapsed < timeout && lt(maxComplete, point))
{
try {
log.debug("%s waiting for[%d]: %d, %s", this, point,
@@ -301,7 +306,7 @@ public class Session extends Invoker
}
}
- if (maxComplete < point)
+ if (lt(maxComplete, point))
{
if (closed.get())
{
@@ -315,10 +320,10 @@ public class Session extends Invoker
}
}
- private Map<Long,ResultFuture<?>> results =
- new HashMap<Long,ResultFuture<?>>();
+ private Map<Integer,ResultFuture<?>> results =
+ new HashMap<Integer,ResultFuture<?>>();
- void result(long command, Struct result)
+ void result(int command, Struct result)
{
ResultFuture<?> future;
synchronized (results)
@@ -332,7 +337,7 @@ public class Session extends Invoker
{
synchronized (commands)
{
- long command = commandsOut;
+ int command = commandsOut;
ResultFuture<T> future = new ResultFuture<T>(klass);
synchronized (results)
{
@@ -446,4 +451,9 @@ public class Session extends Invoker
}
}
+ public String toString()
+ {
+ return String.format("ssn:%s", str(name));
+ }
+
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
index a4c46be89c..7b680171da 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
@@ -208,7 +208,7 @@ abstract class AbstractDecoder implements Decoder
RangeSet ranges = new RangeSet();
for (int i = 0; i < count; i++)
{
- ranges.add(readUint32(), readUint32());
+ ranges.add(readSequenceNo(), readSequenceNo());
}
return ranges;
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
index f68884f812..3596c48956 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
@@ -246,8 +246,8 @@ abstract class AbstractEncoder implements Encoder
writeUint16(ranges.size() * 8);
for (Range range : ranges)
{
- writeUint32(range.getLower());
- writeUint32(range.getUpper());
+ writeSequenceNo(range.getLower());
+ writeSequenceNo(range.getUpper());
}
}
}
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
index c5fb7b9986..8bbbf08b22 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
@@ -150,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer>
{
StringBuilder str = new StringBuilder();
str.append(String.format
- ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(),
+ ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
getTrack(), getType(),
isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
index 14e756c047..a1d74a51ce 100644
--- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
+++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
@@ -73,4 +73,14 @@ public class Functions
return str.toString();
}
+ public static final String str(byte[] bytes)
+ {
+ return str(ByteBuffer.wrap(bytes));
+ }
+
+ public static final String str(byte[] bytes, int limit)
+ {
+ return str(ByteBuffer.wrap(bytes), limit);
+ }
+
}
diff --git a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
index 312b1ab9ce..d3ab817b5d 100644
--- a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
+++ b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java
@@ -13,84 +13,18 @@ public class SerialTest extends TestCase
{
/**
- * The simplest meaningful serial number space has SERIAL_BITS == 2. In
- * this space, the integers that make up the serial number space are 0,
- * 1, 2, and 3. That is, 3 == 2^SERIAL_BITS - 1.
- *
- * In this space, the largest integer that it is meaningful to add to a
- * sequence number is 2^(SERIAL_BITS - 1) - 1, or 1.
- *
- * Then, as defined 0+1 == 1, 1+1 == 2, 2+1 == 3, and 3+1 == 0.
- * Further, 1 > 0, 2 > 1, 3 > 2, and 0 > 3. It is undefined whether
- * 2 > 0 or 0 > 2, and whether 1 > 3 or 3 > 1.
+ * Test the key boundaries where wraparound occurs.
*/
- public void testTrivialSample()
+ public void testBoundaries()
{
- Serial serial = new Serial(2);
- assertEquals( serial.increment(0, 1), 1);
- assertEquals( serial.increment(1, 1), 2);
- assertEquals( serial.increment(2, 1), 3);
- assertEquals( serial.increment(3, 1), 0);
- try
- {
- serial.increment(4, 1);
- fail("IllegalArgumentException was not trhown");
- }
- catch (IllegalArgumentException e)
- {
- // expected
- }
- try
- {
- assertTrue( serial.compare(1, 0) > 0);
- assertTrue( serial.compare(2, 1) > 0);
- assertTrue( serial.compare(3, 2) > 0);
- assertTrue( serial.compare(0, 3) > 0);
- assertTrue( serial.compare(0, 1) < 0);
- assertTrue( serial.compare(1, 2) < 0);
- assertTrue( serial.compare(2, 3) < 0);
- assertTrue( serial.compare(3, 0) < 0);
- }
- catch (SerialException e)
- {
- fail("Unexpected exception " + e);
- }
- try
- {
- serial.compare(2, 0);
- fail("AMQSerialException not thrown as expected");
- }
- catch (SerialException e)
- {
- // expected
- }
- try
- {
- serial.compare(0, 2);
- fail("AMQSerialException not thrown as expected");
- }
- catch (SerialException e)
- {
- // expected
- }
- try
- {
- serial.compare(3, 1);
- fail("AMQSerialException not thrown as expected");
- }
- catch (SerialException e)
- {
- // expected
- }
- try
- {
- serial.compare(3, 1);
- fail("AMQSerialException not thrown as expected");
- }
- catch (SerialException e)
- {
- // expected
- }
+ assertTrue(Serial.gt(1, 0));
+ assertTrue(Serial.lt(0, 1));
+
+ assertTrue(Serial.gt(Integer.MAX_VALUE+1, Integer.MAX_VALUE));
+ assertTrue(Serial.lt(Integer.MAX_VALUE, Integer.MAX_VALUE+1));
+
+ assertTrue(Serial.gt(0xFFFFFFFF + 1, 0xFFFFFFFF));
+ assertTrue(Serial.lt(0xFFFFFFFF, 0xFFFFFFFF + 1));
}
/**
@@ -98,38 +32,30 @@ public class SerialTest extends TestCase
* For any sequence number s and any integer n such that addition of n
* to s is well defined, (s + n) >= s. Further (s + n) == s only when
* n == 0, in all other defined cases, (s + n) > s.
- * strategy:
- * Create a serial number with 32 bits and check in a loop that adding integers
- * respect the corollary
*/
public void testCorollary1()
{
- Serial serial = new Serial(32);
- Random random = new Random();
- long number = random.nextInt((int) Math.pow(2.0 , 32.0) - 1);
- for(int i = 1; i<= 10000; i++ )
+ int wrapcount = 0;
+
+ int s = 0;
+
+ for (int i = 0; i < 67108664; i++)
{
- long nextInt = random.nextInt((int) Math.pow(2.0 , 32.0) - 1);
- long inc = serial.increment(number, nextInt);
- int res =0;
- try
- {
- res=serial.compare(inc, number);
- }
- catch (SerialException e)
- {
- fail("un-expected exception " + e);
- }
- if( res < 1 )
+ for (int n = 1; n < 4096; n += 512)
{
- fail("Corollary 1 violated " + number + " + " + nextInt + " < " + number);
+ assertTrue(Serial.gt(s+n, s));
+ assertTrue(Serial.lt(s, s+n));
}
- else if( res == 0 && nextInt > 0)
+
+ s += 1024;
+
+ if (s == 0)
{
- fail("Corollary 1 violated " + number + " + " + nextInt + " = " + number);
+ wrapcount += 1;
}
}
+
+ assertTrue(wrapcount > 0);
}
-
}