diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2008-05-22 21:43:13 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2008-05-22 21:43:13 +0000 |
| commit | 40c23abab70a4c1a5f67e42517323a7964324917 (patch) | |
| tree | 0632eb5c5cf9e8138de3f7fab2fd2cb0d7e090e3 /java | |
| parent | 1829eedb1e82a0777a6ea8a180aacef4d89a59f0 (diff) | |
| download | qpid-python-40c23abab70a4c1a5f67e42517323a7964324917.tar.gz | |
Made Range, RangeSet, and Session all use proper RFC1982 comparisons per QPID-861. Also switched command ids from long -> int, and added a mutex to channel to prevent multi-frame commands from interleaving when invoked from separate threads.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@659271 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
22 files changed, 183 insertions, 256 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index d071bcf0c2..202bd90991 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -173,7 +173,7 @@ public class AMQSession_0_10 extends AMQSession { if( messageTag <= deliveryTag ) { - ranges.add(messageTag); + ranges.add((int) (long) messageTag); _unacknowledgedMessageTags.remove(messageTag); } } @@ -182,7 +182,7 @@ public class AMQSession_0_10 extends AMQSession } else { - ranges.add(deliveryTag); + ranges.add((int) deliveryTag); _unacknowledgedMessageTags.remove(deliveryTag); } getQpidSession().messageAcknowledge(ranges); @@ -287,7 +287,7 @@ public class AMQSession_0_10 extends AMQSession { break; } - ranges.add(tag); + ranges.add((int) (long) tag); } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); // We need to sync so that we get notify of an error. @@ -311,7 +311,7 @@ public class AMQSession_0_10 extends AMQSession break; } - ranges.add(tag); + ranges.add((int) (long) tag); } getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); } @@ -326,7 +326,7 @@ public class AMQSession_0_10 extends AMQSession { // The value of requeue is always true RangeSet ranges = new RangeSet(); - ranges.add(deliveryTag); + ranges.add((int) deliveryTag); getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED); //I don't think we need to sync } diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index fcc21428e9..f050cbe455 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -358,7 +358,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageAcknowledge(ranges); _0_10session.getCurrentException(); } @@ -375,7 +375,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); _0_10session.getQpidSession().messageRelease(ranges); _0_10session.getCurrentException(); } @@ -394,7 +394,7 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By if (!_preAcquire) { RangeSet ranges = new RangeSet(); - ranges.add(message.getDeliveryTag()); + ranges.add((int) message.getDeliveryTag()); Acquired acq = _0_10session.getQpidSession().messageAcquire(ranges).get(); diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java b/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java index ecdd2d7952..19e12adc3c 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java @@ -39,7 +39,7 @@ public interface MessagePartListener * * @param transferId */ - public void messageTransfer(long transferId); + public void messageTransfer(int transferId); /** * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties} diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java index 56443e2aeb..833f905b58 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java @@ -27,7 +27,7 @@ public class ByteBufferMessage implements Message private int _dataSize; private DeliveryProperties _currentDeliveryProps; private MessageProperties _currentMessageProps; - private long _transferId; + private int _transferId; private Header _header; public void setHeader(Header header) { @@ -44,12 +44,12 @@ public class ByteBufferMessage implements Message _currentMessageProps = new MessageProperties(); } - public ByteBufferMessage(long transferId) + public ByteBufferMessage(int transferId) { _transferId = transferId; } - public long getMessageTransferId() + public int getMessageTransferId() { return _transferId; } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java index 308a16ce36..289d03574d 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java @@ -89,7 +89,7 @@ public class FileMessage extends ReadOnlyMessage implements Message * does not have a transfer id. Hence this method is not * applicable to this implementation. */ - public long getMessageTransferId() + public int getMessageTransferId() { throw new UnsupportedOperationException(); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java index 885841bc2a..757d44fbbb 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java @@ -26,7 +26,7 @@ public class MessagePartListenerAdapter implements MessagePartListener _adaptee = listener; } - public void messageTransfer(long transferId) + public void messageTransfer(int transferId) { _currentMsg = new ByteBufferMessage(transferId); } diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java index fd3e812cbc..6c7f9e9db7 100644 --- a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java +++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java @@ -61,7 +61,7 @@ public class StreamingMessage extends ReadOnlyMessage implements Message * does not have a transfer id. Hence this method is not * applicable to this implementation. */ - public long getMessageTransferId() + public int getMessageTransferId() { throw new UnsupportedOperationException(); } diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java index a49765baaf..74c0098d72 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java @@ -199,7 +199,7 @@ public class ChannelCloseOkTest extends QpidTestCase private void waitFor(List<Message> received, int count) throws InterruptedException { - long timeout = 6000; + long timeout = 20000; synchronized (received) { diff --git a/java/common/src/main/java/org/apache/qpid/util/Serial.java b/java/common/src/main/java/org/apache/qpid/util/Serial.java index c828290644..cfc97c3d04 100644 --- a/java/common/src/main/java/org/apache/qpid/util/Serial.java +++ b/java/common/src/main/java/org/apache/qpid/util/Serial.java @@ -3,119 +3,75 @@ package org.apache.qpid.util; import org.apache.qpid.SerialException; /** - * This class provides basic - * serial number arithmetic as defined in RFC 1982. + * This class provides basic serial number comparisons as defined in + * RFC 1982. */ public class Serial { - private long _maxIncrement; - private long _max; - private long _maxComparison; - - public Serial(long serialbits) - { - if( serialbits < 2) - { - throw new IllegalArgumentException("Meaningful serial number space has SERIAL_BITS >= 2, wrong value " - + serialbits); - } - _max = (long) Math.pow(2.0 , serialbits) - 1; - _maxIncrement = (long) Math.pow(2.0, serialbits - 1) - 1; - _maxComparison = (long) Math.pow(2.0, serialbits -1); - } /** * Compares two numbers using serial arithmetic. * - * @param serial1 The first serial number - * @param serial2 The second serial number - * @return 0 if the 2 serials numbers are equal, a positive number if serial1 is greater - * than serial2, and a negative number if serial2 is greater than serial1. - * @throws IllegalArgumentException serial1 or serial2 is out of range - * @throws SerialException serial1 and serial2 are not comparable. + * @param s1 the first serial number + * @param s2 the second serial number + * + * @return a negative integer, zero, or a positive integer as the + * first argument is less than, equal to, or greater than the + * second */ - public int compare(long serial1, long serial2) throws IllegalArgumentException, SerialException + public static final int compare(int s1, int s2) { - int result; - if (serial1 < 0 || serial1 > _max) - { - throw new IllegalArgumentException(serial1 + " out of range"); - } - if (serial2 < 0 || serial2 > _max) - { - throw new IllegalArgumentException(serial2 + " out of range"); - } - double diff; - if( serial1 < serial2 ) - { - diff = serial2 - serial1; - if( diff < _maxComparison ) - { - result = -1; - } - else if ( diff > _maxComparison ) - { - result = 1; - } - else - { - throw new SerialException("Cannot compare " + serial1 + " and " + serial2); - } - } - else if( serial1 > serial2 ) + return s1 - s2; + } + + public static final boolean lt(int s1, int s2) + { + return compare(s1, s2) < 0; + } + + public static final boolean le(int s1, int s2) + { + return compare(s1, s2) <= 0; + } + + public static final boolean gt(int s1, int s2) + { + return compare(s1, s2) > 0; + } + + public static final boolean ge(int s1, int s2) + { + return compare(s1, s2) >= 0; + } + + public static final boolean eq(int s1, int s2) + { + return s1 == s2; + } + + public static final int min(int s1, int s2) + { + if (lt(s1, s2)) { - diff = serial1 - serial2; - if( diff > _maxComparison ) - { - result = -1; - } - else if( diff < _maxComparison ) - { - result = 1; - } - else - { - throw new SerialException("Cannot compare " + serial1 + " and " + serial2); - } + return s1; } else { - result = 0; + return s2; } - return result; } - - /** - * Increments a serial numbers by the addition of a positive integer n, - * Serial numbers may be incremented by the addition of a positive - * integer n, where n is taken from the range of integers - * [0 .. (2^(SERIAL_BITS - 1) - 1)]. For a sequence number s, the - * result of such an addition, s', is defined as - * s' = (s + n) modulo (2 ^ SERIAL_BITS) - * @param serial The serila number to be incremented - * @param n The integer to be added to the serial number - * @return The incremented serial number - * @throws IllegalArgumentException serial number or n is out of range - */ - public long increment(long serial, long n) throws IllegalArgumentException + public static final int max(int s1, int s2) { - if (serial < 0 || serial > _max) - { - throw new IllegalArgumentException("Serial number: " + serial + " is out of range"); - } - if( n < 0 || n > _maxIncrement ) + if (gt(s1, s2)) { - throw new IllegalArgumentException("Increment: " + n + " is out of range"); + return s1; } - long result = serial + n; - // apply modulo (2 ^ SERIAL_BITS) - if(result > _max) + else { - result = result - _max - 1; + return s2; } - return result; } } diff --git a/java/common/src/main/java/org/apache/qpidity/api/Message.java b/java/common/src/main/java/org/apache/qpidity/api/Message.java index d4b8343ba5..f5488fde52 100644 --- a/java/common/src/main/java/org/apache/qpidity/api/Message.java +++ b/java/common/src/main/java/org/apache/qpidity/api/Message.java @@ -121,6 +121,6 @@ public interface Message * * @return the message transfer id. */ - public long getMessageTransferId(); + public int getMessageTransferId(); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java index 651402bcb7..ad727676c4 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Channel.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Channel.java @@ -20,13 +20,15 @@ */ package org.apache.qpidity.transport; +import org.apache.qpidity.transport.network.Frame; import org.apache.qpidity.transport.util.Logger; import java.nio.ByteBuffer; -import java.util.List; import java.util.ArrayList; - +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.apache.qpidity.transport.network.Frame.*; import static org.apache.qpidity.transport.util.Functions.*; @@ -51,6 +53,7 @@ public class Channel extends Invoker // session may be null private Session session; + private Lock commandLock = new ReentrantLock(); private boolean first = true; private ByteBuffer data = null; @@ -156,7 +159,17 @@ public class Channel extends Invoker public void method(Method m) { + if (m.getEncodedTrack() == Frame.L4) + { + commandLock.lock(); + } + emit(m); + + if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload()) + { + commandLock.unlock(); + } } public void header(Header header) @@ -190,6 +203,7 @@ public class Channel extends Invoker emit(new Data(data, first, true)); first = true; data = null; + commandLock.unlock(); } protected void invoke(Method m) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java index 56b927f802..7d707ce17b 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Connection.java @@ -77,14 +77,14 @@ public class Connection public void received(ConnectionEvent event) { - log.debug("RECV: %s", event); + log.debug("RECV: [%s] %s", this, event); Channel channel = getChannel(event.getChannel()); channel.received(event.getProtocolEvent()); } public void send(ConnectionEvent event) { - log.debug("SEND: %s", event); + log.debug("SEND: [%s] %s", this, event); sender.send(event); } @@ -135,4 +135,9 @@ public class Connection sender.close(); } + public String toString() + { + return String.format("conn:%x", System.identityHashCode(this)); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Method.java b/java/common/src/main/java/org/apache/qpidity/transport/Method.java index bc4ec6289d..f2063fe964 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Method.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Method.java @@ -39,15 +39,15 @@ public abstract class Method extends Struct implements ProtocolEvent } // XXX: command subclass? - private long id; + private int id; private boolean sync = false; - public final long getId() + public final int getId() { return id; } - void setId(long id) + void setId(int id) { this.id = id; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java index 4d1b83d43e..59fc3d2552 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/ProtocolError.java @@ -64,4 +64,9 @@ public class ProtocolError implements NetworkEvent, ProtocolEvent delegate.error(this); } + public String toString() + { + return String.format("protocol error: %s", getMessage()); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Range.java b/java/common/src/main/java/org/apache/qpidity/transport/Range.java index ed745bf5ec..827eade9f8 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Range.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Range.java @@ -20,7 +20,7 @@ */ package org.apache.qpidity.transport; -import static java.lang.Math.*; +import static org.apache.qpid.util.Serial.*; /** @@ -31,28 +31,28 @@ import static java.lang.Math.*; public class Range { - private final long lower; - private final long upper; + private final int lower; + private final int upper; - public Range(long lower, long upper) + public Range(int lower, int upper) { this.lower = lower; this.upper = upper; } - public long getLower() + public int getLower() { return lower; } - public long getUpper() + public int getUpper() { return upper; } - public boolean includes(long value) + public boolean includes(int value) { - return lower <= value && value <= upper; + return le(lower, value) && le(value, upper); } public boolean includes(Range range) diff --git a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java index ccbcfa99d0..5097c849ee 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/RangeSet.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.ListIterator; import java.util.LinkedList; +import static org.apache.qpid.util.Serial.*; /** * RangeSet @@ -72,7 +73,7 @@ public class RangeSet implements Iterable<Range> it.remove(); range = range.span(next); } - else if (range.getUpper() < next.getLower()) + else if (lt(range.getUpper(), next.getLower())) { it.previous(); it.add(range); @@ -83,12 +84,12 @@ public class RangeSet implements Iterable<Range> it.add(range); } - public void add(long lower, long upper) + public void add(int lower, int upper) { add(new Range(lower, upper)); } - public void add(long value) + public void add(int value) { add(value, value); } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/Session.java b/java/common/src/main/java/org/apache/qpidity/transport/Session.java index f6f4062c6d..a61f5c2aa3 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/Session.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/Session.java @@ -33,6 +33,8 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.qpidity.transport.Option.*; +import static org.apache.qpidity.transport.util.Functions.*; +import static org.apache.qpid.util.Serial.*; /** * Session @@ -42,6 +44,11 @@ import static org.apache.qpidity.transport.Option.*; public class Session extends Invoker { + + private static final Logger log = Logger.get(Session.class); + + private static boolean ENABLE_REPLAY = false; + static { String enableReplay = "enable_command_replay"; @@ -54,8 +61,6 @@ public class Session extends Invoker ENABLE_REPLAY = false; } } - private static boolean ENABLE_REPLAY = false; - private static final Logger log = Logger.get(Session.class); private byte[] name; private long timeout = 60000; @@ -64,15 +69,15 @@ public class Session extends Invoker Channel channel; // incoming command count - long commandsIn = 0; + int commandsIn = 0; // completed incoming commands private final RangeSet processed = new RangeSet(); private Range syncPoint = null; // outgoing command count - private long commandsOut = 0; - private Map<Long,Method> commands = new HashMap<Long,Method>(); - private long maxComplete = -1; + private int commandsOut = 0; + private Map<Integer,Method> commands = new HashMap<Integer,Method>(); + private int maxComplete = commandsOut - 1; private AtomicBoolean closed = new AtomicBoolean(false); @@ -86,22 +91,22 @@ public class Session extends Invoker return name; } - public Map<Long,Method> getOutstandingCommands() + public Map<Integer,Method> getOutstandingCommands() { return commands; } - public long getCommandsOut() + public int getCommandsOut() { return commandsOut; } - public long getCommandsIn() + public int getCommandsIn() { return commandsIn; } - public long nextCommandId() + public int nextCommandId() { return commandsIn++; } @@ -116,12 +121,12 @@ public class Session extends Invoker processed(command.getId()); } - public void processed(long command) + public void processed(int command) { processed(new Range(command, command)); } - public void processed(long lower, long upper) + public void processed(int lower, int upper) { processed(new Range(lower, upper)); @@ -155,7 +160,7 @@ public class Session extends Invoker void syncPoint() { - long id = getCommandsIn() - 1; + int id = getCommandsIn() - 1; log.debug("%s synced to %d", this, id); Range range = new Range(0, id - 1); boolean flush; @@ -179,7 +184,7 @@ public class Session extends Invoker channel.setSession(this); } - public Method getCommand(long id) + public Method getCommand(int id) { synchronized (commands) { @@ -187,18 +192,18 @@ public class Session extends Invoker } } - void complete(long lower, long upper) + void complete(int lower, int upper) { log.debug("%s complete(%d, %d)", this, lower, upper); synchronized (commands) { - for (long id = maxComplete; id <= upper; id++) + for (int id = max(maxComplete, lower); le(id, upper); id++) { commands.remove(id); } - if (lower <= maxComplete + 1) + if (le(lower, maxComplete + 1)) { - maxComplete = Math.max(maxComplete, upper); + maxComplete = max(maxComplete, upper); } commands.notifyAll(); log.debug("%s commands remaining: %s", this, commands); @@ -211,7 +216,7 @@ public class Session extends Invoker { synchronized (commands) { - long next = commandsOut++; + int next = commandsOut++; if (next == 0) { sessionCommandPoint(0, 0); @@ -276,9 +281,9 @@ public class Session extends Invoker log.debug("%s sync()", this); synchronized (commands) { - long point = commandsOut - 1; + int point = commandsOut - 1; - if (maxComplete < point) + if (lt(maxComplete, point)) { ExecutionSync sync = new ExecutionSync(); sync.setSync(true); @@ -287,7 +292,7 @@ public class Session extends Invoker long start = System.currentTimeMillis(); long elapsed = 0; - while (!closed.get() && elapsed < timeout && maxComplete < point) + while (!closed.get() && elapsed < timeout && lt(maxComplete, point)) { try { log.debug("%s waiting for[%d]: %d, %s", this, point, @@ -301,7 +306,7 @@ public class Session extends Invoker } } - if (maxComplete < point) + if (lt(maxComplete, point)) { if (closed.get()) { @@ -315,10 +320,10 @@ public class Session extends Invoker } } - private Map<Long,ResultFuture<?>> results = - new HashMap<Long,ResultFuture<?>>(); + private Map<Integer,ResultFuture<?>> results = + new HashMap<Integer,ResultFuture<?>>(); - void result(long command, Struct result) + void result(int command, Struct result) { ResultFuture<?> future; synchronized (results) @@ -332,7 +337,7 @@ public class Session extends Invoker { synchronized (commands) { - long command = commandsOut; + int command = commandsOut; ResultFuture<T> future = new ResultFuture<T>(klass); synchronized (results) { @@ -446,4 +451,9 @@ public class Session extends Invoker } } + public String toString() + { + return String.format("ssn:%s", str(name)); + } + } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java index a4c46be89c..7b680171da 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java @@ -208,7 +208,7 @@ abstract class AbstractDecoder implements Decoder RangeSet ranges = new RangeSet(); for (int i = 0; i < count; i++) { - ranges.add(readUint32(), readUint32()); + ranges.add(readSequenceNo(), readSequenceNo()); } return ranges; } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java index f68884f812..3596c48956 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java @@ -246,8 +246,8 @@ abstract class AbstractEncoder implements Encoder writeUint16(ranges.size() * 8); for (Range range : ranges) { - writeUint32(range.getLower()); - writeUint32(range.getUpper()); + writeSequenceNo(range.getLower()); + writeSequenceNo(range.getUpper()); } } } diff --git a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java index c5fb7b9986..8bbbf08b22 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java @@ -150,7 +150,7 @@ public class Frame implements NetworkEvent, Iterable<ByteBuffer> { StringBuilder str = new StringBuilder(); str.append(String.format - ("[%05d %05d %1d %1d %d%d%d%d] ", getChannel(), getSize(), + ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(), getTrack(), getType(), isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0, isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0)); diff --git a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java index 14e756c047..a1d74a51ce 100644 --- a/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java +++ b/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java @@ -73,4 +73,14 @@ public class Functions return str.toString(); } + public static final String str(byte[] bytes) + { + return str(ByteBuffer.wrap(bytes)); + } + + public static final String str(byte[] bytes, int limit) + { + return str(ByteBuffer.wrap(bytes), limit); + } + } diff --git a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java index 312b1ab9ce..d3ab817b5d 100644 --- a/java/common/src/test/java/org/apache/qpid/util/SerialTest.java +++ b/java/common/src/test/java/org/apache/qpid/util/SerialTest.java @@ -13,84 +13,18 @@ public class SerialTest extends TestCase { /** - * The simplest meaningful serial number space has SERIAL_BITS == 2. In - * this space, the integers that make up the serial number space are 0, - * 1, 2, and 3. That is, 3 == 2^SERIAL_BITS - 1. - * - * In this space, the largest integer that it is meaningful to add to a - * sequence number is 2^(SERIAL_BITS - 1) - 1, or 1. - * - * Then, as defined 0+1 == 1, 1+1 == 2, 2+1 == 3, and 3+1 == 0. - * Further, 1 > 0, 2 > 1, 3 > 2, and 0 > 3. It is undefined whether - * 2 > 0 or 0 > 2, and whether 1 > 3 or 3 > 1. + * Test the key boundaries where wraparound occurs. */ - public void testTrivialSample() + public void testBoundaries() { - Serial serial = new Serial(2); - assertEquals( serial.increment(0, 1), 1); - assertEquals( serial.increment(1, 1), 2); - assertEquals( serial.increment(2, 1), 3); - assertEquals( serial.increment(3, 1), 0); - try - { - serial.increment(4, 1); - fail("IllegalArgumentException was not trhown"); - } - catch (IllegalArgumentException e) - { - // expected - } - try - { - assertTrue( serial.compare(1, 0) > 0); - assertTrue( serial.compare(2, 1) > 0); - assertTrue( serial.compare(3, 2) > 0); - assertTrue( serial.compare(0, 3) > 0); - assertTrue( serial.compare(0, 1) < 0); - assertTrue( serial.compare(1, 2) < 0); - assertTrue( serial.compare(2, 3) < 0); - assertTrue( serial.compare(3, 0) < 0); - } - catch (SerialException e) - { - fail("Unexpected exception " + e); - } - try - { - serial.compare(2, 0); - fail("AMQSerialException not thrown as expected"); - } - catch (SerialException e) - { - // expected - } - try - { - serial.compare(0, 2); - fail("AMQSerialException not thrown as expected"); - } - catch (SerialException e) - { - // expected - } - try - { - serial.compare(3, 1); - fail("AMQSerialException not thrown as expected"); - } - catch (SerialException e) - { - // expected - } - try - { - serial.compare(3, 1); - fail("AMQSerialException not thrown as expected"); - } - catch (SerialException e) - { - // expected - } + assertTrue(Serial.gt(1, 0)); + assertTrue(Serial.lt(0, 1)); + + assertTrue(Serial.gt(Integer.MAX_VALUE+1, Integer.MAX_VALUE)); + assertTrue(Serial.lt(Integer.MAX_VALUE, Integer.MAX_VALUE+1)); + + assertTrue(Serial.gt(0xFFFFFFFF + 1, 0xFFFFFFFF)); + assertTrue(Serial.lt(0xFFFFFFFF, 0xFFFFFFFF + 1)); } /** @@ -98,38 +32,30 @@ public class SerialTest extends TestCase * For any sequence number s and any integer n such that addition of n * to s is well defined, (s + n) >= s. Further (s + n) == s only when * n == 0, in all other defined cases, (s + n) > s. - * strategy: - * Create a serial number with 32 bits and check in a loop that adding integers - * respect the corollary */ public void testCorollary1() { - Serial serial = new Serial(32); - Random random = new Random(); - long number = random.nextInt((int) Math.pow(2.0 , 32.0) - 1); - for(int i = 1; i<= 10000; i++ ) + int wrapcount = 0; + + int s = 0; + + for (int i = 0; i < 67108664; i++) { - long nextInt = random.nextInt((int) Math.pow(2.0 , 32.0) - 1); - long inc = serial.increment(number, nextInt); - int res =0; - try - { - res=serial.compare(inc, number); - } - catch (SerialException e) - { - fail("un-expected exception " + e); - } - if( res < 1 ) + for (int n = 1; n < 4096; n += 512) { - fail("Corollary 1 violated " + number + " + " + nextInt + " < " + number); + assertTrue(Serial.gt(s+n, s)); + assertTrue(Serial.lt(s, s+n)); } - else if( res == 0 && nextInt > 0) + + s += 1024; + + if (s == 0) { - fail("Corollary 1 violated " + number + " + " + nextInt + " = " + number); + wrapcount += 1; } } + + assertTrue(wrapcount > 0); } - } |
